Skip to content

Commit

Permalink
add storeID to metrics back (#1506)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed Apr 23, 2019
1 parent 8cc000c commit 0efb6b4
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 95 deletions.
35 changes: 20 additions & 15 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -304,45 +305,49 @@ func (c *coordinator) collectHotSpotMetrics() {
status := s.Scheduler.(hasHotStatus).GetHotWriteStatus()
for _, s := range stores {
storeAddress := s.GetAddress()
stat, ok := status.AsPeer[s.GetID()]
storeID := s.GetID()
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsPeer[storeID]
if ok {
totalWriteBytes := float64(stat.TotalFlowBytes)
hotWriteRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(hotWriteRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(hotWriteRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0)
}

stat, ok = status.AsLeader[s.GetID()]
stat, ok = status.AsLeader[storeID]
if ok {
totalWriteBytes := float64(stat.TotalFlowBytes)
hotWriteRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(hotWriteRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(hotWriteRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
}
}

// Collects hot read region metrics.
status = s.Scheduler.(hasHotStatus).GetHotReadStatus()
for _, s := range stores {
storeAddress := s.GetAddress()
stat, ok := status.AsLeader[s.GetID()]
storeID := s.GetID()
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsLeader[storeID]
if ok {
totalReadBytes := float64(stat.TotalFlowBytes)
hotReadRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(totalReadBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(hotReadRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(totalReadBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(hotReadRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}
}

Expand Down
16 changes: 9 additions & 7 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"strconv"
"sync/atomic"
"time"

Expand Down Expand Up @@ -349,44 +350,45 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
}

storeID := request.GetLeader().GetStoreId()
storeLabel := strconv.FormatUint(storeID, 10)
store, err := cluster.GetStore(storeID)
if err != nil {
return err
}
storeAddress := store.GetAddress()

regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "recv").Inc()
regionHeartbeatLatency.WithLabelValues(storeAddress).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "recv").Inc()
regionHeartbeatLatency.WithLabelValues(storeAddress, storeLabel).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))

cluster.RLock()
hbStreams := cluster.coordinator.hbStreams
cluster.RUnlock()

if time.Since(lastBind) > s.cfg.heartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "bind").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
hbStreams.bindStream(storeID, server)
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
if region.GetID() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
continue
}
if region.GetLeader() == nil {
msg := fmt.Sprintf("invalid request leader, %v", request)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
continue
}

err = cluster.HandleRegionHeartbeat(region)
if err != nil {
msg := err.Error()
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
}

regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
}
}

Expand Down
17 changes: 10 additions & 7 deletions server/heartbeat_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"context"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -79,6 +80,7 @@ func (s *heartbeatStreams) run() {
s.streams[update.storeID] = update.stream
case msg := <-s.msgCh:
storeID := msg.GetTargetPeer().GetStoreId()
storeLabel := strconv.FormatUint(storeID, 10)
store, err := s.cluster.GetStore(storeID)
if err != nil {
log.Error("fail to get store",
Expand All @@ -94,15 +96,15 @@ func (s *heartbeatStreams) run() {
log.Error("send heartbeat message fail",
zap.Uint64("region-id", msg.RegionId), zap.Error(err))
delete(s.streams, storeID)
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "err").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "err").Inc()
} else {
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "ok").Inc()
}
} else {
log.Debug("heartbeat stream not found, skip send message",
zap.Uint64("region-id", msg.RegionId),
zap.Uint64("store-id", storeID))
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "skip").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "skip").Inc()
}
case <-keepAliveTicker.C:
for storeID, stream := range s.streams {
Expand All @@ -113,14 +115,15 @@ func (s *heartbeatStreams) run() {
continue
}
storeAddress := store.GetAddress()
storeLabel := strconv.FormatUint(storeID, 10)
if err := stream.Send(keepAlive); err != nil {
log.Error("send keepalive message fail",
zap.Uint64("target-store-id", storeID),
zap.Error(err))
delete(s.streams, storeID)
regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "err").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "keepalive", "err").Inc()
} else {
regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "keepalive", "ok").Inc()
}
}
case <-s.ctx.Done():
Expand Down Expand Up @@ -161,8 +164,8 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
}
}

func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc()
func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string, storeLabel string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()

