diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index bd43f275c1..376dd3ceae 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -21,10 +21,14 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.junit.Ignore; import org.junit.Test; import org.mockito.InOrder; @@ -40,23 +44,10 @@ public final class OperationConcat { /** * Combine the observable sequences from the list of Observables into one - * observable sequence without any transformation. If either the outer + * observable sequence without any transformation. If either the outer * observable or an inner observable calls onError, we will call onError. - * - *

- * - * The outer observable might run on a separate thread from (one of) the - * inner observables; in this case care must be taken to avoid a deadlock. - * The Concat operation may block the outer thread while servicing an inner - * thread in order to ensure a well-defined ordering of elements; therefore - * none of the inner threads must be implemented in a way that might wait on - * the outer thread. - * *

* - * Beware that concat(o1,o2).subscribe() is a blocking call from - * which it is impossible to unsubscribe if observables are running on same thread. - * * @param sequences An observable sequence of elements to project. * @return An observable sequence whose elements are the result of combining the output from the list of Observables. */ @@ -69,73 +60,101 @@ public static Func1, Subscription> concat(final List Func1, Subscription> concat(final Observable> sequences) { - return new Func1, Subscription>() { - - @Override - public Subscription call(Observer observer) { - return new ConcatSubscription(sequences, observer); - } - }; + return new Concat(sequences); } - private static class ConcatSubscription extends BooleanSubscription { - // Might be updated by an inner thread's onError during the outer - // thread's onNext, then read in the outer thread's onComplete. - final AtomicBoolean innerError = new AtomicBoolean(false); + private static class Concat implements Func1, Subscription> { + private Observable> sequences; + private AtomicObservableSubscription innerSubscription = null; - public ConcatSubscription(Observable> sequences, final Observer observer) { + public Concat(Observable> sequences) { + this.sequences = sequences; + } + + public Subscription call(final Observer observer) { + final AtomicBoolean completedOrErred = new AtomicBoolean(false); + final AtomicBoolean allSequencesReceived = new AtomicBoolean(false); + final Queue> nextSequences = new ConcurrentLinkedQueue>(); final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription(); - outerSubscription.wrap(sequences.subscribe(new Observer>() { + + final Observer reusableObserver = new Observer() { @Override - public void onNext(Observable nextSequence) { - // We will not return from onNext until the inner observer completes. - // NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted. - final CountDownLatch latch = new CountDownLatch(1); - final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(); - innerSubscription.wrap(nextSequence.subscribe(new Observer() { - @Override - public void onNext(T item) { - // Make our best-effort to release resources in the face of unsubscribe. - if (isUnsubscribed()) { - innerSubscription.unsubscribe(); - outerSubscription.unsubscribe(); - } else { - observer.onNext(item); + public void onNext(T item) { + observer.onNext(item); + } + @Override + public void onError(Exception e) { + if (completedOrErred.compareAndSet(false, true)) { + outerSubscription.unsubscribe(); + observer.onError(e); + } + } + @Override + public void onCompleted() { + synchronized (nextSequences) { + if (nextSequences.isEmpty()) { + // No new sequences available at the moment + innerSubscription = null; + if (allSequencesReceived.get()) { + // No new sequences are coming, we are finished + if (completedOrErred.compareAndSet(false, true)) { + observer.onCompleted(); + } } - } - @Override - public void onError(Exception e) { - outerSubscription.unsubscribe(); - innerError.set(true); - observer.onError(e); - latch.countDown(); - } - @Override - public void onCompleted() { + } else { // Continue on to the next sequence - latch.countDown(); + innerSubscription = new AtomicObservableSubscription(); + innerSubscription.wrap(nextSequences.poll().subscribe(this)); + } + } + } + }; + + outerSubscription.wrap(sequences.subscribe(new Observer>() { + @Override + public void onNext(Observable nextSequence) { + synchronized (nextSequences) { + if (innerSubscription == null) { + // We are currently not subscribed to any sequence + innerSubscription = new AtomicObservableSubscription(); + innerSubscription.wrap(nextSequence.subscribe(reusableObserver)); + } else { + // Put this sequence at the end of the queue + nextSequences.add(nextSequence); } - })); - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Exceptions.propagate(e); } } @Override public void onError(Exception e) { - // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. - observer.onError(e); + if (completedOrErred.compareAndSet(false, true)) { + if (innerSubscription != null) { + innerSubscription.unsubscribe(); + } + observer.onError(e); + } } @Override public void onCompleted() { - // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls. - if (!innerError.get()) { - observer.onCompleted(); + allSequencesReceived.set(true); + if (innerSubscription == null) { + // We are not subscribed to any sequence, and none are coming anymore + if (completedOrErred.compareAndSet(false, true)) { + observer.onCompleted(); + } } } })); + + return new Subscription() { + @Override + public void unsubscribe() { + synchronized (nextSequences) { + if (innerSubscription != null) + innerSubscription.unsubscribe(); + outerSubscription.unsubscribe(); + } + } + }; } } @@ -442,9 +461,72 @@ public void testConcatConcurrentWithInfinity() { } + + + @Test + public void testConcatUnSubscribeNotBlockingObservables() { + + final CountDownLatch okToContinueW1 = new CountDownLatch(1); + final CountDownLatch okToContinueW2 = new CountDownLatch(1); + + final TestObservable w1 = new TestObservable(null, okToContinueW1, "one", "two", "three"); + final TestObservable w2 = new TestObservable(null, okToContinueW2, "four", "five", "six"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { + + @Override + public Subscription call(Observer> observer) { + // simulate what would happen in an observable + observer.onNext(w1); + observer.onNext(w2); + observer.onCompleted(); + + return new Subscription() { + + @Override + public void unsubscribe() { + } + + }; + } + + }); + Observable concat = Observable.create(concat(observableOfObservables)); + + concat.subscribe(aObserver); + + verify(aObserver, times(0)).onCompleted(); + + + //Wait for the thread to start up. + try { + Thread.sleep(25); + w1.t.join(); + w2.t.join(); + okToContinueW1.countDown(); + okToContinueW2.countDown(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext("one"); + inOrder.verify(aObserver, times(1)).onNext("two"); + inOrder.verify(aObserver, times(1)).onNext("three"); + inOrder.verify(aObserver, times(1)).onNext("four"); + inOrder.verify(aObserver, times(1)).onNext("five"); + inOrder.verify(aObserver, times(1)).onNext("six"); + verify(aObserver, times(1)).onCompleted(); + + + } + /** - * The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete. + * Test unsubscribing the concatenated Observable in a single thread. */ @Test public void testConcatUnsubscribe() { @@ -458,20 +540,13 @@ public void testConcatUnsubscribe() { @SuppressWarnings("unchecked") final Observable concat = Observable.create(concat(w1, w2)); final AtomicObservableSubscription s1 = new AtomicObservableSubscription(); - Thread t = new Thread() { - @Override - public void run() { - // NB: this statement does not complete until after "six" has been delivered. - s1.wrap(concat.subscribe(aObserver)); - } - }; - t.start(); + try { + // Subscribe + s1.wrap(concat.subscribe(aObserver)); //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once. callOnce.await(); - // NB: This statement has no effect, since s1 cannot possibly - // wrap anything until "six" has been delivered, which cannot - // happen until we okToContinue.countDown() + // Unsubcribe s1.unsubscribe(); //Unblock the observable to continue. okToContinue.countDown(); @@ -487,10 +562,9 @@ public void run() { inOrder.verify(aObserver, times(1)).onNext("two"); inOrder.verify(aObserver, times(1)).onNext("three"); inOrder.verify(aObserver, times(1)).onNext("four"); - // NB: you might hope that five and six are not delivered, but see above. - inOrder.verify(aObserver, times(1)).onNext("five"); - inOrder.verify(aObserver, times(1)).onNext("six"); - inOrder.verify(aObserver, times(1)).onCompleted(); + inOrder.verify(aObserver, never()).onNext("five"); + inOrder.verify(aObserver, never()).onNext("six"); + inOrder.verify(aObserver, never()).onCompleted(); } @@ -598,7 +672,7 @@ public void run() { once.countDown(); //Block until the main thread has called unsubscribe. if (null != okToContinue) - okToContinue.await(); + okToContinue.await(1, TimeUnit.SECONDS); } if (subscribed) observer.onCompleted();