Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(H2): handle goaway properly #3057

Merged
merged 4 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,12 @@ Parameters:
* **targets** `Array<Dispatcher>`
* **error** `Error`

Emitted when the dispatcher has been disconnected from the origin.

> **Note**: For HTTP/2, this event is also emitted when the dispatcher has received the [GOAWAY Frame](https://webconcepts.info/concepts/http2-frame-type/0x7) with an Error with the message `HTTP/2: "GOAWAY" frame received` and the code `UND_ERR_INFO`.
> Due to nature of the protocol of using binary frames, it is possible that requests gets hanging as a frame can be received between the `HEADER` and `DATA` frames.
> It is recommended to handle this event and close the dispatcher to create a new HTTP/2 session.

### Event: `'connectionError'`

Parameters:
Expand Down
28 changes: 27 additions & 1 deletion lib/core/util.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const assert = require('node:assert')
const { kDestroyed, kBodyUsed } = require('./symbols')
const { kDestroyed, kBodyUsed, kListeners } = require('./symbols')
const { IncomingMessage } = require('node:http')
const stream = require('node:stream')
const net = require('node:net')
Expand Down Expand Up @@ -534,6 +534,29 @@ function parseRangeHeader (range) {
: null
}

function addListener (obj, name, listener) {
const listeners = (obj[kListeners] ??= [])
listeners.push([name, listener])
obj.on(name, listener)
return obj
}

function removeAllListeners (obj) {
for (const [name, listener] of obj[kListeners] ?? []) {
obj.removeListener(name, listener)
}
obj[kListeners] = null
}

function errorRequest (client, request, err) {
try {
request.onError(err)
assert(request.aborted)
} catch (err) {
client.emit('error', err)
}
}

const kEnumerableProperty = Object.create(null)
kEnumerableProperty.enumerable = true

Expand All @@ -556,6 +579,9 @@ module.exports = {
isDestroyed,
headerNameToString,
bufferToLowerCasedHeaderName,
addListener,
removeAllListeners,
errorRequest,
parseRawHeaders,
parseHeaders,
parseKeepAliveTimeout,
Expand Down
36 changes: 7 additions & 29 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const {
kMaxRequests,
kCounter,
kMaxResponseSize,
kListeners,
kOnError,
kResume,
kHTTPContext
Expand All @@ -56,23 +55,11 @@ const {
const constants = require('../llhttp/constants.js')
const EMPTY_BUF = Buffer.alloc(0)
const FastBuffer = Buffer[Symbol.species]
const addListener = util.addListener
const removeAllListeners = util.removeAllListeners

let extractBody

function addListener (obj, name, listener) {
const listeners = (obj[kListeners] ??= [])
listeners.push([name, listener])
obj.on(name, listener)
return obj
}

function removeAllListeners (obj) {
for (const [name, listener] of obj[kListeners] ?? []) {
obj.removeListener(name, listener)
}
obj[kListeners] = null
}

async function lazyllhttp () {
const llhttpWasmData = process.env.JEST_WORKER_ID ? require('../llhttp/llhttp-wasm.js') : undefined

Expand Down Expand Up @@ -719,14 +706,14 @@ async function connectH1 (client, socket) {
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}
} else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]
Expand Down Expand Up @@ -831,15 +818,6 @@ function resumeH1 (client) {
}
}

function errorRequest (client, request, err) {
try {
request.onError(err)
assert(request.aborted)
} catch (err) {
client.emit('error', err)
}
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
function shouldSendContentLength (method) {
return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
Expand Down Expand Up @@ -906,7 +884,7 @@ function writeH1 (client, request) {
// A user agent may send a Content-Length header with 0 value, this should be allowed.
if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) {
if (client[kStrictContentLength]) {
errorRequest(client, request, new RequestContentLengthMismatchError())
util.errorRequest(client, request, new RequestContentLengthMismatchError())
return false
}

Expand All @@ -920,7 +898,7 @@ function writeH1 (client, request) {
return
}

errorRequest(client, request, err || new RequestAbortedError())
util.errorRequest(client, request, err || new RequestAbortedError())

util.destroy(body)
util.destroy(socket, new InformationalError('aborted'))
Expand All @@ -929,7 +907,7 @@ function writeH1 (client, request) {
try {
request.onConnect(abort)
} catch (err) {
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

if (request.aborted) {
Expand Down
103 changes: 41 additions & 62 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const {
kSocket,
kStrictContentLength,
kOnError,
// HTTP2
kMaxConcurrentStreams,
kHTTP2Session,
kResume
Expand Down Expand Up @@ -92,24 +91,26 @@ async function connectH2 (client, socket) {
session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
session.on('frameError', onHttp2FrameError)
session.on('end', onHttp2SessionEnd)
session.on('goaway', onHTTP2GoAway)
session.on('close', function () {

util.addListener(session, 'error', onHttp2SessionError)
util.addListener(session, 'frameError', onHttp2FrameError)
util.addListener(session, 'end', onHttp2SessionEnd)
util.addListener(session, 'goaway', onHTTP2GoAway)
util.addListener(session, 'close', function () {
const { [kClient]: client } = this

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
const err = this[kSocket][kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
client[kHTTP2Session] = null

assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]
Expand All @@ -120,19 +121,21 @@ async function connectH2 (client, socket) {

client[kResume]()
})

session.unref()

client[kHTTP2Session] = session
socket[kHTTP2Session] = session

socket.on('error', function (err) {
util.addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

this[kClient][kOnError](err)
})
socket.on('end', function () {

util.addListener(socket, 'end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

Expand Down Expand Up @@ -172,67 +175,42 @@ function onHttp2SessionError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kSocket][kError] = err

this[kClient][kOnError](err)
}

function onHttp2FrameError (type, code, id) {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)

if (id === 0) {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
this[kSocket][kError] = err
this[kClient][kOnError](err)
}
}

function onHttp2SessionEnd () {
this.destroy(new SocketError('other side closed'))
util.destroy(this[kSocket], new SocketError('other side closed'))
const err = new SocketError('other side closed', util.getSocketInfo(this[kSocket]))
this.destroy(err)
util.destroy(this[kSocket], err)
}

/**
* This is the root cause of #3011
* We need to handle GOAWAY frames properly, and trigger the session close
* along with the socket right away
* Find a way to trigger the close cycle from here on.
*/
function onHTTP2GoAway (code) {
const client = this[kClient]
const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
client[kSocket] = null
client[kHTTP2Session] = null

if (client.destroyed) {
assert(this[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(this, request, err)
}
} else if (client[kRunning] > 0) {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect',
client[kUrl],
[client],
err
)

client[kResume]()
}
// We need to trigger the close cycle right away
// We need to destroy the session and the socket
// Requests should be failed with the error after the current one is handled
this[kSocket][kError] = err
this[kClient][kOnError](err)

function errorRequest (client, request, err) {
try {
request.onError(err)
assert(request.aborted)
} catch (err) {
client.emit('error', err)
}
this.unref()
// We send the GOAWAY frame response as no error
this.destroy()
util.destroy(this[kSocket], err)
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
Expand All @@ -245,7 +223,8 @@ function writeH2 (client, request) {
const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

if (upgrade) {
errorRequest(client, request, new Error('Upgrade not supported for H2'))
util.errorRequest(client, request, new Error('Upgrade not supported for H2'))
return false
}

if (request.aborted) {
Expand Down Expand Up @@ -297,10 +276,10 @@ function writeH2 (client, request) {
}
}

errorRequest(client, request, err)
util.errorRequest(client, request, err)
})
} catch (err) {
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

if (method === 'CONNECT') {
Expand Down Expand Up @@ -375,7 +354,7 @@ function writeH2 (client, request) {
// A user agent may send a Content-Length header with 0 value, this should be allowed.
if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
if (client[kStrictContentLength]) {
errorRequest(client, request, new RequestContentLengthMismatchError())
util.errorRequest(client, request, new RequestContentLengthMismatchError())
return false
}

Expand Down Expand Up @@ -417,7 +396,7 @@ function writeH2 (client, request) {
// as there's no value to keep it open.
if (request.aborted || request.completed) {
const err = new RequestAbortedError()
errorRequest(client, request, err)
util.errorRequest(client, request, err)
util.destroy(stream, err)
return
}
Expand Down Expand Up @@ -451,13 +430,12 @@ function writeH2 (client, request) {
}

const err = new InformationalError('HTTP/2: stream half-closed (remote)')
errorRequest(client, request, err)
util.errorRequest(client, request, err)
util.destroy(stream, err)
})

stream.once('close', () => {
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (session[kOpenStreams] === 0) {
session.unref()
}
Expand All @@ -466,13 +444,14 @@ function writeH2 (client, request) {
stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
util.errorRequest(client, request, err)
util.destroy(stream, err)
}
})

stream.once('frameError', (type, code) => {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
errorRequest(client, request, err)
util.errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
Expand Down
Loading
Loading