Use vectors for message handler metrics #2987

Merged: 2 commits, May 3, 2024
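This change replaces the per-op map of averager histograms in the message handler with Prometheus counter vectors labeled by message op: the separate sync/async expired counters become a single expired vector, message counts and cumulative handling time are tracked per op, and time spent waiting for the context lock is tracked in an aggregate lockingTime counter. Below is a minimal, self-contained sketch of the labeled-counter pattern the diff adopts; the metric and label names follow the diff, while the namespace, the example op, and the surrounding program are illustrative only.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// One counter vector per concern, labeled by op, instead of a
	// map[message.Op]*messageProcessing holding two averagers per op.
	messages := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "handler", // illustrative; the real namespace is passed to newMetrics
		Name:      "messages",
		Help:      "messages handled",
	}, []string{"op"})
	messageHandlingTime := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "handler",
		Name:      "message_handling_time",
		Help:      "time spent handling messages",
	}, []string{"op"})
	reg.MustRegister(messages, messageHandlingTime)

	// Recording one handled message: an Inc and an Add against the same label set.
	startTime := time.Now()
	// ... handle the message ...
	labels := prometheus.Labels{"op": "chits"} // "chits" is just an example op
	messages.With(labels).Inc()
	messageHandlingTime.With(labels).Add(float64(time.Since(startTime)))

	// The per-op average that the removed metric.Averager reported can be
	// recovered at query time by dividing the two counters.
	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}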
Changes from all commits
106 changes: 58 additions & 48 deletions snow/networking/handler/handler.go
@@ -367,7 +367,7 @@ func (h *handler) dispatchSync(ctx context.Context) {
for {
// Get the next message we should process. If the handler is shutting
// down, we may fail to pop a message.
ctx, msg, ok := h.popUnexpiredMsg(h.syncMessageQueue, h.metrics.expired)
ctx, msg, ok := h.popUnexpiredMsg(h.syncMessageQueue)
if !ok {
return
}
@@ -397,7 +397,7 @@ func (h *handler) dispatchAsync(ctx context.Context) {
for {
// Get the next message we should process. If the handler is shutting
// down, we may fail to pop a message.
ctx, msg, ok := h.popUnexpiredMsg(h.asyncMessageQueue, h.metrics.asyncExpired)
ctx, msg, ok := h.popUnexpiredMsg(h.asyncMessageQueue)
if !ok {
return
}
@@ -445,7 +445,7 @@ func (h *handler) dispatchChans(ctx context.Context) {
func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
var (
nodeID = msg.NodeID()
op = msg.Op()
op = msg.Op().String()
body = msg.Message()
startTime = h.clock.Time()
// Check if the chain is in normal operation at the start of message
@@ -455,13 +455,13 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
if h.ctx.Log.Enabled(logging.Verbo) {
h.ctx.Log.Verbo("forwarding sync message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
zap.Stringer("message", body),
)
} else {
h.ctx.Log.Debug("forwarding sync message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
)
}
h.resourceTracker.StartProcessing(nodeID, startTime)
@@ -471,24 +471,28 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
h.ctx.Lock.Unlock()

var (
endTime = h.clock.Time()
messageHistograms = h.metrics.messages[op]
processingTime = endTime.Sub(startTime)
msgHandlingTime = endTime.Sub(lockAcquiredTime)
endTime = h.clock.Time()
lockingTime = lockAcquiredTime.Sub(startTime)
handlingTime = endTime.Sub(lockAcquiredTime)
)
h.resourceTracker.StopProcessing(nodeID, endTime)
messageHistograms.processingTime.Observe(float64(processingTime))
messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime))
h.metrics.lockingTime.Add(float64(lockingTime))
labels := prometheus.Labels{
opLabel: op,
}
h.metrics.messages.With(labels).Inc()
h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))

msg.OnFinishedHandling()
h.ctx.Log.Debug("finished handling sync message",
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
)
if processingTime > syncProcessingTimeWarnLimit && isNormalOp {
if lockingTime+handlingTime > syncProcessingTimeWarnLimit && isNormalOp {
h.ctx.Log.Warn("handling sync message took longer than expected",
zap.Duration("processingTime", processingTime),
zap.Duration("msgHandlingTime", msgHandlingTime),
zap.Duration("lockingTime", lockingTime),
zap.Duration("handlingTime", handlingTime),
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
zap.Stringer("message", body),
)
}
@@ -504,7 +508,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
// drop the message.
h.ctx.Log.Debug("dropping sync message",
zap.String("reason", "uninitialized engine type"),
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
zap.Stringer("currentEngineType", currentState.Type),
zap.Stringer("requestedEngineType", msg.EngineType),
)
@@ -534,7 +538,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
// requested an Avalanche engine handle the message.
h.ctx.Log.Debug("dropping sync message",
zap.String("reason", "uninitialized engine state"),
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
zap.Stringer("currentEngineType", currentState.Type),
zap.Stringer("requestedEngineType", msg.EngineType),
zap.Stringer("engineState", currentState.State),
@@ -787,36 +791,38 @@ func (h *handler) handleAsyncMsg(ctx context.Context, msg Message) {
func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error {
var (
nodeID = msg.NodeID()
op = msg.Op()
op = msg.Op().String()
body = msg.Message()
startTime = h.clock.Time()
)
if h.ctx.Log.Enabled(logging.Verbo) {
h.ctx.Log.Verbo("forwarding async message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
zap.Stringer("message", body),
)
} else {
h.ctx.Log.Debug("forwarding async message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
)
}
h.resourceTracker.StartProcessing(nodeID, startTime)
defer func() {
var (
endTime = h.clock.Time()
messageHistograms = h.metrics.messages[op]
processingTime = endTime.Sub(startTime)
endTime = h.clock.Time()
handlingTime = endTime.Sub(startTime)
)
h.resourceTracker.StopProcessing(nodeID, endTime)
// There is no lock grabbed here, so both metrics are identical
messageHistograms.processingTime.Observe(float64(processingTime))
messageHistograms.msgHandlingTime.Observe(float64(processingTime))
labels := prometheus.Labels{
opLabel: op,
}
h.metrics.messages.With(labels).Inc()
h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))

msg.OnFinishedHandling()
h.ctx.Log.Debug("finished handling async message",
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
)
}()

@@ -901,7 +907,7 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error {
// Any returned error is treated as fatal
func (h *handler) handleChanMsg(msg message.InboundMessage) error {
var (
op = msg.Op()
op = msg.Op().String()
body = msg.Message()
startTime = h.clock.Time()
// Check if the chain is in normal operation at the start of message
@@ -910,12 +916,12 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error {
)
if h.ctx.Log.Enabled(logging.Verbo) {
h.ctx.Log.Verbo("forwarding chan message to consensus",
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
zap.Stringer("message", body),
)
} else {
h.ctx.Log.Debug("forwarding chan message to consensus",
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
)
}
h.ctx.Lock.Lock()
@@ -924,22 +930,26 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error {
h.ctx.Lock.Unlock()

var (
endTime = h.clock.Time()
messageHistograms = h.metrics.messages[op]
processingTime = endTime.Sub(startTime)
msgHandlingTime = endTime.Sub(lockAcquiredTime)
endTime = h.clock.Time()
lockingTime = lockAcquiredTime.Sub(startTime)
handlingTime = endTime.Sub(lockAcquiredTime)
)
messageHistograms.processingTime.Observe(float64(processingTime))
messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime))
h.metrics.lockingTime.Add(float64(lockingTime))
labels := prometheus.Labels{
opLabel: op,
}
h.metrics.messages.With(labels).Inc()
h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))

msg.OnFinishedHandling()
h.ctx.Log.Debug("finished handling chan message",
zap.Stringer("messageOp", op),
zap.String("messageOp", op),
)
if processingTime > syncProcessingTimeWarnLimit && isNormalOp {
if lockingTime+handlingTime > syncProcessingTimeWarnLimit && isNormalOp {
h.ctx.Log.Warn("handling chan message took longer than expected",
zap.Duration("processingTime", processingTime),
zap.Duration("msgHandlingTime", msgHandlingTime),
zap.Stringer("messageOp", op),
zap.Duration("lockingTime", lockingTime),
zap.Duration("handlingTime", handlingTime),
zap.String("messageOp", op),
zap.Stringer("message", body),
)
}
@@ -974,10 +984,7 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error {
}
}

func (h *handler) popUnexpiredMsg(
queue MessageQueue,
expired prometheus.Counter,
) (context.Context, Message, bool) {
func (h *handler) popUnexpiredMsg(queue MessageQueue) (context.Context, Message, bool) {
for {
// Get the next message we should process. If the handler is shutting
// down, we may fail to pop a message.
@@ -988,16 +995,19 @@ func (h *handler) popUnexpiredMsg(

// If this message's deadline has passed, don't process it.
if expiration := msg.Expiration(); h.clock.Time().After(expiration) {
op := msg.Op().String()
h.ctx.Log.Debug("dropping message",
zap.String("reason", "timeout"),
zap.Stringer("nodeID", msg.NodeID()),
zap.Stringer("messageOp", msg.Op()),
zap.String("messageOp", op),
)
span := trace.SpanFromContext(ctx)
span.AddEvent("dropping message", trace.WithAttributes(
attribute.String("reason", "timeout"),
))
expired.Inc()
h.metrics.expired.With(prometheus.Labels{
opLabel: op,
}).Inc()
msg.OnFinishedHandling()
continue
}
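The sync and chan paths now time the wait for the context lock separately from the work done while holding it: lockingTime feeds an aggregate counter, handlingTime feeds the per-op messageHandlingTime vector, and the slow-message warning fires on their sum. A minimal sketch of that split, assuming a stand-in mutex and an illustrative warn threshold (the real lock and limit come from the handler):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var lock sync.Mutex

	startTime := time.Now()
	lock.Lock()
	lockAcquiredTime := time.Now()
	// ... forward the message to consensus while holding the lock ...
	lock.Unlock()
	endTime := time.Now()

	lockingTime := lockAcquiredTime.Sub(startTime) // feeds the aggregate lockingTime counter
	handlingTime := endTime.Sub(lockAcquiredTime)  // feeds messageHandlingTime with the op label

	const syncProcessingTimeWarnLimit = 2 * time.Second // illustrative threshold
	if lockingTime+handlingTime > syncProcessingTimeWarnLimit {
		fmt.Println("handling sync message took longer than expected")
	}
}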
97 changes: 41 additions & 56 deletions snow/networking/handler/metrics.go
@@ -4,69 +4,54 @@
package handler

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/utils"
)

type metrics struct {
expired prometheus.Counter
asyncExpired prometheus.Counter
messages map[message.Op]*messageProcessing
}

type messageProcessing struct {
processingTime metric.Averager
msgHandlingTime metric.Averager
expired *prometheus.CounterVec // op
messages *prometheus.CounterVec // op
lockingTime prometheus.Counter
messageHandlingTime *prometheus.CounterVec // op
}

func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) {
errs := wrappers.Errs{}

expired := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "expired",
Help: "Incoming sync messages dropped because the message deadline expired",
})
asyncExpired := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "async_expired",
Help: "Incoming async messages dropped because the message deadline expired",
})
errs.Add(
reg.Register(expired),
reg.Register(asyncExpired),
)

messages := make(map[message.Op]*messageProcessing, len(message.ConsensusOps))
for _, op := range message.ConsensusOps {
opStr := op.String()
messageProcessing := &messageProcessing{
processingTime: metric.NewAveragerWithErrs(
namespace,
opStr,
"time (in ns) spent handling a "+opStr,
reg,
&errs,
),
msgHandlingTime: metric.NewAveragerWithErrs(
namespace,
opStr+"_msg_handling",
fmt.Sprintf("time (in ns) spent handling a %s after grabbing the lock", opStr),
reg,
&errs,
),
}
messages[op] = messageProcessing
m := &metrics{
expired: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "expired",
Help: "messages dropped because the deadline expired",
},
opLabels,
),
messages: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "messages",
Help: "messages handled",
},
opLabels,
),
messageHandlingTime: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "message_handling_time",
Help: "time spent handling messages",
},
opLabels,
),
lockingTime: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "locking_time",
Help: "time spent acquiring the context lock",
}),
}

return &metrics{
expired: expired,
asyncExpired: asyncExpired,
messages: messages,
}, errs.Err
return m, utils.Err(
reg.Register(m.expired),
reg.Register(m.messages),
reg.Register(m.messageHandlingTime),
reg.Register(m.lockingTime),
)
}
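The vectors above are built with opLabel and opLabels, which this diff references but does not define; a plausible reconstruction is sketched below, together with a hypothetical helper showing how a caller resolves a per-op child counter. Both are assumptions for illustration, not part of the PR.

package handler

import "github.com/prometheus/client_golang/prometheus"

// Assumed definitions of the label constants referenced by the diff.
const opLabel = "op"

var opLabels = []string{opLabel}

// expiredFor is a hypothetical helper: it resolves the child counter for one
// op from the expired vector, mirroring what popUnexpiredMsg does inline.
func expiredFor(m *metrics, op string) prometheus.Counter {
	return m.expired.With(prometheus.Labels{opLabel: op})
}

The per-op averages that the removed metric.Averager instances reported remain recoverable at query time by dividing message_handling_time by messages for the same op label.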