From 261b530ac01e2464216b0bd99a0c245f2a81da01 Mon Sep 17 00:00:00 2001 From: Jem Mawson Date: Tue, 17 Dec 2019 16:21:29 +1000 Subject: [PATCH] Separates backoff delay logic for testability into BackoffCalculator --- .../eventsource/BackoffCalculator.java | 90 ++++++++++++ .../launchdarkly/eventsource/EventSource.java | 130 +++++++----------- .../eventsource/BackoffCalculatorTest.java | 52 +++++++ .../eventsource/EventSourceHttpTest.java | 90 ++++++------ .../eventsource/EventSourceTest.java | 51 ++----- 5 files changed, 250 insertions(+), 163 deletions(-) create mode 100644 src/main/java/com/launchdarkly/eventsource/BackoffCalculator.java create mode 100644 src/test/java/com/launchdarkly/eventsource/BackoffCalculatorTest.java diff --git a/src/main/java/com/launchdarkly/eventsource/BackoffCalculator.java b/src/main/java/com/launchdarkly/eventsource/BackoffCalculator.java new file mode 100644 index 0000000..f9f6b47 --- /dev/null +++ b/src/main/java/com/launchdarkly/eventsource/BackoffCalculator.java @@ -0,0 +1,90 @@ +package com.launchdarkly.eventsource; + +import java.util.Random; + +public class BackoffCalculator { + + private final Random jitter = new Random(); + + private long maxReconnectTimeMs; + private long reconnectTimeMs; + private long backoffResetThresholdMs; + private int retryCount; + + /** + * A calculator for deriving jittered backoff durations, based upon the quantity of consecutive + * short-lived connections preceding each calculation request. + * + * @param maxReconnectTimeMs The maximum backoff delay in milliseconds + * @param reconnectTimeMs The base backoff delay in milliseconds. This value grows exponentially + * with each short-lived connection until `maxReconnectTime` is reached. + * @param backoffResetThresholdMs The duration a connection must have been alive for before it is + * considered to be long-lived. + */ + public BackoffCalculator(long maxReconnectTimeMs, long reconnectTimeMs, long backoffResetThresholdMs) { + this.maxReconnectTimeMs = maxReconnectTimeMs; + this.reconnectTimeMs = reconnectTimeMs; + this.backoffResetThresholdMs = backoffResetThresholdMs; + } + + public long delayAfterConnectedFor(long connectedDuration) { + retryCount = connectedDuration >= backoffResetThresholdMs + ? 0 + : retryCount + 1; + return delayGivenRetryCount(); + } + + public long getBackoffResetThresholdMs() { + return backoffResetThresholdMs; + } + + public void setBackoffResetThresholdMs(long backoffResetThresholdMs) { + this.backoffResetThresholdMs = backoffResetThresholdMs; + } + + public long getMaxReconnectTimeMs() { + return maxReconnectTimeMs; + } + + public void setMaxReconnectTimeMs(long maxReconnectTimeMs) { + this.maxReconnectTimeMs = maxReconnectTimeMs; + } + + public long getReconnectTimeMs() { + return reconnectTimeMs; + } + + public void setReconnectTimeMs(long reconnectTimeMs) { + this.reconnectTimeMs = reconnectTimeMs; + } + + private long delayGivenRetryCount() { + if (retryCount == 0) { + return 0; + } + long jitterVal = Math.min(maxReconnectTimeMs, reconnectTimeMs * pow2(retryCount)); + return jitterVal / 2 + nextLong(jitter, jitterVal) / 2; + } + + // Returns 2**k, or Integer.MAX_VALUE if 2**k would overflow + private int pow2(int k) { + return (k < Integer.SIZE - 1) ? (1 << k) : Integer.MAX_VALUE; + } + + // Adapted from http://stackoverflow.com/questions/2546078/java-random-long-number-in-0-x-n-range + // Since ThreadLocalRandom.current().nextLong(n) requires Android 5 + private long nextLong(Random rand, long bound) { + if (bound <= 0) { + throw new IllegalArgumentException("bound must be positive"); + } + + long r = rand.nextLong() & Long.MAX_VALUE; + long m = bound - 1L; + if ((bound & m) == 0) { // i.e., bound is a power of 2 + r = (bound * r) >> (Long.SIZE - 1); + } else { + for (long u = r; u - (r = u % bound) + m < 0L; u = rand.nextLong() & Long.MAX_VALUE) ; + } + return r; + } +} diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index ff302a4..2276b4a 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -82,7 +82,7 @@ public class EventSource implements ConnectionHandler, Closeable { private static final Headers defaultHeaders = new Headers.Builder().add("Accept", "text/event-stream").add("Cache-Control", "no-cache").build(); - + private final String name; private volatile HttpUrl url; private final Headers headers; @@ -91,16 +91,13 @@ public class EventSource implements ConnectionHandler, Closeable { private final RequestTransformer requestTransformer; private final ExecutorService eventExecutor; private final ExecutorService streamExecutor; - private long reconnectTimeMs; - private long maxReconnectTimeMs; - private final long backoffResetThresholdMs; private volatile String lastEventId; private final EventHandler handler; private final ConnectionErrorHandler connectionErrorHandler; private final AtomicReference readyState; private final OkHttpClient client; private volatile Call call; - private final Random jitter = new Random(); + private final BackoffCalculator backoffCalculator; private Response response; private BufferedSource bufferedSource; @@ -114,15 +111,16 @@ public class EventSource implements ConnectionHandler, Closeable { this.method = builder.method; this.body = builder.body; this.requestTransformer = builder.requestTransformer; - this.reconnectTimeMs = builder.reconnectTimeMs; - this.maxReconnectTimeMs = builder.maxReconnectTimeMs; - this.backoffResetThresholdMs = builder.backoffResetThresholdMs; ThreadFactory eventsThreadFactory = createThreadFactory("okhttp-eventsource-events"); this.eventExecutor = Executors.newSingleThreadExecutor(eventsThreadFactory); ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream"); this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory); this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler); this.connectionErrorHandler = builder.connectionErrorHandler; + this.backoffCalculator = new BackoffCalculator( + builder.maxReconnectTimeMs, + builder.reconnectTimeMs, + builder.backoffResetThresholdMs); this.readyState = new AtomicReference<>(RAW); this.client = builder.clientBuilder.build(); } @@ -206,7 +204,7 @@ public void close() { } } } - + Request buildRequest() { Request.Builder builder = new Request.Builder() .headers(headers) @@ -216,7 +214,7 @@ Request buildRequest() { if (lastEventId != null && !lastEventId.isEmpty()) { builder.addHeader("Last-Event-ID", lastEventId); } - + Request request = builder.build(); return requestTransformer == null ? request : requestTransformer.transformRequest(request); } @@ -225,16 +223,15 @@ private void connect() { response = null; bufferedSource = null; - int reconnectAttempts = 0; ConnectionErrorHandler.Action errorHandlerAction = null; - + try { while (!Thread.currentThread().isInterrupted() && readyState.get() != SHUTDOWN) { long connectedTime = -1; ReadyState currentState = readyState.getAndSet(CONNECTING); logger.debug("readyState change: " + currentState + " -> " + CONNECTING); - try { + try { call = client.newCall(buildRequest()); response = call.execute(); if (response.isSuccessful()) { @@ -302,12 +299,11 @@ private void connect() { handler.onError(e); } } - // Reset the backoff if we had a successful connection that stayed good for at least - // backoffResetThresholdMs milliseconds. - if (connectedTime >= 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThresholdMs) { - reconnectAttempts = 0; - } - maybeWaitWithBackoff(++reconnectAttempts); + + long connectedDuration = connectedTime >= 0 + ? System.currentTimeMillis() - connectedTime + : 0; + maybeWait(backoffCalculator.delayAfterConnectedFor(connectedDuration)); } } } catch (RejectedExecutionException ignored) { @@ -327,11 +323,10 @@ private ConnectionErrorHandler.Action dispatchError(Throwable t) { } return action; } - - private void maybeWaitWithBackoff(int reconnectAttempts) { - if (reconnectTimeMs > 0 && reconnectAttempts > 0) { + + private void maybeWait(long sleepTimeMs) { + if (sleepTimeMs > 0) { try { - long sleepTimeMs = backoffWithJitter(reconnectAttempts); logger.info("Waiting " + sleepTimeMs + " milliseconds before reconnecting..."); Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { @@ -339,40 +334,13 @@ private void maybeWaitWithBackoff(int reconnectAttempts) { } } - long backoffWithJitter(int reconnectAttempts) { - long jitterVal = Math.min(maxReconnectTimeMs, reconnectTimeMs * pow2(reconnectAttempts)); - return jitterVal / 2 + nextLong(jitter, jitterVal) / 2; - } - - // Returns 2**k, or Integer.MAX_VALUE if 2**k would overflow - private int pow2(int k) { - return (k < Integer.SIZE - 1) ? (1 << k) : Integer.MAX_VALUE; - } - - // Adapted from http://stackoverflow.com/questions/2546078/java-random-long-number-in-0-x-n-range - // Since ThreadLocalRandom.current().nextLong(n) requires Android 5 - private long nextLong(Random rand, long bound) { - if (bound <= 0) { - throw new IllegalArgumentException("bound must be positive"); - } - - long r = rand.nextLong() & Long.MAX_VALUE; - long m = bound - 1L; - if ((bound & m) == 0) { // i.e., bound is a power of 2 - r = (bound * r) >> (Long.SIZE - 1); - } else { - for (long u = r; u - (r = u % bound) + m < 0L; u = rand.nextLong() & Long.MAX_VALUE) ; - } - return r; - } - private static Headers addDefaultHeaders(Headers custom) { Headers.Builder builder = new Headers.Builder(); for (String name : defaultHeaders.names()) { if (!custom.names().contains(name)) { // skip the default if they set any custom values for this key for (String value: defaultHeaders.values(name)) { - builder.add(name, value); + builder.add(name, value); } } } @@ -398,7 +366,7 @@ private static Headers addDefaultHeaders(Headers custom) { * @see #DEFAULT_RECONNECT_TIME_MS */ public void setReconnectionTimeMs(long reconnectionTimeMs) { - this.reconnectTimeMs = reconnectionTimeMs; + backoffCalculator.setReconnectTimeMs(reconnectionTimeMs); } /** @@ -410,7 +378,7 @@ public void setReconnectionTimeMs(long reconnectionTimeMs) { * @see #DEFAULT_MAX_RECONNECT_TIME_MS */ public void setMaxReconnectTimeMs(long maxReconnectTimeMs) { - this.maxReconnectTimeMs = maxReconnectTimeMs; + backoffCalculator.setMaxReconnectTimeMs(maxReconnectTimeMs); } /** @@ -418,7 +386,7 @@ public void setMaxReconnectTimeMs(long maxReconnectTimeMs) { * @return the maximum delay in milliseconds */ public long getMaxReconnectTimeMs() { - return this.maxReconnectTimeMs; + return backoffCalculator.getMaxReconnectTimeMs(); } /** @@ -440,7 +408,7 @@ public void setLastEventId(String lastEventId) { public HttpUrl getHttpUrl() { return this.url; } - + /** * Returns the current stream endpoint as a java.net.URI. * @return the endpoint URI @@ -454,12 +422,12 @@ public URI getUri() { /** * Changes the stream endpoint. This change will not take effect until the next time the * EventSource attempts to make a connection. - * + * * @param url the new endpoint, as an OkHttp HttpUrl * @throws IllegalArgumentException if the parameter is null or if the scheme is not HTTP or HTTPS * @see #getHttpUrl() * @see #setUri(URI) - * + * * @since 1.9.0 */ public void setHttpUrl(HttpUrl url) { @@ -468,11 +436,11 @@ public void setHttpUrl(HttpUrl url) { } this.url = url; } - + /** * Changes the stream endpoint. This change will not take effect until the next time the * EventSource attempts to make a connection. - * + * * @param uri the new endpoint, as a java.net.URI * @throws IllegalArgumentException if the parameter is null or if the scheme is not HTTP or HTTPS * @see #getUri() @@ -485,7 +453,7 @@ public void setUri(URI uri) { private static IllegalArgumentException badUrlException() { return new IllegalArgumentException("URI/URL must not be null and must be HTTP or HTTPS"); } - + /** * Interface for an object that can modify the network request that the EventSource will make. * Use this in conjunction with {@link Builder#requestTransformer(RequestTransformer)} if you need to set request @@ -498,24 +466,24 @@ private static IllegalArgumentException badUrlException() { * return input.newBuilder().tag("hello").build(); * } * } - * + * * EventSource es = new EventSource.Builder(handler, uri).requestTransformer(new RequestTagger()).build(); * - * + * * @since 1.9.0 */ - public static interface RequestTransformer { + public interface RequestTransformer { /** * Returns a request that is either the same as the input request or based on it. When * this method is called, EventSource has already set all of its standard properties on * the request. - * + * * @param input the original request * @return the request that will be used */ - public Request transformRequest(Request input); + Request transformRequest(Request input); } - + /** * Builder for {@link EventSource}. */ @@ -534,10 +502,10 @@ public static final class Builder { private RequestTransformer requestTransformer = null; private RequestBody body = null; private OkHttpClient.Builder clientBuilder; - + /** * Creates a new builder. - * + * * @param handler the event handler * @param uri the endpoint as a java.net.URI * @throws IllegalArgumentException if either argument is null, or if the endpoint is not HTTP or HTTPS @@ -548,11 +516,11 @@ public Builder(EventHandler handler, URI uri) { /** * Creates a new builder. - * + * * @param handler the event handler * @param url the endpoint as an OkHttp HttpUrl * @throws IllegalArgumentException if either argument is null, or if the endpoint is not HTTP or HTTPS - * + * * @since 1.9.0 */ public Builder(EventHandler handler, HttpUrl url) { @@ -566,7 +534,7 @@ public Builder(EventHandler handler, HttpUrl url) { this.handler = handler; this.clientBuilder = createInitialClientBuilder(); } - + private static OkHttpClient.Builder createInitialClientBuilder() { OkHttpClient.Builder b = new OkHttpClient.Builder() .connectionPool(new ConnectionPool(1, 1, TimeUnit.SECONDS)) @@ -581,7 +549,7 @@ private static OkHttpClient.Builder createInitialClientBuilder() { } return b; } - + /** * Set the HTTP method used for this EventSource client to use for requests to establish the EventSource. * @@ -609,17 +577,17 @@ public Builder body(RequestBody body) { /** * Specifies an object that will be used to customize outgoing requests. See {@link RequestTransformer} for details. - * + * * @param requestTransformer the transformer object * @return the builder - * + * * @since 1.9.0 */ public Builder requestTransformer(RequestTransformer requestTransformer) { this.requestTransformer = requestTransformer; return this; } - + /** * Set the name for this EventSource client to be used when naming the logger and threadpools. This is mainly useful when * multiple EventSource clients exist within the same process. @@ -669,12 +637,12 @@ public Builder maxReconnectTimeMs(long maxReconnectTimeMs) { * will be greater than the last delay; if it fails after the threshold, the delay will start over at * the initial minimum value. This prevents long delays from occurring on connections that are only * rarely restarted. - * + * * @param backoffResetThresholdMs the minimum time in milliseconds that a connection must stay open to - * avoid resetting the delay + * avoid resetting the delay * @return the builder * @see EventSource#DEFAULT_BACKOFF_RESET_THRESHOLD_MS - * + * * @since 1.9.0 */ public Builder backoffResetThresholdMs(long backoffResetThresholdMs) { @@ -805,7 +773,7 @@ public Builder connectionErrorHandler(ConnectionErrorHandler handler) { * eventSourceBuilder.clientBuilderActions(b -> { * b.sslSocketFactory(mySocketFactory, myTrustManager); * }); - * + * * // Java 7 example (anonymous class) * eventSourceBuilder.clientBuilderActions(new EventSource.Builder.ClientConfigurer() { * public void configure(OkHttpClient.Builder v) { @@ -821,7 +789,7 @@ public Builder clientBuilderActions(ClientConfigurer configurer) { configurer.configure(clientBuilder); return this; } - + /** * Constructs an {@link EventSource} using the builder's current properties. * @return the new EventSource instance @@ -853,7 +821,7 @@ private static X509TrustManager defaultTrustManager() throws GeneralSecurityExce } return (X509TrustManager) trustManagers[0]; } - + /** * An interface for use with {@link EventSource.Builder#clientBuilderActions(ClientConfigurer)}. * @since 1.10.0 diff --git a/src/test/java/com/launchdarkly/eventsource/BackoffCalculatorTest.java b/src/test/java/com/launchdarkly/eventsource/BackoffCalculatorTest.java new file mode 100644 index 0000000..daf9b14 --- /dev/null +++ b/src/test/java/com/launchdarkly/eventsource/BackoffCalculatorTest.java @@ -0,0 +1,52 @@ +package com.launchdarkly.eventsource; + +import org.junit.Assert; +import org.junit.Test; + +import static com.launchdarkly.eventsource.EventSource.DEFAULT_BACKOFF_RESET_THRESHOLD_MS; +import static com.launchdarkly.eventsource.EventSource.DEFAULT_MAX_RECONNECT_TIME_MS; +import static com.launchdarkly.eventsource.EventSource.DEFAULT_RECONNECT_TIME_MS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class BackoffCalculatorTest { + + @Test + public void respectsDefaultMaximumBackoffTime() { + BackoffCalculator calculator = subject(); + for (int i = 0; i < 300; i++) { + assertTrue(calculator.delayAfterConnectedFor(0) < calculator.getMaxReconnectTimeMs()); + } + } + + @Test + public void respectsCustomMaximumBackoffTime() { + BackoffCalculator calculator = new BackoffCalculator(5000, 2000, 100); + for (int i = 0; i < 300; i++) { + assertTrue(calculator.delayAfterConnectedFor(0) < calculator.getMaxReconnectTimeMs()); + } + } + + @Test + public void resetsDelayToZeroAfterSuccess() { + BackoffCalculator calculator = subject(); + assertTrue(calculator.delayAfterConnectedFor(0) > 0); + assertEquals(0, calculator.delayAfterConnectedFor(calculator.getBackoffResetThresholdMs())); + } + + @Test + public void delayIncreasesOnSubsequentFailures() { + BackoffCalculator calculator = subject(); + long priorDelay = 0; + for (int i = 0; i < 5; i++) { + long thisDelay = calculator.delayAfterConnectedFor(0); + assertTrue(thisDelay > priorDelay); + priorDelay = thisDelay; + } + } + + private BackoffCalculator subject() { + return new BackoffCalculator(DEFAULT_MAX_RECONNECT_TIME_MS, + DEFAULT_RECONNECT_TIME_MS, DEFAULT_BACKOFF_RESET_THRESHOLD_MS); + } +} diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java index 3c2b81b..4697178 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceHttpTest.java @@ -13,7 +13,7 @@ public class EventSourceHttpTest { private static final int CHUNK_SIZE = 5; - + @Test public void eventSourceReadsChunkedResponse() throws Exception { String body = "data: data-by-itself\n\n" + @@ -30,30 +30,30 @@ public void eventSourceReadsChunkedResponse() throws Exception { try (MockWebServer server = new MockWebServer()) { server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN)); server.start(); - + try (EventSource es = new EventSource.Builder(handler, server.url("/")) .build()) { es.start(); - + assertEquals(LogItem.opened(), handler.log.take()); - + assertEquals(LogItem.event("message", "data-by-itself"), // "message" is the default event name, per SSE spec handler.log.take()); assertEquals(LogItem.event("event-with-data", "abc"), handler.log.take()); - + assertEquals(LogItem.comment("this is a comment"), handler.log.take()); - + assertEquals(LogItem.event("event-with-more-data-and-id", "abc\ndef", "my-id"), handler.log.take()); } - + assertEquals(LogItem.closed(), handler.log.take()); } } - + @Test public void eventSourceReconnectsAfterSocketClosed() throws Exception { String body1 = "data: first\n\n"; @@ -65,25 +65,25 @@ public void eventSourceReconnectsAfterSocketClosed() throws Exception { server.enqueue(createEventsResponse(body1, SocketPolicy.DISCONNECT_AT_END)); server.enqueue(createEventsResponse(body2, SocketPolicy.KEEP_OPEN)); server.start(); - + try (EventSource es = new EventSource.Builder(handler, server.url("/")) .reconnectTimeMs(10) .build()) { es.start(); - + assertEquals(LogItem.opened(), handler.log.take()); - + assertEquals(LogItem.event("message", "first"), handler.log.take()); assertEquals(LogItem.closed(), handler.log.take()); - + assertEquals(LogItem.opened(), handler.log.take()); assertEquals(LogItem.event("message", "second"), handler.log.take()); } - + assertEquals(LogItem.closed(), handler.log.take()); } } @@ -98,21 +98,21 @@ public void eventSourceReconnectsAfterErrorOnFirstRequest() throws Exception { server.enqueue(createErrorResponse(500)); server.enqueue(createEventsResponse(body, SocketPolicy.KEEP_OPEN)); server.start(); - + try (EventSource es = new EventSource.Builder(handler, server.url("/")) .reconnectTimeMs(10) .build()) { es.start(); - + assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), handler.log.take()); - + assertEquals(LogItem.opened(), handler.log.take()); - + assertEquals(LogItem.event("message", "good"), handler.log.take()); } - + assertEquals(LogItem.closed(), handler.log.take()); } } @@ -129,47 +129,47 @@ public void eventSourceReconnectsAgainAfterErrorOnFirstReconnect() throws Except server.enqueue(createErrorResponse(500)); server.enqueue(createEventsResponse(body2, SocketPolicy.KEEP_OPEN)); server.start(); - + try (EventSource es = new EventSource.Builder(handler, server.url("/")) .reconnectTimeMs(10) .build()) { es.start(); - + assertEquals(LogItem.opened(), handler.log.take()); - + assertEquals(LogItem.event("message", "first"), handler.log.take()); - + assertEquals(LogItem.closed(), handler.log.take()); - + assertEquals(LogItem.error(new UnsuccessfulResponseException(500)), handler.log.take()); - + assertEquals(LogItem.opened(), handler.log.take()); - + assertEquals(LogItem.event("message", "second"), handler.log.take()); } - + assertEquals(LogItem.closed(), handler.log.take()); } } - + private MockResponse createEventsResponse(String body, SocketPolicy socketPolicy) { return new MockResponse() .setHeader("Content-Type", "text/event-stream") .setChunkedBody(body, CHUNK_SIZE) .setSocketPolicy(socketPolicy); } - + private MockResponse createErrorResponse(int status) { - return new MockResponse().setResponseCode(500); + return new MockResponse().setResponseCode(status); } - + static class LogItem { private final String action; private final String[] params; - + private LogItem(String action, String[] params) { this.action = action; this.params = params; @@ -178,31 +178,31 @@ private LogItem(String action, String[] params) { public static LogItem opened() { return new LogItem("opened", null); } - + public static LogItem closed() { return new LogItem("closed", null); } - + public static LogItem event(String eventName, String data) { return event(eventName, data, null); } public static LogItem event(String eventName, String data, String eventId) { if (eventId == null) { - + } return new LogItem("event", eventId == null ? new String[] { eventName, data } : new String[] { eventName, data, eventId }); } - + public static LogItem comment(String comment) { return new LogItem("comment", new String[] { comment }); } - + public static LogItem error(Throwable t) { return new LogItem("error", new String[] { t.toString() }); } - + public String toString() { StringBuilder sb = new StringBuilder().append(action); if (params != null) { @@ -217,35 +217,35 @@ public String toString() { } return sb.toString(); } - + public boolean equals(Object o) { return (o instanceof LogItem) && toString().equals(o.toString()); } - + public int hashCode() { return toString().hashCode(); } } - + static class TestHandler implements EventHandler { public final BlockingQueue log = new ArrayBlockingQueue<>(100); - + public void onOpen() throws Exception { log.add(LogItem.opened()); } - + public void onMessage(String event, MessageEvent messageEvent) throws Exception { log.add(LogItem.event(event, messageEvent.getData(), messageEvent.getLastEventId())); } - + public void onError(Throwable t) { log.add(LogItem.error(t)); } - + public void onComment(String comment) throws Exception { log.add(LogItem.comment(comment)); } - + @Override public void onClosed() throws Exception { log.add(LogItem.closed()); diff --git a/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java b/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java index 2acace9..e2d7673 100644 --- a/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java +++ b/src/test/java/com/launchdarkly/eventsource/EventSourceTest.java @@ -1,25 +1,23 @@ package com.launchdarkly.eventsource; +import java.io.IOException; +import java.net.Proxy; +import java.net.URI; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; import okhttp3.Headers; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.OkHttpClient; +import okhttp3.OkHttpClient.Builder; import okhttp3.Request; import okhttp3.RequestBody; -import okhttp3.OkHttpClient.Builder; import okio.Buffer; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import java.io.IOException; -import java.net.Proxy; -import java.net.URI; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; @@ -47,18 +45,18 @@ public void hasExpectedUriWhenInitializedWithHttpUrl() { EventSource es = new EventSource.Builder(mock(EventHandler.class), STREAM_HTTP_URL).build(); assertEquals(STREAM_URI, es.getUri()); } - + @Test public void hasExpectedHttpUrlWhenInitializedWithUri() { assertEquals(STREAM_HTTP_URL, eventSource.getHttpUrl()); } - + @Test public void hasExpectedHttpUrlWhenInitializedWithHttpUrl() { EventSource es = new EventSource.Builder(mock(EventHandler.class), STREAM_HTTP_URL).build(); assertEquals(STREAM_HTTP_URL, es.getHttpUrl()); } - + @Test(expected=IllegalArgumentException.class) public void handlerCannotBeNull() { new EventSource.Builder(null, STREAM_URI); @@ -97,32 +95,11 @@ public void canSetHttpUrl() { eventSource.setHttpUrl(url); assertEquals(url, eventSource.getHttpUrl()); } - + @Test(expected=IllegalArgumentException.class) public void cannotSetHttpUrlToNull() { eventSource.setHttpUrl(null); } - - @Test - public void respectsDefaultMaximumBackoffTime() { - eventSource.setReconnectionTimeMs(2000); - assertEquals(EventSource.DEFAULT_MAX_RECONNECT_TIME_MS, eventSource.getMaxReconnectTimeMs()); - Assert.assertTrue(eventSource.backoffWithJitter(300) < eventSource.getMaxReconnectTimeMs()); - } - - @Test - public void respectsCustomMaximumBackoffTime() { - eventSource.setReconnectionTimeMs(2000); - eventSource.setMaxReconnectTimeMs(5000); - Assert.assertTrue(eventSource.backoffWithJitter(300) < eventSource.getMaxReconnectTimeMs()); - } - - @Ignore("Useful for inspecting jitter values empirically") - public void inspectJitter() { - for (int i = 0; i < 100; i++) { - System.out.println("With jitter, retry " + i + ": " + eventSource.backoffWithJitter(i)); - } - } @Test public void defaultClient() { @@ -190,7 +167,7 @@ public void configure(Builder b) { assertEquals(writeTimeout, client.writeTimeoutMillis()); } - + @Test public void customMethod() throws IOException { builder.method("report"); @@ -217,7 +194,7 @@ public void defaultMethod() { assertEquals("GET", req.method()); assertEquals(null, req.body()); } - + @Test public void customHeaders() throws IOException { Headers headers = new Headers.Builder() @@ -231,7 +208,7 @@ public void customHeaders() throws IOException { assertEquals(Arrays.asList("text/event-stream"), req.headers().values("Accept")); assertEquals(Arrays.asList("no-cache"), req.headers().values("Cache-Control")); } - + @Test public void customHeadersOverwritingDefaults() throws IOException { Headers headers = new Headers.Builder()