Skip to content

Commit

Permalink
Merge pull request #904 from mattrjacobs/forward-port-902-from-1.4.x
Browse files Browse the repository at this point in the history
Forward port #902 from 1.4.x
  • Loading branch information
mattrjacobs committed Sep 21, 2015
2 parents c308ce4 + eccc3c3 commit 3c839fa
Showing 1 changed file with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.hystrix;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;

import java.util.ArrayList;
Expand All @@ -28,6 +29,10 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import org.junit.After;
Expand Down Expand Up @@ -115,6 +120,7 @@ public String call(String s) {
}
};


@Before
public void init() {
// since we're going to modify properties of the same class between tests, wipe the cache each time
Expand Down Expand Up @@ -156,22 +162,20 @@ public void testTwoRequests() throws Exception {

@Test
public void testTwoRequestsWhichShouldEachEmitTwice() throws Exception {
TestCollapserTimer timer = new TestCollapserTimer();
//TestCollapserTimer timer = new TestCollapserTimer();
CollapserTimer timer = new RealCollapserTimer();
HystrixObservableCollapser<String, String, String, String> collapser1 = new TestCollapserWithMultipleResponses(timer, 1, 3, false, prefixMapper, onMissingComplete);
HystrixObservableCollapser<String, String, String, String> collapser2 = new TestCollapserWithMultipleResponses(timer, 2, 3, false, prefixMapper, onMissingComplete);

System.out.println("Starting to observe collapser1");
Observable<String> result1 = collapser1.observe();
Observable<String> result2 = collapser2.observe();

timer.incrementTime(10); // let time pass that equals the default delay/period

TestSubscriber<String> testSubscriber1 = new TestSubscriber<String>();
result1.subscribe(testSubscriber1);

TestSubscriber<String> testSubscriber2 = new TestSubscriber<String>();
result2.subscribe(testSubscriber2);

System.out.println(System.currentTimeMillis() + "Starting to observe collapser1");
collapser1.observe().subscribe(testSubscriber1);
collapser2.observe().subscribe(testSubscriber2);
System.out.println(System.currentTimeMillis() + "Done with collapser observe()s");

//Note that removing these awaits breaks the unit test. That implies that the above subscribe does not wait for a terminal event
testSubscriber1.awaitTerminalEvent();
testSubscriber2.awaitTerminalEvent();

Expand Down Expand Up @@ -715,19 +719,19 @@ private static class TestCollapserWithMultipleResponses extends HystrixObservabl
emitsPerArg = new HashMap<String, Integer>();
}

public TestCollapserWithMultipleResponses(TestCollapserTimer timer, int arg, int numEmits, boolean commandConstructionFails) {
public TestCollapserWithMultipleResponses(CollapserTimer timer, int arg, int numEmits, boolean commandConstructionFails) {
this(timer, arg, numEmits, commandConstructionFails, prefixMapper, onMissingComplete);
}

public TestCollapserWithMultipleResponses(TestCollapserTimer timer, int arg, int numEmits, Action1<CollapsedRequest<String, String>> onMissingHandler) {
public TestCollapserWithMultipleResponses(CollapserTimer timer, int arg, int numEmits, Action1<CollapsedRequest<String, String>> onMissingHandler) {
this(timer, arg, numEmits, false, prefixMapper, onMissingHandler);
}

public TestCollapserWithMultipleResponses(TestCollapserTimer timer, int arg, int numEmits, Func1<String, String> keyMapper) {
public TestCollapserWithMultipleResponses(CollapserTimer timer, int arg, int numEmits, Func1<String, String> keyMapper) {
this(timer, arg, numEmits, false, keyMapper, onMissingComplete);
}

public TestCollapserWithMultipleResponses(TestCollapserTimer timer, int arg, int numEmits, boolean commandConstructionFails, Func1<String, String> keyMapper, Action1<CollapsedRequest<String, String>> onMissingResponseHandler) {
public TestCollapserWithMultipleResponses(CollapserTimer timer, int arg, int numEmits, boolean commandConstructionFails, Func1<String, String> keyMapper, Action1<CollapsedRequest<String, String>> onMissingResponseHandler) {
super(collapserKeyFromString(timer), Scope.REQUEST, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(10).withTimerDelayInMilliseconds(10), createMetrics());
this.arg = arg + "";
emitsPerArg.put(this.arg, numEmits);
Expand All @@ -748,6 +752,7 @@ public String getRequestArgument() {

@Override
protected HystrixObservableCommand<String> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
assertNotNull("command creation should have HystrixRequestContext", HystrixRequestContext.getContextForCurrentThread());
if (commandConstructionFails) {
throw new RuntimeException("Exception thrown in command construction");
} else {
Expand Down Expand Up @@ -808,17 +813,19 @@ private static class TestCollapserCommandWithMultipleResponsePerArgument extends
private final Map<String, Integer> emitsPerArg;

TestCollapserCommandWithMultipleResponsePerArgument(List<Integer> args, Map<String, Integer> emitsPerArg) {
super(testPropsBuilder().setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionTimeoutInMilliseconds(500)));
super(testPropsBuilder());
this.args = args;
this.emitsPerArg = emitsPerArg;
}

@Override
protected Observable<String> construct() {
assertNotNull("Wiring the Batch command into the Observable chain should have a HystrixRequestContext", HystrixRequestContext.getContextForCurrentThread());
return Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
assertNotNull("Executing the Batch command should have a HystrixRequestContext", HystrixRequestContext.getContextForCurrentThread());
Thread.sleep(100);
for (Integer arg: args) {
int numEmits = emitsPerArg.get(arg.toString());
Expand All @@ -829,11 +836,12 @@ public void call(Subscriber<? super String> subscriber) {
Thread.sleep(10);
}
} catch (Throwable ex) {
ex.printStackTrace();
subscriber.onError(ex);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
});
}
}
}

0 comments on commit 3c839fa

Please sign in to comment.