Skip to content

Commit

Permalink
chore(memtable): refactor code for memtable flush (#1866)
Browse files Browse the repository at this point in the history
removes the `flushTask` struct that was used in two different contexts.
  • Loading branch information
mangalaman93 authored Mar 3, 2023
1 parent 38c1c0e commit 94d6168
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 71 deletions.
108 changes: 48 additions & 60 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type DB struct {
lc *levelsController
vlog valueLog
writeCh chan *request
flushChan chan flushTask // For flushing memtables.
flushChan chan *memTable // For flushing memtables.
closeOnce sync.Once // For closing DB only once.

blockWrites atomic.Int32
Expand Down Expand Up @@ -240,7 +240,7 @@ func Open(opt Options) (*DB, error) {

db := &DB{
imm: make([]*memTable, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
flushChan: make(chan *memTable, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
opt: opt,
manifest: manifestFile,
Expand Down Expand Up @@ -351,11 +351,11 @@ func Open(opt Options) (*DB, error) {

db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}()
// Flush them to disk asap.
for _, mt := range db.imm {
db.flushChan <- flushTask{mt: mt}
db.flushChan <- mt
}
}
// We do increment nextTxnTs below. So, no need to do it here.
Expand Down Expand Up @@ -568,12 +568,12 @@ func (db *DB) close() (err error) {
} else {
db.opt.Debugf("Flushing memtable")
for {
pushedFlushTask := func() bool {
pushedMemTable := func() bool {
db.lock.Lock()
defer db.lock.Unlock()
y.AssertTrue(db.mt != nil)
select {
case db.flushChan <- flushTask{mt: db.mt}:
case db.flushChan <- db.mt:
db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
db.mt = nil // Will segfault if we try writing!
db.opt.Debugf("pushed to flush chan\n")
Expand All @@ -586,7 +586,7 @@ func (db *DB) close() (err error) {
}
return false
}()
if pushedFlushTask {
if pushedMemTable {
break
}
time.Sleep(10 * time.Millisecond)
Expand Down Expand Up @@ -826,6 +826,7 @@ func (db *DB) writeRequests(reqs []*request) error {
}
count += len(b.Entries)
var i uint64
var err error
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
i++
if i%100 == 0 {
Expand Down Expand Up @@ -987,7 +988,7 @@ func (db *DB) ensureRoomForWrite() error {
}

select {
case db.flushChan <- flushTask{mt: db.mt}:
case db.flushChan <- db.mt:
db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
db.mt.sl.MemSize(), len(db.flushChan))
// We manage to push this task. Let's modify imm.
Expand All @@ -1009,12 +1010,12 @@ func arenaSize(opt Options) int64 {
}

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
iter := ft.mt.sl.NewIterator()
func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
defer iter.Close()

b := table.NewTableBuilder(bopts)
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
for iter.Rewind(); iter.Valid(); iter.Next() {
if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
continue
}
vs := iter.Value()
Expand All @@ -1024,23 +1025,15 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
}
b.Add(iter.Key(), iter.Value(), vp.Len)
}
return b
}

type flushTask struct {
mt *memTable
dropPrefixes [][]byte
return b
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed.
if ft.mt.sl.Empty() {
return nil
}

// handleMemTableFlush must be run serially.
func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
itr := mt.sl.NewUniIterator(false)
builder := buildL0Table(itr, nil, bopts)
defer builder.Close()

// buildL0Table can return nil if the none of the items in the skiplist are
Expand Down Expand Up @@ -1069,39 +1062,39 @@ func (db *DB) handleFlushTask(ft flushTask) error {
return err
}

// flushMemtable must keep running until we send it an empty flushTask. If there
// are errors during handling the flush task, we'll retry indefinitely.
func (db *DB) flushMemtable(lc *z.Closer) error {
// flushMemtable must keep running until we send it an empty memtable. If there
// are errors during handling the memtable flush, we'll retry indefinitely.
func (db *DB) flushMemtable(lc *z.Closer) {
defer lc.Done()

for ft := range db.flushChan {
if ft.mt == nil {
// We close db.flushChan now, instead of sending a nil ft.mt.
for mt := range db.flushChan {
if mt == nil {
continue
}
for {
err := db.handleFlushTask(ft)
if err == nil {
// Update s.imm. Need a lock.
db.lock.Lock()
// This is a single-threaded operation. ft.mt corresponds to the head of
// db.imm list. Once we flush it, we advance db.imm. The next ft.mt
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
db.lock.Unlock()

break
for {
if err := db.handleMemTableFlush(mt, nil); err != nil {
// Encountered error. Retry indefinitely.
db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
time.Sleep(time.Second)
continue
}
// Encountered error. Retry indefinitely.
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
time.Sleep(time.Second)

// Update s.imm. Need a lock.
db.lock.Lock()
// This is a single-threaded operation. mt corresponds to the head of
// db.imm list. Once we flush it, we advance db.imm. The next mt
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(mt == db.imm[0])
db.imm = db.imm[1:]
mt.DecrRef() // Return memory.
// unlock
db.lock.Unlock()
break
}
}
return nil
}

func exists(path string) (bool, error) {
Expand Down Expand Up @@ -1521,10 +1514,10 @@ func (db *DB) startCompactions() {
func (db *DB) startMemoryFlush() {
// Start memory fluhser.
if db.closers.memtable != nil {
db.flushChan = make(chan flushTask, db.opt.NumMemtables)
db.flushChan = make(chan *memTable, db.opt.NumMemtables)
db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable)
db.flushMemtable(db.closers.memtable)
}()
}
}
Expand Down Expand Up @@ -1627,7 +1620,7 @@ func (db *DB) prepareToDrop() (func(), error) {
panic("Attempting to drop data in read-only mode.")
}
// In order prepare for drop, we need to block the incoming writes and
// write it to db. Then, flush all the pending flushtask. So that, we
// write it to db. Then, flush all the pending memtable. So that, we
// don't miss any entries.
if err := db.blockWrite(); err != nil {
return nil, err
Expand Down Expand Up @@ -1676,7 +1669,7 @@ func (db *DB) dropAll() (func(), error) {
if err != nil {
return f, err
}
// prepareToDrop will stop all the incomming write and flushes any pending flush tasks.
// prepareToDrop will stop all the incomming write and flushes any pending memtables.
// Before we drop, we'll stop the compaction because anyways all the datas are going to
// be deleted.
db.stopCompactions()
Expand Down Expand Up @@ -1758,13 +1751,8 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
memtable.DecrRef()
continue
}
task := flushTask{
mt: memtable,
// Ensure that the head of value log gets persisted to disk.
dropPrefixes: filtered,
}
db.opt.Debugf("Flushing memtable")
if err := db.handleFlushTask(task); err != nil {
if err := db.handleMemTableFlush(memtable, filtered); err != nil {
db.opt.Errorf("While trying to flush memtable: %v", err)
return err
}
Expand Down
21 changes: 10 additions & 11 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,7 @@ func TestGetSetDeadlock(t *testing.T) {

db, err := Open(DefaultOptions(dir).WithValueLogFileSize(1 << 20))
require.NoError(t, err)
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()

val := make([]byte, 1<<19)
key := []byte("key1")
Expand Down Expand Up @@ -1506,7 +1506,7 @@ func TestWriteDeadlock(t *testing.T) {

db, err := Open(DefaultOptions(dir).WithValueLogFileSize(10 << 20))
require.NoError(t, err)
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()
print := func(count *int) {
*count++
if *count%100 == 0 {
Expand Down Expand Up @@ -1886,7 +1886,7 @@ func ExampleOpen() {
if err != nil {
panic(err)
}
defer db.Close()
defer func() { y.Check(db.Close()) }()

err = db.View(func(txn *Txn) error {
_, err := txn.Get([]byte("key"))
Expand Down Expand Up @@ -1942,7 +1942,7 @@ func ExampleTxn_NewIterator() {
if err != nil {
panic(err)
}
defer db.Close()
defer func() { y.Check(db.Close()) }()

bkey := func(i int) []byte {
return []byte(fmt.Sprintf("%09d", i))
Expand All @@ -1962,8 +1962,7 @@ func ExampleTxn_NewIterator() {
}
}

err = txn.Commit()
if err != nil {
if err := txn.Commit(); err != nil {
panic(err)
}

Expand Down Expand Up @@ -1995,7 +1994,7 @@ func TestSyncForRace(t *testing.T) {

db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
require.NoError(t, err)
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()

closeChan := make(chan struct{})
doneChan := make(chan struct{})
Expand Down Expand Up @@ -2038,14 +2037,14 @@ func TestSyncForRace(t *testing.T) {

func TestForceFlushMemtable(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger count not be created")
require.NoError(t, err, "temp dir for badger could not be created")

ops := getTestOptions(dir)
ops.ValueLogMaxEntries = 1

db, err := Open(ops)
require.NoError(t, err, "error while openning db")
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()

for i := 0; i < 3; i++ {
err = db.Update(func(txn *Txn) error {
Expand Down Expand Up @@ -2179,7 +2178,7 @@ func TestMinCacheSize(t *testing.T) {

func TestUpdateMaxCost(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger count not be created")
require.NoError(t, err, "temp dir for badger could not be created")
defer os.RemoveAll(dir)

ops := getTestOptions(dir).
Expand Down Expand Up @@ -2286,7 +2285,7 @@ func TestOpenDBReadOnly(t *testing.T) {

func TestBannedPrefixes(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger count not be created")
require.NoError(t, err, "temp dir for badger could not be created")
defer os.RemoveAll(dir)

opt := getTestOptions(dir).WithNamespaceOffset(3)
Expand Down

0 comments on commit 94d6168

Please sign in to comment.