diff --git a/src/workerd/api/sockets.c++ b/src/workerd/api/sockets.c++ index b335f10214d..2270f505a40 100644 --- a/src/workerd/api/sockets.c++ +++ b/src/workerd/api/sockets.c++ @@ -160,6 +160,7 @@ jsg::Ref setupSocket(jsg::Lock& js, auto openedPrPair = js.newPromiseAndResolver(); openedPrPair.promise.markAsHandled(js); auto writable = jsg::alloc(ioContext, kj::mv(sysStreams.writable), + ioContext.getMetrics().tryCreateWritableByteStreamObserver(), getWritableHighWaterMark(options), openedPrPair.promise.whenResolved(js)); auto result = jsg::alloc(js, ioContext, kj::mv(refcountedConnection), diff --git a/src/workerd/api/streams/common.h b/src/workerd/api/streams/common.h index 3706734efdf..b0a7a9d87d5 100644 --- a/src/workerd/api/streams/common.h +++ b/src/workerd/api/streams/common.h @@ -726,7 +726,8 @@ class WritableStreamController { kj::Own newWritableStreamJsController(); kj::Own newWritableStreamInternalController(IoContext& ioContext, - kj::Own source, + kj::Own sink, + kj::Maybe> observer, kj::Maybe maybeHighWaterMark = kj::none, kj::Maybe> maybeClosureWaitable = kj::none); diff --git a/src/workerd/api/streams/compression.c++ b/src/workerd/api/streams/compression.c++ index 6a0c76a2fa0..71106973d66 100644 --- a/src/workerd/api/streams/compression.c++ +++ b/src/workerd/api/streams/compression.c++ @@ -504,7 +504,8 @@ jsg::Ref CompressionStream::constructor(kj::String format) { auto& ioContext = IoContext::current(); return jsg::alloc(jsg::alloc(ioContext, kj::mv(readableSide)), - jsg::alloc(ioContext, kj::mv(writableSide))); + jsg::alloc(ioContext, kj::mv(writableSide), + ioContext.getMetrics().tryCreateWritableByteStreamObserver())); } jsg::Ref DecompressionStream::constructor(jsg::Lock& js, kj::String format) { @@ -521,7 +522,8 @@ jsg::Ref DecompressionStream::constructor(jsg::Lock& js, kj return jsg::alloc( jsg::alloc(ioContext, kj::mv(readableSide)), - jsg::alloc(ioContext, kj::mv(writableSide))); + jsg::alloc(ioContext, kj::mv(writableSide), + 
ioContext.getMetrics().tryCreateWritableByteStreamObserver())); } } // namespace workerd::api diff --git a/src/workerd/api/streams/internal-test.c++ b/src/workerd/api/streams/internal-test.c++ index bd2c3df9701..ece64b9c3b8 100644 --- a/src/workerd/api/streams/internal-test.c++ +++ b/src/workerd/api/streams/internal-test.c++ @@ -230,7 +230,8 @@ KJ_TEST("WritableStreamInternalController queue size assertion") { // allowed to be queued. jsg::Ref source = ReadableStream::constructor(env.js, kj::none, kj::none); - jsg::Ref sink = jsg::alloc(env.context, kj::heap()); + jsg::Ref sink = + jsg::alloc(env.context, kj::heap(), kj::none); auto pipeTo = source->pipeTo(env.js, sink.addRef(), PipeToOptions{.preventClose = true}); @@ -272,5 +273,84 @@ KJ_TEST("WritableStreamInternalController queue size assertion") { }); } +KJ_TEST("WritableStreamInternalController observability") { + + capnp::MallocMessageBuilder message; + auto flags = message.initRoot(); + flags.setNodeJsCompat(true); + flags.setWorkerdExperimental(true); + flags.setStreamsJavaScriptControllers(true); + + TestFixture fixture({.featureFlags = flags.asReader()}); + + class MySink final: public WritableStreamSink { + public: + kj::Promise write(kj::ArrayPtr buffer) override { + ++writeCount; + return kj::READY_NOW; + } + kj::Promise write(kj::ArrayPtr> pieces) override { + return kj::READY_NOW; + } + kj::Promise end() override { + return kj::READY_NOW; + } + void abort(kj::Exception reason) override {} + uint getWriteCount() { + return writeCount; + } + + private: + uint writeCount = 0; + }; + + // Tracks the number of chunks and total bytes reported as enqueued but not yet dequeued. + class MyObserver final: public ByteStreamObserver { + public: + void onChunkEnqueued(size_t bytes) override { + ++queueSize; + queueSizeBytes += bytes; + } + void onChunkDequeued(size_t bytes) override { + queueSizeBytes -= bytes; + --queueSize; + } + uint64_t queueSize = 0; + uint64_t queueSizeBytes = 0; + }; + + auto myObserver = kj::heap(); + auto& observer = *myObserver; + kj::Maybe> 
stream; + fixture.runInIoContext([&](const TestFixture::Environment& env) -> kj::Promise { + stream = jsg::alloc(env.context, kj::heap(), kj::mv(myObserver)); + + auto write = [&](size_t size) { + auto buffersource = env.js.bytes(kj::heapArray(size)); + return env.context.awaitJs(env.js, + KJ_ASSERT_NONNULL(stream)->getController().write(env.js, buffersource.getHandle(env.js))); + }; + + KJ_ASSERT(observer.queueSize == 0); + KJ_ASSERT(observer.queueSizeBytes == 0); + + auto builder = kj::heapArrayBuilder>(2); + builder.add(write(1)); + + KJ_ASSERT(observer.queueSize == 1); + KJ_ASSERT(observer.queueSizeBytes == 1); + + builder.add(write(10)); + + KJ_ASSERT(observer.queueSize == 2); + KJ_ASSERT(observer.queueSizeBytes == 11); + + return kj::joinPromises(builder.finish()); + }); + + KJ_ASSERT(observer.queueSize == 0); + KJ_ASSERT(observer.queueSizeBytes == 0); +} + } // namespace } // namespace workerd::api diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index c451d9d3b7a..5bae8a30624 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -933,6 +933,9 @@ jsg::Promise WritableStreamInternalController::write( auto prp = js.newPromiseAndResolver(); increaseCurrentWriteBufferSize(js, byteLength); + KJ_IF_SOME(o, observer) { + o->onChunkEnqueued(byteLength); + } auto ptr = kj::ArrayPtr(static_cast(store->Data()) + byteOffset, byteLength); queue.push_back( @@ -1598,6 +1601,9 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo auto& request = check(); maybeResolvePromise(js, request.promise); decreaseCurrentWriteBufferSize(js, amountToWrite); + KJ_IF_SOME(o, observer) { + o->onChunkDequeued(amountToWrite); + } queue.pop_front(); maybeAbort(js, request); return writeLoop(js, IoContext::current()); @@ -1610,6 +1616,9 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo auto& request = check(); auto& writable = state.get>(); 
decreaseCurrentWriteBufferSize(js, amountToWrite); + KJ_IF_SOME(o, observer) { + o->onChunkDequeued(amountToWrite); + } maybeRejectPromise(js, request.promise, handle); queue.pop_front(); if (!maybeAbort(js, request)) { @@ -2468,10 +2477,11 @@ kj::Own newReadableStreamInternalController( kj::Own newWritableStreamInternalController(IoContext& ioContext, kj::Own sink, + kj::Maybe> observer, kj::Maybe maybeHighWaterMark, kj::Maybe> maybeClosureWaitable) { return kj::heap( - kj::mv(sink), maybeHighWaterMark, kj::mv(maybeClosureWaitable)); + kj::mv(sink), kj::mv(observer), maybeHighWaterMark, kj::mv(maybeClosureWaitable)); } kj::StringPtr WritableStreamInternalController::jsgGetMemoryName() const { diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index bddb9eef31f..3319251bb4d 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -8,6 +8,7 @@ #include "writable.h" #include +#include #include @@ -183,9 +184,11 @@ class WritableStreamInternalController: public WritableStreamController { explicit WritableStreamInternalController(StreamStates::Errored errored) : state(kj::mv(errored)) {} explicit WritableStreamInternalController(kj::Own writable, + kj::Maybe> observer, kj::Maybe maybeHighWaterMark = kj::none, kj::Maybe> maybeClosureWaitable = kj::none) : state(IoContext::current().addObject(kj::heap(kj::mv(writable)))), + observer(kj::mv(observer)), maybeHighWaterMark(maybeHighWaterMark), maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {} @@ -279,6 +282,8 @@ class WritableStreamInternalController: public WritableStreamController { kj::OneOf> state; kj::OneOf writeState = Unlocked(); + kj::Maybe> observer; + kj::Maybe maybePendingAbort; uint64_t currentWriteBufferSize = 0; diff --git a/src/workerd/api/streams/transform.c++ b/src/workerd/api/streams/transform.c++ index ae40d9d58ed..fd8cd656cf9 100644 --- a/src/workerd/api/streams/transform.c++ +++ b/src/workerd/api/streams/transform.c++ @@ -64,7 
+64,7 @@ jsg::Ref TransformStream::constructor(jsg::Lock& js, .pull = maybeAddFunctor( JSG_VISITABLE_LAMBDA((controller = controller.addRef()), (controller), (jsg::Lock & js, auto c) mutable { return controller->pull(js); })), - .cancel = maybeAddFunctor(JSG_VISITABLE_LAMBDA( + .cancel = maybeAddFunctor( JSG_VISITABLE_LAMBDA( (controller = controller.addRef()), (controller), (jsg::Lock & js, auto reason) mutable { return controller->cancel(js, reason); })), .expectedLength = transformer.expectedLength.map( @@ -123,9 +123,9 @@ jsg::Ref IdentityTransformStream::constructor( KJ_IF_SOME(queuingStrategy, maybeQueuingStrategy) { maybeHighWaterMark = queuingStrategy.highWaterMark; } - return jsg::alloc(jsg::alloc(ioContext, kj::mv(pipe.in)), - jsg::alloc(ioContext, kj::mv(pipe.out), maybeHighWaterMark)); + jsg::alloc(ioContext, kj::mv(pipe.out), + ioContext.getMetrics().tryCreateWritableByteStreamObserver(), maybeHighWaterMark)); } jsg::Ref FixedLengthStream::constructor(jsg::Lock& js, @@ -147,7 +147,8 @@ jsg::Ref FixedLengthStream::constructor(jsg::Lock& js, } return jsg::alloc(jsg::alloc(ioContext, kj::mv(pipe.in)), - jsg::alloc(ioContext, kj::mv(pipe.out), maybeHighWaterMark)); + jsg::alloc(ioContext, kj::mv(pipe.out), + ioContext.getMetrics().tryCreateWritableByteStreamObserver(), maybeHighWaterMark)); } OneWayPipe newIdentityPipe(kj::Maybe expectedLength) { diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index 85478c0c014..8d81b536211 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -216,10 +216,14 @@ void WritableStreamDefaultWriter::visitForGc(jsg::GcVisitor& visitor) { WritableStream::WritableStream(IoContext& ioContext, kj::Own sink, + kj::Maybe> maybeObserver, kj::Maybe maybeHighWaterMark, kj::Maybe> maybeClosureWaitable) - : WritableStream(newWritableStreamInternalController( - ioContext, kj::mv(sink), maybeHighWaterMark, kj::mv(maybeClosureWaitable))) {} + : 
WritableStream(newWritableStreamInternalController(ioContext, + kj::mv(sink), + kj::mv(maybeObserver), + maybeHighWaterMark, + kj::mv(maybeClosureWaitable))) {} WritableStream::WritableStream(kj::Own controller) : ioContext(tryGetIoContext()), @@ -611,7 +615,8 @@ jsg::Ref WritableStream::deserialize( auto stream = ioctx.getByteStreamFactory().capnpToKjExplicitEnd(ws.getByteStream()); auto sink = newSystemStream(kj::mv(stream), encoding, ioctx); - return jsg::alloc(ioctx, kj::mv(sink)); + return jsg::alloc( + ioctx, kj::mv(sink), ioctx.getMetrics().tryCreateWritableByteStreamObserver()); } void WritableStreamDefaultWriter::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { diff --git a/src/workerd/api/streams/writable.h b/src/workerd/api/streams/writable.h index 10e636e4d6f..b46e312fe66 100644 --- a/src/workerd/api/streams/writable.h +++ b/src/workerd/api/streams/writable.h @@ -101,6 +101,7 @@ class WritableStream: public jsg::Object { public: explicit WritableStream(IoContext& ioContext, kj::Own sink, + kj::Maybe> observer, kj::Maybe maybeHighWaterMark = kj::none, kj::Maybe> maybeClosureWaitable = kj::none); diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index e146af2c0fa..35ee29639f8 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -480,7 +480,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler // immediately if any of these happen: // - The JS promise is GC'd without resolving. // - The JS promise is resolved from the wrong context. - // - The system detects that no further process will be made in this context (because there is no + // - The system detects that no further progress will be made in this context (because there is no // more JavaScript to run, and there is no outstanding I/O scheduled with awaitIo()). // // If `T` is `IoOwn`, it will be unwrapped to just `U` in the result. 
If `U` is in turn diff --git a/src/workerd/io/observer.h b/src/workerd/io/observer.h index e1eed46e03c..b4043921341 100644 --- a/src/workerd/io/observer.h +++ b/src/workerd/io/observer.h @@ -24,12 +24,30 @@ class TimerChannel; class WebSocketObserver: public kj::Refcounted { public: + virtual ~WebSocketObserver() noexcept(false) = default; // Called when a worker sends a message on this WebSocket (includes close messages). virtual void sentMessage(size_t bytes) {}; // Called when a worker receives a message on this WebSocket (includes close messages). virtual void receivedMessage(size_t bytes) {}; }; +// Observes a byte stream. Byte streams which use instances of this observer should call +// onChunkEnqueued() and onChunkDequeued() once for each chunk that passes through the stream. The +// order of enqueues should match the order of dequeues. +// +// Observer implementations can then calculate the current number of chunks and the sum of the +// size of the chunks in the internal queue by incrementing and decrementing each metric in +// onChunkEnqueued() and onChunkDequeued() respectively. +class ByteStreamObserver { +public: + virtual ~ByteStreamObserver() noexcept(false) = default; + // Called when a chunk of size `bytes` is enqueued on the stream. + virtual void onChunkEnqueued(size_t bytes) {}; + // Called when a chunk of size `bytes` is dequeued from the stream (e.g. when a writable byte + // stream writes the chunk to its corresponding sink). + virtual void onChunkDequeued(size_t bytes) {}; +}; + // Observes a specific request to a specific worker. Also observes outgoing subrequests. // // Observing anything is optional. Default implementations of all methods observe nothing. @@ -45,6 +63,12 @@ class RequestObserver: public kj::Refcounted { return kj::none; }; + // This is called when a writable byte stream is created whilst processing this request. The + // returned observer will be destroyed when the corresponding byte stream is destroyed. 
+ virtual kj::Maybe> tryCreateWritableByteStreamObserver() { + return kj::none; + } + // Invoked when the request is actually delivered. // // If, for some reason, this is not invoked before the object is destroyed, this indicate that