3.x: Observable.window(count, skip) completes windows when there are no observers #7056

Closed
amihusb opened this issue Aug 16, 2020 · 4 comments

amihusb commented Aug 16, 2020

RxJava version: 3.0.5
OS: Android

Hi, as a follow-up to #7048, here are two failing test cases:

Test case 1:

@Test
public void cancelAfterAbandonmentSize() {
    PublishSubject<Integer> ps = PublishSubject.create();
    AtomicReference<Observable<Integer>> firstWindow = new AtomicReference<>();
    TestObserver<Observable<Integer>> to = ps.window(3, 1)
            .doOnNext((window) -> {
                if (!firstWindow.compareAndSet(null, window)) {
                    window.subscribe();
                }
            })
            .test();

    assertTrue(ps.hasObservers());

    ps.onNext(1);
    ps.onNext(2);

    to.dispose();

    firstWindow.get()
            .test()
            .assertValues(1, 2);
}

This test fails with the following message:
Value count differs; expected: 2 [1, 2] but was: 1 [1] (latch = 0, values = 1, errors = 0, completions = 1)

Test case 2:

@Test
public void cancelAfterAbandonmentSize() {
    PublishSubject<Integer> ps = PublishSubject.create();

    TestObserver<Integer> to = ps.window(3)
            .flatMap((window) -> window.delaySubscription(1, TimeUnit.SECONDS))
            .test();

    ps.onNext(1);
    ps.onNext(2);
    ps.onNext(3);

    to.dispose();

    to.assertValues(1, 2, 3);
}

This test fails with the following message:
Value count differs; expected: 3 [1, 2, 3] but was: 0 [] (latch = 1, values = 0, errors = 0, completions = 0, disposed!)

Question
Is this the correct behavior?

@amihusb amihusb changed the title 3.x: Observable.window(count, skip) completes windows when there no observers 3.x: Observable.window(count, skip) completes windows when there are no observers Aug 16, 2020
@akarnokd
Member

Test case 1 does not subscribe to the first window in time, so the window gets abandoned and completed. This is the expected behavior.

Test case 2 disposes the sequence and thus the abandoned windows will have no opportunity to emit their only item.
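
For contrast with test case 1, here is a minimal sketch in which every window is subscribed synchronously, inside the callback that receives it, so no window should be abandoned. The class and method names (`WindowSubscribeInTime`, `firstWindowValues`) are made up for illustration; only `window(count, skip)`, `doOnNext`, `test()`, and `values()` come from RxJava 3.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class WindowSubscribeInTime {

    // Hypothetical helper: returns what the first window's observer received
    // when each window is subscribed immediately upon emission.
    static List<Integer> firstWindowValues() {
        PublishSubject<Integer> ps = PublishSubject.create();
        List<TestObserver<Integer>> windowObservers = new ArrayList<>();

        TestObserver<Observable<Integer>> to = ps.window(3, 1)
                // Subscribe right here, while the operator is still emitting
                // the window, instead of storing it for later.
                .doOnNext(window -> windowObservers.add(window.test()))
                .test();

        ps.onNext(1);
        ps.onNext(2);

        // Disposing the outer sequence does not cut off windows that
        // already have an observer.
        to.dispose();

        return windowObservers.get(0).values();
    }

    public static void main(String[] args) {
        System.out.println(firstWindowValues());
    }
}
```

With this arrangement the first window's observer is expected to see both 1 and 2, because it was attached before the operator checked for abandonment.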

@arkivanov

@akarnokd There is a notice in the JavaDocs:

Note that ignoring windows or subscribing later (i.e., on another thread) will result in
so-called window abandonment where a window may not contain any elements. In this case, subsequent
elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
a trade-off for ensuring upstream cancellation can happen under some race conditions.

Could you kindly describe in more detail what kind of race conditions are meant? From my point of view, we could consider windows "active" once emitted and care only about their disposal. Since they are UnicastSubjects, there can be only one subscription and only one disposal per window. E.g., if the downstream is disposed but there is an "active" window, the upstream stays subscribed until the last window is (possibly subscribed and) disposed.

I would appreciate your comments here.

@akarnokd
Member

> there can be only one subscription

Up to one subscription. If such a subscription never happens, the upstream may never know it has to stop sending events. The race can happen in some operators, such as flatMap, which hard-cuts its onNext processing upon an asynchronous cancellation, so the emitted inner window may never see an observer/subscriber.
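
The reasoning above can be illustrated with a toy reference-counting model in plain Java. This is only a sketch under stated assumptions, not RxJava's actual implementation: the `Operator` class, its `active` counter, and the `abandonUnsubscribed` flag are all hypothetical names. It compares "a window counts as active once emitted" with "an unsubscribed window is abandoned right after emission" in the case where the window never gets an observer.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WindowRefCountSketch {

    /** Hypothetical stand-in for a windowing operator's bookkeeping. */
    static final class Operator {
        // Starts at 1 to account for the outer (downstream) consumer.
        final AtomicInteger active = new AtomicInteger(1);
        boolean upstreamDisposed;

        // Called when the downstream is disposed or a window terminates.
        void release() {
            if (active.decrementAndGet() == 0) {
                upstreamDisposed = true;
            }
        }

        // Emits a window. 'subscribedInTime' models whether the consumer
        // subscribed synchronously during the emission.
        void emitWindow(boolean subscribedInTime, boolean abandonUnsubscribed) {
            active.incrementAndGet();
            if (!subscribedInTime && abandonUnsubscribed) {
                // Abandonment: complete the window now and give up its slot.
                release();
            }
        }
    }

    // Models a consumer such as flatMap that is cancelled asynchronously and
    // therefore never subscribes to the window it was handed.
    static boolean upstreamDisposedAfterCancel(boolean abandonUnsubscribed) {
        Operator op = new Operator();
        op.emitWindow(false, abandonUnsubscribed);
        op.release(); // the downstream disposes
        return op.upstreamDisposed;
    }

    public static void main(String[] args) {
        System.out.println("with abandonment:    " + upstreamDisposedAfterCancel(true));
        System.out.println("without abandonment: " + upstreamDisposedAfterCancel(false));
    }
}
```

In this model, without abandonment the never-subscribed window keeps the counter above zero, so the upstream is never disposed; with abandonment the counter reaches zero as soon as the downstream is disposed.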

@arkivanov

Thanks for such a good explanation, now it's clear.
