Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: minor connect cleanup #2877

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 45 additions & 25 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ const {
kMaxResponseSize,
kListeners,
kOnError,
kResume
kResume,
kHTTPContext
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
Expand Down Expand Up @@ -403,6 +404,7 @@ class Parser {
removeAllListeners(socket)

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

Expand Down Expand Up @@ -643,6 +645,8 @@ function onParserTimeout (parser) {
}

async function connectH1 (client, socket) {
client[kSocket] = socket

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
Expand Down Expand Up @@ -706,6 +710,7 @@ async function connectH1 (client, socket) {
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)
Expand Down Expand Up @@ -733,6 +738,11 @@ async function connectH1 (client, socket) {
client[kResume]()
})

let closed = false
socket.on('close', () => {
closed = true
})

return {
version: 'h1',
defaultPipelining: 1,
Expand All @@ -742,38 +752,48 @@ async function connectH1 (client, socket) {
resume () {
resumeH1(client)
},
destroy () {
destroy (err, callback) {
if (closed) {
queueMicrotask(callback)
} else {
socket.destroy(err).on('close', callback)
}
},
get destroyed () {
return socket.destroyed
},
busy (request) {
if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
return true
}

if (client[kRunning] > 0 && !request.idempotent) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}

if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
// Don't dispatch an upgrade until all preceding requests have completed.
// A misbehaving server might upgrade the connection before all pipelined
// request has completed.
return true
}
if (request) {
if (client[kRunning] > 0 && !request.idempotent) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}

if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
// Ensure this doesn't happen by waiting for inflight
// to complete before dispatching.
if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
// Don't dispatch an upgrade until all preceding requests have completed.
// A misbehaving server might upgrade the connection before all pipelined
// request has completed.
return true
}

// Request with stream or iterator body cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
// Ensure this doesn't happen by waiting for inflight
// to complete before dispatching.

// Request with stream or iterator body cannot be retried.
// Ensure that no other requests are inflight and
// could cause failure.
return true
}
}

return false
Expand Down
17 changes: 16 additions & 1 deletion lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ const {
} = http2

async function connectH2 (client, socket) {
client[kSocket] = socket

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
Expand Down Expand Up @@ -122,6 +124,11 @@ async function connectH2 (client, socket) {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

let closed = false
socket.on('close', () => {
closed = true
})

return {
version: 'h2',
defaultPipelining: Infinity,
Expand All @@ -132,8 +139,16 @@ async function connectH2 (client, socket) {
resume () {

},
destroy (err) {
destroy (err, callback) {
session.destroy(err)
if (closed) {
queueMicrotask(callback)
} else {
socket.destroy(err).on('close', callback)
}
},
get destroyed () {
return socket.destroyed
},
busy () {
return false
Expand Down
55 changes: 19 additions & 36 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ const {
const buildConnector = require('../core/connect.js')
const {
kUrl,
kReset,
kServerName,
kClient,
kBusy,
kConnect,
kBlocking,
kResuming,
kRunning,
kPending,
kSize,
kWriting,
kQueue,
kConnected,
kConnecting,
Expand All @@ -38,7 +35,6 @@ const {
kRunningIdx,
kError,
kPipelining,
kSocket,
kKeepAliveTimeoutValue,
kMaxHeadersSize,
kKeepAliveMaxTimeout,
Expand Down Expand Up @@ -216,7 +212,6 @@ class Client extends DispatcherBase {
: [createRedirectInterceptor({ maxRedirections })]
this[kUrl] = util.parseOrigin(url)
this[kConnector] = connect
this[kSocket] = null
this[kPipelining] = pipelining != null ? pipelining : 1
this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
Expand Down Expand Up @@ -277,13 +272,12 @@ class Client extends DispatcherBase {
}

get [kConnected] () {
return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
}

get [kBusy] () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
return Boolean(
this[kHTTPContext]?.busy(null) ||
(this[kSize] >= (getPipelining(this) || 1)) ||
this[kPending] > 0
)
Expand Down Expand Up @@ -346,13 +340,9 @@ class Client extends DispatcherBase {
resolve(null)
}

if (this[kHTTPContext] != null) {
this[kHTTPContext].destroy(err)
if (this[kHTTPContext]) {
this[kHTTPContext].destroy(err, callback)
this[kHTTPContext] = null
}

if (this[kSocket]) {
this[kSocket].destroy(err).on('close', callback)
} else {
queueMicrotask(callback)
}
Expand Down Expand Up @@ -386,7 +376,7 @@ function onError (client, err) {

async function connect (client) {
assert(!client[kConnecting])
assert(!client[kSocket])
assert(!client[kHTTPContext])

let { host, hostname, protocol, port } = client[kUrl]

Expand Down Expand Up @@ -441,21 +431,24 @@ async function connect (client) {
return
}

client[kConnecting] = false

assert(socket)

client[kHTTPContext] = socket.alpnProtocol === 'h2'
? await connectH2(client, socket)
: await connectH1(client, socket)
try {
client[kHTTPContext] = socket.alpnProtocol === 'h2'
? await connectH2(client, socket)
: await connectH1(client, socket)
} catch (err) {
socket.destroy().on('error', () => {})
throw err
}

client[kConnecting] = false

socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
socket[kClient] = client
socket[kError] = null

client[kSocket] = socket

if (channels.connected.hasSubscribers) {
channels.connected.publish({
connectParams: {
Expand Down Expand Up @@ -546,8 +539,6 @@ function _resume (client, sync) {
return
}

const socket = client[kSocket]

if (client[kHTTPContext]) {
client[kHTTPContext].resume()
}
Expand Down Expand Up @@ -580,27 +571,19 @@ function _resume (client, sync) {
}

client[kServerName] = request.servername

if (socket && socket.servername !== request.servername) {
util.destroy(socket, new InformationalError('servername changed'))
return
}
client[kHTTPContext]?.destroy(new InformationalError('servername changed'))
}

if (client[kConnecting]) {
return
}

if (!socket) {
if (!client[kHTTPContext]) {
connect(client)
return
}

if (socket.destroyed) {
return
}

if (!client[kHTTPContext]) {
if (client[kHTTPContext].destroyed) {
return
}

Expand Down
Loading