diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 270c2de96f..228a5165c7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -16,6 +16,7 @@ package rx.operators; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.util.Arrays; import java.util.Collection; @@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; +import org.mockito.InOrder; import rx.Observable; import rx.Observer; @@ -556,6 +558,78 @@ public String toString() { } } + /* + * Test subscribing to a group, unsubscribing from it again, and subscribing to a next group + */ + @Test + public void testSubscribeAndImmediatelyUnsubscribeFirstGroup() { + CounterSource source = new CounterSource(); + @SuppressWarnings("unchecked") + final Observer observer = mock(Observer.class); + + Func1 modulo2 = new Func1() { + @Override + public Integer call(Integer x) { + return x%2; + } + }; + + Subscription outerSubscription = source.groupBy(modulo2).subscribe(new Action1>() { + @Override + public void call(GroupedObservable group) { + Subscription innerSubscription = group.subscribe(observer); + if (group.getKey() == 0) { + // We immediately unsubscribe again from the even numbers + innerSubscription.unsubscribe(); + // We should still get the group of odd numbers + } + } + }); + try { + source.thread.join(); + } catch (InterruptedException ex) { + } + + InOrder o = inOrder(observer); + // With a different implementation that subscribes to the group concurrently, we might actually receive 0. + o.verify(observer, never()).onNext(0); + o.verify(observer).onNext(1); + o.verify(observer, never()).onNext(2); + o.verify(observer).onNext(3); + o.verify(observer, never()).onNext(4); + o.verify(observer).onNext(5); + o.verify(observer, never()).onNext(6); + o.verify(observer).onNext(7); + o.verify(observer, never()).onNext(8); + o.verify(observer).onNext(9); + } + + private class CounterSource extends Observable { + public Thread thread = null; + @Override + public Subscription subscribe(final Observer observer) { + thread = new Thread(new Runnable() { + @Override + public void run() { + int i = 0; + while (i < 10) { + observer.onNext(i++); + if (Thread.interrupted()) { + return; + } + } + } + }); + thread.start(); + return new Subscription() { + @Override + public void unsubscribe() { + thread.interrupt(); + } + }; + } + } + } }