Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bounded queues for the EventHandler thread #58

Merged
merged 5 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

/**
* Adapted from https://github.com/aslakhellesoy/eventsource-java/blob/master/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
Expand All @@ -15,15 +17,17 @@ class AsyncEventHandler implements EventHandler {
private final Executor executor;
private final EventHandler eventSourceHandler;
private final Logger logger;
final Semaphore semaphore; // visible for tests

AsyncEventHandler(Executor executor, EventHandler eventSourceHandler, Logger logger) {
AsyncEventHandler(Executor executor, EventHandler eventSourceHandler, Logger logger, Semaphore semaphore) {
this.executor = executor;
this.eventSourceHandler = eventSourceHandler;
this.logger = logger;
this.semaphore = semaphore;
}

public void onOpen() {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onOpen();
} catch (Exception e) {
Expand All @@ -33,7 +37,7 @@ public void onOpen() {
}

public void onClosed() {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onClosed();
} catch (Exception e) {
Expand All @@ -43,7 +47,7 @@ public void onClosed() {
}

public void onComment(final String comment) {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onComment(comment);
} catch (Exception e) {
Expand All @@ -53,7 +57,7 @@ public void onComment(final String comment) {
}

public void onMessage(final String event, final MessageEvent messageEvent) {
executor.execute(() -> {
execute(() -> {
try {
eventSourceHandler.onMessage(event, messageEvent);
} catch (Exception e) {
Expand All @@ -63,7 +67,7 @@ public void onMessage(final String event, final MessageEvent messageEvent) {
}

public void onError(final Throwable error) {
executor.execute(() -> {
execute(() -> {
onErrorInternal(error);
});
}
Expand All @@ -82,4 +86,37 @@ private void onErrorInternal(Throwable error) {
logger.debug("Stack trace: {}", new LazyStackTrace(error));
}
}

private void execute(Runnable task) {
acquire();
try {
executor.execute(() -> {
try {
task.run();
} finally {
release();
}
});
} catch (Exception e) {
// probably a RejectedExecutionException due to pool shutdown
release();
throw e;
}
}

private void acquire() {
if (semaphore != null) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RejectedExecutionException("Thread interrupted while waiting for event thread semaphore", e);
}
}
}

private void release() {
if (semaphore != null) {
semaphore.release();
}
}
}
59 changes: 56 additions & 3 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -95,7 +96,7 @@ public class EventSource implements Closeable {
final Duration maxReconnectTime; // visible for tests
final Duration backoffResetThreshold; // visible for tests
private volatile String lastEventId;
private final AsyncEventHandler handler;
final AsyncEventHandler handler; // visible for tests
private final ConnectionErrorHandler connectionErrorHandler;
private final AtomicReference<ReadyState> readyState;
private final OkHttpClient client;
Expand Down Expand Up @@ -124,7 +125,13 @@ public class EventSource implements Closeable {
this.eventExecutor = Executors.newSingleThreadExecutor(eventsThreadFactory);
ThreadFactory streamThreadFactory = createThreadFactory("okhttp-eventsource-stream", builder.threadPriority);
this.streamExecutor = Executors.newSingleThreadExecutor(streamThreadFactory);
this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger);
Semaphore eventThreadSemaphore;
if (builder.maxEventTasksInFlight > 0) {
eventThreadSemaphore = new Semaphore(builder.maxEventTasksInFlight);
} else {
eventThreadSemaphore = null;
}
this.handler = new AsyncEventHandler(this.eventExecutor, builder.handler, logger, eventThreadSemaphore);
this.connectionErrorHandler = builder.connectionErrorHandler == null ?
ConnectionErrorHandler.DEFAULT : builder.connectionErrorHandler;
this.readBufferSize = builder.readBufferSize;
Expand Down Expand Up @@ -216,6 +223,35 @@ public void close() {
}
}
}

/**
* Block until all underlying threads have terminated and resources have been released.
*
* @param timeout maximum time to wait for everything to shut down
* @return {@code true} if all thread pools terminated within the specified timeout, {@code false} otherwise.
* @throws InterruptedException if this thread is interrupted while blocking
*/
public boolean awaitClosed(final Duration timeout) throws InterruptedException {
final long deadline = System.currentTimeMillis() + timeout.toMillis();

if (!eventExecutor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return false;
}

long shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis());
if (!streamExecutor.awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) {
return false;
}

if (client.dispatcher().executorService() != null) {
shutdownTimeoutMills = Math.max(0, deadline - System.currentTimeMillis());
if (!client.dispatcher().executorService().awaitTermination(shutdownTimeoutMills, TimeUnit.MILLISECONDS)) {
return false;
}
}

return true;
}

private void closeCurrentStream(ReadyState previousState) {
if (previousState == ReadyState.OPEN) {
Expand Down Expand Up @@ -516,6 +552,7 @@ public static final class Builder {
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private Logger logger = null;
private String loggerBaseName = null;
private int maxEventTasksInFlight = 0;

/**
* Creates a new builder.
Expand Down Expand Up @@ -887,7 +924,23 @@ public Builder loggerBaseName(String loggerBaseName) {
this.loggerBaseName = loggerBaseName;
return this;
}


/**
* Specifies the maximum number of tasks that can be "in-flight" for the thread executing {@link EventHandler}.
* A semaphore will be used to artificially constrain the number of tasks sitting in the queue fronting the
* event handler thread. When this limit is reached the stream thread will block until the backpressure passes.
* <p>
* For backward compatibility reasons the default is "unbounded".
*
* @param maxEventTasksInFlight the maximum number of tasks/messages that can be in-flight for the {@code EventHandler}
* @return the builder
* @since 2.4.0
*/
public Builder maxEventTasksInFlight(int maxEventTasksInFlight) {
this.maxEventTasksInFlight = maxEventTasksInFlight;
return this;
}

/**
* Constructs an {@link EventSource} using the builder's current properties.
* @return the new EventSource instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

import com.launchdarkly.eventsource.Stubs.LogItem;
import com.launchdarkly.eventsource.Stubs.TestHandler;

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -27,7 +33,7 @@ public AsyncEventHandlerTest() {
executor = Executors.newSingleThreadExecutor();
eventHandler = new TestHandler();
logger = mock(Logger.class);
asyncHandler = new AsyncEventHandler(executor, eventHandler, logger);
asyncHandler = new AsyncEventHandler(executor, eventHandler, logger, null);
}

@After
Expand Down Expand Up @@ -105,4 +111,46 @@ public void errorFromOnErrorIsCaughtAndLogged() {
verify(logger).warn("Caught unexpected error from EventHandler.onError(): " + err2);
verify(logger, times(2)).debug(eq("Stack trace: {}"), any(LazyStackTrace.class));
}

@Test
public void backpressureOnQueueFull() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
final CountDownLatch latch1 = new CountDownLatch(1);
EventHandler eventHandler = mock(EventHandler.class);
doAnswer(invocation -> {
latch1.await();
return null;
}).doNothing().when(eventHandler).onMessage(anyString(), any(MessageEvent.class));

final CountDownLatch latch2 = new CountDownLatch(1);
final CountDownLatch latch3 = new CountDownLatch(1);
ExecutorService blockable = Executors.newSingleThreadExecutor();
try {
blockable.execute(() -> {
AsyncEventHandler asyncHandler = new AsyncEventHandler(executor, eventHandler, logger, new Semaphore(1));

asyncHandler.onOpen();

asyncHandler.onMessage("message", new MessageEvent("hello world"));
latch2.countDown();
asyncHandler.onMessage("message", new MessageEvent("goodbye horses"));
latch3.countDown();
});

assertTrue("Expected latch2 to trip", latch2.await(5, TimeUnit.SECONDS));
assertFalse("Expected latch3 not to trip", latch3.await(250, TimeUnit.MILLISECONDS));
latch1.countDown();
assertTrue("Expected latch3 to trip", latch3.await(5, TimeUnit.SECONDS));
} finally {
latch1.countDown();
latch2.countDown();
latch3.countDown();
blockable.shutdown();
assertTrue("Expected background thread to terminate", blockable.awaitTermination(5, TimeUnit.SECONDS));
}
} finally {
executor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -376,4 +378,18 @@ public void customLogger() {
assertSame(myLogger, es.logger);
}
}

@Test
public void defaultEventThreadWorkQueueCapacity() {
try (EventSource es = builder.build()) {
assertNull(es.handler.semaphore);
}
}

@Test
public void eventThreadWorkQueueCapacity() {
try (EventSource es = builder.maxEventTasksInFlight(8).build()) {
assertEquals(8, es.handler.semaphore.availablePermits());
}
}
}
Loading