diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 357ba872c3..270c2de96f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -464,7 +464,7 @@ public void testUnsubscribe() throws InterruptedException { @Override public Subscription call(final Observer observer) { final BooleanSubscription s = new BooleanSubscription(); - System.out.println("*** Subscribing to EventStream ***"); + System.out.println("testUnsubscribe => *** Subscribing to EventStream ***"); subscribeCounter.incrementAndGet(); new Thread(new Runnable() { @@ -501,7 +501,7 @@ public Integer call(Event e) { @Override public Observable call(GroupedObservable eventGroupedObservable) { - System.out.println("GroupedObservable Key: " + eventGroupedObservable.getKey()); + System.out.println("testUnsubscribe => GroupedObservable Key: " + eventGroupedObservable.getKey()); groupCounter.incrementAndGet(); return eventGroupedObservable @@ -510,7 +510,7 @@ public Observable call(GroupedObservable eventGroupedObs @Override public String call(Event event) { - return "Source: " + event.source + " Message: " + event.message; + return "testUnsubscribe => Source: " + event.source + " Message: " + event.message; } }); @@ -540,10 +540,10 @@ public void onNext(String outputMessage) { assertEquals(1, groupCounter.get()); assertEquals(20, eventCounter.get()); // sentEvents will go until 'eventCounter' hits 20 and then unsubscribes - // which means it will also send (but ignore) the 19 events for the other group + // which means it will also send (but ignore) the 19/20 events for the other group // It will not however send all 100 events. - assertEquals(39, sentEventCounter.get()); - + assertEquals(39, sentEventCounter.get(), 2); + // gave it a delta of 2 so the threading/unsubscription race has wiggle } private static class Event {