Skip to content

Commit

Permalink
feat: Adding Metrics for Cluster Collector (odigos-io#1691)
Browse files Browse the repository at this point in the history
  • Loading branch information
yodigos authored Nov 5, 2024
1 parent 09c02a8 commit 720a09d
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 6 deletions.
12 changes: 11 additions & 1 deletion cli/cmd/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ const (
LogsDir = "Logs"
CRDsDir = "CRDs"
ProfileDir = "Profile"
MetricsDir = "Metrics"
)

var (
diagnoseDirs = []string{LogsDir, CRDsDir, ProfileDir}
diagnoseDirs = []string{LogsDir, CRDsDir, ProfileDir, MetricsDir}
)

var diagnoseCmd = &cobra.Command{
Expand Down Expand Up @@ -79,6 +80,15 @@ func startDiagnose(ctx context.Context, client *kube.Client) error {
}
}()

// Fetch Odigos Collector Metrics
wg.Add(1)
go func() {
defer wg.Done()
if err = diagnose_util.FetchOdigosCollectorMetrics(ctx, client, filepath.Join(mainTempDir, MetricsDir)); err != nil {
fmt.Printf("Error calculating Odigos Metrics: %v\n", err)
}
}()

// Fetch Odigos Destinations
wg.Add(1)
go func() {
Expand Down
99 changes: 99 additions & 0 deletions cli/cmd/diagnose_util/profiling_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"fmt"
"github.com/odigos-io/odigos/cli/cmd/resources"
"github.com/odigos-io/odigos/cli/pkg/kube"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"path/filepath"
"strconv"
"sync"
)

Expand Down Expand Up @@ -133,3 +135,100 @@ func captureProfile(ctx context.Context, client *kube.Client, podName string, na

return nil
}

func FetchOdigosCollectorMetrics(ctx context.Context, client *kube.Client, metricsDir string) error {
odigosNamespace, err := resources.GetOdigosNamespace(client, ctx)
if err != nil {
return nil
}

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
err = collectMetrics(ctx, client, odigosNamespace, metricsDir, consts.CollectorsRoleClusterGateway)
if err != nil {
fmt.Printf("Error Getting Metrics Data of: %v, because: %v\n", consts.CollectorsRoleClusterGateway, err)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
err = collectMetrics(ctx, client, odigosNamespace, metricsDir, consts.CollectorsRoleNodeCollector)
if err != nil {
fmt.Printf("Error Getting Metrics Data of: %v, because: %v\n", consts.CollectorsRoleNodeCollector, err)
}
}()

wg.Wait()

return nil
}

func collectMetrics(ctx context.Context, client *kube.Client, odigosNamespace string, metricsDir string, collectorRole consts.CollectorRole) error {
collectorPods, err := client.CoreV1().Pods(odigosNamespace).List(ctx, metav1.ListOptions{
LabelSelector: "odigos.io/collector-role=" + string(collectorRole),
})
if err != nil {
return err
}

var wg sync.WaitGroup

for _, collectorPod := range collectorPods.Items {
fmt.Printf("Fetching metrics for pod: %v", collectorPod.Name)
metricFilePath := filepath.Join(metricsDir, collectorPod.Name)
metricFile, err := os.OpenFile(metricFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
fmt.Printf("Error creating file: %v, because: %v", metricFilePath, err)
continue
}
defer metricFile.Close()

wg.Add(1)

go func() {
defer wg.Done()
err = captureMetrics(ctx, client, collectorPod.Name, odigosNamespace, metricFile, collectorRole)
if err != nil {
fmt.Printf("Error Getting Metrics Data of: %v, because: %v\n", metricFile, err)
}
}()
}

wg.Wait()
return nil
}

func captureMetrics(ctx context.Context, client *kube.Client, podName string, namespace string, metricFile *os.File, collectorRole consts.CollectorRole) error {
portNumber := ""
if collectorRole == consts.CollectorsRoleClusterGateway {
portNumber = strconv.Itoa(int(consts.OdigosClusterCollectorOwnTelemetryPortDefault))
} else if collectorRole == consts.CollectorsRoleNodeCollector {
portNumber = strconv.Itoa(int(consts.OdigosNodeCollectorOwnTelemetryPortDefault))
} else {
return nil
}

proxyURL := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s:%s/proxy/metrics", namespace, podName, portNumber)

// Make the HTTP GET request via the API server proxy
request := client.Clientset.CoreV1().RESTClient().
Get().
AbsPath(proxyURL).
Do(ctx)

response, err := request.Raw()
if err != nil {
return err
}

_, err = io.Copy(metricFile, bytes.NewReader(response))
if err != nil {
return err
}

return nil
}
4 changes: 4 additions & 0 deletions k8sutils/pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (

OdigosClusterCollectorCollectorGroupName = OdigosClusterCollectorDeploymentName
OdigosClusterCollectorConfigMapKey = "collector-conf"

// The cluster gateway collector runs as a deployment and the pod is exposed as a service.
// Thus it cannot collide with other ports on the same node, and we can use an handy default port.
OdigosClusterCollectorOwnTelemetryPortDefault = int32(8888)
)

const (
Expand Down
6 changes: 1 addition & 5 deletions scheduler/controllers/collectorgroups/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// The cluster gateway collector runs as a deployment and the pod is exposed as a service.
// Thus it cannot collide with other ports on the same node, and we can use an handy default port.
const ClusterCollectorDefaultOwnMetricsPort = 8888

func NewClusterCollectorGroup(namespace string) *odigosv1.CollectorsGroup {
return &odigosv1.CollectorsGroup{
TypeMeta: metav1.TypeMeta{
Expand All @@ -22,7 +18,7 @@ func NewClusterCollectorGroup(namespace string) *odigosv1.CollectorsGroup {
},
Spec: odigosv1.CollectorsGroupSpec{
Role: odigosv1.CollectorsGroupRoleClusterGateway,
CollectorOwnMetricsPort: ClusterCollectorDefaultOwnMetricsPort,
CollectorOwnMetricsPort: consts.OdigosClusterCollectorOwnTelemetryPortDefault,
},
}
}

0 comments on commit 720a09d

Please sign in to comment.