diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index d71513652e9b8..f7e09a2bec546 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -54,6 +54,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -67,6 +68,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.PulsarClusterMetadataSetup; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -97,7 +99,6 @@ import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.SessionEvent; @@ -125,9 +126,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final PulsarService pulsar; private final ServiceConfiguration config; private final Schema schema; - private final ConcurrentOpenHashMap> getOwnerRequests; + private final Map> getOwnerRequests; private final String lookupServiceAddress; - private final ConcurrentOpenHashMap> cleanupJobs; + private final Map> cleanupJobs; private final StateChangeListeners stateChangeListeners; private ExtensibleLoadManagerImpl loadManager; private BrokerRegistry brokerRegistry; @@ -204,9 +205,8 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) { this.config = pulsar.getConfig(); this.lookupServiceAddress = pulsar.getLookupServiceAddress(); this.schema = Schema.JSON(ServiceUnitStateData.class); - this.getOwnerRequests = ConcurrentOpenHashMap.>newBuilder().build(); - this.cleanupJobs = ConcurrentOpenHashMap.>newBuilder().build(); + this.getOwnerRequests = new ConcurrentHashMap<>(); + this.cleanupJobs = new ConcurrentHashMap<>(); this.stateChangeListeners = new StateChangeListeners(); this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds() * 1000; @@ -826,20 +826,28 @@ private boolean isTargetBroker(String broker) { } private CompletableFuture deferGetOwnerRequest(String serviceUnit) { - return getOwnerRequests - .computeIfAbsent(serviceUnit, k -> { - CompletableFuture future = new CompletableFuture<>(); - future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS) - .whenComplete((v, e) -> { - if (e != null) { - getOwnerRequests.remove(serviceUnit, future); - log.warn("Failed to getOwner for serviceUnit:{}", - serviceUnit, e); - } + var requested = new MutableObject>(); + try { + return getOwnerRequests + .computeIfAbsent(serviceUnit, k -> { + CompletableFuture future = new CompletableFuture<>(); + requested.setValue(future); + return future; + }); + } finally { + var future = requested.getValue(); + if (future != null) { + future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS) + .whenComplete((v, e) -> { + if (e != null) { + getOwnerRequests.remove(serviceUnit, future); + log.warn("Failed to getOwner for serviceUnit:{}", + serviceUnit, e); } - ); - return future; - }); + } + ); + } + } } private CompletableFuture closeServiceUnit(String serviceUnit) { @@ -1114,24 +1122,34 @@ private void handleBrokerDeletionEvent(String broker) { } private void scheduleCleanup(String broker, long delayInSecs) { - cleanupJobs.computeIfAbsent(broker, k -> { - Executor delayed = CompletableFuture - .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); - totalInactiveBrokerCleanupScheduledCnt++; - return CompletableFuture - .runAsync(() -> { - try { - doCleanup(broker); - } catch (Throwable e) { - log.error("Failed to run the cleanup job for the broker {}, " - + "totalCleanupErrorCnt:{}.", - broker, totalCleanupErrorCnt.incrementAndGet(), e); - } finally { - cleanupJobs.remove(broker); + var scheduled = new MutableObject>(); + try { + cleanupJobs.computeIfAbsent(broker, k -> { + Executor delayed = CompletableFuture + .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); + totalInactiveBrokerCleanupScheduledCnt++; + var future = CompletableFuture + .runAsync(() -> { + try { + doCleanup(broker); + } catch (Throwable e) { + log.error("Failed to run the cleanup job for the broker {}, " + + "totalCleanupErrorCnt:{}.", + broker, totalCleanupErrorCnt.incrementAndGet(), e); + } } - } - , delayed); - }); + , delayed); + scheduled.setValue(future); + return future; + }); + } finally { + var future = scheduled.getValue(); + if (future != null) { + future.whenComplete((v, ex) -> { + cleanupJobs.remove(broker); + }); + } + } log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.", broker, delayInSecs, cleanupJobs.size()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index a226df53e12f3..f9893ea3f63dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -60,6 +60,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -88,7 +89,6 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.policies.data.TopicType; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; @@ -1558,9 +1558,9 @@ public void testOverrideOrphanStateData() } - private static ConcurrentOpenHashMap>> getOwnerRequests( + private static ConcurrentHashMap>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { - return (ConcurrentOpenHashMap>>) + return (ConcurrentHashMap>>) FieldUtils.readDeclaredField(channel, "getOwnerRequests", true); } @@ -1577,9 +1577,9 @@ private static long getLastMetadataSessionEventTimestamp(ServiceUnitStateChannel FieldUtils.readField(channel, "lastMetadataSessionEventTimestamp", true); } - private static ConcurrentOpenHashMap> getCleanupJobs( + private static ConcurrentHashMap> getCleanupJobs( ServiceUnitStateChannel channel) throws IllegalAccessException { - return (ConcurrentOpenHashMap>) + return (ConcurrentHashMap>) FieldUtils.readField(channel, "cleanupJobs", true); }