Skip to content

Commit

Permalink
Merge pull request ReactiveX#201 from benjchristensen/issue-200-merge…
Browse files Browse the repository at this point in the history
…-synchronization

Synchronize Observer on OperationMerge
  • Loading branch information
benjchristensen committed Apr 2, 2013
2 parents f230b45 + 5ece32b commit c83d218
Showing 1 changed file with 95 additions and 3 deletions.
98 changes: 95 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -33,6 +35,8 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Func1;

public final class OperationMerge {
Expand Down Expand Up @@ -114,14 +118,25 @@ private MergeObservable(Observable<Observable<T>> sequences) {
this.sequences = sequences;
}

public MergeSubscription call(Observer<T> actualObserver) {
public Subscription call(Observer<T> actualObserver) {

/**
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
* <p>
* The calls from each sequence must be serialized.
* <p>
* Bug report: https://github.com/Netflix/RxJava/issues/200
*/
AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);

/**
* Subscribe to the parent Observable to get to the children Observables
*/
sequences.subscribe(new ParentObserver(actualObserver));
sequences.subscribe(new ParentObserver(synchronizedObserver));

/* return our subscription to allow unsubscribing */
return ourSubscription;
return subscription;
}

/**
Expand Down Expand Up @@ -380,6 +395,79 @@ public void testMergeArrayWithThreading() {
verify(stringObserver, times(1)).onCompleted();
}

@Test
public void testSynchronizationOfMultipleSequences() throws Exception {
final TestASynchronousObservable o1 = new TestASynchronousObservable();
final TestASynchronousObservable o2 = new TestASynchronousObservable();

// use this latch to cause onNext to wait until we're ready to let it go
final CountDownLatch endLatch = new CountDownLatch(1);

final AtomicInteger concurrentCounter = new AtomicInteger();
final AtomicInteger totalCounter = new AtomicInteger();

@SuppressWarnings("unchecked")
Observable<String> m = Observable.create(merge(o1, o2));
m.subscribe(new Observer<String>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Exception e) {
throw new RuntimeException("failed", e);
}

@Override
public void onNext(String v) {
totalCounter.incrementAndGet();
concurrentCounter.incrementAndGet();
try {
// wait here until we're done asserting
endLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("failed", e);
} finally {
concurrentCounter.decrementAndGet();
}
}

});

// wait for both observables to send (one should be blocked)
o1.onNextBeingSent.await();
o2.onNextBeingSent.await();

// I can't think of a way to know for sure that both threads have or are trying to send onNext
// since I can't use a CountDownLatch for "after" onNext since I want to catch during it
// but I can't know for sure onNext is invoked
// so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time
// to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following
// onNext is invoked.

Thread.sleep(300);

try { // in try/finally so threads are released via latch countDown even if assertion fails
assertEquals(1, concurrentCounter.get());
} finally {
// release so it can finish
endLatch.countDown();
}

try {
o1.t.join();
o2.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

assertEquals(2, totalCounter.get());
assertEquals(0, concurrentCounter.get());
}

/**
* unit test from OperationMergeDelayError backported here to show how these use cases work with normal merge
*/
Expand Down Expand Up @@ -452,14 +540,18 @@ public void unsubscribe() {

private static class TestASynchronousObservable extends Observable<String> {
Thread t;
final CountDownLatch onNextBeingSent = new CountDownLatch(1);

@Override
public Subscription subscribe(final Observer<String> observer) {
t = new Thread(new Runnable() {

@Override
public void run() {
onNextBeingSent.countDown();
observer.onNext("hello");
// I can't use a countDownLatch to prove we are actually sending 'onNext'
// since it will block if synchronized and I'll deadlock
observer.onCompleted();
}

Expand Down

0 comments on commit c83d218

Please sign in to comment.