Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the RURuntimeStats #732

Merged
merged 12 commits into from
Mar 16, 2023
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_model v0.3.0
github.com/stretchr/testify v1.8.1
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/v3 v3.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.6-0.20230228091502-e2da5527026f
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac
go.uber.org/goleak v1.2.1
)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67 h1:AXgc/Ij348pp0TsMPq/tmQA4O0EOAGntTKzB1imhpcU=
github.com/tikv/pd/client v0.0.0-20230309025512-47cd76ae5d67/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac h1:0XDlEdxbxEsy6lWfUxjvuO30q7wVavPeLYLNHfLL2E8=
github.com/tikv/pd/client v0.0.0-20230313083840-3e3ae55f68ac/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
Expand Down
32 changes: 25 additions & 7 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package client

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/tikv/client-go/v2/internal/resourcecontrol"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/util"
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
)

Expand All @@ -34,17 +36,17 @@ var _ Client = interceptedClient{}

type interceptedClient struct {
Client
ruRuntimeStatsMap *sync.Map
}

// NewInterceptedClient creates a Client which can execute interceptor.
func NewInterceptedClient(client Client) Client {
return interceptedClient{client}
func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client {
return interceptedClient{client, ruRuntimeStatsMap}
}

func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
// Build the resource control interceptor.
resourceGroupName := req.GetResourceGroupName()
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, resourceGroupName)
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS()))
// Chain the interceptors if there are multiple interceptors.
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
if finalInterceptor != nil {
Expand All @@ -61,6 +63,16 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
return r.Client.SendRequest(ctx, addr, req, timeout)
}

func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats {
if r.ruRuntimeStatsMap == nil || startTS == 0 {
return nil
}
if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok {
return v.(*util.RURuntimeStats)
}
return nil
}

var (
// ResourceControlSwitch is used to control whether to enable the resource control.
ResourceControlSwitch atomic.Value
Expand All @@ -73,11 +85,12 @@ var (
func buildResourceControlInterceptor(
ctx context.Context,
req *tikvrpc.Request,
resourceGroupName string,
ruRuntimeStats *util.RURuntimeStats,
) interceptor.RPCInterceptor {
if !ResourceControlSwitch.Load().(bool) {
return nil
}
resourceGroupName := req.GetResourceGroupName()
// When the group name is empty or "default", we don't need to
// perform the resource control.
if len(resourceGroupName) == 0 || resourceGroupName == "default" {
Expand All @@ -92,14 +105,19 @@ func buildResourceControlInterceptor(
// Build the interceptor.
return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
consumption, err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err
}
ruRuntimeStats.Update(consumption)
resp, err := next(target, req)
if resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
ResourceControlInterceptor.OnResponse(ctx, resourceGroupName, reqInfo, respInfo)
consumption, err = ResourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
ruRuntimeStats.Update(consumption)
}
return resp, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/client/client_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c emptyClient) CloseAddr(addr string) error {

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{})
client := NewInterceptedClient(emptyClient{}, nil)
ctx := interceptor.WithRPCInterceptor(context.Background(), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
executed = true
Expand Down
59 changes: 53 additions & 6 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,15 @@ import (
"google.golang.org/grpc/keepalive"
)

// DCLabelKey indicates the key of label which represents the dc for Store.
const DCLabelKey = "zone"
const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2
// Since the default max transaction TTL is 1 hour, we can use this to
// clean up the RU runtime stats as well.
ruRuntimeStatsCleanThreshold = time.Hour
ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
Expand Down Expand Up @@ -127,6 +134,9 @@ type KVStore struct {

replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled

// StartTS -> RURuntimeStats, stores the RU runtime stats for certain transaction.
ruRuntimeStatsMap sync.Map
JmPotato marked this conversation as resolved.
Show resolved Hide resolved

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand Down Expand Up @@ -209,13 +219,14 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
cancel: cancel,
gP: NewSpool(128, 10*time.Second),
}
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient))
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap))
store.lockResolver = txnlock.NewLockResolver(store)
loadOption(store, opt...)

store.wg.Add(2)
store.wg.Add(3)
go store.runSafePointChecker()
go store.safeTSUpdater()
go store.ruRuntimeStatsMapCleaner()

