diff --git a/errno/errcode.go b/errno/errcode.go index 1651494fd9a2a..f4c3fdba2cd2b 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1081,4 +1081,5 @@ const ( ErrWriteConflict = 9007 ErrTiKVStoreLimit = 9008 ErrPrometheusAddrIsNotSet = 9009 + ErrTiKVStaleCommand = 9010 ) diff --git a/errno/errname.go b/errno/errname.go index e04b43cf57efb..ad66df9267354 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1078,4 +1078,5 @@ var MySQLErrName = map[uint16]string{ ErrWriteConflict: "Write conflict, txnStartTS=%d, conflictStartTS=%d, conflictCommitTS=%d, key=%s", ErrTiKVStoreLimit: "Store token is up to the limit, store id = %d", ErrPrometheusAddrIsNotSet: "Prometheus address is not set in PD and etcd", + ErrTiKVStaleCommand: "TiKV server reports stale command", } diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 55c9165d7e59a..48c993bdc58b0 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -51,6 +51,7 @@ var ( tikvBackoffHistogramRegionMiss = metrics.TiKVBackoffHistogram.WithLabelValues("regionMiss") tikvBackoffHistogramUpdateLeader = metrics.TiKVBackoffHistogram.WithLabelValues("updateLeader") tikvBackoffHistogramServerBusy = metrics.TiKVBackoffHistogram.WithLabelValues("serverBusy") + tikvBackoffHistogramStaleCmd = metrics.TiKVBackoffHistogram.WithLabelValues("staleCommand") tikvBackoffHistogramEmpty = metrics.TiKVBackoffHistogram.WithLabelValues("") ) @@ -70,6 +71,8 @@ func (t backoffType) metric() prometheus.Observer { return tikvBackoffHistogramUpdateLeader case boServerBusy: return tikvBackoffHistogramServerBusy + case boStaleCmd: + return tikvBackoffHistogramStaleCmd } return tikvBackoffHistogramEmpty } @@ -134,6 +137,7 @@ const ( BoUpdateLeader boServerBusy boTxnNotFound + boStaleCmd ) func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int { @@ -158,6 +162,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int return NewBackoffFn(1, 10, NoJitter) case boServerBusy: return NewBackoffFn(2000, 10000, EqualJitter) + case boStaleCmd: + return NewBackoffFn(2, 1000, NoJitter) } return nil } @@ -178,6 +184,8 @@ func (t backoffType) String() string { return "updateLeader" case boServerBusy: return "serverBusy" + case boStaleCmd: + return "staleCommand" case boTxnNotFound: return "txnNotFound" } @@ -196,6 +204,8 @@ func (t backoffType) TError() error { return ErrRegionUnavailable case boServerBusy: return ErrTiKVServerBusy + case boStaleCmd: + return ErrTiKVStaleCommand } return ErrUnknown } diff --git a/store/tikv/error.go b/store/tikv/error.go index cc5c87acc0c35..b135614321801 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -37,6 +37,7 @@ var ( ErrPDServerTimeout = terror.ClassTiKV.New(mysql.ErrPDServerTimeout, mysql.MySQLErrName[mysql.ErrPDServerTimeout]) ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]) ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]) + ErrTiKVStaleCommand = terror.ClassTiKV.New(mysql.ErrTiKVStaleCommand, mysql.MySQLErrName[mysql.ErrTiKVStaleCommand]) ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted]) ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet]) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index ac671683eb58b..568a641975e86 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -378,6 +378,10 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed } if regionErr.GetStaleCommand() != nil { logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx)) + err = bo.Backoff(boStaleCmd, errors.Errorf("stale command, ctx: %v", ctx)) + if err != nil { + return false, errors.Trace(err) + } return true, nil } if regionErr.GetRaftEntryTooLarge() != nil { diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index e45b33cf57321..e20a4807931d7 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -23,6 +23,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" @@ -89,6 +90,47 @@ func (s *testStoreLimitSuite) TearDownTest(c *C) { s.cache.Close() } +type fnClient struct { + fn func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) +} + +func (f *fnClient) Close() error { + return nil +} + +func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + return f.fn(ctx, addr, req, timeout) +} + +func (s *testRegionRequestSuite) TestOnRegionError(c *C) { + req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ + Key: []byte("key"), + Value: []byte("value"), + }) + region, err := s.cache.LocateRegionByID(s.bo, s.region) + c.Assert(err, IsNil) + c.Assert(region, NotNil) + + // test stale command retry. + func() { + oc := s.regionRequestSender.client + defer func() { + s.regionRequestSender.client = oc + }() + s.regionRequestSender.client = &fnClient{func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + staleResp := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + RegionError: &errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}, + }} + return staleResp, nil + }} + bo := NewBackoffer(context.Background(), 5) + resp, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) + c.Assert(err, NotNil) + c.Assert(resp, IsNil) + }() + +} + func (s *testStoreLimitSuite) TestStoreTokenLimit(c *C) { req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{}) region, err := s.cache.LocateRegionByID(s.bo, s.regionID)