Skip to content

Commit

Permalink
*: add table id in resource tag
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Aug 21, 2024
1 parent 1f095a3 commit 6171494
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 40 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,5 @@ replace (
sourcegraph.com/sourcegraph/appdash => github.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
sourcegraph.com/sourcegraph/appdash-data => github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
)

replace github.com/pingcap/tipb => github.com/nolouch/tipb v0.0.0-20240821080320-6510d6ca73a1
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk=
github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c=
github.com/nolouch/tipb v0.0.0-20240821080320-6510d6ca73a1 h1:DeghbXb/fPBXbbNkE+IIRyje3NA8X7iEocmEwMo2a2k=
github.com/nolouch/tipb v0.0.0-20240821080320-6510d6ca73a1/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down Expand Up @@ -703,8 +705,6 @@ github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfU
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b h1:tySAGYw21A3Xa8CcA9jBTfrgAB3+KQWyqyW7fUyokzk=
github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ go_library(
"//pkg/util/memory",
"//pkg/util/mock",
"//pkg/util/ranger",
"//pkg/util/resourcegrouptag",
"//pkg/util/rowDecoder",
"//pkg/util/rowcodec",
"//pkg/util/set",
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -431,7 +430,7 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(jobID int64, jobType model.ActionType
ctx.setDDLLabelForDiagnosis(jobType)
}

func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) tikvrpc.ResourceGroupTagger {
func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) *kv.ResourceGroupTagBuilder {
dc.jobCtx.Lock()
defer dc.jobCtx.Unlock()
ctx, exists := dc.jobCtx.jobCtxMap[jobID]
Expand Down
10 changes: 2 additions & 8 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/resourcegrouptag"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/tikv/client-go/v2/tikvrpc"
kvutil "github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -627,17 +625,13 @@ func (w *worker) checkBeforeCommit() error {
return nil
}

func (w *ReorgContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
func (w *ReorgContext) getResourceGroupTaggerForTopSQL() *kv.ResourceGroupTagBuilder {
if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
return nil
}

digest := w.cacheDigest
tagger := func(req *tikvrpc.Request) {
req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, nil,
resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
}
return tagger
return kv.NewResourceGroupTagBuilder().SetSQLDigest(digest)
}

func (w *ReorgContext) ddlJobSourceType() string {
Expand Down
1 change: 0 additions & 1 deletion pkg/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ go_library(
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@com_github_tikv_client_go_v2//util",
"@org_golang_google_grpc//metadata",
Expand Down
1 change: 0 additions & 1 deletion pkg/distsql/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ go_library(
"//pkg/util/tiflash",
"//pkg/util/topsql/stmtstats",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//tikvrpc",
],
)
3 changes: 1 addition & 2 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
)

// DistSQLContext provides all information needed by using functions in `distsql`
Expand Down Expand Up @@ -66,7 +65,7 @@ type DistSQLContext struct {
NotFillCache bool
TaskID uint64
Priority mysql.PriorityEnum
ResourceGroupTagger tikvrpc.ResourceGroupTagger
ResourceGroupTagger *kv.ResourceGroupTagBuilder
EnablePaging bool
MinPagingSize int
MaxPagingSize int
Expand Down
3 changes: 1 addition & 2 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc"
)

// RequestBuilder is used to build a "kv.Request".
Expand Down Expand Up @@ -374,7 +373,7 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.MetaOnlyInfoSchem
}

