Skip to content

Commit

Permalink
[RibCoroutineWorker] In asWorker(), keep scope alive until lifecycl…
Browse files Browse the repository at this point in the history
…e completion.

This fixes Rx subscriptions using `autoDispose(CoroutineScope)` immediately terminating.

In order to properly support `autoDispose(CoroutineScope)` subscriptions, we must keep the `CoroutineScope` received in `onStart` alive as long as the `WorkerScopeProvider` lifecycle.

`autoDispose` does *not* create a children coroutine: instead it installs a completion handler. Hence, outer scope will not have children to wait for completion and will terminate immediately.
  • Loading branch information
psteiger committed Nov 21, 2023
1 parent 6847098 commit 35b3b06
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,20 @@ internal constructor(
) : Worker {

override fun onStart(lifecycle: WorkerScopeProvider) {
// We can start it undispatched because Worker binder will already call `onStart` in correct
// context,
// but we still want to pass in `coroutineDispatcher` to resume from suspensions in `onStart` in
// We start it undispatched to keep the behavior of immediate binding of Worker when
// WorkerBinder.bind is called.
// We still want to pass in `coroutineContext` to resume from suspensions in `onStart` in
// correct context.
lifecycle.coroutineScope.launch(coroutineContext, start = CoroutineStart.UNDISPATCHED) {
supervisorScope { ribCoroutineWorker.onStart(this) }
lifecycle.coroutineScope.launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
supervisorScope {
ribCoroutineWorker.onStart(this)
// Keep this scope alive until cancelled.
// This is particularly important for cases where we do not launch long-running coroutines
// with scope, but instead install some completion handler that we expect to be called at
// worker
// unbinding. This is the case with Rx subscriptions with 'autoDispose(scope)'
awaitCancellation()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.uber.rib.core

import com.google.common.truth.Truth.assertThat
import com.uber.autodispose.coroutinesinterop.autoDispose
import io.reactivex.subjects.PublishSubject
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CancellationException
Expand Down Expand Up @@ -46,6 +48,7 @@ import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.Rule
import org.junit.Test
import org.mockito.kotlin.mock

private const val ON_START_DELAY_DURATION_MILLIS = 100L
private const val INNER_COROUTINE_DELAY_DURATION_MILLIS = 200L
Expand Down Expand Up @@ -180,6 +183,21 @@ class RibCoroutineWorkerTest {
}
}

@Test
fun asWorker_autoDisposeWithCoroutineScope_lateEmissionIsReceivedBySubscriber() = runTest {
val router = mock<Router<*>>()
val interactor = object : Interactor<Any, Router<*>>() {}
val relay = PublishSubject.create<Unit>()
var gotEmission = false
val worker =
RibCoroutineWorker { relay.autoDispose(this).subscribe { gotEmission = true } }.asWorker()
InteractorHelper.attach(interactor, Any(), router, null)
WorkerBinder.bind(interactor, worker)
runCurrent()
relay.onNext(Unit)
assertThat(gotEmission).isTrue()
}

@Test
fun testHelperFunction() = runTest {
// Sanity - assert initial state.
Expand Down

0 comments on commit 35b3b06

Please sign in to comment.