diff --git a/table.go b/table.go index 0a1b31c..1402e93 100644 --- a/table.go +++ b/table.go @@ -216,10 +216,6 @@ func (t *genTable[Obj]) RegisterInitializer(txn WriteTxn, name string) func(Writ slices.Clone(table.pendingInitializers), func(n string) bool { return n == name }, ) - if !table.initialized && len(table.pendingInitializers) == 0 { - close(table.initWatchChan) - table.initialized = true - } } }) } diff --git a/txn.go b/txn.go index ffc51c9..bc4fb48 100644 --- a/txn.go +++ b/txn.go @@ -459,9 +459,18 @@ func (txn *txn) Commit() ReadTxn { root := *db.root.Load() root = slices.Clone(root) + var initChansToClose []chan struct{} + // Insert the modified tables into the root tree of tables. for pos, table := range txn.modifiedTables { if table != nil { + // Check if tables become initialized. We close the channel only after + // we've swapped in the new root so that one cannot get a snapshot of + // an uninitialized table after observing the channel closing. + if !table.initialized && len(table.pendingInitializers) == 0 { + initChansToClose = append(initChansToClose, table.initWatchChan) + table.initialized = true + } root[pos] = *table } } @@ -480,6 +489,11 @@ func (txn *txn) Commit() ReadTxn { txn.Notify() } + // Notify table initializations + for _, ch := range initChansToClose { + close(ch) + } + txn.db.metrics.WriteTxnDuration( txn.handle, txn.tableNames,