diff --git a/build.gradle.kts b/build.gradle.kts index bf5fa7e..5c06399 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -114,7 +114,10 @@ tasks.jacocoTestCoverageVerification.configure { violationRules { val knownMissedLinesForMethods = mapOf( // The key for each of these items is the complete method signature minus the "com.launchdarkly.eventsource." prefix. + "AsyncEventHandler.acquire()" to 2, + "AsyncEventHandler.execute(java.lang.Runnable)" to 3, "BufferedUtf8LineReader.getLineFromBuffer()" to 2, + "EventSource.awaitClosed(java.time.Duration)" to 3, "EventSource.handleSuccessfulResponse(okhttp3.Response)" to 2, "EventSource.maybeReconnectDelay(int, long)" to 2, "EventSource.run()" to 3, diff --git a/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java b/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java index 7ee66e2..f4df11e 100644 --- a/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java +++ b/src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java @@ -97,7 +97,7 @@ private void execute(Runnable task) { release(); } }); - } catch (Exception e) { + } catch (Exception e) { // COVERAGE: this condition can't be reproduced in unit tests // probably a RejectedExecutionException due to pool shutdown release(); throw e; @@ -108,7 +108,7 @@ private void acquire() { if (semaphore != null) { try { semaphore.acquire(); - } catch (InterruptedException e) { + } catch (InterruptedException e) { // COVERAGE: this condition can't be reproduced in unit tests throw new RejectedExecutionException("Thread interrupted while waiting for event thread semaphore", e); } } diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index ae80861..42fd5f6 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -240,13 +240,13 @@ public boolean awaitClosed(final Duration timeout) throws InterruptedException { long shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis()); if (!streamExecutor.awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) { - return false; + return false; // COVERAGE: this condition can't be reproduced in unit tests } if (client.dispatcher().executorService() != null) { shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis()); if (!client.dispatcher().executorService().awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) { - return false; + return false; // COVERAGE: this condition can't be reproduced in unit tests } } diff --git a/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java b/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java index 44f97dc..f332610 100644 --- a/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java +++ b/src/test/java/com/launchdarkly/eventsource/AsyncEventHandlerTest.java @@ -2,6 +2,7 @@ import com.launchdarkly.eventsource.Stubs.LogItem; import com.launchdarkly.eventsource.Stubs.TestHandler; + import org.junit.After; import org.junit.Test; @@ -15,11 +16,11 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @SuppressWarnings("javadoc") @@ -42,8 +43,8 @@ public void tearDown() { } private void verifyErrorLogged(Throwable t) { - verify(logger).warn("Caught unexpected error from EventHandler: " + t); - verify(logger).debug(eq("Stack trace: {}"), any(LazyStackTrace.class)); + verify(logger, timeout(1000)).warn("Caught unexpected error from EventHandler: " + t); + verify(logger, timeout(1000)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class)); } @Test @@ -104,12 +105,13 @@ public void errorFromOnErrorIsCaughtAndLogged() { eventHandler.fakeErrorFromErrorHandler = err2; asyncHandler.onOpen(); - + assertEquals(LogItem.opened(), eventHandler.awaitLogItem()); assertEquals(LogItem.error(err1), eventHandler.awaitLogItem()); - verify(logger).warn("Caught unexpected error from EventHandler: " + err1); - verify(logger).warn("Caught unexpected error from EventHandler.onError(): " + err2); - verify(logger, times(2)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class)); + + verify(logger, timeout(1000)).warn("Caught unexpected error from EventHandler: " + err1); + verify(logger, timeout(1000)).warn("Caught unexpected error from EventHandler.onError(): " + err2); + verify(logger, timeout(1000).times(2)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class)); } @Test diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java index 79ac531..7b8fee5 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java @@ -23,10 +23,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java index ed24aca..fadacaa 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java @@ -14,13 +14,17 @@ import java.time.Duration; import java.time.Instant; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static com.launchdarkly.eventsource.TestHandlers.chunksFromString; import static com.launchdarkly.eventsource.TestHandlers.streamThatStaysOpen; import static com.launchdarkly.testhelpers.httptest.Handlers.hang; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -341,16 +345,16 @@ public void processDataWithFixedQueueSize() throws Exception { @Test(timeout = 15000) public void canAwaitClosed() throws Exception { - final String body = "data: data-by-itself\n\n" + - "event: event-with-data\n" + - "data: abc\n\n" + - ": this is a comment\n" + - "event: event-with-more-data-and-id\n" + - "id: my-id\n" + - "data: abc\n" + - "data: def\n\n"; - - TestHandler eventSink = new TestHandler(testLogger.getLogger()); + final String body = "data: event1\n\n"; + + AtomicBoolean received = new AtomicBoolean(false); + TestHandler eventSink = new TestHandler(testLogger.getLogger()) { + @Override + public void onMessage(String event, MessageEvent messageEvent) throws Exception { + received.set(true); + super.onMessage(event, messageEvent); + } + }; Handler streamHandler = chunksFromString(body, 5, Duration.ZERO, true); try (HttpServer server = HttpServer.start(streamHandler)) { @@ -362,28 +366,56 @@ public void canAwaitClosed() throws Exception { assertEquals(LogItem.opened(), eventSink.awaitLogItem()); - assertEquals(LogItem.event("message", "data-by-itself"), // "message" is the default event name, per SSE spec - eventSink.awaitLogItem()); - - assertEquals(LogItem.event("event-with-data", "abc"), + assertEquals(LogItem.event("message", "event1"), // "message" is the default event name, per SSE spec eventSink.awaitLogItem()); - - assertEquals(LogItem.comment("this is a comment"), - eventSink.awaitLogItem()); - - assertEquals(LogItem.event("event-with-more-data-and-id", "abc\ndef", "my-id"), - eventSink.awaitLogItem()); - - eventSink.assertNoMoreLogItems(); } finally { es.close(); assertTrue("Expected close to complete", es.awaitClosed(Duration.ofSeconds(10))); + assertTrue(received.get()); } } assertEquals(LogItem.closed(), eventSink.awaitLogItem()); } + @Test + public void awaitClosedTimesOutIfEventHandlerNotCompleted() throws Exception { + final String body = "data: event1\n\n"; + + CountDownLatch enteredHandler = new CountDownLatch(1); + CountDownLatch handlerCanProceed = new CountDownLatch(1); + TestHandler eventSink = new TestHandler(testLogger.getLogger()) { + @Override + public void onMessage(String event, MessageEvent messageEvent) throws Exception { + enteredHandler.countDown(); + handlerCanProceed.await(5, TimeUnit.SECONDS); + super.onMessage(event, messageEvent); + } + }; + Handler streamHandler = chunksFromString(body, 5, Duration.ZERO, true); + + try (HttpServer server = HttpServer.start(streamHandler)) { + EventSource es = new EventSource.Builder(eventSink, server.getUri()) + .logger(testLogger.getLogger()) + .build(); + try { + es.start(); + + assertEquals(LogItem.opened(), eventSink.awaitLogItem()); + + assertTrue("expected handler to be called", enteredHandler.await(1, TimeUnit.SECONDS)); + } finally { + es.close(); + + try { + assertFalse("Expected awaitClosed to time out", es.awaitClosed(Duration.ofSeconds(2))); + } finally { + handlerCanProceed.countDown(); + } + } + } + } + @Test public void defaultThreadPriorityIsNotMaximum() throws Exception { ThreadCapturingHandler threadCapturingHandler = new ThreadCapturingHandler();