Skip to content

Commit

Permalink
Make TransformStream transformer.flush() like sink.close()
Browse files Browse the repository at this point in the history
Return a promise with the same semantics as WritableStream's
underlyingSink.close(). This also resolves an old comment,
as well as issue #518.

Consequently, calling controller.close() after flush() is
called now throws a TypeError.
  • Loading branch information
isonmad committed Sep 18, 2016
1 parent fa4a4da commit da6fba2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 18 deletions.
25 changes: 11 additions & 14 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ function TransformStreamCloseReadable(transformStream) {
throw new TypeError('Readable side is already closed');
}

if (transformStream._writableDone === true) {
throw new TypeError('TransformStream is already closing');
}

TransformStreamCloseReadableInternal(transformStream);
}

Expand Down Expand Up @@ -224,22 +228,15 @@ class TransformStreamSink {

assert(transformStream._transforming === false);

// No control over the promise returned by writableStreamWriter.close(). Need it?

transformStream._writableDone = true;

if (transformStream._transformer.flush === undefined) {
TransformStreamCloseReadableInternal(transformStream);
} else {
try {
transformStream._transformer.flush();
} catch (e) {
if (transformStream._errored === false) {
TransformStreamErrorInternal(transformStream, e);
throw e;
}
}
}
const flushPromise = PromiseInvokeOrNoop(transformStream._transformer, 'flush');
// Return a promise that is fulfilled with undefined on success.
return flushPromise.then(() => TransformStreamCloseReadableInternal(transformStream),
r => {
TransformStreamErrorInternal(transformStream, r);
return Promise.reject(r);
});
}
}

Expand Down
9 changes: 5 additions & 4 deletions reference-implementation/test/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ test('TransformStream flush is called after all queued writes finish, once the w
},
flush() {
flushCalled = true;
return new Promise(); // never resolves
}
});

Expand All @@ -302,7 +303,7 @@ test('TransformStream flush is called after all queued writes finish, once the w

setTimeout(() => {
t.ok(flushCalled, 'flush is eventually called');
t.equal(rsClosed, false, 'if flush does not call close, the readable does not become closed');
t.equal(rsClosed, false, 'if flushPromise does not resolve, the readable does not become closed');
t.end();
}, 50);
});
Expand Down Expand Up @@ -351,7 +352,7 @@ test('TransformStream flush gets a chance to enqueue more into the readable, and
flush() {
c.enqueue('x');
c.enqueue('y');
setTimeout(() => c.close(), 10);
return new Promise(resolve => setTimeout(resolve, 10));
}
});

Expand All @@ -377,7 +378,7 @@ test('TransformStream flush gets a chance to enqueue more into the readable, and
});

test('Transform stream should call transformer methods as methods', t => {
t.plan(1);
t.plan(2);

let c;
const ts = new TransformStream({
Expand All @@ -393,7 +394,7 @@ test('Transform stream should call transformer methods as methods', t => {

flush() {
c.enqueue('flushed' + this.suffix);
c.close();
t.throws(() => c.close(), /TypeError/, 'A closing TransformStream cannot be closed again');
}
});

Expand Down

0 comments on commit da6fba2

Please sign in to comment.