backpressure story #71

Closed
benlesh opened this issue Jul 13, 2015 · 46 comments

@benlesh
Member

benlesh commented Jul 13, 2015

This issue on StackOverflow illustrates a problem with backpressure in current RxJS, I think

While I think this guy could solve his problem with an Iterator of Promise, I think it would be better if RxJS could support this with fewer object allocations.

cc/ @benjchristensen

@headinthebox

My opinion about backpressure is that you either have a truly reactive system, where the producer is fully in charge and the onus is on the consumer to keep up, or a truly interactive system, where the consumer is fully in charge and the onus is on the producer to keep up. This is one of the cases where having different types of collections makes sense (just like we have different collections like lists, maps, sets, ... with different performance characteristics and different operations). @benjchristensen has a different opinion on that.

@benjchristensen
Member

Any system that has hidden unbounded buffers is a recipe for disaster. RxJava was rightfully dismissed outright by many because it would consume an entire stream into unbounded buffers in zip, merge and observeOn. Adding backpressure to RxJava was painful (to figure out and retrofit) but has been worth it.

Just because some use cases should be "completely reactive" (such as mouse events), it does not mean that signals are not needed for backpressure. Even with mouse events, if the consumer is slow, an unbounded buffer in observeOn, zip or merge should not keep accumulating while the consumer silently falls behind and the application loses memory. It should be an error and blow up if the developer has not put in the necessary flow control.

In other words, an application should not silently drop data or buffer it without bound. Doing either of those things should always be explicit.

So even for the mouse event use case, the backpressure mechanism becomes a signal for flow control to kick in (conditionally drop, buffer, debounce, throttle, etc only when backpressure happens) or errors out and tells the developer to add flow control either conditionally (i.e. onBackpressure*) or permanently (temporal operators like debounce, sample, throttle, etc).

If the "reactive collection" doesn't have unbounded buffers, then I'm fine with it existing separately, but that means it can't have any async operators like merge, observeOn, or zip – and we know that those are necessary for application use cases. Since we need async operators, and I think we should never have unbounded buffering, that means the "reactive collection" should support backpressure signals.

The signals can be ignored. And infinite can be requested if backpressure is not wanted. But the API and semantics should support it so it composes through and the library can exist without silent unbounded buffering, memory leaks, or dropping of data.
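
To make the "blow up instead of buffering silently" rule concrete, here is a minimal sketch. `BoundedQueue` and `fillWithoutDraining` are made-up names for illustration, not RxJS or RxJava APIs; the only point is that overflow becomes a visible error rather than hidden growth or hidden loss.

```javascript
// Hypothetical helper: a queue that refuses to grow past `capacity`.
class BoundedQueue {
  constructor(capacity) {
    this.capacity = capacity;
    this.items = [];
  }
  // Throws instead of silently growing or silently dropping.
  offer(value) {
    if (this.items.length >= this.capacity) {
      throw new Error("backpressure: queue full at " + this.capacity + " items");
    }
    this.items.push(value);
  }
  poll() {
    return this.items.shift();
  }
}

// A producer that never waits for the consumer to drain anything.
function fillWithoutDraining(queue, count) {
  try {
    for (let i = 0; i < count; i++) queue.offer(i);
    return "ok";
  } catch (e) {
    return e.message;
  }
}

const boundedQueue = new BoundedQueue(2);
const firstFill = fillWithoutDraining(boundedQueue, 2);   // fits
const secondFill = fillWithoutDraining(boundedQueue, 1);  // overflows loudly
```

A developer who actually wants dropping or unbounded buffering would then have to opt in explicitly, which is the behaviour argued for above.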

@benjchristensen
Member

I suggest that the collaboration for Reactive Streams hardened the API, interaction model, and contracts quite well.

It would look something like this in Javascript:

interface Observable<T> {
    subscribe(o: Observer<T>): void; // or could return Subscription
    lift<R>(operator): Observable<R>;
}

interface Subscription {
    unsubscribe(): void;
    isUnsubscribed(): boolean;
    request(n: number): void // optional if backpressure is added
}

interface Observer<T> {
   onSubscribe(s: Subscription): void;
   onNext(t: T): void;
   onError(e: any): void;
   onComplete(): void;
}

The consumer can then either invoke subscription.request(Number.MAX_VALUE) if no backpressure is needed (synchronous firehose) or request up to the number of elements that it can enqueue: subscription.request(n). It keeps invoking request for more as it drains its internal queue, which permits the producing Observable to keep emitting.

If the Observable is hot, or can't otherwise be "paused", then it either blows up (e.g. with a BackpressureException) or another operator composes in a flow control strategy (drop, buffer, throttle, sample, debounce, etc).
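
A rough sketch of that request(n) handshake, assuming the Observer/Subscription shapes proposed above. `rangeSource` and `collectInBatches` are hypothetical helpers, and the producer is a cold, synchronous range, so it can simply stop emitting when demand runs out.

```javascript
// Hypothetical cold source: emits 0..count-1, but never more than was requested.
function rangeSource(count) {
  return {
    subscribe(observer) {
      let next = 0;
      let requested = 0;
      let done = false;
      let emitting = false;
      const subscription = {
        unsubscribe() { done = true; },
        request(n) {
          requested += n;
          if (emitting) return;            // re-entrant call: the running loop picks it up
          emitting = true;
          while (requested > 0 && !done && next < count) {
            requested--;
            observer.onNext(next++);
          }
          emitting = false;
          if (!done && next === count) {
            done = true;
            observer.onComplete();
          }
        }
      };
      observer.onSubscribe(subscription);
    }
  };
}

// Consumer that asks for `batch` items and asks again once it has handled them all.
function collectInBatches(source, batch) {
  const received = [];
  const requests = [];
  let subscription;
  let outstanding = 0;
  let completed = false;
  const ask = () => {
    outstanding = batch;
    requests.push(batch);
    subscription.request(batch);
  };
  source.subscribe({
    onSubscribe(s) { subscription = s; ask(); },
    onNext(value) {
      received.push(value);
      if (--outstanding === 0) ask();
    },
    onError(e) { throw e; },
    onComplete() { completed = true; }
  });
  return { received, requests, completed: () => completed };
}

const batched = collectInBatches(rangeSource(5), 2);
```

Passing Number.MAX_VALUE as the batch size turns the same consumer into the "firehose" case: the source never has to pause.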

@headinthebox

zip in particular was naively implemented, and should be a synchronization primitive like CyclicBarrier.

People have been using observeOn for ages in the UI space with no problems (https://msdn.microsoft.com/en-us/library/system.threading.synchronizationcontext.post(v=vs.110).aspx) or https://docs.oracle.com/javase/8/javafx/api/javafx/application/Platform.html, but if you do not want buffering you can use https://msdn.microsoft.com/en-us/library/system.threading.synchronizationcontext.send(v=vs.110).aspx.

If you are doing a merge of an infinite number of infinite streams, that is as bad as doing unbounded recursion or allocating an unbounded number of objects. You make sure that this does not happen.

Also note that anytime you use say a http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html or a http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html you are using a hidden unbounded buffer.

Saying "unbounded buffers are verboten" is as fundamentalist as Haskell people saying all side-effects are bad, or the Akka folks saying everything is an actor with strict asynchronous boundaries, or Java insisting on checked exceptions. Programming is tastefully composing side effects; to write real stuff you have to break the rules, so you never say never, or always say always.

@benjchristensen
Member

Also note that anytime you use say a Semaphore or a Executor you are using a hidden unbounded buffer.

I know. That's why RxJava never holds a lock when emitting, and never uses synchronized or Semaphore to serialize multiple streams, but uses explicit, bounded, queues. This is why merge is an async operator that has buffers. It never blocks a thread, whereas Semaphores do – which is queueing.

Same issue with Executor, as you mention. That is again why we never expose Scheduler directly as a raw API in Observable sequences. The scheduling into the underlying unbounded queues of Executor is always governed by the backpressure of the Observable sequence, so they are bounded.

if you do not want buffering you can use https://msdn.microsoft.com/en-us/library/system.threading.synchronizationcontext.send(v=vs.110).aspx.

Using synchronization, and call-stack blocking to avoid observeOn (or other buffers) is not an option when event-loops are involved, so they are not a solution. Event loops are almost always involved, particularly in Javascript. Thus, synchronous approaches to backpressure are not an option.

fundamentalist

This is not a fundamentalist approach. In fact, I'd argue that you are taking the fundamentalist approach claiming "reactive" requires unbounded buffers. I'm arguing against the "purity" of pure-push, and allowing the type to support moving between push and pull depending on the speed of the producer/consumer relationship. It is embracing the messiness of our applications and abstractions.

I do not say that "unbounded buffers are verboten"; I say that "hidden unbounded buffers are verboten". If someone wants to say observable.onBackpressureBuffer().subscribe(o) and explicitly put an unbounded buffer in the mix, then so be it.

People have been using observeOn for ages in the UI space with no problems

It is not correct to say "no problems". People work around them because they haven't been given tools to do otherwise. Naively consume a Websocket stream using RxJS in a browser, for example. I myself have had these problems. Ben Lesh linked to someone else having similar issues in another environment where RxJS will be used: Node.

We can state that an Observable is ONLY reactive, without ability to support backpressure and flip into pull mode. But if that's the case, then the type is itself being fundamentalist and pushing people to find other solutions, which they will have to.

@staltz
Member

staltz commented Jul 13, 2015

@headinthebox

the producer is fully in charge and the onus is on the consumer to keep up

@benjchristensen

In other words, an application should not silently drop data or buffer it without bound. Doing either of those things should always be explicit.

I don't think you guys fundamentally disagree, but as you can see from the quotes, we just need to find a way to determine consumer behavior when the producer is too fast. Implicit ("hidden") behavior has gotchas for developers. An explicit definition of lossy or lossless consumption should be how the consumer handles its onus.

@benlesh
Member Author

benlesh commented Jul 13, 2015

@benjchristensen could you give us a few use-cases for backpressure we'll run into at Netflix? (since that's part of the reason I'm proposing this) and I know that you and @jhusain disagree on the need for backpressure in RxJS.

@benjchristensen
Member

we just need to find a way to determine consumer behavior when the producer is too fast

Agreed, and I suggest that this is exactly what @headinthebox and I spent 9+ months with many in the RxJava community figuring out with "backpressure" which behaves as an upstream signal from the consumer to the producer. Hence my advocating for it here. It's something we have implemented and proven in production. It's not just a theory.

Here is more information on it:

@benlesh
Member Author

benlesh commented Jul 13, 2015

@benjchristensen can we get some use cases specifically to help make sure we're making good decisions?

I mean, it seems like one could do something like this primitive example to deal with backpressure to some degree:

var requestor = new BehaviorSubject(true);

source.buffer(requestor)
  .flatMap(buffer => Observable.from(buffer)
    .finally(() => requestor.next(true)))
  .subscribe(x => console.log(x));

@benlesh
Member Author

benlesh commented Jul 13, 2015

I'm not for or against the idea (I created the issue, after all), but I want it considered up front.

@benjchristensen
Member

@Blesh yes, I can do that a little later tonight

@benjchristensen
Member

@headinthebox going back to your point about having multiple collection types, I'd like to explore that idea with you. Let's say we had something like this:

  • Observable – purely reactive, no backpressure
  • Flowable (just for sake of a name) – reactive push/pull supporting backpressure

interface Flowable<T> {
    subscribe(o: FlowObserver<T>): void; // or could return FlowableSubscription
    lift<R>(operator): Flowable<R>;
}

interface FlowableSubscription {
    unsubscribe(): void;
    isUnsubscribed(): boolean;
    request(n: number): void; // signals demand upstream
}

interface FlowObserver<T> {
   onSubscribe(s: FlowableSubscription): void;
   onNext(t: T): void;
   onError(e: any): void;
   onComplete(): void;
}

Then Observable as the original and simpler:

interface Observable<T> {
    subscribe(o: Observer<T>): void; // or could return Subscription
    lift<R>(operator): Observable<R>;
}

interface Subscription {
    unsubscribe(): void;
    isUnsubscribed(): boolean;
    add(s: Subscription): void
    remove(s: Subscription): void
}

interface Observer<T> extends Subscription {
   onNext(t: T): void;
   onError(e: any): void;
   onComplete(): void;
}

Then on Flowable we would NOT have temporal flow control operators like sample, throttle, debounce etc. On Observable we would have the temporal flow control.

A Flowable can easily convert into an Observable that ends up fire hosing. An Observable would convert into a Flowable with a strategy given to it such as drop, buffer, debounce, sample, etc.

Is this what you're thinking instead of having one type support both use cases?
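
One way the Observable-to-Flowable conversion with a drop strategy might look. This is a sketch only: `createPushSource` and `toFlowableDropping` are invented names, and `droppedCount` is added purely so that the loss is observable in the example.

```javascript
// Hypothetical hot source: pushes to listeners whether or not they are ready.
function createPushSource() {
  const listeners = new Set();
  return {
    emit(value) { listeners.forEach(listener => listener(value)); },
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);   // unsubscribe function
    }
  };
}

// Hypothetical adapter: forwards a value only if it was requested, otherwise drops it.
function toFlowableDropping(pushSource) {
  return {
    subscribe(flowObserver) {
      let requested = 0;
      let dropped = 0;
      const stop = pushSource.subscribe(value => {
        if (requested > 0) {
          requested--;
          flowObserver.onNext(value);
        } else {
          dropped++;   // explicit, countable loss instead of a hidden buffer
        }
      });
      flowObserver.onSubscribe({
        request(n) { requested += n; },
        unsubscribe() { stop(); },
        droppedCount() { return dropped; }
      });
    }
  };
}

const hot = createPushSource();
const seen = [];
let flowSubscription;
toFlowableDropping(hot).subscribe({
  onSubscribe(s) { flowSubscription = s; s.request(2); },
  onNext(value) { seen.push(value); }
});
hot.emit("a");
hot.emit("b");
hot.emit("c");                 // nothing requested at this point: dropped
flowSubscription.request(1);
hot.emit("d");
// seen is now ["a", "b", "d"] and exactly one value was dropped
```

A buffer, debounce or sample strategy would replace the `else` branch; the request side of the adapter would stay the same.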

@benjchristensen
Member

Use cases for backpressure:

  • writing to async IO in a way that goes at the rate of writing (pauses when buffer is full)
    • this is applicable to writing to databases, event streams, websockets, etc
    • Node.js will start to be used like our Java apps so all similar use cases apply
  • composing consumption and production over network boundaries
    • http://reactive-streams.io and http://reactive-socket.io
    • we will have these types of protocols over TCP, WebSockets, Aeron etc
    • we will have full duplex interaction of this nature between Java and Node.js, Node.js and browsers, Java and browsers
    • the network protocol will compose backpressure over it so we have application level consumption signals so the server-side doesn't fill network buffers (which can cause head-of-line blocking on the consumption side, do more work than necessary server-side, overwhelm memory, etc)
  • composing hot and cold streams with operators like merge and zip
    • the cold stream will firehose everything into buffers inside merge and zip
    • particularly bad for infinite, lazy Observables like range
  • file serving from Node.js?
    • this is probably a solved problem in Node.js, but if someone wants to use RxJS to read, manipulate, compose, transform before writing to IO, it needs backpressure unless it is a very small file
  • distributed stream processing
    • it is not hard to see Node.js getting used in our Mantis stream processing system where they are infinite, async event streams across network boundaries that absolutely must apply backpressure
  • just normal merge, zip, observeOn use cases while keeping memory usage low
    • for example, RxJava on Android has default bounded buffers kept very small since mobile devices have such limited memory
    • this applies even to seemingly "small" streams of 100s of items, which you may not want buffered in full. By default RxJava on Android buffers only up to 16 items, whereas server-side it defaults to 128. This means it "backpressures" and does not request more than 16 at a time.
    • considering the memory constraints on devices, this is likely the same concern for JS as it is on Android, though perhaps not at all a concern in a desktop browser
    • the expectation would be that request(n) semantics would apply to something like a websocket connection when streaming say 100s of video titles down, with no more than 16 outstanding at a time

This is a dumping of thoughts, some are more important than others, and all definitely don't apply to all environments (desktop browser, mobile browser, mobile app, Node.js server), but they are representative of the type of stuff we will deal with as we embrace more stream oriented programming and IO.

@benlesh
Member Author

benlesh commented Jul 14, 2015

Okay, it doesn't seem like the use cases you're mentioning are at all edge cases. In fact, some seem fairly common at least in server-land where you want to maximize data throughput.

So I think what we need to weigh here is:

  1. VALUE: How this solves the above use cases better than what RxJS currently has
  2. COST: How this will affect performance of the library. It's likely not zero cost (although maybe it is if we have a different "Flowable" type)

I'm not too worried about cost in terms of "size" of the library, because there are plenty of ways to get around that.

@mattpodwysocki
Collaborator

@Blesh people are pretty satisfied with the current backpressure mechanisms with pausableBuffered and controlled given that they're hard but they've been fixed.

@headinthebox

Let me state upfront that I am not against backpressure, not at all. All the use cases above make total sense.

What I am against is the way backpressure is implemented in RxJava/Reactive Streams where every operator has to know about it. That is clowny and inefficient.

Just like the selector function of map or the predicate of filter doesn't know anything about backpressure, neither should map and filter have anything to do with backpressure. They should just let the values flow through them as fast as they can.

To achieve backpressure you wire up things like a classical feedback system https://en.wikipedia.org/wiki/PID_controller. There the plant/process, consisting of Rx operators, just transforms input to output streams. It knows absolutely nothing about backpressure. That is all done by the feedback loop where at one point you measure the value of a setpoint (which has no "knowledge" of the plant/process) and use that to throttle the input source, which does know how to handle backpressure (and thus probably has an interface like Flowable above).

Note that in the current ReactiveStream model, you still need such a feedback loop + a controller to know how much to request. If you ever request > 1, you run the risk that the consumer gets overwhelmed.
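
A toy version of such a feedback loop, under heavy assumptions: the controller is proportional-only, everything is synchronous, and `createSource`, `plant` and `controlStep` are hypothetical names. The transformation stage has no notion of backpressure; only the source accepts a demand, and that demand is computed from the queue depth measured on the consumer side.

```javascript
// The "plant": an ordinary transformation with no knowledge of backpressure.
const plant = x => x * 2;

// Hypothetical controllable source: hands out exactly as many values as asked for.
function createSource() {
  let next = 0;
  return {
    pull(n) {
      const out = [];
      for (let k = 0; k < n; k++) out.push(next++);
      return out;
    }
  };
}

// One turn of the loop: measure depth, compare with the setpoint, throttle the source.
function controlStep(source, queue, setpoint) {
  const error = setpoint - queue.length;   // measured on the consumer side
  const demand = Math.max(0, error);       // proportional controller with gain 1
  for (const value of source.pull(demand)) queue.push(plant(value));
  return demand;
}

const controlledSource = createSource();
const consumerQueue = [];
const demands = [];
demands.push(controlStep(controlledSource, consumerQueue, 4));   // empty queue: ask for 4
consumerQueue.shift();                                           // consumer handles one item
demands.push(controlStep(controlledSource, consumerQueue, 4));   // one slot free: ask for 1
demands.push(controlStep(controlledSource, consumerQueue, 4));   // consumer stalled: ask for 0
```

What this sketch leaves out is the hard part debated in this thread: finding every internal queue between source and consumer so that the measurement is taken in the right place.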

@LeeCampbell

I find myself agreeing with the sentiment of both @headinthebox and @benjchristensen here.
I can see that Ben is worried about the numerous unbounded buffers that exist in Rx.
I can see Erik's point that to introduce the concept of backpressure to most operators in Rx is bloated and distracting.

While I am unfamiliar with the PID controller design, I think this argument echoes what we have found with the limits of where Rx is suitable. Rx is great for composing many sequences of events. Rx is not great at workflow. Rx is not great at parallel processing.

The concern for hidden unbounded buffers seems to be more suitably addressed by introducing explicit buffers; however, I don't think these belong in the Rx query.
I believe that in your Rx flow, you compose the event streams as you need to and then, at points where unbounded buffers can occur, you introduce an explicit queue. On the other side of the queue may be another Rx query; it may also have some logic to notify other systems that the queue is too full.
My point is that this should be solved as a pattern, with education, not as a bloating to the Rx API.

Keep Rx a single direction event query API.

@benjchristensen
Member

They should just let the values flow through them as fast as they can.

@headinthebox That is how it's implemented in RxJava. Note that map (https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMap.java) does not know anything about backpressure. Filter has to know a little (https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorFilter.java#L57) because it affects the stream (dropping data), but it is trivial. Data flows through them "as fast as they can".

To achieve backpressure you wire up things like a classical feedback system

I understand this argument as we've discussed it during the RxJava backpressure design last year. However, it does not work well in practice because hidden unbounded queues in merge, observeOn, and zip require internal knowledge of operator implementations to know where and how to wire up the feedback loops. And the feedback loops must be manually wired up, so it's not something most people can successfully do.

The end result is people choose (generally rightfully) to not use Rx. This was a massive push-back I received for server-side use of Rx in Java, and the same will apply to the Node.js ecosystem (which I am now involved in building for).

the current ReactiveStream model, you still need such a feedback loop

The feedback loop is handled automatically within the operator implementations, generally by the internal bounded queue depth. And this only applies to async operators that have queues, such as merge, zip, observeOn, replay, etc. It automatically slows the producer to the rate of the consumer because all queues are bounded and it requests data to fill the bounded queues, while remaining async, unlike an Iterable, and batched, unlike an AsyncIterable.

@mattpodwysocki The pause approach was prototyped in RxJava first, but it does not work over network boundaries. By the time the pause signal reaches the async producer, all buffers can already be either saturated or overwhelmed. The controlled model is out-of-band and does not compose. It does meet the need to request data at a controlled rate, but for the same reasons I describe above, the feedback loop needs knowledge of all hidden unbounded buffers so it can inject itself correctly, and is thus error prone, difficult to compose and very manual to make work.

The ReactiveStreams/RxJava model has been proven to compose and work well in large (distributed Netflix API and stream processing) and small (Android) systems, enabling it to be built without hidden unbounded buffers, and working over async subscription boundaries, such as networks. Pausable does not meet those needs, and controlled does not compose.

@benjchristensen
Member

@LeeCampbell thanks for getting involved!

Keep Rx a single direction event query API.

This effectively means RxJava v1 should not exist as designed and that most of the use cases I currently use Rx for should stop using Rx. Nor will I be able to use Rx for the stream-oriented IO I'm working on between Java and Node.js. The reason is that Rx without backpressure support can't easily compose with the needed "push/pull" semantics, so I'll end up instead using RxJava/ReactiveStream APIs and exposing those, and then for composition reasons, adding operators to that library and end up with a competitor to Rx whose only difference is that I support the upstream request capabilities.

@headinthebox

@LeeCampbell My original vision (which has not changed) for Rx was as a single direction event query API (it used to be called "LINQ to Events") where the producer determines the speed. For (async) pull, where the consumer determines the speed, we shipped IAsyncEnumerable at the same time, plus push-pull adapters to convert between the various kinds of streams.

@benjchristensen I was just using map and filter as generic examples for operators, like foo and bar. And see one of my older entries for comments on zip (if you want to combine streams as opposed to synchronize you should use combineLatest), and observeOn; note that flatMap technically does not have internal queues or buffers.

Adding backpressure to RxJava has resulted in a very long bug tail that is still dragging on. The implementation of merge with backpressure is nearly 800 very intricate LOC; the implementation of merge without backpressure is about 30 trivial LOC. The other operators are similarly more complex; even innocent operators like take and single (need to) deal with backpressure.

By factoring out back pressure to a special backpressure sensitive source that can be sped up or slowed down controlled by a feedback loop, you factor out all that complexity into a single place. And, if you don't use it, you don't pay.

@benlesh
Member Author

benlesh commented Jul 14, 2015

If we were to merge the Subscription and Observer types into one type, with lift would it not be possible to subclass Observable into a BackpressureObservable (or the like), and use lift to add a request() method to Subscription, then override operators such as zip and the like on the BackpressureObservable to get exactly what @benjchristensen is looking for, with no cost? @trxcllnt has already demonstrated that overriding lift can allow one to compose types through.

@benjchristensen
Member

A 30LOC implementation in Java would have to assume use of synchronized or mutex locks which is wrong as it blocks event loops. It would also have to ignore many other performance improvements like ScalarSynchronousObservable optimizations that have nothing to do with backpressure – but add LOC.

I will stop arguing this point. But without composable backpressure in RxJS I will not be able to use it for the Node.js apps I am building so will end up building a competitor.

@headinthebox

Doesn't node.js already have a stream API https://nodejs.org/docs/latest/api/stream.html?

@trxcllnt
Member

I would also like backpressure control to compose through. @benjchristensen setting aside operator complexity, is it possible to quantify the runtime cost when it isn't used?

@Blesh it doesn't seem like something you can solve with lift since it affects operator behavior. If I understand correctly, you'd have to override every operator on a BackpressureObservable to respect back pressure semantics.

@benlesh
Member Author

benlesh commented Jul 14, 2015

you'd have to override every operator on a BackpressureObservable to respect back pressure semantics.

That's exactly right. I was just trying to think of a compromise.

@benlesh
Member Author

benlesh commented Jul 14, 2015

At this point, I'm generally for an implementation with a solid backpressure story. As long as the impact on performance isn't grotesque. I think there are plenty of things we can do to make sure the queuing internally is highly performant, such as leverage libraries like @petkaantonov's deque, which we should honestly be investigating regardless.

Right now we have unbounded arrays in operators like zip, and there are definitely plenty of real-world use cases where that becomes problematic. Also, I'm not a big fan of Iterator of Promise, or Promises in general, so I find the idea that I'd have to use something like that for backpressure control across network boundaries really gross.
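
To illustrate the zip problem, here is a deliberately naive sketch (not the actual RxJS implementation; `createNaiveZip` is a made-up helper). When one side runs ahead of the other, its array just keeps growing.

```javascript
// Naive zip: buffers whatever arrives on each side until a partner shows up.
function createNaiveZip(onPair) {
  const left = [];
  const right = [];
  const drain = () => {
    while (left.length > 0 && right.length > 0) {
      onPair([left.shift(), right.shift()]);
    }
  };
  return {
    pushLeft(value) { left.push(value); drain(); },
    pushRight(value) { right.push(value); drain(); },
    buffered() { return { left: left.length, right: right.length }; }
  };
}

const pairs = [];
const naiveZip = createNaiveZip(pair => pairs.push(pair));
for (let i = 0; i < 1000; i++) naiveZip.pushLeft(i);   // fast side
naiveZip.pushRight("a");                               // slow side
const depth = naiveZip.buffered();                     // 999 values still waiting on the left
```

Nothing in this design tells the fast side to slow down, which is the gap a request(n)-style signal is meant to close.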

I want to set up a separate issue to discuss joining the Subscription and Observer as a single type (with separate interfaces), as discussed above. I think that's a related issue that needs to be fleshed out.

@benjchristensen
Member

Adding backpressure to RxJava has resulted in a very long bug tail that is still dragging on

Yes, it was painful retrofitting it into existing code. We had 50+ operators all implemented over 18+ months without it and have had to spend the long effort to go back and rewrite them. We also had to make complicated concessions to design to add backpressure without breaking the existing APIs.

Starting from scratch is a very different thing, especially after having done it once.

Doesn't node.js already have a stream API

Yes it does. But from what I can tell it targets byte streams, like File IO, not Objects. Nor is it as flexible as Rx from what I can tell looking at its API.

is it possible to quantify the runtime cost when it isn't used

In JS I can't give useful information unless we start building and compare. In Java we of course are concerned with performance as well since we execute 10s of billions of onNext a day in Netflix production through RxJava. We have seen backpressure as an incremental impact on perf for raw throughput, as there is more bookkeeping.

However, it has also allowed us to do optimizations that were unavailable before. For example, we always use pooled ring buffers now, instead of requiring unbounded buffers that can grow. So our object allocation and GC behavior is actually better. This took effort, and was important to achieve good performance, but was an option we didn't even have previously since we had to support unbounded growth.
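
For readers unfamiliar with the idea, a fixed-capacity ring buffer can be sketched as below. This is not RxJava's implementation (which, per the comment above, also pools its buffers); `RingBuffer` is a hypothetical minimal version that shows why nothing is allocated per element once the backing array exists.

```javascript
// Hypothetical fixed-capacity queue backed by a single preallocated array.
class RingBuffer {
  constructor(capacity) {
    this.slots = new Array(capacity);
    this.capacity = capacity;
    this.head = 0;    // index of the next item to read
    this.count = 0;   // number of items currently stored
  }
  // Returns false when full, so the caller has to deal with backpressure.
  offer(value) {
    if (this.count === this.capacity) return false;
    this.slots[(this.head + this.count) % this.capacity] = value;
    this.count++;
    return true;
  }
  // Returns undefined when empty.
  poll() {
    if (this.count === 0) return undefined;
    const value = this.slots[this.head];
    this.slots[this.head] = undefined;   // drop the reference so it can be collected
    this.head = (this.head + 1) % this.capacity;
    this.count--;
    return value;
  }
}

const ring = new RingBuffer(2);
const accepted = [ring.offer(1), ring.offer(2), ring.offer(3)];   // third offer is refused
const firstOut = ring.poll();
const acceptedAfterPoll = ring.offer(3);                          // reuses the freed slot
```

Because the capacity is fixed, such a buffer is only usable when the producer is guaranteed not to overrun it, which is what bounded request(n) demand provides.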

I suggest two approaches to move forward:

  1. 2 separate types living alongside each other, such as Flowable and Observable.
  2. We implement Observable with and without backpressure and see if there are performance or complexity reasons to have 2 separate types or if we can just keep them combined.

@benjchristensen
Member

A note on AsyncEnumerable/AsyncIterable. I'm aware of this type and know that functionally it fits many of my use cases, but it is too inefficient. It allocates a Future/Promise per element. It's the same reason we didn't implement backpressure on an Observable using Future<Void> onNext(T t). Theoretically it all hangs together. In practice, it is far too inefficient due to allocations and GC.
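
A small sketch of the allocation argument, with made-up names (`asyncRange`, `promisesCreated`). A pull-based async iterator hands back one Promise per `next()` call, so n elements cost at least n Promise allocations, whereas a single `request(n)` call covers n elements without allocating anything per element.

```javascript
// Hypothetical async iterator over 0..count-1 that counts the Promises it hands out.
function asyncRange(count) {
  let i = 0;
  let promises = 0;
  return {
    next() {
      promises++;   // one Promise per pulled element
      const result = i < count
        ? { value: i++, done: false }
        : { value: undefined, done: true };
      return Promise.resolve(result);
    },
    promisesCreated() { return promises; }
  };
}

const asyncIterator = asyncRange(3);
const pending = [asyncIterator.next(), asyncIterator.next(), asyncIterator.next()];
const allocations = asyncIterator.promisesCreated();   // grows linearly with elements pulled
```

The count here only covers the Promises this iterator creates itself; whatever the consuming side allocates on top of that is not measured.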

@benlesh
Member Author

benlesh commented Jul 14, 2015

In JS I can't give useful information unless we start building and compare.

Nobody can, really. It'll obviously have some impact, but it's hard to say what. I'd like to make some decisions about #75 before implementing this, because I think it could drastically change the implementation. I could, of course, be totally wrong about that.

@benjchristensen
Member

Here are some concrete numbers of throughput for merge in varying scenarios (the scenarios matter a lot) for RxJava on my laptop. The code for the tests is here: https://github.com/ReactiveX/RxJava/blob/1.x/src/perf/java/rx/operators/OperatorMergePerf.java

These results show ops/second. I annotated the first few to show how many onNext/second it translates into.

Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4567774.035   148897.669    ops/s // 4.5million onNext/second
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    68620.728     2404.157    ops/s // 68million onNext/second
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       84.931        3.133    ops/s // 84million onNext/second
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5   104864.661     1911.610    ops/s // 104k onNext/second
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        5.592        0.215    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4254142.048   103396.131    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   526155.607    10843.846    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    59030.208     3284.554    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  4662177.459   190521.858    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       72.060        1.741    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    84378.798      988.059    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3529.251      154.939    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4495344.037    99403.589    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    45401.568     7270.911    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       45.958        1.373    ops/s

Note of course that these async cases are demonstrating synchronization across multiple threads, which would not exist in Javascript.
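
The onNext/second annotations follow from multiplying ops/s by the number of items each operation emits. A quick check for the three annotated merge1SyncStreamOfN rows, assuming (as the annotations imply) that one op emits `size` items:

```javascript
// [size, ops/s] copied from the merge1SyncStreamOfN rows above
const rows = [
  [1, 4567774.035],
  [1000, 68620.728],
  [1000000, 84.931]
];

// Assumption: one benchmark op pushes `size` items through onNext.
function onNextPerSecond(size, opsPerSecond) {
  return size * opsPerSecond;
}

for (const [size, ops] of rows) {
  const millions = onNextPerSecond(size, ops) / 1e6;
  console.log("size " + size + ": about " + millions.toFixed(1) + " million onNext/second");
}
```

The same conversion does not carry over blindly to the other rows, since the number of items per op depends on how each benchmark builds its streams.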

@LeeCampbell

This effectively means RxJava v1 should not exist as designed

Ok. ;-)

@benjchristensen
Member

@Blesh @trxcllnt and I spent some time at a whiteboard ... since JS is a single-threaded environment, we likely can have very good fast-path options for performance, even if we have a single Observable type that caters to both.

The options I see right now after that conversation are:

1) Different types
  • Observable without backpressure - purely reactive
  • SomeCleverName - push/pull with backpressure signals

Converting between them would look something like this:

observable.toCleverName(Strategy.DROP)
observable.toCleverName(Strategy.BUFFER)
observable.toCleverName(Strategy.THROTTLE, 5_SECONDS) // pseudo code obviously

cleverName.toObservable() // no strategies needed since it will just subscribe with Number.MAX_VALUE
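To make the difference between the DROP and BUFFER strategies concrete, here is a minimal sketch in plain JavaScript. `createBridge`, `push` and `request` are hypothetical names invented for this illustration; none of this is an RxJS API, and the BUFFER queue is left unbounded only to keep the sketch short.

```javascript
// Hypothetical helper, not an RxJS API: a hand-off point between a push
// producer and a pull consumer, parameterized by an overflow strategy.
const Strategy = { DROP: 'DROP', BUFFER: 'BUFFER' };

function createBridge(strategy) {
  const queue = [];
  let requested = 0; // demand signalled by the consumer and not yet satisfied
  let deliver = () => {};

  return {
    // consumer side: register a callback, then ask for n more values
    subscribe(onNext) { deliver = onNext; },
    request(n) {
      requested += n;
      while (requested > 0 && queue.length > 0) {
        requested--;
        deliver(queue.shift());
      }
    },
    // producer side: called for every pushed value
    push(value) {
      if (requested > 0) {
        requested--;
        deliver(value);
      } else if (strategy === Strategy.BUFFER) {
        queue.push(value); // unbounded here; a real version would cap this
      }
      // Strategy.DROP: there is no demand, so the value is discarded
    },
  };
}

const dropped = [];
const drop = createBridge(Strategy.DROP);
drop.subscribe(v => dropped.push(v));
drop.push(1); // no demand yet: discarded
drop.request(2);
drop.push(2);
drop.push(3);
drop.push(4); // demand exhausted: discarded

const buffered = [];
const buffer = createBridge(Strategy.BUFFER);
buffer.subscribe(v => buffered.push(v));
buffer.push(1); // no demand yet: queued
buffer.request(2);
buffer.push(2);
buffer.push(3); // demand exhausted: queued until the next request
```

With DROP the consumer only sees values that arrive while it has outstanding demand; with BUFFER nothing is lost, but the queue grows for as long as the consumer stays behind.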
2) Observable with RxJava v2 (Reactive Streams) semantics

This has an Observer like this:

interface Observer<T> {
   onSubscribe(s: Subscription): void;
   onNext(t: T): void;
   onError(e: any): void;
   onComplete(): void;
}

This is a cleaner type model, but means a firehose Observable has a touch more code than in option 3 below:

Observable.create(o => {
    var s = new Subscription();
    o.onSubscribe(s);
    var i=0;
    while(!s.isUnsubscribed) {
      o.onNext(i++)
    }
})

The backpressure capable case would look like this:

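Observable.create(o => {
    var requested = 0; // demand from the consumer that has not been met yet
    var i = 0;
    var s = new Subscription(n => {
        // only the request that raises demand from zero starts the loop
        if ((requested += n) == n) {
            while (requested > 0 && !s.isUnsubscribed) {
                o.onNext(i++);
                requested--;
            }
        }
    });
    o.onSubscribe(s);
})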
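For anyone who wants to run the option 2 flow end to end, here is a self-contained sketch. The `Subscription` class and the `counter` source are stand-ins written for this illustration, assuming a `request(n)` method as in Reactive Streams; they are not the RxJS implementation.

```javascript
// Minimal stand-ins for the types in the proposal; these are assumptions
// made for illustration, not the RxJS implementation.
class Subscription {
  constructor(onRequest) {
    this.isUnsubscribed = false;
    this.onRequest = onRequest;
  }
  request(n) { if (!this.isUnsubscribed) this.onRequest(n); }
  unsubscribe() { this.isUnsubscribed = true; }
}

// A source that counts upward but only emits what has been requested.
function counter(observer) {
  let requested = 0;
  let i = 0;
  const s = new Subscription(n => {
    // Only the call that raises demand from zero runs the drain loop;
    // re-entrant request() calls made inside onNext just add to `requested`.
    if ((requested += n) === n) {
      while (requested > 0 && !s.isUnsubscribed) {
        observer.onNext(i++);
        requested--;
      }
    }
  });
  observer.onSubscribe(s);
}

// A consumer that asks for two values at a time and stops after six.
const seen = [];
counter({
  onSubscribe(s) { this.s = s; s.request(2); },
  onNext(v) {
    seen.push(v);
    if (seen.length === 6) this.s.unsubscribe();
    else if (seen.length % 2 === 0) this.s.request(2);
  },
});
```

Decrementing `requested` after `onNext` returns is what keeps a re-entrant `request()` from starting a second, nested loop.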
3) Observable with RxJava v1

interface Observer<T> extends Subscription {
   setProducer(s: Producer): void;
   onNext(t: T): void;
   onError(e: any): void;
   onComplete(): void;
}

This is more complicated than option 2 as it involves both Subscription and Producer types, but means the firehose Observable case can completely ignore creating a Subscription and Producer, like this:

Observable.create(s => {
    var i=0;
    while(!s.isUnsubscribed) {
      s.onNext(i++)
    }
})

The backpressure capable case would look like this:

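Observable.create(s => {
    var requested = 0; // demand from the consumer that has not been met yet
    var i = 0;
    s.setProducer(new Producer(n => {
        // only the request that raises demand from zero starts the loop
        if ((requested += n) == n) {
            while (requested > 0 && !s.isUnsubscribed) {
                s.onNext(i++);
                requested--;
            }
        }
    }));
})

Next Steps?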

I suggest we start building option 2 and let performance numbers prove out whether we should end up with a single type or two types, and then pursue the bikeshedding on naming the thing.

I agree with @Blesh that if we do option 1 that we should have both types in the same RxJS library so the community, mind-share, collaboration, interoperability, etc all coexist. They will however be modularized so the artifacts can be published and depended upon independently.

Anyone have a recommendation for a better way forward?

@benlesh
Member Author

benlesh commented Jul 14, 2015

@benjchristensen, per our in-person discussion with @trxcllnt, I'd like people to better understand the implications backpressure has on groupBy and merge in particular.

For example, we can fast-path merge if the consumer requests Infinity. But if you have a zip after the merge, then even if you request Infinity, the merge takes a slightly slower path, because everything above the zip is under backpressure control (to prevent unbounded buffers in zip).

I'm not saying this is good or bad, I'm saying it's the nature of real backpressure control that actually composes.
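To illustrate why zip is the operator that forces this, here is a small sketch of a push-only zip in plain JavaScript. `createZip` is a hypothetical helper written for this example, not the RxJS operator; it simply shows what happens to the buffer when one side is much faster than the other and there is no way to signal upstream.

```javascript
// Hypothetical push-only zip: pairs values by index and buffers the rest.
function createZip(onPair) {
  const left = [];
  const right = [];
  const drain = () => {
    while (left.length > 0 && right.length > 0) {
      onPair([left.shift(), right.shift()]);
    }
  };
  return {
    pushLeft(v) { left.push(v); drain(); },
    pushRight(v) { right.push(v); drain(); },
    buffered: () => left.length + right.length,
  };
}

const pairs = [];
const zip = createZip(p => pairs.push(p));
for (let i = 0; i < 1000; i++) zip.pushLeft(i); // fast source, e.g. a merge
zip.pushRight('a');                              // slow source
const waiting = zip.buffered();                  // values still held in memory
```

Every left value that has no partner yet stays in the buffer, so without a request signal travelling up through the merge, the buffer size is bounded only by how far ahead the fast side gets.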

@benlesh
Member Author

benlesh commented Jul 15, 2015

Also, some "Netflixy" context:

This type will exist somewhere

@benjchristensen's team has a very real need for an Observable type with backpressure. As such, this type WILL exist. And it can exist in one of three ways:

  1. We work it into this Observable type
  2. We develop it as a separate type alongside our Observable in this library/community
  3. Netflix creates a totally separate library/community

Different strokes for different folks within Netflix

@trxcllnt and @jhusain have need for an Observable type that has very short function call stacks and is very, very fast. This is because they need an Observable that will operate in non-JIT'ed environments on very weak hardware (That cheap smart TV you bought three years ago, for example).

... so this decision will not be met with any small amount of debate here at Netflix. I'm really hoping the larger community of GitHub comes in handy with thoughtful comments and concerns on both sides.

@benjchristensen
Member

Speaking more with @jhusain, I'm fine with RxJS not including backpressure for now. I will use alternate APIs (i.e. Reactive Streams) at the network protocol layer that expose backpressure over the network and then bridge into RxJS which will request 'infinite'.

The primary use cases right now in JS are tackling small enough datasets that they can be swallowed into buffers. Focusing on a highly performant RxJS is higher priority than supporting backpressure at this time, even if separated into a different, as-yet-unnamed type.

@staltz
Member

staltz commented Jul 15, 2015

Since JS is primarily used to build user interfaces (90+% of it in the browser, if you consider the whole community), backpressure should not be a priority. If a push/pull sequence with backpressure is needed on Node.js, that isn't a use case for Rx. It's pretty "cheap" to create yet another JS library (much cheaper/quicker than creating a Java library, for instance), so I don't see why we should force Rx to solve all use cases.

@headinthebox

+1

@benlesh
Member Author

benlesh commented Jul 15, 2015

Since @benjchristensen was the primary advocate for this feature, I'm closing this issue for now. If at a later time we decide to add an additional type to support this, we can revisit.

@benlesh benlesh closed this as completed Jul 15, 2015
@jeffbski
Contributor

Doesn't node.js already have a stream API

Yes it does. But from what I can tell it target byte streams, like File IO, not Objects. Nor is it as flexible as Rx from what I can tell looking at it's API.

@benjchristensen Just for documentation completeness, Node.js's stream API does support objects as well as byte and string streams. Just set objectMode: true when creating a stream. https://nodejs.org/api/stream.html#stream_object_mode

I do agree that the Node.js stream API is not nearly as flexible as Rx.
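As a small illustration of object mode together with the backpressure signal that Node.js streams already provide, here is a sketch using the built-in `stream` module. The `sink` and the `{ id: ... }` objects are made up for the example, and `setImmediate` only stands in for a slow consumer.

```javascript
const { Writable } = require('stream');

const received = [];
const sink = new Writable({
  objectMode: true,  // chunks are arbitrary objects instead of Buffers/strings
  highWaterMark: 2,  // in object mode the limit counts objects
  write(chunk, _encoding, callback) {
    received.push(chunk);
    setImmediate(callback); // pretend the consumer is slow
  },
});

// write() returns false once the internal buffer reaches highWaterMark,
// which is the signal for the producer to wait for 'drain'.
const okAfterFirst = sink.write({ id: 1 });
const okAfterSecond = sink.write({ id: 2 });

// A well-behaved producer resumes only after the buffer has emptied.
sink.once('drain', () => sink.end({ id: 3 }));
```

The return value of `write()` is advisory: the stream keeps buffering if the producer ignores it, so honoring it is up to the caller.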

@timruffles
Contributor

It's a real shame RxJS 5.0 doesn't yet support backpressure. It precludes its use on the backend, or on the front-end when you're producing data.

Simple example I wanted to use RxJS for and can't because of this: an interactive CSV parse from a DOM File reference into batched HTTP uploads. We can't overwhelm the uploader when the upload consumer is slow, or we'll ENOMEM.

Reactive-programming without back-pressure is try { } catch(e) { /* meh, let's hope this doesn't happen */ } style development.

@benlesh
Member Author

benlesh commented Jun 27, 2016

@timruffles You can handle back pressure in RxJS by using a BehaviorSubject and building an Observable chain that subscribes to itself.

Something like this might do:

// this behavior subject is basically your "give me the next batch" mechanism.
// in this example, we're going to make 5 async requests back to back before requesting more.
const BATCH_SIZE = 5;
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items

// for every request, pump out a stream of events that represent how many you have left to fulfill
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1))
  // then concat map that into an observable of what you want to control with backpressure
  // you might have some parameterization here you need to handle, this example is simplified
  // handle side effects with a `do` block
  .concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining)
  // narrow it down to when there are no more left to request,
  // and pump another batch request into the BehaviorSubject
  .filter(remaining => remaining === 0)
  .mapTo(BATCH_SIZE)
  .subscribe(requests);

@benlesh
Member Author

benlesh commented Jun 27, 2016

@timruffles keep in mind, you might want to code in delays or pauses for whatever other processes you might need to run. When you do that, you need to do it within that concatMap, most likely. In some cases you might want to do it just before the subscribe at the end. It all depends on what you're going for.

So you can do back pressure, it's just more explicit. Which has pros and cons, of course.

I hope this helps.

@trxcllnt
Member

@timruffles I'd enjoy eventually adding RxJava-style compositional back-pressure support to RxJS, but I doubt I could get @Blesh to merge the PR ;-). So instead here's another example of something sort of backpressure-y that sounds similar to your use case. Recomputing the thresholds and limits on each new request would probably get it closer to something you could use.

@woldie

woldie commented Feb 22, 2017

Is there any mechanism for adding metadata to or classifying the hidden queue that forms behind a slow consumer? If there was a way to tag a consumer's queue, then a producer could specify rules which state "when a consumer downstream (or elsewhere in the same production) as me that is tagged with 'SLOW' has over n items in his queue, then I will be paused".

I'm thinking of using RxJS to control the virtual machine for a game-like system of mine. I can't predict the maximum number of graphics objects that a rendering frame will need to draw, because the user is in charge of that in the little programs they write to run on the virtual machine. My renderer is double buffered, and I'd like the virtual machine to become paused while there is more than one undrawn frame in the rendering queue waiting to be drawn. I sort of, kind of do this pausing manually today when this condition arises, but I'm not happy with it.

Perhaps there are more performant ways to do the scheduling I want to do than to use RxJS, but I'm interested in at least orchestrating the big state transitions in the system with RxJS. Having that kind of organization would definitely help me get a better handle on the asynchrony and frequent yielding that I need to do that is all over my system.

Perhaps simply being able to tap into the statistics of a queue leading up to a consumer would be enough for me to implement something like this flow control myself. Please pardon my ignorance, I have some experience tinkering with RxJS and perhaps what I am describing is antithetical to the design or how the internals actually work.
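One way to picture the flow control described above, without any RxJS involvement, is a queue that reports when it crosses a threshold. This is only a sketch: `FrameQueue`, `onPause` and `onResume` are hypothetical names, and the limit of one undrawn frame is taken from the double-buffered renderer described in the comment.

```javascript
// Hypothetical names throughout; a sketch of "pause the producer while more
// than `limit` frames are waiting", not an RxJS feature.
class FrameQueue {
  constructor(limit, onPause, onResume) {
    this.frames = [];
    this.limit = limit;
    this.paused = false;
    this.onPause = onPause;
    this.onResume = onResume;
  }
  // Called by the virtual machine when a frame is ready.
  enqueue(frame) {
    this.frames.push(frame);
    if (!this.paused && this.frames.length > this.limit) {
      this.paused = true;
      this.onPause();
    }
  }
  // Called by the renderer when it is ready to draw.
  dequeue() {
    const frame = this.frames.shift();
    if (this.paused && this.frames.length <= this.limit) {
      this.paused = false;
      this.onResume();
    }
    return frame;
  }
}

const events = [];
const queue = new FrameQueue(1, () => events.push('pause'), () => events.push('resume'));
queue.enqueue('frame-1');
queue.enqueue('frame-2'); // two undrawn frames: the VM should pause
queue.dequeue();          // back to one undrawn frame: the VM may resume
```

The queue length is the "statistic" here; the pause and resume callbacks are where the virtual machine would be stopped and restarted.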

@OliverJAsh
Contributor

I spotted this project under the ReactiveX organisation: ReactiveX/IxJS.

Is IxJS the proposed solution for "pull sequences"? I.e., instead of an observable, use an iterable/async iterable?
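For context on what "pull" means here, a plain generator already behaves this way: the producer only runs when the consumer asks for the next value. The sketch below uses only standard JavaScript and no IxJS APIs; `rows` and `takeBatch` are hypothetical names for the illustration.

```javascript
// Producer: a generator body only runs when the consumer calls next().
let produced = 0;
function* rows() {
  while (true) {
    produced++;
    yield { row: produced };
  }
}

// Consumer: pulls at most `size` items, so it controls the pace.
function takeBatch(iterator, size) {
  const batch = [];
  for (let k = 0; k < size; k++) {
    const { value, done } = iterator.next();
    if (done) break;
    batch.push(value);
  }
  return batch;
}

const it = rows();
const firstBatch = takeBatch(it, 3);  // the producer has run three times
const secondBatch = takeBatch(it, 2); // and two more times, on demand
```

Even though `rows` is an infinite sequence, nothing is produced ahead of demand, which is the property a push-only Observable cannot give you without extra signalling.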

@lock

lock bot commented Jun 6, 2018

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 6, 2018