Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ByteStreamObserver to enable monitoring of byte stream queue size and memory usage. #3069

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/workerd/api/sockets.c++
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ jsg::Ref<Socket> setupSocket(jsg::Lock& js,
auto openedPrPair = js.newPromiseAndResolver<SocketInfo>();
openedPrPair.promise.markAsHandled(js);
auto writable = jsg::alloc<WritableStream>(ioContext, kj::mv(sysStreams.writable),
ioContext.getMetrics().tryCreateWritableByteStreamObserver(),
getWritableHighWaterMark(options), openedPrPair.promise.whenResolved(js));

auto result = jsg::alloc<Socket>(js, ioContext, kj::mv(refcountedConnection),
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ class WritableStreamController {

kj::Own<WritableStreamController> newWritableStreamJsController();
kj::Own<WritableStreamController> newWritableStreamInternalController(IoContext& ioContext,
kj::Own<WritableStreamSink> source,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none);

Expand Down
6 changes: 4 additions & 2 deletions src/workerd/api/streams/compression.c++
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,8 @@ jsg::Ref<CompressionStream> CompressionStream::constructor(kj::String format) {
auto& ioContext = IoContext::current();

return jsg::alloc<CompressionStream>(jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)),
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide)));
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide),
ioContext.getMetrics().tryCreateWritableByteStreamObserver()));
}

jsg::Ref<DecompressionStream> DecompressionStream::constructor(jsg::Lock& js, kj::String format) {
Expand All @@ -521,7 +522,8 @@ jsg::Ref<DecompressionStream> DecompressionStream::constructor(jsg::Lock& js, kj

return jsg::alloc<DecompressionStream>(
jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)),
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide)));
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide),
ioContext.getMetrics().tryCreateWritableByteStreamObserver()));
}

} // namespace workerd::api
82 changes: 81 additions & 1 deletion src/workerd/api/streams/internal-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ KJ_TEST("WritableStreamInternalController queue size assertion") {
// allowed to be queued.

jsg::Ref<ReadableStream> source = ReadableStream::constructor(env.js, kj::none, kj::none);
jsg::Ref<WritableStream> sink = jsg::alloc<WritableStream>(env.context, kj::heap<MySink>());
jp4a50 marked this conversation as resolved.
Show resolved Hide resolved
jsg::Ref<WritableStream> sink =
jsg::alloc<WritableStream>(env.context, kj::heap<MySink>(), kj::none);

auto pipeTo = source->pipeTo(env.js, sink.addRef(), PipeToOptions{.preventClose = true});

Expand Down Expand Up @@ -272,5 +273,84 @@ KJ_TEST("WritableStreamInternalController queue size assertion") {
});
}

KJ_TEST("WritableStreamInternalController observability") {

capnp::MallocMessageBuilder message;
auto flags = message.initRoot<CompatibilityFlags>();
flags.setNodeJsCompat(true);
flags.setWorkerdExperimental(true);
flags.setStreamsJavaScriptControllers(true);

TestFixture::SetupParams params;
TestFixture fixture({.featureFlags = flags.asReader()});

class MySink final: public WritableStreamSink {
public:
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
++writeCount;
return kj::READY_NOW;
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return kj::READY_NOW;
}
kj::Promise<void> end() override {
return kj::READY_NOW;
}
void abort(kj::Exception reason) override {}
uint getWriteCount() {
return writeCount;
}

private:
uint writeCount = 0;
};

class MyObserver final: public ByteStreamObserver {
public:
virtual void onChunkEnqueued(size_t bytes) {
++queueSize;
queueSizeBytes += bytes;
};
virtual void onChunkDequeued(size_t bytes) {
queueSizeBytes -= bytes;
--queueSize;
};
uint64_t queueSize = 0;
uint64_t queueSizeBytes = 0;
};

auto myObserver = kj::heap<MyObserver>();
auto& observer = *myObserver;
kj::Maybe<jsg::Ref<WritableStream>> stream;
fixture.runInIoContext([&](const TestFixture::Environment& env) -> kj::Promise<void> {
stream = jsg::alloc<WritableStream>(env.context, kj::heap<MySink>(), kj::mv(myObserver));

auto write = [&](size_t size) {
auto buffersource = env.js.bytes(kj::heapArray<kj::byte>(size));
return env.context.awaitJs(env.js,
KJ_ASSERT_NONNULL(stream)->getController().write(env.js, buffersource.getHandle(env.js)));
};

KJ_ASSERT(observer.queueSize == 0);
KJ_ASSERT(observer.queueSizeBytes == 0);

auto builder = kj::heapArrayBuilder<kj::Promise<void>>(2);
builder.add(write(1));

KJ_ASSERT(observer.queueSize == 1);
KJ_ASSERT(observer.queueSizeBytes == 1);

builder.add(write(10));

KJ_ASSERT(observer.queueSize == 2);
KJ_ASSERT(observer.queueSizeBytes == 11);

return kj::joinPromises(builder.finish());
});

