Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(memtable): refactor code for memtable flush #1866

Merged
merged 1 commit into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 48 additions & 60 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type DB struct {
lc *levelsController
vlog valueLog
writeCh chan *request
flushChan chan flushTask // For flushing memtables.
flushChan chan *memTable // For flushing memtables.
closeOnce sync.Once // For closing DB only once.

blockWrites atomic.Int32
Expand Down Expand Up @@ -240,7 +240,7 @@ func Open(opt Options) (*DB, error) {

db := &DB{
imm: make([]*memTable, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
flushChan: make(chan *memTable, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
opt: opt,
manifest: manifestFile,
Expand Down Expand Up @@ -351,11 +351,11 @@ func Open(opt Options) (*DB, error) {

db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}()
// Flush them to disk asap.
for _, mt := range db.imm {
db.flushChan <- flushTask{mt: mt}
db.flushChan <- mt
}
}
// We do increment nextTxnTs below. So, no need to do it here.
Expand Down Expand Up @@ -568,12 +568,12 @@ func (db *DB) close() (err error) {
} else {
db.opt.Debugf("Flushing memtable")
for {
pushedFlushTask := func() bool {
pushedMemTable := func() bool {
db.lock.Lock()
defer db.lock.Unlock()
y.AssertTrue(db.mt != nil)
select {
case db.flushChan <- flushTask{mt: db.mt}:
case db.flushChan <- db.mt:
db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
db.mt = nil // Will segfault if we try writing!
db.opt.Debugf("pushed to flush chan\n")
Expand All @@ -586,7 +586,7 @@ func (db *DB) close() (err error) {
}
return false
}()
if pushedFlushTask {
if pushedMemTable {
break
}
time.Sleep(10 * time.Millisecond)
Expand Down Expand Up @@ -826,6 +826,7 @@ func (db *DB) writeRequests(reqs []*request) error {
}
count += len(b.Entries)
var i uint64
var err error
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
i++
if i%100 == 0 {
Expand Down Expand Up @@ -987,7 +988,7 @@ func (db *DB) ensureRoomForWrite() error {
}

select {
case db.flushChan <- flushTask{mt: db.mt}:
case db.flushChan <- db.mt:
db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
db.mt.sl.MemSize(), len(db.flushChan))
// We manage to push this task. Let's modify imm.
Expand All @@ -1009,12 +1010,12 @@ func arenaSize(opt Options) int64 {
}

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
iter := ft.mt.sl.NewIterator()
func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
defer iter.Close()

b := table.NewTableBuilder(bopts)
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
for iter.Rewind(); iter.Valid(); iter.Next() {
if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
continue
}
vs := iter.Value()
Expand All @@ -1024,23 +1025,15 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
}
b.Add(iter.Key(), iter.Value(), vp.Len)
}
return b
}

type flushTask struct {
mt *memTable
dropPrefixes [][]byte
return b
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed.
if ft.mt.sl.Empty() {
return nil
}

// handleMemTableFlush must be run serially.
func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
itr := mt.sl.NewUniIterator(false)
builder := buildL0Table(itr, nil, bopts)
defer builder.Close()

// buildL0Table can return nil if none of the items in the skiplist are
Expand Down Expand Up @@ -1069,39 +1062,39 @@ func (db *DB) handleFlushTask(ft flushTask) error {
return err
}

// flushMemtable must keep running until we send it an empty flushTask. If there
// are errors during handling the flush task, we'll retry indefinitely.
func (db *DB) flushMemtable(lc *z.Closer) error {
// flushMemtable must keep running until db.flushChan is closed; nil memtables
// received on the channel are skipped. If there are errors during handling the
// memtable flush, we'll retry indefinitely.
func (db *DB) flushMemtable(lc *z.Closer) {
defer lc.Done()

for ft := range db.flushChan {
if ft.mt == nil {
// We close db.flushChan now, instead of sending a nil ft.mt.
for mt := range db.flushChan {
if mt == nil {
continue
}
for {
err := db.handleFlushTask(ft)
if err == nil {
// Update s.imm. Need a lock.
db.lock.Lock()
// This is a single-threaded operation. ft.mt corresponds to the head of
// db.imm list. Once we flush it, we advance db.imm. The next ft.mt
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
db.lock.Unlock()

break
for {
if err := db.handleMemTableFlush(mt, nil); err != nil {
// Encountered error. Retry indefinitely.
db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
time.Sleep(time.Second)
continue
}
// Encountered error. Retry indefinitely.
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
time.Sleep(time.Second)

// Update db.imm. Need a lock.
db.lock.Lock()
// This is a single-threaded operation. mt corresponds to the head of
// db.imm list. Once we flush it, we advance db.imm. The next mt
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(mt == db.imm[0])
db.imm = db.imm[1:]
mt.DecrRef() // Return memory.
// unlock
db.lock.Unlock()
break
}
}
return nil
}

func exists(path string) (bool, error) {
Expand Down Expand Up @@ -1521,10 +1514,10 @@ func (db *DB) startCompactions() {
func (db *DB) startMemoryFlush() {
// Start memory flusher.
if db.closers.memtable != nil {
db.flushChan = make(chan flushTask, db.opt.NumMemtables)
db.flushChan = make(chan *memTable, db.opt.NumMemtables)
db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable)
db.flushMemtable(db.closers.memtable)
}()
}
}
Expand Down Expand Up @@ -1627,7 +1620,7 @@ func (db *DB) prepareToDrop() (func(), error) {
panic("Attempting to drop data in read-only mode.")
}
// In order to prepare for drop, we need to block the incoming writes and
// write it to db. Then, flush all the pending flushtask. So that, we
// write it to db. Then, flush all the pending memtables, so that we
// don't miss any entries.
if err := db.blockWrite(); err != nil {
return nil, err
Expand Down Expand Up @@ -1676,7 +1669,7 @@ func (db *DB) dropAll() (func(), error) {
if err != nil {
return f, err
}
// prepareToDrop will stop all the incomming write and flushes any pending flush tasks.
// prepareToDrop will stop all the incoming writes and flush any pending memtables.
// Before we drop, we'll stop the compaction because all the data is going to
// be deleted anyway.
db.stopCompactions()
Expand Down Expand Up @@ -1758,13 +1751,8 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
memtable.DecrRef()
continue
}
task := flushTask{
mt: memtable,
// Ensure that the head of value log gets persisted to disk.
dropPrefixes: filtered,
}
db.opt.Debugf("Flushing memtable")
if err := db.handleFlushTask(task); err != nil {
if err := db.handleMemTableFlush(memtable, filtered); err != nil {
db.opt.Errorf("While trying to flush memtable: %v", err)
return err
}
Expand Down
21 changes: 10 additions & 11 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,7 @@ func TestGetSetDeadlock(t *testing.T) {

db, err := Open(DefaultOptions(dir).WithValueLogFileSize(1 << 20))
require.NoError(t, err)
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()

val := make([]byte, 1<<19)
key := []byte("key1")
Expand Down Expand Up @@ -1506,7 +1506,7 @@ func TestWriteDeadlock(t *testing.T) {

db, err := Open(DefaultOptions(dir).WithValueLogFileSize(10 << 20))
require.NoError(t, err)
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()
print := func(count *int) {
*count++
if *count%100 == 0 {
Expand Down Expand Up @@ -1886,7 +1886,7 @@ func ExampleOpen() {
if err != nil {
panic(err)
}
defer db.Close()
defer func() { y.Check(db.Close()) }()

err = db.View(func(txn *Txn) error {
_, err := txn.Get([]byte("key"))
Expand Down Expand Up @@ -1942,7 +1942,7 @@ func ExampleTxn_NewIterator() {
if err != nil {
panic(err)
}
defer db.Close()
defer func() { y.Check(db.Close()) }()

bkey := func(i int) []byte {
return []byte(fmt.Sprintf("%09d", i))
Expand All @@ -1962,8 +1962,7 @@ func ExampleTxn_NewIterator() {
}
}

err = txn.Commit()
if err != nil {
if err := txn.Commit(); err != nil {
panic(err)
}

Expand Down Expand Up @@ -1995,7 +1994,7 @@ func TestSyncForRace(t *testing.T) {

db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
require.NoError(t, err)
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()

closeChan := make(chan struct{})
doneChan := make(chan struct{})
Expand Down Expand Up @@ -2038,14 +2037,14 @@ func TestSyncForRace(t *testing.T) {

func TestForceFlushMemtable(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger count not be created")
require.NoError(t, err, "temp dir for badger could not be created")

ops := getTestOptions(dir)
ops.ValueLogMaxEntries = 1

db, err := Open(ops)
require.NoError(t, err, "error while openning db")
defer db.Close()
defer func() { require.NoError(t, db.Close()) }()

for i := 0; i < 3; i++ {
err = db.Update(func(txn *Txn) error {
Expand Down Expand Up @@ -2179,7 +2178,7 @@ func TestMinCacheSize(t *testing.T) {

func TestUpdateMaxCost(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger count not be created")
require.NoError(t, err, "temp dir for badger could not be created")
defer os.RemoveAll(dir)

ops := getTestOptions(dir).
Expand Down Expand Up @@ -2286,7 +2285,7 @@ func TestOpenDBReadOnly(t *testing.T) {

func TestBannedPrefixes(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger count not be created")
require.NoError(t, err, "temp dir for badger could not be created")
defer os.RemoveAll(dir)

opt := getTestOptions(dir).WithNamespaceOffset(3)
Expand Down