Skip to content

Commit

Permalink
Added OnErrorReturn operator (#24)
Browse files Browse the repository at this point in the history
Closes #24
LGTM @estebanstatesidecr
  • Loading branch information
noheltcj committed Jan 21, 2019
1 parent 212bdcd commit 28d4968
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 22 deletions.
55 changes: 40 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
# RxCommon
A multi-platform (Native, JVM, iOS, macOS, and JS) implementation of ReactiveX.
A multi-platform (Native, JVM, iOS, macOS, and JS) ReactiveX implementation.

## Documentation
Please refer to <https://reactivex.io> for documentation.

While this is currently only a partial implementation, I'm doing my best
to follow the spec as closely as possible.

### Sources
The reactivex documentation covers much of the functionality. If there are any significant discrepancies,
excluding those I've illuminated within this documentation, please post an issue.
excluding those illuminated within this documentation, please post an issue.

* [Observable](<http://reactivex.io/documentation/observable.html>)
* Single - Similar to observable, but will complete when the first value is emitted.
Expand All @@ -28,36 +25,64 @@ Currently supported operators:
* [FlatMap](http://reactivex.io/documentation/operators/flatmap.html)
* SwitchMap (non-interleaving variant of [FlatMap](http://reactivex.io/documentation/operators/flatmap.html))
* [CombineLatest](http://reactivex.io/documentation/operators/combinelatest.html)
* [OnErrorReturn](http://reactivex.io/documentation/operators/catch.html)

### Examples
```kotlin
Single(just = "hello")
.map { "$it world" }
.subscribe(NextObserver { result ->
// result => "hello world"
})

Observable<String>(createWithEmitter = { emitter ->
emitter.next("we're happy")
emitter.next("la la la")
emitter.terminate(Throwable("¯\\_(ツ)_/¯"))
Disposables.create {
// Cleanup
}
}).onErrorReturn { throwable ->
// map error to something useful or forward it down the chain
Single(just = "crashed 'n burning")
}.subscribe(NextTerminalObserver({ emission ->
// emission => we're happy
// emission => la la la
// emission => crashed 'n burning
}, { throwable ->
// No terminal notifications in this example
}))
```

## Installing
There are several places requiring imports to utilize this library.

### Common Module
```groovy
implementation "com.noheltcj:rx-common:0.3.0"
implementation "com.noheltcj:rx-common:0.4.0"
```

### JVM Module
```groovy
implementation "com.noheltcj:rx-common-jvm:0.3.0"
implementation "com.noheltcj:rx-common-jvm:0.4.0"
```

### JavaScript Module
```groovy
implementation "com.noheltcj:rx-common-js:0.3.0"
implementation "com.noheltcj:rx-common-js:0.4.0"
```

### Native Module
Slightly more complicated. See the [Native Distribution Limitation](#native-library-distribution)

## Temporary Limitations
As this is a new project with only one contributor, I haven't had time
to implement many of the things we've come to expect from a complete Rx
implementation.
As this is a new project with only a couple of contributors, we haven't had time
to implement many of the things many have come to expect from a complete Rx
implementation, but open up a pull request to solve any issues and we'll work through it.

### Native Library Distribution
I haven't had time to fully work out distribution via maven central for
the native kotlin library in kotlin/native projects.
Distribution via maven central for the native kotlin library in kotlin/native
projects hasn't been implemented yet, but you can still use this in native projects.

_You can find the pre-built kotlin libraries zipped in the release tag for each
version._
Expand Down Expand Up @@ -105,6 +130,6 @@ There is absolutely no thread safety or scheduling in the library yet,
but it's on the to-do list. In the meantime, it's best to keep any
application state and logic that utilizes this library on one thread.
This doesn't mean you can't still operate on different threads, just
transfer any data back to the designated thread. I personally use the
transfer any data back to a single designated thread. I personally use the
existing platform specific implementations of Rx (RxSwift, RxJava, etc)
to do this.
combined with platform scheduling (ExecutorService, DispatchQueue, etc) to do this.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
buildscript {
ext {
project_group_id = 'com.noheltcj'
project_version = '0.3.0'
kotlin_version = '1.2.70'
project_version = '0.4.0'
kotlin_version = '1.3.11'
upload_artifacts = System.getenv("uploadArtifacts") == "true"
}

Expand Down
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
id 'kotlin-platform-common' version '1.2.70'
id 'kotlin-platform-common' version '1.3.11'
}

group = project_group_id
Expand Down
1 change: 1 addition & 0 deletions common/src/main/kotlin/com/noheltcj/rxcommon/Source.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ interface Source<E> {
fun doOnNext(onNext: (E) -> Unit): Source<E> = DoOnEach(this, NextObserver(onNext))
fun doOnComplete(onComplete: () -> Unit): Source<E> = DoOnEach(this, CompleteObserver(onComplete))
fun doOnError(onError: (Throwable) -> Unit): Source<E> = DoOnEach(this, TerminalObserver(onError))
fun onErrorReturn(resolveNewSource: (Throwable) -> Source<E>) = OnErrorReturn(this, resolveNewSource)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.noheltcj.rxcommon.operators

import com.noheltcj.rxcommon.Source
import com.noheltcj.rxcommon.disposables.Disposable
import com.noheltcj.rxcommon.disposables.Disposables
import com.noheltcj.rxcommon.emitters.ColdEmitter
import com.noheltcj.rxcommon.emitters.Emitter
import com.noheltcj.rxcommon.observers.AllObserver
import com.noheltcj.rxcommon.observers.Observer

class OnErrorReturn<U>(
private val upstream: Source<U>,
private val onErrorResolveNewSource: (Throwable) -> Source<U>
) : Operator<U>() {
override val emitter: Emitter<U> = ColdEmitter {}

override fun subscribe(observer: Observer<U>): Disposable {
emitter.addObserver(observer)

val upstreamDisposable = upstream.subscribe(AllObserver(
onNext = { emitter.next(it) },
onError = { upstreamThrowable ->
onErrorResolveNewSource(upstreamThrowable).subscribe(AllObserver(
onNext = { emitter.next(it) },
onError = { emitter.terminate(it) },
onComplete = { emitter.complete() }
))
},
onComplete = { emitter.complete() }
))

return Disposables.create {
emitter.removeObserver(observer)
upstreamDisposable.dispose()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package com.noheltcj.rxcommon.operators

import com.noheltcj.rxcommon.disposables.Disposables
import com.noheltcj.rxcommon.emitters.Emitter
import com.noheltcj.rxcommon.observables.Observable
import com.noheltcj.rxcommon.observables.Single
import com.noheltcj.rxcommon.observers.NextObserver
import com.noheltcj.rxcommon.subjects.PublishSubject
import com.noheltcj.rxcommon.utility.JsName
import com.noheltcj.rxcommon.utility.TestObserver
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals

class OnErrorReturnIntegrationTests {
lateinit var testObserver: TestObserver<String>

@BeforeTest
fun beforeEach() {
testObserver = TestObserver()
}

@Test
@JsName("givenSourceOnError_whenSubscribing_shouldNotEmit")
fun `given source to on error, when subscribing should emit nothing`() {
Observable<String>()
.onErrorReturn { Observable("") }
.subscribe(testObserver)

testObserver.assertNoEmission()
}

@Test
@JsName("givenSourceOnError_whenSourceEmits_shouldEmitOriginalValue")
fun `given source to on error, when the source emits, should emit original value`() {
Observable(just = "ten")
.onErrorReturn { Observable("not ten") }
.subscribe(testObserver)

testObserver.assertValue("ten")
}

@Test
@JsName("givenSourceOnError_whenSourceTerminates_shouldResolveToNewSource")
fun `given source to on error, when the source terminates, should resolve to new source`() {
val expectedThrowable = Throwable("poof")
Observable<String>(error = expectedThrowable)
.onErrorReturn { Observable("not poof") }
.subscribe(testObserver)

testObserver.assertValue("not poof")
}

@Test
@JsName("givenSourceOnError_whenSourceCompletes_shouldNotify")
fun `given source to on error, when the source completes, should notify`() {
Observable<String>(completeOnSubscribe = true)
.onErrorReturn { Single("") }
.subscribe(testObserver)

testObserver.assertComplete()
}

@Test
@JsName("givenDisposableSourceOnError_whenSourceCompletes_shouldNotify")
fun `given disposable source to on error, when the source completes, should notify`() {
val source = PublishSubject<String>()
source.onErrorReturn { Single("") }.subscribe(testObserver)
source.onComplete()

testObserver.assertComplete()
}

@Test
@JsName("givenSourceOnError_whenSourceTerminates_shouldNotEmit")
fun `given source to on error, when the source terminates, should not emit`() {
val source = PublishSubject<String>()
source.onErrorReturn { Single() }.subscribe(testObserver)
source.onError(RuntimeException())

testObserver.assertNoEmission()
testObserver.assertNotTerminated()
}

@Test
@JsName("givenSourceOnError_whenSourceTerminates_shouldResolveNewSourceWithUpstreamThrowable")
fun `given source to on error, when the source terminates, should resolve new source upstream throwable`() {
val expectedThrowable = RuntimeException("should be me")
var capturedUpstreamThrowable: Throwable? = null
val source = PublishSubject<String>()
source.onErrorReturn {
capturedUpstreamThrowable = it
Single()
}.subscribe(testObserver)
source.onError(expectedThrowable)

assertEquals(expectedThrowable, capturedUpstreamThrowable)
}

@Test
@JsName("givenOriginalSourceTerminatedWithOnError_whenResolvedSourceEmits_shouldEmit")
fun `given original source terminated with on error, when resolved source emits, should emit`() {
val source = PublishSubject<String>()
source.onErrorReturn { Single(just = "") }.subscribe(testObserver)
source.onError(RuntimeException())

testObserver.assertValue("")
}

@Test
@JsName("givenOriginalSourceTerminatedWithOnError_whenResolvedSourceTerminates_shouldNotify")
fun `given original source terminated with on error, when resolved source terminates, should notify`() {
val expectedThrowable = RuntimeException("woops")
val source = PublishSubject<String>()
source.onErrorReturn { Single(error = expectedThrowable) }.subscribe(testObserver)
source.onError(RuntimeException())

testObserver.assertTerminated(expectedThrowable)
}

@Test
@JsName("givenOriginalSourceTerminatedWithOnError_whenResolvedSourceCompletes_shouldNotify")
fun `given original source terminated with on error, when resolved source completes, should notify`() {
val source = PublishSubject<String>()
source.onErrorReturn { Observable(completeOnSubscribe = true) }.subscribe(testObserver)
source.onError(RuntimeException())

testObserver.assertComplete()
}

@Test // Test added for posterity's sake
@JsName("givenOriginalSourceEmittedAndFlatMapped_andResolvedSourceTerminatedWithOnError_shouldNotDisposeFlatMapOrOriginalSource")
fun `given original source emitted and flatmapped and resolved source terminated with on error, should not dispose flatmap or original source`() {
lateinit var emitter: Emitter<String>
val flatmapTestObserver = TestObserver<String>()
val source = Observable<String>(createWithEmitter = {
emitter = it
Disposables.empty()
})
source.subscribe(testObserver)
source.flatMap {
Observable<String>(error = RuntimeException())
.onErrorReturn { Single(just = "hmm") }
}.subscribe(flatmapTestObserver)
emitter.next("yup")

testObserver.assertNotDisposed()
flatmapTestObserver.assertNotDisposed()
flatmapTestObserver.assertValue("hmm")
}

@Test
@JsName("givenColdSourceOnError_whenOnlyObserverDisposed_shouldDisposeSource")
fun `given cold source to on error, when the only observer disposed, should dispose source`() {
val source = Observable<String>()
source.onErrorReturn { Single("hi") }
.subscribe(testObserver)
.dispose()

val sourceTestObserver = TestObserver<String>()
source.subscribe(sourceTestObserver)

sourceTestObserver.assertDisposed()
}

@Test
@JsName("givenHotSourceOnError_whenOnlyObserverDisposed_shouldDisposeOperator")
fun `given hot source to on error, when the only observer disposed, should dispose operator`() {
val source = PublishSubject<String>()
source.onErrorReturn { Single("no") }.apply {
subscribe(TestObserver()).dispose()
subscribe(testObserver)
}

testObserver.assertDisposed()
}

@Test
@JsName("givenHotSourceOnError_whenOnlyObserverDisposed_shouldNotDisposeSource")
fun `given hot source to on error, when the only observer disposed, should not dispose source`() {
val source = PublishSubject<String>()
source.onErrorReturn { Single("yes") }.apply {
val emptyObserver = NextObserver<String> {}
subscribe(emptyObserver).dispose()
}

val sourceTestObserver = TestObserver<String>()
source.subscribe(sourceTestObserver)

source.onNext("one")

sourceTestObserver.assertValue("one")
}
}
2 changes: 1 addition & 1 deletion js/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
id 'kotlin-platform-js' version '1.2.70'
id 'kotlin-platform-js' version '1.3.11'
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion jvm/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
id 'kotlin-platform-jvm' version '1.2.70'
id 'kotlin-platform-jvm' version '1.3.11'
}

dependencies {
Expand Down
4 changes: 2 additions & 2 deletions native/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ buildscript {
}

dependencies {
classpath "org.jetbrains.kotlin:kotlin-native-gradle-plugin:0.8"
classpath "org.jetbrains.kotlin:kotlin-native-gradle-plugin:1.3.11"
}
}

Expand All @@ -14,7 +14,7 @@ apply plugin: 'konan'
konanArtifacts {
library('RxCommon', targets: ['iphone', 'iphone_sim', 'macbook', 'wasm32', 'linux', 'android_arm64', 'android_arm32']) {
enableMultiplatform true
extraOpts '-module_name', ''
extraOpts '-module-name', ''
}

program('tests') {
Expand Down

0 comments on commit 28d4968

Please sign in to comment.