Skip to content

Commit

Permalink
Have internal streams return empty Uint8Array on end of byob stream (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored Apr 26, 2024
1 parent c150442 commit 4e282ed
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 3 deletions.
25 changes: 23 additions & 2 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <kj/vector.h>
#include <workerd/api/util.h>
#include <workerd/util/string-buffer.h>
#include <workerd/io/features.h>

namespace workerd::api {

Expand Down Expand Up @@ -543,6 +544,15 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
if (maybeByobOptions != kj::none && FeatureFlags::get(js).getInternalStreamByobReturn()) {
// When using the BYOB reader, we must return a sized-0 Uint8Array that is backed
// by the ArrayBuffer passed in the options.
auto u8 = v8::Uint8Array::New(v8::ArrayBuffer::New(js.v8Isolate, store), 0, 0);
return js.resolvedPromise(ReadResult {
.value = js.v8Ref(u8.As<v8::Value>()),
.done = true,
});
}
return js.resolvedPromise(ReadResult { .done = true });
}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
Expand Down Expand Up @@ -582,8 +592,10 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(
// That's a larger refactor, though.
auto& ioContext = IoContext::current();
return ioContext.awaitIoLegacy(js, kj::mv(promise)).then(js,
ioContext.addFunctor([this,store = kj::mv(store), byteOffset, byteLength]
(jsg::Lock& js, size_t amount) mutable -> jsg::Promise<ReadResult> {
ioContext.addFunctor(
[this,store = kj::mv(store), byteOffset, byteLength,
isByob = maybeByobOptions != kj::none]
(jsg::Lock& js, size_t amount) mutable -> jsg::Promise<ReadResult> {
readPending = false;
KJ_ASSERT(amount <= byteLength);
if (amount == 0) {
Expand All @@ -593,6 +605,15 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(
KJ_IF_SOME(o, owner) {
o.signalEof(js);
}
if (isByob && FeatureFlags::get(js).getInternalStreamByobReturn()) {
// When using the BYOB reader, we must return a sized-0 Uint8Array that is backed
// by the ArrayBuffer passed in the options.
auto u8 = v8::Uint8Array::New(v8::ArrayBuffer::New(js.v8Isolate, store), 0, 0);
return js.resolvedPromise(ReadResult {
.value = js.v8Ref(u8.As<v8::Value>()),
.done = true,
});
}
return js.resolvedPromise(ReadResult { .done = true });
}
// Return a slice so the script can see how many bytes were read.
Expand Down
21 changes: 21 additions & 0 deletions src/workerd/api/tests/streams-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,27 @@ export const abortWriterAfterGc = {
}
};

export const finalReadOnInternalStreamReturnsBuffer = {
async test() {
const { readable, writable } = new IdentityTransformStream();
const writer = writable.getWriter();
await writer.close();

const reader = readable.getReader({ mode: 'byob' });
let result = await reader.read(new Uint8Array(10));
strictEqual(result.done, true);
ok(result.value instanceof Uint8Array);
strictEqual(result.value.byteLength, 0);
strictEqual(result.value.buffer.byteLength, 10);

result = await reader.read(new Uint8Array(10));
strictEqual(result.done, true);
ok(result.value instanceof Uint8Array);
strictEqual(result.value.byteLength, 0);
strictEqual(result.value.buffer.byteLength, 10);
}
};

export default {
async fetch(request, env) {
strictEqual(request.headers.get('content-length'), '10');
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/tests/streams-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const unitTests :Workerd.Config = (
(name = "worker", esModule = embed "streams-test.js")
],
compatibilityDate = "2023-01-15",
compatibilityFlags = ["nodejs_compat"],
compatibilityFlags = ["nodejs_compat", "internal_stream_byob_return_view"],
bindings = [
(name = "subrequest", service = "streams-test")
]
Expand Down
11 changes: 11 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -415,4 +415,15 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
# type of Durable Object stubs -- support RPC. If so, this type will have a wildcard method, so
# it will appear that all possible property names are present on any fetcher instance. This could
# break code that tries to infer types based on the presence or absence of methods.

internalStreamByobReturn @47 :Bool
$compatEnableFlag("internal_stream_byob_return_view")
$compatDisableFlag("internal_stream_byob_return_undefined")
$compatEnableDate("2024-05-13");
# Sadly, the original implementation of ReadableStream (now called "internal" streams), did not
# properly implement the result of ReadableStreamBYOBReader's read method. When done = true,
# per the spec, the result `value` must be an empty ArrayBufferView whose underlying ArrayBuffer
# is the same as the one passed to the read method. Our original implementation returned
# undefined instead. This flag changes the behavior to match the spec and to match the behavior
# implemented by the JS-backed ReadableStream implementation.
}

0 comments on commit 4e282ed

Please sign in to comment.