Skip to content

Commit

Permalink
Bounded queues for the EventHandler thread (#58)
Browse files Browse the repository at this point in the history
* Bounded queue for the EventHandler thread

The unbounded queue fronting the 'event' thread can cause trouble when the
EventHandler is unable to keep up with the workload. This can lead to heap
exhaustion, GC issues and failure modes that are generally considered "bad".

Band-aid over this with a semaphore to limit the number of tasks in the queue.
The semaphore is opt-in and disabled by default to avoid any nasty surprises
for folks upgrading.

Also add 'EventSource.awaitClosed()' to allow users to wait for underlying
thread pools to completely shut down. We can't know if it's safe to clean
up resources used by the EventHandler thread if we can't be certain that it
has completely terminated.

* Address checkstyle griping in StubServer

* Fix JavaDoc issue

* Tighten up exception handling

Co-authored-by: Eli Bishop <eli@launchdarkly.com>
  • Loading branch information
thomaslee and eli-darkly authored Jan 10, 2022
1 parent 60665a2 commit cea3f23
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 12 deletions.
49 changes: 43 additions & 6 deletions src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

/**
* Adapted from https://github.com/aslakhellesoy/eventsource-java/blob/master/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
Expand All @@ -15,15 +17,17 @@ class AsyncEventHandler implements EventHandler {
private final Executor executor;
private final EventHandler eventSourceHandler;
private final Logger logger;
final Semaphore semaphore; // visible for tests

AsyncEventHandler(Executor executor, EventHandler eventSourceHandler, Logger logger) {
AsyncEventHandler(Executor executor, EventHandler eventSourceHandler, Logger logger, Semaphore semaphore) {
this.executor = executor;
this.eventSourceHandler = eventSourceHandler;
this.logger = logger;
this.semaphore = semaphore;
}

public void onOpen() {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onOpen();
} catch (Exception e) {
Expand All @@ -33,7 +37,7 @@ public void onOpen() {
}

public void onClosed() {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onClosed();
} catch (Exception e) {
Expand All @@ -43,7 +47,7 @@ public void onClosed() {
}

public void onComment(final String comment) {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onComment(comment);
} catch (Exception e) {
Expand All @@ -53,7 +57,7 @@ public void onComment(final String comment) {
}

public void onMessage(final String event, final MessageEvent messageEvent) {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onMessage(event, messageEvent);
} catch (Exception e) {
Expand All @@ -63,7 +67,7 @@ public void onMessage(final String event, final MessageEvent messageEvent) {
}

public void onError(final Throwable error) {
executor.execute(() -> {
execute(() -> {
onErrorInternal(error);
});
}
Expand All @@ -82,4 +86,37 @@ private void onErrorInternal(Throwable error) {
logger.debug("Stack trace: {}", new LazyStackTrace(error));
}
}

private void execute(Runnable task) {
acquire();
try {
executor.execute(() -> {
try {
task.run();
} finally {
release();
}
});
} catch (Exception e) {
// probably a RejectedExecutionException due to pool shutdown
release();
throw e;
}
}

private void acquire() {
if (semaphore != null) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RejectedExecutionException("Thread interrupted while waiting for event thread semaphore", e);
}
}
}

private void release() {
if (semaphore != null) {
semaphore.release();
}
}
}
59 changes: 56 additions & 3 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -95,7 +96,7 @@ public class EventSource implements Closeable {
final Duration maxReconnectTime; // visible for tests
final Duration backoffResetThreshold; // visible for tests
private volatile String lastEventId;
private final AsyncEventHandler handler;
final AsyncEventHandler handler; // visible for tests
private final ConnectionErrorHandler connectionErrorHandler;
private final AtomicReference<ReadyState> readyState;
private final OkHttpClient client;
Expand Down Expand Up @@ -124,7 +125,13 @@ public class EventSource implements Closeable {
this.eventExecutor = Executors.newSingleThreadExecutor(eventsThreadFactory);
ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream", builder.threadPriority);
this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory);
this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger);
Semaphore eventThreadSemaphore;
if (builder.maxEventTasksInFlight > 0) {
eventThreadSemaphore = new Semaphore(builder.maxEventTasksInFlight);
} else {
eventThreadSemaphore = null;
}
this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger, eventThreadSemaphore);
this.connectionErrorHandler = builder.connectionErrorHandler == null ?
ConnectionErrorHandler.DEFAULT : builder.connectionErrorHandler;
this.readBufferSize = builder.readBufferSize;
Expand Down Expand Up @@ -216,6 +223,35 @@ public void close() {
}
}
}

/**
* Block until all underlying threads have terminated and resources have been released.
*
* @param timeout maximum time to wait for everything to shut down
* @return {@code true} if all thread pools terminated within the specified timeout, {@code false} otherwise.
* @throws InterruptedException if this thread is interrupted while blocking
*/
public boolean awaitClosed(final Duration timeout) throws InterruptedException {
final long deadline = System.currentTimeMillis() + timeout.toMillis();

if (!eventExecutor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return false;
}

long shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis());
if (!streamExecutor.awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) {
return false;
}

if (client.dispatcher().executorService() != null) {
shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis());
if (!client.dispatcher().executorService().awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) {
return false;
}
}

