Skip to content

Commit

Permalink
[fix][broker] Asynchronously return brokerRegistry.lookupAsync when c…
Browse files Browse the repository at this point in the history
…hecking if broker is active(ExtensibleLoadManagerImpl only) (apache#22899)

(cherry picked from commit c2702e9)
(cherry picked from commit 2cf6e51)
  • Loading branch information
heesung-sn committed Jun 27, 2024
1 parent 9ffbffc commit 9ab3f38
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
return deferGetOwnerRequest(serviceUnit)
return dedupeGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -594,7 +594,7 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
}
EventType eventType = Assign;
eventCounters.get(eventType).getTotal().incrementAndGet();
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
CompletableFuture<String> getOwnerRequest = dedupeGetOwnerRequest(serviceUnit);

pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, getNextVersionId(serviceUnit)))
.whenComplete((__, ex) -> {
Expand Down Expand Up @@ -891,44 +891,54 @@ private boolean isTargetBroker(String broker) {
return broker.equals(brokerId);
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
private CompletableFuture<String> deferGetOwner(String serviceUnit) {
var future = new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
return future;
}

private CompletableFuture<String> dedupeGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));

if (requested.getValue() != null) {
return requested.getValue();
}
}


CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
requested.setValue(future);
return future;
});
return getOwnerRequests.computeIfAbsent(serviceUnit, k -> {
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do the broker active check first with the computeIfAbsent lock
requested.setValue(brokerRegistry.lookupAsync(ownerBefore.get())
.thenCompose(brokerLookupData -> {
if (brokerLookupData.isPresent()) {
// The owner broker is active.
// Immediately return the request.
return CompletableFuture.completedFuture(ownerBefore.get());
} else {
// The owner broker is inactive.
// The leader broker should be cleaning up the orphan service units.
// Defer this request til the leader notifies the new ownerships.
return deferGetOwner(serviceUnit);
}
}));
} else {
// The owner broker has not been declared yet.
// The ownership should be in the middle of transferring or assigning.
// Defer this request til the inflight ownership change is complete.
requested.setValue(deferGetOwner(serviceUnit));
}
return requested.getValue();
});
} finally {
var future = requested.getValue();
if (future != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1617,32 +1617,63 @@ public void testOverrideOrphanStateData()
@Test(priority = 19)
public void testActiveGetOwner() throws Exception {


// set the bundle owner is the broker
// case 1: the bundle owner is empty
String broker = brokerId2;
String bundle = "public/owned/0xfffffff0_0xffffffff";
overrideTableViews(bundle, null);
assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get());

// case 2: the bundle ownership is transferring, and the dst broker is not the channel owner
overrideTableViews(bundle,
new ServiceUnitStateData(Releasing, broker, brokerId1, 1));
assertEquals(Optional.of(broker), channel1.getOwnerAsync(bundle).get());


// case 3: the bundle ownership is transferring, and the dst broker is the channel owner
overrideTableViews(bundle,
new ServiceUnitStateData(Assigning, brokerId1, brokerId2, 1));
assertTrue(!channel1.getOwnerAsync(bundle).isDone());

// case 4: the bundle ownership is found
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
assertEquals(owner, broker);

// simulate the owner is inactive
// case 5: the owner lookup gets delayed
var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(spyRegistry).lookupAsync(eq(broker));
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", spyRegistry , true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1000, true);
var delayedFuture = new CompletableFuture();
doReturn(delayedFuture).when(spyRegistry).lookupAsync(eq(broker));
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();;
}
delayedFuture.complete(Optional.of(broker));
});


// verify getOwnerAsync times out because the owner is inactive now.
// verify the owner eventually returns in inFlightStateWaitingTimeInMillis.
long start = System.currentTimeMillis();
assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
long elapsed = System.currentTimeMillis() - start;
assertTrue(elapsed < 1000);

// case 6: the owner is inactive
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(spyRegistry).lookupAsync(eq(broker));

// verify getOwnerAsync times out
start = System.currentTimeMillis();
var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get());
assertTrue(ex.getCause() instanceof IllegalStateException);
assertTrue(System.currentTimeMillis() - start >= 1000);

// simulate ownership cleanup(no selected owner) by the leader channel
// case 7: the ownership cleanup(no new owner) by the leader channel
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(loadManager).selectAsync(any(), any());
var leaderChannel = channel1;
Expand All @@ -1666,7 +1697,8 @@ public void testActiveGetOwner() throws Exception {
waitUntilState(channel2, bundle, Init);

assertTrue(System.currentTimeMillis() - start < 20_000);
// simulate ownership cleanup(brokerId1 selected owner) by the leader channel

// case 8: simulate ownership cleanup(brokerId1 as the new owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
Expand All @@ -1691,6 +1723,7 @@ public void testActiveGetOwner() throws Exception {

}


private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
return (ConcurrentHashMap<String, CompletableFuture<Optional<String>>>)
Expand Down

0 comments on commit 9ab3f38

Please sign in to comment.