From 6df70a191b0337a6ace44dcea2a0523fb53900d6 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 22 Feb 2013 17:18:13 -0800 Subject: [PATCH] More work on HystrixCollapser response not received bug I still can't replicate this so am adding error handling anywhere I can foresee something as well as adding more logs. https://github.com/Netflix/Hystrix/issues/80 --- .../com/netflix/hystrix/HystrixCollapser.java | 149 +++++++++++++++--- 1 file changed, 129 insertions(+), 20 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index aae93bc81..be05b0984 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -341,6 +342,15 @@ public Future queue() { } Future response = collapser.submitRequest(getRequestArgument()); if (properties.requestCachingEnabled().get()) { + /* + * A race can occur here with multiple threads queuing but only one will be cached. + * This means we can have some duplication of requests in a thread-race but we're okay + * with having some inefficiency in duplicate requests in the same batch + * and then subsequent requests will retrieve a previously cached Future. + * + * If this is an issue we can make a lazy-future that gets set in the cache + * then only the winning 'put' will be invoked to actually call 'submitRequest' + */ requestCache.putIfAbsent(getCacheKey(), response); } return response; @@ -676,9 +686,7 @@ private class BatchFutureWrapper implements Future { private final Future actualFuture; private final HystrixCollapser command; private final Collection> requests; - private Lock mapResponseToRequestsLock = new ReentrantLock(); - @GuardedBy("mapResponseToRequestsLock") - private volatile boolean mapResponseToRequestsPerformed = false; + private AtomicBoolean mapResponseWork = new AtomicBoolean(false); private BatchFutureWrapper(Future actualFuture, HystrixCollapser command, Collection> requests) { this.actualFuture = actualFuture; @@ -687,6 +695,20 @@ private BatchFutureWrapper(Future actualFuture, HystrixCollapse } public boolean cancel(boolean mayInterruptIfRunning) { + logger.warn("Cancelling BatchFuture so setting Exception on all collapsed requests."); + + RuntimeException e = new RuntimeException("BatchFuture cancelled."); + + for (CollapsedRequest request : requests) { + try { + request.setException(e); + } catch (IllegalStateException e2) { + // if we have partial responses set in mapResponseToRequests + // then we may get IllegalStateException as we loop over them + // so we'll log but continue to the rest + logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception during BatchFuture cancellation.. Continuing ... ", e2); + } + } return actualFuture.cancel(mayInterruptIfRunning); } @@ -699,26 +721,41 @@ public boolean isDone() { } public BatchReturnType get() throws InterruptedException, ExecutionException { - /* make one of the calling thread to this work using tryLock which allows 1 thread in and all the rest will proceed to actualFuture.get() */ - if (!mapResponseToRequestsPerformed && mapResponseToRequestsLock.tryLock()) { + /* only one thread should do this and all the rest will proceed to actualFuture.get() */ + if (mapResponseWork.compareAndSet(false, true)) { try { - if (!mapResponseToRequestsPerformed) { + /* we only want one thread to execute the above code */ + command.mapResponseToRequests(actualFuture.get(), requests); + } catch (Exception e) { + logger.error("Exception mapping responses to requests.", e); + // if a failure occurs we want to pass that exception to all of the Futures that we've returned + for (CollapsedRequest request : requests) { try { - /* we only want one thread to execute the above code */ - command.mapResponseToRequests(actualFuture.get(), requests); - } catch (Exception e) { - logger.error("Exception mapping responses to requests.", e); - // if a failure occurs we want to pass that exception to all of the Futures that we've returned - for (CollapsedRequest request : requests) { - request.setException(e); - } + request.setException(e); + } catch (IllegalStateException e2) { + // if we have partial responses set in mapResponseToRequests + // then we may get IllegalStateException as we loop over them + // so we'll log but continue to the rest + logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2); } - mapResponseToRequestsPerformed = true; } - } finally { - mapResponseToRequestsLock.unlock(); } + + // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly + for (CollapsedRequest request : requests) { + try { + if (((CollapsedRequestFutureImpl) request).responseReference.get() == null) { + request.setException(new NullPointerException("No response set.")); + } + } catch (IllegalStateException e2) { + logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2); + } + } } + + // TODO this is a thread-race and will release BEFORE we call request.setException/request.setResponse + // but shouldn't matter since 'responseReceived' will make the thread wait in CollapsedRequestFutureImpl. + // Does this need to block here until the code above is completed? return actualFuture.get(); } @@ -901,8 +938,12 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution responseReceived.await(timeout, unit); if (responseReference.get() == null) { - logger.error("TimedOut waiting on responseReceived: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture); - throw new ExecutionException("No response or exception set before returning from Future.get", new NullPointerException()); + if(batchFuture == null) { + logger.error("TimedOut waiting on responseReference: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture + " batchFuture.isDone: NULL batchFuture.isCancelled: NULL argument: " + argument); + } else { + logger.error("TimedOut waiting on responseReference: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture + " batchFuture.isDone: " + batchFuture.isDone() + " batchFuture.isCancelled: " + batchFuture.isCancelled() + " argument: " + argument); + } + throw new ExecutionException("No response or exception set before returning from Future.get", new NullPointerException("ResponseReference is NULL")); } else { // we got past here so let's return the response now if (responseReference.get().getException() != null) { @@ -985,7 +1026,7 @@ protected String getCacheKey() { /** * Clears all state. If new requests come in instances will be recreated and metrics started from scratch. */ - /* package */ static void reset() { + /* package */static void reset() { defaultNameCache.clear(); globalScopedCollapsers.clear(); requestScopedCollapsers.clear(); @@ -1762,6 +1803,36 @@ public void testRequestCacheWithTimeout() { assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); } + /** + * Test how the collapser behaves when the circuit is short-circuited + */ + @Test + public void testRequestWithCommandShortCircuited() throws Exception { + TestCollapserTimer timer = new TestCollapserTimer(); + Future response1 = new TestRequestCollapserWithShortCircuitedCommand(timer, counter, "1").queue(); + Future response2 = new TestRequestCollapserWithShortCircuitedCommand(timer, counter, "2").queue(); + timer.incrementTime(10); // let time pass that equals the default delay/period + + try { + response1.get(); + fail("we should have received an exception"); + } catch (ExecutionException e) { + // e.printStackTrace(); + // what we expect + } + try { + response2.get(); + fail("we should have received an exception"); + } catch (ExecutionException e) { + // e.printStackTrace(); + // what we expect + } + + assertEquals(0, counter.get()); + // it will execute once (short-circuited) + assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + } + private static class TestRequestCollapser extends HystrixCollapser, String, String> { private final AtomicInteger count; @@ -1900,6 +1971,23 @@ public HystrixCommand> createCommand(Collection> createCommand(Collection> requests) { + // args don't matter as it's short-circuited + return new ShortCircuitedCommand(); + } + + } + /** * Throw an exception when mapToResponse is invoked */ @@ -1974,6 +2062,27 @@ public String getCacheKey() { } } + private static class ShortCircuitedCommand extends HystrixCommand> { + + protected ShortCircuitedCommand() { + super(HystrixCommand.Setter.withGroupKey( + HystrixCommandGroupKey.Factory.asKey("shortCircuitedCommand")) + .andCommandPropertiesDefaults(HystrixCommandProperties.Setter + .getUnitTestPropertiesSetter() + .withCircuitBreakerForceOpen(true))); + } + + @Override + protected List run() throws Exception { + System.out.println("*** execution (this shouldn't happen)"); + // this won't ever get called as we're forcing short-circuiting + ArrayList values = new ArrayList(); + values.add("hello"); + return values; + } + + } + private static class TestCollapserTimer implements CollapserTimer { private final ConcurrentLinkedQueue tasks = new ConcurrentLinkedQueue();