Skip to content

Commit

Permalink
feat(torch): some feats and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Ramon Mañes <jose@celestia.org>
  • Loading branch information
tty47 committed Jan 26, 2024
1 parent cc3c771 commit f10d0c4
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 24 deletions.
50 changes: 50 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"flag"
"fmt"
"os"
"path"
"runtime"
"strconv"
"strings"

log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
Expand All @@ -13,6 +17,52 @@ import (
"github.com/celestiaorg/torch/pkg/k8s"
)

func init() {
setupLogging()
}

func setupLogging() {
// Set the default log level
log.SetLevel(log.InfoLevel)

// Set the custom formatter
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
filename := path.Base(f.File)
directory := path.Base(path.Dir(f.File))
return "", directory + "/" + filename + ":" + strconv.Itoa(f.Line)
},
})

// Enable reporting the file and line
log.SetReportCaller(true)

// Read the LOG_LEVEL environment variable
logLevel := os.Getenv("LOG_LEVEL")

// Adjust the log level based on the environment variable
switch strings.ToLower(logLevel) {
case "debug":
log.SetLevel(log.DebugLevel)
case "info":
log.SetLevel(log.InfoLevel)
case "warn":
log.SetLevel(log.WarnLevel)
case "error":
log.SetLevel(log.ErrorLevel)
case "fatal":
log.SetLevel(log.FatalLevel)
case "panic":
log.SetLevel(log.PanicLevel)
default:
// If LOG_LEVEL is not set or has an unrecognized value, use the default Info level
log.Warn("LOG_LEVEL not defined in the env vars, using default 'info'")
}

log.Info("LOG_LEVEL: ", log.GetLevel())
}

// ParseFlags parses the command-line flags and reads the configuration file.
func ParseFlags() config.MutualPeersConfig {
// Define the flag for the configuration file path
Expand Down
63 changes: 44 additions & 19 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
const (
retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric.
hashMetricGenTimeout = 5 * time.Minute // hashMetricGenTimeout specify the max time to retry to generate the metric.
consType = "consensus" // consType type of Consensus node.
)

// GetHttpPort GetPort retrieves the namespace where the service will be deployed
Expand Down Expand Up @@ -273,29 +274,53 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
return nil
}

// handleConsensusPeer processes an individual consensus peer by registering its node ID and metrics.
func handleConsensusPeer(peer config.Peer) error {
if peer.NodeType != consType {
return nil
}

consNodeId, err := nodes.ConsensusNodesIDs(peer.ServiceName)
if err != nil {
log.Error("Error getting consensus node ID for service [", peer.ServiceName, "]: ", err)
return err
}

err = metrics.RegisterConsensusNodeMetric(
consNodeId,
peer.ServiceName,
os.Getenv("POD_NAMESPACE"),
)
if err != nil {
log.Error("Error registering metric for service [", peer.ServiceName, "]: ", err)
return err
}

return nil
}

// GetAllPeers collects and returns all the Peers from each MutualPeer in the configuration.
func GetAllPeers(cfg config.MutualPeersConfig) []config.Peer {
var allPeers []config.Peer

log.Debug("Processing cfg.MutualPeers: ", cfg.MutualPeers)
for _, mutualPeer := range cfg.MutualPeers {
log.Debug("mutualPeer: ", mutualPeer)
allPeers = append(allPeers, mutualPeer.Peers...)
}

return allPeers
}

// ConsNodesIDs generates the metric with the consensus nodes ids.
func ConsNodesIDs(cfg config.MutualPeersConfig) error {
log.Info("Generating the metric for the consensus nodes ids...")

for _, mutualPeer := range cfg.MutualPeers {
for _, peer := range mutualPeer.Peers {
if peer.NodeType == "consensus" {
consNodeId, err := nodes.ConsensusNodesIDs(peer.ServiceName)
if err != nil {
log.Error("Error getting consensus node ID for service [", peer.ServiceName, "]: ", err)
return err
}

err = metrics.RegisterConsensusNodeMetric(
consNodeId,
peer.ServiceName,
os.Getenv("POD_NAMESPACE"),
)
if err != nil {
log.Error("Error registering metric for service [", peer.ServiceName, "]: ", err)
return err
}
}
allPeers := GetAllPeers(cfg)
for _, peer := range allPeers {
log.Debug("Processing peer ", peer)
if err := handleConsensusPeer(peer); err != nil {
return err
}
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error {

// Register the callback with the meter and the Float64ObservableGauge.
_, err = meter.RegisterCallback(callback, loadBalancersGauge)
return err
if err != nil {
log.Error("Error registering callback: ", err)
return err
}

return nil
}

// ConsensusNodeMetric represents the information for consensus node metrics.
Expand All @@ -182,7 +187,7 @@ func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error {
metric.WithDescription("Metric for Consensus Node IDs"),
)
if err != nil {
log.Fatalf("Error creating metric: ", err)
log.Error("Error creating metric: ", err)
return err
}

Expand All @@ -201,5 +206,10 @@ func RegisterConsensusNodeMetric(nodeID, nodeName, namespace string) error {

// Register the callback with the meter and the ObservableGauge.
_, err = meter.RegisterCallback(callback, consensusNodeGauge)
return err
if err != nil {
log.Error("Error registering callback: ", err)
return err
}

return nil
}
10 changes: 8 additions & 2 deletions pkg/nodes/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ var (
consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes.
consContainerName = "consensus" // consContainerName container name which the pod runs.
namespace = k8s.GetCurrentNamespace() // namespace of the node.
KeyResult = "result" // KeyResult result field in the JSON response
KeyBlockID = "block_id" // KeyBlockID block_id field within the 'result' field
KeyHash = "hash" // KeyHash hash field within the 'block_id' field
KeyNodeInfo = "node_info" // KeyNodeInfo node_info field within the 'result' field
KeyID = "id" // KeyID id field within the 'node_info' field

)

// SetConsNodeDefault sets all the default values in case they are empty
Expand All @@ -42,7 +48,7 @@ func GenesisHash(consensusNode string) (string, string, error) {
return "", "", err
}

blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string)
blockIDHash, ok := jsonResponse[KeyResult].(map[string]interface{})[KeyBlockID].(map[string]interface{})[KeyHash].(string)
if !ok {
log.Error("Unable to access .block_id.hash")
return "", "", errors.New("error accessing block ID hash")
Expand All @@ -66,7 +72,7 @@ func ConsensusNodesIDs(consensusNode string) (string, error) {
return "", err
}

nodeID, ok := jsonResponse["result"].(map[string]interface{})["node_info"].(map[string]interface{})["id"].(string)
nodeID, ok := jsonResponse[KeyResult].(map[string]interface{})[KeyNodeInfo].(map[string]interface{})[KeyID].(string)
if !ok {
log.Error("Unable to access .result.node_info.id")
return "", errors.New("error accessing node ID")
Expand Down

0 comments on commit f10d0c4

Please sign in to comment.