Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torch): add new metric to expose the consensus nodes ids #6

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"flag"
"fmt"
"os"
"path"
"runtime"
"strconv"
"strings"

log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
Expand All @@ -13,6 +17,52 @@ import (
"github.com/celestiaorg/torch/pkg/k8s"
)

// init runs during package initialization and configures the logger by calling setupLogging.
func init() {
	setupLogging()
}

// setupLogging configures the global logger.
//
// It installs a text formatter with full timestamps, enables caller reporting
// (rendered as "<dir>/<file>:<line>") and sets the log level from the LOG_LEVEL
// environment variable. Accepted values (case-insensitive) are: debug, info,
// warn, error, fatal and panic. When LOG_LEVEL is unset or holds any other
// value the level stays at Info and a warning describing the reason is logged.
func setupLogging() {
	// Start from Info so the messages emitted below are visible regardless of
	// what LOG_LEVEL ends up selecting.
	log.SetLevel(log.InfoLevel)

	// Set the custom formatter.
	log.SetFormatter(&log.TextFormatter{
		FullTimestamp: true,
		CallerPrettyfier: func(f *runtime.Frame) (string, string) {
			// Keep only the last directory and the file name to keep lines short;
			// the function name (first return value) is intentionally omitted.
			filename := path.Base(f.File)
			directory := path.Base(path.Dir(f.File))
			return "", directory + "/" + filename + ":" + strconv.Itoa(f.Line)
		},
	})

	// Enable reporting the file and line.
	log.SetReportCaller(true)

	// Read the LOG_LEVEL environment variable, tolerating surrounding whitespace.
	logLevel := strings.TrimSpace(os.Getenv("LOG_LEVEL"))

	// Adjust the log level based on the environment variable.
	switch strings.ToLower(logLevel) {
	case "debug":
		log.SetLevel(log.DebugLevel)
	case "info":
		log.SetLevel(log.InfoLevel)
	case "warn":
		log.SetLevel(log.WarnLevel)
	case "error":
		log.SetLevel(log.ErrorLevel)
	case "fatal":
		log.SetLevel(log.FatalLevel)
	case "panic":
		log.SetLevel(log.PanicLevel)
	case "":
		// LOG_LEVEL is not set: keep the default Info level.
		log.Warn("LOG_LEVEL not defined in the env vars, using default 'info'")
	default:
		// LOG_LEVEL is set but not one of the supported values: say so explicitly
		// instead of claiming the variable is missing.
		log.Warn("LOG_LEVEL has an unrecognized value [", logLevel, "], using default 'info'")
	}

	log.Info("LOG_LEVEL: ", log.GetLevel())
}

