Skip to content

Commit

Permalink
Merge pull request #117 from MattiasBuelens/update-20220418
Browse files Browse the repository at this point in the history
Update to spec version of 18 April 2022
  • Loading branch information
MattiasBuelens authored May 14, 2022
2 parents 9fb1e69 + 172da27 commit f6ed190
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 60 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
> - 🏠 Internal
> - 💅 Polish
## Unreleased

* 👓 Align with [spec version `e9355ce`](https://github.com/whatwg/streams/tree/e9355ce79925947e8eb496563d599c329769d315/) ([#115](https://github.com/MattiasBuelens/web-streams-polyfill/issues/115), [#117](https://github.com/MattiasBuelens/web-streams-polyfill/pull/117))

## v4.0.0-beta.2 (2022-04-12)

* 🚀 Support calling `ReadableStream.pipeTo(writable)` and `.pipeThrough({ readable, writable })` when `writable` is a native (i.e. not polyfilled) `WritableStream`. ([#99](https://github.com/MattiasBuelens/web-streams-polyfill/pull/99), [#101](https://github.com/MattiasBuelens/web-streams-polyfill/pull/101))
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ If you need to support older browsers or Node versions that do not have a native

## Compliance

The polyfill implements [version `4b6b93c` (25 Oct 2021)][spec-snapshot] of the streams specification.
The polyfill implements [version `e9355ce` (18 Apr 2022)][spec-snapshot] of the streams specification.

The polyfill is tested against the same [web platform tests][wpt] that are used by browsers to test their native implementations.
It aims to pass all tests, although it allows some exceptions for practical reasons:
Expand Down Expand Up @@ -120,13 +120,13 @@ Thanks to these people for their work on [the original polyfill][creatorrr-polyf
[rs-asynciterator]: https://streams.spec.whatwg.org/#rs-asynciterator
[ws-controller-signal]: https://streams.spec.whatwg.org/#ws-default-controller-signal
[abortcontroller-polyfill]: https://www.npmjs.com/package/abortcontroller-polyfill
[spec-snapshot]: https://streams.spec.whatwg.org/commit-snapshots/4b6b93c69e531e2fe45a6ed4cb1484a7ba4eb8bb/
[wpt]: https://github.com/web-platform-tests/wpt/tree/96ca25f0f7526282c0d47e6bf6a7edd439da1968/streams
[wpt-bad-buffers]: https://github.com/web-platform-tests/wpt/blob/96ca25f0f7526282c0d47e6bf6a7edd439da1968/streams/readable-byte-streams/bad-buffers-and-views.any.js
[spec-snapshot]: https://streams.spec.whatwg.org/commit-snapshots/e9355ce79925947e8eb496563d599c329769d315/
[wpt]: https://github.com/web-platform-tests/wpt/tree/6a46d9cb8d20c510a620141c721b81b460a4ee55/streams
[wpt-bad-buffers]: https://github.com/web-platform-tests/wpt/blob/6a46d9cb8d20c510a620141c721b81b460a4ee55/streams/readable-byte-streams/bad-buffers-and-views.any.js
[proposal-arraybuffer-transfer]: https://github.com/domenic/proposal-arraybuffer-transfer
[ref-impl-transferarraybuffer]: https://github.com/whatwg/streams/blob/4b6b93c69e531e2fe45a6ed4cb1484a7ba4eb8bb/reference-implementation/lib/abstract-ops/ecmascript.js#L16
[ref-impl-transferarraybuffer]: https://github.com/whatwg/streams/blob/e9355ce79925947e8eb496563d599c329769d315/reference-implementation/lib/abstract-ops/ecmascript.js#L16
[issue-3]: https://github.com/MattiasBuelens/web-streams-polyfill/issues/3
[wpt-async-iterator-prototype]: https://github.com/web-platform-tests/wpt/blob/96ca25f0f7526282c0d47e6bf6a7edd439da1968/streams/readable-streams/async-iterator.any.js#L24
[wpt-async-iterator-prototype]: https://github.com/web-platform-tests/wpt/blob/6a46d9cb8d20c510a620141c721b81b460a4ee55/streams/readable-streams/async-iterator.any.js#L24
[stub-async-iterator-prototype]: https://github.com/MattiasBuelens/web-streams-polyfill/blob/v4.0.0-beta.2/src/lib/readable-stream/async-iterator.ts#L126-L134
[wpt-rs-patched-global]: https://github.com/web-platform-tests/wpt/blob/887350c2f46def5b01c4dd1f8d2eee35dfb9c5bb/streams/readable-streams/patched-global.any.js
[wpt-then-interception]: https://github.com/web-platform-tests/wpt/blob/cf33f00596af295ee0f207c88e23b5f8b0791307/streams/piping/then-interception.any.js
Expand Down
1 change: 1 addition & 0 deletions src/lib/abstract-ops/internal-methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export const AbortSteps = Symbol('[[AbortSteps]]');
export const ErrorSteps = Symbol('[[ErrorSteps]]');
export const CancelSteps = Symbol('[[CancelSteps]]');
export const PullSteps = Symbol('[[PullSteps]]');
export const ReleaseSteps = Symbol('[[ReleaseSteps]]');
25 changes: 10 additions & 15 deletions src/lib/readable-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import {
AcquireReadableStreamDefaultReader,
IsReadableStreamDefaultReader,
ReadableStreamDefaultReader,
ReadableStreamDefaultReaderErrorReadRequests,
type ReadableStreamDefaultReadResult
} from './readable-stream/default-reader';
import {
AcquireReadableStreamBYOBReader,
IsReadableStreamBYOBReader,
ReadableStreamBYOBReader,
ReadableStreamBYOBReaderErrorReadIntoRequests,
type ReadableStreamBYOBReadResult
} from './readable-stream/byob-reader';
import { ReadableStreamPipeTo } from './readable-stream/pipe';
Expand Down Expand Up @@ -459,10 +461,11 @@ export function ReadableStreamCancel<R>(stream: ReadableStream<R>, reason: any):

const reader = stream._reader;
if (reader !== undefined && IsReadableStreamBYOBReader(reader)) {
reader._readIntoRequests.forEach(readIntoRequest => {
const readIntoRequests = reader._readIntoRequests;
reader._readIntoRequests = new SimpleQueue();
readIntoRequests.forEach(readIntoRequest => {
readIntoRequest._closeSteps(undefined);
});
reader._readIntoRequests = new SimpleQueue();
}

const sourceCancelPromise = stream._readableStreamController[CancelSteps](reason);
Expand All @@ -483,10 +486,11 @@ export function ReadableStreamClose<R>(stream: ReadableStream<R>): void {
defaultReaderClosedPromiseResolve(reader);

if (IsReadableStreamDefaultReader<R>(reader)) {
reader._readRequests.forEach(readRequest => {
const readRequests = reader._readRequests;
reader._readRequests = new SimpleQueue();
readRequests.forEach(readRequest => {
readRequest._closeSteps();
});
reader._readRequests = new SimpleQueue();
}
}

Expand All @@ -506,19 +510,10 @@ export function ReadableStreamError<R>(stream: ReadableStream<R>, e: any): void
defaultReaderClosedPromiseReject(reader, e);

if (IsReadableStreamDefaultReader<R>(reader)) {
reader._readRequests.forEach(readRequest => {
readRequest._errorSteps(e);
});

reader._readRequests = new SimpleQueue();
ReadableStreamDefaultReaderErrorReadRequests(reader, e);
} else {
assert(IsReadableStreamBYOBReader(reader));

reader._readIntoRequests.forEach(readIntoRequest => {
readIntoRequest._errorSteps(e);
});

reader._readIntoRequests = new SimpleQueue();
ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e);
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/lib/readable-stream/byob-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,7 @@ export class ReadableStreamBYOBReader {
return;
}

if (this._readIntoRequests.length > 0) {
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
}

ReadableStreamReaderGenericRelease(this);
ReadableStreamBYOBReaderRelease(this);
}
}

Expand Down Expand Up @@ -270,6 +266,20 @@ export function ReadableStreamBYOBReaderRead<T extends ArrayBufferView>(
}
}

export function ReadableStreamBYOBReaderRelease(reader: ReadableStreamBYOBReader) {
ReadableStreamReaderGenericRelease(reader);
const e = new TypeError('Reader was released');
ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e);
}

export function ReadableStreamBYOBReaderErrorReadIntoRequests(reader: ReadableStreamBYOBReader, e: any) {
const readIntoRequests = reader._readIntoRequests;
reader._readIntoRequests = new SimpleQueue();
readIntoRequests.forEach(readIntoRequest => {
readIntoRequest._errorSteps(e);
});
}

// Helper functions for the ReadableStreamBYOBReader.

function byobReaderBrandCheckException(name: string): TypeError {
Expand Down
109 changes: 94 additions & 15 deletions src/lib/readable-stream/byte-stream-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SimpleQueue } from '../simple-queue';
import { ResetQueue } from '../abstract-ops/queue-with-sizes';
import type { ReadRequest } from './default-reader';
import {
IsReadableStreamDefaultReader,
ReadableStreamAddReadRequest,
ReadableStreamFulfillReadRequest,
ReadableStreamGetNumReadRequests,
Expand All @@ -27,7 +28,7 @@ import {
IsDetachedBuffer,
TransferArrayBuffer
} from '../abstract-ops/ecmascript';
import { CancelSteps, PullSteps } from '../abstract-ops/internal-methods';
import { CancelSteps, PullSteps, ReleaseSteps } from '../abstract-ops/internal-methods';
import { promiseResolvedWith, uponPromise } from '../helpers/webidl';
import { assertRequiredArgument, convertUnsignedLongLongWithEnforceRange } from '../validators/basic';

Expand Down Expand Up @@ -155,7 +156,7 @@ interface DefaultPullIntoDescriptor {
bytesFilled: number;
elementSize: number;
viewConstructor: ArrayBufferViewConstructor<Uint8Array>;
readerType: 'default';
readerType: 'default' | 'none';
}

interface BYOBPullIntoDescriptor<T extends ArrayBufferView = ArrayBufferView> {
Expand All @@ -166,7 +167,7 @@ interface BYOBPullIntoDescriptor<T extends ArrayBufferView = ArrayBufferView> {
bytesFilled: number;
elementSize: number;
viewConstructor: ArrayBufferViewConstructor<T>;
readerType: 'byob';
readerType: 'byob' | 'none';
}

/**
Expand Down Expand Up @@ -313,14 +314,7 @@ export class ReadableByteStreamController {
if (this._queueTotalSize > 0) {
assert(ReadableStreamGetNumReadRequests(stream) === 0);

const entry = this._queue.shift()!;
this._queueTotalSize -= entry.byteLength;

ReadableByteStreamControllerHandleQueueDrain(this);

const view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);

readRequest._chunkSteps(view);
ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest);
return;
}

Expand Down Expand Up @@ -351,6 +345,17 @@ export class ReadableByteStreamController {
ReadableStreamAddReadRequest(stream, readRequest);
ReadableByteStreamControllerCallPullIfNeeded(this);
}

/** @internal */
[ReleaseSteps](): void {
if (this._pendingPullIntos.length > 0) {
const firstPullInto = this._pendingPullIntos.peek();
firstPullInto.readerType = 'none';

this._pendingPullIntos = new SimpleQueue();
this._pendingPullIntos.push(firstPullInto);
}
}
}

Object.defineProperties(ReadableByteStreamController.prototype, {
Expand Down Expand Up @@ -442,6 +447,7 @@ function ReadableByteStreamControllerCommitPullIntoDescriptor<T extends ArrayBuf
pullIntoDescriptor: PullIntoDescriptor<T>
) {
assert(stream._state !== 'errored');
assert(pullIntoDescriptor.readerType !== 'none');

let done = false;
if (stream._state === 'closed') {
Expand Down Expand Up @@ -479,6 +485,34 @@ function ReadableByteStreamControllerEnqueueChunkToQueue(controller: ReadableByt
controller._queueTotalSize += byteLength;
}

function ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller: ReadableByteStreamController,
buffer: ArrayBufferLike,
byteOffset: number,
byteLength: number) {
let clonedChunk;
try {
clonedChunk = buffer.slice(byteOffset, byteOffset + byteLength);
} catch (cloneE) {
ReadableByteStreamControllerError(controller, cloneE);
throw cloneE;
}
ReadableByteStreamControllerEnqueueChunkToQueue(controller, clonedChunk, 0, byteLength);
}

function ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller: ReadableByteStreamController,
firstDescriptor: PullIntoDescriptor) {
assert(firstDescriptor.readerType === 'none');
if (firstDescriptor.bytesFilled > 0) {
ReadableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
firstDescriptor.buffer,
firstDescriptor.byteOffset,
firstDescriptor.bytesFilled
);
}
ReadableByteStreamControllerShiftPendingPullInto(controller);
}

function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller: ReadableByteStreamController,
pullIntoDescriptor: PullIntoDescriptor) {
const elementSize = pullIntoDescriptor.elementSize;
Expand Down Expand Up @@ -567,6 +601,7 @@ function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(contro
}

const pullIntoDescriptor = controller._pendingPullIntos.peek();
assert(pullIntoDescriptor.readerType !== 'none');

if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor)) {
ReadableByteStreamControllerShiftPendingPullInto(controller);
Expand All @@ -579,6 +614,18 @@ function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(contro
}
}

function ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller: ReadableByteStreamController) {
const reader = controller._controlledReadableByteStream._reader;
assert(IsReadableStreamDefaultReader(reader));
while (reader._readRequests.length > 0) {
if (controller._queueTotalSize === 0) {
return;
}
const readRequest = reader._readRequests.shift();
ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest);
}
}

export function ReadableByteStreamControllerPullInto<T extends ArrayBufferView>(
controller: ReadableByteStreamController,
view: T,
Expand Down Expand Up @@ -657,6 +704,10 @@ function ReadableByteStreamControllerRespondInClosedState(controller: ReadableBy
firstDescriptor: PullIntoDescriptor) {
assert(firstDescriptor.bytesFilled === 0);

if (firstDescriptor.readerType === 'none') {
ReadableByteStreamControllerShiftPendingPullInto(controller);
}

const stream = controller._controlledReadableByteStream;
if (ReadableStreamHasBYOBReader(stream)) {
while (ReadableStreamGetNumReadIntoRequests(stream) > 0) {
Expand All @@ -673,6 +724,12 @@ function ReadableByteStreamControllerRespondInReadableState(controller: Readable

ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor);

if (pullIntoDescriptor.readerType === 'none') {
ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor);
ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
return;
}

if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
return;
}
Expand All @@ -682,8 +739,12 @@ function ReadableByteStreamControllerRespondInReadableState(controller: Readable
const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
if (remainderSize > 0) {
const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
const remainder = ArrayBufferSlice(pullIntoDescriptor.buffer, end - remainderSize, end);
ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength);
ReadableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
pullIntoDescriptor.buffer,
end - remainderSize,
remainderSize
);
}

pullIntoDescriptor.bytesFilled -= remainderSize;
Expand Down Expand Up @@ -807,12 +868,15 @@ export function ReadableByteStreamControllerEnqueue(controller: ReadableByteStre
'The BYOB request\'s buffer has been detached and so cannot be filled with an enqueued chunk'
);
}
ReadableByteStreamControllerInvalidateBYOBRequest(controller);
firstPendingPullInto.buffer = TransferArrayBuffer(firstPendingPullInto.buffer);
if (firstPendingPullInto.readerType === 'none') {
ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto);
}
}

