diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 1eeb21c1e0c..d5aebe0ad5b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -23,7 +23,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; @@ -33,6 +35,8 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.SynchronizedObserver; import rx.util.functions.Func1; public final class OperationMerge { @@ -114,14 +118,25 @@ private MergeObservable(Observable> sequences) { this.sequences = sequences; } - public MergeSubscription call(Observer actualObserver) { + public Subscription call(Observer actualObserver) { + + /** + * We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting. + *

+ * The calls from each sequence must be serialized. + *

+ * Bug report: https://github.com/Netflix/RxJava/issues/200 + */ + AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription); + SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, subscription); + /** * Subscribe to the parent Observable to get to the children Observables */ - sequences.subscribe(new ParentObserver(actualObserver)); + sequences.subscribe(new ParentObserver(synchronizedObserver)); /* return our subscription to allow unsubscribing */ - return ourSubscription; + return subscription; } /** @@ -380,6 +395,79 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } + @Test + public void testSynchronizationOfMultipleSequences() throws Exception { + final TestASynchronousObservable o1 = new TestASynchronousObservable(); + final TestASynchronousObservable o2 = new TestASynchronousObservable(); + + // use this latch to cause onNext to wait until we're ready to let it go + final CountDownLatch endLatch = new CountDownLatch(1); + + final AtomicInteger concurrentCounter = new AtomicInteger(); + final AtomicInteger totalCounter = new AtomicInteger(); + + @SuppressWarnings("unchecked") + Observable m = Observable.create(merge(o1, o2)); + m.subscribe(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Exception e) { + throw new RuntimeException("failed", e); + } + + @Override + public void onNext(String v) { + totalCounter.incrementAndGet(); + concurrentCounter.incrementAndGet(); + try { + // wait here until we're done asserting + endLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException("failed", e); + } finally { + concurrentCounter.decrementAndGet(); + } + } + + }); + + // wait for both observables to send (one should be blocked) + o1.onNextBeingSent.await(); + o2.onNextBeingSent.await(); + + // I can't think of a way to know for sure that both threads have or are trying to send onNext + // since I can't use a CountDownLatch for "after" onNext since I want to catch during it + // but I can't know for sure onNext is invoked + // so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time + // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following + // onNext is invoked. + + Thread.sleep(300); + + try { // in try/finally so threads are released via latch countDown even if assertion fails + assertEquals(1, concurrentCounter.get()); + } finally { + // release so it can finish + endLatch.countDown(); + } + + try { + o1.t.join(); + o2.t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertEquals(2, totalCounter.get()); + assertEquals(0, concurrentCounter.get()); + } + /** * unit test from OperationMergeDelayError backported here to show how these use cases work with normal merge */ @@ -452,6 +540,7 @@ public void unsubscribe() { private static class TestASynchronousObservable extends Observable { Thread t; + final CountDownLatch onNextBeingSent = new CountDownLatch(1); @Override public Subscription subscribe(final Observer observer) { @@ -459,7 +548,10 @@ public Subscription subscribe(final Observer observer) { @Override public void run() { + onNextBeingSent.countDown(); observer.onNext("hello"); + // I can't use a countDownLatch to prove we are actually sending 'onNext' + // since it will block if synchronized and I'll deadlock observer.onCompleted(); }