Motivation
The transaction buffer stores aborted transaction IDs so that it can filter out messages from aborted transactions. To support recovery, the transaction buffer takes snapshots of the aborted transaction IDs and stores them in BookKeeper, but the set of aborted transaction IDs is unbounded. When the aborted transaction IDs grow larger than a single BookKeeper entry can hold, the transaction buffer needs to split the snapshot across multiple entries (a multiple-snapshot).
Challenges
Due to topic compaction and incomplete writes, there are some challenges in implementing multiple-snapshot:
1. Due to broker restarts, the transaction buffer may write only part of a multiple-snapshot.
   * e.g. the transaction buffer needs to write multiple-snapshot (1, 2, 3), but only writes snapshots 1 and 2 before the broker restarts.
2. Due to compaction, a new snapshot overwrites the old snapshot with the same key.
   * e.g. a multiple-snapshot (1, 2, 3) may end up with snapshots 1 and 2 from the second write but snapshot 3 from the first write.
Approach
Implement
1. Change aborts from a LinkedMap to a ConcurrentSkipListMap.
2. Write multiple-snapshots with the keys (topicName-1, topicName-2, ..., topicName-end) and write normal snapshots with the key (topicName-end); the key scheme is sketched below.
3. Only store maxReadPosition in the snapshot with the key (topicName-end); in all other entries it is set to earliest.
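A minimal sketch of that key scheme (buildSnapshotKeys is a hypothetical helper used only for illustration, not part of the proposed code):
// Hypothetical helper illustrating the proposed key scheme: chunk i of a
// multiple-snapshot is keyed "topicName-i", and the final entry (the only
// one carrying a real maxReadPosition) is keyed "topicName-end".
static List<String> buildSnapshotKeys(String topicName, int chunkCount) {
    List<String> keys = new ArrayList<>();
    for (int i = 1; i < chunkCount; i++) {
        keys.add(topicName + "-" + i);
    }
    // A normal snapshot (chunkCount == 1) uses only this key.
    keys.add(topicName + "-end");
    return keys;
}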
Goal
Keep the aborted transaction IDs sorted by the position of their abort markers, so that aborts behaves as a FIFO map. A new snapshot overwriting an old snapshot with the same key then no longer causes an error, and there is always a snapshot with the correct maxReadPosition to recover from.
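As a rough illustration of this ordering property (simplified types, not Pulsar's actual TxnID or position classes), a ConcurrentSkipListMap whose keys increase monotonically with the abort-marker position iterates oldest-first, so removals always happen at the head:
import java.util.concurrent.ConcurrentSkipListMap;

// Simplified sketch: the Long keys stand in for abort-marker positions, which
// increase monotonically, so iteration order equals abort order and the map
// behaves as a FIFO.
public class FifoAbortsSketch {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> aborts = new ConcurrentSkipListMap<>();
        aborts.put(1L, "txn-a"); // aborted first
        aborts.put(2L, "txn-b");
        aborts.put(3L, "txn-c"); // aborted last

        // Trimming expired entries removes the oldest abort first, which is
        // what keeps same-key snapshot chunks stable across repeated writes.
        aborts.pollFirstEntry(); // removes txn-a
        System.out.println(aborts.firstKey()); // prints 2
    }
}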
Examples
Normal Flow
The first snapshot is taken when a new producer is created to send messages, so there is always a snapshot with the key (topicName-end) that carries a maxReadPosition to recover from.
Incomplete write
When transaction IDs are sorted by the position of the abort marker and no transaction IDs have been removed from aborts, the transaction IDs stored under the same snapshot key are identical from one write to the next (excluding the key topicName-end).
Incomplete write with transaction IDs removed due to ledger deletion
Because transaction IDs are removed in the order of their abort-marker positions, no message is lost when the new snapshot is compacted over the old one. There is always a valid maxReadPosition that can be used to recover.
As you can see in the figure below, the ledger where txn025 is located has been deleted, and txn025 has accordingly been removed from aborts. But this does not affect the information in the snapshot.
Code Implement
takeSnapshot
private CompletableFuture<Void> takeSnapshot() {
    changeMaxReadPositionAndAddAbortTimes.set(0);
    return takeSnapshotWriter.thenCompose(writer -> {
        TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
        ArrayList<TransactionBufferSnapshot> snapshots = new ArrayList<>();
        synchronized (TopicTransactionBuffer.this) {
            // Copy the aborted transaction IDs, in abort-marker order.
            List<AbortTxnMetadata> list = new ArrayList<>();
            aborts.forEach((k, v) -> {
                AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
                abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
                abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
                abortTxnMetadata.setLedgerId(v.getLedgerId());
                abortTxnMetadata.setEntryId(v.getEntryId());
                list.add(abortTxnMetadata);
            });
            // Split the aborts into chunks of at most maxSize entries. The
            // intermediate chunks deliberately carry no maxReadPosition
            // (they default to earliest).
            while (list.size() > maxSize) {
                List<AbortTxnMetadata> newList = new ArrayList<>();
                while (newList.size() < maxSize) {
                    newList.add(list.remove(0));
                }
                snapshot.setAborts(newList);
                snapshot.setTopicName(topic.getName());
                snapshots.add(snapshot);
                snapshot = new TransactionBufferSnapshot();
            }
            // Only the last snapshot (key topicName-end) stores maxReadPosition.
            snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
            snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
            snapshot.setAborts(list);
            snapshot.setTopicName(topic.getName());
            snapshots.add(snapshot);
        }
        List<CompletableFuture<Void>> completableFutures = new LinkedList<>();
        snapshots.forEach(s -> completableFutures.add(writer.writeAsync(s).thenAccept(messageId -> {
            this.lastSnapshotTimestamps = System.currentTimeMillis();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Transaction buffer take snapshot success! "
                        + "messageId : {}", topic.getName(), messageId);
            }
        }).exceptionally(e -> {
            log.warn("[{}] Transaction buffer take snapshot fail! ", topic.getName(), e);
            return null;
        })));
        return FutureUtil.waitForAll(completableFutures);
    });
}
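handleSnapshot
The proposal's outline also lists a handleSnapshot on the reader side. A minimal sketch of how it could reassemble the chunks under the key scheme above (recoverAborts is a hypothetical helper; the snapshot fields follow the shapes used in takeSnapshot):
// Sketch: accumulate chunk entries and treat the entry keyed "topicName-end"
// (the only one carrying a real maxReadPosition) as the commit point for the
// whole multiple-snapshot.
private final List<AbortTxnMetadata> pendingAborts = new ArrayList<>();

private void handleSnapshot(String key, TransactionBufferSnapshot snapshot) {
    pendingAborts.addAll(snapshot.getAborts());
    if (key.endsWith("-end")) {
        // Chunks keyed topicName-1..n store "earliest" and must never drive
        // recovery on their own; only the "-end" entry is authoritative.
        PositionImpl maxReadPosition = PositionImpl.get(
                snapshot.getMaxReadPositionLedgerId(),
                snapshot.getMaxReadPositionEntryId());
        recoverAborts(pendingAborts, maxReadPosition); // hypothetical helper
        pendingAborts.clear();
    }
}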
Rejected Alternatives
Add a snapshotEntryCount field in TransactionBufferSnapshot
Add a snapshotEntryCount field to each TransactionBufferSnapshot. For a normal snapshot, snapshotEntryCount is set to 1; for a multiple-snapshot, it is set to the number of entries used to store the snapshot.
public class TransactionBufferSnapshot {
private String topicName;
private long maxReadPositionLedgerId;
private long maxReadPositionEntryId;
private long snapshotEntryCount;
private List<AbortTxnMetadata> aborts;
}
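If this alternative were adopted, the reader would presumably read snapshotEntryCount from the first entry and consume that many entries in total. A hypothetical sketch (Reader and its readNext are the standard Pulsar client reader API; the getters are assumed from the class above):
// Hypothetical reader logic for this rejected alternative: the first entry's
// snapshotEntryCount says how many entries the snapshot spans.
private List<AbortTxnMetadata> readMultipleSnapshot(Reader<TransactionBufferSnapshot> reader)
        throws PulsarClientException {
    TransactionBufferSnapshot first = reader.readNext().getValue();
    List<AbortTxnMetadata> aborts = new ArrayList<>(first.getAborts());
    for (long i = 1; i < first.getSnapshotEntryCount(); i++) {
        aborts.addAll(reader.readNext().getValue().getAborts());
    }
    return aborts;
}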
Mark multiple-snapshot with a null field
For a multiple-snapshot, only the aborts data and maxReadPosition are written in the leading entries, without setting topicName; topicName is set only in the last entry. When the reader reads topicName == null, it marks the beginning of a multiple-snapshot, and reading topicName != null marks the end of that multiple-snapshot.
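Under this scheme, boundary detection on the reader side might look like the following sketch (applySnapshot is a hypothetical helper):
// Hypothetical boundary detection for this rejected alternative: a null
// topicName means more chunks follow; a non-null topicName closes the
// multiple-snapshot (a single-entry snapshot is just the degenerate case).
private void onSnapshotEntry(TransactionBufferSnapshot snapshot) {
    pendingAborts.addAll(snapshot.getAborts());
    if (snapshot.getTopicName() != null) {
        applySnapshot(snapshot.getTopicName(), pendingAborts); // hypothetical
        pendingAborts.clear();
    }
}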
API changes
interface Writer<T> {

    /**
     * Write an event to the system topic.
     * @param t pulsar event
     * @param topic the topicName for the pulsar event
     * @return message id
     * @throws PulsarClientException if writing the event fails
     */
    MessageId write(T t, String topic) throws PulsarClientException;

    /**
     * Asynchronously write an event to the system topic.
     * @param t pulsar event
     * @param topic the topicName for the pulsar event
     * @return message id future
     */
    CompletableFuture<MessageId> writeAsync(T t, String topic);
}
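A caller could then pass the per-entry key when writing each chunk. The snippet below is illustrative only, assuming the topic parameter carries the per-entry key (topicName-1 ... topicName-end) described above:
// Illustrative use of the extended Writer API: each chunk of a
// multiple-snapshot is written under its own key.
List<CompletableFuture<MessageId>> writes = new ArrayList<>();
for (int i = 0; i < snapshots.size(); i++) {
    boolean last = (i == snapshots.size() - 1);
    String key = last ? topicName + "-end" : topicName + "-" + (i + 1);
    writes.add(writer.writeAsync(snapshots.get(i), key));
}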