Skip to content

Commit

Permalink
Corrected a bug blocking null emissions in CombineLatest (#28)
Browse files Browse the repository at this point in the history
Closes #28
  • Loading branch information
noheltcj authored Feb 5, 2019
1 parent d3625c7 commit 03e070a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 14 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ There are several places requiring imports to utilize this library.

### Common Module
```groovy
implementation "com.noheltcj:rx-common:0.4.1"
implementation "com.noheltcj:rx-common:0.4.2"
```

### JVM Module
```groovy
implementation "com.noheltcj:rx-common-jvm:0.4.1"
implementation "com.noheltcj:rx-common-jvm:0.4.2"
```

### JavaScript Module
```groovy
implementation "com.noheltcj:rx-common-js:0.4.1"
implementation "com.noheltcj:rx-common-js:0.4.2"
```

### Native Module
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
buildscript {
ext {
project_group_id = 'com.noheltcj'
project_version = '0.4.1'
project_version = '0.4.2'
kotlin_version = '1.3.20'
upload_artifacts = System.getenv("uploadArtifacts") == "true"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.noheltcj.rxcommon.exceptions.UndeliverableEmissionException
import com.noheltcj.rxcommon.observers.Observer

open class ColdEmitter<E>(private val doOnDispose: () -> Unit) : Emitter<E> {
private val activeObservers = mutableListOf<Observer<E>>()
protected val activeObservers = mutableListOf<Observer<E>>()

override var isDisposed = false
protected set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.noheltcj.rxcommon.exceptions.UndeliverableEmissionException
import com.noheltcj.rxcommon.observers.Observer

open class HotEmitter<E> : Emitter<E> {
private val activeObservers = mutableListOf<Observer<E>>()
protected val activeObservers = mutableListOf<Observer<E>>()

override var isDisposed = false
protected set
Expand Down Expand Up @@ -34,19 +34,15 @@ open class HotEmitter<E> : Emitter<E> {
if (!isDisposed) {
isTerminated = true
activeObservers.forEach { it.onError(throwable) }
dispose()
isDisposed = true
}
}

override fun complete() {
if (!isDisposed) {
isCompleted = true
activeObservers.forEach { it.onComplete() }
dispose()
isDisposed = true
}
}

private fun dispose() {
isDisposed = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ class CombineLatest<S1, S2, R>(
) : Source<R> {
val emitter: Emitter<R> = ColdEmitter {}

@Suppress("UNCHECKED_CAST")
override fun subscribe(observer: Observer<R>): Disposable {
emitter.addObserver(observer)

var sourceOneEmitted = false
var sourceTwoEmitted = false
var sourceOneLastElement: S1? = null
var sourceTwoLastElement: S2? = null

Expand All @@ -34,8 +37,11 @@ class CombineLatest<S1, S2, R>(
val upstreamOneDisposable = sourceOne.subscribe(
AllObserver(
onNext = {
if (sourceTwoEmitted) {
emitter.next(transform(it, sourceTwoLastElement as S2))
}
sourceOneEmitted = true
sourceOneLastElement = it
sourceTwoLastElement?.run { emitter.next(transform(it, this)) }
},
onError = { emitter.terminate(it) },
onComplete = { onSourceCompleted() }
Expand All @@ -44,8 +50,11 @@ class CombineLatest<S1, S2, R>(
val upstreamTwoDisposable = sourceTwo.subscribe(
AllObserver(
onNext = {
if (sourceOneEmitted) {
emitter.next(transform(sourceOneLastElement as S1, it))
}
sourceTwoEmitted = true
sourceTwoLastElement = it
sourceOneLastElement?.run { emitter.next(transform(this, it)) }
},
onError = { emitter.terminate(it) },
onComplete = { onSourceCompleted() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ class CombineLatestIntegrationTests {
testObserver.assertValue(1 to "1")
}

@Test
@JsName("givenTwoCombinedColdSources_andBothHaveEmittedNull_whenSubscribing_shouldEmit")
fun `given two combined cold sources and both have emitted null, when subscribing, should emit`() {
val nullableCombineTestObserver = TestObserver<Pair<Any?, String?>>()
val combinedObs = Observable<Any?>(just = null).combineLatest(Observable<String?>(just = null))

combinedObs.subscribe(nullableCombineTestObserver)

nullableCombineTestObserver.assertValue(null to null)
}

@Test
@JsName("givenSubscribedToCombinedColdSources_andBothHaveEmitted_whenOneEmitsAgain_shouldEmitTwoValues")
fun `given subscribed to combined cold sources and both have emitted, when one emits again, should emit two values`() {
Expand Down

0 comments on commit 03e070a

Please sign in to comment.