Skip to content

Commit

Permalink
Fix jsg::Ref<WebSocketRequestResponsePair> dtor segfault in actor-state.
Browse files Browse the repository at this point in the history
  • Loading branch information
jqmmes committed Oct 17, 2023
1 parent 3114486 commit 6053b8e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 23 deletions.
8 changes: 4 additions & 4 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ void DurableObjectState::setWebSocketAutoResponse(
// If there's no request/response pair, we unset any current set auto response configuration.
KJ_IF_SOME(manager, a.getHibernationManager()) {
// If there's no hibernation manager created yet, there's nothing to do here.
manager.unsetWebSocketAutoResponse();
manager.setWebSocketAutoResponse(kj::none, kj::none);
}
return;
}
Expand All @@ -902,15 +902,15 @@ void DurableObjectState::setWebSocketAutoResponse(
// If there's no hibernation manager created yet, we should create one and
// set its auto response.
}
KJ_REQUIRE_NONNULL(a.getHibernationManager()).setWebSocketAutoResponse(kj::mv(reqResp));
KJ_REQUIRE_NONNULL(a.getHibernationManager()).setWebSocketAutoResponse(
reqResp->getRequest(), reqResp->getResponse());
}

kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> DurableObjectState::getWebSocketAutoResponse() {
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
KJ_IF_SOME(manager, a.getHibernationManager()) {
// If there's no hibernation manager created yet, there's nothing to do here.
auto r = manager.getWebSocketAutoResponse();
return r;
return manager.getWebSocketAutoResponse();
}
return kj::none;
}
Expand Down
41 changes: 27 additions & 14 deletions src/workerd/io/hibernation-manager.c++
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,26 @@ kj::Vector<jsg::Ref<api::WebSocket>> HibernationManagerImpl::getWebSockets(
}

void HibernationManagerImpl::setWebSocketAutoResponse(
jsg::Ref<api::WebSocketRequestResponsePair> reqResp) {
autoResponsePair = kj::mv(reqResp);
}

void HibernationManagerImpl::unsetWebSocketAutoResponse() {
autoResponsePair = kj::none;
kj::Maybe<kj::StringPtr> request, kj::Maybe<kj::StringPtr> response) {
KJ_IF_SOME(req, request) {
// If we have a request, we must also have a response. If response is kj::none, we'll throw.
autoResponsePair->request = kj::str(req);
autoResponsePair->response = kj::str(KJ_REQUIRE_NONNULL(response));
return;
}
// If we don't have a request, we must unset both request and response.
autoResponsePair->request = kj::none;
autoResponsePair->response = kj::none;
}

kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> HibernationManagerImpl::getWebSocketAutoResponse() {
KJ_IF_SOME(ar, autoResponsePair) {
return ar.addRef();
} else {
return kj::none;
KJ_IF_SOME(req, autoResponsePair->request) {
// When getting the currently set auto-response pair, if we have a request we must have a response
// set. If not, we'll throw.
return api::WebSocketRequestResponsePair::constructor(kj::str(req),
kj::str(KJ_REQUIRE_NONNULL(autoResponsePair->response)));
}
return kj::none;
}

void HibernationManagerImpl::setTimerChannel(TimerChannel& timerChannel) {
Expand Down Expand Up @@ -188,10 +194,12 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {

auto skip = false;

KJ_IF_SOME (reqResp, autoResponsePair) {
// If we have a request != kj::none, we can compare it the received message. This also implies
// that we have a response set in autoResponsePair.
KJ_IF_SOME (req, autoResponsePair->request) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(text, kj::String) {
if (text == (reqResp)->getRequest()) {
if (text == req) {
// If the received message matches the one set for auto-response, we must
// short-circuit readLoop, store the current timestamp and and automatically respond
// with the expected response.
Expand All @@ -207,15 +215,20 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
// If the actor is not hibernated/If the WebSocket is active, we need to update
// autoResponseTimestamp on the active websocket.
apiWs->setAutoResponseStatus(hib.autoResponseTimestamp, kj::READY_NOW);
co_await apiWs->sendAutoResponse(kj::str(reqResp->getResponse().asArray()), ws);
// Since we had a request set, we must have and response that's sent back using the
// same websocket here. The sending of response is managed in web-socket to avoid
// possible racing problems with regular websocket messages.
co_await apiWs->sendAutoResponse(
kj::str(KJ_REQUIRE_NONNULL(autoResponsePair->response).asArray()), ws);
}
KJ_CASE_ONEOF(package, api::WebSocket::HibernationPackage) {
if (!package.closedOutgoingConnection) {
// We need to store the autoResponsePromise because we may instantiate an api::websocket
// If we do that, we have to provide it with the promise to avoid races. This can
// happen if we have a websocket hibernating, that unhibernates and sends a
// message while ws.send() for auto-response is also sending.
auto p = ws.send(reqResp->getResponse().asArray()).fork();
auto p = ws.send(
KJ_REQUIRE_NONNULL(autoResponsePair->response).asArray()).fork();
hib.autoResponsePromise = p.addBranch();
co_await p;
hib.autoResponsePromise = kj::READY_NOW;
Expand Down
15 changes: 12 additions & 3 deletions src/workerd/io/hibernation-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
// This converts our activeOrPackage from an api::WebSocket to a HibernationPackage.
void hibernateWebSockets(Worker::Lock& lock) override;

void setWebSocketAutoResponse(jsg::Ref<api::WebSocketRequestResponsePair> reqResp) override;
void unsetWebSocketAutoResponse() override;
void setWebSocketAutoResponse(kj::Maybe<kj::StringPtr> request,
kj::Maybe<kj::StringPtr> response) override;
kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> getWebSocketAutoResponse() override;
void setTimerChannel(TimerChannel& timerChannel) override;

Expand Down Expand Up @@ -187,6 +187,15 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
TagCollection(TagCollection&& other) = default;
};

// This structure will hold the request and corresponding response for hibernatable websockets
// auto-response feature. Although we store 2 kj::Maybe strings, if we don't have a request set
// we can't have a response, and vice versa.
// TODO(cleanup): Remove kj::Maybe from request and response strings.
struct AutoRequestResponsePair {
kj::Maybe<kj::String> request = kj::none;
kj::Maybe<kj::String> response = kj::none;
};

// A hashmap of tags to HibernatableWebSockets associated with the tag.
// We use a kj::List so we can quickly remove websockets that have disconnected.
// Also note that we box the keys and values such that in the event of a hashmap resizing we don't
Expand Down Expand Up @@ -222,7 +231,7 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
};
DisconnectHandler onDisconnect;
kj::TaskSet readLoopTasks;
kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> autoResponsePair;
kj::Own<AutoRequestResponsePair> autoResponsePair = kj::heap<AutoRequestResponsePair>();
kj::Maybe<TimerChannel&> timer;
};
}; // namespace workerd
4 changes: 2 additions & 2 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,8 @@ class Worker::Actor final: public kj::Refcounted {
jsg::Lock& js,
kj::Maybe<kj::StringPtr> tag) = 0;
virtual void hibernateWebSockets(Worker::Lock& lock) = 0;
virtual void setWebSocketAutoResponse(jsg::Ref<api::WebSocketRequestResponsePair> reqResp) = 0;
virtual void unsetWebSocketAutoResponse() = 0;
virtual void setWebSocketAutoResponse(kj::Maybe<kj::StringPtr> request,
kj::Maybe<kj::StringPtr> response) = 0;
virtual kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> getWebSocketAutoResponse() = 0;
virtual void setTimerChannel(TimerChannel& timerChannel) = 0;
};
Expand Down

0 comments on commit 6053b8e

Please sign in to comment.