From 1dae2d014fc16613ba2e73b90541d8b457a63345 Mon Sep 17 00:00:00 2001 From: Dan Lapid Date: Wed, 9 Oct 2024 18:01:08 +0100 Subject: [PATCH] Merge pull request #2863 from cloudflare/dlapid/prewarm_promise Change WorkerInterface so prewarm returns a promise. --- src/workerd/api/hibernatable-web-socket.c++ | 8 +--- src/workerd/api/hibernatable-web-socket.h | 8 +--- src/workerd/api/queue.c++ | 1 - src/workerd/api/queue.h | 1 - src/workerd/api/trace.c++ | 1 - src/workerd/api/trace.h | 4 +- src/workerd/api/worker-rpc.c++ | 1 - src/workerd/api/worker-rpc.h | 1 - src/workerd/io/hibernation-manager.c++ | 4 +- src/workerd/io/worker-entrypoint.c++ | 5 ++- src/workerd/io/worker-interface.c++ | 44 ++++++++------------- src/workerd/io/worker-interface.h | 12 ++---- src/workerd/io/worker.c++ | 6 +-- src/workerd/server/server.c++ | 22 +++++++---- 14 files changed, 46 insertions(+), 72 deletions(-) diff --git a/src/workerd/api/hibernatable-web-socket.c++ b/src/workerd/api/hibernatable-web-socket.c++ index cd1037277e8..ecb83fe7fe5 100644 --- a/src/workerd/api/hibernatable-web-socket.c++ +++ b/src/workerd/api/hibernatable-web-socket.c++ @@ -144,7 +144,6 @@ kj::Promise HibernatableWebSocketCustomEve kj::Promise HibernatableWebSocketCustomEventImpl::sendRpc( capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) { auto req = dispatcher.castAs() .hibernatableWebSocketEventRequest(); @@ -194,15 +193,12 @@ HibernatableWebSocketEvent::ItemsForRelease::ItemsForRelease( tags(kj::mv(tags)) {} HibernatableWebSocketCustomEventImpl::HibernatableWebSocketCustomEventImpl(uint16_t typeId, - kj::TaskSet& waitUntilTasks, kj::Own params, kj::Maybe manager) : typeId(typeId), params(kj::mv(params)) {} -HibernatableWebSocketCustomEventImpl::HibernatableWebSocketCustomEventImpl(uint16_t typeId, - kj::TaskSet& waitUntilTasks, - HibernatableSocketParams params, - Worker::Actor::HibernationManager& manager) +HibernatableWebSocketCustomEventImpl::HibernatableWebSocketCustomEventImpl( + uint16_t typeId, HibernatableSocketParams params, Worker::Actor::HibernationManager& manager) : typeId(typeId), params(kj::mv(params)), manager(manager) {} diff --git a/src/workerd/api/hibernatable-web-socket.h b/src/workerd/api/hibernatable-web-socket.h index bf0c8dc669b..a3c8b695254 100644 --- a/src/workerd/api/hibernatable-web-socket.h +++ b/src/workerd/api/hibernatable-web-socket.h @@ -59,13 +59,10 @@ class HibernatableWebSocketCustomEventImpl final: public WorkerInterface::Custom public kj::Refcounted { public: HibernatableWebSocketCustomEventImpl(uint16_t typeId, - kj::TaskSet& waitUntilTasks, kj::Own params, kj::Maybe manager = kj::none); - HibernatableWebSocketCustomEventImpl(uint16_t typeId, - kj::TaskSet& waitUntilTasks, - HibernatableSocketParams params, - Worker::Actor::HibernationManager& manager); + HibernatableWebSocketCustomEventImpl( + uint16_t typeId, HibernatableSocketParams params, Worker::Actor::HibernationManager& manager); kj::Promise run(kj::Own incomingRequest, kj::Maybe entrypointName, @@ -73,7 +70,6 @@ class HibernatableWebSocketCustomEventImpl final: public WorkerInterface::Custom kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) override; uint16_t getType() override { diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index 742f996b732..c0cf06a3e43 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -597,7 +597,6 @@ kj::Promise QueueCustomEventImpl::run( kj::Promise QueueCustomEventImpl::sendRpc( capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) { auto req = dispatcher.castAs().queueRequest(); KJ_SWITCH_ONEOF(params) { diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index 59a78aa0157..a60ab2113c7 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -345,7 +345,6 @@ class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) override; static const uint16_t EVENT_TYPE = 5; diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index d25d680a91e..57459bc8ab7 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -646,7 +646,6 @@ auto TraceCustomEventImpl::run(kj::Own incomingReque auto TraceCustomEventImpl::sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, workerd::rpc::EventDispatcher::Client dispatcher) -> kj::Promise { auto req = dispatcher.sendTracesRequest(); auto out = req.initTraces(traces.size()); diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index a9087e8c1dd..0bee60307ba 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -604,8 +604,7 @@ class UnsafeTraceMetrics final: public jsg::Object { class TraceCustomEventImpl final: public WorkerInterface::CustomEvent { public: - TraceCustomEventImpl( - uint16_t typeId, kj::TaskSet& waitUntilTasks, kj::Array> traces) + TraceCustomEventImpl(uint16_t typeId, kj::Array> traces) : typeId(typeId), traces(kj::mv(traces)) {} @@ -615,7 +614,6 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent { kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) override; uint16_t getType() override { diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index 66dd151b405..863e23fca54 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -1745,7 +1745,6 @@ kj::Promise JsRpcSessionCustomEventImpl::r kj::Promise JsRpcSessionCustomEventImpl::sendRpc( capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) { // We arrange to revoke all capabilities in this session as soon as `sendRpc()` completes or is // canceled. Normally, the server side doesn't return if any capabilities still exist, so this diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index c65e0156dc2..03288cc50cb 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -420,7 +420,6 @@ class JsRpcSessionCustomEventImpl final: public WorkerInterface::CustomEvent { kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) override; uint16_t getType() override { diff --git a/src/workerd/io/hibernation-manager.c++ b/src/workerd/io/hibernation-manager.c++ index 5bc7753d822..74d3bd5b81b 100644 --- a/src/workerd/io/hibernation-manager.c++ +++ b/src/workerd/io/hibernation-manager.c++ @@ -261,7 +261,7 @@ kj::Promise HibernationManagerImpl::handleSocketTermination( auto workerInterface = loopback->getWorker(IoChannelFactory::SubrequestMetadata{}); event = workerInterface ->customEvent(kj::heap( - hibernationEventType, readLoopTasks, kj::mv(KJ_REQUIRE_NONNULL(params)), *this)) + hibernationEventType, kj::mv(KJ_REQUIRE_NONNULL(params)), *this)) .ignoreResult() .attach(kj::mv(workerInterface)); } @@ -366,7 +366,7 @@ kj::Promise HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) { // Dispatch the event. auto workerInterface = loopback->getWorker(IoChannelFactory::SubrequestMetadata{}); co_await workerInterface->customEvent(kj::heap( - hibernationEventType, readLoopTasks, kj::mv(params), *this)); + hibernationEventType, kj::mv(params), *this)); if (isClose) { co_return; } diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index b12794b11f3..9b3bbc93fc0 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -65,7 +65,7 @@ public: kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise test() override; @@ -447,7 +447,7 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); } -void WorkerEntrypoint::prewarm(kj::StringPtr url) { +kj::Promise WorkerEntrypoint::prewarm(kj::StringPtr url) { // Nothing to do, the worker is already loaded. TRACE_EVENT("workerd", "WorkerEntrypoint::prewarm()", "url", url.cStr()); auto incomingRequest = @@ -459,6 +459,7 @@ void WorkerEntrypoint::prewarm(kj::StringPtr url) { // TODO(someday): Ideally, middleware workers would forward prewarm() to the next stage. At // present we don't have a good way to decide what stage that is, especially given that we'll // be switching to `next` being a binding in the future. + return kj::READY_NOW; } kj::Promise WorkerEntrypoint::runScheduled( diff --git a/src/workerd/io/worker-interface.c++ b/src/workerd/io/worker-interface.c++ index 8c8a6fde4e6..62ea5bc6c81 100644 --- a/src/workerd/io/worker-interface.c++ +++ b/src/workerd/io/worker-interface.c++ @@ -16,10 +16,8 @@ namespace { // interface the promise resolved to. class PromisedWorkerInterface final: public kj::Refcounted, public WorkerInterface { public: - PromisedWorkerInterface( - kj::TaskSet& waitUntilTasks, kj::Promise> promise) - : waitUntilTasks(waitUntilTasks), - promise(promise.then([this](kj::Own result) { worker = kj::mv(result); }) + PromisedWorkerInterface(kj::Promise> promise) + : promise(promise.then([this](kj::Own result) { worker = kj::mv(result); }) .fork()) {} kj::Promise request(kj::HttpMethod method, @@ -49,18 +47,12 @@ public: } } - void prewarm(kj::StringPtr url) override { + kj::Promise prewarm(kj::StringPtr url) override { KJ_IF_SOME(w, worker) { - w.get()->prewarm(url); + co_return co_await w.get()->prewarm(url); } else { - static auto constexpr handlePrewarm = - [](kj::Promise promise, kj::String url, - kj::Own self) -> kj::Promise { - co_await promise; - KJ_ASSERT_NONNULL(self->worker)->prewarm(url); - }; - - waitUntilTasks.add(handlePrewarm(promise.addBranch(), kj::str(url), kj::addRef(*this))); + co_await promise; + co_return co_await KJ_ASSERT_NONNULL(worker)->prewarm(url); } } @@ -92,15 +84,13 @@ public: } private: - kj::TaskSet& waitUntilTasks; kj::ForkedPromise promise; kj::Maybe> worker; }; } // namespace -kj::Own newPromisedWorkerInterface( - kj::TaskSet& waitUntilTasks, kj::Promise> promise) { - return kj::refcounted(waitUntilTasks, kj::mv(promise)); +kj::Own newPromisedWorkerInterface(kj::Promise> promise) { + return kj::refcounted(kj::mv(promise)); } kj::Own asHttpClient(kj::Own workerInterface) { @@ -238,7 +228,7 @@ public: kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; @@ -273,8 +263,8 @@ RevocableWebSocketWorkerInterface::RevocableWebSocketWorkerInterface( : worker(worker), revokeProm(revokeProm.fork()) {} -void RevocableWebSocketWorkerInterface::prewarm(kj::StringPtr url) { - worker.prewarm(url); +kj::Promise RevocableWebSocketWorkerInterface::prewarm(kj::StringPtr url) { + return worker.prewarm(url); } kj::Promise RevocableWebSocketWorkerInterface::runScheduled( @@ -324,8 +314,9 @@ public: kj::throwFatalException(kj::mv(exception)); } - void prewarm(kj::StringPtr url) override { + kj::Promise prewarm(kj::StringPtr url) override { // ignore + return kj::READY_NOW; } kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { @@ -354,11 +345,9 @@ kj::Own WorkerInterface::fromException(kj::Exception&& e) { RpcWorkerInterface::RpcWorkerInterface(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) : httpOverCapnpFactory(httpOverCapnpFactory), byteStreamFactory(byteStreamFactory), - waitUntilTasks(waitUntilTasks), dispatcher(kj::mv(dispatcher)) {} kj::Promise RpcWorkerInterface::request(kj::HttpMethod method, @@ -381,10 +370,10 @@ kj::Promise RpcWorkerInterface::connect(kj::StringPtr host, return promise.attach(kj::mv(inner)); } -void RpcWorkerInterface::prewarm(kj::StringPtr url) { +kj::Promise RpcWorkerInterface::prewarm(kj::StringPtr url) { auto req = dispatcher.prewarmRequest(capnp::MessageSize{url.size() / sizeof(capnp::word) + 4, 0}); req.setUrl(url); - waitUntilTasks.add(req.send().ignoreResult()); + return req.send().ignoreResult(); } kj::Promise RpcWorkerInterface::runScheduled( @@ -414,8 +403,7 @@ kj::Promise RpcWorkerInterface::runAlarm( kj::Promise RpcWorkerInterface::customEvent( kj::Own event) { - return event->sendRpc(httpOverCapnpFactory, byteStreamFactory, waitUntilTasks, dispatcher) - .attach(kj::mv(event)); + return event->sendRpc(httpOverCapnpFactory, byteStreamFactory, dispatcher).attach(kj::mv(event)); } // ====================================================================================== diff --git a/src/workerd/io/worker-interface.h b/src/workerd/io/worker-interface.h index ff7ca2db705..5fea0a96710 100644 --- a/src/workerd/io/worker-interface.h +++ b/src/workerd/io/worker-interface.h @@ -46,7 +46,7 @@ class WorkerInterface: public kj::HttpService { // to be invoked. // // If prewarm() has to do anything asynchronous, it should use "waitUntil" tasks. - virtual void prewarm(kj::StringPtr url) = 0; + virtual kj::Promise prewarm(kj::StringPtr url) = 0; struct ScheduledResult { bool retry = true; @@ -116,7 +116,6 @@ class WorkerInterface: public kj::HttpService { // Forward the event over RPC. virtual kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) = 0; // Get the type for this event for logging / metrics purposes. This is intended for use by the @@ -147,10 +146,7 @@ class WorkerInterface: public kj::HttpService { // Given a Promise for a WorkerInterface, return a WorkerInterface whose methods will first wait // for the promise, then invoke the destination object. -kj::Own newPromisedWorkerInterface( - kj::TaskSet& waitUntilTasks, kj::Promise> promise); -// TODO(cleanup): `waitUntilTasks` is only needed to handle `prewarm` since they -// don't return promises. We should maybe change them to return promises? +kj::Own newPromisedWorkerInterface(kj::Promise> promise); // Adapts WorkerInterface to HttpClient, including taking ownership. // @@ -169,7 +165,6 @@ class RpcWorkerInterface: public WorkerInterface { public: RpcWorkerInterface(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory, - kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher); kj::Promise request(kj::HttpMethod method, @@ -184,7 +179,7 @@ class RpcWorkerInterface: public WorkerInterface { ConnectResponse& tunnel, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; @@ -192,7 +187,6 @@ class RpcWorkerInterface: public WorkerInterface { private: capnp::HttpOverCapnpFactory& httpOverCapnpFactory; capnp::ByteStreamFactory& byteStreamFactory; - kj::TaskSet& waitUntilTasks; rpc::EventDispatcher::Client dispatcher; }; diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index a676b5d68f5..523044db912 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -3968,7 +3968,7 @@ public: kj::AsyncIoStream& connection, kj::HttpService::ConnectResponse& tunnel, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; @@ -4189,8 +4189,8 @@ kj::Promise Worker::Isolate::SubrequestClient::connect(kj::StringPtr host, } // TODO(someday): Log other kinds of subrequests? -void Worker::Isolate::SubrequestClient::prewarm(kj::StringPtr url) { - inner->prewarm(url); +kj::Promise Worker::Isolate::SubrequestClient::prewarm(kj::StringPtr url) { + return inner->prewarm(url); } kj::Promise Worker::Isolate::SubrequestClient::runScheduled( kj::Date scheduledTime, kj::StringPtr cron) { diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index e313d4b9705..a4cdcd10042 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -550,7 +550,9 @@ private: co_await kj::joinPromisesFailFast(promises.finish()).attach(kj::mv(io_stream)); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override { + return kj::READY_NOW; + } kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -687,7 +689,9 @@ private: return parent.serviceAdapter->connect(host, headers, connection, tunnel, kj::mv(settings)); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override { + return kj::READY_NOW; + } kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -701,8 +705,7 @@ private: auto dispatcher = bootstrap.startEventRequest(capnp::MessageSize{4, 0}).send().getDispatcher(); return event - ->sendRpc(parent.httpOverCapnpFactory, parent.byteStreamFactory, parent.waitUntilTasks, - kj::mv(dispatcher)) + ->sendRpc(parent.httpOverCapnpFactory, parent.byteStreamFactory, kj::mv(dispatcher)) .attach(kj::mv(event)); } @@ -862,7 +865,9 @@ private: return serviceAdapter->connect(host, headers, connection, tunnel, kj::mv(settings)); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override { + return kj::READY_NOW; + } kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -1138,7 +1143,9 @@ private: kj::HttpConnectSettings settings) override { throwUnsupported(); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override { + return kj::READY_NOW; + } kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -1573,8 +1580,7 @@ public: kj::Own getActor( kj::String id, IoChannelFactory::SubrequestMetadata metadata) { - return newPromisedWorkerInterface( - service.waitUntilTasks, getActorThenStartRequest(kj::mv(id), kj::mv(metadata))); + return newPromisedWorkerInterface(getActorThenStartRequest(kj::mv(id), kj::mv(metadata))); } kj::Own getActorChannel(Worker::Actor::Id id) {