Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend SqliteDatabase with a reset() method that recreates the database #2680

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,9 @@ jsg::Promise<void> DurableObjectStorage::deleteAll(
jsg::Lock& js, jsg::Optional<PutOptions> maybeOptions) {
auto options = configureOptions(kj::mv(maybeOptions).orDefault(PutOptions{}));

auto& context = IoContext::current();
{
// Log to get a sense of whether users are potentially depending on alarms being kept around
// after deleteAll is called.
auto getOptions = configureOptions(GetOptions{});
context.addTask(context
.awaitJs(js,
transformCacheResult(js, cache->getAlarm(getOptions), getOptions,
[](jsg::Lock&, kj::Maybe<kj::Date> alarmValue) {
if (alarmValue != kj::none) {
LOG_WARNING_PERIODICALLY("NOSENTRY deleteAll called with an alarm still set");
}
return alarmValue;
})).ignoreResult());
}

auto deleteAll = cache->deleteAll(options);

auto& context = IoContext::current();
context.addTask(updateStorageDeletes(context, currentActorMetrics(), kj::mv(deleteAll.count)));

return transformMaybeBackpressure(js, options, kj::mv(deleteAll.backpressure));
Expand Down
10 changes: 10 additions & 0 deletions src/workerd/api/sql-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,11 @@ export class DurableObjectExample {
} else if (req.url.endsWith('/streaming-ingestion')) {
await testStreamingIngestion(req, this.state.storage);
return Response.json({ ok: true });
} else if (req.url.endsWith('/deleteAll')) {
this.state.storage.put('counter', 888); // will be deleted
this.state.storage.deleteAll();
assert.strictEqual(await this.state.storage.get('counter'), undefined);
return Response.json({ ok: true });
}

throw new Error('unknown url: ' + req.url);
Expand Down Expand Up @@ -1251,6 +1256,11 @@ export default {

// Everything's still consistent.
assert.equal(await doReq('increment'), 3);

// Delete all: increments start over
await doReq('deleteAll');
assert.equal(await doReq('increment'), 1);
assert.equal(await doReq('increment'), 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a variation on this that also waits on these concurrently would be nice.

  const results = await Promise.all([
    doReq('deleteAll'),
    doReq('increment'),
    doReq('increment'),
  ]);
  assert.deepStrictEqual(results, ...);

Largely to make sure that operations are correctly sequenced.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand... what would that test that is interesting here?

},
};

Expand Down
52 changes: 52 additions & 0 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ void ActorSqlite::ImplicitTxn::commit() {
}
}

void ActorSqlite::ImplicitTxn::rollback() {
// Ignore redundant commit()s.
if (!committed) {
// As of this writing, rollback() is only called when the database is about to be reset.
// Preparing a statement for it would be a waste since that statement would never be executed
// more than once, since resetting requires repreparing all statements anyway. So we don't
// bother.
parent.db->run("ROLLBACK TRANSACTION");
committed = true;
}
}

