Skip to content

Commit

Permalink
Direct pipe hookup: fix for pull sources. (Reference implementation o…
Browse files Browse the repository at this point in the history
…nly)

This reestablishes the meaning of the [[pulling]] flag to mean "is the `pull` callback executing right now," so that it can be used to avoid infinitely-recursive pull calls.

The meaning of [[pulling]] has gotten a bit confused. It does two things right now: prevent "too many" calls to `pull` in a row, and avoid reentrant calls in the pipe use case. (Reentrant calls are not possible in the non-pipe use case, since those will necessarily be mediated through wait(). I think.) Notably you can remove the line that checks [[pulling]] inside [[callPull]] without any tests failing except the one that is explicitly checking that `pull` is not called multiple times.
  • Loading branch information
domenic committed Apr 21, 2014
1 parent daf5032 commit 7ce96fb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
12 changes: 8 additions & 4 deletions reference-implementation/lib/base-readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ BaseReadableStream.prototype._fillPipeDest = function _fillPipeDest() {

this._pipeDest = undefined;
} else {
// Otherwise, do the usual thing you do with an empty buffer: ask to fill it.
this._callPull();
if (this._pulling) {
process.nextTick(this._callPull.bind(this));
} else {
this._callPull();
}
}
}
};
Expand All @@ -115,7 +118,6 @@ BaseReadableStream.prototype._push = function _push(data) {
}

this._buffer.push(data);
this._pulling = false;

if (this._pipeDest !== undefined) {
if (this._pipeDest.state === 'writable') {
Expand Down Expand Up @@ -185,7 +187,7 @@ BaseReadableStream.prototype._error = function _error(error) {
BaseReadableStream.prototype._callPull = function _callPull() {
var stream = this;

if (this._pulling === true) return;
if (this._pulling === true || this._draining === true) return;
this._pulling = true;

if (this._started === false) {
Expand All @@ -199,6 +201,7 @@ BaseReadableStream.prototype._callPull = function _callPull() {
} catch (pullResultE) {
this._error(pullResultE);
}
this._pulling = false;
}.bind(this));
} else {
try {
Expand All @@ -210,6 +213,7 @@ BaseReadableStream.prototype._callPull = function _callPull() {
} catch (pullResultE) {
this._error(pullResultE);
}
this._pulling = false;
}
};

Expand Down
20 changes: 20 additions & 0 deletions reference-implementation/test/piping.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,23 @@ test('Piping through a synchronous pass-through transform stream never causes ba
t.deepEqual(chunks, [1, 2, 3, 4]);
});
});

test('Piping through a synchronous pass-through transform stream never causes backpressure: sync pull', function (t) {
t.plan(5);

var counter = 0;
var rs = new BaseReadableStream({
pull : function (push, close) {
t.equal(push(++counter), true);
if (counter === 4) {
close();
}
}
});

var output = rs.pipeThrough(passThroughTransform());

readableStreamToArray(output).then(function (chunks) {
t.deepEqual(chunks, [1, 2, 3, 4]);
});
});

0 comments on commit 7ce96fb

Please sign in to comment.