3.x: Observable.window() operators do not dispose upstream while there is an active window #7048

Closed
arkivanov opened this issue Aug 3, 2020 · 2 comments · Fixed by #7049

@arkivanov

RxJava version: 3.0.5
OS: Android

Hello. Today I discovered an odd behaviour of some of the Observable.window(...) operators. Here are some samples:

Sample 1

        val disposable = Observable
            .create<Int> { emitter ->
                Log.v("MyTest", "Upstream subscribed")
                emitter.setCancellable { Log.v("MyTest", "Upstream disposed") }
                repeat(100) {
                    emitter.onNext(it)
                    Thread.sleep(100L)
                }
            }
            .subscribeOn(Schedulers.io())
            .window(50)
            .subscribe {
                it.subscribe({}, {}, { Log.v("MyTest", "Window completed") })
            }

        Handler().postDelayed(disposable::dispose, 1000L)

The output of the code is:

2020-08-03 17:34:45.193 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed
2020-08-03 17:34:50.158 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed
2020-08-03 17:34:50.158 11575-11614/com.badoo.reaktive.sample.android V/MyTest: Window completed

If you comment out the line it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }), then the output is:

2020-08-03 17:35:38.969 11704-11745/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed

Concerns:

  1. In the first case the upstream is disposed after 5 seconds, not after 1 second
  2. In the second case the upstream is never disposed at all

Sample 2
The window(timeSpan, TimeUnit) operator behaves differently.

        val disposable = Observable
            .create<Int> { emitter ->
                Log.v("MyTest", "Upstream subscribed")
                emitter.setCancellable { Log.v("MyTest", "Upstream disposed") }
                repeat(100) {
                    emitter.onNext(it)
                    Thread.sleep(100L)
                }
            }
            .subscribeOn(Schedulers.io())
            .window(5000L, TimeUnit.MILLISECONDS)
            .subscribe {
                it.subscribe({}, {}, { Log.v("MyTest", "Window completed") })
            }

        Handler().postDelayed(disposable::dispose, 1000L)

So the output is:

2020-08-03 17:32:04.738 11310-11361/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed
2020-08-03 17:32:09.736 11310-11360/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed
2020-08-03 17:32:09.736 11310-11360/com.badoo.reaktive.sample.android V/MyTest: Window completed

If you comment out the line it.subscribe({}, {}, { Log.v("MyTest", "Window completed") }), then the output is:

2020-08-03 17:33:23.535 11427-11480/com.badoo.reaktive.sample.android V/MyTest: Upstream subscribed
2020-08-03 17:33:24.536 11427-11427/com.badoo.reaktive.sample.android V/MyTest: Upstream disposed

Concerns:

  1. In the first case the upstream is disposed after 5 seconds, not after 1 second (same as window(count))
  2. In the second case the upstream is disposed after 1 second (different from window(count))

Questions
Is this by design or a bug? I would expect the upstream to be disposed and all active windows to be completed once the downstream is disposed. Or at least I would expect the behaviour to be consistent across all window(...) operators.

@akarnokd
Member

akarnokd commented Aug 3, 2020

Disposal after 5 seconds is expected. By disposing the outer sequence you indicate that no more windows should be created, but you still have an active window consuming items. That window runs to completion, after which the operator detects that no further windows can be created and cancels the upstream.
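
To make the explanation above concrete, here is a minimal sketch of reference-counted cancellation in plain Kotlin. It is a simplified model and not RxJava's actual source: the class WindowAccounting and its method names are hypothetical, and it assumes the downstream and every open window each hold one reference that is released exactly once.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model, not RxJava source: the downstream and each open window
// hold one reference; the upstream is disposed when the last one is released.
class WindowAccounting(private val disposeUpstream: () -> Unit) {
    // Starts at 1 because the downstream subscription itself counts as a reference.
    private val active = AtomicInteger(1)

    // Called when a new window is emitted to the downstream.
    fun windowOpened() {
        active.incrementAndGet()
    }

    // Called when a window has run to completion.
    fun windowCompleted() = release()

    // Called when the outer (downstream) subscription is disposed.
    fun downstreamDisposed() = release()

    // Assumes each reference is released at most once.
    private fun release() {
        if (active.decrementAndGet() == 0) disposeUpstream()
    }
}

fun main() {
    val log = mutableListOf<String>()
    val accounting = WindowAccounting { log += "Upstream disposed" }

    accounting.windowOpened()        // first window is emitted and consumed
    accounting.downstreamDisposed()  // outer dispose, e.g. after 1 second
    println(log)                     // prints [] because the window is still active

    accounting.windowCompleted()     // the active window finishes, e.g. after 5 seconds
    println(log)                     // prints [Upstream disposed]
}
```

In this model the outer dispose only releases the downstream's reference, so the upstream stays subscribed until the active window completes. That matches the 5 second delay seen in the first case of both samples.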

I have to investigate Sample 1 Case 2 (sized window without subscribing to the windows themselves).

@akarnokd
Member

akarnokd commented Aug 3, 2020

Thanks for the feedback. Sample 1 Case 2 is indeed a bug, namely the lack of proper cancellation accounting in window(). I'll post a fix tomorrow.