msg := &pdpb.RegionHeartbeatResponse{
Header: &pdpb.ResponseHeader{
Expand Down
8 changes: 4 additions & 4 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var (
Subsystem: "scheduler",
Name: "region_heartbeat",
Help: "Counter of region hearbeat.",
}, []string{"address", "type", "status"})
}, []string{"address", "store", "type", "status"})

regionHeartbeatLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -104,23 +104,23 @@ var (
Name: "region_heartbeat_latency_seconds",
Help: "Bucketed histogram of latency (s) of receiving heartbeat.",
Buckets: prometheus.ExponentialBuckets(1, 2, 12),
}, []string{"address"})
}, []string{"address", "store"})

storeStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "store_status",
Help: "Store status for schedule",
}, []string{"namespace", "address", "type"})
}, []string{"namespace", "address", "store", "type"})

hotSpotStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "hotspot",
Name: "status",
Help: "Status of the hotspot.",
}, []string{"address", "type"})
}, []string{"address", "store", "type"})

tsoCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
8 changes: 6 additions & 2 deletions server/schedule/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package schedule

import (
"fmt"

"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
Expand All @@ -33,9 +35,10 @@ type Filter interface {
// FilterSource checks if store can pass all Filters as source store.
func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeAddress := store.GetAddress()
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if filter.FilterSource(opt, store) {
filterCounter.WithLabelValues("filter-source", storeAddress, filter.Type()).Inc()
filterCounter.WithLabelValues("filter-source", storeAddress, storeID, filter.Type()).Inc()
return true
}
}
Expand All @@ -45,9 +48,10 @@ func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
// FilterTarget checks if store can pass all Filters as target store.
func FilterTarget(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeAddress := store.GetAddress()
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if filter.FilterTarget(opt, store) {
filterCounter.WithLabelValues("filter-target", storeAddress, filter.Type()).Inc()
filterCounter.WithLabelValues("filter-target", storeAddress, storeID, filter.Type()).Inc()
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
Subsystem: "schedule",
Name: "filter",
Help: "Counter of the filter",
}, []string{"action", "address", "type"})
}, []string{"action", "address", "store", "type"})

operatorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
59 changes: 36 additions & 23 deletions server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package schedulers

import (
"strconv"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
Expand Down Expand Up @@ -88,40 +90,45 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*schedule.
return nil
}

log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("max-store", source.GetID()), zap.Uint64("min-store", target.GetID()))
sourceID := source.GetID()
targetID := target.GetID()
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("max-store", sourceID), zap.Uint64("min-store", targetID))
sourceStoreLabel := strconv.FormatUint(sourceID, 10)
targetStoreLabel := strconv.FormatUint(targetID, 10)
sourceAddress := source.GetAddress()
targetAddress := target.GetAddress()
balanceLeaderCounter.WithLabelValues("high_score", sourceAddress).Inc()
balanceLeaderCounter.WithLabelValues("low_score", targetAddress).Inc()
balanceLeaderCounter.WithLabelValues("high_score", sourceAddress, sourceStoreLabel).Inc()
balanceLeaderCounter.WithLabelValues("low_score", targetAddress, targetStoreLabel).Inc()

opInfluence := l.opController.GetOpInfluence(cluster)
for i := 0; i < balanceLeaderRetryLimit; i++ {
if op := l.transferLeaderOut(source, cluster, opInfluence); op != nil {
balanceLeaderCounter.WithLabelValues("transfer_out", sourceAddress).Inc()
balanceLeaderCounter.WithLabelValues("transfer_out", sourceAddress, sourceStoreLabel).Inc()
return op
}
if op := l.transferLeaderIn(target, cluster, opInfluence); op != nil {
balanceLeaderCounter.WithLabelValues("transfer_in", targetAddress).Inc()
balanceLeaderCounter.WithLabelValues("transfer_in", targetAddress, targetStoreLabel).Inc()
return op
}
}

// If no operator can be created for the selected stores, ignore them for a while.
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", source.GetID()), zap.Uint64("target", target.GetID()))
balanceLeaderCounter.WithLabelValues("add_taint", sourceAddress).Inc()
l.taintStores.Put(source.GetID())
balanceLeaderCounter.WithLabelValues("add_taint", targetAddress).Inc()
l.taintStores.Put(target.GetID())
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", sourceID), zap.Uint64("target", targetID))
balanceLeaderCounter.WithLabelValues("add_taint", sourceAddress, sourceStoreLabel).Inc()
l.taintStores.Put(sourceID)
balanceLeaderCounter.WithLabelValues("add_taint", targetAddress, targetStoreLabel).Inc()
l.taintStores.Put(targetID)
return nil
}

