-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added get multiple api for badger #1990
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -746,6 +746,8 @@ func (db *DB) getMemTables() ([]*memTable, func()) { | |
// get returns the value in memtable or disk for given key. | ||
// Note that value will include meta byte. | ||
// | ||
// getBatch would return the values of list of keys in order | ||
// | ||
// IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to | ||
// maintain this invariant to search for the latest value of a key, or else we need to search in all | ||
// tables and find the max version among them. To maintain this invariant, we also need to ensure | ||
|
@@ -757,7 +759,54 @@ func (db *DB) getMemTables() ([]*memTable, func()) { | |
// do that. For every get("fooX") call where X is the version, we will search | ||
// for "fooX" in all the levels of the LSM tree. This is expensive but it | ||
// removes the overhead of handling move keys completely. | ||
func (db *DB) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) { | ||
if db.IsClosed() { | ||
return []y.ValueStruct{}, ErrDBClosed | ||
} | ||
tables, decr := db.getMemTables() // Lock should be released. | ||
defer decr() | ||
|
||
maxVs := make([]y.ValueStruct, len(keys)) | ||
|
||
y.NumGetsAdd(db.opt.MetricsEnabled, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we are using the old metric, we should increment this by |
||
// For memtable, we need to check every memtable each time | ||
for j, key := range keys { | ||
if done[j] { | ||
continue | ||
} | ||
version := y.ParseTs(key) | ||
for i := 0; i < len(tables); i++ { | ||
vs := tables[i].sl.Get(key) | ||
y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1) | ||
if vs.Meta == 0 && vs.Value == nil { | ||
continue | ||
} | ||
// Found the required version of the key, mark as done, no need to process | ||
// it further | ||
if vs.Version == version { | ||
y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1) | ||
maxVs[j] = vs | ||
done[j] = true | ||
break | ||
} | ||
if maxVs[j].Version < vs.Version { | ||
maxVs[j] = vs | ||
} | ||
} | ||
} | ||
return db.lc.getBatch(keys, maxVs, 0, done) | ||
} | ||
|
||
func (db *DB) get(key []byte) (y.ValueStruct, error) { | ||
if db.opt.useGetBatch { | ||
done := make([]bool, 1) | ||
vals, err := db.getBatch([][]byte{key}, done) | ||
if len(vals) != 0 { | ||
return vals[0], err | ||
} | ||
return y.ValueStruct{}, err | ||
} | ||
|
||
if db.IsClosed() { | ||
return y.ValueStruct{}, ErrDBClosed | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -273,6 +273,113 @@ func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) | |
return []*table.Table{tbl}, tbl.DecrRef | ||
} | ||
|
||
func (s *levelHandler) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) { | ||
// Find the table for which the key is in, and then seek it | ||
getForKey := func(key []byte) (y.ValueStruct, func() error, []*table.Iterator) { | ||
tables, decr := s.getTableForKey(key) | ||
keyNoTs := y.ParseKey(key) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should just parse the key once, we are doing it again and again |
||
itrs := make([]*table.Iterator, 0) | ||
|
||
hash := y.Hash(keyNoTs) | ||
var maxVs y.ValueStruct | ||
for _, th := range tables { | ||
if th.DoesNotHave(hash) { | ||
y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1) | ||
continue | ||
} | ||
|
||
it := th.NewIterator(0) | ||
itrs = append(itrs, it) | ||
|
||
y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1) | ||
it.Seek(key) | ||
if !it.Valid() { | ||
continue | ||
} | ||
if y.SameKey(key, it.Key()) { | ||
if version := y.ParseTs(it.Key()); maxVs.Version < version { | ||
maxVs = it.ValueCopy() | ||
maxVs.Version = version | ||
} | ||
} | ||
} | ||
|
||
return maxVs, decr, itrs | ||
} | ||
|
||
// Use old results from getForKey and find in those tables. | ||
findInIter := func(key []byte, itrs []*table.Iterator) y.ValueStruct { | ||
var maxVs y.ValueStruct | ||
|
||
for _, it := range itrs { | ||
it.Seek(key) | ||
if !it.Valid() { | ||
continue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. break? |
||
} | ||
if y.SameKey(key, it.Key()) { | ||
if version := y.ParseTs(it.Key()); maxVs.Version < version { | ||
maxVs = it.ValueCopy() | ||
maxVs.Version = version | ||
} | ||
} | ||
} | ||
|
||
return maxVs | ||
} | ||
|
||
results := make([]y.ValueStruct, len(keys)) | ||
// For L0, we need to search all tables each time, so we can just call get() as required | ||
if s.level == 0 { | ||
var err error | ||
for i, key := range keys { | ||
if done[i] { | ||
continue | ||
} | ||
results[i], err = s.get(key) | ||
if err != nil { | ||
return results, err | ||
} | ||
} | ||
return results, nil | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @harshil-goel I need to understand this code block (lines 344-373) ... will sync up with you offline. |
||
decr := func() error { return nil } | ||
var itrs []*table.Iterator | ||
|
||
started := false | ||
for i := 0; i < len(keys); i++ { | ||
if done[i] { | ||
continue | ||
} | ||
if !started { | ||
var maxVs y.ValueStruct | ||
maxVs, decr, itrs = getForKey(keys[0]) | ||
results[i] = maxVs | ||
started = true | ||
} else { | ||
results[i] = findInIter(keys[i], itrs) | ||
// If we can't find in the current tables, maybe the | ||
// data is there in other tables | ||
if len(results[i].Value) == 0 { | ||
for i := 0; i < len(itrs); i++ { | ||
itrs[i].Close() | ||
} | ||
err := decr() | ||
if err != nil { | ||
return nil, err | ||
} | ||
results[i], decr, itrs = getForKey(keys[i]) | ||
} | ||
} | ||
} | ||
|
||
for i := 0; i < len(itrs); i++ { | ||
itrs[i].Close() | ||
} | ||
return results, decr() | ||
} | ||
|
||
} | ||
|
||
// get returns value for a given key or the key after that. If not found, return nil. | ||
func (s *levelHandler) get(key []byte) (y.ValueStruct, error) { | ||
tables, decr := s.getTableForKey(key) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we rename
done
tokeyRead
or something similar?