Skip to content

Commit

Permalink
[improve][txn] Improve Reader in TransactionBuffer to reduce GC press…
Browse files Browse the repository at this point in the history
…ure (#23779)
  • Loading branch information
dao-jun authored Jan 3, 2025
1 parent a6986b1 commit 4a93e25
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ protected CompletableFuture<Reader<T>> newReaderAsyncInternal() {
.subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.poolMessages(true)
.createAsync()
.thenApply(reader -> {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,16 @@ private CompletableFuture<Void> clearAllSnapshotSegments() {
try {
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var message = wait(reader.readNextAsync(), "read next");
if (topic.getName().equals(message.getValue().getTopicName())) {
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
final String topicName;
final String key;
try {
topicName = message.getValue().getTopicName();
key = message.getKey();
} finally {
message.release();
}
if (topic.getName().equals(topicName)) {
snapshotSegmentsWriter.getFuture().get().write(key, null);
}
}
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,16 @@ public T readLatest(String topic) throws Exception {
final var reader = getReader(topic);
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
if (msg.getKey() != null) {
if (msg.getValue() != null) {
snapshots.put(msg.getKey(), msg.getValue());
} else {
snapshots.remove(msg.getKey());
try {
if (msg.getKey() != null) {
if (msg.getValue() != null) {
snapshots.put(msg.getKey(), msg.getValue());
} else {
snapshots.remove(msg.getKey());
}
}
} finally {
msg.release();
}
}
return snapshots.get(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ private void verifySnapshotSegmentsSize(String topic, int size) throws Exception
.createReader(TopicName.get(topic)).get();
int segmentCount = 0;
while (reader.hasMoreEvents()) {
@Cleanup("release")
Message<TransactionBufferSnapshotSegment> message = reader.readNextAsync()
.get(5, TimeUnit.SECONDS);
if (topic.equals(message.getValue().getTopicName())) {
Expand All @@ -364,6 +365,7 @@ private void verifySnapshotSegmentsIndexSize(String topic, int size) throws Exce
.createReader(TopicName.get(topic)).get();
int indexCount = 0;
while (reader.hasMoreEvents()) {
@Cleanup("release")
Message<TransactionBufferSnapshotIndexes> message = reader.readNextAsync()
.get(5, TimeUnit.SECONDS);
if (topic.equals(message.getValue().getTopicName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,9 @@ public void testTransactionBufferIndexSystemTopic() throws Exception {
indexesWriter.write(SNAPSHOT_INDEX, transactionBufferTransactionBufferSnapshotIndexes);

assertTrue(indexesReader.hasMoreEvents());
transactionBufferTransactionBufferSnapshotIndexes = indexesReader.readNext().getValue();
@Cleanup("release")
Message<TransactionBufferSnapshotIndexes> message = indexesReader.readNext();
transactionBufferTransactionBufferSnapshotIndexes = message.getValue();
assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getTopicName(), SNAPSHOT_INDEX);
assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getIndexList().size(), 5);
assertNull(transactionBufferTransactionBufferSnapshotIndexes.getSnapshot());
Expand Down

0 comments on commit 4a93e25

Please sign in to comment.