Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to operator #20

Merged
merged 10 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions aggregator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"google.golang.org/grpc/peer"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"

"github.com/AvaProtocol/ap-avs/core/config"
Expand All @@ -15,7 +17,10 @@ import (

// OperatorNode is the aggregator-side record of an operator that has checked
// in, persisted as JSON in storage (see Checkin) and rendered on the status page.
type OperatorNode struct {
	// Address is the operator's address as self-reported in the Checkin payload.
	Address string `json:"address"`
	// RemoteIP is the peer IP observed by the aggregator for the check-in RPC.
	RemoteIP string `json:"remote_ip"`
	// LastPingEpoch is the Unix timestamp (seconds) of the last check-in.
	LastPingEpoch int64 `json:"last_ping"`
	// Version is the operator binary version reported at check-in.
	Version string `json:"version"`
	// MetricsPort is the port where the operator exposes Prometheus metrics.
	MetricsPort int32 `json:"metrics_port"`
}

func (o *OperatorNode) LastSeen() string {
Expand Down Expand Up @@ -54,12 +59,15 @@ type OperatorPool struct {
db storage.Storage
}

func (o *OperatorPool) Checkin(payload *avsproto.Checkin) error {
func (o *OperatorPool) Checkin(payload *avsproto.Checkin, remoteIp string) error {
now := time.Now()

status := &OperatorNode{
Address: payload.Address,
LastPingEpoch: now.Unix(),
MetricsPort: payload.MetricsPort,
RemoteIP: remoteIp,
Version: payload.Version,
}

data, err := json.Marshal(status)
Expand Down Expand Up @@ -90,8 +98,10 @@ func (o *OperatorPool) GetAll() []*OperatorNode {
}

func (r *RpcServer) Ping(ctx context.Context, payload *avsproto.Checkin) (*avsproto.CheckinResp, error) {
p, _ := peer.FromContext(ctx)
remoteIp := strings.Split(p.Addr.String(), ":")[0]

if err := r.operatorPool.Checkin(payload); err != nil {
if err := r.operatorPool.Checkin(payload, remoteIp); err != nil {
return nil, fmt.Errorf("cannot update operator status error: %w", err)
}

Expand Down
13 changes: 13 additions & 0 deletions aggregator/resources/index.gohtml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
</div>
</div>

<div class="flex min-w-0 gap-x-4">
<div class="min-w-0 flex-auto">
<p class="text-sm font-semibold leading-6 text-white">
v{{ .Version }}
</p>
{{/*
<p class="text-sm font-semibold leading-6 text-white">
<a href="http://{{ .RemoteIP }}:{{ .MetricsPort }}/metrics">Metric</a>
</p>*/}}
</div>
</div>


<div class="hidden shrink-0 sm:flex sm:flex-col sm:items-end">
<p class="text-sm leading-6 text-white">Active</p>
<p class="mt-1 text-xs leading-5 text-gray-400">Last seen <time
Expand Down
24 changes: 24 additions & 0 deletions aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"math/big"
"net"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
Expand Down Expand Up @@ -140,6 +141,29 @@ func (r *RpcServer) GetTask(ctx context.Context, taskID *avsproto.UUID) (*avspro
return task.ToProtoBuf()
}

// SyncTasks streams tasks to the operator identified by payload.Address.
//
// NOTE(review): this is placeholder logic — it sends a hard-coded
// "CurrentBlockTime" task every 5 seconds purely so the metrics pipeline has
// data to chart. It will be replaced by a real task channel later.
func (r *RpcServer) SyncTasks(payload *avsproto.SyncTasksReq, srv avsproto.Aggregator_SyncTasksServer) error {
	log.Printf("sync task for operator : %s", payload.Address)

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		resp := avsproto.SyncTasksResp{
			// TODO: Hook up to the new task channel to syndicate in realtime.
			// Currently this is set up just to generate the metrics so we can
			// prepare the dashboard; the real task payload will be richer.
			Id:       "1",
			TaskType: "CurrentBlockTime",
		}

		if err := srv.Send(&resp); err != nil {
			log.Printf("error when sending task to operator %s: %v", payload.Address, err)
			return err
		}

		select {
		case <-srv.Context().Done():
			// Stop streaming when the operator disconnects or the server
			// shuts down, instead of sleeping and sending on a dead stream.
			return srv.Context().Err()
		case <-ticker.C:
		}
	}
}

// startRpcServer initializes and establish a tcp socket on given address from
// config file
func (agg *Aggregator) startRpcServer(ctx context.Context) error {
Expand Down
77 changes: 77 additions & 0 deletions core/timekeeper/timekeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package timekeeper

import (
"fmt"
"time"
)

type ElapsingStatus int

const (
Running ElapsingStatus = 1
Pause ElapsingStatus = 2
)

type Elapsing struct {
checkpoint time.Time

carryOn time.Duration

status ElapsingStatus
}

func NewElapsing() *Elapsing {
elapse := &Elapsing{
// In Go, Now keeps track both of wallclock and monotonic clock
// therefore we can use it to check delta as well
checkpoint: time.Now(),

status: Running,
}

return elapse
}

func (e *Elapsing) Pause() error {
if e.status == Pause {
return fmt.Errorf("elapsing is pause already")
}

e.carryOn = e.Report()
e.status = Pause

return nil
}

func (e *Elapsing) Resume() error {
if e.status != Pause {
return fmt.Errorf("elapsing is not pause")
}

e.checkpoint = time.Now()
e.status = Running

return nil
}

func (e *Elapsing) Reset() error {
e.status = Running
e.carryOn = 0
e.checkpoint = time.Now()

return nil
}

func (e *Elapsing) Report() time.Duration {
if e.status == Pause {
return time.Duration(0)
}

now := time.Now()
total := now.Sub(e.checkpoint) + e.carryOn

e.carryOn = time.Duration(0)
e.checkpoint = now

return total
}
53 changes: 53 additions & 0 deletions core/timekeeper/timekeeper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package timekeeper

import (
"testing"
"time"
)

// TestElapsing verifies elapsed time is reported while running and that time
// spent paused is excluded from the next report.
func TestElapsing(t *testing.T) {
	elapse := NewElapsing()

	time.Sleep(50 * time.Millisecond)

	d1 := elapse.Report()
	if d1 <= 50*time.Millisecond {
		t.Errorf("elapse time is wrong. expect some amount > 50ms, got %s", d1)
	}

	if err := elapse.Pause(); err != nil {
		t.Fatalf("pause failed: %v", err)
	}

	time.Sleep(50 * time.Millisecond)
	if err := elapse.Resume(); err != nil {
		t.Fatalf("resume failed: %v", err)
	}
	d2 := elapse.Report()

	// The 50ms spent paused must not count; only the instants around
	// Pause/Resume remain, which should be well under 1ms.
	if d2 >= 1*time.Millisecond {
		t.Errorf("elapse time is wrong. expect some amount < 1ms, got %s", d2)
	}
}

// TestReset verifies Reset discards the time accrued so far, so an immediate
// Report reads approximately zero.
func TestReset(t *testing.T) {
	elapse := NewElapsing()

	time.Sleep(50 * time.Millisecond)

	if err := elapse.Reset(); err != nil {
		t.Fatalf("reset failed: %v", err)
	}
	d1 := elapse.Report()
	if d1 >= 1*time.Millisecond {
		t.Errorf("elapse time is wrong. expect some amount ~1us, got %s", d1)
	}
}

// TestCarryon verifies time accrued before Pause is carried into the first
// Report after Resume, while the paused period itself is excluded.
func TestCarryon(t *testing.T) {
	elapse := NewElapsing()
	time.Sleep(10 * time.Millisecond)
	if err := elapse.Pause(); err != nil {
		t.Fatalf("pause failed: %v", err)
	}

	time.Sleep(10 * time.Millisecond)
	if err := elapse.Resume(); err != nil {
		t.Fatalf("resume failed: %v", err)
	}
	d1 := elapse.Report()

	// ~10ms accrued before Pause is carried over; the paused 10ms is not,
	// so the total must land in [10ms, 20ms).
	if d1 < 10*time.Millisecond || d1 >= 20*time.Millisecond {
		t.Errorf("elapse time is wrong. expect some amount ~10ms, got %s", d1)
	}
}
96 changes: 81 additions & 15 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics interface {
type MetricsGenerator interface {
metrics.Metrics
IncNumTasksReceived()

IncWorkerLoop()
IncPing(string)

AddUptime(float64)

IncNumCheckRun(string, string)

IncNumTasksReceived(string)
IncNumTasksAcceptedByAggregator()
// This metric would either need to be tracked by the aggregator itself,
// or we would need to write a collector that queries onchain for this info
Expand All @@ -18,35 +26,93 @@ type Metrics interface {
// AvsAndEigenMetrics contains instrumented metrics that should be incremented
// by the avs node using the methods below. Every counter is labeled with the
// operator address and version captured at construction time.
type AvsAndEigenMetrics struct {
	metrics.Metrics

	// uptime accumulates wall-clock milliseconds since the node booted.
	uptime *prometheus.CounterVec

	numWorkerLoop     *prometheus.CounterVec
	numPingSent       *prometheus.CounterVec
	numCheckProcessed *prometheus.CounterVec
	numTasksReceived  *prometheus.CounterVec
	// if numSignedTaskResponsesAcceptedByAggregator != numTasksReceived, then there is a bug
	numSignedTaskResponsesAcceptedByAggregator *prometheus.CounterVec

	// operatorAddress and version are attached as label values on every series.
	operatorAddress string
	version         string
}

const incredibleSquaringNamespace = "incsq"
const apNamespace = "ap"

func NewAvsAndEigenMetrics(avsName string, eigenMetrics *metrics.EigenMetrics, reg prometheus.Registerer) *AvsAndEigenMetrics {
func NewAvsAndEigenMetrics(avsName, operatorAddress, version string, eigenMetrics *metrics.EigenMetrics, reg prometheus.Registerer) *AvsAndEigenMetrics {
return &AvsAndEigenMetrics{
Metrics: eigenMetrics,
numTasksReceived: promauto.With(reg).NewCounter(

uptime: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: apNamespace,
Name: "uptime_milliseconds_total",
Help: "The elapse time in milliseconds since the node is booted",
}, []string{"operator", "version"}),

numWorkerLoop: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: apNamespace,
Name: "num_worker_loop_total",
Help: "The number of worker loop by the operator. If it isn't increasing, the operator is stuck",
}, []string{"operator", "version"}),

numPingSent: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: apNamespace,
Name: "num_ping_total",
Help: "The number of heartbeat send by operator. If it isn't increasing, the operator failed to communicate with aggregator",
}, []string{"operator", "version", "status"}),

numCheckProcessed: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: apNamespace,
Name: "num_check_processed_total",
Help: "The number of check has been performed by operator.",
}, []string{"operator", "version", "type", "status"}),

numTasksReceived: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: incredibleSquaringNamespace,
Namespace: apNamespace,
Name: "num_tasks_received",
Help: "The number of tasks received by reading from the avs service manager contract",
}),
numSignedTaskResponsesAcceptedByAggregator: promauto.With(reg).NewCounter(
}, []string{"operator", "version", "type"}),

numSignedTaskResponsesAcceptedByAggregator: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: incredibleSquaringNamespace,
Namespace: apNamespace,
Name: "num_signed_task_responses_accepted_by_aggregator",
Help: "The number of signed task responses accepted by the aggregator",
}),
}, []string{"operator", "version",}),

