From cfa7155c24089a2fbfd2e9481b56ad953079acf3 Mon Sep 17 00:00:00 2001 From: Gerben van den Broeke Date: Tue, 14 May 2013 20:44:10 +0200 Subject: [PATCH 1/4] Rewrite concat operation to not block on subscribe The concat operator previously blocked on calling subscribe until all the sequences had finished. In quite some cases this results in unwanted (and unexpected) behaviour, such as when prefixing an infinite Observable with a fixed one, for example when using startWith (which calls concat): someInputStream.startWith(123).subscribe(x -> print(x)); This statement will block indefinitely if the input stream is infinite. Also on finite sequences it seems silly to have to wait for them to finish. In this new approach the incoming observables are put into a queue, instead of waiting for the whole sequence to finish. When the first observable completes, the next one is taken from the queue and subscribed to, and so on. The queue can be extended while processing the observables, and onCompleted is only called when both the source of observables has completed and all observables in the queue have been read. --- .../java/rx/operators/OperationConcat.java | 167 +++++++++--------- 1 file changed, 88 insertions(+), 79 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 93c14b2acf..625fb4cd3e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -21,9 +21,11 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentLinkedQueue; import org.junit.Test; @@ -41,23 +43,10 @@ public final class OperationConcat { /** * Combine the observable sequences from the list of Observables into one - * observable sequence without any transformation. If either the outer + * observable sequence without any transformation. If either the outer * observable or an inner observable calls onError, we will call onError. - * - *

- * - * The outer observable might run on a separate thread from (one of) the - * inner observables; in this case care must be taken to avoid a deadlock. - * The Concat operation may block the outer thread while servicing an inner - * thread in order to ensure a well-defined ordering of elements; therefore - * none of the inner threads must be implemented in a way that might wait on - * the outer thread. - * *

* - * Beware that concat(o1,o2).subscribe() is a blocking call from - * which it is impossible to unsubscribe if observables are running on same thread. - * * @param sequences An observable sequence of elements to project. * @return An observable sequence whose elements are the result of combining the output from the list of Observables. */ @@ -70,73 +59,101 @@ public static Func1, Subscription> concat(final List Func1, Subscription> concat(final Observable> sequences) { - return new Func1, Subscription>() { - - @Override - public Subscription call(Observer observer) { - return new ConcatSubscription(sequences, observer); - } - }; + return new Concat(sequences); } - private static class ConcatSubscription extends BooleanSubscription { - // Might be updated by an inner thread's onError during the outer - // thread's onNext, then read in the outer thread's onComplete. - final AtomicBoolean innerError = new AtomicBoolean(false); + private static class Concat implements Func1, Subscription> { + private Observable> sequences; + private AtomicObservableSubscription innerSubscription = null; + + public Concat(Observable> sequences) { + this.sequences = sequences; + } - public ConcatSubscription(Observable> sequences, final Observer observer) { + public Subscription call(final Observer observer) { + final AtomicBoolean completedOrErred = new AtomicBoolean(false); + final AtomicBoolean allSequencesReceived = new AtomicBoolean(false); + final Queue> nextSequences = new ConcurrentLinkedQueue>(); final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription(); - outerSubscription.wrap(sequences.subscribe(new Observer>() { + + final Observer reusableObserver = new Observer() { @Override - public void onNext(Observable nextSequence) { - // We will not return from onNext until the inner observer completes. - // NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted. - final CountDownLatch latch = new CountDownLatch(1); - final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(); - innerSubscription.wrap(nextSequence.subscribe(new Observer() { - @Override - public void onNext(T item) { - // Make our best-effort to release resources in the face of unsubscribe. - if (isUnsubscribed()) { - innerSubscription.unsubscribe(); - outerSubscription.unsubscribe(); - } else { - observer.onNext(item); + public void onNext(T item) { + observer.onNext(item); + } + @Override + public void onError(Exception e) { + if (completedOrErred.compareAndSet(false, true)) { + outerSubscription.unsubscribe(); + observer.onError(e); + } + } + @Override + public void onCompleted() { + synchronized (nextSequences) { + if (nextSequences.isEmpty()) { + // No new sequences available at the moment + innerSubscription = null; + if (allSequencesReceived.get()) { + // No new sequences are coming, we are finished + if (completedOrErred.compareAndSet(false, true)) { + observer.onCompleted(); + } } - } - @Override - public void onError(Exception e) { - outerSubscription.unsubscribe(); - innerError.set(true); - observer.onError(e); - latch.countDown(); - } - @Override - public void onCompleted() { + } else { // Continue on to the next sequence - latch.countDown(); + innerSubscription = new AtomicObservableSubscription(); + innerSubscription.wrap(nextSequences.poll().subscribe(this)); + } + } + } + }; + + outerSubscription.wrap(sequences.subscribe(new Observer>() { + @Override + public void onNext(Observable nextSequence) { + synchronized (nextSequences) { + if (innerSubscription == null) { + // We are currently not subscribed to any sequence + innerSubscription = new AtomicObservableSubscription(); + innerSubscription.wrap(nextSequence.subscribe(reusableObserver)); + } else { + // Put this sequence at the end of the queue + nextSequences.add(nextSequence); } - })); - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Exceptions.propagate(e); } } @Override public void onError(Exception e) { - // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. - observer.onError(e); + if (completedOrErred.compareAndSet(false, true)) { + if (innerSubscription != null) { + innerSubscription.unsubscribe(); + } + observer.onError(e); + } } @Override public void onCompleted() { - // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. - if (!innerError.get()) { - observer.onCompleted(); + allSequencesReceived.set(true); + if (innerSubscription == null) { + // We are not subscribed to any sequence, and none are coming anymore + if (completedOrErred.compareAndSet(false, true)) { + observer.onCompleted(); + } } } })); + + return new Subscription() { + @Override + public void unsubscribe() { + synchronized (nextSequences) { + if (innerSubscription != null) + innerSubscription.unsubscribe(); + outerSubscription.unsubscribe(); + } + } + }; } } @@ -445,7 +462,7 @@ public void testConcatConcurrentWithInfinity() { /** - * The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete. + * Test unsubscribing the concatenated Observable in a single thread. */ @Test public void testConcatUnsubscribe() { @@ -459,20 +476,13 @@ public void testConcatUnsubscribe() { @SuppressWarnings("unchecked") final Observable concat = Observable.create(concat(w1, w2)); final AtomicObservableSubscription s1 = new AtomicObservableSubscription(); - Thread t = new Thread() { - @Override - public void run() { - // NB: this statement does not complete until after "six" has been delivered. - s1.wrap(concat.subscribe(aObserver)); - } - }; - t.start(); + try { + // Subscribe + s1.wrap(concat.subscribe(aObserver)); //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once. callOnce.await(); - // NB: This statement has no effect, since s1 cannot possibly - // wrap anything until "six" has been delivered, which cannot - // happen until we okToContinue.countDown() + // Unsubcribe s1.unsubscribe(); //Unblock the observable to continue. okToContinue.countDown(); @@ -488,10 +498,9 @@ public void run() { inOrder.verify(aObserver, times(1)).onNext("two"); inOrder.verify(aObserver, times(1)).onNext("three"); inOrder.verify(aObserver, times(1)).onNext("four"); - // NB: you might hope that five and six are not delivered, but see above. - inOrder.verify(aObserver, times(1)).onNext("five"); - inOrder.verify(aObserver, times(1)).onNext("six"); - inOrder.verify(aObserver, times(1)).onCompleted(); + inOrder.verify(aObserver, never()).onNext("five"); + inOrder.verify(aObserver, never()).onNext("six"); + inOrder.verify(aObserver, never()).onCompleted(); } From 658d824b9f632f79a2b29979a6dfbb1a0a925783 Mon Sep 17 00:00:00 2001 From: Billy Yuen Date: Wed, 15 May 2013 11:37:43 -0700 Subject: [PATCH 2/4] Add new unit test to cover infinite observable being the first sequence. --- .../java/rx/operators/OperationConcat.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 625fb4cd3e..6f3b8902f5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -36,7 +36,7 @@ import rx.Subscription; import rx.subscriptions.BooleanSubscription; import rx.util.AtomicObservableSubscription; -import rx.util.Exceptions; + import rx.util.functions.Func1; public final class OperationConcat { @@ -460,6 +460,38 @@ public void testConcatConcurrentWithInfinity() { } + @Test + public void testConcatConcurrentWithInfinityFirstSequence() { + final TestObservable w1 = new TestObservable("one", "two", "three"); + //This observable will send "hello" MAX_VALUE time. + final TestObservable w2 = new TestObservable("hello", Integer.MAX_VALUE); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + @SuppressWarnings("unchecked") + TestObservable> observableOfObservables = new TestObservable>(w2, w1); + Func1, Subscription> concatF = concat(observableOfObservables); + + Observable concat = Observable.create(concatF); + + concat.take(50).subscribe(aObserver); + + //Wait for the thread to start up. + try { + Thread.sleep(25); + w2.t.join(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(50)).onNext("hello"); + verify(aObserver, times(1)).onCompleted(); + verify(aObserver, never()).onError(any(Exception.class)); + + } + /** * Test unsubscribing the concatenated Observable in a single thread. From 9b3204b5f0494135e9178736aaf130154faf4b5f Mon Sep 17 00:00:00 2001 From: Billy Yuen Date: Wed, 15 May 2013 16:09:54 -0700 Subject: [PATCH 3/4] Revert "Add new unit test to cover infinite observable being the first sequence." This reverts commit 658d824b9f632f79a2b29979a6dfbb1a0a925783. --- .../java/rx/operators/OperationConcat.java | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 6f3b8902f5..625fb4cd3e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -36,7 +36,7 @@ import rx.Subscription; import rx.subscriptions.BooleanSubscription; import rx.util.AtomicObservableSubscription; - +import rx.util.Exceptions; import rx.util.functions.Func1; public final class OperationConcat { @@ -460,38 +460,6 @@ public void testConcatConcurrentWithInfinity() { } - @Test - public void testConcatConcurrentWithInfinityFirstSequence() { - final TestObservable w1 = new TestObservable("one", "two", "three"); - //This observable will send "hello" MAX_VALUE time. - final TestObservable w2 = new TestObservable("hello", Integer.MAX_VALUE); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - @SuppressWarnings("unchecked") - TestObservable> observableOfObservables = new TestObservable>(w2, w1); - Func1, Subscription> concatF = concat(observableOfObservables); - - Observable concat = Observable.create(concatF); - - concat.take(50).subscribe(aObserver); - - //Wait for the thread to start up. - try { - Thread.sleep(25); - w2.t.join(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - InOrder inOrder = inOrder(aObserver); - inOrder.verify(aObserver, times(50)).onNext("hello"); - verify(aObserver, times(1)).onCompleted(); - verify(aObserver, never()).onError(any(Exception.class)); - - } - /** * Test unsubscribing the concatenated Observable in a single thread. From 0063b90a47eedb7633c91acd9be6159040aa98f5 Mon Sep 17 00:00:00 2001 From: Billy Yuen Date: Wed, 15 May 2013 16:24:34 -0700 Subject: [PATCH 4/4] Add new unit test to check for non-blocking. --- .../java/rx/operators/OperationConcat.java | 67 ++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 625fb4cd3e..de65b89f02 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -23,10 +23,12 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.ConcurrentLinkedQueue; +import org.junit.Ignore; import org.junit.Test; import org.mockito.InOrder; @@ -460,6 +462,69 @@ public void testConcatConcurrentWithInfinity() { } + + + @Test + public void testConcatUnSubscribeNotBlockingObservables() { + + final CountDownLatch okToContinueW1 = new CountDownLatch(1); + final CountDownLatch okToContinueW2 = new CountDownLatch(1); + + final TestObservable w1 = new TestObservable(null, okToContinueW1, "one", "two", "three"); + final TestObservable w2 = new TestObservable(null, okToContinueW2, "four", "five", "six"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { + + @Override + public Subscription call(Observer> observer) { + // simulate what would happen in an observable + observer.onNext(w1); + observer.onNext(w2); + observer.onCompleted(); + + return new Subscription() { + + @Override + public void unsubscribe() { + } + + }; + } + + }); + Observable concat = Observable.create(concat(observableOfObservables)); + + concat.subscribe(aObserver); + + verify(aObserver, times(0)).onCompleted(); + + + //Wait for the thread to start up. + try { + Thread.sleep(25); + w1.t.join(); + w2.t.join(); + okToContinueW1.countDown(); + okToContinueW2.countDown(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext("one"); + inOrder.verify(aObserver, times(1)).onNext("two"); + inOrder.verify(aObserver, times(1)).onNext("three"); + inOrder.verify(aObserver, times(1)).onNext("four"); + inOrder.verify(aObserver, times(1)).onNext("five"); + inOrder.verify(aObserver, times(1)).onNext("six"); + verify(aObserver, times(1)).onCompleted(); + + + } + /** * Test unsubscribing the concatenated Observable in a single thread. @@ -608,7 +673,7 @@ public void run() { once.countDown(); //Block until the main thread has called unsubscribe. if (null != okToContinue) - okToContinue.await(); + okToContinue.await(1, TimeUnit.SECONDS); } if (subscribed) observer.onCompleted();