KJ_ASSERT(observer.queueSize == 0);
KJ_ASSERT(observer.queueSizeBytes == 0);
}

} // namespace
} // namespace workerd::api
12 changes: 11 additions & 1 deletion src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,9 @@ jsg::Promise<void> WritableStreamInternalController::write(

auto prp = js.newPromiseAndResolver<void>();
increaseCurrentWriteBufferSize(js, byteLength);
KJ_IF_SOME(o, observer) {
o->onChunkEnqueued(byteLength);
}
Copy link
Member

@jasnell jasnell Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An RAII approach the the API could work here also. Something like...

auto observedWrite = o->observeWrite(byteLength);
// .. then later when the chunk is consumed...
kj::mv(observedWrite); 

This would have the advantage of updating the accounting even if someone forgets to call onRead.

Not blocking tho

auto ptr =
kj::ArrayPtr<kj::byte>(static_cast<kj::byte*>(store->Data()) + byteOffset, byteLength);
queue.push_back(
Expand Down Expand Up @@ -1598,6 +1601,9 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
auto& request = check();
maybeResolvePromise(js, request.promise);
decreaseCurrentWriteBufferSize(js, amountToWrite);
KJ_IF_SOME(o, observer) {
o->onChunkDequeued(amountToWrite);
}
queue.pop_front();
maybeAbort(js, request);
return writeLoop(js, IoContext::current());
Expand All @@ -1610,6 +1616,9 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
auto& request = check();
auto& writable = state.get<IoOwn<Writable>>();
decreaseCurrentWriteBufferSize(js, amountToWrite);
KJ_IF_SOME(o, observer) {
o->onChunkDequeued(amountToWrite);
}
maybeRejectPromise<void>(js, request.promise, handle);
queue.pop_front();
if (!maybeAbort(js, request)) {
Expand Down Expand Up @@ -2468,10 +2477,11 @@ kj::Own<ReadableStreamController> newReadableStreamInternalController(

kj::Own<WritableStreamController> newWritableStreamInternalController(IoContext& ioContext,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable) {
return kj::heap<WritableStreamInternalController>(
kj::mv(sink), maybeHighWaterMark, kj::mv(maybeClosureWaitable));
kj::mv(sink), kj::mv(observer), maybeHighWaterMark, kj::mv(maybeClosureWaitable));
}

kj::StringPtr WritableStreamInternalController::jsgGetMemoryName() const {
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "writable.h"

#include <workerd/io/io-context.h>
#include <workerd/io/observer.h>

#include <deque>

Expand Down Expand Up @@ -183,9 +184,11 @@ class WritableStreamInternalController: public WritableStreamController {
explicit WritableStreamInternalController(StreamStates::Errored errored)
: state(kj::mv(errored)) {}
explicit WritableStreamInternalController(kj::Own<WritableStreamSink> writable,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none)
: state(IoContext::current().addObject(kj::heap<Writable>(kj::mv(writable)))),
observer(kj::mv(observer)),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {}

Expand Down Expand Up @@ -279,6 +282,8 @@ class WritableStreamInternalController: public WritableStreamController {
kj::OneOf<StreamStates::Closed, StreamStates::Errored, IoOwn<Writable>> state;
kj::OneOf<Unlocked, Locked, PipeLocked, WriterLocked> writeState = Unlocked();

kj::Maybe<kj::Own<ByteStreamObserver>> observer;

kj::Maybe<PendingAbort> maybePendingAbort;

uint64_t currentWriteBufferSize = 0;
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/api/streams/transform.c++
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jsg::Ref<TransformStream> TransformStream::constructor(jsg::Lock& js,
.pull = maybeAddFunctor<UnderlyingSource::PullAlgorithm>(
JSG_VISITABLE_LAMBDA((controller = controller.addRef()), (controller),
(jsg::Lock & js, auto c) mutable { return controller->pull(js); })),
.cancel = maybeAddFunctor<UnderlyingSource::CancelAlgorithm>(JSG_VISITABLE_LAMBDA(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

odd that the linter changed this line

.cancel = maybeAddFunctor<UnderlyingSource::CancelAlgorithm>( JSG_VISITABLE_LAMBDA(
(controller = controller.addRef()), (controller),
(jsg::Lock & js, auto reason) mutable { return controller->cancel(js, reason); })),
.expectedLength = transformer.expectedLength.map(
Expand Down Expand Up @@ -123,9 +123,9 @@ jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor(
KJ_IF_SOME(queuingStrategy, maybeQueuingStrategy) {
maybeHighWaterMark = queuingStrategy.highWaterMark;
}

return jsg::alloc<IdentityTransformStream>(jsg::alloc<ReadableStream>(ioContext, kj::mv(pipe.in)),
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out), maybeHighWaterMark));
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out),
ioContext.getMetrics().tryCreateWritableByteStreamObserver(), maybeHighWaterMark));
}

jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(jsg::Lock& js,
Expand All @@ -147,7 +147,8 @@ jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(jsg::Lock& js,
}

return jsg::alloc<FixedLengthStream>(jsg::alloc<ReadableStream>(ioContext, kj::mv(pipe.in)),
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out), maybeHighWaterMark));
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out),
ioContext.getMetrics().tryCreateWritableByteStreamObserver(), maybeHighWaterMark));
}

