Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unwrap AtomicObservableSubscription? #195

Closed
mttkay opened this issue Mar 15, 2013 · 5 comments
Closed

Unwrap AtomicObservableSubscription? #195

mttkay opened this issue Mar 15, 2013 · 5 comments

Comments

@mttkay
Copy link
Contributor

mttkay commented Mar 15, 2013

As far as I can tell, any custom subscription that is passed to subscribe gets wrapped in an AtomicObservableSubscription unless trusted is true, a flag which can't be controlled from outside.

Since AOS doesn't have an unwrap method, how can I pass custom subscriptions in a type safe manner?

Rationale is being able to maintain extra state about a subscription in the subscription.

@benjchristensen
Copy link
Member

An Observable by design doesn't expose state so that it can be composed (wrapped) exactly as this is doing. For example, Observable.synchronize() could be used to wrap an Observable before vending it out for use if the Observable was thought to not be thread-safe. This is part of the principles of being functional and monadic.

All state within the Func1 implementation that gets converted into an Observable should be self-contained.

So what is your use case that is causing you to want to reach back into the Observable implementation itself and how are you trying to do that?

@mttkay
Copy link
Contributor Author

mttkay commented Mar 15, 2013

Not sure if it makes a difference, but small correction: we don't want to track state in the Observable, we want to track state in the subscription, which is what the client holds on to.

The reason we want to do this is that on Android we need to deal with component life cycles and need to attach/detach observers to prevent resource leaks, or terminate observable sequence midway through.

I see there is a BooleanSubscription which helps terminating loops by checking the subscription for isUnsubscribed, which is similar to what we want to do. How would you even use this? Anything I get in return from subscribe is wrapped away in an AtomicObservableSubscription.

@benjchristensen
Copy link
Member

The state is always hidden behind the Observable or Subscription interfaces - not leaked out.

Thus, your Func1 implementation (that becomes and Observable) would return an implementation of Subscription that works with your function correctly to signal an unsubscribe. This is important because the sequences of Observables can be composed n-levels and an unsubscribe will be propagated up the sequence and each Observable then does with it what its implementation dictates - but nothing external ever knows the implementation or tries to reach inside it.

Here's an example of how BooleanSubscription is used:
https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/VideoExample.groovy#L110

Observable<VideoList> getListOfLists(userId) {
    return Observable.create({ observer -> 
        BooleanSubscription subscription = new BooleanSubscription();
        try {
            // this will happen on a separate thread as it requires a network call
            executor.execute({
                    // simulate network latency
                    Thread.sleep(180);
                    for(i in 0..15) {
                        if(subscription.isUnsubscribed()) {
                            break;
                        }
                        try {
                            //println("****** emitting list: " + i)
                            observer.onNext(new VideoList(i))
                        }catch(Exception e) {
                            observer.onError(e);
                        }
                    }
                    observer.onCompleted();
            })
        }catch(Exception e) {
            observer.onError(e);
        }
        return subscription;
    })
}

As for multiple observers subscribing/unsubscribing, that is definitely what Multicast/Publish is all about (#15 and #65) as discussed on Twitter: https://twitter.com/mttkay/status/310819293166178304

It's near the top of the todo list but if you need this functionality sooner than I or someone else is getting to it, perhaps you can try tackling them as they are the right way to handle these issues of state.

@mttkay
Copy link
Contributor Author

mttkay commented Mar 15, 2013

Thanks for clarifying! Have to admit that I'm still getting into the whole mindset of functional programming in Java (the lack of supportive language constructs doesn't help), and it's hard to do it in a clean way when introducing something like Rx into a grown code base. But we're getting there. No rush on the implementation of Multicast, but sure, I'll give it a shot when you guys are too busy!

@mttkay mttkay closed this as completed Mar 15, 2013
@benjchristensen
Copy link
Member

Yes, it took myself and my teammates a month or two to adapt our thinking to it and several times we tried breaking the model before we starting thinking functionally. Also the lack of lambdas/closures in Java (until Java8 which I eagerly await) is why we predominantly use RxJava in other languages that support them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants