From 8b7585c48b9eb357211c747a009c0f7db867fe50 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 8 Mar 2023 17:37:29 +0800 Subject: [PATCH 01/11] Introduce the RURuntimeStats Signed-off-by: JmPotato --- go.mod | 5 ++ go.sum | 8 +-- integration_tests/go.mod | 5 ++ integration_tests/go.sum | 8 +-- internal/client/client_interceptor.go | 67 +++++++++++++++++++--- internal/client/client_interceptor_test.go | 2 +- internal/locate/region_request_test.go | 11 ++++ tikv/kv.go | 10 +++- txnkv/transaction/cleanup.go | 1 + txnkv/transaction/commit.go | 1 + txnkv/transaction/pessimistic.go | 1 + txnkv/transaction/prewrite.go | 1 + txnkv/transaction/txn.go | 2 +- txnkv/txnlock/lock_resolver.go | 6 ++ txnkv/txnsnapshot/scan.go | 1 + txnkv/txnsnapshot/snapshot.go | 2 + 16 files changed, 113 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index d2d43d4ee6..36362526ea 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,11 @@ module github.com/tikv/client-go/v2 go 1.19 +replace ( + github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 + github.com/tikv/pd/client => github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af +) + require ( github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 diff --git a/go.sum b/go.sum index 25e37eb020..9414918a7a 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 h1:Ir3NQi3ucSyXEKj8EKC/lnitqD0bJ342c9ft+pmdSmo= +github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= +github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af h1:aUrcF9obP/mqlTlAarTajZqYlXEZQ64lgdDJTKOugF8= +github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -147,8 +151,6 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924 h1:z6WwBPP0Txmal5zf+H7vf/lSmKZtSS8BTNwiLjEjdnA= -github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -202,8 +204,6 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 7ee8ee2248..cccb0ecd2a 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -2,6 +2,11 @@ module integration_tests go 1.19 +replace ( + github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 + github.com/tikv/pd/client => github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af +) + require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 477647c52e..00f46636c3 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -19,6 +19,10 @@ github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EF github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= +github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 h1:Ir3NQi3ucSyXEKj8EKC/lnitqD0bJ342c9ft+pmdSmo= +github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= +github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af h1:aUrcF9obP/mqlTlAarTajZqYlXEZQ64lgdDJTKOugF8= +github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -338,8 +342,6 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZLmhahmvHm7n9DUxGRQT00208= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924 h1:z6WwBPP0Txmal5zf+H7vf/lSmKZtSS8BTNwiLjEjdnA= -github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= @@ -441,8 +443,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU= -github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index c7047c677b..c352619746 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -16,9 +16,12 @@ package client import ( "context" + "fmt" + "sync" "sync/atomic" "time" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/tikv/client-go/v2/internal/resourcecontrol" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" @@ -34,17 +37,17 @@ var _ Client = interceptedClient{} type interceptedClient struct { Client + ruRuntimeStatsMap *sync.Map } // NewInterceptedClient creates a Client which can execute interceptor. -func NewInterceptedClient(client Client) Client { - return interceptedClient{client} +func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client { + return interceptedClient{client, ruRuntimeStatsMap} } func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { // Build the resource control interceptor. - resourceGroupName := req.GetResourceGroupName() - var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, resourceGroupName) + var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTs())) // Chain the interceptors if there are multiple interceptors. if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { if finalInterceptor != nil { @@ -61,6 +64,16 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti return r.Client.SendRequest(ctx, addr, req, timeout) } +func (r interceptedClient) getRURuntimeStats(startTS uint64) *RURuntimeStats { + if r.ruRuntimeStatsMap == nil { + return nil + } + if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok { + return v.(*RURuntimeStats) + } + return nil +} + var ( // ResourceControlSwitch is used to control whether to enable the resource control. ResourceControlSwitch atomic.Value @@ -73,11 +86,12 @@ var ( func buildResourceControlInterceptor( ctx context.Context, req *tikvrpc.Request, - resourceGroupName string, + ruRuntimeStats *RURuntimeStats, ) interceptor.RPCInterceptor { if !ResourceControlSwitch.Load().(bool) { return nil } + resourceGroupName := req.GetResourceGroupName() // When the group name is empty or "default", we don't need to // perform the resource control. if len(resourceGroupName) == 0 || resourceGroupName == "default" { @@ -92,16 +106,55 @@ func buildResourceControlInterceptor( // Build the interceptor. return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo) + consumption, err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo) if err != nil { return nil, err } + ruRuntimeStats.Update(consumption) resp, err := next(target, req) if resp != nil { respInfo := resourcecontrol.MakeResponseInfo(resp) - ResourceControlInterceptor.OnResponse(ctx, resourceGroupName, reqInfo, respInfo) + consumption, err = ResourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo) + if err != nil { + return nil, err + } + ruRuntimeStats.Update(consumption) } return resp, err } } } + +// RURuntimeStats is the runtime stats collector for RU. +type RURuntimeStats struct { + readRU float64 + writeRU float64 +} + +// Clone implements the RuntimeStats interface. +func (rs *RURuntimeStats) Clone() *RURuntimeStats { + return &RURuntimeStats{ + readRU: rs.readRU, + writeRU: rs.writeRU, + } +} + +// Merge implements the RuntimeStats interface. +func (rs *RURuntimeStats) Merge(other *RURuntimeStats) { + rs.readRU += other.readRU + rs.writeRU += other.writeRU +} + +// String implements fmt.Stringer interface. +func (rs *RURuntimeStats) String() string { + return fmt.Sprintf("RRU: %f, WRU: %f", rs.readRU, rs.writeRU) +} + +// Update updates the RU runtime stats with the given consumption info. +func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) { + if rs == nil || consumption == nil { + return + } + rs.readRU += consumption.RRU + rs.writeRU += consumption.WRU +} diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go index 2e1cca2230..88442fe238 100644 --- a/internal/client/client_interceptor_test.go +++ b/internal/client/client_interceptor_test.go @@ -40,7 +40,7 @@ func (c emptyClient) CloseAddr(addr string) error { func TestInterceptedClient(t *testing.T) { executed := false - client := NewInterceptedClient(emptyClient{}) + client := NewInterceptedClient(emptyClient{}, nil) ctx := interceptor.WithRPCInterceptor(context.Background(), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { executed = true diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 70ad47ac42..c85324d0a2 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -422,6 +422,9 @@ func (s *mockTikvGrpcServer) BatchRaft(tikvpb.Tikv_BatchRaftServer) error { func (s *mockTikvGrpcServer) Snapshot(tikvpb.Tikv_SnapshotServer) error { return errors.New("unreachable") } +func (s *mockTikvGrpcServer) TabletSnapshot(tikvpb.Tikv_TabletSnapshotServer) error { + return errors.New("unreachable") +} func (s *mockTikvGrpcServer) MvccGetByKey(context.Context, *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) { return nil, errors.New("unreachable") } @@ -480,6 +483,14 @@ func (s *mockTikvGrpcServer) TryMarkDelete(context.Context, *disaggregated.TryMa return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) EstablishDisaggTask(context.Context, *disaggregated.EstablishDisaggTaskRequest) (*disaggregated.EstablishDisaggTaskResponse, error) { + return nil, errors.New("unreachable") +} + +func (s *mockTikvGrpcServer) FetchDisaggPages(*disaggregated.FetchDisaggPagesRequest, tikvpb.Tikv_FetchDisaggPagesServer) error { + return errors.New("unreachable") +} + func (s *mockTikvGrpcServer) KvFlashbackToVersion(context.Context, *kvrpcpb.FlashbackToVersionRequest) (*kvrpcpb.FlashbackToVersionResponse, error) { return nil, errors.New("unreachable") } diff --git a/tikv/kv.go b/tikv/kv.go index 3776014e85..8b080f35ae 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -127,6 +127,9 @@ type KVStore struct { replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled + // StartTS -> RURuntimeStats, stores the RU runtime stats for certain transaction. + ruRuntimeStatsMap sync.Map + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -209,7 +212,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl cancel: cancel, gP: NewSpool(128, 10*time.Second), } - store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap)) store.lockResolver = txnlock.NewLockResolver(store) loadOption(store, opt...) @@ -600,6 +603,11 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg.Wait() } +// CreateRURuntimeStats creates a RURuntimeStats for the startTS. +func (s *KVStore) CreateRURuntimeStats(startTS uint64) { + s.ruRuntimeStatsMap.LoadOrStore(startTS, &client.RURuntimeStats{}) +} + // EnableResourceControl enables the resource control. func EnableResourceControl() { client.ResourceControlSwitch.Store(true) diff --git a/txnkv/transaction/cleanup.go b/txnkv/transaction/cleanup.go index 03cf6d0c33..16e19cd7b1 100644 --- a/txnkv/transaction/cleanup.go +++ b/txnkv/transaction/cleanup.go @@ -66,6 +66,7 @@ func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Ba Keys: batch.mutations.GetKeys(), StartVersion: c.startTS, }, kvrpcpb.Context{ + StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 4871b07ec8..45d48fd4ba 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -77,6 +77,7 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac Keys: keys, CommitVersion: c.commitTS, }, kvrpcpb.Context{ + StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index fbdda27357..b48d0d12c1 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -133,6 +133,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * WakeUpMode: action.wakeUpMode, LockOnlyIfExists: action.LockOnlyIfExists, }, kvrpcpb.Context{ + StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag, diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 5f6c385c0a..4b3872cc45 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -184,6 +184,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u } r := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, kvrpcpb.Context{ + StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 2bc42d62b2..7074b967bc 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -151,7 +151,7 @@ type KVTxn struct { interceptor interceptor.RPCInterceptor assertionLevel kvrpcpb.AssertionLevel *util.RequestSource - // resourceGroupName is the name of tenent resource group. + // resourceGroupName is the name of tenant resource group. resourceGroupName string aggressiveLockingContext *aggressiveLockingContext diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 8005909e4c..23070fc322 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -288,6 +288,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos}, kvrpcpb.Context{ + // TODO: how to pass the `start_ts` here? RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }, @@ -708,6 +709,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary ForceSyncCommit: forceSyncCommit, ResolvingPessimisticLock: resolvingPessimisticLock, }, kvrpcpb.Context{ + StartTs: txnID, RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) @@ -852,6 +854,7 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK StartVersion: txnID, } req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq, kvrpcpb.Context{ + StartTs: txnID, RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) @@ -1009,6 +1012,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region } lreq.Keys = keys req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{ + StartTs: l.TxnID, ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) @@ -1087,6 +1091,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat lreq.Keys = [][]byte{l.Key} } req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{ + StartTs: l.TxnID, ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) @@ -1143,6 +1148,7 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err Keys: [][]byte{l.Key}, } req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq, kvrpcpb.Context{ + StartTs: l.TxnID, ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index 109c2a9442..e1ff56ffeb 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -240,6 +240,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { } s.snapshot.mu.RLock() req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, kvrpcpb.Context{ + StartTs: s.startTS(), Priority: s.snapshot.priority.ToPB(), NotFillCache: s.snapshot.notFillCache, TaskId: s.snapshot.mu.taskID, diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index ba0b5efcd3..3252142275 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -391,6 +391,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, Keys: pending, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{ + StartTs: s.version, Priority: s.priority.ToPB(), NotFillCache: s.notFillCache, TaskId: s.mu.taskID, @@ -600,6 +601,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] Key: k, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{ + StartTs: s.version, Priority: s.priority.ToPB(), NotFillCache: s.notFillCache, TaskId: s.mu.taskID, From 48359a76768a42cd9ec7f224a3e5c3cd9595660a Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 9 Mar 2023 18:37:27 +0800 Subject: [PATCH 02/11] Use atomic.Float64 Signed-off-by: JmPotato --- internal/client/client_interceptor.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index c352619746..6821d8b1ea 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "sync" - "sync/atomic" "time" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" @@ -26,6 +25,7 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" resourceControlClient "github.com/tikv/pd/client/resource_group/controller" + "go.uber.org/atomic" ) func init() { @@ -127,27 +127,27 @@ func buildResourceControlInterceptor( // RURuntimeStats is the runtime stats collector for RU. type RURuntimeStats struct { - readRU float64 - writeRU float64 + readRU *atomic.Float64 + writeRU *atomic.Float64 } // Clone implements the RuntimeStats interface. func (rs *RURuntimeStats) Clone() *RURuntimeStats { return &RURuntimeStats{ - readRU: rs.readRU, - writeRU: rs.writeRU, + readRU: atomic.NewFloat64(rs.readRU.Load()), + writeRU: atomic.NewFloat64(rs.writeRU.Load()), } } // Merge implements the RuntimeStats interface. func (rs *RURuntimeStats) Merge(other *RURuntimeStats) { - rs.readRU += other.readRU - rs.writeRU += other.writeRU + rs.readRU.Add(other.readRU.Load()) + rs.writeRU.Add(other.writeRU.Load()) } // String implements fmt.Stringer interface. func (rs *RURuntimeStats) String() string { - return fmt.Sprintf("RRU: %f, WRU: %f", rs.readRU, rs.writeRU) + return fmt.Sprintf("RRU: %f, WRU: %f", rs.readRU.Load(), rs.writeRU.Load()) } // Update updates the RU runtime stats with the given consumption info. @@ -155,6 +155,6 @@ func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) { if rs == nil || consumption == nil { return } - rs.readRU += consumption.RRU - rs.writeRU += consumption.WRU + rs.readRU.Add(consumption.RRU) + rs.writeRU.Add(consumption.WRU) } From a7b2b83f7200694269505708d5f61549c89e1c50 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 9 Mar 2023 19:06:25 +0800 Subject: [PATCH 03/11] Return RURuntimeStats after creating Signed-off-by: JmPotato --- tikv/kv.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tikv/kv.go b/tikv/kv.go index 8b080f35ae..39c4c1d438 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -603,9 +603,10 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg.Wait() } -// CreateRURuntimeStats creates a RURuntimeStats for the startTS. -func (s *KVStore) CreateRURuntimeStats(startTS uint64) { - s.ruRuntimeStatsMap.LoadOrStore(startTS, &client.RURuntimeStats{}) +// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it. +func (s *KVStore) CreateRURuntimeStats(startTS uint64) *client.RURuntimeStats { + rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, &client.RURuntimeStats{}) + return rrs.(*client.RURuntimeStats) } // EnableResourceControl enables the resource control. From eece09618767a2a3f3f63c2466e5ab50d818d0ae Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 9 Mar 2023 19:20:23 +0800 Subject: [PATCH 04/11] Move RURuntimeStats out of internal pkg Signed-off-by: JmPotato --- internal/client/client_interceptor.go | 45 +++------------------------ tikv/kv.go | 6 ++-- util/execdetails.go | 37 ++++++++++++++++++++++ 3 files changed, 45 insertions(+), 43 deletions(-) diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index 6821d8b1ea..24c4c54d53 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -16,16 +16,15 @@ package client import ( "context" - "fmt" "sync" + "sync/atomic" "time" - rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/tikv/client-go/v2/internal/resourcecontrol" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" + "github.com/tikv/client-go/v2/util" resourceControlClient "github.com/tikv/pd/client/resource_group/controller" - "go.uber.org/atomic" ) func init() { @@ -64,12 +63,12 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti return r.Client.SendRequest(ctx, addr, req, timeout) } -func (r interceptedClient) getRURuntimeStats(startTS uint64) *RURuntimeStats { +func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats { if r.ruRuntimeStatsMap == nil { return nil } if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok { - return v.(*RURuntimeStats) + return v.(*util.RURuntimeStats) } return nil } @@ -86,7 +85,7 @@ var ( func buildResourceControlInterceptor( ctx context.Context, req *tikvrpc.Request, - ruRuntimeStats *RURuntimeStats, + ruRuntimeStats *util.RURuntimeStats, ) interceptor.RPCInterceptor { if !ResourceControlSwitch.Load().(bool) { return nil @@ -124,37 +123,3 @@ func buildResourceControlInterceptor( } } } - -// RURuntimeStats is the runtime stats collector for RU. -type RURuntimeStats struct { - readRU *atomic.Float64 - writeRU *atomic.Float64 -} - -// Clone implements the RuntimeStats interface. -func (rs *RURuntimeStats) Clone() *RURuntimeStats { - return &RURuntimeStats{ - readRU: atomic.NewFloat64(rs.readRU.Load()), - writeRU: atomic.NewFloat64(rs.writeRU.Load()), - } -} - -// Merge implements the RuntimeStats interface. -func (rs *RURuntimeStats) Merge(other *RURuntimeStats) { - rs.readRU.Add(other.readRU.Load()) - rs.writeRU.Add(other.writeRU.Load()) -} - -// String implements fmt.Stringer interface. -func (rs *RURuntimeStats) String() string { - return fmt.Sprintf("RRU: %f, WRU: %f", rs.readRU.Load(), rs.writeRU.Load()) -} - -// Update updates the RU runtime stats with the given consumption info. -func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) { - if rs == nil || consumption == nil { - return - } - rs.readRU.Add(consumption.RRU) - rs.writeRU.Add(consumption.WRU) -} diff --git a/tikv/kv.go b/tikv/kv.go index 39c4c1d438..f89b006902 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -604,9 +604,9 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { } // CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it. -func (s *KVStore) CreateRURuntimeStats(startTS uint64) *client.RURuntimeStats { - rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, &client.RURuntimeStats{}) - return rrs.(*client.RURuntimeStats) +func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats { + rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, &util.RURuntimeStats{}) + return rrs.(*util.RURuntimeStats) } // EnableResourceControl enables the resource control. diff --git a/util/execdetails.go b/util/execdetails.go index 64d779914a..95a00fb805 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -37,6 +37,7 @@ package util import ( "bytes" "context" + "fmt" "math" "strconv" "sync" @@ -44,6 +45,8 @@ import ( "time" "github.com/pingcap/kvproto/pkg/kvrpcpb" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + uatomic "go.uber.org/atomic" ) type commitDetailCtxKeyType struct{} @@ -663,3 +666,37 @@ type ResolveLockDetail struct { func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) { rd.ResolveLockTime += resolveLock.ResolveLockTime } + +// RURuntimeStats is the runtime stats collector for RU. +type RURuntimeStats struct { + readRU *uatomic.Float64 + writeRU *uatomic.Float64 +} + +// Clone implements the RuntimeStats interface. +func (rs *RURuntimeStats) Clone() *RURuntimeStats { + return &RURuntimeStats{ + readRU: uatomic.NewFloat64(rs.readRU.Load()), + writeRU: uatomic.NewFloat64(rs.writeRU.Load()), + } +} + +// Merge implements the RuntimeStats interface. +func (rs *RURuntimeStats) Merge(other *RURuntimeStats) { + rs.readRU.Add(other.readRU.Load()) + rs.writeRU.Add(other.writeRU.Load()) +} + +// String implements fmt.Stringer interface. +func (rs *RURuntimeStats) String() string { + return fmt.Sprintf("RRU: %f, WRU: %f", rs.readRU.Load(), rs.writeRU.Load()) +} + +// Update updates the RU runtime stats with the given consumption info. +func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) { + if rs == nil || consumption == nil { + return + } + rs.readRU.Add(consumption.RRU) + rs.writeRU.Add(consumption.WRU) +} From 55e0b314ab905352ce018123dcb3eb2177843133 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 10 Mar 2023 15:18:57 +0800 Subject: [PATCH 05/11] Fix the panic Signed-off-by: JmPotato --- tikv/kv.go | 2 +- util/execdetails.go | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/tikv/kv.go b/tikv/kv.go index f89b006902..95250f8fca 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -605,7 +605,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { // CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it. func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats { - rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, &util.RURuntimeStats{}) + rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats()) return rrs.(*util.RURuntimeStats) } diff --git a/util/execdetails.go b/util/execdetails.go index 95a00fb805..23c07a6161 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -673,6 +673,14 @@ type RURuntimeStats struct { writeRU *uatomic.Float64 } +// NewRURuntimeStats creates a new RURuntimeStats. +func NewRURuntimeStats() *RURuntimeStats { + return &RURuntimeStats{ + readRU: uatomic.NewFloat64(0), + writeRU: uatomic.NewFloat64(0), + } +} + // Clone implements the RuntimeStats interface. func (rs *RURuntimeStats) Clone() *RURuntimeStats { return &RURuntimeStats{ From 21aa2e2388db83940442875a835e608ab532c96a Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 13 Mar 2023 16:56:52 +0800 Subject: [PATCH 06/11] Update the PD version Signed-off-by: JmPotato --- go.mod | 7 ++----- go.sum | 8 ++++---- integration_tests/go.mod | 7 ++----- integration_tests/go.sum | 8 ++++---- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 36362526ea..6196411a05 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,7 @@ module github.com/tikv/client-go/v2 go 1.19 -replace ( - github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 - github.com/tikv/pd/client => github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af -) +replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a require ( github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 @@ -26,7 +23,7 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/stretchr/testify v1.8.1 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a - github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 + github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac github.com/twmb/murmur3 v1.1.3 go.etcd.io/etcd/api/v3 v3.5.2 go.etcd.io/etcd/client/v3 v3.5.2 diff --git a/go.sum b/go.sum index 9414918a7a..3ef38a0f01 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 h1:Ir3NQi3ucSyXEKj8EKC/lnitqD0bJ342c9ft+pmdSmo= -github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= -github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af h1:aUrcF9obP/mqlTlAarTajZqYlXEZQ64lgdDJTKOugF8= -github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= +github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a h1:+2l2pgdv0mb+0MBqOpvIVdDaotn8lUz+AwMQwX54VGM= +github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -204,6 +202,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index cccb0ecd2a..753238e3c9 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -2,10 +2,7 @@ module integration_tests go 1.19 -replace ( - github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 - github.com/tikv/pd/client => github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af -) +replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a require ( github.com/ninedraft/israce v0.0.3 @@ -17,7 +14,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/tidwall/gjson v1.14.1 github.com/tikv/client-go/v2 v2.0.6-0.20230228091502-e2da5527026f - github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 + github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac go.uber.org/goleak v1.2.1 ) diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 00f46636c3..515c686e8e 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -19,10 +19,8 @@ github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EF github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= -github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6 h1:Ir3NQi3ucSyXEKj8EKC/lnitqD0bJ342c9ft+pmdSmo= -github.com/JmPotato/kvproto v0.0.0-20230309094958-c2ae1b770df6/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= -github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af h1:aUrcF9obP/mqlTlAarTajZqYlXEZQ64lgdDJTKOugF8= -github.com/JmPotato/pd/client v0.0.0-20230309083851-79cdaf82c8af/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= +github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a h1:+2l2pgdv0mb+0MBqOpvIVdDaotn8lUz+AwMQwX54VGM= +github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -443,6 +441,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8= +github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= From ec6f5c5c4ab188e854031d978540e09833a5bed4 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 13 Mar 2023 18:03:09 +0800 Subject: [PATCH 07/11] Fix the test Signed-off-by: JmPotato --- internal/locate/region_request_test.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index c85324d0a2..70ad47ac42 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -422,9 +422,6 @@ func (s *mockTikvGrpcServer) BatchRaft(tikvpb.Tikv_BatchRaftServer) error { func (s *mockTikvGrpcServer) Snapshot(tikvpb.Tikv_SnapshotServer) error { return errors.New("unreachable") } -func (s *mockTikvGrpcServer) TabletSnapshot(tikvpb.Tikv_TabletSnapshotServer) error { - return errors.New("unreachable") -} func (s *mockTikvGrpcServer) MvccGetByKey(context.Context, *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) { return nil, errors.New("unreachable") } @@ -483,14 +480,6 @@ func (s *mockTikvGrpcServer) TryMarkDelete(context.Context, *disaggregated.TryMa return nil, errors.New("unreachable") } -func (s *mockTikvGrpcServer) EstablishDisaggTask(context.Context, *disaggregated.EstablishDisaggTaskRequest) (*disaggregated.EstablishDisaggTaskResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) FetchDisaggPages(*disaggregated.FetchDisaggPagesRequest, tikvpb.Tikv_FetchDisaggPagesServer) error { - return errors.New("unreachable") -} - func (s *mockTikvGrpcServer) KvFlashbackToVersion(context.Context, *kvrpcpb.FlashbackToVersionRequest) (*kvrpcpb.FlashbackToVersionResponse, error) { return nil, errors.New("unreachable") } From ab813e336a358d718248199b7311498ffecbd835 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 14 Mar 2023 14:51:24 +0800 Subject: [PATCH 08/11] Adjust the RURuntimeStats string format Signed-off-by: JmPotato --- util/execdetails.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/execdetails.go b/util/execdetails.go index 23c07a6161..d30f576cbf 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -697,7 +697,7 @@ func (rs *RURuntimeStats) Merge(other *RURuntimeStats) { // String implements fmt.Stringer interface. func (rs *RURuntimeStats) String() string { - return fmt.Sprintf("RRU: %f, WRU: %f", rs.readRU.Load(), rs.writeRU.Load()) + return fmt.Sprintf("RRU:%f, WRU:%f", rs.readRU.Load(), rs.writeRU.Load()) } // Update updates the RU runtime stats with the given consumption info. From 249992d948ba78de7d34872f5b4afa8346e4f994 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 14 Mar 2023 16:42:28 +0800 Subject: [PATCH 09/11] Add RURuntimeStats clean-up Signed-off-by: JmPotato --- tikv/kv.go | 49 ++++++++++++++++++++++++++++++++++++++++++++----- tikv/kv_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/tikv/kv.go b/tikv/kv.go index 95250f8fca..818df3fbf5 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -74,8 +74,15 @@ import ( "google.golang.org/grpc/keepalive" ) -// DCLabelKey indicates the key of label which represents the dc for Store. -const DCLabelKey = "zone" +const ( + // DCLabelKey indicates the key of label which represents the dc for Store. + DCLabelKey = "zone" + safeTSUpdateInterval = time.Second * 2 + // Since the default max transaction TTL is 1 hour, we can use this to + // clean up the RU runtime stats as well. + ruRuntimeStatsCleanThreshold = time.Hour + ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2 +) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { cfg := config.GetGlobalConfig() @@ -216,9 +223,10 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl store.lockResolver = txnlock.NewLockResolver(store) loadOption(store, opt...) - store.wg.Add(2) + store.wg.Add(3) go store.runSafePointChecker() go store.safeTSUpdater() + go store.ruRuntimeStatsMapCleaner() return store, nil } @@ -534,14 +542,14 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { func (s *KVStore) safeTSUpdater() { defer s.wg.Done() - t := time.NewTicker(time.Second * 2) + t := time.NewTicker(safeTSUpdateInterval) defer t.Stop() ctx, cancel := context.WithCancel(s.ctx) ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) defer cancel() for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return case <-t.C: s.updateSafeTS(ctx) @@ -603,6 +611,37 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg.Wait() } +func (s *KVStore) ruRuntimeStatsMapCleaner() { + defer s.wg.Done() + t := time.NewTicker(ruRuntimeStatsCleanInterval) + defer t.Stop() + ctx, cancel := context.WithCancel(s.ctx) + ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) + defer cancel() + + cleanThreshold := ruRuntimeStatsCleanThreshold + if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil { + t.Reset(time.Millisecond * 100) + cleanThreshold = time.Millisecond + } + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + now := time.Now() + s.ruRuntimeStatsMap.Range(func(key, _ interface{}) bool { + startTSTime := oracle.GetTimeFromTS(key.(uint64)) + if now.Sub(startTSTime) >= cleanThreshold { + s.ruRuntimeStatsMap.Delete(key) + } + return true + }) + } + } +} + // CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it. func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats { rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats()) diff --git a/tikv/kv_test.go b/tikv/kv_test.go index a027cd8262..801948bb85 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" @@ -28,9 +29,11 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util" ) func TestKV(t *testing.T) { + util.EnableFailpoints() suite.Run(t, new(testKVSuite)) } @@ -122,3 +125,28 @@ func (s *testKVSuite) TestMinSafeTs() { s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2)) s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) } + +func (s *testKVSuite) TestRURuntimeStatsCleanUp() { + s.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`)) + defer func() { + s.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean")) + }() + + mockClient := storeSafeTsMockClient{ + Client: s.store.GetTiKVClient(), + testSuite: s, + } + s.store.SetTiKVClient(&mockClient) + + // Create a ruRuntimeStats first. + startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0) + ruRuntimeStats := s.store.CreateRURuntimeStats(startTS) + s.NotNil(ruRuntimeStats) + // Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap. + time.Sleep(time.Millisecond * 150) + // The ruRuntimeStatsMap should be cleaned up. + s.store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool { + s.Fail("ruRuntimeStatsMap should be cleaned up") + return true + }) +} From d793ffa4807d3be144cbf70abb2031a9ba49e3ea Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 15 Mar 2023 10:45:35 +0800 Subject: [PATCH 10/11] Use type matching to get start_ts Signed-off-by: JmPotato --- go.mod | 2 -- go.sum | 4 +-- integration_tests/go.mod | 2 -- integration_tests/go.sum | 4 +-- internal/client/client_interceptor.go | 4 +-- tikv/kv.go | 3 +- tikvrpc/tikvrpc.go | 46 +++++++++++++++++++++++++++ txnkv/transaction/cleanup.go | 1 - txnkv/transaction/commit.go | 1 - txnkv/transaction/pessimistic.go | 1 - txnkv/transaction/prewrite.go | 1 - txnkv/txnlock/lock_resolver.go | 5 --- txnkv/txnsnapshot/scan.go | 1 - txnkv/txnsnapshot/snapshot.go | 2 -- 14 files changed, 53 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index 6196411a05..f3772efed0 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/tikv/client-go/v2 go 1.19 -replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a - require ( github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 diff --git a/go.sum b/go.sum index 3ef38a0f01..d15ddbaf41 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a h1:+2l2pgdv0mb+0MBqOpvIVdDaotn8lUz+AwMQwX54VGM= -github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -149,6 +147,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924 h1:z6WwBPP0Txmal5zf+H7vf/lSmKZtSS8BTNwiLjEjdnA= +github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 753238e3c9..f90b85454c 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -2,8 +2,6 @@ module integration_tests go 1.19 -replace github.com/pingcap/kvproto => github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a - require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 515c686e8e..aacf1c2181 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -19,8 +19,6 @@ github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EF github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= -github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a h1:+2l2pgdv0mb+0MBqOpvIVdDaotn8lUz+AwMQwX54VGM= -github.com/JmPotato/kvproto v0.0.0-20230313095855-d6780293a06a/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -340,6 +338,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZLmhahmvHm7n9DUxGRQT00208= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924 h1:z6WwBPP0Txmal5zf+H7vf/lSmKZtSS8BTNwiLjEjdnA= +github.com/pingcap/kvproto v0.0.0-20230312142449-01623096c924/go.mod h1:KUrW1FGoznGMMTssYBu0czfAhn6vQcIrHyZoSC6T990= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index 24c4c54d53..546953aa76 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -46,7 +46,7 @@ func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client { func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { // Build the resource control interceptor. - var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTs())) + var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS())) // Chain the interceptors if there are multiple interceptors. if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { if finalInterceptor != nil { @@ -64,7 +64,7 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti } func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats { - if r.ruRuntimeStatsMap == nil { + if r.ruRuntimeStatsMap == nil || startTS == 0 { return nil } if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok { diff --git a/tikv/kv.go b/tikv/kv.go index 818df3fbf5..86f02a0243 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -629,8 +629,7 @@ func (s *KVStore) ruRuntimeStatsMapCleaner() { select { case <-ctx.Done(): return - case <-t.C: - now := time.Now() + case now := <-t.C: s.ruRuntimeStatsMap.Range(func(key, _ interface{}) bool { startTSTime := oracle.GetTimeFromTS(key.(uint64)) if now.Sub(startTSTime) >= cleanThreshold { diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 40f33f0dfb..3cf57dce05 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -1289,3 +1289,49 @@ func (req *Request) IsRawWriteRequest() bool { // ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context. type ResourceGroupTagger func(req *Request) + +// GetStartTS returns the `start_ts` of the request. +func (req *Request) GetStartTS() uint64 { + switch req.Type { + case CmdGet: + return req.Get().GetVersion() + case CmdScan: + return req.Scan().GetVersion() + case CmdPrewrite: + return req.Prewrite().GetStartVersion() + case CmdPessimisticLock: + return req.PessimisticLock().GetStartVersion() + case CmdPessimisticRollback: + return req.PessimisticRollback().GetStartVersion() + case CmdCommit: + return req.Commit().GetStartVersion() + case CmdCleanup: + return req.Cleanup().GetStartVersion() + case CmdBatchGet: + return req.BatchGet().GetVersion() + case CmdBatchRollback: + return req.BatchRollback().GetStartVersion() + case CmdResolveLock: + return req.ResolveLock().GetStartVersion() + case CmdCop: + return req.Cop().GetStartTs() + case CmdCopStream: + return req.Cop().GetStartTs() + case CmdBatchCop: + return req.BatchCop().GetStartTs() + case CmdMvccGetByStartTs: + return req.MvccGetByStartTs().GetStartTs() + case CmdTxnHeartBeat: + return req.TxnHeartBeat().GetStartVersion() + case CmdCheckTxnStatus: + return req.CheckTxnStatus().GetLockTs() + case CmdCheckSecondaryLocks: + return req.CheckSecondaryLocks().GetStartVersion() + case CmdFlashbackToVersion: + return req.FlashbackToVersion().GetStartTs() + case CmdPrepareFlashbackToVersion: + req.PrepareFlashbackToVersion().GetStartTs() + default: + } + return 0 +} diff --git a/txnkv/transaction/cleanup.go b/txnkv/transaction/cleanup.go index 16e19cd7b1..03cf6d0c33 100644 --- a/txnkv/transaction/cleanup.go +++ b/txnkv/transaction/cleanup.go @@ -66,7 +66,6 @@ func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Ba Keys: batch.mutations.GetKeys(), StartVersion: c.startTS, }, kvrpcpb.Context{ - StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 45d48fd4ba..4871b07ec8 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -77,7 +77,6 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac Keys: keys, CommitVersion: c.commitTS, }, kvrpcpb.Context{ - StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index b48d0d12c1..fbdda27357 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -133,7 +133,6 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * WakeUpMode: action.wakeUpMode, LockOnlyIfExists: action.LockOnlyIfExists, }, kvrpcpb.Context{ - StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag, diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 4b3872cc45..5f6c385c0a 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -184,7 +184,6 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u } r := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, kvrpcpb.Context{ - StartTs: c.startTS, Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 23070fc322..c5882e39af 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -709,7 +709,6 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary ForceSyncCommit: forceSyncCommit, ResolvingPessimisticLock: resolvingPessimisticLock, }, kvrpcpb.Context{ - StartTs: txnID, RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) @@ -854,7 +853,6 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK StartVersion: txnID, } req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq, kvrpcpb.Context{ - StartTs: txnID, RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) @@ -1012,7 +1010,6 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region } lreq.Keys = keys req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{ - StartTs: l.TxnID, ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) @@ -1091,7 +1088,6 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat lreq.Keys = [][]byte{l.Key} } req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{ - StartTs: l.TxnID, ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) @@ -1148,7 +1144,6 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err Keys: [][]byte{l.Key}, } req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq, kvrpcpb.Context{ - StartTs: l.TxnID, ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()), }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index e1ff56ffeb..109c2a9442 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -240,7 +240,6 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { } s.snapshot.mu.RLock() req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, kvrpcpb.Context{ - StartTs: s.startTS(), Priority: s.snapshot.priority.ToPB(), NotFillCache: s.snapshot.notFillCache, TaskId: s.snapshot.mu.taskID, diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 3252142275..ba0b5efcd3 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -391,7 +391,6 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, Keys: pending, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{ - StartTs: s.version, Priority: s.priority.ToPB(), NotFillCache: s.notFillCache, TaskId: s.mu.taskID, @@ -601,7 +600,6 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] Key: k, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{ - StartTs: s.version, Priority: s.priority.ToPB(), NotFillCache: s.notFillCache, TaskId: s.mu.taskID, From 19de8ea52837b304641e49496e8d035819928416 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 15 Mar 2023 11:10:20 +0800 Subject: [PATCH 11/11] Sort out the requests with start_ts field Signed-off-by: JmPotato --- tikvrpc/tikvrpc.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 3cf57dce05..9993543410 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -1299,10 +1299,6 @@ func (req *Request) GetStartTS() uint64 { return req.Scan().GetVersion() case CmdPrewrite: return req.Prewrite().GetStartVersion() - case CmdPessimisticLock: - return req.PessimisticLock().GetStartVersion() - case CmdPessimisticRollback: - return req.PessimisticRollback().GetStartVersion() case CmdCommit: return req.Commit().GetStartVersion() case CmdCleanup: @@ -1311,16 +1307,14 @@ func (req *Request) GetStartTS() uint64 { return req.BatchGet().GetVersion() case CmdBatchRollback: return req.BatchRollback().GetStartVersion() + case CmdScanLock: + return req.ScanLock().GetMaxVersion() case CmdResolveLock: return req.ResolveLock().GetStartVersion() - case CmdCop: - return req.Cop().GetStartTs() - case CmdCopStream: - return req.Cop().GetStartTs() - case CmdBatchCop: - return req.BatchCop().GetStartTs() - case CmdMvccGetByStartTs: - return req.MvccGetByStartTs().GetStartTs() + case CmdPessimisticLock: + return req.PessimisticLock().GetStartVersion() + case CmdPessimisticRollback: + return req.PessimisticRollback().GetStartVersion() case CmdTxnHeartBeat: return req.TxnHeartBeat().GetStartVersion() case CmdCheckTxnStatus: @@ -1331,6 +1325,14 @@ func (req *Request) GetStartTS() uint64 { return req.FlashbackToVersion().GetStartTs() case CmdPrepareFlashbackToVersion: req.PrepareFlashbackToVersion().GetStartTs() + case CmdCop: + return req.Cop().GetStartTs() + case CmdCopStream: + return req.Cop().GetStartTs() + case CmdBatchCop: + return req.BatchCop().GetStartTs() + case CmdMvccGetByStartTs: + return req.MvccGetByStartTs().GetStartTs() default: } return 0