From 42f2fd3de9d4a6ff9adbee7e30702e0b49a8368d Mon Sep 17 00:00:00 2001 From: Jon Phillips Date: Mon, 8 Jul 2024 17:37:29 +0100 Subject: [PATCH] Add WebSocketObserver to allow observation of traffic on websocket connections terminating within workers. --- src/workerd/api/http.c++ | 2 +- src/workerd/api/web-socket.c++ | 24 +++++++++++++++++++----- src/workerd/api/web-socket.h | 9 ++++++--- src/workerd/io/observer.h | 13 +++++++++++++ 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/workerd/api/http.c++ b/src/workerd/api/http.c++ index 12077ac976f..ec82b28933d 100644 --- a/src/workerd/api/http.c++ +++ b/src/workerd/api/http.c++ @@ -1549,7 +1549,7 @@ kj::Promise> Response::send( } auto clientSocket = outer.acceptWebSocket(outHeaders); - auto wsPromise = ws->couple(kj::mv(clientSocket)); + auto wsPromise = ws->couple(kj::mv(clientSocket), context.getMetrics()); KJ_IF_SOME(a, context.getActor()) { KJ_IF_SOME(hib, a.getHibernationManager()) { diff --git a/src/workerd/api/web-socket.c++ b/src/workerd/api/web-socket.c++ index 59993886ec3..071c181607a 100644 --- a/src/workerd/api/web-socket.c++ +++ b/src/workerd/api/web-socket.c++ @@ -302,7 +302,7 @@ jsg::Ref WebSocket::constructor( return ws; } -kj::Promise> WebSocket::couple(kj::Own other) { +kj::Promise> WebSocket::couple(kj::Own other, RequestObserver& request) { auto& native = *farNative; JSG_REQUIRE(!native.state.is(), TypeError, "Can't return WebSocket in a Response if it was created with `new WebSocket()`"); @@ -341,10 +341,16 @@ kj::Promise> WebSocket::couple(kj::Own other) } return false; }; - if (tryGetPeer() != kj::none) { + KJ_IF_SOME(p, tryGetPeer()) { // We're terminating the WebSocket in this worker, so the upstream promise (which pumps // messages from the client to this worker) counts as something the request is waiting for. upstream = upstream.attach(context.registerPendingEvent()); + + // We can observe websocket traffic in both directions by attaching an observer to the peer + // websocket which terminates in the worker. + KJ_IF_SOME(observer, request.coupledToLocalWebSocket()) { + p.observer = kj::mv(observer); + } } // We need to use `eagerlyEvaluate()` on both inputs to `joinPromises` to work around the awkward @@ -745,7 +751,7 @@ void WebSocket::ensurePumping(jsg::Lock& js) { auto& accepted = KJ_ASSERT_NONNULL(native.state.tryGet()); auto promise = kj::evalNow([&]() { return accepted.canceler.wrap(pump(context, *outgoingMessages, - *accepted.ws, native, autoResponseStatus)); + *accepted.ws, native, autoResponseStatus, observer)); }); // TODO(cleanup): We use awaitIoLegacy() here because we don't want this to count as a pending @@ -840,7 +846,7 @@ size_t countBytesFromMessage(const kj::WebSocket::Message& message) { kj::Promise WebSocket::pump( IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws, Native& native, - AutoResponse& autoResponse) { + AutoResponse& autoResponse, kj::Maybe>& observer) { KJ_ASSERT(!native.isPumping); native.isPumping = true; autoResponse.isPumping = true; @@ -899,6 +905,10 @@ kj::Promise WebSocket::pump( } } + KJ_IF_SOME(o, observer) { + o->sentMessage(size); + } + KJ_IF_SOME(a, context.getActor()) { a.getMetrics().sentWebSocketMessage(size); } @@ -941,9 +951,13 @@ kj::Promise> WebSocket::readLoop( while (true) { auto message = co_await ws.receive(); + auto size = countBytesFromMessage(message); + KJ_IF_SOME(o, observer) { + o->receivedMessage(size); + } + context.getLimitEnforcer().topUpActor(); KJ_IF_SOME(a, context.getActor()) { - auto size = countBytesFromMessage(message); a.getMetrics().receivedWebSocketMessage(size); } diff --git a/src/workerd/api/web-socket.h b/src/workerd/api/web-socket.h index 505c6edf4ae..358930cc850 100644 --- a/src/workerd/api/web-socket.h +++ b/src/workerd/api/web-socket.h @@ -254,14 +254,15 @@ class WebSocket: public EventTarget { void initConnection(jsg::Lock& js, kj::Promise); // Pumps messages from this WebSocket to `other`, and from `other` to this, making sure to - // register pending events as appropriate. Used to implement FetchEvent.respondWith(). + // register pending events as appropriate. Used to connect a websocket to a client via an HTTP + // response. // // Only one of this or accept() is allowed to be invoked. // // As an exception to the usual KJ convention, it is not necessary for the JavaScript `WebSocket` // object to be kept live while waiting for the promise returned by couple() to complete. Instead, // the promise takes direct ownership of the underlying KJ-native WebSocket (as well as `other`). - kj::Promise> couple(kj::Own other); + kj::Promise> couple(kj::Own other, RequestObserver& request); // Extract the kj::WebSocket from this api::WebSocket (if applicable). The kj::WebSocket will be // owned elsewhere, but the api::WebSocket will retain a reference. @@ -585,6 +586,8 @@ class WebSocket: public EventTarget { AutoResponse autoResponseStatus; + kj::Maybe> observer; + // Contains a websocket and possibly some data from the WebSocketResponse headers. struct PackedWebSocket { kj::Own ws; @@ -620,7 +623,7 @@ class WebSocket: public EventTarget { // owned by the `IoContext` so it'll be canceled if the `IoContext` is destroyed. static kj::Promise pump( IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws, Native& native, - AutoResponse& autoResponse); + AutoResponse& autoResponse, kj::Maybe>& observer); kj::Promise> readLoop(kj::Maybe> cs); diff --git a/src/workerd/io/observer.h b/src/workerd/io/observer.h index 5951e553e44..9525d2772fd 100644 --- a/src/workerd/io/observer.h +++ b/src/workerd/io/observer.h @@ -21,11 +21,24 @@ class WorkerInterface; class LimitEnforcer; class TimerChannel; +class WebSocketObserver: public kj::Refcounted { +public: + // Called when a worker sends a message on this WebSocket (includes close messages). + virtual void sentMessage(size_t bytes) { }; + // Called when a worker receives a message is on this WebSocket (includes close messages). + virtual void receivedMessage(size_t bytes) { }; +}; + // Observes a specific request to a specific worker. Also observes outgoing subrequests. // // Observing anything is optional. Default implementations of all methods observe nothing. class RequestObserver: public kj::Refcounted { public: + // Called when a WebSocket terminating at a worker is coupled to the response. A WebSocketObserver + // may optionally be returned to observe messages being sent and received on the resulting + // websocket connection by the worker. + virtual kj::Maybe> coupledToLocalWebSocket() { return kj::none; }; + // Invoked when the request is actually delivered. // // If, for some reason, this is not invoked before the object is destroyed, this indicate that