Skip to content

Commit

Permalink
[fix][broker] fix replicated subscriptions for transactional messages (
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled authored May 13, 2024
1 parent 936afec commit 9fd1b61
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
Expand Down Expand Up @@ -272,10 +273,13 @@ protected TopicStatsHelper initialValue() {

@Getter
protected final TransactionBuffer transactionBuffer;
@Getter
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack =
(oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp();

// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
// Record the last time max read position is moved forward, unless it's a marker message.
@Getter
private volatile long lastDataMessagePublishedTimestamp = 0;
private volatile long lastMaxReadPositionMovedForwardTimestamp = 0;
@Getter
private final ExecutorService orderedExecutor;

Expand Down Expand Up @@ -410,7 +414,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
} else {
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true);
if (ledger instanceof ShadowManagedLedgerImpl) {
shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource());
} else {
Expand Down Expand Up @@ -719,6 +723,10 @@ private void decrementPendingWriteOpsAndCheck() {
}
}

/**
 * Records the current UTC wall-clock time, in epoch milliseconds, as the last time the
 * max read position moved forward.
 */
private void updateMaxReadPositionMovedForwardTimestamp() {
    final long nowMillis = Clock.systemUTC().millis();
    this.lastMaxReadPositionMovedForwardTimestamp = nowMillis;
}

@Override
public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
Expand All @@ -727,12 +735,9 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);

if (!publishContext.isMarkerMessage()) {
lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
}

// in order to sync the max position when cursor read entries
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(),
publishContext.isMarkerMessage());
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
decrementPendingWriteOpsAndCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
private void startNewSnapshot() {
cleanupTimedOutSnapshots();

if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastDataMessagePublishedTimestamp() == 0) {
if (topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ public interface TransactionBuffer {
/**
* Sync max read position for normal publish.
* @param position {@link PositionImpl} the position to sync.
* @param isMarkerMessage whether the message is marker message.
*/
void syncMaxReadPositionForNormalPublish(PositionImpl position);
void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage);

/**
* Get the can read max position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand Down Expand Up @@ -213,11 +214,17 @@ public TransactionBufferReader newReader(long sequenceId) throws
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
private final Topic topic;
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack;

/**
 * Creates an in-memory transaction buffer bound to the given topic.
 *
 * @param topic the owning topic; when it is a {@link PersistentTopic} its max-read-position
 *              callback is captured, otherwise no callback is kept ({@code null}).
 */
public InMemTransactionBuffer(Topic topic) {
    this.topic = topic;
    this.buffers = new ConcurrentHashMap<>();
    this.txnIndex = new HashMap<>();
    // Only persistent topics expose a max-read-position callback.
    this.maxReadPositionCallBack = (topic instanceof PersistentTopic)
            ? ((PersistentTopic) topic).getMaxReadPositionCallBack()
            : null;
}

@Override
Expand Down Expand Up @@ -369,8 +376,10 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
}

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
//no-op
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
private final AbortedTxnProcessor snapshotAbortedTxnProcessor;

private final AbortedTxnProcessor.SnapshotType snapshotType;
private final MaxReadPositionCallBack maxReadPositionCallBack;

public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
Expand All @@ -120,6 +121,7 @@ public TopicTransactionBuffer(PersistentTopic topic) {
snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Single;
}
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.recover();
}

Expand Down Expand Up @@ -175,7 +177,7 @@ public void handleTxnEntry(Entry entry) {
if (Markers.isTxnAbortMarker(msgMetadata)) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
}
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
} else {
handleTransactionMessage(txnID, position);
}
Expand Down Expand Up @@ -290,7 +292,8 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
// max read position is less than first ongoing transaction message position
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition);
updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition),
false);
}
}

Expand All @@ -314,7 +317,7 @@ public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
handleLowWaterMark(txnID, lowWaterMark);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
Expand Down Expand Up @@ -361,7 +364,7 @@ public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, (PositionImpl) position);
updateMaxReadPosition(txnID);
removeTxnAndUpdateMaxReadPosition(txnID);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
txnAbortedCounter.increment();
Expand Down Expand Up @@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
}

void updateMaxReadPosition(TxnID txnID) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
/**
 * Remove the specified transaction from the ongoing transaction list and update the max read position.
 * @param txnID the ID of the transaction to remove.
 */
void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
} else {
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
}
if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
this.changeMaxReadPositionCount.getAndIncrement();
}

/**
 * Sets the max read position to {@code newPosition}.
 *
 * <p>The assignment always happens. Only when the new position is strictly greater than the
 * previous one does this method additionally:
 * <ul>
 *   <li>increment {@code changeMaxReadPositionCount}, unless {@code checkIfNoSnapshot()} is true;</li>
 *   <li>invoke the max-read-position callback, unless {@code disableCallback} is true.</li>
 * </ul>
 * Currently the callback is only used to update the lastMaxReadPositionMovedForwardTimestamp.
 * For non-transactional production, some marker messages are sent to the topic; in that case the
 * callback does not need to be triggered.
 *
 * @param newPosition new max read position to set.
 * @param disableCallback whether to suppress the callback.
 */
void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
    final PositionImpl previous = this.maxReadPosition;
    this.maxReadPosition = newPosition;

    // Nothing more to do unless the position actually advanced.
    if (previous.compareTo(this.maxReadPosition) >= 0) {
        return;
    }

    boolean noSnapshot = checkIfNoSnapshot();
    if (!noSnapshot) {
        this.changeMaxReadPositionCount.getAndIncrement();
    }
    if (disableCallback) {
        return;
    }
    maxReadPositionCallBack.maxReadPositionMovedForward(previous, this.maxReadPosition);
}

