From d4143cacc6a42e165a8759cb541a4c43e4cf5323 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 18 Jun 2020 10:27:28 -0700 Subject: [PATCH] fix shutdown state logic, simplify code paths (#40) --- build.gradle | 4 +- .../eventsource/ConnectionErrorHandler.java | 20 +- .../launchdarkly/eventsource/EventSource.java | 241 +++++++++--------- .../EventSourceHttpReconnectTest.java | 156 +++++++----- .../eventsource/EventSourceHttpTest.java | 87 ++++++- .../launchdarkly/eventsource/StubServer.java | 23 +- .../eventsource/StubServerTest.java | 64 +++++ .../com/launchdarkly/eventsource/Stubs.java | 2 +- 8 files changed, 398 insertions(+), 199 deletions(-) create mode 100644 src/test/java/com/launchdarkly/eventsource/StubServerTest.java diff --git a/build.gradle b/build.gradle index f01d616..d508d44 100644 --- a/build.gradle +++ b/build.gradle @@ -102,7 +102,9 @@ jacocoTestCoverageVerification { violationRules { rules -> def knownMissedLinesForMethods = [ // The key for each of these items is the complete method signature minus the "com.launchdarkly.eventsource." prefix. - "EventSource.connect()": 12, + "EventSource.handleSuccessfulResponse(okhttp3.Response)": 2, + "EventSource.maybeReconnectDelay(int, long)": 2, + "EventSource.run()": 3, "EventSource.Builder.createInitialClientBuilder()": 1, "EventSource.Builder.defaultTrustManager()": 2, "SLF4JLogger.error(java.lang.String)": 2, diff --git a/src/main/java/com/launchdarkly/eventsource/ConnectionErrorHandler.java b/src/main/java/com/launchdarkly/eventsource/ConnectionErrorHandler.java index f663ce1..85fe572 100644 --- a/src/main/java/com/launchdarkly/eventsource/ConnectionErrorHandler.java +++ b/src/main/java/com/launchdarkly/eventsource/ConnectionErrorHandler.java @@ -1,9 +1,16 @@ package com.launchdarkly.eventsource; +import java.io.EOFException; + /** - * Interface for an object that will be notified when EventSource encounters a connection failure. - * This is different from {@link EventHandler#onError(Throwable)} in that it will not be called for - * other kinds of errors; also, it has the ability to tell EventSource to stop reconnecting. + * Interface for an object that will be notified when EventSource encounters a socket connection + * error or receives an error response. This is different from {@link EventHandler#onError(Throwable)} + * in two ways: + *
    + *
  1. It has the ability to tell EventSource to shut down instead of reconnecting. + *
  2. If the server simply ends the stream, the {@code ConnectionErrorHandler} is called with + * an {@code EOFException}; {@code onError} is not called in this case. + *
*/ public interface ConnectionErrorHandler { /** @@ -26,8 +33,11 @@ public static enum Action { /** * This method is called synchronously for all exceptions that occur on the socket connection * (including an {@link UnsuccessfulResponseException} if the server returns an unexpected HTTP - * status). It must not take any direct action to affect the state of the connection, nor do - * any I/O of its own, but can return {@link Action#SHUTDOWN} to cause the connection to close. + * status, or {@link EOFException} if the streaming response has ended). + *

+ * It must not take any direct action to affect the state of the connection, nor do any I/O of + * its own, but it can return {@link Action#SHUTDOWN} to cause the connection to close. + * * @param t a {@code Throwable} object * @return an {@link Action} constant */ diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index 44fa237..390c834 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -98,8 +98,6 @@ public class EventSource implements Closeable { private final OkHttpClient client; private volatile Call call; private final Random jitter = new Random(); - private Response response; - private BufferedSource bufferedSource; EventSource(Builder builder) { this.name = builder.name == null ? "" : builder.name; @@ -124,7 +122,8 @@ public class EventSource implements Closeable { ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream", builder.threadPriority); this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory); this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger); - this.connectionErrorHandler = builder.connectionErrorHandler; + this.connectionErrorHandler = builder.connectionErrorHandler == null ? + ConnectionErrorHandler.DEFAULT : builder.connectionErrorHandler; this.readyState = new AtomicReference<>(RAW); this.client = builder.clientBuilder.build(); } @@ -154,7 +153,7 @@ public void start() { } logger.debug("readyState change: {} -> {}", RAW, CONNECTING); logger.info("Starting EventSource client using URI: " + url); - streamExecutor.execute(this::connect); + streamExecutor.execute(this::run); } /** @@ -198,8 +197,8 @@ public void close() { closeCurrentStream(currentState); - eventExecutor.shutdownNow(); - streamExecutor.shutdownNow(); + eventExecutor.shutdown(); + streamExecutor.shutdown(); // COVERAGE: these null guards are here for safety but in practice the values are never null and there // is no way to cause them to be null in unit tests @@ -242,13 +241,108 @@ Request buildRequest() { return requestTransformer == null ? request : requestTransformer.transformRequest(request); } - private void connect() { - response = null; - bufferedSource = null; - + private void run() { + AtomicLong connectedTime = new AtomicLong(); int reconnectAttempts = 0; - ConnectionErrorHandler.Action errorHandlerAction = null; + try { + while (!Thread.currentThread().isInterrupted() && readyState.get() != SHUTDOWN) { + if (reconnectAttempts == 0) { + reconnectAttempts++; + } else { + reconnectAttempts = maybeReconnectDelay(reconnectAttempts, connectedTime.get()); + } + newConnectionAttempt(connectedTime); + } + } catch (RejectedExecutionException ignored) { + // COVERAGE: there is no way to simulate this condition in unit tests + call = null; + logger.debug("Rejected execution exception ignored: {}", ignored); + // During shutdown, we tried to send a message to the event handler + // Do not reconnect; the executor has been shut down + } + } + + private int maybeReconnectDelay(int reconnectAttempts, long connectedTime) { + if (reconnectTime.isZero() || reconnectTime.isNegative()) { + return reconnectAttempts; + } + + int counter = reconnectAttempts; + + // Reset the backoff if we had a successful connection that stayed good for at least + // backoffResetThresholdMs milliseconds. + if (connectedTime > 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThreshold.toMillis()) { + counter = 1; + } + + try { + Duration sleepTime = backoffWithJitter(counter); + logger.info("Waiting " + sleepTime.toMillis() + " milliseconds before reconnecting..."); + Thread.sleep(sleepTime.toMillis()); + } catch (InterruptedException ignored) { // COVERAGE: no way to cause this in unit tests + } + + return ++counter; + } + + private void newConnectionAttempt(AtomicLong connectedTime) { + ConnectionErrorHandler.Action errorHandlerAction = ConnectionErrorHandler.Action.PROCEED; + + ReadyState stateBeforeConnecting = readyState.getAndSet(CONNECTING); + logger.debug("readyState change: {} -> {}", stateBeforeConnecting, CONNECTING); + + connectedTime.set(0); + call = client.newCall(buildRequest()); + + try { + try (Response response = call.execute()) { + if (response.isSuccessful()) { + connectedTime.set(System.currentTimeMillis()); + handleSuccessfulResponse(response); + } else { + logger.debug("Unsuccessful response: {}", response); + errorHandlerAction = dispatchError(new UnsuccessfulResponseException(response.code())); + } + } + // If handleSuccessfulResponse returned without throwing an exception, it means the server + // ended the stream. We don't call the handler's onError() method in this case; but we will + // call the ConnectionErrorHandler with an EOFException, in case it wants to do something + // special in this scenario (like choose not to retry the connection). However, first we + // should check the state in case we've been deliberately closed from elsewhere. + ReadyState state = readyState.get(); + if (state != SHUTDOWN && state != CLOSED) { + logger.warn("Connection unexpectedly closed"); + errorHandlerAction = connectionErrorHandler.onConnectionError(new EOFException()); + } + } catch (IOException e) { + ReadyState state = readyState.get(); + if (state != SHUTDOWN && state != CLOSED) { + logger.debug("Connection problem: {}", e); + errorHandlerAction = dispatchError(e); + } + } finally { + if (errorHandlerAction == ConnectionErrorHandler.Action.SHUTDOWN) { + logger.info("Connection has been explicitly shut down by error handler"); + close(); + } else { + boolean wasOpen = readyState.compareAndSet(OPEN, CLOSED); + boolean wasConnecting = readyState.compareAndSet(CONNECTING, CLOSED); + if (wasOpen) { + logger.debug("readyState change: {} -> {}", OPEN, CLOSED); + handler.onClosed(); + } else if (wasConnecting) { + logger.debug("readyState change: {} -> {}", CONNECTING, CLOSED); + } + } + } + } + + // Read the response body as an SSE stream and dispatch each received event to the EventHandler. + // This function exits in one of two ways: + // 1. A normal return - this means the response simply ended. + // 2. Throwing an IOException - there was an unexpected connection failure. + private void handleSuccessfulResponse(Response response) throws IOException { ConnectionHandler connectionHandler = new ConnectionHandler() { @Override public void setReconnectionTime(Duration reconnectionTime) { @@ -261,103 +355,29 @@ public void setLastEventId(String lastEventId) { } }; - try { - while (!Thread.currentThread().isInterrupted() && readyState.get() != SHUTDOWN) { - long connectedTime = -1; - - ReadyState currentState = readyState.getAndSet(CONNECTING); - logger.debug("readyState change: {} -> {}", currentState, CONNECTING); - try { - call = client.newCall(buildRequest()); - response = call.execute(); - if (response.isSuccessful()) { - connectedTime = System.currentTimeMillis(); - currentState = readyState.getAndSet(OPEN); - if (currentState != CONNECTING) { - // COVERAGE: there is no way to simulate this condition in unit tests - logger.warn("Unexpected readyState change: " + currentState + " -> " + OPEN); - } else { - logger.debug("readyState change: {} -> {}", currentState, OPEN); - } - logger.info("Connected to Event Source stream."); - handler.onOpen(); - if (bufferedSource != null) { - bufferedSource.close(); - } - bufferedSource = Okio.buffer(response.body().source()); - EventParser parser = new EventParser(url.uri(), handler, connectionHandler, logger); - // COVERAGE: the isInterrupted() condition is not encountered in unit tests and it's unclear if it can ever happen - for (String line; !Thread.currentThread().isInterrupted() && (line = bufferedSource.readUtf8LineStrict()) != null; ) { - parser.line(line); - } - } else { - logger.debug("Unsuccessful response: {}", response); - errorHandlerAction = dispatchError(new UnsuccessfulResponseException(response.code())); - } - } catch (EOFException eofe) { - logger.warn("Connection unexpectedly closed."); - } catch (IOException ioe) { - ReadyState state = readyState.get(); - if (state == SHUTDOWN) { - errorHandlerAction = ConnectionErrorHandler.Action.SHUTDOWN; - } else if (state == CLOSED) { // this happens if it's being restarted - errorHandlerAction = ConnectionErrorHandler.Action.PROCEED; - } else { - // COVERAGE: there is no way to simulate this condition in unit tests - closing the stream causes EOFException - logger.debug("Connection problem: {}", ioe); - errorHandlerAction = dispatchError(ioe); - } - } finally { - ReadyState nextState = CLOSED; - if (errorHandlerAction == ConnectionErrorHandler.Action.SHUTDOWN) { - if (readyState.get() != SHUTDOWN) { - logger.info("Connection has been explicitly shut down by error handler"); - } - nextState = SHUTDOWN; - } - currentState = readyState.getAndSet(nextState); - logger.debug("readyState change: {} -> {}", currentState, nextState); - - if (response != null && response.body() != null) { - response.close(); - logger.debug("response closed", null); - } - - if (bufferedSource != null) { - try { - bufferedSource.close(); - logger.debug("buffered source closed", null); - } catch (IOException e) { - // COVERAGE: there is no way to simulate this condition in unit tests - logger.warn("Exception when closing bufferedSource: " + e.toString()); - } - } - - if (currentState == ReadyState.OPEN) { - handler.onClosed(); - } - - if (nextState != SHUTDOWN) { - // Reset the backoff if we had a successful connection that stayed good for at least - // backoffResetThresholdMs milliseconds. - if (connectedTime >= 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThreshold.toMillis()) { - reconnectAttempts = 0; - } - maybeWaitWithBackoff(++reconnectAttempts); - } - } - } - } catch (RejectedExecutionException ignored) { + ReadyState previousState = readyState.getAndSet(OPEN); + if (previousState != CONNECTING) { // COVERAGE: there is no way to simulate this condition in unit tests - call = null; - response = null; - bufferedSource = null; - logger.debug("Rejected execution exception ignored: {}", ignored); - // During shutdown, we tried to send a message to the event handler - // Do not reconnect; the executor has been shut down + logger.warn("Unexpected readyState change: " + previousState + " -> " + OPEN); + } else { + logger.debug("readyState change: {} -> {}", previousState, OPEN); + } + logger.info("Connected to EventSource stream."); + handler.onOpen(); + + try (BufferedSource bufferedSource = Okio.buffer(response.body().source())) { + EventParser parser = new EventParser(url.uri(), handler, connectionHandler, logger); + // COVERAGE: the isInterrupted() condition is not encountered in unit tests and it's unclear if it can ever happen + for (String line; !Thread.currentThread().isInterrupted() && + !bufferedSource.exhausted() && (line = bufferedSource.readUtf8LineStrict()) != null; ) { + parser.line(line); + } + } catch (EOFException e) { + // This should not happen because bufferedSource.exhausted() should have returned true, but if + // it does happen, we'll treat it the same as a regular end of stream. } } - + private ConnectionErrorHandler.Action dispatchError(Throwable t) { ConnectionErrorHandler.Action action = connectionErrorHandler.onConnectionError(t); if (action != ConnectionErrorHandler.Action.SHUTDOWN) { @@ -365,17 +385,6 @@ private ConnectionErrorHandler.Action dispatchError(Throwable t) { } return action; } - - private void maybeWaitWithBackoff(int reconnectAttempts) { - if (!reconnectTime.isZero() && !reconnectTime.isNegative() && reconnectAttempts > 0) { - try { - Duration sleepTime = backoffWithJitter(reconnectAttempts); - logger.info("Waiting " + sleepTime.toMillis() + " milliseconds before reconnecting..."); - Thread.sleep(sleepTime.toMillis()); - } catch (InterruptedException ignored) { - } - } - } Duration backoffWithJitter(int reconnectAttempts) { long maxTimeLong = Math.min(maxReconnectTime.toMillis(), reconnectTime.toMillis() * pow2(reconnectAttempts)); diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpReconnectTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpReconnectTest.java index b362890..09c9f7c 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpReconnectTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpReconnectTest.java @@ -6,13 +6,16 @@ import org.junit.Rule; import org.junit.Test; +import java.io.IOException; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static com.launchdarkly.eventsource.StubServer.Handlers.chunkFromString; import static com.launchdarkly.eventsource.StubServer.Handlers.forRequestsInSequence; import static com.launchdarkly.eventsource.StubServer.Handlers.interruptible; +import static com.launchdarkly.eventsource.StubServer.Handlers.ioError; import static com.launchdarkly.eventsource.StubServer.Handlers.returnStatus; import static com.launchdarkly.eventsource.StubServer.Handlers.stream; import static org.junit.Assert.assertEquals; @@ -28,13 +31,21 @@ public class EventSourceHttpReconnectTest { @Rule public TestScopedLoggerRule testLogger = new TestScopedLoggerRule(); - // NOTE ABOUT KNOWN ISSUE: Intermittent test failures suggest that sometimes the handler's onClose() - // method does not get called when the stream is completely shut down. This is not a new issue, and - // it does not affect the way the LaunchDarkly SDKs use EventSource. So, for now, test assertions - // for that method are commented out. - @Test public void eventSourceReconnectsAfterSocketClosed() throws Exception { + verifyReconnectAfterStreamInterrupted( + null, + Duration.ofMillis(10), + eventSink -> { + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + }); + } + + private void verifyReconnectAfterStreamInterrupted( + StubServer.Handler extraErrorAfterReconnectHandler, + Duration reconnectTime, + Consumer checkExpectedEvents + ) { String body1 = "data: first\n\n"; String body2 = "data: second\n\n"; @@ -42,11 +53,16 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { StubServer.InterruptibleHandler streamHandler1 = interruptible(stream(CONTENT_TYPE, chunkFromString(body1, true))); StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, chunkFromString(body2, true)); - StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, streamHandler2); + StubServer.Handler allRequests; + if (extraErrorAfterReconnectHandler == null) { + allRequests = forRequestsInSequence(streamHandler1, streamHandler2); + } else { + allRequests = forRequestsInSequence(streamHandler1, extraErrorAfterReconnectHandler, streamHandler2); + } try (StubServer server = StubServer.start(allRequests)) { try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) - .reconnectTime(Duration.ofMillis(10)) + .reconnectTime(reconnectTime) .build()) { es.start(); @@ -58,8 +74,8 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { eventSink.assertNoMoreLogItems(); // should not have closed first stream yet streamHandler1.interrupt(); - - assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + + checkExpectedEvents.accept(eventSink); assertEquals(LogItem.opened(), eventSink.awaitLogItem()); @@ -67,32 +83,55 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { eventSink.awaitLogItem()); } - // assertEquals(LogItem.closed(), eventSink.awaitLogItem()); // see NOTE ON KNOWN ISSUE - // eventSink.assertNoMoreLogItems(); + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + eventSink.assertNoMoreLogItems(); } } @Test - public void eventSourceReconnectsAfterErrorOnFirstRequest() throws Exception { + public void eventSourceReconnectsAfterHttpErrorOnFirstRequest() throws Exception { + verifyReconnectAfterErrorOnFirstRequest( + returnStatus(500), + Duration.ofMillis(10), + eventSink -> { + LogItem errorItem = eventSink.awaitLogItem(); + assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), errorItem); + assertEquals(500, ((UnsuccessfulResponseException)errorItem.error).getCode()); + }); + } + + @Test + public void eventSourceReconnectsAfterNetworkErrorOnFirstRequest() throws Exception { + verifyReconnectAfterErrorOnFirstRequest( + ioError(), + Duration.ofMillis(10), + eventSink -> { + LogItem errorItem = eventSink.awaitLogItem(); + assertEquals(IOException.class, errorItem.error.getClass()); + }); + } + + private void verifyReconnectAfterErrorOnFirstRequest( + StubServer.Handler errorProducer, + Duration reconnectTime, + Consumer checkExpectedEvents + ) { String body = "data: good\n\n"; - int statusCode = 500; TestHandler eventSink = new TestHandler(); - - StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString(body, true)); - StubServer.Handler allRequests = forRequestsInSequence(returnStatus(statusCode), streamHandler); - try (StubServer server = StubServer.start(allRequests)) { + StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString(body, true)); + StubServer.Handler allRequests = forRequestsInSequence(errorProducer, streamHandler); + + try (StubServer server = StubServer.start(allRequests)) { try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) - .reconnectTime(Duration.ofMillis(10)) + .reconnectTime(reconnectTime) .logger(testLogger.getLogger()) .build()) { es.start(); - - LogItem errorItem = eventSink.awaitLogItem(); - assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), errorItem); - assertEquals(statusCode, ((UnsuccessfulResponseException)errorItem.error).getCode()); - + + checkExpectedEvents.accept(eventSink); + assertEquals(LogItem.opened(), eventSink.awaitLogItem()); assertEquals(LogItem.event("message", "good"), @@ -101,52 +140,41 @@ public void eventSourceReconnectsAfterErrorOnFirstRequest() throws Exception { eventSink.assertNoMoreLogItems(); } - // assertEquals(LogItem.closed(), eventSink.awaitLogItem()); // see NOTE ON KNOWN ISSUE + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + eventSink.assertNoMoreLogItems(); } } @Test public void eventSourceReconnectsAgainAfterErrorOnFirstReconnect() throws Exception { - String body1 = "data: first\n\n"; - String body2 = "data: second\n\n"; + verifyReconnectAfterStreamInterrupted( + returnStatus(500), + Duration.ofMillis(10), + eventSink -> { + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), + eventSink.awaitLogItem()); + }); + } - TestHandler eventSink = new TestHandler(); + @Test + public void eventSourceReconnectsEvenIfDelayIsZero() throws Exception { + verifyReconnectAfterStreamInterrupted( + null, + Duration.ZERO, + eventSink -> { + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + }); + } - StubServer.InterruptibleHandler streamHandler1 = interruptible(stream(CONTENT_TYPE, chunkFromString(body1, true))); - StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, chunkFromString(body2, true)); - StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, returnStatus(500), streamHandler2); - - try (StubServer server = StubServer.start(allRequests)) { - try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) - .reconnectTime(Duration.ofMillis(10)) - .logger(testLogger.getLogger()) - .build()) { - es.start(); - - assertEquals(LogItem.opened(), eventSink.awaitLogItem()); - - assertEquals(LogItem.event("message", "first"), - eventSink.awaitLogItem()); - - eventSink.assertNoMoreLogItems(); - - streamHandler1.interrupt(); // make first stream fail - - assertEquals(LogItem.closed(), eventSink.awaitLogItem()); - - assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), - eventSink.awaitLogItem()); - - assertEquals(LogItem.opened(), eventSink.awaitLogItem()); - - assertEquals(LogItem.event("message", "second"), - eventSink.awaitLogItem()); - - eventSink.assertNoMoreLogItems(); - } - - // assertEquals(LogItem.closed(), eventSink.awaitLogItem()); // see NOTE ON KNOWN ISSUE - } + @Test + public void eventSourceReconnectsEvenIfDelayIsNegative() throws Exception { + verifyReconnectAfterStreamInterrupted( + null, + Duration.ofMillis(-1), + eventSink -> { + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); + }); } @Test @@ -157,7 +185,7 @@ public void streamDoesNotReconnectIfConnectionErrorHandlerSaysToStop() throws Ex ConnectionErrorHandler connectionErrorHandler = new ConnectionErrorHandler() { public Action onConnectionError(Throwable t) { calledHandler.set(true); - receivedError.set(t); + receivedError.compareAndSet(null, t); return Action.SHUTDOWN; } }; @@ -223,7 +251,7 @@ public void canForceEventSourceToRestart() throws Exception { eventSink.assertNoMoreLogItems(); } - // assertEquals(LogItem.closed(), eventSink.awaitLogItem()); // see NOTE ON KNOWN ISSUE + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); } } } diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java index fe4b24c..f472907 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -19,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import okhttp3.Headers; import okhttp3.MediaType; @@ -33,11 +35,6 @@ public class EventSourceHttpTest { @Rule public TestScopedLoggerRule testLogger = new TestScopedLoggerRule(); - // NOTE ABOUT KNOWN ISSUE: Intermittent test failures suggest that sometimes the handler's onClose() - // method does not get called when the stream is completely shut down. This is not a new issue, and - // it does not affect the way the LaunchDarkly SDKs use EventSource. So, for now, test assertions - // for that method are commented out. - @Test public void eventSourceSetsRequestProperties() throws Exception { String requestPath = "/some/path"; @@ -293,17 +290,19 @@ public void eventSourceReadsChunkedResponse() throws Exception { eventSink.assertNoMoreLogItems(); } } - // assertEquals(LogItem.closed(), eventSink.awaitLogItem()); // see NOTE ON KNOWN ISSUE + + assertEquals(LogItem.closed(), eventSink.awaitLogItem()); } @Test public void defaultThreadPriorityIsNotMaximum() throws Exception { - StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString("", false)); + StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString("", true)); ThreadCapturingHandler threadCapturingHandler = new ThreadCapturingHandler(); try (StubServer server = StubServer.start(streamHandler)) { try (EventSource es = new EventSource.Builder(threadCapturingHandler, server.getUri()) + .logger(testLogger.getLogger()) .build()) { es.start(); @@ -316,13 +315,14 @@ public void defaultThreadPriorityIsNotMaximum() throws Exception { @Test public void canSetSpecificThreadPriority() throws Exception { - StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString("", false)); + StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString("", true)); ThreadCapturingHandler threadCapturingHandler = new ThreadCapturingHandler(); try (StubServer server = StubServer.start(streamHandler)) { try (EventSource es = new EventSource.Builder(threadCapturingHandler, server.getUri()) .threadPriority(Thread.MAX_PRIORITY) + .logger(testLogger.getLogger()) .build()) { es.start(); @@ -333,6 +333,77 @@ public void canSetSpecificThreadPriority() throws Exception { } } + @Test + public void threadsAreStoppedAfterExplicitShutdown() throws Exception { + String name = "MyTestSource"; + TestHandler eventSink = new TestHandler(testLogger.getLogger()); + StubServer.Handler streamHandler = stream(CONTENT_TYPE, chunkFromString("", true)); + + try (StubServer server = StubServer.start(streamHandler)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) + .name(name) + .logger(testLogger.getLogger()) + .build()) { + es.start(); + + assertNumberOfThreadsWithSubstring(name, 2, Duration.ofSeconds(2)); + + es.close(); + + assertNumberOfThreadsWithSubstring(name, 0, Duration.ofSeconds(2)); + } + } + } + + @Test + public void threadsAreStoppedAfterShutdownIsForcedByConnectionErrorHandler() throws Exception { + String name = "MyTestSource"; + TestHandler eventSink = new TestHandler(testLogger.getLogger()); + ConnectionErrorHandler errorHandler = e -> ConnectionErrorHandler.Action.SHUTDOWN; + StubServer.InterruptibleHandler streamHandler = interruptible(stream(CONTENT_TYPE, chunkFromString("", true))); + + try (StubServer server = StubServer.start(streamHandler)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) + .name(name) + .connectionErrorHandler(errorHandler) + .logger(testLogger.getLogger()) + .build()) { + es.start(); + + assertEquals(LogItem.opened(), eventSink.awaitLogItem()); + + assertNumberOfThreadsWithSubstring(name, 2, Duration.ofSeconds(2)); + + streamHandler.interrupt(); + assertEquals("closed", eventSink.awaitLogItem().action); + + assertNumberOfThreadsWithSubstring(name, 0, Duration.ofSeconds(2)); + } + } + } + + private void assertNumberOfThreadsWithSubstring(String s, int expectedCount, Duration timeout) throws Exception { + Instant limit = Instant.now().plus(timeout); + int count = 0; + while (Instant.now().isBefore(limit)) { + int n = Thread.currentThread().getThreadGroup().activeCount(); + Thread[] ts = new Thread[n]; + Thread.currentThread().getThreadGroup().enumerate(ts, true); + count = 0; + for (Thread t: ts) { + if (t != null && t.isAlive() && t.getName().contains(s)) { + count++; + } + } + if (count == expectedCount) { + return; + } + Thread.sleep(50); + } + fail("wanted " + expectedCount + " threads with substring '" + s + + "' but found " + count + " after " + timeout); + } + private static class ThreadCapturingHandler implements EventHandler { final BlockingQueue capturedThreads = new LinkedBlockingQueue<>(); diff --git a/src/test/java/com/launchdarkly/eventsource/StubServer.java b/src/test/java/com/launchdarkly/eventsource/StubServer.java index bef3ccf..a673458 100644 --- a/src/test/java/com/launchdarkly/eventsource/StubServer.java +++ b/src/test/java/com/launchdarkly/eventsource/StubServer.java @@ -1,5 +1,6 @@ package com.launchdarkly.eventsource; +import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; @@ -223,10 +224,24 @@ public static abstract class Handlers { * @return the handler */ public static Handler returnStatus(final int status) { - return new Handler() { - public void handle(HttpServletRequest req, HttpServletResponse resp) { - resp.setStatus(status); - } + return (req, resp) -> { + resp.setStatus(status); + }; + } + + /** + * Provides a handler that causes the server to close the connection immediately with no response, + * which should cause an IOException on the client side. + * + * @return the handler + */ + public static Handler ioError() { + return (req, resp) -> { + // This is very hacky, because Jetty - quite properly - does not allow you to do anything via the + // regular HttpServletResponse API that would cause an early shutdown of the connection. So we're + // messing with Jetty's connection internals. + HttpConnection conn = HttpConnection.getCurrentConnection(); + conn.getHttpChannel().getEndPoint().close(); }; } diff --git a/src/test/java/com/launchdarkly/eventsource/StubServerTest.java b/src/test/java/com/launchdarkly/eventsource/StubServerTest.java new file mode 100644 index 0000000..9f5f3e2 --- /dev/null +++ b/src/test/java/com/launchdarkly/eventsource/StubServerTest.java @@ -0,0 +1,64 @@ +package com.launchdarkly.eventsource; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static com.launchdarkly.eventsource.StubServer.Handlers.forRequestsInSequence; +import static com.launchdarkly.eventsource.StubServer.Handlers.ioError; +import static com.launchdarkly.eventsource.StubServer.Handlers.returnStatus; +import static org.junit.Assert.assertEquals; + +import okhttp3.Call; +import okhttp3.ConnectionPool; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; + +/** + * These are not tests of EventSource; they just verify that the embedded HTTP server helper we're using + * behaves as it should, since if it doesn't, the results of the EventSource tests are undefined. + */ +@SuppressWarnings("javadoc") +public class StubServerTest { + private final OkHttpClient client; + + public StubServerTest() { + client = new OkHttpClient.Builder() + .connectionPool(new ConnectionPool(1, 1, TimeUnit.SECONDS)) + .retryOnConnectionFailure(false) + .build(); + } + + private Call makeGet(StubServer server) throws Exception { + return client.newCall(new Request.Builder().method("GET", null).url(server.getUri().toURL()).build()); + } + + @Test + public void returnStatusHandler() throws Exception { + try (StubServer server = StubServer.start(returnStatus(418))) { + Response resp = makeGet(server).execute(); + assertEquals(418, resp.code()); + } + } + + @Test(expected=IOException.class) + public void ioErrorHandler() throws Exception { + try (StubServer server = StubServer.start(ioError())) { + makeGet(server).execute(); + } + } + + @Test + public void forRequestsInSequenceHandler() throws Exception { + StubServer.Handler handler = forRequestsInSequence(returnStatus(418), returnStatus(503)); + try (StubServer server = StubServer.start(handler)) { + Response resp1 = makeGet(server).execute(); + assertEquals(418, resp1.code()); + + Response resp2 = makeGet(server).execute(); + assertEquals(503, resp2.code()); + } + } +} diff --git a/src/test/java/com/launchdarkly/eventsource/Stubs.java b/src/test/java/com/launchdarkly/eventsource/Stubs.java index c05c728..df07d4e 100644 --- a/src/test/java/com/launchdarkly/eventsource/Stubs.java +++ b/src/test/java/com/launchdarkly/eventsource/Stubs.java @@ -8,7 +8,7 @@ class Stubs { static class LogItem { - private final String action; + final String action; private final String[] params; final Throwable error;