diff --git a/CHANGELOG.md b/CHANGELOG.md
index d6fa3ef..a85d0c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,12 +17,19 @@ All notable changes to the LaunchDarkly EventSource implementation for Java will
### Removed:
- In `EventSource`: `setHttpUrl`, `setLastEventId`, `setMaxReconnectTime`, `setReconnectionTime`, `setUri` (these can only be set in the builder).
+## [1.11.0] - 2020-03-30
+### Added:
+- New `EventSource` method `restart()` allows the caller to force a stream connection retry even if no I/O error has happened, using the same backoff behavior that would be used for errors.
+
+## [1.10.2] - 2020-03-20
+### Changed:
+- Updated OkHttp version to 3.12.10 (the latest version that still supports Java 7).
+
## [1.10.1] - 2019-10-17
### Fixed:
- Fixed trailing period in logger name. ([#34](https://github.com/launchdarkly/okhttp-eventsource/issues/34))
- If you provide your own value for the `Accept` header using `EventSource.Builder.headers()`, it should not _also_ send the default `Accept: text/event-stream`, but replace it. ([#38](https://github.com/launchdarkly/okhttp-eventsource/issues/38))
-
## [1.10.0] - 2019-08-01
### Added:
- `EventSource.Builder.clientBuilderActions()` allows you to modify the OkHttp client options in any way, such as customizing TLS behavior or any other methods supported by `OkHttpClient.Builder`.
diff --git a/build.gradle b/build.gradle
index cd4719d..ba78af2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -11,7 +11,7 @@ plugins {
id "java-library"
id "signing"
id "maven-publish"
- id "de.marcphilipp.nexus-publish" version "0.3.0"
+ id "de.marcphilipp.nexus-publish" version "0.4.0"
id "io.codearte.nexus-staging" version "0.21.2"
id "org.ajoberstar.git-publish" version "2.1.3"
id "idea"
@@ -27,27 +27,29 @@ repositories {
allprojects {
group = 'com.launchdarkly'
version = "${version}"
+ archivesBaseName = 'okhttp-eventsource'
sourceCompatibility = 1.8
targetCompatibility = 1.8
}
-dependencies {
- compile "com.squareup.okhttp3:okhttp:4.3.1"
- compile "org.slf4j:slf4j-api:1.7.22"
- testRuntime "ch.qos.logback:logback-classic:1.1.9"
- testCompile "org.mockito:mockito-core:1.10.19"
- testCompile "com.squareup.okhttp3:mockwebserver:4.3.1"
- testCompile "junit:junit:4.11"
-}
+ext.versions = [
+ "okhttp": "4.5.0",
+ "slf4j": "1.7.22"
+]
-task wrapper(type: Wrapper) {
- gradleVersion = '4.10.2'
+dependencies {
+ api "com.squareup.okhttp3:okhttp:${versions.okhttp}"
+ api "org.slf4j:slf4j-api:${versions.slf4j}"
+ testImplementation "ch.qos.logback:logback-classic:1.1.9"
+ testImplementation "org.mockito:mockito-core:1.10.19"
+ testImplementation "org.eclipse.jetty:jetty-server:9.4.27.v20200227"
+ testImplementation "junit:junit:4.12"
+ testImplementation "org.hamcrest:hamcrest-all:1.3"
}
jar {
- baseName = 'okhttp-eventsource'
manifest {
- attributes("Implementation-Version": version)
+ attributes("Implementation-Version": project.version)
}
}
@@ -61,6 +63,17 @@ task javadocJar(type: Jar, dependsOn: javadoc) {
from javadoc.destinationDir
}
+javadoc {
+ // Force the Javadoc build to fail if there are any Javadoc warnings. See: https://discuss.gradle.org/t/javadoc-fail-on-warning/18141/3
+ // The '-quiet' as second argument is actually a hack,
+ // since the one parameter addStringOption doesn't seem to
+ // work, we extra add '-quiet', which is added anyway by
+ // gradle. See https://github.com/gradle/gradle/issues/2354
+ // See JDK-8200363 (https://bugs.openjdk.java.net/browse/JDK-8200363)
+ // for information about the -Xwerror option.
+ options.addStringOption('Xwerror', '-quiet')
+}
+
test {
testLogging {
events "passed", "skipped", "failed", "standardOut", "standardError"
@@ -111,7 +124,7 @@ publishing {
scm {
connection = 'scm:git:git://github.com/launchdarkly/okhttp-eventsource.git'
developerConnection = 'scm:git:ssh:git@github.com:launchdarkly/okhttp-eventsource.git'
- url = 'http://example.com/my-library/'
+ url = 'https://github.com/launchdarkly/okhttp-eventsource'
}
}
}
diff --git a/gradle.properties b/gradle.properties
index c42a05d..d717c9a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,6 @@
version=2.0.1
ossrhUsername=
ossrhPassword=
+
+# See https://github.com/gradle/gradle/issues/11308 regarding the following property
+systemProp.org.gradle.internal.publish.checksums.insecure=true
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index e0b3fb8..a2bf131 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/src/main/java/com/launchdarkly/eventsource/EventHandler.java b/src/main/java/com/launchdarkly/eventsource/EventHandler.java
index 9d82192..2205fb6 100644
--- a/src/main/java/com/launchdarkly/eventsource/EventHandler.java
+++ b/src/main/java/com/launchdarkly/eventsource/EventHandler.java
@@ -12,6 +12,11 @@ public interface EventHandler {
/**
* EventSource calls this method when the stream connection has been closed.
+ *
+ * This method is not called if the connection was closed due to a {@link ConnectionErrorHandler}
+ * returning {@link ConnectionErrorHandler.Action#SHUTDOWN}; EventSource assumes that if you registered
+ * such a handler and made it return that value, then you already know that the connection is being closed.
+ *
* @throws Exception throwing an exception here will cause it to be logged and also sent to {@link #onError(Throwable)}
*/
void onClosed() throws Exception;
@@ -36,6 +41,11 @@ public interface EventHandler {
* an {@link UnsuccessfulResponseException} if the server returns an unexpected HTTP status),
* but only after the {@link ConnectionErrorHandler} (if any) has processed it. If you need to
* do anything that affects the state of the connection, use {@link ConnectionErrorHandler}.
+ *
+ * This method is not called if the error was already passed to a {@link ConnectionErrorHandler}
+ * which returned {@link ConnectionErrorHandler.Action#SHUTDOWN}; EventSource assumes that if you registered
+ * such a handler and made it return that value, then you do not want to handle the same error twice.
+ *
* @param t a {@code Throwable} object
*/
void onError(Throwable t);
diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java
index f8f4f96..766e9a9 100644
--- a/src/main/java/com/launchdarkly/eventsource/EventSource.java
+++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java
@@ -152,7 +152,27 @@ public void start() {
logger.info("Starting EventSource client using URI: " + url);
streamExecutor.execute(this::connect);
}
-
+
+ /**
+ * Drops the current stream connection (if any) and attempts to reconnect.
+ *
+ * This method returns immediately after dropping the current connection; the reconnection happens on
+ * a worker thread.
+ *
+ * If a connection attempt is already in progress but has not yet connected, or if {@link #close()} has
+ * previously been called, this method has no effect. If {@link #start()} has never been called, it is
+ * the same as calling {@link #start()}.
+ */
+ public void restart() {
+ ReadyState previousState = readyState.getAndUpdate(t -> t == ReadyState.OPEN ? ReadyState.CLOSED : t);
+ if (previousState == OPEN) {
+ closeCurrentStream(previousState);
+ } else if (previousState == RAW || previousState == CONNECTING) {
+ start();
+ }
+ // if already shutdown or in the process of closing, do nothing
+ }
+
/**
* Returns an enum indicating the current status of the connection.
* @return a {@link ReadyState} value
@@ -161,6 +181,9 @@ public ReadyState getState() {
return readyState.get();
}
+ /**
+ * Drops the current stream connection (if any) and permanently shuts down the EventSource.
+ */
@Override
public void close() {
ReadyState currentState = readyState.getAndSet(SHUTDOWN);
@@ -168,21 +191,8 @@ public void close() {
if (currentState == SHUTDOWN) {
return;
}
- if (currentState == ReadyState.OPEN) {
- try {
- handler.onClosed();
- } catch (Exception e) {
- handler.onError(e);
- }
- }
-
- if (call != null) {
- // The call.cancel() must precede the bufferedSource.close().
- // Otherwise, an IllegalArgumentException "Unbalanced enter/exit" error is thrown by okhttp.
- // https://github.com/google/ExoPlayer/issues/1348
- call.cancel();
- logger.debug("call cancelled");
- }
+
+ closeCurrentStream(currentState);
eventExecutor.shutdownNow();
streamExecutor.shutdownNow();
@@ -200,6 +210,24 @@ public void close() {
}
}
+ private void closeCurrentStream(ReadyState previousState) {
+ if (previousState == ReadyState.OPEN) {
+ try {
+ handler.onClosed();
+ } catch (Exception e) {
+ handler.onError(e);
+ }
+ }
+
+ if (call != null) {
+ // The call.cancel() must precede the bufferedSource.close().
+ // Otherwise, an IllegalArgumentException "Unbalanced enter/exit" error is thrown by okhttp.
+ // https://github.com/google/ExoPlayer/issues/1348
+ call.cancel();
+ logger.debug("call cancelled");
+ }
+ }
+
Request buildRequest() {
Request.Builder builder = new Request.Builder()
.headers(headers)
@@ -271,11 +299,14 @@ public void setLastEventId(String lastEventId) {
} catch (EOFException eofe) {
logger.warn("Connection unexpectedly closed.");
} catch (IOException ioe) {
- if (readyState.get() != SHUTDOWN) {
+ ReadyState state = readyState.get();
+ if (state == SHUTDOWN) {
+ errorHandlerAction = ConnectionErrorHandler.Action.SHUTDOWN;
+ } else if (state == CLOSED) { // this happens if it's being restarted
+ errorHandlerAction = ConnectionErrorHandler.Action.PROCEED;
+ } else {
logger.debug("Connection problem.", ioe);
errorHandlerAction = dispatchError(ioe);
- } else {
- errorHandlerAction = ConnectionErrorHandler.Action.SHUTDOWN;
}
} finally {
ReadyState nextState = CLOSED;
@@ -307,6 +338,7 @@ public void setLastEventId(String lastEventId) {
handler.onError(e);
}
}
+
// Reset the backoff if we had a successful connection that stayed good for at least
// backoffResetThresholdMs milliseconds.
if (connectedTime >= 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThreshold.toMillis()) {
diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java
new file mode 100644
index 0000000..d020c83
--- /dev/null
+++ b/src/test/java/com/launchdarkly/eventsource/EventSourceBuilderTest.java
@@ -0,0 +1,225 @@
+package com.launchdarkly.eventsource;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.Proxy;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static com.launchdarkly.eventsource.EventSource.DEFAULT_CONNECT_TIMEOUT;
+import static com.launchdarkly.eventsource.EventSource.DEFAULT_MAX_RECONNECT_TIME;
+import static com.launchdarkly.eventsource.EventSource.DEFAULT_READ_TIMEOUT;
+import static com.launchdarkly.eventsource.EventSource.DEFAULT_WRITE_TIMEOUT;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import okhttp3.Headers;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okio.Buffer;
+
+@SuppressWarnings("javadoc")
+public class EventSourceBuilderTest {
+ private static final URI STREAM_URI = URI.create("http://www.example.com/");
+ private static final HttpUrl STREAM_HTTP_URL = HttpUrl.parse("http://www.example.com/");
+ private EventSource.Builder builder;
+ private EventHandler mockHandler;
+
+ @Before
+ public void setUp() {
+ mockHandler = mock(EventHandler.class);
+ builder = new EventSource.Builder(mockHandler, STREAM_URI);
+ }
+
+ @Test
+ public void hasExpectedUri() {
+ try (EventSource es = builder.build()) {
+ assertEquals(STREAM_URI, es.getUri());
+ }
+ }
+
+ @Test
+ public void hasExpectedUriWhenInitializedWithHttpUrl() {
+ try (EventSource es = new EventSource.Builder(mockHandler, STREAM_HTTP_URL).build()) {
+ assertEquals(STREAM_URI, es.getUri());
+ }
+ }
+
+ @Test
+ public void hasExpectedHttpUrlWhenInitializedWithUri() {
+ try (EventSource es = builder.build()) {
+ assertEquals(STREAM_HTTP_URL, es.getHttpUrl());
+ }
+ }
+
+ @Test
+ public void hasExpectedHttpUrlWhenInitializedWithHttpUrl() {
+ try (EventSource es = new EventSource.Builder(mockHandler, STREAM_HTTP_URL).build()) {
+ assertEquals(STREAM_HTTP_URL, es.getHttpUrl());
+ }
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void handlerCannotBeNull() {
+ new EventSource.Builder(null, STREAM_URI);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void uriCannotBeNull() {
+ new EventSource.Builder(mock(EventHandler.class), (URI)null);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void httpUrlCannotBeNull() {
+ new EventSource.Builder(mock(EventHandler.class), (HttpUrl)null);
+ }
+
+ @Test
+ public void respectsDefaultMaximumBackoffTime() {
+ builder.reconnectTime(DEFAULT_MAX_RECONNECT_TIME.minus(Duration.ofMillis(1)));
+ try (EventSource es = builder.build()) {
+ assertThat(es.backoffWithJitter(100), lessThanOrEqualTo(EventSource.DEFAULT_MAX_RECONNECT_TIME));
+ }
+ }
+
+ @Test
+ public void respectsCustomMaximumBackoffTime() {
+ builder.reconnectTime(Duration.ofMillis(2000)).maxReconnectTime(Duration.ofMillis(5000));
+ try (EventSource es = builder.build()) {
+ assertThat(es.backoffWithJitter(100), lessThanOrEqualTo(Duration.ofMillis(5000)));
+ }
+ }
+
+ @Test
+ public void defaultClient() {
+ builder.build();
+ OkHttpClient client = builder.getClientBuilder().build();
+ assertEquals(DEFAULT_CONNECT_TIMEOUT.toMillis(), client.connectTimeoutMillis());
+ assertEquals(DEFAULT_READ_TIMEOUT.toMillis(), client.readTimeoutMillis());
+ assertEquals(DEFAULT_WRITE_TIMEOUT.toMillis(), client.writeTimeoutMillis());
+ assertNull(client.proxy());
+ }
+
+ @Test
+ public void defaultClientWithProxyHostAndPort() {
+ String proxyHost = "http://proxy.example.com";
+ int proxyPort = 8080;
+ builder.proxy(proxyHost, proxyPort);
+ builder.build();
+ OkHttpClient client = builder.getClientBuilder().build();
+
+ assertEquals(DEFAULT_CONNECT_TIMEOUT.toMillis(), client.connectTimeoutMillis());
+ assertEquals(DEFAULT_READ_TIMEOUT.toMillis(), client.readTimeoutMillis());
+ assertEquals(DEFAULT_WRITE_TIMEOUT.toMillis(), client.writeTimeoutMillis());
+ Assert.assertNotNull(client.proxy());
+ assertEquals(proxyHost + ":" + proxyPort, client.proxy().address().toString());
+ }
+
+ @Test
+ public void defaultClientWithProxy() {
+ Proxy proxy = mock(java.net.Proxy.class);
+ builder.proxy(proxy);
+ builder.build();
+ OkHttpClient client = builder.getClientBuilder().build();
+
+ assertEquals(DEFAULT_CONNECT_TIMEOUT.toMillis(), client.connectTimeoutMillis());
+ assertEquals(DEFAULT_READ_TIMEOUT.toMillis(), client.readTimeoutMillis());
+ assertEquals(DEFAULT_WRITE_TIMEOUT.toMillis(), client.writeTimeoutMillis());
+ assertEquals(proxy, client.proxy());
+ }
+
+ @Test
+ public void defaultClientWithCustomTimeouts() {
+ int connectTimeout = 100;
+ int readTimeout = 1000;
+ int writeTimeout = 10000;
+ builder.connectTimeout(Duration.ofMillis(connectTimeout));
+ builder.readTimeout(Duration.ofMillis(readTimeout));
+ builder.writeTimeout(Duration.ofMillis(writeTimeout));
+ builder.build();
+ OkHttpClient client = builder.getClientBuilder().build();
+
+ assertEquals(connectTimeout, client.connectTimeoutMillis());
+ assertEquals(readTimeout, client.readTimeoutMillis());
+ assertEquals(writeTimeout, client.writeTimeoutMillis());
+ }
+
+ @Test
+ public void customBuilderActions() {
+ final int writeTimeout = 9999;
+ builder.clientBuilderActions(new EventSource.Builder.ClientConfigurer() {
+ public void configure(Builder b) {
+ b.writeTimeout(writeTimeout, TimeUnit.MILLISECONDS);
+ }
+ });
+ OkHttpClient client = builder.getClientBuilder().build();
+
+ assertEquals(writeTimeout, client.writeTimeoutMillis());
+ }
+
+ @Test
+ public void customMethod() throws IOException {
+ builder.method("report");
+ builder.body(RequestBody.create("hello world", MediaType.parse("text/plain; charset=utf-8")));
+ Request req = builder.build().buildRequest();
+ assertEquals("REPORT", req.method());
+ assertEquals(MediaType.parse("text/plain; charset=utf-8"), req.body().contentType());
+ Buffer actualBody = new Buffer();
+ req.body().writeTo(actualBody);
+ assertEquals("hello world", actualBody.readString(Charset.forName("utf-8")));
+
+ // ensure we can build multiple requests:
+ req = builder.build().buildRequest();
+ assertEquals("REPORT", req.method());
+ assertEquals(MediaType.parse("text/plain; charset=utf-8"), req.body().contentType());
+ actualBody = new Buffer();
+ req.body().writeTo(actualBody);
+ assertEquals("hello world", actualBody.readString(Charset.forName("utf-8")));
+ }
+
+ @Test
+ public void defaultMethod() {
+ Request req = builder.build().buildRequest();
+ assertEquals("GET", req.method());
+ assertEquals(null, req.body());
+ }
+
+ @Test
+ public void customHeaders() throws IOException {
+ Headers headers = new Headers.Builder()
+ .add("header1", "value1").add("header1", "value2")
+ .add("header2", "value1")
+ .build();
+ builder.headers(headers);
+ Request req = builder.build().buildRequest();
+ assertEquals(Arrays.asList("value1", "value2"), req.headers().values("header1"));
+ assertEquals(Arrays.asList("value1"), req.headers().values("header2"));
+ assertEquals(Arrays.asList("text/event-stream"), req.headers().values("Accept"));
+ assertEquals(Arrays.asList("no-cache"), req.headers().values("Cache-Control"));
+ }
+
+ @Test
+ public void customHeadersOverwritingDefaults() throws IOException {
+ Headers headers = new Headers.Builder()
+ .add("Accept", "text/plain")
+ .add("header2", "value1")
+ .build();
+ builder.headers(headers);
+ Request req = builder.build().buildRequest();
+ assertEquals(Arrays.asList("text/plain"), req.headers().values("Accept"));
+ assertEquals(Arrays.asList("value1"), req.headers().values("header2"));
+ }
+}
diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java
index ea5b3df..3d63c64 100644
--- a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java
+++ b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java
@@ -5,20 +5,121 @@
import org.junit.Test;
+import java.io.IOException;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import static com.launchdarkly.eventsource.Stubs.createErrorResponse;
-import static com.launchdarkly.eventsource.Stubs.createEventsResponse;
+import static com.launchdarkly.eventsource.StubServer.Handlers.forRequestsInSequence;
+import static com.launchdarkly.eventsource.StubServer.Handlers.hang;
+import static com.launchdarkly.eventsource.StubServer.Handlers.interruptible;
+import static com.launchdarkly.eventsource.StubServer.Handlers.returnStatus;
+import static com.launchdarkly.eventsource.StubServer.Handlers.stream;
+import static com.launchdarkly.eventsource.StubServer.Handlers.streamProducerFromChunkedString;
+import static com.launchdarkly.eventsource.StubServer.Handlers.streamProducerFromString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
-import okhttp3.mockwebserver.MockWebServer;
-import okhttp3.mockwebserver.SocketPolicy;
+import okhttp3.Headers;
+import okhttp3.MediaType;
+import okhttp3.RequestBody;
@SuppressWarnings("javadoc")
public class EventSourceHttpTest {
+ private static final String CONTENT_TYPE = "text/event-stream";
+
+ @Test
+ public void eventSourceSetsRequestProperties() throws Exception {
+ String requestPath = "/some/path";
+ Headers headers = new Headers.Builder().add("header1", "value1").add("header2", "value2").build();
+
+ try (StubServer server = StubServer.start(hang())) {
+ try (EventSource es = new EventSource.Builder(new TestHandler(), server.getUri().resolve(requestPath))
+ .headers(headers)
+ .build()) {
+ es.start();
+
+ StubServer.RequestInfo r = server.awaitRequest();
+ assertEquals("GET", r.getMethod());
+ assertEquals(requestPath, r.getPath());
+ assertEquals("value1", r.getHeader("header1"));
+ assertEquals("value2", r.getHeader("header2"));
+ assertEquals("text/event-stream", r.getHeader("Accept"));
+ assertEquals("no-cache", r.getHeader("Cache-Control"));
+ }
+ }
+ }
+
+ @Test
+ public void customMethodWithBody() throws IOException {
+ String content = "hello world";
+
+ try (StubServer server = StubServer.start(hang())) {
+ try (EventSource es = new EventSource.Builder(new TestHandler(), server.getUri())
+ .method("report")
+ .body(RequestBody.create("hello world", MediaType.parse("text/plain; charset=utf-8")))
+ .build()) {
+ es.start();
+
+ StubServer.RequestInfo r = server.awaitRequest();
+ assertEquals("REPORT", r.getMethod());
+ assertEquals(content, r.getBody());
+ }
+ }
+ }
+
+ @Test
+ public void configuredLastEventIdIsIncludedInHeaders() throws Exception {
+ String lastId = "123";
+
+ try (StubServer server = StubServer.start(hang())) {
+ try (EventSource es = new EventSource.Builder(new TestHandler(), server.getUri())
+ .lastEventId(lastId)
+ .build()) {
+ es.start();
+
+ StubServer.RequestInfo r = server.awaitRequest();
+ assertEquals(lastId, r.getHeader("Last-Event-Id"));
+ }
+ }
+ }
+
+ @Test
+ public void lastEventIdIsUpdatedFromEvent() throws Exception {
+ String newLastId = "099";
+ String eventType = "thing";
+ String eventData = "some-data";
+
+ final String body = "event: " + eventType + "\nid: " + newLastId + "\ndata: " + eventData + "\n\n";
+
+ TestHandler eventSink = new TestHandler();
+ StubServer.InterruptibleHandler streamHandler = interruptible(stream(CONTENT_TYPE, streamProducerFromString(body, true)));
+
+ try (StubServer server = StubServer.start(streamHandler)) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri())
+ .reconnectTime(Duration.ofMillis(10))
+ .build()) {
+ es.start();
+
+ assertEquals(Stubs.LogItem.opened(), eventSink.log.take());
+ assertEquals(Stubs.LogItem.event(eventType, eventData, newLastId), eventSink.log.take());
+
+ StubServer.RequestInfo r0 = server.awaitRequest();
+ assertNull(r0.getHeader("Last-Event-Id"));
+
+ streamHandler.interrupt(); // force stream to reconnect
+
+ StubServer.RequestInfo r1 = server.awaitRequest();
+ assertEquals(newLastId, r1.getHeader("Last-Event-Id"));
+ }
+ }
+ }
+
@Test
public void eventSourceReadsChunkedResponse() throws Exception {
- String body = "data: data-by-itself\n\n" +
+ final String body = "data: data-by-itself\n\n" +
"event: event-with-data\n" +
"data: abc\n\n" +
": this is a comment\n" +
@@ -26,34 +127,33 @@ public void eventSourceReadsChunkedResponse() throws Exception {
"id: my-id\n" +
"data: abc\n" +
"data: def\n\n";
-
- TestHandler handler = new TestHandler();
-
- try (MockWebServer server = new MockWebServer()) {
- server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN));
- server.start();
-
- try (EventSource es = new EventSource.Builder(handler, server.url("/"))
- .build()) {
+
+ TestHandler eventSink = new TestHandler();
+ StubServer.Handler streamHandler = stream(CONTENT_TYPE,
+ streamProducerFromChunkedString(body, 5, Duration.ZERO, true));
+
+ try (StubServer server = StubServer.start(streamHandler)) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri()).build()) {
es.start();
- assertEquals(LogItem.opened(), handler.log.take());
+ assertEquals(LogItem.opened(), eventSink.log.take());
assertEquals(LogItem.event("message", "data-by-itself"), // "message" is the default event name, per SSE spec
- handler.log.take());
+ eventSink.log.take());
assertEquals(LogItem.event("event-with-data", "abc"),
- handler.log.take());
+ eventSink.log.take());
assertEquals(LogItem.comment("this is a comment"),
- handler.log.take());
+ eventSink.log.take());
assertEquals(LogItem.event("event-with-more-data-and-id", "abc\ndef", "my-id"),
- handler.log.take());
+ eventSink.log.take());
+
+ eventSink.assertNoMoreLogItems();
}
-
- assertEquals(LogItem.closed(), handler.log.take());
}
+ assertEquals(LogItem.closed(), eventSink.log.take());
}
@Test
@@ -61,32 +161,37 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception {
String body1 = "data: first\n\n";
String body2 = "data: second\n\n";
- TestHandler handler = new TestHandler();
-
- try (MockWebServer server = new MockWebServer()) {
- server.enqueue(createEventsResponse(body1, SocketPolicy.DISCONNECT_AT_END));
- server.enqueue(createEventsResponse(body2, SocketPolicy.KEEP_OPEN));
- server.start();
-
- try (EventSource es = new EventSource.Builder(handler, server.url("/"))
+ TestHandler eventSink = new TestHandler();
+
+ StubServer.InterruptibleHandler streamHandler1 = interruptible(stream(CONTENT_TYPE, streamProducerFromString(body1, true)));
+ StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, streamProducerFromString(body2, true));
+ StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, streamHandler2);
+
+ try (StubServer server = StubServer.start(allRequests)) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri())
.reconnectTime(Duration.ofMillis(10))
.build()) {
es.start();
- assertEquals(LogItem.opened(), handler.log.take());
+ assertEquals(LogItem.opened(), eventSink.log.take());
assertEquals(LogItem.event("message", "first"),
- handler.log.take());
+ eventSink.log.take());
- assertEquals(LogItem.closed(), handler.log.take());
+ eventSink.assertNoMoreLogItems(); // should not have closed first stream yet
+
+ streamHandler1.interrupt();
+
+ assertEquals(LogItem.closed(), eventSink.log.take());
- assertEquals(LogItem.opened(), handler.log.take());
+ assertEquals(LogItem.opened(), eventSink.log.take());
assertEquals(LogItem.event("message", "second"),
- handler.log.take());
+ eventSink.log.take());
}
- assertEquals(LogItem.closed(), handler.log.take());
+ assertEquals(LogItem.closed(), eventSink.log.take());
+ eventSink.assertNoMoreLogItems();
}
}
@@ -94,28 +199,29 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception {
public void eventSourceReconnectsAfterErrorOnFirstRequest() throws Exception {
String body = "data: good\n\n";
- TestHandler handler = new TestHandler();
+ TestHandler eventSink = new TestHandler();
- try (MockWebServer server = new MockWebServer()) {
- server.enqueue(createErrorResponse(500));
- server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN));
- server.start();
-
- try (EventSource es = new EventSource.Builder(handler, server.url("/"))
+ StubServer.Handler streamHandler = stream(CONTENT_TYPE, streamProducerFromString(body, true));
+ StubServer.Handler allRequests = forRequestsInSequence(returnStatus(500), streamHandler);
+
+ try (StubServer server = StubServer.start(allRequests)) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri())
.reconnectTime(Duration.ofMillis(10))
.build()) {
es.start();
assertEquals(LogItem.error(new UnsuccessfulResponseException(500)),
- handler.log.take());
+ eventSink.log.take());
- assertEquals(LogItem.opened(), handler.log.take());
+ assertEquals(LogItem.opened(), eventSink.log.take());
assertEquals(LogItem.event("message", "good"),
- handler.log.take());
+ eventSink.log.take());
+
+ eventSink.assertNoMoreLogItems();
}
- assertEquals(LogItem.closed(), handler.log.take());
+ assertEquals(LogItem.closed(), eventSink.log.take());
}
}
@@ -124,36 +230,143 @@ public void eventSourceReconnectsAgainAfterErrorOnFirstReconnect() throws Except
String body1 = "data: first\n\n";
String body2 = "data: second\n\n";
- TestHandler handler = new TestHandler();
+ TestHandler eventSink = new TestHandler();
- try (MockWebServer server = new MockWebServer()) {
- server.enqueue(createEventsResponse(body1, SocketPolicy.DISCONNECT_AT_END));
- server.enqueue(createErrorResponse(500));
- server.enqueue(createEventsResponse(body2, SocketPolicy.KEEP_OPEN));
- server.start();
-
- try (EventSource es = new EventSource.Builder(handler, server.url("/"))
+ StubServer.InterruptibleHandler streamHandler1 = interruptible(stream(CONTENT_TYPE, streamProducerFromString(body1, true)));
+ StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, streamProducerFromString(body2, true));
+ StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, returnStatus(500), streamHandler2);
+
+ try (StubServer server = StubServer.start(allRequests)) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri())
.reconnectTime(Duration.ofMillis(10))
.build()) {
es.start();
- assertEquals(LogItem.opened(), handler.log.take());
+ assertEquals(LogItem.opened(), eventSink.log.take());
assertEquals(LogItem.event("message", "first"),
- handler.log.take());
+ eventSink.log.take());
+
+ eventSink.assertNoMoreLogItems();
+
+ streamHandler1.interrupt(); // make first stream fail
- assertEquals(LogItem.closed(), handler.log.take());
+ assertEquals(LogItem.closed(), eventSink.log.take());
assertEquals(LogItem.error(new UnsuccessfulResponseException(500)),
- handler.log.take());
+ eventSink.log.take());
- assertEquals(LogItem.opened(), handler.log.take());
+ assertEquals(LogItem.opened(), eventSink.log.take());
assertEquals(LogItem.event("message", "second"),
- handler.log.take());
+ eventSink.log.take());
+
+ eventSink.assertNoMoreLogItems();
}
- assertEquals(LogItem.closed(), handler.log.take());
+ assertEquals(LogItem.closed(), eventSink.log.take());
+ }
+ }
+
+ @Test
+ public void streamDoesNotReconnectIfConnectionErrorHandlerSaysToStop() throws Exception {
+ final AtomicBoolean calledHandler = new AtomicBoolean(false);
+ final AtomicReference receivedError = new AtomicReference();
+
+ ConnectionErrorHandler connectionErrorHandler = new ConnectionErrorHandler() {
+ public Action onConnectionError(Throwable t) {
+ calledHandler.set(true);
+ receivedError.set(t);
+ return Action.SHUTDOWN;
+ }
+ };
+
+ TestHandler eventSink = new TestHandler();
+
+ try (StubServer server = StubServer.start(returnStatus(500))) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri())
+ .connectionErrorHandler(connectionErrorHandler)
+ .reconnectTime(Duration.ofMillis(10))
+ .build()) {
+ es.start();
+
+ // If a ConnectionErrorHandler returns SHUTDOWN, EventSource does not call onClosed() or onError()
+ // on the regular event handler, since it assumes that the caller already knows what happened.
+ // Therefore we don't expect to see any items in eventSink.
+ eventSink.assertNoMoreLogItems();
+
+ assertEquals(ReadyState.SHUTDOWN, es.getState());
+ }
+ }
+
+ assertTrue(calledHandler.get());
+ assertNotNull(receivedError.get());
+ assertEquals(UnsuccessfulResponseException.class, receivedError.get().getClass());
+ }
+
+ @Test
+ public void canForceEventSourceToRestart() throws Exception {
+ String body1 = "data: first\n\n";
+ String body2 = "data: second\n\n";
+
+ TestHandler eventSink = new TestHandler();
+
+ StubServer.Handler streamHandler1 = stream(CONTENT_TYPE, streamProducerFromString(body1, true));
+ StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, streamProducerFromString(body2, true));
+ StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, streamHandler2);
+
+ try (StubServer server = StubServer.start(allRequests)) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri())
+ .reconnectTime(Duration.ofMillis(10))
+ .build()) {
+ es.start();
+
+ assertEquals(LogItem.opened(), eventSink.log.take());
+
+ assertEquals(LogItem.event("message", "first"),
+ eventSink.log.take());
+
+ eventSink.assertNoMoreLogItems();
+
+ es.restart();
+
+ assertEquals(LogItem.closed(), eventSink.log.take()); // there shouldn't be any error notification, just "closed"
+
+ assertEquals(LogItem.opened(), eventSink.log.take());
+
+ assertEquals(LogItem.event("message", "second"),
+ eventSink.log.take());
+
+ eventSink.assertNoMoreLogItems();
+ }
+
+ assertEquals(LogItem.closed(), eventSink.log.take());
+ }
+ }
+
+ @Test
+ public void newLastEventIdIsSentOnNextConnectAttempt() throws Exception {
+ String initialLastId = "123";
+ String newLastId = "099";
+ String body = "id: " + newLastId + "\ndata: first\n\n";
+
+ TestHandler eventSink = new TestHandler();
+
+ StubServer.Handler streamHandler1 = stream(CONTENT_TYPE, streamProducerFromString(body, false));
+ StubServer.Handler streamHandler2 = stream(CONTENT_TYPE, streamProducerFromString("", true));
+ StubServer.Handler allRequests = forRequestsInSequence(streamHandler1, streamHandler2);
+
+ try (StubServer server = StubServer.start(allRequests)) {
+ try (EventSource es = new EventSource.Builder(eventSink, server.getUri())
+ .lastEventId(initialLastId)
+ .build()) {
+ es.start();
+
+ StubServer.RequestInfo req0 = server.awaitRequest();
+ StubServer.RequestInfo req1 = server.awaitRequest();
+ assertEquals(initialLastId, req0.getHeader("Last-Event-ID"));
+ assertEquals(newLastId, req1.getHeader("Last-Event-ID"));
+ }
}
}
}
diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java
deleted file mode 100644
index 321058b..0000000
--- a/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java
+++ /dev/null
@@ -1,307 +0,0 @@
-package com.launchdarkly.eventsource;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.Proxy;
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import static com.launchdarkly.eventsource.Stubs.createEventsResponse;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-import okhttp3.Headers;
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.OkHttpClient.Builder;
-import okhttp3.mockwebserver.MockWebServer;
-import okhttp3.mockwebserver.RecordedRequest;
-import okhttp3.mockwebserver.SocketPolicy;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okio.Buffer;
-
-@SuppressWarnings("javadoc")
-public class EventSourceTest {
- private static final URI STREAM_URI = URI.create("http://www.example.com/");
- private static final HttpUrl STREAM_HTTP_URL = HttpUrl.parse("http://www.example.com/");
- private EventSource.Builder builder;
-
- @Before
- public void setUp() {
- EventHandler eventHandler = mock(EventHandler.class);
- builder = new EventSource.Builder(eventHandler, STREAM_URI);
- }
-
- @Test
- public void hasExpectedUri() {
- try (EventSource eventSource = builder.build()) {
- assertEquals(STREAM_URI, eventSource.getUri());
- }
- }
-
- @Test
- public void hasExpectedUriWhenInitializedWithHttpUrl() {
- try (EventSource es = new EventSource.Builder(mock(EventHandler.class), STREAM_HTTP_URL).build()) {
- assertEquals(STREAM_URI, es.getUri());
- }
- }
-
- @Test
- public void hasExpectedHttpUrlWhenInitializedWithUri() {
- try (EventSource eventSource = builder.build()) {
- assertEquals(STREAM_HTTP_URL, eventSource.getHttpUrl());
- }
- }
-
- @Test
- public void hasExpectedHttpUrlWhenInitializedWithHttpUrl() {
- try (EventSource es = new EventSource.Builder(mock(EventHandler.class), STREAM_HTTP_URL).build()) {
- assertEquals(STREAM_HTTP_URL, es.getHttpUrl());
- }
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void handlerCannotBeNull() {
- new EventSource.Builder(null, STREAM_URI);
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void uriCannotBeNull() {
- new EventSource.Builder(mock(EventHandler.class), (URI)null);
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void httpUrlCannotBeNull() {
- new EventSource.Builder(mock(EventHandler.class), (HttpUrl)null);
- }
-
- @Test
- public void respectsDefaultMaximumBackoffTime() {
- builder.reconnectTime(Duration.ofMillis(2000));
- try (EventSource eventSource = builder.build()) {
- assertEquals(eventSource.backoffWithJitter(300).compareTo(EventSource.DEFAULT_MAX_RECONNECT_TIME), -1);
- }
- }
-
- @Test
- public void respectsCustomMaximumBackoffTime() {
- Duration max = Duration.ofMillis(5000);
- builder.reconnectTime(Duration.ofMillis(2000));
- builder.maxReconnectTime(max);
- try (EventSource eventSource = builder.build()) {
- assertEquals(eventSource.backoffWithJitter(300).compareTo(max), -1);
- }
- }
-
- @Test
- public void lastEventIdIsSetToConfiguredValue() throws Exception {
- String lastId = "123";
- builder.lastEventId(lastId);
- try (EventSource es = builder.build()) {
- assertEquals(lastId, es.getLastEventId());
- }
- }
-
- private OkHttpClient getHttpClientFromBuilder() {
- try (EventSource es = builder.build()) {} // ensures that the configuration is all copied into the client builder
- return builder.getClientBuilder().build();
- }
-
- @Test
- public void defaultClient() {
- OkHttpClient client = getHttpClientFromBuilder();
- assertEquals(EventSource.DEFAULT_CONNECT_TIMEOUT, Duration.ofMillis(client.connectTimeoutMillis()));
- assertEquals(EventSource.DEFAULT_READ_TIMEOUT, Duration.ofMillis(client.readTimeoutMillis()));
- assertEquals(EventSource.DEFAULT_WRITE_TIMEOUT, Duration.ofMillis(client.writeTimeoutMillis()));
- assertNull(client.proxy());
- }
-
- @Test
- public void defaultClientWithProxyHostAndPort() {
- String proxyHost = "http://proxy.example.com";
- int proxyPort = 8080;
- builder.proxy(proxyHost, proxyPort);
- OkHttpClient client = getHttpClientFromBuilder();
-
- assertEquals(EventSource.DEFAULT_CONNECT_TIMEOUT, Duration.ofMillis(client.connectTimeoutMillis()));
- assertEquals(EventSource.DEFAULT_READ_TIMEOUT, Duration.ofMillis(client.readTimeoutMillis()));
- assertEquals(EventSource.DEFAULT_WRITE_TIMEOUT, Duration.ofMillis(client.writeTimeoutMillis()));
- Assert.assertNotNull(client.proxy());
- assertEquals(proxyHost + ":" + proxyPort, client.proxy().address().toString());
- }
-
- @Test
- public void defaultClientWithProxy() {
- Proxy proxy = mock(java.net.Proxy.class);
- builder.proxy(proxy);
- OkHttpClient client = getHttpClientFromBuilder();
-
- assertEquals(EventSource.DEFAULT_CONNECT_TIMEOUT, Duration.ofMillis(client.connectTimeoutMillis()));
- assertEquals(EventSource.DEFAULT_READ_TIMEOUT, Duration.ofMillis(client.readTimeoutMillis()));
- assertEquals(EventSource.DEFAULT_WRITE_TIMEOUT, Duration.ofMillis(client.writeTimeoutMillis()));
- assertEquals(proxy, client.proxy());
- }
-
- @Test
- public void defaultClientWithCustomTimeouts() {
- Duration connectTimeout = Duration.ofMillis(100);
- Duration readTimeout = Duration.ofMillis(1000);
- Duration writeTimeout = Duration.ofMillis(10000);
- builder.connectTimeout(connectTimeout);
- builder.readTimeout(readTimeout);
- builder.writeTimeout(writeTimeout);
- OkHttpClient client = getHttpClientFromBuilder();
-
- assertEquals(connectTimeout, Duration.ofMillis(client.connectTimeoutMillis()));
- assertEquals(readTimeout, Duration.ofMillis(client.readTimeoutMillis()));
- assertEquals(writeTimeout, Duration.ofMillis(client.writeTimeoutMillis()));
- }
-
- @Test
- public void customBuilderActions() {
- final int writeTimeout = 9999;
- builder.clientBuilderActions(new EventSource.Builder.ClientConfigurer() {
- public void configure(Builder b) {
- b.writeTimeout(writeTimeout, TimeUnit.MILLISECONDS);
- }
- });
- OkHttpClient client = getHttpClientFromBuilder();
-
- assertEquals(writeTimeout, client.writeTimeoutMillis());
- }
-
- @Test
- public void customMethod() throws IOException {
- builder.method("report");
- builder.body(RequestBody.create("hello world", MediaType.parse("text/plain; charset=utf-8")));
- try (EventSource es = builder.build()) {
- Request req = es.buildRequest();
- assertEquals("REPORT", req.method());
- assertEquals(MediaType.parse("text/plain; charset=utf-8"), req.body().contentType());
- Buffer actualBody = new Buffer();
- req.body().writeTo(actualBody);
- assertEquals("hello world", actualBody.readString(Charset.forName("utf-8")));
-
- // ensure we can build multiple requests:
- req = es.buildRequest();
- assertEquals("REPORT", req.method());
- assertEquals(MediaType.parse("text/plain; charset=utf-8"), req.body().contentType());
- actualBody = new Buffer();
- req.body().writeTo(actualBody);
- assertEquals("hello world", actualBody.readString(Charset.forName("utf-8")));
- }
- }
-
- @Test
- public void defaultMethod() {
- try (EventSource es = builder.build()) {
- Request req = es.buildRequest();
- assertEquals("GET", req.method());
- assertEquals(null, req.body());
- }
- }
-
- @Test
- public void customHeaders() throws IOException {
- Headers headers = new Headers.Builder()
- .add("header1", "value1").add("header1", "value2")
- .add("header2", "value1")
- .build();
- builder.headers(headers);
- try (EventSource es = builder.build()) {
- Request req = es.buildRequest();
- assertEquals(Arrays.asList("value1", "value2"), req.headers().values("header1"));
- assertEquals(Arrays.asList("value1"), req.headers().values("header2"));
- assertEquals(Arrays.asList("text/event-stream"), req.headers().values("Accept"));
- assertEquals(Arrays.asList("no-cache"), req.headers().values("Cache-Control"));
- }
- }
-
- @Test
- public void customHeadersOverwritingDefaults() throws IOException {
- Headers headers = new Headers.Builder()
- .add("Accept", "text/plain")
- .add("header2", "value1")
- .build();
- builder.headers(headers);
- try (EventSource es = builder.build()) {
- Request req = es.buildRequest();
- assertEquals(Arrays.asList("text/plain"), req.headers().values("Accept"));
- assertEquals(Arrays.asList("value1"), req.headers().values("header2"));
- }
- }
-
- @Test
- public void configuredLastEventIdIsIncludedInHeaders() throws Exception {
- String lastId = "123";
- builder.lastEventId(lastId);
- try (EventSource es = builder.build()) {
- Request req = es.buildRequest();
- assertEquals(Arrays.asList(lastId), req.headers().values("Last-Event-Id"));
- }
- }
-
- @Test
- public void lastEventIdIsUpdatedFromEvent() throws Exception {
- String initialLastId = "123";
- String newLastId = "099";
- String eventType = "thing";
- String eventData = "some-data";
-
- try (MockWebServer server = new MockWebServer()) {
- String body = "id: " + newLastId + "\nevent: " + eventType + "\ndata: " + eventData + "\n\n";
- server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN));
- server.start();
-
- Stubs.TestHandler eventHandler = new Stubs.TestHandler();
- EventSource.Builder builder = new EventSource.Builder(eventHandler, server.url("/"))
- .lastEventId(initialLastId);
- try (EventSource es = builder.build()) {
- es.start();
- assertEquals(Stubs.LogItem.opened(), eventHandler.log.take());
-
- Stubs.LogItem receivedEvent = eventHandler.log.take(); // waits till we've processed a request
- assertEquals(Stubs.LogItem.event(eventType, eventData, newLastId), receivedEvent);
-
- assertEquals(newLastId, es.getLastEventId());
- }
- }
- }
-
- @Test
- public void newLastEventIdIsSentOnNextConnectAttempt() throws Exception {
- String initialLastId = "123";
- String newLastId = "099";
- String eventType = "thing";
- String eventData = "some-data";
-
- try (MockWebServer server = new MockWebServer()) {
- String body = "id: " + newLastId + "\nevent: " + eventType + "\ndata: " + eventData + "\n\n";
- server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN));
- server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN)); // expect a 2nd connection
- server.start();
-
- Stubs.TestHandler eventHandler = new Stubs.TestHandler();
- EventSource.Builder builder = new EventSource.Builder(eventHandler, server.url("/"))
- .reconnectTime(Duration.ofMillis(100))
- .lastEventId(initialLastId);
- try (EventSource es = builder.build()) {
- es.start();
-
- RecordedRequest req0 = server.takeRequest();
- RecordedRequest req1 = server.takeRequest();
- assertEquals(initialLastId, req0.getHeader("Last-Event-Id"));
- assertEquals(newLastId, req1.getHeader("Last-Event-Id"));
- }
- }
- }
-}
diff --git a/src/test/java/com/launchdarkly/eventsource/ManualConnectionErrorTest.java b/src/test/java/com/launchdarkly/eventsource/ManualConnectionErrorTest.java
deleted file mode 100644
index 6935b81..0000000
--- a/src/test/java/com/launchdarkly/eventsource/ManualConnectionErrorTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.launchdarkly.eventsource;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.net.URI;
-import java.time.Duration;
-
-@Ignore
-@SuppressWarnings("javadoc")
-public class ManualConnectionErrorTest {
- EventSource source;
-
- EventHandler handler = new EventHandler() {
- public void onOpen() throws Exception {
- }
-
- @Override
- public void onClosed() throws Exception {
- }
-
- public void onMessage(String event, MessageEvent messageEvent) throws Exception {
- }
-
- public void onError(Throwable t) {
- System.out.println("async handler got error: " + t);
- }
-
- public void onComment(String comment) {
- }
- };
-
- @Test
- public void testConnectionIsRetriedAfter404() throws Exception {
- // Expected output: multiple connection retries, and "async handler got error" each time.
-
- source = new EventSource.Builder(handler, URI.create("http://launchdarkly.com/bad-url"))
- .reconnectTime(Duration.ofMillis(10))
- .build();
-
- source.start();
- Thread.sleep(100000L);
- }
-
- @Test
- public void testConnectionIsNotRetriedAfter404IfErrorHandlerSaysToStop() throws Exception {
- // Expected output: "connection handler got error ... 404", followed by "Connection has been explicitly
- // shut down by error handler" and no further connection retries.
-
- ConnectionErrorHandler connectionErrorHandler = new ConnectionErrorHandler() {
- public Action onConnectionError(Throwable t) {
- System.out.println("connection handler got error: " + t);
- if (t instanceof UnsuccessfulResponseException &&
- ((UnsuccessfulResponseException) t).getCode() == 404) {
- return Action.SHUTDOWN;
- }
- return Action.PROCEED;
- }
- };
-
- source = new EventSource.Builder(handler, URI.create("http://launchdarkly.com/bad-url"))
- .connectionErrorHandler(connectionErrorHandler)
- .reconnectTime(Duration.ofMillis(10))
- .build();
-
- source.start();
- Thread.sleep(100000L);
- }
-}
diff --git a/src/test/java/com/launchdarkly/eventsource/ManualTest.java b/src/test/java/com/launchdarkly/eventsource/ManualTest.java
deleted file mode 100644
index 055aaa4..0000000
--- a/src/test/java/com/launchdarkly/eventsource/ManualTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.launchdarkly.eventsource;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-
-@Ignore
-@SuppressWarnings("javadoc")
-public class ManualTest {
- private static final Logger logger = LoggerFactory.getLogger(ManualTest.class);
-
- @Test
- public void manualTest() throws InterruptedException {
- EventHandler handler = new EventHandler() {
- public void onOpen() throws Exception {
- logger.info("open");
- }
-
- @Override
- public void onClosed() throws Exception {
-
- }
-
- public void onMessage(String event, MessageEvent messageEvent) throws Exception {
- logger.info(event + ": " + messageEvent.getData());
- }
-
- public void onError(Throwable t) {
- logger.error("Error: " + t);
- }
-
- public void onComment(String comment) {
- logger.info("comment: " + comment);
- }
- };
- EventSource source = new EventSource.Builder(handler, URI.create("http://localhost:8080/events/")).build();
- source.start();
- logger.warn("Sleeping...");
- Thread.sleep(10000L);
- logger.debug("Stopping source");
- source.close();
- logger.debug("Stopped");
- }
-}
diff --git a/src/test/java/com/launchdarkly/eventsource/StubServer.java b/src/test/java/com/launchdarkly/eventsource/StubServer.java
new file mode 100644
index 0000000..8a228c6
--- /dev/null
+++ b/src/test/java/com/launchdarkly/eventsource/StubServer.java
@@ -0,0 +1,417 @@
+package com.launchdarkly.eventsource;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import okhttp3.Headers;
+
+/**
+ * A simple Jetty-based test framework for verifying end-to-end HTTP behavior.
+ *
+ * Previous versions of the library used okhttp's MockWebServer for end-to-end tests, but MockWebServer
+ * does not support actual streaming responses so the tests could not control when the stream got
+ * disconnected.
+ */
+@SuppressWarnings("javadoc")
+public final class StubServer implements Closeable {
+ private final Server server;
+ private final BlockingQueue requests = new LinkedBlockingQueue<>();
+
+ /**
+ * Starts an HTTP server that uses the specified handler.
+ *
+ * @param handler a {@link Handler} implementation
+ * @return a started server
+ */
+ public static StubServer start(Handler handler) {
+ return new StubServer(handler);
+ }
+
+ private StubServer(final Handler handler) {
+ server = new Server(0);
+
+ server.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ RequestInfo requestInfo = new RequestInfo(request);
+ requests.add(requestInfo);
+ handler.handle(request, response);
+ baseRequest.setHandled(true);
+ }
+ });
+ server.setStopTimeout(100); // without this, Jetty does not interrupt worker threads on shutdown
+
+ try {
+ server.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Shuts down the server.
+ *
+ * All active request handler threads will be interrupted.
+ */
+ @Override
+ public void close() {
+ try {
+ server.stop();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Returns the server's base URI.
+ *
+ * @return the base URI
+ */
+ public URI getUri() {
+ return server.getURI();
+ }
+
+ /**
+ * Returns the next queued request, blocking until one is available.
+ *
+ * @return the request information
+ */
+ public RequestInfo awaitRequest() {
+ try {
+ return requests.take();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Returns the next queued request, or null if none is available within the specified timeout.
+ *
+ * @param timeout the maximum time to wait
+ * @return the request information or null
+ */
+ public RequestInfo awaitRequest(Duration timeout) {
+ try {
+ return requests.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Interface for StubServer's simplified request handler mechanism.
+ */
+ public static interface Handler {
+ /**
+ * Handle a request.
+ *
+ * @param request the request
+ * @param response the response
+ */
+ public void handle(HttpServletRequest request, HttpServletResponse response);
+ }
+
+ /**
+ * The properties of a received request.
+ *
+ * Note that this fully reads the request body, so request handlers cannot make use of the body.
+ */
+ public static final class RequestInfo {
+ private final String method;
+ private final String path;
+ private final Headers headers;
+ private final String body;
+
+ public RequestInfo(HttpServletRequest request) {
+ this.method = request.getMethod();
+ this.path = request.getRequestURI();
+
+ Headers.Builder buildHeaders = new Headers.Builder();
+ Enumeration headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String name = headerNames.nextElement();
+ Enumeration values = request.getHeaders(name);
+ while (values.hasMoreElements()) {
+ buildHeaders.add(name, values.nextElement());
+ }
+ }
+ headers = buildHeaders.build();
+
+ StringBuilder s = new StringBuilder();
+ try {
+ try (BufferedReader reader = request.getReader()) {
+ char[] buf = new char[1000];
+ int count = -1;
+ while ((count = reader.read(buf)) > 0) {
+ s.append(buf, 0, count);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ body = s.toString();
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getHeader(String name) {
+ return headers.get(name);
+ }
+
+ public List getHeaders(String name) {
+ return headers.values(name);
+ }
+
+ public String getBody() {
+ return body;
+ }
+ }
+
+ /**
+ * Interface for use with {@link Handlers#interruptible}.
+ */
+ public static interface InterruptibleHandler extends Handler {
+ /**
+ * Causes the handler to stop what it is doing and terminate the response as soon as possible.
+ */
+ void interrupt();
+ }
+
+ /**
+ * Factory methods for StubServer handlers.
+ */
+ public static abstract class Handlers {
+ /**
+ * Provides a handler that returns the specified HTTP status, with no content.
+ *
+ * @param status the status code
+ * @return the handler
+ */
+ public static Handler returnStatus(final int status) {
+ return new Handler() {
+ public void handle(HttpServletRequest req, HttpServletResponse resp) {
+ resp.setStatus(status);
+ }
+ };
+ }
+
+ /**
+ * Provides a handler that delegates to a series of handlers for each request, in the order given.
+ * If there are more requests than the number of handlers, the last handler is used for the rest.
+ *
+ * @param firstHandler the first handler
+ * @param moreHandlers additional handlers
+ * @return the delegating handler
+ */
+ public static Handler forRequestsInSequence(final Handler firstHandler, final Handler... moreHandlers) {
+ final AtomicInteger counter = new AtomicInteger(0);
+ return new Handler() {
+ public void handle(HttpServletRequest req, HttpServletResponse resp) {
+ int i = counter.getAndIncrement();
+ Handler h = i == 0 ? firstHandler :
+ (i >= moreHandlers.length ? moreHandlers[moreHandlers.length - 1] : moreHandlers[i - 1]);
+ h.handle(req, resp);
+ }
+ };
+ }
+
+ /**
+ * Provides a handler that does not send a response, but does not close the socket.
+ *
+ * @return the handler
+ */
+ public static Handler hang() {
+ return new Handler() {
+ public void handle(HttpServletRequest request, HttpServletResponse response) {
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ };
+ }
+
+ /**
+ * Provides the ability to interrupt the worker thread for a handler at any time. This can be used
+ * to terminate a stream. If multiple requests are in progress for the same handler, interrupting it
+ * causes all of them to terminate.
+ *
+ * @param realHandler the handler to delegate to
+ * @return a wrapper handler that provides an {@link InterruptibleHandler#interrupt()} method
+ */
+ public static InterruptibleHandler interruptible(final Handler realHandler) {
+ final AtomicBoolean interrupted = new AtomicBoolean(false);
+ final Object interruptLock = new Object();
+
+ return new InterruptibleHandler() {
+ @Override
+ public void handle(HttpServletRequest request, HttpServletResponse response) {
+ final Thread writerThread = Thread.currentThread();
+ Thread interrupterThread = new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ synchronized (interruptLock) {
+ if (interrupted.get()) {
+ break;
+ }
+ try {
+ interruptLock.wait();
+ }
+ catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ writerThread.interrupt();
+ }
+ });
+ interrupterThread.start();
+ realHandler.handle(request, response);
+ }
+
+ @Override
+ public void interrupt() {
+ synchronized (interruptLock) {
+ interrupted.set(true);
+ interruptLock.notifyAll();
+ }
+ }
+ };
+ }
+
+ /**
+ * Interface for use with {@link Handlers#stream(String, StreamProducer)}.
+ */
+ public static interface StreamProducer {
+ /**
+ * Pushes chunks of response data onto a queue. Another worker thread will dequeue and send the chunks.
+ *
+ * @param chunks the queue for chunks of stream data
+ * @return true if the stream should be left open indefinitely afterward, false to close it
+ */
+ boolean writeStream(BlockingQueue chunks);
+ }
+
+ /**
+ * Provides a handler that streams a chunked response.
+ *
+ * @param contentType value for the Content-Type header
+ * @param streamProducer a {@link StreamProducer} that will provide the response
+ * @return the handler
+ */
+ public static Handler stream(final String contentType, final StreamProducer streamProducer) {
+ return new Handler() {
+ public void handle(HttpServletRequest req, HttpServletResponse resp) {
+ resp.setStatus(200);
+ resp.setHeader("Content-Type", contentType);
+ resp.setHeader("Transfer-Encoding", "chunked");
+ try {
+ resp.flushBuffer();
+ PrintWriter w = resp.getWriter();
+ final BlockingQueue chunks = new LinkedBlockingQueue<>();
+ final String terminator = "***EOF***"; // value doesn't matter, we're checking for reference equality
+ Thread producerThread = new Thread(new Runnable() {
+ public void run() {
+ boolean leaveOpen = streamProducer.writeStream(chunks);
+ if (leaveOpen) {
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ chunks.add(terminator);
+ }
+ });
+ producerThread.start();
+ while (true) {
+ try {
+ String chunk = chunks.take();
+ if (chunk == terminator) {
+ break;
+ }
+ w.write(chunk);
+ w.flush();
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ producerThread.interrupt();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ };
+ }
+
+ /**
+ * Provides content for {@link #stream(String, StreamProducer)} that is a single chunk of data.
+ *
+ * @param body the response body
+ * @param leaveOpen true to leave the stream open after sending this data, false to close it
+ * @return the stream producer
+ */
+ public static StreamProducer streamProducerFromString(final String body, final boolean leaveOpen) {
+ return streamProducerFromChunkedString(body, body.length(), Duration.ZERO, leaveOpen);
+ }
+
+ /**
+ * Provides content for {@link #stream(String, StreamProducer)} that is a string broken up into
+ * multiple chunks of equal size.
+ *
+ * @param body the response body
+ * @param chunkSize the number of characters per chunk
+ * @param chunkDelay how long to wait between chunks
+ * @param leaveOpen true to leave the stream open after sending this data, false to close it
+ * @return the stream producer
+ */
+ public static StreamProducer streamProducerFromChunkedString(final String body, final int chunkSize,
+ final Duration chunkDelay, final boolean leaveOpen) {
+ return new StreamProducer() {
+ public boolean writeStream(BlockingQueue chunks) {
+ for (int p = 0; p < body.length(); p += chunkSize) {
+ String chunk = body.substring(p, Math.min(p + chunkSize, body.length()));
+ chunks.add(chunk);
+ try {
+ Thread.sleep(chunkDelay.toMillis());
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ return leaveOpen;
+ }
+ };
+ }
+ }
+}
diff --git a/src/test/java/com/launchdarkly/eventsource/Stubs.java b/src/test/java/com/launchdarkly/eventsource/Stubs.java
index 85b6baf..feabe75 100644
--- a/src/test/java/com/launchdarkly/eventsource/Stubs.java
+++ b/src/test/java/com/launchdarkly/eventsource/Stubs.java
@@ -2,24 +2,11 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
-import okhttp3.mockwebserver.MockResponse;
-import okhttp3.mockwebserver.SocketPolicy;
+import static org.junit.Assert.assertNull;
class Stubs {
- private static final int CHUNK_SIZE = 5;
-
- static MockResponse createEventsResponse(String body, SocketPolicy socketPolicy) {
- return new MockResponse()
- .setHeader("Content-Type", "text/event-stream")
- .setChunkedBody(body, CHUNK_SIZE)
- .setSocketPolicy(socketPolicy);
- }
-
- static MockResponse createErrorResponse(int status) {
- return new MockResponse().setResponseCode(500);
- }
-
static class LogItem {
private final String action;
private final String[] params;
@@ -104,5 +91,11 @@ public void onComment(String comment) throws Exception {
public void onClosed() throws Exception {
log.add(LogItem.closed());
}
+
+ void assertNoMoreLogItems() {
+ try {
+ assertNull(log.poll(100, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {}
+ }
}
}