Expand All @@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
}

/**
* Sync max read position for normal publish.
* @param position {@link PositionImpl} the position to sync.
* @param isMarkerMessage whether the message is marker message, in such case, we
* don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
*/
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
// When there are no ongoing transactions, lastAddConfirm can be used as the max read position: the callback
// runs on the same thread, so at this point lastAddConfirm does not contain a transaction message.
synchronized (TopicTransactionBuffer.this) {
if (checkIfNoSnapshot()) {
this.maxReadPosition = position;
updateMaxReadPosition(position, isMarkerMessage);
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
maxReadPosition = position;
changeMaxReadPositionCount.incrementAndGet();
updateMaxReadPosition(position, isMarkerMessage);
}
}
}
Expand Down Expand Up @@ -674,6 +704,18 @@ private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> rea
}
}

/**
 * A functional interface to handle the max read position moving forward.
 *
 * <p>Annotated with {@link FunctionalInterface} so that the compiler rejects any change that would
 * add a second abstract method and break lambda implementations.
 */
@FunctionalInterface
public interface MaxReadPositionCallBack {
    /**
     * Callback method invoked when the max read position moves forward.
     *
     * @param oldPosition the old max read position; may be {@code null}, since some callers
     *                    do not track a previous position and pass {@code null}.
     * @param newPosition the new max read position.
     */
    void maxReadPositionMovedForward(PositionImpl oldPosition, PositionImpl newPosition);
}

static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {

private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand All @@ -42,8 +43,14 @@
public class TransactionBufferDisable implements TransactionBuffer {

private final Topic topic;
private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack;
/**
 * Creates a disabled transaction buffer for the given topic.
 *
 * @param topic the owning topic; when it is a {@link PersistentTopic} its max-read-position
 *              callback is captured, otherwise the callback is left {@code null}.
 */
public TransactionBufferDisable(Topic topic) {
    // Resolve the callback first so the final field is assigned exactly once.
    TopicTransactionBuffer.MaxReadPositionCallBack resolvedCallBack = null;
    if (topic instanceof PersistentTopic) {
        PersistentTopic persistentTopic = (PersistentTopic) topic;
        resolvedCallBack = persistentTopic.getMaxReadPositionCallBack();
    }
    this.topic = topic;
    this.maxReadPositionCallBack = resolvedCallBack;
}

@Override
Expand Down Expand Up @@ -91,8 +98,10 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
}

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
//no-op
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ public void testPublishMessage() throws Exception {
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp();

/*
* MessageMetadata.Builder messageMetadata = MessageMetadata.newBuilder();
* messageMetadata.setPublishTime(System.currentTimeMillis()); messageMetadata.setProducerName("producer-name");
Expand All @@ -322,10 +324,10 @@ public void setMetadataFromEntryData(ByteBuf entryData) {
assertEquals(entryData.array(), payload.array());
}
};

topic.publishMessage(payload, publishContext);

assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -40,8 +42,10 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -728,6 +732,21 @@ public void testReplicatedSubscriptionRestApi3() throws Exception {
consumer4.close();
}

/**
 * Waits until the transaction buffer of the given topic is no longer in the
 * {@code Initializing} state.
 *
 * <p>Call this before sending a message: if the transaction buffer has not finished recovering,
 * the MaxReadPosition will not move forward when the message is sent, the
 * MaxReadPositionMovedForwardTimestamp will not be updated, and the replication will not be
 * triggered.
 *
 * <p>The state is read reflectively from the {@code state} field declared on
 * {@link TopicTransactionBufferState}.
 *
 * @param pulsarService the broker instance that owns the topic.
 * @param topicName the name of the topic whose transaction buffer is awaited.
 * @throws Exception if the topic lookup or the reflective field access fails.
 */
private void waitTBRecoverComplete(PulsarService pulsarService, String topicName) throws Exception {
    PersistentTopic persistentTopic = (PersistentTopic) pulsarService.getBrokerService()
            .getTopic(topicName, false).get().get();
    TopicTransactionBufferState buffer =
            (TopicTransactionBufferState) persistentTopic.getTransactionBuffer();

    Field stateField = TopicTransactionBufferState.class.getDeclaredField("state");
    stateField.setAccessible(true);

    Awaitility.await().until(() -> !stateField.get(buffer).toString().equals("Initializing"));
}

/**
* Tests replicated subscriptions when replicator producer is closed
*/
Expand Down Expand Up @@ -755,6 +774,9 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
.subscribe();

// send one message to trigger replication
if (config1.isTransactionCoordinatorEnabled()) {
waitTBRecoverComplete(pulsar1, topicName);
}
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
Expand Down Expand Up @@ -917,6 +939,9 @@ public void testReplicatedSubscriptionWithCompaction() throws Exception {
.statsInterval(0, TimeUnit.SECONDS).build();

Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
if (config1.isTransactionCoordinatorEnabled()) {
waitTBRecoverComplete(pulsar1, topicName);
}
producer.newMessage().key("K1").value("V1").send();
producer.newMessage().key("K1").value("V2").send();
producer.close();
Expand Down
Loading

0 comments on commit 9fd1b61

Please sign in to comment.