Skip to content

Commit

Permalink
fix: request abort (#3056)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 7, 2024
1 parent 1a638d2 commit c886b34
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 41 deletions.
97 changes: 57 additions & 40 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -915,22 +915,24 @@ function writeH1 (client, request) {

const socket = client[kSocket]

try {
request.onConnect((err) => {
if (request.aborted || request.completed) {
return
}
const abort = (err) => {
if (request.aborted || request.completed) {
return
}

errorRequest(client, request, err || new RequestAbortedError())
errorRequest(client, request, err || new RequestAbortedError())

util.destroy(socket, new InformationalError('aborted'))
})
util.destroy(body)
util.destroy(socket, new InformationalError('aborted'))
}

try {
request.onConnect(abort)
} catch (err) {
errorRequest(client, request, err)
}

if (request.aborted) {
util.destroy(body)
return false
}

Expand Down Expand Up @@ -998,48 +1000,32 @@ function writeH1 (client, request) {

/* istanbul ignore else: assertion */
if (!body || bodyLength === 0) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
assert(contentLength === null, 'no body must not have content length')
socket.write(`${header}\r\n`, 'latin1')
}
request.onRequestSent()
writeBuffer({ abort, body: null, client, request, socket, contentLength, header, expectsPayload })
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

socket.cork()
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
socket.write(body)
socket.uncork()
request.onBodySent(body)
request.onRequestSent()
if (!expectsPayload) {
socket[kReset] = true
}
writeBuffer({ abort, body, client, request, socket, contentLength, header, expectsPayload })
} else if (util.isBlobLike(body)) {
if (typeof body.stream === 'function') {
writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
writeIterable({ abort, body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
} else {
writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
writeBlob({ abort, body, client, request, socket, contentLength, header, expectsPayload })
}
} else if (util.isStream(body)) {
writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
writeStream({ abort, body, client, request, socket, contentLength, header, expectsPayload })
} else if (util.isIterable(body)) {
writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
writeIterable({ abort, body, client, request, socket, contentLength, header, expectsPayload })
} else {
assert(false)
}

return true
}

function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
function writeStream ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

let finished = false

const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })

const onData = function (chunk) {
if (finished) {
Expand Down Expand Up @@ -1137,7 +1123,37 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
}
}

async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeBuffer ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
try {
if (!body) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
assert(contentLength === null, 'no body must not have content length')
socket.write(`${header}\r\n`, 'latin1')
}
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

socket.cork()
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
socket.write(body)
socket.uncork()
request.onBodySent(body)

if (!expectsPayload) {
socket[kReset] = true
}
}
request.onRequestSent()

client[kResume]()
} catch (err) {
abort(err)
}
}

async function writeBlob ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
assert(contentLength === body.size, 'blob body must have content length')

try {
Expand All @@ -1161,11 +1177,11 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng

client[kResume]()
} catch (err) {
util.destroy(socket, err)
abort(err)
}
}

async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeIterable ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

let callback = null
Expand All @@ -1191,7 +1207,7 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
.on('close', onDrain)
.on('drain', onDrain)

const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })
try {
// It's up to the user to somehow abort the async iterable.
for await (const chunk of body) {
Expand All @@ -1215,14 +1231,15 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
}

class AsyncWriter {
constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
constructor ({ abort, socket, request, contentLength, client, expectsPayload, header }) {
this.socket = socket
this.request = request
this.contentLength = contentLength
this.client = client
this.bytesWritten = 0
this.expectsPayload = expectsPayload
this.header = header
this.abort = abort

socket[kWriting] = true
}
Expand Down Expand Up @@ -1338,13 +1355,13 @@ class AsyncWriter {
}

destroy (err) {
const { socket, client } = this
const { socket, client, abort } = this

socket[kWriting] = false

if (err) {
assert(client[kRunning] <= 1, 'pipeline should only contain this request')
util.destroy(socket, err)
abort(err)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ function writeH2 (client, request) {

if (upgrade) {
errorRequest(client, request, new Error('Upgrade not supported for H2'))
return false
}

if (request.aborted) {
Expand Down

0 comments on commit c886b34

Please sign in to comment.