-
Notifications
You must be signed in to change notification settings - Fork 338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ByteStreamObserver to enable monitoring of byte stream queue size and memory usage. #3069
Conversation
a98e50d
to
15a26ea
Compare
@@ -64,7 +64,7 @@ jsg::Ref<TransformStream> TransformStream::constructor(jsg::Lock& js, | |||
.pull = maybeAddFunctor<UnderlyingSource::PullAlgorithm>( | |||
JSG_VISITABLE_LAMBDA((controller = controller.addRef()), (controller), | |||
(jsg::Lock & js, auto c) mutable { return controller->pull(js); })), | |||
.cancel = maybeAddFunctor<UnderlyingSource::CancelAlgorithm>(JSG_VISITABLE_LAMBDA( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
odd that the linter changed this line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but I haven't checked if there are more streams we might want to attach this observer to.
// be destroyed when the corresponding byte stream is destroyed. | ||
virtual kj::Maybe<kj::Own<ByteStreamObserver>> tryCreateWritableByteStreamObserver() { | ||
return kj::none; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a bit easier to just have this return a kj::Own<ByteStreamObserver>
that wraps a non-op singleton instance so that we can skip the KJ_IF_SOME
block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. It'd make sense to do this for both ByteStreamObserver
and WebSocketObserver
so I'll do both in a follow up PR since changing tryCreateWebSocketObserver
will be a breaking API change.
@@ -933,6 +933,9 @@ jsg::Promise<void> WritableStreamInternalController::write( | |||
|
|||
auto prp = js.newPromiseAndResolver<void>(); | |||
increaseCurrentWriteBufferSize(js, byteLength); | |||
KJ_IF_SOME(o, observer) { | |||
o->onWrite(byteLength); | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An RAII approach the the API could work here also. Something like...
auto observedWrite = o->observeWrite(byteLength);
// .. then later when the chunk is consumed...
kj::mv(observedWrite);
This would have the advantage of updating the accounting even if someone forgets to call onRead.
Not blocking tho
Nit: Some detail of what this is meant to be used for in the PR description would be helpful for context. |
15a26ea
to
40cc736
Compare
and memory usage.
40cc736
to
3178392
Compare
No description provided.