Skip to content

Commit

Permalink
Use a dedicated ThreadGroup in rest sniffer (#26897)
Browse files Browse the repository at this point in the history
This change adds a dedicated thread group, configures threads with a corresponding thread name and starts all threads as daemon threads.
  • Loading branch information
retoo authored and s1monw committed Oct 12, 2017
1 parent f81ee22 commit ab94150
Showing 1 changed file with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@

import java.io.Closeable;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of
Expand All @@ -45,6 +49,7 @@
public class Sniffer implements Closeable {

private static final Log logger = LogFactory.getLog(Sniffer.class);
private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer";

private final Task task;

Expand Down Expand Up @@ -79,7 +84,8 @@ private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffInterva
this.restClient = restClient;
this.sniffIntervalMillis = sniffIntervalMillis;
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis;
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME);
this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
scheduleNextRun(0);
}

Expand Down Expand Up @@ -151,4 +157,34 @@ synchronized void shutdown() {
public static SnifferBuilder builder(RestClient restClient) {
return new SnifferBuilder(restClient);
}

private static class SnifferThreadFactory implements ThreadFactory {

private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final ThreadFactory originalThreadFactory;

private SnifferThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
this.originalThreadFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return Executors.defaultThreadFactory();
}
});
}

@Override
public Thread newThread(final Runnable r) {
return AccessController.doPrivileged(new PrivilegedAction<Thread>() {
@Override
public Thread run() {
Thread t = originalThreadFactory.newThread(r);
t.setName(namePrefix + "[T#" + threadNumber.getAndIncrement() + "]");
t.setDaemon(true);
return t;
}
});
}
}
}

0 comments on commit ab94150

Please sign in to comment.