提交 d4e953fb 编写于 作者: J Jia Zhai 提交者: xiaolong.ran

expose getLastMessageId method in ConsumerImpl (#4911)

Fixes #4909
### Motivation

It would be good to expose method `getLastMessageId` in `ConsumerImpl` to a public method. 
eg. some times user would like to know the lag messages; or only consume messages before current time.

### Modifications

- expose method `getLastMessageId` in consumer api.
- add unit test.

### Verifying this change
Ut passed

(cherry picked from commit 93d95c74)
上级 c4140688
......@@ -31,6 +31,7 @@ import io.netty.util.Timeout;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
......@@ -769,7 +770,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
consumer.close();
}
@Test(timeOut = testTimeout)
public void testDefaultBacklogTTL() throws Exception {
......@@ -808,5 +809,101 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
assertEquals(subscription.getNumberOfEntriesInBacklog(), 0);
}
@Test(timeOut = testTimeout)
public void testGetLastMessageId() throws Exception {
String key = "TopicGetLastMessageId";
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 30;
final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
admin.tenants().createTenant("prop", new TenantInfo());
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);
// 1. producer connect
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2)
.enableBatching(false)
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();
Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3)
.enableBatching(false)
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();
// 2. Create consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topics(topicNames)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// 3. producer publish messages
for (int i = 0; i < totalMessages; i++) {
producer1.send((messagePredicate + "producer1-" + i).getBytes());
producer2.send((messagePredicate + "producer2-" + i).getBytes());
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}
MessageId messageId = consumer.getLastMessageId();
assertTrue(messageId instanceof MultiMessageIdImpl);
MultiMessageIdImpl multiMessageId = (MultiMessageIdImpl) messageId;
Map<String, MessageId> map = multiMessageId.getMap();
assertEquals(map.size(), 6);
map.forEach((k, v) -> {
log.info("topic: {}, messageId:{} ", k, v.toString());
assertTrue(v instanceof MessageIdImpl);
MessageIdImpl messageId1 = (MessageIdImpl) v;
if (k.contains(topicName1)) {
assertEquals(messageId1.entryId, totalMessages - 1);
} else if (k.contains(topicName2)) {
assertEquals(messageId1.entryId, totalMessages / 2 - 1);
} else {
assertEquals(messageId1.entryId, totalMessages / 3 - 1);
}
});
for (int i = 0; i < totalMessages; i++) {
producer1.send((messagePredicate + "producer1-" + i).getBytes());
producer2.send((messagePredicate + "producer2-" + i).getBytes());
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}
messageId = consumer.getLastMessageId();
assertTrue(messageId instanceof MultiMessageIdImpl);
MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId;
Map<String, MessageId> map2 = multiMessageId2.getMap();
assertEquals(map2.size(), 6);
map2.forEach((k, v) -> {
log.info("topic: {}, messageId:{} ", k, v.toString());
assertTrue(v instanceof MessageIdImpl);
MessageIdImpl messageId1 = (MessageIdImpl) v;
if (k.contains(topicName1)) {
assertEquals(messageId1.entryId, totalMessages * 2 - 1);
} else if (k.contains(topicName2)) {
assertEquals(messageId1.entryId, totalMessages - 1);
} else {
assertEquals(messageId1.entryId, totalMessages * 2 / 3 - 1);
}
});
consumer.unsubscribe();
consumer.close();
producer1.close();
producer2.close();
producer3.close();
}
}
......@@ -158,4 +158,11 @@ public class ConsumerV1Impl implements Consumer {
return consumer.unsubscribeAsync();
}
public MessageId getLastMessageId() throws PulsarClientException {
return consumer.getLastMessageId();
}
public CompletableFuture<MessageId> getLastMessageIdAsync() {
return consumer.getLastMessageIdAsync();
}
}
......@@ -367,6 +367,20 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Void> seekAsync(long timestamp);
/**
* Get the last message id available available for consume.
*
* @return the last message id.
*/
MessageId getLastMessageId() throws PulsarClientException;
/**
* Get the last message id available available for consume.
*
* @return a future that can be used to track the completion of the operation.
*/
CompletableFuture<MessageId> getLastMessageIdAsync();
/**
* @return Whether the consumer is connected to the broker
*/
......
......@@ -133,9 +133,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return internalReceiveAsync();
}
abstract protected Message<T> internalReceive() throws PulsarClientException;
protected abstract Message<T> internalReceive() throws PulsarClientException;
abstract protected CompletableFuture<Message<T>> internalReceiveAsync();
protected abstract CompletableFuture<Message<T>> internalReceiveAsync();
@Override
public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
......@@ -165,7 +165,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return internalReceive(timeout, unit);
}
abstract protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;
protected abstract Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;
@Override
public void acknowledge(Message<?> message) throws PulsarClientException {
......@@ -241,7 +241,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
negativeAcknowledge(message.getMessageId());
}
abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
protected abstract CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties);
@Override
......@@ -254,7 +254,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
@Override
abstract public CompletableFuture<Void> unsubscribeAsync();
public abstract CompletableFuture<Void> unsubscribeAsync();
@Override
public void close() throws PulsarClientException {
......@@ -266,7 +266,20 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
@Override
abstract public CompletableFuture<Void> closeAsync();
public abstract CompletableFuture<Void> closeAsync();
@Override
public MessageId getLastMessageId() throws PulsarClientException {
try {
return getLastMessageIdAsync().get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public abstract CompletableFuture<MessageId> getLastMessageIdAsync();
private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
return SubscriptionType.Shared != type;
......@@ -292,9 +305,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return null;
}
abstract public int getAvailablePermits();
public abstract int getAvailablePermits();
abstract public int numMessagesInQueue();
public abstract int numMessagesInQueue();
public CompletableFuture<Consumer<T>> subscribeFuture() {
return subscribeFuture;
......
......@@ -1464,7 +1464,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
CompletableFuture<MessageId> getLastMessageIdAsync() {
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import lombok.Getter;
import org.apache.pulsar.client.api.MessageId;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* A MessageId implementation that contains a map of <partitionName, MessageId>.
* This is useful when MessageId is need for partition/multi-topics/pattern consumer.
* e.g. seek(), ackCumulative(), getLastMessageId().
*/
public class MultiMessageIdImpl implements MessageId {
@Getter
private Map<String, MessageId> map;
MultiMessageIdImpl(Map<String, MessageId> map) {
this.map = map;
}
// TODO: Add support for Serialization and Deserialization
// https://github.com/apache/pulsar/issues/4940
@Override
public byte[] toByteArray() {
throw new NotImplementedException();
}
@Override
public int hashCode() {
return Objects.hash(map);
}
// If all messageId in map are same size, and all bigger/smaller than the other, return valid value.
@Override
public int compareTo(MessageId o) {
if (!(o instanceof MultiMessageIdImpl)) {
throw new IllegalArgumentException(
"expected MultiMessageIdImpl object. Got instance of " + o.getClass().getName());
}
MultiMessageIdImpl other = (MultiMessageIdImpl) o;
Map<String, MessageId> otherMap = other.getMap();
if ((map == null || map.isEmpty()) && (otherMap == null || otherMap.isEmpty())) {
return 0;
}
if (otherMap == null || map == null || otherMap.size() != map.size()) {
throw new IllegalArgumentException("Current size and other size not equals");
}
int result = 0;
for (Entry<String, MessageId> entry : map.entrySet()) {
MessageId otherMessage = otherMap.get(entry.getKey());
if (otherMessage == null) {
throw new IllegalArgumentException(
"Other MessageId not have topic " + entry.getKey());
}
int currentResult = entry.getValue().compareTo(otherMessage);
if (result == 0) {
result = currentResult;
} else if (currentResult == 0) {
continue;
} else if (result != currentResult) {
throw new IllegalArgumentException(
"Different MessageId in Map get different compare result");
} else {
continue;
}
}
return result;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MultiMessageIdImpl)) {
throw new IllegalArgumentException(
"expected MultiMessageIdImpl object. Got instance of " + obj.getClass().getName());
}
MultiMessageIdImpl other = (MultiMessageIdImpl) obj;
try {
return compareTo(other) == 0;
} catch (IllegalArgumentException e) {
return false;
}
}
}
......@@ -22,7 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import com.google.protobuf.MapEntry;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
......@@ -44,6 +47,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
......@@ -1060,5 +1064,33 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
return partitionsAutoUpdateTimeout;
}
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
CompletableFuture<MessageId> returnFuture = new CompletableFuture<>();
Map<String, CompletableFuture<MessageId>> messageIdFutures = consumers.entrySet().stream()
.map(entry -> Pair.of(entry.getKey(),entry.getValue().getLastMessageIdAsync()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
CompletableFuture
.allOf(messageIdFutures.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture<?>[]::new))
.whenComplete((ignore, ex) -> {
Builder<String, MessageId> builder = ImmutableMap.<String, MessageId>builder();
messageIdFutures.forEach((key, future) -> {
MessageId messageId;
try {
messageId = future.get();
} catch(Exception e) {
log.warn("[{}] Exception when topic {} getLastMessageId.", key, e);
messageId = MessageId.earliest;
}
builder.put(key, messageId);
});
returnFuture.complete(new MultiMessageIdImpl(builder.build()));
});
return returnFuture;
}
private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
}
......@@ -19,8 +19,14 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.testng.annotations.Test;
/**
......@@ -164,4 +170,219 @@ public class MessageIdCompareToTest {
assertEquals(topicMessageId2.compareTo(messageIdImpl2), 0, "Expected to be equal");
}
@Test
public void testMultiMessageIdEqual() {
// null
MultiMessageIdImpl null1 = new MultiMessageIdImpl(null);
MultiMessageIdImpl null2 = new MultiMessageIdImpl(null);
assertEquals(null1, null2);
// empty
MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap());
MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap());
assertEquals(empty1, empty2);
// null empty
assertEquals(null1, empty2);
assertEquals(empty2, null1);
// 1 item
String topic1 = "topicName1";
MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567);
MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1));
MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2));
assertEquals(item1, item2);
// 1 item, empty not equal
assertNotEquals(item1, null1);
assertNotEquals(null1, item1);
// key not equal
String topic2 = "topicName2";
MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2));
assertNotEquals(item1, item3);
assertNotEquals(item3, item1);
// value not equal
MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3));
assertNotEquals(item1, item4);
assertNotEquals(item4, item1);
// key value not equal
assertNotEquals(item3, item4);
assertNotEquals(item4, item3);
// 2 items
Map<String, MessageId> map1 = Maps.newHashMap();
Map<String, MessageId> map2 = Maps.newHashMap();
map1.put(topic1, messageIdImpl1);
map1.put(topic2, messageIdImpl2);
map2.put(topic2, messageIdImpl2);
map2.put(topic1, messageIdImpl1);
MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1);
MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2);
assertEquals(item5, item6);
assertNotEquals(item5, null1);
assertNotEquals(item5, empty1);
assertNotEquals(item5, item1);
assertNotEquals(item5, item3);
assertNotEquals(item5, item4);
assertNotEquals(null1, item5);
assertNotEquals(empty1, item5);
assertNotEquals(item1, item5);
assertNotEquals(item3, item5);
assertNotEquals(item4, item5);
map2.put(topic1, messageIdImpl3);
MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2);
assertNotEquals(item5, item7);
assertNotEquals(item7, item5);
}
@Test
public void testMultiMessageIdCompareto() {
// null
MultiMessageIdImpl null1 = new MultiMessageIdImpl(null);
MultiMessageIdImpl null2 = new MultiMessageIdImpl(null);
assertEquals(0, null1.compareTo(null2));
// empty
MultiMessageIdImpl empty1 = new MultiMessageIdImpl(Collections.emptyMap());
MultiMessageIdImpl empty2 = new MultiMessageIdImpl(Collections.emptyMap());
assertEquals(0, empty1.compareTo(empty2));
// null empty
assertEquals(0, null1.compareTo(empty2));
assertEquals(0, empty2.compareTo(null1));
// 1 item
String topic1 = "topicName1";
MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567);
MessageIdImpl messageIdImpl3 = new MessageIdImpl(345L, 456L, 567);
MultiMessageIdImpl item1 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl1));
MultiMessageIdImpl item2 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl2));
assertEquals(0, item1.compareTo(item2));
// 1 item, empty not equal
try {
item1.compareTo(null1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
null1.compareTo(item1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
// key not equal
String topic2 = "topicName2";
MultiMessageIdImpl item3 = new MultiMessageIdImpl(Collections.singletonMap(topic2, messageIdImpl2));
try {
item1.compareTo(item3);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item3.compareTo(item1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
// value not equal
MultiMessageIdImpl item4 = new MultiMessageIdImpl(Collections.singletonMap(topic1, messageIdImpl3));
assertTrue(item1.compareTo(item4) < 0);
assertTrue(item4.compareTo(item1) > 0);
// key value not equal
try {
item3.compareTo(item4);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item4.compareTo(item3);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
// 2 items
Map<String, MessageId> map1 = Maps.newHashMap();
Map<String, MessageId> map2 = Maps.newHashMap();
map1.put(topic1, messageIdImpl1);
map1.put(topic2, messageIdImpl2);
map2.put(topic2, messageIdImpl2);
map2.put(topic1, messageIdImpl1);
MultiMessageIdImpl item5 = new MultiMessageIdImpl(map1);
MultiMessageIdImpl item6 = new MultiMessageIdImpl(map2);
assertTrue(item5.compareTo(item6) == 0);
try {
item5.compareTo(null1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item5.compareTo(empty1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item5.compareTo(item1);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item5.compareTo(item3);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
try {
item5.compareTo(item4);
fail("should throw exception for not comparable");
} catch (IllegalArgumentException e) {
// expected
}
map2.put(topic1, messageIdImpl3);
MultiMessageIdImpl item7 = new MultiMessageIdImpl(map2);
assertTrue(item7.compareTo(item5) > 0);
assertTrue(item5.compareTo(item7) < 0);
Map<String, MessageId> map3 = Maps.newHashMap();
map3.put(topic1, messageIdImpl3);
map3.put(topic2, messageIdImpl3);
MultiMessageIdImpl item8 = new MultiMessageIdImpl(map3);
assertTrue(item8.compareTo(item5) > 0);
assertTrue(item8.compareTo(item7) > 0);
assertTrue(item5.compareTo(item8) < 0);
assertTrue(item7.compareTo(item8) < 0);
}
}
......@@ -530,6 +530,16 @@ public class PulsarConsumerSourceTests {
@Override
public void resume() {
}
@Override
public MessageId getLastMessageId() throws PulsarClientException {
return null;
}
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
return null;
}
}
private static List<Message> createMessages(int startIndex, int numMessages) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册