Skip to content

Commit

Permalink
apply comment
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Dec 26, 2024
1 parent 356c876 commit 52ab2d1
Showing 1 changed file with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import lombok.Data;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
Expand Down Expand Up @@ -270,17 +271,9 @@ public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicNam
final CompletableFuture<Boolean> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
// switch thread to avoid potential metadata thread cost and recursive deadlock
return preparedFuture.thenComposeAsync(inserted -> {
@Data
class PoliciesFutureHolder {
CompletableFuture<Optional<TopicPolicies>> future;

public CompletableFuture<Optional<TopicPolicies>> getFuture() {
return Objects.requireNonNullElseGet(future,
() -> CompletableFuture.failedFuture(
new IllegalStateException("BUG! unexpected topic policy init.")));
}
}
final var policiesFutureHolder = new PoliciesFutureHolder();
final Mutable<CompletableFuture<Optional<TopicPolicies>>> policiesFutureHolder =
new MutableObject<>(CompletableFuture
.failedFuture(new IllegalStateException("BUG! unexpected topic policy init.")));
// NOTICE: avoid using any callback with lock scope to avoid deadlock
policyCacheInitMap.compute(namespace, (___, existingFuture) -> {
if (!inserted || existingFuture != null) {
Expand All @@ -291,14 +284,14 @@ public CompletableFuture<Optional<TopicPolicies>> getFuture() {
case GLOBAL_ONLY -> globalPoliciesCache.get(partitionedTopicName);
case LOCAL_ONLY -> policiesCache.get(partitionedTopicName);
});
policiesFutureHolder.setFuture(CompletableFuture.completedFuture(policies));
policiesFutureHolder.setValue(CompletableFuture.completedFuture(policies));
} else {
log.info("The future of {} has been removed from cache, retry getTopicPolicies again", namespace);
policiesFutureHolder.setFuture(getTopicPoliciesAsync(topicName, type));
policiesFutureHolder.setValue(getTopicPoliciesAsync(topicName, type));
}
return existingFuture;
});
return policiesFutureHolder.getFuture();
return policiesFutureHolder.getValue();
});
}

Expand Down

0 comments on commit 52ab2d1

Please sign in to comment.