Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mvcc: push down RangeOptions.limit argv into index tree to reduce memory overhead #11990

Merged
merged 2 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Add [missing CRC checksum check in WAL validate method otherwise causes panic](https://github.com/etcd-io/etcd/pull/11924).
- See https://github.com/etcd-io/etcd/issues/11918.
- Improve logging around snapshot send and receive.
- [Push down RangeOptions.limit argv into index tree to reduce memory overhead](https://github.com/etcd-io/etcd/pull/11990).

### Package `embed`

Expand Down
29 changes: 20 additions & 9 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64) []revision
CountRevisions(key, end []byte, atRev int64) int
Revisions(key, end []byte, atRev int64, limit int) []revision
CountRevisions(key, end []byte, atRev int64, limit int) int
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
Expand Down Expand Up @@ -89,7 +89,7 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
return nil
}

func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex) bool) {
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}

ti.RLock()
Expand All @@ -99,28 +99,34 @@ func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
f(item.(*keyIndex))
if !f(item.(*keyIndex)) {
return false
}
return true
})
}

func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil
}
return []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
ti.visit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
if len(revs) == limit {
return false
}
}
return true
})
return revs
}

func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int {
if end == nil {
_, _, _, err := ti.Get(key, atRev)
if err != nil {
Expand All @@ -129,10 +135,14 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
return 1
}
total := 0
ti.visit(key, end, func(ki *keyIndex) {
ti.visit(key, end, func(ki *keyIndex) bool {
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
total++
if total == limit {
return false
}
}
return true
})
return total
}
Expand All @@ -145,11 +155,12 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
}
return [][]byte{key}, []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
ti.visit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
keys = append(keys, ki.key)
}
return true
})
return keys, revs
}
Expand Down
8 changes: 6 additions & 2 deletions mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,12 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
if r.Rev != wrev {
t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
}
if r.Count != len(kvs) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
if tt.limit <= 0 || int(tt.limit) > len(kvs) {
if r.Count != len(kvs) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
}
} else if r.Count != int(tt.limit) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,12 +936,12 @@ type fakeIndex struct {
indexCompactRespc chan map[revision]struct{}
}

func (i *fakeIndex) Revisions(key, end []byte, atRev int64) []revision {
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) []revision {
_, rev := i.Range(key, end, atRev)
return rev
}

func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int {
_, rev := i.Range(key, end, atRev)
return len(rev)
}
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
if ro.Count {
total := tr.s.kvindex.CountRevisions(key, end, rev)
total := tr.s.kvindex.CountRevisions(key, end, rev, int(ro.Limit))
tr.trace.Step("count revisions from in-memory index tree")
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
}
revpairs := tr.s.kvindex.Revisions(key, end, rev)
revpairs := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
tr.trace.Step("range keys from in-memory index tree")
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
Expand Down