Blocking behaviour in RxJava #270

Closed
purplefox opened this issue May 14, 2013 · 8 comments

@purplefox

Firstly - thanks for a great library :)

I am in the process of getting RxJava to play nicely with Vert.x http://vertx.io

If you don't know Vert.x, it's (very roughly) a polyglot reactor implementation for the JVM. I guess you can think of it superficially as a "Node.js for the JVM".

Vert.x provides 100% asynchronous APIs for our users, and everything is executed on an event loop - the exact same thread is always used for any piece of user code.

Vert.x (like Node.js) APIs generally take the form of:

void doSomething(someParam, Handler handler);

where Handler is a callback handler that gets executed asynchronously on the event loop when the result is available.

The idea with the RxJava module I am developing for Vert.x is we can wrap these Vert.x APIs so instead of taking Handlers they instead return Observable.

This means they become composable with RxJava and we can get rid of "callback hell".
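As a rough illustration of the wrapping idea described above, here is a minimal sketch. The `Handler`, `SimpleObserver` and `SimpleObservable` types and the `toUpper` call are hypothetical stand-ins defined only for this example; they are not the real Vert.x or RxJava types, and the toy API invokes its handler immediately rather than later on an event loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-ins for illustration only; not the real Vert.x or RxJava types.
interface Handler<T> {
    void handle(T result);
}

interface SimpleObserver<T> {
    void onNext(T value);
    void onCompleted();
}

interface SimpleObservable<T> {
    void subscribe(SimpleObserver<T> observer);
}

final class CallbackWrapping {
    // Adapts a "void doSomething(param, handler)" style call into an observable
    // that emits the single result and then completes. Nothing runs until subscribe.
    static <P, T> SimpleObservable<T> wrap(BiConsumer<P, Handler<T>> api, P param) {
        return observer -> api.accept(param, result -> {
            observer.onNext(result);
            observer.onCompleted();
        });
    }

    // Toy callback API (assumption): calls its handler straight away.
    static void toUpper(String input, Handler<String> handler) {
        handler.handle(input.toUpperCase());
    }

    // Subscribes to the wrapped call and records the events it produces.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        SimpleObservable<String> wrapped = wrap(CallbackWrapping::toUpper, "ping");
        wrapped.subscribe(new SimpleObserver<String>() {
            @Override public void onNext(String value) { events.add("next:" + value); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next:PING, completed]
    }
}
```

The wrapper never waits for the result; it only forwards the handler callback to the observer, which is what keeps the calling thread free.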

Since everything is executed on the event loop in Vert.x it's essential that the RxJava API is 100% non-blocking.

Most things do indeed seem to be non-blocking, but I hit a snag with the concat operation, where the subscribe unfortunately seems to block, e.g.

Observable<Message> concatenated = Observable.concat(obs1, obs2, obs3);
concatenated.subscribe(action); // This blocks!!

Unfortunately this will cause Vert.x to hang since the thread that blocks is the same thread that the events will be delivered on so they will never arrive :(
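The hang can be reproduced without Vert.x or RxJava. The sketch below is an assumption-laden model, not the actual OperationConcat code: a single-threaded executor stands in for the event loop, and a task on that thread waits on a `CountDownLatch` that can only be released by another task queued on the same thread. A timeout is used so the example terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class EventLoopBlockingDemo {
    // Returns true if the latch was released before the timeout, false if the wait timed out.
    static boolean latchReleasedOnEventLoop(long timeoutMillis) {
        // A single-threaded executor stands in for the event loop (assumption).
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch done = new CountDownLatch(1);
            Future<Boolean> outcome = eventLoop.submit(() -> {
                // The "event" that would release the latch is queued on the same thread...
                eventLoop.execute(done::countDown);
                // ...but that thread is busy waiting here, so the event cannot run.
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            return outcome.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("latch released: " + latchReleasedOnEventLoop(200));
        // prints "latch released: false"
    }
}
```

Without the timeout the wait would never return, which matches the hang described above.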

Looking at the code in OperationConcat.java I can see that a CountDownLatch is being used.

I can't see a good reason why any of the main operations in RxJava (including concat) can't be implemented in a 100% non-blocking way, and this would be essential for Vert.x to use them.

Do you have any plans to refactor concat to work in a non blocking way? We would love to use RxJava in Vert.x, and I think Vert.x would be a good vehicle to push RxJava to an even bigger audience :)

Thanks in advance.

[Also, I noticed that last() is blocking too, which makes it unusable in Vert.x (or any non-blocking system). Again, I can't see a reason why it has to be.]

@headinthebox
Contributor

In Rx.NET all blocking operators are deprecated, so perhaps we should not have Last in RxJava; however, if you really want the last value of a stream you ultimately need to block somewhere (your main program cannot terminate before seeing it).
You may also see some operators blocking when you use their default schedulers (where the assumption is that in the typical case they perform "a small amount of work", which is debatable).

In addition to providing “await” support for observable sequences, we’ve also added Async variants of a number of operators. First of all, we’ve deprecated blocking operations in favor of asynchronous ones. Those include:

First[OrDefault]Async
Last[OrDefault]Async
Single[OrDefault]Async
ForEachAsync

@purplefox
Author

Thanks,

I'm not too bothered about last(); it's really the blocking nature of the concat operator which is the main problem.

@benjchristensen
Member

Hi Tim,

Great to hear from you. Vert.x is what I tell everyone they should use when they talk about Node.js, and I'd be happy to work with you to resolve any issues preventing adoption of Rx inside Vert.x, as I feel they are a perfect fit for each other. (It's unfortunate to see promises become the standard in many other libraries and frameworks when Rx Observable/Observer can handle the promise use cases plus sequences, event notifications, infinite streams, scheduling, etc., while still retaining a simple mental model.)

> The idea with the RxJava module I am developing for Vert.x is we can wrap these Vert.x APIs so instead of taking Handlers they instead return Observable.

This sounds perfect and exactly what I would like to see when using the Vert.x APIs.

> Most things do indeed seem to be non-blocking but I hit a snag with the concat operation, where the subscribe seems, unfortunately to be blocking

The blocking nature of concat is a bug as it is supposed to be non-blocking. I or someone else can get on fixing that.
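To make "non-blocking concat" concrete, one possible shape is sketched below. It is not the fix that went into RxJava; the `Source` and `Sink` types are hypothetical minimal interfaces defined only for this example, and error handling and unsubscription are left out. The idea is to subscribe to the next source from inside the previous source's completion callback, so no thread ever waits on a latch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal types for illustration; not RxJava's Observable/Observer.
interface Sink<T> {
    void onNext(T value);
    void onCompleted();
}

interface Source<T> {
    void subscribe(Sink<T> sink);
}

final class NonBlockingConcat {
    // Emits every value of each source in order, subscribing to a source
    // only after the previous one has signalled completion.
    static <T> Source<T> concat(List<Source<T>> sources) {
        return sink -> subscribeNext(sources.iterator(), sink);
    }

    private static <T> void subscribeNext(Iterator<Source<T>> remaining, Sink<T> downstream) {
        if (!remaining.hasNext()) {
            downstream.onCompleted();
            return;
        }
        remaining.next().subscribe(new Sink<T>() {
            @Override public void onNext(T value) { downstream.onNext(value); }
            // Completion of one source triggers subscription to the next; nothing waits.
            @Override public void onCompleted() { subscribeNext(remaining, downstream); }
        });
    }

    // Toy source that emits two values and completes (synchronously, for the demo).
    static Source<Integer> pair(int first, int second) {
        return sink -> {
            sink.onNext(first);
            sink.onNext(second);
            sink.onCompleted();
        };
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        List<Source<Integer>> sources = new ArrayList<>();
        sources.add(pair(1, 2));
        sources.add(pair(3, 4));
        concat(sources).subscribe(new Sink<Integer>() {
            @Override public void onNext(Integer value) { events.add(String.valueOf(value)); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3, 4, completed]
    }
}
```

With sources that complete synchronously, as in this demo, each hand-off adds stack frames, so a production version would likely need a trampoline or similar for long lists of sources.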

As for the blocking operators I'd really like to remove them from Observable and perhaps put them in a BlockingObservable where people can use them if they want.

For example:

BlockingObservable.from(normalObservable).single()
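A minimal sketch of that separation, under stated assumptions: `AsyncSource`, `Emitter` and `BlockingView` are hypothetical names invented for this example and are not RxJava's classes. The asynchronous type carries no blocking methods; the blocking `last()` lives only on the wrapper, so a caller has to opt in explicitly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal types for illustration; not RxJava's actual classes.
interface Emitter<T> {
    void onNext(T value);
    void onCompleted();
}

interface AsyncSource<T> {
    void subscribe(Emitter<T> emitter);
}

// Blocking operations live only here, so they cannot be called on AsyncSource by accident.
final class BlockingView<T> {
    private final AsyncSource<T> source;

    private BlockingView(AsyncSource<T> source) {
        this.source = source;
    }

    static <T> BlockingView<T> from(AsyncSource<T> source) {
        return new BlockingView<>(source);
    }

    // Blocks the calling thread until the source completes, then returns
    // the last value seen (null if the source emitted nothing).
    T last() {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<T> lastSeen = new AtomicReference<>();
        source.subscribe(new Emitter<T>() {
            @Override public void onNext(T value) { lastSeen.set(value); }
            @Override public void onCompleted() { done.countDown(); }
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return lastSeen.get();
    }

    // Demo source that emits from a separate thread, so blocking the caller is safe here.
    static String demo() {
        AsyncSource<String> source = emitter -> new Thread(() -> {
            emitter.onNext("a");
            emitter.onNext("b");
            emitter.onCompleted();
        }).start();
        return BlockingView.from(source).last();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints b
    }
}
```

Calling `last()` from the same thread that delivers the events would still hang, which is the reason for keeping it off the main type.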

When we started with RxJava internally we didn't have any blocking operators as the whole point of this library for us was non-blocking composition of asynchronous Observables. They have been contributed since becoming open-source and I've been allowing them in so far since that is how Rx.Net does it and we've been trying to conform to Rx.Net as closely as possible (but I've been wanting to find a way to separate them that still maintains the spirit of that principle).

@headinthebox Thanks Erik for getting involved. (For anyone who does not know who Erik Meijer is, he is the inventor of Rx and was behind the original implementation at Microsoft)

Erik, given your statement, would you be comfortable with me pulling all of the blocking operators into a separate BlockingObservable that people can use if they want? The benefit to me is that they are still there for people consciously choosing to use them (so they don't lose that functionality), but they would not be part of the main Observable behavior and thus would not confuse people or cause them to block accidentally (such as the difference between last and takeLast).

I would like to do this soon as part of version 0.9 and get the breaking changes out of the way before major systems get too far integrated (example: at Netflix I've been struggling with how to prevent use of blocking operators in a graceful way in our API layer). I have reserved the right to do breaking changes for things such as this until we hit 1.0 (which is still a bit out) to ensure we get the long-term design correct. I think this deserves the change as I'm not happy with where we've ended up mixing blocking and non-blocking operators.

@purplefox
Author

Hi Ben,

Thanks for the reply.

Glad to hear that the blocking nature of concat is not intended.

+1 on the idea to move all the blocking stuff out. I think it would be cleaner to keep it nice and 100% non blocking.

@benjchristensen
Member

@purplefox Once we get these issues resolved (both have pull requests for review right now) is there anything else preventing adoption?

@benjchristensen
Member

Now that 0.9.0 is released, can you confirm whether this issue is resolved or not?

@purplefox
Author

Thanks Ben. I will take it for a spin ASAP :)

@benjchristensen
Member

Closing out as I believe this is resolved. If anything is blocking in 0.9 or later (other than in BlockingObservable) then please file another issue or re-open this one.

rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014