Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

onComplete() is not getting called on the specified Scheduler #5716

Closed
sadegh opened this issue Nov 10, 2017 · 6 comments
Closed

onComplete() is not getting called on the specified Scheduler #5716

sadegh opened this issue Nov 10, 2017 · 6 comments
Labels

Comments

@sadegh
Copy link
Contributor

sadegh commented Nov 10, 2017

when I use Observable.take(time, TimeUnit.SECONDS), if the result Observable stops due to timeout, onComplete() callback gets called on computation thread, not the one that I've specified with observeOn().

Here is a sample code:

Observable.just("one", "two")
        .observeOn(Schedulers.io())
        .take(0, TimeUnit.SECONDS)
        .subscribe(new Observer<String>() {
            @Override public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe(), thread: " + Thread.currentThread().getName());
            }

            @Override public void onNext(String s) {
                System.out.println("onNext(), thread: " + Thread.currentThread().getName());
            }

            @Override public void onError(Throwable e) {
                System.out.println("onError(), thread: " + Thread.currentThread().getName());
            }

            @Override public void onComplete() {
                System.out.println("onComplete, thread: " + Thread.currentThread().getName());
            }
        });

And as the result you'll get onComplete, thread: RxComputationThreadPool-1.

If it's required for all callback methods to run on the specified thread per Reactive specifications, this would be a bug, and it happens on both RxJava 1.x and 2.x

@akarnokd
Copy link
Member

observeOn lasts until the next async operator, in this case take. You can specify the Scheduler where take will time out. Alternatively, you can swap take and observeOn so events will end up on the desired thread.

@sadegh
Copy link
Contributor Author

sadegh commented Nov 10, 2017

Yeah, I looked into the code and figured what's causing that. However I believe this is kinda inconsistent behavior, (and probably wrong), since all onNext() calls are running on the specified scheduler, and all of the sudden onComplete() runs on another thread.

At least we should have this behavior documented. It's not known that take() might change thread for some callbacks! (onError is also possible to run on computation)

@sadegh
Copy link
Contributor Author

sadegh commented Nov 10, 2017

I also looked into the source code and figured why this is happening.
Here is my quick (and probably naive!) fix that I did to just test my idea about the source of this issue.
I added an if to Observable.take()

public final Observable<T> take(long time, TimeUnit unit) {
  if (this instanceof ObservableObserveOn) 
    return takeUntil(timer(time, unit)).observeOn(((ObservableObserveOn) this).scheduler);
  else 
    return takeUntil(timer(time, unit));
}

@akarnokd
Copy link
Member

The operator behaves and expected. If the upstream completes before the time runs out, those events will be delivered on the upstream's thread. If there is a timeout, which is scheduled by a different Scheduler, the terminal event will be delivered on that other thread. If you want to ensure events are delivered on the desired thread, apply observeOn just before that.

If you want, you can create a PR wich add a clause to the relevant operators' Javadoc (in both Observable and Flowable which explicitly states the completion triggered by the timeout may come from a different thread (depending on the Scheduler used) than the items came from.

@sadegh
Copy link
Contributor Author

sadegh commented Nov 11, 2017

Alright, I'll add it to doc.

@akarnokd
Copy link
Member

Closing via #5718 and #5719.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants