Skip to content

Commit

Permalink
Merge pull request #8127 from heyitsanthony/fix-restore
Browse files Browse the repository at this point in the history
mvcc: restore into tree index with one key index
  • Loading branch information
Anthony Romano authored Jun 19, 2017
2 parents 45fbac5 + 51a568a commit 66f553a
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 47 deletions.
21 changes: 16 additions & 5 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ type index interface {
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Equal(b index) bool

Insert(ki *keyIndex)
KeyIndex(ki *keyIndex) *keyIndex
}

type treeIndex struct {
Expand Down Expand Up @@ -60,18 +62,27 @@ func (ti *treeIndex) Put(key []byte, rev revision) {

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
keyi := &keyIndex{key: key}

ti.RLock()
defer ti.RUnlock()
item := ti.tree.Get(keyi)
if item == nil {
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}

keyi = item.(*keyIndex)
return keyi.get(atRev)
}

func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
ti.RLock()
defer ti.RUnlock()
return ti.keyIndex(keyi)
}

func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
if item := ti.tree.Get(keyi); item != nil {
return item.(*keyIndex)
}
return nil
}

func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
Expand Down
111 changes: 69 additions & 42 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ const (
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'

restoreChunkKeys = 10000
)

var restoreChunkKeys = 10000 // non-const for testing

// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
Expand Down Expand Up @@ -275,23 +275,15 @@ func (s *store) restore() error {
}

// index keys concurrently as they're loaded in from tx
unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
go func() {
defer close(donec)
for unordered := range unorderedc {
// restore the tree index from the unordered index.
for _, v := range unordered {
s.kvindex.Insert(v)
}
}
}()
rkvc, revc := restoreIntoIndex(s.kvindex)
for {
keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys)
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
// unbuffered so keys don't pile up in memory
unorderedc <- s.restoreChunk(keys, vals, keyToLease)
// rkvc blocks if the total pending keys exceeds the restore
// chunk size to keep keys from consuming too much memory.
restoreChunk(rkvc, keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
// partial set implies final set
break
Expand All @@ -301,8 +293,8 @@ func (s *store) restore() error {
newMin.sub++
revToBytes(newMin, min)
}
close(unorderedc)
<-donec
close(rkvc)
s.currentRev = <-revc

// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
Expand Down Expand Up @@ -334,38 +326,73 @@ func (s *store) restore() error {
return nil
}

func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
// assume half of keys are overwrites
unordered := make(map[string]*keyIndex, len(keys)/2)
type revKeyValue struct {
key []byte
kv mvccpb.KeyValue
kstr string
}

func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
go func() {
currentRev := int64(1)
defer func() { revc <- currentRev }()
// restore the tree index from streaming the unordered index.
kiCache := make(map[string]*keyIndex, restoreChunkKeys)
for rkv := range rkvc {
ki, ok := kiCache[rkv.kstr]
// purge kiCache if many keys but still missing in the cache
if !ok && len(kiCache) >= restoreChunkKeys {
i := 10
for k := range kiCache {
delete(kiCache, k)
if i--; i == 0 {
break
}
}
}
// cache miss, fetch from tree index if there
if !ok {
ki = &keyIndex{key: rkv.kv.Key}
if idxKey := idx.KeyIndex(ki); idxKey != nil {
kiCache[rkv.kstr], ki = idxKey, idxKey
ok = true
}
}
rev := bytesToRev(rkv.key)
currentRev = rev.main
if ok {
if isTombstone(rkv.key) {
ki.tombstone(rev.main, rev.sub)
continue
}
ki.put(rev.main, rev.sub)
} else if !isTombstone(rkv.key) {
ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
idx.Insert(ki)
kiCache[rkv.kstr] = ki
}
}
}()
return rkvc, revc
}

func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
for i, key := range keys {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vals[i]); err != nil {
rkv := revKeyValue{key: key}
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
rev := bytesToRev(key[:revBytesLen])
s.currentRev = rev.main
kstr := string(kv.Key)
rkv.kstr = string(rkv.kv.Key)
if isTombstone(key) {
if ki, ok := unordered[kstr]; ok {
ki.tombstone(rev.main, rev.sub)
}
delete(keyToLease, kstr)
continue
}
if ki, ok := unordered[kstr]; ok {
ki.put(rev.main, rev.sub)
} else {
ki = &keyIndex{key: kv.Key}
ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
unordered[kstr] = ki
}
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[kstr] = lid
delete(keyToLease, rkv.kstr)
} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
keyToLease[rkv.kstr] = lid
} else {
delete(keyToLease, kstr)
delete(keyToLease, rkv.kstr)
}
kvc <- rkv
}
return unordered
}

func (s *store) Close() error {
Expand Down
58 changes: 58 additions & 0 deletions mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package mvcc
import (
"crypto/rand"
"encoding/binary"
"fmt"
"math"
mrand "math/rand"
"os"
"reflect"
"testing"
Expand Down Expand Up @@ -401,13 +403,64 @@ func TestStoreRestore(t *testing.T) {
}
ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
wact = []testutil.Action{
{"keyIndex", []interface{}{ki}},
{"insert", []interface{}{ki}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("index action = %+v, want %+v", g, wact)
}
}

func TestRestoreDelete(t *testing.T) {
oldChunk := restoreChunkKeys
restoreChunkKeys = mrand.Intn(3) + 2
defer func() { restoreChunkKeys = oldChunk }()

b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

keys := make(map[string]struct{})
for i := 0; i < 20; i++ {
ks := fmt.Sprintf("foo-%d", i)
k := []byte(ks)
s.Put(k, []byte("bar"), lease.NoLease)
keys[ks] = struct{}{}
switch mrand.Intn(3) {
case 0:
// put random key from past via random range on map
ks = fmt.Sprintf("foo-%d", mrand.Intn(i+1))
s.Put([]byte(ks), []byte("baz"), lease.NoLease)
keys[ks] = struct{}{}
case 1:
// delete random key via random range on map
for k := range keys {
s.DeleteRange([]byte(k), nil)
delete(keys, k)
break
}
}
}
s.Close()

s = NewStore(b, &lease.FakeLessor{}, nil)
defer s.Close()
for i := 0; i < 20; i++ {
ks := fmt.Sprintf("foo-%d", i)
r, err := s.Range([]byte(ks), nil, RangeOptions{})
if err != nil {
t.Fatal(err)
}
if _, ok := keys[ks]; ok {
if len(r.KVs) == 0 {
t.Errorf("#%d: expected %q, got deleted", i, ks)
}
} else if len(r.KVs) != 0 {
t.Errorf("#%d: expected deleted, got %q", i, ks)
}
}
}

func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s0 := NewStore(b, &lease.FakeLessor{}, nil)
Expand Down Expand Up @@ -646,6 +699,11 @@ func (i *fakeIndex) Insert(ki *keyIndex) {
i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
}

func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex {
i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}})
return nil
}

func createBytesSlice(bytesN, sliceN int) [][]byte {
rs := [][]byte{}
for len(rs) != sliceN {
Expand Down

0 comments on commit 66f553a

Please sign in to comment.