Skip to content

Commit

Permalink
improve tests for AsyncEventHandler and EventSource.awaitClosed (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
eli-darkly authored Jan 13, 2022
1 parent 5eb858d commit 78980a6
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 36 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ tasks.jacocoTestCoverageVerification.configure {
violationRules {
val knownMissedLinesForMethods = mapOf(
// The key for each of these items is the complete method signature minus the "com.launchdarkly.eventsource." prefix.
"AsyncEventHandler.acquire()" to 2,
"AsyncEventHandler.execute(java.lang.Runnable)" to 3,
"BufferedUtf8LineReader.getLineFromBuffer()" to 2,
"EventSource.awaitClosed(java.time.Duration)" to 3,
"EventSource.handleSuccessfulResponse(okhttp3.Response)" to 2,
"EventSource.maybeReconnectDelay(int, long)" to 2,
"EventSource.run()" to 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void execute(Runnable task) {
release();
}
});
} catch (Exception e) {
} catch (Exception e) { // COVERAGE: this condition can't be reproduced in unit tests
// probably a RejectedExecutionException due to pool shutdown
release();
throw e;
Expand All @@ -108,7 +108,7 @@ private void acquire() {
if (semaphore != null) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
} catch (InterruptedException e) { // COVERAGE: this condition can't be reproduced in unit tests
throw new RejectedExecutionException("Thread interrupted while waiting for event thread semaphore", e);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,13 @@ public boolean awaitClosed(final Duration timeout) throws InterruptedException {

long shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis());
if (!streamExecutor.awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) {
return false;
return false; // COVERAGE: this condition can't be reproduced in unit tests
}

if (client.dispatcher().executorService() != null) {
shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis());
if (!client.dispatcher().executorService().awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) {
return false;
return false; // COVERAGE: this condition can't be reproduced in unit tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.launchdarkly.eventsource.Stubs.LogItem;
import com.launchdarkly.eventsource.Stubs.TestHandler;

import org.junit.After;
import org.junit.Test;

Expand All @@ -15,11 +16,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@SuppressWarnings("javadoc")
Expand All @@ -42,8 +43,8 @@ public void tearDown() {
}

private void verifyErrorLogged(Throwable t) {
verify(logger).warn("Caught unexpected error from EventHandler: " + t);
verify(logger).debug(eq("Stack trace: {}"), any(LazyStackTrace.class));
verify(logger, timeout(1000)).warn("Caught unexpected error from EventHandler: " + t);
verify(logger, timeout(1000)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class));
}

@Test
Expand Down Expand Up @@ -104,12 +105,13 @@ public void errorFromOnErrorIsCaughtAndLogged() {
eventHandler.fakeErrorFromErrorHandler = err2;

asyncHandler.onOpen();

assertEquals(LogItem.opened(), eventHandler.awaitLogItem());
assertEquals(LogItem.error(err1), eventHandler.awaitLogItem());
verify(logger).warn("Caught unexpected error from EventHandler: " + err1);
verify(logger).warn("Caught unexpected error from EventHandler.onError(): " + err2);
verify(logger, times(2)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class));

verify(logger, timeout(1000)).warn("Caught unexpected error from EventHandler: " + err1);
verify(logger, timeout(1000)).warn("Caught unexpected error from EventHandler.onError(): " + err2);
verify(logger, timeout(1000).times(2)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

Expand Down
76 changes: 54 additions & 22 deletions src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.launchdarkly.eventsource.TestHandlers.chunksFromString;
import static com.launchdarkly.eventsource.TestHandlers.streamThatStaysOpen;
import static com.launchdarkly.testhelpers.httptest.Handlers.hang;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -341,16 +345,16 @@ public void processDataWithFixedQueueSize() throws Exception {

@Test(timeout = 15000)
public void canAwaitClosed() throws Exception {
final String body = "data: data-by-itself\n\n" +
"event: event-with-data\n" +
"data: abc\n\n" +
": this is a comment\n" +
"event: event-with-more-data-and-id\n" +
"id: my-id\n" +
"data: abc\n" +
"data: def\n\n";

TestHandler eventSink = new TestHandler(testLogger.getLogger());
final String body = "data: event1\n\n";

AtomicBoolean received = new AtomicBoolean(false);
TestHandler eventSink = new TestHandler(testLogger.getLogger()) {
@Override
public void onMessage(String event, MessageEvent messageEvent) throws Exception {
received.set(true);
super.onMessage(event, messageEvent);
}
};
Handler streamHandler = chunksFromString(body, 5, Duration.ZERO, true);

try (HttpServer server = HttpServer.start(streamHandler)) {
Expand All @@ -362,28 +366,56 @@ public void canAwaitClosed() throws Exception {

assertEquals(LogItem.opened(), eventSink.awaitLogItem());

assertEquals(LogItem.event("message", "data-by-itself"), // "message" is the default event name, per SSE spec
eventSink.awaitLogItem());

assertEquals(LogItem.event("event-with-data", "abc"),
assertEquals(LogItem.event("message", "event1"), // "message" is the default event name, per SSE spec
eventSink.awaitLogItem());

assertEquals(LogItem.comment("this is a comment"),
eventSink.awaitLogItem());

assertEquals(LogItem.event("event-with-more-data-and-id", "abc\ndef", "my-id"),
eventSink.awaitLogItem());

eventSink.assertNoMoreLogItems();
} finally {
es.close();
assertTrue("Expected close to complete", es.awaitClosed(Duration.ofSeconds(10)));
assertTrue(received.get());
}
}

assertEquals(LogItem.closed(), eventSink.awaitLogItem());
}

@Test
public void awaitClosedTimesOutIfEventHandlerNotCompleted() throws Exception {
final String body = "data: event1\n\n";

CountDownLatch enteredHandler = new CountDownLatch(1);
CountDownLatch handlerCanProceed = new CountDownLatch(1);
TestHandler eventSink = new TestHandler(testLogger.getLogger()) {
@Override
public void onMessage(String event, MessageEvent messageEvent) throws Exception {
enteredHandler.countDown();
handlerCanProceed.await(5, TimeUnit.SECONDS);
super.onMessage(event, messageEvent);
}
};
Handler streamHandler = chunksFromString(body, 5, Duration.ZERO, true);

try (HttpServer server = HttpServer.start(streamHandler)) {
EventSource es = new EventSource.Builder(eventSink, server.getUri())
.logger(testLogger.getLogger())
.build();
try {
es.start();

assertEquals(LogItem.opened(), eventSink.awaitLogItem());

assertTrue("expected handler to be called", enteredHandler.await(1, TimeUnit.SECONDS));
} finally {
es.close();

try {
assertFalse("Expected awaitClosed to time out", es.awaitClosed(Duration.ofSeconds(2)));
} finally {
handlerCanProceed.countDown();
}
}
}
}

@Test
public void defaultThreadPriorityIsNotMaximum() throws Exception {
ThreadCapturingHandler threadCapturingHandler = new ThreadCapturingHandler();
Expand Down

0 comments on commit 78980a6

Please sign in to comment.