// transferLeaderOut transfers leader from the source store.
// It randomly selects a health region from the source store, then picks
// the best follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderOut(source *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
region := cluster.RandLeaderRegion(source.GetID(), core.HealthRegion())
sourceID := source.GetID()
region := cluster.RandLeaderRegion(sourceID, core.HealthRegion())
if region == nil {
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", source.GetID()))
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", sourceID))
schedulerCounter.WithLabelValues(l.GetName(), "no_leader_region").Inc()
return nil
}
Expand All @@ -138,9 +145,10 @@ func (l *balanceLeaderScheduler) transferLeaderOut(source *core.StoreInfo, clust
// It randomly selects a health region from the target store, then picks
// the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(target *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
region := cluster.RandFollowerRegion(target.GetID(), core.HealthRegion())
targetID := target.GetID()
region := cluster.RandFollowerRegion(targetID, core.HealthRegion())
if region == nil {
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", target.GetID()))
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", targetID))
schedulerCounter.WithLabelValues(l.GetName(), "no_follower_region").Inc()
return nil
}
Expand All @@ -158,28 +166,33 @@ func (l *balanceLeaderScheduler) transferLeaderIn(target *core.StoreInfo, cluste
// no new operator need to be created, otherwise create an operator that transfers
// the leader from the source store to the target store for the region.
func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source, target *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
if cluster.IsRegionHot(region.GetID()) {
log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", region.GetID()))
regionID := region.GetID()
if cluster.IsRegionHot(regionID) {
log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", regionID))
schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc()
return nil
}

sourceID := source.GetID()
targetID := target.GetID()
if !shouldBalance(cluster, source, target, region, core.LeaderKind, opInfluence) {
log.Debug("skip balance region",
zap.String("scheduler", l.GetName()), zap.Uint64("region-id", region.GetID()), zap.Uint64("source-store", source.GetID()), zap.Uint64("target-store", target.GetID()),
zap.String("scheduler", l.GetName()), zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID),
zap.Int64("source-size", source.GetLeaderSize()), zap.Float64("source-score", source.LeaderScore(0)),
zap.Int64("source-influence", opInfluence.GetStoreInfluence(source.GetID()).ResourceSize(core.LeaderKind)),
zap.Int64("source-influence", opInfluence.GetStoreInfluence(sourceID).ResourceSize(core.LeaderKind)),
zap.Int64("target-size", target.GetLeaderSize()), zap.Float64("target-score", target.LeaderScore(0)),
zap.Int64("target-influence", opInfluence.GetStoreInfluence(target.GetID()).ResourceSize(core.LeaderKind)),
zap.Int64("target-influence", opInfluence.GetStoreInfluence(targetID).ResourceSize(core.LeaderKind)),
zap.Int64("average-region-size", cluster.GetAverageRegionSize()))
schedulerCounter.WithLabelValues(l.GetName(), "skip").Inc()
return nil
}

schedulerCounter.WithLabelValues(l.GetName(), "new_operator").Inc()
balanceLeaderCounter.WithLabelValues("move_leader", source.GetAddress()+"-out").Inc()
balanceLeaderCounter.WithLabelValues("move_leader", target.GetAddress()+"-in").Inc()
step := schedule.TransferLeader{FromStore: region.GetLeader().GetStoreId(), ToStore: target.GetID()}
op := schedule.NewOperator("balance-leader", region.GetID(), region.GetRegionEpoch(), schedule.OpBalance|schedule.OpLeader, step)
sourceLabel := strconv.FormatUint(sourceID, 10)
targetLabel := strconv.FormatUint(targetID, 10)
balanceLeaderCounter.WithLabelValues("move_leader", source.GetAddress()+"-out", sourceLabel).Inc()
balanceLeaderCounter.WithLabelValues("move_leader", target.GetAddress()+"-in", targetLabel).Inc()
step := schedule.TransferLeader{FromStore: region.GetLeader().GetStoreId(), ToStore: targetID}
op := schedule.NewOperator("balance-leader", regionID, region.GetRegionEpoch(), schedule.OpBalance|schedule.OpLeader, step)
return []*schedule.Operator{op}
}
Loading

0 comments on commit 0efb6b4

Please sign in to comment.