Skip to content

Commit

Permalink
fixup! sqlite actor alarms: store alarm time in sqlite
Browse files Browse the repository at this point in the history
  • Loading branch information
jclee committed Sep 17, 2024
1 parent a7edf79 commit 5a8ac6b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
29 changes: 15 additions & 14 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::ExplicitTxn::commit() {
"nested txn is outstanding");

// Start the schedule request before root transaction commit(), for correctness in workerd.
kj::Maybe<kj::Promise<void>> precommitAlarmPromise;
kj::Maybe<PrecommitAlarmState> precommitAlarmState;
if (parent == kj::none) {
precommitAlarmPromise = actorSqlite.startPrecommitAlarmScheduling();
precommitAlarmState = actorSqlite.startPrecommitAlarmScheduling();
}

actorSqlite.db->run(SqliteDatabase::TRUSTED, kj::str("RELEASE _cf_savepoint_", depth));
Expand All @@ -148,8 +148,8 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::ExplicitTxn::commit() {
if (parent == kj::none) {
// We committed the root transaction, so it's time to signal any replication layer and lock
// the output gate in the meantime.
actorSqlite.commitTasks.add(
actorSqlite.outputGate.lockWhile(actorSqlite.commitImpl(kj::mv(precommitAlarmPromise))));
actorSqlite.commitTasks.add(actorSqlite.outputGate.lockWhile(
actorSqlite.commitImpl(kj::mv(KJ_ASSERT_NONNULL(precommitAlarmState)))));
}

// No backpressure for SQLite.
Expand Down Expand Up @@ -210,20 +210,21 @@ kj::Promise<void> ActorSqlite::requestScheduledAlarm(kj::Maybe<kj::Date> request
lastConfirmedScheduledAlarm = requestedTime;
}

kj::Maybe<kj::Promise<void>> ActorSqlite::startPrecommitAlarmScheduling() {
auto localAlarmState = metadata.getAlarm();
if (pendingCommit == kj::none && willFireEarlier(localAlarmState, lastConfirmedScheduledAlarm)) {
ActorSqlite::PrecommitAlarmState ActorSqlite::startPrecommitAlarmScheduling() {
PrecommitAlarmState state;
state.localAlarmState = metadata.getAlarm();
if (pendingCommit == kj::none &&
willFireEarlier(state.localAlarmState, lastConfirmedScheduledAlarm)) {
// Basically, this is the first scheduling request that commitImpl() would make prior to
// commitCallback(). We start the request separately, ahead of calling sqlite functions that
// commit to local disk, for correctness in workerd, where alarm scheduling and db commits are
// both synchronous.
return requestScheduledAlarm(localAlarmState);
} else {
return kj::none;
state.schedulingPromise = requestScheduledAlarm(state.localAlarmState);
}
return kj::mv(state);
}

kj::Promise<void> ActorSqlite::commitImpl(kj::Maybe<kj::Promise<void>> precommitAlarmPromise) {
kj::Promise<void> ActorSqlite::commitImpl(ActorSqlite::PrecommitAlarmState precommitAlarmState) {
// We assume that exceptions thrown during commit will propagate to the caller, such that they
// will ensure cancelDeferredAlarmDeletion() is called, if necessary.

Expand All @@ -235,12 +236,12 @@ kj::Promise<void> ActorSqlite::commitImpl(kj::Maybe<kj::Promise<void>> precommit
pendingCommit = promise.fork();

// Wait for any precommit alarm scheduling work to complete.
KJ_IF_SOME(p, precommitAlarmPromise) {
auto localAlarmState = precommitAlarmState.localAlarmState;
KJ_IF_SOME(p, precommitAlarmState.schedulingPromise) {
co_await p;
localAlarmState = metadata.getAlarm();
}

// TODO(perf): might be able to reduce alarm reads with more state tracking?
auto localAlarmState = metadata.getAlarm();
while (willFireEarlier(localAlarmState, lastConfirmedScheduledAlarm)) {
co_await requestScheduledAlarm(localAlarmState);
localAlarmState = metadata.getAlarm();
Expand Down
19 changes: 15 additions & 4 deletions src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,22 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// when the request is confirmed.
kj::Promise<void> requestScheduledAlarm(kj::Maybe<kj::Date> requestedTime);

// If the alarm notification needs to be updated prior to a commit, issues the scheduling
// request synchronously and returns a promise for its completion. Otherwise returns none.
kj::Maybe<kj::Promise<void>> startPrecommitAlarmScheduling();
struct PrecommitAlarmState {
// Lets us avoid an extra read of db alarm state:
kj::Maybe<kj::Date> localAlarmState;

kj::Promise<void> commitImpl(kj::Maybe<kj::Promise<void>> precommitAlarmPromise);
// Promise for the completion of precommit alarm scheduling
kj::Maybe<kj::Promise<void>> schedulingPromise;
};

// To be called just before committing the local sqlite db, to synchronously start any necessary
// alarm scheduling:
PrecommitAlarmState startPrecommitAlarmScheduling();

// Performs the rest of the asynchronous commit, to be waited on after committing the local
// sqlite db. Should be called in the same turn of the event loop as
// startPrecommitAlarmScheduling() and passed the state that it returned.
kj::Promise<void> commitImpl(PrecommitAlarmState precommitAlarmState);

void taskFailed(kj::Exception&& exception) override;

Expand Down

0 comments on commit 5a8ac6b

Please sign in to comment.