Skip to content
This repository has been archived by the owner on Sep 2, 2020. It is now read-only.

Commit

Permalink
Fix: continuation with bad Executor leads to un-predictable result an…
Browse files Browse the repository at this point in the history
…d the Exception is hidden

* Executor execute function now wrapped with try-catch for errors coming from the Executor execute method
  • Loading branch information
vcube-yunarta committed Jan 13, 2016
1 parent 87c91f9 commit b1b4b85
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 65 deletions.
11 changes: 11 additions & 0 deletions bolts-tasks/src/main/java/bolts/ExecutorException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package bolts;

/**
* This is a wrapper class for emphasizing that task failed due to bad Executor, rather than the continuation block it self.
*/
public class ExecutorException extends RuntimeException {

public ExecutorException(Exception e) {
super("An exception was thrown by a Executor", e);
}
}
143 changes: 78 additions & 65 deletions bolts-tasks/src/main/java/bolts/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,23 +294,28 @@ public static <TResult> Task<TResult> call(final Callable<TResult> callable, Exe
public static <TResult> Task<TResult> call(final Callable<TResult> callable, Executor executor,
final CancellationToken ct) {
final bolts.TaskCompletionSource<TResult> tcs = new bolts.TaskCompletionSource<>();
executor.execute(new Runnable() {
@Override
public void run() {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
return;
}
try {
executor.execute(new Runnable() {
@Override
public void run() {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
return;
}

try {
tcs.setResult(callable.call());
} catch (CancellationException e) {
tcs.setCancelled();
} catch (Exception e) {
tcs.setError(e);
try {
tcs.setResult(callable.call());
} catch (CancellationException e) {
tcs.setCancelled();
} catch (Exception e) {
tcs.setError(e);
}
}
}
});
});
} catch (Exception e) {
tcs.setError(new ExecutorException(e));
}

return tcs.getTask();
}

Expand Down Expand Up @@ -800,24 +805,28 @@ private static <TContinuationResult, TResult> void completeImmediately(
final bolts.TaskCompletionSource<TContinuationResult> tcs,
final Continuation<TResult, TContinuationResult> continuation, final Task<TResult> task,
Executor executor, final CancellationToken ct) {
executor.execute(new Runnable() {
@Override
public void run() {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
return;
}
try {
executor.execute(new Runnable() {
@Override
public void run() {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
return;
}

try {
TContinuationResult result = continuation.then(task);
tcs.setResult(result);
} catch (CancellationException e) {
tcs.setCancelled();
} catch (Exception e) {
tcs.setError(e);
try {
TContinuationResult result = continuation.then(task);
tcs.setResult(result);
} catch (CancellationException e) {
tcs.setCancelled();
} catch (Exception e) {
tcs.setError(e);
}
}
}
});
});
} catch (Exception e) {
tcs.setError(new ExecutorException(e));
}
}

/**
Expand All @@ -841,45 +850,49 @@ private static <TContinuationResult, TResult> void completeAfterTask(
final Continuation<TResult, Task<TContinuationResult>> continuation,
final Task<TResult> task, final Executor executor,
final CancellationToken ct) {
executor.execute(new Runnable() {
@Override
public void run() {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
return;
}
try {
executor.execute(new Runnable() {
@Override
public void run() {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
return;
}

try {
Task<TContinuationResult> result = continuation.then(task);
if (result == null) {
tcs.setResult(null);
} else {
result.continueWith(new Continuation<TContinuationResult, Void>() {
@Override
public Void then(Task<TContinuationResult> task) {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
try {
Task<TContinuationResult> result = continuation.then(task);
if (result == null) {
tcs.setResult(null);
} else {
result.continueWith(new Continuation<TContinuationResult, Void>() {
@Override
public Void then(Task<TContinuationResult> task) {
if (ct != null && ct.isCancellationRequested()) {
tcs.setCancelled();
return null;
}

if (task.isCancelled()) {
tcs.setCancelled();
} else if (task.isFaulted()) {
tcs.setError(task.getError());
} else {
tcs.setResult(task.getResult());
}
return null;
}

if (task.isCancelled()) {
tcs.setCancelled();
} else if (task.isFaulted()) {
tcs.setError(task.getError());
} else {
tcs.setResult(task.getResult());
}
return null;
}
});
});
}
} catch (CancellationException e) {
tcs.setCancelled();
} catch (Exception e) {
tcs.setError(e);
}
} catch (CancellationException e) {
tcs.setCancelled();
} catch (Exception e) {
tcs.setError(e);
}
}
});
});
} catch (Exception e) {
tcs.setError(new ExecutorException(e));
}
}

private void runContinuations() {
Expand Down
86 changes: 86 additions & 0 deletions bolts-tasks/src/test/java/bolts/TaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -1026,6 +1027,91 @@ public Void then(Task<Void> task) throws Exception {
});
}

@Test
public void testCallWithBadExecutor() {
final RuntimeException exception = new RuntimeException("BAD EXECUTORS");

Task.call(new Callable<Integer>() {
public Integer call() throws Exception {
return 1;
}
}, new Executor() {
@Override
public void execute(Runnable command) {
throw exception;
}
}).continueWith(new Continuation<Integer, Object>() {
@Override
public Object then(Task<Integer> task) throws Exception {
assertTrue(task.isFaulted());
assertTrue(task.getError() instanceof ExecutorException);
assertEquals(exception, task.getError().getCause());

return null;
}
});
}

@Test
public void testContinueWithBadExecutor() {
final RuntimeException exception = new RuntimeException("BAD EXECUTORS");

Task.call(new Callable<Integer>() {
public Integer call() throws Exception {
return 1;
}
}).continueWith(new Continuation<Integer, Integer>() {
@Override
public Integer then(Task<Integer> task) throws Exception {
return task.getResult();
}
}, new Executor() {
@Override
public void execute(Runnable command) {
throw exception;
}
}).continueWith(new Continuation<Integer, Object>() {
@Override
public Object then(Task<Integer> task) throws Exception {
assertTrue(task.isFaulted());
assertTrue(task.getError() instanceof ExecutorException);
assertEquals(exception, task.getError().getCause());

return null;
}
});
}

@Test
public void testContinueWithTaskAndBadExecutor() {
final RuntimeException exception = new RuntimeException("BAD EXECUTORS");

Task.call(new Callable<Integer>() {
public Integer call() throws Exception {
return 1;
}
}).continueWithTask(new Continuation<Integer, Task<Integer>>() {
@Override
public Task<Integer> then(Task<Integer> task) throws Exception {
return task;
}
}, new Executor() {
@Override
public void execute(Runnable command) {
throw exception;
}
}).continueWith(new Continuation<Integer, Object>() {
@Override
public Object then(Task<Integer> task) throws Exception {
assertTrue(task.isFaulted());
assertTrue(task.getError() instanceof ExecutorException);
assertEquals(exception, task.getError().getCause());

return null;
}
});
}

//region TaskCompletionSource

@Test
Expand Down

0 comments on commit b1b4b85

Please sign in to comment.