Skip to content

Commit

Permalink
fix: stream body handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 1, 2023
1 parent 41c253d commit 4c8740b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 20 deletions.
24 changes: 5 additions & 19 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1462,23 +1462,7 @@ function _resume (client, sync) {
return
}

if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
request.body
.on('data', /* istanbul ignore next */ function () {
/* istanbul ignore next */
assert(false)
})
.on('error', function (err) {
errorRequest(client, request, err)
})
.on('end', function () {
util.destroy(this)
})

request.body = null
}

if (client[kRunning] > 0 &&
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
Expand Down Expand Up @@ -1527,7 +1511,9 @@ function write (client, request) {
body.read(0)
}

let contentLength = util.bodyLength(body)
const bodyLength = util.bodyLength(body)

let contentLength = bodyLength

if (contentLength === null) {
contentLength = request.contentLength
Expand Down Expand Up @@ -1623,7 +1609,7 @@ function write (client, request) {
}

/* istanbul ignore else: assertion */
if (!body) {
if (!body || bodyLength === 0) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
Expand Down
41 changes: 40 additions & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,33 @@ class Request {

this.method = method

this.abort = null

if (body == null) {
this.body = null
} else if (util.isStream(body)) {
const onEnd = () => {
util.destroy(body)
}

const onError = err => {
if (this.abort) {
this.abort(err)
} else {
this.errored = err
}
}

body
.on('end', onEnd)
.on('error', onError)

this.dispose = () => {
body
.off('end', onEnd)
.off('error', onError)
}

this.body = body
} else if (util.isBuffer(body)) {
this.body = body.byteLength ? body : null
Expand Down Expand Up @@ -236,7 +260,12 @@ class Request {
assert(!this.aborted)
assert(!this.completed)

return this[kHandler].onConnect(abort)
if (this.errored) {
abort(this.errored)
} else {
this.abort = abort
return this[kHandler].onConnect(abort)
}
}

onHeaders (statusCode, headers, resume, statusText) {
Expand Down Expand Up @@ -267,6 +296,11 @@ class Request {
onComplete (trailers) {
assert(!this.aborted)

if (this.dispose) {
this.dispose()
this.dispose = null
}

this.completed = true
if (channels.trailers.hasSubscribers) {
channels.trailers.publish({ request: this, trailers })
Expand All @@ -275,6 +309,11 @@ class Request {
}

onError (error) {
if (this.dispose) {
this.dispose()
this.dispose = null
}

if (channels.error.hasSubscribers) {
channels.error.publish({ request: this, error })
}
Expand Down

0 comments on commit 4c8740b

Please sign in to comment.