Skip to content

Commit

Permalink
fix shutdown state logic, simplify code paths (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
eli-darkly authored Jun 18, 2020
1 parent 7ef1505 commit d4143ca
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 199 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ jacocoTestCoverageVerification {
violationRules { rules ->
def knownMissedLinesForMethods = [
// The key for each of these items is the complete method signature minus the "com.launchdarkly.eventsource." prefix.
"EventSource.connect()": 12,
"EventSource.handleSuccessfulResponse(okhttp3.Response)": 2,
"EventSource.maybeReconnectDelay(int, long)": 2,
"EventSource.run()": 3,
"EventSource.Builder.createInitialClientBuilder()": 1,
"EventSource.Builder.defaultTrustManager()": 2,
"SLF4JLogger.error(java.lang.String)": 2,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package com.launchdarkly.eventsource;

import java.io.EOFException;

/**
* Interface for an object that will be notified when EventSource encounters a connection failure.
* This is different from {@link EventHandler#onError(Throwable)} in that it will not be called for
* other kinds of errors; also, it has the ability to tell EventSource to stop reconnecting.
* Interface for an object that will be notified when EventSource encounters a socket connection
* error or receives an error response. This is different from {@link EventHandler#onError(Throwable)}
* in two ways:
* <ol>
* <li> It has the ability to tell EventSource to shut down instead of reconnecting.
* <li> If the server simply ends the stream, the {@code ConnectionErrorHandler} is called with
* an {@code EOFException}; {@code onError} is not called in this case.
* </ol>
*/
public interface ConnectionErrorHandler {
/**
Expand All @@ -26,8 +33,11 @@ public static enum Action {
/**
* This method is called synchronously for all exceptions that occur on the socket connection
* (including an {@link UnsuccessfulResponseException} if the server returns an unexpected HTTP
* status). It must not take any direct action to affect the state of the connection, nor do
* any I/O of its own, but can return {@link Action#SHUTDOWN} to cause the connection to close.
* status, or {@link EOFException} if the streaming response has ended).
* <p>
* It must not take any direct action to affect the state of the connection, nor do any I/O of
* its own, but it can return {@link Action#SHUTDOWN} to cause the connection to close.
*
* @param t a {@code Throwable} object
* @return an {@link Action} constant
*/
Expand Down
241 changes: 125 additions & 116 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ public class EventSource implements Closeable {
private final OkHttpClient client;
private volatile Call call;
private final Random jitter = new Random();
private Response response;
private BufferedSource bufferedSource;

EventSource(Builder builder) {
this.name = builder.name == null ? "" : builder.name;
Expand All @@ -124,7 +122,8 @@ public class EventSource implements Closeable {
ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream", builder.threadPriority);
this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory);
this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger);
this.connectionErrorHandler = builder.connectionErrorHandler;
this.connectionErrorHandler = builder.connectionErrorHandler == null ?
ConnectionErrorHandler.DEFAULT : builder.connectionErrorHandler;
this.readyState = new AtomicReference<>(RAW);
this.client = builder.clientBuilder.build();
}
Expand Down Expand Up @@ -154,7 +153,7 @@ public void start() {
}
logger.debug("readyState change: {} -> {}", RAW, CONNECTING);
logger.info("Starting EventSource client using URI: " + url);
streamExecutor.execute(this::connect);
streamExecutor.execute(this::run);
}

/**
Expand Down Expand Up @@ -198,8 +197,8 @@ public void close() {

closeCurrentStream(currentState);

eventExecutor.shutdownNow();
streamExecutor.shutdownNow();
eventExecutor.shutdown();
streamExecutor.shutdown();

// COVERAGE: these null guards are here for safety but in practice the values are never null and there
// is no way to cause them to be null in unit tests
Expand Down Expand Up @@ -242,13 +241,108 @@ Request buildRequest() {
return requestTransformer == null ? request : requestTransformer.transformRequest(request);
}

private void connect() {
response = null;
bufferedSource = null;

private void run() {
AtomicLong connectedTime = new AtomicLong();
int reconnectAttempts = 0;
ConnectionErrorHandler.Action errorHandlerAction = null;

try {
while (!Thread.currentThread().isInterrupted() && readyState.get() != SHUTDOWN) {
if (reconnectAttempts == 0) {
reconnectAttempts++;
} else {
reconnectAttempts = maybeReconnectDelay(reconnectAttempts, connectedTime.get());
}
newConnectionAttempt(connectedTime);
}
} catch (RejectedExecutionException ignored) {
// COVERAGE: there is no way to simulate this condition in unit tests
call = null;
logger.debug("Rejected execution exception ignored: {}", ignored);
// During shutdown, we tried to send a message to the event handler
// Do not reconnect; the executor has been shut down
}
}

private int maybeReconnectDelay(int reconnectAttempts, long connectedTime) {
if (reconnectTime.isZero() || reconnectTime.isNegative()) {
return reconnectAttempts;
}

int counter = reconnectAttempts;

// Reset the backoff if we had a successful connection that stayed good for at least
// backoffResetThresholdMs milliseconds.
if (connectedTime > 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThreshold.toMillis()) {
counter = 1;
}

try {
Duration sleepTime = backoffWithJitter(counter);
logger.info("Waiting " + sleepTime.toMillis() + " milliseconds before reconnecting...");
Thread.sleep(sleepTime.toMillis());
} catch (InterruptedException ignored) { // COVERAGE: no way to cause this in unit tests
}

return ++counter;
}

private void newConnectionAttempt(AtomicLong connectedTime) {
ConnectionErrorHandler.Action errorHandlerAction = ConnectionErrorHandler.Action.PROCEED;

ReadyState stateBeforeConnecting = readyState.getAndSet(CONNECTING);
logger.debug("readyState change: {} -> {}", stateBeforeConnecting, CONNECTING);

connectedTime.set(0);
call = client.newCall(buildRequest());

try {
try (Response response = call.execute()) {
if (response.isSuccessful()) {
connectedTime.set(System.currentTimeMillis());
handleSuccessfulResponse(response);
} else {
logger.debug("Unsuccessful response: {}", response);
errorHandlerAction = dispatchError(new UnsuccessfulResponseException(response.code()));
}
}
// If handleSuccessfulResponse returned without throwing an exception, it means the server
// ended the stream. We don't call the handler's onError() method in this case; but we will
// call the ConnectionErrorHandler with an EOFException, in case it wants to do something
// special in this scenario (like choose not to retry the connection). However, first we
// should check the state in case we've been deliberately closed from elsewhere.
ReadyState state = readyState.get();
if (state != SHUTDOWN && state != CLOSED) {
logger.warn("Connection unexpectedly closed");
errorHandlerAction = connectionErrorHandler.onConnectionError(new EOFException());
}
} catch (IOException e) {
ReadyState state = readyState.get();
if (state != SHUTDOWN && state != CLOSED) {
logger.debug("Connection problem: {}", e);
errorHandlerAction = dispatchError(e);
}
} finally {
if (errorHandlerAction == ConnectionErrorHandler.Action.SHUTDOWN) {
logger.info("Connection has been explicitly shut down by error handler");
close();
} else {
boolean wasOpen = readyState.compareAndSet(OPEN, CLOSED);
boolean wasConnecting = readyState.compareAndSet(CONNECTING, CLOSED);
if (wasOpen) {
logger.debug("readyState change: {} -> {}", OPEN, CLOSED);
handler.onClosed();
} else if (wasConnecting) {
logger.debug("readyState change: {} -> {}", CONNECTING, CLOSED);
}
}
}
}

// Read the response body as an SSE stream and dispatch each received event to the EventHandler.
// This function exits in one of two ways:
// 1. A normal return - this means the response simply ended.
// 2. Throwing an IOException - there was an unexpected connection failure.
private void handleSuccessfulResponse(Response response) throws IOException {
ConnectionHandler connectionHandler = new ConnectionHandler() {
@Override
public void setReconnectionTime(Duration reconnectionTime) {
Expand All @@ -261,121 +355,36 @@ public void setLastEventId(String lastEventId) {
}
};

try {
while (!Thread.currentThread().isInterrupted() && readyState.get() != SHUTDOWN) {
long connectedTime = -1;

ReadyState currentState = readyState.getAndSet(CONNECTING);
logger.debug("readyState change: {} -> {}", currentState, CONNECTING);
try {
call = client.newCall(buildRequest());
response = call.execute();
if (response.isSuccessful()) {
connectedTime = System.currentTimeMillis();
currentState = readyState.getAndSet(OPEN);
if (currentState != CONNECTING) {
// COVERAGE: there is no way to simulate this condition in unit tests
logger.warn("Unexpected readyState change: " + currentState + " -> " + OPEN);
} else {
logger.debug("readyState change: {} -> {}", currentState, OPEN);
}
logger.info("Connected to Event Source stream.");
handler.onOpen();
if (bufferedSource != null) {
bufferedSource.close();
}
bufferedSource = Okio.buffer(response.body().source());
EventParser parser = new EventParser(url.uri(), handler, connectionHandler, logger);
// COVERAGE: the isInterrupted() condition is not encountered in unit tests and it's unclear if it can ever happen
for (String line; !Thread.currentThread().isInterrupted() && (line = bufferedSource.readUtf8LineStrict()) != null; ) {
parser.line(line);
}
} else {
logger.debug("Unsuccessful response: {}", response);
errorHandlerAction = dispatchError(new UnsuccessfulResponseException(response.code()));
}
} catch (EOFException eofe) {
logger.warn("Connection unexpectedly closed.");
} catch (IOException ioe) {
ReadyState state = readyState.get();
if (state == SHUTDOWN) {
errorHandlerAction = ConnectionErrorHandler.Action.SHUTDOWN;
} else if (state == CLOSED) { // this happens if it's being restarted
errorHandlerAction = ConnectionErrorHandler.Action.PROCEED;
} else {
// COVERAGE: there is no way to simulate this condition in unit tests - closing the stream causes EOFException
logger.debug("Connection problem: {}", ioe);
errorHandlerAction = dispatchError(ioe);
}
} finally {
ReadyState nextState = CLOSED;
if (errorHandlerAction == ConnectionErrorHandler.Action.SHUTDOWN) {
if (readyState.get() != SHUTDOWN) {
logger.info("Connection has been explicitly shut down by error handler");
}
nextState = SHUTDOWN;
}
currentState = readyState.getAndSet(nextState);
logger.debug("readyState change: {} -> {}", currentState, nextState);

if (response != null && response.body() != null) {
response.close();
logger.debug("response closed", null);
}

if (bufferedSource != null) {
try {
bufferedSource.close();
logger.debug("buffered source closed", null);
} catch (IOException e) {
// COVERAGE: there is no way to simulate this condition in unit tests
logger.warn("Exception when closing bufferedSource: " + e.toString());
}
}

if (currentState == ReadyState.OPEN) {
handler.onClosed();
}

if (nextState != SHUTDOWN) {
// Reset the backoff if we had a successful connection that stayed good for at least
// backoffResetThresholdMs milliseconds.
if (connectedTime >= 0 && (System.currentTimeMillis() - connectedTime) >= backoffResetThreshold.toMillis()) {
reconnectAttempts = 0;
}
maybeWaitWithBackoff(++reconnectAttempts);
}
}
}
} catch (RejectedExecutionException ignored) {
ReadyState previousState = readyState.getAndSet(OPEN);
if (previousState != CONNECTING) {
// COVERAGE: there is no way to simulate this condition in unit tests
call = null;
response = null;
bufferedSource = null;
logger.debug("Rejected execution exception ignored: {}", ignored);
// During shutdown, we tried to send a message to the event handler
// Do not reconnect; the executor has been shut down
logger.warn("Unexpected readyState change: " + previousState + " -> " + OPEN);
} else {
logger.debug("readyState change: {} -> {}", previousState, OPEN);
}
logger.info("Connected to EventSource stream.");
handler.onOpen();

try (BufferedSource bufferedSource = Okio.buffer(response.body().source())) {
EventParser parser = new EventParser(url.uri(), handler, connectionHandler, logger);
// COVERAGE: the isInterrupted() condition is not encountered in unit tests and it's unclear if it can ever happen
for (String line; !Thread.currentThread().isInterrupted() &&
!bufferedSource.exhausted() && (line = bufferedSource.readUtf8LineStrict()) != null; ) {
parser.line(line);
}
} catch (EOFException e) {
// This should not happen because bufferedSource.exhausted() should have returned true, but if
// it does happen, we'll treat it the same as a regular end of stream.
}
}

private ConnectionErrorHandler.Action dispatchError(Throwable t) {
ConnectionErrorHandler.Action action = connectionErrorHandler.onConnectionError(t);
if (action != ConnectionErrorHandler.Action.SHUTDOWN) {
handler.onError(t);
}
return action;
}

private void maybeWaitWithBackoff(int reconnectAttempts) {
if (!reconnectTime.isZero() && !reconnectTime.isNegative() && reconnectAttempts > 0) {
try {
Duration sleepTime = backoffWithJitter(reconnectAttempts);
logger.info("Waiting " + sleepTime.toMillis() + " milliseconds before reconnecting...");
Thread.sleep(sleepTime.toMillis());
} catch (InterruptedException ignored) {
}
}
}

Duration backoffWithJitter(int reconnectAttempts) {
long maxTimeLong = Math.min(maxReconnectTime.toMillis(), reconnectTime.toMillis() * pow2(reconnectAttempts));
Expand Down
Loading

0 comments on commit d4143ca

Please sign in to comment.