Skip to content

Commit

Permalink
[guava concurrent] Upstreamed a modified form of the modified interru…
Browse files Browse the repository at this point in the history
…ption behaviour from a fork of SerializingExecutor. Documented and tested the behaviour thoroughly.

RELNOTES=Adjusted the interruption behaviour of MoreExecutors.sequentialExecutor() to run tasks without a Thread interrupt marked (previously, interrupts leaked between tasks). If the Thread was interrupted when the Executor received it or during execution of its tasks, the thread will be re-interrupted before being yielded. Expanded the documentation of MoreExecutors.sequentialExecutor() to cover interruption and rejected execution behaviour in detail.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=174227926
  • Loading branch information
yorickhenning authored and cpovirk committed Nov 1, 2017
1 parent d31b6ce commit 40564c7
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.common.util.concurrent;

import static com.google.common.truth.Truth.assertThat;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
Expand All @@ -37,9 +39,12 @@
* @author JJ Furman
*/
public class SequentialExecutorTest extends TestCase {

private static class FakeExecutor implements Executor {
Queue<Runnable> tasks = Queues.newArrayDeque();
@Override public void execute(Runnable command) {

@Override
public void execute(Runnable command) {
tasks.add(command);
}

Expand All @@ -58,6 +63,7 @@ void runAll() {
}
}
}

private FakeExecutor fakePool;
private SequentialExecutor e;

Expand All @@ -77,14 +83,15 @@ public void testConstructingWithNullExecutor_fails() {

public void testBasics() {
final AtomicInteger totalCalls = new AtomicInteger();
Runnable intCounter = new Runnable() {
@Override
public void run() {
totalCalls.incrementAndGet();
// Make sure that no other tasks are scheduled to run while this is running.
assertFalse(fakePool.hasNext());
}
};
Runnable intCounter =
new Runnable() {
@Override
public void run() {
totalCalls.incrementAndGet();
// Make sure that no other tasks are scheduled to run while this is running.
assertFalse(fakePool.hasNext());
}
};

assertFalse(fakePool.hasNext());
e.execute(intCounter);
Expand Down Expand Up @@ -137,13 +144,14 @@ public void testRuntimeException_doesNotStopExecution() {

final AtomicInteger numCalls = new AtomicInteger();

Runnable runMe = new Runnable() {
@Override
public void run() {
numCalls.incrementAndGet();
throw new RuntimeException("FAKE EXCEPTION!");
}
};
Runnable runMe =
new Runnable() {
@Override
public void run() {
numCalls.incrementAndGet();
throw new RuntimeException("FAKE EXCEPTION!");
}
};

e.execute(runMe);
e.execute(runMe);
Expand All @@ -152,16 +160,70 @@ public void run() {
assertEquals(2, numCalls.get());
}

public void testInterrupt_beforeRunRestoresInterruption() throws Exception {
// Run a task on the composed Executor that interrupts its thread (i.e. this thread).
fakePool.execute(
new Runnable() {
@Override
public void run() {
Thread.currentThread().interrupt();
}
});
// Run a task that expects that it is not interrupted while it is running.
e.execute(
new Runnable() {
@Override
public void run() {
assertThat(Thread.currentThread().isInterrupted()).isFalse();
}
});

// Run these together.
fakePool.runAll();

// Check that this thread has been marked as interrupted again now that the thread has been
// returned by SequentialExecutor. Clear the bit while checking so that the test doesn't hose
// JUnit or some other test case.
assertThat(Thread.currentThread().interrupted()).isTrue();
}

public void testInterrupt_doesNotInterruptSubsequentTask() throws Exception {
// Run a task that interrupts its thread (i.e. this thread).
e.execute(
new Runnable() {
@Override
public void run() {
Thread.currentThread().interrupt();
}
});
// Run a task that expects that it is not interrupted while it is running.
e.execute(
new Runnable() {
@Override
public void run() {
assertThat(Thread.currentThread().isInterrupted()).isFalse();
}
});

// Run those tasks together.
fakePool.runAll();

// Check that the interruption of a SequentialExecutor's task is restored to the thread once
// it is yielded.
assertThat(Thread.currentThread().isInterrupted()).isTrue();
}

public void testInterrupt_doesNotStopExecution() {

final AtomicInteger numCalls = new AtomicInteger();

Runnable runMe = new Runnable() {
@Override
public void run() {
numCalls.incrementAndGet();
}
};
Runnable runMe =
new Runnable() {
@Override
public void run() {
numCalls.incrementAndGet();
}
};

Thread.currentThread().interrupt();

Expand All @@ -170,31 +232,36 @@ public void run() {
fakePool.runAll();

assertEquals(2, numCalls.get());

assertTrue(Thread.interrupted());
}

public void testDelegateRejection() {
final AtomicInteger numCalls = new AtomicInteger();
final AtomicBoolean reject = new AtomicBoolean(true);
final SequentialExecutor executor = new SequentialExecutor(
new Executor() {
@Override public void execute(Runnable r) {
if (reject.get()) {
throw new RejectedExecutionException();
}
r.run();
final SequentialExecutor executor =
new SequentialExecutor(
new Executor() {
@Override
public void execute(Runnable r) {
if (reject.get()) {
throw new RejectedExecutionException();
}
r.run();
}
});
Runnable task =
new Runnable() {
@Override
public void run() {
numCalls.incrementAndGet();
}
});
Runnable task = new Runnable() {
@Override
public void run() {
numCalls.incrementAndGet();
}
};
};
try {
executor.execute(task);
fail();
} catch (RejectedExecutionException expected) {}
} catch (RejectedExecutionException expected) {
}
assertEquals(0, numCalls.get());
reject.set(false);
executor.execute(task);
Expand All @@ -208,30 +275,32 @@ class MyError extends Error {}
ExecutorService service = Executors.newSingleThreadExecutor();
try {
final SequentialExecutor executor = new SequentialExecutor(service);
Runnable errorTask = new Runnable() {
@Override
public void run() {
throw new MyError();
}
};
Runnable barrierTask = new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
Runnable errorTask =
new Runnable() {
@Override
public void run() {
throw new MyError();
}
};
Runnable barrierTask =
new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
executor.execute(errorTask);
service.execute(barrierTask); // submit directly to the service
service.execute(barrierTask); // submit directly to the service
// the barrier task runs after the error task so we know that the error has been observed by
// SequentialExecutor by the time the barrier is satified
barrier.await(10, TimeUnit.SECONDS);
barrier.await(1, TimeUnit.SECONDS);
executor.execute(barrierTask);
// timeout means the second task wasn't even tried
barrier.await(10, TimeUnit.SECONDS);
barrier.await(1, TimeUnit.SECONDS);
} finally {
service.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
// See newDirectExecutorService javadoc for behavioral notes.
@GwtIncompatible // TODO
private static final class DirectExecutorService extends AbstractListeningExecutorService {
/**
* Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor
*/
/** Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor */
private final Object lock = new Object();

/*
Expand Down Expand Up @@ -326,7 +324,7 @@ private void startTask() {
}
}

/**
/**
* Decrements the running task count.
*/
private void endTask() {
Expand Down Expand Up @@ -406,20 +404,40 @@ public String toString() {
}

/**
* Returns an {@link Executor} that runs each task executed sequentially, such that no
* two tasks are running concurrently.
* Returns an {@link Executor} that runs each task executed sequentially, such that no two tasks
* are running concurrently.
*
* <p>The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in
* turn, and does not create any threads of its own.
*
* <p>After execution starts on the {@code delegate} {@link Executor}, tasks are polled and
* executed from the queue until there are no more tasks. The thread will not be released until
* there are no more tasks to run.
* <p>After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are
* polled and executed from a task queue until there are no more tasks. The thread will not be
* released until there are no more tasks to run.
*
* <p>If a task is submitted while a thread is executing tasks from the task queue, the thread
* will not be released until that submitted task is also complete.
*
* <p>Tasks are always started with the Thread in an uninterrupted state.
*
* <p>If the thread is {@linkplain Thread#interrupt interrupted} while a task is running or before
* the thread is taken by the Executor:
*
* <ol>
* <li>execution will not stop until the task queue is empty.
* <li>the interrupt will be restored to the thread after it completes so that its {@code
* delegate} Executor may process the interrupt.
* </ol>
*
* <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
* If an {@code Error} is thrown, the error will propagate and execution will stop until the next
* time a task is submitted.
*
* <p>If a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
* continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps
* trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until
* the next time a task is submitted.
* <p>When an {@code Error} is thrown by an executed task, previously submitted tasks may never
* run. An attempt will be made to restart execution on the next call to {@code execute}. If the
* {@code delegate} has begun to reject execution, the previously submitted tasks may never run,
* despite not throwing a RejectedExecutionException synchronously with the call to {@code
* execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link
* Executors#newSingleThreadExecutor}).
*
* @deprecated Use {@link #newSequentialExecutor}. This method is scheduled for removal in
* January 2018.
Expand All @@ -439,14 +457,36 @@ public static Executor sequentialExecutor(Executor delegate) {
* <p>The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in
* turn, and does not create any threads of its own.
*
* <p>After execution starts on the {@code delegate} {@link Executor}, tasks are polled and
* executed from the queue until there are no more tasks. The thread will not be released until
* there are no more tasks to run.
* <p>After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are
* polled and executed from a task queue until there are no more tasks. The thread will not be
* released until there are no more tasks to run.
*
* <p>If a task is submitted while a thread is executing tasks from the task queue, the thread
* will not be released until that submitted task is also complete.
*
* <p>If a task is {@linkplain Thread#interrupt interrupted} while a task is running:
*
* <ol>
* <li>execution will not stop until the task queue is empty.
* <li>tasks will begin execution with the thread marked as not interrupted - any interruption
* applies only to the task that was running at the point of interruption.
* <li>if the thread was interrupted before the SequentialExecutor's worker begins execution,
* the interrupt will be restored to the thread after it completes so that its {@code
* delegate} Executor may process the interrupt.
* <li>subtasks are run with the thread uninterrupted and interrupts received during execution
* of a task are ignored.
* </ol>
*
* <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
* If an {@code Error} is thrown, the error will propagate and execution will stop until the next
* time a task is submitted.
*
* <p>If a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
* continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps
* trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until
* the next time a task is submitted.
* <p>When an {@code Error} is thrown by an executed task, previously submitted tasks may never
* run. An attempt will be made to restart execution on the next call to {@code execute}. If the
* {@code delegate} has begun to reject execution, the previously submitted tasks may never run,
* despite not throwing a RejectedExecutionException synchronously with the call to {@code
* execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link
* Executors#newSingleThreadExecutor}).
*
* @since 23.3 (since 23.1 as {@link #sequentialExecutor(Executor)})
*/
Expand Down
Loading

0 comments on commit 40564c7

Please sign in to comment.