OneWayPipe newIdentityPipe(kj::Maybe<uint64_t> expectedLength) {
Expand Down
11 changes: 8 additions & 3 deletions src/workerd/api/streams/writable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,14 @@ void WritableStreamDefaultWriter::visitForGc(jsg::GcVisitor& visitor) {

WritableStream::WritableStream(IoContext& ioContext,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> maybeObserver,
kj::Maybe<uint64_t> maybeHighWaterMark,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable)
: WritableStream(newWritableStreamInternalController(
ioContext, kj::mv(sink), maybeHighWaterMark, kj::mv(maybeClosureWaitable))) {}
: WritableStream(newWritableStreamInternalController(ioContext,
kj::mv(sink),
kj::mv(maybeObserver),
maybeHighWaterMark,
kj::mv(maybeClosureWaitable))) {}

WritableStream::WritableStream(kj::Own<WritableStreamController> controller)
: ioContext(tryGetIoContext()),
Expand Down Expand Up @@ -611,7 +615,8 @@ jsg::Ref<WritableStream> WritableStream::deserialize(
auto stream = ioctx.getByteStreamFactory().capnpToKjExplicitEnd(ws.getByteStream());
auto sink = newSystemStream(kj::mv(stream), encoding, ioctx);

return jsg::alloc<WritableStream>(ioctx, kj::mv(sink));
return jsg::alloc<WritableStream>(
ioctx, kj::mv(sink), ioctx.getMetrics().tryCreateWritableByteStreamObserver());
}

void WritableStreamDefaultWriter::visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/streams/writable.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class WritableStream: public jsg::Object {
public:
explicit WritableStream(IoContext& ioContext,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none);

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
// immediately if any of these happen:
// - The JS promise is GC'd without resolving.
// - The JS promise is resolved from the wrong context.
// - The system detects that no further process will be made in this context (because there is no
// - The system detects that no further progress will be made in this context (because there is no
// more JavaScript to run, and there is no outstanding I/O scheduled with awaitIo()).
//
// If `T` is `IoOwn<U>`, it will be unwrapped to just `U` in the result. If `U` is in turn
Expand Down
24 changes: 24 additions & 0 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,30 @@ class TimerChannel;

class WebSocketObserver: public kj::Refcounted {
public:
virtual ~WebSocketObserver() noexcept(false) = default;
// Called when a worker sends a message on this WebSocket (includes close messages).
virtual void sentMessage(size_t bytes) {};
// Called when a worker receives a message on this WebSocket (includes close messages).
virtual void receivedMessage(size_t bytes) {};
};

// Observes a byte stream. Byte streams which use instances of this observer should call enqueue()
// and dequeue() once for each chunk that passes through the stream. The order of enqueues should
// match the order of dequeues.
//
// Byte observer implementations can then calculate the current number of chunks and the sum of the
// size of the chunks in the internal queue by incrementing and decrementing each metric in
// enqueue() and dequeue() respectively.
class ByteStreamObserver {
public:
virtual ~ByteStreamObserver() noexcept(false) = default;
// Called when a chunk of size `bytes` is enqueued on the stream.
virtual void onChunkEnqueued(size_t bytes) {};
// Called when a chunk of size `bytes` is dequeued from the stream (e.g. when a writable byte
// stream writes the chunk to its corresponding sink).
virtual void onChunkDequeued(size_t bytes) {};
};

// Observes a specific request to a specific worker. Also observes outgoing subrequests.
//
// Observing anything is optional. Default implementations of all methods observe nothing.
Expand All @@ -45,6 +63,12 @@ class RequestObserver: public kj::Refcounted {
return kj::none;
};

// This is called when a writable byte stream is created whilst processing this request. It will
// be destroyed when the corresponding byte stream is destroyed.
virtual kj::Maybe<kj::Own<ByteStreamObserver>> tryCreateWritableByteStreamObserver() {
return kj::none;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a bit easier to just have this return a kj::Own<ByteStreamObserver> that wraps a non-op singleton instance so that we can skip the KJ_IF_SOME block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. It'd make sense to do this for both ByteStreamObserver and WebSocketObserver so I'll do both in a follow up PR since changing tryCreateWebSocketObserver will be a breaking API change.


// Invoked when the request is actually delivered.
//
// If, for some reason, this is not invoked before the object is destroyed, this indicate that
Expand Down