Skip to content

Commit

Permalink
Brush up TransformStream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tyoshino committed Jun 1, 2016
1 parent cfcb5f4 commit ae2e3f0
Show file tree
Hide file tree
Showing 2 changed files with 282 additions and 58 deletions.
338 changes: 281 additions & 57 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,297 @@
const { ReadableStream } = require('./readable-stream.js');
const { WritableStream } = require('./writable-stream.js');

module.exports = class TransformStream {
constructor(transformer) {
if (transformer.flush === undefined) {
transformer.flush = (enqueue, close) => close();
// Functions passed to the transformer.start().

function TransformStreamCloseReadable(stream) {
if (stream._errored === true) {
throw new TypeError('TransformStream is already errored');
}

if (stream._readableClosed === true) {
throw new TypeError('Readable side is already closed');
}

try {
stream._readableController.close();
} catch (e) {
assert(false);
}

stream._readableClosed = true;
}

function TransformStreamEnqueueToReadable(stream, chunk) {
if (stream._errroed === true) {
throw new TypeError('TransformStream is already errored');
}

if (stream._readableClosed === true) {
throw new TypeError('Readable side is already closed');
}

// We throttle transformer.transform invoation based on the backpressure of the ReadableStream, but we still
// accept TrasnformStreamEnqueueToReadable() calls.

const controller = stream._readableController;

stream._readableBackpressure = true;

try {
controller.enqueue(chunk);
} catch (e) {
// This happens when the given strategy is bad.
const reason = new TypeError('Failed to enqueue to readable side');
TransforStreamError(stream, reason);
throw reason;
}

let backpressure;
try {
backpressure = controller.desiredSize() <= 0;
} catch (e) {
const reason = new TypeError('Failed to calculate backpressure of readable side');
TransformStreamError(stream, reason);
throw reason;
}

// enqueue() may invoke pull() synchronously when we're not in pull() call.
// In such case, _readableBackpressure may be already set to false.
if (backpressure) {
stream._readableBackpressure = false;
}
}

function TransformStreamError(stream, e) {
if (stream._errored === true) {
throw new TypeError('TransformStream is already errored');
}

TransformStreamErrorInternal(stream, e);
}

// Functions passed to transformer.transform().

function TransformStreamChunkDone(stream) {
if (stream._errroed === true) {
throw new TypeError('TransformStream is already errored');
}

if (stream._transforming !== true) {
throw new TypeError('No active transform is running');
}

assert(stream._resolveWrite !== undefined);

stream._transforming = false;

stream._resolveWrite(undefined);
stream._resolveWriter = undefined;
}

// Abstract operations.

function TransformStreamErrorInternal(stream, e) {
stream._errored = true;

if (stream._writableDone === false) {
stream._writableController.error(e);
}
if (stream._readableClosed === false) {
stream._readableController.error(e);
}

stream._chunk = undefined;

if (stream._resolveWriter !== undefined) {
stream._resolveWriter(undefined);
}
}

function TransformStreamTransformIfNeeded(stream) {
if (stream._transforming === true) {
return;
}

if (stream._chunkPending === false) {
return;
}

if (stream._readableBackpressure === true) {
return;
}

assert(stream._resolveWrite !== undefined);

stream._transforming = true;

const chunk = stream._chunk;
stream._chunkPending = false;
stream._chunk = undefined;

try {
stream._transformer.transform(chunk, TransformStreamChunkDone.bind(stream));
} catch (e) {
if (stream._errored === false) {
TransformStreamErrorInternal(stream, e);
}
}
}

if (typeof transformer.transform !== 'function') {
throw new TypeError('transform must be a function');
function TransformStreamStart(stream) {
const enqueueFunction = TransformStreamEnqueueToReadable.bind(stream);
const closeFunction = TransformStreamCloseReadable.bind(stream);

const errorFunction = TransformStreamError.bind(stream);

// Thrown exception will be handled by the constructor of TransformStream.
stream._transformer.start(enqueueFunction, closeFunction, errorFunction);
}

class TransformStreamSink {
constructor(transformStream) {
this._transformStream = transformStream;
}

start(c) {
const stream = this._transformStream;

stream._writableController = c;

if (stream._readableController !== undefined) {
TransformStreamStart(stream);
}
}

let writeChunk, writeDone, errorWritable;
let transforming = false;
let chunkWrittenButNotYetTransformed = false;
this.writable = new WritableStream({
start(c) {
errorWritable = c.error.bind(c);
},
write(chunk) {
writeChunk = chunk;
chunkWrittenButNotYetTransformed = true;

const p = new Promise(resolve => writeDone = resolve);
maybeDoTransform();
return p;
},
close() {
try {
transformer.flush(enqueueInReadable, closeReadable);
} catch (e) {
errorWritable(e);
errorReadable(e);
}
write(chunk) {
const stream = this._transformStream;

assert(stream._errored === false);

assert(stream._chunkPending === false);
assert(stream._chunk === undefined);
assert(stream._resolveWrite === undefined);

stream._chunkPending = true;
stream._chunk = chunk;
const promise = new Promise(resolve => {
stream._resolveWrite = resolve;
});

TransformStreamTransformIfNeeded(stream);

return promise;
}

abort(reason) {
const stream = this._transformStream;
stream._writableDone = true;
TransformStreamErrorInternal(stream, new TypeError('Writable side aborted'));
}

close() {
const stream = this._transformStream;

assert(stream._chunkPending === false);
assert(stream._chunk === undefined);
assert(stream._resolveWrite === undefined);

assert(stream._transforming === false);

const rc = stream._readableController;

// No control over the promise returned by writableStream.close(). Need it?

const flush = stream._transformer.flush;

if (flush === undefined) {
try {
TransformStreamCloseReadable(stream);
} catch (e) {
TransformStreamError(new TypeError('TransformStream is broken'));
}
}, transformer.writableStrategy);

let enqueueInReadable, closeReadable, errorReadable;
this.readable = new ReadableStream({
start(c) {
enqueueInReadable = c.enqueue.bind(c);
closeReadable = c.close.bind(c);
errorReadable = c.error.bind(c);
},
pull() {
if (chunkWrittenButNotYetTransformed === true) {
maybeDoTransform();
} else {
try {
flush();
} catch (e) {
if (stream._errored === false) {
TransformStreamError(stream, e);
}
}
}, transformer.readableStrategy);

function maybeDoTransform() {
if (transforming === false) {
transforming = true;
try {
transformer.transform(writeChunk, enqueueInReadable, transformDone);
writeChunk = undefined;
chunkWrittenButNotYetTransformed = false;
} catch (e) {
transforming = false;
errorWritable(e);
errorReadable(e);
}
}
}
}

class TransformStreamSource {
constructor(transformStream) {
this._transformStream = transformStream;
}

start(c) {
const stream = this._transformStream;

stream._readableController = c;

if (stream._writableController !== undefined) {
TransformStreamStart(stream);
}
}

pull() {
this._transformStream._outputBackpressure = false;
TransformStreamTransformIfNeeded(this._transformStream);
}

cancel(reason) {
const stream = this._transformStream;
stream._readableClosed = true;
TransformStreamErrorInternal(stream, new TypeError('Readable side canceled'));
}
}

module.exports = class TransformStream {
constructor(transformer) {
if (transformer.flush !== undefined && typeof transformer.flush !== 'function') {
throw new TypeError('flush must be a function or undefined');
}
if (typeof transformer.transform !== 'function') {
throw new TypeError('transform must be a function');
}

this._transformer = transformer;

this._transforming = false;
this._errored = false;

this._writableController = undefined;
this._readableController = undefined;

this._resolveWrite = undefined;

this._chunkPending = false;
this._chunk = undefined;

const sink = new TransformStreamSink(this);

try {
this.writable = new WritableStream(sink, transformer.writableStrategy);
} catch (e) {
if (this._errored === false) {
TransformStreamError(this, e);
throw e;
}
return;
}

function transformDone() {
transforming = false;
writeDone();
const source = new TransformStreamSource(this);

try {
this.readable = new ReadableStream(source, transformer.readableStrategy);
} catch (e) {
if (this._errored === false) {
TransformStreamError(this, e);
throw e;
}
}
}
};
2 changes: 1 addition & 1 deletion reference-implementation/run-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ global.CountQueuingStrategy = CountQueuingStrategy;
global.TransformStream = TransformStream;

if (process.argv.length === 2) {
const tests = glob.sync(path.resolve(__dirname, 'test/pipe-to.js'));
const tests = glob.sync(path.resolve(__dirname, 'test/transform*.js'));
tests.forEach(require);
} else {
glob.sync(path.resolve(process.argv[2])).forEach(require);
Expand Down

0 comments on commit ae2e3f0

Please sign in to comment.