Skip to content

Commit

Permalink
TransformStream: return promise from transformer.start()
Browse files Browse the repository at this point in the history
Now all 3 transformer methods are promise-ified.
  • Loading branch information
isonmad committed Sep 21, 2016
1 parent ccb6dfe commit cad1cc1
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 10 deletions.
46 changes: 36 additions & 10 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { InvokeOrNoop, PromiseInvokeOrNoop } = require('./helpers.js');
const { PromiseInvokeOrNoop } = require('./helpers.js');
const { ReadableStream } = require('./readable-stream.js');
const { WritableStream } = require('./writable-stream.js');

Expand Down Expand Up @@ -167,25 +167,49 @@ function TransformStreamTransformIfNeeded(transformStream) {
}

function TransformStreamStart(transformStream) {
// Thrown exception will be handled by TransformStreamSink.start()
// method.
InvokeOrNoop(transformStream._transformer, 'start', [transformStream._controller]);
return PromiseInvokeOrNoop(transformStream._transformer, 'start', [transformStream._controller]);
}

function TransformStreamCreateStartPromise(transformStream) {
const sinkStart = new Promise(resolve => {
transformStream._sinkStart_resolve = resolve;
});

const sourceStart = new Promise(resolve => {
transformStream._sourceStart_resolve = resolve;
});

const bothStart = Promise.all([sinkStart, sourceStart]);

const transformerStart = bothStart.then(() => TransformStreamStart(transformStream));

transformerStart.catch(e => {
// The underlyingSink and underlyingSource will error the readable and writable ends on their own.
if (transformStream._errored === false) {
transformStream._errored = true;
transformStream._storedError = e;
}
});

return transformerStart;
}

function TransformStreamAddWritableController(transformStream, c) {
transformStream._writableController = c;

if (transformStream._readableController !== undefined) {
TransformStreamStart(transformStream);
}
transformStream._sinkStart_resolve();
transformStream._sinkStart_resolve = undefined;

return transformStream._startPromise;
}

function TransformStreamAddReadableController(transformStream, c) {
transformStream._readableController = c;

if (transformStream._writableController !== undefined) {
TransformStreamStart(transformStream);
}
transformStream._sourceStart_resolve();
transformStream._sourceStart_resolve = undefined;

return transformStream._startPromise;
}

class TransformStreamSink {
Expand Down Expand Up @@ -325,6 +349,8 @@ module.exports = class TransformStream {

this._controller = new TransformStreamDefaultController(this);

this._startPromise = TransformStreamCreateStartPromise(this);

const sink = new TransformStreamSink(this);

try {
Expand Down
92 changes: 92 additions & 0 deletions reference-implementation/test/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,95 @@ test('TransformStream cannot be used after becoming errored', t => {
}
});
});

test('TransformStream start, transform, and flush are strictly ordered', t => {
t.plan(4);
let startCalled = false;
let startDone = false;
let transformDone = false;
let flushDone = false;
const ts = new TransformStream({
start() {
startCalled = true;
return new Promise(resolve => setTimeout(resolve, 90))
.then(() => { startDone = true; });
},
transform() {
t.ok(startDone, 'startPromise must resolve before transform is called');
return new Promise(resolve => setTimeout(resolve, 30))
.then(() => { transformDone = true; });
},
flush() {
t.ok(transformDone, 'pending transform promise must resolve before flush is called');
return new Promise(resolve => setTimeout(resolve, 50))
.then(() => { flushDone = true; });
}
});

t.notOk(startCalled, 'start is not called synchronously');

This comment has been minimized.

Copy link
@ricea

ricea Sep 21, 2016

Collaborator

It's not obvious to me why this is not okay. Would it be possible to change the test description to describe the correctness invariant that is being enforced?


const writer = ts.writable.getWriter();
writer.write('a');
writer.close().then(() => {
t.ok(flushDone, 'flushPromise resolved');
t.end();
})
.catch(e => t.error(e));
});

test('TransformStream transformer.start() rejected promise errors the stream', t => {
t.plan(2);
const thrownError = new Error('start failure');
const ts = new TransformStream({
start() {
return new Promise(resolve => setTimeout(resolve, 90))
.then(() => { throw thrownError; });
},
transform() {
t.fail('transform must never be called if start() fails');
},
flush() {
t.fail('flush must never be called if start() fails');
}
});

const writer = ts.writable.getWriter();
writer.write('a');
writer.close().then(() => {
t.fail('writer should be errored if start() fails');
})
.catch(e => t.equal(e, thrownError, 'writer rejects with same error'));

const reader = ts.readable.getReader();

reader.read().catch(e => t.equal(e, thrownError, 'reader rejects with same error'));
});

test('TransformStream both calling controller.error and rejecting a promise', t => {
t.plan(2);
const controllerError = new Error('start failure');
const ts = new TransformStream({
start(c) {
c.error(controllerError);
return new Promise(resolve => setTimeout(resolve, 90))
.then(() => { throw new Error('ignored error'); });
},
transform() {
t.fail('transform must never be called if start() fails');
},
flush() {
t.fail('flush must never be called if start() fails');
}
});

const writer = ts.writable.getWriter();
writer.write('a');
writer.close().then(() => {
t.fail('writer should be errored if start() fails');
})
.catch(e => t.equal(e, controllerError, 'writer rejects with same error'));

const reader = ts.readable.getReader();

reader.read().catch(e => t.equal(e, controllerError, 'reader rejects with same error'));
});

0 comments on commit cad1cc1

Please sign in to comment.