Use Prometheus metrics to observe the consumption
Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato committed Jan 17, 2023
1 parent 073b756 commit 530f83c
Showing 3 changed files with 115 additions and 293 deletions.
169 changes: 38 additions & 131 deletions pkg/mcs/resource_manager/server/manager.go
@@ -16,31 +16,19 @@ package server
 
 import (
 	"context"
-	"encoding/json"
 	"sort"
 	"sync"
-	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/errors"
 	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
 	"github.com/pingcap/log"
-	"github.com/tikv/pd/pkg/storage/endpoint"
-	"github.com/tikv/pd/pkg/storage/kv"
 	"github.com/tikv/pd/server"
 	"github.com/tikv/pd/server/storage"
 	"go.uber.org/zap"
 )
 
-const (
-	defaultConsumptionChanSize = 1024
-	metricsFlushInterval       = time.Minute
-	metricsGCInterval          = time.Hour
-	// metricsGCLifeInterval is the life time of the metrics.
-	// After every `metricsGCInterval`, the metrics older than
-	// this will be deleted.
-	metricsGCLifeInterval = 24 * time.Hour
-)
+const defaultConsumptionChanSize = 1024
 
 // Manager is the manager of resource group.
 type Manager struct {
@@ -53,9 +41,6 @@ type Manager struct {
 		resourceGroupName string
 		*rmpb.Consumption
 	}
-	// metricsMap is used to store the metrics of each resource group.
-	// It will be updated and persisted by the (*Manager).backgroundMetricsFlush.
-	metricsMap map[string]*Metrics
 }
 
 // NewManager returns a new Manager.
@@ -67,12 +52,9 @@ func NewManager(srv *server.Server) *Manager {
 			resourceGroupName string
 			*rmpb.Consumption
 		}, defaultConsumptionChanSize),
-		metricsMap: make(map[string]*Metrics),
 	}
 	srv.AddStartCallback(m.Init)
-	ctx := srv.Context()
-	go m.backgroundMetricsFlush(ctx)
-	go m.backgroundMetricsGC(ctx)
+	go m.backgroundMetricsFlush(srv.Context())
 	return m
 }
 

@@ -178,129 +160,54 @@ func (m *Manager) GetResourceGroupList() []*ResourceGroup {
 	return res
 }
 
-// Update and flush the metrics info periodically.
+// Receive the consumption and flush it to the metrics.
 func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
-	ticker := time.NewTicker(metricsFlushInterval)
-	defer ticker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case consumption := <-m.consumptionDispatcher:
-			// Aggregate the consumption info to the metrics.
-			if metrics := m.getMetrics(consumption.resourceGroupName); metrics != nil {
-				metrics.Update(consumption.Consumption)
-			}
-		// Flush the metrics info to the storage.
-		case <-ticker.C:
-			m.RLock()
-			metricsMapToFlush := make(map[string]*Metrics, len(m.metricsMap))
-			for name, metrics := range m.metricsMap {
-				// No update since the last flush.
-				if metrics.updateTime.Sub(metrics.flushTime) <= 0 {
-					continue
-				}
-				metricsMapToFlush[name] = metrics.Copy()
-			}
-			m.RUnlock()
-			// Start a txn to write in batch.
-			if err := m.storage().RunInTxn(ctx, func(txn kv.Txn) error {
-				for name, metrics := range metricsMapToFlush {
-					key := endpoint.HistoryResourceGroupMetricsKeyPath(name, metrics.updateTime.UnixMilli())
-					value, err := json.Marshal(metrics)
-					if err != nil {
-						log.Error("failed to marshal the resource group metrics info",
-							zap.String("key", key),
-							zap.Error(err))
-						return err
-					}
-					if err = txn.Save(key, string(value)); err != nil {
-						log.Error("failed to save the resource group metrics info in txn",
-							zap.String("key", key),
-							zap.Error(err))
-						return err
-					}
-				}
-				return nil
-			}); err != nil {
-				log.Error("failed to flush the resource group metrics info", zap.Error(err))
-				continue
-			}
-			// Reset the metrics info in memory safely after flushing.
-			for name := range metricsMapToFlush {
-				m.resetMetricsAfterFlush(name)
-			}
+		case consumptionInfo := <-m.consumptionDispatcher:
+			consumption := consumptionInfo.Consumption
+			if consumption == nil {
+				continue
+			}
+			var (
+				name                     = consumptionInfo.resourceGroupName
+				rruMetrics               = readRequestUnitCost.WithLabelValues(name)
+				wruMetrics               = writeRequestUnitCost.WithLabelValues(name)
+				readByteMetrics          = readByteCost.WithLabelValues(name)
+				writeByteMetrics         = writeByteCost.WithLabelValues(name)
+				kvCPUMetrics             = kvCPUCost.WithLabelValues(name)
+				sqlCPUMetrics            = sqlCPUCost.WithLabelValues(name)
+				readRequestCountMetrics  = requestCount.WithLabelValues(name, readTypeLabel)
+				writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel)
+			)
+			// RU info.
+			if consumption.RRU != 0 {
+				rruMetrics.Observe(consumption.RRU)
+			}
+			if consumption.WRU != 0 {
+				wruMetrics.Observe(consumption.WRU)
+			}
+			// Byte info.
+			if consumption.ReadBytes != 0 {
+				readByteMetrics.Observe(consumption.ReadBytes)
+			}
+			if consumption.WriteBytes != 0 {
+				writeByteMetrics.Observe(consumption.WriteBytes)
+			}
+			// CPU time info.
+			if consumption.SqlLayerCpuTimeMs != 0 {
+				sqlCPUMetrics.Observe(consumption.SqlLayerCpuTimeMs)
+				kvCPUMetrics.Observe(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs)
+			}
+			// RPC count info.
+			if consumption.KvReadRpcCount != 0 {
+				readRequestCountMetrics.Add(consumption.KvReadRpcCount)
+			}
+			if consumption.KvWriteRpcCount != 0 {
+				writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
+			}
 		}
 	}
 }
-
-// GC the metrics info periodically.
-func (m *Manager) backgroundMetricsGC(ctx context.Context) {
-	ticker := time.NewTicker(metricsGCInterval)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		// GC the outdated metrics info in storage.
-		case <-ticker.C:
-			// Get all the group names.
-			m.RLock()
-			groupNameToGC := make([]string, 0, len(m.metricsMap))
-			for name := range m.metricsMap {
-				groupNameToGC = append(groupNameToGC, name)
-			}
-			m.RUnlock()
-			gcPoint := time.Now().Add(-metricsGCLifeInterval).UnixMilli()
-			// Start a txn to delete in batch.
-			if err := m.storage().RunInTxn(ctx, func(txn kv.Txn) error {
-				for _, name := range groupNameToGC {
-					startKey := endpoint.HistoryResourceGroupMetricsKeyPath(name, 0)
-					endKey := endpoint.HistoryResourceGroupMetricsKeyPath(name, gcPoint)
-					keys, _, err := txn.LoadRange(startKey, endKey, 0)
-					if err != nil {
-						log.Error("failed to load the resource group metrics info in txn",
-							zap.String("startKey", startKey),
-							zap.String("endKey", endKey),
-							zap.Error(err))
-						return err
-					}
-					for _, key := range keys {
-						if err = txn.Remove(key); err != nil {
-							log.Error("failed to remove the resource group metrics info in txn",
								zap.String("key", key),
-								zap.Error(err))
-							return err
-						}
-					}
-				}
-				return nil
-			}); err != nil {
-				log.Error("failed to gc the resource group metrics info", zap.Error(err))
-				continue
-			}
-			// GC the outdated metrics info in memory.
-			m.Lock()
-			for name, metrics := range m.metricsMap {
-				if metrics != nil && metrics.updateTime.UnixMilli() < gcPoint {
-					delete(m.metricsMap, name)
-				}
-			}
-			m.Unlock()
-		}
-	}
-}
-
-func (m *Manager) getMetrics(name string) *Metrics {
-	m.RLock()
-	defer m.RUnlock()
-	return m.metricsMap[name]
-}
-
-func (m *Manager) resetMetricsAfterFlush(name string) {
-	m.Lock()
-	defer m.Unlock()
-	if metrics := m.metricsMap[name]; metrics != nil {
-		metrics.ResetAfterFlush()
-	}
-}
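
Note: the collectors referenced by the new backgroundMetricsFlush (readRequestUnitCost, writeRequestUnitCost, readByteCost, writeByteCost, kvCPUCost, sqlCPUCost, requestCount) and the readTypeLabel/writeTypeLabel constants are defined in the sibling metrics.go, whose diff is not expanded here. A minimal sketch of what those declarations could look like, assuming histograms for the cost dimensions (they are fed via Observe) and a counter vector for the RPC counts (fed via Add); the namespace, metric names, and buckets below are illustrative, not taken from the commit:

	package server

	import "github.com/prometheus/client_golang/prometheus"

	const (
		// Label values for the "type" dimension of requestCount.
		readTypeLabel  = "read"
		writeTypeLabel = "write"

		// namespace is an assumed Prometheus namespace, for illustration only.
		namespace = "resource_manager"
	)

	var (
		// Histogram observed once per consumption report received from
		// consumptionDispatcher, labeled by resource group name.
		readRequestUnitCost = prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Namespace: namespace,
				Name:      "read_request_unit",
				Help:      "Bucketed histogram of the read request unit cost per resource group.",
				Buckets:   prometheus.ExponentialBuckets(1, 10, 5), // illustrative buckets
			}, []string{"name"})

		// writeRequestUnitCost, readByteCost, writeByteCost, kvCPUCost and
		// sqlCPUCost would follow the same HistogramVec pattern.

		// Counter accumulating KV read/write RPC counts, labeled by
		// resource group name and request type.
		requestCount = prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
				Name:      "request_count",
				Help:      "The number of KV read/write requests per resource group.",
			}, []string{"name", "type"})
	)

	func init() {
		prometheus.MustRegister(readRequestUnitCost, requestCount)
	}

Histograms fit the RU/byte/CPU dimensions because each report is a sampled cost whose distribution matters, while a plain counter suffices for monotonically accumulating RPC counts.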
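The producer side of consumptionDispatcher is not part of this diff; the RPC-facing service presumably pushes each report into the buffered channel (defaultConsumptionChanSize = 1024). A hypothetical, non-blocking hand-off, purely to illustrate the flow (Dispatch is not a method from this commit):

	// Dispatch is a hypothetical helper illustrating how a consumption report
	// could be handed to backgroundMetricsFlush via consumptionDispatcher.
	func (m *Manager) Dispatch(resourceGroupName string, consumption *rmpb.Consumption) {
		report := struct {
			resourceGroupName string
			*rmpb.Consumption
		}{resourceGroupName, consumption}
		select {
		case m.consumptionDispatcher <- report:
		default:
			// The channel already holds defaultConsumptionChanSize pending
			// reports; drop this one instead of blocking the RPC path.
		}
	}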
