Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try ExtensibleLoadManagerImpl as the default loadManagerClassName #64

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ loadBalancerNamespaceMaximumBundles=128
loadBalancerOverrideBrokerNicSpeedGbps=

# Name of load manager to use
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl

# Name of topic bundle assignment strategy to use
topicBundleAssignmentStrategy=org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2590,7 +2590,7 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
category = CATEGORY_LOAD_BALANCER,
doc = "Name of load manager to use"
)
private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl";
private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl";

@FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use")
private String topicBundleAssignmentStrategy =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe

log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
}

public static void createTenantIfAbsent(PulsarResources resources, String tenant, String cluster)
throws IOException, InterruptedException, ExecutionException {

Expand All @@ -375,7 +374,7 @@ public static void createTenantIfAbsent(PulsarResources resources, String tenant
}
}

static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
NamespaceResources namespaceResources = resources.getNamespaceResources();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,9 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker,
boolean force) {
boolean force,
long timeout,
TimeUnit timeoutUnit) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -691,7 +693,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
timeout, timeoutUnit);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
Expand Down Expand Up @@ -298,7 +297,8 @@ public synchronized void start() throws PulsarServerException {
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE.getTenant(), config.getClusterName());

PulsarClusterMetadataSetup.createNamespaceIfAbsent
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName());
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName(),
config.getDefaultNumberOfNamespaceBundles());

ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);

Expand Down Expand Up @@ -484,28 +484,34 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
return deferGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
}
return brokerRegistry.getAvailableBrokersAsync().thenCompose(activeBrokers -> {
if (activeBrokers.isEmpty()) {
return null;
} else {
return dedupeGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
}

return brokerRegistry.lookupAsync(newOwner)
.thenApply(lookupData -> {
if (lookupData.isPresent()) {
return newOwner;
} else {
throw new IllegalStateException(
"The new owner " + newOwner + " is inactive.");
}
});
}).whenComplete((__, e) -> {
if (e != null) {
log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
}).thenApply(Optional::ofNullable);
return brokerRegistry.lookupAsync(newOwner)
.thenApply(lookupData -> {
if (lookupData.isPresent()) {
return newOwner;
} else {
throw new IllegalStateException(
"The new owner " + newOwner + " is inactive.");
}
});
}).whenComplete((__, e) -> {
if (e != null) {
log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}
}).thenApply(Optional::ofNullable);
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
Expand Down Expand Up @@ -622,7 +628,7 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
}
EventType eventType = Assign;
eventCounters.get(eventType).getTotal().incrementAndGet();
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
CompletableFuture<String> getOwnerRequest = dedupeGetOwnerRequest(serviceUnit);

pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, getNextVersionId(serviceUnit)))
.whenComplete((__, ex) -> {
Expand Down Expand Up @@ -932,44 +938,54 @@ private boolean isTargetBroker(String broker) {
return broker.equals(brokerId);
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
/**
 * Builds a deferred owner-lookup future for the given service unit.
 *
 * <p>The future is armed with a timeout of {@code inFlightStateWaitingTimeInMillis} milliseconds.
 * If it completes exceptionally (including by that timeout), the fallback re-reads the owner via
 * {@code getOwner(serviceUnit)} and logs a warning:
 * <ul>
 *   <li>a {@code null} lookup result is rethrown as an {@link IllegalStateException} wrapping
 *       the original failure;</li>
 *   <li>otherwise the currently known owner is returned, or {@code null} when none is present.</li>
 * </ul>
 *
 * <p>NOTE(review): nothing in this method completes the future normally; presumably a caller
 * completes it once the ownership is resolved -- confirm against the callers.
 *
 * @param serviceUnit the service unit whose owner is awaited
 * @return the future with the timeout and fallback handling attached
 */
private CompletableFuture<String> deferGetOwner(String serviceUnit) {
    CompletableFuture<String> pendingOwner = new CompletableFuture<>();
    // orTimeout arms the timeout on the pending future itself.
    CompletableFuture<String> timedOwner =
            pendingOwner.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS);
    // On any failure, fall back to whatever owner is known at that moment.
    CompletableFuture<String> deferred = timedOwner.exceptionally(failure -> {
        var currentOwner = getOwner(serviceUnit);
        log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
                        + "return the current owner:{}",
                brokerId, serviceUnit, currentOwner, failure);
        if (currentOwner == null) {
            throw new IllegalStateException(failure);
        }
        return currentOwner.orElse(null);
    });
    if (debug()) {
        log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
    }
    return deferred;
}

private CompletableFuture<String> dedupeGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));

if (requested.getValue() != null) {
return requested.getValue();
}
}


CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
requested.setValue(future);
return future;
});
return getOwnerRequests.computeIfAbsent(serviceUnit, k -> {
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do the broker active check first with the computeIfAbsent lock
requested.setValue(brokerRegistry.lookupAsync(ownerBefore.get())
.thenCompose(brokerLookupData -> {
if (brokerLookupData.isPresent()) {
// The owner broker is active.
// Immediately return the request.
return CompletableFuture.completedFuture(ownerBefore.get());
} else {
// The owner broker is inactive.
// The leader broker should be cleaning up the orphan service units.
// Defer this request until the leader notifies the new ownerships.
return deferGetOwner(serviceUnit);
}
}));
} else {
// The owner broker has not been declared yet.
// The ownership should be in the middle of transferring or assigning.
// Defer this request until the inflight ownership change is complete.
requested.setValue(deferGetOwner(serviceUnit));
}
return requested.getValue();
});
} finally {
var future = requested.getValue();
if (future != null) {
Expand Down Expand Up @@ -1008,6 +1024,10 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean
if (ex != null) {
log.error("Failed to close topics under bundle:{} in {} ms",
bundle.toString(), unloadBundleTime, ex);
if (!disconnectClients) {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
} else {
log.info("Unloading bundle:{} with {} topics completed in {} ms",
bundle, unloadedTopics, unloadBundleTime);
Expand Down Expand Up @@ -1332,11 +1352,6 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
if (cleaned) {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while gracefully waiting for the cleanup convergence.");
}
break;
} else {
try {
Expand All @@ -1347,9 +1362,22 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
}
log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId,
System.currentTimeMillis() - started);
}

private synchronized void doCleanup(String broker) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
.isEmpty()) {
log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup",
broker);
return;
}
} catch (Exception e) {
log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", broker);
return;
}
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

Expand All @@ -44,6 +43,8 @@
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {

private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
private static final long INIT_TIMEOUT_IN_SECS = 5;


private volatile TableView<T> tableView;
private volatile long tableViewLastUpdateTimestamp;
Expand Down Expand Up @@ -123,10 +124,11 @@ public synchronized void start() throws LoadDataStoreException {
public synchronized void startTableView() throws LoadDataStoreException {
if (tableView == null) {
try {
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
tableView.forEachAndListen((k, v) ->
tableViewLastUpdateTimestamp = System.currentTimeMillis());
} catch (PulsarClientException e) {
} catch (Exception e) {
tableView = null;
throw new LoadDataStoreException(e);
}
Expand All @@ -137,8 +139,9 @@ public synchronized void startTableView() throws LoadDataStoreException {
public synchronized void startProducer() throws LoadDataStoreException {
if (producer == null) {
try {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
} catch (PulsarClientException e) {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Exception e) {
producer = null;
throw new LoadDataStoreException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false, timeout, timeoutUnit);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1287,7 +1287,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
nsBundle, Optional.empty(), true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Loading
Loading