Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: request abort #3056

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -915,22 +915,24 @@ function writeH1 (client, request) {

const socket = client[kSocket]

try {
request.onConnect((err) => {
if (request.aborted || request.completed) {
return
}
const abort = (err) => {
if (request.aborted || request.completed) {
return
}

errorRequest(client, request, err || new RequestAbortedError())
errorRequest(client, request, err || new RequestAbortedError())

util.destroy(socket, new InformationalError('aborted'))
})
util.destroy(body)
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
util.destroy(socket, new InformationalError('aborted'))
}

try {
request.onConnect(abort)
} catch (err) {
errorRequest(client, request, err)
}

if (request.aborted) {
util.destroy(body)
return false
}

Expand Down Expand Up @@ -998,48 +1000,32 @@ function writeH1 (client, request) {

/* istanbul ignore else: assertion */
if (!body || bodyLength === 0) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
assert(contentLength === null, 'no body must not have content length')
socket.write(`${header}\r\n`, 'latin1')
}
request.onRequestSent()
writeBuffer({ abort, body: null, client, request, socket, contentLength, header, expectsPayload })
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

socket.cork()
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
socket.write(body)
socket.uncork()
request.onBodySent(body)
request.onRequestSent()
if (!expectsPayload) {
socket[kReset] = true
}
writeBuffer({ abort, body, client, request, socket, contentLength, header, expectsPayload })
} else if (util.isBlobLike(body)) {
if (typeof body.stream === 'function') {
writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
writeIterable({ abort, body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
} else {
writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
writeBlob({ abort, body, client, request, socket, contentLength, header, expectsPayload })
}
} else if (util.isStream(body)) {
writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
writeStream({ abort, body, client, request, socket, contentLength, header, expectsPayload })
} else if (util.isIterable(body)) {
writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
writeIterable({ abort, body, client, request, socket, contentLength, header, expectsPayload })
} else {
assert(false)
}

return true
}

function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
function writeStream ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

let finished = false

const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })

const onData = function (chunk) {
if (finished) {
Expand Down Expand Up @@ -1137,7 +1123,37 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
}
}

async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeBuffer ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
try {
if (!body) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
assert(contentLength === null, 'no body must not have content length')
socket.write(`${header}\r\n`, 'latin1')
}
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

socket.cork()
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
socket.write(body)
socket.uncork()
request.onBodySent(body)

if (!expectsPayload) {
socket[kReset] = true
}
}
request.onRequestSent()

client[kResume]()
} catch (err) {
abort(err)
}
}

async function writeBlob ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
assert(contentLength === body.size, 'blob body must have content length')

try {
Expand All @@ -1161,11 +1177,11 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng

client[kResume]()
} catch (err) {
util.destroy(socket, err)
abort(err)
}
}

async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeIterable ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

let callback = null
Expand All @@ -1191,7 +1207,7 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
.on('close', onDrain)
.on('drain', onDrain)

const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })
try {
// It's up to the user to somehow abort the async iterable.
for await (const chunk of body) {
Expand All @@ -1215,14 +1231,15 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
}

class AsyncWriter {
constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
constructor ({ abort, socket, request, contentLength, client, expectsPayload, header }) {
this.socket = socket
this.request = request
this.contentLength = contentLength
this.client = client
this.bytesWritten = 0
this.expectsPayload = expectsPayload
this.header = header
this.abort = abort

socket[kWriting] = true
}
Expand Down Expand Up @@ -1338,13 +1355,13 @@ class AsyncWriter {
}

destroy (err) {
const { socket, client } = this
const { socket, client, abort } = this

socket[kWriting] = false

if (err) {
assert(client[kRunning] <= 1, 'pipeline should only contain this request')
util.destroy(socket, err)
abort(err)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ function writeH2 (client, request) {

if (upgrade) {
errorRequest(client, request, new Error('Upgrade not supported for H2'))
return false
}

if (request.aborted) {
Expand Down
Loading