Skip to content

Commit

Permalink
ActorSqlite: pass alarm handler scheduled time when doing deferred al…
Browse files Browse the repository at this point in the history
…arm deletion for a handler
  • Loading branch information
jclee committed Sep 20, 2024
1 parent e6aa529 commit 62936f6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 13 deletions.
34 changes: 30 additions & 4 deletions src/workerd/io/actor-sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ KJ_TEST("alarm scheduling does not start synchronously before nested explicit lo
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("synchronous alarm scheduling failure causes local db commit to fail synchronously") {
KJ_TEST("synchronous alarm scheduling failure causes local db commit to throw synchronously") {
ActorSqliteTest test({.monitorOutputGate = false});
auto promise = test.gate.onBroken();

Expand Down Expand Up @@ -496,7 +496,33 @@ KJ_TEST("getAlarm() returns null during handler") {
KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({"cancelRun(none)"})[0]->fulfill();
test.pollAndExpectCalls({"cancelRun(1ms)"})[0]->fulfill();
}

KJ_TEST("can attempt to arm alarm handler after handler completion") {
ActorSqliteTest test;

// Initialize alarm state to 1ms.
test.setAlarm(oneMs);
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

{
auto armResult = test.actor.armAlarmHandler(oneMs, false);
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
test.pollAndExpectCalls({});
}
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({"cancelRun(1ms)"})[0]->fulfill();

{
auto armResult = test.actor.armAlarmHandler(oneMs, false);
KJ_ASSERT(
KJ_ASSERT_NONNULL(armResult.tryGet<ActorSqlite::CancelAlarmHandler>()).waitBeforeCancel ==
kj::none);
}
}

KJ_TEST("alarm handler handle clears alarm when dropped with no writes") {
Expand All @@ -514,7 +540,7 @@ KJ_TEST("alarm handler handle clears alarm when dropped with no writes") {
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
}
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({"cancelRun(none)"})[0]->fulfill();
test.pollAndExpectCalls({"cancelRun(1ms)"})[0]->fulfill();
KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

Expand Down Expand Up @@ -573,7 +599,7 @@ KJ_TEST("canceling deferred alarm deletion outside handler has no effect") {
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
}
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({"cancelRun(none)"})[0]->fulfill();
test.pollAndExpectCalls({"cancelRun(1ms)"})[0]->fulfill();

test.actor.cancelDeferredAlarmDeletion();

Expand Down
44 changes: 36 additions & 8 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ kj::Promise<void> ActorSqlite::requestScheduledAlarm(kj::Maybe<kj::Date> request
}
}

kj::Promise<void> ActorSqlite::requestCancelAlarm(kj::Date timeToCancel) {
return hooks.cancelRun(timeToCancel).then([this]() {
lastConfirmedScheduledAlarm = kj::none;
});
}

ActorSqlite::PrecommitAlarmState ActorSqlite::startPrecommitAlarmScheduling() {
PrecommitAlarmState state;
state.localAlarmState = metadata.getAlarm();
Expand Down Expand Up @@ -265,13 +271,33 @@ kj::Promise<void> ActorSqlite::commitImpl(ActorSqlite::PrecommitAlarmState preco
auto commitCallbackPromise = commitCallback();
pendingCommit = kj::none;

// If the set of merged commits included a queued deferred delete, capture its info for the
// post-commit alarm deletion.
kj::Maybe<DeferredAlarmDelete> deferredAlarmDeleteForCommit;
KJ_IF_SOME(d, deferredAlarmDelete) {
if (d.deleteOnNextCommit) {
deferredAlarmDeleteForCommit = kj::mv(deferredAlarmDelete);
deferredAlarmDelete = kj::none;
}
}

// Wait for the db to persist.
co_await commitCallbackPromise;
lastConfirmedAlarmDbState = localAlarmState;

// Notify any merged commitImpl() requests that the db update completed.
fulfiller->fulfill();

// If the set of merged commits included a queued deferred delete and the local alarm state is
// still in the deleted state, tell the scheduler to delete the alarm for the handler's
// scheduled time.
KJ_IF_SOME(d, deferredAlarmDeleteForCommit) {
if (localAlarmState == kj::none && lastConfirmedScheduledAlarm != kj::none) {
commitTasks.add(requestCancelAlarm(d.timeToDelete));
co_return;
}
}

// If the db state is now later than the known-scheduled alarm, issue a request to update it to
// match the db state. We don't need to hold open the output gate, so we add the scheduling
// request to commitTasks.
Expand Down Expand Up @@ -302,7 +328,10 @@ void ActorSqlite::maybeDeleteDeferredAlarm() {
}
inAlarmHandler = false;

if (haveDeferredDelete) {
KJ_IF_SOME(d, deferredAlarmDelete) {
d.deleteOnNextCommit = true;
// Update sqlite alarm state to deleted, which will queue the state to be committed in an
// evalLater() promise.
metadata.setAlarm(kj::none);
}
}
Expand Down Expand Up @@ -336,7 +365,7 @@ kj::OneOf<kj::Maybe<kj::Date>, kj::Promise<kj::Maybe<kj::Date>>> ActorSqlite::ge
ReadOptions options) {
requireNotBroken();

if (haveDeferredDelete) {
if (deferredAlarmDelete != kj::none) {
// An alarm handler is currently running, and a new alarm time has not been set yet.
// We need to return that there is no alarm.
return kj::Maybe<kj::Date>(kj::none);
Expand Down Expand Up @@ -411,7 +440,7 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::setAlarm(
// TODO(soon): Need special logic to handle case where actor is using alarms without using
// other storage?

if (!haveDeferredDelete) {
if (deferredAlarmDelete == kj::none) {
// Skip setting alarm if alarm is already set to the given value.
//
// If we're in the alarm handler and haven't set the time yet, we can't perform this
Expand All @@ -428,7 +457,7 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::setAlarm(
}
}
metadata.setAlarm(newAlarmTime);
haveDeferredDelete = false;
deferredAlarmDelete = kj::none;
return kj::none;
}

Expand Down Expand Up @@ -548,7 +577,6 @@ void ActorSqlite::shutdown(kj::Maybe<const kj::Exception&> maybeException) {

kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSqlite::
armAlarmHandler(kj::Date scheduledTime, bool noCache) {
KJ_ASSERT(!haveDeferredDelete);
KJ_ASSERT(!inAlarmHandler);

auto localAlarmState = metadata.getAlarm();
Expand All @@ -572,10 +600,10 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
// There's a alarm write that hasn't been set yet pending for a time different than ours --
// We won't cancel the alarm because it hasn't been confirmed, but we shouldn't delete
// the pending write.
haveDeferredDelete = false;
deferredAlarmDelete = kj::none;
}
} else {
haveDeferredDelete = true;
deferredAlarmDelete = DeferredAlarmDelete{.timeToDelete = scheduledTime};
}
inAlarmHandler = true;

Expand All @@ -588,7 +616,7 @@ void ActorSqlite::cancelDeferredAlarmDeletion() {
// Pretty sure this can't happen.
LOG_WARNING_ONCE("expected to be in alarm handler when trying to cancel deleted alarm");
}
haveDeferredDelete = false;
deferredAlarmDelete = kj::none;
}

kj::Maybe<kj::Promise<void>> ActorSqlite::onNoPendingFlush() {
Expand Down
11 changes: 10 additions & 1 deletion src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// Within an alarm handler, we want the observable alarm state to look like the running alarm
// was deleted at the start of the handler (when armAlarmHandler() is called), but we don't
// actually want to persist that deletion until after the handler has successfully completed.
bool haveDeferredDelete = false;
struct DeferredAlarmDelete {
// Set to a time to pass as `timeToDelete` when making the delete call.
kj::Date timeToDelete;
bool deleteOnNextCommit = false;
};
kj::Maybe<DeferredAlarmDelete> deferredAlarmDelete;

// Some state only used for tracking calling invariants.
bool inAlarmHandler = false;
Expand All @@ -205,6 +210,10 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// when the request is confirmed.
kj::Promise<void> requestScheduledAlarm(kj::Maybe<kj::Date> requestedTime);

// Issues a request to the alarm scheduler to cancel an alarm for a specific time, if such an
// alarm is currently scheduled.
kj::Promise<void> requestCancelAlarm(kj::Date timeToCancel);

struct PrecommitAlarmState {
// Lets us avoid an extra read of db alarm state:
kj::Maybe<kj::Date> localAlarmState;
Expand Down

0 comments on commit 62936f6

Please sign in to comment.