ActorSqlite::ExplicitTxn::ExplicitTxn(ActorSqlite& actorSqlite): actorSqlite(actorSqlite) {
KJ_SWITCH_ONEOF(actorSqlite.currentTxn) {
KJ_CASE_ONEOF(_, NoTxn) {
Expand Down Expand Up @@ -283,6 +295,46 @@ kj::Own<ActorCacheInterface::Transaction> ActorSqlite::startTransaction() {
ActorCacheInterface::DeleteAllResults ActorSqlite::deleteAll(WriteOptions options) {
requireNotBroken();

// deleteAll() cannot be part of a transaction because it deletes the database altogether. So,
// we have to close our transactions or fail.
KJ_SWITCH_ONEOF(currentTxn) {
KJ_CASE_ONEOF(_, NoTxn) {
// good
}
KJ_CASE_ONEOF(implicit, ImplicitTxn*) {
// Whatever the implicit transaction did, it's about to be blown away anyway. Roll it back
// so we don't waste time flushing these writes anywhere.
implicit->rollback();
currentTxn = NoTxn();
}
KJ_CASE_ONEOF(exp, ExplicitTxn*) {
// Keep in mind:
//
// ctx.storage.transaction(txn => {
// txn.deleteAll(); // calls `DurableObjectTransaction::deleteAll()`
// ctx.storage.deleteAll(); // calls this method, `ActorSqlite::deleteAll()`
// });
//
// `DurableObjectTransaction::deleteAll()` throws this exception, since `deleteAll()` is not
// supported inside a transaction. Under the new SQLite-backed storage system, directly
// calling `cxt.storage` inside a transaction (as opposed to using the `txn` object) should
// still be treated as part of the transaction, and so should throw the same thing.
JSG_FAIL_REQUIRE(Error, "Cannot call deleteAll() within a transaction");
}
}

if (!deleteAllCommitScheduled) {
// We'll want to make sure the commit callback is called for the deleteAll().
commitTasks.add(outputGate.lockWhile(kj::evalLater([this]() mutable -> kj::Promise<void> {
// Don't commit if shutdown() has been called.
requireNotBroken();

deleteAllCommitScheduled = false;
return commitCallback();
})));
deleteAllCommitScheduled = true;
}

uint count = kv.deleteAll();
return {
.backpressure = kj::none,
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/io/actor-sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
Hooks& hooks = const_cast<Hooks&>(Hooks::DEFAULT));

bool isCommitScheduled() {
return !currentTxn.is<NoTxn>();
return !currentTxn.is<NoTxn>() || deleteAllCommitScheduled;
}

kj::Maybe<SqliteDatabase&> getSqliteDatabase() override {
Expand Down Expand Up @@ -101,6 +101,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
KJ_DISALLOW_COPY_AND_MOVE(ImplicitTxn);

void commit();
void rollback();

private:
ActorSqlite& parent;
Expand Down Expand Up @@ -156,6 +157,9 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
// transactions should be used in the meantime.
kj::OneOf<NoTxn, ImplicitTxn*, ExplicitTxn*> currentTxn = NoTxn();

// If true, then a commit is scheduled as a result of deleteAll() having been called.
bool deleteAllCommitScheduled = false;

kj::TaskSet commitTasks;

void onWrite();
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/server/alarm-scheduler.c++
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ std::default_random_engine makeSeededRandomEngine() {
} // namespace

AlarmScheduler::AlarmScheduler(
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::PathPtr path)
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path)
: clock(clock),
timer(timer),
random(makeSeededRandomEngine()),
db([&] {
auto db = kj::heap<SqliteDatabase>(vfs, path,
auto db = kj::heap<SqliteDatabase>(vfs, kj::mv(path),
kj::WriteMode::CREATE | kj::WriteMode::MODIFY | kj::WriteMode::CREATE_PARENT);
ensureInitialized(*db);
return kj::mv(db);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/alarm-scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler {
using GetActorFn = kj::Function<kj::Own<WorkerInterface>(kj::String)>;

AlarmScheduler(
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::PathPtr path);
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path);

kj::Maybe<kj::Date> getAlarm(ActorKey actor);
bool setAlarm(ActorKey actor, kj::Date scheduledTime);
Expand Down
7 changes: 5 additions & 2 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1941,8 +1941,11 @@ public:
kj::Path({d.uniqueKey, kj::str(idPtr, ".sqlite")}),
kj::WriteMode::CREATE | kj::WriteMode::MODIFY | kj::WriteMode::CREATE_PARENT);

// Before we do anything, make sure the database is in WAL mode.
db->run("PRAGMA journal_mode=WAL;");
// Before we do anything, make sure the database is in WAL mode. We also need to
// do this after reset() is used, so register a callback for that.
auto setWalMode = [](SqliteDatabase& db) { db.run("PRAGMA journal_mode=WAL;"); };
setWalMode(*db);
db->afterReset(kj::mv(setWalMode));

return kj::heap<ActorSqlite>(kj::mv(db), outputGate,
[]() -> kj::Promise<void> { return kj::READY_NOW; }, *sqliteHooks)
Expand Down
21 changes: 20 additions & 1 deletion src/workerd/util/sqlite-kv-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,27 @@ KJ_TEST("SQLite-KV") {
kv.put("foo", "hello"_kj.asBytes());
KJ_EXPECT(list(nullptr, kj::none, kj::none, F) == "bar=def, foo=hello, qux=321");

kv.deleteAll();
// deleteAll()
KJ_EXPECT(kv.deleteAll() == 3);
KJ_EXPECT(list(nullptr, kj::none, kj::none, F) == "");

KJ_EXPECT(!kv.get("bar", [&](kj::ArrayPtr<const byte> value) {
KJ_FAIL_EXPECT("should not call callback when no match", value.asChars());
}));

kv.put("bar", "ghi"_kj.asBytes());
kv.put("corge", "garply"_kj.asBytes());

KJ_EXPECT(list(nullptr, kj::none, kj::none, F) == "bar=ghi, corge=garply");

{
bool called = false;
KJ_EXPECT(kv.get("bar", [&](kj::ArrayPtr<const byte> value) {
KJ_EXPECT(kj::str(value.asChars()) == "ghi");
called = true;
}));
KJ_EXPECT(called);
}
}

} // namespace
Expand Down
40 changes: 26 additions & 14 deletions src/workerd/util/sqlite-kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,33 @@

namespace workerd {

SqliteKv::SqliteKv(SqliteDatabase& db) {
SqliteKv::SqliteKv(SqliteDatabase& db): ResetListener(db) {
if (db.run("SELECT name FROM sqlite_master WHERE type='table' AND name='_cf_KV'").isDone()) {
// The _cf_KV table doesn't exist. Defer initialization.
state = Uninitialized{db};
state.init<Uninitialized>(Uninitialized{});
} else {
// The KV table was initialized in the past. We can go ahead and prepare our statements.
// (We don't call ensureInitialized() here because the `CREATE TABLE IF NOT EXISTS` query it
// executes would be redundant.)
state = Initialized(db);
tableCreated = true;
state.init<Initialized>(db);
}
}

SqliteKv::Initialized& SqliteKv::ensureInitialized() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(uninitialized, Uninitialized) {
auto& db = uninitialized.db;
if (!tableCreated) {
db.run(R"(
CREATE TABLE IF NOT EXISTS _cf_KV (
key TEXT PRIMARY KEY,
value BLOB
) WITHOUT ROWID;
)");

db.run(R"(
CREATE TABLE IF NOT EXISTS _cf_KV (
key TEXT PRIMARY KEY,
value BLOB
) WITHOUT ROWID;
)");
tableCreated = true;
}

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(uninitialized, Uninitialized) {
return state.init<Initialized>(db);
}
KJ_CASE_ONEOF(initialized, Initialized) {
Expand All @@ -49,8 +52,17 @@ bool SqliteKv::delete_(KeyPtr key) {
}

uint SqliteKv::deleteAll() {
auto query = ensureInitialized().stmtDeleteAll.run();
return query.changeCount();
// TODO(perf): Consider introducing a compatibility flag that causes deleteAll() to always return
// 1. Apps almost certainly don't care about the return value but historically we returned the
// count of keys deleted, so now we're stuck counting the table size for no good reason.
uint count = tableCreated ? ensureInitialized().stmtCountKeys.run().getInt(0) : 0;
db.reset();
return count;
}

void SqliteKv::beforeSqliteReset() {
// We'll need to recreate the table on the next operation.
tableCreated = false;
}

} // namespace workerd
20 changes: 13 additions & 7 deletions src/workerd/util/sqlite-kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace workerd {
// perform direct SQL queries, we can block it from accessing any table prefixed with `_cf_`.
// (Ideally this class would allow configuring the table name, but this would require a somewhat
// obnoxious amount of string allocation.)
class SqliteKv {
class SqliteKv: private SqliteDatabase::ResetListener {
public:
explicit SqliteKv(SqliteDatabase& db);

Expand Down Expand Up @@ -51,11 +51,11 @@ class SqliteKv {
// byte blobs or strings containing NUL bytes.

private:
struct Uninitialized {
SqliteDatabase& db;
};
struct Uninitialized {};

struct Initialized {
// This reference is redundant but storing it here makes the prepared statement code below
// easier to manage.
SqliteDatabase& db;

SqliteDatabase::Statement stmtGet = db.prepare(R"(
Expand Down Expand Up @@ -112,20 +112,24 @@ class SqliteKv {
ORDER BY key DESC
LIMIT ?
)");
SqliteDatabase::Statement stmtDeleteAll = db.prepare(R"(
DELETE FROM _cf_KV
SqliteDatabase::Statement stmtCountKeys = db.prepare(R"(
SELECT count(*) FROM _cf_KV
)");

Initialized(SqliteDatabase& db): db(db) {}
};

kj::OneOf<Uninitialized, Initialized> state;

// Has the _cf_KV table been created? This is separate from Uninitialized/Initialized since it
// has to be repeated after a reset, whereas the statements do not need to be recreated.
bool tableCreated = false;

Initialized& ensureInitialized();
// Make sure the KV table is created and prepared statements are ready. Not called until the
// first write.

SqliteKv(SqliteDatabase& db, bool);
void beforeSqliteReset() override;
};

// =======================================================================================
Expand All @@ -137,6 +141,7 @@ class SqliteKv {

template <typename Func>
bool SqliteKv::get(KeyPtr key, Func&& callback) {
if (!tableCreated) return 0;
auto& stmts = KJ_UNWRAP_OR(state.tryGet<Initialized>(), return false);

auto query = stmts.stmtGet.run(key);
Expand All @@ -152,6 +157,7 @@ bool SqliteKv::get(KeyPtr key, Func&& callback) {
template <typename Func>
uint SqliteKv::list(
KeyPtr begin, kj::Maybe<KeyPtr> end, kj::Maybe<uint> limit, Order order, Func&& callback) {
if (!tableCreated) return 0;
auto& stmts = KJ_UNWRAP_OR(state.tryGet<Initialized>(), return 0);

auto iterate = [&](SqliteDatabase::Query&& query) {
Expand Down
42 changes: 42 additions & 0 deletions src/workerd/util/sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -740,5 +740,47 @@ KJ_TEST("DELETE with LIMIT") {
KJ_EXPECT(q.getInt(0) == 3);
}

KJ_TEST("reset database") {
auto dir = kj::newInMemoryDirectory(kj::nullClock());
SqliteDatabase::Vfs vfs(*dir);
SqliteDatabase db(vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY);

db.run("PRAGMA journal_mode=WAL;");

db.run("CREATE TABLE things (id INTEGER PRIMARY KEY)");

db.run("INSERT INTO things VALUES (123)");
db.run("INSERT INTO things VALUES (321)");

auto stmt = db.prepare("SELECT * FROM things");

auto query = stmt.run();
KJ_ASSERT(!query.isDone());
KJ_EXPECT(query.getInt(0) == 123);

db.reset();
db.run("PRAGMA journal_mode=WAL;");

// The query was canceled.
KJ_EXPECT_THROW_MESSAGE("query canceled because reset()", query.nextRow());
KJ_EXPECT_THROW_MESSAGE("query canceled because reset()", query.getInt(0));

// The statement doesn't work because the table is gone.
KJ_EXPECT_THROW_MESSAGE("no such table: things: SQLITE_ERROR", stmt.run());

// But we can recreate it.
db.run("CREATE TABLE things (id INTEGER PRIMARY KEY)");
db.run("INSERT INTO things VALUES (456)");

// Now the statement works.
{
auto q2 = stmt.run();
KJ_ASSERT(!q2.isDone());
KJ_EXPECT(q2.getInt(0) == 456);
q2.nextRow();
KJ_EXPECT(q2.isDone());
}
}

} // namespace
} // namespace workerd
Loading