Skip to content

Commit

Permalink
Add ReadableStreamBYOBReader.prototype.read(view, { min })
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasBuelens committed Jan 4, 2024
1 parent c6b5f78 commit 645bc83
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 20 deletions.
8 changes: 7 additions & 1 deletion etc/web-streams-polyfill.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ export class ReadableStreamBYOBReader {
constructor(stream: ReadableStream<Uint8Array>);
cancel(reason?: any): Promise<void>;
get closed(): Promise<undefined>;
read<T extends ArrayBufferView>(view: T): Promise<ReadableStreamBYOBReadResult<T>>;
read<T extends ArrayBufferView>(view: T, options?: ReadableStreamBYOBReaderReadOptions): Promise<ReadableStreamBYOBReadResult<T>>;
releaseLock(): void;
}

// @public
export interface ReadableStreamBYOBReaderReadOptions {
// (undocumented)
min?: number;
}

// @public
export type ReadableStreamBYOBReadResult<T extends ArrayBufferView> = {
done: false;
Expand Down
4 changes: 4 additions & 0 deletions src/lib/helpers/array-buffer-view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ function isDataViewConstructor(ctor: Function): ctor is DataViewConstructor {
return ctor === DataView;
}

export function isDataView(view: ArrayBufferView): view is DataView {
return isDataViewConstructor(view.constructor);
}

export function arrayBufferViewElementSize<T extends ArrayBufferView>(ctor: ArrayBufferViewConstructor<T>): number {
if (isDataViewConstructor(ctor)) {
return 1;
Expand Down
6 changes: 5 additions & 1 deletion src/lib/readable-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ import { assertObject, assertRequiredArgument } from './validators/basic';
import { convertQueuingStrategy } from './validators/queuing-strategy';
import { ExtractHighWaterMark, ExtractSizeAlgorithm } from './abstract-ops/queuing-strategy';
import { convertUnderlyingDefaultOrByteSource } from './validators/underlying-source';
import type { ReadableStreamGetReaderOptions } from './readable-stream/reader-options';
import type {
ReadableStreamBYOBReaderReadOptions,
ReadableStreamGetReaderOptions
} from './readable-stream/reader-options';
import { convertReaderOptions } from './validators/reader-options';
import type { StreamPipeOptions, ValidatedStreamPipeOptions } from './readable-stream/pipe-options';
import type { ReadableStreamIteratorOptions } from './readable-stream/iterator-options';
Expand Down Expand Up @@ -371,6 +374,7 @@ export type {
ReadableStreamAsyncIterator,
ReadableStreamDefaultReadResult,
ReadableStreamBYOBReadResult,
ReadableStreamBYOBReaderReadOptions,
UnderlyingByteSource,
UnderlyingSource,
UnderlyingSourceStartCallback,
Expand Down
37 changes: 35 additions & 2 deletions src/lib/readable-stream/byob-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import { newPromise, promiseRejectedWith } from '../helpers/webidl';
import { assertRequiredArgument } from '../validators/basic';
import { assertReadableStream } from '../validators/readable-stream';
import { IsDetachedBuffer } from '../abstract-ops/ecmascript';
import type {
ReadableStreamBYOBReaderReadOptions,
ValidatedReadableStreamBYOBReaderReadOptions
} from './reader-options';
import { convertByobReadOptions } from '../validators/reader-options';
import { isDataView, type TypedArray } from '../helpers/array-buffer-view';

/**
* A result returned by {@link ReadableStreamBYOBReader.read}.
Expand Down Expand Up @@ -157,7 +163,14 @@ export class ReadableStreamBYOBReader {
*
* If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source.
*/
read<T extends ArrayBufferView>(view: T): Promise<ReadableStreamBYOBReadResult<T>> {
read<T extends ArrayBufferView>(
view: T,
options?: ReadableStreamBYOBReaderReadOptions
): Promise<ReadableStreamBYOBReadResult<T>>;
read<T extends ArrayBufferView>(
view: T,
rawOptions: ReadableStreamBYOBReaderReadOptions | null | undefined = {}
): Promise<ReadableStreamBYOBReadResult<T>> {
if (!IsReadableStreamBYOBReader(this)) {
return promiseRejectedWith(byobReaderBrandCheckException('read'));
}
Expand All @@ -175,6 +188,24 @@ export class ReadableStreamBYOBReader {
return promiseRejectedWith(new TypeError('view\'s buffer has been detached'));
}

let options: ValidatedReadableStreamBYOBReaderReadOptions;
try {
options = convertByobReadOptions(rawOptions, 'options');
} catch (e) {
return promiseRejectedWith(e);
}
const min = options.min;
if (min === 0) {
return promiseRejectedWith(new TypeError('options.min must be greater than 0'));
}
if (!isDataView(view)) {
if (min > (view as unknown as TypedArray).length) {
return promiseRejectedWith(new RangeError('options.min must be less than or equal to view\'s length'));
}
} else if (min > view.byteLength) {
return promiseRejectedWith(new RangeError('options.min must be less than or equal to view\'s byteLength'));
}

if (this._ownerReadableStream === undefined) {
return promiseRejectedWith(readerLockException('read from'));
}
Expand All @@ -190,7 +221,7 @@ export class ReadableStreamBYOBReader {
_closeSteps: chunk => resolvePromise({ value: chunk, done: true }),
_errorSteps: e => rejectPromise(e)
};
ReadableStreamBYOBReaderRead(this, view, readIntoRequest);
ReadableStreamBYOBReaderRead(this, view, min, readIntoRequest);
return promise;
}

Expand Down Expand Up @@ -249,6 +280,7 @@ export function IsReadableStreamBYOBReader(x: any): x is ReadableStreamBYOBReade
export function ReadableStreamBYOBReaderRead<T extends ArrayBufferView>(
reader: ReadableStreamBYOBReader,
view: T,
min: number,
readIntoRequest: ReadIntoRequest<T>
): void {
const stream = reader._ownerReadableStream;
Expand All @@ -263,6 +295,7 @@ export function ReadableStreamBYOBReaderRead<T extends ArrayBufferView>(
ReadableByteStreamControllerPullInto(
stream._readableStreamController as ReadableByteStreamController,
view,
min,
readIntoRequest
);
}
Expand Down
33 changes: 22 additions & 11 deletions src/lib/readable-stream/byte-stream-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ interface DefaultPullIntoDescriptor {
byteOffset: number;
byteLength: number;
bytesFilled: number;
minimumFill: number;
elementSize: number;
viewConstructor: TypedArrayConstructor<Uint8Array>;
readerType: 'default' | 'none';
Expand All @@ -166,6 +167,7 @@ interface BYOBPullIntoDescriptor<T extends ArrayBufferView = ArrayBufferView> {
byteOffset: number;
byteLength: number;
bytesFilled: number;
minimumFill: number;
elementSize: number;
viewConstructor: ArrayBufferViewConstructor<T>;
readerType: 'byob' | 'none';
Expand Down Expand Up @@ -335,6 +337,7 @@ export class ReadableByteStreamController {
byteOffset: 0,
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
minimumFill: 1,
elementSize: 1,
viewConstructor: Uint8Array,
readerType: 'default'
Expand Down Expand Up @@ -452,7 +455,7 @@ function ReadableByteStreamControllerCommitPullIntoDescriptor<T extends ArrayBuf

let done = false;
if (stream._state === 'closed') {
assert(pullIntoDescriptor.bytesFilled === 0);
assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0);
done = true;
}

Expand Down Expand Up @@ -516,18 +519,18 @@ function ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller:

function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller: ReadableByteStreamController,
pullIntoDescriptor: PullIntoDescriptor) {
const elementSize = pullIntoDescriptor.elementSize;

const currentAlignedBytes = pullIntoDescriptor.bytesFilled - pullIntoDescriptor.bytesFilled % elementSize;

const maxBytesToCopy = Math.min(controller._queueTotalSize,
pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled);
const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - maxBytesFilled % elementSize;

let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill);
const remainderBytes = maxBytesFilled % pullIntoDescriptor.elementSize;
const maxAlignedBytes = maxBytesFilled - remainderBytes;
// A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head
// of the queue, so the underlying source can keep filling it.
if (maxAlignedBytes >= pullIntoDescriptor.minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;
ready = true;
}
Expand Down Expand Up @@ -558,7 +561,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller:
if (!ready) {
assert(controller._queueTotalSize === 0);
assert(pullIntoDescriptor.bytesFilled > 0);
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill);
}

return ready;
Expand Down Expand Up @@ -630,13 +633,18 @@ function ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller: R
export function ReadableByteStreamControllerPullInto<T extends ArrayBufferView>(
controller: ReadableByteStreamController,
view: T,
min: number,
readIntoRequest: ReadIntoRequest<T>
): void {
const stream = controller._controlledReadableByteStream;

const ctor = view.constructor as ArrayBufferViewConstructor<T>;
const elementSize = arrayBufferViewElementSize(ctor);

const minimumFill = min * elementSize;
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
assert(minimumFill % elementSize === 0);

// try {
const buffer = TransferArrayBuffer(view.buffer);
// } catch (e) {
Expand All @@ -650,6 +658,7 @@ export function ReadableByteStreamControllerPullInto<T extends ArrayBufferView>(
byteOffset: view.byteOffset,
byteLength: view.byteLength,
bytesFilled: 0,
minimumFill,
elementSize,
viewConstructor: ctor,
readerType: 'byob'
Expand Down Expand Up @@ -699,7 +708,7 @@ export function ReadableByteStreamControllerPullInto<T extends ArrayBufferView>(

function ReadableByteStreamControllerRespondInClosedState(controller: ReadableByteStreamController,
firstDescriptor: PullIntoDescriptor) {
assert(firstDescriptor.bytesFilled === 0);
assert(firstDescriptor.bytesFilled % firstDescriptor.elementSize === 0);

if (firstDescriptor.readerType === 'none') {
ReadableByteStreamControllerShiftPendingPullInto(controller);
Expand Down Expand Up @@ -727,7 +736,9 @@ function ReadableByteStreamControllerRespondInReadableState(controller: Readable
return;
}

if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill) {
// A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head
// of the queue, so the underlying source can keep filling it.
return;
}

Expand Down Expand Up @@ -831,7 +842,7 @@ export function ReadableByteStreamControllerClose(controller: ReadableByteStream

if (controller._pendingPullIntos.length > 0) {
const firstPendingPullInto = controller._pendingPullIntos.peek();
if (firstPendingPullInto.bytesFilled > 0) {
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
const e = new TypeError('Insufficient bytes to fill elements in the given buffer');
ReadableByteStreamControllerError(controller, e);

Expand Down
12 changes: 12 additions & 0 deletions src/lib/readable-stream/reader-options.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
export interface ReadableStreamGetReaderOptions {
mode?: 'byob';
}

/**
* Options for {@link ReadableStreamBYOBReader.read | reading} a stream
* with a {@link ReadableStreamBYOBReader | BYOB reader}.
*
* @public
*/
export interface ReadableStreamBYOBReaderReadOptions {
min?: number;
}

export type ValidatedReadableStreamBYOBReaderReadOptions = Required<ReadableStreamBYOBReaderReadOptions>;
2 changes: 1 addition & 1 deletion src/lib/readable-stream/tee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ export function ReadableByteStreamTee(stream: ReadableByteStream): [ReadableByte
reading = false;
}
};
ReadableStreamBYOBReaderRead(reader, view, readIntoRequest);
ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
}

function pull1Algorithm(): Promise<void> {
Expand Down
22 changes: 20 additions & 2 deletions src/lib/validators/reader-options.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { assertDictionary } from './basic';
import type { ReadableStreamGetReaderOptions } from '../readable-stream/reader-options';
import { assertDictionary, convertUnsignedLongLongWithEnforceRange } from './basic';
import type {
ReadableStreamBYOBReaderReadOptions,
ReadableStreamGetReaderOptions,
ValidatedReadableStreamBYOBReaderReadOptions
} from '../readable-stream/reader-options';

export function convertReaderOptions(options: ReadableStreamGetReaderOptions | null | undefined,
context: string): ReadableStreamGetReaderOptions {
Expand All @@ -17,3 +21,17 @@ function convertReadableStreamReaderMode(mode: string, context: string): 'byob'
}
return mode;
}

export function convertByobReadOptions(
options: ReadableStreamBYOBReaderReadOptions | null | undefined,
context: string
): ValidatedReadableStreamBYOBReaderReadOptions {
assertDictionary(options, context);
const min = options?.min ?? 1;
return {
min: convertUnsignedLongLongWithEnforceRange(
min,
`${context} has member 'min' that`
)
};
}
2 changes: 2 additions & 0 deletions src/ponyfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ReadableStream,
type ReadableStreamAsyncIterator,
ReadableStreamBYOBReader,
type ReadableStreamBYOBReaderReadOptions,
type ReadableStreamBYOBReadResult,
ReadableStreamBYOBRequest,
ReadableStreamDefaultController,
Expand Down Expand Up @@ -56,6 +57,7 @@ export {
type StreamPipeOptions,
type ReadableStreamDefaultReadResult,
type ReadableStreamBYOBReadResult,
type ReadableStreamBYOBReaderReadOptions,
ReadableStreamDefaultController,
ReadableByteStreamController,
ReadableStreamBYOBRequest,
Expand Down
2 changes: 1 addition & 1 deletion test/web-platform-tests
2 changes: 1 addition & 1 deletion test/wpt/shared/exclusions.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const ignoredFailuresMinified = {
'idlharness.any.html': [
// Terser turns `(a = undefined) => {}` into `(a) => {}`, changing the function's length property
// Therefore we cannot correctly implement methods with optional arguments
/interface: operation (abort|cancel|enqueue|error|getReader|write)/,
/interface: operation (abort|cancel|enqueue|error|getReader|read|write)/,
// Same thing for ReadableStream.values(), which is tested as part of the async iterable declaration
'ReadableStream interface: async iterable<any>'
]
Expand Down

0 comments on commit 645bc83

Please sign in to comment.