diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h index c3f25ef3d420d3403acfb85a17f2321267e00207..ae55093367dc69fc58fc58400f552a9560a5bb95 100644 --- a/pulsar-client-cpp/include/pulsar/Producer.h +++ b/pulsar-client-cpp/include/pulsar/Producer.h @@ -147,6 +147,9 @@ class PULSAR_PUBLIC Producer { friend class PulsarWrapper; ProducerImplBasePtr impl_; + + // For unit test case BatchMessageTest::producerFailureResult only + void producerFailMessages(Result result); }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index 1659a21e7878ee8a810a6ebedc8be63d372d32bd..e02f1d8a5bb1f825ca71826e579a864b9f5deb51 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -97,4 +97,11 @@ void Producer::flushAsync(FlushCallback callback) { impl_->flushAsync(callback); } + +void Producer::producerFailMessages(Result result) { + if (impl_) { + ProducerImpl* producerImpl = static_cast(impl_.get()); + producerImpl->failPendingMessages(result); + } +} } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index bf9e3ac6e4f42482bc3280dd714da9dc4676268b..666df3b27f184741da07a9e76f20c9fc5167f94c 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -256,7 +256,7 @@ void ProducerImpl::failPendingMessages(Result result) { } // this function can handle null pointer - BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL); + BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL); } void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index d7ac60342ae08699324cdb230c540d1544617843..cb2a8a6e8a9902db15531e2a5cf7c47101d5ed65 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -43,6 +43,8 @@ typedef std::shared_ptr MessageCryptoPtr; class PulsarFriend; +class Producer; + struct OpSendMsg { Message msg_; SendCallback sendCallback_; @@ -110,6 +112,8 @@ class ProducerImpl : public HandlerBase, friend class PulsarFriend; + friend class Producer; + friend class BatchMessageContainer; virtual void connectionOpened(const ClientConnectionPtr& connection); diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc index c1bfe60895f9277343ed05dded310caf8f3b9ee5..62f68cfea61931f8e9007095e40b897053200d81 100644 --- a/pulsar-client-cpp/tests/BatchMessageTest.cc +++ b/pulsar-client-cpp/tests/BatchMessageTest.cc @@ -30,6 +30,8 @@ #include #include "LogUtils.h" #include "PulsarFriend.h" +#include +#include #include "ConsumerTest.h" #include "HttpHelper.h" DECLARE_LOG_OBJECT(); @@ -55,6 +57,8 @@ static void sendCallBack(Result r, const Message& msg) { LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString()); } +static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); } + static int globalPublishCountSuccess = 0; static int globalPublishCountQueueFull = 0; @@ -914,3 +918,33 @@ TEST(BatchMessageTest, testPartitionedTopics) { // Number of messages consumed ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull); } + +TEST(BatchMessageTest, producerFailureResult) { + std::string testName = std::to_string(epochTime) + "testCumulativeAck"; + + ClientConfiguration clientConfig; + clientConfig.setStatsIntervalInSeconds(100); + + Client client(lookupUrl, clientConfig); + std::string topicName = "persistent://public/default/" + testName; + std::string subName = "subscription-name"; + Producer producer; + + int batchSize = 100; + int numOfMessages = 10000; + ProducerConfiguration conf; + + conf.setCompressionType(CompressionZLib); + conf.setBatchingMaxMessages(batchSize); + conf.setBatchingEnabled(true); + conf.setBatchingMaxPublishDelayMs(50000); + conf.setBlockIfQueueFull(false); + conf.setMaxPendingMessages(10); + + Result res = Result::ResultBrokerMetadataError; + + client.createProducer(topicName, conf, producer); + Message msg = MessageBuilder().setContent("test").build(); + producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res)); + PulsarFriend::producerFailMessages(producer, res); +} diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index 95c49f885f0e0bfcf1e62fad36653f01080d0001..a50bd678c24939a63d404033e7ab6143bd67c485 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -56,6 +56,10 @@ class PulsarFriend { return *producerImpl; } + static void producerFailMessages(Producer producer, Result result) { + producer.producerFailMessages(result); + } + static ConsumerImpl& getConsumerImpl(Consumer consumer) { ConsumerImpl* consumerImpl = static_cast(consumer.impl_.get()); return *consumerImpl;