diff --git a/pkg/core/region.go b/pkg/core/region.go index 41bfb4d31add..ba5b818b29fe 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1004,6 +1004,161 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R return overlaps, nil } +// CheckAndPutRootTree checks if the region is valid to put to the root, if valid then return error. +// Usually used with CheckAndPutSubTree together. +func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) { + tracer := ctx.Tracer + r.t.Lock() + var ols []*regionItem + origin := r.getRegionLocked(region.GetID()) + if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { + ols = r.tree.overlaps(®ionItem{RegionInfo: region}) + } + tracer.OnCheckOverlapsFinished() + err := check(region, origin, convertItemsToRegions(ols)) + if err != nil { + r.t.Unlock() + tracer.OnValidateRegionFinished() + return nil, err + } + tracer.OnValidateRegionFinished() + _, overlaps, _ := r.setRegionLocked(region, true, ols...) + r.t.Unlock() + tracer.OnSetRegionFinished() + return overlaps, nil +} + +// CheckAndPutSubTree checks if the region is valid to put to the sub tree, if valid then return error. +// Usually used with CheckAndPutRootTree together. +func (r *RegionsInfo) CheckAndPutSubTree(region *RegionInfo) error { + // new region get from root tree again + var newRegion *RegionInfo + newRegion = r.GetRegion(region.GetID()) + if newRegion == nil { + newRegion = region + } + r.UpdateSubTreeOrderInsensitive(newRegion) + return nil +} + +// UpdateSubTreeOrderInsensitive updates the subtree. +// It's can used to update the subtree concurrently. +// because it can use concurrently, check region version to make sure the order. +// 1. if the version is stale, drop this update. +// 2. if the version is same, then only some statistic info need to be updated. the order of update is not important. +// +// in another hand, the overlap regions need re-check, because the region tree and the subtree update is not atomic. +func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) { + var origin *RegionInfo + r.st.Lock() + defer r.st.Unlock() + originItem, ok := r.subRegions[region.GetID()] + if ok { + origin = originItem.RegionInfo + } + rangeChanged := true + + if origin != nil { + re := region.GetRegionEpoch() + oe := origin.GetRegionEpoch() + isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() + if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() { + // Region meta is stale, skip. + return + } + rangeChanged = !origin.rangeEqualsTo(region) + + if rangeChanged || !origin.peersEqualTo(region) { + // If the range or peers have changed, the sub regionTree needs to be cleaned up. + // TODO: Improve performance by deleting only the different peers. + r.removeRegionFromSubTreeLocked(origin) + } else { + // The region tree and the subtree update is not atomic and the region tree is updated first. + // If there are two thread needs to update region tree, + // t1: thread-A update region tree + // t2: thread-B: update region tree again + // t3: thread-B: update subtree + // t4: thread-A: update region subtree + // to keep region tree consistent with subtree, we need to drop this update. + if tree, ok := r.subRegions[region.GetID()]; ok { + r.updateSubTreeStat(origin, region) + tree.RegionInfo = region + } + return + } + } + + if rangeChanged { + overlaps := r.getOverlapRegionFromSubTreeLocked(region) + for _, re := range overlaps { + r.removeRegionFromSubTreeLocked(re) + } + } + + item := ®ionItem{region} + r.subRegions[region.GetID()] = item + // It has been removed and all information needs to be updated again. + // Set peers then. + setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) { + store, ok := peersMap[storeID] + if !ok { + store = newRegionTree() + peersMap[storeID] = store + } + store.update(item, false) + } + + // Add to leaders and followers. + for _, peer := range region.GetVoters() { + storeID := peer.GetStoreId() + if peer.GetId() == region.leader.GetId() { + // Add leader peer to leaders. + setPeer(r.leaders, storeID, item) + } else { + // Add follower peer to followers. + setPeer(r.followers, storeID, item) + } + } + + setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) { + for _, peer := range peers { + storeID := peer.GetStoreId() + setPeer(peersMap, storeID, item) + } + } + // Add to learners. + setPeers(r.learners, region.GetLearners()) + // Add to witnesses. + setPeers(r.witnesses, region.GetWitnesses()) + // Add to PendingPeers + setPeers(r.pendingPeers, region.GetPendingPeers()) +} + +func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo { + it := ®ionItem{RegionInfo: region} + overlaps := make([]*RegionInfo, 0) + overlapsMap := make(map[uint64]struct{}) + collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) { + if tree, ok := peersMap[storeID]; ok { + items := tree.overlaps(it) + for _, item := range items { + if _, ok := overlapsMap[item.GetID()]; !ok { + overlapsMap[item.GetID()] = struct{}{} + overlaps = append(overlaps, item.RegionInfo) + } + } + } + } + for _, peer := range region.GetMeta().GetPeers() { + storeID := peer.GetStoreId() + collectFromItemSlice(r.leaders, storeID) + collectFromItemSlice(r.followers, storeID) + collectFromItemSlice(r.learners, storeID) + collectFromItemSlice(r.witnesses, storeID) + } + return overlaps +} + // GetRelevantRegions returns the relevant regions for a given region. func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { r.t.RLock() diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 42e8c3a35cb7..88d21421405b 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -632,10 +632,21 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // However, it can't solve the race condition of concurrent heartbeats from the same region. // Async task in next PR. - if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil { + if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } + ctx.TaskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "UpdateSubTree", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + c.CheckAndPutSubTree(region) + }, + ) + tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( ctx, ratelimit.TaskOpts{ diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go new file mode 100644 index 000000000000..bb6e3b890fd8 --- /dev/null +++ b/pkg/ratelimit/metrics.go @@ -0,0 +1,50 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_max_waiting_duration_seconds", + Help: "The duration of tasks waiting in the runner.", + }, []string{"name"}) + + RunnerTaskPendingTasks = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_pending_tasks", + Help: "The number of pending tasks in the runner.", + }, []string{"name"}) + RunnerTaskFailedTasks = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_failed_tasks_total", + Help: "The number of failed tasks in the runner.", + }, []string{"name"}) +) + +func init() { + prometheus.MustRegister(RunnerTaskMaxWaitingDuration) + prometheus.MustRegister(RunnerTaskPendingTasks) + prometheus.MustRegister(RunnerTaskFailedTasks) +} diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 7f0ef21f7916..c4f2d5bc5ac6 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -53,6 +54,8 @@ type ConcurrentRunner struct { pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup + failedTaskCount prometheus.Counter + maxWaitingDuration prometheus.Gauge } // NewConcurrentRunner creates a new ConcurrentRunner. @@ -62,6 +65,8 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *Concurr maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), + failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), + maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } @@ -77,6 +82,7 @@ type TaskOpts struct { func (s *ConcurrentRunner) Start() { s.stopChan = make(chan struct{}) s.wg.Add(1) + ticker := time.NewTicker(5 * time.Second) go func() { defer s.wg.Done() for { @@ -92,8 +98,19 @@ func (s *ConcurrentRunner) Start() { go s.run(task.Ctx, task.f, nil) } case <-s.stopChan: + s.pendingMu.Lock() + s.pendingTasks = make([]*Task, 0, initialCapacity) + s.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", s.name)) return + case <-ticker.C: + maxDuration := time.Duration(0) + s.pendingMu.Lock() + if len(s.pendingTasks) > 0 { + maxDuration = time.Since(s.pendingTasks[0].submittedAt) + } + s.pendingMu.Unlock() + s.maxWaitingDuration.Set(maxDuration.Seconds()) } } }() @@ -144,6 +161,7 @@ func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(con if len(s.pendingTasks) > 0 { maxWait := time.Since(s.pendingTasks[0].submittedAt) if maxWait > s.maxPendingDuration { + s.failedTaskCount.Inc() return ErrMaxWaitingTasksExceeded } } diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 1f3701763837..5a67a5474834 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -52,7 +52,7 @@ const ( defaultEnableJointConsensus = true defaultEnableTiKVSplitRegion = true defaultEnableHeartbeatBreakdownMetrics = true - defaultEnableHeartbeatConcurrentRunner = false + defaultEnableHeartbeatConcurrentRunner = true defaultEnableCrossTableMerge = true defaultEnableDiagnostic = true defaultStrictlyMatchLabel = false diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dd59b63240ff..32ce23ef09b7 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1065,10 +1065,22 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.core.AtomicCheckAndPutRegion(ctx, region); err != nil { + if overlaps, err = c.core.CheckAndPutRootTree(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } + ctx.TaskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "UpdateSubTree", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + c.CheckAndPutSubTree(region) + }, + ) + tracer.OnUpdateSubTreeFinished() + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.TaskRunner.RunTask( ctx.Context,