Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More work on HystrixCollapser response not received bug #112

Merged
merged 1 commit into from
Feb 23, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 129 additions & 20 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -341,6 +342,15 @@ public Future<ResponseType> queue() {
}
Future<ResponseType> response = collapser.submitRequest(getRequestArgument());
if (properties.requestCachingEnabled().get()) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
* This means we can have some duplication of requests in a thread-race but we're okay
* with having some inefficiency in duplicate requests in the same batch
* and then subsequent requests will retrieve a previously cached Future.
*
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
requestCache.putIfAbsent(getCacheKey(), response);
}
return response;
Expand Down Expand Up @@ -676,9 +686,7 @@ private class BatchFutureWrapper implements Future<BatchReturnType> {
private final Future<BatchReturnType> actualFuture;
private final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> command;
private final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests;
private Lock mapResponseToRequestsLock = new ReentrantLock();
@GuardedBy("mapResponseToRequestsLock")
private volatile boolean mapResponseToRequestsPerformed = false;
private AtomicBoolean mapResponseWork = new AtomicBoolean(false);

private BatchFutureWrapper(Future<BatchReturnType> actualFuture, HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> command, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
this.actualFuture = actualFuture;
Expand All @@ -687,6 +695,20 @@ private BatchFutureWrapper(Future<BatchReturnType> actualFuture, HystrixCollapse
}

/**
 * Cancels this batch. An exception is first set on every collapsed request, and
 * only then is the cancellation delegated to the wrapped future.
 *
 * @param mayInterruptIfRunning
 *            passed through unchanged to the wrapped future's {@code cancel}
 * @return the value returned by the wrapped future's {@code cancel}
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    logger.warn("Cancelling BatchFuture so setting Exception on all collapsed requests.");

    // one exception instance is shared by all of the collapsed requests
    final RuntimeException cancellation = new RuntimeException("BatchFuture cancelled.");

    for (CollapsedRequest<ResponseType, RequestArgumentType> collapsed : requests) {
        try {
            collapsed.setException(cancellation);
        } catch (IllegalStateException alreadyCompleted) {
            // mapResponseToRequests may already have set a response on some of the
            // requests, in which case setting the exception can be rejected with an
            // IllegalStateException; log it and carry on with the remaining requests
            logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception during BatchFuture cancellation.. Continuing ... ", alreadyCompleted);
        }
    }

    final boolean cancelled = actualFuture.cancel(mayInterruptIfRunning);
    return cancelled;
}

Expand All @@ -699,26 +721,41 @@ public boolean isDone() {
}

public BatchReturnType get() throws InterruptedException, ExecutionException {
/* make one of the calling threads do this work using tryLock, which allows 1 thread in while all the rest proceed to actualFuture.get() */
if (!mapResponseToRequestsPerformed && mapResponseToRequestsLock.tryLock()) {
/* only one thread should do this and all the rest will proceed to actualFuture.get() */
if (mapResponseWork.compareAndSet(false, true)) {
try {
if (!mapResponseToRequestsPerformed) {
/* we only want one thread to execute the above code */
command.mapResponseToRequests(actualFuture.get(), requests);
} catch (Exception e) {
logger.error("Exception mapping responses to requests.", e);
// if a failure occurs we want to pass that exception to all of the Futures that we've returned
for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
try {
/* we only want one thread to execute the above code */
command.mapResponseToRequests(actualFuture.get(), requests);
} catch (Exception e) {
logger.error("Exception mapping responses to requests.", e);
// if a failure occurs we want to pass that exception to all of the Futures that we've returned
for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
request.setException(e);
}
request.setException(e);
} catch (IllegalStateException e2) {
// if we have partial responses set in mapResponseToRequests
// then we may get IllegalStateException as we loop over them
// so we'll log but continue to the rest
logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);
}
mapResponseToRequestsPerformed = true;
}
} finally {
mapResponseToRequestsLock.unlock();
}

// check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly
for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
try {
if (((CollapsedRequestFutureImpl<ResponseType, RequestArgumentType>) request).responseReference.get() == null) {
request.setException(new NullPointerException("No response set."));
}
} catch (IllegalStateException e2) {
logger.warn("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);
}
}
}

// TODO this is a thread-race and will release BEFORE we call request.setException/request.setResponse
// but shouldn't matter since 'responseReceived' will make the thread wait in CollapsedRequestFutureImpl.
// Does this need to block here until the code above is completed?
return actualFuture.get();
}

Expand Down Expand Up @@ -901,8 +938,12 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
responseReceived.await(timeout, unit);

