diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 0bb9b50d50df..5e68c3aee94f 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -60,7 +60,7 @@ public: kj::HttpConnectSettings settings) override; void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override; kj::Promise test() override; kj::Promise customEvent(kj::Own event) override; @@ -601,7 +601,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( } kj::Promise WorkerEntrypoint::runAlarm( - kj::Date scheduledTime) { + kj::Date scheduledTime, bool isActorAlarmRetry) { TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarm()"); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "runAlarm() can only be called once")); diff --git a/src/workerd/io/worker-interface.c++ b/src/workerd/io/worker-interface.c++ index 328d7f269fce..f2ce14231bfe 100644 --- a/src/workerd/io/worker-interface.c++ +++ b/src/workerd/io/worker-interface.c++ @@ -71,12 +71,12 @@ public: } } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override { KJ_IF_SOME(w, worker) { - co_return co_await w.get()->runAlarm(scheduledTime); + co_return co_await w.get()->runAlarm(scheduledTime, isActorAlarmRetry); } else { co_await promise; - co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime); + co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime, isActorAlarmRetry); } } @@ -222,7 +222,7 @@ public: kj::HttpConnectSettings settings) override; void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override; kj::Promise customEvent(kj::Own event) override; private: @@ -259,8 +259,8 @@ kj::Promise RevocableWebSocketWorkerInterface: return worker.runScheduled(scheduledTime, cron); } -kj::Promise RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime) { - return worker.runAlarm(scheduledTime); +kj::Promise RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) { + return worker.runAlarm(scheduledTime, isActorAlarmRetry); } kj::Promise @@ -305,7 +305,7 @@ public: kj::throwFatalException(kj::mv(exception)); } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override { kj::throwFatalException(kj::mv(exception)); } @@ -373,9 +373,10 @@ kj::Promise RpcWorkerInterface::runScheduled( }); } -kj::Promise RpcWorkerInterface::runAlarm(kj::Date scheduledTime) { +kj::Promise RpcWorkerInterface::runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) { auto req = dispatcher.runAlarmRequest(); req.setScheduledTime((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS); + req.setIsRetry(isActorAlarmRetry); return req.send().then([](auto resp) { auto respResult = resp.getResult(); return WorkerInterface::AlarmResult { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index f81dac659a2c..de490d54b6dc 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -214,9 +214,10 @@ interface EventDispatcher @0xf20697475ec1752d { # the outcome and whether the run should be retried. This does not complete immediately. - runAlarm @4 (scheduledTime :Int64) -> (result :AlarmRun); + runAlarm @4 (scheduledTime :Int64, isRetry :Bool) -> (result :AlarmRun); # Runs a worker's alarm. # scheduledTime is a unix timestamp in milliseconds for when the alarm should be run + # isRetry indicates wether this runAlarm is a retry or not. # Returns an AlarmRun, detailing information about the run such as # the outcome and whether the run should be retried. This does not complete immediately. # diff --git a/src/workerd/io/worker-interface.h b/src/workerd/io/worker-interface.h index 4b74db658c6d..678807a51666 100644 --- a/src/workerd/io/worker-interface.h +++ b/src/workerd/io/worker-interface.h @@ -80,7 +80,7 @@ class WorkerInterface: public kj::HttpService { virtual kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) = 0; // Trigger an alarm event with the given scheduled (unix timestamp) time. - virtual kj::Promise runAlarm(kj::Date scheduledTime) = 0; + virtual kj::Promise runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) = 0; // Run the test handler. The returned promise resolves to true or false to indicate that the test // passed or failed. In the case of a failure, information should have already been written to @@ -178,7 +178,7 @@ class RpcWorkerInterface: public WorkerInterface { void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override; kj::Promise customEvent(kj::Own event) override; private: diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index d0a40c875261..f0f23727646d 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -3105,8 +3105,9 @@ void Worker::Actor::Impl::HooksImpl::updateAlarmInMemory(kj::Maybe new for (auto i : kj::zeroTo(WorkerInterface::ALARM_RETRY_MAX_TRIES)) { co_await timerChannel.atTime(scheduledTime); + // TODO(now): Set correct retry info here. auto result = co_await loopback->getWorker(IoChannelFactory::SubrequestMetadata{}) - ->runAlarm(originalTime); + ->runAlarm(originalTime, false); if (result.outcome == EventOutcome::OK || !result.retry) { break; @@ -3475,7 +3476,7 @@ public: kj::HttpConnectSettings settings) override; void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, bool isActorAlarmRetry) override; kj::Promise customEvent(kj::Own event) override; private: @@ -3700,8 +3701,8 @@ kj::Promise Worker::Isolate::SubrequestClient: return inner->runScheduled(scheduledTime, cron); } kj::Promise Worker::Isolate::SubrequestClient::runAlarm( - kj::Date scheduledTime) { - return inner->runAlarm(scheduledTime); + kj::Date scheduledTime, bool isActorAlarmRetry) { + return inner->runAlarm(scheduledTime, isActorAlarmRetry); } kj::Promise Worker::Isolate::SubrequestClient::customEvent(kj::Own event) { diff --git a/src/workerd/server/alarm-scheduler.c++ b/src/workerd/server/alarm-scheduler.c++ index d4f1031fbe1c..8caf43bd236c 100644 --- a/src/workerd/server/alarm-scheduler.c++ +++ b/src/workerd/server/alarm-scheduler.c++ @@ -151,9 +151,9 @@ bool AlarmScheduler::deleteAlarm(ActorKey actor) { } kj::Promise AlarmScheduler::runAlarm( - const ActorKey& actor, kj::Date scheduledTime) { + const ActorKey& actor, kj::Date scheduledTime, bool isRetry) { KJ_IF_SOME(ns, namespaces.find(actor.uniqueKey)) { - auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime); + auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, isRetry); co_return RetryInfo { .retry = result.outcome != EventOutcome::OK && result.retry, @@ -166,7 +166,7 @@ kj::Promise AlarmScheduler::runAlarm( AlarmScheduler::ScheduledAlarm AlarmScheduler::scheduleAlarm( kj::Date now, kj::Own actor, kj::Date scheduledTime) { - auto task = makeAlarmTask(scheduledTime - now, *actor, scheduledTime); + auto task = makeAlarmTask(scheduledTime - now, *actor, scheduledTime, false); return ScheduledAlarm { kj::mv(actor), scheduledTime, kj::mv(task) }; } @@ -187,7 +187,8 @@ kj::Promise AlarmScheduler::checkTimestamp(kj::Duration delay, kj::Date sc kj::Promise AlarmScheduler::makeAlarmTask(kj::Duration delay, const ActorKey& actorRef, - kj::Date scheduledTime) { + kj::Date scheduledTime, + bool isRetry) { co_await checkTimestamp(delay, scheduledTime); { @@ -197,7 +198,7 @@ kj::Promise AlarmScheduler::makeAlarmTask(kj::Duration delay, auto retryInfo = co_await ([&]() -> kj::Promise { try { - co_return co_await runAlarm(actorRef, scheduledTime); + co_return co_await runAlarm(actorRef, scheduledTime, isRetry); } catch (...) { auto exception = kj::getCaughtExceptionAsKj(); KJ_LOG(WARNING, exception); @@ -265,7 +266,8 @@ kj::Promise AlarmScheduler::makeAlarmTask(kj::Duration delay, entry.value.backoff++; entry.value.retry++; - entry.value.task = makeAlarmTask(delay, actorRef, scheduledTime); + // This alarm task is a retry. As such, we set isRetry to true. + entry.value.task = makeAlarmTask(delay, actorRef, scheduledTime, true); } else { KJ_ASSERT(entry.value.queuedAlarm == kj::none); deleteAlarm(actorRef); diff --git a/src/workerd/server/alarm-scheduler.h b/src/workerd/server/alarm-scheduler.h index 7c02ac3a3d20..1efb4eec8267 100644 --- a/src/workerd/server/alarm-scheduler.h +++ b/src/workerd/server/alarm-scheduler.h @@ -112,13 +112,13 @@ class AlarmScheduler final : kj::TaskSet::ErrorHandler { bool retry; bool retryCountsAgainstLimit; }; - kj::Promise runAlarm(const ActorKey& actor, kj::Date scheduledTime); + kj::Promise runAlarm(const ActorKey& actor, kj::Date scheduledTime, bool isRetry); void setAlarmInMemory(kj::Own actor, kj::Date scheduledTime); ScheduledAlarm scheduleAlarm(kj::Date now, kj::Own actor, kj::Date scheduledTime); - kj::Promise makeAlarmTask(kj::Duration delay, const ActorKey& actor, kj::Date scheduledTime); + kj::Promise makeAlarmTask(kj::Duration delay, const ActorKey& actor, kj::Date scheduledTime, bool isRetry); kj::Promise checkTimestamp(kj::Duration delay, kj::Date scheduledTime);