From 0aed9091da5b109944658b574a50f98071fba4f3 Mon Sep 17 00:00:00 2001 From: Joaquim Silva Date: Thu, 15 Feb 2024 15:35:21 +0000 Subject: [PATCH] Add a new parameter to distinguish initial runAlarm from retries. --- src/workerd/api/global-scope.c++ | 5 +++-- src/workerd/api/global-scope.h | 23 +++++++++++++++++++++-- src/workerd/io/worker-entrypoint.c++ | 14 +++++++------- src/workerd/io/worker-interface.c++ | 17 +++++++++-------- src/workerd/io/worker-interface.capnp | 3 ++- src/workerd/io/worker-interface.h | 4 ++-- src/workerd/io/worker.c++ | 8 ++++---- src/workerd/server/alarm-scheduler.c++ | 9 +++++---- src/workerd/server/alarm-scheduler.h | 2 +- src/workerd/server/server.c++ | 8 ++++---- 10 files changed, 58 insertions(+), 35 deletions(-) diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index a5217f8df125..b5204ad7aac9 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -365,6 +365,7 @@ void ServiceWorkerGlobalScope::startScheduled( kj::Promise ServiceWorkerGlobalScope::runAlarm( kj::Date scheduledTime, kj::Duration timeout, + uint32_t retryCount, Worker::Lock& lock, kj::Maybe exportedHandler) { auto& context = IoContext::current(); @@ -388,7 +389,7 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( auto& alarm = KJ_ASSERT_NONNULL(handler.alarm); return context - .run([exportedHandler, &context, timeout, &alarm, + .run([exportedHandler, &context, timeout, retryCount, &alarm, maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)] (Worker::Lock& lock) mutable -> kj::Promise { jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext); @@ -421,7 +422,7 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( }; }); - return alarm(lock).then([]() -> kj::Promise { + return alarm(lock, jsg::alloc(retryCount)).then([]() -> kj::Promise { return WorkerInterface::AlarmResult { .retry = false, .outcome = EventOutcome::OK diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index 1b01dd0ca17d..5de8e556b38e 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -183,6 +183,23 @@ class ExecutionContext: public jsg::Object { } }; +// AlarmEventInfo is a jsg::Object used to pass alarm invocation info to an alarm handler. +class AlarmInvocationInfo: public jsg::Object { +public: + AlarmInvocationInfo(uint32_t retry): retryCount(retry) {} + + bool getIsRetry() { return retryCount > 0; } + uint32_t getRetryCount() { return retryCount; } + + JSG_RESOURCE_TYPE(AlarmInvocationInfo) { + JSG_READONLY_INSTANCE_PROPERTY(isRetry, getIsRetry); + JSG_READONLY_INSTANCE_PROPERTY(retryCount, getRetryCount); + } + +private: + uint32_t retryCount = 0; +}; + // Type signature for handlers exported from the root module. // // We define each handler method as a LenientOptional rather than as a plain Optional in order to @@ -202,7 +219,7 @@ struct ExportedHandler { jsg::Ref controller, jsg::Value env, jsg::Optional> ctx); jsg::LenientOptional> scheduled; - typedef kj::Promise AlarmHandler(); + typedef kj::Promise AlarmHandler(jsg::Ref alarmInfo); // Alarms are only exported on DOs, which receive env bindings from the constructor jsg::LenientOptional> alarm; @@ -313,6 +330,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { kj::Promise runAlarm( kj::Date scheduledTime, kj::Duration timeout, + uint32_t retryCount, Worker::Lock& lock, kj::Maybe exportedHandler); // Received test() (called from C++, not JS). See WorkerInterface::test(). This version returns @@ -691,6 +709,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { api::ServiceWorkerGlobalScope::StructuredCloneOptions, \ api::PromiseRejectionEvent, \ api::Navigator, \ - api::Performance + api::Performance, \ + api::AlarmInvocationInfo // The list of global-scope.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE } // namespace workerd::api diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 1ca6d7064136..2e589ef68030 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -60,7 +60,7 @@ public: kj::HttpConnectSettings settings) override; void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise test() override; kj::Promise customEvent(kj::Own event) override; @@ -96,7 +96,7 @@ private: kj::Promise maybeAddGcPassForTest(IoContext& context, kj::Promise promise); kj::Promise runAlarmImpl( - kj::Own incomingRequest, kj::Date scheduledTime); + kj::Own incomingRequest, kj::Date scheduledTime, uint32_t retryCount); public: // For kj::heap() only; pretend this is private. WorkerEntrypoint(kj::Badge badge, @@ -502,7 +502,7 @@ kj::Promise WorkerEntrypoint::runScheduled( } kj::Promise WorkerEntrypoint::runAlarmImpl( - kj::Own incomingRequest, kj::Date scheduledTime) { + kj::Own incomingRequest, kj::Date scheduledTime, uint32_t retryCount) { // We want to de-duplicate alarm requests as follows: // - An alarm must not be canceled once it is running, UNLESS the whole actor is shut down. // - If multiple alarm invocations arrive with the same scheduled time, we only run one. @@ -551,7 +551,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( try { auto result = co_await context.run( - [scheduledTime, entrypointName=entrypointName, &context](Worker::Lock& lock){ + [scheduledTime, retryCount, entrypointName=entrypointName, &context](Worker::Lock& lock){ jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); // If we have an invalid timeout, set it to the default value of 15 minutes. @@ -562,7 +562,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( } auto handler = lock.getExportedHandler(entrypointName, context.getActor()); - return lock.getGlobalScope().runAlarm(scheduledTime, timeout, lock, handler); + return lock.getGlobalScope().runAlarm(scheduledTime, timeout, retryCount, lock, handler); }); // The alarm handler was successfully complete. We must guarantee this same alarm does not @@ -600,14 +600,14 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( } kj::Promise WorkerEntrypoint::runAlarm( - kj::Date scheduledTime) { + kj::Date scheduledTime, uint32_t retryCount) { TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarm()"); auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "runAlarm() can only be called once")); this->incomingRequest = kj::none; auto& context = incomingRequest->getContext(); - auto promise = runAlarmImpl(kj::mv(incomingRequest), scheduledTime); + auto promise = runAlarmImpl(kj::mv(incomingRequest), scheduledTime, retryCount); return maybeAddGcPassForTest(context, kj::mv(promise)); } diff --git a/src/workerd/io/worker-interface.c++ b/src/workerd/io/worker-interface.c++ index e5dec1f95174..dd32fceb8270 100644 --- a/src/workerd/io/worker-interface.c++ +++ b/src/workerd/io/worker-interface.c++ @@ -71,12 +71,12 @@ public: } } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { KJ_IF_SOME(w, worker) { - co_return co_await w.get()->runAlarm(scheduledTime); + co_return co_await w.get()->runAlarm(scheduledTime, retryCount); } else { co_await promise; - co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime); + co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime, retryCount); } } @@ -226,7 +226,7 @@ public: kj::HttpConnectSettings settings) override; void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; private: @@ -263,8 +263,8 @@ kj::Promise RevocableWebSocketWorkerInterface: return worker.runScheduled(scheduledTime, cron); } -kj::Promise RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime) { - return worker.runAlarm(scheduledTime); +kj::Promise RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime, uint32_t retryCount) { + return worker.runAlarm(scheduledTime, retryCount); } kj::Promise @@ -309,7 +309,7 @@ public: kj::throwFatalException(kj::mv(exception)); } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { kj::throwFatalException(kj::mv(exception)); } @@ -377,9 +377,10 @@ kj::Promise RpcWorkerInterface::runScheduled( }); } -kj::Promise RpcWorkerInterface::runAlarm(kj::Date scheduledTime) { +kj::Promise RpcWorkerInterface::runAlarm(kj::Date scheduledTime, uint32_t retryCount) { auto req = dispatcher.runAlarmRequest(); req.setScheduledTime((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS); + req.setRetryCount(retryCount); return req.send().then([](auto resp) { auto respResult = resp.getResult(); return WorkerInterface::AlarmResult { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 2189df087d80..334ee9ac917d 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -245,9 +245,10 @@ interface EventDispatcher @0xf20697475ec1752d { # the outcome and whether the run should be retried. This does not complete immediately. - runAlarm @4 (scheduledTime :Int64) -> (result :AlarmRun); + runAlarm @4 (scheduledTime :Int64, retryCount :UInt32) -> (result :AlarmRun); # Runs a worker's alarm. # scheduledTime is a unix timestamp in milliseconds for when the alarm should be run + # retryCount indicates the retry count, if it's a retry. Else it'll be 0. # Returns an AlarmRun, detailing information about the run such as # the outcome and whether the run should be retried. This does not complete immediately. # diff --git a/src/workerd/io/worker-interface.h b/src/workerd/io/worker-interface.h index 4b74db658c6d..6aa041c78148 100644 --- a/src/workerd/io/worker-interface.h +++ b/src/workerd/io/worker-interface.h @@ -80,7 +80,7 @@ class WorkerInterface: public kj::HttpService { virtual kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) = 0; // Trigger an alarm event with the given scheduled (unix timestamp) time. - virtual kj::Promise runAlarm(kj::Date scheduledTime) = 0; + virtual kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) = 0; // Run the test handler. The returned promise resolves to true or false to indicate that the test // passed or failed. In the case of a failure, information should have already been written to @@ -178,7 +178,7 @@ class RpcWorkerInterface: public WorkerInterface { void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; private: diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index fde3a864c069..30c723b06af5 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -3216,7 +3216,7 @@ void Worker::Actor::Impl::HooksImpl::updateAlarmInMemory(kj::Maybe new for (auto i : kj::zeroTo(WorkerInterface::ALARM_RETRY_MAX_TRIES)) { co_await timerChannel.atTime(scheduledTime); auto result = co_await loopback->getWorker(IoChannelFactory::SubrequestMetadata{}) - ->runAlarm(originalTime); + ->runAlarm(originalTime, i); if (result.outcome == EventOutcome::OK || !result.retry) { break; @@ -3585,7 +3585,7 @@ public: kj::HttpConnectSettings settings) override; void prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; - kj::Promise runAlarm(kj::Date scheduledTime) override; + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; private: @@ -3810,8 +3810,8 @@ kj::Promise Worker::Isolate::SubrequestClient: return inner->runScheduled(scheduledTime, cron); } kj::Promise Worker::Isolate::SubrequestClient::runAlarm( - kj::Date scheduledTime) { - return inner->runAlarm(scheduledTime); + kj::Date scheduledTime, uint32_t retryCount) { + return inner->runAlarm(scheduledTime, retryCount); } kj::Promise Worker::Isolate::SubrequestClient::customEvent(kj::Own event) { diff --git a/src/workerd/server/alarm-scheduler.c++ b/src/workerd/server/alarm-scheduler.c++ index d4f1031fbe1c..5f6076a8e074 100644 --- a/src/workerd/server/alarm-scheduler.c++ +++ b/src/workerd/server/alarm-scheduler.c++ @@ -151,9 +151,9 @@ bool AlarmScheduler::deleteAlarm(ActorKey actor) { } kj::Promise AlarmScheduler::runAlarm( - const ActorKey& actor, kj::Date scheduledTime) { + const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount) { KJ_IF_SOME(ns, namespaces.find(actor.uniqueKey)) { - auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime); + auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, retryCount); co_return RetryInfo { .retry = result.outcome != EventOutcome::OK && result.retry, @@ -189,15 +189,16 @@ kj::Promise AlarmScheduler::makeAlarmTask(kj::Duration delay, const ActorKey& actorRef, kj::Date scheduledTime) { co_await checkTimestamp(delay, scheduledTime); - + uint32_t retryCount = 0; { auto& entry = KJ_ASSERT_NONNULL(alarms.findEntry(actorRef)); entry.value.status = AlarmStatus::STARTED; + retryCount = entry.value.countedRetry; } auto retryInfo = co_await ([&]() -> kj::Promise { try { - co_return co_await runAlarm(actorRef, scheduledTime); + co_return co_await runAlarm(actorRef, scheduledTime, retryCount); } catch (...) { auto exception = kj::getCaughtExceptionAsKj(); KJ_LOG(WARNING, exception); diff --git a/src/workerd/server/alarm-scheduler.h b/src/workerd/server/alarm-scheduler.h index 7c02ac3a3d20..33cbc2e110e6 100644 --- a/src/workerd/server/alarm-scheduler.h +++ b/src/workerd/server/alarm-scheduler.h @@ -112,7 +112,7 @@ class AlarmScheduler final : kj::TaskSet::ErrorHandler { bool retry; bool retryCountsAgainstLimit; }; - kj::Promise runAlarm(const ActorKey& actor, kj::Date scheduledTime); + kj::Promise runAlarm(const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount); void setAlarmInMemory(kj::Own actor, kj::Date scheduledTime); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 798142da3617..9b1ef6888ca1 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -504,7 +504,7 @@ private: kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { throwUnsupported(); } kj::Promise customEvent(kj::Own event) override { @@ -578,7 +578,7 @@ private: kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { throwUnsupported(); } kj::Promise customEvent(kj::Own event) override { @@ -736,7 +736,7 @@ private: kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { throwUnsupported(); } kj::Promise customEvent(kj::Own event) override { @@ -992,7 +992,7 @@ private: kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } - kj::Promise runAlarm(kj::Date scheduledTime) override { + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { throwUnsupported(); } kj::Promise customEvent(kj::Own event) override {