// ParseFlags parses the command-line flags and reads the configuration file.
func ParseFlags() config.MutualPeersConfig {
// Define the flag for the configuration file path
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type MutualPeer struct {
// Peer represents a peer structure.
type Peer struct {
NodeName string `yaml:"nodeName"` // NodeName name of the sts/deployment
ServiceName string `yaml:"serviceName,omitempty"` // ServiceName name of the service
NodeType string `yaml:"nodeType"` // NodeType specify the type of node
Namespace string `yaml:"namespace,omitempty"` // Namespace of the node
ContainerName string `yaml:"containerName,omitempty"` // ContainerName name of the main container
Expand Down
76 changes: 68 additions & 8 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
const (
	retryInterval        = 10 * time.Second // retryInterval is the retry interval (10 seconds) between attempts to generate the consensus metric.
	hashMetricGenTimeout = 5 * time.Minute  // hashMetricGenTimeout is the maximum time (5 minutes) to keep retrying to generate the metric.
	consType             = "consensus"      // consType is the NodeType value that identifies a consensus node.
)

// GetHttpPort GetPort retrieves the namespace where the service will be deployed
Expand Down Expand Up @@ -204,19 +205,25 @@ func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) er
select {
case <-ctx.Done():
// Context canceled, stop the process
log.Info("Context canceled, stopping WatchHashMetric.")
log.Info("Context canceled, stopping metrics watch process.")
return ctx.Err()
default:
err := GenerateHashMetrics(cfg)
// Check if err is nil, if so, Torch was able to generate the metric.
if err == nil {
log.Info("Metric generated for the first block, let's stop the process successfully...")
// The metric was successfully generated, stop the retries.
hashMetricsErr := GenerateHashMetrics(cfg)
consensusMetricsErr := ConsNodesIDs(cfg)

// Check if both metrics generation are successful
if hashMetricsErr == nil && consensusMetricsErr == nil {
log.Info("Metrics generated successfully, stopping the process...")
return nil
}

// Log the error
log.Error("Error generating hash metrics: ", err)
// Log errors if they occur
if hashMetricsErr != nil {
log.Error("Error generating hash metrics: ", hashMetricsErr)
}
if consensusMetricsErr != nil {
log.Error("Error generating consensus node ID metrics: ", consensusMetricsErr)
}

// Wait for the retry interval before the next execution using a timer
if err := waitForRetry(ctx); err != nil {
Expand Down Expand Up @@ -267,6 +274,59 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
return nil
}

// handleConsensusPeer registers the node-ID metric for a single peer.
//
// Peers whose NodeType is not consType are ignored and nil is returned.
// For a consensus peer, the node ID is fetched through peer.ServiceName and a
// metric is registered with that ID, the service name and the value of the
// POD_NAMESPACE environment variable. Any failure is logged and returned.
func handleConsensusPeer(peer config.Peer) error {
	// Nothing to do for non-consensus peers.
	if peer.NodeType != consType {
		return nil
	}

	service := peer.ServiceName

	nodeID, lookupErr := nodes.ConsensusNodesIDs(service)
	if lookupErr != nil {
		log.Error("Error getting consensus node ID for service [", service, "]: ", lookupErr)
		return lookupErr
	}

	podNamespace := os.Getenv("POD_NAMESPACE")
	if registerErr := metrics.RegisterConsensusNodeMetric(nodeID, service, podNamespace); registerErr != nil {
		log.Error("Error registering metric for service [", service, "]: ", registerErr)
		return registerErr
	}

	return nil
}

// GetAllPeers flattens the configuration into a single slice containing the
// Peers of every MutualPeer entry, in configuration order. The result is nil
// when the configuration holds no peers.
func GetAllPeers(cfg config.MutualPeersConfig) []config.Peer {
	log.Debug("Processing cfg.MutualPeers: ", cfg.MutualPeers)

	var collected []config.Peer
	for i := range cfg.MutualPeers {
		group := cfg.MutualPeers[i]
		log.Debug("mutualPeer: ", group)
		collected = append(collected, group.Peers...)
	}

	return collected
}

// ConsNodesIDs generates the consensus node ID metric for every peer found in
// the configuration. Processing stops at the first peer that fails and that
// error is returned; nil means every peer was handled.
func ConsNodesIDs(cfg config.MutualPeersConfig) error {
	log.Info("Generating the metric for the consensus nodes ids...")

	peers := GetAllPeers(cfg)
	for i := range peers {
		current := peers[i]
		log.Debug("Processing peer ", current)

		err := handleConsensusPeer(current)
		if err != nil {
			return err
		}
	}

	return nil
}

// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB.
func RegisterMetrics(cfg config.MutualPeersConfig) error {
red := redis.InitRedisConfig()
Expand Down
51 changes: 50 additions & 1 deletion pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,54 @@ func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error {

// Register the callback with the meter and the Float64ObservableGauge.
_, err = meter.RegisterCallback(callback, loadBalancersGauge)
return err
if err != nil {
log.Error("Error registering callback: ", err)
return err
}

return nil
}

// ConsensusNodeMetric represents the information for consensus node metrics:
// the node's name, its ID and the namespace it belongs to.
type ConsensusNodeMetric struct {
	NodeName  string // NodeName is the name of the node.
	NodeID    string // NodeID is the ID of the node.
	Namespace string // Namespace is the namespace of the node.
}

// RegisterConsensusNodeMetric creates and registers metrics for consensus nodes.
func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error {
log.Info("Registering metric for consensus node: ", nodeName)

// Create an ObservableGauge for consensus node metrics.
consensusNodeGauge, err := meter.Float64ObservableGauge(
"consensus_node_ids_metric",
metric.WithDescription("Metric for Consensus Node IDs"),
)
if err != nil {
log.Error("Error creating metric: ", err)
return err
}

callback := func(ctx context.Context, observer metric.Observer) error {
// Define the callback function that will be called periodically to observe metrics.
labels := metric.WithAttributes(
attribute.String("node_name", nodeName),
attribute.String("node_id", nodeID),
attribute.String("namespace", namespace),
)
// Observe the value for current consensus node metrics with associated labels.
observer.ObserveFloat64(consensusNodeGauge, 1, labels)
tty47 marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

// Register the callback with the meter and the ObservableGauge.
_, err = meter.RegisterCallback(callback, consensusNodeGauge)
if err != nil {
log.Error("Error registering callback: ", err)
return err
}

return nil
}
89 changes: 58 additions & 31 deletions pkg/nodes/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nodes

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -16,6 +17,12 @@ var (
consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes.
consContainerName = "consensus" // consContainerName container name which the pod runs.
namespace = k8s.GetCurrentNamespace() // namespace of the node.
KeyResult = "result" // KeyResult result field in the JSON response
KeyBlockID = "block_id" // KeyBlockID block_id field within the 'result' field
KeyHash = "hash" // KeyHash hash field within the 'block_id' field
KeyNodeInfo = "node_info" // KeyNodeInfo node_info field within the 'result' field
KeyID = "id" // KeyID id field within the 'node_info' field

)

// SetConsNodeDefault sets all the default values in case they are empty
Expand All @@ -32,57 +39,77 @@ func SetConsNodeDefault(peer config.Peer) config.Peer {
return peer
}

// nestedString walks data through keys, treating every key except the last as a
// nested JSON object, and returns the string stored under the last key.
// It reports false (instead of panicking) when any key is missing or holds a
// value of an unexpected type, or when no keys are given.
func nestedString(data map[string]interface{}, keys ...string) (string, bool) {
	if len(keys) == 0 {
		return "", false
	}

	current := data
	for _, key := range keys[:len(keys)-1] {
		// Indexing a nil map is safe and yields nil, which fails the assertion.
		next, ok := current[key].(map[string]interface{})
		if !ok {
			return "", false
		}
		current = next
	}

	value, ok := current[keys[len(keys)-1]].(string)
	return value, ok
}

// GenesisHash connects to the specified consensus node, requests the block at
// height 1 from its API on port 26657, and returns that block's ID hash and
// header time (in that order).
//
// An error is returned when the request fails or when the response does not
// contain the expected fields; a malformed response never causes a panic.
func GenesisHash(consensusNode string) (string, string, error) {
	url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode)
	jsonResponse, err := makeAPIRequest(url)
	if err != nil {
		return "", "", err
	}

	blockIDHash, ok := nestedString(jsonResponse, KeyResult, KeyBlockID, KeyHash)
	if !ok {
		log.Error("Unable to access .block_id.hash")
		return "", "", errors.New("error accessing block ID hash")
	}

	blockTime, ok := nestedString(jsonResponse, KeyResult, "block", "header", "time")
	if !ok {
		log.Error("Unable to access .block.header.time")
		return "", "", errors.New("error accessing block time")
	}

	return blockIDHash, blockTime, nil
}

// ConsensusNodesIDs connects to the specified consensus node, requests its
// status from the API on port 26657, and returns the node ID found at
// .result.node_info.id in the response.
//
// An error is returned when the request fails or when the response does not
// contain the node ID; a malformed response never causes a panic.
func ConsensusNodesIDs(consensusNode string) (string, error) {
	url := fmt.Sprintf("http://%s:26657/status?", consensusNode)
	jsonResponse, err := makeAPIRequest(url)
	if err != nil {
		return "", err
	}

	nodeID, ok := nestedString(jsonResponse, KeyResult, KeyNodeInfo, KeyID)
	if !ok {
		log.Error("Unable to access .result.node_info.id")
		return "", errors.New("error accessing node ID")
	}

	log.Info("Consensus Node [", consensusNode, "] ID: [", nodeID, "]")

	return nodeID, nil
}

// makeAPIRequest performs an HTTP GET on url and decodes the JSON response body
// into a generic map.
//
// It returns an error when the request fails, when the server answers with a
// status other than 200 OK, when the body cannot be read, or when the body is
// not valid JSON. On error the returned map is nil.
func makeAPIRequest(url string) (map[string]interface{}, error) {
	response, err := http.Get(url)
	if err != nil {
		log.Error("Error making the request: ", err)
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		log.Error("Non-OK response:", response.Status)
		// err is nil at this point (the request itself succeeded), so build an
		// explicit error; otherwise callers would receive a nil map and no error.
		return nil, fmt.Errorf("unexpected response status %q from %s", response.Status, url)
	}

	bodyBytes, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Error("Error reading response body:", err)
		return nil, err
	}

	// Parse the JSON response into a generic map.
	var jsonResponse map[string]interface{}
	if err = json.Unmarshal(bodyBytes, &jsonResponse); err != nil {
		log.Error("Error parsing JSON:", err)
		return nil, err
	}

	return jsonResponse, nil
}
Loading