Can we add buffering strategies via a transform stream? #24
It's time to find out if this is possible. It's starting to block a bunch of other stuff, regarding the evolutionary direction of the buffering strategies, base-class split, etc. I hope to sit down and have a real hard think about this sometime in the next couple of days. In the meantime, if anyone else wants to take a look, please keep me updated. We can use this issue as a scratchpad.
Before this commit, BaseReadableStream would immediately exert backpressure (i.e. return `false` from `push`), even if the buffer was empty at the time of being pushed into. After it, backpressure will only be exerted if the buffer is nonempty. This sets the stage for (I think) solving #24, since before this commit, it was impossible for a BaseReadableStream to ever return `true` from `push`. After it, that is possible, as long as the stream clears its internal buffer as fast as it can.
OK. I started down this path in a branch. But I have run into a roadblock which is making me mad; as far as I can tell, the current spec does not allow this possibility. Let me explain.

First off, what is this issue trying to accomplish? Our goal is to be able to have only

```js
var rs = getABaseReadableStreamWrappingAnUnderlyingSource();
var hwmTransform = new ByteLengthHighWaterMarkTransformStream(1024);
var output = rs.pipeThrough(hwmTransform);
```

and the following requirements are obeyed:

We can even simplify this scenario, as I tried to do in that branch.

Well, remember how backpressure signals are communicated to readable streams, and thus to underlying sources: through the return value of the `push` function given to the underlying source:

```js
var rs = new BaseReadableStream({
  start(push) {
    source.ondata = chunk => {
      if (!push(chunk)) {
        source.exertBackpressure();
      }
    };
    // etc.
  }
});
```

The current definition of BRS's `push` means it always returns `false`: backpressure is exerted immediately, even when the buffer is empty at the time of the push. But wait! What if we make a small, reasonable tweak: it returns `true` as long as the internal buffer is empty when you push into it.

The hope is that this tweak will give us the breathing room we need, so that we can write our infinite-high-water-mark transform stream to immediately suck out data the moment it appears, so that `push` keeps returning `true` and the underlying source never sees backpressure.

This turns out to be impossible, though. Because consider a stream like this:

```js
var rs = new BaseReadableStream({
  start(push) {
    console.log(push('a'));
    console.log(push('b'));
    // etc.
  }
});
```

i.e. a stream that synchronously pushes two buffer chunks in. (This is quite realistic, I believe: e.g. if the underlying source allocates buffers in a fixed size, but several of them are available at a given time.) There's no "time" in between the first push and the second for us, the transform stream, to suck away the data and empty the buffer! So the second `push` returns `false`.

Why is there no "time"? Because the transform stream can only observe that the buffer has become nonempty asynchronously; it never gets a chance to run in between two synchronous pushes. I believe Node streams get around this by using synchronous events, i.e. data handlers run in the same tick as the emit.

This seems kind of bad. I need to think about what we can do here. Off the top of my head:
Rarrrgh.
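To make the roadblock above concrete, here is a minimal, runnable simulation of it; the `push` function and buffer below are illustrative stand-ins, not spec code. The consumer is infinitely fast but can only run asynchronously (in a microtask), so a source that pushes twice synchronously still sees backpressure on both pushes.

```js
// Toy model of the timing problem: the consumer drains the buffer in a
// microtask, so it never runs between two synchronous pushes.
const buffer = [];

function push(chunk) {
  buffer.push(chunk);
  // Notify the consumer, but only asynchronously (like a promise-based read()).
  Promise.resolve().then(() => {
    while (buffer.length > 0) {
      console.log('consumed', buffer.shift());
    }
  });
  // Backpressure signal: true only if the buffer is empty after this push.
  return buffer.length === 0;
}

console.log(push('a')); // false -- 'a' is still sitting in the buffer
console.log(push('b')); // false -- the consumer never got a chance to run
```

Both pushes report backpressure even though the consumer is arbitrarily fast, which is exactly the problem described in the comment above.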
Upon reflection, solutions 2 and 5 seem most feasible, with 5 of course being easiest. I think I will start prototyping 2 tonight to see how it goes...
Oh, I forgot to mention: the virtue of 2 is that it seems very in line with #97.
I'm in favor of 4, for the reasons that you list for 2 being kind of gross. Maybe I'm crazy, but it seems like there should be a solution that breaks I/O streaming down into a set of simple combinators that the high-level API functions compose. Put another way, there are primitive operations underlying all this; it's just a question of whether they're visible or obvious underneath the current specification language.

I'll put some thought into how this might look and post here if I come up with anything that seems promising to me. I still feel like @Gozala's approach has promise here, but I agree that we're discussing that in enough other places for now that we don't need to do it here.
I'd love any ideas you come up with, certainly. I think I'm most interested in ones that preserve the current API's capabilities and ergonomics, at least for the consumers of the readable and writable stream objects. (Modifying how the creators deal with things seems fine, and might indeed be the ticket out of here.)
When piping:

- The stream enters a "waiting" state until it closes. It is never directly readable.
- Calling read() gives a TypeError. (An informative one!)
- Calls to `push` forward themselves directly into the destination's `write` call, within a single tick.

This is solution 2 to the problem mentioned in #24 (comment).
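As a rough, self-contained sketch of what this commit message describes (solution 2), with invented names that do not mirror the actual reference implementation: while piping, `push` hands each chunk straight to the destination's `write` in the same tick, and `read()` throws.

```js
// Illustrative only: a readable whose push() forwards synchronously while piped.
class SketchReadable {
  constructor() {
    this.dest = undefined;
    this.buffer = [];
  }
  pipeTo(dest) {
    this.dest = dest;
    return dest;
  }
  push(chunk) {
    if (this.dest !== undefined) {
      this.dest.write(chunk);              // forwarded within a single tick
      return this.dest.state === 'writable';
    }
    this.buffer.push(chunk);
    return this.buffer.length === 0;       // backpressure only if buffer is nonempty
  }
  read() {
    if (this.dest !== undefined) {
      throw new TypeError('read() is not available while the stream is piped');
    }
    return this.buffer.shift();
  }
}

// Usage: a destination that is always writable and logs what it receives.
const rs = new SketchReadable();
rs.pipeTo({ state: 'writable', write: chunk => console.log('wrote', chunk) });
console.log(rs.push('a')); // logs "wrote a", then true -- nothing was buffered
```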
My work in #110 made me think of an idea for a re-thinking approach. It's not well baked yet so this might be rambly. But I want to get the thoughts down before I head to sleep.

In #110, I made a direct pipe connection be given "special access" to the readable stream's internals: whenever data gets pushed into the readable stream from the underlying source, the piped-to stream will immediately, synchronously be written to (assuming it's currently writable). What's distasteful about this approach is that it gives piped-to streams special privileges that ordinary consumers of the stream do not get. That is, to consume data as fast as possible, you essentially have to create a writable stream and pipe to it; the usual read loops won't work. (The problem here, to reiterate, is that the usual read loops only get to observe the stream asynchronously, so they can never drain it as fast as synchronous pushes fill it.)

What if we made this less of a hack, and more of a base part of the API? That is, the most basic part of the API for readable streams is "piping to a writable stream." (Those words might not apply as well anymore as e.g. "subscribing with an observer," but let's keep using stream terminology for now.) By default, the readable stream is hooked up to a writable stream that buffers things internally. Hmm, but that doesn't exactly work in every case.

OK, let me step back and look at this again. Ignoring the buffering, the basic primitive is a base readable stream whose interface is more "push" than "pull", and that "pushes" into a writable stream. The writable stream must have some way to communicate back when its data is acknowledged; if it is acknowledged slowly, the base readable stream applies backpressure to its underlying source. As-is, the base readable stream has no buffer at all. I guess if it is not hooked up to any writable stream, then the data gets dropped on the floor. Oh no!! Badness. But solvable.

Now, how do we get the same API ergonomics as our current solution? I.e., how do we create readable streams which add buffering on top of our no-buffering base readable streams? Well, upon creation, readable streams (not base readable streams) are piped to a buffering transform stream (still an input/output writable/readable stream pair). Its input end will acknowledge writes from the original readable stream immediately, except if nobody has read from its output end quickly enough. Again we have the default-strategy vs. high-water-mark strategy idea; the default transform would e.g. acknowledge the write as long as its buffer is empty. But what's going on with the output end? Hmm. Let's see. So I think the BRS API ends up being mostly the pipe connection itself, with all buffering layered on top.

What does this gain us? Maybe not enough over approach 5. It makes BRS even more useless, instead moving all buffering into RS. This is nice though because it "explains" the buffering in terms of existing mechanisms, namely transform streams. And I guess it allows people to build more complicated buffering strategies in a natural way, using the full power of transform streams and a fairly simple protocol, instead of us having to account for it manually and add fields to the buffering strategy objects.

It also seems to be converging with some Rx-style thinking, of observables and observers and push being primary. (Interestingly, when I ask Rx people which is primary, "hot" or "cold" observables, I get contradictory answers.) That's not necessarily a virtue in and of itself, but it does mean that there is relevant stuff to learn.

I guess I need to spend some time prototyping this approach.
It seems like a big enough departure that it's going to run into its own set of gotchas and failures, which makes me wary of trying it. But it seems like a net positive in that it doesn't reduce expressiveness or usefulness of the ReadableStream API (just BRS), and opens up a path toward explaining buffering strategies instead of baking them in. That is, unlike other potential big departures, this one doesn't shake up the API or ergonomics or applicability to our use cases.
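Here is one speculative sketch of the "no-buffer base that pushes into a writable" idea from the two comments above. Every name is invented for illustration, and the acknowledgement protocol is reduced to a boolean return value from `write`.

```js
// The base stream owns no buffer: each push goes straight to whatever writable
// it is connected to, and the boolean coming back is the backpressure signal.
class NoBufferBase {
  connect(writable) {
    this.writable = writable;
  }
  push(chunk) {
    if (!this.writable) return false;      // nothing attached: data would be dropped
    return this.writable.write(chunk);     // acknowledgement comes back as a boolean
  }
}

// The default buffering layer is just a writable that acknowledges immediately
// while its internal queue is empty, i.e. a "default strategy" transform.
function makeBufferingWritable() {
  const queue = [];
  return {
    write(chunk) {
      queue.push(chunk);
      return queue.length <= 1;            // ack only if we were not already backed up
    },
    take() {
      return queue.shift();                // the consumer side pulls from here
    }
  };
}

const base = new NoBufferBase();
const buffered = makeBufferingWritable();
base.connect(buffered);
console.log(base.push('a'));  // true  -- acknowledged immediately
console.log(base.push('b'));  // false -- 'a' has not been taken yet
console.log(buffered.take()); // 'a'
```

The point of the sketch is that buffering, and therefore the buffering strategy, lives entirely in the writable the base happens to be connected to; the base itself is only the pipe connection.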
Something different than a simple high-water mark? I can't come up with any concrete example of these complicated buffering strategies. I think the strategy interface should be modified to be able to look at all the contents in the buffer.
Base stream interfaces should be designed to work as a buffer. As such, we cannot get rid of a buffer from the readable side interface. So it's natural to me that we implement the buffering control mechanism directly on it. I'm not so sure why separating the buffering strategy from the base interface is so important (I understand it's simpler, but ...). It's already a "buffer"; we want to control its size. Could you please teach me, or point me to an issue if one exists, so that I can understand the reason you want to separate buffering from the BRS?

As I've been stating, I really like number 5 of your plans. We're already (asynchronously) observing the BRS. Currently, we're trying to control the BRS so that it doesn't buffer much data by watching its state. You just made a tweak to make an infinite transform stream drain data synchronously into its destination. But I'm not convinced that such synchronous chaining is really essential. Once we bake the buffering strategy into the base, it's resolved, at least for the "draining data as fast as possible" case. If we really think making each ...
Yay, glad to get your thoughts @tyoshino.
I think I agree in theory, but in practice I am not sure we can make this performant. In particular, any interface based on letting the strategy examine the entire contents of the buffer seems hard to make fast.
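A small sketch of the performance concern, using a hypothetical strategy interface (`size` and `shouldApplyBackpressure` are illustrative names, not anything from the spec): a strategy that sees each chunk once and keeps a running total does constant work per push, whereas one that re-inspects the whole buffer does work proportional to the buffer's length on every push.

```js
// Hypothetical incremental strategy: per-chunk cost only, no buffer scanning.
const byteLengthStrategy = {
  size(chunk) {
    return chunk.byteLength;                 // charged once per chunk
  },
  shouldApplyBackpressure(queueSize) {
    return queueSize > 1024;                 // compare running total to a high-water mark
  }
};

let queueSize = 0;
function push(chunk) {
  queueSize += byteLengthStrategy.size(chunk);
  return !byteLengthStrategy.shouldApplyBackpressure(queueSize);
}

console.log(push(new Uint8Array(512)));  // true  -- 512 bytes queued, under the mark
console.log(push(new Uint8Array(1024))); // false -- 1536 bytes queued, backpressure
```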
Sure. I don't necessarily want to separate buffering from BRS entirely, mind. I just think it would be nice if the buffering strategy was somehow separable from the stream itself, so that you could choose different ones using syntax like `rs.pipeThrough(new ByteLengthHighWaterMarkTransformStream(1024))`.

The motivation here is largely to decouple concerns, if possible, and to provide a more flexible approach for custom buffering strategies. As-is, a strategy that wants to look at the buffered contents ends up entangled with the stream's internals.

That said, this is somewhat abstract, and I certainly am not ruled by a need for theoretical decoupling purity. If the best approach that meets all the use cases is number 5, possibly with another hook to cover #76, then that's what we'll do. But I want to explore other options first to see if they yield something better.

I appreciate you bringing things back to reality though. Perhaps it would be best to merge approach 5 into master ASAP and then continue developing alternatives like 2 and 4 in branches.
This makes perfect sense to me. But once I started going down this path, it led me to my above rambly ideas! E.g., this dam you propose could just be a predefined buffering transform, instead of baking it into the base stream; then you keep buffers out of the base stream entirely. (Assuming my rambly ideas translate well into code, that is!)

The most straightforward hack to fix this would be to replace ... This is pretty distasteful, though, and hurts a future ...
I am convinced it is absolutely essential. Many, many transforms are synchronous, and imposing a next-microtask penalty for each step in such a synchronous transform chain is not a good thing. Even if your ultimate consumer is slow, you should get data to that consumer as fast as possible. It is also the foundation underlying any generic combinators (e.g. map and filter).
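To illustrate the synchronous-chain argument, here is a minimal sketch in which plain callbacks stand in for transform streams: each stage hands its result to the next in the same tick, so a chunk crosses the whole chain with no per-stage microtask delay.

```js
// Illustrative synchronous combinators; not a proposed API.
function mapSync(fn, next) {
  return chunk => next(fn(chunk));
}
function filterSync(pred, next) {
  return chunk => {
    if (pred(chunk)) next(chunk);
  };
}

// Build a chain: parse -> keep even values -> consume, all in one tick.
const consume = value => console.log('got', value);
const chain = mapSync(s => Number(s), filterSync(n => n % 2 === 0, consume));

chain('1'); // filtered out
chain('2'); // logs "got 2" synchronously -- no await between stages
```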
Do you have an idea for this? Was ...
Actually, I am not sure this is feasible. Given our next major work item is transform streams (that's the biggest missing feature IMO), the important problem that needs to be solved ASAP is whether synchronous transform streams are possible at all. This issue about buffering strategies is just the catalyst. Any ASAP solution that we merge in needs to solve that problem primarily.
Sorry for the delay.
OK. Personally, I'm fine with introducing a little complexity to integrate the strategy into the BRS, but I don't object to trying the other options.
The base ideas behind my proposals are:
With this, I ... guess the API gets shaped well. It's possible we'll find some important use cases that can't be covered, as you're worried about... but for now.
Yeah, let's keep trying.
Looking forward to it!
This sounds good to me.
Do you mean a method that works the same as the current ...?
Yes, though we could give it a longer/more descriptive name if necessary.
I have largely given up on this idea, and baked queueing strategies into the base stream, with simple defaults. I have also been convinced that, without experience-based performance arguments, synchronous transforms are not compelling enough to be worth the way they break the clean-stack invariant. Closing this, but further discussion of queueing strategies, along the lines of the last few comments, will continue in #119.
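For readers arriving later: the "queueing strategies baked into the base stream, with simple defaults" direction is roughly how the eventual Streams Standard works, where a strategy is passed as a constructor argument. The example below uses today's standard API, which postdates this thread.

```js
// Today's standard API: the queuing strategy is part of the stream itself.
const rs = new ReadableStream(
  {
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    }
  },
  new CountQueuingStrategy({ highWaterMark: 1 }) // a simple, default-style strategy
);

// Reading drains the internal queue that the strategy sized.
const reader = rs.getReader();
reader.read().then(({ value }) => console.log(value)); // logs "a"
```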
People have demonstrated how to do this with simpler always-async streams, e.g. in #13 (comment), but I am not sure how it would work with the current API.
In any case, it would be preferable to explain streams with buffering strategies (i.e. `ReadableStream`) in terms of `BaseReadableStream` + something else, instead of having `ReadableStream` overwrite parts of `BaseReadableStream`. Will take a bit of work, but should be doable.
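Finally, as one entirely hypothetical reading of the issue title: the `ByteLengthHighWaterMarkTransformStream` used in the examples above could conceptually boil down to an identity queue whose writable side reports backpressure once a byte count crosses a high-water mark. The object shape below is invented for illustration and matches no spec.

```js
// Hypothetical buffering transform: write() in, read() out, byte-counted queue.
function byteLengthHighWaterMarkTransform(highWaterMark) {
  const queue = [];
  let queuedBytes = 0;
  return {
    // Writable side: true (no backpressure) while under the high-water mark.
    write(chunk) {
      queue.push(chunk);
      queuedBytes += chunk.byteLength;
      return queuedBytes < highWaterMark;
    },
    // Readable side: hand chunks out and free up room in the queue.
    read() {
      const chunk = queue.shift();
      if (chunk !== undefined) queuedBytes -= chunk.byteLength;
      return chunk;
    }
  };
}

const t = byteLengthHighWaterMarkTransform(1024);
console.log(t.write(new Uint8Array(512))); // true  -- 512 of 1024 bytes queued
console.log(t.write(new Uint8Array(512))); // false -- the mark has been reached
t.read();                                  // drain one chunk
console.log(t.write(new Uint8Array(100))); // true again -- back under the mark
```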