From 5716cce6d423a0e6c3d4fb83904d21ad1ece801f Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Thu, 24 Oct 2019 21:41:42 -0700 Subject: [PATCH] [pulsar-client] Fix message corruption on OOM for batch messages (#5443) * [pulsar-client] Fix message corruption on OOM for batch messages * remove comments * Address comments: index in local-var + remove lastSerializedMessageIndex var (cherry picked from commit bf9a9019adf3a073a8af6911383f654225a240e1) --- .../impl/BatchMessageContainerImpl.java | 28 ++++++++++++++++--- .../pulsar/client/impl/ProducerImpl.java | 2 +- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 5ee23570225..518e499c304 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -82,11 +82,31 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } private ByteBuf getCompressedBatchMetadataAndPayload() { - for (MessageImpl msg : messages) { + int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex(); + int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex(); + + for (int i = 0, n = messages.size(); i < n; i++) { + MessageImpl msg = messages.get(i); PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder(); - batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, - msg.getDataBuffer(), batchedMessageMetadataAndPayload); - msgBuilder.recycle(); + msg.getDataBuffer().markReaderIndex(); + try { + batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, + msg.getDataBuffer(), batchedMessageMetadataAndPayload); + } catch (Throwable th) { + // Serializing a batch message can corrupt the 
indexes of the message and of the batch buffer. Reset the + // indexes so that the next iteration doesn't send a corrupt message to the broker. + for (int j = 0; j <= i; j++) { + MessageImpl previousMsg = messages.get(j); + previousMsg.getDataBuffer().resetReaderIndex(); + } + batchedMessageMetadataAndPayload.writerIndex(batchWriteIndex); + batchedMessageMetadataAndPayload.readerIndex(batchReadIndex); + throw new RuntimeException(th); + } + } + // Recycle the message builders only once all messages are serialized successfully in the batch + for (MessageImpl msg : messages) { + msg.getMessageBuilder().recycle(); } int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 4cad806d4a3..7c381e575d6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1286,7 +1286,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private void batchMessageAndSend() { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Batching the messages from the batch container with {} messages", topic, producerName, - batchMessageContainer.getNumMessagesInBatch()); + batchMessageContainer.getNumMessagesInBatch()); } if (!batchMessageContainer.isEmpty()) { try { -- GitLab