Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jsdoc: lib/api/readable.js, fix some types #3567

Merged
merged 5 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 193 additions & 40 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,20 @@ const kBytesRead = Symbol('kBytesRead')

const noop = () => {}

/**
* @class
* @extends {Readable}
* @see https://fetch.spec.whatwg.org/#body
*/
class BodyReadable extends Readable {
/**
* @param {object} opts
* @param {(this: Readable, size: number) => void} opts.resume
* @param {() => (void | null)} opts.abort
* @param {string} [opts.contentType = '']
* @param {number} [opts.contentLength]
* @param {number} [opts.highWaterMark = 64 * 1024]
*/
constructor ({
resume,
abort,
Expand All @@ -36,8 +49,15 @@ class BodyReadable extends Readable {
this._readableState.dataEmitted = false

this[kAbort] = abort

/**
* @type {Consume | null}
*/
this[kConsume] = null
this[kBytesRead] = 0
/**
* @type {ReadableStream|null}
*/
this[kBody] = null
this[kUsed] = false
this[kContentType] = contentType
Expand All @@ -50,6 +70,11 @@ class BodyReadable extends Readable {
this[kReading] = false
}

/**
* @param {Error|null} err
* @param {(error:(Error|null)) => void} callback
* @returns {void}
*/
_destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
Expand All @@ -72,21 +97,36 @@ class BodyReadable extends Readable {
}
}

on (ev, ...args) {
if (ev === 'data' || ev === 'readable') {
/**
* @param {string} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
on (event, listener) {
if (event === 'data' || event === 'readable') {
this[kReading] = true
this[kUsed] = true
}
return super.on(ev, ...args)
return super.on(event, listener)
}

addListener (ev, ...args) {
return this.on(ev, ...args)
/**
* @param {string} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
addListener (event, listener) {
return this.on(event, listener)
}

off (ev, ...args) {
const ret = super.off(ev, ...args)
if (ev === 'data' || ev === 'readable') {
/**
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
off (event, listener) {
const ret = super.off(event, listener)
if (event === 'data' || event === 'readable') {
this[kReading] = (
this.listenerCount('data') > 0 ||
this.listenerCount('readable') > 0
Expand All @@ -95,10 +135,19 @@ class BodyReadable extends Readable {
return ret
}

removeListener (ev, ...args) {
return this.off(ev, ...args)
/**
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
removeListener (event, listener) {
return this.off(event, listener)
}

/**
* @param {Buffer|null} chunk
* @returns {boolean}
*/
push (chunk) {
this[kBytesRead] += chunk ? chunk.length : 0

Expand All @@ -109,43 +158,84 @@ class BodyReadable extends Readable {
return super.push(chunk)
}

// https://fetch.spec.whatwg.org/#dom-body-text
/**
* Consumes and returns the body as a string.
*
* @see https://fetch.spec.whatwg.org/#dom-body-text
* @returns {Promise<string>}
*/
async text () {
return consume(this, 'text')
}

// https://fetch.spec.whatwg.org/#dom-body-json
/**
* Consumes and returns the body as a JavaScript Object.
*
* @see https://fetch.spec.whatwg.org/#dom-body-json
* @returns {Promise<unknown>}
*/
async json () {
return consume(this, 'json')
}

// https://fetch.spec.whatwg.org/#dom-body-blob
/**
* Consumes and returns the body as a Blob
*
* @see https://fetch.spec.whatwg.org/#dom-body-blob
* @returns {Promise<Blob>}
*/
async blob () {
return consume(this, 'blob')
}

// https://fetch.spec.whatwg.org/#dom-body-bytes
/**
* Consumes and returns the body as an Uint8Array.
*
* @see https://fetch.spec.whatwg.org/#dom-body-bytes
* @returns {Promise<Uint8Array>}
*/
async bytes () {
return consume(this, 'bytes')
}

// https://fetch.spec.whatwg.org/#dom-body-arraybuffer
/**
* Consumes and returns the body as an ArrayBuffer.
*
* @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer
* @returns {Promise<ArrayBuffer>}
*/
async arrayBuffer () {
return consume(this, 'arrayBuffer')
}

// https://fetch.spec.whatwg.org/#dom-body-formdata
/**
* Not implemented
*
* @see https://fetch.spec.whatwg.org/#dom-body-formdata
* @throws {NotSupportedError}
*/
async formData () {
// TODO: Implement.
throw new NotSupportedError()
}

// https://fetch.spec.whatwg.org/#dom-body-bodyused
/**
* Returns true if the body is not null and the body has been consumed.
* Otherwise, returns false.
*
* @see https://fetch.spec.whatwg.org/#dom-body-bodyused
* @readonly
* @returns {boolean}
*/
get bodyUsed () {
return util.isDisturbed(this)
}

// https://fetch.spec.whatwg.org/#dom-body-body
/**
* @see https://fetch.spec.whatwg.org/#dom-body-body
* @readonly
* @returns {ReadableStream}
*/
get body () {
if (!this[kBody]) {
this[kBody] = ReadableStreamFrom(this)
Expand All @@ -158,14 +248,23 @@ class BodyReadable extends Readable {
return this[kBody]
}

/**
* Dumps the response body by reading `limit` number of bytes.
* @param {object} opts
* @param {number} [opts.limit = 131072] Number of bytes to read.
* @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump.
* @returns {Promise<null>}
*/
async dump (opts) {
const signal = opts?.signal

if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) {
throw new InvalidArgumentError('signal must be an AbortSignal')
}

const limit = Number.isFinite(opts?.limit) ? opts.limit : 128 * 1024
const limit = opts?.limit && Number.isFinite(opts.limit)
? opts.limit
: 128 * 1024

signal?.throwIfAborted()

Expand All @@ -174,26 +273,34 @@ class BodyReadable extends Readable {
}

return await new Promise((resolve, reject) => {
if (this[kContentLength] > limit || this[kBytesRead] > limit) {
if (
(this[kContentLength] && (this[kContentLength] > limit)) ||
this[kBytesRead] > limit
) {
this.destroy(new AbortError())
}

const onAbort = () => {
this.destroy(signal.reason ?? new AbortError())
if (signal) {
const onAbort = () => {
this.destroy(signal.reason ?? new AbortError())
}
signal.addEventListener('abort', onAbort)
this
.on('close', function () {
signal.removeEventListener('abort', onAbort)
if (signal.aborted) {
reject(signal.reason ?? new AbortError())
} else {
resolve(null)
}
})
} else {
this.on('close', resolve)
}
Comment on lines +283 to 299
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reorderering so that we only do signal specific stuff like creating onAbort if needed

signal?.addEventListener('abort', onAbort)

this
.on('close', function () {
signal?.removeEventListener('abort', onAbort)
if (signal?.aborted) {
reject(signal.reason ?? new AbortError())
} else {
resolve(null)
}
})
.on('error', noop)
.on('data', function (chunk) {
.on('data', () => {
if (this[kBytesRead] > limit) {
this.destroy()
}
Expand All @@ -204,7 +311,7 @@ class BodyReadable extends Readable {

/**
* @param {BufferEncoding} encoding
* @returns {BodyReadable}
* @returns {this}
*/
setEncoding (encoding) {
if (Buffer.isEncoding(encoding)) {
Expand All @@ -214,17 +321,40 @@ class BodyReadable extends Readable {
}
}

// https://streams.spec.whatwg.org/#readablestream-locked
function isLocked (self) {
/**
* @see https://streams.spec.whatwg.org/#readablestream-locked
* @param {BodyReadable} bodyReadable
* @returns {boolean}
*/
function isLocked (bodyReadable) {
// Consume is an implicit lock.
return (self[kBody] && self[kBody].locked === true) || self[kConsume]
return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null
}

// https://fetch.spec.whatwg.org/#body-unusable
function isUnusable (self) {
return util.isDisturbed(self) || isLocked(self)
/**
* @see https://fetch.spec.whatwg.org/#body-unusable
* @param {BodyReadable} bodyReadable
* @returns {boolean}
*/
function isUnusable (bodyReadable) {
return util.isDisturbed(bodyReadable) || isLocked(bodyReadable)
}

/**
* @typedef {object} Consume
* @property {string} type
* @property {BodyReadable} stream
* @property {((value?: any) => void)} resolve
* @property {((err: Error) => void)} reject
* @property {number} length
* @property {Buffer[]} body
*/

/**
* @param {BodyReadable} stream
* @param {string} type
* @returns {Promise<any>}
*/
async function consume (stream, type) {
assert(!stream[kConsume])

Expand Down Expand Up @@ -269,6 +399,10 @@ async function consume (stream, type) {
})
}

/**
* @param {Consume} consume
* @returns {void}
*/
function consumeStart (consume) {
if (consume.body === null) {
return
Expand Down Expand Up @@ -356,6 +490,11 @@ function chunksConcat (chunks, length) {
return buffer
}

/**
* @param {Consume} consume
* @param {BufferEncoding} encoding
* @returns {void}
*/
function consumeEnd (consume, encoding) {
const { type, body, resolve, stream, length } = consume

Expand All @@ -378,11 +517,21 @@ function consumeEnd (consume, encoding) {
}
}

/**
* @param {Consume} consume
* @param {Buffer} chunk
* @returns {void}
*/
function consumePush (consume, chunk) {
consume.length += chunk.length
consume.body.push(chunk)
}

/**
* @param {Consume} consume
* @param {Error} [err]
* @returns {void}
*/
function consumeFinish (consume, err) {
if (consume.body === null) {
return
Expand All @@ -394,6 +543,7 @@ function consumeFinish (consume, err) {
consume.resolve()
}

// Reset the consume object to allow for garbage collection.
consume.type = null
consume.stream = null
consume.resolve = null
Expand All @@ -402,4 +552,7 @@ function consumeFinish (consume, err) {
consume.body = null
}

module.exports = { Readable: BodyReadable, chunksDecode }
module.exports = {
Readable: BodyReadable,
chunksDecode
}
Loading
Loading