diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index 1f457149f..3a66bca30 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -380,6 +380,7 @@ public void call(Subscriber observer) { /* determine if we're allowed to execute */ if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); + final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); // acquire a permit if (executionSemaphore.tryAcquire()) { try { @@ -393,10 +394,21 @@ public void call() { // release the semaphore // this is done here instead of below so that the acquire/release happens where it is guaranteed // and not affected by the conditional circuit-breaker checks, timeouts, etc - executionSemaphore.release(); + if (semaphoreHasBeenReleased.compareAndSet(false, true)) { + executionSemaphore.release(); + } } - }).unsafeSubscribe(observer); + }) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + if (semaphoreHasBeenReleased.compareAndSet(false, true)) { + executionSemaphore.release(); + } + } + }) + .unsafeSubscribe(observer); } catch (RuntimeException e) { observer.onError(e); } @@ -444,24 +456,29 @@ public Observable call(Throwable t) { }); - // any final cleanup needed - o = o.doOnTerminate(new Action0() { + final AtomicBoolean commandCleanupExecuted = new AtomicBoolean(false); + final Action0 commandCleanup = new Action0() { @Override public void call() { - Reference tl = timeoutTimer.get(); - if (tl != null) { - tl.clear(); - } + if (commandCleanupExecuted.compareAndSet(false, true)) { + Reference tl = timeoutTimer.get(); + if (tl != null) { + tl.clear(); + } - long userThreadLatency = System.currentTimeMillis() - executionResult.getStartTimestamp(); - executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); - metrics.markCommandDone(executionResult, commandKey, threadPoolKey); - // record that we're completed - isExecutionComplete.set(true); + long userThreadLatency = System.currentTimeMillis() - executionResult.getStartTimestamp(); + executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); + metrics.markCommandDone(executionResult, commandKey, threadPoolKey); + // record that we're completed + isExecutionComplete.set(true); + } } - }); + }; + + // any final cleanup needed + o = o.doOnTerminate(commandCleanup).doOnUnsubscribe(commandCleanup); // put in cache if (requestCacheEnabled) { @@ -546,15 +563,6 @@ public Boolean call() { } } - run = run.doOnEach(new Action1>() { - - @Override - public void call(Notification n) { - setRequestContextIfNeeded(currentRequestContext); - } - - - }); if (properties.executionTimeoutEnabled().get()) { run = run.lift(new HystrixObservableTimeoutOperator(_self)); } @@ -652,8 +660,6 @@ public void call(Notification n) { }).doOnTerminate(new Action0() { @Override public void call() { - //if the command timed out, then we've reached this point in the calling thread - //but the Hystrix thread is still doing work. Let it handle these markers. if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { handleThreadEnd(); } @@ -684,7 +690,6 @@ public void call() { //to handle these markers. Otherwise, the calling thread will perform these for us. if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) { handleThreadEnd(); - } } }); @@ -730,6 +735,7 @@ private Observable getFallbackOrThrowException(final HystrixEventType eventTy final AbstractCommand _cmd = this; final TryableSemaphore fallbackSemaphore = getFallbackSemaphore(); + final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); Observable fallbackExecutionChain; @@ -755,7 +761,18 @@ private Observable getFallbackOrThrowException(final HystrixEventType eventTy @Override public void call() { - fallbackSemaphore.release(); + if (semaphoreHasBeenReleased.compareAndSet(false, true)) { + fallbackSemaphore.release(); + } + } + }) + .doOnUnsubscribe(new Action0() { + + @Override + public void call() { + if (semaphoreHasBeenReleased.compareAndSet(false, true)) { + fallbackSemaphore.release(); + } } }); } else { @@ -817,21 +834,6 @@ public Observable call(Throwable t) { } } - }).doOnTerminate(new Action0() { - - @Override - public void call() { - // record that we're completed (to handle non-successful events we do it here as well as at the end of executeCommand - isExecutionComplete.set(true); - } - - }).doOnEach(new Action1>() { - - @Override - public void call(Notification n) { - setRequestContextIfNeeded(currentRequestContext); - } - }); } else { /* fallback is disabled so throw HystrixRuntimeException */ @@ -844,15 +846,7 @@ public void call(Notification n) { } } - return fallbackLogicApplied.doOnTerminate(new Action0() { - - @Override - public void call() { - // record that we're completed (to handle non-successful events we do it here as well as at the end of executeCommand - isExecutionComplete.set(true); - } - - }).doOnEach(new Action1>() { + return fallbackLogicApplied.doOnEach(new Action1>() { @Override public void call(Notification n) { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/CommonHystrixCommandTests.java b/hystrix-core/src/test/java/com/netflix/hystrix/CommonHystrixCommandTests.java index 53a24886c..008e16c55 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/CommonHystrixCommandTests.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/CommonHystrixCommandTests.java @@ -1489,7 +1489,6 @@ public void call(C command) { ********************* END SEMAPHORE-ISOLATED Execution Hook Tests *********************************** */ - /** * Abstract methods defining a way to instantiate each of the described commands. * {@link HystrixCommandTest} and {@link HystrixObservableCommandTest} should each provide concrete impls for diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index 8e1ec8ce8..bb9ec7af9 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -35,9 +35,13 @@ import rx.Observable; import rx.Observer; import rx.Scheduler; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func0; import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; import java.io.IOException; import java.util.List; @@ -2980,6 +2984,139 @@ protected Integer run() throws Exception { assertFalse(executionAttempted.get()); } + @Test + public void testEarlyUnsubscribeDuringExecution() { + class AsyncCommand extends HystrixCommand { + + public AsyncCommand() { + super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ASYNC"))); + } + + @Override + protected Boolean run() { + try { + Thread.sleep(100); + return true; + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + HystrixCommand cmd = new AsyncCommand(); + + final CountDownLatch latch = new CountDownLatch(1); + + Observable o = cmd.toObservable(); + Subscription s = o. + doOnUnsubscribe(new Action0() { + @Override + public void call() { + System.out.println("OnUnsubscribe"); + latch.countDown(); + } + }). + subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println("OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println("OnError : " + e); + } + + @Override + public void onNext(Boolean b) { + System.out.println("OnNext : " + b); + } + }); + + try { + s.unsubscribe(); + assertTrue(latch.await(200, TimeUnit.MILLISECONDS)); + assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed()); + assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed()); + assertTrue(cmd.isExecutionComplete()); + assertTrue(cmd.isExecutedInThread()); + System.out.println("EventCounts : " + cmd.getEventCounts()); + System.out.println("Execution Time : " + cmd.getExecutionTimeInMilliseconds()); + System.out.println("Is Successful : " + cmd.isSuccessfulExecution()); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + + @Test + public void testEarlyUnsubscribeDuringFallback() { + class AsyncCommand extends HystrixCommand { + + public AsyncCommand() { + super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ASYNC"))); + } + + @Override + protected Boolean run() { + throw new RuntimeException("run failure"); + } + + @Override + protected Boolean getFallback() { + try { + Thread.sleep(100); + return false; + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + HystrixCommand cmd = new AsyncCommand(); + + final CountDownLatch latch = new CountDownLatch(1); + + Observable o = cmd.toObservable(); + Subscription s = o. + doOnUnsubscribe(new Action0() { + @Override + public void call() { + System.out.println("OnUnsubscribe"); + latch.countDown(); + } + }). + subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println("OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println("OnError : " + e); + } + + @Override + public void onNext(Boolean b) { + System.out.println("OnNext : " + b); + } + }); + + try { + Thread.sleep(10); //give fallback a chance to fire + s.unsubscribe(); + assertTrue(latch.await(200, TimeUnit.MILLISECONDS)); + assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed()); + assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed()); + assertTrue(cmd.isExecutionComplete()); + assertTrue(cmd.isExecutedInThread()); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + /* ******************************************************************************** */ /* ******************************************************************************** */ /* private HystrixCommand class implementations for unit testing */ diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index c04e0c679..f122af4d1 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -37,6 +37,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscriber; +import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func0; @@ -4494,6 +4495,150 @@ public void testExecutionPartialSuccessWithFallback() { } } + @Test + public void testEarlyUnsubscribeDuringExecution() { + class AsyncCommand extends HystrixObservableCommand { + + public AsyncCommand() { + super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ASYNC"))); + } + + @Override + protected Observable construct() { + return Observable.defer(new Func0>() { + @Override + public Observable call() { + try { + Thread.sleep(100); + return Observable.just(true); + } catch (InterruptedException ex) { + return Observable.error(ex); + } + } + }).subscribeOn(Schedulers.io()); + } + } + + HystrixObservableCommand cmd = new AsyncCommand(); + + final CountDownLatch latch = new CountDownLatch(1); + + Observable o = cmd.toObservable(); + Subscription s = o. + doOnUnsubscribe(new Action0() { + @Override + public void call() { + System.out.println("OnUnsubscribe"); + latch.countDown(); + } + }). + subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println("OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println("OnError : " + e); + } + + @Override + public void onNext(Boolean b) { + System.out.println("OnNext : " + b); + } + }); + + try { + s.unsubscribe(); + assertTrue(latch.await(200, TimeUnit.MILLISECONDS)); + assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed()); + assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed()); + assertTrue(cmd.isExecutionComplete()); + assertFalse(cmd.isExecutedInThread()); + System.out.println("EventCounts : " + cmd.getEventCounts()); + System.out.println("Execution Time : " + cmd.getExecutionTimeInMilliseconds()); + System.out.println("Is Successful : " + cmd.isSuccessfulExecution()); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + + @Test + public void testEarlyUnsubscribeDuringFallback() { + class AsyncCommand extends HystrixObservableCommand { + + public AsyncCommand() { + super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ASYNC"))); + } + + @Override + protected Observable construct() { + return Observable.error(new RuntimeException("construct failure")); + } + + @Override + protected Observable resumeWithFallback() { + return Observable.defer(new Func0>() { + @Override + public Observable call() { + try { + Thread.sleep(100); + return Observable.just(false); + } catch (InterruptedException ex) { + return Observable.error(ex); + } + } + }).subscribeOn(Schedulers.io()); + } + } + + HystrixObservableCommand cmd = new AsyncCommand(); + + final CountDownLatch latch = new CountDownLatch(1); + + Observable o = cmd.toObservable(); + Subscription s = o. + doOnUnsubscribe(new Action0() { + @Override + public void call() { + System.out.println("OnUnsubscribe"); + latch.countDown(); + } + }). + subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println("OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println("OnError : " + e); + } + + @Override + public void onNext(Boolean b) { + System.out.println("OnNext : " + b); + } + }); + + try { + Thread.sleep(10); //give fallback a chance to fire + s.unsubscribe(); + assertTrue(latch.await(200, TimeUnit.MILLISECONDS)); + assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed()); + assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed()); + assertTrue(cmd.isExecutionComplete()); + assertFalse(cmd.isExecutedInThread()); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + + /* ******************************************************************************** */ /* ******************************************************************************** */ /* private HystrixCommand class implementations for unit testing */ @@ -4760,7 +4905,7 @@ public TestPartialSuccessWithFallback(HystrixCommandProperties.Setter properties @Override protected Observable construct() { return Observable.just(false, true, false) - .concatWith(Observable. error(new RuntimeException("forced error"))) + .concatWith(Observable.error(new RuntimeException("forced error"))) .subscribeOn(Schedulers.computation()); } @@ -4771,43 +4916,7 @@ protected Observable resumeWithFallback() { } - /** - * Test how a fallback could be done on a streaming response where it is partially successful - * by retaining state of what has been seen. - */ - private static class TestPartialSuccessWithIntelligentFallback extends TestHystrixObservableCommand { - - TestPartialSuccessWithIntelligentFallback() { - super(TestHystrixObservableCommand.testPropsBuilder()); - } - - volatile int lastSeen = 0; - @Override - protected Observable construct() { - return Observable.just(1, 2, 3) - .concatWith(Observable. error(new RuntimeException("forced error"))) - .doOnNext(new Action1() { - - @Override - public void call(Integer t1) { - lastSeen = t1; - } - - }) - .subscribeOn(Schedulers.computation()); - } - - @Override - protected Observable resumeWithFallback() { - if (lastSeen < 4) { - return Observable.range(lastSeen + 1, 4 - lastSeen); - } else { - return Observable.empty(); - } - } - - } /** * Successful execution - no fallback implementation.