diff --git a/db.go b/db.go index fee0481c8..0dc8f020c 100644 --- a/db.go +++ b/db.go @@ -109,7 +109,7 @@ type DB struct { lc *levelsController vlog valueLog writeCh chan *request - flushChan chan flushTask // For flushing memtables. + flushChan chan *memTable // For flushing memtables. closeOnce sync.Once // For closing DB only once. blockWrites atomic.Int32 @@ -240,7 +240,7 @@ func Open(opt Options) (*DB, error) { db := &DB{ imm: make([]*memTable, 0, opt.NumMemtables), - flushChan: make(chan flushTask, opt.NumMemtables), + flushChan: make(chan *memTable, opt.NumMemtables), writeCh: make(chan *request, kvWriteChCapacity), opt: opt, manifest: manifestFile, @@ -351,11 +351,11 @@ func Open(opt Options) (*DB, error) { db.closers.memtable = z.NewCloser(1) go func() { - _ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up. + db.flushMemtable(db.closers.memtable) // Need levels controller to be up. }() // Flush them to disk asap. for _, mt := range db.imm { - db.flushChan <- flushTask{mt: mt} + db.flushChan <- mt } } // We do increment nextTxnTs below. So, no need to do it here. @@ -568,12 +568,12 @@ func (db *DB) close() (err error) { } else { db.opt.Debugf("Flushing memtable") for { - pushedFlushTask := func() bool { + pushedMemTable := func() bool { db.lock.Lock() defer db.lock.Unlock() y.AssertTrue(db.mt != nil) select { - case db.flushChan <- flushTask{mt: db.mt}: + case db.flushChan <- db.mt: db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm. db.mt = nil // Will segfault if we try writing! db.opt.Debugf("pushed to flush chan\n") @@ -586,7 +586,7 @@ func (db *DB) close() (err error) { } return false }() - if pushedFlushTask { + if pushedMemTable { break } time.Sleep(10 * time.Millisecond) @@ -826,6 +826,7 @@ func (db *DB) writeRequests(reqs []*request) error { } count += len(b.Entries) var i uint64 + var err error for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() { i++ if i%100 == 0 { @@ -987,7 +988,7 @@ func (db *DB) ensureRoomForWrite() error { } select { - case db.flushChan <- flushTask{mt: db.mt}: + case db.flushChan <- db.mt: db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n", db.mt.sl.MemSize(), len(db.flushChan)) // We manage to push this task. Let's modify imm. @@ -1009,12 +1010,12 @@ func arenaSize(opt Options) int64 { } // buildL0Table builds a new table from the memtable. -func buildL0Table(ft flushTask, bopts table.Options) *table.Builder { - iter := ft.mt.sl.NewIterator() +func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder { defer iter.Close() + b := table.NewTableBuilder(bopts) - for iter.SeekToFirst(); iter.Valid(); iter.Next() { - if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { + for iter.Rewind(); iter.Valid(); iter.Next() { + if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) { continue } vs := iter.Value() @@ -1024,23 +1025,15 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder { } b.Add(iter.Key(), iter.Value(), vp.Len) } - return b -} -type flushTask struct { - mt *memTable - dropPrefixes [][]byte + return b } -// handleFlushTask must be run serially. -func (db *DB) handleFlushTask(ft flushTask) error { - // There can be a scenario, when empty memtable is flushed. - if ft.mt.sl.Empty() { - return nil - } - +// handleMemTableFlush must be run serially. +func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error { bopts := buildTableOptions(db) - builder := buildL0Table(ft, bopts) + itr := mt.sl.NewUniIterator(false) + builder := buildL0Table(itr, nil, bopts) defer builder.Close() // buildL0Table can return nil if the none of the items in the skiplist are @@ -1069,39 +1062,39 @@ func (db *DB) handleFlushTask(ft flushTask) error { return err } -// flushMemtable must keep running until we send it an empty flushTask. If there -// are errors during handling the flush task, we'll retry indefinitely. -func (db *DB) flushMemtable(lc *z.Closer) error { +// flushMemtable must keep running until we send it an empty memtable. If there +// are errors during handling the memtable flush, we'll retry indefinitely. +func (db *DB) flushMemtable(lc *z.Closer) { defer lc.Done() - for ft := range db.flushChan { - if ft.mt == nil { - // We close db.flushChan now, instead of sending a nil ft.mt. + for mt := range db.flushChan { + if mt == nil { continue } - for { - err := db.handleFlushTask(ft) - if err == nil { - // Update s.imm. Need a lock. - db.lock.Lock() - // This is a single-threaded operation. ft.mt corresponds to the head of - // db.imm list. Once we flush it, we advance db.imm. The next ft.mt - // which would arrive here would match db.imm[0], because we acquire a - // lock over DB when pushing to flushChan. - // TODO: This logic is dirty AF. Any change and this could easily break. - y.AssertTrue(ft.mt == db.imm[0]) - db.imm = db.imm[1:] - ft.mt.DecrRef() // Return memory. - db.lock.Unlock() - break + for { + if err := db.handleMemTableFlush(mt, nil); err != nil { + // Encountered error. Retry indefinitely. + db.opt.Errorf("error flushing memtable to disk: %v, retrying", err) + time.Sleep(time.Second) + continue } - // Encountered error. Retry indefinitely. - db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err) - time.Sleep(time.Second) + + // Update s.imm. Need a lock. + db.lock.Lock() + // This is a single-threaded operation. mt corresponds to the head of + // db.imm list. Once we flush it, we advance db.imm. The next mt + // which would arrive here would match db.imm[0], because we acquire a + // lock over DB when pushing to flushChan. + // TODO: This logic is dirty AF. Any change and this could easily break. + y.AssertTrue(mt == db.imm[0]) + db.imm = db.imm[1:] + mt.DecrRef() // Return memory. + // unlock + db.lock.Unlock() + break } } - return nil } func exists(path string) (bool, error) { @@ -1521,10 +1514,10 @@ func (db *DB) startCompactions() { func (db *DB) startMemoryFlush() { // Start memory fluhser. if db.closers.memtable != nil { - db.flushChan = make(chan flushTask, db.opt.NumMemtables) + db.flushChan = make(chan *memTable, db.opt.NumMemtables) db.closers.memtable = z.NewCloser(1) go func() { - _ = db.flushMemtable(db.closers.memtable) + db.flushMemtable(db.closers.memtable) }() } } @@ -1627,7 +1620,7 @@ func (db *DB) prepareToDrop() (func(), error) { panic("Attempting to drop data in read-only mode.") } // In order prepare for drop, we need to block the incoming writes and - // write it to db. Then, flush all the pending flushtask. So that, we + // write it to db. Then, flush all the pending memtable. So that, we // don't miss any entries. if err := db.blockWrite(); err != nil { return nil, err @@ -1676,7 +1669,7 @@ func (db *DB) dropAll() (func(), error) { if err != nil { return f, err } - // prepareToDrop will stop all the incomming write and flushes any pending flush tasks. + // prepareToDrop will stop all the incomming write and flushes any pending memtables. // Before we drop, we'll stop the compaction because anyways all the datas are going to // be deleted. db.stopCompactions() @@ -1758,13 +1751,8 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error { memtable.DecrRef() continue } - task := flushTask{ - mt: memtable, - // Ensure that the head of value log gets persisted to disk. - dropPrefixes: filtered, - } db.opt.Debugf("Flushing memtable") - if err := db.handleFlushTask(task); err != nil { + if err := db.handleMemTableFlush(memtable, filtered); err != nil { db.opt.Errorf("While trying to flush memtable: %v", err) return err } diff --git a/db_test.go b/db_test.go index 660b71919..c299c152a 100644 --- a/db_test.go +++ b/db_test.go @@ -1464,7 +1464,7 @@ func TestGetSetDeadlock(t *testing.T) { db, err := Open(DefaultOptions(dir).WithValueLogFileSize(1 << 20)) require.NoError(t, err) - defer db.Close() + defer func() { require.NoError(t, db.Close()) }() val := make([]byte, 1<<19) key := []byte("key1") @@ -1506,7 +1506,7 @@ func TestWriteDeadlock(t *testing.T) { db, err := Open(DefaultOptions(dir).WithValueLogFileSize(10 << 20)) require.NoError(t, err) - defer db.Close() + defer func() { require.NoError(t, db.Close()) }() print := func(count *int) { *count++ if *count%100 == 0 { @@ -1886,7 +1886,7 @@ func ExampleOpen() { if err != nil { panic(err) } - defer db.Close() + defer func() { y.Check(db.Close()) }() err = db.View(func(txn *Txn) error { _, err := txn.Get([]byte("key")) @@ -1942,7 +1942,7 @@ func ExampleTxn_NewIterator() { if err != nil { panic(err) } - defer db.Close() + defer func() { y.Check(db.Close()) }() bkey := func(i int) []byte { return []byte(fmt.Sprintf("%09d", i)) @@ -1962,8 +1962,7 @@ func ExampleTxn_NewIterator() { } } - err = txn.Commit() - if err != nil { + if err := txn.Commit(); err != nil { panic(err) } @@ -1995,7 +1994,7 @@ func TestSyncForRace(t *testing.T) { db, err := Open(DefaultOptions(dir).WithSyncWrites(false)) require.NoError(t, err) - defer db.Close() + defer func() { require.NoError(t, db.Close()) }() closeChan := make(chan struct{}) doneChan := make(chan struct{}) @@ -2038,14 +2037,14 @@ func TestSyncForRace(t *testing.T) { func TestForceFlushMemtable(t *testing.T) { dir, err := os.MkdirTemp("", "badger-test") - require.NoError(t, err, "temp dir for badger count not be created") + require.NoError(t, err, "temp dir for badger could not be created") ops := getTestOptions(dir) ops.ValueLogMaxEntries = 1 db, err := Open(ops) require.NoError(t, err, "error while openning db") - defer db.Close() + defer func() { require.NoError(t, db.Close()) }() for i := 0; i < 3; i++ { err = db.Update(func(txn *Txn) error { @@ -2179,7 +2178,7 @@ func TestMinCacheSize(t *testing.T) { func TestUpdateMaxCost(t *testing.T) { dir, err := os.MkdirTemp("", "badger-test") - require.NoError(t, err, "temp dir for badger count not be created") + require.NoError(t, err, "temp dir for badger could not be created") defer os.RemoveAll(dir) ops := getTestOptions(dir). @@ -2286,7 +2285,7 @@ func TestOpenDBReadOnly(t *testing.T) { func TestBannedPrefixes(t *testing.T) { dir, err := os.MkdirTemp("", "badger-test") - require.NoError(t, err, "temp dir for badger count not be created") + require.NoError(t, err, "temp dir for badger could not be created") defer os.RemoveAll(dir) opt := getTestOptions(dir).WithNamespaceOffset(3)