Skip to content

Commit

Permalink
Cleanup resources on web-app shutdown (#1207) (#1251)
Browse files Browse the repository at this point in the history
  • Loading branch information
windmueller authored and troshko111 committed Nov 11, 2019
1 parent 743af8b commit 689b6fa
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public class DiscoveryClient implements EurekaClient {
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;

private TimedSupervisorTask cacheRefreshTask;
private TimedSupervisorTask heartbeatTask;

private final Provider<HealthCheckHandler> healthCheckHandlerProvider;
private final Provider<HealthCheckCallback> healthCheckCallbackProvider;
private final PreRegistrationHandler preRegistrationHandler;
Expand Down Expand Up @@ -622,7 +625,7 @@ public Set<String> getAllKnownRegions() {
*/
@Override
public List<InstanceInfo> getInstancesById(String id) {
List<InstanceInfo> instancesList = new ArrayList<InstanceInfo>();
List<InstanceInfo> instancesList = new ArrayList<>();
for (Application app : this.getApplications()
.getRegisteredApplications()) {
InstanceInfo instanceInfo = app.getByInstanceId(id);
Expand Down Expand Up @@ -926,6 +929,8 @@ public synchronized void shutdown() {
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();

Monitors.unregisterObject(this);

logger.info("Completed shut down of DiscoveryClient");
}
}
Expand Down Expand Up @@ -1269,16 +1274,17 @@ private void initScheduledTasks() {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

Expand All @@ -1288,16 +1294,17 @@ private void initScheduledTasks() {
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);

// InstanceInfo replicator
Expand Down Expand Up @@ -1349,6 +1356,12 @@ private void cancelScheduledTasks() {
if (scheduler != null) {
scheduler.shutdownNow();
}
if (cacheRefreshTask != null) {
cacheRefreshTask.cancel();
}
if (heartbeatTask != null) {
heartbeatTask.cancel();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class TimedSupervisorTask extends TimerTask {
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;

private final String name;
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor executor;
private final long timeoutMillis;
Expand All @@ -41,6 +42,7 @@ public class TimedSupervisorTask extends TimerTask {

public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
Expand Down Expand Up @@ -101,4 +103,10 @@ public void run() {
}
}
}

@Override
public boolean cancel() {
Monitors.unregisterObject(name, this);
return super.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public B withClientConfig(EurekaClientConfig clientConfig) {
withReadTimeout(clientConfig.getEurekaServerReadTimeoutSeconds() * 1000);
withMaxConnectionsPerHost(clientConfig.getEurekaServerTotalConnectionsPerHost());
withMaxTotalConnections(clientConfig.getEurekaServerTotalConnections());
withConnectionIdleTimeout(clientConfig.getEurekaConnectionIdleTimeoutSeconds() * 1000);
withConnectionIdleTimeout(clientConfig.getEurekaConnectionIdleTimeoutSeconds());
withEncoder(clientConfig.getEncoderName());
return withDecoder(clientConfig.getDecoderName(), clientConfig.getClientDataAccept());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ApacheHttpClientConnectionCleaner {

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-JerseyClient-Conn-Cleaner" + threadNumber.incrementAndGet());
Thread thread = new Thread(r, "Apache-HttpClient-Conn-Cleaner" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}
Expand Down Expand Up @@ -87,6 +87,7 @@ public void run() {
public void shutdown() {
cleanIdle(0);
eurekaConnCleaner.shutdown();
Monitors.unregisterObject(this);
}

public void cleanIdle(long delayMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.conn.ssl.X509HostnameVerifier;
import org.apache.http.impl.conn.SchemeRegistryFactory;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpConnectionParams;
Expand Down Expand Up @@ -74,6 +73,12 @@ public ApacheHttpClient4 getClient() {
public void destroyResources() {
apacheHttpClientConnectionCleaner.shutdown();
apacheHttpClient.destroy();

final Object connectionManager =
jerseyClientConfig.getProperty(ApacheHttpClient4Config.PROPERTY_CONNECTION_MANAGER);
if (connectionManager instanceof MonitoredConnectionManager) {
((MonitoredConnectionManager) connectionManager).shutdown();
}
}

public static class EurekaJerseyClientBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@

public class DiscoveryClientCloseJerseyThreadTest extends AbstractDiscoveryClientTester {

private static final String THREAD_NAME = "Eureka-JerseyClient-Conn-Cleaner";
private static final String JERSEY_THREAD_NAME = "Eureka-JerseyClient-Conn-Cleaner";
private static final String APACHE_THREAD_NAME = "Apache-HttpClient-Conn-Cleaner";

@Test
public void testThreadCount() throws InterruptedException {
assertThat(containsJerseyThread(), equalTo(true));
assertThat(containsClientThread(), equalTo(true));
client.shutdown();
// Give up control for cleaner thread to die
Thread.sleep(5);
assertThat(containsJerseyThread(), equalTo(false));
assertThat(containsClientThread(), equalTo(false));
}

private boolean containsJerseyThread() {
private boolean containsClientThread() {
Set<Thread> threads = Thread.getAllStackTraces().keySet();
for (Thread t : threads) {
if (t.getName().contains(THREAD_NAME)) {
if (t.getName().contains(JERSEY_THREAD_NAME) || t.getName().contains(APACHE_THREAD_NAME)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package com.netflix.eureka;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.discovery.DiscoveryManager;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.util.EurekaMonitors;
import com.netflix.eureka.util.ServoControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,6 +79,8 @@ public void shutdown() {
logger.info("Shutting down ...");
registry.shutdown();
peerEurekaNodes.shutdown();
ServoControl.shutdown();
EurekaMonitors.shutdown();
logger.info("Shut down");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public boolean equals(Object obj) {
public void shutDown() {
batchingDispatcher.shutdown();
nonBatchingDispatcher.shutdown();
replicationClient.shutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,7 @@ public void shutdown() {
deltaRetentionTimer.cancel();
evictionTimer.cancel();
renewsLastMin.stop();
responseCache.stop();
}

@com.netflix.servo.annotations.Monitor(name = "numOfElementsinInstanceCache", description = "Number of overrides in the instance Cache", type = DataSourceType.GAUGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void shutdown() {
logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", t);
}
numberOfReplicationsLastMin.stop();
timer.cancel();

super.shutdown();
}
Expand Down Expand Up @@ -647,7 +648,7 @@ private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
Expand All @@ -672,6 +673,8 @@ private void replicateInstanceActionsToPeers(Action action, String appName,
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}

Expand All @@ -686,6 +689,8 @@ private void replicateASGInfoToReplicaNodes(final String asgName,
node.statusUpdate(asgName, newStatus);
} catch (Throwable e) {
logger.error("Cannot replicate ASG status information to {}", node.getServiceUrl(), e);
} finally {
CurrentRequestVersion.remove();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public interface ResponseCache {
* @return compressed payload which contains information about the applications.
*/
byte[] getGZIP(Key key);

/**
* Performs a shutdown of this cache by stopping internal threads and unregistering
* Servo monitors.
*/
void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ public void run() {
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
Expand Down Expand Up @@ -234,6 +236,12 @@ public byte[] getGZIP(Key key) {
return payload.getGzipped();
}

@Override
public void stop() {
timer.cancel();
Monitors.unregisterObject(this);
}

/**
* Invalidate the cache of a particular application.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ protected Response getVipResponse(String version, String entityName, String acce
);

String payLoad = responseCache.get(cacheKey);
CurrentRequestVersion.remove();

if (payLoad != null) {
logger.debug("Found: {}", entityName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public Response getApplication(@PathParam("version") String version,
);

String payLoad = responseCache.get(cacheKey);
CurrentRequestVersion.remove();

if (payLoad != null) {
logger.debug("Found: {}", appName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ public ApplicationResource getApplicationResource(
@PathParam("version") String version,
@PathParam("appId") String appId) {
CurrentRequestVersion.set(Version.toEnum(version));
return new ApplicationResource(appId, serverConfig, registry);
try {
return new ApplicationResource(appId, serverConfig, registry);
} finally {
CurrentRequestVersion.remove();
}
}

/**
Expand Down Expand Up @@ -156,6 +160,7 @@ public Response getContainers(@PathParam("version") String version,
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}

Expand Down Expand Up @@ -226,15 +231,18 @@ public Response getContainerDifferential(
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);

if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
final Response response;

if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
response = Response.ok(responseCache.get(cacheKey)).build();
}

CurrentRequestVersion.remove();
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public final class CurrentRequestVersion {

private static final ThreadLocal<Version> CURRENT_REQ_VERSION =
new ThreadLocal<Version>();
new ThreadLocal<>();

private CurrentRequestVersion() {
}
Expand All @@ -48,9 +48,19 @@ public static Version get() {

/**
* Sets the current {@link Version}.
*
* Use {@link #remove()} as soon as the version is no longer required
* in order to purge the ThreadLocal used for storing it.
*/
public static void set(Version version) {
CURRENT_REQ_VERSION.set(version);
}

/**
* Clears the {@link ThreadLocal} used to store the version.
*/
public static void remove() {
CURRENT_REQ_VERSION.remove();
}

}
Loading

0 comments on commit 689b6fa

Please sign in to comment.