From 93e3dfaafba5695db393a72989b3bf42285fa5e1 Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Mon, 10 May 2021 21:34:59 -0700 Subject: [PATCH 1/4] Bounded queue for the EventHandler thread The unbounded queue fronting the 'event' thread can cause trouble when the EventHandler is unable to keep up with the workload. This can lead to heap exhaustion, GC issues and failure modes that are generally considered "bad". Band-aid over this with a semaphore to limit the number of tasks in the queue. The semaphore is opt-in and disabled by default to avoid any nasty surprises for folks upgrading. Also add 'EventSource.awaitClosed()' to allow users to wait for underlying thread pools to completely shut down. We can't know if it's safe to clean up resources used by the EventHandler thread if we can't be certain that it has completely terminated. --- .../eventsource/AsyncEventHandler.java | 35 +++++++- .../launchdarkly/eventsource/EventSource.java | 59 +++++++++++- .../eventsource/AsyncEventHandlerTest.java | 52 ++++++++++- .../eventsource/EventSourceBuilderTest.java | 16 ++++ .../eventsource/EventSourceHttpTest.java | 90 ++++++++++++++++++- 5 files changed, 245 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java b/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java index 21f355e..37b5301 100644 --- a/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java +++ b/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java @@ -2,6 +2,8 @@ import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; /** * Adapted from https://github.com/aslakhellesoy/eventsource-java/blob/master/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java @@ -15,49 +17,63 @@ class AsyncEventHandler implements EventHandler { private final Executor executor; private final EventHandler eventSourceHandler; private final Logger logger; + final Semaphore semaphore; // visible for tests - AsyncEventHandler(Executor executor, EventHandler eventSourceHandler, Logger logger) { + AsyncEventHandler(Executor executor, EventHandler eventSourceHandler, Logger logger, Semaphore semaphore) { this.executor = executor; this.eventSourceHandler = eventSourceHandler; this.logger = logger; + this.semaphore = semaphore; } public void onOpen() { + acquire(); executor.execute(() -> { try { eventSourceHandler.onOpen(); } catch (Exception e) { handleUnexpectedError(e); + } finally { + release(); } }); } public void onClosed() { + acquire(); executor.execute(() -> { try { eventSourceHandler.onClosed(); } catch (Exception e) { handleUnexpectedError(e); + } finally { + release(); } }); } public void onComment(final String comment) { + acquire(); executor.execute(() -> { try { eventSourceHandler.onComment(comment); } catch (Exception e) { handleUnexpectedError(e); + } finally { + release(); } }); } public void onMessage(final String event, final MessageEvent messageEvent) { + acquire(); executor.execute(() -> { try { eventSourceHandler.onMessage(event, messageEvent); } catch (Exception e) { handleUnexpectedError(e); + } finally { + release(); } }); } @@ -82,4 +98,21 @@ private void onErrorInternal(Throwable error) { logger.debug("Stack trace: {}", new LazyStackTrace(error)); } } + + private boolean acquire() { + if (semaphore != null) { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Thread interrupted while waiting for event thread semaphore", e); + } + } + return true; + } + + private void release() { + if (semaphore != null) { + semaphore.release(); + } + } } \ No newline at end of file diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index 390c834..bf49791 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -92,7 +93,7 @@ public class EventSource implements Closeable { final Duration maxReconnectTime; // visible for tests final Duration backoffResetThreshold; // visible for tests private volatile String lastEventId; - private final AsyncEventHandler handler; + final AsyncEventHandler handler; // visible for tests private final ConnectionErrorHandler connectionErrorHandler; private final AtomicReference readyState; private final OkHttpClient client; @@ -121,7 +122,13 @@ public class EventSource implements Closeable { this.eventExecutor = Executors.newSingleThreadExecutor(eventsThreadFactory); ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream", builder.threadPriority); this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory); - this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger); + Semaphore eventThreadSemaphore; + if (builder.maxEventTasksInFlight > 0) { + eventThreadSemaphore = new Semaphore(builder.maxEventTasksInFlight); + } else { + eventThreadSemaphore = null; + } + this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger, eventThreadSemaphore); this.connectionErrorHandler = builder.connectionErrorHandler == null ? ConnectionErrorHandler.DEFAULT : builder.connectionErrorHandler; this.readyState = new AtomicReference<>(RAW); @@ -212,6 +219,35 @@ public void close() { } } } + + /** + * Block until all underlying threads have terminated and resources have been released. + * + * @param timeout maximum time to wait for everything to shut down + * @return {@code true} if all thread pools terminated within the specified timeout, {@code false} otherwise. + * @throws InterruptedException + */ + public boolean awaitClosed(final Duration timeout) throws InterruptedException { + final long deadline = System.currentTimeMillis() + timeout.toMillis(); + + if (!eventExecutor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + return false; + } + + long shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis()); + if (!streamExecutor.awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) { + return false; + } + + if (client.dispatcher().executorService() != null) { + shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis()); + if (!client.dispatcher().executorService().awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) { + return false; + } + } + + return true; + } private void closeCurrentStream(ReadyState previousState) { if (previousState == ReadyState.OPEN) { @@ -512,6 +548,7 @@ public static final class Builder { private OkHttpClient.Builder clientBuilder; private Logger logger = null; private String loggerBaseName = null; + private int maxEventTasksInFlight = 0; /** * Creates a new builder. @@ -858,7 +895,23 @@ public Builder loggerBaseName(String loggerBaseName) { this.loggerBaseName = loggerBaseName; return this; } - + + /** + * Specifies the maximum number of tasks that can be "in-flight" for the thread executing {@link EventHandler}. + * A semaphore will be used to artificially constrain the number of tasks sitting in the queue fronting the + * event handler thread. When this limit is reached the stream thread will block until the backpressure passes. + *

