diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 36b3d9a261d..e5260638cd9 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -33,13 +33,6 @@ var (
 	keyBucketName  = []byte("key")
 	metaBucketName = []byte("meta")

-	// markedRevBytesLen is the byte length of marked revision.
-	// The first `revBytesLen` bytes represents a normal revision. The last
-	// one byte is the mark.
-	markedRevBytesLen      = revBytesLen + 1
-	markBytePosition       = markedRevBytesLen - 1
-	markTombstone     byte = 't'
-
 	consistentIndexKeyName  = []byte("consistent_index")
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
@@ -52,6 +45,17 @@ var (
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
 )

+const (
+	// markedRevBytesLen is the byte length of marked revision.
+	// The first `revBytesLen` bytes represents a normal revision. The last
+	// one byte is the mark.
+	markedRevBytesLen      = revBytesLen + 1
+	markBytePosition       = markedRevBytesLen - 1
+	markTombstone     byte = 't'
+
+	restoreChunkKeys = 10000
+)
+
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
 type ConsistentIndexGetter interface {
@@ -247,11 +251,6 @@ func (s *store) restore() error {

 	keyToLease := make(map[string]lease.LeaseID)

-	// use an unordered map to hold the temp index data to speed up
-	// the initial key index recovery.
-	// we will convert this unordered map into the tree index later.
-	unordered := make(map[string]*keyIndex, 100000)
-
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
@@ -260,48 +259,41 @@ func (s *store) restore() error {
 		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
 		plog.Printf("restore compact to %d", s.compactMainRev)
 	}
+	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
+	scheduledCompact := int64(0)
+	if len(scheduledCompactBytes) != 0 {
+		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
+	}

-	// TODO: limit N to reduce max memory usage
-	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
-	for i, key := range keys {
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vals[i]); err != nil {
-			plog.Fatalf("cannot unmarshal event: %v", err)
-		}
-
-		rev := bytesToRev(key[:revBytesLen])
-		s.currentRev = rev.main
-
-		// restore index
-		switch {
-		case isTombstone(key):
-			if ki, ok := unordered[string(kv.Key)]; ok {
-				ki.tombstone(rev.main, rev.sub)
-			}
-			delete(keyToLease, string(kv.Key))
-
-		default:
-			ki, ok := unordered[string(kv.Key)]
-			if ok {
-				ki.put(rev.main, rev.sub)
-			} else {
-				ki = &keyIndex{key: kv.Key}
-				ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
-				unordered[string(kv.Key)] = ki
-			}
-
-			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
-				keyToLease[string(kv.Key)] = lid
-			} else {
-				delete(keyToLease, string(kv.Key))
+	// index keys concurrently as they're loaded in from tx
+	unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
+	go func() {
+		defer close(donec)
+		for unordered := range unorderedc {
+			// restore the tree index from the unordered index.
+			for _, v := range unordered {
+				s.kvindex.Insert(v)
 			}
 		}
+	}()
+	for {
+		keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys)
+		if len(keys) == 0 {
+			break
+		}
+		// unbuffered so keys don't pile up in memory
+		unorderedc <- s.restoreChunk(keys, vals, keyToLease)
+		if len(keys) < restoreChunkKeys {
+			// partial set implies final set
+			break
+		}
+		// next set begins after where this one ended
+		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
+		newMin.sub++
+		revToBytes(newMin, min)
 	}
-
-	// restore the tree index from the unordered index.
-	for _, v := range unordered {
-		s.kvindex.Insert(v)
-	}
+	close(unorderedc)
+	<-donec

 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
@@ -309,6 +301,9 @@ func (s *store) restore() error {
 	if s.currentRev < s.compactMainRev {
 		s.currentRev = s.compactMainRev
 	}
+	if scheduledCompact <= s.compactMainRev {
+		scheduledCompact = 0
+	}

 	for key, lid := range keyToLease {
 		if s.le == nil {
@@ -320,15 +315,6 @@ func (s *store) restore() error {
 		}
 	}

-	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
-	scheduledCompact := int64(0)
-	if len(scheduledCompactBytes) != 0 {
-		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
-		if scheduledCompact <= s.compactMainRev {
-			scheduledCompact = 0
-		}
-	}
-
 	tx.Unlock()

 	if scheduledCompact != 0 {
@@ -339,6 +325,40 @@ func (s *store) restore() error {
 	return nil
 }

+func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
+	// assume half of keys are overwrites
+	unordered := make(map[string]*keyIndex, len(keys)/2)
+	for i, key := range keys {
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(vals[i]); err != nil {
+			plog.Fatalf("cannot unmarshal event: %v", err)
+		}
+		rev := bytesToRev(key[:revBytesLen])
+		s.currentRev = rev.main
+		kstr := string(kv.Key)
+		if isTombstone(key) {
+			if ki, ok := unordered[kstr]; ok {
+				ki.tombstone(rev.main, rev.sub)
+			}
+			delete(keyToLease, kstr)
+			continue
+		}
+		if ki, ok := unordered[kstr]; ok {
+			ki.put(rev.main, rev.sub)
+		} else {
+			ki = &keyIndex{key: kv.Key}
+			ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
+			unordered[kstr] = ki
+		}
+		if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
+			keyToLease[kstr] = lid
+		} else {
+			delete(keyToLease, kstr)
+		}
+	}
+	return unordered
+}
+
 func (s *store) Close() error {
 	close(s.stopc)
 	s.fifoSched.Stop()
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index f1e8167c3b6..2d85d8baffa 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -373,9 +373,11 @@ func TestStoreRestore(t *testing.T) {
 		t.Fatal(err)
 	}

 	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
-	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
 	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
+	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
+	b.tx.rangeRespc <- rangeResp{nil, nil}
+
 	s.restore()

 	if s.compactMainRev != 3 {
@@ -386,8 +388,8 @@ func TestStoreRestore(t *testing.T) {
 	}
 	wact := []testutil.Action{
 		{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
-		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
 		{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
+		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
 	}
 	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 		t.Errorf("tx actions = %+v, want %+v", g, wact)
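---

The heart of this patch is a bounded-memory producer/consumer pipeline: restore() now scans the key bucket restoreChunkKeys entries at a time and hands each chunk over an unbuffered channel to a goroutine that folds it into the tree index, instead of materializing every revision in one 100000-slot map. Below is a minimal standalone sketch of the same pattern, not part of the patch; restoreChunked, src, and chunkSize are invented stand-ins for etcd's store, backend range reads, and tree index.

package main

import "fmt"

// restoreChunked mirrors the shape of the patched restore() loop: a
// producer reads fixed-size chunks from a sorted source while a consumer
// goroutine folds each chunk into the index. The channel is unbuffered,
// so at most one chunk is in flight and memory stays bounded by roughly
// two chunks regardless of how large the source is.
func restoreChunked(src []int, chunkSize int) []int {
	chunkc := make(chan []int)   // plays the role of unorderedc
	donec := make(chan struct{}) // closed once the consumer drains chunkc
	var index []int              // stand-in for the tree index

	go func() {
		defer close(donec)
		for chunk := range chunkc {
			// index this chunk while the producer fetches the next one
			index = append(index, chunk...)
		}
	}()

	min := 0
	for {
		// stand-in for tx.UnsafeRange(keyBucketName, min, max, chunkSize)
		end := min + chunkSize
		if end > len(src) {
			end = len(src)
		}
		chunk := src[min:end]
		if len(chunk) == 0 {
			break
		}
		chunkc <- chunk // unbuffered send: blocks until the consumer is ready
		if len(chunk) < chunkSize {
			break // a partial chunk must be the final chunk
		}
		min = end // next chunk begins where this one ended
	}
	close(chunkc)
	<-donec // channel ordering makes the goroutine's writes visible here
	return index
}

func main() {
	data := make([]int, 25001)
	for i := range data {
		data[i] = i
	}
	fmt.Println(len(restoreChunked(data, 10000))) // 25001
}

Two details carry over directly from the diff: the "partial set implies final set" early exit saves one extra range call when the keyspace divides unevenly, and the cursor bump (min = end here; bytesToRev(lastKey).sub++ plus revToBytes in the patch) resumes each scan strictly after the previous chunk so no revision is read twice.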