Skip to content

Commit

Permalink
Show Readable/WriteableStream state when inspecting
Browse files Browse the repository at this point in the history
  • Loading branch information
mrbbot committed Oct 17, 2023
1 parent eeedb46 commit 50bbc20
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ class ReadableStreamController {

virtual bool isClosedOrErrored() const = 0;

virtual bool isClosed() const = 0;

virtual bool isDisturbed() = 0;

// True if a Reader has been locked to this controller.
Expand Down Expand Up @@ -673,6 +675,10 @@ class WritableStreamController {
jsg::Optional<StreamQueuingStrategy> queuingStrategy) {}

virtual bool isClosedOrClosing() = 0;
virtual bool isErrored() = 0;

// True is this controller requires ArrayBuffer(Views) to be written to it.
virtual bool isByteOriented() const = 0;
};

kj::Own<WritableStreamController> newWritableStreamJsController();
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,10 @@ bool WritableStreamInternalController::isClosedOrClosing() {
return state.is<StreamStates::Closed>() || isClosing || isFlushing;
}

bool WritableStreamInternalController::isErrored() {
return state.is<StreamStates::Errored>();
}

void WritableStreamInternalController::doClose(jsg::Lock& js) {
state.init<StreamStates::Closed>();
KJ_IF_SOME(locked, writeState.tryGet<WriterLocked>()) {
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class ReadableStreamInternalController: public ReadableStreamController {
return state.is<StreamStates::Closed>() || state.is<StreamStates::Errored>();
}

bool isClosed() const override {
return state.is<StreamStates::Closed>();
}

bool isDisturbed() override { return disturbed; }

bool isLockedToReader() const override { return !readState.is<Unlocked>(); }
Expand Down Expand Up @@ -205,6 +209,9 @@ class WritableStreamInternalController: public WritableStreamController {
void setHighWaterMark(uint64_t highWaterMark);

bool isClosedOrClosing() override;
bool isErrored() override;

inline bool isByteOriented() const override { return true; }
private:

struct AbortOptions {
Expand Down
16 changes: 16 additions & 0 deletions src/workerd/api/streams/readable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,22 @@ kj::Array<jsg::Ref<ReadableStream>> ReadableStream::tee(jsg::Lock& js) {
return kj::arr(kj::mv(tee.branch1), kj::mv(tee.branch2));
}

jsg::JsString ReadableStream::inspectState(jsg::Lock& js) {
if (controller->isClosedOrErrored()) {
return js.strIntern(controller->isClosed() ? "closed"_kj : "errored"_kj);
} else {
return js.strIntern("readable"_kj);
}
}

bool ReadableStream::inspectSupportsBYOB() {
return controller->isByteOriented();
}

jsg::Optional<uint64_t> ReadableStream::inspectLength() {
return tryGetLength(StreamEncoding::IDENTITY);
}

jsg::Promise<kj::Maybe<jsg::Value>> ReadableStream::nextFunction(
jsg::Lock& js,
AsyncIteratorState& state) {
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/api/streams/readable.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ class ReadableStream: public jsg::Object {
// data as this ReadableStream would.
kj::Array<jsg::Ref<ReadableStream>> tee(jsg::Lock& js);

jsg::JsString inspectState(jsg::Lock& js);
bool inspectSupportsBYOB();
jsg::Optional<uint64_t> inspectLength();

JSG_RESOURCE_TYPE(ReadableStream, CompatibilityFlags::Reader flags) {
if (flags.getJsgPropertyOnPrototypeTemplate()) {
JSG_READONLY_PROTOTYPE_PROPERTY(locked, isLocked);
Expand All @@ -283,6 +287,10 @@ class ReadableStream: public jsg::Object {
JSG_METHOD(tee);
JSG_METHOD(values);

JSG_INSPECT_PROPERTY(state, inspectState);
JSG_INSPECT_PROPERTY(supportsBYOB, inspectSupportsBYOB);
JSG_INSPECT_PROPERTY(length, inspectLength);

JSG_ASYNC_ITERABLE(values);

if (flags.getJsgPropertyOnPrototypeTemplate()) {
Expand Down
18 changes: 17 additions & 1 deletion src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ public:

bool isClosedOrErrored() const override;

bool isClosed() const override;

bool isLockedToReader() const override;

bool lockReader(jsg::Lock& js, Reader& reader) override;
Expand Down Expand Up @@ -777,6 +779,9 @@ public:
}

bool isClosedOrClosing() override;
bool isErrored() override;

inline bool isByteOriented() const override { return false; }

private:
jsg::Promise<void> pipeLoop(jsg::Lock& js);
Expand Down Expand Up @@ -2343,6 +2348,13 @@ bool ReadableStreamJsController::isClosedOrErrored() const {
return state.is<StreamStates::Closed>() || state.is<StreamStates::Errored>();
}

bool ReadableStreamJsController::isClosed() const {
KJ_IF_SOME(s, maybePendingState) {
return s.is<StreamStates::Closed>();
}
return state.is<StreamStates::Closed>();
}

bool ReadableStreamJsController::isDisturbed() { return disturbed; }

bool ReadableStreamJsController::isLockedToReader() const {
Expand Down Expand Up @@ -3487,7 +3499,11 @@ jsg::Ref<WritableStream> WritableStreamJsController::addRef() {
}

bool WritableStreamJsController::isClosedOrClosing() {
KJ_UNIMPLEMENTED("Only defined in WritableStreamInternalController.");
return state.is<StreamStates::Closed>();
}

bool WritableStreamJsController::isErrored() {
return state.is<StreamStates::Errored>();
}

jsg::Promise<void> WritableStreamJsController::close(jsg::Lock& js, bool markAsHandled) {
Expand Down
243 changes: 243 additions & 0 deletions src/workerd/api/streams/streams-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// https://opensource.org/licenses/Apache-2.0

import * as assert from 'node:assert';
import * as util from 'node:util';

export const partiallyReadStream= {
async test(ctrl, env, ctx) {
Expand Down Expand Up @@ -42,3 +43,245 @@ export const arrayBufferOfReadable = {
assert.equal(10_000, read.byteLength);
}
}

export const inspect = {
async test() {
const inspectOpts = { breakLength: Infinity };

// Check with JavaScript regular ReadableStream
{
let pulls = 0;
const readableStream = new ReadableStream({
pull(controller) {
if (pulls === 0) controller.enqueue("hello");
if (pulls === 1) controller.close();
pulls++;
}
});
assert.strictEqual(
util.inspect(readableStream, inspectOpts),
"ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: false, [length]: undefined }"
);

const reader = readableStream.getReader();
assert.strictEqual(
util.inspect(readableStream, inspectOpts),
"ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: false, [length]: undefined }"
);

await reader.read();
assert.strictEqual(
util.inspect(readableStream, inspectOpts),
"ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: false, [length]: undefined }"
);

await reader.read();
assert.strictEqual(
util.inspect(readableStream, inspectOpts),
"ReadableStream { locked: true, [state]: 'closed', [supportsBYOB]: false, [length]: undefined }"
);
}

// Check with errored JavaScript regular ReadableStream
{
const readableStream = new ReadableStream({
start(controller) {
controller.error(new Error("Oops!"));
}
});
assert.strictEqual(
util.inspect(readableStream, inspectOpts),
"ReadableStream { locked: false, [state]: 'errored', [supportsBYOB]: false, [length]: undefined }"
);
}

// Check with JavaScript bytes ReadableStream
{
const readableStream = new ReadableStream({
type: "bytes",
pull(controller) {
controller.enqueue(new Uint8Array([1]));
}
});
assert.strictEqual(
util.inspect(readableStream, inspectOpts),
"ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: undefined }"
);
}

// Check with JavaScript WritableStream
{
const writableStream = new WritableStream({
write(chunk, controller) {}
});
assert.strictEqual(
util.inspect(writableStream, inspectOpts),
"WritableStream { locked: false, [state]: 'writable', [expectsBytes]: false }"
);

const writer = writableStream.getWriter();
assert.strictEqual(
util.inspect(writableStream, inspectOpts),
"WritableStream { locked: true, [state]: 'writable', [expectsBytes]: false }"
);

await writer.write("chunk");
assert.strictEqual(
util.inspect(writableStream, inspectOpts),
"WritableStream { locked: true, [state]: 'writable', [expectsBytes]: false }"
);

await writer.close();
assert.strictEqual(
util.inspect(writableStream, inspectOpts),
"WritableStream { locked: true, [state]: 'closed', [expectsBytes]: false }"
);
}

// Check with errored JavaScript WritableStream
{
const writableStream = new WritableStream({
write(chunk, controller) {
controller.error(new Error("Oops!"));
}
});
assert.strictEqual(
util.inspect(writableStream, inspectOpts),
"WritableStream { locked: false, [state]: 'writable', [expectsBytes]: false }"
);

const writer = writableStream.getWriter();
const promise = writer.write("chunk");
assert.strictEqual(
util.inspect(writableStream, inspectOpts),
"WritableStream { locked: true, [state]: 'erroring', [expectsBytes]: false }"
);

await promise;
assert.strictEqual(
util.inspect(writableStream, inspectOpts),
"WritableStream { locked: true, [state]: 'errored', [expectsBytes]: false }"
);
}

// Check with internal known-length TransformStream
{
const inspectOpts = { breakLength: 100 };
const transformStream = new FixedLengthStream(5);
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: false, [state]: 'writable', [expectsBytes]: true },
readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n }
}`
);

const { writable, readable } = transformStream;
const writer = writable.getWriter();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: true, [state]: 'writable', [expectsBytes]: true },
readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n }
}`
);

void writer.write(new Uint8Array([1, 2, 3]));
void writer.write(new Uint8Array([4, 5]));
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: true, [state]: 'writable', [expectsBytes]: true },
readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n }
}`
);

void writer.close();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true },
readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n }
}`
);

const reader = readable.getReader();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true },
readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: 5n }
}`
);

await reader.read();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true },
readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: 2n }
}`
);

await reader.read();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true },
readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: 0n }
}`
);

await reader.read();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`FixedLengthStream {
writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true },
readable: ReadableStream { locked: true, [state]: 'closed', [supportsBYOB]: true, [length]: 0n }
}`
);
}

// Check with errored internal TransformStream
{
const inspectOpts = { breakLength: 100 };
const transformStream = new IdentityTransformStream();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`IdentityTransformStream {
writable: WritableStream { locked: false, [state]: 'writable', [expectsBytes]: true },
readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: undefined }
}`
);

const { writable, readable } = transformStream;
const writer = writable.getWriter();
void writer.abort(new Error("Oops!"));
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`IdentityTransformStream {
writable: WritableStream { locked: true, [state]: 'errored', [expectsBytes]: true },
readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: undefined }
}`
);

const reader = readable.getReader();
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`IdentityTransformStream {
writable: WritableStream { locked: true, [state]: 'errored', [expectsBytes]: true },
readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: undefined }
}`
);

await reader.read().catch(() => {});
assert.strictEqual(
util.inspect(transformStream, inspectOpts),
`IdentityTransformStream {
writable: WritableStream { locked: true, [state]: 'errored', [expectsBytes]: true },
readable: ReadableStream { locked: true, [state]: 'errored', [supportsBYOB]: true, [length]: undefined }
}`
);
}
}
};
Loading

0 comments on commit 50bbc20

Please sign in to comment.