From 58fc7d44f73b65cc4cf9121bd540d905c774b588 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 23 Oct 2019 15:08:59 +0800 Subject: [PATCH] store,kv: snapshot doesn't cache non-existent kv entries, leading to poor 'insert ignore' performance (#12872) --- kv/kv.go | 2 ++ store/mockstore/mocktikv/rpc.go | 7 +++++++ store/tikv/snapshot.go | 24 +++++++++++++++++++++--- store/tikv/snapshot_test.go | 22 ++++++++++++++++++++++ 4 files changed, 52 insertions(+), 3 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index 8413d86f684f9..652229871d625 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -187,6 +187,8 @@ type Transaction interface { // SetVars sets variables to the transaction. SetVars(vars *Variables) // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. + // Do not use len(value) == 0 or value == nil to represent non-existence. + // If a key doesn't exist, there shouldn't be any corresponding entry in the result map. BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error) IsPessimistic() bool } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 1361ffdd30d95..3dc2b10edcb90 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" @@ -708,6 +709,12 @@ func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*rpcHandler, er // SendRequest sends a request to mock cluster. 
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("RPCClient.SendRequest", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + failpoint.Inject("rpcServerBusy", func(val failpoint.Value) { if val.(bool) { failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}})) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 96a30073ca4d1..584232e3e780f 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -21,6 +21,7 @@ import ( "time" "unsafe" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -62,6 +63,10 @@ type tikvSnapshot struct { // Cache the result of BatchGet. // The invariance is that calling BatchGet multiple times using the same start ts, // the result should not change. + // NOTE: The representation here is different from the BatchGet API. + // cached uses len(value)=0 to represent that a key-value entry doesn't exist (a reliable truth from TiKV). + // In the BatchGet API, it uses the absence of a key-value entry to represent non-existence. + // It's OK as long as there are no zero-byte values in the protocol. 
cached map[string][]byte } @@ -95,7 +100,9 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] tmp := keys[:0] for _, key := range keys { if val, ok := s.cached[string(key)]; ok { - m[string(key)] = val + if len(val) > 0 { + m[string(key)] = val + } } else { tmp = append(tmp, key) } @@ -121,6 +128,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] if len(v) == 0 { return } + mu.Lock() m[string(k)] = v mu.Unlock() @@ -138,8 +146,8 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] if s.cached == nil { s.cached = make(map[string][]byte, len(m)) } - for key, value := range m { - s.cached[key] = value + for _, key := range keys { + s.cached[string(key)] = m[string(key)] } return m, nil @@ -253,6 +261,12 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll // Get gets the value for key k from snapshot. func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + ctx = context.WithValue(ctx, txnStartKey, s.version.Ver) val, err := s.get(NewBackoffer(ctx, getMaxBackoff), k) if err != nil { @@ -272,6 +286,10 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { } } + failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) { + panic("cache miss") + }) + sender := NewRegionRequestSender(s.store.regionCache, s.store.client) req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 7ea31aebe801e..d8ea5792975c2 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -19,6 +19,7 @@ import ( "time" . 
"github.com/pingcap/check" + "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/logutil" @@ -117,6 +118,27 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) { } } +func (s *testSnapshotSuite) TestSnapshotCache(c *C) { + txn := s.beginTxn(c) + c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil) + c.Assert(txn.Commit(context.Background()), IsNil) + + txn = s.beginTxn(c) + snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0) + _, err := snapshot.BatchGet(context.Background(), []kv.Key{kv.Key("x"), kv.Key("y")}) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail", `return(true)`), IsNil) + ctx := context.WithValue(context.Background(), "TestSnapshotCache", true) + _, err = snapshot.Get(ctx, kv.Key("x")) + c.Assert(err, IsNil) + + _, err = snapshot.Get(ctx, kv.Key("y")) + c.Assert(kv.IsErrNotFound(err), IsTrue) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail"), IsNil) +} + func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) { for _, rowNum := range s.rowNums { logutil.BgLogger().Debug("test BatchGetNotExist",