[fix][broker] release orphan replicator after replicator is closed. #21203

Closed · wants to merge 9 commits
@@ -70,6 +70,8 @@ protected enum State {
Stopped, Starting, Started, Stopping
}

private volatile boolean isClosed = false;

public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,
String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
@@ -116,6 +118,13 @@ public String getRemoteCluster() {
// This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
// the end result can be disconnect.
public synchronized void startProducer() {
// This method is invoked by active callers and may be called again after a disconnect,
// so reset isClosed to false first.
isClosed = false;
Contributor

Once the state is set to closed, it can no longer be recovered to non-closed. If the replicator needs to keep working, it shouldn't be set to closed, right?

Contributor

Normally this is the case. But here, we can call the method startProducer again after calling the method disconnect. In fact, this class can be closed and opened again.

So we should only set the state of the Replicator to closed when it is actually closed, right?

Contributor Author

> Once the state is set to closed, it can no longer be recovered to non-closed. If the replicator needs to keep working, it shouldn't be set to closed, right?
>
> Normally this is the case. But here, we can call the method startProducer again after calling the method disconnect. In fact, this class can be closed and opened again.

Essentially, what we want to do here is to allow the retry thread (checkTopicActiveAndRetryStartProducer) to end normally when the replicator is closed.
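
For context, here is a condensed, self-contained sketch of the lifecycle being described. It paraphrases the diff in this PR: the method names match, but the bodies are simplified placeholders and the backoff/producer logic is elided.

```java
import java.util.concurrent.CompletableFuture;

// Condensed sketch of the replicator lifecycle discussed in this thread.
// Method names mirror the diff; bodies are simplified placeholders.
class ReplicatorLifecycleSketch {
    private volatile boolean isClosed = false;

    public synchronized void startProducer() {
        // May be invoked again after disconnect(), so re-open first.
        isClosed = false;
        startProducerInternal();
    }

    private synchronized void startProducerInternal() {
        // ... create the remote producer; on failure, schedule
        // checkTopicActiveAndRetryStartProducer() with a backoff ...
    }

    protected void checkTopicActiveAndRetryStartProducer() {
        if (isClosed) {
            // The scheduled retry ends normally once the replicator is closed,
            // instead of re-creating an orphan producer.
            return;
        }
        startProducerInternal();
    }

    public synchronized CompletableFuture<Void> disconnect() {
        // Mark closed so any pending retry task observes it and exits.
        isClosed = true;
        // ... close the producer asynchronously ...
        return CompletableFuture.completedFuture(null);
    }
}
```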

Contributor

Once we add a state, it must be meaningful, right?

Contributor

@codelipenghui @Technoboy- What do you think about this?

Contributor

> In fact, this class can be closed and opened again.

There is already a state [Stopped, Starting, Started, Stopping] that already handles this case.
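
A hypothetical sketch of the alternative this comment seems to point at: gate the retry on the existing State field instead of adding a new flag. Whether State.Stopped alone can distinguish "stopped, may restart" from "closed for good" is exactly what the rest of the thread questions; the class and the assumption below are illustrative, not code from this PR.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical alternative (not code from this PR): reuse the existing
// State enum to decide whether the retry task should keep going.
class StateGatedRetrySketch {
    enum State { Stopped, Starting, Started, Stopping }

    private volatile State state = State.Stopped;
    private static final AtomicReferenceFieldUpdater<StateGatedRetrySketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StateGatedRetrySketch.class, State.class, "state");

    protected void checkTopicActiveAndRetryStartProducer() {
        // Assumption: Stopped would have to mean "deliberately disconnected,
        // do not restart", which is the open question in this thread.
        if (STATE_UPDATER.get(this) == State.Stopped) {
            return;
        }
        startProducerInternal();
    }

    private synchronized void startProducerInternal() {
        // ... create the remote producer, retrying on failure ...
    }
}
```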

Contributor Author

There is currently no clear indication that this replicator will not be used again after it is disconnected.

Maybe it is not entirely reasonable to use the closed status as the identifier here, but it works correctly.

I think it would also be reasonable to use some other marker as the judgment condition.

Contributor

Can you provide a definition for this new state isClosed? How is it different from State.Stopped when it is true?

startProducerInternal();
}

public synchronized void startProducerInternal() {
if (STATE_UPDATER.get(this) == State.Stopping) {
long waitTimeMs = backOff.next();
if (log.isDebugEnabled()) {
@@ -164,9 +173,14 @@ public synchronized void startProducer() {
}

protected void checkTopicActiveAndRetryStartProducer() {
// If the replicator is closed, do not retry starting the producer.
if (isClosed) {
log.info("[{}] Stop retrying to start the replicator because it is already closed.", replicatorId);
return;
}
isLocalTopicActive().thenAccept(isTopicActive -> {
if (isTopicActive) {
startProducer();
startProducerInternal();
}
}).exceptionally(ex -> {
log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId,
@@ -227,6 +241,8 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return disconnectFuture;
}

isClosed = true;

if (STATE_UPDATER.get(this) == State.Stopping) {
// Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by
// closeProducerAsync()
@@ -112,6 +112,68 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
});
}

@Test
public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Exception {
final String localCluster = "localCluster";
final String remoteCluster = "remoteCluster";
final String topicName = "remoteTopicName";
final String replicatorPrefix = "pulsar.repl";
final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
// Mock services.
final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class);
final PulsarService pulsar = mock(PulsarService.class);
final BrokerService broker = mock(BrokerService.class);
final Topic localTopic = mock(Topic.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
when(localClient.getCnxPool()).thenReturn(connectionPool);
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
when(remoteClient.getCnxPool()).thenReturn(connectionPool);
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
topics.put(topicName, CompletableFuture.completedFuture(Optional.of(localTopic)));
when(broker.executor()).thenReturn(eventLoopGroup);
when(broker.getTopics()).thenReturn(topics);
when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
when(broker.pulsar()).thenReturn(pulsar);
when(pulsar.getClient()).thenReturn(localClient);
when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
when(localTopic.getName()).thenReturn(topicName);
when(producerBuilder.topic(any())).thenReturn(producerBuilder);
when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
// Mock a producer creation failure.
when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));
when(producerBuilder.createAsync())
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("mocked ex")));
// Create a race condition between "retry start producer" and "close replicator".
final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName,
replicatorPrefix, broker, remoteClient);
replicator.startProducer();
replicator.disconnect();

// Verify that the retry task has finished and no tasks remain queued.
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
AtomicInteger taskCounter = new AtomicInteger();
CountDownLatch checkTaskFinished = new CountDownLatch(1);
eventLoopGroup.execute(() -> {
synchronized (replicator) {
LinkedBlockingQueue taskQueue = WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue");
DefaultPriorityQueue scheduledTaskQueue =
WhiteboxImpl.getInternalState(eventLoopGroup, "scheduledTaskQueue");
taskCounter.set(taskQueue.size() + scheduledTaskQueue.size());
checkTaskFinished.countDown();
}
});
checkTaskFinished.await();
Assert.assertEquals(taskCounter.get(), 0);
});
}

private static class ReplicatorInTest extends AbstractReplicator {

public ReplicatorInTest(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,