Skip to content

Commit

Permalink
improve metric endpoint reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Jul 24, 2024
1 parent ae25de6 commit b5c0d82
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 185 deletions.
11 changes: 3 additions & 8 deletions aggregator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"google.golang.org/grpc/peer"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"

"github.com/AvaProtocol/ap-avs/core/config"
Expand Down Expand Up @@ -59,14 +57,14 @@ type OperatorPool struct {
db storage.Storage
}

func (o *OperatorPool) Checkin(payload *avsproto.Checkin, remoteIp string) error {
func (o *OperatorPool) Checkin(payload *avsproto.Checkin) error {
now := time.Now()

status := &OperatorNode{
Address: payload.Address,
LastPingEpoch: now.Unix(),
MetricsPort: payload.MetricsPort,
RemoteIP: remoteIp,
RemoteIP: payload.RemoteIP,
Version: payload.Version,
}

Expand Down Expand Up @@ -98,10 +96,7 @@ func (o *OperatorPool) GetAll() []*OperatorNode {
}

func (r *RpcServer) Ping(ctx context.Context, payload *avsproto.Checkin) (*avsproto.CheckinResp, error) {
p, _ := peer.FromContext(ctx)
remoteIp := strings.Split(p.Addr.String(), ":")[0]

if err := r.operatorPool.Checkin(payload, remoteIp); err != nil {
if err := r.operatorPool.Checkin(payload); err != nil {
return nil, fmt.Errorf("cannot update operator status error: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion aggregator/resources/index.gohtml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<div class="min-w-0 flex-auto">
{{ if ne .Version "" }}
<p class="text-sm font-semibold leading-6 text-white">
v{{ .Version }}
{{ .Version }}
</p>
{{ end }}
{{ if gt .MetricsPort 0 }}
Expand Down
52 changes: 43 additions & 9 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/AvaProtocol/ap-avs/version"

"github.com/AvaProtocol/ap-avs/core/timekeeper"
"github.com/AvaProtocol/ap-avs/core/ipfetcher"

// insecure for local dev
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -67,6 +68,8 @@ type OperatorConfig struct {
EnableNodeApi bool `yaml:"enable_node_api"`

DbPath string `yaml:"db_path"`

PublicMetricsPort int32
}

type Operator struct {
Expand Down Expand Up @@ -106,7 +109,7 @@ type Operator struct {

elapsing *timekeeper.Elapsing

metricsPort int32
publicIP string
}

func RunWithConfig(configPath string) {
Expand Down Expand Up @@ -276,12 +279,6 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {
}
aggregatorRpcClient := avsproto.NewAggregatorClient(aggregatorConn)

parts := strings.Split(c.EigenMetricsIpPortAddress, ":")
if len(parts) !=2 {
panic(fmt.Errorf("EigenMetricsIpPortAddress: %s in operator config file is malform", c.EigenMetricsIpPortAddress))
}
metricsPort, _ := strconv.Atoi(parts[1])

operator := &Operator{
config: c,
logger: logger,
Expand Down Expand Up @@ -310,8 +307,6 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {

txManager: txMgr,
elapsing: elapsing,

metricsPort: int32(metricsPort),
}

operator.PopulateKnownConfigByChainID(chainId)
Expand Down Expand Up @@ -387,6 +382,45 @@ func (o *Operator) retryConnect() error{
return nil
}

// Optimistic get public ip address of the operator
// the IP address is used in combination with
func (o *Operator) GetPublicIP() string {
if o.publicIP == "" {
var err error
o.publicIP, err = ipfetcher.GetIP()
if err != nil {
// We will retry and eventually successful, the public ip isn't
// being used widely in our operation, only for metric scrape
o.logger.Errorf("error fetching public ip address %v", err)
}
}

return o.publicIP
}

func (c *OperatorConfig) GetPublicMetricPort() int32 {
// If we had port from env, use it, if not, we parse the port from config
if c.PublicMetricsPort > 0 {
return c.PublicMetricsPort
}

port := os.Getenv("PUBLIC_METRICS_PORT");
if port == "" {
parts := strings.Split(c.EigenMetricsIpPortAddress, ":")
if len(parts) !=2 {
panic(fmt.Errorf("EigenMetricsIpPortAddress: %s in operator config file is malform", c.EigenMetricsIpPortAddress))
}

port = parts[1]
}

portNum, _ := strconv.Atoi(port)

c.PublicMetricsPort = int32(portNum)
return c.PublicMetricsPort
}


// // Takes a NewTaskCreatedLog struct as input and returns a TaskResponseHeader struct.
// // The TaskResponseHeader struct is the struct that is signed and sent to the contract as a task response.
// func (o *Operator) ProcessNewTaskCreatedLog(newTaskCreatedLog *cstaskmanager.ContractAutomationTaskManagerNewTaskCreated) *cstaskmanager.IAutomationTaskManagerTaskResponse {
Expand Down
3 changes: 2 additions & 1 deletion operator/worker_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (o *Operator) PingServer() {
// TODO: generate signature with bls key
Signature: "pending",
Version: version.Get(),
MetricsPort: o.metricsPort,
RemoteIP: o.GetPublicIP(),
MetricsPort: o.config.GetPublicMetricPort(),
})

elapsed := time.Now().Sub(start)
Expand Down
Loading

0 comments on commit b5c0d82

Please sign in to comment.