Skip to content

Commit

Permalink
Refactor waitUntilTasks from CustomEvent::sendRpc. (#2865)
Browse files Browse the repository at this point in the history
Using waitUntilTasks in sendRpc is a footgun as you might depend on
a request to the rpc server that will happen complete after the request
has been responded to and the client closes the connection causing a
waitUntilTasks is not empty error. This PR and the internal PR fix any
remaining usages of waitUntilTasks and finally refactors waitUntilTasks
out of the code.
  • Loading branch information
danlapid authored Oct 9, 2024
1 parent bf09b12 commit d08964a
Show file tree
Hide file tree
Showing 12 changed files with 9 additions and 31 deletions.
8 changes: 2 additions & 6 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEventImpl::sendRpc(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) {
auto req = dispatcher.castAs<rpc::HibernatableWebSocketEventDispatcher>()
.hibernatableWebSocketEventRequest();
Expand Down Expand Up @@ -194,15 +193,12 @@ HibernatableWebSocketEvent::ItemsForRelease::ItemsForRelease(
tags(kj::mv(tags)) {}

HibernatableWebSocketCustomEventImpl::HibernatableWebSocketCustomEventImpl(uint16_t typeId,
kj::TaskSet& waitUntilTasks,
kj::Own<HibernationReader> params,
kj::Maybe<Worker::Actor::HibernationManager&> manager)
: typeId(typeId),
params(kj::mv(params)) {}
HibernatableWebSocketCustomEventImpl::HibernatableWebSocketCustomEventImpl(uint16_t typeId,
kj::TaskSet& waitUntilTasks,
HibernatableSocketParams params,
Worker::Actor::HibernationManager& manager)
HibernatableWebSocketCustomEventImpl::HibernatableWebSocketCustomEventImpl(
uint16_t typeId, HibernatableSocketParams params, Worker::Actor::HibernationManager& manager)
: typeId(typeId),
params(kj::mv(params)),
manager(manager) {}
Expand Down
8 changes: 2 additions & 6 deletions src/workerd/api/hibernatable-web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,17 @@ class HibernatableWebSocketCustomEventImpl final: public WorkerInterface::Custom
public kj::Refcounted {
public:
HibernatableWebSocketCustomEventImpl(uint16_t typeId,
kj::TaskSet& waitUntilTasks,
kj::Own<HibernationReader> params,
kj::Maybe<Worker::Actor::HibernationManager&> manager = kj::none);
HibernatableWebSocketCustomEventImpl(uint16_t typeId,
kj::TaskSet& waitUntilTasks,
HibernatableSocketParams params,
Worker::Actor::HibernationManager& manager);
HibernatableWebSocketCustomEventImpl(
uint16_t typeId, HibernatableSocketParams params, Worker::Actor::HibernationManager& manager);

kj::Promise<Result> run(kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
kj::TaskSet& waitUntilTasks) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) override;

uint16_t getType() override {
Expand Down
1 change: 0 additions & 1 deletion src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::sendRpc(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) {
auto req = dispatcher.castAs<rpc::EventDispatcher>().queueRequest();
KJ_SWITCH_ONEOF(params) {
Expand Down
1 change: 0 additions & 1 deletion src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) override;

static const uint16_t EVENT_TYPE = 5;
Expand Down
1 change: 0 additions & 1 deletion src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ auto TraceCustomEventImpl::run(kj::Own<IoContext::IncomingRequest> incomingReque

auto TraceCustomEventImpl::sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
workerd::rpc::EventDispatcher::Client dispatcher) -> kj::Promise<Result> {
auto req = dispatcher.sendTracesRequest();
auto out = req.initTraces(traces.size());
Expand Down
4 changes: 1 addition & 3 deletions src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,7 @@ class UnsafeTraceMetrics final: public jsg::Object {

class TraceCustomEventImpl final: public WorkerInterface::CustomEvent {
public:
TraceCustomEventImpl(
uint16_t typeId, kj::TaskSet& waitUntilTasks, kj::Array<kj::Own<Trace>> traces)
TraceCustomEventImpl(uint16_t typeId, kj::Array<kj::Own<Trace>> traces)
: typeId(typeId),
traces(kj::mv(traces)) {}

Expand All @@ -615,7 +614,6 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent {

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) override;

uint16_t getType() override {
Expand Down
1 change: 0 additions & 1 deletion src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,6 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::r
kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::sendRpc(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) {
// We arrange to revoke all capabilities in this session as soon as `sendRpc()` completes or is
// canceled. Normally, the server side doesn't return if any capabilities still exist, so this
Expand Down
1 change: 0 additions & 1 deletion src/workerd/api/worker-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ class JsRpcSessionCustomEventImpl final: public WorkerInterface::CustomEvent {

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) override;

uint16_t getType() override {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/io/hibernation-manager.c++
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ kj::Promise<void> HibernationManagerImpl::handleSocketTermination(
auto workerInterface = loopback->getWorker(IoChannelFactory::SubrequestMetadata{});
event = workerInterface
->customEvent(kj::heap<api::HibernatableWebSocketCustomEventImpl>(
hibernationEventType, readLoopTasks, kj::mv(KJ_REQUIRE_NONNULL(params)), *this))
hibernationEventType, kj::mv(KJ_REQUIRE_NONNULL(params)), *this))
.ignoreResult()
.attach(kj::mv(workerInterface));
}
Expand Down Expand Up @@ -366,7 +366,7 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
// Dispatch the event.
auto workerInterface = loopback->getWorker(IoChannelFactory::SubrequestMetadata{});
co_await workerInterface->customEvent(kj::heap<api::HibernatableWebSocketCustomEventImpl>(
hibernationEventType, readLoopTasks, kj::mv(params), *this));
hibernationEventType, kj::mv(params), *this));
if (isClose) {
co_return;
}
Expand Down
5 changes: 1 addition & 4 deletions src/workerd/io/worker-interface.c++
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,9 @@ kj::Own<WorkerInterface> WorkerInterface::fromException(kj::Exception&& e) {

RpcWorkerInterface::RpcWorkerInterface(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher)
: httpOverCapnpFactory(httpOverCapnpFactory),
byteStreamFactory(byteStreamFactory),
waitUntilTasks(waitUntilTasks),
dispatcher(kj::mv(dispatcher)) {}

kj::Promise<void> RpcWorkerInterface::request(kj::HttpMethod method,
Expand Down Expand Up @@ -405,8 +403,7 @@ kj::Promise<WorkerInterface::AlarmResult> RpcWorkerInterface::runAlarm(

kj::Promise<WorkerInterface::CustomEvent::Result> RpcWorkerInterface::customEvent(
kj::Own<CustomEvent> event) {
return event->sendRpc(httpOverCapnpFactory, byteStreamFactory, waitUntilTasks, dispatcher)
.attach(kj::mv(event));
return event->sendRpc(httpOverCapnpFactory, byteStreamFactory, dispatcher).attach(kj::mv(event));
}

// ======================================================================================
Expand Down
3 changes: 0 additions & 3 deletions src/workerd/io/worker-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ class WorkerInterface: public kj::HttpService {
// Forward the event over RPC.
virtual kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) = 0;

// Get the type for this event for logging / metrics purposes. This is intended for use by the
Expand Down Expand Up @@ -166,7 +165,6 @@ class RpcWorkerInterface: public WorkerInterface {
public:
RpcWorkerInterface(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher);

kj::Promise<void> request(kj::HttpMethod method,
Expand All @@ -189,7 +187,6 @@ class RpcWorkerInterface: public WorkerInterface {
private:
capnp::HttpOverCapnpFactory& httpOverCapnpFactory;
capnp::ByteStreamFactory& byteStreamFactory;
kj::TaskSet& waitUntilTasks;
rpc::EventDispatcher::Client dispatcher;
};

Expand Down
3 changes: 1 addition & 2 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,7 @@ private:
auto dispatcher =
bootstrap.startEventRequest(capnp::MessageSize{4, 0}).send().getDispatcher();
return event
->sendRpc(parent.httpOverCapnpFactory, parent.byteStreamFactory, parent.waitUntilTasks,
kj::mv(dispatcher))
->sendRpc(parent.httpOverCapnpFactory, parent.byteStreamFactory, kj::mv(dispatcher))
.attach(kj::mv(event));
}

Expand Down

0 comments on commit d08964a

Please sign in to comment.