Store alarm value in sqlite database, for sqlite-backed DOs #2648

Merged: 4 commits (Sep 23, 2024)
Changes from all commits
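
For context before the per-file diffs, here is a minimal sketch of the alarm lifecycle these changes cover, using the Durable Objects storage alarm API that the tests below exercise (setAlarm(), getAlarm(), deleteAlarm(), and an exported alarm() handler). The class name and response text are illustrative only and not part of the PR.

// Sketch only: the standard DO alarm lifecycle (names here are illustrative).
export class AlarmSketch {
  constructor(state) {
    this.state = state;
  }

  async fetch() {
    // Schedule an alarm 100 ms from now. With this PR, for SQLite-backed
    // Durable Objects the alarm time is persisted in the SQLite database.
    await this.state.storage.setAlarm(Date.now() + 100);
    return new Response('alarm scheduled');
  }

  async alarm() {
    // By the time the handler runs, the triggering alarm has been consumed,
    // so getAlarm() returns null unless another alarm has been scheduled.
    let time = await this.state.storage.getAlarm();
    console.log('alarm fired; getAlarm() ->', time);
  }
}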
40 changes: 22 additions & 18 deletions src/workerd/api/actor-alarms-delete-test.js
@@ -14,12 +14,12 @@ export class DurableObjectExample {

   async waitForAlarm(scheduledTime) {
     let self = this;
-    let prom = new Promise((resolve) => {
-      self.resolve = resolve;
-    });
+    const { promise, resolve, reject } = Promise.withResolvers();
+    self.resolve = resolve;
+    self.reject = reject;

     try {
-      await prom;
+      await promise;
       if (Date.now() < scheduledTime.valueOf()) {
         throw new Error(
           `Date.now() is before scheduledTime! ${Date.now()} vs ${scheduledTime.valueOf()}`
@@ -38,22 +38,26 @@ export class DurableObjectExample {
   }

   async alarm() {
-    this.state.alarmsTriggered++;
-    let time = await this.state.storage.getAlarm();
-    if (time) {
-      throw new Error(`time not null inside alarm handler ${time}`);
-    }
-    // Deleting an alarm inside `alarm()` will not have any effect, unless there's another queued alarm
-    // already.
-    await this.state.storage.deleteAlarm();
+    try {
+      this.state.alarmsTriggered++;
+      let time = await this.state.storage.getAlarm();
+      if (time !== null) {
+        throw new Error(`time not null inside alarm handler ${time}`);
+      }
+      // Deleting an alarm inside `alarm()` will not have any effect, unless there's another queued alarm
+      // already.
+      await this.state.storage.deleteAlarm();

-    // On the other hand, if we have an alarm queued, it will be deleted. If this is working properly,
-    // we'll only have one alarm triggered.
-    await this.state.storage.setAlarm(Date.now() + 50);
-    await this.state.storage.deleteAlarm();
+      // On the other hand, if we have an alarm queued, it will be deleted. If this is working properly,
+      // we'll only have one alarm triggered.
+      await this.state.storage.setAlarm(Date.now() + 50);
+      await this.state.storage.deleteAlarm();

-    // All done inside `alarm()`.
-    this.resolve();
+      // All done inside `alarm()`.
+      this.resolve();
+    } catch (e) {
+      this.reject(e);
+    }
   }

   async fetch() {
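The test switches from capturing only resolve out of a new Promise(...) executor to the standard Promise.withResolvers() API so that reject is also stashed, letting failures thrown inside alarm() reject the promise that waitForAlarm() is awaiting instead of hanging the test. A minimal standalone sketch of the pattern, independent of the test harness:

// Promise.withResolvers() hands back the promise plus its settle functions,
// so both resolve and reject can be stored for later use.
const { promise, resolve, reject } = Promise.withResolvers();

function onAlarm() {
  try {
    // ... checks that may throw ...
    resolve('alarm ran');
  } catch (e) {
    reject(e); // the awaiting side observes the error instead of timing out
  }
}

setTimeout(onAlarm, 10);
console.log(await promise);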
20 changes: 12 additions & 8 deletions src/workerd/api/actor-alarms-test.js
@@ -11,12 +11,12 @@ export class DurableObjectExample {

   async waitForAlarm(scheduledTime) {
     let self = this;
-    let prom = new Promise((resolve) => {
-      self.resolve = resolve;
-    });
+    const { promise, resolve, reject } = Promise.withResolvers();
+    self.resolve = resolve;
+    self.reject = reject;

     try {
-      await prom;
+      await promise;
       if (Date.now() < scheduledTime.valueOf()) {
         throw new Error(
           `Date.now() is before scheduledTime! ${Date.now()} vs ${scheduledTime.valueOf()}`
@@ -45,11 +45,15 @@ export class DurableObjectExample {
   }

   async alarm() {
-    let time = await this.state.storage.getAlarm();
-    if (time) {
-      throw new Error(`time not null inside alarm handler ${time}`);
+    try {
+      let time = await this.state.storage.getAlarm();
+      if (time !== null) {
+        throw new Error(`time not null inside alarm handler ${time}`);
+      }
+      this.resolve();
+    } catch (e) {
+      this.reject(e);
     }
-    this.resolve();
   }
 }

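For illustration only, a hypothetical driver for a test class like the one above (the binding name ns, the export shape, and the URL are assumptions, not taken from this PR): fetch() schedules the alarm and awaits waitForAlarm(), so a failure inside the handler surfaces here via the stashed reject.

export default {
  async test(ctrl, env) {
    // Hypothetical namespace binding for DurableObjectExample.
    const id = env.ns.idFromName('alarm-test');
    const obj = env.ns.get(id);

    // The DO's fetch() sets the alarm and resolves (or rejects) once
    // alarm() has run its checks.
    const res = await obj.fetch('http://example.com/');
    if (!res.ok) throw new Error(`alarm test failed: ${res.status}`);
  },
};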
225 changes: 115 additions & 110 deletions src/workerd/api/global-scope.c++
@@ -385,112 +385,73 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
auto& context = IoContext::current();
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
auto maybeDeferredDelete = persistent.armAlarmHandler(scheduledTime);

KJ_IF_SOME(deferredDelete, maybeDeferredDelete) {
auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
if (handler.alarm == kj::none) {
KJ_SWITCH_ONEOF(persistent.armAlarmHandler(scheduledTime)) {
KJ_CASE_ONEOF(armResult, ActorCacheInterface::RunAlarmHandler) {
auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
if (handler.alarm == kj::none) {

lock.logWarningOnce("Attempted to run a scheduled alarm without a handler, "
"did you remember to export an alarm() function?");
return WorkerInterface::AlarmResult{
.retry = false, .outcome = EventOutcome::SCRIPT_NOT_FOUND};
}
lock.logWarningOnce("Attempted to run a scheduled alarm without a handler, "
"did you remember to export an alarm() function?");
return WorkerInterface::AlarmResult{
.retry = false, .outcome = EventOutcome::SCRIPT_NOT_FOUND};
}

auto& alarm = KJ_ASSERT_NONNULL(handler.alarm);

return context
.run([exportedHandler, &context, timeout, retryCount, &alarm,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)](
Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);
// We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise
// completes we want to cancel the alarm handler. If the alarm handler promise completes first
// timeout will be canceled.
auto timeoutPromise = context.afterLimitTimeout(timeout).then(
[&context]() -> kj::Promise<WorkerInterface::AlarmResult> {
// We don't want to delete the alarm since we have not successfully completed the alarm
// execution.
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();

LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time");
// Report alarm handler failure and log it.
auto e = KJ_EXCEPTION(OVERLOADED,
"broken.dropped; worker_do_not_log; jsg.Error: Alarm exceeded its allowed execution time");
context.getMetrics().reportFailure(e);

// We don't want the handler to keep running after timeout.
context.abort(kj::mv(e));
// We want timed out alarms to be treated as user errors. As such, we'll mark them as
// retriable, and we'll count the retries against the alarm retries limit. This will ensure
// that the handler will attempt to run for a number of times before giving up and deleting
// the alarm.
return WorkerInterface::AlarmResult{
.retry = true, .retryCountsAgainstLimit = true, .outcome = EventOutcome::EXCEEDED_CPU};
});

auto& alarm = KJ_ASSERT_NONNULL(handler.alarm);

return context
.run([exportedHandler, &context, timeout, retryCount, &alarm,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)](
Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);
// We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise
// completes we want to cancel the alarm handler. If the alarm handler promise completes first
// timeout will be canceled.
auto timeoutPromise = context.afterLimitTimeout(timeout).then(
[&context]() -> kj::Promise<WorkerInterface::AlarmResult> {
// We don't want to delete the alarm since we have not successfully completed the alarm
// execution.
return alarm(lock, jsg::alloc<AlarmInvocationInfo>(retryCount))
.then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::OK};
}).exclusiveJoin(kj::mv(timeoutPromise));
})
.catch_([&context, deferredDelete = kj::mv(armResult.deferredDelete)](
kj::Exception&& e) mutable {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();

LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time");
// Report alarm handler failure and log it.
auto e = KJ_EXCEPTION(OVERLOADED,
"broken.dropped; worker_do_not_log; jsg.Error: Alarm exceeded its allowed execution time");
context.getMetrics().reportFailure(e);

// We don't want the handler to keep running after timeout.
context.abort(kj::mv(e));
// We want timed out alarms to be treated as user errors. As such, we'll mark them as
// retriable, and we'll count the retries against the alarm retries limit. This will ensure
// that the handler will attempt to run for a number of times before giving up and deleting
// the alarm.
return WorkerInterface::AlarmResult{
.retry = true, .retryCountsAgainstLimit = true, .outcome = EventOutcome::EXCEEDED_CPU};
});

return alarm(lock, jsg::alloc<AlarmInvocationInfo>(retryCount))
.then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::OK};
}).exclusiveJoin(kj::mv(timeoutPromise));
})
.catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();
// This will include the error in inspector/tracers and log to syslog if internal.
context.logUncaughtExceptionAsync(UncaughtExceptionSource::ALARM_HANDLER, kj::mv(e));

context.getMetrics().reportFailure(e);

// This will include the error in inspector/tracers and log to syslog if internal.
context.logUncaughtExceptionAsync(UncaughtExceptionSource::ALARM_HANDLER, kj::mv(e));

EventOutcome outcome = EventOutcome::EXCEPTION;
KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) {
outcome = status;
}

kj::String actorId;
KJ_SWITCH_ONEOF(actor.getId()) {
KJ_CASE_ONEOF(f, kj::Own<ActorIdFactory::ActorId>) {
actorId = f->toString();
EventOutcome outcome = EventOutcome::EXCEPTION;
KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) {
outcome = status;
}
KJ_CASE_ONEOF(s, kj::String) {
actorId = kj::str(s);
}
}

// We only want to retry against limits if it's a user error. By default let's check if the
// output gate is broken.
auto shouldRetryCountsAgainstLimits = !context.isOutputGateBroken();

// We want to alert if we aren't going to count this alarm retry against limits
if (auto desc = e.getDescription(); !jsg::isTunneledException(desc) &&
!jsg::isDoNotLogException(desc) && context.isOutputGateBroken()) {
LOG_NOSENTRY(ERROR, "output lock broke during alarm execution", actorId, e);
} else if (context.isOutputGateBroken()) {
// We don't usually log these messages, but it's useful to know the real reason we failed
// to correctly investigate stuck alarms.
LOG_NOSENTRY(ERROR,
"output lock broke during alarm execution without an interesting error description",
actorId, e);
if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) {
// The handler failed because the user overloaded the object. It's their fault, we'll not
// retry forever.
shouldRetryCountsAgainstLimits = true;
}
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = outcome};
})
.then(
[&context](
WorkerInterface::AlarmResult result) -> kj::Promise<WorkerInterface::AlarmResult> {
return context.waitForOutputLocks().then(
[result]() { return kj::mv(result); }, [&context](kj::Exception&& e) {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
kj::String actorId;
KJ_SWITCH_ONEOF(actor.getId()) {
KJ_CASE_ONEOF(f, kj::Own<ActorIdFactory::ActorId>) {
@@ -500,21 +461,20 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
actorId = kj::str(s);
}
}
// We only want to retry against limits if it's a user error. By default let's assume it's our
// fault.
auto shouldRetryCountsAgainstLimits = false;
if (auto desc = e.getDescription();
!jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) {
if (isInterestingException(e)) {
LOG_EXCEPTION("alarmOutputLock"_kj, e);
} else {
LOG_NOSENTRY(ERROR, "output lock broke after executing alarm", actorId, e);
}
} else {

// We only want to retry against limits if it's a user error. By default let's check if the
// output gate is broken.
auto shouldRetryCountsAgainstLimits = !context.isOutputGateBroken();

// We want to alert if we aren't going to count this alarm retry against limits
if (auto desc = e.getDescription(); !jsg::isTunneledException(desc) &&
!jsg::isDoNotLogException(desc) && context.isOutputGateBroken()) {
LOG_NOSENTRY(ERROR, "output lock broke during alarm execution", actorId, e);
} else if (context.isOutputGateBroken()) {
// We don't usually log these messages, but it's useful to know the real reason we failed
// to correctly investigate stuck alarms.
LOG_NOSENTRY(ERROR,
"output lock broke after executing alarm without an interesting error description",
"output lock broke during alarm execution without an interesting error description",
actorId, e);
if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) {
// The handler failed because the user overloaded the object. It's their fault, we'll not
Expand All @@ -524,12 +484,57 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = EventOutcome::EXCEPTION};
.outcome = outcome};
})
.then([&context](WorkerInterface::AlarmResult result)
-> kj::Promise<WorkerInterface::AlarmResult> {
return context.waitForOutputLocks().then(
[result]() { return kj::mv(result); }, [&context](kj::Exception&& e) {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
kj::String actorId;
KJ_SWITCH_ONEOF(actor.getId()) {
KJ_CASE_ONEOF(f, kj::Own<ActorIdFactory::ActorId>) {
actorId = f->toString();
}
KJ_CASE_ONEOF(s, kj::String) {
actorId = kj::str(s);
}
}
// We only want to retry against limits if it's a user error. By default let's assume it's our
// fault.
auto shouldRetryCountsAgainstLimits = false;
if (auto desc = e.getDescription();
!jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) {
if (isInterestingException(e)) {
LOG_EXCEPTION("alarmOutputLock"_kj, e);
} else {
LOG_NOSENTRY(ERROR, "output lock broke after executing alarm", actorId, e);
}
} else {
// We don't usually log these messages, but it's useful to know the real reason we failed
// to correctly investigate stuck alarms.
LOG_NOSENTRY(ERROR,
"output lock broke after executing alarm without an interesting error description",
actorId, e);
if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) {
// The handler failed because the user overloaded the object. It's their fault, we'll not
// retry forever.
shouldRetryCountsAgainstLimits = true;
}
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = EventOutcome::EXCEPTION};
});
});
});
} else {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::CANCELED};
}
KJ_CASE_ONEOF(armResult, ActorCacheInterface::CancelAlarmHandler) {
return armResult.waitBeforeCancel.then([]() {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::CANCELED};
});
}
}
KJ_UNREACHABLE;
}

jsg::Promise<void> ServiceWorkerGlobalScope::test(
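The handler path above races the alarm invocation against a walltime limit (the comments cite 15 minutes) via exclusiveJoin(), then classifies the result into retry/outcome fields. A rough JavaScript analogue of that control flow, offered as a hedged sketch rather than workerd's implementation:

async function runAlarmWithTimeout(runHandler, timeoutMs = 15 * 60 * 1000) {
  let timer;
  const timedOut = new Promise((resolve) => {
    timer = setTimeout(() => {
      // Timeout wins: report a retriable failure that counts against the
      // retry limit, mirroring the EXCEEDED_CPU branch in the C++ above.
      resolve({ retry: true, retryCountsAgainstLimit: true, outcome: 'EXCEEDED_CPU' });
    }, timeoutMs);
  });

  const handlerResult = runHandler().then(
    () => ({ retry: false, outcome: 'OK' }),
    () => ({ retry: true, outcome: 'EXCEPTION' })
  );

  // Whichever settles first wins, like exclusiveJoin() on KJ promises.
  const result = await Promise.race([handlerResult, timedOut]);
  clearTimeout(timer);
  return result;
}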
10 changes: 10 additions & 0 deletions src/workerd/io/BUILD.bazel
@@ -320,6 +320,16 @@ kj_test(
     ],
 )

+kj_test(
+    src = "actor-sqlite-test.c++",
+    deps = [
+        ":actor",
+        ":io-gate",
+        "//src/workerd/util:test",
+        "//src/workerd/util:test-util",
+    ],
+)
+
 kj_test(
     src = "promise-wrapper-test.c++",
     deps = [":io"],