Merge branch 'master' of github.com:tikv/pd into sche-redirect6
Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 committed Nov 1, 2023
2 parents d8cd97f + a1a1eea commit 78d26b6
Showing 78 changed files with 3,096 additions and 601 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yaml
@@ -6,7 +6,7 @@ concurrency:
jobs:
statics:
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20
steps:
- uses: actions/setup-go@v3
with:
10 changes: 10 additions & 0 deletions errors.toml
@@ -496,6 +496,16 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
error = '''
cannot find scheduling address
'''

["PD:mcs:ErrSchedulingServer"]
error = '''
scheduling server meets %v
'''

["PD:member:ErrCheckCampaign"]
error = '''
check campaign failed
2 changes: 1 addition & 1 deletion go.mod
@@ -33,7 +33,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
4 changes: 2 additions & 2 deletions go.sum
@@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk=
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
105 changes: 105 additions & 0 deletions metrics/grafana/pd.json
@@ -1139,6 +1139,111 @@
"timeFrom": null,
"timeShift": null
},
{
"bars": false,
"cacheTimeout": null,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The current peer count of the cluster",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 6,
"w": 4,
"x": 16,
"y": 13
},
"hiddenSeries": false,
"id": 22,
"interval": null,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"maxDataPoints": 100,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(pd_rule_manager_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}) by (type)",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{type}}",
"refId": "A",
"step": 4
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Placement Rules Status",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:192",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:193",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"collapsed": true,
"gridPos": {
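The new panel charts sum(pd_rule_manager_status{...}) by (type). As a rough, hedged illustration of what that expression returns, the Go sketch below runs the same query against a Prometheus server using the prometheus/client_golang API; the server address is an assumption and the cluster label filters are dropped for brevity. None of this code is part of the commit.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	// Assumed Prometheus address; the k8s_cluster/tidb_cluster label
	// filters used by the dashboard are omitted here.
	client, err := api.NewClient(api.Config{Address: "http://127.0.0.1:9090"})
	if err != nil {
		log.Fatal(err)
	}
	promAPI := v1.NewAPI(client)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The same expression the "Placement Rules Status" panel plots.
	result, warnings, err := promAPI.Query(ctx, `sum(pd_rule_manager_status) by (type)`, time.Now())
	if err != nil {
		log.Fatal(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	fmt.Println(result)
}
```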
21 changes: 19 additions & 2 deletions pkg/core/region.go
@@ -1339,8 +1339,8 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo

// GetClusterNotFromStorageRegionsCnt gets the total count of regions that are not loaded from storage anymore
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
r.st.RLock()
defer r.st.RUnlock()
r.t.RLock()
defer r.t.RUnlock()
return r.tree.notFromStorageRegionsCnt
}

@@ -1675,6 +1675,23 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 {
return r.tree.TotalSize() / int64(r.tree.length())
}

// ValidRegion is used to decide if the region is valid.
func (r *RegionsInfo) ValidRegion(region *metapb.Region) error {
startKey := region.GetStartKey()
currentRegion := r.GetRegionByKey(startKey)
if currentRegion == nil {
return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region)))
}
// If the request epoch is less than the current region epoch, return an error.
regionEpoch := region.GetRegionEpoch()
currentEpoch := currentRegion.GetMeta().GetRegionEpoch()
if regionEpoch.GetVersion() < currentEpoch.GetVersion() ||
regionEpoch.GetConfVer() < currentEpoch.GetConfVer() {
return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currentEpoch)
}
return nil
}

// DiffRegionPeersInfo return the difference of peers info between two RegionInfo
func DiffRegionPeersInfo(origin *RegionInfo, other *RegionInfo) string {
var ret []string
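A hedged sketch of how a caller might use the new ValidRegion check before serving a region-scoped request; the handler function and its import paths are illustrative assumptions, and only ValidRegion itself comes from this commit.

```go
package example

import (
	"github.com/pingcap/kvproto/pkg/metapb"

	"github.com/tikv/pd/pkg/core"
)

// handleRegionRequest is a hypothetical caller: it rejects requests that
// carry an unknown region or a stale region epoch before doing any work.
func handleRegionRequest(ri *core.RegionsInfo, region *metapb.Region) error {
	if err := ri.ValidRegion(region); err != nil {
		// The request references a region PD does not know about, or its
		// epoch is older than the current one; the client should retry
		// with refreshed region metadata.
		return err
	}
	// ... serve the request against the validated region ...
	return nil
}
```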
6 changes: 6 additions & 0 deletions pkg/errs/errno.go
@@ -408,3 +408,9 @@ var (
ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
)

// Micro service errors
var (
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
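
A minimal sketch, assuming a hypothetical forwarding helper, of how the two new PD:mcs error definitions might be raised by the API layer when it redirects to the scheduling microservice; only the errs identifiers come from this commit.

```go
package example

import (
	"github.com/tikv/pd/pkg/errs"
)

// forwardToScheduling is a hypothetical helper: it turns a missing service
// address or a downstream failure into the normalized PD:mcs errors above.
func forwardToScheduling(schedulingAddr string, callErr error) error {
	if schedulingAddr == "" {
		// No scheduling server has been discovered yet.
		return errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
	}
	if callErr != nil {
		// Fill the %v slot with whatever the scheduling server returned.
		return errs.ErrSchedulingServer.FastGenByArgs(callErr)
	}
	return nil
}
```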
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/resource_group.go
@@ -138,7 +138,7 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
// RequestRU requests the RU of the resource group.
func (rg *ResourceGroup) RequestRU(
now time.Time,
neededTokens float64,
requiredToken float64,
targetPeriodMs, clientUniqueID uint64,
) *rmpb.GrantedRUTokenBucket {
rg.Lock()
@@ -147,7 +147,7 @@ func (rg *ResourceGroup) RequestRU(
if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs, clientUniqueID)
tb, trickleTimeMs := rg.RUSettings.RU.request(now, requiredToken, targetPeriodMs, clientUniqueID)
return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

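The change here is only a parameter rename (neededTokens to requiredToken); behavior is unchanged. As a hedged usage sketch, a caller might request RU tokens roughly like this; the wrapper function and the literal values are assumptions for illustration.

```go
package example

import (
	"time"

	rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
)

// requestTokens is a hypothetical wrapper: it asks a resource group for
// 500 RU over a 5s target period on behalf of one client connection.
func requestTokens(rg *rmserver.ResourceGroup, clientUniqueID uint64) {
	const targetPeriodMs = 5000 // assumed target period, in milliseconds
	granted := rg.RequestRU(time.Now(), 500 /* requiredToken */, targetPeriodMs, clientUniqueID)
	if granted != nil {
		_ = granted.GetGrantedTokens() // inspect the granted bucket and trickle time
	}
}
```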
42 changes: 21 additions & 21 deletions pkg/mcs/resourcemanager/server/token_buckets.go
@@ -268,7 +268,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) {
}

// updateTokens updates the tokens and settings.
func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64, consumptionToken float64) {
func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64, requiredToken float64) {
var elapseTokens float64
if !gtb.Initialized {
gtb.init(now, clientUniqueID)
Expand All @@ -288,46 +288,46 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien
gtb.Tokens = burst
}
// Balance each slot.
gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, consumptionToken, elapseTokens)
gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, requiredToken, elapseTokens)
}

// request requests tokens from the corresponding slot.
func (gtb *GroupTokenBucket) request(now time.Time,
neededTokens float64,
requiredToken float64,
targetPeriodMs, clientUniqueID uint64,
) (*rmpb.TokenBucket, int64) {
burstLimit := gtb.Settings.GetBurstLimit()
gtb.updateTokens(now, burstLimit, clientUniqueID, neededTokens)
gtb.updateTokens(now, burstLimit, clientUniqueID, requiredToken)
slot, ok := gtb.tokenSlots[clientUniqueID]
if !ok {
return &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{BurstLimit: burstLimit}}, 0
}
res, trickleDuration := slot.assignSlotTokens(neededTokens, targetPeriodMs)
res, trickleDuration := slot.assignSlotTokens(requiredToken, targetPeriodMs)
// Update bucket to record all tokens.
gtb.Tokens -= slot.lastTokenCapacity - slot.tokenCapacity
slot.lastTokenCapacity = slot.tokenCapacity

return res, trickleDuration
}

func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) {
func (ts *TokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) {
var res rmpb.TokenBucket
burstLimit := ts.settings.GetBurstLimit()
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: burstLimit}
// If BurstLimit < 0, just return.
if burstLimit < 0 {
res.Tokens = neededTokens
res.Tokens = requiredToken
return &res, 0
}
// FillRate is used when the token server is unavailable in abnormal situations.
if neededTokens <= 0 {
if requiredToken <= 0 {
return &res, 0
}
// If the current tokens can directly meet the requirement, return the needed tokens.
if ts.tokenCapacity >= neededTokens {
ts.tokenCapacity -= neededTokens
if ts.tokenCapacity >= requiredToken {
ts.tokenCapacity -= requiredToken
// grant the total requested tokens
res.Tokens = neededTokens
res.Tokens = requiredToken
return &res, 0
}

@@ -336,7 +336,7 @@ func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint6
hasRemaining := false
if ts.tokenCapacity > 0 {
grantedTokens = ts.tokenCapacity
neededTokens -= grantedTokens
requiredToken -= grantedTokens
ts.tokenCapacity = 0
hasRemaining = true
}
@@ -373,36 +373,36 @@ func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint6
for i := 1; i < loanCoefficient; i++ {
p[i] = float64(loanCoefficient-i)*float64(fillRate)*targetPeriodTimeSec + p[i-1]
}
for i := 0; i < loanCoefficient && neededTokens > 0 && trickleTime < targetPeriodTimeSec; i++ {
for i := 0; i < loanCoefficient && requiredToken > 0 && trickleTime < targetPeriodTimeSec; i++ {
loan := -ts.tokenCapacity
if loan >= p[i] {
continue
}
roundReserveTokens := p[i] - loan
fillRate := float64(loanCoefficient-i) * float64(fillRate)
if roundReserveTokens > neededTokens {
ts.tokenCapacity -= neededTokens
grantedTokens += neededTokens
if roundReserveTokens > requiredToken {
ts.tokenCapacity -= requiredToken
grantedTokens += requiredToken
trickleTime += grantedTokens / fillRate
neededTokens = 0
requiredToken = 0
} else {
roundReserveTime := roundReserveTokens / fillRate
if roundReserveTime+trickleTime >= targetPeriodTimeSec {
roundTokens := (targetPeriodTimeSec - trickleTime) * fillRate
neededTokens -= roundTokens
requiredToken -= roundTokens
ts.tokenCapacity -= roundTokens
grantedTokens += roundTokens
trickleTime = targetPeriodTimeSec
} else {
grantedTokens += roundReserveTokens
neededTokens -= roundReserveTokens
requiredToken -= roundReserveTokens
ts.tokenCapacity -= roundReserveTokens
trickleTime += roundReserveTime
}
}
}
if neededTokens > 0 && grantedTokens < defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec {
reservedTokens := math.Min(neededTokens+grantedTokens, defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec)
if requiredToken > 0 && grantedTokens < defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec {
reservedTokens := math.Min(requiredToken+grantedTokens, defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec)
ts.tokenCapacity -= reservedTokens - grantedTokens
grantedTokens = reservedTokens
}
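To make the loan schedule in assignSlotTokens easier to follow, here is a small self-contained sketch that only computes the cumulative thresholds p[i] used when granting tokens beyond the available capacity; the constant values are illustrative assumptions, not the ones defined in PD.

```go
package main

import "fmt"

func main() {
	// Illustrative assumptions only; PD defines its own loan coefficient,
	// fill rate and target period.
	const (
		loanCoefficient     = 3
		fillRate            = 1000.0 // tokens per second
		targetPeriodTimeSec = 5.0    // seconds
	)

	// p[i] is the cumulative amount that may be lent while filling at a
	// rate of (loanCoefficient-i) * fillRate, mirroring the loop above.
	p := make([]float64, loanCoefficient)
	p[0] = float64(loanCoefficient) * fillRate * targetPeriodTimeSec
	for i := 1; i < loanCoefficient; i++ {
		p[i] = float64(loanCoefficient-i)*fillRate*targetPeriodTimeSec + p[i-1]
	}
	fmt.Println(p) // [15000 25000 30000]: deeper debt is granted at slower fill rates
}
```

Each successive tier lends a further chunk at a lower fill rate, which is why the computed trickle time grows as a slot goes deeper into debt.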