Skip to content

Commit

Permalink
Merge pull request #33 from launchdarkly/eb/ch76124/thread-priority
Browse files Browse the repository at this point in the history
allow setting specific thread priority
  • Loading branch information
eli-darkly authored May 8, 2020
2 parents add38c0 + 452e8bf commit 24ef1ef
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 5 deletions.
28 changes: 23 additions & 5 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,26 @@ public class EventSource implements Closeable {
this.reconnectTime = builder.reconnectTime;
this.maxReconnectTime = builder.maxReconnectTime;
this.backoffResetThreshold = builder.backoffResetThreshold;
ThreadFactory eventsThreadFactory = createThreadFactory("okhttp-eventsource-events");
ThreadFactory eventsThreadFactory = createThreadFactory("okhttp-eventsource-events", builder.threadPriority);
this.eventExecutor = Executors.newSingleThreadExecutor(eventsThreadFactory);
ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream");
ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream", builder.threadPriority);
this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory);
this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler);
this.connectionErrorHandler = builder.connectionErrorHandler;
this.readyState = new AtomicReference<>(RAW);
this.client = builder.clientBuilder.build();
}

private ThreadFactory createThreadFactory(final String type) {
final ThreadFactory backingThreadFactory =
Executors.defaultThreadFactory();
private ThreadFactory createThreadFactory(final String type, final Integer threadPriority) {
final ThreadFactory backingThreadFactory = Executors.defaultThreadFactory();
final AtomicLong count = new AtomicLong(0);
return runnable -> {
Thread thread = backingThreadFactory.newThread(runnable);
thread.setName(format(Locale.ROOT, "%s-[%s]-%d", type, name, count.getAndIncrement()));
thread.setDaemon(true);
if (threadPriority != null) {
thread.setPriority(threadPriority);
}
return thread;
};
}
Expand Down Expand Up @@ -512,6 +514,7 @@ public static final class Builder {
private final HttpUrl url;
private final EventHandler handler;
private ConnectionErrorHandler connectionErrorHandler = ConnectionErrorHandler.DEFAULT;
private Integer threadPriority = null;
private Headers headers = Headers.of();
private Proxy proxy;
private Authenticator proxyAuthenticator = null;
Expand Down Expand Up @@ -791,6 +794,21 @@ public Builder connectionErrorHandler(ConnectionErrorHandler handler) {
return this;
}

/**
* Specifies the priority for threads created by {@code EventSource}.
* <p>
* If this is left unset, or set to {@code null}, threads will inherit the default priority
* provided by {@code Executors.defaultThreadFactory()}.
*
* @param threadPriority the thread priority, or null to ue the default
* @return the builder
* @since 2.2.0
*/
public Builder threadPriority(Integer threadPriority) {
this.threadPriority = threadPriority;
return this;
}

/**
* Specifies any type of configuration actions you want to perform on the OkHttpClient builder.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -18,6 +20,7 @@
import static com.launchdarkly.eventsource.StubServer.Handlers.streamProducerFromChunkedString;
import static com.launchdarkly.eventsource.StubServer.Handlers.streamProducerFromString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -369,4 +372,54 @@ public void newLastEventIdIsSentOnNextConnectAttempt() throws Exception {
}
}
}

@Test
public void defaultThreadPriorityIsNotMaximum() throws Exception {
StubServer.Handler streamHandler = stream(CONTENT_TYPE, streamProducerFromString("", false));

ThreadCapturingHandler threadCapturingHandler = new ThreadCapturingHandler();

try (StubServer server = StubServer.start(streamHandler)) {
try (EventSource es = new EventSource.Builder(threadCapturingHandler, server.getUri())
.build()) {
es.start();

Thread handlerThread = threadCapturingHandler.capturedThreads.take();

assertNotEquals(Thread.MAX_PRIORITY, handlerThread.getPriority());
}
}
}

@Test
public void canSetSpecificThreadPriority() throws Exception {
StubServer.Handler streamHandler = stream(CONTENT_TYPE, streamProducerFromString("", false));

ThreadCapturingHandler threadCapturingHandler = new ThreadCapturingHandler();

try (StubServer server = StubServer.start(streamHandler)) {
try (EventSource es = new EventSource.Builder(threadCapturingHandler, server.getUri())
.threadPriority(Thread.MAX_PRIORITY)
.build()) {
es.start();

Thread handlerThread = threadCapturingHandler.capturedThreads.take();

assertEquals(Thread.MAX_PRIORITY, handlerThread.getPriority());
}
}
}

private static class ThreadCapturingHandler implements EventHandler {
final BlockingQueue<Thread> capturedThreads = new LinkedBlockingQueue<>();

public void onOpen() throws Exception {
capturedThreads.add(Thread.currentThread());
}

public void onMessage(String event, MessageEvent messageEvent) throws Exception {}
public void onError(Throwable t) {}
public void onComment(String comment) {}
public void onClosed() {}
}
}

0 comments on commit 24ef1ef

Please sign in to comment.