diff --git a/README.md b/README.md index 8d159d0f1..be77e7a81 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,7 @@ BaseReadableStream.prototype.pipeTo = (dest, { close = true } = {}) => { 1. Set `this.[[pulling]]` to **false**. 1. Set `this.[[state]]` to `"readable"`. 1. Resolve `this.[[waitPromise]]` with **undefined**. + 1. Return **true**. 1. If `this.[[state]]` is `"readable"`, 1. Push `data` onto `this.[[buffer]]`. 1. Set `this.[[pulling]]` to **false**. @@ -340,7 +341,7 @@ class BaseWritableStream { // Internal properties Array [[buffer]] = [] - string [[state]] = "waiting" + string [[state]] = "writable" any [[storedError]] Promise [[currentWritePromise]] Promise [[writablePromise]] @@ -396,7 +397,7 @@ In reaction to calls to the stream's `.write()` method, the `write` constructor ##### write(data) 1. If `this.[[state]]` is `"writable"`, - 1. Set `this.[[state]]` to `"waiting"`. + 1. If `this.[[buffer]]` is nonempty, set `this.[[state]]` to `"waiting"`. 1. Set `this.[[writablePromise]]` to be a newly-created pending promise. 1. Let `promise` be a newly-created pending promise. 1. Call `this.[[doNextWrite]]({ type: "data", promise, data })`. diff --git a/reference-implementation/lib/base-readable.js b/reference-implementation/lib/base-readable.js index b0e3c4545..6d616e3bf 100644 --- a/reference-implementation/lib/base-readable.js +++ b/reference-implementation/lib/base-readable.js @@ -43,6 +43,8 @@ function BaseReadableStream(callbacks) { this._onPull = callbacks.pull; this._onCancel = callbacks.cancel; + this._pipeDest = undefined; + this._pipeClose = undefined; this._storedError = undefined; this._waitPromise = new Promise(function (resolve, reject) { @@ -79,33 +81,83 @@ function BaseReadableStream(callbacks) { this._startedPromise.catch(function error(e) { stream._error(e); }); } -BaseReadableStream.prototype._push = function _push(data) { - if (this._state === 'waiting') { - this._buffer.push(data); - this._pulling = false; - this._state = 'readable'; - this[WAIT_RESOLVE](undefined); +BaseReadableStream.prototype._fillPipeDest = function _fillPipeDest() { + assert(this._pipeDest !== undefined); + assert(this._pipeClose !== undefined); - return true; + // Move available data into a receptive destination. + while (this._pipeDest.state === 'writable' && this._buffer.length > 0) { + var data = this._buffer.shift(); + this._pipeDest.write(data).catch(this.cancel.bind(this)); } - else if (this._state === 'readable') { - this._buffer.push(data); - this._pulling = false; - return true; + if (this._buffer.length === 0) { + if (this._draining === true) { + // If moving that data drained the entire stream, then we're done here. + this._state = 'closed'; + this[CLOSED_RESOLVE](undefined); // this will also resolve this._waitPromise. + + if (this._pipeClose) { + this._pipeDest.close(); + } + + this._pipeDest = undefined; + } else { + if (this._pulling) { + process.nextTick(this._callPull.bind(this)); + } else { + this._callPull(); + } + } + } +}; + +BaseReadableStream.prototype._push = function _push(data) { + if (this._state === 'closed' || this._state === 'errored') { + return false; + } + + this._buffer.push(data); + + if (this._pipeDest !== undefined) { + if (this._pipeDest.state === 'writable') { + this._fillPipeDest(); + + return this._pipeDest.state === 'writable'; + } else if (this._pipeDest.state === 'waiting') { + this._pipeDest.wait().then(this._fillPipeDest.bind(this), this.cancel.bind(this)); + } else { + // Destination has either been closed by someone else, or has errored in the course of someone else writing. + // Either way, we're not going to be able to do anything else useful. + this.cancel(); + } + + return false; + } else { + if (this._state === 'waiting') { + this._state = 'readable'; + this[WAIT_RESOLVE](undefined); + + return true; + } } return false; }; BaseReadableStream.prototype._close = function _close() { - if (this._state === 'waiting') { - this._state = 'closed'; - this[WAIT_RESOLVE](undefined); - this[CLOSED_RESOLVE](undefined); - } - else if (this._state === 'readable') { + if (this._pipeDest !== undefined) { this._draining = true; + this._fillPipeDest(); + } else { + if (this._state === 'waiting') { + this._state = 'closed'; + this[WAIT_RESOLVE](undefined); + this[CLOSED_RESOLVE](undefined); + } + else if (this._state === 'readable') { + this._draining = true; + } } }; @@ -135,7 +187,7 @@ BaseReadableStream.prototype._error = function _error(error) { BaseReadableStream.prototype._callPull = function _callPull() { var stream = this; - if (this._pulling === true) return; + if (this._pulling === true || this._draining === true) return; this._pulling = true; if (this._started === false) { @@ -149,6 +201,7 @@ BaseReadableStream.prototype._callPull = function _callPull() { } catch (pullResultE) { this._error(pullResultE); } + this._pulling = false; }.bind(this)); } else { try { @@ -160,6 +213,7 @@ BaseReadableStream.prototype._callPull = function _callPull() { } catch (pullResultE) { this._error(pullResultE); } + this._pulling = false; } }; @@ -181,13 +235,16 @@ BaseReadableStream.prototype.read = function read() { if (this._state === 'errored') { throw this._storedError; } + if (this._pipeDest !== undefined) { + throw new TypeError('Cannot read directly from a stream that is being piped!'); + } assert(this._state === 'readable', 'stream state ' + this._state + ' is invalid'); assert(this._buffer.length > 0, 'there must be data available to read'); var data = this._buffer.shift(); - if (this._buffer.length < 1) { + if (this._buffer.length === 0) { assert(this._draining === true || this._draining === false, 'draining only has two possible states'); if (this._draining === true) { @@ -223,9 +280,11 @@ BaseReadableStream.prototype.cancel = function cancel() { this._waitPromise = Promise.resolve(undefined); } + // TODO: consolidate with other code like this? E.g. in pipeTo and read? this._buffer.length = 0; this._state = 'closed'; this[CLOSED_RESOLVE](undefined); + this._pipeDest = undefined; return promiseCall(this._onCancel); }; @@ -239,44 +298,17 @@ BaseReadableStream.prototype.pipeTo = function pipeTo(dest, options) { var stream = this; - function closeDest() { if (close) dest.close(); } - - // ISSUE: should this be preventable via an option or via `options.close`? - function abortDest(reason) { dest.abort(reason); } - function cancelSource(reason) { stream.cancel(reason); } - - function pumpSource() { - switch (stream.state) { - case 'readable': - dest.write(stream.read()).catch(cancelSource); - fillDest(); - break; - case 'waiting': - stream.wait().then(fillDest, abortDest); - break; - case 'closed': - closeDest(); - break; - default: - abortDest(); - } + if (this._pipeDest !== undefined) { + throw new TypeError('Cannot pipe to two streams at once. Consider a tee stream!'); } - function fillDest() { - switch (dest.state) { - case 'writable': - pumpSource(); - break; - case 'waiting': - dest.wait().then(fillDest, cancelSource); - break; - default: - cancelSource(); - } - } - fillDest(); + this._pipeDest = dest; + this._pipeClose = close; + this._state = 'waiting'; + this._waitPromise = this._closedPromise; // TODO: if we add unpipe this doesn't work. - return dest; + // Initial movement of any available data into a receptive destination. + this._fillPipeDest(); }; BaseReadableStream.prototype.pipeThrough = function pipeThrough(transform, options) { diff --git a/reference-implementation/lib/base-writable.js b/reference-implementation/lib/base-writable.js index 765eaf64a..896fc0dee 100644 --- a/reference-implementation/lib/base-writable.js +++ b/reference-implementation/lib/base-writable.js @@ -40,7 +40,7 @@ function BaseWritableStream(callbacks) { this._buffer = []; - this._state = 'waiting'; + this._state = 'writable'; this._onStart = callbacks.start; this._onWrite = callbacks.write; @@ -184,7 +184,10 @@ BaseWritableStream.prototype.write = function write(data) { switch (this._state) { case 'writable': - this._state = 'waiting'; + if (this._buffer.length > 0) { + this._state = 'waiting'; + } + this._writablePromise = new Promise(function (resolve, reject) { stream[WRITABLE_RESOLVE] = resolve; stream[WRITABLE_REJECT] = reject; diff --git a/reference-implementation/scratch.js b/reference-implementation/scratch.js new file mode 100644 index 000000000..b2891bb5a --- /dev/null +++ b/reference-implementation/scratch.js @@ -0,0 +1,57 @@ +'use strict'; + +require('./index.js'); +var Promise = require('es6-promise').Promise; + +// TODO: handle errors. +var pushToOutput; +var closeOutput; +var passThroughTransform = { + input: new BaseWritableStream({ + write: function (data, done, error) { + console.log('writing to the input side', data); + pushToOutput(data); + console.log('passThroughTransform.input.state', passThroughTransform.input.state); + done(); + }, + + close: function () { + closeOutput(); + } + }), + + output: new BaseReadableStream({ + start: function (push, close) { + pushToOutput = push; + closeOutput = close; + } + }) +}; + +var makeSequentialBRS = require('./test/lib/sequential-brs'); +var readableStreamToArray = require('./test/lib/readable-stream-to-array'); + +var rs = new BaseReadableStream({ + start: function (push, close) { +// console.log(push('hi')); + Promise.resolve().then(function () { + console.log('---'); +// console.log('rs.state', rs.state); + console.log(push('hey')); +// console.log('rs.state', rs.state); + console.log(push('what')); +// console.log('rs.state', rs.state); + console.log(push('whee')); +// console.log('rs.state', rs.state); + console.log('---'); + close(); + }); + }, +}); + +console.log('passThroughTransform.input.state', passThroughTransform.input.state); +var output = rs.pipeThrough(passThroughTransform); + +readableStreamToArray(output).then(function (chunks) { + console.log('chunks', chunks); +}); diff --git a/reference-implementation/test/base-readable-stream.js b/reference-implementation/test/base-readable-stream.js index 4a95843b7..7430afb95 100644 --- a/reference-implementation/test/base-readable-stream.js +++ b/reference-implementation/test/base-readable-stream.js @@ -467,3 +467,44 @@ test('BaseReadableStream cancellation puts the stream in a closed state (after w }); }, t.ifError.bind(t)); }); + +test('BaseReadableStream returns `true` for the first `push` call; `false` thereafter, if nobody reads', function (t) { + t.plan(5); + + var pushes = 0; + var stream = new BaseReadableStream({ + start : function (push) { + t.equal(push('hi'), true); + t.equal(push('hey'), false); + t.equal(push('whee'), false); + t.equal(push('yo'), false); + t.equal(push('sup'), false); + } + }); +}); + +test('BaseReadableStream continues returning `true` from `push` if the data is read out of it', function (t) { + t.plan(12); + + var stream = new BaseReadableStream({ + start : function (push) { + // Delay a bit so that the stream is successfully constructed and thus the `stream` variable references something. + setTimeout(function () { + t.equal(push('hi'), true); + t.equal(stream.state, 'readable'); + t.equal(stream.read(), 'hi'); + t.equal(stream.state, 'waiting'); + + t.equal(push('hey'), true); + t.equal(stream.state, 'readable'); + t.equal(stream.read(), 'hey'); + t.equal(stream.state, 'waiting'); + + t.equal(push('whee'), true); + t.equal(stream.state, 'readable'); + t.equal(stream.read(), 'whee'); + t.equal(stream.state, 'waiting'); + }, 0); + } + }); +}); diff --git a/reference-implementation/test/base-writable-stream.js b/reference-implementation/test/base-writable-stream.js index 3571a858f..085eda2fe 100644 --- a/reference-implementation/test/base-writable-stream.js +++ b/reference-implementation/test/base-writable-stream.js @@ -31,7 +31,7 @@ test('BaseWritableStream is correctly constructed', function (t) { t.equal(typeof basic.abort, 'function', 'has abort function'); t.equal(typeof basic.close, 'function', 'has close function'); - t.equal(basic.state, 'waiting', 'stream has default new state'); + t.equal(basic.state, 'writable', 'stream has default new state'); t.ok(basic.closed, 'has closed promise'); t.ok(basic.closed.then, 'has closed promise that is thenable'); @@ -69,7 +69,7 @@ test('BaseWritableStream with simple input', function (t) { }); }); -test('closing a stream which acknowledges all writes immediately', function (t) { +test('BaseWritableStream: closing a stream which acknowledges all writes immediately', function (t) { var storage; var basic = new BaseWritableStream({ start : function start() { storage = []; }, @@ -80,16 +80,32 @@ test('closing a stream which acknowledges all writes immediately', function (t) } }); - setTimeout(function () { - // Give it time to finish starting. - - var input = [1, 2, 3, 4, 5]; - writeArrayToStream(input, basic).then(function () { - t.deepEqual(storage, input, 'got back what was passed in'); - t.end(); - }, function (error) { - t.fail(error); - t.end(); - }); + var input = [1, 2, 3, 4, 5]; + writeArrayToStream(input, basic).then(function () { + t.deepEqual(storage, input, 'got back what was passed in'); + t.end(); + }, function (error) { + t.fail(error); + t.end(); + }); +}); + +test('BaseWritableStream: stays writable indefinitely if writes are all acknowledged synchronously', function (t) { + t.plan(10); + + var ws = new BaseWritableStream({ + write : function (data, done) { + t.equal(this.state, 'writable', 'state is writable before writing ' + data); + done(); + t.equal(this.state, 'writable', 'state is writable after writing ' + data); + } + }); + + var input = [1, 2, 3, 4, 5]; + writeArrayToStream(input, ws).then(function () { + t.end(); + }, function (error) { + t.fail(error); + t.end(); }); }); diff --git a/reference-implementation/test/piping.js b/reference-implementation/test/piping.js index 8612fe4a2..fedfe20d3 100644 --- a/reference-implementation/test/piping.js +++ b/reference-implementation/test/piping.js @@ -43,3 +43,46 @@ test('Piping through a pass-through transform stream works', function (t) { t.deepEqual(chunks, [1, 2, 3, 4, 5]); }); }); + +test('Piping through a synchronous pass-through transform stream never causes backpressure: sync push', function (t) { + t.plan(5); + + var rs = new BaseReadableStream({ + start : function (push, close) { + Promise.resolve().then(function () { + // Using promises for a portable nextTick, so that this code runs after the pipe chain is established. + t.equal(push(1), true); + t.equal(push(2), true); + t.equal(push(3), true); + t.equal(push(4), true); + close(); + }); + } + }); + + var output = rs.pipeThrough(passThroughTransform()); + + readableStreamToArray(output).then(function (chunks) { + t.deepEqual(chunks, [1, 2, 3, 4]); + }); +}); + +test('Piping through a synchronous pass-through transform stream never causes backpressure: sync pull', function (t) { + t.plan(5); + + var counter = 0; + var rs = new BaseReadableStream({ + pull : function (push, close) { + t.equal(push(++counter), true); + if (counter === 4) { + close(); + } + } + }); + + var output = rs.pipeThrough(passThroughTransform()); + + readableStreamToArray(output).then(function (chunks) { + t.deepEqual(chunks, [1, 2, 3, 4]); + }); +});