Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Asynchronously return brokerRegistry.lookupAsync when checking if broker is active(ExtensibleLoadManagerImpl only) #22899

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
return deferGetOwnerRequest(serviceUnit)
return dedupeGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -622,7 +622,7 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
}
EventType eventType = Assign;
eventCounters.get(eventType).getTotal().incrementAndGet();
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
CompletableFuture<String> getOwnerRequest = dedupeGetOwnerRequest(serviceUnit);

pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, getNextVersionId(serviceUnit)))
.whenComplete((__, ex) -> {
Expand Down Expand Up @@ -932,44 +932,54 @@ private boolean isTargetBroker(String broker) {
return broker.equals(brokerId);
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
private CompletableFuture<String> deferGetOwner(String serviceUnit) {
var future = new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
return future;
}

private CompletableFuture<String> dedupeGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));

if (requested.getValue() != null) {
return requested.getValue();
}
}


CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
requested.setValue(future);
return future;
});
return getOwnerRequests.computeIfAbsent(serviceUnit, k -> {
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do the broker active check first with the computeIfAbsent lock
requested.setValue(brokerRegistry.lookupAsync(ownerBefore.get())
.thenCompose(brokerLookupData -> {
if (brokerLookupData.isPresent()) {
// The owner broker is active.
// Immediately return the request.
return CompletableFuture.completedFuture(ownerBefore.get());
} else {
// The owner broker is inactive.
// The leader broker should be cleaning up the orphan service units.
// Defer this request til the leader notifies the new ownerships.
return deferGetOwner(serviceUnit);
}
}));
} else {
// The owner broker has not been declared yet.
// The ownership should be in the middle of transferring or assigning.
// Defer this request til the inflight ownership change is complete.
requested.setValue(deferGetOwner(serviceUnit));
}
return requested.getValue();
});
} finally {
var future = requested.getValue();
if (future != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1620,32 +1620,63 @@ public void testOverrideOrphanStateData()
@Test(priority = 19)
public void testActiveGetOwner() throws Exception {


// set the bundle owner is the broker
// case 1: the bundle owner is empty
String broker = brokerId2;
String bundle = "public/owned/0xfffffff0_0xffffffff";
overrideTableViews(bundle, null);
assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());

// case 2: the bundle ownership is transferring, and the dst broker is not the channel owner
overrideTableViews(bundle,
new ServiceUnitStateData(Releasing, broker, brokerId1, 1));
assertEquals(Optional.of(broker), channel1.getOwnerAsync(bundle).get());


// case 3: the bundle ownership is transferring, and the dst broker is the channel owner
overrideTableViews(bundle,
new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1));
assertTrue(!channel1.getOwnerAsync(bundle).isDone());

// case 4: the bundle ownership is found
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
assertEquals(owner, broker);

// simulate the owner is inactive
// case 5: the owner lookup gets delayed
var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(spyRegistry).lookupAsync(eq(broker));
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", spyRegistry , true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1000, true);
var delayedFuture = new CompletableFuture();
doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker));
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();;
}
delayedFuture.complete(Optional.of(broker));
});


// verify getOwnerAsync times out because the owner is inactive now.
// verify the owner eventually returns in inFlightStateWaitingTimeInMillis.
long start = System.currentTimeMillis();
assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
long elapsed = System.currentTimeMillis() - start;
assertTrue(elapsed < 1000);

// case 6: the owner is inactive
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(spyRegistry).lookupAsync(eq(broker));

// verify getOwnerAsync times out
start = System.currentTimeMillis();
var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get());
assertTrue(ex.getCause() instanceof IllegalStateException);
assertTrue(System.currentTimeMillis() - start >= 1000);

// simulate ownership cleanup(no selected owner) by the leader channel
// case 7: the ownership cleanup(no new owner) by the leader channel
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(loadManager).selectAsync(any(), any(), any());
var leaderChannel = channel1;
Expand All @@ -1669,7 +1700,8 @@ public void testActiveGetOwner() throws Exception {
waitUntilState(channel2, bundle, Init);

assertTrue(System.currentTimeMillis() - start < 20_000);
// simulate ownership cleanup(brokerId1 selected owner) by the leader channel

// case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
Expand All @@ -1694,6 +1726,7 @@ public void testActiveGetOwner() throws Exception {

}


private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
return (ConcurrentHashMap<String, CompletableFuture<Optional<String>>>)
Expand Down
Loading