Skip to content

Commit

Permalink
3.x: Add many fromX operators + marbles (#6873)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 25, 2020
1 parent ea8ab3b commit 26dacd7
Show file tree
Hide file tree
Showing 44 changed files with 3,032 additions and 247 deletions.
67 changes: 26 additions & 41 deletions docs/Operator-Matrix.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ public static Completable fromFuture(@NonNull Future<?> future) {
}

/**
* Returns a {@code Completable} instance that when subscribed to, subscribes to the {@link Maybe} instance and
* Returns a {@code Completable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
* emits an {@code onComplete} event if the maybe emits {@code onSuccess}/{@code onComplete} or forwards any
* {@code onError} events.
* <p>
Expand All @@ -506,8 +506,8 @@ public static Completable fromFuture(@NonNull Future<?> future) {
* <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.17 - beta
* @param <T> the value type of the {@link MaybeSource} element
* @param maybe the {@code Maybe} instance to subscribe to, not {@code null}
* @param <T> the value type of the {@code MaybeSource} element
* @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code maybe} is {@code null}
* @since 2.2
Expand Down
209 changes: 207 additions & 2 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
Expand All @@ -28,8 +27,10 @@
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.MaybeToFlowable;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.rxjava3.internal.operators.single.SingleToFlowable;
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.rxjava3.internal.subscribers.*;
import io.reactivex.rxjava3.internal.util.*;
Expand Down Expand Up @@ -1959,6 +1960,39 @@ public static <T> Flowable<T> error(@NonNull Throwable throwable) {
return error(Functions.justSupplier(throwable));
}

/**
* Returns a {@code Flowable} instance that runs the given {@link Action} for each subscriber and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromAction.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This source doesn't produce any elements and effectively ignores downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromAction} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@code Action} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link Subscriber#onError(Throwable)},
* except when the downstream has canceled the resulting {@code Flowable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param <T> the target type
* @param action the {@code Action} to run for each subscriber
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code action} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public static <T> Flowable<T> fromAction(@NonNull Action action) {
Objects.requireNonNull(action, "action is null");
return RxJavaPlugins.onAssembly(new FlowableFromAction<>(action));
}

/**
* Converts an array into a {@link Publisher} that emits the items in the array.
* <p>
Expand Down Expand Up @@ -2037,6 +2071,30 @@ public static <T> Flowable<T> error(@NonNull Throwable throwable) {
return RxJavaPlugins.onAssembly(new FlowableFromCallable<>(callable));
}

/**
* Wraps a {@link CompletableSource} into a {@code Flowable}.
* <p>
* <img width="640" height="278" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromCompletable.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This source doesn't produce any elements and effectively ignores downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param completableSource the {@code CompletableSource} to convert from
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code completableSource} is {@code null}
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public static <T> Flowable<T> fromCompletable(@NonNull CompletableSource completableSource) {
Objects.requireNonNull(completableSource, "completableSource is null");
return RxJavaPlugins.onAssembly(new FlowableFromCompletable<>(completableSource));
}

/**
* Converts a {@link Future} into a {@link Publisher}.
* <p>
Expand Down Expand Up @@ -2157,6 +2215,94 @@ public static <T> Flowable<T> error(@NonNull Throwable throwable) {
return RxJavaPlugins.onAssembly(new FlowableFromIterable<>(source));
}

/**
* Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and
* emits {@code onSuccess} as a single item or forwards any {@code onComplete} or
* {@code onError} signal.
* <p>
* <img width="640" height="226" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromMaybe.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the {@code MaybeSource} element
* @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null}
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code maybe} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> fromMaybe(@NonNull MaybeSource<T> maybe) {
Objects.requireNonNull(maybe, "maybe is null");
return RxJavaPlugins.onAssembly(new MaybeToFlowable<>(maybe));
}

/**
* Converts the given {@link ObservableSource} into a {@code Flowable} by applying the specified backpressure strategy.
* <p>
* Marble diagrams for the various backpressure strategies are as follows:
* <ul>
* <li>{@link BackpressureStrategy#BUFFER}
* <p>
* <img width="640" height="264" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.buffer.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#DROP}
* <p>
* <img width="640" height="374" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.drop.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#LATEST}
* <p>
* <img width="640" height="284" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.latest.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#ERROR}
* <p>
* <img width="640" height="365" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.error.png" alt="">
* </li>
* <li>{@link BackpressureStrategy#MISSING}
* <p>
* <img width="640" height="397" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromObservable.missing.png" alt="">
* </li>
* </ul>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator applies the chosen backpressure strategy of {@link BackpressureStrategy} enum.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the element type of the source and resulting sequence
* @param source the {@code ObservableSource} to convert
* @param strategy the backpressure strategy to apply
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code source} or {@code strategy} is {@code null}
*/
@BackpressureSupport(BackpressureKind.SPECIAL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> fromObservable(@NonNull ObservableSource<T> source, @NonNull BackpressureStrategy strategy) {
Objects.requireNonNull(source, "source is null");
Objects.requireNonNull(strategy, "strategy is null");
Flowable<T> f = new FlowableFromObservable<>(source);
switch (strategy) {
case DROP:
return f.onBackpressureDrop();
case LATEST:
return f.onBackpressureLatest();
case MISSING:
return f;
case ERROR:
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(f));
default:
return f.onBackpressureBuffer();
}
}

/**
* Converts an arbitrary <em>Reactive Streams</em> {@link Publisher} into a {@code Flowable} if not already a
* {@code Flowable}.
Expand Down Expand Up @@ -2198,6 +2344,65 @@ public static <T> Flowable<T> fromPublisher(@NonNull Publisher<@NonNull ? extend
return RxJavaPlugins.onAssembly(new FlowableFromPublisher<>(publisher));
}

/**
* Returns a {@code Flowable} instance that runs the given {@link Runnable} for each subscriber and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromRunnable.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This source doesn't produce any elements and effectively ignores downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@code Runnable} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link Subscriber#onError(Throwable)},
* except when the downstream has canceled the resulting {@code Flowable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param <T> the target type
* @param run the {@code Runnable} to run for each subscriber
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code run} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public static <T> Flowable<T> fromRunnable(@NonNull Runnable run) {
Objects.requireNonNull(run, "run is null");
return RxJavaPlugins.onAssembly(new FlowableFromRunnable<>(run));
}

/**
* Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link SingleSource} instance and
* emits {@code onSuccess} as a single item or forwards the {@code onError} signal.
* <p>
* <img width="640" height="341" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.fromSingle.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the {@code SingleSource} element
* @param source the {@code SingleSource} instance to subscribe to, not {@code null}
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code source} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> fromSingle(@NonNull SingleSource<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new SingleToFlowable<>(source));
}

/**
* Returns a {@code Flowable} that, when a {@link Subscriber} subscribes to it, invokes a supplier function you specify and then
* emits the value returned from that function.
Expand Down
61 changes: 56 additions & 5 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe;
import io.reactivex.rxjava3.internal.util.ErrorMode;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -710,7 +711,7 @@ public static <T> Maybe<T> error(@NonNull Supplier<? extends Throwable> supplier
}

/**
* Returns a {@code Maybe} instance that runs the given {@link Action} for each subscriber and
* Returns a {@code Maybe} instance that runs the given {@link Action} for each observer and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromAction.png" alt="">
Expand All @@ -720,13 +721,13 @@ public static <T> Maybe<T> error(@NonNull Supplier<? extends Throwable> supplier
* <dt><b>Error handling:</b></dt>
* <dd> If the {@code Action} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link MaybeObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Maybe} source.
* except when the downstream has disposed the resulting {@code Maybe} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param <T> the target type
* @param action the {@code Action} to run for each subscriber
* @param action the {@code Action} to run for each observer
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code action} is {@code null}
*/
Expand Down Expand Up @@ -901,7 +902,57 @@ public static <T> Maybe<T> fromSingle(@NonNull SingleSource<T> single) {
}

/**
* Returns a {@code Maybe} instance that runs the given {@link Runnable} for each subscriber and
* Wraps an {@link ObservableSource} into a {@code Maybe} and emits the very first item
* or completes if the source is empty.
* <p>
* <img width="640" height="276" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param source the {@code ObservableSource} to convert from
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code source} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Maybe<T> fromObservable(@NonNull ObservableSource<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<>(source, 0L));
}

/**
* Wraps a {@link Publisher} into a {@code Maybe} and emits the very first item
* or completes if the source is empty.
* <p>
* <img width="640" height="309" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromPublisher.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the given {@code Publisher} in an unbounded manner
* (requesting {@link Long#MAX_VALUE}) but cancels it after one item received.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param source the {@code Publisher} to convert from
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code source} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
public static <T> Maybe<T> fromPublisher(@NonNull Publisher<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new FlowableElementAtMaybePublisher<>(source, 0L));
}

/**
* Returns a {@code Maybe} instance that runs the given {@link Runnable} for each observer and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.fromRunnable.png" alt="">
Expand All @@ -910,7 +961,7 @@ public static <T> Maybe<T> fromSingle(@NonNull SingleSource<T> single) {
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target type
* @param run the {@code Runnable} to run for each subscriber
* @param run the {@code Runnable} to run for each observer
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code run} is {@code null}
*/
Expand Down
Loading

0 comments on commit 26dacd7

Please sign in to comment.