Skip to content

Commit

Permalink
[improve][broker] Avoid using blocking calls for the async method ``c…
Browse files Browse the repository at this point in the history
…heckTopicOwnership`` (apache#15023)
  • Loading branch information
mattisonchao authored and nicklixinyang committed Apr 20, 2022
1 parent 294e517 commit 04185f7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {

public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
return getBundleAsync(topicName)
.thenApply(ownershipCache::checkOwnership);
.thenCompose(ownershipCache::checkOwnershipAsync);
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,13 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
* @param bundle namespace bundle
* @return future that will complete with check result
*/
public boolean checkOwnership(NamespaceBundle bundle) {
return getOwnedBundle(bundle) != null;
public CompletableFuture<Boolean> checkOwnershipAsync(NamespaceBundle bundle) {
Optional<CompletableFuture<OwnedBundle>> ownedBundleFuture = getOwnedBundleAsync(bundle);
if (!ownedBundleFuture.isPresent()) {
return CompletableFuture.completedFuture(false);
}
return ownedBundleFuture.get()
.thenApply(bd -> bd != null && bd.isActive());
}

/**
Expand Down Expand Up @@ -279,6 +284,10 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
}
}

public Optional<CompletableFuture<OwnedBundle>> getOwnedBundleAsync(NamespaceBundle bundle) {
return Optional.ofNullable(ownedBundlesCache.getIfPresent(bundle));
}

/**
* Disable bundle in local cache and on zk.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void testReestablishOwnership() throws Exception {
assertFalse(data3.isDisabled());
assertNotNull(cache.getOwnedBundle(testFullBundle));

assertTrue(cache.checkOwnership(testFullBundle));
assertTrue(cache.checkOwnershipAsync(testFullBundle).get());
assertEquals(data2.getNativeUrl(), selfBrokerUrl);
assertFalse(data2.isDisabled());
assertNotNull(cache.getOwnedBundle(testFullBundle));
Expand Down

0 comments on commit 04185f7

Please sign in to comment.