Skip to content

Commit

Permalink
wip add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Jul 16, 2024
1 parent e562e2c commit f2acc51
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 17 deletions.
45 changes: 41 additions & 4 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics interface {
type MetricsGenerator interface {
metrics.Metrics

IncTick()
IncPing()

IncNumCheckRun()

IncNumTasksReceived()
IncNumTasksAcceptedByAggregator()
// This metric would either need to be tracked by the aggregator itself,
Expand All @@ -18,25 +24,44 @@ type Metrics interface {
// AvsMetrics contains instrumented metrics that should be incremented by the avs node using the methods below
type AvsAndEigenMetrics struct {
metrics.Metrics

numTick prometheus.Counter
numPingSent prometheus.Counter

numTasksReceived prometheus.Counter
// if numSignedTaskResponsesAcceptedByAggregator != numTasksReceived, then there is a bug
numSignedTaskResponsesAcceptedByAggregator prometheus.Counter
}

const incredibleSquaringNamespace = "incsq"
const apNamespace = "ap"

func NewAvsAndEigenMetrics(avsName string, eigenMetrics *metrics.EigenMetrics, reg prometheus.Registerer) *AvsAndEigenMetrics {
return &AvsAndEigenMetrics{
Metrics: eigenMetrics,

numTick: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Namespace: apNamespace,
Name: "num_tick",
Help: "The number of worker loop tick by the operator. If it isn't increasing, the operator is stuck",
}),

numPingSent: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Namespace: apNamespace,
Name: "num_ping",
Help: "The number of heartbeat send by operator. If it isn't increasing, the operator failed to communicate with aggregator",
}),

numTasksReceived: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Namespace: incredibleSquaringNamespace,
Namespace: apNamespace,
Name: "num_tasks_received",
Help: "The number of tasks received by reading from the avs service manager contract",
}),
numSignedTaskResponsesAcceptedByAggregator: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Namespace: incredibleSquaringNamespace,
Namespace: apNamespace,
Name: "num_signed_task_responses_accepted_by_aggregator",
Help: "The number of signed task responses accepted by the aggregator",
}),
Expand All @@ -50,3 +75,15 @@ func (m *AvsAndEigenMetrics) IncNumTasksReceived() {
func (m *AvsAndEigenMetrics) IncNumTasksAcceptedByAggregator() {
m.numSignedTaskResponsesAcceptedByAggregator.Inc()
}

func (m *AvsAndEigenMetrics) IncTick() {
m.numTick.Inc()
}

func (m *AvsAndEigenMetrics) IncPing() {
m.numTick.Inc()
}

func (m *AvsAndEigenMetrics) IncNumCheckRun() {
m.numTick.Inc()
}
15 changes: 3 additions & 12 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,8 @@ type Operator struct {
ethWsClient eth.Client
txManager *txmgr.SimpleTxManager

// TODO(samlaf): remove both avsWriter and eigenlayerWrite from operator
// they are only used for registration, so we should make a special registration package
// this way, auditing this operator code makes it obvious that operators don't need to
// write to the chain during the course of their normal operations
// writing to the chain should be done via the cli only
metricsReg *prometheus.Registry
metrics metrics.Metrics
metrics metrics.MetricsGenerator
nodeApi *nodeapi.NodeApi
avsWriter *chainio.AvsWriter
avsReader chainio.AvsReaderer
Expand Down Expand Up @@ -269,11 +264,6 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {
panic(err)
}
aggregatorRpcClient := avsproto.NewAggregatorClient(aggregatorConn)
//aggregatorRpcClient, err := NewAggregatorRpcClient(c.AggregatorServerIpPortAddress, logger, avsAndEigenMetrics)
//if err != nil {
// logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err)
// return nil, err
//}

operator := &Operator{
config: c,
Expand All @@ -285,6 +275,7 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {
ethWsClient: ethWsClient,
avsWriter: avsWriter,
avsReader: avsReader,

// avsSubscriber: avsSubscriber,
eigenlayerReader: sdkClients.ElChainReader,
eigenlayerWriter: sdkClients.ElChainWriter,
Expand Down Expand Up @@ -318,10 +309,10 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {
"signerAddr", operator.signerAddress,
"operatorG1Pubkey", operator.blsKeypair.GetPubKeyG1(),
"operatorG2Pubkey", operator.blsKeypair.GetPubKeyG2(),
"prmMetricsEndpoint", fmt.Sprintf("%s/metrics/", operator.PromMetricsIpPortAddress),
)

return operator, nil

}

func (o *Operator) Start(ctx context.Context) error {
Expand Down
4 changes: 3 additions & 1 deletion operator/worker_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ func (o *Operator) runWorkLoop(ctx context.Context) error {
}

for {
o.metrics.IncTick()

select {
case <-ctx.Done():
return nil
case err := <-metricsErrChan:
// TODO: handle gracefully
o.logger.Fatal("Error in metrics server", "err", err)
case <-timer.C:
// Check in
o.PingServer()
}
}
Expand All @@ -45,6 +46,7 @@ func (o *Operator) PingServer() {
})
elapsed := time.Now().Sub(start)
if err == nil {
o.metrics.IncPing()
o.logger.Infof("operator update status succesfully in %d ms", elapsed.Milliseconds())
} else {
o.logger.Infof("error update status %v", err)
Expand Down

0 comments on commit f2acc51

Please sign in to comment.