Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make use of the upperBound of ticlient's kv_scan interface to ensure no overbound scan will happen #8081

Merged
merged 22 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ required = ["github.com/golang/protobuf/jsonpb"]

[[constraint]]
name = "github.com/pingcap/kvproto"
branch = "master"
source = "https://github.com/MyonKeminta/kvproto"
branch = "misono/add-end-key"

[[constraint]]
name = "gopkg.in/natefinch/lumberjack.v2"
Expand Down
10 changes: 5 additions & 5 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,7 @@ func (s *testDBSuite) TestTruncateTable(c *C) {
hasOldTableData := true
for i := 0; i < waitForCleanDataRound; i++ {
err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
it, err1 := txn.Seek(tablePrefix)
it, err1 := txn.Iter(tablePrefix, nil)
if err1 != nil {
return err1
}
Expand Down Expand Up @@ -2803,7 +2803,7 @@ func (s *testDBSuite) TestAlterTableDropPartition(c *C) {

s.tk.MustExec("drop table if exists tr;")
s.tk.MustExec(` create table tr(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down Expand Up @@ -2893,7 +2893,7 @@ func checkPartitionDelRangeDone(c *C, s *testDBSuite, partitionPrefix kv.Key) bo
hasOldPartitionData := true
for i := 0; i < waitForCleanDataRound; i++ {
err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
it, err := txn.Seek(partitionPrefix)
it, err := txn.Iter(partitionPrefix, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -2943,7 +2943,7 @@ func (s *testDBSuite) TestTruncatePartitionAndDropTable(c *C) {
// Test truncate table partition.
s.tk.MustExec("drop table if exists t3;")
s.tk.MustExec(`create table t3(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down Expand Up @@ -2981,7 +2981,7 @@ func (s *testDBSuite) TestTruncatePartitionAndDropTable(c *C) {
// Test drop table partition.
s.tk.MustExec("drop table if exists t4;")
s.tk.MustExec(`create table t4(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down
2 changes: 1 addition & 1 deletion ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error {
iter, err := txn.Seek(oldStartKey)
iter, err := txn.Iter(oldStartKey, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r.EndKey?

if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,8 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Seek(firstKey)
upperBound := firstKey.PrefixNext()
it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
Expand Down
16 changes: 8 additions & 8 deletions kv/buffer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,26 @@ func (s *BufferStore) Get(k Key) ([]byte, error) {
return val, nil
}

// Seek implements the Retriever interface.
func (s *BufferStore) Seek(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Seek(k)
// Iter implements the Retriever interface.
func (s *BufferStore) Iter(k Key, upperBound Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.Seek(k)
retrieverIt, err := s.r.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
return NewUnionIter(bufferIt, retrieverIt, false)
}

// SeekReverse implements the Retriever interface.
func (s *BufferStore) SeekReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.SeekReverse(k)
// IterReverse implements the Retriever interface.
func (s *BufferStore) IterReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.SeekReverse(k)
retrieverIt, err := s.r.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion kv/buffer_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
err := bs.SaveTo(mutator)
c.Check(err, IsNil)

iter, err := mutator.Seek(nil)
iter, err := mutator.Iter(nil, nil)
c.Check(err, IsNil)
for iter.Valid() {
cmp := bytes.Compare(iter.Key(), iter.Value())
Expand Down
10 changes: 6 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,17 @@ type Retriever interface {
// Get gets the value for key k from kv store.
// If corresponding kv pair does not exist, it returns nil and ErrNotExist.
Get(k Key) ([]byte, error)
// Seek creates an Iterator positioned on the first entry that k <= entry's key.
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
Seek(k Key) (Iterator, error)
Iter(k Key, upperBound Key) (Iterator, error)

// SeekReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
SeekReverse(k Key) (Iterator, error)
// TODO: Add lower bound limit
IterReverse(k Key) (Iterator, error)
}

// Mutator is the interface wraps the basic Set and Delete methods.
Expand Down
20 changes: 10 additions & 10 deletions kv/mem_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func valToStr(c *C, iter Iterator) string {
func checkNewIterator(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(decodeInt([]byte(valToStr(c, iter))), Equals, i*indexStep)
Expand All @@ -86,7 +86,7 @@ func checkNewIterator(c *C, buffer MemBuffer) {
// Test iterator Next()
for i := startIndex; i < testCount-1; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(valToStr(c, iter), Equals, string(val))
Expand All @@ -102,15 +102,15 @@ func checkNewIterator(c *C, buffer MemBuffer) {
}

// Non exist and beyond maximum seek test
iter, err := buffer.Seek(encodeInt(testCount * indexStep))
iter, err := buffer.Iter(encodeInt(testCount*indexStep), nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

// Non exist but between existing keys seek test,
// it returns the smallest key that larger than the one we are seeking
inBetween := encodeInt((testCount-1)*indexStep - 1)
last := encodeInt((testCount - 1) * indexStep)
iter, err = buffer.Seek(inBetween)
iter, err = buffer.Iter(inBetween, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)
c.Assert([]byte(iter.Key()), Not(BytesEquals), inBetween)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *testKVSuite) TestNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
// should be invalid
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

Expand All @@ -155,7 +155,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
insertData(c, buffer)

iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)

err = NextUntil(iter, func(k Key) bool {
Expand All @@ -168,7 +168,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
func (s *testKVSuite) TestBasicNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
it, err := buffer.Seek([]byte("2"))
it, err := buffer.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsFalse)
}
Expand All @@ -193,15 +193,15 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
}

cnt := 0
it, err := buffer.Seek(nil)
it, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
for it.Valid() {
cnt++
it.Next()
}
c.Assert(cnt, Equals, 6)

it, err = buffer.Seek([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"))
it, err = buffer.Iter([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"), nil)
c.Assert(err, IsNil)
c.Assert(string(it.Key()), Equals, "DATA_test_main_db_tbl_tbl_test_record__00000000000000000001")
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func benchIterator(b *testing.B, buffer MemBuffer) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
if err != nil {
b.Error(err)
}
Expand Down
8 changes: 4 additions & 4 deletions kv/memdb_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func NewMemDbBuffer(cap int) MemBuffer {
}
}

// Seek creates an Iterator.
func (m *memDbBuffer) Seek(k Key) (Iterator, error) {
// Iter creates an Iterator.
func (m *memDbBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
var i Iterator
MyonKeminta marked this conversation as resolved.
Show resolved Hide resolved
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: false}
Expand All @@ -69,7 +69,7 @@ func (m *memDbBuffer) SetCap(cap int) {

}

func (m *memDbBuffer) SeekReverse(k Key) (Iterator, error) {
func (m *memDbBuffer) IterReverse(k Key) (Iterator, error) {
var i *memDbIter
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: true}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (i *memDbIter) Close() {

// WalkMemBuffer iterates all buffered kv pairs in memBuf
func WalkMemBuffer(memBuf MemBuffer, f func(k Key, v []byte) error) error {
iter, err := memBuf.Seek(nil)
iter, err := memBuf.Iter(nil, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func (t *mockTxn) Get(k Key) ([]byte, error) {
return nil, nil
}

func (t *mockTxn) Seek(k Key) (Iterator, error) {
func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) {
return nil, nil
}

func (t *mockTxn) SeekReverse(k Key) (Iterator, error) {
func (t *mockTxn) IterReverse(k Key) (Iterator, error) {
return nil, nil
}

Expand Down Expand Up @@ -211,10 +211,10 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
return m, nil
}

func (s *mockSnapshot) Seek(k Key) (Iterator, error) {
return s.store.Seek(k)
func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
return s.store.Iter(k, upperBound)
}

func (s *mockSnapshot) SeekReverse(k Key) (Iterator, error) {
return s.store.SeekReverse(k)
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}
4 changes: 2 additions & 2 deletions kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (s testMockSuite) TestInterface(c *C) {
if transaction.IsReadOnly() {
transaction.Get(Key("lock"))
transaction.Set(Key("lock"), []byte{})
transaction.Seek(Key("lock"))
transaction.SeekReverse(Key("lock"))
transaction.Iter(Key("lock"), nil)
transaction.IterReverse(Key("lock"))
}
transaction.Commit(context.Background())

Expand Down
8 changes: 4 additions & 4 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ func (lmb *lazyMemBuffer) Delete(k Key) error {
return lmb.mb.Delete(k)
}

func (lmb *lazyMemBuffer) Seek(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.Seek(k)
return lmb.mb.Iter(k, upperBound)
}

func (lmb *lazyMemBuffer) SeekReverse(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) IterReverse(k Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.SeekReverse(k)
return lmb.mb.IterReverse(k)
}

func (lmb *lazyMemBuffer) Size() int {
Expand Down
Loading