operatorAddress: operatorAddress,
version: version,
}
}

func (m *AvsAndEigenMetrics) IncNumTasksReceived() {
m.numTasksReceived.Inc()
func (m *AvsAndEigenMetrics) IncNumTasksReceived(checkType string) {
m.numTasksReceived.WithLabelValues(m.operatorAddress, m.version, checkType). Inc()
}

// IncNumTasksAcceptedByAggregator increments the counter of signed task
// responses the aggregator accepted from this operator.
func (m *AvsAndEigenMetrics) IncNumTasksAcceptedByAggregator() {
	m.numSignedTaskResponsesAcceptedByAggregator.WithLabelValues(m.operatorAddress, m.version).Inc()
}

// IncWorkerLoop counts one pass of the operator worker loop; a flat series
// indicates the operator is stuck.
func (m *AvsAndEigenMetrics) IncWorkerLoop() {
	loopCounter := m.numWorkerLoop.WithLabelValues(m.operatorAddress, m.version)
	loopCounter.Inc()
}

// IncPing counts one heartbeat sent to the aggregator, partitioned by the
// outcome status of the ping.
func (m *AvsAndEigenMetrics) IncPing(status string) {
	labelValues := []string{m.operatorAddress, m.version, status}
	m.numPingSent.WithLabelValues(labelValues...).Inc()
}

// IncNumCheckRun counts one check performed by the operator, partitioned by
// the check type and its outcome status.
func (m *AvsAndEigenMetrics) IncNumCheckRun(checkType, status string) {
	labelValues := []string{m.operatorAddress, m.version, checkType, status}
	m.numCheckProcessed.WithLabelValues(labelValues...).Inc()
}

// AddUptime adds the given number of elapsed milliseconds to the node's
// uptime counter.
func (m *AvsAndEigenMetrics) AddUptime(total float64) {
	uptimeCounter := m.uptime.WithLabelValues(m.operatorAddress, m.version)
	uptimeCounter.Add(total)
}
Loading