diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 0950bddc83327108c4892ec9228b8d869d3e71ec..f2eaca9cdac869098dabffd323b5611b019b8c7c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -31,6 +31,7 @@ import io.netty.util.Timeout; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -769,7 +770,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { consumer.close(); } - + @Test(timeOut = testTimeout) public void testDefaultBacklogTTL() throws Exception { @@ -808,5 +809,101 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { assertEquals(subscription.getNumberOfEntriesInBacklog(), 0); } - + + @Test(timeOut = testTimeout) + public void testGetLastMessageId() throws Exception { + String key = "TopicGetLastMessageId"; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + final int totalMessages = 30; + + final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; + final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; + final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; + List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); + + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName2, 2); + admin.topics().createPartitionedTopic(topicName3, 3); + + // 1. producer connect + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + + // 2. Create consumer + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); + + // 3. producer publish messages + for (int i = 0; i < totalMessages; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + } + + MessageId messageId = consumer.getLastMessageId(); + assertTrue(messageId instanceof MultiMessageIdImpl); + MultiMessageIdImpl multiMessageId = (MultiMessageIdImpl) messageId; + Map map = multiMessageId.getMap(); + assertEquals(map.size(), 6); + map.forEach((k, v) -> { + log.info("topic: {}, messageId:{} ", k, v.toString()); + assertTrue(v instanceof MessageIdImpl); + MessageIdImpl messageId1 = (MessageIdImpl) v; + if (k.contains(topicName1)) { + assertEquals(messageId1.entryId, totalMessages - 1); + } else if (k.contains(topicName2)) { + assertEquals(messageId1.entryId, totalMessages / 2 - 1); + } else { + assertEquals(messageId1.entryId, totalMessages / 3 - 1); + } + }); + + for (int i = 0; i < totalMessages; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + } + + messageId = consumer.getLastMessageId(); + assertTrue(messageId instanceof MultiMessageIdImpl); + MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId; + Map map2 = multiMessageId2.getMap(); + assertEquals(map2.size(), 6); + map2.forEach((k, v) -> { + log.info("topic: {}, messageId:{} ", k, v.toString()); + assertTrue(v instanceof MessageIdImpl); + MessageIdImpl messageId1 = (MessageIdImpl) v; + if (k.contains(topicName1)) { + assertEquals(messageId1.entryId, totalMessages * 2 - 1); + } else if (k.contains(topicName2)) { + assertEquals(messageId1.entryId, totalMessages - 1); + } else { + assertEquals(messageId1.entryId, totalMessages * 2 / 3 - 1); + } + }); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + } + } diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java index 0896cb587671d3f4b872053e0c2fd18bd538d7f6..764a849987e00538eb6a03de1ead5271f7594d24 100644 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java +++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java @@ -158,4 +158,11 @@ public class ConsumerV1Impl implements Consumer { return consumer.unsubscribeAsync(); } + public MessageId getLastMessageId() throws PulsarClientException { + return consumer.getLastMessageId(); + } + + public CompletableFuture getLastMessageIdAsync() { + return consumer.getLastMessageIdAsync(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index deb84abb94db33696a2b6cbf0955c13ca1aae26a..2f823a757ccfaeb0571416b85c51332077f641f6 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -367,6 +367,20 @@ public interface Consumer extends Closeable { */ CompletableFuture seekAsync(long timestamp); + /** + * Get the last message id available available for consume. + * + * @return the last message id. + */ + MessageId getLastMessageId() throws PulsarClientException; + + /** + * Get the last message id available available for consume. + * + * @return a future that can be used to track the completion of the operation. + */ + CompletableFuture getLastMessageIdAsync(); + /** * @return Whether the consumer is connected to the broker */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index fe9cf6669661d6299cf19875edc2b89e2d497191..b9ec35906ee139ab82248d976bada3a1f566fda8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -133,9 +133,9 @@ public abstract class ConsumerBase extends HandlerState implements Consumer internalReceive() throws PulsarClientException; + protected abstract Message internalReceive() throws PulsarClientException; - abstract protected CompletableFuture> internalReceiveAsync(); + protected abstract CompletableFuture> internalReceiveAsync(); @Override public Message receive(int timeout, TimeUnit unit) throws PulsarClientException { @@ -165,7 +165,7 @@ public abstract class ConsumerBase extends HandlerState implements Consumer internalReceive(int timeout, TimeUnit unit) throws PulsarClientException; + protected abstract Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException; @Override public void acknowledge(Message message) throws PulsarClientException { @@ -241,7 +241,7 @@ public abstract class ConsumerBase extends HandlerState implements Consumer doAcknowledge(MessageId messageId, AckType ackType, + protected abstract CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties); @Override @@ -254,7 +254,7 @@ public abstract class ConsumerBase extends HandlerState implements Consumer unsubscribeAsync(); + public abstract CompletableFuture unsubscribeAsync(); @Override public void close() throws PulsarClientException { @@ -266,7 +266,20 @@ public abstract class ConsumerBase extends HandlerState implements Consumer closeAsync(); + public abstract CompletableFuture closeAsync(); + + + @Override + public MessageId getLastMessageId() throws PulsarClientException { + try { + return getLastMessageIdAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + @Override + public abstract CompletableFuture getLastMessageIdAsync(); private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) { return SubscriptionType.Shared != type; @@ -292,9 +305,9 @@ public abstract class ConsumerBase extends HandlerState implements Consumer> subscribeFuture() { return subscribeFuture; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 11bb52082252b118b690191d3f4ad9034f89b8b4..b20bfdab8603015f7cad576801a144cceace727f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1481,7 +1481,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle } } - CompletableFuture getLastMessageIdAsync() { + @Override + public CompletableFuture getLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..e6e7557fbb0cf45ff57108f67dda9c659551d678 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import lombok.Getter; +import org.apache.pulsar.client.api.MessageId; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +/** + * A MessageId implementation that contains a map of . + * This is useful when MessageId is need for partition/multi-topics/pattern consumer. + * e.g. seek(), ackCumulative(), getLastMessageId(). + */ +public class MultiMessageIdImpl implements MessageId { + @Getter + private Map map; + + MultiMessageIdImpl(Map map) { + this.map = map; + } + + // TODO: Add support for Serialization and Deserialization + // https://github.com/apache/pulsar/issues/4940 + @Override + public byte[] toByteArray() { + throw new NotImplementedException(); + } + + @Override + public int hashCode() { + return Objects.hash(map); + } + + // If all messageId in map are same size, and all bigger/smaller than the other, return valid value. + @Override + public int compareTo(MessageId o) { + if (!(o instanceof MultiMessageIdImpl)) { + throw new IllegalArgumentException( + "expected MultiMessageIdImpl object. Got instance of " + o.getClass().getName()); + } + + MultiMessageIdImpl other = (MultiMessageIdImpl) o; + Map otherMap = other.getMap(); + + if ((map == null || map.isEmpty()) && (otherMap == null || otherMap.isEmpty())) { + return 0; + } + + if (otherMap == null || map == null || otherMap.size() != map.size()) { + throw new IllegalArgumentException("Current size and other size not equals"); + } + + int result = 0; + for (Entry entry : map.entrySet()) { + MessageId otherMessage = otherMap.get(entry.getKey()); + if (otherMessage == null) { + throw new IllegalArgumentException( + "Other MessageId not have topic " + entry.getKey()); + } + + int currentResult = entry.getValue().compareTo(otherMessage); + if (result == 0) { + result = currentResult; + } else if (currentResult == 0) { + continue; + } else if (result != currentResult) { + throw new IllegalArgumentException( + "Different MessageId in Map get different compare result"); + } else { + continue; + } + } + + return result; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MultiMessageIdImpl)) { + throw new IllegalArgumentException( + "expected MultiMessageIdImpl object. Got instance of " + obj.getClass().getName()); + } + + MultiMessageIdImpl other = (MultiMessageIdImpl) obj; + + try { + return compareTo(other) == 0; + } catch (IllegalArgumentException e) { + return false; + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 9130ecf54ca2b32f2886d67e41925058c8841c4a..9ce8c98d0cf5f84d0281d571248d2d526d4d8bfc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -22,7 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Lists; +import com.google.protobuf.MapEntry; import io.netty.util.Timeout; import io.netty.util.TimerTask; import java.util.ArrayList; @@ -44,6 +47,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; @@ -1060,5 +1064,33 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { return partitionsAutoUpdateTimeout; } + @Override + public CompletableFuture getLastMessageIdAsync() { + CompletableFuture returnFuture = new CompletableFuture<>(); + + Map> messageIdFutures = consumers.entrySet().stream() + .map(entry -> Pair.of(entry.getKey(),entry.getValue().getLastMessageIdAsync())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + CompletableFuture + .allOf(messageIdFutures.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + Builder builder = ImmutableMap.builder(); + messageIdFutures.forEach((key, future) -> { + MessageId messageId; + try { + messageId = future.get(); + } catch(Exception e) { + log.warn("[{}] Exception when topic {} getLastMessageId.", key, e); + messageId = MessageId.earliest; + } + builder.put(key, messageId); + }); + returnFuture.complete(new MultiMessageIdImpl(builder.build())); + }); + + return returnFuture; + } + private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 86869172c64daeb8ae8b531e1110fbaf334d40d3..7a612b1fbaba174db8b37c3ef6918d238183ef29 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -19,8 +19,14 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Map; +import org.apache.pulsar.client.api.MessageId; import org.testng.annotations.Test; /** @@ -164,4 +170,219 @@ public class MessageIdCompareToTest { assertEquals(topicMessageId2.compareTo(messageIdImpl2), 0, "Expected to be equal"); } + @Test + public void testMultiMessageIdEqual() { + // null + MultiMessageIdImpl null1 = new MultiMessageIdImpl(null); + MultiMessageIdImpl null2 = new MultiMessageIdImpl(null); + assertEquals(null1, null2); + + // empty + MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap()); + MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap()); + assertEquals(empty1, empty2); + + // null empty + assertEquals(null1, empty2); + assertEquals(empty2, null1); + + // 1 item + String topic1 = "topicName1"; + MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567); + + MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1)); + MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2)); + assertEquals(item1, item2); + + // 1 item, empty not equal + assertNotEquals(item1, null1); + assertNotEquals(null1, item1); + + // key not equal + String topic2 = "topicName2"; + MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2)); + assertNotEquals(item1, item3); + assertNotEquals(item3, item1); + + // value not equal + MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3)); + assertNotEquals(item1, item4); + assertNotEquals(item4, item1); + + // key value not equal + assertNotEquals(item3, item4); + assertNotEquals(item4, item3); + + // 2 items + Map map1 = Maps.newHashMap(); + Map map2 = Maps.newHashMap(); + map1.put(topic1, messageIdImpl1); + map1.put(topic2, messageIdImpl2); + map2.put(topic2, messageIdImpl2); + map2.put(topic1, messageIdImpl1); + + MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1); + MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2); + + assertEquals(item5, item6); + + assertNotEquals(item5, null1); + assertNotEquals(item5, empty1); + assertNotEquals(item5, item1); + assertNotEquals(item5, item3); + assertNotEquals(item5, item4); + + assertNotEquals(null1, item5); + assertNotEquals(empty1, item5); + assertNotEquals(item1, item5); + assertNotEquals(item3, item5); + assertNotEquals(item4, item5); + + map2.put(topic1, messageIdImpl3); + MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2); + assertNotEquals(item5, item7); + assertNotEquals(item7, item5); + } + + @Test + public void testMultiMessageIdCompareto() { + // null + MultiMessageIdImpl null1 = new MultiMessageIdImpl(null); + MultiMessageIdImpl null2 = new MultiMessageIdImpl(null); + assertEquals(0, null1.compareTo(null2)); + + // empty + MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap()); + MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap()); + assertEquals(0, empty1.compareTo(empty2)); + + // null empty + assertEquals(0, null1.compareTo(empty2)); + assertEquals(0, empty2.compareTo(null1)); + + // 1 item + String topic1 = "topicName1"; + MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); + MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567); + + MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1)); + MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2)); + assertEquals(0, item1.compareTo(item2)); + + // 1 item, empty not equal + try { + item1.compareTo(null1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + try { + null1.compareTo(item1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + // key not equal + String topic2 = "topicName2"; + MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2)); + try { + item1.compareTo(item3); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + try { + item3.compareTo(item1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + // value not equal + MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3)); + assertTrue(item1.compareTo(item4) < 0); + assertTrue(item4.compareTo(item1) > 0); + + // key value not equal + try { + item3.compareTo(item4); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + try { + item4.compareTo(item3); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + // 2 items + Map map1 = Maps.newHashMap(); + Map map2 = Maps.newHashMap(); + map1.put(topic1, messageIdImpl1); + map1.put(topic2, messageIdImpl2); + map2.put(topic2, messageIdImpl2); + map2.put(topic1, messageIdImpl1); + + MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1); + MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2); + + assertTrue(item5.compareTo(item6) == 0); + + try { + item5.compareTo(null1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(empty1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(item1); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(item3); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + item5.compareTo(item4); + fail("should throw exception for not comparable"); + } catch (IllegalArgumentException e) { + // expected + } + + map2.put(topic1, messageIdImpl3); + MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2); + + assertTrue(item7.compareTo(item5) > 0); + assertTrue(item5.compareTo(item7) < 0); + + Map map3 = Maps.newHashMap(); + map3.put(topic1, messageIdImpl3); + map3.put(topic2, messageIdImpl3); + MultiMessageIdImpl item8 = new MultiMessageIdImpl(map3); + assertTrue(item8.compareTo(item5) > 0); + assertTrue(item8.compareTo(item7) > 0); + + assertTrue(item5.compareTo(item8) < 0); + assertTrue(item7.compareTo(item8) < 0); + } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java index a666bb02dc16aaaf62c7da37a67573a4e790ad3c..a0b94716e70989428546197777386c2c699099c3 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java @@ -530,6 +530,16 @@ public class PulsarConsumerSourceTests { @Override public void resume() { } + + @Override + public MessageId getLastMessageId() throws PulsarClientException { + return null; + } + + @Override + public CompletableFuture getLastMessageIdAsync() { + return null; + } } private static List createMessages(int startIndex, int numMessages) {