Skip to content

Commit

Permalink
mvcc: chunk reads for restoring
Browse files Browse the repository at this point in the history
Loading all keys at once would cause etcd to use twice as much
memory than it would need to serve the keys, causing RSS to spike on
boot. Instead, load the keys into the mvcc by chunk. Uses pipelining
for some concurrency.

Fixes #7822
  • Loading branch information
heyitsanthony committed May 10, 2017
1 parent 47f5b7c commit 163fd2d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 61 deletions.
138 changes: 79 additions & 59 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ var (
keyBucketName = []byte("key")
metaBucketName = []byte("meta")

// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'

consistentIndexKeyName = []byte("consistent_index")
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")
Expand All @@ -52,6 +45,17 @@ var (
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)

const (
// markedRevBytesLen is the byte length of marked revision.
// The first `revBytesLen` bytes represents a normal revision. The last
// one byte is the mark.
markedRevBytesLen = revBytesLen + 1
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'

restoreChunkKeys = 10000
)

// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
Expand Down Expand Up @@ -247,11 +251,6 @@ func (s *store) restore() error {

keyToLease := make(map[string]lease.LeaseID)

// use an unordered map to hold the temp index data to speed up
// the initial key index recovery.
// we will convert this unordered map into the tree index later.
unordered := make(map[string]*keyIndex, 100000)

// restore index
tx := s.b.BatchTx()
tx.Lock()
Expand All @@ -260,55 +259,51 @@ func (s *store) restore() error {
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
plog.Printf("restore compact to %d", s.compactMainRev)
}
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
}

// TODO: limit N to reduce max memory usage
keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
for i, key := range keys {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vals[i]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}

rev := bytesToRev(key[:revBytesLen])
s.currentRev = rev.main

// restore index
switch {
case isTombstone(key):
if ki, ok := unordered[string(kv.Key)]; ok {
ki.tombstone(rev.main, rev.sub)
}
delete(keyToLease, string(kv.Key))

default:
ki, ok := unordered[string(kv.Key)]
if ok {
ki.put(rev.main, rev.sub)
} else {
ki = &keyIndex{key: kv.Key}
ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
unordered[string(kv.Key)] = ki
}

if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[string(kv.Key)] = lid
} else {
delete(keyToLease, string(kv.Key))
// index keys concurrently as they're loaded in from tx
unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
go func() {
defer close(donec)
for unordered := range unorderedc {
// restore the tree index from the unordered index.
for _, v := range unordered {
s.kvindex.Insert(v)
}
}
}()
for {
keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys)
if len(keys) == 0 {
break
}
// unbuffered so keys don't pile up in memory
unorderedc <- s.restoreChunk(keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
// partial set implies final set
break
}
// next set begins after where this one ended
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}

// restore the tree index from the unordered index.
for _, v := range unordered {
s.kvindex.Insert(v)
}
close(unorderedc)
<-donec

// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
// we have seen.
if s.currentRev < s.compactMainRev {
s.currentRev = s.compactMainRev
}
if scheduledCompact <= s.compactMainRev {
scheduledCompact = 0
}

for key, lid := range keyToLease {
if s.le == nil {
Expand All @@ -320,15 +315,6 @@ func (s *store) restore() error {
}
}

_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
if scheduledCompact <= s.compactMainRev {
scheduledCompact = 0
}
}

tx.Unlock()

if scheduledCompact != 0 {
Expand All @@ -339,6 +325,40 @@ func (s *store) restore() error {
return nil
}

func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
// assume half of keys are overwrites
unordered := make(map[string]*keyIndex, len(keys)/2)
for i, key := range keys {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(vals[i]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
}
rev := bytesToRev(key[:revBytesLen])
s.currentRev = rev.main
kstr := string(kv.Key)
if isTombstone(key) {
if ki, ok := unordered[kstr]; ok {
ki.tombstone(rev.main, rev.sub)
}
delete(keyToLease, kstr)
continue
}
if ki, ok := unordered[kstr]; ok {
ki.put(rev.main, rev.sub)
} else {
ki = &keyIndex{key: kv.Key}
ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
unordered[kstr] = ki
}
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[kstr] = lid
} else {
delete(keyToLease, kstr)
}
}
return unordered
}

func (s *store) Close() error {
close(s.stopc)
s.fifoSched.Stop()
Expand Down
6 changes: 4 additions & 2 deletions mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,11 @@ func TestStoreRestore(t *testing.T) {
t.Fatal(err)
}
b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}

b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
b.tx.rangeRespc <- rangeResp{nil, nil}

s.restore()

if s.compactMainRev != 3 {
Expand All @@ -386,8 +388,8 @@ func TestStoreRestore(t *testing.T) {
}
wact := []testutil.Action{
{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
Expand Down

0 comments on commit 163fd2d

Please sign in to comment.