Skip to content

Commit

Permalink
Merge pull request #1218 from mattrjacobs/missing-command-concurrency…
Browse files Browse the repository at this point in the history
…-decrement

Fixing unsubscription races by modeling explicit FSMs for command and thread execution state
  • Loading branch information
mattrjacobs committed May 25, 2016
2 parents f5fe350 + 5b6914b commit 5d27548
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 107 deletions.
179 changes: 88 additions & 91 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,18 @@
protected final HystrixThreadPoolKey threadPoolKey;
protected final HystrixCommandProperties properties;

protected static enum TimedOutStatus {
protected enum TimedOutStatus {
NOT_EXECUTED, COMPLETED, TIMED_OUT
}

protected enum CommandState {
NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL
}

protected enum ThreadState {
NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL
}

protected final HystrixCommandMetrics metrics;

protected final HystrixCommandKey commandKey;
Expand All @@ -93,10 +101,8 @@ protected static enum TimedOutStatus {

protected final AtomicReference<Reference<TimerListener>> timeoutTimer = new AtomicReference<Reference<TimerListener>>();

protected final AtomicBoolean commandStarted = new AtomicBoolean();
protected volatile boolean executionStarted = false;
protected volatile boolean threadExecutionStarted = false;
protected volatile boolean isExecutionComplete = false;
protected AtomicReference<CommandState> commandState = new AtomicReference<CommandState>(CommandState.NOT_STARTED);
protected AtomicReference<ThreadState> threadState = new AtomicReference<ThreadState>(ThreadState.NOT_USING_THREAD);

/*
* {@link ExecutionResult} refers to what happened as the user-provided code ran. If request-caching is used,
Expand Down Expand Up @@ -356,7 +362,7 @@ public void call() {
*/
public Observable<R> toObservable() {
/* this is a stateful object so can only be used once */
if (!commandStarted.compareAndSet(false, true)) {
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
}

Expand All @@ -382,18 +388,16 @@ public Observable<R> toObservable() {
}
}

// ensure that cleanup code only runs exactly once
final AtomicBoolean commandCleanupExecuted = new AtomicBoolean(false);

//doOnCompleted handler already did all of the SUCCESS work
//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
final Action0 terminateCommandCleanup = new Action0() {

@Override
public void call() {
if (commandCleanupExecuted.compareAndSet(false, true)) {
isExecutionComplete = true;
handleCommandEnd(_cmd);
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(_cmd, false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(_cmd, true); //user code did run
}
}
};
Expand All @@ -402,11 +406,16 @@ public void call() {
final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
if (commandCleanupExecuted.compareAndSet(false, true)) {
eventNotifier.markEvent(HystrixEventType.CANCELLED, commandKey);
executionResultAtTimeOfCancellation = executionResult
.addEvent((int) (System.currentTimeMillis() - commandStartTimestamp), HystrixEventType.CANCELLED);
handleCommandEnd(_cmd);
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
handleCommandEnd(_cmd, false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
handleCommandEnd(_cmd, true); //user code did run
}
}
};
Expand Down Expand Up @@ -449,13 +458,6 @@ public void call() {
}
};

final Action1<Throwable> fireOnErrorHook = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {

}
};

Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Expand Down Expand Up @@ -484,8 +486,7 @@ public void call(Throwable throwable) {
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook)
.doOnError(fireOnErrorHook);
.doOnCompleted(fireOnCompletedHook);
}

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
Expand Down Expand Up @@ -614,16 +615,19 @@ private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
executionStarted = true;
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}

metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
} else {
// not timed out so execute
threadExecutionStarted = true;
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
Expand All @@ -640,10 +644,34 @@ public Observable<R> call() {
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {

@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT);
Expand All @@ -654,7 +682,10 @@ public Boolean call() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
executionStarted = true;
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}

metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
Expand Down Expand Up @@ -821,66 +852,42 @@ private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd)
userObservable = Observable.error(ex);
}

final AtomicBoolean threadStateCleanedUp = new AtomicBoolean(false);

return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}

private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
try {
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
}

return fromCache.toObservableWithStateCopiedInto(this)
.doOnTerminate(new Action0() {
@Override
public void call() {
//If the command timed out, then the calling thread has already walked away so we need
//to handle these markers. Otherwise, the calling thread will perform these for us.

if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
if (threadStateCleanedUp.compareAndSet(false, true)) {
handleThreadEnd(_cmd);
}
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(_cmd, false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(_cmd, true); //user code did run
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
if (threadStateCleanedUp.compareAndSet(false, true)) {
handleThreadEnd(_cmd);
}
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(_cmd, false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(_cmd, true); //user code did run
}
}
});
}

private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
try {
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
}

final AtomicBoolean cleanupCompleted = new AtomicBoolean(false);

return fromCache.toObservableWithStateCopiedInto(this).doOnTerminate(new Action0() {
@Override
public void call() {
if (!cleanupCompleted.get()) {
cleanUpAfterResponseFromCache(_cmd);
isExecutionComplete = true;
cleanupCompleted.set(true);
}
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (!cleanupCompleted.get()) {
cleanUpAfterResponseFromCache(_cmd);
cleanupCompleted.set(true);
}
}
});
}

private void cleanUpAfterResponseFromCache(AbstractCommand<R> _cmd) {
private void cleanUpAfterResponseFromCache(final AbstractCommand<R> _cmd, boolean commandExecutionStarted) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
Expand All @@ -893,16 +900,11 @@ private void cleanUpAfterResponseFromCache(AbstractCommand<R> _cmd) {
.setNotExecutedInThread();
ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
.markUserThreadCompletion(latency);
metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, executionStarted);
metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted);
eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);

//in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup
if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd(_cmd);
}
}

private void handleCommandEnd(AbstractCommand<R> _cmd) {
private void handleCommandEnd(final AbstractCommand _cmd, boolean commandExecutionStarted) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
Expand All @@ -911,19 +913,14 @@ private void handleCommandEnd(AbstractCommand<R> _cmd) {
long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
if (executionResultAtTimeOfCancellation == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, executionStarted);
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
} else {
metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, executionStarted);
metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
}

if (endCurrentThreadExecutingCommand != null) {
endCurrentThreadExecutingCommand.call();
}

//in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup
if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd(_cmd);
}
}

private Observable<R> handleSemaphoreRejectionViaFallback() {
Expand Down Expand Up @@ -1722,7 +1719,7 @@ public boolean isCircuitBreakerOpen() {
* @return boolean
*/
public boolean isExecutionComplete() {
return isExecutionComplete;
return commandState.get().equals(CommandState.TERMINAL);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
public class HystrixCounters {
private static final AtomicInteger concurrentThreadsExecuting = new AtomicInteger(0);

/* package-private */ static void incrementGlobalConcurrentThreads() {
concurrentThreadsExecuting.incrementAndGet();
/* package-private */ static int incrementGlobalConcurrentThreads() {
return concurrentThreadsExecuting.incrementAndGet();
}

/* package-private */ static void decrementGlobalConcurrentThreads() {
concurrentThreadsExecuting.decrementAndGet();
/* package-private */ static int decrementGlobalConcurrentThreads() {
return concurrentThreadsExecuting.decrementAndGet();
}

/**
Expand Down
Loading

0 comments on commit 5d27548

Please sign in to comment.