提交 02f9fd3e 编写于 作者: R Rajan Dhabalia 提交者: Sijie Guo

[pulsar-client] Fix broken replication msg to specific cluster (#4930)

上级 bdfefc72
......@@ -54,12 +54,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
......@@ -812,6 +814,43 @@ public class ReplicatorTest extends ReplicatorTestBase {
}
@Test
public void testReplicatedCluster() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
final String namespace = "pulsar/global/repl";
final String topicName = String.format("persistent://%s/topic1", namespace);
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
admin1.topics().createPartitionedTopic(topicName, 4);
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
byte[] value = "test".getBytes();
// publish message local only
TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value);
msg.send();
assertEquals(consumer1.receive().getValue(), value);
Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
if (msg2 != null) {
fail("msg should have not been replicated to remote cluster");
}
consumer1.close();
consumer2.close();
producer1.close();
}
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
}
......@@ -1420,6 +1420,9 @@ public class Commands {
if (builder.hasReplicatedFrom()) {
messageMetadata.setReplicatedFrom(builder.getReplicatedFrom());
}
if (builder.getReplicateToCount() > 0) {
messageMetadata.addAllReplicateTo(builder.getReplicateToList());
}
if (builder.hasSchemaVersion()) {
messageMetadata.setSchemaVersion(builder.getSchemaVersion());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册