Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Remove failed future when topic load error #20561

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Jun 12, 2023

fixes: #14941

Motivation

Issue-1(this PR fixes)

When a topic load fails due to timeout ex, Broker will not clean up the exception future, causing other APIs to fail when they touch this future. You can reproduce it by testUnloadNamespaceAfterLoadTopicFailed

Issue-2

If the program is executed as the flow below, there will still be one orphan topic that is not closed.

time create topic timeout check
1 Start to create a topic
2 Open managed ledger, and do recover for every cursor
3 Time out and remove the topic from the cache
4 An orphan topic still is there

I will try to reproduce this scenario later(not in this PR)

Contexts

2023-06-12T06:55:17,479+0000 [bookkeeper-ml-scheduler-OrderedScheduler-21-0] ERROR
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1099) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) ~[?:?]
	at org.apache.pulsar.broker.service.BrokerService.lambda$unloadServiceUnit$88(BrokerService.java:1951) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:544) ~[io.streamnative-pulsar-common-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:272) ~[io.streamnative-pulsar-common-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.service.BrokerService.unloadServiceUnit(BrokerService.java:1945) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.service.BrokerService.unloadServiceUnit(BrokerService.java:1921) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.namespace.OwnedBundle.lambda$handleUnloadRequest$0(OwnedBundle.java:138) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1106) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) ~[?:?]
	at org.apache.pulsar.broker.namespace.OwnedBundle.handleUnloadRequest(OwnedBundle.java:138) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.namespace.NamespaceService.unloadNamespaceBundle(NamespaceService.java:734) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.namespace.NamespaceService.unloadNamespaceBundle(NamespaceService.java:723) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.namespace.NamespaceService.unloadNamespaceBundle(NamespaceService.java:719) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.admin.impl.NamespacesBase.lambda$internalUnloadNamespaceBundle$51(NamespacesBase.java:1228) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) ~[?:?]
	at org.apache.pulsar.broker.admin.impl.NamespacesBase.internalUnloadNamespaceBundle(NamespacesBase.java:1211) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at org.apache.pulsar.broker.admin.v2.Namespaces.unloadNamespaceBundle(Namespaces.java:555) ~[io.streamnative-pulsar-broker-2.10.3.5.jar:2.10.3.5]
	at jdk.internal.reflect.GeneratedMethodAccessor131.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]

Modifications

Remove failed future when topic load error.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 12, 2023
@poorbarcode poorbarcode self-assigned this Jun 12, 2023
@poorbarcode poorbarcode added this to the 3.1.0 milestone Jun 12, 2023
@@ -1091,6 +1091,9 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
}
});
}
final CompletableFuture<Optional<Topic>> topicFutureToRemove = topicFuture;
topicFuture.whenComplete((ignore, ex) -> topics.remove(topicName.toString(), topicFutureToRemove));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please note that you are appending a new whenComplete listener every time this method is called and the object is already in the map.
we have to call "whenComplete" only if the body of the lambda expression in computeIfAbsent is executed.
I know that it sounds ugly, but we must to it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli

Ah, sorry. I lost a check: if (ex != null). It only is removed if the future was failed, and we call remove(k, v), and this method will not do incorrect deletions. Could you take a look again?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that we call whenComplete every time we invoke this method and only the first time we create the object and add it to the Map.

So everytime we enqueue the execution. If this method is called in an hotpath you will see many allocations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli

You are right. Thanks, already fixed

@lhotari
Copy link
Member

lhotari commented Jul 5, 2023

this is already covered in #20540 which was created before this PR. It uses a different approach in the failTopicFuture and completeTopicFuture method.
I agree that the approach in this PR could resolve the particular issue, but #20540 targets to cover also the cases where the bundle gets unloaded or the broker gets shutdown while a topic is loading.

@Technoboy- Technoboy- modified the milestones: 3.1.0, 3.2.0 Jul 31, 2023
@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Aug 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
@lhotari lhotari removed this from the 4.0.0 milestone Oct 14, 2024
@lhotari lhotari added this to the 4.1.0 milestone Oct 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Topic is temporarily unavailable
5 participants