From 0de787a6eba39dc07326131c8ef4f8689efc5d6f Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Fri, 13 Mar 2015 18:00:01 +0900 Subject: [PATCH] Update pipeTo to allow unpiping See discussion in #297. pipeTo now returns a { finished, unpipe() } object, instead of just a promise. --- index.bs | 8 +-- .../lib/readable-stream.js | 49 +++++++++++--- reference-implementation/test/brand-checks.js | 4 +- reference-implementation/test/pipe-through.js | 4 +- .../test/pipe-to-options.js | 6 +- reference-implementation/test/pipe-to.js | 64 ++++++++++--------- .../test/templated/readable-stream-closed.js | 4 +- .../readable-stream-errored-async-only.js | 12 ++-- .../test/templated/readable-stream-errored.js | 23 ++++++- .../readable-stream-two-chunks-closed.js | 47 +++++++++++++- 10 files changed, 157 insertions(+), 64 deletions(-) diff --git a/index.bs b/index.bs index d9db80921..542556f4e 100644 --- a/index.bs +++ b/index.bs @@ -192,7 +192,7 @@ longer active. At this point another reader can be acquired at will. the chain:

-    readableStream.pipeTo(writableStream)
+    readableStream.pipeTo(writableStream).finished
       .then(() => console.log("All data successfully written!"))
       .catch(e => console.error("Something went wrong!", e));
   
@@ -924,7 +924,7 @@ a variable stream, that performs the following steps: chance to slow down its data production.

-    readableStream.pipeTo(writableStream)
+    readableStream.pipeTo(writableStream).finished
       .then(() => console.log("All data successfully written!"))
       .catch(e => console.error("Something went wrong!", e));
   
@@ -1839,7 +1839,7 @@ writable stream:

   const webSocketStream = makeReadableWebSocketStream("wss://example.com", 443);
 
-  webSocketStream.pipeTo(writableStream)
+  webSocketStream.pipeTo(writableStream).finished
     .then(() => console.log("All data successfully written!"))
     .catch(e => console.error("Something went wrong!", e));
 
@@ -1972,7 +1972,7 @@ We can then use this function to create writable streams for a web socket, and p

   const webSocketStream = makeWritableWebSocketStream("wss://example.com", 443);
 
-  readableStream.pipeTo(webSocketStream)
+  readableStream.pipeTo(webSocketStream).finished
     .then(() => console.log("All data successfully written!"))
     .catch(e => console.error("Something went wrong!", e));
 
diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 64e740bd4..0ba2e5a3c 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -72,12 +72,13 @@ export default class ReadableStream { let reader; let lastRead; let closedPurposefully = false; - let resolvePipeToPromise; - let rejectPipeToPromise; + let unpiped = false; + let resolveFinishedPromise; + let rejectFinishedPromise; - return new Promise((resolve, reject) => { - resolvePipeToPromise = resolve; - rejectPipeToPromise = reject; + const finishedPromise = new Promise((resolve, reject) => { + resolveFinishedPromise = resolve; + rejectFinishedPromise = reject; reader = source.getReader(); @@ -94,9 +95,25 @@ export default class ReadableStream { doPipe(); }); + return { finished: finishedPromise, unpipe }; + + function unpipe() { + unpiped = true; + return lastRead.then(finishUnpipe, finishUnpipe); + + function finishUnpipe() { + reader.releaseLock(); + resolveFinishedPromise(undefined); + } + } + function doPipe() { lastRead = reader.read(); Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => { + if (unpiped === true) { + return; + } + if (Boolean(done) === true) { closeDest(); } else { @@ -110,22 +127,30 @@ export default class ReadableStream { } function cancelSource(reason) { + if (unpiped === true) { + return; + } + if (preventCancel === false) { reader.cancel(reason); reader.releaseLock(); - rejectPipeToPromise(reason); + rejectFinishedPromise(reason); } else { // If we don't cancel, we need to wait for lastRead to finish before we're allowed to release. // We don't need to handle lastRead failing because that will trigger abortDest which takes care of // both of these. lastRead.then(() => { reader.releaseLock(); - rejectPipeToPromise(reason); + rejectFinishedPromise(reason); }); } } function closeDest() { + if (unpiped === true) { + return; + } + // Does not need to wait for lastRead since it occurs only on source closed. reader.releaseLock(); @@ -133,13 +158,17 @@ export default class ReadableStream { const destState = dest.state; if (preventClose === false && (destState === 'waiting' || destState === 'writable')) { closedPurposefully = true; - dest.close().then(resolvePipeToPromise, rejectPipeToPromise); + dest.close().then(resolveFinishedPromise, rejectFinishedPromise); } else { - resolvePipeToPromise(); + resolveFinishedPromise(); } } function abortDest(reason) { + if (unpiped === true) { + return; + } + // Does not need to wait for lastRead since it only occurs on source errored. reader.releaseLock(); @@ -147,7 +176,7 @@ export default class ReadableStream { if (preventAbort === false) { dest.abort(reason); } - rejectPipeToPromise(reason); + rejectFinishedPromise(reason); } } diff --git a/reference-implementation/test/brand-checks.js b/reference-implementation/test/brand-checks.js index f94ab50c6..305995f9e 100644 --- a/reference-implementation/test/brand-checks.js +++ b/reference-implementation/test/brand-checks.js @@ -15,7 +15,9 @@ function fakeReadableStream() { get closed() { return Promise.resolve(); }, cancel(reason) { return Promise.resolve(); }, pipeThrough({ writable, readable }, options) { return readable; }, - pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { return Promise.resolve(); }, + pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { + return { finished: Promise.resolve(), unpipe() { } }; + }, getReader() { return new ReadableStream(new ReadableStream()); } }; } diff --git a/reference-implementation/test/pipe-through.js b/reference-implementation/test/pipe-through.js index 079052e86..14597ae9b 100644 --- a/reference-implementation/test/pipe-through.js +++ b/reference-implementation/test/pipe-through.js @@ -33,7 +33,7 @@ test('Piping through an identity transform stream will close the destination whe const ws = new WritableStream(); - rs.pipeThrough(ts).pipeTo(ws).then(() => { + rs.pipeThrough(ts).pipeTo(ws).finished.then(() => { t.equal(ws.state, 'closed', 'the writable stream was closed'); }) .catch(e => t.error(e)); @@ -82,7 +82,7 @@ test.skip('Piping through a default transform stream causes backpressure to be e }); setTimeout(() => { - rs.pipeThrough(ts).pipeTo(ws).then(() => { + rs.pipeThrough(ts).pipeTo(ws).finished.then(() => { t.deepEqual( enqueueReturnValues, [true, true, true, true, false, false, false, false], diff --git a/reference-implementation/test/pipe-to-options.js b/reference-implementation/test/pipe-to-options.js index dc50e7635..b08cc5747 100644 --- a/reference-implementation/test/pipe-to-options.js +++ b/reference-implementation/test/pipe-to-options.js @@ -80,11 +80,11 @@ test('Piping with { preventCancel: true } and a destination error', t => { } }); - rs.pipeTo(ws, { preventCancel: true }).catch(e => { - t.equal(e, theError, 'rejection reason of pipeTo promise is the sink error'); + rs.pipeTo(ws, { preventCancel: true }).finished.catch(e => { + t.equal(e, theError, 'pipeTo finished promise should reject with the sink error'); let reader; - t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo completes'); + t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo finishes'); // { value: 'c', done: false } gets consumed before we know that ws has errored, and so is lost. diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 5b7427e10..11a6840da 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -30,7 +30,7 @@ test('Piping from a ReadableStream from which lots of data are readable synchron }); let pipeFinished = false; - rs.pipeTo(ws).then( + rs.pipeTo(ws).finished.then( () => { pipeFinished = true; t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes'); @@ -75,11 +75,11 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos rsClosed = true; }); - rs.pipeTo(ws).then( - () => t.fail('promise returned by pipeTo should not fulfill'), + rs.pipeTo(ws).finished.then( + () => t.fail('pipeTo finished promise should not fulfill'), r => { t.equal(r, cancelReason, - 'the pipeTo promise should reject with the same error as the underlying source cancel was called with'); + 'pipeTo finished promise should reject with the same error as the underlying source cancel was called with'); t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes'); } ); @@ -135,8 +135,8 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro ws.ready.then(() => { t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state'); - rs.pipeTo(ws).catch(e => { - t.equal(e, passedError, 'pipeTo promise should be rejected with the error'); + rs.pipeTo(ws).finished.catch(e => { + t.equal(e, passedError, 'pipeTo finished promise should be rejected with the error'); t.assert(cancelCalled, 'cancel should have been called'); t.end(); }); @@ -187,11 +187,11 @@ test('Piping from a ReadableStream in the readable state which becomes closed af }); startPromise.then(() => { - rs.pipeTo(ws).then(() => { - t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo completes'); + rs.pipeTo(ws).finished.then(() => { + t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo finishes'); }); - t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); + t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call'); closeReadableStream(); }); @@ -235,12 +235,12 @@ test('Piping from a ReadableStream in the readable state which becomes errored a }); startPromise.then(() => { - rs.pipeTo(ws).catch(e => { - t.equal(e, passedError, 'pipeTo should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo completes'); + rs.pipeTo(ws).finished.catch(e => { + t.equal(e, passedError, 'pipeTo finished promise should be rejected with the passed error'); + t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo finishes'); }); - t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); + t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call'); errorReadableStream(passedError); }); @@ -276,7 +276,7 @@ test('Piping from an empty ReadableStream which becomes non-empty after pipeTo c } }); - rs.pipeTo(ws).then(() => t.fail('pipeTo promise should not fulfill')); + rs.pipeTo(ws).finished.then(() => t.fail('pipeTo finished promise should not fulfill')); t.equal(ws.state, 'writable', 'writable stream should start in writable state'); enqueue('Hello'); @@ -312,7 +312,9 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal } }); - rs.pipeTo(ws).catch(e => t.equal(e, passedError, 'pipeTo should reject with the passed error')); + rs.pipeTo(ws).finished.catch( + e => t.equal(e, passedError, 'pipeTo finished promise should reject with the passed error') + ); t.equal(ws.state, 'writable', 'writable stream should start out writable'); errorReadableStream(passedError); }); @@ -355,7 +357,9 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st startPromise.then(() => { t.equal(ws.state, 'writable', 'ws should start writable'); - rs.pipeTo(ws).catch(e => t.equal(e, theError, 'pipeTo should reject with the passed error')); + rs.pipeTo(ws).finished.catch( + e => t.equal(e, theError, 'pipeTo finished promise should reject with the passed error') + ); t.equal(ws.state, 'writable', 'ws should be writable after pipe'); errorWritableStream(theError); @@ -788,9 +792,9 @@ test('Piping to a stream that has been aborted passes through the error as the c const passedReason = new Error('I don\'t like you.'); ws.abort(passedReason); - rs.pipeTo(ws).catch(e => { - t.equal(e, passedReason, 'pipeTo rejection reason should be the cancellation reason'); - t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); + rs.pipeTo(ws).finished.catch(e => { + t.equal(e, passedReason, 'pipeTo finished promise should reject with the cancellation reason'); + t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason'); t.end(); }); }); @@ -809,9 +813,9 @@ test('Piping to a stream and then aborting it passes through the error as the ca const pipeToPromise = rs.pipeTo(ws); ws.abort(passedReason); - pipeToPromise.catch(e => { - t.equal(e, passedReason, 'pipeTo rejection reason should be the abortion reason'); - t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); + pipeToPromise.finished.catch(e => { + t.equal(e, passedReason, 'pipeTo finished promise should reject with the abortion reason'); + t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason'); t.end(); }); }); @@ -827,8 +831,8 @@ test('Piping to a stream that has been closed propagates a TypeError cancellatio const ws = new WritableStream(); ws.close(); - rs.pipeTo(ws).catch(e => { - t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); + rs.pipeTo(ws).finished.catch(e => { + t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError'); t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); t.end(); }); @@ -844,11 +848,11 @@ test('Piping to a stream and then closing it propagates a TypeError cancellation const ws = new WritableStream(); - const pipeToPromise = rs.pipeTo(ws); + const pipeToPromise = rs.pipeTo(ws).finished; ws.close(); pipeToPromise.catch(e => { - t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); + t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError'); t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); t.end(); }); @@ -913,10 +917,10 @@ test('Piping to a stream that errors on write should not pass through the error } }); - rs.pipeTo(ws).then( - () => t.fail('pipeTo should not fulfill'), + rs.pipeTo(ws).finished.then( + () => t.fail('pipeTo finished promise should not fulfill'), r => { - t.equal(r, passedError, 'pipeTo should reject with the same error as the write'); + t.equal(r, passedError, 'pipeTo finished promise should reject with the same error as the write'); t.equal(cancelCalled, false, 'cancel should not have been called'); t.end(); } @@ -986,7 +990,7 @@ test('Piping to a writable stream that does not consume the writes fast enough e }); startPromise.then(() => { - rs.pipeTo(ws).then(() => { + rs.pipeTo(ws).finished.then(() => { t.deepEqual(enqueueReturnValues, [true, true, true, false], 'backpressure was correctly exerted at the source'); t.deepEqual(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written'); t.end(); diff --git a/reference-implementation/test/templated/readable-stream-closed.js b/reference-implementation/test/templated/readable-stream-closed.js index 134b30613..f4b15e20f 100644 --- a/reference-implementation/test/templated/readable-stream-closed.js +++ b/reference-implementation/test/templated/readable-stream-closed.js @@ -60,8 +60,8 @@ export default (label, factory) => { startPromise.then(() => { t.equal(ws.state, 'writable', 'writable stream should start in writable state'); - rs.pipeTo(ws).then(() => { - t.pass('pipeTo promise should be fulfilled'); + rs.pipeTo(ws).finished.then(() => { + t.pass('pipeTo finished promise should be fulfilled'); t.equal(ws.state, 'closed', 'writable stream should become closed'); }); }); diff --git a/reference-implementation/test/templated/readable-stream-errored-async-only.js b/reference-implementation/test/templated/readable-stream-errored-async-only.js index 8ec30721d..86907372c 100644 --- a/reference-implementation/test/templated/readable-stream-errored-async-only.js +++ b/reference-implementation/test/templated/readable-stream-errored-async-only.js @@ -15,9 +15,9 @@ export default (label, factory, error) => { } }); - rs.pipeTo(ws).catch(e => { + rs.pipeTo(ws).finished.catch(e => { t.equal(ws.state, 'errored', 'destination should be errored'); - t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); + t.equal(e, error, 'pipeTo finished promise should reject with the source error'); }); ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error'); @@ -33,9 +33,9 @@ export default (label, factory, error) => { } }); - rs.pipeTo(ws, { preventAbort: false }).catch(e => { + rs.pipeTo(ws, { preventAbort: false }).finished.catch(e => { t.equal(ws.state, 'errored', 'destination should be errored'); - t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); + t.equal(e, error, 'pipeTo finished promise should reject with the source error'); }); ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error'); @@ -51,9 +51,9 @@ export default (label, factory, error) => { } }); - rs.pipeTo(ws, { preventAbort: true }).catch(e => { + rs.pipeTo(ws, { preventAbort: true }).finished.catch(e => { t.equal(ws.state, 'writable', 'destination should remain writable'); - t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); + t.equal(e, error, 'pipeTo finished promise should reject with the source error'); }); }); }; diff --git a/reference-implementation/test/templated/readable-stream-errored.js b/reference-implementation/test/templated/readable-stream-errored.js index 2866f9a3b..3aeb00382 100644 --- a/reference-implementation/test/templated/readable-stream-errored.js +++ b/reference-implementation/test/templated/readable-stream-errored.js @@ -39,16 +39,33 @@ export default (label, factory, error) => { startPromise.then(() => { t.equal(ws.state, 'writable'); - rs.pipeTo(ws).then( - () => t.fail('pipeTo promise should not be fulfilled'), + rs.pipeTo(ws).finished.then( + () => t.fail('pipeTo finished promise should not be fulfilled'), e => { - t.equal(e, error, 'pipeTo promise should be rejected with the passed error'); + t.equal(e, error, 'pipeTo finished promise should be rejected with the passed error'); t.equal(ws.state, 'errored', 'writable stream should become errored'); } ); }); }); + test('unpiping should be a no-op after the pipe fails', t => { + t.plan(2); + + const rs = factory(); + const ws = new WritableStream(); + const pipe = rs.pipeTo(ws); + + pipe.finished.catch(e => { + t.equal(e, error, 'pipeTo finished promise should be rejected with the passed error'); + + return pipe.unpipe().then(v => { + t.equal(v, undefined, 'unpipe() should fulfill with undefined'); + }); + }) + .catch(e => t.error(e)); + }); + test('getReader() should return a reader that acts errored', t => { t.plan(2); const rs = factory(); diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js index 2793dfd13..907c8fa83 100644 --- a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js +++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js @@ -18,7 +18,7 @@ export default (label, factory, chunks) => { } }); - rs.pipeTo(ws).then(() => { + rs.pipeTo(ws).finished.then(() => { t.equal(ws.state, 'closed', 'destination should be closed'); t.deepEqual(chunksWritten, chunks); t.end(); @@ -38,7 +38,7 @@ export default (label, factory, chunks) => { } }); - rs.pipeTo(ws).then(() => { + rs.pipeTo(ws).finished.then(() => { t.equal(ws.state, 'closed', 'destination should be closed'); t.deepEqual(chunksWritten, chunks); t.end(); @@ -61,11 +61,52 @@ export default (label, factory, chunks) => { } }); - rs.pipeTo(ws, { preventClose: true }).then(() => { + rs.pipeTo(ws, { preventClose: true }).finished.then(() => { t.equal(ws.state, 'writable', 'destination should be writable'); t.deepEqual(chunksWritten, chunks); t.end(); }); }); + + test('piping and then immediately unpiping', t => { + t.plan(5); + const rs = factory(); + + const chunksWritten = []; + const ws = new WritableStream({ + close() { + t.fail('unexpected close call'); + }, + abort() { + t.fail('unexpected abort call'); + }, + write(chunk) { + chunksWritten.push(chunk); + } + }); + + const pipe = rs.pipeTo(ws); + + let unpipeFulfilled = false; + + pipe.unpipe().then(() => { + unpipeFulfilled = true; + + let reader; + t.doesNotThrow(() => { reader = rs.getReader(); }, + 'should be able to get a reader after unpipe promise fulfills'); + + reader.read().then(r => { + t.deepEqual(r, { value: chunks[1], done: false }, 'reading from the reader should give the second chunk'); + }); + }); + + pipe.finished.then(v => { + t.equal(v, undefined, 'pipeTo finished promise should fulfill with undefined'); + t.equal(unpipeFulfilled, false, 'pipeTo finished promise should fulfill before the unpipe promise'); + }); + + t.throws(() => rs.getReader(), TypeError, 'should not be able to get a reader immediately after unpipe call'); + }); };