Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update TransformStream API & misc. fixups #519

Merged
merged 29 commits into from
Oct 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
269a970
transform-stream: fix typo
Sep 16, 2016
9d9474f
add transformstream._storedError
Sep 19, 2016
0faf0e1
fix thrown error in transformstream enqueue
Sep 19, 2016
dbf1fd5
add TransformStreamErrorIfNeeded
Sep 19, 2016
376701c
Have TransformStreamSink.close always throw on error
Sep 19, 2016
feafe52
add TransformStreamDefaultController
Sep 16, 2016
7c1025a
test TransformStream error()
Sep 16, 2016
7b6313f
TransformStream: use InvokeOrNoop
Sep 16, 2016
a99e4cd
add TransformStreamResolveWrite
Sep 17, 2016
2fd0945
Return a Promise from TransformStream transform()
Sep 16, 2016
27bef29
add TransformStreamCloseReadableInternal
Sep 18, 2016
a715e9e
Make TransformStream transformer.flush() like sink.close()
Sep 17, 2016
ccb6dfe
add TransformStreamAdd{Readable,Writable}Controller
Sep 21, 2016
cad1cc1
TransformStream: return promise from transformer.start()
Sep 21, 2016
88c5a7d
TransformStream: call transformer.start synchronously
Sep 22, 2016
f18010d
remove weird conditional throw in TransformStream constructor
Sep 22, 2016
464df6c
test synchronous transformer.start failure
Sep 22, 2016
36944ed
TransformStream: account for close() and error() calls in flush
Sep 22, 2016
b73b21e
drop superfluous try-catch in TransformStream enqueue
Sep 23, 2016
2ca4d03
TransformStream: handle readableStrategy.size() errors
Sep 23, 2016
ccc0a1c
handle badly behaving readableStrategy.size
Sep 23, 2016
04d6fe0
fix variable name
Oct 4, 2016
db49655
drop t.end
Oct 4, 2016
c3902c6
assert controller objects are present
Oct 4, 2016
134f089
fix comment
Oct 4, 2016
40c6327
add _rejectWrite
Oct 10, 2016
6b2649e
fixup comments
Oct 10, 2016
7cc3787
move internal op next to other abstract ops
Oct 10, 2016
7c40ebc
make .readable and .writable readonly properties
Oct 10, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 118 additions & 107 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
'use strict';
const assert = require('assert');
const { InvokeOrNoop, PromiseInvokeOrNoop } = require('./helpers.js');
const { ReadableStream } = require('./readable-stream.js');
const { WritableStream } = require('./writable-stream.js');

// Functions passed to the transformer.start().
// Methods on the transform stream controller object

function TransformStreamCloseReadable(transformStream) {
// console.log('TransformStreamCloseReadable()');
Expand All @@ -16,25 +17,19 @@ function TransformStreamCloseReadable(transformStream) {
throw new TypeError('Readable side is already closed');
}

try {
transformStream._readableController.close();
} catch (e) {
assert(false);
}

transformStream._readableClosed = true;
TransformStreamCloseReadableInternal(transformStream);
}

