From 40564c7415ab5fbd2dc89527d38493bc0bd2bf86 Mon Sep 17 00:00:00 2001 From: yorick Date: Wed, 1 Nov 2017 13:35:05 -0700 Subject: [PATCH] [guava concurrent] Upstreamed a modified form of the modified interruption behaviour from a fork of SerializingExecutor. Documented and tested the behaviour thoroughly. RELNOTES=Adjusted the interruption behaviour of MoreExecutors.sequentialExecutor() to run tasks without a Thread interrupt marked (previously, interrupts leaked between tasks). If the Thread was interrupted when the Executor received it or during execution of its tasks, the thread will be re-interrupted before being yielded. Expanded the documentation of MoreExecutors.sequentialExecutor() to cover interruption and rejected execution behaviour in detail. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=174227926 --- .../concurrent/SequentialExecutorTest.java | 181 ++++++++++++------ .../common/util/concurrent/MoreExecutors.java | 80 ++++++-- .../util/concurrent/SequentialExecutor.java | 62 ++++-- .../concurrent/SequentialExecutorTest.java | 181 ++++++++++++------ .../common/util/concurrent/MoreExecutors.java | 80 ++++++-- .../util/concurrent/SequentialExecutor.java | 62 ++++-- 6 files changed, 458 insertions(+), 188 deletions(-) diff --git a/android/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java b/android/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java index 388f0f76b81e..30f6477cecb7 100644 --- a/android/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java +++ b/android/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java @@ -16,6 +16,8 @@ package com.google.common.util.concurrent; +import static com.google.common.truth.Truth.assertThat; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Queues; @@ -37,9 +39,12 @@ * @author JJ Furman */ public class SequentialExecutorTest extends TestCase { + private static class FakeExecutor implements Executor { Queue tasks = Queues.newArrayDeque(); - @Override public void execute(Runnable command) { + + @Override + public void execute(Runnable command) { tasks.add(command); } @@ -58,6 +63,7 @@ void runAll() { } } } + private FakeExecutor fakePool; private SequentialExecutor e; @@ -77,14 +83,15 @@ public void testConstructingWithNullExecutor_fails() { public void testBasics() { final AtomicInteger totalCalls = new AtomicInteger(); - Runnable intCounter = new Runnable() { - @Override - public void run() { - totalCalls.incrementAndGet(); - // Make sure that no other tasks are scheduled to run while this is running. - assertFalse(fakePool.hasNext()); - } - }; + Runnable intCounter = + new Runnable() { + @Override + public void run() { + totalCalls.incrementAndGet(); + // Make sure that no other tasks are scheduled to run while this is running. + assertFalse(fakePool.hasNext()); + } + }; assertFalse(fakePool.hasNext()); e.execute(intCounter); @@ -137,13 +144,14 @@ public void testRuntimeException_doesNotStopExecution() { final AtomicInteger numCalls = new AtomicInteger(); - Runnable runMe = new Runnable() { - @Override - public void run() { - numCalls.incrementAndGet(); - throw new RuntimeException("FAKE EXCEPTION!"); - } - }; + Runnable runMe = + new Runnable() { + @Override + public void run() { + numCalls.incrementAndGet(); + throw new RuntimeException("FAKE EXCEPTION!"); + } + }; e.execute(runMe); e.execute(runMe); @@ -152,16 +160,70 @@ public void run() { assertEquals(2, numCalls.get()); } + public void testInterrupt_beforeRunRestoresInterruption() throws Exception { + // Run a task on the composed Executor that interrupts its thread (i.e. this thread). + fakePool.execute( + new Runnable() { + @Override + public void run() { + Thread.currentThread().interrupt(); + } + }); + // Run a task that expects that it is not interrupted while it is running. + e.execute( + new Runnable() { + @Override + public void run() { + assertThat(Thread.currentThread().isInterrupted()).isFalse(); + } + }); + + // Run these together. + fakePool.runAll(); + + // Check that this thread has been marked as interrupted again now that the thread has been + // returned by SequentialExecutor. Clear the bit while checking so that the test doesn't hose + // JUnit or some other test case. + assertThat(Thread.currentThread().interrupted()).isTrue(); + } + + public void testInterrupt_doesNotInterruptSubsequentTask() throws Exception { + // Run a task that interrupts its thread (i.e. this thread). + e.execute( + new Runnable() { + @Override + public void run() { + Thread.currentThread().interrupt(); + } + }); + // Run a task that expects that it is not interrupted while it is running. + e.execute( + new Runnable() { + @Override + public void run() { + assertThat(Thread.currentThread().isInterrupted()).isFalse(); + } + }); + + // Run those tasks together. + fakePool.runAll(); + + // Check that the interruption of a SequentialExecutor's task is restored to the thread once + // it is yielded. + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } + public void testInterrupt_doesNotStopExecution() { final AtomicInteger numCalls = new AtomicInteger(); - Runnable runMe = new Runnable() { - @Override - public void run() { - numCalls.incrementAndGet(); - } - }; + Runnable runMe = + new Runnable() { + @Override + public void run() { + numCalls.incrementAndGet(); + } + }; Thread.currentThread().interrupt(); @@ -170,31 +232,36 @@ public void run() { fakePool.runAll(); assertEquals(2, numCalls.get()); + assertTrue(Thread.interrupted()); } public void testDelegateRejection() { final AtomicInteger numCalls = new AtomicInteger(); final AtomicBoolean reject = new AtomicBoolean(true); - final SequentialExecutor executor = new SequentialExecutor( - new Executor() { - @Override public void execute(Runnable r) { - if (reject.get()) { - throw new RejectedExecutionException(); - } - r.run(); + final SequentialExecutor executor = + new SequentialExecutor( + new Executor() { + @Override + public void execute(Runnable r) { + if (reject.get()) { + throw new RejectedExecutionException(); + } + r.run(); + } + }); + Runnable task = + new Runnable() { + @Override + public void run() { + numCalls.incrementAndGet(); } - }); - Runnable task = new Runnable() { - @Override - public void run() { - numCalls.incrementAndGet(); - } - }; + }; try { executor.execute(task); fail(); - } catch (RejectedExecutionException expected) {} + } catch (RejectedExecutionException expected) { + } assertEquals(0, numCalls.get()); reject.set(false); executor.execute(task); @@ -208,30 +275,32 @@ class MyError extends Error {} ExecutorService service = Executors.newSingleThreadExecutor(); try { final SequentialExecutor executor = new SequentialExecutor(service); - Runnable errorTask = new Runnable() { - @Override - public void run() { - throw new MyError(); - } - }; - Runnable barrierTask = new Runnable() { - @Override - public void run() { - try { - barrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; + Runnable errorTask = + new Runnable() { + @Override + public void run() { + throw new MyError(); + } + }; + Runnable barrierTask = + new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; executor.execute(errorTask); - service.execute(barrierTask); // submit directly to the service + service.execute(barrierTask); // submit directly to the service // the barrier task runs after the error task so we know that the error has been observed by // SequentialExecutor by the time the barrier is satified - barrier.await(10, TimeUnit.SECONDS); + barrier.await(1, TimeUnit.SECONDS); executor.execute(barrierTask); // timeout means the second task wasn't even tried - barrier.await(10, TimeUnit.SECONDS); + barrier.await(1, TimeUnit.SECONDS); } finally { service.shutdown(); } diff --git a/android/guava/src/com/google/common/util/concurrent/MoreExecutors.java b/android/guava/src/com/google/common/util/concurrent/MoreExecutors.java index 39048dda98ac..320bb73a4acd 100644 --- a/android/guava/src/com/google/common/util/concurrent/MoreExecutors.java +++ b/android/guava/src/com/google/common/util/concurrent/MoreExecutors.java @@ -235,9 +235,7 @@ private static void useDaemonThreadFactory(ThreadPoolExecutor executor) { // See newDirectExecutorService javadoc for behavioral notes. @GwtIncompatible // TODO private static final class DirectExecutorService extends AbstractListeningExecutorService { - /** - * Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor - */ + /** Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor */ private final Object lock = new Object(); /* @@ -326,7 +324,7 @@ private void startTask() { } } - /** + /** * Decrements the running task count. */ private void endTask() { @@ -406,20 +404,40 @@ public String toString() { } /** - * Returns an {@link Executor} that runs each task executed sequentially, such that no - * two tasks are running concurrently. + * Returns an {@link Executor} that runs each task executed sequentially, such that no two tasks + * are running concurrently. * *

The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in * turn, and does not create any threads of its own. * - *

After execution starts on the {@code delegate} {@link Executor}, tasks are polled and - * executed from the queue until there are no more tasks. The thread will not be released until - * there are no more tasks to run. + *

After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are + * polled and executed from a task queue until there are no more tasks. The thread will not be + * released until there are no more tasks to run. + * + *

If a task is submitted while a thread is executing tasks from the task queue, the thread + * will not be released until that submitted task is also complete. + * + *

Tasks are always started with the Thread in an uninterrupted state. + * + *

If the thread is {@linkplain Thread#interrupt interrupted} while a task is running or before + * the thread is taken by the Executor: + * + *

    + *
  1. execution will not stop until the task queue is empty. + *
  2. the interrupt will be restored to the thread after it completes so that its {@code + * delegate} Executor may process the interrupt. + *
+ * + *

{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. + * If an {@code Error} is thrown, the error will propagate and execution will stop until the next + * time a task is submitted. * - *

If a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks - * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps - * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until - * the next time a task is submitted. + *

When an {@code Error} is thrown by an executed task, previously submitted tasks may never + * run. An attempt will be made to restart execution on the next call to {@code execute}. If the + * {@code delegate} has begun to reject execution, the previously submitted tasks may never run, + * despite not throwing a RejectedExecutionException synchronously with the call to {@code + * execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link + * Executors#newSingleThreadExecutor}). * * @deprecated Use {@link #newSequentialExecutor}. This method is scheduled for removal in * January 2018. @@ -439,14 +457,36 @@ public static Executor sequentialExecutor(Executor delegate) { *

The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in * turn, and does not create any threads of its own. * - *

After execution starts on the {@code delegate} {@link Executor}, tasks are polled and - * executed from the queue until there are no more tasks. The thread will not be released until - * there are no more tasks to run. + *

After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are + * polled and executed from a task queue until there are no more tasks. The thread will not be + * released until there are no more tasks to run. + * + *

If a task is submitted while a thread is executing tasks from the task queue, the thread + * will not be released until that submitted task is also complete. + * + *

If a task is {@linkplain Thread#interrupt interrupted} while a task is running: + * + *

    + *
  1. execution will not stop until the task queue is empty. + *
  2. tasks will begin execution with the thread marked as not interrupted - any interruption + * applies only to the task that was running at the point of interruption. + *
  3. if the thread was interrupted before the SequentialExecutor's worker begins execution, + * the interrupt will be restored to the thread after it completes so that its {@code + * delegate} Executor may process the interrupt. + *
  4. subtasks are run with the thread uninterrupted and interrupts received during execution + * of a task are ignored. + *
+ * + *

{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. + * If an {@code Error} is thrown, the error will propagate and execution will stop until the next + * time a task is submitted. * - *

If a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks - * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps - * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until - * the next time a task is submitted. + *

When an {@code Error} is thrown by an executed task, previously submitted tasks may never + * run. An attempt will be made to restart execution on the next call to {@code execute}. If the + * {@code delegate} has begun to reject execution, the previously submitted tasks may never run, + * despite not throwing a RejectedExecutionException synchronously with the call to {@code + * execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link + * Executors#newSingleThreadExecutor}). * * @since 23.3 (since 23.1 as {@link #sequentialExecutor(Executor)}) */ diff --git a/android/guava/src/com/google/common/util/concurrent/SequentialExecutor.java b/android/guava/src/com/google/common/util/concurrent/SequentialExecutor.java index c68530f26d5f..6440221f9ea7 100644 --- a/android/guava/src/com/google/common/util/concurrent/SequentialExecutor.java +++ b/android/guava/src/com/google/common/util/concurrent/SequentialExecutor.java @@ -32,9 +32,11 @@ * *

The execution of tasks is done by one thread as long as there are tasks left in the queue. * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks - * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps - * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until - * it is restarted by a call to {@link #execute}. + * continues. See {@link QueueWorker#workOnQueue} for details. + * + *

{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. + * If an {@code Error} is thrown, the error will propagate and execution will stop until it is + * restarted by a call to {@link #execute}. */ @GwtIncompatible final class SequentialExecutor implements Executor { @@ -100,9 +102,7 @@ private void startQueueWorker() { } } - /** - * Worker that runs tasks from {@link #queue} until it is empty. - */ + /** Worker that runs tasks from {@link #queue} until it is empty. */ @WeakOuter private final class QueueWorker implements Runnable { @Override @@ -120,21 +120,47 @@ public void run() { } } + /** + * Continues executing tasks from {@link #queue} until it is empty. + * + *

The thread's interrupt bit is cleared before execution of each task. + * + *

If the Thread in use is interrupted before or during execution of the tasks in + * {@link #queue}, the Executor will complete its tasks, and then restore the interruption. + * This means that once the Thread returns to the Executor that this Executor composes, the + * interruption will still be present. If the composed Executor is an ExecutorService, it can + * respond to shutdown() by returning tasks queued on that Thread after {@link #worker} drains + * the queue. + */ private void workOnQueue() { - while (true) { - Runnable task = null; - synchronized (queue) { - // TODO(user): How should we handle interrupts and shutdowns? - task = queue.poll(); - if (task == null) { - isWorkerRunning = false; - return; + boolean interruptedDuringTask = false; + + try { + while (true) { + // Remove the interrupt bit before each task. The interrupt is for the "current task" when + // it is sent, so subsequent tasks in the queue should not be caused to be interrupted + // by a previous one in the queue being interrupted. + interruptedDuringTask |= Thread.interrupted(); + Runnable task; + synchronized (queue) { + task = queue.poll(); + if (task == null) { + isWorkerRunning = false; + return; + } + } + try { + task.run(); + } catch (RuntimeException e) { + log.log(Level.SEVERE, "Exception while executing runnable " + task, e); } } - try { - task.run(); - } catch (RuntimeException e) { - log.log(Level.SEVERE, "Exception while executing runnable " + task, e); + } finally { + // Ensure that if the thread was interrupted at all while processing the task queue, it + // is returned to the delegate Executor interrupted so that it may handle the + // interruption if it likes. + if (interruptedDuringTask) { + Thread.currentThread().interrupt(); } } } diff --git a/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java b/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java index 388f0f76b81e..30f6477cecb7 100644 --- a/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java +++ b/guava-tests/test/com/google/common/util/concurrent/SequentialExecutorTest.java @@ -16,6 +16,8 @@ package com.google.common.util.concurrent; +import static com.google.common.truth.Truth.assertThat; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Queues; @@ -37,9 +39,12 @@ * @author JJ Furman */ public class SequentialExecutorTest extends TestCase { + private static class FakeExecutor implements Executor { Queue tasks = Queues.newArrayDeque(); - @Override public void execute(Runnable command) { + + @Override + public void execute(Runnable command) { tasks.add(command); } @@ -58,6 +63,7 @@ void runAll() { } } } + private FakeExecutor fakePool; private SequentialExecutor e; @@ -77,14 +83,15 @@ public void testConstructingWithNullExecutor_fails() { public void testBasics() { final AtomicInteger totalCalls = new AtomicInteger(); - Runnable intCounter = new Runnable() { - @Override - public void run() { - totalCalls.incrementAndGet(); - // Make sure that no other tasks are scheduled to run while this is running. - assertFalse(fakePool.hasNext()); - } - }; + Runnable intCounter = + new Runnable() { + @Override + public void run() { + totalCalls.incrementAndGet(); + // Make sure that no other tasks are scheduled to run while this is running. + assertFalse(fakePool.hasNext()); + } + }; assertFalse(fakePool.hasNext()); e.execute(intCounter); @@ -137,13 +144,14 @@ public void testRuntimeException_doesNotStopExecution() { final AtomicInteger numCalls = new AtomicInteger(); - Runnable runMe = new Runnable() { - @Override - public void run() { - numCalls.incrementAndGet(); - throw new RuntimeException("FAKE EXCEPTION!"); - } - }; + Runnable runMe = + new Runnable() { + @Override + public void run() { + numCalls.incrementAndGet(); + throw new RuntimeException("FAKE EXCEPTION!"); + } + }; e.execute(runMe); e.execute(runMe); @@ -152,16 +160,70 @@ public void run() { assertEquals(2, numCalls.get()); } + public void testInterrupt_beforeRunRestoresInterruption() throws Exception { + // Run a task on the composed Executor that interrupts its thread (i.e. this thread). + fakePool.execute( + new Runnable() { + @Override + public void run() { + Thread.currentThread().interrupt(); + } + }); + // Run a task that expects that it is not interrupted while it is running. + e.execute( + new Runnable() { + @Override + public void run() { + assertThat(Thread.currentThread().isInterrupted()).isFalse(); + } + }); + + // Run these together. + fakePool.runAll(); + + // Check that this thread has been marked as interrupted again now that the thread has been + // returned by SequentialExecutor. Clear the bit while checking so that the test doesn't hose + // JUnit or some other test case. + assertThat(Thread.currentThread().interrupted()).isTrue(); + } + + public void testInterrupt_doesNotInterruptSubsequentTask() throws Exception { + // Run a task that interrupts its thread (i.e. this thread). + e.execute( + new Runnable() { + @Override + public void run() { + Thread.currentThread().interrupt(); + } + }); + // Run a task that expects that it is not interrupted while it is running. + e.execute( + new Runnable() { + @Override + public void run() { + assertThat(Thread.currentThread().isInterrupted()).isFalse(); + } + }); + + // Run those tasks together. + fakePool.runAll(); + + // Check that the interruption of a SequentialExecutor's task is restored to the thread once + // it is yielded. + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } + public void testInterrupt_doesNotStopExecution() { final AtomicInteger numCalls = new AtomicInteger(); - Runnable runMe = new Runnable() { - @Override - public void run() { - numCalls.incrementAndGet(); - } - }; + Runnable runMe = + new Runnable() { + @Override + public void run() { + numCalls.incrementAndGet(); + } + }; Thread.currentThread().interrupt(); @@ -170,31 +232,36 @@ public void run() { fakePool.runAll(); assertEquals(2, numCalls.get()); + assertTrue(Thread.interrupted()); } public void testDelegateRejection() { final AtomicInteger numCalls = new AtomicInteger(); final AtomicBoolean reject = new AtomicBoolean(true); - final SequentialExecutor executor = new SequentialExecutor( - new Executor() { - @Override public void execute(Runnable r) { - if (reject.get()) { - throw new RejectedExecutionException(); - } - r.run(); + final SequentialExecutor executor = + new SequentialExecutor( + new Executor() { + @Override + public void execute(Runnable r) { + if (reject.get()) { + throw new RejectedExecutionException(); + } + r.run(); + } + }); + Runnable task = + new Runnable() { + @Override + public void run() { + numCalls.incrementAndGet(); } - }); - Runnable task = new Runnable() { - @Override - public void run() { - numCalls.incrementAndGet(); - } - }; + }; try { executor.execute(task); fail(); - } catch (RejectedExecutionException expected) {} + } catch (RejectedExecutionException expected) { + } assertEquals(0, numCalls.get()); reject.set(false); executor.execute(task); @@ -208,30 +275,32 @@ class MyError extends Error {} ExecutorService service = Executors.newSingleThreadExecutor(); try { final SequentialExecutor executor = new SequentialExecutor(service); - Runnable errorTask = new Runnable() { - @Override - public void run() { - throw new MyError(); - } - }; - Runnable barrierTask = new Runnable() { - @Override - public void run() { - try { - barrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; + Runnable errorTask = + new Runnable() { + @Override + public void run() { + throw new MyError(); + } + }; + Runnable barrierTask = + new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; executor.execute(errorTask); - service.execute(barrierTask); // submit directly to the service + service.execute(barrierTask); // submit directly to the service // the barrier task runs after the error task so we know that the error has been observed by // SequentialExecutor by the time the barrier is satified - barrier.await(10, TimeUnit.SECONDS); + barrier.await(1, TimeUnit.SECONDS); executor.execute(barrierTask); // timeout means the second task wasn't even tried - barrier.await(10, TimeUnit.SECONDS); + barrier.await(1, TimeUnit.SECONDS); } finally { service.shutdown(); } diff --git a/guava/src/com/google/common/util/concurrent/MoreExecutors.java b/guava/src/com/google/common/util/concurrent/MoreExecutors.java index 39048dda98ac..320bb73a4acd 100644 --- a/guava/src/com/google/common/util/concurrent/MoreExecutors.java +++ b/guava/src/com/google/common/util/concurrent/MoreExecutors.java @@ -235,9 +235,7 @@ private static void useDaemonThreadFactory(ThreadPoolExecutor executor) { // See newDirectExecutorService javadoc for behavioral notes. @GwtIncompatible // TODO private static final class DirectExecutorService extends AbstractListeningExecutorService { - /** - * Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor - */ + /** Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor */ private final Object lock = new Object(); /* @@ -326,7 +324,7 @@ private void startTask() { } } - /** + /** * Decrements the running task count. */ private void endTask() { @@ -406,20 +404,40 @@ public String toString() { } /** - * Returns an {@link Executor} that runs each task executed sequentially, such that no - * two tasks are running concurrently. + * Returns an {@link Executor} that runs each task executed sequentially, such that no two tasks + * are running concurrently. * *

The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in * turn, and does not create any threads of its own. * - *

After execution starts on the {@code delegate} {@link Executor}, tasks are polled and - * executed from the queue until there are no more tasks. The thread will not be released until - * there are no more tasks to run. + *

After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are + * polled and executed from a task queue until there are no more tasks. The thread will not be + * released until there are no more tasks to run. + * + *

If a task is submitted while a thread is executing tasks from the task queue, the thread + * will not be released until that submitted task is also complete. + * + *

Tasks are always started with the Thread in an uninterrupted state. + * + *

If the thread is {@linkplain Thread#interrupt interrupted} while a task is running or before + * the thread is taken by the Executor: + * + *

    + *
  1. execution will not stop until the task queue is empty. + *
  2. the interrupt will be restored to the thread after it completes so that its {@code + * delegate} Executor may process the interrupt. + *
+ * + *

{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. + * If an {@code Error} is thrown, the error will propagate and execution will stop until the next + * time a task is submitted. * - *

If a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks - * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps - * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until - * the next time a task is submitted. + *

When an {@code Error} is thrown by an executed task, previously submitted tasks may never + * run. An attempt will be made to restart execution on the next call to {@code execute}. If the + * {@code delegate} has begun to reject execution, the previously submitted tasks may never run, + * despite not throwing a RejectedExecutionException synchronously with the call to {@code + * execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link + * Executors#newSingleThreadExecutor}). * * @deprecated Use {@link #newSequentialExecutor}. This method is scheduled for removal in * January 2018. @@ -439,14 +457,36 @@ public static Executor sequentialExecutor(Executor delegate) { *

The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in * turn, and does not create any threads of its own. * - *

After execution starts on the {@code delegate} {@link Executor}, tasks are polled and - * executed from the queue until there are no more tasks. The thread will not be released until - * there are no more tasks to run. + *

After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are + * polled and executed from a task queue until there are no more tasks. The thread will not be + * released until there are no more tasks to run. + * + *

If a task is submitted while a thread is executing tasks from the task queue, the thread + * will not be released until that submitted task is also complete. + * + *

If a task is {@linkplain Thread#interrupt interrupted} while a task is running: + * + *

    + *
  1. execution will not stop until the task queue is empty. + *
  2. tasks will begin execution with the thread marked as not interrupted - any interruption + * applies only to the task that was running at the point of interruption. + *
  3. if the thread was interrupted before the SequentialExecutor's worker begins execution, + * the interrupt will be restored to the thread after it completes so that its {@code + * delegate} Executor may process the interrupt. + *
  4. subtasks are run with the thread uninterrupted and interrupts received during execution + * of a task are ignored. + *
+ * + *

{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. + * If an {@code Error} is thrown, the error will propagate and execution will stop until the next + * time a task is submitted. * - *

If a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks - * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps - * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until - * the next time a task is submitted. + *

When an {@code Error} is thrown by an executed task, previously submitted tasks may never + * run. An attempt will be made to restart execution on the next call to {@code execute}. If the + * {@code delegate} has begun to reject execution, the previously submitted tasks may never run, + * despite not throwing a RejectedExecutionException synchronously with the call to {@code + * execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link + * Executors#newSingleThreadExecutor}). * * @since 23.3 (since 23.1 as {@link #sequentialExecutor(Executor)}) */ diff --git a/guava/src/com/google/common/util/concurrent/SequentialExecutor.java b/guava/src/com/google/common/util/concurrent/SequentialExecutor.java index c68530f26d5f..6440221f9ea7 100644 --- a/guava/src/com/google/common/util/concurrent/SequentialExecutor.java +++ b/guava/src/com/google/common/util/concurrent/SequentialExecutor.java @@ -32,9 +32,11 @@ * *

The execution of tasks is done by one thread as long as there are tasks left in the queue. * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks - * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps - * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until - * it is restarted by a call to {@link #execute}. + * continues. See {@link QueueWorker#workOnQueue} for details. + * + *

{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. + * If an {@code Error} is thrown, the error will propagate and execution will stop until it is + * restarted by a call to {@link #execute}. */ @GwtIncompatible final class SequentialExecutor implements Executor { @@ -100,9 +102,7 @@ private void startQueueWorker() { } } - /** - * Worker that runs tasks from {@link #queue} until it is empty. - */ + /** Worker that runs tasks from {@link #queue} until it is empty. */ @WeakOuter private final class QueueWorker implements Runnable { @Override @@ -120,21 +120,47 @@ public void run() { } } + /** + * Continues executing tasks from {@link #queue} until it is empty. + * + *

The thread's interrupt bit is cleared before execution of each task. + * + *

If the Thread in use is interrupted before or during execution of the tasks in + * {@link #queue}, the Executor will complete its tasks, and then restore the interruption. + * This means that once the Thread returns to the Executor that this Executor composes, the + * interruption will still be present. If the composed Executor is an ExecutorService, it can + * respond to shutdown() by returning tasks queued on that Thread after {@link #worker} drains + * the queue. + */ private void workOnQueue() { - while (true) { - Runnable task = null; - synchronized (queue) { - // TODO(user): How should we handle interrupts and shutdowns? - task = queue.poll(); - if (task == null) { - isWorkerRunning = false; - return; + boolean interruptedDuringTask = false; + + try { + while (true) { + // Remove the interrupt bit before each task. The interrupt is for the "current task" when + // it is sent, so subsequent tasks in the queue should not be caused to be interrupted + // by a previous one in the queue being interrupted. + interruptedDuringTask |= Thread.interrupted(); + Runnable task; + synchronized (queue) { + task = queue.poll(); + if (task == null) { + isWorkerRunning = false; + return; + } + } + try { + task.run(); + } catch (RuntimeException e) { + log.log(Level.SEVERE, "Exception while executing runnable " + task, e); } } - try { - task.run(); - } catch (RuntimeException e) { - log.log(Level.SEVERE, "Exception while executing runnable " + task, e); + } finally { + // Ensure that if the thread was interrupted at all while processing the task queue, it + // is returned to the delegate Executor interrupted so that it may handle the + // interruption if it likes. + if (interruptedDuringTask) { + Thread.currentThread().interrupt(); } } }