if (responseReference.get() == null) {
logger.error("TimedOut waiting on responseReceived: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture);
throw new ExecutionException("No response or exception set before returning from Future.get", new NullPointerException());
if(batchFuture == null) {
logger.error("TimedOut waiting on responseReference: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture + " batchFuture.isDone: NULL batchFuture.isCancelled: NULL argument: " + argument);
} else {
logger.error("TimedOut waiting on responseReference: " + responseReceived.getCount() + " batchReceived: " + batchReceived.getCount() + " batchFuture: " + batchFuture + " batchFuture.isDone: " + batchFuture.isDone() + " batchFuture.isCancelled: " + batchFuture.isCancelled() + " argument: " + argument);
}
throw new ExecutionException("No response or exception set before returning from Future.get", new NullPointerException("ResponseReference is NULL"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would something besides NullPointerException make sense? Or is this case basically a bug so calling code shouldn't think about handling it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bug and should never happen which is what I'm trying to track down.

I'm deploying debug builds into a production canary to try and determine how this happens as I can't replicate it outside of production.

} else {
// we got past here so let's return the response now
if (responseReference.get().getException() != null) {
Expand Down Expand Up @@ -985,7 +1026,7 @@ protected String getCacheKey() {
/**
* Clears all state. If new requests come in instances will be recreated and metrics started from scratch.
*/
/* package */ static void reset() {
/* package */static void reset() {
defaultNameCache.clear();
globalScopedCollapsers.clear();
requestScopedCollapsers.clear();
Expand Down Expand Up @@ -1762,6 +1803,36 @@ public void testRequestCacheWithTimeout() {
assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

/**
 * Verifies collapser behavior when the batch command is short-circuited: both
 * collapsed requests must fail with an ExecutionException, the shared counter
 * must remain at 0, and exactly one command must appear in the request log.
 */
@Test
public void testRequestWithCommandShortCircuited() throws Exception {
    TestCollapserTimer collapserTimer = new TestCollapserTimer();
    Future<String> first = new TestRequestCollapserWithShortCircuitedCommand(collapserTimer, counter, "1").queue();
    Future<String> second = new TestRequestCollapserWithShortCircuitedCommand(collapserTimer, counter, "2").queue();
    collapserTimer.incrementTime(10); // let time pass that equals the default delay/period

    // each queued request is checked in submission order and must surface the failure from get()
    for (Future<?> pending : new Future<?>[] { first, second }) {
        try {
            pending.get();
            fail("we should have received an exception");
        } catch (ExecutionException expected) {
            // what we expect
        }
    }

    assertEquals(0, counter.get());
    // it will execute once (short-circuited)
    assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
}

private static class TestRequestCollapser extends HystrixCollapser<List<String>, String, String> {

private final AtomicInteger count;
Expand Down Expand Up @@ -1900,6 +1971,23 @@ public HystrixCommand<List<String>> createCommand(Collection<com.netflix.hystrix

}

/**
 * Collapser whose batch command is always a {@link ShortCircuitedCommand},
 * regardless of which requests were collapsed.
 */
private static class TestRequestCollapserWithShortCircuitedCommand extends TestRequestCollapser {

    /**
     * Forwards all three arguments, unchanged, to the TestRequestCollapser constructor.
     */
    public TestRequestCollapserWithShortCircuitedCommand(TestCollapserTimer collapserTimer, AtomicInteger executionCounter, String argument) {
        super(collapserTimer, executionCounter, argument);
    }

    /**
     * @param requests
     *            the collapsed requests; not used by this implementation
     * @return a new ShortCircuitedCommand
     */
    @Override
    public HystrixCommand<List<String>> createCommand(Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<String, String>> requests) {
        // args don't matter as it's short-circuited
        final HystrixCommand<List<String>> shortCircuited = new ShortCircuitedCommand();
        return shortCircuited;
    }

}

/**
* Throw an exception when mapToResponse is invoked
*/
Expand Down Expand Up @@ -1974,6 +2062,27 @@ public String getCacheKey() {
}
}

private static class ShortCircuitedCommand extends HystrixCommand<List<String>> {

protected ShortCircuitedCommand() {
super(HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("shortCircuitedCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter
.getUnitTestPropertiesSetter()
.withCircuitBreakerForceOpen(true)));
}

@Override
protected List<String> run() throws Exception {
System.out.println("*** execution (this shouldn't happen)");
// this won't ever get called as we're forcing short-circuiting
ArrayList<String> values = new ArrayList<String>();
values.add("hello");
return values;
}

}

private static class TestCollapserTimer implements CollapserTimer {

private final ConcurrentLinkedQueue<ATask> tasks = new ConcurrentLinkedQueue<ATask>();
Expand Down