From effc08d548518df5a54c916e1b50daadb8bf4228 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 19 Mar 2013 16:23:50 -0700 Subject: [PATCH 1/3] Synchronize Observer on OperationMerge fixes https://github.com/Netflix/RxJava/issues/200 This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable. --- .../java/rx/operators/OperationMerge.java | 80 ++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 1eeb21c1e0..1e6e6e7568 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -23,7 +23,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; @@ -33,6 +35,8 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.SynchronizedObserver; import rx.util.functions.Func1; public final class OperationMerge { @@ -115,10 +119,20 @@ private MergeObservable(Observable> sequences) { } public MergeSubscription call(Observer actualObserver) { + + /** + * We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting. + *

+ * The calls from each sequence must be serialized. + *

+ * Bug report: https://github.com/Netflix/RxJava/issues/200 + */ + SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, new AtomicObservableSubscription(ourSubscription)); + /** * Subscribe to the parent Observable to get to the children Observables */ - sequences.subscribe(new ParentObserver(actualObserver)); + sequences.subscribe(new ParentObserver(synchronizedObserver)); /* return our subscription to allow unsubscribing */ return ourSubscription; @@ -380,6 +394,68 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } + @Test + public void testSynchronizationOfMultipleSequences() throws Exception { + final TestASynchronousObservable o1 = new TestASynchronousObservable(); + final TestASynchronousObservable o2 = new TestASynchronousObservable(); + + // use this latch to cause onNext to wait until we're ready to let it go + final CountDownLatch endLatch = new CountDownLatch(1); + + final AtomicInteger concurrentCounter = new AtomicInteger(); + final AtomicInteger totalCounter = new AtomicInteger(); + + @SuppressWarnings("unchecked") + Observable m = Observable.create(merge(o1, o2)); + m.subscribe(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Exception e) { + throw new RuntimeException("failed", e); + } + + @Override + public void onNext(String v) { + totalCounter.incrementAndGet(); + concurrentCounter.incrementAndGet(); + try { + // wait here until we're done asserting + endLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException("failed", e); + } finally { + concurrentCounter.decrementAndGet(); + } + } + + }); + + // wait for both observables to send (one should be blocked) + o1.onNextBeingSent.await(); + o2.onNextBeingSent.await(); + + assertEquals(1, concurrentCounter.get()); + + // release so it can finish + endLatch.countDown(); + + try { + o1.t.join(); + o2.t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertEquals(2, totalCounter.get()); + assertEquals(0, concurrentCounter.get()); + } + /** * unit test from OperationMergeDelayError backported here to show how these use cases work with normal merge */ @@ -452,6 +528,7 @@ public void unsubscribe() { private static class TestASynchronousObservable extends Observable { Thread t; + final CountDownLatch onNextBeingSent = new CountDownLatch(1); @Override public Subscription subscribe(final Observer observer) { @@ -459,6 +536,7 @@ public Subscription subscribe(final Observer observer) { @Override public void run() { + onNextBeingSent.countDown(); observer.onNext("hello"); observer.onCompleted(); } From fb555df3376301595f6596861662c654d77209d2 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 20 Mar 2013 15:12:44 -0700 Subject: [PATCH 2/3] Synchronization of Merge operator (fixes) - return AtomicSubscription not MergeSubscription which I was accidentally still returning - try/finally in unit test so threads are released even if assertion is thrown --- .../main/java/rx/operators/OperationMerge.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 1e6e6e7568..eeb1e96407 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -118,7 +118,7 @@ private MergeObservable(Observable> sequences) { this.sequences = sequences; } - public MergeSubscription call(Observer actualObserver) { + public Subscription call(Observer actualObserver) { /** * We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting. @@ -127,7 +127,8 @@ public MergeSubscription call(Observer actualObserver) { *

* Bug report: https://github.com/Netflix/RxJava/issues/200 */ - SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, new AtomicObservableSubscription(ourSubscription)); + AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription); + SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, subscription); /** * Subscribe to the parent Observable to get to the children Observables @@ -135,7 +136,7 @@ public MergeSubscription call(Observer actualObserver) { sequences.subscribe(new ParentObserver(synchronizedObserver)); /* return our subscription to allow unsubscribing */ - return ourSubscription; + return subscription; } /** @@ -439,11 +440,13 @@ public void onNext(String v) { // wait for both observables to send (one should be blocked) o1.onNextBeingSent.await(); o2.onNextBeingSent.await(); - - assertEquals(1, concurrentCounter.get()); - // release so it can finish - endLatch.countDown(); + try { // in try/finally so threads are released via latch countDown even if assertion fails + assertEquals(1, concurrentCounter.get()); + } finally { + // release so it can finish + endLatch.countDown(); + } try { o1.t.join(); From 169e7e06ea223bd3fdca2460b74e9cd361439fff Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 1 Apr 2013 22:51:19 -0700 Subject: [PATCH 3/3] Trying to fix non-deterministic test - not sure of a way other than putting Thread.sleep in here to give time after each CountDownLatch triggers for the process scheduler to execute the next line of each thread See https://github.com/Netflix/RxJava/pull/201 for more information. --- .../src/main/java/rx/operators/OperationMerge.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index eeb1e96407..d5aebe0ad5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -441,6 +441,15 @@ public void onNext(String v) { o1.onNextBeingSent.await(); o2.onNextBeingSent.await(); + // I can't think of a way to know for sure that both threads have or are trying to send onNext + // since I can't use a CountDownLatch for "after" onNext since I want to catch during it + // but I can't know for sure onNext is invoked + // so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time + // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following + // onNext is invoked. + + Thread.sleep(300); + try { // in try/finally so threads are released via latch countDown even if assertion fails assertEquals(1, concurrentCounter.get()); } finally { @@ -541,6 +550,8 @@ public Subscription subscribe(final Observer observer) { public void run() { onNextBeingSent.countDown(); observer.onNext("hello"); + // I can't use a countDownLatch to prove we are actually sending 'onNext' + // since it will block if synchronized and I'll deadlock observer.onCompleted(); }