-
Notifications
You must be signed in to change notification settings - Fork 163
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aborting a stream should wait for pending writes #619
Conversation
@@ -158,7 +162,6 @@ function WritableStreamError(stream, e) { | |||
} | |||
|
|||
stream._state = 'errored'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep this line together with assigning storedError? It seems bad for them to get out of sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I've found a satisfactory way to do it. PTAL.
assert(stream._writeRequests.length > 0); | ||
function WritableStreamRejectUnresolvedPromises(stream) { | ||
const state = stream._state; | ||
assert(state === 'writable' || state === 'closing' || state === 'errored'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assertion might be tightened if we move the state assignment as discussed above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// them from being erroneously rejected on error. If a write() call is pending, the request is stored here. | ||
this._pendingWriteRequest = undefined; | ||
|
||
this._pendingCloseRequest = undefined; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this could be consolidated with writer's closedPromise...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider
const promise1 = writer.close();
const promise2 = writer.closed;
writer.releaseLock(); // [1]
promise2
becomes rejected at [1], but promise1
may not resolve until later.
Except... https://github.com/whatwg/streams/blob/master/reference-implementation/lib/writable-stream.js#L169 thinks you shouldn't be able to do this. If that line is correct and the implementation of releaseLock()
is wrong, then maybe the close()
and .closed
promises can have a shared fate after all.
@@ -173,11 +176,26 @@ function WritableStreamFinishClose(stream) { | |||
defaultWriterClosedPromiseResolve(stream._writer); | |||
} | |||
|
|||
function WritableStreamFulfillWriteRequest(stream) { | |||
assert(stream._writeRequests.length > 0); | |||
function WritableStreamRejectUnresolvedPromises(stream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if there's a better name for this that indicates clearly that it will reject write requests + pending close requests + writer closed promise, but not pending write requests.
Also, can it maybe assert that there is no pending write request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The semantics are a bit hard to explain. It's basically "reject everything except .ready and maybe one write()". I couldn't come up with a catchy name for it. But it will be easy to change the name if someone comes up with a better one.
I added the assert.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WritableStreamRejectPromisesInReactionToError? Not important, but maybe an improvement.
promise_rejects(t, new TypeError(), writer.closed, '.closed should reject') | ||
.then(() => { | ||
closedResolved = true; | ||
})]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style nit: I think this would be cleaner with .then
on the preceding line and ]);
on its own line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return Promise.all([ | ||
promise_rejects(t, new Error(), writePromise, 'write() should reject') | ||
.then(() => assert_false(closedResolved, '.closed should not resolve before write()')), | ||
promise_rejects(t, new TypeError(), writer.closed, '.closed should reject') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. I think the write promise should get the first crack at determining how the stream errors, even if abort() has been called. I guess this is related to #617 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, this is done. It should get simpler if we make write() and close() uninterruptable as discussed in #617.
promise_test(t => { | ||
const ws = new WritableStream({ | ||
write() { | ||
return delay(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not flushAsyncEvents?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the assumption that abort() is synchronous, delay(0) is sufficient. However, that assumption was bogus anyway, so I've switched to flushAsyncEvents().
}, new CountQueuingStrategy(4)); | ||
const writer = ws.getWriter(); | ||
return writer.ready.then(() => { | ||
const resolveOrder = []; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
settlementOrder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
promise_test(t => { | ||
const ws = new WritableStream({ | ||
write() { | ||
return Promise.reject(new Error()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error1
here and below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return Promise.all([ | ||
promise_rejects(t, new Error(), writer.write('1'), 'pending write should be rejected') | ||
.then(() => resolveOrder.push(1)), | ||
promise_rejects(t, new TypeError(), writer.write('2'), 'first queued write should be rejected') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As before it makes more sense for me that the write() error sets the state, not abort().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
I think this is pretty much done, except that WritableStreamRejectUnresolvedPromises is still a terrible name for the abstract operation. There are probably some clarity improvements than can be made once we get some more eyeballs on this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with nits. Thanks so much.
</tr> | ||
<tr> | ||
<td>\[[pendingCloseRequest]] | ||
<td>The promise returned from the writer close method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{{ReadableStreamDefaultWriter/close()}
instead of just close
probably
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
1. Set _stream_.[[pendingAbortRequest]] to <a>a new promise</a>. | ||
1. If _controller_.[[writing]] is *true*, return the result of transforming _stream_.[[pendingAbortRequest]] by a | ||
fulfillment handler that returns | ||
! WritableStreamDefaultControllerAbort(_stream_.[[writableStreamController]], _reason_). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
! on previous line (the idea is to make the wrapping machine-enforceable, at least in theory)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
1. <a>Reject</a> _writeRequest_ with _e_. | ||
1. Set _stream_.[[writeRequests]] to an empty List. | ||
1. Let _oldState_ be _stream_.[[state]]. | ||
1. Assert: _oldState_ is `"writable"` or `"closing".` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
backtick before period
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
1. <a>Reject</a> _writeRequest_ with _storedError_. | ||
1. Set _stream_.[[writeRequests]] to an empty List. | ||
1. If _stream_.[[pendingCloseRequest]] is not *undefined*, | ||
1. Assert: _stream_.[[writableStreamController]].[[inClose]] is *false*, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
period not comma
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
<tr> | ||
<td>\[[inClose]] | ||
<td>A boolean flag set to <emu-val>true</emu-val> while the <a>underlying sink</a>'s <code>close</code> method is | ||
executing and has not yet fulfilled, used to prevent the <code>abort</code> method from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{{WritableStreamDefaultWriter/abort()}}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
1. If _stream_.[[state]] is not `"closing"`, return. | ||
1. Perform ! WritableStreamFulfillWriteRequest(_stream_). | ||
1. Assert: _controller_.[[inClose]] is *true*. | ||
1. Set _controller_.[[InClose]] to *false*. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lowercase InClose
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -173,11 +176,26 @@ function WritableStreamFinishClose(stream) { | |||
defaultWriterClosedPromiseResolve(stream._writer); | |||
} | |||
|
|||
function WritableStreamFulfillWriteRequest(stream) { | |||
assert(stream._writeRequests.length > 0); | |||
function WritableStreamRejectUnresolvedPromises(stream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WritableStreamRejectPromisesInReactionToError? Not important, but maybe an improvement.
Previously, if abort() was called while a sink write() was in progress, the promise returned from WritableStream.write() would be rejected. This behaviour was surprising, since the rejection would happen even if the underlying write succeeded. Change the promise returned by WritableStream.write() to always reflect the success or failure of the underlyingSink.write() that has started.
When transform() throws it errors the WritableStream. Previously this caused write()'s promise to reject, but now write() returns the result of the underlying sink unchanged. Pass back rejections from transform() to the caller of write().
It is confusing if the .closed promise resolves before the promise returned by write() (except when releaseLock() is called). Delay the .closed promise until after the promise resolves for the ongoing write() in the case when abort() is called.
When rejecting queued write() promises due to an abort(), wait until a pending sink write() call completes before rejecting the rest.
If writer.abort() was called during during execution of an underlying write(), and then that write rejected, the .closed, close() and subsequent write() promises would reject with the TypeError from abort(). Make them reject with the error from write() instead.
The underlying abort() method will now not be called until any pending underlying write() has finished. If an underlying close() is in progress then abort() will no longer be called at all. Other behavioural changes: * If the underlying operation has started, then the writer's write() and close() methods will always reflect the result of the underlying operation, rather than being rejected because abort() was called. * Consistently with the above, if an underlying operation calls controller.error() then the writer method will still reflect the result from the underlying operation. * If a call to writer.abort() has to wait for an underlying write() or close() to complete, and that underlying operation rejects, then the underlying abort() will not be called, and writer.abort() will return the rejection from the failed operation.
It would be a logic error to reject a pending writer.close() if the underlying sink close() method was in progress. Add an assert to verify that this doesn't happen.
4a12afc
to
b99ad3f
Compare
Prior to whatwg#619, abort() would reject the closed Promise immediately. Now it waits for the abort() to complete. This means that there can be a window when the stream is errored but the closed Promise has not been rejected. If releaseLock() was called during the window it would incorrectly create a new closed Promise on the assumption it was already rejected. Instead, when an abort() is pending, reject the Promise rather than creating a new one.
Prior to whatwg#619, abort() would reject the closed promise immediately. Now it waits for queued sink operations to finish. This means that there can be a window when the stream is errored but the closed promise has not been rejected. If releaseLock() was called during the window it would incorrectly create a new closed promise on the assumption it was already rejected. Instead, when an abort() is pending, reject the promise rather than creating a new one.
Prior to #619, abort() would reject the closed promise immediately. Now it waits for queued sink operations to finish. This means that there can be a window when the stream is errored but the closed promise has not been rejected. If releaseLock() was called during the window it would incorrectly create a new closed promise on the assumption it was already rejected. Instead, when an abort() is pending, reject the promise rather than creating a new one.
As of #619, writer.abort() does not disturb the stream while an underlying-sink write or close operation is ongoing. However, it was still possible to cause both underlying sink abort and close to happen, with code like `writer.close(); writer.abort()`. This was because of the asynchronous way in which the stream's state transitions from "closing" to actually having [[inClose]] set to true. This addresses that problem by peeking at the queue when abort is called. If the state is [[closing]], and the queue contains no writes, we will not perform the underlying abort, and instead just let the underlying close happen. This counts as a success, so we immediately return a promise fulfilled with undefined. Fixes #632.
This is a work-in-progress for #611.
I haven't updated the standard language to match yet. I would like to get an initial round of feedback on the changes before I do that.
PTAL.