Skip to content

Commit

Permalink
Add a new parameter to distinguish initial runAlarm from retries.
Browse files Browse the repository at this point in the history
  • Loading branch information
jqmmes committed Feb 15, 2024
1 parent a8b68b3 commit 3749029
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public:
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override;
kj::Promise<bool> test() override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

Expand Down Expand Up @@ -601,7 +601,7 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
}

kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarm(
kj::Date scheduledTime) {
kj::Date scheduledTime, bool isActorAlarmRetry) {
TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarm()");
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"runAlarm() can only be called once"));
Expand Down
17 changes: 9 additions & 8 deletions src/workerd/io/worker-interface.c++
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ public:
}
}

kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override {
KJ_IF_SOME(w, worker) {
co_return co_await w.get()->runAlarm(scheduledTime);
co_return co_await w.get()->runAlarm(scheduledTime, isActorAlarmRetry);
} else {
co_await promise;
co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime);
co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime, isActorAlarmRetry);
}
}

Expand Down Expand Up @@ -222,7 +222,7 @@ public:
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

private:
Expand Down Expand Up @@ -259,8 +259,8 @@ kj::Promise<WorkerInterface::ScheduledResult> RevocableWebSocketWorkerInterface:
return worker.runScheduled(scheduledTime, cron);
}

kj::Promise<WorkerInterface::AlarmResult> RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime) {
return worker.runAlarm(scheduledTime);
kj::Promise<WorkerInterface::AlarmResult> RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) {
return worker.runAlarm(scheduledTime, isActorAlarmRetry);
}

kj::Promise<WorkerInterface::CustomEvent::Result>
Expand Down Expand Up @@ -305,7 +305,7 @@ public:
kj::throwFatalException(kj::mv(exception));
}

kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override {
kj::throwFatalException(kj::mv(exception));
}

Expand Down Expand Up @@ -373,9 +373,10 @@ kj::Promise<WorkerInterface::ScheduledResult> RpcWorkerInterface::runScheduled(
});
}

kj::Promise<WorkerInterface::AlarmResult> RpcWorkerInterface::runAlarm(kj::Date scheduledTime) {
kj::Promise<WorkerInterface::AlarmResult> RpcWorkerInterface::runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) {
auto req = dispatcher.runAlarmRequest();
req.setScheduledTime((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS);
req.setIsRetry(isActorAlarmRetry);
return req.send().then([](auto resp) {
auto respResult = resp.getResult();
return WorkerInterface::AlarmResult {
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ interface EventDispatcher @0xf20697475ec1752d {
# the outcome and whether the run should be retried. This does not complete immediately.


runAlarm @4 (scheduledTime :Int64) -> (result :AlarmRun);
runAlarm @4 (scheduledTime :Int64, isRetry :Bool) -> (result :AlarmRun);
# Runs a worker's alarm.
# scheduledTime is a unix timestamp in milliseconds for when the alarm should be run
# isRetry indicates wether this runAlarm is a retry or not.
# Returns an AlarmRun, detailing information about the run such as
# the outcome and whether the run should be retried. This does not complete immediately.
#
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/io/worker-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class WorkerInterface: public kj::HttpService {
virtual kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) = 0;

// Trigger an alarm event with the given scheduled (unix timestamp) time.
virtual kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) = 0;
virtual kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) = 0;

// Run the test handler. The returned promise resolves to true or false to indicate that the test
// passed or failed. In the case of a failure, information should have already been written to
Expand Down Expand Up @@ -178,7 +178,7 @@ class RpcWorkerInterface: public WorkerInterface {

void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

private:
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3105,8 +3105,9 @@ void Worker::Actor::Impl::HooksImpl::updateAlarmInMemory(kj::Maybe<kj::Date> new

for (auto i : kj::zeroTo(WorkerInterface::ALARM_RETRY_MAX_TRIES)) {
co_await timerChannel.atTime(scheduledTime);
// TODO(now): Set correct retry info here.
auto result = co_await loopback->getWorker(IoChannelFactory::SubrequestMetadata{})
->runAlarm(originalTime);
->runAlarm(originalTime, false);

if (result.outcome == EventOutcome::OK || !result.retry) {
break;
Expand Down Expand Up @@ -3475,7 +3476,7 @@ public:
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

private:
Expand Down Expand Up @@ -3700,8 +3701,8 @@ kj::Promise<WorkerInterface::ScheduledResult> Worker::Isolate::SubrequestClient:
return inner->runScheduled(scheduledTime, cron);
}
kj::Promise<WorkerInterface::AlarmResult> Worker::Isolate::SubrequestClient::runAlarm(
kj::Date scheduledTime) {
return inner->runAlarm(scheduledTime);
kj::Date scheduledTime, bool isActorAlarmRetry) {
return inner->runAlarm(scheduledTime, isActorAlarmRetry);
}
kj::Promise<WorkerInterface::CustomEvent::Result>
Worker::Isolate::SubrequestClient::customEvent(kj::Own<CustomEvent> event) {
Expand Down
14 changes: 8 additions & 6 deletions src/workerd/server/alarm-scheduler.c++
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ bool AlarmScheduler::deleteAlarm(ActorKey actor) {
}

kj::Promise<AlarmScheduler::RetryInfo> AlarmScheduler::runAlarm(
const ActorKey& actor, kj::Date scheduledTime) {
const ActorKey& actor, kj::Date scheduledTime, bool isRetry) {
KJ_IF_SOME(ns, namespaces.find(actor.uniqueKey)) {
auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime);
auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, isRetry);

co_return RetryInfo {
.retry = result.outcome != EventOutcome::OK && result.retry,
Expand All @@ -166,7 +166,7 @@ kj::Promise<AlarmScheduler::RetryInfo> AlarmScheduler::runAlarm(

AlarmScheduler::ScheduledAlarm AlarmScheduler::scheduleAlarm(
kj::Date now, kj::Own<ActorKey> actor, kj::Date scheduledTime) {
auto task = makeAlarmTask(scheduledTime - now, *actor, scheduledTime);
auto task = makeAlarmTask(scheduledTime - now, *actor, scheduledTime, false);

return ScheduledAlarm { kj::mv(actor), scheduledTime, kj::mv(task) };
}
Expand All @@ -187,7 +187,8 @@ kj::Promise<void> AlarmScheduler::checkTimestamp(kj::Duration delay, kj::Date sc

kj::Promise<void> AlarmScheduler::makeAlarmTask(kj::Duration delay,
const ActorKey& actorRef,
kj::Date scheduledTime) {
kj::Date scheduledTime,
bool isRetry) {
co_await checkTimestamp(delay, scheduledTime);

{
Expand All @@ -197,7 +198,7 @@ kj::Promise<void> AlarmScheduler::makeAlarmTask(kj::Duration delay,

auto retryInfo = co_await ([&]() -> kj::Promise<RetryInfo> {
try {
co_return co_await runAlarm(actorRef, scheduledTime);
co_return co_await runAlarm(actorRef, scheduledTime, isRetry);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
KJ_LOG(WARNING, exception);
Expand Down Expand Up @@ -265,7 +266,8 @@ kj::Promise<void> AlarmScheduler::makeAlarmTask(kj::Duration delay,
entry.value.backoff++;
entry.value.retry++;

entry.value.task = makeAlarmTask(delay, actorRef, scheduledTime);
// This alarm task is a retry. As such, we set isRetry to true.
entry.value.task = makeAlarmTask(delay, actorRef, scheduledTime, true);
} else {
KJ_ASSERT(entry.value.queuedAlarm == kj::none);
deleteAlarm(actorRef);
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/server/alarm-scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ class AlarmScheduler final : kj::TaskSet::ErrorHandler {
bool retry;
bool retryCountsAgainstLimit;
};
kj::Promise<RetryInfo> runAlarm(const ActorKey& actor, kj::Date scheduledTime);
kj::Promise<RetryInfo> runAlarm(const ActorKey& actor, kj::Date scheduledTime, bool isRetry);

void setAlarmInMemory(kj::Own<ActorKey> actor, kj::Date scheduledTime);

ScheduledAlarm scheduleAlarm(kj::Date now, kj::Own<ActorKey> actor, kj::Date scheduledTime);

kj::Promise<void> makeAlarmTask(kj::Duration delay, const ActorKey& actor, kj::Date scheduledTime);
kj::Promise<void> makeAlarmTask(kj::Duration delay, const ActorKey& actor, kj::Date scheduledTime, bool isRetry);

kj::Promise<void> checkTimestamp(kj::Duration delay, kj::Date scheduledTime);

Expand Down

0 comments on commit 3749029

Please sign in to comment.