From 493e1c5cffa2ee7a054a8fef1c32722f186448d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Wed, 25 Oct 2023 16:17:17 +0200 Subject: [PATCH] feat(torch): add func to generate the metrics if we already have them in the db MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon MaƱes --- pkg/http/server.go | 73 +++++++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index 206387e..7ec13c1 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -10,6 +10,8 @@ import ( "time" "github.com/celestiaorg/torch/config" + "github.com/celestiaorg/torch/pkg/db/redis" + "github.com/celestiaorg/torch/pkg/k8s" "github.com/celestiaorg/torch/pkg/metrics" "github.com/celestiaorg/torch/pkg/nodes" @@ -79,9 +81,21 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Started...") log.Info("Listening on port: " + httpPort) + // Initialize the goroutine to check the nodes in the queue. log.Info("Initializing queues to process the nodes...") go nodes.ProcessTaskQueue() + // Initialize the goroutine to add a watcher to the StatefulSets in the namespace. + log.Info("Initializing goroutine to watch over the StatefulSets...") + go k8s.WatchStatefulSets() + + // Check if we already have some multi addresses in the DB and expose them, there might be a situation where Torch + // get restarted, and we already have the nodes IDs, so we can expose them. + err = RegisterMetrics(cfg) + if err != nil { + log.Error("Couldn't generate the metrics...", err) + } + <-done log.Info("Server Stopped") @@ -115,26 +129,39 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) bool { return false } -// -//// RegisterMetrics generates and registers the metrics for all nodes in the configuration. -//func RegisterMetrics(cfg config.MutualPeersConfig) error { -// log.Info("Generating initial metrics for all the nodes...") -// -// var nodeNames []string -// -// // Adding nodes from config to register the initial metrics -// for _, n := range cfg.MutualPeers { -// for _, no := range n.Peers { -// nodeNames = append(nodeNames, no.NodeName) -// } -// } -// -// // Generate the metrics for all nodes -// _, err := nodes.GenerateAllTrustedPeersAddr(cfg, nodeNames) -// if err != nil { -// log.Errorf("Error GenerateAllTrustedPeersAddr: %v", err) -// return err -// } -// -// return nil -//} +// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB. +func RegisterMetrics(cfg config.MutualPeersConfig) error { + red := redis.InitRedisConfig() + ctx := context.TODO() + + log.Info("Generating metrics from existing nodes...") + + // Adding nodes from config to register the initial metrics + for _, n := range cfg.MutualPeers { + for _, no := range n.Peers { + // checking the node in the DB first + ma, err := redis.CheckIfNodeExistsInDB(red, ctx, no.NodeName) + if err != nil { + log.Error("Error CheckIfNodeExistsInDB : [", no.NodeName, "]", err) + return err + } + + // check if the multi address is not empty + if ma != "" { + log.Info("Node: [", no.NodeName, "], found in the DB generating metric: ", " [", ma, "]") + + // Register a multi-address metric + m := metrics.MultiAddrs{ + ServiceName: "torch", + NodeName: no.NodeName, + MultiAddr: ma, + Namespace: k8s.GetCurrentNamespace(), + Value: 1, + } + k8s.RegisterMetric(m) + } + } + } + + return nil +}