Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Mar 21, 2024
1 parent 0583609 commit bce1408
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
42 changes: 26 additions & 16 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,6 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
recordMaxTicker := time.NewTicker(tickPerSecond)
defer recordMaxTicker.Stop()
maxPerSecTrackers := make(map[string]*maxPerSecCostTracker)
rruSum := make(map[string]float64)
wruSum := make(map[string]float64)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -394,14 +392,19 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
readRequestCountMetrics = requestCount.WithLabelValues(name, name, readTypeLabel)
writeRequestCountMetrics = requestCount.WithLabelValues(name, name, writeTypeLabel)
)
t, ok := maxPerSecTrackers[name]
if !ok {
t = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
maxPerSecTrackers[name] = t
}
t.CollectConsumption(consumption)

// RU info.
if consumption.RRU > 0 {
rruMetrics.Add(consumption.RRU)
rruSum[name] += consumption.RRU
}
if consumption.WRU > 0 {
wruMetrics.Add(consumption.WRU)
wruSum[name] += consumption.WRU
}
// Byte info.
if consumption.ReadBytes > 0 {
Expand Down Expand Up @@ -448,8 +451,6 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType)
delete(m.consumptionRecord, r)
delete(maxPerSecTrackers, r.name)
delete(rruSum, r.name)
delete(wruSum, r.name)
readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
}
Expand Down Expand Up @@ -482,10 +483,11 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
}
m.RUnlock()
for _, name := range names {
if maxPerSecTrackers[name] == nil {
if t, ok := maxPerSecTrackers[name]; !ok {
maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
} else {
t.FlushMetrics()
}
maxPerSecTrackers[name].Observe(rruSum[name], wruSum[name])
}
}
}
Expand All @@ -495,6 +497,8 @@ type maxPerSecCostTracker struct {
name string
maxPerSecRRU float64
maxPerSecWRU float64
rruSum float64
wruSum float64
lastRRUSum float64
lastWRUSum float64
flushPeriod int
Expand All @@ -512,17 +516,23 @@ func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker
}
}

// Observe and set the maxPerSecRRU and maxPerSecWRU to the metrics.
func (t *maxPerSecCostTracker) Observe(rruSum, wruSum float64) {
// CollectConsumption collects the consumption info.
func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) {
t.rruSum += consume.RRU
t.wruSum += consume.WRU
}

// FlushMetrics and set the maxPerSecRRU and maxPerSecWRU to the metrics.
func (t *maxPerSecCostTracker) FlushMetrics() {
if t.lastRRUSum == 0 && t.lastWRUSum == 0 {
t.lastRRUSum = rruSum
t.lastWRUSum = wruSum
t.lastRRUSum = t.rruSum
t.lastWRUSum = t.wruSum
return
}
deltaRRU := rruSum - t.lastRRUSum
deltaWRU := wruSum - t.lastWRUSum
t.lastRRUSum = rruSum
t.lastWRUSum = wruSum
deltaRRU := t.rruSum - t.lastRRUSum
deltaWRU := t.wruSum - t.lastWRUSum
t.lastRRUSum = t.rruSum
t.lastWRUSum = t.wruSum
if deltaRRU > t.maxPerSecRRU {
t.maxPerSecRRU = deltaRRU
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/mcs/resourcemanager/server/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"testing"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)

Expand All @@ -26,22 +27,25 @@ func TestMaxPerSecCostTracker(t *testing.T) {
re := require.New(t)

// Define the expected max values for each flushPeriod
expectedMaxRRU := []float64{19, 39, 59}
expectedMaxWRU := []float64{19, 39, 59}
expectedMaxRU := []float64{19, 39, 59}
expectedSum := []float64{190, 780, 1770}

rSum := 0
wSum := 0
for i := 0; i < 60; i++ {
// Record data
rSum += i
wSum += i
tracker.Observe(float64(rSum), float64(wSum))
consumption := &rmpb.Consumption{
RRU: float64(i),
WRU: float64(i),
}
tracker.CollectConsumption(consumption)
tracker.FlushMetrics()

// Check the max values at the end of each flushPeriod
if (i+1)%20 == 0 {
period := i / 20
re.Equal(tracker.maxPerSecRRU, expectedMaxRRU[period], fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1))
re.Equal(tracker.maxPerSecWRU, expectedMaxWRU[period], fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1))
re.Equal(tracker.maxPerSecRRU, expectedMaxRU[period], fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1))
re.Equal(tracker.maxPerSecWRU, expectedMaxRU[period], fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1))
re.Equal(tracker.rruSum, expectedSum[period])
re.Equal(tracker.rruSum, expectedSum[period])
}
}
}

0 comments on commit bce1408

Please sign in to comment.