diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java deleted file mode 100644 index 84f4eac59489411abb7781b5687714db755880fe..0000000000000000000000000000000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.transaction.buffer.impl; - -import io.netty.buffer.ByteBuf; -import java.util.List; -import java.util.Set; -import java.util.SortedMap; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import lombok.Builder; -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData; -import org.apache.pulsar.common.protocol.Markers; -import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; -import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; -import org.apache.pulsar.broker.transaction.buffer.TransactionCursor; -import org.apache.pulsar.broker.transaction.buffer.TransactionMeta; -import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException; -import org.apache.pulsar.transaction.impl.common.TxnID; - -/** - * A persistent transaction buffer implementation. - */ -@Slf4j -public class PersistentTransactionBuffer extends PersistentTopic implements TransactionBuffer { - - private TransactionCursor txnCursor; - private ManagedCursor retentionCursor; - - - abstract static class TxnCtx implements PublishContext { - private final long sequenceId; - private final CompletableFuture completableFuture; - private final String producerName; - - TxnCtx(String producerName, long sequenceId, CompletableFuture future) { - this.sequenceId = sequenceId; - this.completableFuture = future; - this.producerName = producerName; - } - - - - @Override - public String getProducerName() { - return this.producerName; - } - - @Override - public long getSequenceId() { - return this.sequenceId; - } - } - - public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService) - throws BrokerServiceException.NamingException, ManagedLedgerException { - super(topic, ledger, brokerService); - this.txnCursor = new TransactionCursorImpl(); - this.retentionCursor = ledger.newNonDurableCursor( - PositionImpl.earliest, "txn-buffer-retention"); - } - - @Override - public CompletableFuture getTransactionMeta(TxnID txnID) { - return txnCursor.getTxnMeta(txnID, false); - } - - @Override - public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { - return publishMessage(txnId, buffer, sequenceId).thenCompose(position -> appendBuffer(txnId, position, - sequenceId)); - } - - private CompletableFuture appendBuffer(TxnID txnID, Position position, long sequenceId) { - return txnCursor.getTxnMeta(txnID, true).thenCompose(meta -> meta.appendEntry(sequenceId, position)); - } - - @Override - public CompletableFuture openTransactionBufferReader(TxnID txnID, long startSequenceId) { - return txnCursor.getTxnMeta(txnID, false).thenCompose(this::createNewReader); - } - - private CompletableFuture createNewReader(TransactionMeta meta) { - CompletableFuture createReaderFuture = new CompletableFuture<>(); - - try { - PersistentTransactionBufferReader reader = new PersistentTransactionBufferReader(meta, ledger); - createReaderFuture.complete(reader); - } catch (TransactionNotSealedException e) { - createReaderFuture.completeExceptionally(e); - } - - return createReaderFuture; - } - - @Builder - final static class Marker { - long sequenceId; - ByteBuf marker; - } - - @Override - public CompletableFuture commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId) { - return txnCursor.getTxnMeta(txnID, false) - .thenApply(meta -> createCommitMarker(meta, committedAtLedgerId, committedAtEntryId)) - .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId)) - .thenCompose(position -> txnCursor.commitTxn(committedAtLedgerId, committedAtEntryId, txnID, - position)); - } - - private Marker createCommitMarker(TransactionMeta meta, long committedAtLedgerId, long committedAtEntryId) { - if (log.isDebugEnabled()) { - log.debug("Transaction {} create a commit marker", meta.id()); - } - long sequenceId = meta.lastSequenceId() + 1; - MessageIdData messageIdData = MessageIdData.newBuilder() - .setLedgerId(committedAtLedgerId) - .setEntryId(committedAtEntryId) - .build(); - ByteBuf commitMarker = Markers.newTxnCommitMarker(sequenceId, meta.id().getMostSigBits(), - meta.id().getLeastSigBits(), messageIdData); - Marker marker = Marker.builder().sequenceId(sequenceId).marker(commitMarker).build(); - return marker; - } - - @Override - public CompletableFuture abortTxn(TxnID txnID) { - return txnCursor.getTxnMeta(txnID, false) - .thenApply(meta -> createAbortMarker(meta)) - .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId)) - .thenCompose(position -> txnCursor.abortTxn(txnID)); - } - - private Marker createAbortMarker(TransactionMeta meta) { - if (log.isDebugEnabled()) { - log.debug("Transaction {} create a abort marker", meta.id()); - } - long sequenceId = meta.lastSequenceId() + 1; - ByteBuf abortMarker = Markers.newTxnAbortMarker(sequenceId, meta.id().getMostSigBits(), - meta.id().getLeastSigBits()); - Marker marker = Marker.builder().sequenceId(sequenceId).marker(abortMarker).build(); - return marker; - } - - - private CompletableFuture publishMessage(TxnID txnID, ByteBuf msg, long sequenceId) { - CompletableFuture publishFuture = new CompletableFuture<>(); - publishMessage(msg, new TxnCtx(txnID.toString(), sequenceId, publishFuture) { - @Override - public void completed(Exception e, long ledgerId, long entryId) { - if (e != null) { - publishFuture.completeExceptionally(e); - } else { - publishFuture.complete(PositionImpl.get(ledgerId, entryId)); - } - } - }); - return publishFuture; - } - - @Override - public CompletableFuture purgeTxns(List dataLedgers) { - if (log.isDebugEnabled()) { - log.debug("Begin to purge the ledgers {}", dataLedgers); - } - - List> futures = dataLedgers.stream().map(dataLedger -> cleanTxnsOnLedger(dataLedger)) - .collect(Collectors.toList()); - return FutureUtil.waitForAll(futures).thenCompose(v -> removeCommittedLedgerFromIndex(dataLedgers)); - } - - private CompletableFuture removeCommittedLedgerFromIndex(List dataLedgers) { - List> removeFutures = dataLedgers.stream().map( - dataLedger -> txnCursor.removeTxnsCommittedAtLedger(dataLedger)).collect(Collectors.toList()); - return FutureUtil.waitForAll(removeFutures); - } - - private CompletableFuture cleanTxnsOnLedger(long dataledger) { - if (log.isDebugEnabled()) { - log.debug("Start to clean ledger {}", dataledger); - } - return txnCursor.getAllTxnsCommittedAtLedger(dataledger).thenCompose(txnIDS -> deleteTxns(txnIDS)); - } - - private CompletableFuture deleteTxns(Set txnIDS) { - if (log.isDebugEnabled()) { - log.debug("Start delete txns {} under ledger", txnIDS); - } - List> futures = txnIDS.stream().map(txnID -> deleteTxn(txnID)) - .collect(Collectors.toList()); - return FutureUtil.waitForAll(futures); - } - - private CompletableFuture deleteTxn(TxnID txnID) { - if (log.isDebugEnabled()) { - log.debug("Start to delete txn {} entries", txnID); - } - return txnCursor.getTxnMeta(txnID, false) - .thenCompose(meta -> meta.readEntries(meta.numEntries(), -1L)) - .thenCompose(longPositionSortedMap -> deleteEntries(longPositionSortedMap, txnID)); - } - - private CompletableFuture deleteEntries(SortedMap entriesMap, TxnID txnID) { - if (log.isDebugEnabled()) { - log.debug("Delete entries {}", entriesMap); - } - List> deleteFutures = entriesMap.values().stream() - .map(position -> asyncDeletePosition(position, txnID)) - .collect(Collectors.toList()); - - return FutureUtil.waitForAll(deleteFutures); - } - - private CompletableFuture asyncDeletePosition(Position position, TxnID txnID) { - if (log.isDebugEnabled()) { - log.debug("Ready to delete position {} for txn {}", position, txnID); - } - CompletableFuture deleteFuture = new CompletableFuture<>(); - retentionCursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - if (log.isDebugEnabled()) { - log.debug("Success delete transaction `{}` entry on position {}", txnID, position); - } - deleteFuture.complete(null); - } - - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.error("Failed delete transaction `{}` entry on position {}", txnID, position, exception); - deleteFuture.completeExceptionally(exception); - } - }, null); - - return deleteFuture; - } - - @Override - public CompletableFuture closeAsync() { - return FutureUtil.failedFuture(new UnsupportedOperationException()); - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 760a2fa9de65438a2ea1718287e0787e47266ccd..b88523f04e82ccdb8be23ba45e2ff7dc4b338df6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -87,9 +87,9 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.Schema; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; @@ -767,7 +767,7 @@ public class Commands { } public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType, - ValidationError validationError, Map properties) { + ValidationError validationError, Map properties) { CommandAck.Builder ackBuilder = CommandAck.newBuilder(); ackBuilder.setConsumerId(consumerId); ackBuilder.setAckType(ackType); @@ -779,7 +779,7 @@ public class Commands { if (validationError != null) { ackBuilder.setValidationError(validationError); } - for (Map.Entry e : properties.entrySet()) { + for (Map.Entry e : properties.entrySet()) { ackBuilder.addProperties( PulsarApi.KeyLongValue.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java index 049176f311c6fc16d8e8954cb5ef63c118612ce3..8be02f1c9f5e7618c8825c7fd608d12ef49e93d1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java @@ -26,7 +26,6 @@ import java.util.Optional; import lombok.SneakyThrows; import lombok.experimental.UtilityClass; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; -import org.apache.pulsar.common.api.proto.PulsarMarkers; import org.apache.pulsar.common.api.proto.PulsarMarkers.ClusterMessageId; import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType; import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;