Skip to content

Commit

Permalink
Don't exert backpressure after the first push.
Browse files Browse the repository at this point in the history
Before this commit, BaseReadableStream would immediately exert backpressure (i.e. return `false` from `push`), even if the buffer was empty at the time of being pushed into. After it, backpressure will only be exerted if the buffer is nonempty.

This sets the stage for (I think) solving #24, since before this commit, it was impossible for a BaseReadableStream to ever return `true` from `push`. After it, that is possible, as long as the stream clears its internal buffer as fast as it can.
  • Loading branch information
domenic committed Apr 21, 2014
1 parent 87c43b2 commit 6e2a21d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ BaseReadableStream.prototype.pipeTo = (dest, { close = true } = {}) => {
1. Set `this.[[pulling]]` to **false**.
1. Set `this.[[state]]` to `"readable"`.
1. Resolve `this.[[waitPromise]]` with **undefined**.
1. Return **true**.
1. If `this.[[state]]` is `"readable"`,
1. Push `data` onto `this.[[buffer]]`.
1. Set `this.[[pulling]]` to **false**.
Expand Down
2 changes: 0 additions & 2 deletions reference-implementation/lib/base-readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ BaseReadableStream.prototype._push = function _push(data) {
else if (this._state === 'readable') {
this._buffer.push(data);
this._pulling = false;

return true;
}

return false;
Expand Down
41 changes: 41 additions & 0 deletions reference-implementation/test/base-readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -467,3 +467,44 @@ test('BaseReadableStream cancellation puts the stream in a closed state (after w
});
}, t.ifError.bind(t));
});

test('BaseReadableStream returns `true` for the first `push` call; `false` thereafter, if nobody reads', function (t) {
t.plan(5);

var pushes = 0;
var stream = new BaseReadableStream({
start : function (push) {
t.equal(push('hi'), true);
t.equal(push('hey'), false);
t.equal(push('whee'), false);
t.equal(push('yo'), false);
t.equal(push('sup'), false);
}
});
});

test('BaseReadableStream continues returning `true` from `push` if the data is read out of it', function (t) {
t.plan(12);

var stream = new BaseReadableStream({
start : function (push) {
// Delay a bit so that the stream is successfully constructed and thus the `stream` variable references something.
setTimeout(function () {
t.equal(push('hi'), true);
t.equal(stream.state, 'readable');
t.equal(stream.read(), 'hi');
t.equal(stream.state, 'waiting');

t.equal(push('hey'), true);
t.equal(stream.state, 'readable');
t.equal(stream.read(), 'hey');
t.equal(stream.state, 'waiting');

t.equal(push('whee'), true);
t.equal(stream.state, 'readable');
t.equal(stream.read(), 'whee');
t.equal(stream.state, 'waiting');
}, 0);
}
});
});

3 comments on commit 6e2a21d

@tyoshino
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks fine to me that we forward written data to the pipe destination when piping. I'm just a bit worried about possible stack growth on long chain of pipes.

But the new line shouldn't this be like this?

If this.[[buffer]] is empty, return true
Otherwise, return false

I know that this doesn't make sense inside the current BaseReadableStream algorithm definition but I think what you're trying to do is conceptually equivalent to this.

Not only this.[[buffer]] but also this.[[pulling]], this.[[state]] and this.[[waitPromise]] should be (and are actually in your updated reference impl.) updated after investigating how data has been processed (e.g. by checking emptiness of this.[[buffer]]). For non-piping case, the processing is just "push data onto this.[[buffer]]". If not, it's forwarding logic you're just prototyping.

@domenic
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tyoshino it seems like you're referring to the entire series of commits, and not just this one?

I'm just a bit worried about possible stack growth on long chain of pipes.

I think this is taken care of by 7ce96fb#diff-89532e378d4091617b906f870b427524R106 , which prevents reetrancy into the pull method, which was the potentially-recursive method I found. I am not sure I fully grasp the danger you're referring to though.

But the new line shouldn't this be like this?

I guess you are essentially proposing replacing "waiting vs. readable" with "empty vs. nonempty buffer" in the if statements? I can't really see a way to change the wording to make this work. E.g. by the time you want to return from the function the buffer will always be nonempty.

Not only this.[[buffer]] but also this.[[pulling]], this.[[state]] and this.[[waitPromise]] should be (and are actually in your updated reference impl.)

Indeed, I haven't updated the spec to reflect the last commit yet. Before merging I certainly will do so. I kind of want to investigate a variety of fixes for this large issue first.

Thanks for your thoughts!

@tyoshino
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entire series ...

Yes, but I might have missed some points. Sorry about that.

I think this is taken care of by ...

Oh, I overlooked it. Got it.

I guess you are essentially proposing ...

It seems I should re-read whole commits you made.

Indeed, I haven't updated ...

OK. Thanks!

Please sign in to comment.