提交 6b3c7b73 编写于 作者: L lipenghui 提交者: xiaolong.ran

Add support for partitioned topic consumer seek by time. (#5435)

Add support for partitioned topic consumer seek by time.

Call each partition consumer seekAsync() while call partitioned consumer seekAsync()

Update unit tests for consumer.seek().

(cherry picked from commit a95bea60)
上级 370bfc39
......@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
......@@ -145,18 +146,58 @@ public class SubscriptionSeekTest extends BrokerTestBase {
@Test
public void testSeekTimeOnPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeekTimePartitions";
long timestamp = 1550479732;
admin.topics().createPartitionedTopic(topicName, 2);
final String resetTimeStr = "100s";
final int partitions = 2;
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
admin.topics().createPartitionedTopic(topicName, partitions);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").subscribe();
try {
consumer.seek(timestamp);
fail("Should not have succeeded");
} catch (PulsarClientException e) {
// Expected
List<PersistentSubscription> subs = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(topicName + TopicName.PARTITIONED_TOPIC_SUFFIX + i).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
assertEquals(topicRef.getSubscriptions().size(), 1);
PersistentSubscription sub = topicRef.getSubscription("my-subscription");
assertNotNull(sub);
subs.add(sub);
}
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
long backlogs = 0;
for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog();
}
assertEquals(backlogs, 10);
backlogs = 0;
long currentTimestamp = System.currentTimeMillis();
consumer.seek(currentTimestamp);
for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog();
}
assertEquals(backlogs, 2);
// Wait for consumer to reconnect
Thread.sleep(1000);
consumer.seek(currentTimestamp - resetTimeInMillis);
backlogs = 0;
for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog();
}
assertEquals(backlogs, 10);
}
}
......@@ -589,7 +589,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
return FutureUtil.waitForAll(futures);
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册