From 58ddc357d9b9e7e463916f2045269bac46536ba7 Mon Sep 17 00:00:00 2001 From: Dan Gallagher Date: Thu, 15 Feb 2024 15:49:38 -0500 Subject: [PATCH] make assigning rowid safe for concurrent access --- src/PendingInserts.zig | 36 ++----------- src/Table.zig | 111 ++++++++++++++++++++++---------------- src/row_group/Creator.zig | 22 ++++---- src/sqlite3/ChangeSet.zig | 6 --- 4 files changed, 81 insertions(+), 94 deletions(-) diff --git a/src/PendingInserts.zig b/src/PendingInserts.zig index bd23739..0d0e527 100644 --- a/src/PendingInserts.zig +++ b/src/PendingInserts.zig @@ -28,9 +28,6 @@ const SortKeyRange = index_mod.SortKeyRange; ctx: *const VtabCtx, -table_data: *TableData, -next_rowid: i64, - insert_stmt: StmtCell, cursor_from_start: StmtPool, cursor_from_key: StmtPool, @@ -42,29 +39,15 @@ const Self = @This(); const StmtCell = prep_stmt.Cell(VtabCtx); const StmtPool = prep_stmt.Pool(VtabCtx); -pub fn init( - allocator: Allocator, - tmp_arena: *ArenaAllocator, - ctx: *const VtabCtx, - table_data: *TableData, -) !Self { - var self = Self{ +pub fn init(allocator: Allocator, ctx: *const VtabCtx) !Self { + return Self{ .ctx = ctx, - .table_data = table_data, - .next_rowid = 1, .insert_stmt = StmtCell.init(&insertDml), .cursor_from_start = StmtPool.init(allocator, &cursorFromStartQuery), .cursor_from_key = StmtPool.init(allocator, &cursorFromKeyQuery), .delete_from = StmtCell.init(&deleteFromQuery), .delete_range = StmtCell.init(&deleteRangeQuery), }; - - // Initialize the rowid to the proper value if the table exists - if (try self.table().checkExists(tmp_arena)) { - try self.loadNextRowid(tmp_arena); - } - - return self; } pub fn deinit(self: *Self) void { @@ -143,10 +126,7 @@ pub fn table(self: Self) ShadowTable { return .{ .ctx = self.ctx }; } -pub fn insert(self: *Self, tmp_arena: *ArenaAllocator, values: anytype) !i64 { - const rowid = self.next_rowid; - self.next_rowid += 1; - +pub fn insert(self: *Self, tmp_arena: *ArenaAllocator, rowid: i64, values: anytype) 
!void { const stmt = try self.insert_stmt.acquire(tmp_arena, self.ctx.*); defer self.insert_stmt.release(); @@ -157,8 +137,6 @@ pub fn insert(self: *Self, tmp_arena: *ArenaAllocator, values: anytype) !i64 { } try stmt.exec(); - - return rowid; } fn insertDml(ctx: VtabCtx, arena: *ArenaAllocator) ![]const u8 { @@ -193,14 +171,6 @@ test "pending inserts: insert dml" { try testing.expectEqualSlices(u8, expected, result); } -pub fn persistNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void { - try self.table_data.writeInt(tmp_arena, .next_rowid, self.next_rowid); -} - -pub fn loadNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void { - self.next_rowid = (try self.table_data.readInt(tmp_arena, .next_rowid)) orelse 1; -} - pub const Cursor = struct { stmt: Stmt, pool: ?*StmtPool, diff --git a/src/Table.zig b/src/Table.zig index ec84f86..d720f17 100644 --- a/src/Table.zig +++ b/src/Table.zig @@ -55,7 +55,15 @@ pending_inserts: PendingInserts, row_group_creator: RowGroupCreator, -dirty: bool, +/// In order to synchronize `next_rowid` across multiple connections to the same SQLite database, +/// it is loaded from the table data when the first insert in a transaction occurs. This is safe +/// because only one write transaction can be active in a SQLite database at a time. The in-memory +/// value of `next_rowid` is incremented and used as additional inserts happen within a transaction +/// so that rowid generation is fast for subsequent inserts. When the transaction is about to +/// commit, `next_rowid` is persisted to the table data and cleared from memory. This process +/// allows `next_rowid` to be used as a "dirty" flag to see if any writes occurred in a +/// transaction. 
+next_rowid: ?i64, warned_update_delete_not_supported: bool = false, pub const InitError = error{ @@ -138,12 +146,7 @@ pub fn create( ); }; - self.pending_inserts = PendingInserts.init( - allocator, - cb_ctx.arena, - &self.ctx, - &self.table_data, - ) catch |e| { + self.pending_inserts = PendingInserts.init(allocator, &self.ctx) catch |e| { return cb_ctx.captureErrMsg(e, "failed to init pending inserts", .{}); }; errdefer self.pending_inserts.deinit(); @@ -167,7 +170,7 @@ pub fn create( return cb_ctx.captureErrMsg(e, "failed to init row group creator", .{}); }; - self.dirty = false; + self.next_rowid = null; } test "create table" { @@ -253,12 +256,7 @@ pub fn connect( return cb_ctx.captureErrMsg(e, "`{s}_rowgroupindex` shadow table does not exist", .{name}); }; - self.pending_inserts = PendingInserts.init( - allocator, - cb_ctx.arena, - &self.ctx, - &self.table_data, - ) catch |e| { + self.pending_inserts = PendingInserts.init(allocator, &self.ctx) catch |e| { return cb_ctx.captureErrMsg(e, "failed to init pending inserts", .{}); }; errdefer self.pending_inserts.deinit(); @@ -282,7 +280,7 @@ pub fn connect( return cb_ctx.captureErrMsg(e, "failed to init row group creator", .{}); }; - self.dirty = false; + self.next_rowid = null; } pub fn disconnect(self: *Self) void { @@ -358,6 +356,18 @@ pub fn rename(self: *Self, cb_ctx: *vtab.CallbackContext, new_name: [:0]const u8 return; } +pub fn bestIndex( + self: *Self, + cb_ctx: *vtab.CallbackContext, + best_index_info: vtab.BestIndexInfo, +) !bool { + index.chooseBestIndex(cb_ctx.arena, self.ctx.sortKey(), best_index_info) catch |e| { + return cb_ctx.captureErrMsg(e, "error occurred while choosing the best index", .{}); + }; + // There is always a query solution because a table scan always works + return true; +} + pub fn update( self: *Self, cb_ctx: *vtab.CallbackContext, @@ -366,10 +376,18 @@ pub fn update( ) !void { const change_type = change_set.changeType(); if (change_type == .Insert) { - rowid.* = 
self.pending_inserts.insert(cb_ctx.arena, change_set) catch |e| { + if (self.next_rowid == null) { + self.loadNextRowid(cb_ctx.arena) catch |e| { + return cb_ctx.captureErrMsg(e, "error loading the next rowid", .{}); + }; + } + rowid.* = self.next_rowid.?; + self.next_rowid.? += 1; + + self.pending_inserts.insert(cb_ctx.arena, rowid.*, change_set) catch |e| { return cb_ctx.captureErrMsg(e, "error inserting into pending inserts", .{}); }; - self.dirty = true; + return; } @@ -379,42 +397,27 @@ pub fn update( } } -pub fn bestIndex( - self: *Self, - cb_ctx: *vtab.CallbackContext, - best_index_info: vtab.BestIndexInfo, -) !bool { - index.chooseBestIndex(cb_ctx.arena, self.ctx.sortKey(), best_index_info) catch |e| { - return cb_ctx.captureErrMsg(e, "error occurred while choosing the best index", .{}); - }; - // There is always a query solution because a table scan always works - return true; -} - pub fn begin(_: *Self, _: *vtab.CallbackContext) !void { log.debug("txn begin", .{}); } pub fn sync(self: *Self, cb_ctx: *vtab.CallbackContext) !void { - _ = cb_ctx; - _ = self; + if (self.next_rowid) |_| { + try self.row_group_creator.createAll(cb_ctx.arena); + try self.unloadNextRowid(cb_ctx.arena); + } } pub fn commit(self: *Self, cb_ctx: *vtab.CallbackContext) !void { + _ = cb_ctx; + _ = self; log.debug("txn commit", .{}); - if (self.dirty) { - try self.pending_inserts.persistNextRowid(cb_ctx.arena); - // TODO should this be called in sync so that an error causes the transaction to be - // aborted? 
- try self.row_group_creator.createAll(cb_ctx.arena); - self.dirty = false; - } } -pub fn rollback(self: *Self, cb_ctx: *vtab.CallbackContext) !void { +pub fn rollback(self: *Self, _: *vtab.CallbackContext) !void { log.debug("txn rollback", .{}); - if (self.dirty) { - try self.pending_inserts.loadNextRowid(cb_ctx.arena); + if (self.next_rowid) |_| { + self.clearNextRowid(); } } @@ -424,18 +427,34 @@ pub fn savepoint(_: *Self, _: *vtab.CallbackContext, savepoint_id: i32) !void { pub fn release(self: *Self, cb_ctx: *vtab.CallbackContext, savepoint_id: i32) !void { log.debug("txn savepoint {d} release", .{savepoint_id}); - if (self.dirty) { - try self.pending_inserts.persistNextRowid(cb_ctx.arena); + if (self.next_rowid) |_| { + // TODO Is this necessary? Releasing the savepoint does not end the transaction so I don't + // think it is necessary to persist and clear the next rowid. + try self.unloadNextRowid(cb_ctx.arena); } } -pub fn rollbackTo(self: *Self, cb_ctx: *vtab.CallbackContext, savepoint_id: i32) !void { +pub fn rollbackTo(self: *Self, _: *vtab.CallbackContext, savepoint_id: i32) !void { log.debug("txn savepoint {d} rollback", .{savepoint_id}); - if (self.dirty) { - try self.pending_inserts.loadNextRowid(cb_ctx.arena); + if (self.next_rowid) |_| { + self.clearNextRowid(); } } +fn loadNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void { + self.next_rowid = (try self.table_data.readInt(tmp_arena, .next_rowid)) orelse 1; +} + +/// Persists the next rowid and removes it from memory +fn unloadNextRowid(self: *Self, tmp_arena: *ArenaAllocator) !void { + try self.table_data.writeInt(tmp_arena, .next_rowid, self.next_rowid.?); + self.next_rowid = null; +} + +fn clearNextRowid(self: *Self) void { + self.next_rowid = null; +} + pub fn isShadowName(suffix: [:0]const u8) bool { log.debug("checking shadow name: {s}", .{suffix}); inline for (.{ TableData, BlobManager, SchemaManager, RowGroupIndex, PendingInserts }) |st| { diff --git a/src/row_group/Creator.zig 
b/src/row_group/Creator.zig index 358673c..f14f57c 100644 --- a/src/row_group/Creator.zig +++ b/src/row_group/Creator.zig @@ -615,15 +615,15 @@ test "row group: create single from pending inserts" { var row_group_index = Index.init(&ctx); defer row_group_index.deinit(); try row_group_index.table().create(&arena); - var pending_inserts = try PendingInserts.init(arena.allocator(), &arena, &ctx, &table_data); + var pending_inserts = try PendingInserts.init(arena.allocator(), &ctx); errdefer pending_inserts.deinit(); try pending_inserts.table().create(&arena); const table_values = datasets.planets.fixed_data[0..4]; const rowids = [_]i64{ 1, 2, 4, 3 }; - for (table_values) |*row| { - _ = try pending_inserts.insert(&arena, MemoryTuple{ .values = row }); + for (table_values, 1..) |*row, rowid| { + _ = try pending_inserts.insert(&arena, @intCast(rowid), MemoryTuple{ .values = row }); } var new_row_group: Self.NewRowGroup = undefined; @@ -698,15 +698,15 @@ test "row group: create all" { var row_group_index = Index.init(&ctx); defer row_group_index.deinit(); try row_group_index.table().create(&arena); - var pending_inserts = try PendingInserts.init(arena.allocator(), &arena, &ctx, &table_data); + var pending_inserts = try PendingInserts.init(arena.allocator(), &ctx); errdefer pending_inserts.deinit(); try pending_inserts.table().create(&arena); const table_values = datasets.planets.fixed_data[0..4]; const rowids = [_]i64{ 1, 2, 4, 3 }; - for (table_values) |*row| { - _ = try pending_inserts.insert(&arena, MemoryTuple{ .values = row }); + for (table_values, 1..) 
|*row, rowid| { + _ = try pending_inserts.insert(&arena, @intCast(rowid), MemoryTuple{ .values = row }); } { @@ -776,7 +776,7 @@ pub fn benchRowGroupCreate() !void { var row_group_index = Index.init(&ctx); defer row_group_index.deinit(); try row_group_index.table().create(&arena); - var pending_inserts = try PendingInserts.init(arena.allocator(), &arena, &ctx, &table_data); + var pending_inserts = try PendingInserts.init(arena.allocator(), &ctx); errdefer pending_inserts.deinit(); try pending_inserts.table().create(&arena); @@ -785,11 +785,14 @@ pub fn benchRowGroupCreate() !void { // Create a row group from pending inserts only (no merge) + var rowid: i64 = 1; + try conn.exec("BEGIN"); const start_insert = std.time.microTimestamp(); for (0..row_group_len) |_| { var row = datasets.planets.randomRecord(&prng); - _ = try pending_inserts.insert(&arena, MemoryTuple{ .values = &row }); + _ = try pending_inserts.insert(&arena, @intCast(rowid), MemoryTuple{ .values = &row }); + rowid += 1; } try conn.exec("COMMIT"); const end_insert = std.time.microTimestamp(); @@ -826,7 +829,8 @@ pub fn benchRowGroupCreate() !void { try conn.exec("BEGIN"); for (0..(row_group_len * 3)) |_| { var row = datasets.planets.randomRecord(&prng); - _ = try pending_inserts.insert(&arena, MemoryTuple{ .values = &row }); + _ = try pending_inserts.insert(&arena, rowid, MemoryTuple{ .values = &row }); + rowid += 1; } try conn.exec("COMMIT"); diff --git a/src/sqlite3/ChangeSet.zig b/src/sqlite3/ChangeSet.zig index 75340fd..6a65def 100644 --- a/src/sqlite3/ChangeSet.zig +++ b/src/sqlite3/ChangeSet.zig @@ -35,12 +35,6 @@ pub fn changeType(self: Self) ChangeType { return .Update; } -pub fn readRowid(self: Self) ValueRef { - return .{ - .value = self.values[1], - }; -} - /// Number of values in this change set (not including rowid). Should not be called when /// change type is `.Delete` pub fn valuesLen(self: Self) usize {