[fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck (#23004)

Co-authored-by: fanjianye <fanjianye@bigo.sg>
TakaHiR07 and fanjianye authored Jul 5, 2024
1 parent 8351c07 commit 41ef3f6
Showing 2 changed files with 114 additions and 4 deletions.
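In short, takeSnapshot() now returns a CompletableFuture<Void> that is completed in both markDelete callbacks and on the early return when another snapshot is already in progress, and recoverSequenceIdsMap() chains it with thenCompose instead of discarding it via thenAccept, so the recovery future resolves only after any due snapshot has been stored or has failed. A minimal, hypothetical sketch of why that matters to whatever awaits the recovery during topic load — recoverDeduplication() and the 30-second bound below are illustrative placeholders, not the broker's actual call site:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    class RecoveryCallerSketch {
        // Hypothetical stand-in for the code that awaits deduplication recovery during topic load.
        static CompletableFuture<Void> recoverDeduplication() {
            return CompletableFuture.completedFuture(null);
        }

        public static void main(String[] args) {
            recoverDeduplication()
                    // Because takeSnapshot() is now part of the returned chain, a bound like this
                    // covers the snapshot too, and a snapshot failure is observable here rather than lost.
                    .orTimeout(30, TimeUnit.SECONDS)
                    .whenComplete((ignored, ex) -> {
                        if (ex != null) {
                            System.err.println("Deduplication recovery failed or timed out: " + ex);
                        } else {
                            System.out.println("Deduplication recovery (including snapshot) finished");
                        }
                    });
        }
    }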
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -159,11 +159,12 @@ private CompletableFuture<Void> recoverSequenceIdsMap() {
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
         CompletableFuture<Position> future = new CompletableFuture<>();
         replayCursor(future);
-        return future.thenAccept(lastPosition -> {
+        return future.thenCompose(lastPosition -> {
             if (lastPosition != null && snapshotCounter >= snapshotInterval) {
                 snapshotCounter = 0;
-                takeSnapshot(lastPosition);
+                return takeSnapshot(lastPosition);
             }
+            return CompletableFuture.completedFuture(null);
         });
     }

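The hunk above swaps thenAccept for thenCompose. A self-contained sketch of the difference, with no Pulsar dependencies: with thenAccept the outer stage completes as soon as the callback returns, even if the callback only started another asynchronous step; with thenCompose the outer stage completes only when the nested stage does, and nested failures propagate.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    class ComposeVsAcceptSketch {
        // Simulates an async step such as taking a snapshot: completes after a short delay.
        static CompletableFuture<Void> slowAsyncStep() {
            CompletableFuture<Void> f = new CompletableFuture<>();
            CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS)
                    .execute(() -> f.complete(null));
            return f;
        }

        public static void main(String[] args) {
            CompletableFuture<Void> start = CompletableFuture.completedFuture(null);

            // Fire-and-forget: 'accepted' is done immediately; slowAsyncStep() is still running.
            CompletableFuture<Void> accepted = start.thenAccept(v -> slowAsyncStep());

            // Chained: 'composed' is done only after slowAsyncStep() completes (or fails).
            CompletableFuture<Void> composed = start.thenCompose(v -> slowAsyncStep());

            System.out.println("thenAccept done immediately? " + accepted.isDone());   // true
            System.out.println("thenCompose done immediately? " + composed.isDone());  // false

            composed.join(); // waits for the nested step and would propagate its exception
        }
    }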
@@ -438,13 +439,15 @@ public void resetHighestSequenceIdPushed() {
         }
     }

-    private void takeSnapshot(Position position) {
+    private CompletableFuture<Void> takeSnapshot(Position position) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
         }

         if (!snapshotTaking.compareAndSet(false, true)) {
-            return;
+            future.complete(null);
+            return future;
         }

         Map<String, Long> snapshot = new TreeMap<>();
@@ -462,14 +465,17 @@ public void markDeleteComplete(Object ctx) {
                 }
                 lastSnapshotTimestamp = System.currentTimeMillis();
                 snapshotTaking.set(false);
+                future.complete(null);
             }

             @Override
             public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                 log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
                 snapshotTaking.set(false);
+                future.completeExceptionally(exception);
             }
         }, null);
+        return future;
     }

     /**
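The second and third hunks turn the callback-based asyncMarkDelete path into a future the caller can wait on, completing it on every exit: the early return when another snapshot is in flight, the success callback, and the failure callback. A distilled, standalone sketch of that adapter pattern — the SnapshotStore interface and all names here are invented for illustration, not Pulsar APIs:

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Consumer;

    class SnapshotAdapterSketch {
        /** Invented callback-style API standing in for a store such as ManagedCursor#asyncMarkDelete. */
        interface SnapshotStore {
            void asyncStore(Map<String, Long> snapshot, Runnable onSuccess, Consumer<Throwable> onFailure);
        }

        private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);
        private final SnapshotStore store;

        SnapshotAdapterSketch(SnapshotStore store) {
            this.store = store;
        }

        /** Returns a future that is completed on every path, so callers never wait forever. */
        CompletableFuture<Void> takeSnapshot(Map<String, Long> snapshot) {
            CompletableFuture<Void> future = new CompletableFuture<>();

            // Another snapshot is already in flight: report "nothing to wait for" instead of hanging.
            if (!snapshotTaking.compareAndSet(false, true)) {
                future.complete(null);
                return future;
            }

            store.asyncStore(snapshot,
                    () -> {                       // success callback
                        snapshotTaking.set(false);
                        future.complete(null);
                    },
                    ex -> {                       // failure callback
                        snapshotTaking.set(false);
                        future.completeExceptionally(ex);
                    });
            return future;
        }
    }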
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDeduplicationTest.java
@@ -18,13 +18,16 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -33,12 +36,18 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -529,6 +538,101 @@ public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() thro
persistentTopic.checkDeduplicationSnapshot();
}

@Test
public void testFinishTakeSnapshotWhenTopicLoading() throws Exception {
cleanup();
setup();

// Create a topic and wait deduplication is started.
int brokerDeduplicationEntriesInterval = 1000;
pulsar.getConfiguration().setBrokerDeduplicationEnabled(true);
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(topic);
final PersistentTopic persistentTopic1 =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
Awaitility.await().untilAsserted(() -> {
ManagedCursorImpl cursor1 =
(ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
assertNotNull(cursor1);
});
final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication();


// Send 999 messages, it is less than "brokerDeduplicationEntriesInterval".
// So it would not trigger takeSnapshot
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic).enableBatching(false).create();
for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
producer.send(i + "");
}
producer.close();
int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter");
assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);


// Unload and load topic, simulate topic load is timeout.
// SetBrokerDeduplicationEntriesInterval to 10, therefore recoverSequenceIdsMap#takeSnapshot
// would trigger and should update the snapshot position.
// However, if topic close and takeSnapshot are concurrent,
// it would result in takeSnapshot throw exception
admin.topics().unload(topic);
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10);

// Mock message deduplication recovery speed topicLoadTimeoutSeconds
pulsar.getConfiguration().setTopicLoadTimeoutSeconds(1);
String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
mockZooKeeper.delay(2 * 1000, (op, path) -> {
if (mlPath.equals(path)) {
return true;
}
return false;
});

Field field2 = BrokerService.class.getDeclaredField("topics");
field2.setAccessible(true);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>)
field2.get(pulsar.getBrokerService());

try {
pulsar.getBrokerService().getTopic(topic, false).join().get();
Assert.fail();
} catch (Exception e) {
// topic loading should timeout.
}
Awaitility.await().untilAsserted(() -> {
// topic loading timeout then close topic and remove from topicsMap
Assert.assertFalse(topics.containsKey(topic));
});


// Load topic again, setBrokerDeduplicationEntriesInterval to 10000,
// make recoverSequenceIdsMap#takeSnapshot not trigger takeSnapshot.
// But actually it should not replay again in recoverSequenceIdsMap,
// since previous topic loading should finish the replay process.
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10000);
pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60);
PersistentTopic persistentTopic2 =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger();
MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication();

Awaitility.await().untilAsserted(() -> {
int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
Assert.assertEquals(snapshotCounter3, 0);
Assert.assertEquals(ml2.getLedgersInfo().size(), 1);
});


// cleanup.
admin.topics().delete(topic);
cleanup();
setup();
}

private void waitCacheInit(String topicName) throws Exception {
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);
