Skip to content

Commit

Permalink
Merge pull request #1088 from tdanylchuk/fix-healthcheck-handler-conc…
Browse files Browse the repository at this point in the history
…urrent-register

Concurrent register of custom healthcheck handler in DiscoveryClient
  • Loading branch information
qiangdavidliu authored Jun 25, 2018
2 parents 07876a7 + 2402d25 commit ac6bd9b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public class DiscoveryClient implements EurekaClient {
private final Provider<BackupRegistry> backupRegistryProvider;
private final EurekaTransport eurekaTransport;

private volatile HealthCheckHandler healthCheckHandler;
private final AtomicReference<HealthCheckHandler> healthCheckHandlerRef = new AtomicReference<>();
private volatile Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<>();
private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();
Expand Down Expand Up @@ -633,7 +633,7 @@ public void registerHealthCheckCallback(HealthCheckCallback callback) {
logger.error("Cannot register a listener for instance info since it is null!");
}
if (callback != null) {
healthCheckHandler = new HealthCheckCallbackToHandlerBridge(callback);
healthCheckHandlerRef.set(new HealthCheckCallbackToHandlerBridge(callback));
}
}

Expand All @@ -643,7 +643,7 @@ public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
logger.error("Cannot register a healthcheck handler when instance info is null!");
}
if (healthCheckHandler != null) {
this.healthCheckHandler = healthCheckHandler;
this.healthCheckHandlerRef.set(healthCheckHandler);
// schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
if (instanceInfoReplicator != null) {
instanceInfoReplicator.onDemandUpdate();
Expand Down Expand Up @@ -1414,6 +1414,7 @@ InstanceInfo getInstanceInfo() {

@Override
public HealthCheckHandler getHealthCheckHandler() {
HealthCheckHandler healthCheckHandler = this.healthCheckHandlerRef.get();
if (healthCheckHandler == null) {
if (null != healthCheckHandlerProvider) {
healthCheckHandler = healthCheckHandlerProvider.get();
Expand All @@ -1424,9 +1425,10 @@ public HealthCheckHandler getHealthCheckHandler() {
if (null == healthCheckHandler) {
healthCheckHandler = new HealthCheckCallbackToHandlerBridge(null);
}
this.healthCheckHandlerRef.compareAndSet(null, healthCheckHandler);
}

return healthCheckHandler;
return this.healthCheckHandlerRef.get();
}

/**
Expand Down Expand Up @@ -1491,9 +1493,9 @@ void refreshRegistry() {
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
}

/**
* Fetch the registry information from back up registry if all eureka server
* urls are unreachable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ protected void setupProperties() {
}

protected void setupDiscoveryClient() {
setupDiscoveryClient(30);
client = getSetupDiscoveryClient();
}

protected void setupDiscoveryClient(int renewalIntervalInSecs) {
protected EurekaClient getSetupDiscoveryClient() {
return getSetupDiscoveryClient(30);
}

protected EurekaClient getSetupDiscoveryClient(int renewalIntervalInSecs) {
InstanceInfo instanceInfo = newInstanceInfoBuilder(renewalIntervalInSecs).build();
client = DiscoveryClientResource.setupDiscoveryClient(instanceInfo);
return DiscoveryClientResource.setupDiscoveryClient(instanceInfo);
}

protected InstanceInfo.Builder newInstanceInfoBuilder(int renewalIntervalInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;

/**
* @author Nitesh Kant
*/
Expand Down Expand Up @@ -114,6 +122,63 @@ public void testHandler() throws Exception {
clientImpl.getInstanceInfo().getStatus());
}

@Test
public void shouldRegisterHealthCheckHandlerInConcurrentEnvironment() throws Exception {
HealthCheckHandler myHealthCheckHandler = new MyHealthCheckHandler(InstanceInfo.InstanceStatus.UP);

int testsCount = 20;
int threadsCount = testsCount * 2;
CountDownLatch starterLatch = new CountDownLatch(threadsCount);
CountDownLatch finishLatch = new CountDownLatch(threadsCount);

List<DiscoveryClient> discoveryClients = range(0, testsCount)
.mapToObj(i -> (DiscoveryClient) getSetupDiscoveryClient())
.collect(toList());

Stream<Thread> registerCustomHandlerThreads = discoveryClients.stream().map(client ->
new SimultaneousStarter(starterLatch, finishLatch, () -> client.registerHealthCheck(myHealthCheckHandler)));
Stream<Thread> lazyInitOfDefaultHandlerThreads = discoveryClients.stream().map(client ->
new SimultaneousStarter(starterLatch, finishLatch, client::getHealthCheckHandler));
List<Thread> threads = Stream.concat(registerCustomHandlerThreads, lazyInitOfDefaultHandlerThreads)
.collect(toList());
Collections.shuffle(threads);
threads.forEach(Thread::start);

try {
finishLatch.await();
discoveryClients.forEach(client ->
Assert.assertSame("Healthcheck handler should be custom.", myHealthCheckHandler, client.getHealthCheckHandler()));
} finally {
//cleanup resources
discoveryClients.forEach(DiscoveryClient::shutdown);
}
}

public static class SimultaneousStarter extends Thread {

private final CountDownLatch starterLatch;
private final CountDownLatch finishLatch;
private final Runnable runnable;

public SimultaneousStarter(CountDownLatch starterLatch, CountDownLatch finishLatch, Runnable runnable) {
this.starterLatch = starterLatch;
this.finishLatch = finishLatch;
this.runnable = runnable;
}

@Override
public void run() {
starterLatch.countDown();
try {
starterLatch.await();
runnable.run();
finishLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException("Something went wrong...");
}
}
}

private static class MyHealthCheckCallback implements HealthCheckCallback {

private final boolean health;
Expand Down

0 comments on commit ac6bd9b

Please sign in to comment.