Skip to content

Commit

Permalink
fix: parser callback expect sync return value (#576)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag authored Feb 27, 2021
1 parent 9d7cab6 commit 68356f3
Showing 1 changed file with 97 additions and 79 deletions.
176 changes: 97 additions & 79 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,17 @@ class Parser extends HTTPParser {
return
}

const { queue, socket } = this

this.paused = false

this.resuming = true
while (this.queue.length) {
const [key, ...args] = this.queue.shift()
while (queue.length) {
if (socket.destroyed) {
return
}

const [key, ...args] = queue.shift()

this[key](...args)

Expand Down Expand Up @@ -439,108 +445,86 @@ class Parser extends HTTPParser {
}
}

[HTTPParser.kOnExecute] (ret) {
if (this.paused) {
this.queue.push([HTTPParser.kOnExecute, ret])
return
}
onUpgrade (head) {
const { socket } = this
const { client, headers, statusCode, request } = this

const { upgrade, socket } = this
assert(!socket.destroyed)
assert(socket === client[kSocket])
assert(!socket.isPaused())
assert(socket._handle && socket._handle.reading)
assert(request.upgrade)

if (!Number.isFinite(ret)) {
assert(ret instanceof Error)
util.destroy(socket, ret)
return
}
this.headers = null
this.statusCode = null
this.request = null

// This logic cannot live in kOnHeadersComplete since we
// have no way of slicing the parsing buffer without knowing
// the offset which is only provided in kOnExecute.
if (upgrade && !socket.destroyed) {
const { client, headers, statusCode, request } = this
// _readableState.flowing might be `true` if the socket has been
// explicitly `resume()`:d even if we never registered a 'data'
// listener.

assert(!socket.destroyed)
assert(socket === client[kSocket])
assert(!socket.isPaused())
assert(socket._handle && socket._handle.reading)
assert(request.upgrade)
// We need to stop unshift from emitting 'data'. However, we cannot
// call pause() as that will stop socket from automatically resuming
// when 'data' listener is registered.

this.headers = null
this.statusCode = null
this.request = null
// Reset socket state to non flowing:
socket._readableState.flowing = null
socket.unshift(head)

// _readableState.flowing might be `true` if the socket has been
// explicitly `resume()`:d even if we never registered a 'data'
// listener.
try {
if (!request.aborted) {
detachSocket(socket)
client[kSocket] = null

// We need to stop unshift from emitting 'data'. However, we cannot
// call pause() as that will stop socket from automatically resuming
// when 'data' listener is registered.
client[kQueue][client[kRunningIdx]++] = null

// Reset socket state to non flowing:
socket._readableState.flowing = null
client.emit('disconnect', client, new InformationalError('upgrade'))

socket.unshift(this.getCurrentBuffer().slice(ret))
request.onUpgrade(statusCode, headers, socket)
}
} catch (err) {
util.destroy(socket, err)
}

try {
if (!socket.destroyed && !request.aborted) {
detachSocket(socket)
client[kSocket] = null
resume(client)
}

client[kQueue][client[kRunningIdx]++] = null
[HTTPParser.kOnExecute] (ret) {
const { upgrade, socket } = this

client.emit('disconnect', client, new InformationalError('upgrade'))
if (socket.destroyed) {
return
}

request.onUpgrade(statusCode, headers, socket)
}
if (!Number.isFinite(ret)) {
assert(ret instanceof Error)
util.destroy(socket, ret)
return
}

resume(client)
} catch (err) {
util.destroy(socket, err)
// This logic cannot live in kOnHeadersComplete since we
// have no way of slicing the parsing buffer without knowing
// the offset which is only provided in kOnExecute.
if (upgrade) {
const head = this.getCurrentBuffer().slice(ret)
if (this.paused) {
this.queue.push(['onUpgrade', head])
} else {
this.onUpgrade(head)
}
}
}

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
/* istanbul ignore next: difficult to make a test case for */
if (this.paused) {
this.queue.push([HTTPParser.kOnHeadersComplete, versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive])
return
}

onHeadersComplete (rawHeaders, statusCode, shouldKeepAlive) {
const { client, socket } = this

const request = client[kQueue][client[kRunningIdx]]

/* istanbul ignore next: difficult to make a test case for */
if (socket.destroyed) {
return
}

clearTimeout(this.timeout)
this.timeout = client[kBodyTimeout]
? setTimeout(onBodyTimeout, client[kBodyTimeout], this)
: null

// TODO: Check for content-length mismatch from server?

assert(!this.upgrade)
assert(this.statusCode < 200)

// TODO: More statusCode validation?

if (statusCode === 100) {
util.destroy(socket, new SocketError('bad response'))
return 1
}

if (request.upgrade !== true && upgrade !== Boolean(request.upgrade)) {
util.destroy(socket, new SocketError('bad upgrade'))
return 1
}

if (this.headers) {
Array.prototype.push.apply(this.headers, rawHeaders)
} else {
Expand All @@ -554,7 +538,7 @@ class Parser extends HTTPParser {
if (request.upgrade) {
this.unconsume()
this.upgrade = true
return 2
return
}

let keepAlive
Expand Down Expand Up @@ -603,10 +587,44 @@ class Parser extends HTTPParser {
}
} catch (err) {
util.destroy(socket, err)
}
}

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
const { client, socket } = this

const request = client[kQueue][client[kRunningIdx]]

/* istanbul ignore next: difficult to make a test case for */
if (socket.destroyed) {
return
}

// TODO: Check for content-length mismatch from server?

assert(!this.upgrade)
assert(this.statusCode < 200)

// TODO: More statusCode validation?

if (statusCode === 100) {
util.destroy(socket, new SocketError('bad response'))
return 1
}

return request.method === 'HEAD' || statusCode < 200 ? 1 : 0
if (request.upgrade !== true && upgrade !== Boolean(request.upgrade)) {
util.destroy(socket, new SocketError('bad upgrade'))
return 1
}

if (this.paused) {
this.queue.push(['onHeadersComplete', rawHeaders, statusCode, shouldKeepAlive])
} else {
this.onHeadersComplete(rawHeaders, statusCode, shouldKeepAlive)
}

return request.upgrade ? 2 : request.method === 'HEAD' || statusCode < 200 ? 1 : 0
}

[HTTPParser.kOnBody] (chunk, offset, length) {
Expand Down

0 comments on commit 68356f3

Please sign in to comment.