Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transform backpressure approach 2 request for comments #110

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ BaseReadableStream.prototype.pipeTo = (dest, { close = true } = {}) => {
1. Set `this.[[pulling]]` to **false**.
1. Set `this.[[state]]` to `"readable"`.
1. Resolve `this.[[waitPromise]]` with **undefined**.
1. Return **true**.
1. If `this.[[state]]` is `"readable"`,
1. Push `data` onto `this.[[buffer]]`.
1. Set `this.[[pulling]]` to **false**.
Expand Down Expand Up @@ -340,7 +341,7 @@ class BaseWritableStream {

// Internal properties
Array [[buffer]] = []
string [[state]] = "waiting"
string [[state]] = "writable"
any [[storedError]]
Promise<undefined> [[currentWritePromise]]
Promise<undefined> [[writablePromise]]
Expand Down Expand Up @@ -396,7 +397,7 @@ In reaction to calls to the stream's `.write()` method, the `write` constructor
##### write(data)

1. If `this.[[state]]` is `"writable"`,
1. Set `this.[[state]]` to `"waiting"`.
1. If `this.[[buffer]]` is nonempty, set `this.[[state]]` to `"waiting"`.
1. Set `this.[[writablePromise]]` to be a newly-created pending promise.
1. Let `promise` be a newly-created pending promise.
1. Call `this.[[doNextWrite]]({ type: "data", promise, data })`.
Expand Down
140 changes: 86 additions & 54 deletions reference-implementation/lib/base-readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ function BaseReadableStream(callbacks) {
this._onPull = callbacks.pull;
this._onCancel = callbacks.cancel;

this._pipeDest = undefined;
this._pipeClose = undefined;
this._storedError = undefined;

this._waitPromise = new Promise(function (resolve, reject) {
Expand Down Expand Up @@ -79,33 +81,83 @@ function BaseReadableStream(callbacks) {
this._startedPromise.catch(function error(e) { stream._error(e); });
}

BaseReadableStream.prototype._push = function _push(data) {
if (this._state === 'waiting') {
this._buffer.push(data);
this._pulling = false;
this._state = 'readable';
this[WAIT_RESOLVE](undefined);
BaseReadableStream.prototype._fillPipeDest = function _fillPipeDest() {
assert(this._pipeDest !== undefined);
assert(this._pipeClose !== undefined);

return true;
// Move available data into a receptive destination.
while (this._pipeDest.state === 'writable' && this._buffer.length > 0) {
var data = this._buffer.shift();
this._pipeDest.write(data).catch(this.cancel.bind(this));
}
else if (this._state === 'readable') {
this._buffer.push(data);
this._pulling = false;

return true;
if (this._buffer.length === 0) {
if (this._draining === true) {
// If moving that data drained the entire stream, then we're done here.
this._state = 'closed';
this[CLOSED_RESOLVE](undefined); // this will also resolve this._waitPromise.

if (this._pipeClose) {
this._pipeDest.close();
}

this._pipeDest = undefined;
} else {
if (this._pulling) {
process.nextTick(this._callPull.bind(this));
} else {
this._callPull();
}
}
}
};

BaseReadableStream.prototype._push = function _push(data) {
if (this._state === 'closed' || this._state === 'errored') {
return false;
}

this._buffer.push(data);

if (this._pipeDest !== undefined) {
if (this._pipeDest.state === 'writable') {
this._fillPipeDest();

return this._pipeDest.state === 'writable';
} else if (this._pipeDest.state === 'waiting') {
this._pipeDest.wait().then(this._fillPipeDest.bind(this), this.cancel.bind(this));
} else {
// Destination has either been closed by someone else, or has errored in the course of someone else writing.
// Either way, we're not going to be able to do anything else useful.
this.cancel();
}

return false;
} else {
if (this._state === 'waiting') {
this._state = 'readable';
this[WAIT_RESOLVE](undefined);

return true;
}
}

return false;
};

BaseReadableStream.prototype._close = function _close() {
if (this._state === 'waiting') {
this._state = 'closed';
this[WAIT_RESOLVE](undefined);
this[CLOSED_RESOLVE](undefined);
}
else if (this._state === 'readable') {
if (this._pipeDest !== undefined) {
this._draining = true;
this._fillPipeDest();
} else {
if (this._state === 'waiting') {
this._state = 'closed';
this[WAIT_RESOLVE](undefined);
this[CLOSED_RESOLVE](undefined);
}
else if (this._state === 'readable') {
this._draining = true;
}
}
};

Expand Down Expand Up @@ -135,7 +187,7 @@ BaseReadableStream.prototype._error = function _error(error) {
BaseReadableStream.prototype._callPull = function _callPull() {
var stream = this;

if (this._pulling === true) return;
if (this._pulling === true || this._draining === true) return;
this._pulling = true;

if (this._started === false) {
Expand All @@ -149,6 +201,7 @@ BaseReadableStream.prototype._callPull = function _callPull() {
} catch (pullResultE) {
this._error(pullResultE);
}
this._pulling = false;
}.bind(this));
} else {
try {
Expand All @@ -160,6 +213,7 @@ BaseReadableStream.prototype._callPull = function _callPull() {
} catch (pullResultE) {
this._error(pullResultE);
}
this._pulling = false;
}
};

Expand All @@ -181,13 +235,16 @@ BaseReadableStream.prototype.read = function read() {
if (this._state === 'errored') {
throw this._storedError;
}
if (this._pipeDest !== undefined) {
throw new TypeError('Cannot read directly from a stream that is being piped!');
}

assert(this._state === 'readable', 'stream state ' + this._state + ' is invalid');
assert(this._buffer.length > 0, 'there must be data available to read');

var data = this._buffer.shift();

if (this._buffer.length < 1) {
if (this._buffer.length === 0) {
assert(this._draining === true || this._draining === false,
'draining only has two possible states');
if (this._draining === true) {
Expand Down Expand Up @@ -223,9 +280,11 @@ BaseReadableStream.prototype.cancel = function cancel() {
this._waitPromise = Promise.resolve(undefined);
}

// TODO: consolidate with other code like this? E.g. in pipeTo and read?
this._buffer.length = 0;
this._state = 'closed';
this[CLOSED_RESOLVE](undefined);
this._pipeDest = undefined;

return promiseCall(this._onCancel);
};
Expand All @@ -239,44 +298,17 @@ BaseReadableStream.prototype.pipeTo = function pipeTo(dest, options) {

var stream = this;

function closeDest() { if (close) dest.close(); }

// ISSUE: should this be preventable via an option or via `options.close`?
function abortDest(reason) { dest.abort(reason); }
function cancelSource(reason) { stream.cancel(reason); }

function pumpSource() {
switch (stream.state) {
case 'readable':
dest.write(stream.read()).catch(cancelSource);
fillDest();
break;
case 'waiting':
stream.wait().then(fillDest, abortDest);
break;
case 'closed':
closeDest();
break;
default:
abortDest();
}
if (this._pipeDest !== undefined) {
throw new TypeError('Cannot pipe to two streams at once. Consider a tee stream!');
}

function fillDest() {
switch (dest.state) {
case 'writable':
pumpSource();
break;
case 'waiting':
dest.wait().then(fillDest, cancelSource);
break;
default:
cancelSource();
}
}
fillDest();
this._pipeDest = dest;
this._pipeClose = close;
this._state = 'waiting';
this._waitPromise = this._closedPromise; // TODO: if we add unpipe this doesn't work.

return dest;
// Initial movement of any available data into a receptive destination.
this._fillPipeDest();
};

BaseReadableStream.prototype.pipeThrough = function pipeThrough(transform, options) {
Expand Down
7 changes: 5 additions & 2 deletions reference-implementation/lib/base-writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ function BaseWritableStream(callbacks) {

this._buffer = [];

this._state = 'waiting';
this._state = 'writable';

this._onStart = callbacks.start;
this._onWrite = callbacks.write;
Expand Down Expand Up @@ -184,7 +184,10 @@ BaseWritableStream.prototype.write = function write(data) {

switch (this._state) {
case 'writable':
this._state = 'waiting';
if (this._buffer.length > 0) {
this._state = 'waiting';
}

this._writablePromise = new Promise(function (resolve, reject) {
stream[WRITABLE_RESOLVE] = resolve;
stream[WRITABLE_REJECT] = reject;
Expand Down
57 changes: 57 additions & 0 deletions reference-implementation/scratch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';

require('./index.js');
var Promise = require('es6-promise').Promise;

// TODO: handle errors.
var pushToOutput;
var closeOutput;
var passThroughTransform = {
input: new BaseWritableStream({
write: function (data, done, error) {
console.log('writing to the input side', data);
pushToOutput(data);
console.log('passThroughTransform.input.state', passThroughTransform.input.state);
done();
},

close: function () {
closeOutput();
}
}),

output: new BaseReadableStream({
start: function (push, close) {
pushToOutput = push;
closeOutput = close;
}
})
};

var makeSequentialBRS = require('./test/lib/sequential-brs');
var readableStreamToArray = require('./test/lib/readable-stream-to-array');

var rs = new BaseReadableStream({
start: function (push, close) {
// console.log(push('hi'));
Promise.resolve().then(function () {
console.log('---');
// console.log('rs.state', rs.state);
console.log(push('hey'));
// console.log('rs.state', rs.state);
console.log(push('what'));
// console.log('rs.state', rs.state);
console.log(push('whee'));
// console.log('rs.state', rs.state);
console.log('---');
close();
});
},
});

console.log('passThroughTransform.input.state', passThroughTransform.input.state);
var output = rs.pipeThrough(passThroughTransform);

readableStreamToArray(output).then(function (chunks) {
console.log('chunks', chunks);
});
41 changes: 41 additions & 0 deletions reference-implementation/test/base-readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -467,3 +467,44 @@ test('BaseReadableStream cancellation puts the stream in a closed state (after w
});
}, t.ifError.bind(t));
});

test('BaseReadableStream returns `true` for the first `push` call; `false` thereafter, if nobody reads', function (t) {
t.plan(5);

var pushes = 0;
var stream = new BaseReadableStream({
start : function (push) {
t.equal(push('hi'), true);
t.equal(push('hey'), false);
t.equal(push('whee'), false);
t.equal(push('yo'), false);
t.equal(push('sup'), false);
}
});
});

test('BaseReadableStream continues returning `true` from `push` if the data is read out of it', function (t) {
t.plan(12);

var stream = new BaseReadableStream({
start : function (push) {
// Delay a bit so that the stream is successfully constructed and thus the `stream` variable references something.
setTimeout(function () {
t.equal(push('hi'), true);
t.equal(stream.state, 'readable');
t.equal(stream.read(), 'hi');
t.equal(stream.state, 'waiting');

t.equal(push('hey'), true);
t.equal(stream.state, 'readable');
t.equal(stream.read(), 'hey');
t.equal(stream.state, 'waiting');

t.equal(push('whee'), true);
t.equal(stream.state, 'readable');
t.equal(stream.read(), 'whee');
t.equal(stream.state, 'waiting');
}, 0);
}
});
});
Loading