diff --git a/doc/api/errors.md b/doc/api/errors.md index 7ac741bba68737..4f0e0dad5ae7df 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1428,6 +1428,11 @@ is set for the `Http2Stream`. `http2.connect()` was passed a URL that uses any protocol other than `http:` or `https:`. + +### `ERR_ILLEGAL_CONSTRUCTOR` + +An attempt was made to construct an object using a non-public constructor. + ### `ERR_INCOMPATIBLE_OPTION_PAIR` diff --git a/doc/api/index.md b/doc/api/index.md index 71c415afaa673a..957c5d55b3008b 100644 --- a/doc/api/index.md +++ b/doc/api/index.md @@ -64,6 +64,7 @@ * [VM](vm.md) * [WASI](wasi.md) * [Web Crypto API](webcrypto.md) +* [WHATWG Streams](whatwg_streams.md) * [Worker threads](worker_threads.md) * [Zlib](zlib.md) diff --git a/doc/api/whatwg_streams.md b/doc/api/whatwg_streams.md new file mode 100644 index 00000000000000..c0a689d0193be2 --- /dev/null +++ b/doc/api/whatwg_streams.md @@ -0,0 +1,607 @@ +# WHATWG Streams + +> Stability: 1 - Experimental + +An implementation of the [WHATWG Streams Standard][]. + +```mjs +import { + ReadableStream, + WritableStream, + TransformStream, +} from 'node:stream/whatwg'; +``` + +```cjs +const { + ReadableStream, + WritableStream, + TransformStream, +} = require('stream/whatwg'); +``` + +## Overview + +## API + +### Class: `ReadableStream` + + +#### `new ReadableStream([underlyingSource [, strategy]])` + + + +* `underlyingSource` {Object} + * `start` {Function} + * `controller` {ReadableStreamDefaultController|ReadableByteStreamController} + * Returns: `undefined` or a promise fulfilled with `undefined`. + * `pull` {Function} + * `controller` {ReadableStreamDefaultController|ReadableByteStreamController} + * Returns: A promise fulfilled with `undefined`. + * `cancel` {Function} + * `reason` {any} + * Returns: A promise fulfilled with `undefined`. + * `type` {string} Must be `'bytes'` or `undefined`. + * `autoAllocateChunkSize` {number} +* `strategy` {Object} + * `highWaterMark` {number} + * `size` {Function} + * `chunk` {any} + * Returns: {number} + + +#### `readableStream.locked` + + +* Type: {boolean} + +#### `readableStream.cancel([reason])` + + +* `reason` {any} +* Returns: A promise fulfilled with `undefined`. + +#### `readableStream.getReader([options])` + + +* `options` {Object} + * `mode` {string} `'byob'` or `undefined` +* Returns: {ReadableStreamDefaultReader|ReadableStreamBYOBReader} + +#### `readableStream.pipeThrough(transform[, options])` + + +* `transform` {Object} + * `readable` {ReadableStream} + * `writable` {WritableStream} +* `options` {Object} + * `preventAbort` {boolean} + * `preventCancel` {boolean} + * `preventClose` {boolean} + * `signal` {AbortSignal} +* Returns: {ReadableStream} + +#### `readableStream.pipeTo(destination, options)` + + +* `destination` {WritableStream} +* `options` {Object} + * `preventAbort` {boolean} + * `preventCancel` {boolean} + * `preventClose` {boolean} + * `signal` {AbortSignal} +* Returns: A promise fulfilled with `undefined` + +#### `readableStream.tee()` + + +* Returns: {ReadableStream[]} + +#### `readableStream.values([options])` + + +* `options` {Object} + * `preventCancel` {boolean} When `true`, prevents the {ReadableStream} + from being closed when the async iterator abruptly terminates. + **Defaults**: `false` + +```mjs +const stream = new ReadableStream(getSomeSource()); + +for await (const chunk of stream.values({ preventCancel: true })) + console.log(Buffer.from(chunk).toString()); +``` + +#### Async Iteration + +The {ReadableStream} object supports the async iterator protocol using +`for await` syntax. + +```mjs +const stream = new ReadableStream(getSomeSource()); + +for await (const chunk of stream) + console.log(Buffer.from(chunk).toString()); +``` + +The async iterator will consume the {ReadableStream} until it terminates. + +By default, if the async iterator exits early (via either a `break`, +`return`, or a `throw`), the {ReadableStream} will be closed. To prevent +automatic closing of the {ReadableStream}, use the `readableStream.values()` +method to acquire the async iterator and set the `preventCancel` option to +`true`. + +The {ReadableStream} must not be locked (that is, it must not have an existing +active reader). During the async iteration, the {ReadableStream} will be locked. + +#### Cloning with postMessage() + +TBD + +### Class: `ReadableStreamDefaultReader` + + +#### `new ReadableStreamDefaultReader(stream)` + + +* `stream` {ReadableStream} + +#### `readableStreamDefaultReader.cancel([reason])` + + +* `reason` {any} + +#### `readableStreamDefaultReader.closed` + + +* Type: {Promise} Fulfilled with `undefined` when the reader is closed. + +#### `readableStreamDefaultReader.read()` + + +* Returns: A promise fulfilled with an object: + * `value` {ArrayBuffer} + * `done` {boolean} + +#### `readableStreamDefaultReader.releaseLock()` + + +### Class: `ReadableStreamBYOBReader` + + +#### `new ReadableStreamBYOBReader(stream)` + + +* `stream` {ReadableStream} + +#### `readableStreamBYOBReader.cancel([reason])` + + +* `reason` {any} + +#### `readableStreamBYOBReader.closed` + + +* Type: {Promise} Fulfilled with `undefined` when the reader is closed. + +#### `readableStreamBYOBReader.read(view)` + + +* `view` {Buffer|TypedArray|DataView} +* Returns: A promise fulfilled with an object: + * `value` {ArrayBuffer} + * `done` {boolean} + +#### `readableStreamBYOBReader.releaseLock()` + + +### Class: `ReadableStreamDefaultController` + + +#### `readableStreamDefaultController.close()` + + +#### `readableStreamDefaultController.desiredSize` + + +* Type: {number} + +#### `readableStreamDefaultController.enqueue(chunk)` + + +* `chunk` {any} + +#### `readableStreamDefaultController.error(error)` + + +* `error` {any} + +### Class: `ReadableByteStreamController` + + +#### `readableByteStreamController.byobRequest` + + +* Type: {ReadableStreamBYOBRequest} + +#### `readableByteStreamController.close()` + + +#### `readableByteStreamController.desiredSize` + + +* Type: {number} + +#### `readableByteStreamController.enqueue(chunk)` + + +* `chunk`: {Buffer|TypedArray|DataView} + +#### `readableByteStreamController.error(error)` + + +* `error` {any} + +### Class: `ReadableStreamBYOBRequest` + + +#### `readableStreamBYOBRequest.respond(bytesWritten)` + + +* `bytesWritten` {number} + +#### `readableStreamBYOBRequest.respondWithNewView(view)` + + +* `view` {Buffer|TypedArray|DataView} + +#### `readableStreamBYOBRequest.view` + + +* Type: {Buffer|TypedArray|DataView} + +### Class: `WritableStream` + + +#### `new WritableStream([underlyingSink[, strategy]])` + + +* `underlyingSink` {Object} + * `start` {Function} + * `controller` {WritableStreamDefaultController} + * Returns: `undefined` or a promise fulfilled with `undefined`. + * `write` {Function} + * `chunk` {any} + * `controller` {WritableStreamDefaultController} + * Returns: A promise fulfilled with `undefined`. + * `close` {Function} + * Returns: A promise fulfilled with `undefined`. + * `abort` {Function} + * `reason` {any} + * Returns: A promise fulfilled with `undefined`. + * `type` {any} +* `strategy` {Object} + * `highWaterMark` {number} + * `size` {Function} + * `chunk` {any} + * Returns: {number} + +#### `writableStream.abort([reason])` + + +* `reason` {any} +* Returns: A promise fulfilled with `undefined`. + +#### `writableStream.close()` + + +* Returns: A promise fulfilled with `undefined`. + +#### `writableStream.getWriter()` + + +* Returns: {WritableStreamDefaultWriter} + +#### `writableStream.locked` + + +* Type: {boolean} + +### Class: `WritableStreamDefaultWriter` + + +#### `new WritableStreamDefaultWriter(stream)` + + +* `stream` {WritableStream} + +#### `writableStreamDefaultWriter.abort([reason])` + + +* `reason` {any} +* Returns: A promise fulfilled with `undefined`. + +#### `writableStreamDefaultWriter.close()` + + +* Returns: A promise fulfilled with `undefined`. + +#### `writableStreamDefaultWriter.closed` + + +* Type: A promise that is fulfilled with `undefined` when the + writer is closed. + +#### `writableStreamDefaultWriter.desiredSize` + + +* Type: {number} + +#### `writableStreamDefaultWriter.ready` + + +* type: A promise that is fulfilled with `undefined` when the + writer is ready to be used. + +#### `writableStreamDefaultWriter.releaseLock()` + + +#### `writableStreamDefaultWriter.write([chunk])` + + +* `chunk`: {any} +* Returns: A promise fulfilled with `undefined`. + +### Class: `WritableStreamDefaultController` + + +#### `writableStreamDefaultController.error(error)` + + +* `error` {any} + +### Class: `TransformStream` + + +#### `new TransformStream([transformer[, writableStrategy[, readableStrategy]]])` + + +* `transformer` {Object} + * `start` {Function} + * `controller` {TransformStreamDefaultController} + * Returns: `undefined` or a promise fulfilled with `undefined` + * `transform` {Function} + * `chunk` {any} + * `controller` {TransformStreamDefaultController} + * Returns: A promise fulfilled with `undefined`. + * `flush` {Function} + * `controller` {TransformStreamDefaultController} + * Returns: A promise fulfilled with `undefined`. + * `readableType` {any} + * `writableType` {any} +* `writableStrategy` {Object} + * `highWaterMark` {number} + * `size` {Function} + * `chunk` {any} + * Returns: {number} +* `readableStrategy` {Object} + * `highWaterMark` {number} + * `size` {Function} + * `chunk` {any} + * Returns: {number} + +#### `transformStream.readable` + + +* Type: {ReadableStream} + +#### `transformStream.writable` + + +* Type: {WritableStream} + +### Class: `TransformStreamDefaultController` + + +#### `transformStreamDefaultController.desiredSize` + + +* Type: {number} + +#### `transformStreamDefaultController.enqueue([chunk])` + + +* `chunk` {any} + +#### `transformStreamDefaultController.error([reason])` + + +* `reason` {any} + +#### `transformStreamDefaultController.terminate()` + + +### Class: `ByteLengthQueuingStrategy` + + +#### `new ByteLengthQueuingStrategy(options)` + + +* `options` {Object} + * `highWaterMark` {number} + +#### `byteLengthQueuingStrategy.highWaterMark` + + +* Type: {number} + +#### `byteLengthQueuingStrategy.size` + + +* Type: {Function} + * `chunk` {any} + * Returns: {number} + +### Class: `CountQueuingStrategy` + + +#### `new CountQueuingStrategy(options)` + + +* `options` {Object} + * `highWaterMark` {number} + +#### `countQueuingStrategy.highWaterMark` + + +* Type: {number} + +#### `countQueuingStrategy.size` + + +* Type: {Function} + * `chunk` {any} + * Returns: {number} + +[WHATWG Streams Standard]: https://streams.spec.whatwg.org/ diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 13b56311d370b8..320cb07662967c 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1033,6 +1033,7 @@ E('ERR_HTTP_SOCKET_ENCODING', 'Changing the socket encoding is not allowed per RFC7230 Section 3.', Error); E('ERR_HTTP_TRAILER_INVALID', 'Trailers are invalid with this transfer encoding', Error); +E('ERR_ILLEGAL_CONSTRUCTOR', 'Illegal constructor', TypeError); E('ERR_INCOMPATIBLE_OPTION_PAIR', 'Option "%s" cannot be used in combination with option "%s"', TypeError); E('ERR_INPUT_TYPE_NOT_ALLOWED', '--input-type can only be used with string ' + @@ -1256,8 +1257,8 @@ E('ERR_INVALID_RETURN_VALUE', (input, name, value) => { } return `Expected ${input} to be returned from the "${name}"` + ` function but got ${type}.`; -}, TypeError); -E('ERR_INVALID_STATE', 'Invalid state: %s', Error); +}, TypeError, RangeError); +E('ERR_INVALID_STATE', 'Invalid state: %s', Error, TypeError); E('ERR_INVALID_SYNC_FORK_INPUT', 'Asynchronous forks do not support ' + 'Buffer, TypedArray, DataView or string input: %s', diff --git a/lib/internal/streams/whatwg.js b/lib/internal/streams/whatwg.js new file mode 100644 index 00000000000000..df9e6eb9cfff42 --- /dev/null +++ b/lib/internal/streams/whatwg.js @@ -0,0 +1,3850 @@ +'use strict'; + +/* eslint-disable no-use-before-define */ + +const { + ArrayBuffer, + ArrayPrototypePush, + ArrayPrototypeShift, + DataViewCtor, + FunctionPrototypeBind, + FunctionPrototypeCall, + MathMax, + MathMin, + NumberIsInteger, + NumberIsNaN, + ObjectGetPrototypeOf, + ObjectSetPrototypeOf, + PromisePrototypeCatch, + PromisePrototypeThen, + PromiseResolve, + PromiseReject, + PromiseAll, + Symbol, + SymbolAsyncIterator, + SymbolToStringTag, + Uint8Array, +} = primordials; + +const { + AbortError, + codes: { + ERR_ILLEGAL_CONSTRUCTOR, + ERR_INVALID_ARG_VALUE, + ERR_INVALID_ARG_TYPE, + ERR_INVALID_STATE, + }, +} = require('internal/errors'); + +const { + copyArrayBuffer, + detachArrayBuffer +} = internalBinding('buffer'); + +const { + isArrayBufferView, + isDataView, +} = require('util/types'); + +const { + createDeferredPromise, + emitExperimentalWarning, + customInspectSymbol: kInspect, +} = require('internal/util'); + +const { + inspect, +} = require('util'); + +const { + serialize, + deserialize, +} = require('v8'); + +const { + validateAbortSignal, + validateBoolean, + validateObject, +} = require('internal/validators'); + +const { + getPromiseDetails, + kPending, +} = internalBinding('util'); + +const { + queueMicrotask, +} = require('internal/process/task_queues'); + +const assert = require('internal/assert'); + +emitExperimentalWarning('stream/whatwg'); + +const kAbort = Symbol('kAbort'); +const kCancel = Symbol('kCancel'); +const kClose = Symbol('kClose'); +const kCloseSentinel = Symbol('kCloseSentinel'); +const kChunk = Symbol('kChunk'); +const kError = Symbol('kError'); +const kPull = Symbol('kPull'); +const kState = Symbol('kState'); + +const AsyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype); + +function extractHighWaterMark(value, defaultHWM) { + if (value === undefined) return defaultHWM; + if (typeof value !== 'number' || + NumberIsNaN(value) || + value < 0) + throw new ERR_INVALID_ARG_VALUE.RangeError('strategy.highWaterMark', value); + return value; +} + +function extractSizeAlgorithm(size) { + if (size === undefined) return () => 1; + if (typeof size !== 'function') + throw new ERR_INVALID_ARG_VALUE('strategy.size', size); + return size; +} + +function customInspect(depth, options, name, data) { + if (depth < 0) + return this; + + const opts = { + ...options, + depth: options.depth == null ? null : options.depth - 1 + }; + + return `${name} ${inspect(data, opts)}`; +} + +/** + * @typedef {import('../abort_controller').AbortSignal} AbortSignal + * + * @typedef { + * ReadableStreamDefaultController | ReadableByteStreamController + * } ReadableStreamController + * + * @typedef { + * ReadableStreamDefaultReader | ReadableStreamBYOBReader + * } ReadableStreamReader + * + * @callback UnderlyingSourceStartCallback + * @param {ReadableStreamController} controller + * @returns { any | Promise } + * + * @callback UnderlyingSourcePullCallback + * @param {ReadableStreamController} controller + * @returns { Promise } + * + * @callback UnderlyingSourceCancelCallback + * @param {any} reason + * @returns { Promise } + * + * @callback QueuingStrategySize + * @param {any} chunk + * @returns {number} + * + * @callback UnderlyingSinkStartCallback + * @param {WritableStreamDefaultController} controller + * + * @callback UnderlyingSinkWriteCallback + * @param {any} chunk + * @param {WritableStreamDefaultController} controller + * @returns {Promise} + * + * @callback UnderlyingSinkCloseCallback + * @returns {Promise} + * + * @callback UnderlyingSinkAbortCallback + * @param {any} reason + * @returns {Promise} + * + * @callback TransformerStartCallback + * @param {TransformStreamDefaultController} controller; + * + * @callback TransformerFlushCallback + * @param {TransformStreamDefaultController} controller; + * @returns {Promise} + * + * @callback TransformerTransformCallback + * @param {any} chunk + * @param {TransformStreamDefaultController} controller + * @returns {Promise} + * + * @typedef {{ + * readable: ReadableStream, + * writable: WritableStream, + * }} ReadableWritablePair + * + * @typedef {{ + * preventClose? : boolean, + * preventAbort? : boolean, + * preventCancel? : boolean, + * signal? : AbortSignal, + * }} StreamPipeOptions + * + * @typedef {{ + * start? : UnderlyingSourceStartCallback, + * pull? : UnderlyingSourcePullCallback, + * cancel? : UnderlyingSourceCancelCallback, + * type? : "bytes", + * autoAllocateChunkSize? : number + * }} UnderlyingSource + * + * @typedef {{ + * start? : UnderlyingSinkStartCallback, + * write? : UnderlyingSinkWriteCallback, + * close? : UnderlyingSinkCloseCallback, + * abort? : UnderlyingSinkAbortCallback, + * type? : any, + * }} UnderlyingSink + * + * @typedef {{ + * start? : TransformerStartCallback, + * transform? : TransformerTransformCallback, + * flush? : TransformerFlushCallback, + * readableType? : any, + * writableType? : any, + * }} Transformer + * + * @typedef {{ + * highWaterMark : number, + * size? : QueuingStrategySize, + * }} QueuingStrategy + */ + +class ReadableStream { + /** + * @param {UnderlyingSource} [source] + * @param {QueuingStrategy} [strategy] + */ + constructor(source = null, strategy = {}) { + this[kState] = { + disturbed: false, + state: 'readable', + storedError: undefined, + stream: undefined, + }; + // The spec requires handling of the strategy first + // here. Specifically, if getting the size and + // highWaterMark from the strategy fail, that has + // to trigger a throw before getting the details + // from the source. So be sure to keep these in + // this order. + const size = strategy?.size; + const highWaterMark = strategy?.highWaterMark; + const type = source?.type; + + if (type === 'bytes') { + if (size !== undefined) + throw new ERR_INVALID_ARG_VALUE('strategy.size', size); + setupReadableByteStreamControllerFromSource( + this, + source, + extractHighWaterMark(highWaterMark, 0)); + return; + } + + if (type !== undefined) + throw new ERR_INVALID_ARG_VALUE('source.type', type); + setupReadableStreamDefaultControllerFromSource( + this, + source, + extractHighWaterMark(highWaterMark, 1), + extractSizeAlgorithm(size)); + } + + /** + * @readonly + * @type {boolean} + */ + get locked() { + return isReadableStreamLocked(this); + } + + /** + * @param {any} [reason] + * @returns { Promise } + */ + cancel(reason) { + if (isReadableStreamLocked(this)) { + return PromiseReject( + new ERR_INVALID_STATE.TypeError('ReadableStream is locked')); + } + return readableStreamCancel(this, reason); + } + + /** + * @param {{ + * mode? : "byob" + * }} [options] + * @returns {ReadableStreamReader} + */ + getReader(options = {}) { + validateObject(options, 'options'); + const { + mode, + } = options; + if (mode === undefined) { + return new ReadableStreamDefaultReader(this); + } + if (mode !== 'byob') + throw new ERR_INVALID_ARG_VALUE('options.mode', mode); + return new ReadableStreamBYOBReader(this); + } + + /** + * @param {ReadableWritablePair} transform + * @param {StreamPipeOptions} [options] + * @returns {ReadableStream} + */ + pipeThrough(transform, options = {}) { + const readable = transform?.readable; + const writable = transform?.writable; + if (!isReadableStream(readable)) { + throw new ERR_INVALID_ARG_TYPE( + 'transform.readable', + 'ReadableStream', + readable); + } + if (!isWritableStream(writable)) { + throw new ERR_INVALID_ARG_TYPE( + 'transform.writable', + 'WritableStream', + writable); + } + if (isReadableStreamLocked(this)) + throw new ERR_INVALID_STATE('The ReadableStream is locked'); + if (isWritableStreamLocked(writable)) + throw new ERR_INVALID_STATE('The WritableStream is locked'); + const { + signal, + preventClose = false, + preventAbort = false, + preventCancel = false, + } = options; + validateBoolean(preventAbort, 'options.preventAbort'); + validateBoolean(preventClose, 'options.preventClose'); + validateBoolean(preventCancel, 'options.preventCancel'); + if (signal !== undefined) + validateAbortSignal(signal, 'options.signal'); + + const promise = readableStreamPipeTo( + this, + writable, + preventClose, + preventAbort, + preventCancel, + signal); + setPromiseHandled(promise); + + return readable; + } + + /** + * @param {WritableStream} destination + * @param {StreamPipeOptions} [options] + * @returns {Promise} + */ + pipeTo(destination, options = {}) { + if (!isWritableStream(destination)) { + throw new ERR_INVALID_ARG_TYPE( + 'transform.writable', + 'WritableStream', + destination); + } + if (isReadableStreamLocked(this)) + throw new ERR_INVALID_STATE('The ReadableStream is locked'); + if (isWritableStreamLocked(destination)) + throw new ERR_INVALID_STATE('The WritableStream is locked'); + const { + signal, + preventClose = false, + preventAbort = false, + preventCancel = false, + } = options; + validateBoolean(preventAbort, 'options.preventAbort'); + validateBoolean(preventClose, 'options.preventClose'); + validateBoolean(preventCancel, 'options.preventCancel'); + if (signal !== undefined) + validateAbortSignal(signal, 'options.signal'); + + return readableStreamPipeTo( + this, + destination, + preventClose, + preventAbort, + preventCancel, + signal); + } + + /** + * @returns {ReadableStream[]} + */ + tee() { + return readableStreamTee(this, false); + } + + // The values() method is not defined in the streams standard + // spec but it is called out in the section on async iteration. + // So... that's weird. + + /** + * @param {{ + * preventCancel? : boolean, + * }} [options] + * @returns {AsyncIterable} + */ + values(options = {}) { + validateObject(options, 'options'); + const { + preventCancel = false, + } = options; + validateBoolean(preventCancel, 'options.preventCancel'); + + const reader = new ReadableStreamDefaultReader(this); + + return ObjectSetPrototypeOf({ + next() { + if (reader[kState].stream === undefined) { + return PromiseReject(new ERR_INVALID_STATE( + 'The reader is not bound to a ReadableStream')); + } + const promise = createDeferredPromise(); + readableStreamDefaultReaderRead(reader, { + [kChunk](chunk) { + promise.resolve({ value: chunk, done: false }); + }, + [kClose]() { + readableStreamReaderGenericRelease(reader); + promise.resolve({ done: true }); + }, + [kError](error) { + readableStreamReaderGenericRelease(reader); + promise.reject(error); + } + }); + return promise.promise; + }, + + return(error) { + if (reader[kState].stream === undefined) { + return PromiseResolve(); + } + assert(!reader[kState].readRequests.length); + if (!preventCancel) { + const result = readableStreamReaderGenericCancel(reader, error); + readableStreamReaderGenericRelease(reader); + return PromisePrototypeThen(result, () => { + return { done: true }; + }); + } + readableStreamReaderGenericRelease(reader); + return PromiseResolve({ done: true }); + }, + + [SymbolAsyncIterator]() { return this; } + }, AsyncIteratorPrototype); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'ReadableStream', { + locked: this.locked, + state: this[kState].state, + }); + } + + /** + * @returns {AsyncIterable} + */ + [SymbolAsyncIterator]() { + return this.values({ preventCancel: false }); + } + + get [SymbolToStringTag]() { return 'ReadableStream'; } +} + +class ReadableStreamBYOBRequest { + constructor() { + throw new ERR_ILLEGAL_CONSTRUCTOR(); + } + + /** + * @readonly + * @type {ArrayBufferView} + */ + get view() { return this[kState].view; } + + /** + * @param {number} bytesWritten + */ + respond(bytesWritten) { + const { + view, + controller, + } = this[kState]; + if (controller === undefined) + throw new ERR_INVALID_STATE('This request is not bound to a controller'); + // Supposed to assert here that the view's buffer is not + // detached, but there's no API available to use to check that. + assert(view.byteLength || view.buffer.byteLength); + readableByteStreamControllerRespond(controller, bytesWritten); + } + + /** + * @param {ArrayBufferView} view + */ + respondWithNewView(view) { + const { + controller, + } = this[kState]; + // Supposed to assert here that the view's buffer is not + // detached, but there's no API available to use to check that. + readableByteStreamControllerRespondWithNewView(controller, view); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'ReadableStreamBYOBRequest', { + view: this.view, + controller: this[kState].controller, + }); + } +} + +class InternalReadableStreamBYOBRequest { + constructor(controller, view) { + this[kState] = { + controller, + view, + }; + } +} + +class DefaultReadRequest { + constructor() { + this[kState] = createDeferredPromise(); + } + + [kChunk](value) { + this[kState].resolve?.({ value, done: false }); + } + + [kClose]() { + this[kState].resolve?.({ value: undefined, done: true }); + } + + [kError](error) { + this[kState].reject?.(error); + } + + get promise() { return this[kState].promise; } +} + +class ReadIntoRequest { + constructor() { + this[kState] = createDeferredPromise(); + } + + [kChunk](value) { + this[kState].resolve?.({ value, done: false }); + } + + [kClose](value) { + this[kState].resolve?.({ value, done: true }); + } + + [kError](error) { + this[kState].reject?.(error); + } + + get promise() { return this[kState].promise; } +} + +class ReadableStreamDefaultReader { + /** + * @param {ReadableStream} stream + */ + constructor(stream) { + if (!isReadableStream(stream)) + throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream); + this[kState] = { + readRequests: [], + stream: undefined, + close: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + }; + setupReadableStreamDefaultReader(this, stream); + } + + /** + * @returns {Promise<{ + * value : any, + * done : boolean + * }>} + */ + read() { + if (this[kState].stream === undefined) { + return PromiseReject( + new ERR_INVALID_STATE.TypeError( + 'The reader is not attached to a stream')); + } + const readRequest = new DefaultReadRequest(); + readableStreamDefaultReaderRead(this, readRequest); + return readRequest.promise; + } + + releaseLock() { + if (this[kState].stream === undefined) + return; + if (this[kState].readRequests.length) { + throw new ERR_INVALID_STATE.TypeError( + 'Cannot release with pending read requests'); + } + readableStreamReaderGenericRelease(this); + } + + /** + * @readonly + * @type {Promise} + */ + get closed() { return this[kState].close.promise; } + + /** + * @param {any} reason + * @returns {Promise} + */ + cancel(reason) { + if (this[kState].stream === undefined) { + throw new ERR_INVALID_STATE.TypeError( + 'The reader is not attached to a stream'); + } + return readableStreamReaderGenericCancel(this, reason); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'ReadableStreamDefaultReader', { + stream: this[kState].stream, + readRequests: this[kState].readRequests.length, + close: this[kState].close.promise, + }); + } + + get [SymbolToStringTag]() { return 'ReadableStreamDefaultReader'; } +} + +class ReadableStreamBYOBReader { + /** + * @param {ReadableStream} stream + */ + constructor(stream) { + if (!isReadableStream(stream)) + throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream); + this[kState] = { + stream: undefined, + requestIntoRequests: [], + close: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + }; + setupReadableStreamBYOBReader(this, stream); + } + + /** + * @param {ArrayBufferView} view + * @returns {Promise<{ + * view : ArrayBufferView, + * done : boolean, + * }>} + */ + read(view) { + if (!isArrayBufferView(view)) { + return PromiseReject( + new ERR_INVALID_ARG_TYPE( + 'view', + [ + 'Buffer', + 'TypedArray', + 'DataView', + ], + view)); + } + if (view.byteLength === 0 || view.buffer.byteLength === 0) { + return PromiseReject( + new ERR_INVALID_ARG_VALUE('View cannot be zero length')); + } + // Supposed to assert here that the view's buffer is not + // detached, but there's no API available to use to check that. + if (this[kState].stream === undefined) { + return PromiseReject( + new ERR_INVALID_STATE('The reader is not attached to a stream')); + } + const readIntoRequest = new ReadIntoRequest(); + readableStreamBYOBReaderRead(this, view, readIntoRequest); + return readIntoRequest.promise; + } + + releaseLock() { + if (this[kState].stream === undefined) + return; + if (this[kState].readIntoRequests.length) { + throw new ERR_INVALID_STATE.TypeError( + 'Cannot release with pending read requests'); + } + readableStreamReaderGenericRelease(this); + } + + /** + * @readonly + * @type {Promise} + */ + get closed() { return this[kState].close.promise; } + + /** + * @param {any} reason + * @returns {Promise} + */ + cancel(reason) { + if (this[kState].stream === undefined) { + throw new ERR_INVALID_STATE.TypeError( + 'The reader is not attached to a stream'); + } + return readableStreamReaderGenericCancel(this, reason); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'ReadableStreamBYOBReader', { + stream: this[kState].stream, + requestIntoRequests: this[kState].requestIntoRequests.length, + close: this[kState].close.promise, + }); + } + + get [SymbolToStringTag]() { return 'ReadableStreamBYOBReader'; } +} + +class ReadableStreamDefaultController { + constructor() { + throw new ERR_ILLEGAL_CONSTRUCTOR(); + } + + /** + * @readonly + * @type {number} + */ + get desiredSize() { + return readableStreamDefaultControllerGetDesiredSize(this); + } + + close() { + if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) + throw new ERR_INVALID_STATE('Controller is already closed'); + readableStreamDefaultControllerClose(this); + } + + /** + * @param {any} chunk + */ + enqueue(chunk) { + if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) + throw new ERR_INVALID_STATE('Controller is already closed'); + readableStreamDefaultControllerEnqueue(this, chunk); + } + + /** + * @param {any} error + */ + error(error) { + readableStreamDefaultControllerError(this, error); + } + + [kCancel](reason) { + return readableStreamDefaultControllerCancelSteps(this, reason); + } + + [kPull](readRequest) { + readableStreamDefaultControllerPullSteps(this, readRequest); + } + + [kInspect](depth, options) { + return customInspect( + depth, + options, + 'ReadableStreamDefaultController', { }); + } + + get [SymbolToStringTag]() { return 'ReadableStreamDefaultController'; } +} + +class InternalReadableStreamDefaultController { + constructor() { + this[kState] = {}; + } +} + +class ReadableByteStreamController { + constructor() { + throw new ERR_ILLEGAL_CONSTRUCTOR(); + } + + /** + * @readonly + * @type {ReadableStreamBYOBRequest} + */ + get byobRequest() { + if (this[kState].byobRequest === null && + this[kState].pendingPullIntos.length) { + const { + buffer, + byteOffset, + bytesFilled, + byteLength, + } = this[kState].pendingPullIntos[0]; + const view = + new Uint8Array( + buffer, + byteOffset + bytesFilled, + byteLength - bytesFilled); + this[kState].byobRequest = + new InternalReadableStreamBYOBRequest(this, view); + } + return this[kState].byobRequest; + } + + /** + * @readonly + * @type {number} + */ + get desiredSize() { + return readableByteStreamControllerGetDesiredSize(this); + } + + close() { + if (this[kState].closeRequested) + throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); + if (this[kState].stream[kState].state !== 'readable') + throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed'); + readableByteStreamControllerClose(this); + } + + /** + * @param {ArrayBufferView} chunk + */ + enqueue(chunk) { + if (!isArrayBufferView(chunk)) { + throw new ERR_INVALID_ARG_TYPE( + 'chunk', + [ + 'Buffer', + 'TypedArray', + 'DataView', + ], + chunk); + } + if (chunk.byteLength === 0 || chunk.buffer.byteLength === 0) { + throw new ERR_INVALID_ARG_VALUE( + 'chunk', + chunk, + 'cannot be zero length'); + } + if (this[kState].closeRequested) + throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); + if (this[kState].stream[kState].state !== 'readable') + throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed'); + readableByteStreamControllerEnqueue(this, chunk); + } + + /** + * @param {any} error + */ + error(error) { + readableByteStreamControllerError(this, error); + } + + [kCancel](reason) { + return readableByteStreamControllerCancelSteps(this, reason); + } + + [kPull](readRequest) { + readableByteStreamControllerPullSteps(this, readRequest); + } + + [kInspect](depth, options) { + return customInspect( + depth, + options, + 'ReadableByteStreamController', { }); + } + + get [SymbolToStringTag]() { return 'ReadableByteStreamController'; } +} + +class InternalReadableByteStreamController { + constructor() { + this[kState] = {}; + } +} + +class WritableStream { + /** + * @param {UnderlyingSink} [sink] + * @param {QueuingStrategy} [strategy] + */ + constructor(sink = null, strategy = {}) { + const type = sink?.type; + if (type !== undefined) + throw new ERR_INVALID_ARG_VALUE.RangeError('type', type); + + this[kState] = { + close: createDeferredPromise(), + closeRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + inFlightWriteRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + inFlightCloseRequest: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + pendingAbortRequest: { + abort: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + reason: undefined, + wasAlreadyErroring: false, + }, + backpressure: false, + controller: undefined, + state: 'writable', + storedError: undefined, + writeRequests: [], + writer: undefined, + }; + + const highWaterMark = strategy?.highWaterMark; + const size = strategy?.size; + + setupWritableStreamDefaultControllerFromSink( + this, + sink, + extractHighWaterMark(highWaterMark, 1), + extractSizeAlgorithm(size)); + } + + /** + * @readonly + * @type {boolean} + */ + get locked() { return isWritableStreamLocked(this); } + + /** + * @param {any} reason + * @returns {Promise} + */ + abort(reason) { + if (isWritableStreamLocked(this)) + return PromiseReject(new ERR_INVALID_STATE('WritableStream is locked')); + return writableStreamAbort(this, reason); + } + + /** + * @returns {Promise} + */ + close() { + if (isWritableStreamLocked(this)) + return PromiseReject(new ERR_INVALID_STATE('WritableStream is locked')); + if (writableStreamCloseQueuedOrInFlight(this)) { + return PromiseReject( + new ERR_INVALID_STATE('Failure closing WritableStream')); + } + return writableStreamClose(this); + } + + /** + * @returns {WritableStreamDefaultWriter} + */ + getWriter() { + return new WritableStreamDefaultWriter(this); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'WritableStream', { + locked: this.locked, + state: this[kState].state, + }); + } + + get [SymbolToStringTag]() { return 'WritableStream'; } +} + +class WritableStreamDefaultWriter { + /** + * @param {WritableStream} stream + */ + constructor(stream) { + if (!isWritableStream(stream)) + throw new ERR_INVALID_ARG_TYPE('stream', 'WritableStream', stream); + this[kState] = { + stream: undefined, + close: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + ready: { + promise: undefined, + resolve: undefined, + reject: undefined, + } + }; + setupWritableStreamDefaultWriter(this, stream); + } + + /** + * @readonly + * @type {Promise} + */ + get closed() { return this[kState].close.promise; } + + /** + * @readonly + * @type {number} + */ + get desiredSize() { + if (this[kState].stream === undefined) + throw new ERR_INVALID_STATE('Writer is not bound to a WritableStream'); + return writableStreamDefaultWriterGetDesiredSize(this); + } + + /** + * @readonly + * @type {Promise} + */ + get ready() { return this[kState].ready.promise; } + + /** + * @param {any} reason + * @returns {Promise} + */ + abort(reason) { + if (this[kState].stream === undefined) { + return PromiseReject( + new ERR_INVALID_STATE('Writer is not bound to a WritableStream')); + } + return writableStreamDefaultWriterAbort(this, reason); + } + + /** + * @returns {Promise} + */ + close() { + const { + stream, + } = this[kState]; + if (stream === undefined) { + return PromiseReject( + new ERR_INVALID_STATE('Writer is not bound to a WritableStream')); + } + if (writableStreamCloseQueuedOrInFlight(stream)) { + return PromiseReject( + new ERR_INVALID_STATE('Failure to close WritableStream')); + } + return writableStreamDefaultWriterClose(this); + } + + releaseLock() { + const { + stream, + } = this[kState]; + if (stream === undefined) + return; + assert(stream[kState].writer !== undefined); + writableStreamDefaultWriterRelease(this); + } + + /** + * @param {any} chunk + * @returns {Promise} + */ + write(chunk) { + if (this[kState].stream === undefined) { + return PromiseReject( + new ERR_INVALID_STATE('Writer is not bound to a WritableStream')); + } + return writableStreamDefaultWriterWrite(this, chunk); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'WritableStreamDefaultWriter', { + stream: this[kState].stream, + close: this[kState].close.promise, + ready: this[kState].ready.promise, + desiredSize: this.desiredSize, + }); + } + + get [SymbolToStringTag]() { return 'WritableStreamDefaultWriter'; } +} + +class WritableStreamDefaultController { + constructor() { + throw new ERR_ILLEGAL_CONSTRUCTOR(); + } + + [kAbort](reason) { + const result = this[kState].abortAlgorithm(reason); + writableStreamDefaultControllerClearAlgorithms(this); + return result; + } + + [kError]() { + resetQueue(this); + } + + /** + * @param {any} error + */ + error(error) { + if (this[kState].stream[kState].state !== 'writable') + return; + writableStreamDefaultControllerError(this, error); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'WritableStreamDefaultController', { + stream: this[kState].stream, + }); + } + + get [SymbolToStringTag]() { return 'WritableStreamDefaultController'; } +} + +class InternalWritableStreamDefaultController {} + +class TransformStream { + /** + * @param {Transformer} [transformer] + * @param {QueuingStrategy} [writableStrategy] + * @param {QueuingStrategy} [readableStrategy] + */ + constructor( + transformer = null, + writableStrategy = {}, + readableStrategy = {}) { + + const readableType = transformer?.readableType; + const writableType = transformer?.writableType; + const start = transformer?.start; + + if (readableType !== undefined) + throw new ERR_INVALID_ARG_VALUE('transformer.readableType', readableType); + if (writableType !== undefined) + throw new ERR_INVALID_ARG_VALUE('transformer.writableType', writableType); + + const readableHighWaterMark = readableStrategy?.highWaterMark; + const readableSize = readableStrategy?.size; + + const writableHighWaterMark = writableStrategy?.highWaterMark; + const writableSize = writableStrategy?.size; + + const actualReadableHighWaterMark = + extractHighWaterMark(readableHighWaterMark, 0); + const actualReadableSize = extractSizeAlgorithm(readableSize); + + const actualWritableHighWaterMark = + extractHighWaterMark(writableHighWaterMark, 1); + const actualWritableSize = extractSizeAlgorithm(writableSize); + + const startPromise = createDeferredPromise(); + + initializeTransformStream( + this, + startPromise, + actualWritableHighWaterMark, + actualWritableSize, + actualReadableHighWaterMark, + actualReadableSize); + + setupTransformStreamDefaultControllerFromTransformer(this, transformer); + + if (start !== undefined) { + startPromise.resolve( + FunctionPrototypeCall( + start, + transformer, + this[kState].controller)); + } else { + startPromise.resolve(); + } + } + + /** + * @readonly + * @type {ReadableStream} + */ + get readable() { return this[kState].readable; } + + /** + * @readonly + * @type {WritableStream} + */ + get writable() { return this[kState].writable; } + + [kInspect](depth, options) { + return customInspect(depth, options, 'TransformStream', { + readable: this.readable, + writable: this.writable, + backpressure: this[kState].backpressure, + }); + } + + get [SymbolToStringTag]() { return 'TransformStream'; } +} + +class TransformStreamDefaultController { + constructor() { + throw new ERR_ILLEGAL_CONSTRUCTOR(); + } + + /** + * @readonly + * @type {number} + */ + get desiredSize() { + const { + stream, + } = this[kState]; + const { + readable, + } = stream[kState]; + const { + controller: readableController, + } = readable[kState]; + return readableStreamDefaultControllerGetDesiredSize(readableController); + } + + /** + * @param {any} chunk + */ + enqueue(chunk) { + transformStreamDefaultControllerEnqueue(this, chunk); + } + + /** + * @param {any} reason + */ + error(reason) { + transformStreamDefaultControllerError(this, reason); + } + + terminate() { + transformStreamDefaultControllerTerminate(this); + } + + [kInspect](depth, options) { + return customInspect(depth, options, 'TransformStreamDefaultController', { + stream: this[kState].stream, + }); + } + + get [SymbolToStringTag]() { return 'TransformStreamDefaultController'; } +} + +class InternalTransformStreamDefaultController {} + +class ByteLengthQueuingStrategy { + /** + * @param {{ + * highWaterMark : number + * }} init + */ + constructor(init = {}) { + validateObject(init, 'init'); + const { + highWaterMark + } = init; + if (typeof highWaterMark !== 'number') { + throw new ERR_INVALID_ARG_TYPE( + 'init.highWaterMark', + 'number', + highWaterMark); + } + this[kState] = { + highWaterMark, + }; + this.size = + FunctionPrototypeBind( + ByteLengthQueuingStrategy.prototype.size, + this); + } + + /** + * @readonly + * @type {number} + */ + get highWaterMark() { + return this[kState].highWaterMark; + } + + /** + * @type {QueuingStrategySize} + */ + size(chunk) { return chunk?.byteLength | 0; } + + [kInspect](depth, options) { + return customInspect(depth, options, 'ByteLengthQueuingStrategy', { + highWaterMark: this.highWaterMark, + }); + } + + get [SymbolToStringTag]() { return 'ByteLengthQueuingStrategy'; } +} + +class CountQueuingStrategy { + /** + * @param {{ + * highWaterMark : number + * }} init + */ + constructor(init = {}) { + validateObject(init, 'init'); + const { + highWaterMark + } = init; + if (typeof highWaterMark !== 'number') { + throw new ERR_INVALID_ARG_TYPE( + 'init.highWaterMark', + 'number', + highWaterMark); + } + this[kState] = { + highWaterMark, + }; + this.size = + FunctionPrototypeBind( + CountQueuingStrategy.prototype.size, + this); + } + + /** + * @readonly + * @type {number} + */ + get highWaterMark() { + return this[kState].highWaterMark; + } + + /** + * @type {QueuingStrategySize} + */ + size() { return 1; } + + [kInspect](depth, options) { + return customInspect(depth, options, 'CountQueuingStrategy', { + highWaterMark: this.highWaterMark, + }); + } + + get [SymbolToStringTag]() { return 'CountQueuingStrategy'; } +} + +internalExtend( + InternalReadableByteStreamController, + ReadableByteStreamController); + +internalExtend( + InternalReadableStreamDefaultController, + ReadableStreamDefaultController); + +internalExtend( + InternalReadableStreamBYOBRequest, + ReadableStreamBYOBRequest); + +internalExtend( + InternalWritableStreamDefaultController, + WritableStreamDefaultController); + +internalExtend( + InternalTransformStreamDefaultController, + TransformStreamDefaultController); + +function isReadableStream(value) { + return value[kState] !== undefined && + value[SymbolToStringTag] === 'ReadableStream'; +} + +function isWritableStream(value) { + return value[kState] !== undefined && + value[SymbolToStringTag] === 'WritableStream'; +} + +function isTransformStream(value) { + return value[kState] !== undefined && + value[SymbolToStringTag] === 'TransformStream'; +} + +function isReadableByteStreamController(value) { + return value[kState] !== undefined && + value[SymbolToStringTag] === 'ReadableByteStreamController'; +} + +function internalExtend(ctor, actual) { + ctor.prototype.constructor = actual; + ObjectSetPrototypeOf(ctor.prototype, actual.prototype); +} + +function transferArrayBuffer(buffer) { + const res = detachArrayBuffer(buffer); + if (res === undefined) + throw new ERR_INVALID_ARG_VALUE('buffer', 'ArrayBuffer', buffer); + return res; +} + +function setPromiseHandled(promise) { + // Alternatively, we could use the native API + // MarkAsHandled, but this avoids the extra boundary cross + // and is hopefully faster at the cost of an extra Promise + // allocation. + PromisePrototypeThen(promise, () => {}, () => {}); +} + +function dequeueValue(controller) { + assert(controller[kState].queue !== undefined); + assert(controller[kState].queueTotalSize !== undefined); + assert(controller[kState].queue.length); + const { + value, + size, + } = ArrayPrototypeShift(controller[kState].queue); + controller[kState].queueTotalSize = + MathMax(0, controller[kState].queueTotalSize - size); + return value; +} + +function resetQueue(controller) { + assert(controller[kState].queue !== undefined); + assert(controller[kState].queueTotalSize !== undefined); + controller[kState].queue = []; + controller[kState].queueTotalSize = 0; +} + +function peekQueueValue(controller) { + assert(controller[kState].queue !== undefined); + assert(controller[kState].queueTotalSize !== undefined); + assert(controller[kState].queue.length); + return controller[kState].queue[0].value; +} + +function enqueueValueWithSize(controller, value, size) { + assert(controller[kState].queue !== undefined); + assert(controller[kState].queueTotalSize !== undefined); + if (typeof size !== 'number' || + size < 0 || + size === Infinity) { + throw new ERR_INVALID_ARG_VALUE.RangeError('size', size); + } + ArrayPrototypePush(controller[kState].queue, { value, size }); + controller[kState].queueTotalSize += size; +} + +// ---- ReadableStream Implementation + +function readableStreamPipeTo( + source, + dest, + preventClose, + preventAbort, + preventCancel, + signal) { + + const reader = new ReadableStreamDefaultReader(source); + const writer = new WritableStreamDefaultWriter(dest); + + source[kState].disturbed = true; + + let shuttingDown = false; + + const promise = createDeferredPromise(); + + function finalize(error) { + writableStreamDefaultWriterRelease(writer); + readableStreamReaderGenericRelease(reader); + if (signal !== undefined) + signal.removeEventListener('abort', abortAlgorithm); + if (error !== undefined) + promise.reject(error); + else + promise.resolve(); + } + + function shutdownWithAnAction(action, originalError) { + if (shuttingDown) return; + shuttingDown = true; + if (dest[kState].state === 'writable' && + !writableStreamCloseQueuedOrInFlight(dest)) { + // TODO: If any chunks have been read but but not yet written, write them + // to dest. Wait until every chunk that has been read has been written + } + PromisePrototypeThen(action, finalize, finalize); + } + + function shutdown(error) { + if (shuttingDown) return; + shuttingDown = true; + if (dest[kState].state === 'writable' && + !writableStreamCloseQueuedOrInFlight(dest)) { + // TODO: if any chunks have been read but not yet written write them to + // dest. Wait until every chunk that has been read has been written + } + finalize(error); + } + + function abortAlgorithm() { + const error = new AbortError(); + const actions = []; + if (!preventAbort) { + if (dest[kState].state === 'writable') + ArrayPrototypePush(actions, writableStreamAbort(dest, error)); + else + ArrayPrototypePush(actions, PromiseResolve()); + } + if (!preventCancel) { + if (source[kState].state === 'readable') + ArrayPrototypePush(actions, readableStreamCancel(source, error)); + else + ArrayPrototypePush(actions, PromiseResolve()); + } + shutdownWithAnAction(PromiseAll(actions), error); + } + + // The streams spec requires that the public API must not + // be used when interacting with the reader and writer, + // so we bypass those here by duplicating in a separate + // function that we can be sure is not modified. + function read() { + const readRequest = new DefaultReadRequest(); + readableStreamDefaultReaderRead(reader, readRequest); + return readRequest.promise; + } + + function write(chunk) { + return writableStreamDefaultWriterWrite(writer, chunk); + } + + function watchErrored(stream, promise, action) { + if (stream[kState].state === 'errored') + action(stream[kState].storedError); + else + PromisePrototypeCatch(promise, action); + } + + function watchClosed(stream, promise, action) { + if (stream[kState].state === 'closed') + action(stream[kState].storedError); + else + PromisePrototypeThen(promise, action); + } + + async function step() { + if (shuttingDown) + return true; + await writer[kState].ready.promise; + const { value, done } = await read(); + await write(value); + return done; + } + + async function run() { + // Run until step resolves as true + while (!await step()) {} + } + + if (signal !== undefined) { + if (signal.aborted) { + abortAlgorithm(); + return promise.promise; + } + signal.addEventListener('abort', abortAlgorithm, { once: true }); + } + + setPromiseHandled(run()); + + watchErrored(source, reader[kState].close.promise, (error) => { + if (!preventAbort) + shutdownWithAnAction(writableStreamAbort(dest, error), error); + else + shutdown(error); + }); + + watchErrored(dest, writer[kState].close.promise, (error) => { + if (!preventCancel) + shutdownWithAnAction(readableStreamCancel(dest, error), error); + else + shutdown(error); + }); + + watchClosed(source, reader[kState].close.promise, () => { + if (!preventClose) { + shutdownWithAnAction( + writableStreamDefaultWriterCloseWithErrorPropagation(writer)); + } else { + shutdown(); + } + }); + + if (writableStreamCloseQueuedOrInFlight(dest) || + dest[kState].state === 'closed') { + const error = new ERR_INVALID_STATE.TypeError( + 'Destination WritableStream is closed'); + if (!preventCancel) + shutdownWithAnAction(readableStreamCancel(source, error), error); + else + shutdown(error); + } + + return promise.promise; +} + +function readableStreamTee(stream, cloneForBranch2) { + const reader = new ReadableStreamDefaultReader(stream); + let reading = false; + let canceled1 = false; + let canceled2 = false; + let reason1; + let reason2; + let branch1; + let branch2; + const cancelPromise = createDeferredPromise(); + + async function pullAlgorithm() { + if (reading) return; + reading = true; + const readRequest = { + [kChunk](value) { + queueMicrotask(() => { + reading = false; + const value1 = value; + let value2 = value; + if (!canceled2 && cloneForBranch2) { + // Structured Clone + value2 = deserialize(serialize(value2)); + } + if (!canceled1) { + readableStreamDefaultControllerEnqueue( + branch1[kState].controller, + value1); + } + if (!canceled2) { + readableStreamDefaultControllerEnqueue( + branch2[kState].controller, + value2); + } + }); + }, + [kClose]() { + reading = false; + if (!canceled1) + readableStreamDefaultControllerClose(branch1[kState].controller); + if (!canceled2) + readableStreamDefaultControllerClose(branch2[kState].controller); + if (!canceled1 || !canceled2) + cancelPromise.resolve(); + }, + [kError]() { + reading = false; + }, + }; + readableStreamDefaultReaderRead(reader, readRequest); + } + + function cancel1Algorithm(reason) { + canceled1 = true; + reason1 = reason; + if (canceled2) { + const compositeReason = [reason1, reason2]; + cancelPromise.resolve(readableStreamCancel(stream, compositeReason)); + } + return cancelPromise.promise; + } + + function cancel2Algorithm(reason) { + canceled2 = true; + reason2 = reason; + if (canceled1) { + const compositeReason = [reason1, reason2]; + cancelPromise.resolve(readableStreamCancel(stream, compositeReason)); + } + return cancelPromise.promise; + } + + branch1 = new ReadableStream({ + start: nonOpStart, + pull: pullAlgorithm, + cancel: cancel1Algorithm, + }); + + branch2 = new ReadableStream({ + start: nonOpStart, + pull: pullAlgorithm, + cancel: cancel2Algorithm, + }); + + PromisePrototypeCatch( + reader[kState].close.promise, + (error) => { + readableStreamDefaultControllerError(branch1[kState].controller, error); + readableStreamDefaultControllerError(branch2[kState].controller, error); + }); + + return [branch1, branch2]; +} + +function readableByteStreamControllerConvertPullIntoDescriptor(desc) { + const { + buffer, + bytesFilled, + byteLength, + byteOffset, + ctor, + elementSize, + } = desc; + assert(bytesFilled <= byteLength); + assert(bytesFilled % elementSize === 0); + const transferedBuffer = transferArrayBuffer(buffer); + return new ctor(transferedBuffer, byteOffset, bytesFilled / elementSize); +} + +function isReadableStreamLocked(stream) { + return stream[kState].reader !== undefined; +} + +function readableStreamCancel(stream, reason) { + stream[kState].disturbed = true; + switch (stream[kState].state) { + case 'closed': + return PromiseResolve(); + case 'errored': + return PromiseReject(stream[kState].storedError); + } + readableStreamClose(stream); + const { + reader, + } = stream[kState]; + if (reader !== undefined && readableStreamHasBYOBReader(stream)) { + for (let n = 0; n < reader[kState].readIntoRequests.length; n++) + reader[kState].readIntoRequests[n][kClose](); + reader[kState].readIntoRequests = []; + } + + try { + return PromisePrototypeThen( + PromiseResolve(stream[kState].controller[kCancel](reason))); + } catch (error) { + return PromiseReject(error); + } +} + +function readableStreamClose(stream) { + assert(stream[kState].state === 'readable'); + stream[kState].state = 'closed'; + + const { + reader, + } = stream[kState]; + + if (reader === undefined) + return; + + reader[kState].close.resolve(); + + if (readableStreamHasDefaultReader(stream)) { + for (let n = 0; n < reader[kState].readRequests.length; n++) + reader[kState].readRequests[n][kClose](); + reader[kState].readRequests = []; + } +} + +function readableStreamError(stream, error) { + assert(stream[kState].state === 'readable'); + stream[kState].state = 'errored'; + stream[kState].storedError = error; + + const { + reader + } = stream[kState]; + + if (reader === undefined) + return; + + reader[kState].close.reject(error); + setPromiseHandled(reader[kState].close.promise); + + if (readableStreamHasDefaultReader(stream)) { + for (let n = 0; n < reader[kState].readRequests.length; n++) + reader[kState].readRequests[n][kError](error); + reader[kState].readRequests = []; + } else { + assert(readableStreamHasBYOBReader(stream)); + for (let n = 0; n < reader[kState].readIntoRequests.length; n++) + reader[kState].readIntoRequests[n][kError](error); + reader[kState].readIntoRequests = []; + } +} + +function readableStreamHasDefaultReader(stream) { + const { + reader, + } = stream[kState]; + + if (reader === undefined) + return false; + + return reader[kState] !== undefined && + reader[SymbolToStringTag] === 'ReadableStreamDefaultReader'; +} + +function readableStreamGetNumReadRequests(stream) { + assert(readableStreamHasDefaultReader(stream)); + return stream[kState].reader[kState].readRequests.length; +} + +function readableStreamHasBYOBReader(stream) { + const { + reader, + } = stream[kState]; + + if (reader === undefined) + return false; + + return reader[kState] !== undefined && + reader[SymbolToStringTag] === 'ReadableStreamBYOBReader'; +} + +function readableStreamGetNumReadIntoRequests(stream) { + assert(readableStreamHasBYOBReader(stream)); + return stream[kState].reader[kState].readIntoRequests.length; +} + +function readableStreamFulfillReadRequest(stream, chunk, done) { + assert(readableStreamHasDefaultReader(stream)); + const { + reader, + } = stream[kState]; + assert(reader[kState].readRequests.length); + const readRequest = ArrayPrototypeShift(reader[kState].readRequests); + if (done) + readRequest[kClose](); + else + readRequest[kChunk](chunk); +} + +function readableStreamFulfillReadIntoRequest(stream, chunk, done) { + assert(readableStreamHasBYOBReader(stream)); + const { + reader, + } = stream[kState]; + assert(reader[kState].readIntoRequests.length); + const readIntoRequest = ArrayPrototypeShift(reader[kState].readIntoRequests); + if (done) + readIntoRequest[kClose](chunk); + else + readIntoRequest[kChunk](chunk); +} + +function readableStreamAddReadRequest(stream, readRequest) { + assert(readableStreamHasDefaultReader(stream)); + assert(stream[kState].state === 'readable'); + ArrayPrototypePush(stream[kState].reader[kState].readRequests, readRequest); +} + +function readableStreamAddReadIntoRequest(stream, readIntoRequest) { + assert(readableStreamHasBYOBReader(stream)); + assert(stream[kState].state !== 'errored'); + ArrayPrototypePush( + stream[kState].reader[kState].readIntoRequests, + readIntoRequest); +} + +function readableStreamReaderGenericCancel(reader, reason) { + const { + stream, + } = reader[kState]; + assert(stream !== undefined); + return readableStreamCancel(stream, reason); +} + +function readableStreamReaderGenericInitialize(reader, stream) { + reader[kState].stream = stream; + stream[kState].reader = reader; + switch (stream[kState].state) { + case 'readable': + reader[kState].close = createDeferredPromise(); + break; + case 'closed': + reader[kState].close = { + promise: PromiseResolve(), + resolve: undefined, + reject: undefined, + }; + break; + case 'errored': + reader[kState].close = { + promise: PromiseReject(stream[kState].storedError), + resolve: undefined, + reject: undefined, + }; + setPromiseHandled(reader[kState].close.promise); + break; + } +} + +function readableStreamReaderGenericRelease(reader) { + const { + stream, + } = reader[kState]; + assert(stream !== undefined); + assert(stream[kState].reader === reader); + + if (stream[kState].state === 'readable') { + reader[kState].close.reject?.( + new ERR_INVALID_STATE.TypeError('Reader released')); + } else { + reader[kState].close = { + promise: PromiseReject( + new ERR_INVALID_STATE.TypeError('Reader released')), + resolve: undefined, + reject: undefined, + }; + } + setPromiseHandled(reader[kState].close.promise); + stream[kState].reader = undefined; + reader[kState].stream = undefined; +} + +function readableStreamBYOBReaderRead(reader, view, readIntoRequest) { + const { + stream, + } = reader[kState]; + assert(stream !== undefined); + stream[kState].disturbed = true; + if (stream[kState].state === 'errored') { + readIntoRequest[kError](stream[kState].storedError); + return; + } + readableByteStreamControllerPullInto( + stream[kState].controller, + view, + readIntoRequest); +} + +function readableStreamDefaultReaderRead(reader, readRequest) { + const { + stream, + } = reader[kState]; + assert(stream !== undefined); + stream[kState].disturbed = true; + switch (stream[kState].state) { + case 'closed': + readRequest[kClose](); + break; + case 'errored': + readRequest[kError](stream[kState].storedError); + break; + case 'readable': + stream[kState].controller[kPull](readRequest); + } +} + +function setupReadableStreamBYOBReader(reader, stream) { + if (isReadableStreamLocked(stream)) + throw new ERR_INVALID_STATE('ReadableStream is locked'); + const { + controller, + } = stream[kState]; + if (!isReadableByteStreamController(controller)) + throw new ERR_INVALID_ARG_VALUE('reader', reader, 'must be a byte stream'); + readableStreamReaderGenericInitialize(reader, stream); + reader[kState].readIntoRequests = []; +} + +function setupReadableStreamDefaultReader(reader, stream) { + if (isReadableStreamLocked(stream)) + throw new ERR_INVALID_STATE('ReadableStream is locked'); + readableStreamReaderGenericInitialize(reader, stream); + reader[kState].readRequests = []; +} + +function readableStreamDefaultControllerClose(controller) { + if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + return; + controller[kState].closeRequested = true; + if (!controller[kState].queue.length) { + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamClose(controller[kState].stream); + } +} + +function readableStreamDefaultControllerEnqueue(controller, chunk) { + if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + return; + + const { + stream, + } = controller[kState]; + + if (isReadableStreamLocked(stream) && + readableStreamGetNumReadRequests(stream)) { + readableStreamFulfillReadRequest(stream, chunk, false); + } else { + try { + const chunkSize = controller[kState].sizeAlgorithm(chunk); + enqueueValueWithSize(controller, chunk, chunkSize); + } catch (error) { + readableStreamDefaultControllerError(controller, error); + return; + } + } + readableStreamDefaultControllerCallPullIfNeeded(controller); +} + +function readableStreamDefaultControllerHasBackpressure(controller) { + return !readableStreamDefaultControllerShouldCallPull(controller); +} + +function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { + const { + stream, + } = controller[kState]; + return !controller[kState].closeRequested && + stream[kState].state === 'readable'; +} + +function readableStreamDefaultControllerGetDesiredSize(controller) { + const { + stream, + highWaterMark, + queueTotalSize, + } = controller[kState]; + switch (stream[kState].state) { + case 'errored': return null; + case 'closed': return 0; + default: + return highWaterMark - queueTotalSize; + } +} + +function readableStreamDefaultControllerShouldCallPull(controller) { + const { + stream, + } = controller[kState]; + if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller) || + !controller[kState].started) + return false; + + if (isReadableStreamLocked(stream) && + readableStreamGetNumReadRequests(stream)) { + return true; + } + + const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller); + assert(desiredSize !== null); + + return desiredSize > 0; +} + +function readableStreamDefaultControllerCallPullIfNeeded(controller) { + if (!readableStreamDefaultControllerShouldCallPull(controller)) + return; + if (controller[kState].pulling) { + controller[kState].pullAgain = true; + return; + } + assert(!controller[kState].pullAgain); + controller[kState].pulling = true; + PromisePrototypeThen( + (async () => controller[kState].pullAlgorithm())(), + () => { + controller[kState].pulling = false; + if (controller[kState].pullAgain) { + controller[kState].pullAgain = false; + readableStreamDefaultControllerCallPullIfNeeded(controller); + } + }, + (error) => readableStreamDefaultControllerError(controller, error)); +} + +function readableStreamDefaultControllerClearAlgorithms(controller) { + controller[kState].pullAlgorithm = undefined; + controller[kState].cancelAlgorithm = undefined; + controller[kState].sizeAlgorithm = undefined; +} + +function readableStreamDefaultControllerError(controller, error) { + const { + stream, + } = controller[kState]; + if (stream[kState].state === 'readable') { + resetQueue(controller); + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamError(stream, error); + } +} + +function readableStreamDefaultControllerCancelSteps(controller, reason) { + resetQueue(controller); + const result = controller[kState].cancelAlgorithm(reason); + readableStreamDefaultControllerClearAlgorithms(controller); + return result; +} + +function readableStreamDefaultControllerPullSteps(controller, readRequest) { + const { + stream, + queue, + } = controller[kState]; + if (queue.length) { + const chunk = dequeueValue(controller); + if (controller[kState].closeRequested && !queue.length) { + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamClose(stream); + } else { + readableStreamDefaultControllerCallPullIfNeeded(controller); + } + readRequest[kChunk](chunk); + return; + } + readableStreamAddReadRequest(stream, readRequest); + readableStreamDefaultControllerCallPullIfNeeded(controller); +} + +function setupReadableStreamDefaultController( + stream, + controller, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + highWaterMark, + sizeAlgorithm) { + assert(stream[kState].controller === undefined); + controller[kState] = { + cancelAlgorithm, + closeRequested: false, + highWaterMark, + pullAgain: false, + pullAlgorithm, + pulling: false, + queue: [], + queueTotalSize: 0, + started: false, + sizeAlgorithm, + stream, + }; + stream[kState].controller = controller; + + PromisePrototypeThen( + (async () => startAlgorithm())(), + () => { + controller[kState].started = true; + assert(!controller[kState].pulling); + assert(!controller[kState].pullAgain); + readableStreamDefaultControllerCallPullIfNeeded(controller); + }, + (error) => readableStreamDefaultControllerError(controller, error)); +} + +function setupReadableStreamDefaultControllerFromSource( + stream, + source, + highWaterMark, + sizeAlgorithm) { + const controller = new InternalReadableStreamDefaultController(); + const start = source?.start; + const pull = source?.pull; + const cancel = source?.cancel; + const startAlgorithm = start ? + FunctionPrototypeBind(start, source, controller) : + nonOpStart; + const pullAlgorithm = pull ? + FunctionPrototypeBind(pull, source, controller) : + nonOpPull; + const cancelAlgorithm = cancel ? + FunctionPrototypeBind(cancel, source) : + nonOpCancel; + setupReadableStreamDefaultController( + stream, + controller, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + highWaterMark, + sizeAlgorithm); +} + +function readableByteStreamControllerClose(controller) { + const { + closeRequested, + pendingPullIntos, + queueTotalSize, + stream, + } = controller[kState]; + + if (closeRequested || stream[kState].state !== 'readable') + return; + + if (queueTotalSize) { + controller[kState].closeRequested = true; + return; + } + + if (pendingPullIntos.length) { + const firstPendingPullInto = pendingPullIntos[0]; + if (firstPendingPullInto.bytesFilled > 0) { + const error = new ERR_INVALID_STATE.TypeError('Partial read'); + readableByteStreamControllerError(controller, error); + throw error; + } + } + + readableByteStreamControllerClearAlgorithms(controller); + readableStreamClose(stream); +} + +function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) { + assert(stream[kState].state !== 'errored'); + let done = false; + if (stream[kState].state === 'closed') { + desc.bytesFilled = 0; + done = true; + } + + const filledView = + readableByteStreamControllerConvertPullIntoDescriptor(desc); + + if (desc.type === 'default') + readableStreamFulfillReadRequest(stream, filledView, done); + else { + assert(desc.type === 'byob'); + readableStreamFulfillReadIntoRequest(stream, filledView, done); + } +} + +function readableByteStreamControllerInvalidateBYOBRequest(controller) { + if (controller[kState].byobRequest === null) + return; + controller[kState].byobRequest[kState].controller = undefined; + controller[kState].byobRequest[kState].view = null; + controller[kState].byobRequest = null; +} + +function readableByteStreamControllerClearAlgorithms(controller) { + controller[kState].pullAlgorithm = undefined; + controller[kState].cancelAlgorithm = undefined; +} + +function readableByteStreamControllerClearPendingPullIntos(controller) { + readableByteStreamControllerInvalidateBYOBRequest(controller); + controller[kState].pendingPullIntos = []; +} + +function readableByteStreamControllerGetDesiredSize(controller) { + const { + stream, + highWaterMark, + queueTotalSize, + } = controller[kState]; + switch (stream[kState].state) { + case 'errored': return null; + case 'closed': return 0; + default: return highWaterMark - queueTotalSize; + } +} + +function readableByteStreamControllerShouldCallPull(controller) { + const { + stream, + } = controller[kState]; + if (stream[kState].state !== 'readable' || + controller[kState].closeRequested || + !controller[kState].started) { + return false; + } + if (readableStreamHasDefaultReader(stream) && + readableStreamGetNumReadRequests(stream) > 0) { + return true; + } + + if (readableStreamHasBYOBReader(stream) && + readableStreamGetNumReadIntoRequests(stream) > 0) { + return true; + } + + const desiredSize = readableByteStreamControllerGetDesiredSize(controller); + assert(desiredSize !== null); + + return desiredSize > 0; +} + +function readableByteStreamControllerHandleQueueDrain(controller) { + const { + closeRequested, + queueTotalSize, + stream, + } = controller[kState]; + assert(stream[kState].state === 'readable'); + if (!queueTotalSize && closeRequested) { + readableByteStreamControllerClearAlgorithms(controller); + readableStreamClose(stream); + return; + } + readableByteStreamControllerCallPullIfNeeded(controller); +} + +function readableByteStreamControllerPullInto( + controller, + view, + readIntoRequest) { + const { + closeRequested, + stream, + pendingPullIntos, + } = controller[kState]; + let elementSize = 1; + let ctor = DataViewCtor; + if (isArrayBufferView(view) && !isDataView(view)) { + elementSize = view.constructor.BYTES_PER_ELEMENT; + ctor = view.constructor; + } + const { + buffer, + byteOffset, + byteLength, + } = view; + let transferedBuffer; + try { + transferedBuffer = transferArrayBuffer(buffer); + } catch (error) { + readIntoRequest[kError](error); + return; + } + const desc = { + buffer: transferedBuffer, + byteOffset, + byteLength, + bytesFilled: 0, + elementSize, + ctor, + type: 'byob', + }; + if (pendingPullIntos.length) { + ArrayPrototypePush(pendingPullIntos, desc); + readableStreamAddReadIntoRequest(stream, readIntoRequest); + return; + } + if (stream[kState].state === 'closed') { + const emptyView = new ctor(desc.buffer, byteOffset, 0); + readIntoRequest[kClose](emptyView); + return; + } + if (controller[kState].queueTotalSize) { + if (readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + desc)) { + const filledView = + readableByteStreamControllerConvertPullIntoDescriptor(desc); + readableByteStreamControllerHandleQueueDrain(controller); + readIntoRequest[kChunk](filledView); + return; + } + if (closeRequested) { + const error = new ERR_INVALID_STATE('ReadableStream closed'); + readableByteStreamControllerError(controller, error); + readIntoRequest[kError](error); + return; + } + } + ArrayPrototypePush(pendingPullIntos, desc); + readableStreamAddReadIntoRequest(stream, readIntoRequest); + readableByteStreamControllerCallPullIfNeeded(controller); +} + +function readableByteStreamControllerRespondInternal(controller, bytesWritten) { + const { + stream, + pendingPullIntos, + } = controller[kState]; + const desc = pendingPullIntos[0]; + // TODO(@jasnell): Assert can transfer array buffer(desc.buffer) + readableByteStreamControllerInvalidateBYOBRequest(controller); + + if (stream[kState].state === 'closed') { + assert(bytesWritten === 0); + readableByteStreamControllerRespondInClosedState(controller, desc); + } else { + assert(stream[kState].state === 'readable'); + assert(bytesWritten > 0); + readableByteStreamControllerRespondInReadableState( + controller, + bytesWritten, + desc); + } + readableByteStreamControllerCallPullIfNeeded(controller); +} + +function readableByteStreamControllerRespond(controller, bytesWritten) { + const { + pendingPullIntos, + stream, + } = controller[kState]; + assert(pendingPullIntos.length); + const desc = pendingPullIntos[0]; + + if (stream[kState].state === 'closed') { + if (bytesWritten !== 0) + throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten); + } else { + assert(stream[kState].state === 'readable'); + + if (bytesWritten === 0) + throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten); + + if (desc.bytesFilled + bytesWritten > desc.byteLength) + throw new ERR_INVALID_ARG_VALUE.RangeError('bytesWritten', bytesWritten); + } + + desc.buffer = transferArrayBuffer(desc.buffer); + + readableByteStreamControllerRespondInternal(controller, bytesWritten); +} + +function readableByteStreamControllerRespondInClosedState(controller, desc) { + assert(desc.bytesFilled === 0); + const { + stream, + } = controller[kState]; + if (readableStreamHasBYOBReader(stream)) { + while (readableStreamGetNumReadIntoRequests(stream) > 0) { + readableByteStreamControllerCommitPullIntoDescriptor( + stream, + readableByteStreamControllerShiftPendingPullInto(controller)); + } + } +} + +function readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + size, + desc) { + const { + pendingPullIntos, + byobRequest, + } = controller[kState]; + assert(!pendingPullIntos.length || pendingPullIntos[0] === desc); + assert(byobRequest === null); + desc.bytesFilled += size; +} + +function readableByteStreamControllerEnqueue(controller, chunk) { + const { + closeRequested, + pendingPullIntos, + queue, + stream, + } = controller[kState]; + + if (closeRequested || stream[kState].state !== 'readable') + return; + + const { + buffer, + byteOffset, + byteLength, + } = chunk; + + // Supposed to assert here that the buffer is not + // detached, but there's no API available to use to check that. + + const transferedBuffer = transferArrayBuffer(buffer); + + if (pendingPullIntos.length) { + const firstPendingPullInto = pendingPullIntos[0]; + // Supposed to assert here that the firstPendingPullInto's buffer is not + // detached, but there's no API available to use to check that. + firstPendingPullInto.buffer = + transferArrayBuffer(firstPendingPullInto.buffer); + } + + readableByteStreamControllerInvalidateBYOBRequest(controller); + + if (readableStreamHasDefaultReader(stream)) { + if (readableStreamGetNumReadRequests(stream)) { + readableByteStreamControllerEnqueueChunkToQueue( + controller, + transferedBuffer, + byteOffset, + byteLength); + } else { + assert(!queue.length); + const transferedView = + new Uint8Array(transferedBuffer, byteOffset, byteLength); + readableStreamFulfillReadRequest(stream, transferedView, false); + } + } else if (readableStreamHasBYOBReader(stream)) { + readableByteStreamControllerEnqueueChunkToQueue( + controller, + transferedBuffer, + byteOffset, + byteLength); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller); + } else { + assert(!isReadableStreamLocked(stream)); + readableByteStreamControllerEnqueueChunkToQueue( + controller, + transferedBuffer, + byteOffset, + byteLength); + } + readableByteStreamControllerCallPullIfNeeded(controller); +} + +function readableByteStreamControllerEnqueueChunkToQueue( + controller, + buffer, + byteOffset, + byteLength) { + ArrayPrototypePush( + controller[kState].queue, + { + buffer, + byteOffset, + byteLength, + }); + controller[kState].queueTotalSize += byteLength; +} + +function readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + desc) { + const { + buffer, + byteLength, + byteOffset, + bytesFilled, + elementSize, + } = desc; + const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize); + const maxBytesToCopy = MathMin( + controller[kState].queueTotalSize, + byteLength - bytesFilled); + const maxBytesFilled = bytesFilled + maxBytesToCopy; + const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize); + let totalBytesToCopyRemaining = maxBytesToCopy; + let ready = false; + if (maxAlignedBytes > currentAlignedBytes) { + totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled; + ready = true; + } + const { + queue, + } = controller[kState]; + + while (totalBytesToCopyRemaining) { + const headOfQueue = queue[0]; + const bytesToCopy = MathMin( + totalBytesToCopyRemaining, + headOfQueue.byteLength); + const destStart = byteOffset + desc.bytesFilled; + copyArrayBuffer( + buffer, + destStart, + headOfQueue.buffer, + headOfQueue.byteOffset, + bytesToCopy); + if (headOfQueue.byteLength === bytesToCopy) { + ArrayPrototypeShift(queue); + } else { + headOfQueue.byteOffset += bytesToCopy; + headOfQueue.byteLength -= bytesToCopy; + } + controller[kState].queueTotalSize -= bytesToCopy; + readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + bytesToCopy, + desc); + totalBytesToCopyRemaining -= bytesToCopy; + } + + if (!ready) { + assert(controller[kState].queueTotalSize === 0); + assert(desc.bytesFilled > 0); + assert(desc.bytesFilled < elementSize); + } + return ready; +} + +function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller) { + const { + closeRequested, + pendingPullIntos, + stream, + } = controller[kState]; + assert(!closeRequested); + while (pendingPullIntos.length) { + if (controller[kState].queueTotalSize === 0) + return; + const desc = pendingPullIntos[0]; + if (readableByteStreamControllerFillPullIntoDescriptorFromQueue( + controller, + desc)) { + readableByteStreamControllerShiftPendingPullInto(controller); + readableByteStreamControllerCommitPullIntoDescriptor(stream, desc); + } + } +} + +function readableByteStreamControllerRespondInReadableState( + controller, + bytesWritten, + desc) { + const { + buffer, + bytesFilled, + byteLength, + } = desc; + assert(bytesFilled + bytesWritten <= byteLength); + readableByteStreamControllerFillHeadPullIntoDescriptor( + controller, + bytesWritten, + desc); + + if (desc.bytesFilled < desc.elementSize) + return; + + readableByteStreamControllerShiftPendingPullInto(controller); + + const remainderSize = desc.bytesFilled % desc.elementSize; + + if (remainderSize) { + const end = desc.byteOffset + desc.bytesFilled; + const remainder = buffer.slice(end - remainderSize, remainderSize); + readableByteStreamControllerEnqueueChunkToQueue( + controller, + remainder, + 0, + remainder.byteLength); + } + desc.bytesFilled -= remainderSize; + readableByteStreamControllerCommitPullIntoDescriptor( + controller[kState].stream, + desc); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller); +} + +function readableByteStreamControllerRespondWithNewView(controller, view) { + const { + stream, + pendingPullIntos, + } = controller[kState]; + assert(pendingPullIntos.length); + // Supposed to assert here that the view's buffer is not + // detached, but there's no API available to use to check that. + const desc = pendingPullIntos[0]; + assert(stream[kState].state !== 'errored'); + + if (!isArrayBufferView(view)) { + throw new ERR_INVALID_ARG_TYPE( + 'view', + [ + 'Buffer', + 'TypedArray', + 'DataView', + ], + view); + } + if (view.byteLength === 0 || view.buffer.byteLength === 0) { + throw new ERR_INVALID_ARG_VALUE( + 'view', + view, + 'cannot be zero length'); + } + + const { + buffer, + byteOffset, + byteLength, + bytesFilled, + } = desc; + + if (byteOffset + bytesFilled !== view.byteOffset) + throw new ERR_INVALID_ARG_VALUE.RangeError('view', view); + + if (buffer.byteLength !== view.buffer.byteLength) + throw new ERR_INVALID_ARG_VALUE.RangeError('view', view); + + if (bytesFilled + view.byteLength > byteLength) + throw new ERR_INVALID_ARG_VALUE.RangeError('view', view); + + desc.buffer = transferArrayBuffer(view.buffer); + + readableByteStreamControllerRespondInternal(controller, view.byteLength); +} + +function readableByteStreamControllerShiftPendingPullInto(controller) { + assert(controller[kState].byobRequest === null); + return ArrayPrototypeShift(controller[kState].pendingPullIntos); +} + +function readableByteStreamControllerCallPullIfNeeded(controller) { + if (!readableByteStreamControllerShouldCallPull(controller)) + return; + if (controller[kState].pulling) { + controller[kState].pullAgain = true; + return; + } + assert(!controller[kState].pullAgain); + controller[kState].pulling = true; + PromisePrototypeThen( + (async () => controller[kState].pullAlgorithm())(), + () => { + controller[kState].pulling = false; + if (controller[kState].pullAgain) { + controller[kState].pullAgain = false; + readableByteStreamControllerCallPullIfNeeded(controller); + } + }, + (error) => readableByteStreamControllerError(controller, error)); +} + +function readableByteStreamControllerError(controller, error) { + const { + stream, + } = controller[kState]; + if (stream[kState].state !== 'readable') + return; + readableByteStreamControllerClearPendingPullIntos(controller); + resetQueue(controller); + readableByteStreamControllerClearAlgorithms(controller); + readableStreamError(stream, error); +} + +function readableByteStreamControllerCancelSteps(controller, reason) { + readableByteStreamControllerClearPendingPullIntos(controller); + resetQueue(controller); + const result = controller[kState].cancelAlgorithm(reason); + readableByteStreamControllerClearAlgorithms(controller); + return result; +} + +function readableByteStreamControllerPullSteps(controller, readRequest) { + const { + pendingPullIntos, + queue, + queueTotalSize, + stream, + } = controller[kState]; + assert(readableStreamHasDefaultReader(stream)); + if (queueTotalSize) { + assert(!readableStreamGetNumReadRequests(stream)); + const { + buffer, + byteOffset, + byteLength, + } = ArrayPrototypeShift(queue); + controller[kState].queueTotalSize -= byteLength; + readableByteStreamControllerHandleQueueDrain(controller); + const view = new Uint8Array(buffer, byteOffset, byteLength); + readRequest[kChunk](view); + return; + } + const { + autoAllocateChunkSize, + } = controller[kState]; + if (autoAllocateChunkSize !== undefined) { + try { + const buffer = new ArrayBuffer(autoAllocateChunkSize); + ArrayPrototypePush( + pendingPullIntos, + { + buffer, + byteOffset: 0, + byteLength: autoAllocateChunkSize, + bytesFilled: 0, + elementSize: 1, + ctor: Uint8Array, + type: 'default', + }); + } catch (error) { + readRequest[kError](error); + return; + } + } + readableStreamAddReadRequest(stream, readRequest); + readableByteStreamControllerCallPullIfNeeded(controller); +} + +function setupReadableByteStreamController( + stream, + controller, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + highWaterMark, + autoAllocateChunkSize) { + assert(stream[kState].controller === undefined); + if (autoAllocateChunkSize !== undefined) { + assert(NumberIsInteger(autoAllocateChunkSize)); + assert(autoAllocateChunkSize > 0); + } + controller[kState] = { + byobRequest: null, + closeRequested: false, + pullAgain: false, + pulling: false, + started: false, + stream, + queue: [], + queueTotalSize: 0, + highWaterMark, + pullAlgorithm, + cancelAlgorithm, + autoAllocateChunkSize, + pendingPullIntos: [], + }; + stream[kState].controller = controller; + + PromisePrototypeThen( + (async () => startAlgorithm())(), + () => { + controller[kState].started = true; + assert(!controller[kState].pulling); + assert(!controller[kState].pullAgain); + readableByteStreamControllerCallPullIfNeeded(controller); + }, + (error) => readableByteStreamControllerError(controller, error)); +} + +function setupReadableByteStreamControllerFromSource( + stream, + source, + highWaterMark) { + const controller = new InternalReadableByteStreamController(); + const start = source?.start; + const pull = source?.pull; + const cancel = source?.cancel; + const autoAllocateChunkSize = source?.autoAllocateChunkSize; + const startAlgorithm = start ? + FunctionPrototypeBind(start, source, controller) : + nonOpStart; + const pullAlgorithm = pull ? + FunctionPrototypeBind(pull, source, controller) : + nonOpPull; + const cancelAlgorithm = cancel ? + FunctionPrototypeBind(cancel, source) : + nonOpCancel; + if (autoAllocateChunkSize === 0) { + throw new ERR_INVALID_ARG_VALUE( + 'source.autoAllocateChunkSize', + autoAllocateChunkSize); + } + setupReadableByteStreamController( + stream, + controller, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + highWaterMark, + autoAllocateChunkSize); +} + +function nonOpStart() {} + +async function nonOpPull() {} + +async function nonOpCancel() {} + +async function nonOpWrite() {} + +// ---- WritableStream Implementation + +function isPromisePending(promise) { + if (promise === undefined) return false; + const { 0: state } = getPromiseDetails(promise); + return state === kPending; +} + +function isWritableStreamLocked(stream) { + return stream[kState].writer !== undefined; +} + +function setupWritableStreamDefaultWriter(writer, stream) { + if (isWritableStreamLocked(stream)) + throw new ERR_INVALID_STATE.TypeError('WritableStream is locked'); + writer[kState].stream = stream; + stream[kState].writer = writer; + switch (stream[kState].state) { + case 'writable': + if (!writableStreamCloseQueuedOrInFlight(stream) && + stream[kState].backpressure) { + writer[kState].ready = createDeferredPromise(); + } else { + writer[kState].ready = { + promise: PromiseResolve(), + resolve: undefined, + reject: undefined, + }; + } + setClosedPromiseToNewPromise(); + break; + case 'erroring': + writer[kState].ready = { + promise: PromiseReject(stream[kState].storedError), + resolve: undefined, + reject: undefined, + }; + setPromiseHandled(writer[kState].ready.promise); + setClosedPromiseToNewPromise(); + break; + case 'closed': + writer[kState].ready = { + promise: PromiseResolve(), + resolve: undefined, + reject: undefined, + }; + writer[kState].close = { + promise: PromiseResolve(), + resolve: undefined, + reject: undefined, + }; + break; + default: + writer[kState].ready = { + promise: PromiseReject(stream[kState].storedError), + resolve: undefined, + reject: undefined, + }; + writer[kState].close = { + promise: PromiseReject(stream[kState].storedError), + resolve: undefined, + reject: undefined, + }; + setPromiseHandled(writer[kState].ready.promise); + setPromiseHandled(writer[kState].close.promise); + } + + function setClosedPromiseToNewPromise() { + writer[kState].close = createDeferredPromise(); + } +} + +function writableStreamAbort(stream, reason) { + const { + state, + } = stream[kState]; + if (state === 'closed' || state === 'errored') + return PromiseResolve(); + + if (stream[kState].pendingAbortRequest.abort.promise !== undefined) + return stream[kState].pendingAbortRequest.abort.promise; + + assert(state === 'writable' || state === 'erroring'); + + let wasAlreadyErroring = false; + if (state === 'erroring') { + wasAlreadyErroring = true; + reason = undefined; + } + + stream[kState].pendingAbortRequest = { + abort: createDeferredPromise(), + reason, + wasAlreadyErroring, + }; + + if (!wasAlreadyErroring) + writableStreamStartErroring(stream, reason); + + return stream[kState].pendingAbortRequest.abort.promise; +} + +function writableStreamClose(stream) { + const { + state, + writer, + backpressure, + controller, + } = stream[kState]; + if (state === 'closed' || state === 'errored') + return PromiseReject(new ERR_INVALID_STATE('WritableStream is closed')); + assert(state === 'writable' || state === 'erroring'); + assert(!writableStreamCloseQueuedOrInFlight(stream)); + stream[kState].closeRequest = createDeferredPromise(); + const { promise } = stream[kState].closeRequest; + if (writer !== undefined && backpressure && state === 'writable') + writer[kState].ready.resolve?.(); + writableStreamDefaultControllerClose(controller); + return promise; +} + +function writableStreamUpdateBackpressure(stream, backpressure) { + assert(stream[kState].state === 'writable'); + assert(!writableStreamCloseQueuedOrInFlight(stream)); + const { + writer, + } = stream[kState]; + if (writer !== undefined && stream[kState].backpressure !== backpressure) { + if (backpressure) { + writer[kState].ready = createDeferredPromise(); + } else { + writer[kState].ready.resolve?.(); + } + } + stream[kState].backpressure = backpressure; +} + +function writableStreamStartErroring(stream, reason) { + assert(stream[kState].storedError === undefined); + assert(stream[kState].state === 'writable'); + const { + controller, + writer, + } = stream[kState]; + assert(controller !== undefined); + stream[kState].state = 'erroring'; + stream[kState].storedError = reason; + if (writer !== undefined) { + writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + } + if (!writableStreamHasOperationMarkedInFlight(stream) && + controller[kState].started) { + writableStreamFinishErroring(stream); + } +} + +function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { + assert(stream[kState].state === 'errored'); + if (stream[kState].closeRequest.promise !== undefined) { + assert(stream[kState].inFlightCloseRequest.promise === undefined); + stream[kState].closeRequest.reject?.(stream[kState].storedError); + stream[kState].closeRequest = { + promise: undefined, + reject: undefined, + resolve: undefined, + }; + } + const { + writer, + } = stream[kState]; + if (writer !== undefined) { + writer[kState].close.reject?.(stream[kState].storedError); + setPromiseHandled(writer[kState].close.promise); + } +} + +function writableStreamMarkFirstWriteRequestInFlight(stream) { + assert(stream[kState].inFlightWriteRequest.promise === undefined); + assert(stream[kState].writeRequests.length); + const writeRequest = ArrayPrototypeShift(stream[kState].writeRequests); + stream[kState].inFlightWriteRequest = writeRequest; +} + +function writableStreamMarkCloseRequestInFlight(stream) { + assert(stream[kState].inFlightWriteRequest.promise === undefined); + assert(stream[kState].closeRequest.promise !== undefined); + stream[kState].inFlightCloseRequest = stream[kState].closeRequest; + stream[kState].closeRequest = { + promise: undefined, + resolve: undefined, + reject: undefined, + }; +} + +function writableStreamHasOperationMarkedInFlight(stream) { + const { + inFlightWriteRequest, + inFlightCloseRequest, + } = stream[kState]; + if (inFlightWriteRequest.promise === undefined && + inFlightCloseRequest.promise === undefined) { + return false; + } + return true; +} + +function writableStreamFinishInFlightWriteWithError(stream, error) { + assert(stream[kState].inFlightWriteRequest.promise !== undefined); + stream[kState].inFlightWriteRequest.reject?.(error); + stream[kState].inFlightWriteRequest = { + promise: undefined, + resolve: undefined, + reject: undefined, + }; + assert(stream[kState].state === 'writable' || + stream[kState].state === 'erroring'); + writableStreamDealWithRejection(stream, error); +} + +function writableStreamFinishInFlightWrite(stream) { + assert(stream[kState].inFlightWriteRequest.promise !== undefined); + stream[kState].inFlightWriteRequest.resolve?.(); + stream[kState].inFlightWriteRequest = { + promise: undefined, + resolve: undefined, + reject: undefined, + }; +} + +function writableStreamFinishInFlightCloseWithError(stream, error) { + assert(stream[kState].inFlightCloseRequest.promise !== undefined); + stream[kState].inFlightCloseRequest.reject?.(error); + stream[kState].inFlightCloseRequest = { + promise: undefined, + resolve: undefined, + reject: undefined, + }; + assert(stream[kState].state === 'writable' || + stream[kState].state === 'erroring'); + if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { + stream[kState].pendingAbortRequest.abort.reject?.(error); + stream[kState].pendingAbortRequest = { + abort: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + reason: undefined, + wasAlreadyErroring: false, + }; + } + writableStreamDealWithRejection(stream, error); +} + +function writableStreamFinishInFlightClose(stream) { + assert(stream[kState].inFlightCloseRequest.promise !== undefined); + stream[kState].inFlightCloseRequest.resolve?.(); + stream[kState].inFlightCloseRequest = { + promise: undefined, + resolve: undefined, + reject: undefined, + }; + if (stream[kState].state === 'erroring') { + stream[kState].storedError = undefined; + if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { + stream[kState].pendingAbortRequest.abort.resolve?.(); + stream[kState].pendingAbortRequest = { + abort: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + reason: undefined, + wasAlreadyErroring: false, + }; + } + } + stream[kState].state = 'closed'; + if (stream[kState].writer !== undefined) + stream[kState].writer[kState].close.resolve?.(); + assert(stream[kState].pendingAbortRequest.abort.promise === undefined); + assert(stream[kState].storedError === undefined); +} + +function writableStreamFinishErroring(stream) { + assert(stream[kState].state === 'erroring'); + assert(!writableStreamHasOperationMarkedInFlight(stream)); + stream[kState].state = 'errored'; + stream[kState].controller[kError](); + const storedError = stream[kState].storedError; + for (let n = 0; n < stream[kState].writeRequests.length; n++) + stream[kState].writeRequests[n].reject?.(storedError); + stream[kState].writeRequests = []; + + if (stream[kState].pendingAbortRequest.abort.promise === undefined) { + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + + const abortRequest = stream[kState].pendingAbortRequest; + stream[kState].pendingAbortRequest = { + abort: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + reason: undefined, + wasAlreadyErroring: false, + }; + if (abortRequest.wasAlreadyErroring) { + abortRequest.abort.reject?.(storedError); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + PromisePrototypeThen( + PromiseResolve( + (async () => stream[kState].controller[kAbort](abortRequest.reason))()), + () => { + abortRequest.abort.resolve?.(); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, + (error) => { + abortRequest.abort.reject?.(error); + writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }); +} + +function writableStreamDealWithRejection(stream, error) { + const { + state, + } = stream[kState]; + if (state === 'writable') { + writableStreamStartErroring(stream, error); + return; + } + + assert(state === 'erroring'); + writableStreamFinishErroring(stream); +} + +function writableStreamCloseQueuedOrInFlight(stream) { + if (stream[kState].closeRequest.promise === undefined && + stream[kState].inFlightCloseRequest.promise === undefined) { + return false; + } + return true; +} + +function writableStreamAddWriteRequest(stream) { + assert(isWritableStreamLocked(stream)); + assert(stream[kState].state === 'writable'); + const { + promise, + resolve, + reject, + } = createDeferredPromise(); + ArrayPrototypePush( + stream[kState].writeRequests, + { + promise, + resolve, + reject, + }); + return promise; +} + +function writableStreamDefaultWriterWrite(writer, chunk) { + const { + stream, + } = writer[kState]; + assert(stream !== undefined); + const { + controller, + } = stream[kState]; + const chunkSize = writableStreamDefaultControllerGetChunkSize( + controller, + chunk); + if (stream !== writer[kState].stream) { + return PromiseReject( + new ERR_INVALID_STATE.TypeError('Mismatched WritableStreams')); + } + const { + state, + } = stream[kState]; + + if (state === 'errored') + return PromiseReject(stream[kState].storedError); + + if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed') { + return PromiseReject( + new ERR_INVALID_STATE.TypeError('WritableStream is closed')); + } + + if (state === 'erroring') + return PromiseReject(stream[kState].storedError); + + assert(state === 'writable'); + + const promise = writableStreamAddWriteRequest(stream); + writableStreamDefaultControllerWrite(controller, chunk, chunkSize); + return promise; +} + +function writableStreamDefaultWriterRelease(writer) { + const { + stream, + } = writer[kState]; + assert(stream !== undefined); + assert(stream[kState].writer === writer); + const releasedError = + new ERR_INVALID_STATE.TypeError('Writer has been released'); + writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); + writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError); + stream[kState].writer = undefined; + writer[kState].stream = undefined; +} + +function writableStreamDefaultWriterGetDesiredSize(writer) { + const { + stream, + } = writer[kState]; + switch (stream[kState].state) { + case 'erroror': + // Fall through + case 'erroring': + return null; + case 'closed': + return 0; + } + return writableStreamDefaultControllerGetDesiredSize( + stream[kState].controller); +} + +function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) { + if (isPromisePending(writer[kState].ready.promise)) { + writer[kState].ready.reject?.(error); + } else { + writer[kState].ready = { + promise: PromiseReject(error), + resolve: undefined, + reject: undefined, + }; + } + setPromiseHandled(writer[kState].ready.promise); +} + +function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) { + if (isPromisePending(writer[kState].close.promise)) { + writer[kState].close.reject?.(error); + } else { + writer[kState].close = { + promise: PromiseReject(error), + resolve: undefined, + reject: undefined, + }; + } + setPromiseHandled(writer[kState].close.promise); +} + +function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { + const { + stream, + } = writer[kState]; + assert(stream !== undefined); + const { + state, + } = stream[kState]; + if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed') + return PromiseResolve(); + + if (state === 'errored') + return PromiseReject(stream[kState].storedError); + + assert(state === 'writable' || state === 'erroring'); + + return writableStreamDefaultWriterClose(writer); +} + +function writableStreamDefaultWriterClose(writer) { + const { + stream, + } = writer[kState]; + assert(stream !== undefined); + return writableStreamClose(stream); +} + +function writableStreamDefaultWriterAbort(writer, reason) { + const { + stream, + } = writer[kState]; + assert(stream !== undefined); + return writableStreamAbort(stream, reason); +} + +function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) { + try { + enqueueValueWithSize(controller, chunk, chunkSize); + } catch (error) { + writableStreamDefaultControllerErrorIfNeeded(controller, error); + return; + } + const { + stream, + } = controller[kState]; + if (!writableStreamCloseQueuedOrInFlight(stream) && + stream[kState].state === 'writable') { + writableStreamUpdateBackpressure( + stream, + writableStreamDefaultControllerGetBackpressure(controller)); + } + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +function writableStreamDefaultControllerProcessWrite(controller, chunk) { + const { + stream, + writeAlgorithm, + } = controller[kState]; + writableStreamMarkFirstWriteRequestInFlight(stream); + + PromisePrototypeThen( + (async () => writeAlgorithm(chunk, controller))(), + () => { + writableStreamFinishInFlightWrite(stream); + const { + state, + } = stream[kState]; + assert(state === 'writable' || state === 'erroring'); + dequeueValue(controller); + if (!writableStreamCloseQueuedOrInFlight(stream) && + state === 'writable') { + writableStreamUpdateBackpressure( + stream, + writableStreamDefaultControllerGetBackpressure(controller)); + } + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + (error) => { + if (stream[kState].state === 'writable') + writableStreamDefaultControllerClearAlgorithms(controller); + writableStreamFinishInFlightWriteWithError(stream, error); + }); + +} + +function writableStreamDefaultControllerProcessClose(controller) { + const { + closeAlgorithm, + queue, + stream, + } = controller[kState]; + writableStreamMarkCloseRequestInFlight(stream); + dequeueValue(controller); + assert(!queue.length); + const sinkClosePromise = (async () => closeAlgorithm())(); + writableStreamDefaultControllerClearAlgorithms(controller); + PromisePrototypeThen( + PromiseResolve(sinkClosePromise), + () => writableStreamFinishInFlightClose(stream), + (error) => writableStreamFinishInFlightCloseWithError(stream, error)); +} + +function writableStreamDefaultControllerGetDesiredSize(controller) { + const { + highWaterMark, + queueTotalSize, + } = controller[kState]; + return highWaterMark - queueTotalSize; +} + +function writableStreamDefaultControllerGetChunkSize(controller, chunk) { + try { + return controller[kState].sizeAlgorithm(chunk); + } catch (error) { + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller, error); + return 1; + } +} + +function writableStreamDefaultControllerErrorIfNeeded(controller, error) { + const { + stream, + } = controller[kState]; + if (stream[kState].state === 'writable') + writableStreamDefaultControllerError(controller, error); +} + +function writableStreamDefaultControllerError(controller, error) { + const { + stream, + } = controller[kState]; + assert(stream[kState].state === 'writable'); + writableStreamDefaultControllerClearAlgorithms(controller); + writableStreamStartErroring(stream, error); +} + +function writableStreamDefaultControllerClose(controller) { + enqueueValueWithSize(controller, kCloseSentinel, 0); + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +function writableStreamDefaultControllerClearAlgorithms(controller) { + controller[kState].writeAlgorithm = undefined; + controller[kState].closeAlgorithm = undefined; + controller[kState].abortAlgorithm = undefined; + controller[kState].sizeAlgorithm = undefined; +} + +function writableStreamDefaultControllerGetBackpressure(controller) { + return writableStreamDefaultControllerGetDesiredSize(controller) <= 0; +} + +function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { + const { + queue, + started, + stream, + } = controller[kState]; + if (!started || stream[kState].inFlightWriteRequest.promise !== undefined) + return; + + if (stream[kState].state === 'erroring') { + writableStreamFinishErroring(stream); + return; + } + + if (!queue.length) + return; + + const value = peekQueueValue(controller); + if (value === kCloseSentinel) + writableStreamDefaultControllerProcessClose(controller); + else + writableStreamDefaultControllerProcessWrite(controller, value); +} + +function setupWritableStreamDefaultControllerFromSink( + stream, + sink, + highWaterMark, + sizeAlgorithm) { + const controller = new InternalWritableStreamDefaultController(); + const start = sink?.start; + const write = sink?.write; + const close = sink?.close; + const abort = sink?.abort; + const startAlgorithm = start ? + FunctionPrototypeBind(start, sink, controller) : + nonOpStart; + const writeAlgorithm = write ? + FunctionPrototypeBind(write, sink) : + nonOpWrite; + const closeAlgorithm = close ? + FunctionPrototypeBind(close, sink) : nonOpCancel; + const abortAlgorithm = abort ? + FunctionPrototypeBind(abort, sink) : nonOpCancel; + setupWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm); +} + +function setupWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm) { + assert(isWritableStream(stream)); + assert(stream[kState].controller === undefined); + controller[kState] = { + abortAlgorithm, + closeAlgorithm, + highWaterMark, + queue: [], + queueTotalSize: 0, + sizeAlgorithm, + started: false, + stream, + writeAlgorithm, + }; + stream[kState].controller = controller; + + writableStreamUpdateBackpressure( + stream, + writableStreamDefaultControllerGetBackpressure(controller)); + + PromisePrototypeThen( + (async () => startAlgorithm())(), + () => { + assert(stream[kState].state === 'writable' || + stream[kState].state === 'erroring'); + controller[kState].started = true; + writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + (error) => { + assert(stream[kState].state === 'writable' || + stream[kState].state === 'erroring'); + controller[kState].started = true; + writableStreamDealWithRejection(stream, error); + }); +} + +// ---- TransformStream Implementation + +async function defaultTransformAlgorithm(chunk, controller) { + transformStreamDefaultControllerEnqueue(controller, chunk); +} + +async function nonOpFlush() {} + +function initializeTransformStream( + stream, + startPromise, + writableHighWaterMark, + writableSizeAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm) { + + const writable = new WritableStream({ + start() { return startPromise.promise; }, + write(chunk) { + return transformStreamDefaultSinkWriteAlgorithm(stream, chunk); + }, + abort(reason) { + return transformStreamDefaultSinkAbortAlgorithm(stream, reason); + }, + close() { + return transformStreamDefaultSinkCloseAlgorithm(stream); + }, + }, { + highWaterMark: writableHighWaterMark, + size: writableSizeAlgorithm, + }); + + const readable = new ReadableStream({ + pull() { + return transformStreamDefaultSourcePullAlgorithm(stream); + }, + cancel(reason) { + transformStreamErrorWritableAndUnblockWrite(stream, reason); + return PromiseResolve(); + }, + }, { + highWaterMark: readableHighWaterMark, + size: readableSizeAlgorithm, + }); + + stream[kState] = { + readable, + writable, + controller: undefined, + backpressure: undefined, + backpressureChange: { + promise: undefined, + resolve: undefined, + reject: undefined, + } + }; + + transformStreamSetBackpressure(stream, true); +} + +function transformStreamError(stream, error) { + const { + readable, + } = stream[kState]; + const { + controller: readableController, + } = readable[kState]; + readableStreamDefaultControllerError(readableController, error); + transformStreamErrorWritableAndUnblockWrite(stream, error); +} + +function transformStreamErrorWritableAndUnblockWrite(stream, error) { + const { + controller, + } = stream[kState]; + transformStreamDefaultControllerClearAlgorithms(controller); + writableStreamDefaultControllerErrorIfNeeded(controller, error); + if (stream[kState].backpressure) + transformStreamSetBackpressure(stream, false); +} + +function transformStreamSetBackpressure(stream, backpressure) { + assert(stream[kState].backpressure !== backpressure); + if (stream[kState].backpressureChange.promise !== undefined) + stream[kState].backpressureChange.resolve?.(); + stream[kState].backpressureChange = createDeferredPromise(); + stream[kState].backpressure = backpressure; +} + +function setupTransformStreamDefaultController( + stream, + controller, + transformAlgorithm, + flushAlgorithm) { + assert(isTransformStream(stream)); + assert(stream[kState].controller === undefined); + controller[kState] = { + stream, + transformAlgorithm, + flushAlgorithm, + }; + stream[kState].controller = controller; +} + +function setupTransformStreamDefaultControllerFromTransformer( + stream, + transformer) { + const controller = new InternalTransformStreamDefaultController(); + const transform = transformer?.transform || defaultTransformAlgorithm; + const flush = transformer?.flush || nonOpFlush; + const transformAlgorithm = + FunctionPrototypeBind(transform, transformer); + const flushAlgorithm = + FunctionPrototypeBind(flush, transformer); + + setupTransformStreamDefaultController( + stream, + controller, + transformAlgorithm, + flushAlgorithm); +} + +function transformStreamDefaultControllerClearAlgorithms(controller) { + controller[kState].transformAlgorithm = undefined; + controller[kState].flushAlgorithm = undefined; +} + +function transformStreamDefaultControllerEnqueue(controller, chunk) { + const { + stream, + } = controller[kState]; + const { + readable, + } = stream[kState]; + const { + controller: readableController, + } = readable[kState]; + if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + throw new ERR_INVALID_STATE.TypeError('Unable to enqueue'); + try { + readableStreamDefaultControllerEnqueue(readableController, chunk); + } catch (error) { + transformStreamErrorWritableAndUnblockWrite(stream, error); + throw readable[kState].storedError; + } + const backpressure = + readableStreamDefaultControllerHasBackpressure(readableController); + if (backpressure !== stream[kState].backpressure) { + assert(backpressure); + transformStreamSetBackpressure(stream, true); + } +} + +function transformStreamDefaultControllerError(controller, error) { + transformStreamError(controller[kState].stream, error); +} + +function transformStreamDefaultControllerPerformTransform(controller, chunk) { + const transformPromise = + PromiseResolve( + (async () => controller[kState].transformAlgorithm(chunk, controller))()); + return PromisePrototypeCatch(transformPromise, (error) => { + transformStreamError(controller[kState].stream, error); + throw error; + }); +} + +function transformStreamDefaultControllerTerminate(controller) { + const { + stream, + readable, + } = controller[kState]; + assert(readable !== undefined); + const { + controller: readableController, + } = readable[kState]; + readableStreamDefaultControllerClose(readableController); + transformStreamErrorWritableAndUnblockWrite( + stream, + new ERR_INVALID_STATE('TransformStream has been terminated')); +} + +function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { + const { + writable, + controller, + } = stream[kState]; + assert(writable[kState].state === 'writable'); + if (stream[kState].backpressure) { + const backpressureChange = stream[kState].backpressureChange.promise; + return PromisePrototypeThen(backpressureChange, () => { + const { + writable, + } = stream[kState].writable; + if (writable[kState].state === 'erroring') + throw writable[kState].storedError; + assert(writable[kState].state === 'writable'); + return transformStreamDefaultControllerPerformTransform( + controller, + chunk); + }); + } + transformStreamDefaultControllerPerformTransform(controller, chunk); +} + +function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { + transformStreamError(stream, reason); + return PromiseResolve(); +} + +function transformStreamDefaultSinkCloseAlgorithm(stream) { + const { + readable, + controller, + } = stream[kState]; + + const flushPromise = + (async () => controller[kState].flushAlgorithm(controller))(); + transformStreamDefaultControllerClearAlgorithms(controller); + return PromisePrototypeThen( + flushPromise, + () => { + if (readable[kState].state === 'errored') + throw readable[kState].storedError; + readableStreamDefaultControllerClose(readable[kState].controller); + }, + (error) => { + transformStreamError(stream, error); + throw readable[kState].storedError; + }); +} + +function transformStreamDefaultSourcePullAlgorithm(stream) { + assert(stream[kState].backpressure); + assert(stream[kState].backpressureChange.promise !== undefined); + transformStreamSetBackpressure(stream, false); + return stream[kState].backpressureChange.promise; +} + +module.exports = { + kState, // Exported for testing purposes only + ReadableStream, + ReadableStreamDefaultReader, + ReadableStreamBYOBReader, + ReadableStreamBYOBRequest, + ReadableByteStreamController, + ReadableStreamDefaultController, + TransformStream, + TransformStreamDefaultController, + WritableStream, + WritableStreamDefaultWriter, + WritableStreamDefaultController, + ByteLengthQueuingStrategy, + CountQueuingStrategy, +}; + +/* eslint-enable no-use-before-define */ diff --git a/lib/stream/whatwg.js b/lib/stream/whatwg.js new file mode 100644 index 00000000000000..9efbbfe9f03b6e --- /dev/null +++ b/lib/stream/whatwg.js @@ -0,0 +1,33 @@ +'use strict'; + +const { + ReadableStream, + ReadableStreamDefaultReader, + ReadableStreamBYOBReader, + ReadableStreamBYOBRequest, + ReadableByteStreamController, + ReadableStreamDefaultController, + TransformStream, + TransformStreamDefaultController, + WritableStream, + WritableStreamDefaultWriter, + WritableStreamDefaultController, + ByteLengthQueuingStrategy, + CountQueuingStrategy, +} = require('internal/streams/whatwg'); + +module.exports = { + ReadableStream, + ReadableStreamDefaultReader, + ReadableStreamBYOBReader, + ReadableStreamBYOBRequest, + ReadableByteStreamController, + ReadableStreamDefaultController, + TransformStream, + TransformStreamDefaultController, + WritableStream, + WritableStreamDefaultWriter, + WritableStreamDefaultController, + ByteLengthQueuingStrategy, + CountQueuingStrategy, +}; diff --git a/node.gyp b/node.gyp index 3f674c978d2788..faaec9331f65c6 100644 --- a/node.gyp +++ b/node.gyp @@ -80,6 +80,7 @@ 'lib/repl.js', 'lib/stream.js', 'lib/stream/promises.js', + 'lib/stream/whatwg.js', 'lib/_stream_readable.js', 'lib/_stream_writable.js', 'lib/_stream_duplex.js', @@ -272,6 +273,7 @@ 'lib/internal/streams/pipeline.js', 'lib/internal/streams/end-of-stream.js', 'lib/internal/streams/utils.js', + 'lib/internal/streams/whatwg.js', 'deps/v8/tools/splaytree.mjs', 'deps/v8/tools/codemap.mjs', 'deps/v8/tools/consarray.mjs', diff --git a/src/node_buffer.cc b/src/node_buffer.cc index e816ba131644ad..29c43ee2a81b12 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -67,6 +67,7 @@ using v8::MaybeLocal; using v8::Nothing; using v8::Number; using v8::Object; +using v8::SharedArrayBuffer; using v8::String; using v8::Uint32; using v8::Uint32Array; @@ -1158,6 +1159,60 @@ void GetZeroFillToggle(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(Uint32Array::New(ab, 0, 1)); } +void DetachArrayBuffer(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + if (args[0]->IsArrayBuffer()) { + Local buf = args[0].As(); + if (buf->IsDetachable()) { + std::shared_ptr store = buf->GetBackingStore(); + buf->Detach(); + args.GetReturnValue().Set(ArrayBuffer::New(env->isolate(), store)); + } + } +} + +void CopyArrayBuffer(const FunctionCallbackInfo& args) { + // args[0] == Destination ArrayBuffer + // args[1] == Destination ArrayBuffer Offset + // args[2] == Source ArrayBuffer + // args[3] == Source ArrayBuffer Offset + // args[4] == bytesToCopy + + CHECK(args[0]->IsArrayBuffer() || args[0]->IsSharedArrayBuffer()); + CHECK(args[1]->IsUint32()); + CHECK(args[2]->IsArrayBuffer() || args[2]->IsSharedArrayBuffer()); + CHECK(args[3]->IsUint32()); + CHECK(args[4]->IsUint32()); + + std::shared_ptr destination; + std::shared_ptr source; + + if (args[0]->IsArrayBuffer()) { + destination = args[0].As()->GetBackingStore(); + } else if (args[0]->IsSharedArrayBuffer()) { + destination = args[0].As()->GetBackingStore(); + } + + if (args[2]->IsArrayBuffer()) { + source = args[2].As()->GetBackingStore(); + } else if (args[0]->IsSharedArrayBuffer()) { + source = args[2].As()->GetBackingStore(); + } + + uint32_t destination_offset = args[1].As()->Value(); + uint32_t source_offset = args[3].As()->Value(); + size_t bytes_to_copy = args[4].As()->Value(); + + CHECK(destination->ByteLength() - destination_offset >= bytes_to_copy); + CHECK(source->ByteLength() - source_offset >= bytes_to_copy); + + uint8_t* dest = + static_cast(destination->Data()) + destination_offset; + uint8_t* src = + static_cast(source->Data()) + source_offset; + memcpy(dest, src, bytes_to_copy); +} + void Initialize(Local target, Local unused, Local context, @@ -1176,6 +1231,9 @@ void Initialize(Local target, env->SetMethodNoSideEffect(target, "indexOfNumber", IndexOfNumber); env->SetMethodNoSideEffect(target, "indexOfString", IndexOfString); + env->SetMethod(target, "detachArrayBuffer", DetachArrayBuffer); + env->SetMethod(target, "copyArrayBuffer", CopyArrayBuffer); + env->SetMethod(target, "swap16", Swap16); env->SetMethod(target, "swap32", Swap32); env->SetMethod(target, "swap64", Swap64); @@ -1251,6 +1309,9 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(StringWrite); registry->Register(GetZeroFillToggle); + registry->Register(DetachArrayBuffer); + registry->Register(CopyArrayBuffer); + Blob::RegisterExternalReferences(registry); FixedSizeBlobCopyJob::RegisterExternalReferences(registry); } diff --git a/test/parallel/test-whatwg-readablebytestream.js b/test/parallel/test-whatwg-readablebytestream.js new file mode 100644 index 00000000000000..34409c74b64d1c --- /dev/null +++ b/test/parallel/test-whatwg-readablebytestream.js @@ -0,0 +1,165 @@ +// Flags: --expose-internals --no-warnings +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +const { + ReadableStream, + ReadableByteStreamController, + ReadableStreamDefaultReader, + ReadableStreamBYOBReader, +} = require('stream/whatwg'); + +const { + kState, +} = require('internal/streams/whatwg'); + +const { + open, +} = require('fs/promises'); + +const { + readFileSync, +} = require('fs'); + +const { + Buffer, +} = require('buffer'); + +{ + const r = new ReadableStream({ + type: 'bytes', + }); + + assert(r[kState].controller instanceof ReadableByteStreamController); + + assert.strictEqual(typeof r.locked, 'boolean'); + assert.strictEqual(typeof r.cancel, 'function'); + assert.strictEqual(typeof r.getReader, 'function'); + assert.strictEqual(typeof r.pipeThrough, 'function'); + assert.strictEqual(typeof r.pipeTo, 'function'); + assert.strictEqual(typeof r.tee, 'function'); + + ['', null, 'asdf'].forEach((mode) => { + assert.throws(() => r.getReader({ mode }), { + code: 'ERR_INVALID_ARG_VALUE', + }); + }); + + [1, null, 'asdf'].forEach((options) => { + assert.throws(() => r.getReader(options), { + code: 'ERR_INVALID_ARG_TYPE', + }); + }); + + assert(!r.locked); + const defaultReader = r.getReader(); + assert(r.locked); + assert(defaultReader instanceof ReadableStreamDefaultReader); + defaultReader.releaseLock(); + const byobReader = r.getReader({ mode: 'byob' }); + assert(byobReader instanceof ReadableStreamBYOBReader); +} + +class Source { + constructor() { + this.controllerClosed = false; + } + + async start(controller) { + this.file = await open(__filename); + this.controller = controller; + } + + async pull(controller) { + const view = controller.byobRequest?.view; + const { + bytesRead, + } = await this.file.read({ + buffer: view, + offset: view.byteOffset, + length: view.byteLength + }); + + if (bytesRead === 0) { + await this.file.close(); + this.controller.close(); + } + controller.byobRequest.respond(bytesRead); + } + + get type() { return 'bytes'; } + + get autoAllocateChunkSize() { return 1024; } +} + +{ + const stream = new ReadableStream(new Source()); + assert(stream[kState].controller instanceof ReadableByteStreamController); + + async function read(stream) { + const reader = stream.getReader({ mode: 'byob' }); + + const chunks = []; + let result; + do { + result = await reader.read(Buffer.alloc(100)); + if (result.value !== undefined) + chunks.push(Buffer.from(result.value)); + } while (!result.done); + + return Buffer.concat(chunks); + } + + read(stream).then(common.mustCall((data) => { + const check = readFileSync(__filename); + assert.deepStrictEqual(check, data); + })); +} + +{ + const stream = new ReadableStream(new Source()); + assert(stream[kState].controller instanceof ReadableByteStreamController); + + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + + return Buffer.concat(chunks); + } + + read(stream).then(common.mustCall((data) => { + const check = readFileSync(__filename); + assert.deepStrictEqual(check, data); + })); +} + +{ + const stream = new ReadableStream(new Source()); + assert(stream[kState].controller instanceof ReadableByteStreamController); + + async function read(stream) { + // eslint-disable-next-line no-unused-vars + for await (const _ of stream) + break; + } + + read(stream).then(common.mustCall()); +} + +{ + const stream = new ReadableStream(new Source()); + assert(stream[kState].controller instanceof ReadableByteStreamController); + + const error = new Error('boom'); + + async function read(stream) { + // eslint-disable-next-line no-unused-vars + for await (const _ of stream) + throw error; + } + + assert.rejects(read(stream), error); +} diff --git a/test/parallel/test-whatwg-readablestream.js b/test/parallel/test-whatwg-readablestream.js new file mode 100644 index 00000000000000..3274d52d8f9007 --- /dev/null +++ b/test/parallel/test-whatwg-readablestream.js @@ -0,0 +1,1298 @@ +// Flags: --expose-internals --no-warnings +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { + isPromise, +} = require('util/types'); +const { + setImmediate: delay +} = require('timers/promises'); + +const { + ByteLengthQueuingStrategy, + CountQueuingStrategy, + ReadableStream, + ReadableStreamDefaultReader, + ReadableStreamDefaultController, + ReadableByteStreamController +} = require('stream/whatwg'); + +const { + kState +} = require('internal/streams/whatwg'); + +const { + createReadStream, + readFileSync, +} = require('fs'); +const { + Buffer, +} = require('buffer'); + +{ + const r = new ReadableStream(); + assert.strictEqual(typeof r.locked, 'boolean'); + assert.strictEqual(typeof r.cancel, 'function'); + assert.strictEqual(typeof r.getReader, 'function'); + assert.strictEqual(typeof r.pipeThrough, 'function'); + assert.strictEqual(typeof r.pipeTo, 'function'); + assert.strictEqual(typeof r.tee, 'function'); + + ['', null, 'asdf'].forEach((mode) => { + assert.throws(() => r.getReader({ mode }), { + code: 'ERR_INVALID_ARG_VALUE', + }); + }); + + [1, null, 'asdf'].forEach((options) => { + assert.throws(() => r.getReader(options), { + code: 'ERR_INVALID_ARG_TYPE', + }); + }); + + assert(!r.locked); + r.getReader(); + assert(r.locked); +} + +{ + const source = { + start: common.mustCall((controller) => { + assert(controller instanceof ReadableStreamDefaultController); + }), + pull: common.mustCall((controller) => { + assert(controller instanceof ReadableStreamDefaultController); + }), + cancel: common.mustNotCall(), + }; + + new ReadableStream(source); +} + +{ + const source = { + start: common.mustCall(async (controller) => { + assert(controller instanceof ReadableStreamDefaultController); + }), + pull: common.mustCall(async (controller) => { + assert(controller instanceof ReadableStreamDefaultController); + }), + cancel: common.mustNotCall(), + }; + + new ReadableStream(source); +} + +{ + const source = { + start: common.mustCall((controller) => { + assert(controller instanceof ReadableByteStreamController); + }), + pull: common.mustNotCall(), + cancel: common.mustNotCall(), + type: 'bytes', + }; + + new ReadableStream(source); +} + +{ + const source = { + start: common.mustCall(async (controller) => { + assert(controller instanceof ReadableByteStreamController); + }), + pull: common.mustNotCall(), + cancel: common.mustNotCall(), + type: 'bytes', + }; + + new ReadableStream(source); +} + +{ + const source = { + start: common.mustCall(async (controller) => { + assert(controller instanceof ReadableByteStreamController); + }), + pull: common.mustCall(async (controller) => { + assert(controller instanceof ReadableByteStreamController); + }), + cancel: common.mustNotCall(), + type: 'bytes', + }; + + new ReadableStream(source, { highWaterMark: 10 }); +} + +{ + // These are silly but they should all work per spec + new ReadableStream(1); + new ReadableStream('hello'); + new ReadableStream(false); + new ReadableStream([]); + new ReadableStream(1, 1); + new ReadableStream(1, 'hello'); + new ReadableStream(1, false); + new ReadableStream(1, []); +} + +['a', {}, false].forEach((size) => { + assert.throws(() => { + new ReadableStream({}, { size }); + }, { + code: 'ERR_INVALID_ARG_VALUE', + }); +}); + +['a', {}, false].forEach((highWaterMark) => { + assert.throws(() => { + new ReadableStream({}, { highWaterMark }); + }, { + code: 'ERR_INVALID_ARG_VALUE', + }); + + assert.throws(() => { + new ReadableStream({ type: 'bytes' }, { highWaterMark }); + }, { + code: 'ERR_INVALID_ARG_VALUE', + }); +}); + +[-1, NaN].forEach((highWaterMark) => { + assert.throws(() => { + new ReadableStream({}, { highWaterMark }); + }, { + code: 'ERR_INVALID_ARG_VALUE', + }); + + assert.throws(() => { + new ReadableStream({ type: 'bytes' }, { highWaterMark }); + }, { + code: 'ERR_INVALID_ARG_VALUE', + }); +}); + +{ + new ReadableStream({}, new ByteLengthQueuingStrategy({ highWaterMark: 1 })); + new ReadableStream({}, new CountQueuingStrategy({ highWaterMark: 1 })); +} + +{ + const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1 }); + assert.strictEqual(strategy.highWaterMark, 1); + assert.strictEqual(strategy.size(new ArrayBuffer(10)), 10); + + const { size } = strategy; + assert.strictEqual(size(new ArrayBuffer(10)), 10); +} + +{ + const strategy = new CountQueuingStrategy({ highWaterMark: 1 }); + assert.strictEqual(strategy.highWaterMark, 1); + assert.strictEqual(strategy.size(new ArrayBuffer(10)), 1); + + const { size } = strategy; + assert.strictEqual(size(new ArrayBuffer(10)), 1); +} + +{ + const r = new ReadableStream({ + async start() { + throw new Error('boom'); + } + }); + + setImmediate(() => { + assert.strictEqual(r[kState].state, 'errored'); + assert.match(r[kState].storedError?.message, /boom/); + }); +} + +{ + const data = Buffer.from('hello'); + const r = new ReadableStream({ + start(controller) { + controller.enqueue(data); + controller.close(); + }, + }); + + (async function read() { + const reader = r.getReader(); + let res = await reader.read(); + if (res.done) return; + const buf = Buffer.from(res.value); + assert.strictEqual(buf.toString(), data.toString()); + res = await reader.read(); + assert(res.done); + })().then(common.mustCall()); +} + +{ + const r = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + (async function read() { + const reader = r.getReader(); + const res = await reader.read(); + assert(res.done); + })().then(common.mustCall()); +} + +assert.throws(() => { + new ReadableStream({ + get start() { throw new Error('boom1'); } + }, { + get size() { throw new Error('boom2'); } + }); +}, /boom2/); + +{ + const stream = new ReadableStream(); + const reader = stream.getReader(); + + assert(stream.locked); + assert.strictEqual(reader[kState].stream, stream); + assert.strictEqual(stream[kState].reader, reader); + + assert.throws(() => stream.getReader(), { + code: 'ERR_INVALID_STATE', + }); + + assert(reader instanceof ReadableStreamDefaultReader); + + assert(isPromise(reader.closed)); + assert.strictEqual(typeof reader.cancel, 'function'); + assert.strictEqual(typeof reader.read, 'function'); + assert.strictEqual(typeof reader.releaseLock, 'function'); + + const read1 = reader.read(); + const read2 = reader.read(); + + // The stream is empty so the read will never settle. + read1.then( + common.mustNotCall(), + common.mustNotCall() + ); + + // The stream is empty so the read will never settle. + read2.then( + common.mustNotCall(), + common.mustNotCall() + ); + + assert.notStrictEqual(read1, read2); + + assert.strictEqual(reader[kState].readRequests.length, 2); + + delay().then(common.mustCall()); + + assert.throws(() => reader.releaseLock(), { + code: 'ERR_INVALID_STATE', + }); + assert(stream.locked); +} + +{ + const stream = new ReadableStream(); + const reader = stream.getReader(); + const closedBefore = reader.closed; + assert(stream.locked); + reader.releaseLock(); + assert(!stream.locked); + const closedAfter = reader.closed; + + assert.strictEqual(closedBefore, closedAfter); + + assert.rejects(reader.read(), { + code: 'ERR_INVALID_STATE', + }); + + assert.rejects(closedBefore, { + code: 'ERR_INVALID_STATE', + }); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(Buffer.from('hello')); + } + }); + + const reader = stream.getReader(); + + assert.rejects(stream.cancel(), { + code: 'ERR_INVALID_STATE', + }); + + reader.cancel(); + + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, undefined); + assert(done); + })); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + assert(!stream.locked); + + const cancel1 = stream.cancel(); + const cancel2 = stream.cancel(); + + assert.notStrictEqual(cancel1, cancel2); + + Promise.all([cancel1, cancel2]).then(common.mustCall((res) => { + assert.deepStrictEqual(res, [undefined, undefined]); + })); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + + stream.getReader().releaseLock(); + stream.getReader().releaseLock(); + stream.getReader(); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + + stream.getReader(); + + assert.throws(() => stream.getReader(), { + code: 'ERR_INVALID_STATE', + }); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const reader = stream.getReader(); + + reader.closed.then(common.mustCall()); + + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, undefined); + assert(done); + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, undefined); + assert(done); + })); + })); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const reader = stream.getReader(); + + const closedBefore = reader.closed; + reader.releaseLock(); + const closedAfter = reader.closed; + assert.notStrictEqual(closedBefore, closedAfter); + + closedBefore.then(common.mustCall()); + assert.rejects(closedAfter, { + code: 'ERR_INVALID_STATE', + }); +} + +{ + let c; + const stream = new ReadableStream({ + start(controller) { + c = controller; + }, + }); + + const reader = stream.getReader(); + c.close(); + + const closedBefore = reader.closed; + reader.releaseLock(); + const closedAfter = reader.closed; + assert.notStrictEqual(closedBefore, closedAfter); + + closedBefore.then(common.mustCall()); + assert.rejects(closedAfter, { + code: 'ERR_INVALID_STATE', + }); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const reader = stream.getReader(); + + const cancel1 = reader.cancel(); + const cancel2 = reader.cancel(); + const closed = reader.closed; + + assert.notStrictEqual(cancel1, cancel2); + assert.notStrictEqual(cancel1, closed); + assert.notStrictEqual(cancel2, closed); + + Promise.all([cancel1, cancel2]).then(common.mustCall((res) => { + assert.deepStrictEqual(res, [undefined, undefined]); + })); +} + +{ + let c; + const stream = new ReadableStream({ + start(controller) { + c = controller; + }, + }); + + const reader = stream.getReader(); + c.close(); + + const cancel1 = reader.cancel(); + const cancel2 = reader.cancel(); + const closed = reader.closed; + + assert.notStrictEqual(cancel1, cancel2); + assert.notStrictEqual(cancel1, closed); + assert.notStrictEqual(cancel2, closed); + + Promise.all([cancel1, cancel2]).then(common.mustCall((res) => { + assert.deepStrictEqual(res, [undefined, undefined]); + })); +} + +{ + const stream = new ReadableStream(); + const cancel1 = stream.cancel(); + const cancel2 = stream.cancel(); + assert.notStrictEqual(cancel1, cancel2); + + Promise.all([cancel1, cancel2]).then(common.mustCall((res) => { + assert.deepStrictEqual(res, [undefined, undefined]); + })); + + stream.getReader().read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, undefined); + assert(done); + })); +} + +{ + const error = new Error('boom'); + const stream = new ReadableStream({ + start(controller) { + controller.error(error); + } + }); + stream.getReader().releaseLock(); + const reader = stream.getReader(); + assert.rejects(reader.closed, error); + assert.rejects(reader.read(), error); + assert.rejects(reader.read(), error); +} + +{ + const error = new Error('boom'); + const stream = new ReadableStream({ + start(controller) { + controller.error(error); + } + }); + const reader = stream.getReader(); + const cancel1 = reader.cancel(); + const cancel2 = reader.cancel(); + assert.notStrictEqual(cancel1, cancel2); + assert.rejects(cancel1, error); + assert.rejects(cancel2, error); +} + +{ + const error = new Error('boom'); + const stream = new ReadableStream({ + async start(controller) { + throw error; + } + }); + stream.getReader().releaseLock(); + const reader = stream.getReader(); + assert.rejects(reader.closed, error); + assert.rejects(reader.read(), error); + assert.rejects(reader.read(), error); +} + +{ + const buf1 = Buffer.from('hello'); + const buf2 = Buffer.from('there'); + let doClose; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(buf1); + controller.enqueue(buf2); + doClose = controller.close.bind(controller); + } + }); + const reader = stream.getReader(); + doClose(); + reader.read().then(common.mustCall(({ value, done }) => { + assert.deepStrictEqual(value, buf1); + assert(!done); + reader.read().then(common.mustCall(({ value, done }) => { + assert.deepStrictEqual(value, buf2); + assert(!done); + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, undefined); + assert(done); + })); + })); + })); +} + +{ + const buf1 = Buffer.from('hello'); + const buf2 = Buffer.from('there'); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(buf1); + controller.enqueue(buf2); + } + }); + const reader = stream.getReader(); + reader.read().then(common.mustCall(({ value, done }) => { + assert.deepStrictEqual(value, buf1); + assert(!done); + reader.read().then(common.mustCall(({ value, done }) => { + assert.deepStrictEqual(value, buf2); + assert(!done); + reader.read().then(common.mustNotCall()); + delay().then(common.mustCall()); + })); + })); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.close(); + } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + + assert(s1 instanceof ReadableStream); + assert(s2 instanceof ReadableStream); + + async function read(stream) { + const reader = stream.getReader(); + assert.deepStrictEqual( + await reader.read(), { value: 'a', done: false }); + assert.deepStrictEqual( + await reader.read(), { value: 'b', done: false }); + assert.deepStrictEqual( + await reader.read(), { value: undefined, done: true }); + } + + Promise.all([ + read(s1), + read(s2), + ]).then(common.mustCall()); +} + +{ + const error = new Error('boom'); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + }, + pull() { throw error; } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + + assert(stream.locked); + + assert(s1 instanceof ReadableStream); + assert(s2 instanceof ReadableStream); + + const reader1 = s1.getReader(); + const reader2 = s2.getReader(); + + const closed1 = reader1.closed; + const closed2 = reader2.closed; + + assert.notStrictEqual(closed1, closed2); + + assert.rejects(closed1, error); + assert.rejects(closed2, error); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.close(); + } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + + assert(s1 instanceof ReadableStream); + assert(s2 instanceof ReadableStream); + + s2.cancel(); + + async function read(stream, canceled = false) { + const reader = stream.getReader(); + if (!canceled) { + assert.deepStrictEqual( + await reader.read(), { value: 'a', done: false }); + assert.deepStrictEqual( + await reader.read(), { value: 'b', done: false }); + } + assert.deepStrictEqual( + await reader.read(), { value: undefined, done: true }); + } + + Promise.all([ + read(s1), + read(s2, true), + ]).then(common.mustCall()); +} + +{ + const error1 = new Error('boom1'); + const error2 = new Error('boom2'); + + const stream = new ReadableStream({ + cancel(reason) { + assert.deepStrictEqual(reason, [error1, error2]); + } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + s1.cancel(error1); + s2.cancel(error2); +} + +{ + const error1 = new Error('boom1'); + const error2 = new Error('boom2'); + + const stream = new ReadableStream({ + cancel(reason) { + assert.deepStrictEqual(reason, [error1, error2]); + } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + s2.cancel(error2); + s1.cancel(error1); +} + +{ + const error = new Error('boom1'); + + const stream = new ReadableStream({ + cancel() { + throw error; + } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + + assert.rejects(s1.cancel(), error); + assert.rejects(s2.cancel(), error); +} + +{ + const error = new Error('boom1'); + let c; + const stream = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + c.error(error); + + assert.rejects(s1.cancel(), error); + assert.rejects(s2.cancel(), error); +} + +{ + const error = new Error('boom1'); + let c; + const stream = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const { 0: s1, 1: s2 } = stream.tee(); + + const reader1 = s1.getReader(); + const reader2 = s2.getReader(); + + assert.rejects(reader1.closed, error); + assert.rejects(reader2.closed, error); + + assert.rejects(reader1.read(), error); + assert.rejects(reader2.read(), error); + + setImmediate(() => c.error(error)); +} + +{ + let pullCount = 0; + const stream = new ReadableStream({ + pull(controller) { + if (pullCount) + controller.enqueue(pullCount); + pullCount++; + }, + }); + + const reader = stream.getReader(); + + queueMicrotask(common.mustCall(() => { + assert.strictEqual(pullCount, 1); + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, 1); + assert(!done); + + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, 2); + assert(!done); + })); + + })); + })); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + }, + pull: common.mustCall(), + }); + + stream.getReader().read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, 'a'); + assert(!done); + })); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + }, + pull: common.mustCall(), + }); + + const reader = stream.getReader(); + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, 'a'); + assert(!done); + + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, 'b'); + assert(!done); + })); + })); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.close(); + }, + pull: common.mustNotCall(), + }); + + const reader = stream.getReader(); + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, 'a'); + assert(!done); + + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, 'b'); + assert(!done); + + reader.read().then(common.mustCall(({ value, done }) => { + assert.strictEqual(value, undefined); + assert(done); + })); + + })); + })); +} + +{ + let res; + let promise; + let calls = 0; + const stream = new ReadableStream({ + pull(controller) { + controller.enqueue(++calls); + promise = new Promise((resolve) => res = resolve); + return promise; + } + }); + + const reader = stream.getReader(); + + (async () => { + await reader.read(); + assert.strictEqual(calls, 1); + await delay(); + assert.strictEqual(calls, 1); + res(); + await delay(); + assert.strictEqual(calls, 2); + })().then(common.mustCall()); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + }, + pull: common.mustCall(4), + }, { + highWaterMark: Infinity, + size() { return 1; } + }); + + const reader = stream.getReader(); + (async () => { + await delay(); + await reader.read(); + await reader.read(); + await reader.read(); + })().then(common.mustCall()); +} + +{ + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + controller.close(); + }, + pull: common.mustNotCall(), + }, { + highWaterMark: Infinity, + size() { return 1; } + }); + + const reader = stream.getReader(); + (async () => { + await delay(); + await reader.read(); + await reader.read(); + await reader.read(); + })().then(common.mustCall()); +} + +{ + let calls = 0; + let res; + const ready = new Promise((resolve) => res = resolve); + + new ReadableStream({ + pull(controller) { + controller.enqueue(++calls); + if (calls === 4) + res(); + } + }, { + size() { return 1; }, + highWaterMark: 4 + }); + + ready.then(common.mustCall(() => { + assert.strictEqual(calls, 4); + })); +} + +{ + const stream = new ReadableStream({ + pull: common.mustCall((controller) => controller.close()) + }); + + const reader = stream.getReader(); + + reader.closed.then(common.mustCall()); +} + +{ + const error = new Error('boom'); + const stream = new ReadableStream({ + pull: common.mustCall((controller) => controller.error(error)) + }); + + const reader = stream.getReader(); + + assert.rejects(reader.closed, error); +} + +{ + const error = new Error('boom'); + const error2 = new Error('boom2'); + const stream = new ReadableStream({ + pull: common.mustCall((controller) => { + controller.error(error); + throw error2; + }) + }); + + const reader = stream.getReader(); + + assert.rejects(reader.closed, error); +} + +{ + let startCalled = false; + new ReadableStream({ + start: common.mustCall((controller) => { + controller.enqueue('a'); + controller.close(); + assert.throws(() => controller.enqueue('b'), { + code: 'ERR_INVALID_STATE' + }); + startCalled = true; + }) + }); + assert(startCalled); +} + +{ + let startCalled = false; + new ReadableStream({ + start: common.mustCall((controller) => { + controller.close(); + assert.throws(() => controller.enqueue('b'), { + code: 'ERR_INVALID_STATE' + }); + startCalled = true; + }) + }); + assert(startCalled); +} + +{ + class Source { + startCalled = false; + pullCalled = false; + cancelCalled = false; + + start(controller) { + assert.strictEqual(this, source); + this.startCalled = true; + controller.enqueue('a'); + } + + pull() { + assert.strictEqual(this, source); + this.pullCalled = true; + } + + cancel() { + assert.strictEqual(this, source); + this.cancelCalled = true; + } + } + + const source = new Source(); + + const stream = new ReadableStream(source); + const reader = stream.getReader(); + + (async () => { + await reader.read(); + reader.releaseLock(); + stream.cancel(); + assert(source.startCalled); + assert(source.pullCalled); + assert(source.cancelCalled); + })().then(common.mustCall()); +} + +{ + let startCalled = false; + new ReadableStream({ + start(controller) { + assert.strictEqual(controller.desiredSize, 10); + controller.close(); + assert.strictEqual(controller.desiredSize, 0); + startCalled = true; + } + }, { + highWaterMark: 10 + }); + assert(startCalled); +} + +{ + let startCalled = false; + new ReadableStream({ + start(controller) { + assert.strictEqual(controller.desiredSize, 10); + controller.error(); + assert.strictEqual(controller.desiredSize, null); + startCalled = true; + } + }, { + highWaterMark: 10 + }); + assert(startCalled); +} + +{ + class Foo extends ReadableStream {} + const foo = new Foo(); + foo.getReader(); +} + +{ + let startCalled = false; + new ReadableStream({ + start(controller) { + assert.strictEqual(controller.desiredSize, 1); + controller.enqueue('a'); + assert.strictEqual(controller.desiredSize, 0); + controller.enqueue('a'); + assert.strictEqual(controller.desiredSize, -1); + controller.enqueue('a'); + assert.strictEqual(controller.desiredSize, -2); + controller.enqueue('a'); + assert.strictEqual(controller.desiredSize, -3); + startCalled = true; + } + }); + assert(startCalled); +} + +{ + let c; + const stream = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const reader = stream.getReader(); + + (async () => { + assert.strictEqual(c.desiredSize, 1); + c.enqueue(1); + assert.strictEqual(c.desiredSize, 0); + await reader.read(); + assert.strictEqual(c.desiredSize, 1); + c.enqueue(1); + c.enqueue(1); + assert.strictEqual(c.desiredSize, -1); + await reader.read(); + assert.strictEqual(c.desiredSize, 0); + await reader.read(); + assert.strictEqual(c.desiredSize, 1); + })().then(common.mustCall()); +} + +{ + let c; + new ReadableStream({ + start(controller) { + c = controller; + } + }); + assert(c instanceof ReadableStreamDefaultController); + assert.strictEqual(typeof c.desiredSize, 'number'); + assert.strictEqual(typeof c.enqueue, 'function'); + assert.strictEqual(typeof c.close, 'function'); + assert.strictEqual(typeof c.error, 'function'); +} + +class Source { + constructor() { + this.cancelCalled = false; + } + + start(controller) { + this.stream = createReadStream(__filename); + this.stream.on('data', (chunk) => { + controller.enqueue(chunk); + }); + this.stream.once('end', () => { + if (!this.cancelCalled) + controller.close(); + }); + this.stream.once('error', (error) => { + controller.error(error); + }); + } + + cancel() { + this.cancelCalled = true; + } +} + +{ + const source = new Source(); + const stream = new ReadableStream(source); + + async function read(stream) { + const reader = stream.getReader(); + const chunks = []; + let read = await reader.read(); + while (!read.done) { + chunks.push(Buffer.from(read.value)); + read = await reader.read(); + } + return Buffer.concat(chunks); + } + + read(stream).then(common.mustCall((data) => { + const check = readFileSync(__filename); + assert.deepStrictEqual(data, check); + })); +} + +{ + const source = new Source(); + const stream = new ReadableStream(source); + + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks); + } + + read(stream).then(common.mustCall((data) => { + const check = readFileSync(__filename); + assert.deepStrictEqual(data, check); + + assert.strictEqual(stream[kState].state, 'closed'); + assert(!stream.locked); + })); +} + +{ + const source = new Source(); + const stream = new ReadableStream(source); + + [1, false, ''].forEach((options) => { + assert.throws(() => stream.values(options), { + code: 'ERR_INVALID_ARG_TYPE', + }); + }); + + [1, {}, ''].forEach((preventCancel) => { + assert.throws(() => stream.values({ preventCancel }), { + code: 'ERR_INVALID_ARG_TYPE', + }); + }); + + async function read(stream) { + // eslint-disable-next-line no-unused-vars + for await (const _ of stream.values({ preventCancel: true })) + return; + } + + read(stream).then(common.mustCall((data) => { + assert.strictEqual(stream[kState].state, 'readable'); + })); +} + +{ + const source = new Source(); + const stream = new ReadableStream(source); + + async function read(stream) { + // eslint-disable-next-line no-unused-vars + for await (const _ of stream.values({ preventCancel: false })) + return; + } + + read(stream).then(common.mustCall((data) => { + assert.strictEqual(stream[kState].state, 'closed'); + })); +} + +{ + const source = new Source(); + const stream = new ReadableStream(source); + + const error = new Error('boom'); + + async function read(stream) { + // eslint-disable-next-line no-unused-vars + for await (const _ of stream.values({ preventCancel: true })) + throw error; + } + + assert.rejects(read(stream), error).then(common.mustCall(() => { + assert.strictEqual(stream[kState].state, 'readable'); + })); +} + +{ + const source = new Source(); + const stream = new ReadableStream(source); + + const error = new Error('boom'); + + async function read(stream) { + // eslint-disable-next-line no-unused-vars + for await (const _ of stream.values({ preventCancel: false })) + throw error; + } + + assert.rejects(read(stream), error).then(common.mustCall(() => { + assert.strictEqual(stream[kState].state, 'closed'); + })); +} diff --git a/test/parallel/test-whatwg-transformstream.js b/test/parallel/test-whatwg-transformstream.js new file mode 100644 index 00000000000000..12d3cc71ecfe46 --- /dev/null +++ b/test/parallel/test-whatwg-transformstream.js @@ -0,0 +1,121 @@ +// Flags: --expose-internals --no-warnings +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +const { + ReadableStream, + TransformStream, +} = require('stream/whatwg'); + +const { + createReadStream, + readFileSync, +} = require('fs'); + +assert.throws(() => new TransformStream({ readableType: 1 }), { + code: 'ERR_INVALID_ARG_VALUE', +}); +assert.throws(() => new TransformStream({ writableType: 1 }), { + code: 'ERR_INVALID_ARG_VALUE', +}); + + +{ + const stream = new TransformStream(); + + async function test(stream) { + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const { 1: result } = await Promise.all([ + writer.write('hello'), + reader.read(), + ]); + + assert.strictEqual(result.value, 'hello'); + } + + test(stream).then(common.mustCall()); +} + +class Transform { + start(controller) { + this.started = true; + } + + async transform(chunk, controller) { + controller.enqueue(chunk.toUpperCase()); + } + + async flush() { + this.flushed = true; + } +} + +{ + const transform = new Transform(); + const stream = new TransformStream(transform); + assert(transform.started); + + async function test(stream) { + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const { 1: result } = await Promise.all([ + writer.write('hello'), + reader.read(), + ]); + + assert.strictEqual(result.value, 'HELLO'); + + await writer.close(); + } + + test(stream).then(common.mustCall(() => { + assert(transform.flushed); + })); +} + +class Source { + constructor() { + this.cancelCalled = false; + } + + start(controller) { + this.stream = createReadStream(__filename); + this.stream.on('data', (chunk) => { + controller.enqueue(chunk.toString()); + }); + this.stream.once('end', () => { + if (!this.cancelCalled) + controller.close(); + }); + this.stream.once('error', (error) => { + controller.error(error); + }); + } + + cancel() { + this.cancelCalled = true; + } +} + +{ + const instream = new ReadableStream(new Source()); + const tstream = new TransformStream(new Transform()); + const r = instream.pipeThrough(tstream); + + async function read(stream) { + let res = ''; + for await (const chunk of stream) + res += chunk; + return res; + } + + read(r).then(common.mustCall((data) => { + const check = readFileSync(__filename); + assert.strictEqual(check.toString().toUpperCase(), data); + })); +} diff --git a/test/parallel/test-whatwg-writablestream.js b/test/parallel/test-whatwg-writablestream.js new file mode 100644 index 00000000000000..d4a4cc504802e8 --- /dev/null +++ b/test/parallel/test-whatwg-writablestream.js @@ -0,0 +1,150 @@ +// Flags: --expose-internals --no-warnings +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +const { + WritableStream, + WritableStreamDefaultController, + WritableStreamDefaultWriter, + CountQueuingStrategy, +} = require('stream/whatwg'); + +const { + kState, +} = require('internal/streams/whatwg'); + +const { + isPromise, +} = require('util/types'); + +class Sink { + constructor() { + this.chunks = []; + } + + start() { + this.started = true; + } + + write(chunk) { + this.chunks.push(chunk); + } + + close() { + this.closed = true; + } + + abort() { + this.aborted = true; + } +} + +{ + const stream = new WritableStream(); + + assert(stream[kState].controller instanceof WritableStreamDefaultController); + assert(!stream.locked); + + assert.strictEqual(typeof stream.abort, 'function'); + assert.strictEqual(typeof stream.close, 'function'); + assert.strictEqual(typeof stream.getWriter, 'function'); +} + +[1, false, ''].forEach((type) => { + assert.throws(() => new WritableStream({ type }), { + code: 'ERR_INVALID_ARG_VALUE', + }); +}); + +['a', false, {}].forEach((highWaterMark) => { + assert.throws(() => new WritableStream({}, { highWaterMark }), { + code: 'ERR_INVALID_ARG_VALUE', + }); +}); + +['a', false, {}].forEach((size) => { + assert.throws(() => new WritableStream({}, { size }), { + code: 'ERR_INVALID_ARG_VALUE', + }); +}); + +{ + new WritableStream({}, 1); + new WritableStream({}, 'a'); + new WritableStream({}, null); +} + +{ + const sink = new Sink(); + const stream = new WritableStream( + sink, + new CountQueuingStrategy({ highWaterMark: 1 })); + + assert(!stream.locked); + const writer = stream.getWriter(); + assert(stream.locked); + assert(writer instanceof WritableStreamDefaultWriter); + + assert(isPromise(writer.closed)); + assert(isPromise(writer.ready)); + assert(typeof writer.desiredSize, 'number'); + assert(typeof writer.abort, 'function'); + assert(typeof writer.close, 'function'); + assert(typeof writer.releaseLock, 'function'); + assert(typeof writer.write, 'function'); + + writer.releaseLock(); + assert(!stream.locked); + + const writer2 = stream.getWriter(); + + assert(sink.started); + + writer2.closed.then(common.mustCall()); + writer2.ready.then(common.mustCall()); + + writer2.close().then(common.mustCall(() => { + assert.strict(sink.closed); + })); +} + +{ + const sink = new Sink(); + + const stream = new WritableStream( + sink, + new CountQueuingStrategy({ highWaterMark: 1 })); + + const error = new Error('boom'); + + const writer = stream.getWriter(); + + assert.rejects(writer.closed, error); + + writer.abort(error).then(common.mustCall(() => { + assert.strictEqual(stream[kState].state, 'errored'); + assert(sink.aborted); + })); +} + +{ + const sink = new Sink(); + + const stream = new WritableStream( + sink, { highWaterMark: 1 } + ); + + async function write(stream) { + const writer = stream.getWriter(); + const p = writer.write('hello'); + assert.strictEqual(writer.desiredSize, 0); + await p; + assert.strictEqual(writer.desiredSize, 1); + } + + write(stream).then(common.mustCall(() => { + assert.deepStrictEqual(['hello'], sink.chunks); + })); +} diff --git a/tools/doc/type-parser.mjs b/tools/doc/type-parser.mjs index c2586a43254ecb..6c9e35efcfa9f4 100644 --- a/tools/doc/type-parser.mjs +++ b/tools/doc/type-parser.mjs @@ -226,6 +226,33 @@ const customTypesMap = { 'X509Certificate': 'crypto.html#crypto_class_x509certificate', 'zlib options': 'zlib.html#zlib_class_options', + + 'ReadableStream': + 'whatwg_stream.html#whatwg_stream_class_readablestream', + 'ReadableStreamDefaultReader': + 'whatwg_stream.html#whatwg_stream_class_readablestreamdefaultreader', + 'ReadableStreamBYOBReader': + 'whatwg_stream.html#whatwg_stream_class_readablestreambyobreader', + 'ReadableStreamDefaultController': + 'whatwg_stream.html#whatwg_stream_class_readablestreamdefaultcontroller', + 'ReadableByteStreamController': + 'whatwg_stream.html#whatwg_stream_class_readablebytestreamcontroller', + 'ReadableStreamBYOBRequest': + 'whatwg_stream.html#whatwg_stream_class_readablestreambyobrequest', + 'WritableStream': + 'whatwg_stream.html#whatwg_stream_class_writablestream', + 'WritableStreamDefaultWriter': + 'whatwg_stream.html#whatwg_stream_class_writablestreamdefaultwriter', + 'WritableStreamDefaultController': + 'whatwg_stream.html#whatwg_stream_class_writablestreamdefaultcontroller', + 'TransformStream': + 'whatwg_stream.html#whatwg_stream_class_transformstream', + 'TransformStreamDefaultController': + 'whatwg_stream.html#whatwg_stream_class_transformstreamdefaultcontroller', + 'ByteLengthQueuingStrategy': + 'whatwg_stream.html#whatwg_stream_class_bytelengthqueuingstrategy', + 'CountQueuingStrategy': + 'whatwg_stream.html#whatwg_stream_class_countqueuingstrategy', }; const arrayPart = /(?:\[])+$/;