Skip to content

Commit

Permalink
WIP rework state tracking, use local db queries instead of manual caching
Browse files Browse the repository at this point in the history
  • Loading branch information
jclee committed Sep 16, 2024
1 parent 7d125e9 commit 3d8640b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 153 deletions.
1 change: 1 addition & 0 deletions src/workerd/io/actor-sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ KJ_TEST("canceling deferred alarm deletion outside handler edge case") {

{ auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); }
test.actor.cancelDeferredAlarmDeletion();
test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();

KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
Expand Down
186 changes: 54 additions & 132 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,6 @@

namespace workerd {

// Stringifier for KnownAlarmTime::Status: maps each enumerator to its lowercase name
// ("clean", "dirty", "flushing"). Any value outside the enumerators hits KJ_UNREACHABLE.
kj::StringPtr KJ_STRINGIFY(ActorSqlite::KnownAlarmTime::Status status) {
  using Status = ActorSqlite::KnownAlarmTime::Status;

  // Kept as a switch with no default so the compiler can flag a newly added enumerator.
  switch (status) {
    case Status::CLEAN:    return "clean";
    case Status::DIRTY:    return "dirty";
    case Status::FLUSHING: return "flushing";
  }

  KJ_UNREACHABLE;
}

// Stringifier for DeferredAlarmDelete::Status: maps each enumerator to its lowercase name
// ("waiting", "ready", "flushing"). Any value outside the enumerators hits KJ_UNREACHABLE.
kj::StringPtr KJ_STRINGIFY(ActorSqlite::DeferredAlarmDelete::Status status) {
  using Status = ActorSqlite::DeferredAlarmDelete::Status;

  // Kept as a switch with no default so the compiler can flag a newly added enumerator.
  switch (status) {
    case Status::WAITING:  return "waiting";
    case Status::READY:    return "ready";
    case Status::FLUSHING: return "flushing";
  }

  KJ_UNREACHABLE;
}

ActorSqlite::ActorSqlite(kj::Own<SqliteDatabase> dbParam,
OutputGate& outputGate,
kj::Function<kj::Promise<void>()> commitCallback,
Expand All @@ -46,10 +22,8 @@ ActorSqlite::ActorSqlite(kj::Own<SqliteDatabase> dbParam,
metadata(*db),
commitTasks(*this) {
db->onWrite(KJ_BIND_METHOD(*this, onWrite));
currentAlarmTime = KnownAlarmTime{
.status = KnownAlarmTime::Status::CLEAN,
.time = metadata.getAlarm(),
};
// TODO(now): should probably populate this asynchronously from alarm manager:
lastConfirmedAlarmDbState = metadata.getAlarm();
}

ActorSqlite::ImplicitTxn::ImplicitTxn(ActorSqlite& parent): parent(parent) {
Expand Down Expand Up @@ -203,26 +177,10 @@ void ActorSqlite::onWrite() {
}

kj::Maybe<kj::Promise<void>> ActorSqlite::schedulePrecommitAlarm() {
kj::Maybe<kj::Date> newAlarmTime;
bool needsAlarmFlush = false;
KJ_SWITCH_ONEOF(currentAlarmTime) {
KJ_CASE_ONEOF(knownAlarmTime, KnownAlarmTime) {
if (knownAlarmTime.status == KnownAlarmTime::Status::DIRTY) {
knownAlarmTime.status = KnownAlarmTime::Status::FLUSHING;
needsAlarmFlush = true;
newAlarmTime = knownAlarmTime.time;
}
}
KJ_CASE_ONEOF(deferredDelete, DeferredAlarmDelete) {
if (deferredDelete.status == DeferredAlarmDelete::Status::READY) {
deferredDelete.status = DeferredAlarmDelete::Status::FLUSHING;
needsAlarmFlush = true;
newAlarmTime = kj::none;
}
}
}
if (needsAlarmFlush) {
return hooks.scheduleRun(newAlarmTime);
auto dirtyAlarmState = metadata.getAlarm();
if (dirtyAlarmState != lastPrecommitAlarmState) {
lastPrecommitAlarmState = dirtyAlarmState;
return hooks.scheduleRun(dirtyAlarmState);
} else {
return kj::none;
}
Expand All @@ -234,27 +192,13 @@ kj::Promise<void> ActorSqlite::commitImpl(kj::Maybe<kj::Promise<void>> precommit

KJ_IF_SOME(p, precommitAlarmPromise) {
// TODO(soon): fix sequencing of alarm scheduling vs. commitCallback().
auto dirtyAlarmState = lastPrecommitAlarmState;
co_await p;
co_await commitCallback();
lastConfirmedAlarmDbState = dirtyAlarmState;
} else {
co_await commitCallback();
}

KJ_SWITCH_ONEOF(currentAlarmTime) {
KJ_CASE_ONEOF(knownAlarmTime, KnownAlarmTime) {
if (knownAlarmTime.status == KnownAlarmTime::Status::FLUSHING) {
knownAlarmTime.status = KnownAlarmTime::Status::CLEAN;
}
}
KJ_CASE_ONEOF(deferredDelete, DeferredAlarmDelete) {
if (deferredDelete.status == DeferredAlarmDelete::Status::FLUSHING) {
currentAlarmTime = KnownAlarmTime{
.status = KnownAlarmTime::Status::CLEAN,
.time = kj::none,
};
}
}
}
}

void ActorSqlite::taskFailed(kj::Exception&& exception) {
Expand All @@ -273,12 +217,13 @@ void ActorSqlite::requireNotBroken() {
}

void ActorSqlite::maybeDeleteDeferredAlarm() {
KJ_IF_SOME(d, currentAlarmTime.tryGet<DeferredAlarmDelete>()) {
// Pretty sure this can't happen; just logging for now, could eventually be made an assert.
if (d.status != DeferredAlarmDelete::Status::WAITING) {
LOG_WARNING_ONCE("unexpected status when trying to delete alarm", d.status);
}
d.status = DeferredAlarmDelete::Status::READY;
if (!inAlarmHandler) {
// Pretty sure this can't happen.
LOG_WARNING_ONCE("expected to be in alarm handler when trying to delete alarm");
}
inAlarmHandler = false;

if (haveDeferredDelete) {
metadata.setAlarm(kj::none);
}
}
Expand Down Expand Up @@ -312,15 +257,12 @@ kj::OneOf<kj::Maybe<kj::Date>, kj::Promise<kj::Maybe<kj::Date>>> ActorSqlite::ge
ReadOptions options) {
requireNotBroken();

KJ_SWITCH_ONEOF(currentAlarmTime) {
KJ_CASE_ONEOF(knownAlarmTime, KnownAlarmTime) {
return knownAlarmTime.time;
}
KJ_CASE_ONEOF(_, DeferredAlarmDelete) {
// An alarm handler is currently running, and a new alarm time has not been set yet.
// We need to return that there is no alarm.
return kj::Maybe<kj::Date>(kj::none);
}
if (haveDeferredDelete) {
// An alarm handler is currently running, and a new alarm time has not been set yet.
// We need to return that there is no alarm.
return kj::Maybe<kj::Date>(kj::none);
} else {
return metadata.getAlarm();
}
KJ_UNREACHABLE;
}
Expand Down Expand Up @@ -390,27 +332,24 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::setAlarm(
// TODO(soon): Need special logic to handle case where actor is using alarms without using
// other storage?

KJ_IF_SOME(t, currentAlarmTime.tryGet<KnownAlarmTime>()) {
// If we're in the alarm handler and haven't set the time yet,
// we can't perform this optimization as currentAlarmTime will be equal
// to the currently running time but we indicate to the actor in getAlarm() that there
// is no alarm set, therefore we need to act like that in setAlarm().
if (!haveDeferredDelete) {
// Skip setting alarm if alarm is already set to the given value.
//
// After the first write in the handler occurs, which would set KnownAlarmTime,
// the logic here is correct again as currentAlarmTime would match what we are reporting
// to the user from getAlarm().
// If we're in the alarm handler and haven't set the time yet, we can't perform this
// optimization as the current alarm time may be equal to the requested time but we indicate
// to the actor in getAlarm() that there is no alarm set, therefore we need to act like that
// in setAlarm().
//
// So, we only apply this for KnownAlarmTime.

if (t.time == newAlarmTime) {
// After the first write in the handler occurs, the logic here is correct again as the current
// alarm time would match what we are reporting to the user from getAlarm().
//
// TODO(now): optimization no longer useful in sqlite?
if (metadata.getAlarm() == newAlarmTime) {
return kj::none;
}
}
metadata.setAlarm(newAlarmTime);
currentAlarmTime = KnownAlarmTime{
.status = ActorSqlite::KnownAlarmTime::Status::DIRTY,
.time = newAlarmTime,
};
haveDeferredDelete = false;
return kj::none;
}

Expand Down Expand Up @@ -472,53 +411,36 @@ void ActorSqlite::shutdown(kj::Maybe<const kj::Exception&> maybeException) {
}

kj::Maybe<kj::Own<void>> ActorSqlite::armAlarmHandler(kj::Date scheduledTime, bool noCache) {
KJ_ASSERT(!currentAlarmTime.is<DeferredAlarmDelete>());

bool alarmDeleteNeeded = true;
KJ_IF_SOME(t, currentAlarmTime.tryGet<KnownAlarmTime>()) {
if (t.time != scheduledTime) {
if (t.status == KnownAlarmTime::Status::CLEAN) {
// If there's a clean scheduledTime that is different from ours, this run should be
// canceled.
return kj::none;
} else {
// There's an alarm write that hasn't been set yet pending for a time different than ours --
// We won't cancel the alarm because it hasn't been confirmed, but we shouldn't delete
// the pending write.
alarmDeleteNeeded = false;
}
KJ_ASSERT(!haveDeferredDelete);
KJ_ASSERT(!inAlarmHandler);

if (metadata.getAlarm() != scheduledTime) {
if (metadata.getAlarm() == lastConfirmedAlarmDbState) {
// If there's a clean scheduledTime that is different from ours, this run should be
// canceled.
// TODO(now): should probably also check that no other db requests are in-flight?
return kj::none;
} else {
// There's an alarm write that hasn't been set yet pending for a time different than ours --
// We won't cancel the alarm because it hasn't been confirmed, but we shouldn't delete
// the pending write.
haveDeferredDelete = false;
}
} else {
haveDeferredDelete = true;
}

// TODO(now): Here, similar to the ActorCache code, we're assuming that if the alarm time
// matches but is not in a clean state (i.e. is dirty or flushing), that it's OK to schedule the
// delete, and that it's OK to use the current alarm time to initialize the alarm state back to
// "clean" if the alarm is cancelled. Is that OK?

if (alarmDeleteNeeded) {
currentAlarmTime = DeferredAlarmDelete{
.status = DeferredAlarmDelete::Status::WAITING,
.timeToDelete = scheduledTime,
};
}
inAlarmHandler = true;

static const DeferredAlarmDeleter disposer;
return kj::Own<void>(this, disposer);
}

void ActorSqlite::cancelDeferredAlarmDeletion() {
KJ_IF_SOME(d, currentAlarmTime.tryGet<DeferredAlarmDelete>()) {
if (d.status != DeferredAlarmDelete::Status::WAITING) {
// Pretty sure this can't happen.
LOG_WARNING_ONCE("unexpected status when trying to cancel deleted alarm", d.status);
}
// Unsure if it's necessarily correct to reset status to CLEAN here, but it's worked OK for
// ActorCache so far.
currentAlarmTime = KnownAlarmTime{
.status = KnownAlarmTime::Status::CLEAN,
.time = metadata.getAlarm(),
};
if (!inAlarmHandler) {
// Pretty sure this can't happen.
LOG_WARNING_ONCE("expected to be in alarm handler when trying to cancel deleted alarm");
}
haveDeferredDelete = false;
}

kj::Maybe<kj::Promise<void>> ActorSqlite::onNoPendingFlush() {
Expand Down
31 changes: 10 additions & 21 deletions src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,24 +179,16 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// was deleted at the start of the handler (when armAlarmHandler() is called), but we don't
// actually want to persist that deletion until after the handler has successfully completed
// (DeferredAlarmDeleter freed prior to any setAlarm() or cancelDeferredAlarmDeletion() calls).
struct KnownAlarmTime {
enum class Status {
CLEAN, // Alarm time has been committed
DIRTY, // Alarm time has been changed and not yet committed
FLUSHING // Alarm time is being committed in an ongoing transaction
} status;
kj::Maybe<kj::Date> time;
};
struct DeferredAlarmDelete {
enum class Status {
WAITING, // Alarm handler is running, and alarm value has not changed
READY, // Alarm handler completed, deletion is pending, but has not yet been committed
FLUSHING // Alarm deletion is being committed in an ongoing transaction
} status;
kj::Date timeToDelete;
// TODO(correctness): ActorCache tracks a "wasDeleted" flag; needed here too?
};
kj::OneOf<KnownAlarmTime, DeferredAlarmDelete> currentAlarmTime;
bool haveDeferredDelete = false;

// Some state only used for tracking calling invariants.
bool inAlarmHandler = false;

// The state the local alarm db was in when we started the last commit.
kj::Maybe<kj::Date> lastPrecommitAlarmState;

// The alarm state for which we last received confirmation that the db was durably stored.
kj::Maybe<kj::Date> lastConfirmedAlarmDbState;

kj::TaskSet commitTasks;

Expand All @@ -215,9 +207,6 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// Called when DeferredAlarmDeleter is destroyed, to delete alarm if not reset or cancelled
// during handler.
void maybeDeleteDeferredAlarm();

friend kj::StringPtr KJ_STRINGIFY(KnownAlarmTime::Status status);
friend kj::StringPtr KJ_STRINGIFY(DeferredAlarmDelete::Status status);
};

} // namespace workerd

0 comments on commit 3d8640b

Please sign in to comment.