-
Notifications
You must be signed in to change notification settings - Fork 30.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http2: use and support non-empty DATA frame with END_STREAM flag #33875
Closed
Closed
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
ca21bd1
http2: use and support non-empty DATA frame with END_STREAM flag
clshortfuse 1e35afd
remove whitespace
clshortfuse 30b1c06
Update node_http2.cc
clshortfuse 6f7c361
Update node_http2.cc
clshortfuse cbddeb6
Update node_http2.cc
clshortfuse acb5999
Update node_http2.cc
clshortfuse 85cf722
Update node_http2.cc
clshortfuse 7bae5a5
http2: add "receive paused" debug native line
clshortfuse 789cc1d
http2: remove empty DATA frame
clshortfuse 0c110d0
http2: update tests
clshortfuse 563a310
fix trailers flag detection
clshortfuse 327e9e8
fix tests
clshortfuse 58a3885
add test
clshortfuse 1778fc6
add state.shutdownWritableCalled
clshortfuse File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1140,6 +1140,7 @@ class Http2Session extends EventEmitter { | |
streams: new Map(), | ||
pendingStreams: new Set(), | ||
pendingAck: 0, | ||
shutdownWritableCalled: false, | ||
writeQueueSize: 0, | ||
originSet: undefined | ||
}; | ||
|
@@ -1718,6 +1719,25 @@ function afterShutdown(status) { | |
this.callback(); | ||
} | ||
|
||
function shutdownWritable(callback) { | ||
const handle = this[kHandle]; | ||
if (!handle) return callback(); | ||
const state = this[kState]; | ||
if (state.shutdownWritableCalled) { | ||
debugStreamObj(this, 'shutdownWritable() already called'); | ||
return callback(); | ||
} | ||
state.shutdownWritableCalled = true; | ||
|
||
const req = new ShutdownWrap(); | ||
req.oncomplete = afterShutdown; | ||
req.callback = callback; | ||
req.handle = handle; | ||
const err = handle.shutdown(req); | ||
if (err === 1) // synchronous finish | ||
return afterShutdown.call(req, 0); | ||
} | ||
|
||
function finishSendTrailers(stream, headersList) { | ||
// The stream might be destroyed and in that case | ||
// there is nothing to do. | ||
|
@@ -1978,19 +1998,47 @@ class Http2Stream extends Duplex { | |
|
||
let req; | ||
|
||
// writeGeneric does not destroy on error and we cannot enable autoDestroy, | ||
// so make sure to destroy on error. | ||
const callback = (err) => { | ||
let waitingForWriteCallback = true; | ||
let waitingForEndCheck = true; | ||
let writeCallbackErr; | ||
let endCheckCallbackErr; | ||
const done = () => { | ||
if (waitingForEndCheck || waitingForWriteCallback) return; | ||
const err = writeCallbackErr || endCheckCallbackErr; | ||
// writeGeneric does not destroy on error and | ||
// we cannot enable autoDestroy, | ||
// so make sure to destroy on error. | ||
if (err) { | ||
this.destroy(err); | ||
} | ||
cb(err); | ||
}; | ||
const writeCallback = (err) => { | ||
waitingForWriteCallback = false; | ||
writeCallbackErr = err; | ||
done(); | ||
}; | ||
const endCheckCallback = (err) => { | ||
waitingForEndCheck = false; | ||
endCheckCallbackErr = err; | ||
done(); | ||
}; | ||
// Shutdown write stream right after last chunk is sent | ||
// so final DATA frame can include END_STREAM flag | ||
process.nextTick(() => { | ||
if (writeCallbackErr || | ||
!this._writableState.ending || | ||
this._writableState.buffered.length || | ||
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS)) | ||
return endCheckCallback(); | ||
debugStreamObj(this, 'shutting down writable on last write'); | ||
shutdownWritable.call(this, endCheckCallback); | ||
}); | ||
|
||
if (writev) | ||
req = writevGeneric(this, data, callback); | ||
req = writevGeneric(this, data, writeCallback); | ||
else | ||
req = writeGeneric(this, data, encoding, callback); | ||
req = writeGeneric(this, data, encoding, writeCallback); | ||
|
||
trackWriteState(this, req.bytes); | ||
} | ||
|
@@ -2004,21 +2052,12 @@ class Http2Stream extends Duplex { | |
} | ||
|
||
_final(cb) { | ||
const handle = this[kHandle]; | ||
if (this.pending) { | ||
this.once('ready', () => this._final(cb)); | ||
} else if (handle !== undefined) { | ||
debugStreamObj(this, '_final shutting down'); | ||
const req = new ShutdownWrap(); | ||
req.oncomplete = afterShutdown; | ||
req.callback = cb; | ||
req.handle = handle; | ||
const err = handle.shutdown(req); | ||
if (err === 1) // synchronous finish | ||
return afterShutdown.call(req, 0); | ||
} else { | ||
cb(); | ||
return; | ||
} | ||
debugStreamObj(this, 'shutting down writable on _final'); | ||
shutdownWritable.call(this, cb); | ||
} | ||
|
||
_read(nread) { | ||
|
@@ -2122,11 +2161,20 @@ class Http2Stream extends Duplex { | |
debugStream(this[kID] || 'pending', session[kType], 'destroying stream'); | ||
|
||
const state = this[kState]; | ||
const sessionCode = session[kState].goawayCode || | ||
session[kState].destroyCode; | ||
const code = err != null ? | ||
sessionCode || NGHTTP2_INTERNAL_ERROR : | ||
state.rstCode || sessionCode; | ||
const sessionState = session[kState]; | ||
const sessionCode = sessionState.goawayCode || sessionState.destroyCode; | ||
|
||
// If a stream has already closed successfully, there is no error | ||
// to report from this stream, even if the session has errored. | ||
// This can happen if the stream was already in process of destroying | ||
// after a successful close, but the session had a error between | ||
// this stream's close and destroy operations. | ||
// Previously, this always overrode a successful close operation code | ||
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator. | ||
const code = (err != null ? | ||
(sessionCode || NGHTTP2_INTERNAL_ERROR) : | ||
(this.closed ? this.rstCode : sessionCode) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify, a stream can now end so quickly, it can close successfully before the next stream can emit an error that bubbles up to the session (see: |
||
); | ||
const hasHandle = handle !== undefined; | ||
|
||
if (!this.closed) | ||
|
@@ -2135,13 +2183,13 @@ class Http2Stream extends Duplex { | |
|
||
if (hasHandle) { | ||
handle.destroy(); | ||
session[kState].streams.delete(id); | ||
sessionState.streams.delete(id); | ||
} else { | ||
session[kState].pendingStreams.delete(this); | ||
sessionState.pendingStreams.delete(this); | ||
} | ||
|
||
// Adjust the write queue size for accounting | ||
session[kState].writeQueueSize -= state.writeQueueSize; | ||
sessionState.writeQueueSize -= state.writeQueueSize; | ||
state.writeQueueSize = 0; | ||
|
||
// RST code 8 not emitted as an error as its used by clients to signify | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
if (!common.hasCrypto) | ||
common.skip('missing crypto'); | ||
const assert = require('assert'); | ||
const http2 = require('http2'); | ||
|
||
const { PerformanceObserver } = require('perf_hooks'); | ||
|
||
const server = http2.createServer(); | ||
|
||
server.on('stream', (stream, headers) => { | ||
stream.respond({ | ||
'content-type': 'text/html', | ||
':status': 200 | ||
}); | ||
switch (headers[':path']) { | ||
case '/singleEnd': | ||
stream.end('OK'); | ||
break; | ||
case '/sequentialEnd': | ||
stream.write('OK'); | ||
stream.end(); | ||
break; | ||
case '/delayedEnd': | ||
stream.write('OK', () => stream.end()); | ||
break; | ||
} | ||
}); | ||
|
||
function testRequest(path, targetFrameCount, callback) { | ||
const obs = new PerformanceObserver((list, observer) => { | ||
const entry = list.getEntries()[0]; | ||
if (entry.name !== 'Http2Session') return; | ||
if (entry.type !== 'client') return; | ||
assert.strictEqual(entry.framesReceived, targetFrameCount); | ||
observer.disconnect(); | ||
callback(); | ||
}); | ||
obs.observe({ entryTypes: ['http2'] }); | ||
const client = http2.connect(`http://localhost:${server.address().port}`, () => { | ||
const req = client.request({ ':path': path }); | ||
req.resume(); | ||
req.end(); | ||
req.on('end', () => client.close()); | ||
}); | ||
} | ||
|
||
// SETTINGS => SETTINGS => HEADERS => DATA | ||
const MIN_FRAME_COUNT = 4; | ||
|
||
server.listen(0, () => { | ||
testRequest('/singleEnd', MIN_FRAME_COUNT, () => { | ||
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => { | ||
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => { | ||
server.close(); | ||
}); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this lead to a corrupted state where the callback from
_final
would get called right away and therefore the stream would go on withfinish
sequence even though we are not done (i.e.ShutdownWrap
is not done)?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the only way to solve this is to store the
cb
from_final
and call it when realShutdownWrap
finishes (maybe inendCheckCallback
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
won't get called until the lastwrite
callback is fired.Writable
wraps all the write callbacksnode/lib/_stream_writable.js
Line 400 in b8ea471
It's important the
kWriteGeneric
function waits for both commands to finish (including Shutdown) before calling thecb
function, which will then causefinal
to eventually be called.Also, even if
Do::Shutdown()
is called twice (inwrite()
and infinal()
, it's of no actual consequence. This check is just an optimization, not a requirement.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see now, thanks for the explanation. I guess that would work.
ping @ronag could you PTAL at this from streams perspective?