Skip to content

Commit

Permalink
Do not allow abort to happen when a close is about to happen
Browse files Browse the repository at this point in the history
As of #619, writer.abort() does not disturb the stream while an underlying-sink write or close operation is ongoing. However, it was still possible to cause both underlying sink abort and close to happen, with code like `writer.close(); writer.abort()`. This was because of the asynchronous way in which the stream's state transitions from "closing" to actually having [[inClose]] set to true.

This addresses that problem by peeking at the queue when abort is called. If the state is [[closing]], and the queue contains no writes, we will not perform the underlying abort, and instead just let the underlying close happen. This counts as a success, so we immediately return a promise fulfilled with undefined.

Fixes #632.
  • Loading branch information
domenic committed Dec 19, 2016
1 parent fe04ee8 commit 33773df
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 55 deletions.
13 changes: 3 additions & 10 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2664,15 +2664,6 @@ WritableStream(<var>underlyingSink</var> = {}, { <var>size</var>, <var>highWater
behavior will be the same as a {{CountQueuingStrategy}} with a <a>high water mark</a> of 1.
</div>

<div class="note">
Due to the way writable streams asynchronously close, it is possible for both <code>close</code> and
<code>abort</code> to be called, in cases where the <a>producer</a> aborts the stream while it is in the
<code>"closing"</code> state. Notably, since a stream always spends at least one turn in the <code>"closing"</code>
state, code like <code>ws.close(); ws.abort(...);</code> will cause both to be called, even if the <code>close</code>
method itself has no asynchronous behavior. A well-designed <a>underlying sink</a> object should be able to deal with
this.
</div>

<emu-alg>
1. Set *this*.[[state]] to `"writable"`.
1. Set *this*.[[storedError]], *this*.[[writer]], and *this*.[[writableStreamController]] to *undefined*.
Expand Down Expand Up @@ -2765,12 +2756,14 @@ writable stream is <a>locked to a writer</a>.

<emu-alg>
1. Let _state_ be _stream_.[[state]].
1. Let _controller_ be _stream_.[[writableStreamController]].
1. If _state_ is `"closed"`, return <a>a promise resolved with</a> *undefined*.
1. If _state_ is `"errored"`, return <a>a promise rejected with</a> _stream_.[[storedError]].
1. Assert: _state_ is `"writable"` or `"closing"`.
1. If _state_ is `"closing"`, and either _controller_.[[queue]] is empty or PeekQueueValue(_controller_.[[queue]]) is
`"close"`, return <a>a promise resolved with</a> *undefined*.
1. Let _error_ be a new *TypeError* indicating that the stream has been aborted.
1. Perform ! WritableStreamError(_stream_, _error_).
1. Let _controller_ be _stream_.[[writableStreamController]].
1. Assert: _controller_ is not *undefined*.
1. If _controller_.[[writing]] is *true* or _controller_.[[inClose]] is *true*,
1. Set _stream_.[[pendingAbortRequest]] to <a>a new promise</a>.
Expand Down
7 changes: 6 additions & 1 deletion reference-implementation/lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ function IsWritableStreamLocked(stream) {

function WritableStreamAbort(stream, reason) {
const state = stream._state;
const controller = stream._writableStreamController;

if (state === 'closed') {
return Promise.resolve(undefined);
}
Expand All @@ -120,11 +122,14 @@ function WritableStreamAbort(stream, reason) {

assert(state === 'writable' || state === 'closing');

if (state === 'closing' && (controller._queue.length === 0 || PeekQueueValue(controller._queue) === 'close')) {
return Promise.resolve(undefined);
}

const error = new TypeError('Aborted');

WritableStreamError(stream, error);

const controller = stream._writableStreamController;
assert(controller !== undefined);
if (controller._writing === true || controller._inClose === true) {
const promise = new Promise((resolve, reject) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,25 @@ promise_test(t => {
return writePromise;
}, 'Aborting a WritableStream causes any outstanding write() promises to be rejected with a TypeError');

promise_test(t => {
const ws = new WritableStream();
promise_test(() => {
const ws = recordingWritableStream();
const writer = ws.getWriter();

const closePromise = writer.close();
writer.abort(error1);
const abortPromise = writer.abort(error1);

return Promise.all([
promise_rejects(t, new TypeError(), writer.closed, 'closed should reject with a TypeError'),
promise_rejects(t, new TypeError(), closePromise, 'close() should reject with a TypeError')
]);
}, 'Closing but then immediately aborting a WritableStream causes the stream to error');
writer.closed,
closePromise,
abortPromise
]).then(() => {
assert_array_equals(ws.events, ['close']);
});
}, 'Closing but then immediately aborting a WritableStream results in a successful close and a fulfilled abort()');

promise_test(t => {
promise_test(() => {
let resolveClose;
const ws = new WritableStream({
const ws = recordingWritableStream({
close() {
return new Promise(resolve => {
resolveClose = resolve;
Expand All @@ -202,14 +205,20 @@ promise_test(t => {
const closePromise = writer.close();

return delay(0).then(() => {
writer.abort(error1);
const abortPromise = writer.abort(error1);
resolveClose();

return Promise.all([
promise_rejects(t, new TypeError(), writer.closed, 'closed should reject with a TypeError'),
closePromise
]);
writer.closed,
closePromise,
abortPromise
])
.then(() => {
assert_array_equals(ws.events, ['close']);
});
});
}, 'Closing a WritableStream and aborting it while it closes causes the stream to error');
}, 'Closing a WritableStream and aborting it while it is in the process of closing results in a successful close and ' +
'fulfilled abort()');

promise_test(() => {
const ws = new WritableStream();
Expand Down Expand Up @@ -359,41 +368,35 @@ promise_test(t => {

promise_test(() => {
let resolveWrite;
let abort_called = false;
const ws = new WritableStream({
const ws = recordingWritableStream({
write() {
return new Promise(resolve => {
resolveWrite = resolve;
});
},
abort() {
abort_called = true;
}
});

const writer = ws.getWriter();
return writer.ready.then(() => {
writer.write('a');
const abortPromise = writer.abort();
const abortPromise = writer.abort('b');
return flushAsyncEvents().then(() => {
assert_false(abort_called, 'abort should not be called while write is pending');
assert_array_equals(ws.events, ['write', 'a'], 'abort should not be called while write is pending');
resolveWrite();
return abortPromise.then(() => assert_true(abort_called, 'abort should be called'));
return abortPromise.then(() => {
assert_array_equals(ws.events, ['write', 'a', 'abort', 'b'], 'abort should be called after the write finishes');
});
});
});
}, 'underlying abort() should not be called until underlying write() completes');

promise_test(() => {
let resolveClose;
let abort_called = false;
const ws = new WritableStream({
const ws = recordingWritableStream({
close() {
return new Promise(resolve => {
resolveClose = resolve;
});
},
abort() {
abort_called = true;
}
});

Expand All @@ -402,56 +405,59 @@ promise_test(() => {
writer.close();
const abortPromise = writer.abort();
return flushAsyncEvents().then(() => {
assert_false(abort_called, 'abort should not be called while close is pending');
assert_array_equals(ws.events, ['close'], 'abort should not be called while close is pending');
resolveClose();
return abortPromise.then(() => assert_false(abort_called, 'abort should not be called after close completes'));
return abortPromise.then(() => {
assert_array_equals(ws.events, ['close'], 'abort should not be called while close is pending');
});
});
});
}, 'underlying abort() should not be called if underlying close() has started');

promise_test(t => {
let resolveWrite;
let abort_called = false;
const ws = new WritableStream({
const ws = recordingWritableStream({
write() {
return new Promise(resolve => {
resolveWrite = resolve;
});
},
abort() {
abort_called = true;
}
});

const writer = ws.getWriter();
return writer.ready.then(() => {
writer.write('a');
const closePromise = writer.close();
const abortPromise = writer.abort();
const abortPromise = writer.abort('b');

return flushAsyncEvents().then(() => {
assert_false(abort_called, 'abort should not be called while write is pending');
assert_array_equals(ws.events, ['write', 'a'], 'abort should not be called while write is pending');
resolveWrite();
return abortPromise.then(() => {
assert_true(abort_called, 'abort should be called after write completes');
assert_array_equals(ws.events, ['write', 'a', 'abort', 'b'], 'abort should be called after write completes');
return promise_rejects(t, new TypeError(), closePromise, 'promise returned by close() should be rejected');
});
});
});
}, 'underlying abort() should be called while closing if underlying close() has not started yet');
}, 'an abort() that happens during a write() should trigger the underlying abort() even with a close() queued');

promise_test(() => {
const ws = new WritableStream();
const writer = ws.getWriter();
return writer.ready.then(() => {
const closePromise = writer.close();
const abortPromise = writer.abort();
let closeResolved = false;
Promise.all([
closePromise.then(() => { closeResolved = true; }),
abortPromise.then(() => { assert_true(closeResolved, 'close() promise should resolve before abort() promise'); })
]);
const ordering = [];

return Promise.all([
closePromise.then(() => { ordering.push('close'); }),
abortPromise.then(() => { ordering.push('abort'); })
])
.then(() => {
assert_array_equals(ordering, ['abort', 'close']);
});
});
}, 'writer close() promise should resolve before abort() promise');
}, 'writer abort() promise should fulfill before close() promise (since it bails early)');

promise_test(t => {
const ws = new WritableStream({
Expand Down

0 comments on commit 33773df

Please sign in to comment.