function TransformStreamEnqueueToReadable(transformStream, chunk) {
if (transformStream._errroed === true) {
if (transformStream._errored === true) {
throw new TypeError('TransformStream is already errored');
}

if (transformStream._readableClosed === true) {
throw new TypeError('Readable side is already closed');
}

// We throttle transformer.transform invoation based on the backpressure of the ReadableStream, but we still
// We throttle transformer.transform invocation based on the backpressure of the ReadableStream, but we still
// accept TransformStreamEnqueueToReadable() calls.

const controller = transformStream._readableController;
Expand All @@ -44,25 +39,16 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) {
try {
controller.enqueue(chunk);
} catch (e) {
if (transformStream._error === false) {
// This happens when the given strategy is bad.
const reason = new TypeError('Failed to enqueue to readable side');
TransformStreamErrorInternal(transformStream, reason);
}
throw transformStream._error;
}
// This happens when readableStrategy.size() throws.
// The ReadableStream has already errored itself.
transformStream._readableClosed = true;
TransformStreamErrorIfNeeded(transformStream, e);

let backpressure;
try {
backpressure = controller.desiredSize <= 0;
} catch (e) {
if (transformStream._error === false) {
const reason = new TypeError('Failed to calculate backpressure of readable side');
TransformStreamError(transformStream, reason);
}
throw transformStream._error;
throw transformStream._storedError;
}

const backpressure = controller.desiredSize <= 0;

// enqueue() may invoke pull() synchronously when we're not in pull() call.
// In such case, _readableBackpressure may be already set to false.
if (backpressure) {
Expand All @@ -78,33 +64,53 @@ function TransformStreamError(transformStream, e) {
TransformStreamErrorInternal(transformStream, e);
}

// Functions passed to transformer.transform().
// Abstract operations.

function TransformStreamCloseReadableInternal(transformStream) {
assert(transformStream._errored === false);
assert(transformStream._readableClosed === false);

function TransformStreamChunkDone(transformStream) {
if (transformStream._errroed === true) {
throw new TypeError('TransformStream is already errored');
try {
transformStream._readableController.close();
} catch (e) {
assert(false);
}

if (transformStream._transforming === false) {
throw new TypeError('No active transform is running');
transformStream._readableClosed = true;
}

function TransformStreamResolveWrite(transformStream) {
if (transformStream._errored === true) {
return;
}

assert(transformStream._transforming === true);

assert(transformStream._resolveWrite !== undefined);
assert(transformStream._rejectWrite !== undefined);

transformStream._transforming = false;

transformStream._resolveWrite(undefined);
transformStream._resolveWrite = undefined;
transformStream._rejectWrite = undefined;

TransformStreamTransformIfNeeded(transformStream);
}

// Abstract operations.
function TransformStreamErrorIfNeeded(transformStream, e) {
if (transformStream._errored === false) {
TransformStreamErrorInternal(transformStream, e);
}
}

function TransformStreamErrorInternal(transformStream, e) {
// console.log('TransformStreamErrorInternal()');

assert(transformStream._errored === false);

transformStream._errored = true;
transformStream._storedError = e;

if (transformStream._writableDone === false) {
transformStream._writableController.error(e);
Expand All @@ -115,8 +121,8 @@ function TransformStreamErrorInternal(transformStream, e) {

transformStream._chunk = undefined;

if (transformStream._resolveWriter !== undefined) {
transformStream._resolveWriter(undefined);
if (transformStream._rejectWriter !== undefined) {
transformStream._rejectWriter(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/rejectWriter/rejectWrite/

anyway, I just noticed that this is unnecessary since we call _writableController.error() at L118. I'm going to just remove this in #533

}
}

Expand All @@ -143,48 +149,25 @@ function TransformStreamTransformIfNeeded(transformStream) {
transformStream._chunkPending = false;
transformStream._chunk = undefined;

try {
if (transformStream._transformer.transform !== undefined) {
transformStream._transformer.transform(
chunk,
TransformStreamChunkDone.bind(undefined, transformStream),
transformStream._enqueueFunction,
transformStream._closeFunction,
transformStream._errorFunction);
}
} catch (e) {
if (transformStream._errored === false) {
TransformStreamErrorInternal(transformStream, e);
}
}
}

function TransformStreamStart(transformStream) {
if (transformStream._transformer.start === undefined) {
return;
}
const transformPromise = PromiseInvokeOrNoop(transformStream._transformer,
'transform', [chunk, transformStream._controller]);

// Thrown exception will be handled by TransformStreamSink.start()
// method.
transformStream._transformer.start(
transformStream._enqueueFunction,
transformStream._closeFunction,
transformStream._errorFunction);
transformPromise.then(() => TransformStreamResolveWrite(transformStream),
e => TransformStreamErrorIfNeeded(transformStream, e));
}

class TransformStreamSink {
constructor(transformStream) {
constructor(transformStream, startPromise) {
this._transformStream = transformStream;
this._startPromise = startPromise;
}

start(c) {
const transformStream = this._transformStream;

transformStream._writableController = c;

if (transformStream._readableController !== undefined) {
TransformStreamStart(transformStream);
}
return this._startPromise;
}

write(chunk) {
Expand All @@ -198,12 +181,14 @@ class TransformStreamSink {
assert(transformStream._chunk === undefined);

assert(transformStream._resolveWrite === undefined);
assert(transformStream._rejectWrite === undefined);

transformStream._chunkPending = true;
transformStream._chunk = chunk;

const promise = new Promise(resolve => {
transformStream._resolveWrite = resolve;
transformStream._rejectWrite = resolve;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be set to the rejection callback of the promise

});

TransformStreamTransformIfNeeded(transformStream);
Expand All @@ -226,44 +211,41 @@ class TransformStreamSink {
assert(transformStream._chunk === undefined);

assert(transformStream._resolveWrite === undefined);
assert(transformStream._rejectWrite === undefined);

assert(transformStream._transforming === false);

// No control over the promise returned by writableStreamWriter.close(). Need it?

transformStream._writableDone = true;

if (transformStream._transformer.flush === undefined) {
TransformStreamCloseReadable(transformStream);
} else {
try {
transformStream._transformer.flush(
transformStream._enqueueFunction,
transformStream._closeFunction,
transformStream._errorFunction);
} catch (e) {
if (transformStream._errored === false) {
TransformStreamErrorInternal(transformStream, e);
throw e;
}
const flushPromise = PromiseInvokeOrNoop(transformStream._transformer, 'flush', [transformStream._controller]);
// Return a promise that is fulfilled with undefined on success.
return flushPromise.then(() => {
if (transformStream._errored === true) {
return Promise.reject(transformStream._storedError);
}
}
if (transformStream._readableClosed === false) {
TransformStreamCloseReadableInternal(transformStream);
}
return Promise.resolve();
}).catch(r => {
TransformStreamErrorIfNeeded(transformStream, r);
return Promise.reject(transformStream._storedError);
});
}
}

class TransformStreamSource {
constructor(transformStream) {
constructor(transformStream, startPromise) {
this._transformStream = transformStream;
this._startPromise = startPromise;
}

start(c) {
const transformStream = this._transformStream;

transformStream._readableController = c;

if (transformStream._writableController !== undefined) {
TransformStreamStart(transformStream);
}
return this._startPromise;
}

pull() {
Expand All @@ -278,6 +260,24 @@ class TransformStreamSource {
}
}

class TransformStreamDefaultController {
constructor(transformStream) {
this._controlledTransformStream = transformStream;
}

enqueue(chunk) {
TransformStreamEnqueueToReadable(this._controlledTransformStream, chunk);
}

close() {
TransformStreamCloseReadable(this._controlledTransformStream);
}

error(reason) {
TransformStreamError(this._controlledTransformStream, reason);
}
}

module.exports = class TransformStream {
constructor(transformer) {
if (transformer.start !== undefined && typeof transformer.start !== 'function') {
Expand All @@ -294,6 +294,7 @@ module.exports = class TransformStream {

this._transforming = false;
this._errored = false;
this._storedError = undefined;

this._writableController = undefined;
this._readableController = undefined;
Expand All @@ -302,36 +303,46 @@ module.exports = class TransformStream {
this._readableClosed = false;

this._resolveWrite = undefined;
this._rejectWrite = undefined;

this._chunkPending = false;
this._chunk = undefined;

this._enqueueFunction = TransformStreamEnqueueToReadable.bind(undefined, this);
this._closeFunction = TransformStreamCloseReadable.bind(undefined, this);
this._errorFunction = TransformStreamError.bind(undefined, this);
this._controller = new TransformStreamDefaultController(this);

const sink = new TransformStreamSink(this);
let startPromise_resolve;
const startPromise = new Promise(resolve => {
startPromise_resolve = resolve;
});

try {
this.writable = new WritableStream(sink, transformer.writableStrategy);
} catch (e) {
if (this._errored === false) {
TransformStreamError(this, e);
throw e;
}
return;
}
const sink = new TransformStreamSink(this, startPromise);

const source = new TransformStreamSource(this);
this._writable = new WritableStream(sink, transformer.writableStrategy);

try {
this.readable = new ReadableStream(source, transformer.readableStrategy);
} catch (e) {
this.writable = undefined;
if (this._errored === false) {
TransformStreamError(this, e);
throw e;
const source = new TransformStreamSource(this, startPromise);

this._readable = new ReadableStream(source, transformer.readableStrategy);

assert(this._writableController !== undefined);
assert(this._readableController !== undefined);

const transformStream = this;
const startResult = InvokeOrNoop(transformer, 'start', [transformStream._controller]);
startPromise_resolve(startResult);
startPromise.catch(e => {
// The underlyingSink and underlyingSource will error the readable and writable ends on their own.
if (transformStream._errored === false) {
transformStream._errored = true;
transformStream._storedError = e;
}
}
});
}

get readable() {
return this._readable;
}

get writable() {
return this._writable;
}
};
Loading