+ * For backward compatibility reasons the default is "unbounded". + * + * @param maxEventTasksInFlight the maximum number of tasks/messages that can be in-flight for the {@code EventHandler} + * @return the builder + * @since 2.4.0 + */ + public Builder maxEventTasksInFlight(int maxEventTasksInFlight) { + this.maxEventTasksInFlight = maxEventTasksInFlight; + return this; + } + /** * Constructs an {@link EventSource} using the builder's current properties. * @return the new EventSource instance diff --git a/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java b/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java index 197dcfb..44f97dc 100644 --- a/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java +++ b/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java @@ -2,16 +2,22 @@ import com.launchdarkly.eventsource.Stubs.LogItem; import com.launchdarkly.eventsource.Stubs.TestHandler; - import org.junit.After; import org.junit.Test; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -27,7 +33,7 @@ public AsyncEventHandlerTest() { executor = Executors.newSingleThreadExecutor(); eventHandler = new TestHandler(); logger = mock(Logger.class); - asyncHandler = new AsyncEventHandler(executor, eventHandler, logger); + asyncHandler = new AsyncEventHandler(executor, eventHandler, logger, null); } @After @@ -105,4 +111,46 @@ public void errorFromOnErrorIsCaughtAndLogged() { verify(logger).warn("Caught unexpected error from EventHandler.onError(): " + err2); verify(logger, times(2)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class)); } + + @Test + public void backpressureOnQueueFull() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + final CountDownLatch latch1 = new CountDownLatch(1); + EventHandler eventHandler = mock(EventHandler.class); + doAnswer(invocation -> { + latch1.await(); + return null; + }).doNothing().when(eventHandler).onMessage(anyString(), any(MessageEvent.class)); + + final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch3 = new CountDownLatch(1); + ExecutorService blockable = Executors.newSingleThreadExecutor(); + try { + blockable.execute(() -> { + AsyncEventHandler asyncHandler = new AsyncEventHandler(executor, eventHandler, logger, new Semaphore(1)); + + asyncHandler.onOpen(); + + asyncHandler.onMessage("message", new MessageEvent("hello world")); + latch2.countDown(); + asyncHandler.onMessage("message", new MessageEvent("goodbye horses")); + latch3.countDown(); + }); + + assertTrue("Expected latch2 to trip", latch2.await(5, TimeUnit.SECONDS)); + assertFalse("Expected latch3 not to trip", latch3.await(250, TimeUnit.MILLISECONDS)); + latch1.countDown(); + assertTrue("Expected latch3 to trip", latch3.await(5, TimeUnit.SECONDS)); + } finally { + latch1.countDown(); + latch2.countDown(); + latch3.countDown(); + blockable.shutdown(); + assertTrue("Expected background thread to terminate", blockable.awaitTermination(5, TimeUnit.SECONDS)); + } + } finally { + executor.shutdown(); + } + } } diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java index 483a65d..526bd8e 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java @@ -21,8 +21,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import okhttp3.Authenticator; @@ -357,4 +359,18 @@ public void customLogger() { assertSame(myLogger, es.logger); } } + + @Test + public void defaultEventThreadWorkQueueCapacity() { + try (EventSource es = builder.build()) { + assertNull(es.handler.semaphore); + } + } + + @Test + public void eventThreadWorkQueueCapacity() { + try (EventSource es = builder.maxEventTasksInFlight(8).build()) { + assertEquals(8, es.handler.semaphore.availablePermits()); + } + } } diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java index f472907..ca2d770 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import okhttp3.Headers; @@ -293,7 +294,94 @@ public void eventSourceReadsChunkedResponse() throws Exception { assertEquals(LogItem.closed(), eventSink.awaitLogItem()); } - + + @Test + public void processDataWithFixedQueueSize() throws Exception { + final String body = "data: data-by-itself\n\n" + + "event: event-with-data\n" + + "data: abc\n\n" + + ": this is a comment\n" + + "event: event-with-more-data-and-id\n" + + "id: my-id\n" + + "data: abc\n" + + "data: def\n\n"; + + TestHandler eventSink = new TestHandler(testLogger.getLogger()); + StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunksFromString(body, 5, Duration.ZERO, true)); + + try (StubServer server = StubServer.start(streamHandler)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) + .maxEventTasksInFlight(1) + .logger(testLogger.getLogger()) + .build()) { + es.start(); + + assertEquals(LogItem.opened(), eventSink.awaitLogItem()); + + assertEquals(LogItem.event("message", "data-by-itself"), // "message" is the default event name, per SSE spec + eventSink.awaitLogItem()); + + assertEquals(LogItem.event("event-with-data", "abc"), + eventSink.awaitLogItem()); + + assertEquals(LogItem.comment("this is a comment"), + eventSink.awaitLogItem()); + + assertEquals(LogItem.event("event-with-more-data-and-id", "abc\ndef", "my-id"), + eventSink.awaitLogItem()); + + eventSink.assertNoMoreLogItems(); + } + } + + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + } + + @Test(timeout = 15000) + public void canAwaitClosed() throws Exception { + final String body = "data: data-by-itself\n\n" + + "event: event-with-data\n" + + "data: abc\n\n" + + ": this is a comment\n" + + "event: event-with-more-data-and-id\n" + + "id: my-id\n" + + "data: abc\n" + + "data: def\n\n"; + + TestHandler eventSink = new TestHandler(testLogger.getLogger()); + StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunksFromString(body, 5, Duration.ZERO, true)); + + try (StubServer server = StubServer.start(streamHandler)) { + EventSource es = new EventSource.Builder(eventSink, server.getUri()) + .logger(testLogger.getLogger()) + .build(); + try { + es.start(); + + assertEquals(LogItem.opened(), eventSink.awaitLogItem()); + + assertEquals(LogItem.event("message", "data-by-itself"), // "message" is the default event name, per SSE spec + eventSink.awaitLogItem()); + + assertEquals(LogItem.event("event-with-data", "abc"), + eventSink.awaitLogItem()); + + assertEquals(LogItem.comment("this is a comment"), + eventSink.awaitLogItem()); + + assertEquals(LogItem.event("event-with-more-data-and-id", "abc\ndef", "my-id"), + eventSink.awaitLogItem()); + + eventSink.assertNoMoreLogItems(); + } finally { + es.close(); + assertTrue("Expected close to complete", es.awaitClosed(Duration.ofSeconds(10))); + } + } + + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + } + @Test public void defaultThreadPriorityIsNotMaximum() throws Exception { StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString("", true)); From ba3a349a22c6bac09754085c2271d2d93b118e0b Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Mon, 10 May 2021 21:39:24 -0700 Subject: [PATCH 2/4] Address checkstyle griping in StubServer --- src/test/java/com/launchdarkly/eventsource/StubServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/launchdarkly/eventsource/StubServer.java b/src/test/java/com/launchdarkly/eventsource/StubServer.java index a673458..1905ea9 100644 --- a/src/test/java/com/launchdarkly/eventsource/StubServer.java +++ b/src/test/java/com/launchdarkly/eventsource/StubServer.java @@ -338,7 +338,7 @@ public void interrupt() { * signals the end of the response. * * @param contentType value for the Content-Type header - * @param streamProducer a {@link StreamProducer} that will provide the response + * @param chunks the queue that chunks have been (or will be) written to * @return the handler */ public static Handler stream(final String contentType, final BlockingQueue chunks) { From cf638ca6baa9b78a5c7a7a0d92820acdf4165431 Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Tue, 11 May 2021 01:28:47 -0700 Subject: [PATCH 3/4] Fix JavaDoc issue --- src/main/java/com/launchdarkly/eventsource/EventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index bf49791..d61ed4d 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -225,7 +225,7 @@ public void close() { * * @param timeout maximum time to wait for everything to shut down * @return {@code true} if all thread pools terminated within the specified timeout, {@code false} otherwise. - * @throws InterruptedException + * @throws InterruptedException if this thread is interrupted while blocking */ public boolean awaitClosed(final Duration timeout) throws InterruptedException { final long deadline = System.currentTimeMillis() + timeout.toMillis(); From 1d9710dd8ff83819920354bcb0ee747aa6364928 Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Tue, 11 May 2021 08:15:42 -0700 Subject: [PATCH 4/4] Tighten up exception handling --- .../eventsource/AsyncEventHandler.java | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java b/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java index 37b5301..7ee66e2 100644 --- a/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java +++ b/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java @@ -27,59 +27,47 @@ class AsyncEventHandler implements EventHandler { } public void onOpen() { - acquire(); - executor.execute(() -> { + execute(() -> { try { eventSourceHandler.onOpen(); } catch (Exception e) { handleUnexpectedError(e); - } finally { - release(); } }); } public void onClosed() { - acquire(); - executor.execute(() -> { + execute(() -> { try { eventSourceHandler.onClosed(); } catch (Exception e) { handleUnexpectedError(e); - } finally { - release(); } }); } public void onComment(final String comment) { - acquire(); - executor.execute(() -> { + execute(() -> { try { eventSourceHandler.onComment(comment); } catch (Exception e) { handleUnexpectedError(e); - } finally { - release(); } }); } public void onMessage(final String event, final MessageEvent messageEvent) { - acquire(); - executor.execute(() -> { + execute(() -> { try { eventSourceHandler.onMessage(event, messageEvent); } catch (Exception e) { handleUnexpectedError(e); - } finally { - release(); } }); } public void onError(final Throwable error) { - executor.execute(() -> { + execute(() -> { onErrorInternal(error); }); } @@ -99,7 +87,24 @@ private void onErrorInternal(Throwable error) { } } - private boolean acquire() { + private void execute(Runnable task) { + acquire(); + try { + executor.execute(() -> { + try { + task.run(); + } finally { + release(); + } + }); + } catch (Exception e) { + // probably a RejectedExecutionException due to pool shutdown + release(); + throw e; + } + } + + private void acquire() { if (semaphore != null) { try { semaphore.acquire(); @@ -107,7 +112,6 @@ private boolean acquire() { throw new RejectedExecutionException("Thread interrupted while waiting for event thread semaphore", e); } } - return true; } private void release() {