Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] fix replicated subscriptions for transactional messages #22452

Merged
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
Expand Down Expand Up @@ -270,10 +271,13 @@ protected TopicStatsHelper initialValue() {

@Getter
protected final TransactionBuffer transactionBuffer;
@Getter
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack =
(oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp();

// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
// Record the last time max read position is moved forward, unless it's a marker message.
@Getter
private volatile long lastDataMessagePublishedTimestamp = 0;
private volatile long lastMaxReadPositionMovedForwardTimestamp = 0;
@Getter
private final ExecutorService orderedExecutor;

Expand Down Expand Up @@ -407,7 +411,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
} else {
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true);
if (ledger instanceof ShadowManagedLedgerImpl) {
shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource());
} else {
Expand Down Expand Up @@ -715,6 +719,10 @@ private void decrementPendingWriteOpsAndCheck() {
}
}

private void updateMaxReadPositionMovedForwardTimestamp() {
lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis();
}

@Override
public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
Expand All @@ -723,12 +731,9 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);

if (!publishContext.isMarkerMessage()) {
lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
}

// in order to sync the max position when cursor read entries
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(),
publishContext.isMarkerMessage());
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
decrementPendingWriteOpsAndCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
private void startNewSnapshot() {
cleanupTimedOutSnapshots();

if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastDataMessagePublishedTimestamp() == 0) {
if (topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ public interface TransactionBuffer {
/**
* Sync max read position for normal publish.
* @param position {@link PositionImpl} the position to sync.
* @param isMarkerMessage whether the message is marker message.
*/
void syncMaxReadPositionForNormalPublish(PositionImpl position);
void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage);

/**
* Get the can read max position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand Down Expand Up @@ -213,11 +214,17 @@ public TransactionBufferReader newReader(long sequenceId) throws
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
private final Topic topic;
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack;

public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
this.topic = topic;
if (topic instanceof PersistentTopic) {
this.maxReadPositionCallBack = ((PersistentTopic) topic).getMaxReadPositionCallBack();
} else {
this.maxReadPositionCallBack = null;
}
}

@Override
Expand Down Expand Up @@ -369,8 +376,10 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
}

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
//no-op
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
private final AbortedTxnProcessor snapshotAbortedTxnProcessor;

private final AbortedTxnProcessor.SnapshotType snapshotType;
private final MaxReadPositionCallBack maxReadPositionCallBack;

public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
Expand All @@ -120,6 +121,7 @@ public TopicTransactionBuffer(PersistentTopic topic) {
snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Single;
}
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.recover();
}

Expand Down Expand Up @@ -175,7 +177,7 @@ public void handleTxnEntry(Entry entry) {
if (Markers.isTxnAbortMarker(msgMetadata)) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
}
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
} else {
handleTransactionMessage(txnID, position);
}
Expand Down Expand Up @@ -290,7 +292,8 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
// max read position is less than first ongoing transaction message position
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition);
updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition),
false);
}
}

Expand All @@ -314,7 +317,7 @@ public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
handleLowWaterMark(txnID, lowWaterMark);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
Expand Down Expand Up @@ -361,7 +364,7 @@ public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, (PositionImpl) position);
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
txnAbortedCounter.increment();
Expand Down Expand Up @@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
}

void updateMaxReadPosition(TxnID txnID) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
/**
* remove the specified transaction from ongoing transaction list and update the max read position.
* @param txnID
*/
void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
} else {
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
}
if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
}

/**
* update the max read position. if the new position is greater than the current max read position,
* we will trigger the callback, unless the disableCallback is true.
* Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
* For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
* to trigger the callback.
* @param newPosition new max read position to update.
* @param disableCallback whether disable the callback.
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
*/
void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
this.maxReadPosition = newPosition;
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
if (!checkIfNoSnapshot()) {
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
}
if (!disableCallback) {
maxReadPositionCallBack.maxReadPositionMovedForward(preMaxReadPosition, this.maxReadPosition);
}
}
}

Expand All @@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
}

/**
* Sync max read position for normal publish.
* @param position {@link PositionImpl} the position to sync.
* @param isMarkerMessage whether the message is marker message, in such case, we
* don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
*/
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
// when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
// thread is the same tread, in this time the lastAddConfirm don't content transaction message.
synchronized (TopicTransactionBuffer.this) {
if (checkIfNoSnapshot()) {
this.maxReadPosition = position;
updateMaxReadPosition(position, isMarkerMessage);
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
maxReadPosition = position;
changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
updateMaxReadPosition(position, isMarkerMessage);
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -674,6 +704,18 @@ private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> rea
}
}

/**
* A functional interface to handle the max read position move forward.
*/
public interface MaxReadPositionCallBack {
/**
* callback method when max read position move forward.
* @param oldPosition the old max read position.
* @param newPosition the new max read position.
*/
void maxReadPositionMovedForward(PositionImpl oldPosition, PositionImpl newPosition);
}

static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {

private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand All @@ -42,8 +43,14 @@
public class TransactionBufferDisable implements TransactionBuffer {

private final Topic topic;
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack;
public TransactionBufferDisable(Topic topic) {
this.topic = topic;
if (topic instanceof PersistentTopic) {
this.maxReadPositionCallBack = ((PersistentTopic) topic).getMaxReadPositionCallBack();
} else {
this.maxReadPositionCallBack = null;
}
}

@Override
Expand Down Expand Up @@ -91,8 +98,10 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
}

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
//no-op
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ public void testPublishMessage() throws Exception {
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp();

/*
* MessageMetadata.Builder messageMetadata = MessageMetadata.newBuilder();
* messageMetadata.setPublishTime(System.currentTimeMillis()); messageMetadata.setProducerName("producer-name");
Expand All @@ -322,10 +324,10 @@ public void setMetadataFromEntryData(ByteBuf entryData) {
assertEquals(entryData.array(), payload.array());
}
};

topic.publishMessage(payload, publishContext);

assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
}

@Test
Expand Down
Loading
Loading