Skip to content

Commit

Permalink
stream: add ReadableByteStream.tee()
Browse files Browse the repository at this point in the history
This supports teeing readable byte streams to meet
the latest web streams standards.

Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com
PR-URL: nodejs/node#44505
Refs: https://streams.spec.whatwg.org/#readable-stream-tee
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
  • Loading branch information
daeyeon authored and guangwong committed Jan 3, 2023
1 parent b1ac803 commit a832f57
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 12 deletions.
4 changes: 4 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ is active.

<!-- YAML
added: v16.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/44505
description: Support teeing a readable byte stream.
-->

* Returns: {ReadableStream\[]}
Expand Down
302 changes: 291 additions & 11 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const {
ArrayBufferViewGetByteOffset,
ArrayBufferGetByteLength,
AsyncIterator,
cloneAsUint8Array,
copyArrayBuffer,
customInspect,
dequeueValue,
Expand Down Expand Up @@ -211,6 +212,7 @@ class ReadableStream {
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
Expand Down Expand Up @@ -1103,7 +1105,6 @@ class ReadableByteStreamController {
chunk);
}
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
Expand All @@ -1114,11 +1115,7 @@ class ReadableByteStreamController {
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
if (this[kState].stream[kState].state !== 'readable')
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
readableByteStreamControllerEnqueue(
this,
chunkBuffer,
chunkByteLength,
chunkByteOffset);
readableByteStreamControllerEnqueue(this, chunk);
}

/**
Expand Down Expand Up @@ -1416,6 +1413,13 @@ function readableStreamPipeTo(
}

function readableStreamTee(stream, cloneForBranch2) {
if (isReadableByteStreamController(stream[kState].controller)) {
return readableByteStreamTee(stream);
}
return readableStreamDefaultTee(stream, cloneForBranch2);
}

function readableStreamDefaultTee(stream, cloneForBranch2) {
const reader = new ReadableStreamDefaultReader(stream);
let reading = false;
let canceled1 = false;
Expand Down Expand Up @@ -1510,6 +1514,282 @@ function readableStreamTee(stream, cloneForBranch2) {
return [branch1, branch2];
}

function readableByteStreamTee(stream) {
assert(isReadableStream(stream));
assert(isReadableByteStreamController(stream[kState].controller));

let reader = new ReadableStreamDefaultReader(stream);
let reading = false;
let readAgainForBranch1 = false;
let readAgainForBranch2 = false;
let canceled1 = false;
let canceled2 = false;
let reason1;
let reason2;
let branch1;
let branch2;
const cancelDeferred = createDeferredPromise();

function forwardReaderError(thisReader) {
PromisePrototypeThen(
thisReader[kState].close.promise,
undefined,
(error) => {
if (thisReader !== reader) {
return;
}
readableStreamDefaultControllerError(branch1[kState].controller, error);
readableStreamDefaultControllerError(branch2[kState].controller, error);
if (!canceled1 || !canceled2) {
cancelDeferred.resolve();
}
}
);
}

function pullWithDefaultReader() {
if (isReadableStreamBYOBReader(reader)) {
reader = new ReadableStreamDefaultReader(stream);
forwardReaderError(reader);
}

const readRequest = {
[kChunk](chunk) {
queueMicrotask(() => {
readAgainForBranch1 = false;
readAgainForBranch2 = false;
const chunk1 = chunk;
let chunk2 = chunk;

if (!canceled1 && !canceled2) {
try {
chunk2 = cloneAsUint8Array(chunk);
} catch (error) {
readableByteStreamControllerError(
branch1[kState].controller,
error
);
readableByteStreamControllerError(
branch2[kState].controller,
error
);
cancelDeferred.resolve(readableStreamCancel(stream, error));
return;
}
}
if (!canceled1) {
readableByteStreamControllerEnqueue(
branch1[kState].controller,
chunk1
);
}
if (!canceled2) {
readableByteStreamControllerEnqueue(
branch2[kState].controller,
chunk2
);
}
reading = false;

if (readAgainForBranch1) {
pull1Algorithm();
} else if (readAgainForBranch2) {
pull2Algorithm();
}
});
},
[kClose]() {
reading = false;

if (!canceled1) {
readableByteStreamControllerClose(branch1[kState].controller);
}
if (!canceled2) {
readableByteStreamControllerClose(branch2[kState].controller);
}
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
readableByteStreamControllerRespond(branch1[kState].controller, 0);
}
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
readableByteStreamControllerRespond(branch2[kState].controller, 0);
}
if (!canceled1 || !canceled2) {
cancelDeferred.resolve();
}
},
[kError]() {
reading = false;
},
};

readableStreamDefaultReaderRead(reader, readRequest);
}

function pullWithBYOBReader(view, forBranch2) {
if (isReadableStreamDefaultReader(reader)) {
reader = new ReadableStreamBYOBReader(stream);
forwardReaderError(reader);
}

const byobBranch = forBranch2 === true ? branch2 : branch1;
const otherBranch = forBranch2 === false ? branch2 : branch1;
const readIntoRequest = {
[kChunk](chunk) {
queueMicrotask(() => {
readAgainForBranch1 = false;
readAgainForBranch2 = false;
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;

if (!otherCanceled) {
let clonedChunk;

try {
clonedChunk = cloneAsUint8Array(chunk);
} catch (error) {
readableByteStreamControllerError(
byobBranch[kState].controller,
error
);
readableByteStreamControllerError(
otherBranch[kState].controller,
error
);
cancelDeferred.resolve(readableStreamCancel(stream, error));
return;
}
if (!byobCanceled) {
readableByteStreamControllerRespondWithNewView(
byobBranch[kState].controller,
chunk
);
}

readableByteStreamControllerEnqueue(
otherBranch[kState].controller,
clonedChunk
);
} else if (!byobCanceled) {
readableByteStreamControllerRespondWithNewView(
byobBranch[kState].controller,
chunk
);
}
reading = false;

if (readAgainForBranch1) {
pull1Algorithm();
} else if (readAgainForBranch2) {
pull2Algorithm();
}
});
},
[kClose](chunk) {
reading = false;

const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;

if (!byobCanceled) {
readableByteStreamControllerClose(byobBranch[kState].controller);
}
if (!otherCanceled) {
readableByteStreamControllerClose(otherBranch[kState].controller);
}
if (chunk !== undefined) {
if (!byobCanceled) {
readableByteStreamControllerRespondWithNewView(
byobBranch[kState].controller,
chunk
);
}
if (
!otherCanceled &&
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
) {
readableByteStreamControllerRespond(
otherBranch[kState].controller,
0
);
}
}
if (!byobCanceled || !otherCanceled) {
cancelDeferred.resolve();
}
},
[kError]() {
reading = false;
},
};
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
}

function pull1Algorithm() {
if (reading) {
readAgainForBranch1 = true;
return PromiseResolve();
}
reading = true;

const byobRequest = branch1[kState].controller.byobRequest;
if (byobRequest === null) {
pullWithDefaultReader();
} else {
pullWithBYOBReader(byobRequest[kState].view, false);
}
return PromiseResolve();
}

function pull2Algorithm() {
if (reading) {
readAgainForBranch2 = true;
return PromiseResolve();
}
reading = true;

const byobRequest = branch2[kState].controller.byobRequest;
if (byobRequest === null) {
pullWithDefaultReader();
} else {
pullWithBYOBReader(byobRequest[kState].view, true);
}
return PromiseResolve();
}

function cancel1Algorithm(reason) {
canceled1 = true;
reason1 = reason;
if (canceled2) {
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
}
return cancelDeferred.promise;
}

function cancel2Algorithm(reason) {
canceled2 = true;
reason2 = reason;
if (canceled1) {
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
}
return cancelDeferred.promise;
}

branch1 = new ReadableStream({
type: 'bytes',
pull: pull1Algorithm,
cancel: cancel1Algorithm,
});
branch2 = new ReadableStream({
type: 'bytes',
pull: pull2Algorithm,
cancel: cancel2Algorithm,
});

forwardReaderError(reader);

return [branch1, branch2];
}

function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
const {
buffer,
Expand Down Expand Up @@ -2273,18 +2553,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
desc.bytesFilled += size;
}

function readableByteStreamControllerEnqueue(
controller,
buffer,
byteLength,
byteOffset) {
function readableByteStreamControllerEnqueue(controller, chunk) {
const {
closeRequested,
pendingPullIntos,
queue,
stream,
} = controller[kState];

const buffer = ArrayBufferViewGetBuffer(chunk);
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
const byteLength = ArrayBufferViewGetByteLength(chunk);

if (closeRequested || stream[kState].state !== 'readable')
return;

Expand Down
Loading

0 comments on commit a832f57

Please sign in to comment.