Skip to content

Commit

Permalink
Small metrics cleanup (#3088)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Jun 6, 2024
1 parent 1b82dce commit 7d3415c
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 32 deletions.
18 changes: 16 additions & 2 deletions snow/networking/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,25 @@ func New(
return nil, fmt.Errorf("initializing handler metrics errored with: %w", err)
}
cpuTracker := resourceTracker.CPUTracker()
h.syncMessageQueue, err = NewMessageQueue(h.ctx, h.validators, cpuTracker, "handler")
h.syncMessageQueue, err = NewMessageQueue(
h.ctx.Log,
h.ctx.SubnetID,
h.validators,
cpuTracker,
"handler",
h.ctx.Registerer,
)
if err != nil {
return nil, fmt.Errorf("initializing sync message queue errored with: %w", err)
}
h.asyncMessageQueue, err = NewMessageQueue(h.ctx, h.validators, cpuTracker, "handler_async")
h.asyncMessageQueue, err = NewMessageQueue(
h.ctx.Log,
h.ctx.SubnetID,
h.validators,
cpuTracker,
"handler_async",
h.ctx.Registerer,
)
if err != nil {
return nil, fmt.Errorf("initializing async message queue errored with: %w", err)
}
Expand Down
28 changes: 16 additions & 12 deletions snow/networking/handler/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/networking/tracker"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/buffer"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
)

