
Commit 311b6af
[fix][broker] Fix brokers still retrying to start replication after the topic is closed (apache#23237)

(cherry picked from commit aee2ee5)
poorbarcode committed Sep 5, 2024
1 parent 9fce4f4 commit 311b6af
Showing 2 changed files with 64 additions and 0 deletions.
PersistentTopic.java
@@ -1675,6 +1675,23 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
return closeFuture;
}

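/**
 * The topic counts as closed once either close future (whether or not it
 * waited for clients to disconnect) has completed successfully.
 */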
private boolean isClosed() {
if (closeFutures == null) {
return false;
}
if (closeFutures.notWaitDisconnectClients != null
&& closeFutures.notWaitDisconnectClients.isDone()
&& !closeFutures.notWaitDisconnectClients.isCompletedExceptionally()) {
return true;
}
if (closeFutures.waitDisconnectClients != null
&& closeFutures.waitDisconnectClients.isDone()
&& !closeFutures.waitDisconnectClients.isCompletedExceptionally()) {
return true;
}
return false;
}

private void disposeTopic(CompletableFuture<?> closeFuture) {
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
@@ -1697,6 +1714,9 @@ private void disposeTopic(CompletableFuture<?> closeFuture) {

@VisibleForTesting
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
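// Skip the check (and any further retry) once the topic has been closed,
// so a stale retry task cannot revive a closed topic.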
if (isClosed()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
result.complete(null);
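The diff view truncates the failure branch of checkReplicationAndRetryOnFailure(). As a reading aid, here is a minimal self-contained sketch of the overall pattern the fix establishes. The class, scheduler, close() stub, and checkReplication() stub are illustrative assumptions, not Pulsar code; only the isClosed() logic and the early return mirror the lines above, and the 60s delay matches the POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS constant referenced in the test below.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class ReplicationRetryGuardSketch {
        // Stand-in for PersistentTopic.CloseFutures: one future resolves without
        // waiting for clients to disconnect, the other waits for them.
        static class CloseFutures {
            volatile CompletableFuture<Void> notWaitDisconnectClients;
            volatile CompletableFuture<Void> waitDisconnectClients;
        }

        private volatile CloseFutures closeFutures; // stays null until close() begins
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        private static boolean succeeded(CompletableFuture<?> f) {
            return f != null && f.isDone() && !f.isCompletedExceptionally();
        }

        // Same rule as the patch: closed once either close future succeeded.
        private boolean isClosed() {
            CloseFutures cf = closeFutures;
            return cf != null
                    && (succeeded(cf.notWaitDisconnectClients)
                        || succeeded(cf.waitDisconnectClients));
        }

        // Simplified close(): the real one completes the futures as clients go away.
        void close() {
            CloseFutures cf = new CloseFutures();
            cf.notWaitDisconnectClients = CompletableFuture.completedFuture(null);
            cf.waitDisconnectClients = CompletableFuture.completedFuture(null);
            closeFutures = cf;
        }

        CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
            if (isClosed()) {
                // The fix: a retry that fires after close() becomes a no-op.
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> result = new CompletableFuture<>();
            checkReplication().whenComplete((ignore, ex) -> {
                if (ex == null) {
                    result.complete(null);
                } else {
                    // Re-schedule; the isClosed() guard at the top of the next
                    // attempt is what breaks the chain after close().
                    scheduler.schedule(this::checkReplicationAndRetryOnFailure,
                            60, TimeUnit.SECONDS);
                    result.completeExceptionally(ex);
                }
            });
            return result;
        }

        private CompletableFuture<Void> checkReplication() {
            return CompletableFuture.completedFuture(null); // placeholder
        }
    }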
Second changed file (replication test):
@@ -29,6 +29,7 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
@@ -43,6 +44,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1249,4 +1251,46 @@ public void testReplicationCountMetrics() throws Exception {
admin1.topics().delete(topicName, false);
admin2.topics().delete(topicName, false);
}

/**
 * This test confirms that the "start replicator retry task" is skipped after the topic is closed.
 */
@Test
public void testCloseTopicAfterStartReplicationFailed() throws Exception {
Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache");
fieldTopicNameCache.setAccessible(true);
ConcurrentHashMap<String, TopicName> topicNameCache =
(ConcurrentHashMap<String, TopicName>) fieldTopicNameCache.get(null);
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
// 1. Create the topic; replication is not enabled yet.
admin1.topics().createNonPartitionedTopic(topicName);
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

// Inject an error to make "start replicator" fail.
AsyncLoadingCache<String, Boolean> existsCache =
WhiteboxImpl.getInternalState(pulsar1.getConfigurationMetadataStore(), "existsCache");
String path = "/admin/partitioned-topics/" + TopicName.get(topicName).getPersistenceNamingEncoding();
existsCache.put(path, CompletableFuture.completedFuture(true));

// 2. Enable replication, then unload the topic after the replicator fails to start.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
Thread.sleep(3000);
producer1.close();
existsCache.synchronous().invalidate(path);
admin1.topics().unload(topicName);
// Verify: the "start replicator retry task" is skipped after the topic is closed.
// - The retry delay is "PersistentTopic.POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS" (60s), so wait 70s in total.
// - Since the topic should not be touched anymore, we use the "TopicName" cache to confirm whether
//   replication resolves it again.
Thread.sleep(10 * 1000);
topicNameCache.remove(topicName);
Thread.sleep(60 * 1000);
assertTrue(!topicNameCache.containsKey(topicName));

// cleanup.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
admin1.topics().delete(topicName, false);
}
}

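A note on the test's verification trick: TopicName.get(...) records every name it resolves in a private static cache (the field the test reads via reflection), so removing the entry and asserting it stays absent across the 60s retry window proves that no code path, including the retry task, resolved the topic name in between. A standalone sketch of that "tripwire" idea, with all names hypothetical:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    public class CacheTripwireSketch {
        // Stand-in for TopicName's private static cache: every resolution leaves a trace.
        private static final ConcurrentHashMap<String, String> CACHE = new ConcurrentHashMap<>();

        static String resolve(String name) {
            return CACHE.computeIfAbsent(name, n -> n); // like TopicName.get(name)
        }

        public static void main(String[] args) throws InterruptedException {
            String key = "persistent://tenant/ns/tp";
            resolve(key);              // an earlier code path resolved the name
            CACHE.remove(key);         // clear the trace
            TimeUnit.SECONDS.sleep(1); // stand-in for the 60s retry window
            // Had the retry task run, resolve(key) would have re-populated the cache.
            System.out.println(CACHE.containsKey(key)
                    ? "tripwire sprung: something resolved the name"
                    : "tripwire clean: the retry task was skipped");
        }
    }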