Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the switch operator to Observable #259

Merged
merged 5 commits into from
May 16, 2013
Merged

Added the switch operator to Observable #259

merged 5 commits into from
May 16, 2013

Conversation

michaeldejong
Copy link
Contributor

I've implemented the switch operator for Observable sequences (see issue #13). Because "switch" is a keyword in Java I have opted for switchDo. This matches to the finallyDo method which represents the finally operator and has the same naming problem.

Please let me know if you have any feedback.

@cloudbees-pull-request-builder

RxJava-pull-requests #126 SUCCESS
This pull request looks good


@Override
public void onNext(Observable<T> args) {
synchronized (subscription) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to synchronize as we expect the sequence to comply with the Rx contract. This adds performance overhead that we don't want to add.

See Rx Design Guidelines 6.8 for more background on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the guidelines. That does make more sense in terms of performance.

@cloudbees-pull-request-builder

RxJava-pull-requests #128 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

Since the onError discussion is now part of an outdated diff ... moving it here:


I believe it needs to be this:

the error should be propagated and all future sequences produced by the parent sequence should be ignored (unsubscribed)

It's the same way with merge, zip, etc. If an error occurs on any sequence it immediately kills the entire sequence and passes the error.

I could see an overload of switch existing that could add behavior such as onErrorResumeNext so that the user can choose whether to ignore the error and just switch to the next sequence, or handle it with a default or something else before switching, etc.

We can't just swallow errors though so if an onError is received from a child sequence it must behave as if the parent received an onError and unsubscribe and pass on the error.

@michaeldejong
Copy link
Contributor Author

I've modified the OperationSwitch class to handle errors in subsequences. However I'm not entirely convinced yet that unsubscribing from the parent sequence will always work. Example: If right after subscribing to the parent sequence, an error is thrown in a subsequence I'll try to unsubscribe from the parent sequence, but it's not guaranteed that the parent variable containing the Subscription to the parent sequence has already been set.

Ideas, thoughts?

@cloudbees-pull-request-builder

RxJava-pull-requests #146 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #147 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

That's a good point ... there could be a race condition (very unlikely but possible).

The only way I can think of checking it is keeping a boolean flag that we're supposed to be finished and if we receive more onNext/onError notifications we continue to unsubscribe each time (and not send those events through).

@benjchristensen
Copy link
Member

I'll make that change as I do some other cleanup before 0.9 is released.

benjchristensen added a commit that referenced this pull request May 16, 2013
Added the switch operator to Observable
@benjchristensen benjchristensen merged commit 400bdf4 into ReactiveX:master May 16, 2013
@benjchristensen
Copy link
Member

Thank you @michaeldejong for this submission.

@benjchristensen
Copy link
Member

Actually AtomicObservableSubscription already handles this case in the wrap method:

    public AtomicObservableSubscription wrap(Subscription actualSubscription) {
        if (!this.actualSubscription.compareAndSet(null, actualSubscription)) {
            if (this.actualSubscription.get() == UNSUBSCRIBED) {
                actualSubscription.unsubscribe();
                return this;
            }
            throw new IllegalStateException("Can not set subscription more than once.");
        }
        return this;
    }

If the wrap occurs after an unsubscribe occurs it will immediately unsubscribe.

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
* Add response predicate to retry sync and async for enhancement ReactiveX#259

*  ReactiveX#258 add the support to the webflux types in the circuit breaker annotation AOP

*  ReactiveX#258 review comments

*  ReactiveX#258 review comments
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
* Add response predicate to retry sync and async for enhancement ReactiveX#259

* ReactiveX#348 add sync retry spring boot annotation and config support for spring boot 1 and 2

* ReactiveX#348 add sync retry spring boot annotation and config support for spring boot 1 and 2

* ReactiveX#348 adding java doc

* ReactiveX#348 adding java doc

* ReactiveX#348 adding spring override bean option for the retry spring boot starters

* ReactiveX#348 adding spring async retry aspect support

* ReactiveX#348 adding annotation validation protection of using retry and async retry annotation together in the class level

* ReactiveX#348 updating java doc

* ReactiveX#348 adding the new prefix of async retry metrics and fixing the merge conflicts

* ReactiveX#348 covering review comments

* ReactiveX#348 removing unneeded lines

* ReactiveX#348 adding the updated spring boot documentation for the retry spring boot usage for spring boot 1 and 2

* ReactiveX#348 documentation review comments

* ReactiveX#348 documentation review comments and removing health indicators for retry support in spring boot

* ReactiveX#348 documentation review comments
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
* Add response predicate to retry sync and async for enhancement ReactiveX#259

* ReactiveX#317 skipping Java Error type from onError Rx retry transformer

* ReactiveX#317 adding proper comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants