Skip to content

Commit

Permalink
Add a new parameter to distinguish initial runAlarm from retries.
Browse files Browse the repository at this point in the history
  • Loading branch information
jqmmes committed Feb 27, 2024
1 parent fa0e7bf commit 7014c32
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 36 deletions.
6 changes: 3 additions & 3 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ void ServiceWorkerGlobalScope::startScheduled(
kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
kj::Date scheduledTime,
kj::Duration timeout,
uint32_t retryCount,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler) {

auto& context = IoContext::current();
Expand All @@ -388,15 +389,14 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
auto& alarm = KJ_ASSERT_NONNULL(handler.alarm);

return context
.run([exportedHandler, &context, timeout, &alarm,
.run([exportedHandler, &context, timeout, retryCount, &alarm,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)]
(Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);
// We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise
// completes, we want to cancel the alarm handler. If the alarm handler promise completes first,
// the timeout will be canceled.
auto timeoutPromise = context.afterLimitTimeout(timeout).then([&context]() -> kj::Promise<WorkerInterface::AlarmResult> {
LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time");
// We don't want to delete the alarm since we have not successfully completed the alarm
// execution.
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
Expand All @@ -421,7 +421,7 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
};
});

return alarm(lock).then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return alarm(lock, jsg::alloc<AlarmInvocationInfo>(retryCount)).then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return WorkerInterface::AlarmResult {
.retry = false,
.outcome = EventOutcome::OK
Expand Down
23 changes: 21 additions & 2 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ class ExecutionContext: public jsg::Object {
}
};

// AlarmInvocationInfo is a jsg::Object used to pass alarm invocation info to an alarm handler.
//
// It exposes two read-only instance properties:
//   - isRetry:    true when the stored retry count is greater than zero.
//   - retryCount: the retry count this object was constructed with.
class AlarmInvocationInfo: public jsg::Object {
public:
  // `retry` is the retry count for this invocation. The constructor is explicit so that a bare
  // integer is never implicitly converted into an AlarmInvocationInfo.
  explicit AlarmInvocationInfo(uint32_t retry): retryCount(retry) {}

  // Returns true if the stored retry count is non-zero.
  bool getIsRetry() { return retryCount > 0; }

  // Returns the stored retry count.
  uint32_t getRetryCount() { return retryCount; }

  JSG_RESOURCE_TYPE(AlarmInvocationInfo) {
    JSG_READONLY_INSTANCE_PROPERTY(isRetry, getIsRetry);
    JSG_READONLY_INSTANCE_PROPERTY(retryCount, getRetryCount);
  }

private:
  // Retry count supplied at construction; never modified afterwards within this class.
  uint32_t retryCount = 0;
};

// Type signature for handlers exported from the root module.
//
// We define each handler method as a LenientOptional rather than as a plain Optional in order to
Expand All @@ -202,7 +219,7 @@ struct ExportedHandler {
jsg::Ref<ScheduledController> controller, jsg::Value env, jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
jsg::LenientOptional<jsg::Function<ScheduledHandler>> scheduled;

typedef kj::Promise<void> AlarmHandler();
typedef kj::Promise<void> AlarmHandler(jsg::Ref<AlarmInvocationInfo> alarmInfo);
// Alarms are only exported on DOs, which receive env bindings from the constructor
jsg::LenientOptional<jsg::Function<AlarmHandler>> alarm;

Expand Down Expand Up @@ -313,6 +330,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
kj::Promise<WorkerInterface::AlarmResult> runAlarm(
kj::Date scheduledTime,
kj::Duration timeout,
uint32_t retryCount,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler);

// Received test() (called from C++, not JS). See WorkerInterface::test(). This version returns
Expand Down Expand Up @@ -691,6 +709,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
api::ServiceWorkerGlobalScope::StructuredCloneOptions, \
api::PromiseRejectionEvent, \
api::Navigator, \
api::Performance
api::Performance, \
api::AlarmInvocationInfo
// The list of global-scope.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE
} // namespace workerd::api
14 changes: 7 additions & 7 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public:
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<bool> test() override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

Expand Down Expand Up @@ -96,7 +96,7 @@ private:
kj::Promise<T> maybeAddGcPassForTest(IoContext& context, kj::Promise<T> promise);

kj::Promise<WorkerEntrypoint::AlarmResult> runAlarmImpl(
kj::Own<IoContext::IncomingRequest> incomingRequest, kj::Date scheduledTime);
kj::Own<IoContext::IncomingRequest> incomingRequest, kj::Date scheduledTime, uint32_t retryCount);

public: // For kj::heap() only; pretend this is private.
WorkerEntrypoint(kj::Badge<WorkerEntrypoint> badge,
Expand Down Expand Up @@ -502,7 +502,7 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
}

kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
kj::Own<IoContext::IncomingRequest> incomingRequest, kj::Date scheduledTime) {
kj::Own<IoContext::IncomingRequest> incomingRequest, kj::Date scheduledTime, uint32_t retryCount) {
// We want to de-duplicate alarm requests as follows:
// - An alarm must not be canceled once it is running, UNLESS the whole actor is shut down.
// - If multiple alarm invocations arrive with the same scheduled time, we only run one.
Expand Down Expand Up @@ -551,7 +551,7 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(

try {
auto result = co_await context.run(
[scheduledTime, entrypointName=entrypointName, &context](Worker::Lock& lock){
[scheduledTime, retryCount, entrypointName=entrypointName, &context](Worker::Lock& lock){
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

// If we have an invalid timeout, set it to the default value of 15 minutes.
Expand All @@ -562,7 +562,7 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
}

auto handler = lock.getExportedHandler(entrypointName, context.getActor());
return lock.getGlobalScope().runAlarm(scheduledTime, timeout, lock, handler);
return lock.getGlobalScope().runAlarm(scheduledTime, timeout, retryCount, lock, handler);
});

// The alarm handler was successfully complete. We must guarantee this same alarm does not
Expand Down Expand Up @@ -600,14 +600,14 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
}

kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarm(
kj::Date scheduledTime) {
kj::Date scheduledTime, uint32_t retryCount) {
TRACE_EVENT("workerd", "WorkerEntrypoint::runAlarm()");
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
"runAlarm() can only be called once"));
this->incomingRequest = kj::none;

auto& context = incomingRequest->getContext();
auto promise = runAlarmImpl(kj::mv(incomingRequest), scheduledTime);
auto promise = runAlarmImpl(kj::mv(incomingRequest), scheduledTime, retryCount);
return maybeAddGcPassForTest(context, kj::mv(promise));
}

Expand Down
17 changes: 9 additions & 8 deletions src/workerd/io/worker-interface.c++
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ public:
}
}

kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
KJ_IF_SOME(w, worker) {
co_return co_await w.get()->runAlarm(scheduledTime);
co_return co_await w.get()->runAlarm(scheduledTime, retryCount);
} else {
co_await promise;
co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime);
co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime, retryCount);
}
}

Expand Down Expand Up @@ -226,7 +226,7 @@ public:
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

private:
Expand Down Expand Up @@ -263,8 +263,8 @@ kj::Promise<WorkerInterface::ScheduledResult> RevocableWebSocketWorkerInterface:
return worker.runScheduled(scheduledTime, cron);
}

kj::Promise<WorkerInterface::AlarmResult> RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime) {
return worker.runAlarm(scheduledTime);
kj::Promise<WorkerInterface::AlarmResult> RevocableWebSocketWorkerInterface::runAlarm(kj::Date scheduledTime, uint32_t retryCount) {
return worker.runAlarm(scheduledTime, retryCount);
}

kj::Promise<WorkerInterface::CustomEvent::Result>
Expand Down Expand Up @@ -309,7 +309,7 @@ public:
kj::throwFatalException(kj::mv(exception));
}

kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
kj::throwFatalException(kj::mv(exception));
}

Expand Down Expand Up @@ -377,9 +377,10 @@ kj::Promise<WorkerInterface::ScheduledResult> RpcWorkerInterface::runScheduled(
});
}

