Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: add multi-other withLatestFrom operators #3966

Merged
merged 1 commit into from
Jun 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 326 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9929,7 +9929,332 @@ public final Observable<T> unsubscribeOn(Scheduler scheduler) {
public final <U, R> Observable<R> withLatestFrom(Observable<? extends U> other, Func2<? super T, ? super U, ? extends R> resultSelector) {
return lift(new OperatorWithLatestFrom<T, U, R>(other, resultSelector));
}


/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first other source's value type
* @param <T2> the second other source's value type
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <T1, T2, R> Observable<R> withLatestFrom(Observable<T1> o1, Observable<T2> o2, Func3<? super T, ? super T1, ? super T2, R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this, new Observable<?>[] { o1, o2 }, null, Functions.fromFunc(combiner)));
}

/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first other source's value type
* @param <T2> the second other source's value type
* @param <T3> the third other source's value type
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <T1, T2, T3, R> Observable<R> withLatestFrom(
Observable<T1> o1, Observable<T2> o2,
Observable<T3> o3,
Func4<? super T, ? super T1, ? super T2, ? super T3, R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this,
new Observable<?>[] { o1, o2, o3 }, null, Functions.fromFunc(combiner)));
}

/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first other source's value type
* @param <T2> the second other source's value type
* @param <T3> the third other source's value type
* @param <T4> the fourth other source's value type
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <T1, T2, T3, T4, R> Observable<R> withLatestFrom(
Observable<T1> o1, Observable<T2> o2,
Observable<T3> o3, Observable<T4> o4,
Func5<? super T, ? super T1, ? super T2, ? super T3, ? super T4, R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this,
new Observable<?>[] { o1, o2, o3, o4 }, null, Functions.fromFunc(combiner)));
}
/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first other source's value type
* @param <T2> the second other source's value type
* @param <T3> the third other source's value type
* @param <T4> the fourth other source's value type
* @param <T5> the fifth other source's value type
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <T1, T2, T3, T4, T5, R> Observable<R> withLatestFrom(
Observable<T1> o1, Observable<T2> o2,
Observable<T1> o3, Observable<T2> o4,
Observable<T1> o5,
Func6<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this,
new Observable<?>[] { o1, o2, o3, o4, o5 }, null, Functions.fromFunc(combiner)));
}

/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first other source's value type
* @param <T2> the second other source's value type
* @param <T3> the third other source's value type
* @param <T4> the fourth other source's value type
* @param <T5> the fifth other source's value type
* @param <T6> the sixth other source's value type
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <T1, T2, T3, T4, T5, T6, R> Observable<R> withLatestFrom(
Observable<T1> o1, Observable<T2> o2,
Observable<T1> o3, Observable<T2> o4,
Observable<T1> o5, Observable<T2> o6,
Func7<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this,
new Observable<?>[] { o1, o2, o3, o4, o5, o6 }, null, Functions.fromFunc(combiner)));
}

/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first other source's value type
* @param <T2> the second other source's value type
* @param <T3> the third other source's value type
* @param <T4> the fourth other source's value type
* @param <T5> the fifth other source's value type
* @param <T6> the sixth other source's value type
* @param <T7> the seventh other source's value type
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> withLatestFrom(
Observable<T1> o1, Observable<T2> o2,
Observable<T1> o3, Observable<T2> o4,
Observable<T1> o5, Observable<T2> o6,
Observable<T1> o7,
Func8<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this,
new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }, null, Functions.fromFunc(combiner)));
}

/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first other source's value type
* @param <T2> the second other source's value type
* @param <T3> the third other source's value type
* @param <T4> the fourth other source's value type
* @param <T5> the fifth other source's value type
* @param <T6> the sixth other source's value type
* @param <T7> the seventh other source's value type
* @param <T8> the eigth other source's value type
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> withLatestFrom(
Observable<T1> o1, Observable<T2> o2,
Observable<T1> o3, Observable<T2> o4,
Observable<T1> o5, Observable<T2> o6,
Observable<T1> o7, Observable<T2> o8,
Func9<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this,
new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }, null, Functions.fromFunc(combiner)));
}

/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param others the array of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <R> Observable<R> withLatestFrom(Observable<?>[] others, FuncN<R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this, others, null, combiner));
}

/**
* Combines the value emission from this Observable with the latest emissions from the
* other Observables via a function to produce the output item.
*
* <p>Note that this operator doesn't emit anything until all other sources have produced at
* least one value. The resulting emission only happens when this Observable emits (and
* not when any of the other sources emit, unlike combineLatest).
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
*
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param others the iterable of other sources
* @param combiner the function called with an array of values from each participating observable
* @return the new Observable instance
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final <R> Observable<R> withLatestFrom(Iterable<Observable<?>> others, FuncN<R> combiner) {
return create(new OperatorWithLatestFromMany<T, R>(this, null, others, combiner));
}

/**
* Returns an Observable that emits windows of items it collects from the source Observable. The resulting
* Observable emits connected, non-overlapping windows. It emits the current window and opens a new one
Expand Down
Loading