Expand Down Expand Up @@ -60,7 +60,8 @@ type messageQueue struct {
clock mockable.Clock
metrics messageQueueMetrics

ctx *snow.ConsensusContext
log logging.Logger
subnetID ids.ID
// Validator set for the chain associated with this message queue
vdrs validators.Manager
// Tracks CPU utilization of each node
Expand All @@ -75,20 +76,23 @@ type messageQueue struct {
}

// NewMessageQueue builds a messageQueue and returns it as a MessageQueue.
//
// NOTE(review): this listing is a diff rendered without +/- markers, so
// pre-change and post-change lines appear side by side. Judging by the call
// sites shown for handler.go and the test, which pass (log, subnetID, vdrs,
// cpuTracker, namespace, registerer), the `ctx` parameter, the `ctx: ctx`
// field and the `ctx.Registerer` return look like the removed lines — confirm
// against the repository before relying on this.
//
// Parameters (post-change form):
//   - log: logger stored on the queue.
//   - subnetID: subnet ID stored on the queue.
//   - vdrs: validator manager stored on the queue.
//   - cpuTracker: tracker stored on the queue.
//   - metricsNamespace: namespace handed to the metrics initializer.
//   - reg: Prometheus registerer handed to the metrics initializer.
//
// The queue is returned together with whatever error the metrics initializer
// produces; the queue value itself is non-nil even when that error is non-nil.
func NewMessageQueue(
ctx *snow.ConsensusContext,
log logging.Logger,
subnetID ids.ID,
vdrs validators.Manager,
cpuTracker tracker.Tracker,
metricsNamespace string,
reg prometheus.Registerer,
) (MessageQueue, error) {
m := &messageQueue{
ctx: ctx,
log: log,
subnetID: subnetID,
vdrs: vdrs,
cpuTracker: cpuTracker,
// Condition variable backed by its own freshly allocated mutex.
cond: sync.NewCond(&sync.Mutex{}),
// Per-node counter map, created empty.
nodeToUnprocessedMsgs: make(map[ids.NodeID]int),
// Unbounded deque created with an initial size of 1.
msgAndCtxs: buffer.NewUnboundedDeque[*msgAndContext](1 /*=initSize*/),
}
// Metrics are initialized last; the initializer's error is returned as-is.
return m, m.metrics.initialize(metricsNamespace, ctx.Registerer)
return m, m.metrics.initialize(metricsNamespace, reg)
}

func (m *messageQueue) Push(ctx context.Context, msg Message) {
Expand Down Expand Up @@ -137,7 +141,7 @@ func (m *messageQueue) Pop() (context.Context, Message, bool) {
i := 0
for {
if i == n {
m.ctx.Log.Debug("canPop is false for all unprocessed messages",
m.log.Debug("canPop is false for all unprocessed messages",
zap.Int("numMessages", n),
)
}
Expand Down Expand Up @@ -212,21 +216,21 @@ func (m *messageQueue) canPop(msg message.InboundMessage) bool {
// the number of nodes with unprocessed messages.
baseMaxCPU := 1 / float64(len(m.nodeToUnprocessedMsgs))
nodeID := msg.NodeID()
weight := m.vdrs.GetWeight(m.ctx.SubnetID, nodeID)
weight := m.vdrs.GetWeight(m.subnetID, nodeID)

var portionWeight float64
if totalVdrsWeight, err := m.vdrs.TotalWeight(m.ctx.SubnetID); err != nil {
if totalVdrsWeight, err := m.vdrs.TotalWeight(m.subnetID); err != nil {
// The sum of validator weights should never overflow, but if they do,
// we treat portionWeight as 0.
m.ctx.Log.Error("failed to get total weight of validators",
zap.Stringer("subnetID", m.ctx.SubnetID),
m.log.Error("failed to get total weight of validators",
zap.Stringer("subnetID", m.subnetID),
zap.Error(err),
)
} else if totalVdrsWeight == 0 {
// The sum of validator weights should never be 0, but handle that case
// for completeness here to avoid divide by 0.
m.ctx.Log.Warn("validator set is empty",
zap.Stringer("subnetID", m.ctx.SubnetID),
m.log.Warn("validator set is empty",
zap.Stringer("subnetID", m.subnetID),
)
} else {
portionWeight = float64(weight) / float64(totalVdrsWeight)
Expand Down
19 changes: 13 additions & 6 deletions snow/networking/handler/message_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,35 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow/networking/tracker"
"github.com/ava-labs/avalanchego/snow/snowtest"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
)

func TestQueue(t *testing.T) {
ctrl := gomock.NewController(t)
require := require.New(t)
cpuTracker := tracker.NewMockTracker(ctrl)
snowCtx := snowtest.Context(t, snowtest.CChainID)
ctx := snowtest.ConsensusContext(snowCtx)
vdrs := validators.NewManager()
vdr1ID, vdr2ID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID()
require.NoError(vdrs.AddStaker(ctx.SubnetID, vdr1ID, nil, ids.Empty, 1))
require.NoError(vdrs.AddStaker(ctx.SubnetID, vdr2ID, nil, ids.Empty, 1))
mIntf, err := NewMessageQueue(ctx, vdrs, cpuTracker, "")
require.NoError(vdrs.AddStaker(constants.PrimaryNetworkID, vdr1ID, nil, ids.Empty, 1))
require.NoError(vdrs.AddStaker(constants.PrimaryNetworkID, vdr2ID, nil, ids.Empty, 1))
mIntf, err := NewMessageQueue(
logging.NoLog{},
constants.PrimaryNetworkID,
vdrs,
cpuTracker,
"",
prometheus.NewRegistry(),
)
require.NoError(err)
u := mIntf.(*messageQueue)
currentTime := time.Now()
Expand Down
12 changes: 6 additions & 6 deletions vms/avm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/versiondb"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/ava-labs/avalanchego/version"
"github.com/ava-labs/avalanchego/vms/avm/block"
"github.com/ava-labs/avalanchego/vms/avm/config"
"github.com/ava-labs/avalanchego/vms/avm/metrics"
"github.com/ava-labs/avalanchego/vms/avm/network"
"github.com/ava-labs/avalanchego/vms/avm/state"
"github.com/ava-labs/avalanchego/vms/avm/txs"
Expand All @@ -47,6 +47,7 @@ import (
blockbuilder "github.com/ava-labs/avalanchego/vms/avm/block/builder"
blockexecutor "github.com/ava-labs/avalanchego/vms/avm/block/executor"
extensions "github.com/ava-labs/avalanchego/vms/avm/fxs"
avmmetrics "github.com/ava-labs/avalanchego/vms/avm/metrics"
txexecutor "github.com/ava-labs/avalanchego/vms/avm/txs/executor"
xmempool "github.com/ava-labs/avalanchego/vms/avm/txs/mempool"
)
Expand All @@ -66,7 +67,7 @@ type VM struct {

config.Config

metrics metrics.Metrics
metrics avmmetrics.Metrics

avax.AddressManager
ids.Aliaser
Expand Down Expand Up @@ -173,16 +174,15 @@ func (vm *VM) Initialize(
zap.Reflect("config", avmConfig),
)

registerer := prometheus.NewRegistry()
if err := ctx.Metrics.Register("", registerer); err != nil {
vm.registerer, err = metrics.MakeAndRegister(ctx.Metrics, "")
if err != nil {
return err
}
vm.registerer = registerer

vm.connectedPeers = make(map[ids.NodeID]*version.Application)

// Initialize metrics as soon as possible
vm.metrics, err = metrics.New(registerer)
vm.metrics, err = avmmetrics.New(vm.registerer)
if err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions vms/platformvm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"time"

"github.com/gorilla/rpc/v2"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/codec/linearcodec"
Expand All @@ -35,7 +35,6 @@ import (
"github.com/ava-labs/avalanchego/vms/platformvm/block"
"github.com/ava-labs/avalanchego/vms/platformvm/config"
"github.com/ava-labs/avalanchego/vms/platformvm/fx"
"github.com/ava-labs/avalanchego/vms/platformvm/metrics"
"github.com/ava-labs/avalanchego/vms/platformvm/network"
"github.com/ava-labs/avalanchego/vms/platformvm/reward"
"github.com/ava-labs/avalanchego/vms/platformvm/state"
Expand All @@ -47,6 +46,7 @@ import (
snowmanblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block"
blockbuilder "github.com/ava-labs/avalanchego/vms/platformvm/block/builder"
blockexecutor "github.com/ava-labs/avalanchego/vms/platformvm/block/executor"
platformvmmetrics "github.com/ava-labs/avalanchego/vms/platformvm/metrics"
txexecutor "github.com/ava-labs/avalanchego/vms/platformvm/txs/executor"
pmempool "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool"
pvalidators "github.com/ava-labs/avalanchego/vms/platformvm/validators"
Expand All @@ -65,7 +65,7 @@ type VM struct {
*network.Network
validators.State

metrics metrics.Metrics
metrics platformvmmetrics.Metrics

// Used to get time. Useful for faking time during tests.
clock mockable.Clock
Expand Down Expand Up @@ -113,13 +113,13 @@ func (vm *VM) Initialize(
}
chainCtx.Log.Info("using VM execution config", zap.Reflect("config", execConfig))

registerer := prometheus.NewRegistry()
if err := chainCtx.Metrics.Register("", registerer); err != nil {
registerer, err := metrics.MakeAndRegister(chainCtx.Metrics, "")
if err != nil {
return err
}

// Initialize metrics as soon as possible
vm.metrics, err = metrics.New(registerer)
vm.metrics, err = platformvmmetrics.New(registerer)
if err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
}
Expand Down

0 comments on commit 7d3415c

Please sign in to comment.