From 97d90c6c0a9faa999781b60f7d698b4540e86b15 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 26 May 2016 11:03:12 +0200 Subject: [PATCH] 1.x: add multi-other withLatestFrom operators --- src/main/java/rx/Observable.java | 327 +++++++++++++++- .../operators/OperatorWithLatestFromMany.java | 214 ++++++++++ .../operators/OperatorWithLatestFromTest.java | 365 +++++++++++++++++- 3 files changed, 903 insertions(+), 3 deletions(-) create mode 100644 src/main/java/rx/internal/operators/OperatorWithLatestFromMany.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index cad9d2d0f7..34313768a3 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -9929,7 +9929,332 @@ public final Observable unsubscribeOn(Scheduler scheduler) { public final Observable withLatestFrom(Observable other, Func2 resultSelector) { return lift(new OperatorWithLatestFrom(other, resultSelector)); } - + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the first other source's value type + * @param the second other source's value type + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom(Observable o1, Observable o2, Func3 combiner) { + return create(new OperatorWithLatestFromMany(this, new Observable[] { o1, o2 }, null, Functions.fromFunc(combiner))); + } + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the first other source's value type + * @param the second other source's value type + * @param the third other source's value type + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom( + Observable o1, Observable o2, + Observable o3, + Func4 combiner) { + return create(new OperatorWithLatestFromMany(this, + new Observable[] { o1, o2, o3 }, null, Functions.fromFunc(combiner))); + } + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the first other source's value type + * @param the second other source's value type + * @param the third other source's value type + * @param the fourth other source's value type + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom( + Observable o1, Observable o2, + Observable o3, Observable o4, + Func5 combiner) { + return create(new OperatorWithLatestFromMany(this, + new Observable[] { o1, o2, o3, o4 }, null, Functions.fromFunc(combiner))); + } + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the first other source's value type + * @param the second other source's value type + * @param the third other source's value type + * @param the fourth other source's value type + * @param the fifth other source's value type + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom( + Observable o1, Observable o2, + Observable o3, Observable o4, + Observable o5, + Func6 combiner) { + return create(new OperatorWithLatestFromMany(this, + new Observable[] { o1, o2, o3, o4, o5 }, null, Functions.fromFunc(combiner))); + } + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the first other source's value type + * @param the second other source's value type + * @param the third other source's value type + * @param the fourth other source's value type + * @param the fifth other source's value type + * @param the sixth other source's value type + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom( + Observable o1, Observable o2, + Observable o3, Observable o4, + Observable o5, Observable o6, + Func7 combiner) { + return create(new OperatorWithLatestFromMany(this, + new Observable[] { o1, o2, o3, o4, o5, o6 }, null, Functions.fromFunc(combiner))); + } + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the first other source's value type + * @param the second other source's value type + * @param the third other source's value type + * @param the fourth other source's value type + * @param the fifth other source's value type + * @param the sixth other source's value type + * @param the seventh other source's value type + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom( + Observable o1, Observable o2, + Observable o3, Observable o4, + Observable o5, Observable o6, + Observable o7, + Func8 combiner) { + return create(new OperatorWithLatestFromMany(this, + new Observable[] { o1, o2, o3, o4, o5, o6, o7 }, null, Functions.fromFunc(combiner))); + } + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the first other source's value type + * @param the second other source's value type + * @param the third other source's value type + * @param the fourth other source's value type + * @param the fifth other source's value type + * @param the sixth other source's value type + * @param the seventh other source's value type + * @param the eigth other source's value type + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom( + Observable o1, Observable o2, + Observable o3, Observable o4, + Observable o5, Observable o6, + Observable o7, Observable o8, + Func9 combiner) { + return create(new OperatorWithLatestFromMany(this, + new Observable[] { o1, o2, o3, o4, o5, o6, o7, o8 }, null, Functions.fromFunc(combiner))); + } + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the result value type + * @param others the array of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom(Observable[] others, FuncN combiner) { + return create(new OperatorWithLatestFromMany(this, others, null, combiner)); + } + + /** + * Combines the value emission from this Observable with the latest emissions from the + * other Observables via a function to produce the output item. + * + *

Note that this operator doesn't emit anything until all other sources have produced at + * least one value. The resulting emission only happens when this Observable emits (and + * not when any of the other sources emit, unlike combineLatest). + * If a source doesn't produce any value and just completes, the sequence is completed immediately. + * + *

+ *
Backpressure Support:
+ *
This operator is a pass-through for backpressure behavior between this {@code Observable} + * and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.
+ *
Scheduler:
+ *
This operator does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the result value type + * @param others the iterable of other sources + * @param combiner the function called with an array of values from each participating observable + * @return the new Observable instance + * @Experimental The behavior of this can change at any time. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable withLatestFrom(Iterable> others, FuncN combiner) { + return create(new OperatorWithLatestFromMany(this, null, others, combiner)); + } + /** * Returns an Observable that emits windows of items it collects from the source Observable. The resulting * Observable emits connected, non-overlapping windows. It emits the current window and opens a new one diff --git a/src/main/java/rx/internal/operators/OperatorWithLatestFromMany.java b/src/main/java/rx/internal/operators/OperatorWithLatestFromMany.java new file mode 100644 index 0000000000..91c390e63d --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorWithLatestFromMany.java @@ -0,0 +1,214 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.Arrays; +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.exceptions.Exceptions; +import rx.functions.FuncN; +import rx.internal.util.RxJavaPluginUtils; +import rx.observers.SerializedSubscriber; + +public final class OperatorWithLatestFromMany implements OnSubscribe { + final Observable main; + + final Observable[] others; + + final Iterable> othersIterable; + + final FuncN combiner; + + public OperatorWithLatestFromMany(Observable main, Observable[] others, Iterable> othersIterable, FuncN combiner) { + this.main = main; + this.others = others; + this.othersIterable = othersIterable; + this.combiner = combiner; + } + + @Override + public void call(Subscriber t) { + SerializedSubscriber serial = new SerializedSubscriber(t); + + + Observable[] sources; + int n = 0; + + if (others != null) { + sources = others; + n = sources.length; + } else { + sources = new Observable[8]; + for (Observable o : othersIterable) { + if (n == sources.length) { + sources = Arrays.copyOf(sources, n + (n >> 2)); + } + sources[n++] = o; + } + } + + WithLatestMainSubscriber parent = new WithLatestMainSubscriber(t, combiner, n); + + serial.add(parent); + + + for (int i = 0; i < n; i++) { + Observable o = sources[i]; + if (serial.isUnsubscribed()) { + return; + } + + WithLatestOtherSubscriber inner = new WithLatestOtherSubscriber(parent, i + 1); + parent.add(inner); + + o.unsafeSubscribe(inner); + } + + main.unsafeSubscribe(parent); + } + + static final class WithLatestMainSubscriber extends Subscriber { + final Subscriber actual; + + final FuncN combiner; + + final AtomicReferenceArray current; + + static final Object EMPTY = new Object(); + + final AtomicInteger ready; + + boolean done; + + public WithLatestMainSubscriber(Subscriber actual, FuncN combiner, int n) { + this.actual = actual; + this.combiner = combiner; + + AtomicReferenceArray array = new AtomicReferenceArray(n + 1); + for (int i = 0; i <= n; i++) { + array.lazySet(i, EMPTY); + } + this.current = array; + + this.ready = new AtomicInteger(n); + this.request(0); + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + if (ready.get() == 0) { + + AtomicReferenceArray array = current; + int n = array.length(); + array.lazySet(0, t); + + Object[] copy = new Object[array.length()]; + for (int i = 0; i < n; i++) { + copy[i] = array.get(i); + } + + R result; + + try { + result = combiner.call(copy); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } + + actual.onNext(result); + } else { + request(1); + } + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPluginUtils.handleException(e); + return; + } + done = true; + unsubscribe(); + actual.onError(e); + } + + @Override + public void onCompleted() { + if (done) { + return; + } + done = true; + unsubscribe(); + actual.onCompleted(); + } + + @Override + public void setProducer(Producer p) { + super.setProducer(p); + actual.setProducer(p); + } + + void innerNext(int index, Object o) { + Object last = current.getAndSet(index, o); + if (last == EMPTY) { + ready.decrementAndGet(); + } + } + + void innerError(int index, Throwable e) { + onError(e); + } + + void innerComplete(int index) { + if (current.get(index) == EMPTY) { + onCompleted(); + } + } + } + + static final class WithLatestOtherSubscriber extends Subscriber { + final WithLatestMainSubscriber parent; + + final int index; + + public WithLatestOtherSubscriber(WithLatestMainSubscriber parent, int index) { + this.parent = parent; + this.index = index; + } + + @Override + public void onNext(Object t) { + parent.innerNext(index, t); + } + + @Override + public void onError(Throwable e) { + parent.innerError(index, e); + } + + @Override + public void onCompleted() { + parent.innerComplete(index); + } + } +} diff --git a/src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java b/src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java index a172158115..a0a6a9d20c 100644 --- a/src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java +++ b/src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java @@ -21,13 +21,13 @@ import java.util.*; -import org.junit.Test; +import org.junit.*; import org.mockito.InOrder; import rx.Observable; import rx.Observer; import rx.exceptions.TestException; -import rx.functions.Func2; +import rx.functions.*; import rx.observers.TestSubscriber; import rx.subjects.PublishSubject; @@ -295,4 +295,365 @@ public void onStart() { ts.assertNoErrors(); } + + static final FuncN toArray = new FuncN() { + @Override + public String call(Object... args) { + return Arrays.toString(args); + } + }; + + @Test + public void manySources() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + PublishSubject ps3 = PublishSubject.create(); + PublishSubject main = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + main.withLatestFrom(new Observable[] { ps1, ps2, ps3 }, toArray) + .subscribe(ts); + + main.onNext("1"); + ts.assertNoValues(); + ps1.onNext("a"); + ts.assertNoValues(); + ps2.onNext("A"); + ts.assertNoValues(); + ps3.onNext("="); + ts.assertNoValues(); + + main.onNext("2"); + ts.assertValues("[2, a, A, =]"); + + ps2.onNext("B"); + + ts.assertValues("[2, a, A, =]"); + + ps3.onCompleted(); + ts.assertValues("[2, a, A, =]"); + + ps1.onNext("b"); + + main.onNext("3"); + + ts.assertValues("[2, a, A, =]", "[3, b, B, =]"); + + main.onCompleted(); + ts.assertValues("[2, a, A, =]", "[3, b, B, =]"); + ts.assertNoErrors(); + ts.assertCompleted(); + + Assert.assertFalse("ps1 has subscribers?", ps1.hasObservers()); + Assert.assertFalse("ps2 has subscribers?", ps2.hasObservers()); + Assert.assertFalse("ps3 has subscribers?", ps3.hasObservers()); + } + + @Test + public void manySourcesIterable() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + PublishSubject ps3 = PublishSubject.create(); + PublishSubject main = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + main.withLatestFrom(Arrays.>asList(ps1, ps2, ps3), toArray) + .subscribe(ts); + + main.onNext("1"); + ts.assertNoValues(); + ps1.onNext("a"); + ts.assertNoValues(); + ps2.onNext("A"); + ts.assertNoValues(); + ps3.onNext("="); + ts.assertNoValues(); + + main.onNext("2"); + ts.assertValues("[2, a, A, =]"); + + ps2.onNext("B"); + + ts.assertValues("[2, a, A, =]"); + + ps3.onCompleted(); + ts.assertValues("[2, a, A, =]"); + + ps1.onNext("b"); + + main.onNext("3"); + + ts.assertValues("[2, a, A, =]", "[3, b, B, =]"); + + main.onCompleted(); + ts.assertValues("[2, a, A, =]", "[3, b, B, =]"); + ts.assertNoErrors(); + ts.assertCompleted(); + + Assert.assertFalse("ps1 has subscribers?", ps1.hasObservers()); + Assert.assertFalse("ps2 has subscribers?", ps2.hasObservers()); + Assert.assertFalse("ps3 has subscribers?", ps3.hasObservers()); + } + + @Test + public void manySourcesIterableSweep() { + for (String val : new String[] { "1", null }) { + int n = 35; + for (int i = 0; i < n; i++) { + List> sources = new ArrayList>(); + List expected = new ArrayList(); + expected.add(val); + + for (int j = 0; j < i; j++) { + sources.add(Observable.just(val)); + expected.add(String.valueOf(val)); + } + + TestSubscriber ts = new TestSubscriber(); + + PublishSubject main = PublishSubject.create(); + + main.withLatestFrom(sources, toArray).subscribe(ts); + + ts.assertNoValues(); + + main.onNext(val); + main.onCompleted(); + + ts.assertValue(expected.toString()); + ts.assertNoErrors(); + ts.assertCompleted(); + } + } + } + + @Test + public void backpressureNoSignal() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(0); + + Observable.range(1, 10).withLatestFrom(new Observable[] { ps1, ps2 }, toArray) + .subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(1); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertCompleted(); + + Assert.assertFalse("ps1 has subscribers?", ps1.hasObservers()); + Assert.assertFalse("ps2 has subscribers?", ps2.hasObservers()); + } + + @Test + public void backpressureWithSignal() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(0); + + Observable.range(1, 3).withLatestFrom(new Observable[] { ps1, ps2 }, toArray) + .subscribe(ts); + + ts.assertNoValues(); + + ps1.onNext("1"); + ps2.onNext("1"); + + ts.requestMore(1); + + ts.assertValue("[1, 1, 1]"); + + ts.requestMore(1); + + ts.assertValues("[1, 1, 1]", "[2, 1, 1]"); + + ts.requestMore(1); + + ts.assertValues("[1, 1, 1]", "[2, 1, 1]", "[3, 1, 1]"); + ts.assertNoErrors(); + ts.assertCompleted(); + + Assert.assertFalse("ps1 has subscribers?", ps1.hasObservers()); + Assert.assertFalse("ps2 has subscribers?", ps2.hasObservers()); + } + + @Test + public void withEmpty() { + TestSubscriber ts = new TestSubscriber(0); + + Observable.range(1, 3).withLatestFrom( + new Observable[] { Observable.just(1), Observable.empty() }, toArray) + .subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void withError() { + TestSubscriber ts = new TestSubscriber(0); + + Observable.range(1, 3).withLatestFrom( + new Observable[] { Observable.just(1), Observable.error(new TestException()) }, toArray) + .subscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void withMainError() { + TestSubscriber ts = new TestSubscriber(0); + + Observable.error(new TestException()).withLatestFrom( + new Observable[] { Observable.just(1), Observable.just(1) }, toArray) + .subscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void with2Others() { + Observable just = Observable.just(1); + + TestSubscriber> ts = new TestSubscriber>(); + + just.withLatestFrom(just, just, new Func3>() { + @Override + public List call(Integer a, Integer b, Integer c) { + return Arrays.asList(a, b, c); + } + }) + .subscribe(ts); + + ts.assertValue(Arrays.asList(1, 1, 1)); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void with3Others() { + Observable just = Observable.just(1); + + TestSubscriber> ts = new TestSubscriber>(); + + just.withLatestFrom(just, just, just, new Func4>() { + @Override + public List call(Integer a, Integer b, Integer c, Integer d) { + return Arrays.asList(a, b, c, d); + } + }) + .subscribe(ts); + + ts.assertValue(Arrays.asList(1, 1, 1, 1)); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void with4Others() { + Observable just = Observable.just(1); + + TestSubscriber> ts = new TestSubscriber>(); + + just.withLatestFrom(just, just, just, just, new Func5>() { + @Override + public List call(Integer a, Integer b, Integer c, Integer d, Integer e) { + return Arrays.asList(a, b, c, d, e); + } + }) + .subscribe(ts); + + ts.assertValue(Arrays.asList(1, 1, 1, 1, 1)); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void with5Others() { + Observable just = Observable.just(1); + + TestSubscriber> ts = new TestSubscriber>(); + + just.withLatestFrom(just, just, just, just, just, new Func6>() { + @Override + public List call(Integer a, Integer b, Integer c, Integer d, Integer e, Integer f) { + return Arrays.asList(a, b, c, d, e, f); + } + }) + .subscribe(ts); + + ts.assertValue(Arrays.asList(1, 1, 1, 1, 1, 1)); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void with6Others() { + Observable just = Observable.just(1); + + TestSubscriber> ts = new TestSubscriber>(); + + just.withLatestFrom(just, just, just, just, just, just, new Func7>() { + @Override + public List call(Integer a, Integer b, Integer c, Integer d, Integer e, Integer f, Integer g) { + return Arrays.asList(a, b, c, d, e, f, g); + } + }) + .subscribe(ts); + + ts.assertValue(Arrays.asList(1, 1, 1, 1, 1, 1, 1)); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void with7Others() { + Observable just = Observable.just(1); + + TestSubscriber> ts = new TestSubscriber>(); + + just.withLatestFrom(just, just, just, just, just, just, just, new Func8>() { + @Override + public List call(Integer a, Integer b, Integer c, Integer d, Integer e, Integer f, Integer g, Integer i) { + return Arrays.asList(a, b, c, d, e, f, g, i); + } + }) + .subscribe(ts); + + ts.assertValue(Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1)); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void with8Others() { + Observable just = Observable.just(1); + + TestSubscriber> ts = new TestSubscriber>(); + + just.withLatestFrom(just, just, just, just, just, just, just, just, new Func9>() { + @Override + public List call(Integer a, Integer b, Integer c, Integer d, Integer e, Integer f, Integer g, Integer i, Integer j) { + return Arrays.asList(a, b, c, d, e, f, g, i, j); + } + }) + .subscribe(ts); + + ts.assertValue(Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1)); + ts.assertNoErrors(); + ts.assertCompleted(); + } + }