kj::Promise<WorkerInterface::AlarmResult> RpcWorkerInterface::runAlarm(kj::Date scheduledTime) {
kj::Promise<WorkerInterface::AlarmResult> RpcWorkerInterface::runAlarm(kj::Date scheduledTime, uint32_t retryCount) {
auto req = dispatcher.runAlarmRequest();
req.setScheduledTime((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS);
req.setRetryCount(retryCount);
return req.send().then([](auto resp) {
auto respResult = resp.getResult();
return WorkerInterface::AlarmResult {
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,10 @@ interface EventDispatcher @0xf20697475ec1752d {
# the outcome and whether the run should be retried. This does not complete immediately.


runAlarm @4 (scheduledTime :Int64) -> (result :AlarmRun);
runAlarm @4 (scheduledTime :Int64, retryCount :UInt32) -> (result :AlarmRun);
# Runs a worker's alarm.
# scheduledTime is a unix timestamp in milliseconds for when the alarm should be run
# retryCount is the retry count for this run: 0 for the initial run, greater than 0 for a retry.
# Returns an AlarmRun, detailing information about the run such as
# the outcome and whether the run should be retried. This does not complete immediately.
#
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/io/worker-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class WorkerInterface: public kj::HttpService {
virtual kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) = 0;

// Trigger an alarm event with the given scheduled (unix timestamp) time.
virtual kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) = 0;
virtual kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) = 0;

// Run the test handler. The returned promise resolves to true or false to indicate that the test
// passed or failed. In the case of a failure, information should have already been written to
Expand Down Expand Up @@ -178,7 +178,7 @@ class RpcWorkerInterface: public WorkerInterface {

void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

private:
Expand Down
8 changes: 4 additions & 4 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3216,7 +3216,7 @@ void Worker::Actor::Impl::HooksImpl::updateAlarmInMemory(kj::Maybe<kj::Date> new
for (auto i : kj::zeroTo(WorkerInterface::ALARM_RETRY_MAX_TRIES)) {
co_await timerChannel.atTime(scheduledTime);
auto result = co_await loopback->getWorker(IoChannelFactory::SubrequestMetadata{})
->runAlarm(originalTime);
->runAlarm(originalTime, i);

if (result.outcome == EventOutcome::OK || !result.retry) {
break;
Expand Down Expand Up @@ -3585,7 +3585,7 @@ public:
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;

private:
Expand Down Expand Up @@ -3810,8 +3810,8 @@ kj::Promise<WorkerInterface::ScheduledResult> Worker::Isolate::SubrequestClient:
return inner->runScheduled(scheduledTime, cron);
}
kj::Promise<WorkerInterface::AlarmResult> Worker::Isolate::SubrequestClient::runAlarm(
kj::Date scheduledTime) {
return inner->runAlarm(scheduledTime);
kj::Date scheduledTime, uint32_t retryCount) {
return inner->runAlarm(scheduledTime, retryCount);
}
kj::Promise<WorkerInterface::CustomEvent::Result>
Worker::Isolate::SubrequestClient::customEvent(kj::Own<CustomEvent> event) {
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/server/alarm-scheduler.c++
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ bool AlarmScheduler::deleteAlarm(ActorKey actor) {
}

kj::Promise<AlarmScheduler::RetryInfo> AlarmScheduler::runAlarm(
const ActorKey& actor, kj::Date scheduledTime) {
const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount) {
KJ_IF_SOME(ns, namespaces.find(actor.uniqueKey)) {
auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime);
auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, retryCount);

co_return RetryInfo {
.retry = result.outcome != EventOutcome::OK && result.retry,
Expand Down Expand Up @@ -189,15 +189,16 @@ kj::Promise<void> AlarmScheduler::makeAlarmTask(kj::Duration delay,
const ActorKey& actorRef,
kj::Date scheduledTime) {
co_await checkTimestamp(delay, scheduledTime);

uint32_t retryCount = 0;
{
auto& entry = KJ_ASSERT_NONNULL(alarms.findEntry(actorRef));
entry.value.status = AlarmStatus::STARTED;
retryCount = entry.value.countedRetry;
}

auto retryInfo = co_await ([&]() -> kj::Promise<RetryInfo> {
try {
co_return co_await runAlarm(actorRef, scheduledTime);
co_return co_await runAlarm(actorRef, scheduledTime, retryCount);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
KJ_LOG(WARNING, exception);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/alarm-scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class AlarmScheduler final : kj::TaskSet::ErrorHandler {
bool retry;
bool retryCountsAgainstLimit;
};
kj::Promise<RetryInfo> runAlarm(const ActorKey& actor, kj::Date scheduledTime);
kj::Promise<RetryInfo> runAlarm(const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount);

void setAlarmInMemory(kj::Own<ActorKey> actor, kj::Date scheduledTime);

Expand Down
8 changes: 4 additions & 4 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private:
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
throwUnsupported();
}
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
Expand Down Expand Up @@ -578,7 +578,7 @@ private:
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
throwUnsupported();
}
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
Expand Down Expand Up @@ -736,7 +736,7 @@ private:
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
throwUnsupported();
}
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
Expand Down Expand Up @@ -992,7 +992,7 @@ private:
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
throwUnsupported();
}
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
Expand Down

0 comments on commit 7014c32

Please sign in to comment.