Skip to content

Commit

Permalink
feat(torch): split some files and rename
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Ramon Mañes <jose@celestia.org>
  • Loading branch information
tty47 committed Oct 25, 2023
1 parent 493e1c5 commit f5d298e
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 243 deletions.
115 changes: 55 additions & 60 deletions pkg/k8s/commands.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,71 @@
package k8s

import (
"fmt"
)
"bytes"

var (
trustedPeerFile = "/tmp/TP-ADDR"
trustedPeerFileConsensus = "/home/celestia/config/TP-ADDR"
trustedPeerFileDA = "/tmp/CONSENSUS_NODE_SERVICE"
nodeIpFile = "/tmp/NODE_IP"
cmd = `$(ifconfig | grep -oE 'inet addr:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | grep -v '127.0.0.1' | awk '{print substr($2, 6)}')`
trustedPeerPrefix = "/ip4/" + cmd + "/tcp/2121/p2p/"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

// CreateFileWithEnvVar creates the file in the FS with the node to connect
func CreateFileWithEnvVar(nodeToFile, nodeType string) []string {
f := ""
if nodeType == "consensus" {
f = trustedPeerFileConsensus
// RunRemoteCommand executes a remote command on the specified node.
func RunRemoteCommand(nodeName, container, namespace string, command []string) (string, error) {
clusterConfig, err := rest.InClusterConfig()
if err != nil {
log.Error("Error: ", err.Error())
}
if nodeType == "da" {
f = trustedPeerFileDA
// creates the client
client, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
log.Fatalf("Error: %v", err.Error())
}

script := fmt.Sprintf(`
#!/bin/sh
echo -n "%[2]s" > "%[1]s"`, f, nodeToFile)

return []string{"sh", "-c", script}
}

// CreateTrustedPeerCommand generates the command for creating trusted peers.
// we have to use the shell script because we can only get the token and the
// nodeID from the node itself
func CreateTrustedPeerCommand() []string {
script := fmt.Sprintf(`
#!/bin/sh
# generate the token
export AUTHTOKEN=$(celestia bridge auth admin --node.store /home/celestia)
# remove the first warning line...
export AUTHTOKEN=$(echo $AUTHTOKEN|rev|cut -d' ' -f1|rev)
# make the request and parse the response
TP_ADDR=$(wget --header="Authorization: Bearer $AUTHTOKEN" \
--header="Content-Type: application/json" \
--post-data='{"jsonrpc":"2.0","id":0,"method":"p2p.Info","params":[]}' \
--output-document - \
http://localhost:26658 | grep -o '"ID":"[^"]*"' | sed 's/"ID":"\([^"]*\)"/\1/')
// Create a request to execute the command on the specified node.
req := client.CoreV1().RESTClient().Post().
Resource("pods").
Name(nodeName).
Namespace(namespace).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
Command: command,
Container: container,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)

echo -n "${TP_ADDR}" >> "%[1]s"
cat "%[1]s"
`, trustedPeerFile, trustedPeerPrefix)
// Execute the remote command.
output, err := executeCommand(clusterConfig, req)
if err != nil {
log.Error("failed to execute remote command: ", err)
}

return []string{"sh", "-c", script}
return output, nil
}

// GetNodeIP
func GetNodeIP() []string {
script := fmt.Sprintf(`
#!/bin/sh
echo -n "%[2]s" > "%[1]s"
cat "%[1]s"`, nodeIpFile, trustedPeerPrefix)
return []string{"sh", "-c", script}
}
// executeCommand executes the remote command using the provided configuration, request, and output writer.
func executeCommand(config *rest.Config, req *rest.Request) (string, error) {
executor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
log.Error("failed to create SPDY executor: ", err)
}

// WriteToFile writes content into a file
func WriteToFile(content, file string) []string {
script := fmt.Sprintf(`
#!/bin/sh
echo -n "%[1]s" > "%[2]s"
cat "%[2]s"`, content, file)
// Prepare the standard I/O streams.
var stdout, stderr bytes.Buffer

// Execute the remote command and capture the output.
err = executor.Stream(remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
Tty: false,
})
if err != nil {
log.Error("failed to execute command stream: ", err)
}

return []string{"sh", "-c", script}
return stdout.String(), nil
}
84 changes: 0 additions & 84 deletions pkg/k8s/k8s.go

This file was deleted.

124 changes: 26 additions & 98 deletions pkg/k8s/metrics.go
Original file line number Diff line number Diff line change
@@ -1,115 +1,43 @@
package metrics
package k8s

import (
"context"
"time"
"github.com/celestiaorg/torch/pkg/metrics"

log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Get the meter from the global meter provider with the name "torch".
var meter = otel.GetMeterProvider().Meter("torch")
// Declare a slice to hold multiple Multi Addresses metrics.
var multiAddresses []metrics.MultiAddrs

// MultiAddrs represents the information for a Multi Addresses.
type MultiAddrs struct {
ServiceName string // ServiceName Name of the service associated with the Multi Addresses.
NodeName string // NodeName Name of the node.
MultiAddr string // MultiAddr Multi Addresses value.
Namespace string // Namespace where the service is deployed.
Value float64 // Value to be observed for the Multi Addresses.
}

// WithMetricsMultiAddress creates a callback function to observe metrics for multiple Multi Addresses.
func WithMetricsMultiAddress(multiAddrs []MultiAddrs) error {
log.Info("registering metric: ", multiAddrs)
// Create a Float64ObservableGauge named "Multi Addresses" with a description for the metric.
multiAddressesGauge, err := meter.Float64ObservableGauge(
"multiaddr",
metric.WithDescription("Torch - MultiAddresses"),
)
if err != nil {
log.Fatalf(err.Error())
return err
}

// Define the callback function that will be called periodically to observe metrics.
callback := func(ctx context.Context, observer metric.Observer) error {
for _, ma := range multiAddrs {
// Create labels with attributes for each Multi Addresses.
labels := metric.WithAttributes(
attribute.String("service_name", ma.ServiceName),
attribute.String("node_name", ma.NodeName),
attribute.String("multiaddress", ma.MultiAddr),
attribute.String("namespace", ma.Namespace),
)
// Observe the float64 value for the current Multi Addresses with the associated labels.
observer.ObserveFloat64(multiAddressesGauge, ma.Value, labels)
// MultiAddrExists checks if a given MultiAddr already exists in the multiAddresses slice.
// It returns true if the MultiAddr already exists, and false otherwise.
func MultiAddrExists(multiAddr string) bool {
for _, addr := range multiAddresses {
// Compare each MultiAddr in the slice with the provided multiAddr.
if addr.MultiAddr == multiAddr {
return true
}

return nil
}

// Register the callback with the meter and the Float64ObservableGauge.
_, err = meter.RegisterCallback(callback, multiAddressesGauge)
return err
return false
}

// BlockHeight represents the information for the block height 1.
type BlockHeight struct {
ServiceName string // ServiceName Name of the service associated with the multi-address.
BlockHeight string // BlockHeight height of the block.
Value float64 // Value to be observed for the multi-address.
}

// WithMetricsBlockHeight creates a callback function to observe metrics for block_height_1.
// consensus-node:26657/block?height=1
func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespace string) error {
log.Info("registering metric: ", blockHeight)
// Create a Float64ObservableGauge named "block_height_1" with a description for the metric.
blockHeightGauge, err := meter.Float64ObservableGauge(
"block_height_1",
metric.WithDescription("Torch - BlockHeight"),
)
if err != nil {
log.Fatalf(err.Error())
return err
// RegisterMetric adds a new Multi Addresses metric to the multiAddresses slice.
// Before adding, it checks if the MultiAddr already exists in the slice using MultiAddrExists function.
// If the MultiAddr already exists, it logs a message and skips the addition.
// Otherwise, it appends the new Multi Addresses to the slice and registers the updated metrics.
func RegisterMetric(m metrics.MultiAddrs) {
// Check if the MultiAddr already exists in the array
if MultiAddrExists(m.MultiAddr) {
log.Info("MultiAddr already exists in the metrics array: ", m.NodeName, " ", m.MultiAddr)
return
}
callback := func(ctx context.Context, observer metric.Observer) error {
// Define the callback function that will be called periodically to observe metrics.
// Create labels with attributes for each block_height_1.
labels := metric.WithAttributes(
attribute.String("service_name", serviceName),
attribute.String("block_height_1", blockHeight),
attribute.String("earliest_block_time", earliestBlockTime),
attribute.Int("days_running", CalculateDaysDifference(earliestBlockTime)),
attribute.String("namespace", namespace),
)
// Observe the float64 value for the current block_height_1 with the associated labels.
observer.ObserveFloat64(blockHeightGauge, 1, labels)

return nil
}
// Append the new MultiAddr to the array
multiAddresses = append(multiAddresses, m)

// Register the callback with the meter and the Float64ObservableGauge.
_, err = meter.RegisterCallback(callback, blockHeightGauge)
return err
}

// CalculateDaysDifference based on the date received, returns the number of days since this day.
func CalculateDaysDifference(inputTimeString string) int {
layout := "2006-01-02T15:04:05.999999999Z"
inputTime, err := time.Parse(layout, inputTimeString)
// Register the metric
err := metrics.WithMetricsMultiAddress(multiAddresses)
if err != nil {
log.Error("Error parsing time: [", inputTimeString, "]", err)
return -1
log.Printf("Failed to update metrics: %v", err)
}

currentTime := time.Now()
timeDifference := currentTime.Sub(inputTime)
daysDifference := int(timeDifference.Hours() / 24)

return daysDifference
}
Loading

0 comments on commit f5d298e

Please sign in to comment.