From 163fd2d76b75e0d04d0dfbcd9c7c02a375a70d51 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 8 May 2017 19:29:53 -0700 Subject: [PATCH] mvcc: chunk reads for restoring Loading all keys at once would cause etcd to use twice as much memory than it would need to serve the keys, causing RSS to spike on boot. Instead, load the keys into the mvcc by chunk. Uses pipelining for some concurrency. Fixes #7822 --- mvcc/kvstore.go | 138 +++++++++++++++++++++++++------------------ mvcc/kvstore_test.go | 6 +- 2 files changed, 83 insertions(+), 61 deletions(-) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 36b3d9a261d..e5260638cd9 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -33,13 +33,6 @@ var ( keyBucketName = []byte("key") metaBucketName = []byte("meta") - // markedRevBytesLen is the byte length of marked revision. - // The first `revBytesLen` bytes represents a normal revision. The last - // one byte is the mark. - markedRevBytesLen = revBytesLen + 1 - markBytePosition = markedRevBytesLen - 1 - markTombstone byte = 't' - consistentIndexKeyName = []byte("consistent_index") scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -52,6 +45,17 @@ var ( plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") ) +const ( + // markedRevBytesLen is the byte length of marked revision. + // The first `revBytesLen` bytes represents a normal revision. The last + // one byte is the mark. + markedRevBytesLen = revBytesLen + 1 + markBytePosition = markedRevBytesLen - 1 + markTombstone byte = 't' + + restoreChunkKeys = 10000 +) + // ConsistentIndexGetter is an interface that wraps the Get method. // Consistent index is the offset of an entry in a consistent replicated log. type ConsistentIndexGetter interface { @@ -247,11 +251,6 @@ func (s *store) restore() error { keyToLease := make(map[string]lease.LeaseID) - // use an unordered map to hold the temp index data to speed up - // the initial key index recovery. - // we will convert this unordered map into the tree index later. - unordered := make(map[string]*keyIndex, 100000) - // restore index tx := s.b.BatchTx() tx.Lock() @@ -260,48 +259,41 @@ func (s *store) restore() error { s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main plog.Printf("restore compact to %d", s.compactMainRev) } + _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) + scheduledCompact := int64(0) + if len(scheduledCompactBytes) != 0 { + scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main + } - // TODO: limit N to reduce max memory usage - keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0) - for i, key := range keys { - var kv mvccpb.KeyValue - if err := kv.Unmarshal(vals[i]); err != nil { - plog.Fatalf("cannot unmarshal event: %v", err) - } - - rev := bytesToRev(key[:revBytesLen]) - s.currentRev = rev.main - - // restore index - switch { - case isTombstone(key): - if ki, ok := unordered[string(kv.Key)]; ok { - ki.tombstone(rev.main, rev.sub) - } - delete(keyToLease, string(kv.Key)) - - default: - ki, ok := unordered[string(kv.Key)] - if ok { - ki.put(rev.main, rev.sub) - } else { - ki = &keyIndex{key: kv.Key} - ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version) - unordered[string(kv.Key)] = ki - } - - if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease { - keyToLease[string(kv.Key)] = lid - } else { - delete(keyToLease, string(kv.Key)) + // index keys concurrently as they're loaded in from tx + unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{}) + go func() { + defer close(donec) + for unordered := range unorderedc { + // restore the tree index from the unordered index. + for _, v := range unordered { + s.kvindex.Insert(v) } } + }() + for { + keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys) + if len(keys) == 0 { + break + } + // unbuffered so keys don't pile up in memory + unorderedc <- s.restoreChunk(keys, vals, keyToLease) + if len(keys) < restoreChunkKeys { + // partial set implies final set + break + } + // next set begins after where this one ended + newMin := bytesToRev(keys[len(keys)-1][:revBytesLen]) + newMin.sub++ + revToBytes(newMin, min) } - - // restore the tree index from the unordered index. - for _, v := range unordered { - s.kvindex.Insert(v) - } + close(unorderedc) + <-donec // keys in the range [compacted revision -N, compaction] might all be deleted due to compaction. // the correct revision should be set to compaction revision in the case, not the largest revision @@ -309,6 +301,9 @@ func (s *store) restore() error { if s.currentRev < s.compactMainRev { s.currentRev = s.compactMainRev } + if scheduledCompact <= s.compactMainRev { + scheduledCompact = 0 + } for key, lid := range keyToLease { if s.le == nil { @@ -320,15 +315,6 @@ func (s *store) restore() error { } } - _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) - scheduledCompact := int64(0) - if len(scheduledCompactBytes) != 0 { - scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main - if scheduledCompact <= s.compactMainRev { - scheduledCompact = 0 - } - } - tx.Unlock() if scheduledCompact != 0 { @@ -339,6 +325,40 @@ func (s *store) restore() error { return nil } +func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex { + // assume half of keys are overwrites + unordered := make(map[string]*keyIndex, len(keys)/2) + for i, key := range keys { + var kv mvccpb.KeyValue + if err := kv.Unmarshal(vals[i]); err != nil { + plog.Fatalf("cannot unmarshal event: %v", err) + } + rev := bytesToRev(key[:revBytesLen]) + s.currentRev = rev.main + kstr := string(kv.Key) + if isTombstone(key) { + if ki, ok := unordered[kstr]; ok { + ki.tombstone(rev.main, rev.sub) + } + delete(keyToLease, kstr) + continue + } + if ki, ok := unordered[kstr]; ok { + ki.put(rev.main, rev.sub) + } else { + ki = &keyIndex{key: kv.Key} + ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version) + unordered[kstr] = ki + } + if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease { + keyToLease[kstr] = lid + } else { + delete(keyToLease, kstr) + } + } + return unordered +} + func (s *store) Close() error { close(s.stopc) s.fifoSched.Stop() diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index f1e8167c3b6..2d85d8baffa 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -373,9 +373,11 @@ func TestStoreRestore(t *testing.T) { t.Fatal(err) } b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} - b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} + b.tx.rangeRespc <- rangeResp{nil, nil} + s.restore() if s.compactMainRev != 3 { @@ -386,8 +388,8 @@ func TestStoreRestore(t *testing.T) { } wact := []testutil.Action{ {"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, - {"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}}, {"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}}, + {"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact)