From 2aa597523ff3132cf0f3d6af7463534cc12b1a48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Thu, 25 Jan 2024 16:24:07 +0100 Subject: [PATCH 1/3] feat(torch): add new metric to expose the consensus nodes ids MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- config/config.go | 1 + pkg/http/server.go | 51 ++++++++++++++++++++++---- pkg/metrics/metrics.go | 39 ++++++++++++++++++++ pkg/nodes/consensus.go | 83 ++++++++++++++++++++++++++---------------- 4 files changed, 135 insertions(+), 39 deletions(-) diff --git a/config/config.go b/config/config.go index c29ed03..3d3ec57 100644 --- a/config/config.go +++ b/config/config.go @@ -15,6 +15,7 @@ type MutualPeer struct { // Peer represents a peer structure. type Peer struct { NodeName string `yaml:"nodeName"` // NodeName name of the sts/deployment + ServiceName string `yaml:"serviceName,omitempty"` // ServiceName name of the service NodeType string `yaml:"nodeType"` // NodeType specify the type of node Namespace string `yaml:"namespace,omitempty"` // Namespace of the node ContainerName string `yaml:"containerName,omitempty"` // ContainerName name of the main container diff --git a/pkg/http/server.go b/pkg/http/server.go index d55afbc..e4be9df 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -204,19 +204,25 @@ func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) er select { case <-ctx.Done(): // Context canceled, stop the process - log.Info("Context canceled, stopping WatchHashMetric.") + log.Info("Context canceled, stopping metrics watch process.") return ctx.Err() default: - err := GenerateHashMetrics(cfg) - // Check if err is nil, if so, Torch was able to generate the metric. - if err == nil { - log.Info("Metric generated for the first block, let's stop the process successfully...") - // The metric was successfully generated, stop the retries. + hashMetricsErr := GenerateHashMetrics(cfg) + consensusMetricsErr := ConsNodesIDs(cfg) + + // Check if both metrics generation are successful + if hashMetricsErr == nil && consensusMetricsErr == nil { + log.Info("Metrics generated successfully, stopping the process...") return nil } - // Log the error - log.Error("Error generating hash metrics: ", err) + // Log errors if they occur + if hashMetricsErr != nil { + log.Error("Error generating hash metrics: ", hashMetricsErr) + } + if consensusMetricsErr != nil { + log.Error("Error generating consensus node ID metrics: ", consensusMetricsErr) + } // Wait for the retry interval before the next execution using a timer if err := waitForRetry(ctx); err != nil { @@ -267,6 +273,35 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig) error { return nil } +// ConsNodesIDs generates the metric with the consensus nodes ids. +func ConsNodesIDs(cfg config.MutualPeersConfig) error { + log.Info("Generating the metric for the consensus nodes ids...") + + for _, mutualPeer := range cfg.MutualPeers { + for _, peer := range mutualPeer.Peers { + if peer.NodeType == "consensus" { + consNodeId, err := nodes.ConsensusNodesIDs(peer.ServiceName) + if err != nil { + log.Error("Error getting consensus node ID for service [", peer.ServiceName, "]: ", err) + return err + } + + err = metrics.RegisterConsensusNodeMetric( + consNodeId, + peer.ServiceName,q + os.Getenv("POD_NAMESPACE"), + ) + if err != nil { + log.Error("Error registering metric for service [", peer.ServiceName, "]: ", err) + return err + } + } + } + } + + return nil +} + // RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB. func RegisterMetrics(cfg config.MutualPeersConfig) error { red := redis.InitRedisConfig() diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index c9b2e3d..b8ddb0c 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -164,3 +164,42 @@ func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error { _, err = meter.RegisterCallback(callback, loadBalancersGauge) return err } + +// ConsensusNodeMetric represents the information for consensus node metrics. +type ConsensusNodeMetric struct { + NodeName string // NodeName is the name of the node. + NodeID string // NodeID is the ID of the node. + Namespace string // Namespace of the node. +} + +// RegisterConsensusNodeMetric creates and registers metrics for consensus nodes. +func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error { + log.Info("Registering metric for consensus node: ", nodeName) + + // Create an ObservableGauge for consensus node metrics. + consensusNodeGauge, err := meter.Float64ObservableGauge( + "consensus_node_ids_metric", + metric.WithDescription("Metric for Consensus Node IDs"), + ) + if err != nil { + log.Fatalf("Error creating metric: ", err) + return err + } + + callback := func(ctx context.Context, observer metric.Observer) error { + // Define the callback function that will be called periodically to observe metrics. + labels := metric.WithAttributes( + attribute.String("node_name", nodeName), + attribute.String("node_id", nodeID), + attribute.String("namespace", namespace), + ) + // Observe the value for current consensus node metrics with associated labels. + observer.ObserveFloat64(consensusNodeGauge, 1, labels) + + return nil + } + + // Register the callback with the meter and the ObservableGauge. + _, err = meter.RegisterCallback(callback, consensusNodeGauge) + return err +} diff --git a/pkg/nodes/consensus.go b/pkg/nodes/consensus.go index a265be2..91248f3 100644 --- a/pkg/nodes/consensus.go +++ b/pkg/nodes/consensus.go @@ -2,6 +2,7 @@ package nodes import ( "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" @@ -32,57 +33,77 @@ func SetConsNodeDefault(peer config.Peer) config.Peer { return peer } -// GenesisHash connects to the node specified in: config.MutualPeersConfig.ConsensusNode -// makes a request to the API and gets the info about the genesis and return it +// GenesisHash connects to the specified consensus node, makes a request to the API, +// and retrieves information about the genesis block including its hash and time. func GenesisHash(consensusNode string) (string, string, error) { url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode) + jsonResponse, err := makeAPIRequest(url) + if err != nil { + return "", "", err + } + + blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string) + if !ok { + log.Error("Unable to access .block_id.hash") + return "", "", errors.New("error accessing block ID hash") + } + + blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string) + if !ok { + log.Error("Unable to access .block.header.time") + return "", "", errors.New("error accessing block time") + } + + return blockIDHash, blockTime, nil +} + +// ConsensusNodesIDs connects to the specified consensus node, makes a request to the API, +// and retrieves the node ID from the status response. +func ConsensusNodesIDs(consensusNode string) (string, error) { + url := fmt.Sprintf("http://%s:26657/status?", consensusNode) + jsonResponse, err := makeAPIRequest(url) + if err != nil { + return "", err + } + + nodeID, ok := jsonResponse["result"].(map[string]interface{})["node_info"].(map[string]interface{})["id"].(string) + if !ok { + log.Error("Unable to access .result.node_info.id") + return "", errors.New("error accessing node ID") + } + log.Info("Consensus Node [", consensusNode, "] ID: [", nodeID, "]") + + return nodeID, nil +} + +// makeAPIRequest handles the common task of making an HTTP request to a given URL +// and parsing the JSON response. It returns a map representing the JSON response or an error. +func makeAPIRequest(url string) (map[string]interface{}, error) { response, err := http.Get(url) if err != nil { - log.Error("Error making the request to the node [", consensusNode, "] - ", err) - return "", "", err + log.Error("Error making the request: ", err) + return nil, err } defer response.Body.Close() if response.StatusCode != http.StatusOK { log.Error("Non-OK response:", response.Status) - return "", "", err + return nil, err } bodyBytes, err := ioutil.ReadAll(response.Body) if err != nil { log.Error("Error reading response body:", err) - return "", "", err + return nil, err } - bodyString := string(bodyBytes) - log.Info("Response Body: ", bodyString) - - // Parse the JSON response into a generic map var jsonResponse map[string]interface{} - err = json.Unmarshal([]byte(bodyString), &jsonResponse) + err = json.Unmarshal(bodyBytes, &jsonResponse) if err != nil { log.Error("Error parsing JSON:", err) - return "", "", err - } - - // Access and print the .block_id.hash field - blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string) - if !ok { - log.Error("Unable to access .block_id.hash") - return "", "", err + return nil, err } - // Access and print the .block.header.time field - blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string) - if !ok { - log.Error("Unable to access .block.header.time") - return "", "", err - } - - log.Info("Block ID Hash: ", blockIDHash) - log.Info("Block Time: ", blockTime) - log.Info("Full output: ", bodyString) - - return blockIDHash, blockTime, nil + return jsonResponse, nil } From cc3c771fa97583287b137457e93128854ba85477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Thu, 25 Jan 2024 16:32:17 +0100 Subject: [PATCH 2/3] feat(torch): remove char MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index e4be9df..b5dc921 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -288,7 +288,7 @@ func ConsNodesIDs(cfg config.MutualPeersConfig) error { err = metrics.RegisterConsensusNodeMetric( consNodeId, - peer.ServiceName,q + peer.ServiceName, os.Getenv("POD_NAMESPACE"), ) if err != nil { From f10d0c48578a1b09c7f0d8aae4320b34111c9b65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Fri, 26 Jan 2024 13:28:12 +0100 Subject: [PATCH 3/3] feat(torch): some feats and fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- cmd/main.go | 50 +++++++++++++++++++++++++++++++++ pkg/http/server.go | 63 +++++++++++++++++++++++++++++------------- pkg/metrics/metrics.go | 16 +++++++++-- pkg/nodes/consensus.go | 10 +++++-- 4 files changed, 115 insertions(+), 24 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 7c13d06..8da0c8d 100755 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,6 +4,10 @@ import ( "flag" "fmt" "os" + "path" + "runtime" + "strconv" + "strings" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" @@ -13,6 +17,52 @@ import ( "github.com/celestiaorg/torch/pkg/k8s" ) +func init() { + setupLogging() +} + +func setupLogging() { + // Set the default log level + log.SetLevel(log.InfoLevel) + + // Set the custom formatter + log.SetFormatter(&log.TextFormatter{ + FullTimestamp: true, + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + filename := path.Base(f.File) + directory := path.Base(path.Dir(f.File)) + return "", directory + "/" + filename + ":" + strconv.Itoa(f.Line) + }, + }) + + // Enable reporting the file and line + log.SetReportCaller(true) + + // Read the LOG_LEVEL environment variable + logLevel := os.Getenv("LOG_LEVEL") + + // Adjust the log level based on the environment variable + switch strings.ToLower(logLevel) { + case "debug": + log.SetLevel(log.DebugLevel) + case "info": + log.SetLevel(log.InfoLevel) + case "warn": + log.SetLevel(log.WarnLevel) + case "error": + log.SetLevel(log.ErrorLevel) + case "fatal": + log.SetLevel(log.FatalLevel) + case "panic": + log.SetLevel(log.PanicLevel) + default: + // If LOG_LEVEL is not set or has an unrecognized value, use the default Info level + log.Warn("LOG_LEVEL not defined in the env vars, using default 'info'") + } + + log.Info("LOG_LEVEL: ", log.GetLevel()) +} + // ParseFlags parses the command-line flags and reads the configuration file. func ParseFlags() config.MutualPeersConfig { // Define the flag for the configuration file path diff --git a/pkg/http/server.go b/pkg/http/server.go index b5dc921..535db09 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -23,6 +23,7 @@ import ( const ( retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric. hashMetricGenTimeout = 5 * time.Minute // hashMetricGenTimeout specify the max time to retry to generate the metric. + consType = "consensus" // consType type of Consensus node. ) // GetHttpPort GetPort retrieves the namespace where the service will be deployed @@ -273,29 +274,53 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig) error { return nil } +// handleConsensusPeer processes an individual consensus peer by registering its node ID and metrics. +func handleConsensusPeer(peer config.Peer) error { + if peer.NodeType != consType { + return nil + } + + consNodeId, err := nodes.ConsensusNodesIDs(peer.ServiceName) + if err != nil { + log.Error("Error getting consensus node ID for service [", peer.ServiceName, "]: ", err) + return err + } + + err = metrics.RegisterConsensusNodeMetric( + consNodeId, + peer.ServiceName, + os.Getenv("POD_NAMESPACE"), + ) + if err != nil { + log.Error("Error registering metric for service [", peer.ServiceName, "]: ", err) + return err + } + + return nil +} + +// GetAllPeers collects and returns all the Peers from each MutualPeer in the configuration. +func GetAllPeers(cfg config.MutualPeersConfig) []config.Peer { + var allPeers []config.Peer + + log.Debug("Processing cfg.MutualPeers: ", cfg.MutualPeers) + for _, mutualPeer := range cfg.MutualPeers { + log.Debug("mutualPeer: ", mutualPeer) + allPeers = append(allPeers, mutualPeer.Peers...) + } + + return allPeers +} + // ConsNodesIDs generates the metric with the consensus nodes ids. func ConsNodesIDs(cfg config.MutualPeersConfig) error { log.Info("Generating the metric for the consensus nodes ids...") - for _, mutualPeer := range cfg.MutualPeers { - for _, peer := range mutualPeer.Peers { - if peer.NodeType == "consensus" { - consNodeId, err := nodes.ConsensusNodesIDs(peer.ServiceName) - if err != nil { - log.Error("Error getting consensus node ID for service [", peer.ServiceName, "]: ", err) - return err - } - - err = metrics.RegisterConsensusNodeMetric( - consNodeId, - peer.ServiceName, - os.Getenv("POD_NAMESPACE"), - ) - if err != nil { - log.Error("Error registering metric for service [", peer.ServiceName, "]: ", err) - return err - } - } + allPeers := GetAllPeers(cfg) + for _, peer := range allPeers { + log.Debug("Processing peer ", peer) + if err := handleConsensusPeer(peer); err != nil { + return err } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index b8ddb0c..00832fd 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -162,7 +162,12 @@ func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error { // Register the callback with the meter and the Float64ObservableGauge. _, err = meter.RegisterCallback(callback, loadBalancersGauge) - return err + if err != nil { + log.Error("Error registering callback: ", err) + return err + } + + return nil } // ConsensusNodeMetric represents the information for consensus node metrics. @@ -182,7 +187,7 @@ func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error { metric.WithDescription("Metric for Consensus Node IDs"), ) if err != nil { - log.Fatalf("Error creating metric: ", err) + log.Error("Error creating metric: ", err) return err } @@ -201,5 +206,10 @@ func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error { // Register the callback with the meter and the ObservableGauge. _, err = meter.RegisterCallback(callback, consensusNodeGauge) - return err + if err != nil { + log.Error("Error registering callback: ", err) + return err + } + + return nil } diff --git a/pkg/nodes/consensus.go b/pkg/nodes/consensus.go index 91248f3..40da6b0 100644 --- a/pkg/nodes/consensus.go +++ b/pkg/nodes/consensus.go @@ -17,6 +17,12 @@ var ( consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes. consContainerName = "consensus" // consContainerName container name which the pod runs. namespace = k8s.GetCurrentNamespace() // namespace of the node. + KeyResult = "result" // KeyResult result field in the JSON response + KeyBlockID = "block_id" // KeyBlockID block_id field within the 'result' field + KeyHash = "hash" // KeyHash hash field within the 'block_id' field + KeyNodeInfo = "node_info" // KeyNodeInfo node_info field within the 'result' field + KeyID = "id" // KeyID id field within the 'node_info' field + ) // SetConsNodeDefault sets all the default values in case they are empty @@ -42,7 +48,7 @@ func GenesisHash(consensusNode string) (string, string, error) { return "", "", err } - blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string) + blockIDHash, ok := jsonResponse[KeyResult].(map[string]interface{})[KeyBlockID].(map[string]interface{})[KeyHash].(string) if !ok { log.Error("Unable to access .block_id.hash") return "", "", errors.New("error accessing block ID hash") @@ -66,7 +72,7 @@ func ConsensusNodesIDs(consensusNode string) (string, error) { return "", err } - nodeID, ok := jsonResponse["result"].(map[string]interface{})["node_info"].(map[string]interface{})["id"].(string) + nodeID, ok := jsonResponse[KeyResult].(map[string]interface{})[KeyNodeInfo].(map[string]interface{})[KeyID].(string) if !ok { log.Error("Unable to access .result.node_info.id") return "", errors.New("error accessing node ID")