Skip to content

Commit

Permalink
2.x: Fix Observable.concatMapSingle dropping upstream items (#5972)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Apr 24, 2018
1 parent 63877ae commit 05b0d40
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -107,7 +107,7 @@ static final class ConcatMapSingleMainObserver<T, R>
this.errorMode = errorMode;
this.errors = new AtomicThrowable();
this.inner = new ConcatMapSingleObserver<R>(this);
this.queue = new SpscArrayQueue<T>(prefetch);
this.queue = new SpscLinkedArrayQueue<T>(prefetch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,21 @@ public void cancelNoConcurrentClean() {

assertTrue(operator.queue.isEmpty());
}

@Test
public void checkUnboundedInnerQueue() {
MaybeSubject<Integer> ms = MaybeSubject.create();

@SuppressWarnings("unchecked")
TestObserver<Integer> to = Observable
.fromArray(ms, Maybe.just(2), Maybe.just(3), Maybe.just(4))
.concatMapMaybe(Functions.<Maybe<Integer>>identity(), 2)
.test();

to.assertEmpty();

ms.onSuccess(1);

to.assertResult(1, 2, 3, 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,21 @@ public void cancelNoConcurrentClean() {

assertTrue(operator.queue.isEmpty());
}

@Test
public void checkUnboundedInnerQueue() {
SingleSubject<Integer> ss = SingleSubject.create();

@SuppressWarnings("unchecked")
TestObserver<Integer> to = Observable
.fromArray(ss, Single.just(2), Single.just(3), Single.just(4))
.concatMapSingle(Functions.<Single<Integer>>identity(), 2)
.test();

to.assertEmpty();

ss.onSuccess(1);

to.assertResult(1, 2, 3, 4);
}
}

0 comments on commit 05b0d40

Please sign in to comment.