From 42ae118082a0dccded3e67b3b63ecc4b3dddee78 Mon Sep 17 00:00:00 2001 From: LaunchDarklyCI Date: Mon, 30 Mar 2020 13:37:42 -0700 Subject: [PATCH 1/3] prepare 1.11.0 release (#44) --- build.gradle | 2 +- .../eventsource/EventHandler.java | 10 + .../launchdarkly/eventsource/EventSource.java | 77 +++- .../eventsource/EventParserTest.java | 1 + ...eTest.java => EventSourceBuilderTest.java} | 3 +- .../eventsource/EventSourceHttpTest.java | 251 ++++++++--- .../ManualConnectionErrorTest.java | 67 --- .../launchdarkly/eventsource/ManualTest.java | 47 -- .../ModernTLSSocketFactoryTest.java | 7 +- .../launchdarkly/eventsource/StubServer.java | 418 ++++++++++++++++++ 10 files changed, 675 insertions(+), 208 deletions(-) rename src/test/java/com/launchdarkly/eventsource/{EventSourceTest.java => EventSourceBuilderTest.java} (99%) delete mode 100644 src/test/java/com/launchdarkly/eventsource/ManualConnectionErrorTest.java delete mode 100644 src/test/java/com/launchdarkly/eventsource/ManualTest.java create mode 100644 src/test/java/com/launchdarkly/eventsource/StubServer.java diff --git a/build.gradle b/build.gradle index 6be9b73..3dd5223 100644 --- a/build.gradle +++ b/build.gradle @@ -44,7 +44,7 @@ dependencies { api "org.slf4j:slf4j-api:${versions.slf4j}" testImplementation "ch.qos.logback:logback-classic:1.1.9" testImplementation "org.mockito:mockito-core:1.10.19" - testImplementation "com.squareup.okhttp3:mockwebserver:3.10.0" + testCompile "org.eclipse.jetty:jetty-server:9.4.27.v20200227" testImplementation "junit:junit:4.11" } diff --git a/src/main/java/com/launchdarkly/eventsource/EventHandler.java b/src/main/java/com/launchdarkly/eventsource/EventHandler.java index 9d82192..2205fb6 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventHandler.java +++ b/src/main/java/com/launchdarkly/eventsource/EventHandler.java @@ -12,6 +12,11 @@ public interface EventHandler { /** * EventSource calls this method when the stream connection has been closed. + *

+ * This method is not called if the connection was closed due to a {@link ConnectionErrorHandler} + * returning {@link ConnectionErrorHandler.Action#SHUTDOWN}; EventSource assumes that if you registered + * such a handler and made it return that value, then you already know that the connection is being closed. + * * @throws Exception throwing an exception here will cause it to be logged and also sent to {@link #onError(Throwable)} */ void onClosed() throws Exception; @@ -36,6 +41,11 @@ public interface EventHandler { * an {@link UnsuccessfulResponseException} if the server returns an unexpected HTTP status), * but only after the {@link ConnectionErrorHandler} (if any) has processed it. If you need to * do anything that affects the state of the connection, use {@link ConnectionErrorHandler}. + *

+ * This method is not called if the error was already passed to a {@link ConnectionErrorHandler} + * which returned {@link ConnectionErrorHandler.Action#SHUTDOWN}; EventSource assumes that if you registered + * such a handler and made it return that value, then you do not want to handle the same error twice. + * * @param t a {@code Throwable} object */ void onError(Throwable t); diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index c3baf42..3d8124f 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -13,9 +13,7 @@ import java.security.GeneralSecurityException; import java.security.KeyStore; import java.util.Arrays; -import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -24,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; @@ -159,7 +158,31 @@ public void run() { } }); } - + + /** + * Drops the current stream connection (if any) and attempts to reconnect. + *

+ * This method returns immediately after dropping the current connection; the reconnection happens on + * a worker thread. + *

+ * If a connection attempt is already in progress but has not yet connected, or if {@link #close()} has + * previously been called, this method has no effect. If {@link #start()} has never been called, it is + * the same as calling {@link #start()}. + */ + public void restart() { + ReadyState previousState = readyState.getAndUpdate(new UnaryOperator() { + public ReadyState apply(ReadyState t) { + return t == ReadyState.OPEN ? ReadyState.CLOSED : t; + } + }); + if (previousState == OPEN) { + closeCurrentStream(previousState); + } else if (previousState == RAW || previousState == CONNECTING) { + start(); + } + // if already shutdown or in the process of closing, do nothing + } + /** * Returns an enum indicating the current status of the connection. * @return a {@link ReadyState} value @@ -168,6 +191,9 @@ public ReadyState getState() { return readyState.get(); } + /** + * Drops the current stream connection (if any) and permanently shuts down the EventSource. + */ @Override public void close() { ReadyState currentState = readyState.getAndSet(SHUTDOWN); @@ -175,21 +201,8 @@ public void close() { if (currentState == SHUTDOWN) { return; } - if (currentState == ReadyState.OPEN) { - try { - handler.onClosed(); - } catch (Exception e) { - handler.onError(e); - } - } - - if (call != null) { - // The call.cancel() must precede the bufferedSource.close(). - // Otherwise, an IllegalArgumentException "Unbalanced enter/exit" error is thrown by okhttp. - // https://github.com/google/ExoPlayer/issues/1348 - call.cancel(); - logger.debug("call cancelled"); - } + + closeCurrentStream(currentState); eventExecutor.shutdownNow(); streamExecutor.shutdownNow(); @@ -207,6 +220,24 @@ public void close() { } } + private void closeCurrentStream(ReadyState previousState) { + if (previousState == ReadyState.OPEN) { + try { + handler.onClosed(); + } catch (Exception e) { + handler.onError(e); + } + } + + if (call != null) { + // The call.cancel() must precede the bufferedSource.close(). + // Otherwise, an IllegalArgumentException "Unbalanced enter/exit" error is thrown by okhttp. + // https://github.com/google/ExoPlayer/issues/1348 + call.cancel(); + logger.debug("call cancelled"); + } + } + Request buildRequest() { Request.Builder builder = new Request.Builder() .headers(headers) @@ -266,11 +297,14 @@ private void connect() { } catch (EOFException eofe) { logger.warn("Connection unexpectedly closed."); } catch (IOException ioe) { - if (readyState.get() != SHUTDOWN) { + ReadyState state = readyState.get(); + if (state == SHUTDOWN) { + errorHandlerAction = ConnectionErrorHandler.Action.SHUTDOWN; + } else if (state == CLOSED) { // this happens if it's being restarted + errorHandlerAction = ConnectionErrorHandler.Action.PROCEED; + } else { logger.debug("Connection problem.", ioe); errorHandlerAction = dispatchError(ioe); - } else { - errorHandlerAction = ConnectionErrorHandler.Action.SHUTDOWN; } } finally { ReadyState nextState = CLOSED; @@ -302,6 +336,7 @@ private void connect() { handler.onError(e); } } + // Reset the backoff if we had a successful connection that stayed good for at least // backoffResetThresholdMs milliseconds. if (connectedTime >= 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThresholdMs) { diff --git a/src/test/java/com/launchdarkly/eventsource/EventParserTest.java b/src/test/java/com/launchdarkly/eventsource/EventParserTest.java index b9922f3..8d44e8c 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventParserTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventParserTest.java @@ -16,6 +16,7 @@ /** * Adapted from https://github.com/aslakhellesoy/eventsource-java/blob/master/src/test/java/com/github/eventsource/client/EventStreamParserTest.java */ +@SuppressWarnings("javadoc") public class EventParserTest { private static final URI ORIGIN = URI.create("http://host.com:99/foo"); diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java similarity index 99% rename from src/test/java/com/launchdarkly/eventsource/EventSourceTest.java rename to src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java index 2acace9..8a32be8 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java @@ -24,7 +24,8 @@ import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; -public class EventSourceTest { +@SuppressWarnings("javadoc") +public class EventSourceBuilderTest { private static final URI STREAM_URI = URI.create("http://www.example.com/"); private static final HttpUrl STREAM_HTTP_URL = HttpUrl.parse("http://www.example.com/"); private EventSource eventSource; diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java index 3c2b81b..a440480 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java @@ -2,21 +2,53 @@ import org.junit.Test; +import java.time.Duration; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import static com.launchdarkly.eventsource.StubServer.Handlers.forRequestsInSequence; +import static com.launchdarkly.eventsource.StubServer.Handlers.hang; +import static com.launchdarkly.eventsource.StubServer.Handlers.interruptible; +import static com.launchdarkly.eventsource.StubServer.Handlers.returnStatus; +import static com.launchdarkly.eventsource.StubServer.Handlers.stream; +import static com.launchdarkly.eventsource.StubServer.Handlers.streamProducerFromChunkedString; +import static com.launchdarkly.eventsource.StubServer.Handlers.streamProducerFromString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.SocketPolicy; +import okhttp3.Headers; +@SuppressWarnings("javadoc") public class EventSourceHttpTest { - private static final int CHUNK_SIZE = 5; + private static final String CONTENT_TYPE = "text/event-stream"; + + @Test + public void eventSourceSetsRequestProperties() throws Exception { + String requestPath = "/some/path"; + Headers headers = new Headers.Builder().add("header1", "value1").add("header2", "value2").build(); + + try (StubServer server = StubServer.start(hang())) { + try (EventSource es = new EventSource.Builder(new TestHandler(), server.getUri().resolve(requestPath)) + .headers(headers) + .build()) { + es.start(); + + StubServer.RequestInfo r = server.awaitRequest(); + assertEquals(requestPath, r.getPath()); + assertEquals("value1", r.getHeader("header1")); + assertEquals("value2", r.getHeader("header2")); + } + } + } @Test public void eventSourceReadsChunkedResponse() throws Exception { - String body = "data: data-by-itself\n\n" + + final String body = "data: data-by-itself\n\n" + "event: event-with-data\n" + "data: abc\n\n" + ": this is a comment\n" + @@ -24,34 +56,33 @@ public void eventSourceReadsChunkedResponse() throws Exception { "id: my-id\n" + "data: abc\n" + "data: def\n\n"; - - TestHandler handler = new TestHandler(); - - try (MockWebServer server = new MockWebServer()) { - server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN)); - server.start(); - - try (EventSource es = new EventSource.Builder(handler, server.url("/")) - .build()) { + + TestHandler eventSink = new TestHandler(); + StubServer.Handler streamHandler = stream(CONTENT_TYPE, + streamProducerFromChunkedString(body, 5, Duration.ZERO, true)); + + try (StubServer server = StubServer.start(streamHandler)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()).build()) { es.start(); - assertEquals(LogItem.opened(), handler.log.take()); + assertEquals(LogItem.opened(), eventSink.log.take()); assertEquals(LogItem.event("message", "data-by-itself"), // "message" is the default event name, per SSE spec - handler.log.take()); + eventSink.log.take()); assertEquals(LogItem.event("event-with-data", "abc"), - handler.log.take()); + eventSink.log.take()); assertEquals(LogItem.comment("this is a comment"), - handler.log.take()); + eventSink.log.take()); assertEquals(LogItem.event("event-with-more-data-and-id", "abc\ndef", "my-id"), - handler.log.take()); + eventSink.log.take()); + + eventSink.assertNoMoreLogItems(); } - - assertEquals(LogItem.closed(), handler.log.take()); } + assertEquals(LogItem.closed(), eventSink.log.take()); } @Test @@ -59,32 +90,37 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { String body1 = "data: first\n\n"; String body2 = "data: second\n\n"; - TestHandler handler = new TestHandler(); - - try (MockWebServer server = new MockWebServer()) { - server.enqueue(createEventsResponse(body1, SocketPolicy.DISCONNECT_AT_END)); - server.enqueue(createEventsResponse(body2, SocketPolicy.KEEP_OPEN)); - server.start(); - - try (EventSource es = new EventSource.Builder(handler, server.url("/")) + TestHandler eventSink = new TestHandler(); + + StubServer.InterruptibleHandler streamHandler1 = interruptible(stream(CONTENT_TYPE, streamProducerFromString(body1, true))); + StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, streamProducerFromString(body2, true)); + StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, streamHandler2); + + try (StubServer server = StubServer.start(allRequests)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) .reconnectTimeMs(10) .build()) { es.start(); - assertEquals(LogItem.opened(), handler.log.take()); + assertEquals(LogItem.opened(), eventSink.log.take()); assertEquals(LogItem.event("message", "first"), - handler.log.take()); + eventSink.log.take()); + + eventSink.assertNoMoreLogItems(); // should not have closed first stream yet + + streamHandler1.interrupt(); - assertEquals(LogItem.closed(), handler.log.take()); + assertEquals(LogItem.closed(), eventSink.log.take()); - assertEquals(LogItem.opened(), handler.log.take()); + assertEquals(LogItem.opened(), eventSink.log.take()); assertEquals(LogItem.event("message", "second"), - handler.log.take()); + eventSink.log.take()); } - assertEquals(LogItem.closed(), handler.log.take()); + assertEquals(LogItem.closed(), eventSink.log.take()); + eventSink.assertNoMoreLogItems(); } } @@ -92,28 +128,29 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { public void eventSourceReconnectsAfterErrorOnFirstRequest() throws Exception { String body = "data: good\n\n"; - TestHandler handler = new TestHandler(); + TestHandler eventSink = new TestHandler(); - try (MockWebServer server = new MockWebServer()) { - server.enqueue(createErrorResponse(500)); - server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN)); - server.start(); - - try (EventSource es = new EventSource.Builder(handler, server.url("/")) + StubServer.Handler streamHandler = stream(CONTENT_TYPE, streamProducerFromString(body, true)); + StubServer.Handler allRequests = forRequestsInSequence(returnStatus(500), streamHandler); + + try (StubServer server = StubServer.start(allRequests)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) .reconnectTimeMs(10) .build()) { es.start(); assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), - handler.log.take()); + eventSink.log.take()); - assertEquals(LogItem.opened(), handler.log.take()); + assertEquals(LogItem.opened(), eventSink.log.take()); assertEquals(LogItem.event("message", "good"), - handler.log.take()); + eventSink.log.take()); + + eventSink.assertNoMoreLogItems(); } - assertEquals(LogItem.closed(), handler.log.take()); + assertEquals(LogItem.closed(), eventSink.log.take()); } } @@ -122,48 +159,118 @@ public void eventSourceReconnectsAgainAfterErrorOnFirstReconnect() throws Except String body1 = "data: first\n\n"; String body2 = "data: second\n\n"; - TestHandler handler = new TestHandler(); + TestHandler eventSink = new TestHandler(); - try (MockWebServer server = new MockWebServer()) { - server.enqueue(createEventsResponse(body1, SocketPolicy.DISCONNECT_AT_END)); - server.enqueue(createErrorResponse(500)); - server.enqueue(createEventsResponse(body2, SocketPolicy.KEEP_OPEN)); - server.start(); - - try (EventSource es = new EventSource.Builder(handler, server.url("/")) + StubServer.InterruptibleHandler streamHandler1 = interruptible(stream(CONTENT_TYPE, streamProducerFromString(body1, true))); + StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, streamProducerFromString(body2, true)); + StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, returnStatus(500), streamHandler2); + + try (StubServer server = StubServer.start(allRequests)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) .reconnectTimeMs(10) .build()) { es.start(); - assertEquals(LogItem.opened(), handler.log.take()); + assertEquals(LogItem.opened(), eventSink.log.take()); assertEquals(LogItem.event("message", "first"), - handler.log.take()); + eventSink.log.take()); + + eventSink.assertNoMoreLogItems(); - assertEquals(LogItem.closed(), handler.log.take()); + streamHandler1.interrupt(); // make first stream fail + + assertEquals(LogItem.closed(), eventSink.log.take()); assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), - handler.log.take()); + eventSink.log.take()); - assertEquals(LogItem.opened(), handler.log.take()); + assertEquals(LogItem.opened(), eventSink.log.take()); assertEquals(LogItem.event("message", "second"), - handler.log.take()); + eventSink.log.take()); + + eventSink.assertNoMoreLogItems(); } - assertEquals(LogItem.closed(), handler.log.take()); + assertEquals(LogItem.closed(), eventSink.log.take()); } } - - private MockResponse createEventsResponse(String body, SocketPolicy socketPolicy) { - return new MockResponse() - .setHeader("Content-Type", "text/event-stream") - .setChunkedBody(body, CHUNK_SIZE) - .setSocketPolicy(socketPolicy); + + @Test + public void streamDoesNotReconnectIfConnectionErrorHandlerSaysToStop() throws Exception { + final AtomicBoolean calledHandler = new AtomicBoolean(false); + final AtomicReference receivedError = new AtomicReference(); + + ConnectionErrorHandler connectionErrorHandler = new ConnectionErrorHandler() { + public Action onConnectionError(Throwable t) { + calledHandler.set(true); + receivedError.set(t); + return Action.SHUTDOWN; + } + }; + + TestHandler eventSink = new TestHandler(); + + try (StubServer server = StubServer.start(returnStatus(500))) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) + .connectionErrorHandler(connectionErrorHandler) + .reconnectTimeMs(10) + .build()) { + es.start(); + + // If a ConnectionErrorHandler returns SHUTDOWN, EventSource does not call onClosed() or onError() + // on the regular event handler, since it assumes that the caller already knows what happened. + // Therefore we don't expect to see any items in eventSink. + eventSink.assertNoMoreLogItems(); + + assertEquals(ReadyState.SHUTDOWN, es.getState()); + } + } + + assertTrue(calledHandler.get()); + assertNotNull(receivedError.get()); + assertEquals(UnsuccessfulResponseException.class, receivedError.get().getClass()); } - private MockResponse createErrorResponse(int status) { - return new MockResponse().setResponseCode(500); + @Test + public void canForceEventSourceToRestart() throws Exception { + String body1 = "data: first\n\n"; + String body2 = "data: second\n\n"; + + TestHandler eventSink = new TestHandler(); + + StubServer.Handler streamHandler1 = stream(CONTENT_TYPE, streamProducerFromString(body1, true)); + StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, streamProducerFromString(body2, true)); + StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, streamHandler2); + + try (StubServer server = StubServer.start(allRequests)) { + try (EventSource es = new EventSource.Builder(eventSink, server.getUri()) + .reconnectTimeMs(10) + .build()) { + es.start(); + + assertEquals(LogItem.opened(), eventSink.log.take()); + + assertEquals(LogItem.event("message", "first"), + eventSink.log.take()); + + eventSink.assertNoMoreLogItems(); + + es.restart(); + + assertEquals(LogItem.closed(), eventSink.log.take()); // there shouldn't be any error notification, just "closed" + + assertEquals(LogItem.opened(), eventSink.log.take()); + + assertEquals(LogItem.event("message", "second"), + eventSink.log.take()); + + eventSink.assertNoMoreLogItems(); + } + + assertEquals(LogItem.closed(), eventSink.log.take()); + } } static class LogItem { @@ -250,5 +357,11 @@ public void onComment(String comment) throws Exception { public void onClosed() throws Exception { log.add(LogItem.closed()); } + + void assertNoMoreLogItems() { + try { + assertNull(log.poll(100, TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) {} + } } } diff --git a/src/test/java/com/launchdarkly/eventsource/ManualConnectionErrorTest.java b/src/test/java/com/launchdarkly/eventsource/ManualConnectionErrorTest.java deleted file mode 100644 index 0da1f26..0000000 --- a/src/test/java/com/launchdarkly/eventsource/ManualConnectionErrorTest.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.launchdarkly.eventsource; - -import java.net.URI; - -import org.junit.Ignore; -import org.junit.Test; - -@Ignore -public class ManualConnectionErrorTest { - EventSource source; - - EventHandler handler = new EventHandler() { - public void onOpen() throws Exception { - } - - @Override - public void onClosed() throws Exception { - } - - public void onMessage(String event, MessageEvent messageEvent) throws Exception { - } - - public void onError(Throwable t) { - System.out.println("async handler got error: " + t); - } - - public void onComment(String comment) { - } - }; - - @Test - public void testConnectionIsRetriedAfter404() throws Exception { - // Expected output: multiple connection retries, and "async handler got error" each time. - - source = new EventSource.Builder(handler, URI.create("http://launchdarkly.com/bad-url")) - .reconnectTimeMs(10) - .build(); - - source.start(); - Thread.sleep(100000L); - } - - @Test - public void testConnectionIsNotRetriedAfter404IfErrorHandlerSaysToStop() throws Exception { - // Expected output: "connection handler got error ... 404", followed by "Connection has been explicitly - // shut down by error handler" and no further connection retries. - - ConnectionErrorHandler connectionErrorHandler = new ConnectionErrorHandler() { - public Action onConnectionError(Throwable t) { - System.out.println("connection handler got error: " + t); - if (t instanceof UnsuccessfulResponseException && - ((UnsuccessfulResponseException) t).getCode() == 404) { - return Action.SHUTDOWN; - } - return Action.PROCEED; - } - }; - - source = new EventSource.Builder(handler, URI.create("http://launchdarkly.com/bad-url")) - .connectionErrorHandler(connectionErrorHandler) - .reconnectTimeMs(10) - .build(); - - source.start(); - Thread.sleep(100000L); - } -} diff --git a/src/test/java/com/launchdarkly/eventsource/ManualTest.java b/src/test/java/com/launchdarkly/eventsource/ManualTest.java deleted file mode 100644 index aa6b41a..0000000 --- a/src/test/java/com/launchdarkly/eventsource/ManualTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.launchdarkly.eventsource; - -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; - -@Ignore -public class ManualTest { - private static final Logger logger = LoggerFactory.getLogger(ManualTest.class); - - @Test - public void manualTest() throws InterruptedException { - EventHandler handler = new EventHandler() { - public void onOpen() throws Exception { - logger.info("open"); - } - - @Override - public void onClosed() throws Exception { - - } - - public void onMessage(String event, MessageEvent messageEvent) throws Exception { - logger.info(event + ": " + messageEvent.getData()); - } - - public void onError(Throwable t) { - logger.error("Error: " + t); - } - - public void onComment(String comment) { - logger.info("comment: " + comment); - } - }; - EventSource source = new EventSource.Builder(handler, URI.create("http://localhost:8080/events/")).build(); - source.start(); - logger.warn("Sleeping..."); - Thread.sleep(10000L); - logger.debug("Stopping source"); - source.close(); - logger.debug("Stopped"); - } -} diff --git a/src/test/java/com/launchdarkly/eventsource/ModernTLSSocketFactoryTest.java b/src/test/java/com/launchdarkly/eventsource/ModernTLSSocketFactoryTest.java index d486115..9bd745c 100644 --- a/src/test/java/com/launchdarkly/eventsource/ModernTLSSocketFactoryTest.java +++ b/src/test/java/com/launchdarkly/eventsource/ModernTLSSocketFactoryTest.java @@ -3,11 +3,14 @@ import org.junit.Test; import javax.net.ssl.SSLSocket; -import java.net.Socket; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@SuppressWarnings("javadoc") public class ModernTLSSocketFactoryTest { @Test diff --git a/src/test/java/com/launchdarkly/eventsource/StubServer.java b/src/test/java/com/launchdarkly/eventsource/StubServer.java new file mode 100644 index 0000000..6b4874c --- /dev/null +++ b/src/test/java/com/launchdarkly/eventsource/StubServer.java @@ -0,0 +1,418 @@ +package com.launchdarkly.eventsource; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * A simple Jetty-based test framework for verifying end-to-end HTTP behavior. + *

+ * Previous versions of the library used okhttp's MockWebServer for end-to-end tests, but MockWebServer + * does not support actual streaming responses so the tests could not control when the stream got + * disconnected. + */ +@SuppressWarnings("javadoc") +public class StubServer implements Closeable { + private final Server server; + private final BlockingQueue requests = new LinkedBlockingQueue<>(); + + /** + * Starts an HTTP server that uses the specified handler. + * + * @param handler a {@link Handler} implementation + * @return a started server + */ + public static StubServer start(Handler handler) { + return new StubServer(handler); + } + + private StubServer(final Handler handler) { + server = new Server(0); + + server.setHandler(new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + RequestInfo requestInfo = new RequestInfo(request); + requests.add(requestInfo); + handler.handle(request, response); + baseRequest.setHandled(true); + } + }); + server.setStopTimeout(1); // without this, Jetty does not interrupt worker threads on shutdown + + try { + server.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Shuts down the server. + *

+ * All active request handler threads will be interrupted. + */ + @Override + public void close() { + try { + server.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Returns the server's base URI. + * + * @return the base URI + */ + public URI getUri() { + return server.getURI(); + } + + /** + * Returns the next queued request, blocking until one is available. + * + * @return the request information + */ + public RequestInfo awaitRequest() { + try { + return requests.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Returns the next queued request, or null if none is available within the specified timeout. + * + * @param timeout the maximum time to wait + * @return the request information or null + */ + public RequestInfo awaitRequest(Duration timeout) { + try { + return requests.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return null; + } + } + + /** + * Interface for StubServer's simplified request handler mechanism. + */ + public static interface Handler { + /** + * Handle a request. + * + * @param request the request + * @param response the response + */ + public void handle(HttpServletRequest request, HttpServletResponse response); + } + + /** + * The properties of a received request. + *

+ * Note that this fully reads the request body, so request handlers cannot make use of the body. + */ + public static class RequestInfo { + private final String path; + private final Map> headers; + private final String body; + + public RequestInfo(HttpServletRequest request) { + this.path = request.getRequestURI(); + + headers = new HashMap<>(); + Enumeration headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String name = headerNames.nextElement(); + List valuesOut = new ArrayList<>(); + Enumeration values = request.getHeaders(name); + while (values.hasMoreElements()) { + valuesOut.add(values.nextElement()); + } + headers.put(name, valuesOut); + } + + StringBuilder s = new StringBuilder(); + try { + try (BufferedReader reader = request.getReader()) { + char[] buf = new char[1000]; + int count = -1; + while ((count = reader.read(buf)) > 0) { + s.append(buf, 0, count); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + body = s.toString(); + } + + public String getPath() { + return path; + } + + public String getHeader(String name) { + List headers = getHeaders(name); + return headers == null || headers.isEmpty() ? null : headers.get(0); + } + + public List getHeaders(String name) { + return headers.get(name); + } + + public Map> getHeaders() { + return headers; + } + + public String getBody() { + return body; + } + } + + /** + * Interface for use with {@link Handlers#interruptible}. + */ + public static interface InterruptibleHandler extends Handler { + /** + * Causes the handler to stop what it is doing and terminate the response as soon as possible. + */ + void interrupt(); + } + + /** + * Factory methods for StubServer handlers. + */ + public static abstract class Handlers { + /** + * Provides a handler that returns the specified HTTP status, with no content. + * + * @param status the status code + * @return the handler + */ + public static Handler returnStatus(final int status) { + return new Handler() { + public void handle(HttpServletRequest req, HttpServletResponse resp) { + resp.setStatus(status); + } + }; + } + + /** + * Provides a handler that delegates to a series of handlers for each request, in the order given. + * If there are more requests than the number of handlers, the last handler is used for the rest. + * + * @param firstHandler the first handler + * @param moreHandlers additional handlers + * @return the delegating handler + */ + public static Handler forRequestsInSequence(final Handler firstHandler, final Handler... moreHandlers) { + final AtomicInteger counter = new AtomicInteger(0); + return new Handler() { + public void handle(HttpServletRequest req, HttpServletResponse resp) { + int i = counter.getAndIncrement(); + Handler h = i == 0 ? firstHandler : + (i >= moreHandlers.length ? moreHandlers[moreHandlers.length - 1] : moreHandlers[i - 1]); + h.handle(req, resp); + } + }; + } + + /** + * Provides a handler that does not send a response, but does not close the socket. + * + * @return the handler + */ + public static Handler hang() { + return new Handler() { + public void handle(HttpServletRequest request, HttpServletResponse response) { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + } + }; + } + + /** + * Provides the ability to interrupt the worker thread for a handler at any time. This can be used + * to terminate a stream. If multiple requests are in progress for the same handler, interrupting it + * causes all of them to terminate. + * + * @param realHandler the handler to delegate to + * @return a wrapper handler that provides an {@link InterruptibleHandler#interrupt()} method + */ + public static InterruptibleHandler interruptible(final Handler realHandler) { + final AtomicBoolean interrupted = new AtomicBoolean(false); + final Object interruptLock = new Object(); + + return new InterruptibleHandler() { + @Override + public void handle(HttpServletRequest request, HttpServletResponse response) { + final Thread writerThread = Thread.currentThread(); + Thread interrupterThread = new Thread(new Runnable() { + public void run() { + while (true) { + synchronized (interruptLock) { + if (interrupted.get()) { + break; + } + try { + interruptLock.wait(); + } + catch (InterruptedException e) { + return; + } + } + } + writerThread.interrupt(); + } + }); + interrupterThread.start(); + realHandler.handle(request, response); + } + + @Override + public void interrupt() { + synchronized (interruptLock) { + interrupted.set(true); + interruptLock.notifyAll(); + } + } + }; + } + + /** + * Interface for use with {@link Handlers#stream(String, StreamProducer)}. + */ + public static interface StreamProducer { + /** + * Pushes chunks of response data onto a queue. Another worker thread will dequeue and send the chunks. + * + * @param chunks the queue for chunks of stream data + * @return true if the stream should be left open indefinitely afterward, false to close it + */ + boolean writeStream(BlockingQueue chunks); + } + + /** + * Provides a handler that streams a chunked response. + * + * @param contentType value for the Content-Type header + * @param streamProducer a {@link StreamProducer} that will provide the response + * @return the handler + */ + public static Handler stream(final String contentType, final StreamProducer streamProducer) { + return new Handler() { + public void handle(HttpServletRequest req, HttpServletResponse resp) { + resp.setStatus(200); + resp.setHeader("Content-Type", contentType); + resp.setHeader("Transfer-Encoding", "chunked"); + try { + resp.flushBuffer(); + PrintWriter w = resp.getWriter(); + final BlockingQueue chunks = new LinkedBlockingQueue<>(); + final String terminator = "***EOF***"; // value doesn't matter, we're checking for reference equality + Thread producerThread = new Thread(new Runnable() { + public void run() { + boolean leaveOpen = streamProducer.writeStream(chunks); + if (leaveOpen) { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + } + chunks.add(terminator); + } + }); + producerThread.start(); + while (true) { + try { + String chunk = chunks.take(); + if (chunk == terminator) { + break; + } + w.write(chunk); + w.flush(); + } catch (InterruptedException e) { + break; + } + } + producerThread.interrupt(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + }; + } + + /** + * Provides content for {@link #stream(String, StreamProducer)} that is a single chunk of data. + * + * @param body the response body + * @param leaveOpen true to leave the stream open after sending this data, false to close it + * @return the stream producer + */ + public static StreamProducer streamProducerFromString(final String body, final boolean leaveOpen) { + return streamProducerFromChunkedString(body, body.length(), Duration.ZERO, leaveOpen); + } + + /** + * Provides content for {@link #stream(String, StreamProducer)} that is a string broken up into + * multiple chunks of equal size. + * + * @param body the response body + * @param chunkSize the number of characters per chunk + * @param chunkDelay how long to wait between chunks + * @param leaveOpen true to leave the stream open after sending this data, false to close it + * @return the stream producer + */ + public static StreamProducer streamProducerFromChunkedString(final String body, final int chunkSize, + final Duration chunkDelay, final boolean leaveOpen) { + return new StreamProducer() { + public boolean writeStream(BlockingQueue chunks) { + for (int p = 0; p < body.length(); p += chunkSize) { + String chunk = body.substring(p, Math.min(p + chunkSize, body.length())); + chunks.add(chunk); + try { + Thread.sleep(chunkDelay.toMillis()); + } catch (InterruptedException e) { + break; + } + } + return leaveOpen; + } + }; + } + } +} From 989cc9d90604dc008d6c2f78026676dce8865c0c Mon Sep 17 00:00:00 2001 From: LaunchDarklyCI Date: Mon, 30 Mar 2020 20:38:03 +0000 Subject: [PATCH 2/3] Releasing version 1.11.0 --- CHANGELOG.md | 4 ++++ gradle.properties | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27c41a9..3811e55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to the LaunchDarkly EventSource implementation for Java will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [1.11.0] - 2020-03-30 +### Added: +- New `EventSource` method `restart()` allows the caller to force a stream connection retry even if no I/O error has happened, using the same backoff behavior that would be used for errors. + ## [1.10.2] - 2020-03-20 ### Changed: - Updated OkHttp version to 3.12.10 (the latest version that still supports Java 7). diff --git a/gradle.properties b/gradle.properties index 1c26890..00c8ba9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=1.10.2 +version=1.11.0 ossrhUsername= ossrhPassword= From e095fab076d54998ebf102d11baab53d0e207753 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 6 Apr 2020 11:18:02 -0700 Subject: [PATCH 3/3] Gradle release fixes --- build.gradle | 5 +---- gradle.properties | 3 +++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/build.gradle b/build.gradle index 3dd5223..d55f9c2 100644 --- a/build.gradle +++ b/build.gradle @@ -4,9 +4,6 @@ buildscript { mavenCentral() mavenLocal() } - dependencies { - classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.8.0" - } } plugins { @@ -14,7 +11,7 @@ plugins { id "java-library" id "signing" id "maven-publish" - id "de.marcphilipp.nexus-publish" version "0.3.0" + id "de.marcphilipp.nexus-publish" version "0.4.0" id "io.codearte.nexus-staging" version "0.21.2" id "idea" } diff --git a/gradle.properties b/gradle.properties index 00c8ba9..c9b8e91 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,6 @@ version=1.11.0 ossrhUsername= ossrhPassword= + +# See https://github.com/gradle/gradle/issues/11308 regarding the following property +systemProp.org.gradle.internal.publish.checksums.insecure=true