Skip to content

Commit

Permalink
[improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282)
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn authored Oct 11, 2023
1 parent eb9fa63 commit aecdb03
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -67,6 +68,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -97,7 +99,6 @@
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
Expand Down Expand Up @@ -125,9 +126,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
private final ConcurrentOpenHashMap<String, CompletableFuture<String>> getOwnerRequests;
private final Map<String, CompletableFuture<String>> getOwnerRequests;
private final String lookupServiceAddress;
private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> cleanupJobs;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;
private ExtensibleLoadManagerImpl loadManager;
private BrokerRegistry brokerRegistry;
Expand Down Expand Up @@ -204,9 +205,8 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) {
this.config = pulsar.getConfig();
this.lookupServiceAddress = pulsar.getLookupServiceAddress();
this.schema = Schema.JSON(ServiceUnitStateData.class);
this.getOwnerRequests = ConcurrentOpenHashMap.<String,
CompletableFuture<String>>newBuilder().build();
this.cleanupJobs = ConcurrentOpenHashMap.<String, CompletableFuture<Void>>newBuilder().build();
this.getOwnerRequests = new ConcurrentHashMap<>();
this.cleanupJobs = new ConcurrentHashMap<>();
this.stateChangeListeners = new StateChangeListeners();
this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
* 1000;
Expand Down Expand Up @@ -826,20 +826,28 @@ private boolean isTargetBroker(String broker) {
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<String> future = new CompletableFuture<>();
future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
log.warn("Failed to getOwner for serviceUnit:{}",
serviceUnit, e);
}
var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<String> future = new CompletableFuture<>();
requested.setValue(future);
return future;
});
} finally {
var future = requested.getValue();
if (future != null) {
future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
log.warn("Failed to getOwner for serviceUnit:{}",
serviceUnit, e);
}
);
return future;
});
}
);
}
}
}

private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
Expand Down Expand Up @@ -1114,24 +1122,34 @@ private void handleBrokerDeletionEvent(String broker) {
}

private void scheduleCleanup(String broker, long delayInSecs) {
cleanupJobs.computeIfAbsent(broker, k -> {
Executor delayed = CompletableFuture
.delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
totalInactiveBrokerCleanupScheduledCnt++;
return CompletableFuture
.runAsync(() -> {
try {
doCleanup(broker);
} catch (Throwable e) {
log.error("Failed to run the cleanup job for the broker {}, "
+ "totalCleanupErrorCnt:{}.",
broker, totalCleanupErrorCnt.incrementAndGet(), e);
} finally {
cleanupJobs.remove(broker);
var scheduled = new MutableObject<CompletableFuture<Void>>();
try {
cleanupJobs.computeIfAbsent(broker, k -> {
Executor delayed = CompletableFuture
.delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
totalInactiveBrokerCleanupScheduledCnt++;
var future = CompletableFuture
.runAsync(() -> {
try {
doCleanup(broker);
} catch (Throwable e) {
log.error("Failed to run the cleanup job for the broker {}, "
+ "totalCleanupErrorCnt:{}.",
broker, totalCleanupErrorCnt.incrementAndGet(), e);
}
}
}
, delayed);
});
, delayed);
scheduled.setValue(future);
return future;
});
} finally {
var future = scheduled.getValue();
if (future != null) {
future.whenComplete((v, ex) -> {
cleanupJobs.remove(broker);
});
}
}

log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.",
broker, delayInSecs, cleanupJobs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -88,7 +89,6 @@
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
Expand Down Expand Up @@ -1558,9 +1558,9 @@ public void testOverrideOrphanStateData()
}


private static ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
return (ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>>)
return (ConcurrentHashMap<String, CompletableFuture<Optional<String>>>)
FieldUtils.readDeclaredField(channel,
"getOwnerRequests", true);
}
Expand All @@ -1577,9 +1577,9 @@ private static long getLastMetadataSessionEventTimestamp(ServiceUnitStateChannel
FieldUtils.readField(channel, "lastMetadataSessionEventTimestamp", true);
}

private static ConcurrentOpenHashMap<String, CompletableFuture<Void>> getCleanupJobs(
private static ConcurrentHashMap<String, CompletableFuture<Void>> getCleanupJobs(
ServiceUnitStateChannel channel) throws IllegalAccessException {
return (ConcurrentOpenHashMap<String, CompletableFuture<Void>>)
return (ConcurrentHashMap<String, CompletableFuture<Void>>)
FieldUtils.readField(channel, "cleanupJobs", true);
}

Expand Down

0 comments on commit aecdb03

Please sign in to comment.