-
Notifications
You must be signed in to change notification settings - Fork 29.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http2: fix condition where data is lost #18895
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -307,8 +307,23 @@ function onStreamClose(code) { | |
|
||
if (state.fd !== undefined) | ||
tryClose(state.fd); | ||
stream.push(null); | ||
stream[kMaybeDestroy](null, code); | ||
|
||
// Defer destroy we actually emit end. | ||
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { | ||
// If errored or ended, we can destroy immediately. | ||
stream[kMaybeDestroy](null, code); | ||
} else { | ||
// Wait for end to destroy. | ||
stream.on('end', stream[kMaybeDestroy]); | ||
// Push a null so the stream can end whenever the client consumes | ||
// it completely. | ||
stream.push(null); | ||
|
||
// Same as net. | ||
if (stream.readableLength === 0) { | ||
stream.read(0); | ||
} | ||
} | ||
} | ||
|
||
// Receives a chunk of data for a given stream and forwards it on | ||
|
@@ -326,11 +341,19 @@ function onStreamRead(nread, buf) { | |
} | ||
return; | ||
} | ||
|
||
// Last chunk was received. End the readable side. | ||
debug(`Http2Stream ${stream[kID]} [Http2Session ` + | ||
`${sessionName(stream[kSession][kType])}]: ending readable.`); | ||
stream.push(null); | ||
stream[kMaybeDestroy](); | ||
|
||
// defer this until we actually emit end | ||
if (stream._readableState.endEmitted) { | ||
stream[kMaybeDestroy](); | ||
} else { | ||
stream.on('end', stream[kMaybeDestroy]); | ||
stream.push(null); | ||
stream.read(0); | ||
} | ||
} | ||
|
||
// Called when the remote peer settings have been updated. | ||
|
@@ -1833,21 +1856,25 @@ class Http2Stream extends Duplex { | |
session[kMaybeDestroy](); | ||
process.nextTick(emit, this, 'close', code); | ||
callback(err); | ||
} | ||
|
||
} | ||
// The Http2Stream can be destroyed if it has closed and if the readable | ||
// side has received the final chunk. | ||
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) { | ||
if (error == null) { | ||
if (code === NGHTTP2_NO_ERROR && | ||
(!this._readableState.ended || | ||
!this._writableState.ended || | ||
this._writableState.pendingcb > 0 || | ||
!this.closed)) { | ||
return; | ||
} | ||
if (error || code !== NGHTTP2_NO_ERROR) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would move the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code is correct as it is. There can be an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I meant is if But it is not important. |
||
this.destroy(error); | ||
return; | ||
} | ||
|
||
// TODO(mcollina): remove usage of _*State properties | ||
if (this._readableState.ended && | ||
this._writableState.ended && | ||
this._writableState.pendingcb === 0 && | ||
this.closed) { | ||
this.destroy(); | ||
// This should return, but eslint complains. | ||
// return | ||
} | ||
this.destroy(error); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
if (!common.hasCrypto) | ||
common.skip('missing crypto'); | ||
const assert = require('assert'); | ||
const http2 = require('http2'); | ||
const { Readable } = require('stream'); | ||
|
||
const server = http2.createServer(common.mustCall((req, res) => { | ||
res.setHeader('content-type', 'text/html'); | ||
const input = new Readable({ | ||
read() { | ||
this.push('test'); | ||
this.push(null); | ||
} | ||
}); | ||
input.pipe(res); | ||
})); | ||
|
||
server.listen(0, common.mustCall(() => { | ||
const port = server.address().port; | ||
const client = http2.connect(`http://localhost:${port}`); | ||
|
||
const req = client.request(); | ||
|
||
req.on('response', common.mustCall((headers) => { | ||
assert.strictEqual(headers[':status'], 200); | ||
assert.strictEqual(headers['content-type'], 'text/html'); | ||
})); | ||
|
||
let data = ''; | ||
|
||
const notCallClose = common.mustNotCall(); | ||
|
||
setTimeout(() => { | ||
req.setEncoding('utf8'); | ||
req.removeListener('close', notCallClose); | ||
req.on('close', common.mustCall(() => { | ||
server.close(); | ||
client.close(); | ||
})); | ||
req.on('data', common.mustCallAtLeast((d) => data += d)); | ||
req.on('end', common.mustCall(() => { | ||
assert.strictEqual(data, 'test'); | ||
})); | ||
}, common.platformTimeout(100)); | ||
|
||
req.on('close', notCallClose); | ||
})); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
if (!common.hasCrypto) | ||
common.skip('missing crypto'); | ||
const assert = require('assert'); | ||
const http2 = require('http2'); | ||
const { Readable } = require('stream'); | ||
|
||
const server = http2.createServer(); | ||
server.on('stream', common.mustCall((stream) => { | ||
stream.respond({ | ||
':status': 200, | ||
'content-type': 'text/html' | ||
}); | ||
const input = new Readable({ | ||
read() { | ||
this.push('test'); | ||
this.push(null); | ||
} | ||
}); | ||
input.pipe(stream); | ||
})); | ||
|
||
|
||
server.listen(0, common.mustCall(() => { | ||
const port = server.address().port; | ||
const client = http2.connect(`http://localhost:${port}`); | ||
|
||
const req = client.request(); | ||
|
||
req.on('response', common.mustCall((headers) => { | ||
assert.strictEqual(headers[':status'], 200); | ||
assert.strictEqual(headers['content-type'], 'text/html'); | ||
})); | ||
|
||
let data = ''; | ||
|
||
const notCallClose = common.mustNotCall(); | ||
|
||
setTimeout(() => { | ||
req.setEncoding('utf8'); | ||
req.removeListener('close', notCallClose); | ||
req.on('close', common.mustCall(() => { | ||
server.close(); | ||
client.close(); | ||
})); | ||
req.on('data', common.mustCallAtLeast((d) => data += d)); | ||
req.on('end', common.mustCall(() => { | ||
assert.strictEqual(data, 'test'); | ||
})); | ||
}, common.platformTimeout(100)); | ||
|
||
req.on('close', notCallClose); | ||
})); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the
stream.push
necessary here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it is. It's needed to close the stream.