diff --git a/src/workerd/api/streams/common.h b/src/workerd/api/streams/common.h index a4504d96e04..82a1ff6f65f 100644 --- a/src/workerd/api/streams/common.h +++ b/src/workerd/api/streams/common.h @@ -461,6 +461,8 @@ class ReadableStreamController { virtual bool isClosedOrErrored() const = 0; + virtual bool isClosed() const = 0; + virtual bool isDisturbed() = 0; // True if a Reader has been locked to this controller. @@ -673,6 +675,10 @@ class WritableStreamController { jsg::Optional queuingStrategy) {} virtual bool isClosedOrClosing() = 0; + virtual bool isErrored() = 0; + + // True is this controller requires ArrayBuffer(Views) to be written to it. + virtual bool isByteOriented() const = 0; }; kj::Own newWritableStreamJsController(); diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index 9967458f4b8..af43008fed6 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -1281,6 +1281,10 @@ bool WritableStreamInternalController::isClosedOrClosing() { return state.is() || isClosing || isFlushing; } +bool WritableStreamInternalController::isErrored() { + return state.is(); +} + void WritableStreamInternalController::doClose(jsg::Lock& js) { state.init(); KJ_IF_SOME(locked, writeState.tryGet()) { diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index 44f40d2002d..287f662016e 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -73,6 +73,10 @@ class ReadableStreamInternalController: public ReadableStreamController { return state.is() || state.is(); } + bool isClosed() const override { + return state.is(); + } + bool isDisturbed() override { return disturbed; } bool isLockedToReader() const override { return !readState.is(); } @@ -205,6 +209,9 @@ class WritableStreamInternalController: public WritableStreamController { void setHighWaterMark(uint64_t highWaterMark); bool isClosedOrClosing() override; + bool isErrored() override; + + inline bool isByteOriented() const override { return true; } private: struct AbortOptions { diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 9a7649fc6e4..535688bb7b6 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -420,6 +420,22 @@ kj::Array> ReadableStream::tee(jsg::Lock& js) { return kj::arr(kj::mv(tee.branch1), kj::mv(tee.branch2)); } +jsg::JsString ReadableStream::inspectState(jsg::Lock& js) { + if (controller->isClosedOrErrored()) { + return js.strIntern(controller->isClosed() ? "closed"_kj : "errored"_kj); + } else { + return js.strIntern("readable"_kj); + } +} + +bool ReadableStream::inspectSupportsBYOB() { + return controller->isByteOriented(); +} + +jsg::Optional ReadableStream::inspectLength() { + return tryGetLength(StreamEncoding::IDENTITY); +} + jsg::Promise> ReadableStream::nextFunction( jsg::Lock& js, AsyncIteratorState& state) { diff --git a/src/workerd/api/streams/readable.h b/src/workerd/api/streams/readable.h index 102bc28b308..6f1c8a1d71b 100644 --- a/src/workerd/api/streams/readable.h +++ b/src/workerd/api/streams/readable.h @@ -270,6 +270,10 @@ class ReadableStream: public jsg::Object { // data as this ReadableStream would. kj::Array> tee(jsg::Lock& js); + jsg::JsString inspectState(jsg::Lock& js); + bool inspectSupportsBYOB(); + jsg::Optional inspectLength(); + JSG_RESOURCE_TYPE(ReadableStream, CompatibilityFlags::Reader flags) { if (flags.getJsgPropertyOnPrototypeTemplate()) { JSG_READONLY_PROTOTYPE_PROPERTY(locked, isLocked); @@ -283,6 +287,10 @@ class ReadableStream: public jsg::Object { JSG_METHOD(tee); JSG_METHOD(values); + JSG_INSPECT_PROPERTY(state, inspectState); + JSG_INSPECT_PROPERTY(supportsBYOB, inspectSupportsBYOB); + JSG_INSPECT_PROPERTY(length, inspectLength); + JSG_ASYNC_ITERABLE(values); if (flags.getJsgPropertyOnPrototypeTemplate()) { diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index c8c5c0a5158..d48b6f2070a 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -613,6 +613,8 @@ public: bool isClosedOrErrored() const override; + bool isClosed() const override; + bool isLockedToReader() const override; bool lockReader(jsg::Lock& js, Reader& reader) override; @@ -777,6 +779,9 @@ public: } bool isClosedOrClosing() override; + bool isErrored() override; + + inline bool isByteOriented() const override { return false; } private: jsg::Promise pipeLoop(jsg::Lock& js); @@ -2343,6 +2348,13 @@ bool ReadableStreamJsController::isClosedOrErrored() const { return state.is() || state.is(); } +bool ReadableStreamJsController::isClosed() const { + KJ_IF_SOME(s, maybePendingState) { + return s.is(); + } + return state.is(); +} + bool ReadableStreamJsController::isDisturbed() { return disturbed; } bool ReadableStreamJsController::isLockedToReader() const { @@ -3402,7 +3414,11 @@ jsg::Ref WritableStreamJsController::addRef() { } bool WritableStreamJsController::isClosedOrClosing() { - KJ_UNIMPLEMENTED("Only defined in WritableStreamInternalController."); + return state.is(); +} + +bool WritableStreamJsController::isErrored() { + return state.is(); } jsg::Promise WritableStreamJsController::close(jsg::Lock& js, bool markAsHandled) { diff --git a/src/workerd/api/streams/streams-test.js b/src/workerd/api/streams/streams-test.js index d1fa610aeae..c4be0590573 100644 --- a/src/workerd/api/streams/streams-test.js +++ b/src/workerd/api/streams/streams-test.js @@ -3,6 +3,7 @@ // https://opensource.org/licenses/Apache-2.0 import * as assert from 'node:assert'; +import * as util from 'node:util'; export const partiallyReadStream= { async test(ctrl, env, ctx) { @@ -42,3 +43,245 @@ export const arrayBufferOfReadable = { assert.equal(10_000, read.byteLength); } } + +export const inspect = { + async test() { + const inspectOpts = { breakLength: Infinity }; + + // Check with JavaScript regular ReadableStream + { + let pulls = 0; + const readableStream = new ReadableStream({ + pull(controller) { + if (pulls === 0) controller.enqueue("hello"); + if (pulls === 1) controller.close(); + pulls++; + } + }); + assert.strictEqual( + util.inspect(readableStream, inspectOpts), + "ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: false, [length]: undefined }" + ); + + const reader = readableStream.getReader(); + assert.strictEqual( + util.inspect(readableStream, inspectOpts), + "ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: false, [length]: undefined }" + ); + + await reader.read(); + assert.strictEqual( + util.inspect(readableStream, inspectOpts), + "ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: false, [length]: undefined }" + ); + + await reader.read(); + assert.strictEqual( + util.inspect(readableStream, inspectOpts), + "ReadableStream { locked: true, [state]: 'closed', [supportsBYOB]: false, [length]: undefined }" + ); + } + + // Check with errored JavaScript regular ReadableStream + { + const readableStream = new ReadableStream({ + start(controller) { + controller.error(new Error("Oops!")); + } + }); + assert.strictEqual( + util.inspect(readableStream, inspectOpts), + "ReadableStream { locked: false, [state]: 'errored', [supportsBYOB]: false, [length]: undefined }" + ); + } + + // Check with JavaScript bytes ReadableStream + { + const readableStream = new ReadableStream({ + type: "bytes", + pull(controller) { + controller.enqueue(new Uint8Array([1])); + } + }); + assert.strictEqual( + util.inspect(readableStream, inspectOpts), + "ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: undefined }" + ); + } + + // Check with JavaScript WritableStream + { + const writableStream = new WritableStream({ + write(chunk, controller) {} + }); + assert.strictEqual( + util.inspect(writableStream, inspectOpts), + "WritableStream { locked: false, [state]: 'writable', [expectsBytes]: false }" + ); + + const writer = writableStream.getWriter(); + assert.strictEqual( + util.inspect(writableStream, inspectOpts), + "WritableStream { locked: true, [state]: 'writable', [expectsBytes]: false }" + ); + + await writer.write("chunk"); + assert.strictEqual( + util.inspect(writableStream, inspectOpts), + "WritableStream { locked: true, [state]: 'writable', [expectsBytes]: false }" + ); + + await writer.close(); + assert.strictEqual( + util.inspect(writableStream, inspectOpts), + "WritableStream { locked: true, [state]: 'closed', [expectsBytes]: false }" + ); + } + + // Check with errored JavaScript WritableStream + { + const writableStream = new WritableStream({ + write(chunk, controller) { + controller.error(new Error("Oops!")); + } + }); + assert.strictEqual( + util.inspect(writableStream, inspectOpts), + "WritableStream { locked: false, [state]: 'writable', [expectsBytes]: false }" + ); + + const writer = writableStream.getWriter(); + const promise = writer.write("chunk"); + assert.strictEqual( + util.inspect(writableStream, inspectOpts), + "WritableStream { locked: true, [state]: 'erroring', [expectsBytes]: false }" + ); + + await promise; + assert.strictEqual( + util.inspect(writableStream, inspectOpts), + "WritableStream { locked: true, [state]: 'errored', [expectsBytes]: false }" + ); + } + + // Check with internal known-length TransformStream + { + const inspectOpts = { breakLength: 100 }; + const transformStream = new FixedLengthStream(5); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: false, [state]: 'writable', [expectsBytes]: true }, + readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n } +}` + ); + + const { writable, readable } = transformStream; + const writer = writable.getWriter(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: true, [state]: 'writable', [expectsBytes]: true }, + readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n } +}` + ); + + void writer.write(new Uint8Array([1, 2, 3])); + void writer.write(new Uint8Array([4, 5])); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: true, [state]: 'writable', [expectsBytes]: true }, + readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n } +}` + ); + + void writer.close(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true }, + readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: 5n } +}` + ); + + const reader = readable.getReader(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true }, + readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: 5n } +}` + ); + + await reader.read(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true }, + readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: 2n } +}` + ); + + await reader.read(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true }, + readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: 0n } +}` + ); + + await reader.read(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`FixedLengthStream { + writable: WritableStream { locked: true, [state]: 'closed', [expectsBytes]: true }, + readable: ReadableStream { locked: true, [state]: 'closed', [supportsBYOB]: true, [length]: 0n } +}` + ); + } + + // Check with errored internal TransformStream + { + const inspectOpts = { breakLength: 100 }; + const transformStream = new IdentityTransformStream(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`IdentityTransformStream { + writable: WritableStream { locked: false, [state]: 'writable', [expectsBytes]: true }, + readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: undefined } +}` + ); + + const { writable, readable } = transformStream; + const writer = writable.getWriter(); + void writer.abort(new Error("Oops!")); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`IdentityTransformStream { + writable: WritableStream { locked: true, [state]: 'errored', [expectsBytes]: true }, + readable: ReadableStream { locked: false, [state]: 'readable', [supportsBYOB]: true, [length]: undefined } +}` + ); + + const reader = readable.getReader(); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`IdentityTransformStream { + writable: WritableStream { locked: true, [state]: 'errored', [expectsBytes]: true }, + readable: ReadableStream { locked: true, [state]: 'readable', [supportsBYOB]: true, [length]: undefined } +}` + ); + + await reader.read().catch(() => {}); + assert.strictEqual( + util.inspect(transformStream, inspectOpts), +`IdentityTransformStream { + writable: WritableStream { locked: true, [state]: 'errored', [expectsBytes]: true }, + readable: ReadableStream { locked: true, [state]: 'errored', [supportsBYOB]: true, [length]: undefined } +}` + ); + } + } +}; diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index a324b110535..576cc76af3c 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -180,6 +180,22 @@ jsg::Promise WritableStreamDefaultWriter::write(jsg::Lock& js, v8::LocalisErrored()) { + return js.strIntern("errored"); + } else if (controller->isErroring(js) != kj::none) { + return js.strIntern("erroring"); + } else if (controller->isClosedOrClosing()) { + return js.strIntern("closed"); + } else { + return js.strIntern("writable"); + } +} + +bool WritableStream::inspectExpectsBytes() { + return controller->isByteOriented(); +} + void WritableStreamDefaultWriter::visitForGc(jsg::GcVisitor& visitor) { visitor.visit(closedPromise, readyPromise); } diff --git a/src/workerd/api/streams/writable.h b/src/workerd/api/streams/writable.h index 654a4133155..a5063eef17d 100644 --- a/src/workerd/api/streams/writable.h +++ b/src/workerd/api/streams/writable.h @@ -135,6 +135,9 @@ class WritableStream: public jsg::Object { jsg::Ref getWriter(jsg::Lock& js); + jsg::JsString inspectState(jsg::Lock& js); + bool inspectExpectsBytes(); + JSG_RESOURCE_TYPE(WritableStream, CompatibilityFlags::Reader flags) { if (flags.getJsgPropertyOnPrototypeTemplate()) { JSG_READONLY_PROTOTYPE_PROPERTY(locked, isLocked); @@ -145,6 +148,9 @@ class WritableStream: public jsg::Object { JSG_METHOD(close); JSG_METHOD(getWriter); + JSG_INSPECT_PROPERTY(state, inspectState); + JSG_INSPECT_PROPERTY(expectsBytes, inspectExpectsBytes); + JSG_TS_OVERRIDE( { getWriter(): WritableStreamDefaultWriter; });