Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Mar 7, 2024
1 parent eecb7d7 commit 4d9a680
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 27 deletions.
33 changes: 22 additions & 11 deletions pkg/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var (
setRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_SetRegion")
updateSubTreeDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_UpdateSubTree")
updateSubTreeCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_UpdateSubTree")
regionCollectDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("CollectRegionStats")
regionCollectCount = HeartbeatBreakdownHandleCount.WithLabelValues("CollectRegionStats")
otherDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("Other")
otherCount = HeartbeatBreakdownHandleCount.WithLabelValues("Other")
)
Expand Down Expand Up @@ -109,6 +111,7 @@ type RegionHeartbeatProcessTracer interface {
OnValidateRegionFinished()
OnSetRegionFinished()
OnUpdateSubTreeFinished()
OnCollectRegionStatsFinished()
OnAllStageFinished()
LogFields() []zap.Field
}
Expand All @@ -120,17 +123,18 @@ func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return &noopHeartbeatProcessTracer{}
}

func (n *noopHeartbeatProcessTracer) Begin() {}
func (n *noopHeartbeatProcessTracer) OnPreCheckFinished() {}
func (n *noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {}
func (n *noopHeartbeatProcessTracer) OnRegionGuideFinished() {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheBegin() {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheFinished() {}
func (n *noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {}
func (n *noopHeartbeatProcessTracer) OnValidateRegionFinished() {}
func (n *noopHeartbeatProcessTracer) OnSetRegionFinished() {}
func (n *noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {}
func (n *noopHeartbeatProcessTracer) OnAllStageFinished() {}
func (n *noopHeartbeatProcessTracer) Begin() {}
func (n *noopHeartbeatProcessTracer) OnPreCheckFinished() {}
func (n *noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {}
func (n *noopHeartbeatProcessTracer) OnRegionGuideFinished() {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheBegin() {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheFinished() {}
func (n *noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {}
func (n *noopHeartbeatProcessTracer) OnValidateRegionFinished() {}
func (n *noopHeartbeatProcessTracer) OnSetRegionFinished() {}
func (n *noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {}
func (n *noopHeartbeatProcessTracer) OnCollectRegionStatsFinished() {}
func (n *noopHeartbeatProcessTracer) OnAllStageFinished() {}
func (n *noopHeartbeatProcessTracer) LogFields() []zap.Field {
return nil
}
Expand Down Expand Up @@ -192,6 +196,13 @@ func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() {
h.lastCheckTime = time.Now()
}

func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() {
now := time.Now()
regionCollectDurationSum.Add(now.Sub(h.lastCheckTime).Seconds())
regionCollectCount.Inc()
h.lastCheckTime = now
}

func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() {
now := time.Now()
h.saveCacheStats.checkOverlapsDuration = now.Sub(h.lastCheckTime)
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.Re
}
tracer.OnSaveCacheFinished()
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
tracer.OnCollectRegionStatsFinished()
return nil
}

Expand Down
26 changes: 10 additions & 16 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,9 +990,9 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core.RegionHeartbeatProcessTracer) error {
origin, _, err := c.core.PreCheckPutRegion(region, trace)
trace.OnPreCheckFinished()
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error {
origin, _, err := c.core.PreCheckPutRegion(region, tracer)
tracer.OnPreCheckFinished()
if err != nil {
return err
}
Expand All @@ -1002,12 +1002,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.HandleStatsAsync(c, region)
}
trace.OnAsyncHotStatsFinished()
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync := regionGuide(region, origin)
trace.OnRegionGuideFinished()
tracer.OnRegionGuideFinished()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
Expand All @@ -1022,7 +1022,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core
failpoint.Inject("concurrentRegionHeartbeat", func() {
time.Sleep(500 * time.Millisecond)
})
trace.OnSaveCacheBegin()
tracer.OnSaveCacheBegin()
var overlaps []*core.RegionInfo
if saveCache {
failpoint.Inject("decEpoch", func() {
Expand All @@ -1032,28 +1032,22 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core
// check its validation again here.
//
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.core.AtomicCheckAndPutRegion(region, trace); err != nil {
trace.OnSaveCacheFinished()
if overlaps, err = c.core.AtomicCheckAndPutRegion(region, tracer); err != nil {
tracer.OnSaveCacheFinished()
return err
}
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.HandleOverlaps(c, overlaps)
}
regionUpdateCacheEventCounter.Inc()
}
<<<<<<< HEAD
trace.OnSaveCacheFinished()
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
}
=======

tracer.OnSaveCacheFinished()
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
>>>>>>> origin/master

tracer.OnCollectRegionStatsFinished()
if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
Expand Down

0 comments on commit 4d9a680

Please sign in to comment.