// SetResourceGroupTagger sets the request resource group tagger.
func (builder *RequestBuilder) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) *RequestBuilder {
func (builder *RequestBuilder) SetResourceGroupTagger(tagger *kv.ResourceGroupTagBuilder) *RequestBuilder {
builder.Request.ResourceGroupTagger = tagger
return builder
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,8 +1226,17 @@ func newLockCtx(sctx sessionctx.Context, lockWaitTime int64, numKeys int) (*tikv
if len(normalized) == 0 {
return nil
}

_, planDigest := seVars.StmtCtx.GetPlanDigest()
return resourcegrouptag.EncodeResourceGroupTag(digest, planDigest, label)
db := ""
if len(seVars.StmtCtx.Tables) > 0 {
db = seVars.StmtCtx.Tables[0].DB
}
if len(db) == 0 {
db = seVars.CurrentDB
}
tid := tablecodec.DecodeTableID(mutation.Key)
return resourcegrouptag.EncodeResourceGroupTagWithSchema(digest, planDigest, label, db, tid)
}
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/config",
"//pkg/domain/resourcegroup",
"//pkg/errno",
"//pkg/parser",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
Expand All @@ -34,6 +35,7 @@ go_library(
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/resourcegrouptag",
"//pkg/util/set",
"//pkg/util/size",
"//pkg/util/tiflash",
Expand Down
86 changes: 85 additions & 1 deletion pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/resourcegrouptag"
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -576,7 +580,7 @@ type Request struct {
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels []*metapb.StoreLabel
// ResourceGroupTagger indicates the kv request task group tagger.
ResourceGroupTagger tikvrpc.ResourceGroupTagger
ResourceGroupTagger *ResourceGroupTagBuilder
// Paging indicates whether the request is a paging request.
Paging struct {
Enable bool
Expand Down Expand Up @@ -766,3 +770,83 @@ const (
// RCCheckTS stands for 'read consistency read with ts check'.
RCCheckTS
)

// ResourceGroupTagBuilder is used to build the resource group tag for a kv request.
type ResourceGroupTagBuilder struct {
sqlDigest *parser.Digest
planDigest *parser.Digest
accessKey []byte
}

// NewResourceGroupTagBuilder creates a new ResourceGroupTagBuilder.
func NewResourceGroupTagBuilder() *ResourceGroupTagBuilder {
return &ResourceGroupTagBuilder{}
}

// SetSQLDigest sets the sql digest for the request.
func (b *ResourceGroupTagBuilder) SetSQLDigest(digest *parser.Digest) *ResourceGroupTagBuilder {
b.sqlDigest = digest
return b
}

// SetPlanDigest sets the plan digest for the request.
func (b *ResourceGroupTagBuilder) SetPlanDigest(digest *parser.Digest) *ResourceGroupTagBuilder {
b.planDigest = digest
return b
}

// SetAccessKey sets the access key for the request.
func (b *ResourceGroupTagBuilder) BuildProtoTagger() tikvrpc.ResourceGroupTagger {
return func(req *tikvrpc.Request) {
b.Build(req)
}
}

// Build builds the resource group tag for the request.
func (b *ResourceGroupTagBuilder) Build(req *tikvrpc.Request) {
if req == nil {
return
}
tag := &tipb.ResourceGroupTag{}
if b.sqlDigest != nil {
tag.SqlDigest = b.sqlDigest.Bytes()
}
if b.planDigest != nil {
tag.PlanDigest = b.planDigest.Bytes()
}
key := resourcegrouptag.GetFirstKeyFromRequest(req)
if len(b.accessKey) > 0 {
key = b.accessKey
}
if len(key) > 0 {
tag.TableId = decodeTableID(key)
label := resourcegrouptag.GetResourceGroupLabelByKey(key)
tag.Label = &label
}
tagEncoded, err := tag.Marshal()
if err != nil {
return
}
req.ResourceGroupTag = tagEncoded
}

var tablePrefix = []byte{'t'}

// DecodeTableID decodes the table ID of the key, if the key is not table key, returns 0.
// avoid import cycle
func decodeTableID(key Key) int64 {
if !key.HasPrefix(tablePrefix) {
// If the key is in API V2, then ignore the prefix
_, k, err := tikv.DecodeKey(key, kvrpcpb.APIVersion_V2)
if err != nil {
return 0
}
key = k
if !key.HasPrefix(tablePrefix) {
return 0
}
}
key = key[len(tablePrefix):]
_, tableID, _ := codec.DecodeInt(key)
return tableID
}
3 changes: 1 addition & 2 deletions pkg/sessionctx/stmtctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//pkg/distsql/context",
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/kv",
"//pkg/parser",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand All @@ -25,10 +26,8 @@ go_library(
"//pkg/util/linter/constructor",
"//pkg/util/memory",
"//pkg/util/nocopy",
"//pkg/util/resourcegrouptag",
"//pkg/util/topsql/stmtstats",
"//pkg/util/tracing",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_golang_x_exp//maps",
"@org_golang_x_sync//singleflight",
"@org_uber_go_atomic//:atomic",
Expand Down
21 changes: 7 additions & 14 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand All @@ -44,10 +45,8 @@ import (
"github.com/pingcap/tidb/pkg/util/linter/constructor"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/nocopy"
"github.com/pingcap/tidb/pkg/util/resourcegrouptag"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/tikv/client-go/v2/tikvrpc"
atomic2 "go.uber.org/atomic"
"golang.org/x/exp/maps"
"golang.org/x/sync/singleflight"
Expand Down Expand Up @@ -686,20 +685,14 @@ func (sc *StatementContext) SetBinaryPlan(binaryPlan string) {
sc.binaryPlan = binaryPlan
}

// GetResourceGroupTagger returns the implementation of tikvrpc.ResourceGroupTagger related to self.
func (sc *StatementContext) GetResourceGroupTagger() tikvrpc.ResourceGroupTagger {
// GetResourceGroupTagger returns the implementation of kv.ResourceGroupTagBuilder related to self.
func (sc *StatementContext) GetResourceGroupTagger() *kv.ResourceGroupTagBuilder {
tagger := kv.NewResourceGroupTagBuilder().SetPlanDigest(sc.planDigest)
normalized, digest := sc.SQLDigest()
planDigest := sc.planDigest
return func(req *tikvrpc.Request) {
if req == nil {
return
}
if len(normalized) == 0 {
return
}
req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, planDigest,
resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
if len(normalized) > 0 {
tagger.SetSQLDigest(digest)
}
return tagger
}

// SetUseChunkAlloc set use chunk alloc status
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo
},
})
if b.req.ResourceGroupTagger != nil {
b.req.ResourceGroupTagger(req)
b.req.ResourceGroupTagger.Build(req)
}
req.StoreTp = getEndPointType(kv.TiFlash)

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
req.IsRetryRequest = true
}
if worker.req.ResourceGroupTagger != nil {
worker.req.ResourceGroupTagger(req)
worker.req.ResourceGroupTagger.Build(req)
}
timeout := config.GetGlobalConfig().TiKVClient.CoprReqTimeout
if task.tikvClientReadTimeout > 0 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func (s *tikvSnapshot) SetOption(opt int, val any) {
case kv.ResourceGroupTag:
s.KVSnapshot.SetResourceGroupTag(val.([]byte))
case kv.ResourceGroupTagger:
switch tagger := val.(type) {
case tikvrpc.ResourceGroupTagger:
s.KVSnapshot.SetResourceGroupTagger(val.(tikvrpc.ResourceGroupTagger))
case *kv.ResourceGroupTagBuilder:
s.KVSnapshot.SetResourceGroupTagger(tagger.BuildProtoTagger())
}
s.KVSnapshot.SetResourceGroupTagger(val.(tikvrpc.ResourceGroupTagger))
case kv.ReadReplicaScope:
s.KVSnapshot.SetReadReplicaScope(val.(string))
Expand Down
7 changes: 6 additions & 1 deletion pkg/store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,12 @@ func (txn *tikvTxn) SetOption(opt int, val any) {
case kv.ResourceGroupTag:
txn.KVTxn.SetResourceGroupTag(val.([]byte))
case kv.ResourceGroupTagger:
txn.KVTxn.SetResourceGroupTagger(val.(tikvrpc.ResourceGroupTagger))
switch tagger := val.(type) {
case tikvrpc.ResourceGroupTagger:
txn.KVTxn.SetResourceGroupTagger(val.(tikvrpc.ResourceGroupTagger))
case *kv.ResourceGroupTagBuilder:
txn.KVTxn.SetResourceGroupTagger(tagger.BuildProtoTagger())
}
case kv.KVFilter:
txn.KVTxn.SetKVFilter(val.(tikv.KVFilter))
case kv.SnapInterceptor:
Expand Down
Loading

0 comments on commit 6171494

Please sign in to comment.