return store, nil
}
Expand Down Expand Up @@ -531,14 +542,14 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {

func (s *KVStore) safeTSUpdater() {
defer s.wg.Done()
t := time.NewTicker(time.Second * 2)
t := time.NewTicker(safeTSUpdateInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()
for {
select {
case <-s.ctx.Done():
case <-ctx.Done():
return
case <-t.C:
s.updateSafeTS(ctx)
Expand Down Expand Up @@ -600,6 +611,42 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

func (s *KVStore) ruRuntimeStatsMapCleaner() {
defer s.wg.Done()
t := time.NewTicker(ruRuntimeStatsCleanInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()

cleanThreshold := ruRuntimeStatsCleanThreshold
if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil {
t.Reset(time.Millisecond * 100)
cleanThreshold = time.Millisecond
}

for {
select {
case <-ctx.Done():
return
case now := <-t.C:
s.ruRuntimeStatsMap.Range(func(key, _ interface{}) bool {
startTSTime := oracle.GetTimeFromTS(key.(uint64))
if now.Sub(startTSTime) >= cleanThreshold {
s.ruRuntimeStatsMap.Delete(key)
}
return true
})
}
}
}

// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it.
func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats {
rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats())
return rrs.(*util.RURuntimeStats)
}

// EnableResourceControl enables the resource control.
func EnableResourceControl() {
client.ResourceControlSwitch.Store(true)
Expand Down
28 changes: 28 additions & 0 deletions tikv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)

func TestKV(t *testing.T) {
util.EnableFailpoints()
suite.Run(t, new(testKVSuite))
}

Expand Down Expand Up @@ -122,3 +125,28 @@ func (s *testKVSuite) TestMinSafeTs() {
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func (s *testKVSuite) TestRURuntimeStatsCleanUp() {
s.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`))
defer func() {
s.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean"))
}()

mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
testSuite: s,
}
s.store.SetTiKVClient(&mockClient)

// Create a ruRuntimeStats first.
startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
ruRuntimeStats := s.store.CreateRURuntimeStats(startTS)
s.NotNil(ruRuntimeStats)
// Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap.
time.Sleep(time.Millisecond * 150)
// The ruRuntimeStatsMap should be cleaned up.
s.store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool {
s.Fail("ruRuntimeStatsMap should be cleaned up")
return true
})
}
48 changes: 48 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,3 +1289,51 @@ func (req *Request) IsRawWriteRequest() bool {

// ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context.
type ResourceGroupTagger func(req *Request)

// GetStartTS returns the `start_ts` of the request.
func (req *Request) GetStartTS() uint64 {
switch req.Type {
case CmdGet:
return req.Get().GetVersion()
case CmdScan:
return req.Scan().GetVersion()
case CmdPrewrite:
return req.Prewrite().GetStartVersion()
case CmdCommit:
return req.Commit().GetStartVersion()
case CmdCleanup:
return req.Cleanup().GetStartVersion()
case CmdBatchGet:
return req.BatchGet().GetVersion()
case CmdBatchRollback:
return req.BatchRollback().GetStartVersion()
case CmdScanLock:
return req.ScanLock().GetMaxVersion()
case CmdResolveLock:
return req.ResolveLock().GetStartVersion()
case CmdPessimisticLock:
return req.PessimisticLock().GetStartVersion()
case CmdPessimisticRollback:
return req.PessimisticRollback().GetStartVersion()
case CmdTxnHeartBeat:
return req.TxnHeartBeat().GetStartVersion()
case CmdCheckTxnStatus:
return req.CheckTxnStatus().GetLockTs()
case CmdCheckSecondaryLocks:
return req.CheckSecondaryLocks().GetStartVersion()
case CmdFlashbackToVersion:
return req.FlashbackToVersion().GetStartTs()
case CmdPrepareFlashbackToVersion:
req.PrepareFlashbackToVersion().GetStartTs()
case CmdCop:
return req.Cop().GetStartTs()
case CmdCopStream:
return req.Cop().GetStartTs()
case CmdBatchCop:
return req.BatchCop().GetStartTs()
case CmdMvccGetByStartTs:
return req.MvccGetByStartTs().GetStartTs()
default:
}
return 0
}
2 changes: 1 addition & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type KVTxn struct {
interceptor interceptor.RPCInterceptor
assertionLevel kvrpcpb.AssertionLevel
*util.RequestSource
// resourceGroupName is the name of tenent resource group.
// resourceGroupName is the name of tenant resource group.
resourceGroupName string

aggressiveLockingContext *aggressiveLockingContext
Expand Down
1 change: 1 addition & 0 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo

req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos},
kvrpcpb.Context{
// TODO: how to pass the `start_ts` here?
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
},
Expand Down
Loading