ReadableByteStreamControllerInvalidateBYOBRequest(controller);

if (ReadableStreamHasDefaultReader(stream)) {
ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller);
if (ReadableStreamGetNumReadRequests(stream) === 0) {
assert(controller._pendingPullIntos.length === 0);
ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);
Expand Down Expand Up @@ -851,6 +915,21 @@ export function ReadableByteStreamControllerError(controller: ReadableByteStream
ReadableStreamError(stream, e);
}

export function ReadableByteStreamControllerFillReadRequestFromQueue(
controller: ReadableByteStreamController,
readRequest: ReadRequest<Uint8Array>
) {
assert(controller._queueTotalSize > 0);

const entry = controller._queue.shift();
controller._queueTotalSize -= entry.byteLength;

ReadableByteStreamControllerHandleQueueDrain(controller);

const view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);
readRequest._chunkSteps(view);
}

export function ReadableByteStreamControllerGetBYOBRequest(
controller: ReadableByteStreamController
): ReadableStreamBYOBRequest | null {
Expand Down
7 changes: 6 additions & 1 deletion src/lib/readable-stream/default-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import type { ReadableStream } from '../readable-stream';
import { IsReadableStreamLocked, ReadableStreamClose, ReadableStreamError } from '../readable-stream';
import type { ValidatedUnderlyingSource } from './underlying-source';
import { setFunctionName, typeIsObject } from '../helpers/miscellaneous';
import { CancelSteps, PullSteps } from '../abstract-ops/internal-methods';
import { CancelSteps, PullSteps, ReleaseSteps } from '../abstract-ops/internal-methods';
import { promiseResolvedWith, uponPromise } from '../helpers/webidl';

/**
Expand Down Expand Up @@ -132,6 +132,11 @@ export class ReadableStreamDefaultController<R> {
ReadableStreamDefaultControllerCallPullIfNeeded(this);
}
}

/** @internal */
[ReleaseSteps](): void {
// Do nothing.
}
}

Object.defineProperties(ReadableStreamDefaultController.prototype, {
Expand Down
Loading

0 comments on commit f6ed190

Please sign in to comment.