return true;
}

private void closeCurrentStream(ReadyState previousState) {
if (previousState == ReadyState.OPEN) {
Expand Down Expand Up @@ -516,6 +552,7 @@ public static final class Builder {
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private Logger logger = null;
private String loggerBaseName = null;
private int maxEventTasksInFlight = 0;

/**
* Creates a new builder.
Expand Down Expand Up @@ -887,7 +924,23 @@ public Builder loggerBaseName(String loggerBaseName) {
this.loggerBaseName = loggerBaseName;
return this;
}


/**
* Specifies the maximum number of tasks that can be "in-flight" for the thread executing {@link EventHandler}.
* A semaphore will be used to artificially constrain the number of tasks sitting in the queue fronting the
* event handler thread. When this limit is reached the stream thread will block until the backpressure passes.
* <p>
* For backward compatibility reasons the default is "unbounded".
*
* @param maxEventTasksInFlight the maximum number of tasks/messages that can be in-flight for the {@code EventHandler}
* @return the builder
* @since 2.4.0
*/
public Builder maxEventTasksInFlight(int maxEventTasksInFlight) {
this.maxEventTasksInFlight = maxEventTasksInFlight;
return this;
}

/**
* Constructs an {@link EventSource} using the builder's current properties.
* @return the new EventSource instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

import com.launchdarkly.eventsource.Stubs.LogItem;
import com.launchdarkly.eventsource.Stubs.TestHandler;

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -27,7 +33,7 @@ public AsyncEventHandlerTest() {
executor = Executors.newSingleThreadExecutor();
eventHandler = new TestHandler();
logger = mock(Logger.class);
asyncHandler = new AsyncEventHandler(executor, eventHandler, logger);
asyncHandler = new AsyncEventHandler(executor, eventHandler, logger, null);
}

@After
Expand Down Expand Up @@ -105,4 +111,46 @@ public void errorFromOnErrorIsCaughtAndLogged() {
verify(logger).warn("Caught unexpected error from EventHandler.onError(): " + err2);
verify(logger, times(2)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class));
}

@Test
public void backpressureOnQueueFull() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
final CountDownLatch latch1 = new CountDownLatch(1);
EventHandler eventHandler = mock(EventHandler.class);
doAnswer(invocation -> {
latch1.await();
return null;
}).doNothing().when(eventHandler).onMessage(anyString(), any(MessageEvent.class));

final CountDownLatch latch2 = new CountDownLatch(1);
final CountDownLatch latch3 = new CountDownLatch(1);
ExecutorService blockable = Executors.newSingleThreadExecutor();
try {
blockable.execute(() -> {
AsyncEventHandler asyncHandler = new AsyncEventHandler(executor, eventHandler, logger, new Semaphore(1));

asyncHandler.onOpen();

asyncHandler.onMessage("message", new MessageEvent("hello world"));
latch2.countDown();
asyncHandler.onMessage("message", new MessageEvent("goodbye horses"));
latch3.countDown();
});

assertTrue("Expected latch2 to trip", latch2.await(5, TimeUnit.SECONDS));
assertFalse("Expected latch3 not to trip", latch3.await(250, TimeUnit.MILLISECONDS));
latch1.countDown();
assertTrue("Expected latch3 to trip", latch3.await(5, TimeUnit.SECONDS));
} finally {
latch1.countDown();
latch2.countDown();
latch3.countDown();
blockable.shutdown();
assertTrue("Expected background thread to terminate", blockable.awaitTermination(5, TimeUnit.SECONDS));
}
} finally {
executor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -376,4 +378,18 @@ public void customLogger() {
assertSame(myLogger, es.logger);
}
}

@Test
public void defaultEventThreadWorkQueueCapacity() {
try (EventSource es = builder.build()) {
assertNull(es.handler.semaphore);
}
}

@Test
public void eventThreadWorkQueueCapacity() {
try (EventSource es = builder.maxEventTasksInFlight(8).build()) {
assertEquals(8, es.handler.semaphore.availablePermits());
}
}
}
Loading

0 comments on commit cea3f23

Please sign in to comment.