From 26dacd72f1929c78d84da41ec80dc1ae3996c9f9 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Sat, 25 Jan 2020 22:07:48 +0100 Subject: [PATCH] 3.x: Add many fromX operators + marbles (#6873) --- docs/Operator-Matrix.md | 67 +++--- .../reactivex/rxjava3/core/Completable.java | 6 +- .../io/reactivex/rxjava3/core/Flowable.java | 209 ++++++++++++++++- .../java/io/reactivex/rxjava3/core/Maybe.java | 61 ++++- .../io/reactivex/rxjava3/core/Observable.java | 130 +++++++++++ .../io/reactivex/rxjava3/core/Single.java | 50 ++++ .../fuseable/AbstractEmptyQueueFuseable.java | 76 ++++++ .../fuseable/CancellableQueueFuseable.java | 42 ++++ .../SubscriberCompletableObserver.java | 59 ----- .../completable/CompletableFromAction.java | 24 +- .../completable/CompletableFromRunnable.java | 24 +- .../completable/CompletableToFlowable.java | 5 +- .../completable/CompletableToObservable.java | 66 +----- .../FlowableElementAtMaybePublisher.java | 42 ++++ .../flowable/FlowableFromAction.java | 68 ++++++ .../flowable/FlowableFromCompletable.java | 86 +++++++ .../flowable/FlowableFromObservable.java | 5 +- .../flowable/FlowableFromRunnable.java | 68 ++++++ .../operators/maybe/MaybeFromCompletable.java | 2 +- .../operators/maybe/MaybeToSingle.java | 2 +- .../observable/ObservableFromAction.java | 66 ++++++ .../observable/ObservableFromCompletable.java | 89 +++++++ .../observable/ObservableFromRunnable.java | 66 ++++++ .../CancellableQueueFuseableTest.java | 75 ++++++ .../CompletableAndThenCompletableabTest.java | 2 +- .../CompletableFromActionTest.java | 16 +- .../CompletableFromRunnableTest.java | 16 +- .../CompletableToObservableTest.java | 38 --- .../flowable/FlowableFromActionTest.java | 214 +++++++++++++++++ .../flowable/FlowableFromCompletableTest.java | 196 ++++++++++++++++ .../flowable/FlowableFromMaybeTest.java | 93 ++++++++ .../flowable/FlowableFromObservableTest.java | 10 + .../flowable/FlowableFromRunnableTest.java | 220 ++++++++++++++++++ .../flowable/FlowableFromSingleTest.java | 86 +++++++ .../maybe/MaybeFromObservableTest.java | 50 ++++ .../maybe/MaybeFromPubisherTest.java | 50 ++++ .../observable/ObservableFromActionTest.java | 214 +++++++++++++++++ .../ObservableFromCompletableTest.java | 195 ++++++++++++++++ .../observable/ObservableFromMaybeTest.java | 93 ++++++++ .../ObservableFromRunnableTest.java | 220 ++++++++++++++++++ .../observable/ObservableFromSingleTest.java | 86 +++++++ .../operators/single/SingleFromMaybeTest.java | 72 ++++++ .../util/OperatorMatrixGenerator.java | 15 ++ .../validators/InternalWrongNaming.java | 5 +- 44 files changed, 3032 insertions(+), 247 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/fuseable/AbstractEmptyQueueFuseable.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseable.java delete mode 100644 src/main/java/io/reactivex/rxjava3/internal/observers/SubscriberCompletableObserver.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableElementAtMaybePublisher.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromAction.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletable.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnable.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromAction.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnable.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromActionTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromMaybeTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSingleTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromObservableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromPubisherTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromActionTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromMaybeTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromSingleTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromMaybeTest.java diff --git a/docs/Operator-Matrix.md b/docs/Operator-Matrix.md index 3eaaa51221..81e1f7878d 100644 --- a/docs/Operator-Matrix.md +++ b/docs/Operator-Matrix.md @@ -102,19 +102,19 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `flattenStreamAsObservable`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([67](#notes-67))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([67](#notes-67))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([27](#notes-27))| `forEach`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([68](#notes-68))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([68](#notes-68))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([68](#notes-68))| `forEachWhile`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([68](#notes-68))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([68](#notes-68))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([68](#notes-68))| -`fromAction`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| +`fromAction`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([23](#notes-23))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `fromArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([69](#notes-69))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([70](#notes-70))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([71](#notes-71))| `fromCallable`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`fromCompletable`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([72](#notes-72))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))| +`fromCompletable`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([72](#notes-72))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))| `fromCompletionStage`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `fromFuture`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `fromIterable`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([69](#notes-69))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([70](#notes-70))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([71](#notes-71))| -`fromMaybe`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`fromObservable`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| +`fromMaybe`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| +`fromObservable`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `fromOptional`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([70](#notes-70))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([71](#notes-71))| -`fromPublisher`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`fromRunnable`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`fromSingle`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| +`fromPublisher`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| +`fromRunnable`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([23](#notes-23))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| +`fromSingle`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([73](#notes-73))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `fromStream`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([69](#notes-69))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([70](#notes-70))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([71](#notes-71))| `fromSupplier`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `generate`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([74](#notes-74))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([74](#notes-74))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([74](#notes-74))| @@ -237,6 +237,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `zip`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([108](#notes-108))| `zipArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([109](#notes-109))| `zipWith`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([110](#notes-110))| +**237 operators** | **215** | **209** | **108** | **93** | **76** | #### Notes 1 Use [`contains()`](#contains).
@@ -375,37 +376,21 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat 21. Maybe.doOnLifecycle() 22. Single.doOnLifecycle() 23. Completable.doOnLifecycle() -24. Flowable.fromAction() -25. Observable.fromAction() -26. Single.fromAction() -27. Flowable.fromCompletable() -28. Observable.fromCompletable() -29. Flowable.fromMaybe() -30. Observable.fromMaybe() -31. Single.fromMaybe() -32. Flowable.fromObservable() -33. Maybe.fromObservable() -34. Maybe.fromPublisher() -35. Flowable.fromRunnable() -36. Observable.fromRunnable() -37. Single.fromRunnable() -38. Flowable.fromSingle() -39. Observable.fromSingle() -40. Single.mergeArray() -41. Single.mergeArrayDelayError() -42. Single.ofType() -43. Completable.onErrorReturn() -44. Completable.onErrorReturnItem() -45. Maybe.safeSubscribe() -46. Single.safeSubscribe() -47. Completable.safeSubscribe() -48. Completable.sequenceEqual() -49. Maybe.startWith() -50. Single.startWith() -51. Maybe.timeInterval() -52. Single.timeInterval() -53. Completable.timeInterval() -54. Maybe.timestamp() -55. Single.timestamp() -56. Maybe.toFuture() -57. Completable.toFuture() +24. Single.mergeArray() +25. Single.mergeArrayDelayError() +26. Single.ofType() +27. Completable.onErrorReturn() +28. Completable.onErrorReturnItem() +29. Maybe.safeSubscribe() +30. Single.safeSubscribe() +31. Completable.safeSubscribe() +32. Completable.sequenceEqual() +33. Maybe.startWith() +34. Single.startWith() +35. Maybe.timeInterval() +36. Single.timeInterval() +37. Completable.timeInterval() +38. Maybe.timestamp() +39. Single.timestamp() +40. Maybe.toFuture() +41. Completable.toFuture() diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index 779d04d1d2..e965855e00 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -496,7 +496,7 @@ public static Completable fromFuture(@NonNull Future future) { } /** - * Returns a {@code Completable} instance that when subscribed to, subscribes to the {@link Maybe} instance and + * Returns a {@code Completable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and * emits an {@code onComplete} event if the maybe emits {@code onSuccess}/{@code onComplete} or forwards any * {@code onError} events. *

@@ -506,8 +506,8 @@ public static Completable fromFuture(@NonNull Future future) { *

{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
* *

History: 2.1.17 - beta - * @param the value type of the {@link MaybeSource} element - * @param maybe the {@code Maybe} instance to subscribe to, not {@code null} + * @param the value type of the {@code MaybeSource} element + * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null} * @return the new {@code Completable} instance * @throws NullPointerException if {@code maybe} is {@code null} * @since 2.2 diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index d1b17e8b84..3d83d477c5 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -19,7 +19,6 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.annotations.*; -import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.flowables.*; @@ -28,8 +27,10 @@ import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier; import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.operators.flowable.*; +import io.reactivex.rxjava3.internal.operators.maybe.MaybeToFlowable; import io.reactivex.rxjava3.internal.operators.mixed.*; -import io.reactivex.rxjava3.internal.operators.observable.*; +import io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher; +import io.reactivex.rxjava3.internal.operators.single.SingleToFlowable; import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler; import io.reactivex.rxjava3.internal.subscribers.*; import io.reactivex.rxjava3.internal.util.*; @@ -1959,6 +1960,39 @@ public static Flowable error(@NonNull Throwable throwable) { return error(Functions.justSupplier(throwable)); } + /** + * Returns a {@code Flowable} instance that runs the given {@link Action} for each subscriber and + * emits either its exception or simply completes. + *

+ * + *

+ *
Backpressure:
+ *
This source doesn't produce any elements and effectively ignores downstream backpressure.
+ *
Scheduler:
+ *
{@code fromAction} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Action} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link Subscriber#onError(Throwable)}, + * except when the downstream has canceled the resulting {@code Flowable} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * @param the target type + * @param action the {@code Action} to run for each subscriber + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code action} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public static Flowable fromAction(@NonNull Action action) { + Objects.requireNonNull(action, "action is null"); + return RxJavaPlugins.onAssembly(new FlowableFromAction<>(action)); + } + /** * Converts an array into a {@link Publisher} that emits the items in the array. *

@@ -2037,6 +2071,30 @@ public static Flowable error(@NonNull Throwable throwable) { return RxJavaPlugins.onAssembly(new FlowableFromCallable<>(callable)); } + /** + * Wraps a {@link CompletableSource} into a {@code Flowable}. + *

+ * + *

+ *
Backpressure:
+ *
This source doesn't produce any elements and effectively ignores downstream backpressure.
+ *
Scheduler:
+ *
{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type + * @param completableSource the {@code CompletableSource} to convert from + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code completableSource} is {@code null} + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public static Flowable fromCompletable(@NonNull CompletableSource completableSource) { + Objects.requireNonNull(completableSource, "completableSource is null"); + return RxJavaPlugins.onAssembly(new FlowableFromCompletable<>(completableSource)); + } + /** * Converts a {@link Future} into a {@link Publisher}. *

@@ -2157,6 +2215,94 @@ public static Flowable error(@NonNull Throwable throwable) { return RxJavaPlugins.onAssembly(new FlowableFromIterable<>(source)); } + /** + * Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and + * emits {@code onSuccess} as a single item or forwards any {@code onComplete} or + * {@code onError} signal. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element + * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null} + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code maybe} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public static Flowable fromMaybe(@NonNull MaybeSource maybe) { + Objects.requireNonNull(maybe, "maybe is null"); + return RxJavaPlugins.onAssembly(new MaybeToFlowable<>(maybe)); + } + + /** + * Converts the given {@link ObservableSource} into a {@code Flowable} by applying the specified backpressure strategy. + *

+ * Marble diagrams for the various backpressure strategies are as follows: + *

    + *
  • {@link BackpressureStrategy#BUFFER} + *

    + * + *

  • + *
  • {@link BackpressureStrategy#DROP} + *

    + * + *

  • + *
  • {@link BackpressureStrategy#LATEST} + *

    + * + *

  • + *
  • {@link BackpressureStrategy#ERROR} + *

    + * + *

  • + *
  • {@link BackpressureStrategy#MISSING} + *

    + * + *

  • + *
+ *
+ *
Backpressure:
+ *
The operator applies the chosen backpressure strategy of {@link BackpressureStrategy} enum.
+ *
Scheduler:
+ *
{@code fromObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the source and resulting sequence + * @param source the {@code ObservableSource} to convert + * @param strategy the backpressure strategy to apply + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code source} or {@code strategy} is {@code null} + */ + @BackpressureSupport(BackpressureKind.SPECIAL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public static Flowable fromObservable(@NonNull ObservableSource source, @NonNull BackpressureStrategy strategy) { + Objects.requireNonNull(source, "source is null"); + Objects.requireNonNull(strategy, "strategy is null"); + Flowable f = new FlowableFromObservable<>(source); + switch (strategy) { + case DROP: + return f.onBackpressureDrop(); + case LATEST: + return f.onBackpressureLatest(); + case MISSING: + return f; + case ERROR: + return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(f)); + default: + return f.onBackpressureBuffer(); + } + } + /** * Converts an arbitrary Reactive Streams {@link Publisher} into a {@code Flowable} if not already a * {@code Flowable}. @@ -2198,6 +2344,65 @@ public static Flowable fromPublisher(@NonNull Publisher<@NonNull ? extend return RxJavaPlugins.onAssembly(new FlowableFromPublisher<>(publisher)); } + /** + * Returns a {@code Flowable} instance that runs the given {@link Runnable} for each subscriber and + * emits either its exception or simply completes. + *

+ * + *

+ *
Backpressure:
+ *
This source doesn't produce any elements and effectively ignores downstream backpressure.
+ *
Scheduler:
+ *
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Runnable} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link Subscriber#onError(Throwable)}, + * except when the downstream has canceled the resulting {@code Flowable} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * @param the target type + * @param run the {@code Runnable} to run for each subscriber + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code run} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public static Flowable fromRunnable(@NonNull Runnable run) { + Objects.requireNonNull(run, "run is null"); + return RxJavaPlugins.onAssembly(new FlowableFromRunnable<>(run)); + } + + /** + * Returns a {@code Flowable} instance that when subscribed to, subscribes to the {@link SingleSource} instance and + * emits {@code onSuccess} as a single item or forwards the {@code onError} signal. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code fromSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code SingleSource} element + * @param source the {@code SingleSource} instance to subscribe to, not {@code null} + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code source} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public static Flowable fromSingle(@NonNull SingleSource source) { + Objects.requireNonNull(source, "source is null"); + return RxJavaPlugins.onAssembly(new SingleToFlowable<>(source)); + } + /** * Returns a {@code Flowable} that, when a {@link Subscriber} subscribes to it, invokes a supplier function you specify and then * emits the value returned from that function. diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index a9435534f6..12ab3678e2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -30,6 +30,7 @@ import io.reactivex.rxjava3.internal.operators.flowable.*; import io.reactivex.rxjava3.internal.operators.maybe.*; import io.reactivex.rxjava3.internal.operators.mixed.*; +import io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe; import io.reactivex.rxjava3.internal.util.ErrorMode; import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -710,7 +711,7 @@ public static Maybe error(@NonNull Supplier supplier } /** - * Returns a {@code Maybe} instance that runs the given {@link Action} for each subscriber and + * Returns a {@code Maybe} instance that runs the given {@link Action} for each observer and * emits either its exception or simply completes. *

* @@ -720,13 +721,13 @@ public static Maybe error(@NonNull Supplier supplier *

Error handling:
*
If the {@code Action} throws an exception, the respective {@link Throwable} is * delivered to the downstream via {@link MaybeObserver#onError(Throwable)}, - * except when the downstream has disposed this {@code Maybe} source. + * except when the downstream has disposed the resulting {@code Maybe} source. * In this latter case, the {@code Throwable} is delivered to the global error handler via * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}. *
* * @param the target type - * @param action the {@code Action} to run for each subscriber + * @param action the {@code Action} to run for each observer * @return the new {@code Maybe} instance * @throws NullPointerException if {@code action} is {@code null} */ @@ -901,7 +902,57 @@ public static Maybe fromSingle(@NonNull SingleSource single) { } /** - * Returns a {@code Maybe} instance that runs the given {@link Runnable} for each subscriber and + * Wraps an {@link ObservableSource} into a {@code Maybe} and emits the very first item + * or completes if the source is empty. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type + * @param source the {@code ObservableSource} to convert from + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code source} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Maybe fromObservable(@NonNull ObservableSource source) { + Objects.requireNonNull(source, "source is null"); + return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<>(source, 0L)); + } + + /** + * Wraps a {@link Publisher} into a {@code Maybe} and emits the very first item + * or completes if the source is empty. + *

+ * + *

+ *
Backpressure:
+ *
The operator consumes the given {@code Publisher} in an unbounded manner + * (requesting {@link Long#MAX_VALUE}) but cancels it after one item received.
+ *
Scheduler:
+ *
{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type + * @param source the {@code Publisher} to convert from + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code source} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + public static Maybe fromPublisher(@NonNull Publisher source) { + Objects.requireNonNull(source, "source is null"); + return RxJavaPlugins.onAssembly(new FlowableElementAtMaybePublisher<>(source, 0L)); + } + + /** + * Returns a {@code Maybe} instance that runs the given {@link Runnable} for each observer and * emits either its exception or simply completes. *

* @@ -910,7 +961,7 @@ public static Maybe fromSingle(@NonNull SingleSource single) { *

{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
* * @param the target type - * @param run the {@code Runnable} to run for each subscriber + * @param run the {@code Runnable} to run for each observer * @return the new {@code Maybe} instance * @throws NullPointerException if {@code run} is {@code null} */ diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index f599ce0eeb..89de9dd63e 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -28,8 +28,10 @@ import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.observers.*; import io.reactivex.rxjava3.internal.operators.flowable.*; +import io.reactivex.rxjava3.internal.operators.maybe.MaybeToObservable; import io.reactivex.rxjava3.internal.operators.mixed.*; import io.reactivex.rxjava3.internal.operators.observable.*; +import io.reactivex.rxjava3.internal.operators.single.SingleToObservable; import io.reactivex.rxjava3.internal.util.*; import io.reactivex.rxjava3.observables.*; import io.reactivex.rxjava3.observers.*; @@ -1734,6 +1736,36 @@ public static Observable error(@NonNull Throwable throwable) { return error(Functions.justSupplier(throwable)); } + /** + * Returns an {@code Observable} instance that runs the given {@link Action} for each subscriber and + * emits either its exception or simply completes. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromAction} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Action} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link Observer#onError(Throwable)}, + * except when the downstream has canceled the resulting {@code Observable} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * @param the target type + * @param action the {@code Action} to run for each subscriber + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code action} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable fromAction(@NonNull Action action) { + Objects.requireNonNull(action, "action is null"); + return RxJavaPlugins.onAssembly(new ObservableFromAction<>(action)); + } + /** * Converts an array into an {@link ObservableSource} that emits the items in the array. *

@@ -1804,6 +1836,27 @@ public static Observable fromCallable(@NonNull Callable call return RxJavaPlugins.onAssembly(new ObservableFromCallable<>(callable)); } + /** + * Wraps a {@link CompletableSource} into an {@code Observable}. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the target type + * @param completableSource the {@code CompletableSource} to convert from + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code completableSource} is {@code null} + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable fromCompletable(@NonNull CompletableSource completableSource) { + Objects.requireNonNull(completableSource, "completableSource is null"); + return RxJavaPlugins.onAssembly(new ObservableFromCompletable<>(completableSource)); + } + /** * Converts a {@link Future} into an {@code Observable}. *

@@ -1912,6 +1965,30 @@ public static Observable fromIterable(@NonNull Iterable<@NonNull ? extend return RxJavaPlugins.onAssembly(new ObservableFromIterable<>(source)); } + /** + * Returns an {@code Observable} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and + * emits {@code onSuccess} as a single item or forwards any {@code onComplete} or + * {@code onError} signal. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element + * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null} + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code maybe} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable fromMaybe(@NonNull MaybeSource maybe) { + Objects.requireNonNull(maybe, "maybe is null"); + return RxJavaPlugins.onAssembly(new MaybeToObservable<>(maybe)); + } + /** * Converts an arbitrary Reactive Streams {@link Publisher} into an {@code Observable}. *

@@ -1949,6 +2026,59 @@ public static Observable fromPublisher(@NonNull Publisher<@NonNull ? exte return RxJavaPlugins.onAssembly(new ObservableFromPublisher<>(publisher)); } + /** + * Returns an {@code Observable} instance that runs the given {@link Runnable} for each observer and + * emits either its exception or simply completes. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@code Runnable} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link Observer#onError(Throwable)}, + * except when the downstream has canceled the resulting {@code Observable} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.rxjava3.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * @param the target type + * @param run the {@code Runnable} to run for each observer + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code run} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable fromRunnable(@NonNull Runnable run) { + Objects.requireNonNull(run, "run is null"); + return RxJavaPlugins.onAssembly(new ObservableFromRunnable<>(run)); + } + + /** + * Returns an {@code Observable} instance that when subscribed to, subscribes to the {@link SingleSource} instance and + * emits {@code onSuccess} as a single item or forwards the {@code onError} signal. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code SingleSource} element + * @param source the {@code SingleSource} instance to subscribe to, not {@code null} + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code source} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable fromSingle(@NonNull SingleSource source) { + Objects.requireNonNull(source, "source is null"); + return RxJavaPlugins.onAssembly(new SingleToObservable<>(source)); + } + /** * Returns an {@code Observable} that, when an observer subscribes to it, invokes a supplier function you specify and then * emits the value returned from that function. diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 3bfcd78a9c..a3b536b1f7 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -731,6 +731,56 @@ public static Single error(@NonNull Throwable throwable) { return toSingle(Flowable.fromFuture(future, timeout, unit)); } + /** + * Returns a {@code Single} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and + * emits {@code onSuccess} as a single item, turns an {@code onComplete} into {@link NoSuchElementException} error signal or + * forwards the {@code onError} signal. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element + * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null} + * @return the new {@code Single} instance + * @throws NullPointerException if {@code maybe} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Single fromMaybe(@NonNull MaybeSource maybe) { + Objects.requireNonNull(maybe, "maybe is null"); + return RxJavaPlugins.onAssembly(new MaybeToSingle<>(maybe, null)); + } + + /** + * Returns a {@code Single} instance that when subscribed to, subscribes to the {@link MaybeSource} instance and + * emits {@code onSuccess} as a single item, emits the {@code defaultItem} for an {@code onComplete} signal or + * forwards the {@code onError} signal. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@code MaybeSource} element + * @param maybe the {@code MaybeSource} instance to subscribe to, not {@code null} + * @param defaultItem the item to signal if the current {@code MaybeSource} is empty + * @return the new {@code Single} instance + * @throws NullPointerException if {@code maybe} or {@code defaultItem} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Single fromMaybe(@NonNull MaybeSource maybe, @NonNull T defaultItem) { + Objects.requireNonNull(maybe, "maybe is null"); + Objects.requireNonNull(defaultItem, "defaultItem is null"); + return RxJavaPlugins.onAssembly(new MaybeToSingle<>(maybe, defaultItem)); + } + /** * Wraps a specific {@link Publisher} into a {@code Single} and signals its single element or error. *

diff --git a/src/main/java/io/reactivex/rxjava3/internal/fuseable/AbstractEmptyQueueFuseable.java b/src/main/java/io/reactivex/rxjava3/internal/fuseable/AbstractEmptyQueueFuseable.java new file mode 100644 index 0000000000..071db9fff8 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/fuseable/AbstractEmptyQueueFuseable.java @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.fuseable; + +import io.reactivex.rxjava3.annotations.NonNull; + +/** + * Represents an empty, async-only {@link QueueFuseable} instance. + * + * @param the output value type + * @since 3.0.0 + */ +public abstract class AbstractEmptyQueueFuseable +implements QueueSubscription, QueueDisposable { + + @Override + public final int requestFusion(int mode) { + return mode & ASYNC; + } + + @Override + public final boolean offer(@NonNull T value) { + throw new UnsupportedOperationException("Should not be called!"); + } + + @Override + public final boolean offer(@NonNull T v1, @NonNull T v2) { + throw new UnsupportedOperationException("Should not be called!"); + } + + @Override + public final T poll() throws Throwable { + return null; // always empty + } + + @Override + public final boolean isEmpty() { + return true; // always empty + } + + @Override + public final void clear() { + // always empty + } + + @Override + public final void request(long n) { + // no items to request + } + + @Override + public void cancel() { + // default No-op + } + + @Override + public void dispose() { + // default No-op + } + + @Override + public boolean isDisposed() { + return false; + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseable.java b/src/main/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseable.java new file mode 100644 index 0000000000..8f9e12d3cb --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseable.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.fuseable; + +/** + * Represents an empty, async-only {@link QueueFuseable} instance that tracks and exposes a + * canceled/disposed state. + * + * @param the output value type + * @since 3.0.0 + */ +public final class CancellableQueueFuseable +extends AbstractEmptyQueueFuseable { + + volatile boolean disposed; + + @Override + public void cancel() { + disposed = true; + } + + @Override + public void dispose() { + disposed = true; + } + + @Override + public boolean isDisposed() { + return disposed; + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/SubscriberCompletableObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/SubscriberCompletableObserver.java deleted file mode 100644 index 2bbc5128d6..0000000000 --- a/src/main/java/io/reactivex/rxjava3/internal/observers/SubscriberCompletableObserver.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.rxjava3.internal.observers; - -import org.reactivestreams.*; - -import io.reactivex.rxjava3.core.CompletableObserver; -import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.internal.disposables.DisposableHelper; - -public final class SubscriberCompletableObserver implements CompletableObserver, Subscription { - final Subscriber subscriber; - - Disposable upstream; - - public SubscriberCompletableObserver(Subscriber subscriber) { - this.subscriber = subscriber; - } - - @Override - public void onComplete() { - subscriber.onComplete(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(this.upstream, d)) { - this.upstream = d; - - subscriber.onSubscribe(this); - } - } - - @Override - public void request(long n) { - // ignored, no values emitted anyway - } - - @Override - public void cancel() { - upstream.dispose(); - } -} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java index 4cd7fb479c..82bec67c92 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java @@ -31,19 +31,21 @@ public CompletableFromAction(Action run) { protected void subscribeActual(CompletableObserver observer) { Disposable d = Disposable.empty(); observer.onSubscribe(d); - try { - run.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); + if (!d.isDisposed()) { + try { + run.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + if (!d.isDisposed()) { + observer.onError(e); + } else { + RxJavaPlugins.onError(e); + } + return; + } if (!d.isDisposed()) { - observer.onError(e); - } else { - RxJavaPlugins.onError(e); + observer.onComplete(); } - return; - } - if (!d.isDisposed()) { - observer.onComplete(); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java index e94d650310..60332d8db2 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java @@ -30,19 +30,21 @@ public CompletableFromRunnable(Runnable runnable) { protected void subscribeActual(CompletableObserver observer) { Disposable d = Disposable.empty(); observer.onSubscribe(d); - try { - runnable.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); + if (!d.isDisposed()) { + try { + runnable.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + if (!d.isDisposed()) { + observer.onError(e); + } else { + RxJavaPlugins.onError(e); + } + return; + } if (!d.isDisposed()) { - observer.onError(e); - } else { - RxJavaPlugins.onError(e); + observer.onComplete(); } - return; - } - if (!d.isDisposed()) { - observer.onComplete(); } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java index 9ad0d0a8d8..9a6c0326a1 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFlowable.java @@ -16,7 +16,7 @@ import org.reactivestreams.Subscriber; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.internal.observers.SubscriberCompletableObserver; +import io.reactivex.rxjava3.internal.operators.flowable.FlowableFromCompletable; public final class CompletableToFlowable extends Flowable { @@ -28,7 +28,6 @@ public CompletableToFlowable(CompletableSource source) { @Override protected void subscribeActual(Subscriber s) { - SubscriberCompletableObserver os = new SubscriberCompletableObserver<>(s); - source.subscribe(os); + source.subscribe(new FlowableFromCompletable.FromCompletableObserver<>(s)); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java index 095a975dbf..84eea97c8d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservable.java @@ -14,9 +14,7 @@ package io.reactivex.rxjava3.internal.operators.completable; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -import io.reactivex.rxjava3.internal.observers.BasicQueueDisposable; +import io.reactivex.rxjava3.internal.operators.observable.ObservableFromCompletable; /** * Wraps a Completable and exposes it as an Observable. @@ -33,66 +31,6 @@ public CompletableToObservable(CompletableSource source) { @Override protected void subscribeActual(Observer observer) { - source.subscribe(new ObserverCompletableObserver(observer)); - } - - static final class ObserverCompletableObserver extends BasicQueueDisposable - implements CompletableObserver { - - final Observer observer; - - Disposable upstream; - - ObserverCompletableObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onComplete() { - observer.onComplete(); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(upstream, d)) { - this.upstream = d; - observer.onSubscribe(this); - } - } - - @Override - public int requestFusion(int mode) { - return mode & ASYNC; - } - - @Override - public Void poll() { - return null; // always empty - } - - @Override - public boolean isEmpty() { - return true; - } - - @Override - public void clear() { - // always empty - } - - @Override - public void dispose() { - upstream.dispose(); - } - - @Override - public boolean isDisposed() { - return upstream.isDisposed(); - } + source.subscribe(new ObservableFromCompletable.FromCompletableObserver<>(observer)); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableElementAtMaybePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableElementAtMaybePublisher.java new file mode 100644 index 0000000000..a474b03280 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableElementAtMaybePublisher.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import org.reactivestreams.Publisher; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.internal.operators.flowable.FlowableElementAtMaybe.ElementAtSubscriber; + +/** + * Emits the indexth element from a Publisher as a Maybe. + * + * @param the element type of the source + * @since 3.0.0 + */ +public final class FlowableElementAtMaybePublisher extends Maybe { + + final Publisher source; + + final long index; + + public FlowableElementAtMaybePublisher(Publisher source, long index) { + this.source = source; + this.index = index; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new ElementAtSubscriber<>(observer, index)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromAction.java new file mode 100644 index 0000000000..2d683e7601 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromAction.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Executes an {@link Action} and signals its exception or completes normally. + * + * @param the value type + * @since 3.0.0 + */ +public final class FlowableFromAction extends Flowable implements Supplier { + + final Action action; + + public FlowableFromAction(Action action) { + this.action = action; + } + + @Override + protected void subscribeActual(Subscriber subscriber) { + CancellableQueueFuseable qs = new CancellableQueueFuseable<>(); + subscriber.onSubscribe(qs); + + if (!qs.isDisposed()) { + + try { + action.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (!qs.isDisposed()) { + subscriber.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + return; + } + + if (!qs.isDisposed()) { + subscriber.onComplete(); + } + } + } + + @Override + public T get() throws Throwable { + action.run(); + return null; // considered as onComplete() + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletable.java new file mode 100644 index 0000000000..78b37325ca --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletable.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.internal.fuseable.*; + +/** + * Wrap a Completable into a Flowable. + * + * @param the value type + * @since 3.0.0 + */ +public final class FlowableFromCompletable extends Flowable implements HasUpstreamCompletableSource { + + final CompletableSource source; + + public FlowableFromCompletable(CompletableSource source) { + this.source = source; + } + + @Override + public CompletableSource source() { + return source; + } + + @Override + protected void subscribeActual(Subscriber observer) { + source.subscribe(new FromCompletableObserver(observer)); + } + + public static final class FromCompletableObserver + extends AbstractEmptyQueueFuseable + implements CompletableObserver { + + final Subscriber downstream; + + Disposable upstream; + + public FromCompletableObserver(Subscriber downstream) { + this.downstream = downstream; + } + + @Override + public void cancel() { + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onComplete() { + upstream = DisposableHelper.DISPOSED; + downstream.onComplete(); + } + + @Override + public void onError(Throwable e) { + upstream = DisposableHelper.DISPOSED; + downstream.onError(e); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java index 105c6b6ee2..459228f549 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java @@ -18,9 +18,10 @@ import io.reactivex.rxjava3.disposables.Disposable; public final class FlowableFromObservable extends Flowable { - private final Observable upstream; - public FlowableFromObservable(Observable upstream) { + private final ObservableSource upstream; + + public FlowableFromObservable(ObservableSource upstream) { this.upstream = upstream; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnable.java new file mode 100644 index 0000000000..b163f1b7ee --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnable.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Supplier; +import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Executes an {@link Runnable} and signals its exception or completes normally. + * + * @param the value type + * @since 3.0.0 + */ +public final class FlowableFromRunnable extends Flowable implements Supplier { + + final Runnable run; + + public FlowableFromRunnable(Runnable run) { + this.run = run; + } + + @Override + protected void subscribeActual(Subscriber subscriber) { + CancellableQueueFuseable qs = new CancellableQueueFuseable<>(); + subscriber.onSubscribe(qs); + + if (!qs.isDisposed()) { + + try { + run.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (!qs.isDisposed()) { + subscriber.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + return; + } + + if (!qs.isDisposed()) { + subscriber.onComplete(); + } + } + } + + @Override + public T get() throws Throwable { + run.run(); + return null; // considered as onComplete() + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java index bdbc2ec03b..5cb73e821c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCompletable.java @@ -19,7 +19,7 @@ import io.reactivex.rxjava3.internal.fuseable.HasUpstreamCompletableSource; /** - * Wrap a Single into a Maybe. + * Wrap a Completable into a Maybe. * * @param the value type */ diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java index 3199521639..807b9cac51 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToSingle.java @@ -22,7 +22,7 @@ /** * Wraps a MaybeSource and exposes its onSuccess and onError signals and signals - * NoSuchElementException for onComplete. + * NoSuchElementException for onComplete if {@code defaultValue} is null. * * @param the value type */ diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromAction.java new file mode 100644 index 0000000000..cccb5b5d38 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromAction.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Executes an {@link Action} and signals its exception or completes normally. + * + * @param the value type + * @since 3.0.0 + */ +public final class ObservableFromAction extends Observable implements Supplier { + + final Action action; + + public ObservableFromAction(Action action) { + this.action = action; + } + + @Override + protected void subscribeActual(Observer observer) { + CancellableQueueFuseable qs = new CancellableQueueFuseable<>(); + observer.onSubscribe(qs); + + if (!qs.isDisposed()) { + + try { + action.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (!qs.isDisposed()) { + observer.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + return; + } + + if (!qs.isDisposed()) { + observer.onComplete(); + } + } + } + + @Override + public T get() throws Throwable { + action.run(); + return null; // considered as onComplete() + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java new file mode 100644 index 0000000000..4748e6a625 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.internal.fuseable.*; + +/** + * Wrap a Completable into an Observable. + * + * @param the value type + * @since 3.0.0 + */ +public final class ObservableFromCompletable extends Observable implements HasUpstreamCompletableSource { + + final CompletableSource source; + + public ObservableFromCompletable(CompletableSource source) { + this.source = source; + } + + @Override + public CompletableSource source() { + return source; + } + + @Override + protected void subscribeActual(Observer observer) { + source.subscribe(new FromCompletableObserver(observer)); + } + + public static final class FromCompletableObserver + extends AbstractEmptyQueueFuseable + implements CompletableObserver { + + final Observer downstream; + + Disposable upstream; + + public FromCompletableObserver(Observer downstream) { + this.downstream = downstream; + } + + @Override + public void dispose() { + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onComplete() { + upstream = DisposableHelper.DISPOSED; + downstream.onComplete(); + } + + @Override + public void onError(Throwable e) { + upstream = DisposableHelper.DISPOSED; + downstream.onError(e); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnable.java new file mode 100644 index 0000000000..0dc37a8993 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnable.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Supplier; +import io.reactivex.rxjava3.internal.fuseable.CancellableQueueFuseable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Executes an {@link Runnable} and signals its exception or completes normally. + * + * @param the value type + * @since 3.0.0 + */ +public final class ObservableFromRunnable extends Observable implements Supplier { + + final Runnable run; + + public ObservableFromRunnable(Runnable run) { + this.run = run; + } + + @Override + protected void subscribeActual(Observer observer) { + CancellableQueueFuseable qs = new CancellableQueueFuseable<>(); + observer.onSubscribe(qs); + + if (!qs.isDisposed()) { + + try { + run.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (!qs.isDisposed()) { + observer.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + return; + } + + if (!qs.isDisposed()) { + observer.onComplete(); + } + } + } + + @Override + public T get() throws Throwable { + run.run(); + return null; // considered as onComplete() + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java b/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java new file mode 100644 index 0000000000..ed4718fd1c --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.fuseable; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class CancellableQueueFuseableTest { + + @Test + public void offer() { + TestHelper.assertNoOffer(new CancellableQueueFuseable<>()); + } + + @Test + public void cancel() { + CancellableQueueFuseable qs = new CancellableQueueFuseable<>(); + + assertFalse(qs.isDisposed()); + + qs.cancel(); + + assertTrue(qs.isDisposed()); + + qs.cancel(); + + assertTrue(qs.isDisposed()); + } + + @Test + public void dispose() { + CancellableQueueFuseable qs = new CancellableQueueFuseable<>(); + + assertFalse(qs.isDisposed()); + + qs.dispose(); + + assertTrue(qs.isDisposed()); + + qs.dispose(); + + assertTrue(qs.isDisposed()); + } + + @Test + public void cancel2() { + AbstractEmptyQueueFuseable qs = new AbstractEmptyQueueFuseable() {}; + + assertFalse(qs.isDisposed()); + + qs.cancel(); + } + + @Test + public void dispose2() { + AbstractEmptyQueueFuseable qs = new AbstractEmptyQueueFuseable() {}; + + assertFalse(qs.isDisposed()); + + qs.dispose(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAndThenCompletableabTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAndThenCompletableabTest.java index a98f3b5abb..051dd4e364 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAndThenCompletableabTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAndThenCompletableabTest.java @@ -102,7 +102,7 @@ public void run() { .andThen(Completable.complete()) .test(true) .assertEmpty(); - assertEquals(1, completableRunCount.get()); + assertEquals(0, completableRunCount.get()); } @Test diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromActionTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromActionTest.java index 56354fc448..1f4df16f44 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromActionTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromActionTest.java @@ -14,6 +14,7 @@ package io.reactivex.rxjava3.internal.operators.completable; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; import java.util.concurrent.atomic.AtomicInteger; @@ -108,7 +109,7 @@ public void run() throws Exception { .test(true) .assertEmpty(); - assertEquals(1, calls.get()); + assertEquals(0, calls.get()); } @Test @@ -124,6 +125,17 @@ public void run() throws Exception { .test(true) .assertEmpty(); - assertEquals(1, calls.get()); + assertEquals(0, calls.get()); + } + + @Test + public void disposedUpfront() throws Throwable { + Action run = mock(Action.class); + + Completable.fromAction(run) + .test(true) + .assertEmpty(); + + verify(run, never()).run(); } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnableTest.java index 5d6473077f..2559f09187 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnableTest.java @@ -14,6 +14,7 @@ package io.reactivex.rxjava3.internal.operators.completable; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; import java.util.concurrent.atomic.AtomicInteger; @@ -107,7 +108,7 @@ public void run() { .test(true) .assertEmpty(); - assertEquals(1, calls.get()); + assertEquals(0, calls.get()); } @Test @@ -123,6 +124,17 @@ public void run() { .test(true) .assertEmpty(); - assertEquals(1, calls.get()); + assertEquals(0, calls.get()); + } + + @Test + public void disposedUpfront() throws Throwable { + Runnable run = mock(Runnable.class); + + Completable.fromRunnable(run) + .test(true) + .assertEmpty(); + + verify(run, never()).run(); } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservableTest.java index f1bbda0353..96404e8167 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToObservableTest.java @@ -13,16 +13,10 @@ package io.reactivex.rxjava3.internal.operators.completable; -import static org.junit.Assert.*; - import org.junit.Test; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.disposables.*; import io.reactivex.rxjava3.functions.Function; -import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; -import io.reactivex.rxjava3.internal.operators.completable.CompletableToObservable.ObserverCompletableObserver; -import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.testsupport.TestHelper; public class CompletableToObservableTest extends RxJavaTest { @@ -37,36 +31,4 @@ public Observable apply(Completable c) throws Exception { }); } - @Test - public void fusion() throws Exception { - TestObserver to = new TestObserver<>(); - - ObserverCompletableObserver co = new ObserverCompletableObserver(to); - - Disposable d = Disposable.empty(); - - co.onSubscribe(d); - - assertEquals(QueueFuseable.NONE, co.requestFusion(QueueFuseable.SYNC)); - - assertEquals(QueueFuseable.ASYNC, co.requestFusion(QueueFuseable.ASYNC)); - - assertEquals(QueueFuseable.ASYNC, co.requestFusion(QueueFuseable.ANY)); - - assertTrue(co.isEmpty()); - - assertNull(co.poll()); - - co.clear(); - - assertFalse(co.isDisposed()); - - co.dispose(); - - assertTrue(d.isDisposed()); - - assertTrue(co.isDisposed()); - - TestHelper.assertNoOffer(co); - } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromActionTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromActionTest.java new file mode 100644 index 0000000000..b19a03159c --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromActionTest.java @@ -0,0 +1,214 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.*; + +public class FlowableFromActionTest extends RxJavaTest { + @Test + public void fromAction() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Flowable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromActionTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Action run = new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }; + + Flowable.fromAction(run) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Flowable.fromAction(run) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromActionInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Flowable source = Flowable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }); + + assertEquals(0, atomicInteger.get()); + + source + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromActionThrows() { + Flowable.fromAction(new Action() { + @Override + public void run() throws Exception { + throw new UnsupportedOperationException(); + } + }) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Throwable { + final int[] counter = { 0 }; + + Flowable m = Flowable.fromAction(new Action() { + @Override + public void run() throws Exception { + counter[0]++; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Supplier); + + assertNull(((Supplier)m).get()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestSubscriber ts = Flowable.fromAction(new Action() { + @Override + public void run() throws Exception { + cdl1.countDown(); + cdl2.await(5, TimeUnit.SECONDS); + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + ts.cancel(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposedUpfront() throws Throwable { + Action run = mock(Action.class); + + Flowable.fromAction(run) + .test(1L, true) + .assertEmpty(); + + verify(run, never()).run(); + } + + @Test + public void cancelWhileRunning() { + final TestSubscriber ts = new TestSubscriber<>(); + + Flowable.fromAction(new Action() { + @Override + public void run() throws Exception { + ts.cancel(); + } + }) + .subscribeWith(ts) + .assertEmpty(); + + assertTrue(ts.isCancelled()); + } + + @Test + public void asyncFused() throws Throwable { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ASYNC); + + Action action = mock(Action.class); + + Flowable.fromAction(action) + .subscribe(ts); + + ts.assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + + verify(action).run(); + } + + @Test + public void syncFusedRejected() throws Throwable { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.SYNC); + + Action action = mock(Action.class); + + Flowable.fromAction(action) + .subscribe(ts); + + ts.assertFusionMode(QueueFuseable.NONE) + .assertResult(); + + verify(action).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletableTest.java new file mode 100644 index 0000000000..47c0b815fb --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCompletableTest.java @@ -0,0 +1,196 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.*; + +public class FlowableFromCompletableTest extends RxJavaTest { + @Test + public void fromCompletable() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Flowable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + })) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromCompletableTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Action run = new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }; + + Flowable.fromCompletable(Completable.fromAction(run)) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Flowable.fromCompletable(Completable.fromAction(run)) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromCompletableInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Flowable source = Flowable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + })); + + assertEquals(0, atomicInteger.get()); + + source + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromCompletableThrows() { + Flowable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + throw new UnsupportedOperationException(); + } + })) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestSubscriber ts = Flowable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + cdl1.countDown(); + cdl2.await(5, TimeUnit.SECONDS); + } + })) + .subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + ts.cancel(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposedUpfront() throws Throwable { + Action run = mock(Action.class); + + Flowable.fromCompletable(Completable.fromAction(run)) + .test(1L, true) + .assertEmpty(); + + verify(run, never()).run(); + } + + @Test + public void cancelWhileRunning() { + final TestSubscriber ts = new TestSubscriber<>(); + + Flowable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + ts.cancel(); + } + })) + .subscribeWith(ts) + .assertEmpty(); + + assertTrue(ts.isCancelled()); + } + + @Test + public void asyncFused() throws Throwable { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ASYNC); + + Action action = mock(Action.class); + + Flowable.fromCompletable(Completable.fromAction(action)) + .subscribe(ts); + + ts.assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + + verify(action).run(); + } + + @Test + public void syncFusedRejected() throws Throwable { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.SYNC); + + Action action = mock(Action.class); + + Flowable.fromCompletable(Completable.fromAction(action)) + .subscribe(ts); + + ts.assertFusionMode(QueueFuseable.NONE) + .assertResult(); + + verify(action).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromMaybeTest.java new file mode 100644 index 0000000000..5eca34f9cb --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromMaybeTest.java @@ -0,0 +1,93 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.TestSubscriberEx; + +public class FlowableFromMaybeTest extends RxJavaTest { + + @Test + public void success() { + Flowable.fromMaybe(Maybe.just(1).hide()) + .test() + .assertResult(1); + } + + @Test + public void empty() { + Flowable.fromMaybe(Maybe.empty().hide()) + .test() + .assertResult(); + } + + @Test + public void error() { + Flowable.fromMaybe(Maybe.error(new TestException()).hide()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelComposes() { + MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = Flowable.fromMaybe(ms) + .test(); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + ts.cancel(); + + assertFalse(ms.hasObservers()); + } + + @Test + public void asyncFusion() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ASYNC); + + Flowable.fromMaybe(Maybe.just(1)) + .subscribe(ts); + + ts + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void syncFusionRejected() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.SYNC); + + Flowable.fromMaybe(Maybe.just(1)) + .subscribe(ts); + + ts + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservableTest.java index 5689eadf49..0ef64d192c 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservableTest.java @@ -32,4 +32,14 @@ public void error() { .test() .assertFailure(TestException.class); } + + @Test + public void all() { + for (BackpressureStrategy mode : BackpressureStrategy.values()) { + Flowable.fromObservable(Observable.range(1, 5), mode) + .test() + .withTag("mode: " + mode) + .assertResult(1, 2, 3, 4, 5); + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnableTest.java new file mode 100644 index 0000000000..f133c8fb99 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromRunnableTest.java @@ -0,0 +1,220 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Supplier; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.*; + +public class FlowableFromRunnableTest extends RxJavaTest { + @Test + public void fromRunnable() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Flowable.fromRunnable(new Runnable() { + @Override + public void run() { + atomicInteger.incrementAndGet(); + } + }) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromRunnableTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Runnable run = new Runnable() { + @Override + public void run() { + atomicInteger.incrementAndGet(); + } + }; + + Flowable.fromRunnable(run) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Flowable.fromRunnable(run) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromRunnableInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Flowable source = Flowable.fromRunnable(new Runnable() { + @Override + public void run() { + atomicInteger.incrementAndGet(); + } + }); + + assertEquals(0, atomicInteger.get()); + + source + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromRunnableThrows() { + Flowable.fromRunnable(new Runnable() { + @Override + public void run() { + throw new UnsupportedOperationException(); + } + }) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Throwable { + final int[] counter = { 0 }; + + Flowable m = Flowable.fromRunnable(new Runnable() { + @Override + public void run() { + counter[0]++; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Supplier); + + assertNull(((Supplier)m).get()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestSubscriber ts = Flowable.fromRunnable(new Runnable() { + @Override + public void run() { + cdl1.countDown(); + try { + cdl2.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new TestException(e); + } + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + ts.cancel(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposedUpfront() throws Throwable { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run) + .test(1L, true) + .assertEmpty(); + + verify(run, never()).run(); + } + + @Test + public void cancelWhileRunning() { + final TestSubscriber ts = new TestSubscriber<>(); + + Flowable.fromRunnable(new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }) + .subscribeWith(ts) + .assertEmpty(); + + assertTrue(ts.isCancelled()); + } + + @Test + public void asyncFused() throws Throwable { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ASYNC); + + Runnable action = mock(Runnable.class); + + Flowable.fromRunnable(action) + .subscribe(ts); + + ts.assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + + verify(action).run(); + } + + @Test + public void syncFusedRejected() throws Throwable { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.SYNC); + + Runnable action = mock(Runnable.class); + + Flowable.fromRunnable(action) + .subscribe(ts); + + ts.assertFusionMode(QueueFuseable.NONE) + .assertResult(); + + verify(action).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSingleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSingleTest.java new file mode 100644 index 0000000000..01f6b1661f --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSingleTest.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.TestSubscriberEx; + +public class FlowableFromSingleTest extends RxJavaTest { + + @Test + public void success() { + Flowable.fromSingle(Single.just(1).hide()) + .test() + .assertResult(1); + } + + @Test + public void error() { + Flowable.fromSingle(Single.error(new TestException()).hide()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelComposes() { + SingleSubject ms = SingleSubject.create(); + + TestSubscriber ts = Flowable.fromSingle(ms) + .test(); + + ts.assertEmpty(); + + assertTrue(ms.hasObservers()); + + ts.cancel(); + + assertFalse(ms.hasObservers()); + } + + @Test + public void asyncFusion() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ASYNC); + + Flowable.fromSingle(Single.just(1)) + .subscribe(ts); + + ts + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void syncFusionRejected() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.SYNC); + + Flowable.fromSingle(Single.just(1)) + .subscribe(ts); + + ts + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromObservableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromObservableTest.java new file mode 100644 index 0000000000..de2039a4d0 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromObservableTest.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class MaybeFromObservableTest extends RxJavaTest { + + @Test + public void empty() { + Maybe.fromObservable(Observable.empty().hide()) + .test() + .assertResult(); + } + + @Test + public void just() { + Maybe.fromObservable(Observable.just(1).hide()) + .test() + .assertResult(1); + } + + @Test + public void range() { + Maybe.fromObservable(Observable.range(1, 5).hide()) + .test() + .assertResult(1); + } + + @Test + public void error() { + Maybe.fromObservable(Observable.error(new TestException()).hide()) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromPubisherTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromPubisherTest.java new file mode 100644 index 0000000000..bcbef3dc9f --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromPubisherTest.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class MaybeFromPubisherTest extends RxJavaTest { + + @Test + public void empty() { + Maybe.fromPublisher(Flowable.empty().hide()) + .test() + .assertResult(); + } + + @Test + public void just() { + Maybe.fromPublisher(Flowable.just(1).hide()) + .test() + .assertResult(1); + } + + @Test + public void range() { + Maybe.fromPublisher(Flowable.range(1, 5).hide()) + .test() + .assertResult(1); + } + + @Test + public void error() { + Maybe.fromPublisher(Flowable.error(new TestException()).hide()) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromActionTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromActionTest.java new file mode 100644 index 0000000000..2b6d80896a --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromActionTest.java @@ -0,0 +1,214 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.testsupport.*; + +public class ObservableFromActionTest extends RxJavaTest { + @Test + public void fromAction() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Observable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromActionTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Action run = new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }; + + Observable.fromAction(run) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Observable.fromAction(run) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromActionInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Observable source = Observable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }); + + assertEquals(0, atomicInteger.get()); + + source + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromActionThrows() { + Observable.fromAction(new Action() { + @Override + public void run() throws Exception { + throw new UnsupportedOperationException(); + } + }) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Throwable { + final int[] counter = { 0 }; + + Observable m = Observable.fromAction(new Action() { + @Override + public void run() throws Exception { + counter[0]++; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Supplier); + + assertNull(((Supplier)m).get()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Observable.fromAction(new Action() { + @Override + public void run() throws Exception { + cdl1.countDown(); + cdl2.await(5, TimeUnit.SECONDS); + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.dispose(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposedUpfront() throws Throwable { + Action run = mock(Action.class); + + Observable.fromAction(run) + .test(true) + .assertEmpty(); + + verify(run, never()).run(); + } + + @Test + public void cancelWhileRunning() { + final TestObserver to = new TestObserver<>(); + + Observable.fromAction(new Action() { + @Override + public void run() throws Exception { + to.dispose(); + } + }) + .subscribeWith(to) + .assertEmpty(); + + assertTrue(to.isDisposed()); + } + + @Test + public void asyncFused() throws Throwable { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ASYNC); + + Action action = mock(Action.class); + + Observable.fromAction(action) + .subscribe(to); + + to.assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + + verify(action).run(); + } + + @Test + public void syncFusedRejected() throws Throwable { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.SYNC); + + Action action = mock(Action.class); + + Observable.fromAction(action) + .subscribe(to); + + to.assertFusionMode(QueueFuseable.NONE) + .assertResult(); + + verify(action).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletableTest.java new file mode 100644 index 0000000000..6cdc0108d6 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletableTest.java @@ -0,0 +1,195 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.testsupport.*; + +public class ObservableFromCompletableTest extends RxJavaTest { + @Test + public void fromCompletable() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Observable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + })) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromCompletableTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Action run = new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + }; + + Observable.fromCompletable(Completable.fromAction(run)) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Observable.fromCompletable(Completable.fromAction(run)) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromCompletableInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Observable source = Observable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + atomicInteger.incrementAndGet(); + } + })); + + assertEquals(0, atomicInteger.get()); + + source + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromCompletableThrows() { + Observable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + throw new UnsupportedOperationException(); + } + })) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Observable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + cdl1.countDown(); + cdl2.await(5, TimeUnit.SECONDS); + } + })).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.dispose(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposedUpfront() throws Throwable { + Action run = mock(Action.class); + + Observable.fromCompletable(Completable.fromAction(run)) + .test(true) + .assertEmpty(); + + verify(run, never()).run(); + } + + @Test + public void cancelWhileRunning() { + final TestObserver to = new TestObserver<>(); + + Observable.fromCompletable(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + to.dispose(); + } + })) + .subscribeWith(to) + .assertEmpty(); + + assertTrue(to.isDisposed()); + } + + @Test + public void asyncFused() throws Throwable { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ASYNC); + + Action action = mock(Action.class); + + Observable.fromCompletable(Completable.fromAction(action)) + .subscribe(to); + + to.assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + + verify(action).run(); + } + + @Test + public void syncFusedRejected() throws Throwable { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.SYNC); + + Action action = mock(Action.class); + + Observable.fromCompletable(Completable.fromAction(action)) + .subscribe(to); + + to.assertFusionMode(QueueFuseable.NONE) + .assertResult(); + + verify(action).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromMaybeTest.java new file mode 100644 index 0000000000..185b363d57 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromMaybeTest.java @@ -0,0 +1,93 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.testsupport.TestObserverEx; + +public class ObservableFromMaybeTest extends RxJavaTest { + + @Test + public void success() { + Observable.fromMaybe(Maybe.just(1).hide()) + .test() + .assertResult(1); + } + + @Test + public void empty() { + Observable.fromMaybe(Maybe.empty().hide()) + .test() + .assertResult(); + } + + @Test + public void error() { + Observable.fromMaybe(Maybe.error(new TestException()).hide()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelComposes() { + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = Observable.fromMaybe(ms) + .test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + + to.dispose(); + + assertFalse(ms.hasObservers()); + } + + @Test + public void asyncFusion() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ASYNC); + + Observable.fromMaybe(Maybe.just(1)) + .subscribe(to); + + to + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void syncFusionRejected() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.SYNC); + + Observable.fromMaybe(Maybe.just(1)) + .subscribe(to); + + to + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnableTest.java new file mode 100644 index 0000000000..834abbd978 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromRunnableTest.java @@ -0,0 +1,220 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Supplier; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.testsupport.*; + +public class ObservableFromRunnableTest extends RxJavaTest { + @Test + public void fromRunnable() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Observable.fromRunnable(new Runnable() { + @Override + public void run() { + atomicInteger.incrementAndGet(); + } + }) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromRunnableTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Runnable run = new Runnable() { + @Override + public void run() { + atomicInteger.incrementAndGet(); + } + }; + + Observable.fromRunnable(run) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Observable.fromRunnable(run) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromRunnableInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Observable source = Observable.fromRunnable(new Runnable() { + @Override + public void run() { + atomicInteger.incrementAndGet(); + } + }); + + assertEquals(0, atomicInteger.get()); + + source + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromRunnableThrows() { + Observable.fromRunnable(new Runnable() { + @Override + public void run() { + throw new UnsupportedOperationException(); + } + }) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Throwable { + final int[] counter = { 0 }; + + Observable m = Observable.fromRunnable(new Runnable() { + @Override + public void run() { + counter[0]++; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Supplier); + + assertNull(((Supplier)m).get()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Observable.fromRunnable(new Runnable() { + @Override + public void run() { + cdl1.countDown(); + try { + cdl2.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new TestException(e); + } + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.dispose(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposedUpfront() throws Throwable { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run) + .test(true) + .assertEmpty(); + + verify(run, never()).run(); + } + + @Test + public void cancelWhileRunning() { + final TestObserver to = new TestObserver<>(); + + Observable.fromRunnable(new Runnable() { + @Override + public void run() { + to.dispose(); + } + }) + .subscribeWith(to) + .assertEmpty(); + + assertTrue(to.isDisposed()); + } + + @Test + public void asyncFused() throws Throwable { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ASYNC); + + Runnable action = mock(Runnable.class); + + Observable.fromRunnable(action) + .subscribe(to); + + to.assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + + verify(action).run(); + } + + @Test + public void syncFusedRejected() throws Throwable { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.SYNC); + + Runnable action = mock(Runnable.class); + + Observable.fromRunnable(action) + .subscribe(to); + + to.assertFusionMode(QueueFuseable.NONE) + .assertResult(); + + verify(action).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromSingleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromSingleTest.java new file mode 100644 index 0000000000..d77498cea8 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromSingleTest.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.testsupport.TestObserverEx; + +public class ObservableFromSingleTest extends RxJavaTest { + + @Test + public void success() { + Observable.fromSingle(Single.just(1).hide()) + .test() + .assertResult(1); + } + + @Test + public void error() { + Observable.fromSingle(Single.error(new TestException()).hide()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelComposes() { + SingleSubject ms = SingleSubject.create(); + + TestObserver to = Observable.fromSingle(ms) + .test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + + to.dispose(); + + assertFalse(ms.hasObservers()); + } + + @Test + public void asyncFusion() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ASYNC); + + Observable.fromSingle(Single.just(1)) + .subscribe(to); + + to + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void syncFusionRejected() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.SYNC); + + Observable.fromSingle(Single.just(1)) + .subscribe(to); + + to + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromMaybeTest.java new file mode 100644 index 0000000000..50eed426a7 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromMaybeTest.java @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import static org.junit.Assert.*; + +import java.util.NoSuchElementException; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.MaybeSubject; + +public class SingleFromMaybeTest extends RxJavaTest { + + @Test + public void success() { + Single.fromMaybe(Maybe.just(1).hide()) + .test() + .assertResult(1); + } + + @Test + public void empty() { + Single.fromMaybe(Maybe.empty().hide()) + .test() + .assertFailure(NoSuchElementException.class); + } + + @Test + public void emptyDefault() { + Single.fromMaybe(Maybe.empty().hide(), 1) + .test() + .assertResult(1); + } + + @Test + public void error() { + Single.fromMaybe(Maybe.error(new TestException()).hide()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelComposes() { + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = Single.fromMaybe(ms) + .test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + + to.dispose(); + + assertFalse(ms.hasObservers()); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java index d99202adef..60b5a11fb6 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java +++ b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java @@ -83,6 +83,7 @@ public static void main(String[] args) throws IOException { Map notesMap = new HashMap<>(); List notesList = new ArrayList<>(); List tbdList = new ArrayList<>(); + int[] counters = new int[CLASSES.length]; for (String operatorName : sortedOperators) { out.print("`"); out.print(operatorName); out.print("`|"); + int m = 0; for (Class clazz : CLASSES) { if (operatorMap.get(clazz).contains(operatorName)) { out.print(PRESENT); + counters[m]++; } else { String notes = findNotes(clazz.getSimpleName(), operatorName); if (notes != null) { @@ -116,9 +119,19 @@ public static void main(String[] args) throws IOException { } } out.print("|"); + m++; } out.println(); } + out.print("**"); + out.print(sortedOperators.size()); + out.print(" operators** |"); + for (int m = 0; m < counters.length; m++) { + out.print(" **"); + out.print(counters[m]); + out.print("** |"); + } + out.println(); if (!notesList.isEmpty()) { out.println(); @@ -316,6 +329,7 @@ static String findNotes(String clazzName, String operatorName) { " C flattenStreamAsObservable Always empty thus no items to map.", " MSC forEach Use [`subscribe()`](#subscribe).", " MSC forEachWhile Use [`subscribe()`](#subscribe).", + " S fromAction Never empty.", " M fromArray At most one item. Use [`just()`](#just) or [`empty()`](#empty).", " S fromArray Always one item. Use [`just()`](#just).", " C fromArray Always empty. Use [`complete()`](#complete).", @@ -328,6 +342,7 @@ static String findNotes(String clazzName, String operatorName) { " O fromObservable Use [`wrap()`](#wrap).", " S fromOptional Always one item. Use [`just()`](#just).", " C fromOptional Always empty. Use [`complete()`](#complete).", + " S fromRunnable Never empty.", " S fromSingle Use [`wrap()`](#wrap).", " M fromStream At most one item. Use [`just()`](#just) or [`empty()`](#empty).", " S fromStream Always one item. Use [`just()`](#just).", diff --git a/src/test/java/io/reactivex/rxjava3/validators/InternalWrongNaming.java b/src/test/java/io/reactivex/rxjava3/validators/InternalWrongNaming.java index 14d937502f..a1ad772f05 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/InternalWrongNaming.java +++ b/src/test/java/io/reactivex/rxjava3/validators/InternalWrongNaming.java @@ -62,7 +62,7 @@ static void checkInternalOperatorNaming(String baseClassName, String consumerCla fail.append("java.lang.RuntimeException: " + g.getName() + " mentions " + consumerClassName) .append("\r\n at io.reactivex.internal.operators.") .append(baseClassName.toLowerCase()).append(".").append(g.getName().replace(".java", "")) - .append(" (").append(g.getName()).append(":").append(i + 1).append(")\r\n\r\n"); + .append(".method(").append(g.getName()).append(":").append(i + 1).append(")\r\n\r\n"); count++; } @@ -170,6 +170,9 @@ public void flowableNoObserver() throws Exception { "FlowableCountSingle", "FlowableElementAtMaybe", "FlowableElementAtSingle", + "FlowableElementAtMaybePublisher", + "FlowableElementAtSinglePublisher", + "FlowableFromCompletable", "FlowableSingleSingle", "FlowableSingleMaybe", "FlowableLastMaybe",