提交 e4790f37 编写于 作者: E Easyfan Zheng 提交者: Jia Zhai

Merge Request for #4808: TYPO in C++ client producer method for processing...

Merge Request for #4808: TYPO in C++ client producer method for processing failure case, and add corresponding unit test case. (#4873)

Definitely, this is a typo. This method is dealing with the Failed Message with the GIVEN result, but not a CERTAIN result.

Contribution Checklist
#4808 : TYPO in C++ client producer method for processing failure case
Add c++ client producer failure message unit test case.

UT passed:

BatchMessageTest
(cherry picked from commit b90b4ea1)
上级 ade95423
...@@ -147,6 +147,9 @@ class PULSAR_PUBLIC Producer { ...@@ -147,6 +147,9 @@ class PULSAR_PUBLIC Producer {
friend class PulsarWrapper; friend class PulsarWrapper;
ProducerImplBasePtr impl_; ProducerImplBasePtr impl_;
// For unit test case BatchMessageTest::producerFailureResult only
void producerFailMessages(Result result);
}; };
} // namespace pulsar } // namespace pulsar
......
...@@ -97,4 +97,11 @@ void Producer::flushAsync(FlushCallback callback) { ...@@ -97,4 +97,11 @@ void Producer::flushAsync(FlushCallback callback) {
impl_->flushAsync(callback); impl_->flushAsync(callback);
} }
void Producer::producerFailMessages(Result result) {
if (impl_) {
ProducerImpl* producerImpl = static_cast<ProducerImpl*>(impl_.get());
producerImpl->failPendingMessages(result);
}
}
} // namespace pulsar } // namespace pulsar
...@@ -256,7 +256,7 @@ void ProducerImpl::failPendingMessages(Result result) { ...@@ -256,7 +256,7 @@ void ProducerImpl::failPendingMessages(Result result) {
} }
// this function can handle null pointer // this function can handle null pointer
BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL); BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL);
} }
void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
......
...@@ -43,6 +43,8 @@ typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr; ...@@ -43,6 +43,8 @@ typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
class PulsarFriend; class PulsarFriend;
class Producer;
struct OpSendMsg { struct OpSendMsg {
Message msg_; Message msg_;
SendCallback sendCallback_; SendCallback sendCallback_;
...@@ -110,6 +112,8 @@ class ProducerImpl : public HandlerBase, ...@@ -110,6 +112,8 @@ class ProducerImpl : public HandlerBase,
friend class PulsarFriend; friend class PulsarFriend;
friend class Producer;
friend class BatchMessageContainer; friend class BatchMessageContainer;
virtual void connectionOpened(const ClientConnectionPtr& connection); virtual void connectionOpened(const ClientConnectionPtr& connection);
......
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#include <thread> #include <thread>
#include "LogUtils.h" #include "LogUtils.h"
#include "PulsarFriend.h" #include "PulsarFriend.h"
#include <unistd.h>
#include <functional>
#include "ConsumerTest.h" #include "ConsumerTest.h"
#include "HttpHelper.h" #include "HttpHelper.h"
DECLARE_LOG_OBJECT(); DECLARE_LOG_OBJECT();
...@@ -55,6 +57,8 @@ static void sendCallBack(Result r, const Message& msg) { ...@@ -55,6 +57,8 @@ static void sendCallBack(Result r, const Message& msg) {
LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString()); LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
} }
static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); }
static int globalPublishCountSuccess = 0; static int globalPublishCountSuccess = 0;
static int globalPublishCountQueueFull = 0; static int globalPublishCountQueueFull = 0;
...@@ -914,3 +918,33 @@ TEST(BatchMessageTest, testPartitionedTopics) { ...@@ -914,3 +918,33 @@ TEST(BatchMessageTest, testPartitionedTopics) {
// Number of messages consumed // Number of messages consumed
ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull); ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull);
} }
TEST(BatchMessageTest, producerFailureResult) {
std::string testName = std::to_string(epochTime) + "testCumulativeAck";
ClientConfiguration clientConfig;
clientConfig.setStatsIntervalInSeconds(100);
Client client(lookupUrl, clientConfig);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
int batchSize = 100;
int numOfMessages = 10000;
ProducerConfiguration conf;
conf.setCompressionType(CompressionZLib);
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
conf.setBatchingMaxPublishDelayMs(50000);
conf.setBlockIfQueueFull(false);
conf.setMaxPendingMessages(10);
Result res = Result::ResultBrokerMetadataError;
client.createProducer(topicName, conf, producer);
Message msg = MessageBuilder().setContent("test").build();
producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res));
PulsarFriend::producerFailMessages(producer, res);
}
...@@ -56,6 +56,10 @@ class PulsarFriend { ...@@ -56,6 +56,10 @@ class PulsarFriend {
return *producerImpl; return *producerImpl;
} }
static void producerFailMessages(Producer producer, Result result) {
producer.producerFailMessages(result);
}
static ConsumerImpl& getConsumerImpl(Consumer consumer) { static ConsumerImpl& getConsumerImpl(Consumer consumer) {
ConsumerImpl* consumerImpl = static_cast<ConsumerImpl*>(consumer.impl_.get()); ConsumerImpl* consumerImpl = static_cast<ConsumerImpl*>(consumer.impl_.get());
return *consumerImpl; return *consumerImpl;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册