From 34ca2c33c1fbdf26541b334b542d90e32431d98c Mon Sep 17 00:00:00 2001
From: Amir Blum
Date: Fri, 25 Oct 2024 16:56:39 +0300
Subject: [PATCH] feat: move default node collector port to avoid conflicts and
 add setting in config (#1618)

Since the node collector runs as a DaemonSet with host network (so that the
otlp ports 4317 and 4318 are accessible from the pods' exporters), it also
shares the port namespace with the node. The ports it binds to might conflict
with ports of other tools installed in the cluster that do similar things, in
which case data collection will fail to start with the error:
"listen tcp 0.0.0.0:8888: bind: address already in use".

The collectors currently expose and scrape their own metrics on port `8888`,
which is a good candidate for collisions.

This PR:
- makes `55682` the default port for the node collector's own telemetry
  endpoint (instead of `8888`), to lower the chance of a collision with
  something that already runs on the node.
- adds an option to the odigos config to set the port, in case one needs it
  hard-coded to a specific value to manage the public ports on the node.

The port to use is written to the collectors group CRD by the scheduler, which
abstracts away the default and odigos config resolution and makes the resolved
value easier to consume.

For the cluster collector, the collector runs as a deployment behind a k8s
service, so we can safely use `8888` without colliding with anything else.
---
 .../crd/bases/odigos.io_collectorsgroups.yaml |   7 +
 .../odigos/v1alpha1/collectorsgroupspec.go    |  11 +-
 api/odigos/v1alpha1/collectorsgroup_types.go  |   4 +
 .../controllers/collectorsgroup_controller.go |  27 +--
 .../controllers/datacollection/configmap.go   |  16 +-
 autoscaler/controllers/gateway/configmap.go   |  22 +-
 .../controllers/gateway/configmap_test.go     |  14 +-
 autoscaler/controllers/gateway/service.go     |   6 +-
 common/odigos_config.go                       |   7 +
 helm/odigos/templates/odigos-config-cm.yaml   |   6 +
 helm/odigos/values.yaml                       |   5 +
 k8sutils/pkg/consts/consts.go                 |   7 +-
 k8sutils/pkg/utils/collectorgroup_util.go     |  15 +-
 .../controllers/collectorgroups/cluster.go    |  11 +-
 .../collectorgroups/datacollection.go         |  20 +-
 .../controllers/collectorsgroup_controller.go | 113 -----------
 .../controllers/destination_controller.go     |  18 +-
 .../instrumentationconfig_controller.go       |  81 --------
 .../controllers/nodecollector_controller.go   | 188 ++++++++++++++++++
 scheduler/go.mod                              |   4 +-
 scheduler/main.go                             |  28 ++-
 21 files changed, 331 insertions(+), 279 deletions(-)
 delete mode 100644 scheduler/controllers/collectorsgroup_controller.go
 delete mode 100644 scheduler/controllers/instrumentationconfig_controller.go
 create mode 100644 scheduler/controllers/nodecollector_controller.go

diff --git a/api/config/crd/bases/odigos.io_collectorsgroups.yaml b/api/config/crd/bases/odigos.io_collectorsgroups.yaml
index 5628d2883..0fc09ae44 100644
--- a/api/config/crd/bases/odigos.io_collectorsgroups.yaml
+++ b/api/config/crd/bases/odigos.io_collectorsgroups.yaml
@@ -42,12 +42,19 @@ spec:
       spec:
         description: CollectorsGroupSpec defines the desired state of Collector
         properties:
+          collectorOwnMetricsPort:
+            description: |-
+              The port to use for exposing the collector's own metrics as a prometheus endpoint.
+              This can be used to resolve conflicting ports when a collector is using the host network. 
+ format: int32 + type: integer role: enum: - CLUSTER_GATEWAY - NODE_COLLECTOR type: string required: + - collectorOwnMetricsPort - role type: object status: diff --git a/api/generated/odigos/applyconfiguration/odigos/v1alpha1/collectorsgroupspec.go b/api/generated/odigos/applyconfiguration/odigos/v1alpha1/collectorsgroupspec.go index 29d343f93..776732e4a 100644 --- a/api/generated/odigos/applyconfiguration/odigos/v1alpha1/collectorsgroupspec.go +++ b/api/generated/odigos/applyconfiguration/odigos/v1alpha1/collectorsgroupspec.go @@ -24,7 +24,8 @@ import ( // CollectorsGroupSpecApplyConfiguration represents a declarative configuration of the CollectorsGroupSpec type for use // with apply. type CollectorsGroupSpecApplyConfiguration struct { - Role *v1alpha1.CollectorsGroupRole `json:"role,omitempty"` + Role *v1alpha1.CollectorsGroupRole `json:"role,omitempty"` + CollectorOwnMetricsPort *int32 `json:"collectorOwnMetricsPort,omitempty"` } // CollectorsGroupSpecApplyConfiguration constructs a declarative configuration of the CollectorsGroupSpec type for use with @@ -40,3 +41,11 @@ func (b *CollectorsGroupSpecApplyConfiguration) WithRole(value v1alpha1.Collecto b.Role = &value return b } + +// WithCollectorOwnMetricsPort sets the CollectorOwnMetricsPort field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the CollectorOwnMetricsPort field is set to the value of the last call. +func (b *CollectorsGroupSpecApplyConfiguration) WithCollectorOwnMetricsPort(value int32) *CollectorsGroupSpecApplyConfiguration { + b.CollectorOwnMetricsPort = &value + return b +} diff --git a/api/odigos/v1alpha1/collectorsgroup_types.go b/api/odigos/v1alpha1/collectorsgroup_types.go index 3a9787de2..e04fc37ca 100644 --- a/api/odigos/v1alpha1/collectorsgroup_types.go +++ b/api/odigos/v1alpha1/collectorsgroup_types.go @@ -33,6 +33,10 @@ const ( // CollectorsGroupSpec defines the desired state of Collector type CollectorsGroupSpec struct { Role CollectorsGroupRole `json:"role"` + + // The port to use for exposing the collector's own metrics as a prometheus endpoint. + // This can be used to resolve conflicting ports when a collector is using the host network. 
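+	// The scheduler resolves this value (from the odigos config when set, otherwise a default)
+	// and writes it to the CRD, so consumers can read the resolved port directly.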
+ CollectorOwnMetricsPort int32 `json:"collectorOwnMetricsPort"` } // CollectorsGroupStatus defines the observed state of Collector diff --git a/autoscaler/controllers/collectorsgroup_controller.go b/autoscaler/controllers/collectorsgroup_controller.go index 2e8726849..f46ec50bd 100644 --- a/autoscaler/controllers/collectorsgroup_controller.go +++ b/autoscaler/controllers/collectorsgroup_controller.go @@ -22,7 +22,6 @@ import ( odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/autoscaler/controllers/datacollection" "github.com/odigos-io/odigos/autoscaler/controllers/gateway" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/log" @@ -32,28 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -type onlyCreatePredicate struct { - predicate.Funcs -} - -func (i *onlyCreatePredicate) Create(e event.CreateEvent) bool { - return true -} - -func (i *onlyCreatePredicate) Update(e event.UpdateEvent) bool { - return false -} - -func (i *onlyCreatePredicate) Delete(e event.DeleteEvent) bool { - return false -} - -func (i *onlyCreatePredicate) Generic(e event.GenericEvent) bool { - return false -} - -var _ predicate.Predicate = &onlyCreatePredicate{} - // CollectorsGroupReconciler reconciles a CollectorsGroup object type CollectorsGroupReconciler struct { client.Client @@ -102,6 +79,8 @@ func (r *CollectorsGroupReconciler) Reconcile(ctx context.Context, req ctrl.Requ func (r *CollectorsGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&odigosv1.CollectorsGroup{}). - WithEventFilter(&onlyCreatePredicate{}). + // we assume everything in the collectorsgroup spec is the configuration for the collectors to generate. + // thus, we need to monitor any change to the spec which is what the generation field is for. + WithEventFilter(&predicate.GenerationChangedPredicate{}). 
Complete(r) } diff --git a/autoscaler/controllers/datacollection/configmap.go b/autoscaler/controllers/datacollection/configmap.go index c5fd60505..49c4c7515 100644 --- a/autoscaler/controllers/datacollection/configmap.go +++ b/autoscaler/controllers/datacollection/configmap.go @@ -98,7 +98,7 @@ func createConfigMap(desired *v1.ConfigMap, ctx context.Context, c client.Client func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, datacollection *odigosv1.CollectorsGroup, scheme *runtime.Scheme, setTracesLoadBalancer bool, disableNameProcessor bool) (*v1.ConfigMap, error) { - cmData, err := calculateConfigMapData(apps, dests, processors, setTracesLoadBalancer, disableNameProcessor) + cmData, err := calculateConfigMapData(datacollection, apps, dests, processors, setTracesLoadBalancer, disableNameProcessor) if err != nil { return nil, err } @@ -124,16 +124,16 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig return &desired, nil } -func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, +func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, setTracesLoadBalancer bool, disableNameProcessor bool) (string, error) { + ownMetricsPort := collectorsGroup.Spec.CollectorOwnMetricsPort + empty := struct{}{} processorsCfg, tracesProcessors, metricsProcessors, logsProcessors, errs := config.GetCrdProcessorsConfigMap(commonconf.ToProcessorConfigurerArray(processors)) - if errs != nil { - for name, err := range errs { - log.Log.V(0).Info(err.Error(), "processor", name) - } + for name, err := range errs { + log.Log.V(0).Info(err.Error(), "processor", name) } if !disableNameProcessor { @@ -214,7 +214,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o "scrape_interval": "10s", "static_configs": []config.GenericMap{ { - "targets": []string{"127.0.0.1:8888"}, + "targets": []string{fmt.Sprintf("127.0.0.1:%d", ownMetricsPort)}, }, }, "metric_relabel_configs": []config.GenericMap{ @@ -247,7 +247,7 @@ func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *o Extensions: []string{"health_check"}, Telemetry: config.Telemetry{ Metrics: config.GenericMap{ - "address": "0.0.0.0:8888", + "address": fmt.Sprintf("0.0.0.0:%d", ownMetricsPort), }, Resource: map[string]*string{ // The collector add "otelcol" as a service name, so we need to remove it diff --git a/autoscaler/controllers/gateway/configmap.go b/autoscaler/controllers/gateway/configmap.go index 61a28e857..ab9a23af2 100644 --- a/autoscaler/controllers/gateway/configmap.go +++ b/autoscaler/controllers/gateway/configmap.go @@ -28,12 +28,12 @@ const ( ) var ( - errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline") + errNoPipelineConfigured = errors.New("no pipeline was configured, cannot add self telemetry pipeline") errNoReceiversConfigured = errors.New("no receivers were configured, cannot add self telemetry pipeline") errNoExportersConfigured = errors.New("no exporters were configured, cannot add self telemetry pipeline") ) -func addSelfTelemetryPipeline(c *config.Config) error { +func addSelfTelemetryPipeline(c *config.Config, ownTelemetryPort int32) error { if c.Service.Pipelines == nil { return errNoPipelineConfigured } @@ -47,18 +47,18 
@@ func addSelfTelemetryPipeline(c *config.Config) error { "config": config.GenericMap{ "scrape_configs": []config.GenericMap{ { - "job_name": "otelcol", + "job_name": "otelcol", "scrape_interval": "10s", "static_configs": []config.GenericMap{ { - "targets": []string{"127.0.0.1:8888"}, + "targets": []string{fmt.Sprintf("127.0.0.1:%d", ownTelemetryPort)}, }, }, "metric_relabel_configs": []config.GenericMap{ { "source_labels": []string{"__name__"}, - "regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)", - "action": "keep", + "regex": "(.*odigos.*|^otelcol_processor_accepted.*|^otelcol_exporter_sent.*)", + "action": "keep", }, }, }, @@ -91,13 +91,13 @@ func addSelfTelemetryPipeline(c *config.Config) error { }, } c.Service.Pipelines["metrics/otelcol"] = config.Pipeline{ - Receivers: []string{"prometheus/self-metrics"}, + Receivers: []string{"prometheus/self-metrics"}, Processors: []string{"resource/pod-name"}, - Exporters: []string{"otlp/odigos-own-telemetry-ui"}, + Exporters: []string{"otlp/odigos-own-telemetry-ui"}, } c.Service.Telemetry.Metrics = config.GenericMap{ - "address": "0.0.0.0:8888", + "address": fmt.Sprintf("0.0.0.0:%d", ownTelemetryPort), } for pipelineName, pipeline := range c.Service.Pipelines { @@ -126,7 +126,9 @@ func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.Proc common.ToExporterConfigurerArray(dests), common.ToProcessorConfigurerArray(processors), memoryLimiterConfiguration, - addSelfTelemetryPipeline, + func(c *config.Config) error { + return addSelfTelemetryPipeline(c, gateway.Spec.CollectorOwnMetricsPort) + }, ) if err != nil { logger.Error(err, "Failed to calculate config") diff --git a/autoscaler/controllers/gateway/configmap_test.go b/autoscaler/controllers/gateway/configmap_test.go index b4bbe628c..3a037adb8 100644 --- a/autoscaler/controllers/gateway/configmap_test.go +++ b/autoscaler/controllers/gateway/configmap_test.go @@ -1,9 +1,11 @@ package gateway import ( + "fmt" "testing" "github.com/odigos-io/odigos/common/config" + "github.com/odigos-io/odigos/k8sutils/pkg/consts" "github.com/stretchr/testify/assert" ) @@ -12,7 +14,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) { cases := []struct { name string cfg *config.Config - err error + err error }{ { name: "no pipeline", @@ -66,7 +68,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) { }, Processors: config.GenericMap{ "memory_limiter": config.GenericMap{ - "check_interval": "1s", + "check_interval": "1s", }, "resource/odigos-version": config.GenericMap{ "attributes": []config.GenericMap{ @@ -98,7 +100,7 @@ func TestAddSelfTelemetryPipeline(t *testing.T) { }, }, } - + for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { c := tc.cfg @@ -115,15 +117,15 @@ func TestAddSelfTelemetryPipeline(t *testing.T) { assert.Equal(t, []string{"prometheus"}, c.Service.Pipelines["metrics/otelcol"].Receivers) assert.Equal(t, []string{"resource/pod-name"}, c.Service.Pipelines["metrics/otelcol"].Processors) assert.Equal(t, []string{"otlp/ui"}, c.Service.Pipelines["metrics/otelcol"].Exporters) - assert.Equal(t, "0.0.0.0:8888", c.Service.Telemetry.Metrics["address"]) + assert.Equal(t, fmt.Sprintf("0.0.0.0:%d", consts.OdigosNodeCollectorOwnTelemetryPortDefault), c.Service.Telemetry.Metrics["address"]) for pipelineName, pipeline := range c.Service.Pipelines { if pipelineName == "metrics/otelcol" { assert.NotContains(t, pipeline.Processors, "odigostrafficmetrics") } else { - assert.Equal(t, pipeline.Processors[len(pipeline.Processors) - 1], "odigostrafficmetrics") + 
assert.Equal(t, pipeline.Processors[len(pipeline.Processors)-1], "odigostrafficmetrics") } } }) } -} \ No newline at end of file +} diff --git a/autoscaler/controllers/gateway/service.go b/autoscaler/controllers/gateway/service.go index e747c2f57..e0c168f2c 100644 --- a/autoscaler/controllers/gateway/service.go +++ b/autoscaler/controllers/gateway/service.go @@ -54,7 +54,7 @@ func syncService(gateway *odigosv1.CollectorsGroup, ctx context.Context, c clien } result, err := controllerutil.CreateOrPatch(ctx, c, gatewaySvc, func() error { - updateGatewaySvc(gatewaySvc) + updateGatewaySvc(gatewaySvc, gateway) return nil }) @@ -67,7 +67,7 @@ func syncService(gateway *odigosv1.CollectorsGroup, ctx context.Context, c clien return gatewaySvc, nil } -func updateGatewaySvc(svc *v1.Service) { +func updateGatewaySvc(svc *v1.Service, collectorsGroup *odigosv1.CollectorsGroup) { svc.Spec.Ports = []v1.ServicePort{ { Name: "otlp", @@ -83,7 +83,7 @@ func updateGatewaySvc(svc *v1.Service) { }, { Name: "metrics", - Port: 8888, + Port: collectorsGroup.Spec.CollectorOwnMetricsPort, }, } diff --git a/common/odigos_config.go b/common/odigos_config.go index d10427edb..5509be2ce 100644 --- a/common/odigos_config.go +++ b/common/odigos_config.go @@ -2,6 +2,12 @@ package common type ProfileName string +type CollectorNodeConfiguration struct { + // The port to use for exposing the collector's own metrics as a prometheus endpoint. + // This can be used to resolve conflicting ports when a collector is using the host network. + CollectorOwnMetricsPort int32 `json:"collectorOwnMetricsPort,omitempty"` +} + type CollectorGatewayConfiguration struct { // RequestMemoryMiB is the memory request for the cluster gateway collector deployment. // it will be embedded in the deployment as a resource request of the form "memory: Mi" @@ -38,6 +44,7 @@ type OdigosConfiguration struct { AutoscalerImage string `json:"autoscalerImage,omitempty"` DefaultSDKs map[ProgrammingLanguage]OtelSdk `json:"defaultSDKs,omitempty"` CollectorGateway *CollectorGatewayConfiguration `json:"collectorGateway,omitempty"` + CollectorNode *CollectorNodeConfiguration `json:"collectorNode,omitempty"` Profiles []ProfileName `json:"profiles,omitempty"` // this is internal currently, and is not exposed on the CLI / helm diff --git a/helm/odigos/templates/odigos-config-cm.yaml b/helm/odigos/templates/odigos-config-cm.yaml index 024c3287e..1ea0ec880 100644 --- a/helm/odigos/templates/odigos-config-cm.yaml +++ b/helm/odigos/templates/odigos-config-cm.yaml @@ -30,6 +30,12 @@ data: goMemLimitMiB: {{ . }} {{- end }} {{- end }} + {{- if .Values.collectorNode }} + collectorNode: + {{- with .Values.collectorNode.collectorOwnMetricsPort }} + collectorOwnMetricsPort: {{ . }} + {{- end }} + {{- end }} instrumentorImage: {{ .Values.instrumentor.image.repository }} telemetryEnabled: {{ .Values.telemetry.enabled }} openshiftEnabled: {{ .Values.openshift.enabled }} diff --git a/helm/odigos/values.yaml b/helm/odigos/values.yaml index 01651fcc6..e407e5dcb 100644 --- a/helm/odigos/values.yaml +++ b/helm/odigos/values.yaml @@ -38,6 +38,11 @@ collectorGateway: # if not specified, it will be set to 80% of the hard limit of the memory limiter. goMemLimitMiB: 340 +collectorNode: + # The port to use for exposing the collector's own metrics as a prometheus endpoint. + # This can be used to resolve conflicting ports when a collector is using the host network. 
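+  # Note: the node collector DaemonSet binds this port on the host network,
+  # so it must be free on every node in the cluster.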
+  collectorOwnMetricsPort: 55682
+
 autoscaler:
   image:
     repository: keyval/odigos-autoscaler
diff --git a/k8sutils/pkg/consts/consts.go b/k8sutils/pkg/consts/consts.go
index f27b4c7c5..052b0fc44 100644
--- a/k8sutils/pkg/consts/consts.go
+++ b/k8sutils/pkg/consts/consts.go
@@ -24,9 +24,10 @@ const (
 )
 
 const (
-	OdigosNodeCollectorDaemonSetName      = "odigos-data-collection"
-	OdigosNodeCollectorConfigMapName      = OdigosNodeCollectorDaemonSetName
-	OdigosNodeCollectorCollectorGroupName = OdigosNodeCollectorDaemonSetName
+	OdigosNodeCollectorDaemonSetName           = "odigos-data-collection"
+	OdigosNodeCollectorConfigMapName           = OdigosNodeCollectorDaemonSetName
+	OdigosNodeCollectorCollectorGroupName      = OdigosNodeCollectorDaemonSetName
+	OdigosNodeCollectorOwnTelemetryPortDefault = int32(55682)
 
 	OdigosNodeCollectorConfigMapKey = "conf" // this key is different than the cluster collector value. not sure why
 )
diff --git a/k8sutils/pkg/utils/collectorgroup_util.go b/k8sutils/pkg/utils/collectorgroup_util.go
index eff0b9122..b62b6e145 100644
--- a/k8sutils/pkg/utils/collectorgroup_util.go
+++ b/k8sutils/pkg/utils/collectorgroup_util.go
@@ -2,15 +2,24 @@ package utils
 
 import (
 	"context"
+
 	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 )
 
-func CreateCollectorGroup(ctx context.Context, c client.Client, collectorGroup *odigosv1.CollectorsGroup) error {
-	log.FromContext(ctx).Info("Creating collector group", "collectorGroupName", collectorGroup.Name)
-	return c.Create(ctx, collectorGroup)
+func ApplyCollectorGroup(ctx context.Context, c client.Client, collectorGroup *odigosv1.CollectorsGroup) error {
+	logger := log.FromContext(ctx)
+	logger.Info("Applying collector group", "collectorGroupName", collectorGroup.Name)
+
+	err := c.Patch(ctx, collectorGroup, client.Apply, client.ForceOwnership, client.FieldOwner("scheduler"))
+	if err != nil {
+		logger.Error(err, "Failed to apply collector group")
+		return err
+	}
+
+	return nil
 }
 
 func GetCollectorGroup(ctx context.Context, c client.Client, namespace string, collectorGroupName string) (*odigosv1.CollectorsGroup, error) {
diff --git a/scheduler/controllers/collectorgroups/cluster.go b/scheduler/controllers/collectorgroups/cluster.go
index e528bb84b..864e7c590 100644
--- a/scheduler/controllers/collectorgroups/cluster.go
+++ b/scheduler/controllers/collectorgroups/cluster.go
@@ -6,14 +6,23 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// The cluster gateway collector runs as a deployment and the pod is exposed as a service.
+// Thus it cannot collide with other ports on the same node, and we can use a handy default port.
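+// 8888 is the conventional port for a collector's own prometheus metrics endpoint.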
+const ClusterCollectorDefaultOwnMetricsPort = 8888 + func NewClusterCollectorGroup(namespace string) *odigosv1.CollectorsGroup { return &odigosv1.CollectorsGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "CollectorsGroup", + APIVersion: "odigos.io/v1alpha1", + }, ObjectMeta: metav1.ObjectMeta{ Name: consts.OdigosClusterCollectorCollectorGroupName, Namespace: namespace, }, Spec: odigosv1.CollectorsGroupSpec{ - Role: odigosv1.CollectorsGroupRoleClusterGateway, + Role: odigosv1.CollectorsGroupRoleClusterGateway, + CollectorOwnMetricsPort: ClusterCollectorDefaultOwnMetricsPort, }, } } diff --git a/scheduler/controllers/collectorgroups/datacollection.go b/scheduler/controllers/collectorgroups/datacollection.go index b87da7b6f..3d3631d84 100644 --- a/scheduler/controllers/collectorgroups/datacollection.go +++ b/scheduler/controllers/collectorgroups/datacollection.go @@ -2,23 +2,35 @@ package collectorgroups import ( odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + "github.com/odigos-io/odigos/common" "github.com/odigos-io/odigos/k8sutils/pkg/consts" "github.com/odigos-io/odigos/k8sutils/pkg/env" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func NewNodeCollectorGroup() *odigosv1.CollectorsGroup { +func NewNodeCollectorGroup(odigosConfig common.OdigosConfiguration) *odigosv1.CollectorsGroup { + + ownMetricsPort := consts.OdigosNodeCollectorOwnTelemetryPortDefault + if odigosConfig.CollectorNode != nil && odigosConfig.CollectorNode.CollectorOwnMetricsPort != 0 { + ownMetricsPort = odigosConfig.CollectorNode.CollectorOwnMetricsPort + } + return &odigosv1.CollectorsGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "CollectorsGroup", + APIVersion: "odigos.io/v1alpha1", + }, ObjectMeta: metav1.ObjectMeta{ Name: consts.OdigosNodeCollectorDaemonSetName, Namespace: env.GetCurrentNamespace(), }, Spec: odigosv1.CollectorsGroupSpec{ - Role: odigosv1.CollectorsGroupRoleNodeCollector, + Role: odigosv1.CollectorsGroupRoleNodeCollector, + CollectorOwnMetricsPort: ownMetricsPort, }, } } -func ShouldCreateNodeCollectorGroup(gatewayReady bool, dataCollectionExists bool, numberofInstrumentedApps int) bool { - return gatewayReady && !dataCollectionExists && numberofInstrumentedApps > 0 +func ShouldHaveNodeCollectorGroup(gatewayReady bool, numberofInstrumentedApps int) bool { + return gatewayReady && numberofInstrumentedApps > 0 } diff --git a/scheduler/controllers/collectorsgroup_controller.go b/scheduler/controllers/collectorsgroup_controller.go deleted file mode 100644 index 4dfa5eb98..000000000 --- a/scheduler/controllers/collectorsgroup_controller.go +++ /dev/null @@ -1,113 +0,0 @@ -/* -Copyright 2022. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package controllers - -import ( - "context" - - odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" - "github.com/odigos-io/odigos/k8sutils/pkg/utils" - "github.com/odigos-io/odigos/scheduler/controllers/collectorgroups" - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" -) - -// CollectorsGroupReconciler reconciles a CollectorsGroup object -type CollectorsGroupReconciler struct { - client.Client - Scheme *runtime.Scheme -} - -// up until v1.0.31, the collectors group role names were "GATEWAY" and "DATA_COLLECTION". -// in v1.0.32, the role names were changed to "CLUSTER_GATEWAY" and "NODE_COLLECTOR", -// due to adding the Processor CRD which uses these role names. -// the new names are more descriptive and are preparations for future roles. -// unfortunately, the role names are used in the collectorgroup CR, which needs to be updated -// when a user upgrades from <=v1.0.31 to >=v1.0.32. -// this function is responsible to do this update. -// once we drop support for <=v1.0.31, we can remove this function. -func (r *CollectorsGroupReconciler) applyNewCollectorRoleNames(ctx context.Context, collectorGroup *odigosv1.CollectorsGroup) error { - if collectorGroup.Spec.Role == "GATEWAY" { - logger := log.FromContext(ctx) - logger.Info("updating collector group role name", "old", "GATEWAY", "new", "CLUSTER_GATEWAY") - collectorGroup.Spec.Role = odigosv1.CollectorsGroupRoleClusterGateway - return r.Update(ctx, collectorGroup) - } - if collectorGroup.Spec.Role == "DATA_COLLECTION" { - logger := log.FromContext(ctx) - logger.Info("updating collector group role name", "old", "DATA_COLLECTION", "new", "NODE_COLLECTOR") - collectorGroup.Spec.Role = odigosv1.CollectorsGroupRoleNodeCollector - return r.Update(ctx, collectorGroup) - } - return nil -} - -// +kubebuilder:rbac:groups=odigos.io,resources=collectorsgroups,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=odigos.io,resources=collectorsgroups/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=odigos.io,resources=collectorsgroups/finalizers,verbs=update -func (r *CollectorsGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) - var collectorGroups odigosv1.CollectorsGroupList - err := r.List(ctx, &collectorGroups, client.InNamespace(req.Namespace)) - if err != nil { - logger.Error(err, "failed to list collectors groups") - return ctrl.Result{}, err - } - - gatewayReady := false - dataCollectionExists := false - for _, collectorGroup := range collectorGroups.Items { - err := r.applyNewCollectorRoleNames(ctx, &collectorGroup) - if err != nil { - logger.Error(err, "failed to apply new collector role names") - return ctrl.Result{}, err - } - - if collectorGroup.Spec.Role == odigosv1.CollectorsGroupRoleClusterGateway && collectorGroup.Status.Ready { - gatewayReady = true - } - - if collectorGroup.Spec.Role == odigosv1.CollectorsGroupRoleNodeCollector { - dataCollectionExists = true - } - } - - var instApps odigosv1.InstrumentationConfigList - if err = r.List(ctx, &instApps); err != nil { - logger.Error(err, "failed to list InstrumentationConfigs") - return ctrl.Result{}, err - } - - if collectorgroups.ShouldCreateNodeCollectorGroup(gatewayReady, dataCollectionExists, len(instApps.Items)) { - err = utils.CreateCollectorGroup(ctx, r.Client, collectorgroups.NewNodeCollectorGroup()) - if err != nil { - logger.Error(err, "failed 
to create data collection collector group") - return ctrl.Result{}, err - } - } - - return ctrl.Result{}, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *CollectorsGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&odigosv1.CollectorsGroup{}). - Complete(r) -} diff --git a/scheduler/controllers/destination_controller.go b/scheduler/controllers/destination_controller.go index a1af2d6ae..679215a71 100644 --- a/scheduler/controllers/destination_controller.go +++ b/scheduler/controllers/destination_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "github.com/odigos-io/odigos/k8sutils/pkg/utils" odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" @@ -49,22 +50,15 @@ func (r *DestinationReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if len(dests.Items) > 0 { - var collectorGroups odigosv1.CollectorsGroupList - err := r.List(ctx, &collectorGroups, client.InNamespace(req.Namespace)) + logger.V(0).Info("destinations found, syncing cluster collector group") + err := utils.ApplyCollectorGroup(ctx, r.Client, collectorgroups.NewClusterCollectorGroup(req.Namespace)) if err != nil { - logger.Error(err, "failed to list collectors groups") + logger.Error(err, "failed to sync cluster collector group") return ctrl.Result{}, err } - - if len(collectorGroups.Items) == 0 { - logger.V(0).Info("destinations found, but no collectors groups found, creating gateway") - err = utils.CreateCollectorGroup(ctx, r.Client, collectorgroups.NewClusterCollectorGroup(req.Namespace)) - if err != nil { - logger.Error(err, "failed to create gateway") - return ctrl.Result{}, err - } - } } + // once the gateway is created, it is not deleted, even if there are no destinations. + // we might want to re-consider this behavior. 
return ctrl.Result{}, nil } diff --git a/scheduler/controllers/instrumentationconfig_controller.go b/scheduler/controllers/instrumentationconfig_controller.go deleted file mode 100644 index 096a284ec..000000000 --- a/scheduler/controllers/instrumentationconfig_controller.go +++ /dev/null @@ -1,81 +0,0 @@ -package controllers - -import ( - "context" - - odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" - "github.com/odigos-io/odigos/k8sutils/pkg/consts" - "github.com/odigos-io/odigos/k8sutils/pkg/env" - "github.com/odigos-io/odigos/k8sutils/pkg/utils" - nodeCollectorGroupUtil "github.com/odigos-io/odigos/scheduler/controllers/collectorgroups" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" -) - -type InstrumentationConfigReconciler struct { - client.Client - Scheme *runtime.Scheme - ImagePullSecrets []string - OdigosVersion string -} - -func (r *InstrumentationConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) - logger.V(0).Info("Reconciling InstrumentationConfig") - - namespace := env.GetCurrentNamespace() - - var instrumentedConfigs odigosv1.InstrumentationConfigList - err := r.List(ctx, &instrumentedConfigs) - if err != nil { - logger.Error(err, "failed to list InstrumentationConfigs") - return ctrl.Result{}, err - } - numberOfInstrumentedApps := len(instrumentedConfigs.Items) - - if numberOfInstrumentedApps == 0 { - if err = utils.DeleteCollectorGroup(ctx, r.Client, namespace, consts.OdigosNodeCollectorCollectorGroupName); err != nil { - return ctrl.Result{}, err - } - } - - clusterCollectorGroup, err := utils.GetCollectorGroup(ctx, r.Client, namespace, consts.OdigosClusterCollectorCollectorGroupName) - if err != nil { - if errors.IsNotFound(err) { - logger.V(3).Info("collector group doesn't exist", "collectorGroupName", clusterCollectorGroup) - return ctrl.Result{}, nil - } - logger.Error(err, "failed to get collector group", "collectorGroupName", consts.OdigosClusterCollectorCollectorGroupName) - return ctrl.Result{}, err - } - - dataCollectionExists := true - _, err = utils.GetCollectorGroup(ctx, r.Client, namespace, consts.OdigosNodeCollectorDaemonSetName) - if err != nil { - if errors.IsNotFound(err) { - dataCollectionExists = false - } else { - logger.Error(err, "failed to get collector group", "collectorGroupName", consts.OdigosNodeCollectorCollectorGroupName) - return ctrl.Result{}, err - } - } - - if nodeCollectorGroupUtil.ShouldCreateNodeCollectorGroup(clusterCollectorGroup.Status.Ready, dataCollectionExists, numberOfInstrumentedApps) { - err = utils.CreateCollectorGroup(ctx, r.Client, nodeCollectorGroupUtil.NewNodeCollectorGroup()) - if err != nil { - logger.Error(err, "failed to create data collection collector group") - return ctrl.Result{}, err - } - } - - return ctrl.Result{}, nil -} - -func (r *InstrumentationConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&odigosv1.InstrumentationConfig{}). 
- Complete(r) -} diff --git a/scheduler/controllers/nodecollector_controller.go b/scheduler/controllers/nodecollector_controller.go new file mode 100644 index 000000000..f58c87652 --- /dev/null +++ b/scheduler/controllers/nodecollector_controller.go @@ -0,0 +1,188 @@ +package controllers + +import ( + "context" + + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + consts "github.com/odigos-io/odigos/common/consts" + k8sutilsconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts" + "github.com/odigos-io/odigos/k8sutils/pkg/env" + "github.com/odigos-io/odigos/k8sutils/pkg/utils" + nodeCollectorGroupUtil "github.com/odigos-io/odigos/scheduler/controllers/collectorgroups" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +type NodeCollectorsGroupReconciler struct { + client.Client + Scheme *runtime.Scheme + ImagePullSecrets []string + OdigosVersion string +} + +// makes sure that the controller only reacts to events related to the odigos-config configmap +// and does not trigger on other configmaps +type odigosConfigPredicate struct{} + +func (i *odigosConfigPredicate) Create(e event.CreateEvent) bool { + return e.Object.GetName() == consts.OdigosConfigurationName +} + +func (i *odigosConfigPredicate) Update(e event.UpdateEvent) bool { + return e.ObjectNew.GetName() == consts.OdigosConfigurationName +} + +func (i *odigosConfigPredicate) Delete(e event.DeleteEvent) bool { + return e.Object.GetName() == consts.OdigosConfigurationName +} + +func (i *odigosConfigPredicate) Generic(e event.GenericEvent) bool { + return e.Object.GetName() == consts.OdigosConfigurationName +} + +var _ predicate.Predicate = &odigosConfigPredicate{} + +// For instrumentation configs, we only care if the object exists or not, since we count if there are more than 0. +// thus, we can filter out all updates events which will not affect reconciliation +type existingPredicate struct{} + +func (i *existingPredicate) Create(e event.CreateEvent) bool { + return true +} + +func (i *existingPredicate) Update(e event.UpdateEvent) bool { + return false +} + +func (i *existingPredicate) Delete(e event.DeleteEvent) bool { + return true +} + +func (i *existingPredicate) Generic(e event.GenericEvent) bool { + return false +} + +var _ predicate.Predicate = &existingPredicate{} + +// this predicate filters collectorsgroup events. +// it will only forward events that are: +// 1. for cluster collector group +// 2. 
If the cluster collector group was not ready and now it is ready
+type clusterCollectorBecomesReadyPredicate struct{}
+
+func (i *clusterCollectorBecomesReadyPredicate) Create(e event.CreateEvent) bool {
+	return false
+}
+
+func (i *clusterCollectorBecomesReadyPredicate) Update(e event.UpdateEvent) bool {
+	if e.ObjectNew.GetName() != k8sutilsconsts.OdigosClusterCollectorCollectorGroupName {
+		return false
+	}
+
+	oldCollectorGroup, ok := e.ObjectOld.(*odigosv1.CollectorsGroup)
+	if !ok {
+		return false
+	}
+	newCollectorGroup, ok := e.ObjectNew.(*odigosv1.CollectorsGroup)
+	if !ok {
+		return false
+	}
+
+	return !oldCollectorGroup.Status.Ready && newCollectorGroup.Status.Ready
+}
+
+func (i *clusterCollectorBecomesReadyPredicate) Delete(e event.DeleteEvent) bool {
+	return false
+}
+
+func (i *clusterCollectorBecomesReadyPredicate) Generic(e event.GenericEvent) bool {
+	return false
+}
+
+var _ predicate.Predicate = &clusterCollectorBecomesReadyPredicate{}
+
+func (r *NodeCollectorsGroupReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
+	logger := log.FromContext(ctx)
+
+	namespace := env.GetCurrentNamespace()
+
+	var instrumentedConfigs odigosv1.InstrumentationConfigList
+	err := r.List(ctx, &instrumentedConfigs)
+	if err != nil {
+		logger.Error(err, "failed to list InstrumentationConfigs")
+		return ctrl.Result{}, err
+	}
+	numberOfInstrumentedApps := len(instrumentedConfigs.Items)
+
+	if numberOfInstrumentedApps == 0 {
+		if err = utils.DeleteCollectorGroup(ctx, r.Client, namespace, k8sutilsconsts.OdigosNodeCollectorCollectorGroupName); err != nil {
+			return ctrl.Result{}, err
+		}
+	}
+
+	clusterCollectorGroup, err := utils.GetCollectorGroup(ctx, r.Client, namespace, k8sutilsconsts.OdigosClusterCollectorCollectorGroupName)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			logger.V(3).Info("collector group doesn't exist", "collectorGroupName", k8sutilsconsts.OdigosClusterCollectorCollectorGroupName)
+			return ctrl.Result{}, nil
+		}
+		logger.Error(err, "failed to get collector group", "collectorGroupName", k8sutilsconsts.OdigosClusterCollectorCollectorGroupName)
+		return ctrl.Result{}, err
+	}
+
+	odigosConfig, err := utils.GetCurrentOdigosConfig(ctx, r.Client)
+	if err != nil {
+		logger.Error(err, "failed to get odigos config")
+		return ctrl.Result{}, err
+	}
+
+	if nodeCollectorGroupUtil.ShouldHaveNodeCollectorGroup(clusterCollectorGroup.Status.Ready, numberOfInstrumentedApps) {
+		err = utils.ApplyCollectorGroup(ctx, r.Client, nodeCollectorGroupUtil.NewNodeCollectorGroup(odigosConfig))
+		if err != nil {
+			logger.Error(err, "failed to apply node collector group")
+			return ctrl.Result{}, err
+		}
+	}
+
+	return ctrl.Result{}, nil
+}
+
+func (r *NodeCollectorsGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
+
+	// here we enumerate the input events that should trigger an update of the node collector group
+
+	err := ctrl.NewControllerManagedBy(mgr).
+		For(&odigosv1.InstrumentationConfig{}).
+		Named("nodecollectorgroup-instrumentationconfig").
+		WithEventFilter(&existingPredicate{}).
+		Complete(r)
+	if err != nil {
+		return err
+	}
+
+	err = ctrl.NewControllerManagedBy(mgr).
+		For(&corev1.ConfigMap{}).
+		Named("nodecollectorgroup-odigosconfig").
+		WithEventFilter(&odigosConfigPredicate{}).
+		Complete(r)
+	if err != nil {
+		return err
+	}
+
+	err = ctrl.NewControllerManagedBy(mgr).
+		For(&odigosv1.CollectorsGroup{}).
+		Named("nodecollectorgroup-collectorsgroup").
+		WithEventFilter(&clusterCollectorBecomesReadyPredicate{}).
+ Complete(r) + if err != nil { + return err + } + + return nil +} diff --git a/scheduler/go.mod b/scheduler/go.mod index dcc459f56..ef8bb2b33 100644 --- a/scheduler/go.mod +++ b/scheduler/go.mod @@ -5,10 +5,12 @@ go 1.22.0 require ( github.com/go-logr/zapr v1.3.0 github.com/odigos-io/odigos/api v0.0.0 + github.com/odigos-io/odigos/common v0.0.0 github.com/odigos-io/odigos/k8sutils v0.0.0 github.com/odigos-io/opentelemetry-zap-bridge v0.0.5 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.34.2 + k8s.io/api v0.31.0 k8s.io/apimachinery v0.31.0 k8s.io/client-go v0.31.0 sigs.k8s.io/controller-runtime v0.19.0 @@ -50,7 +52,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.8 // indirect - github.com/odigos-io/odigos/common v0.0.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -82,7 +83,6 @@ require ( gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.31.0 // indirect k8s.io/apiextensions-apiserver v0.31.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect diff --git a/scheduler/main.go b/scheduler/main.go index 9ae9e330c..21c1ad2a1 100644 --- a/scheduler/main.go +++ b/scheduler/main.go @@ -22,6 +22,8 @@ import ( "github.com/go-logr/zapr" bridge "github.com/odigos-io/opentelemetry-zap-bridge" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) 
@@ -29,6 +31,10 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth" odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + "github.com/odigos-io/odigos/common/consts" + "github.com/odigos-io/odigos/k8sutils/pkg/env" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -71,11 +77,24 @@ func main() { logger := zapr.NewLogger(zapLogger) ctrl.SetLogger(logger) + odigosNs := env.GetCurrentNamespace() + nsSelector := client.InNamespace(odigosNs).AsSelector() + nameSelector := fields.OneTermEqualSelector("metadata.name", consts.OdigosConfigurationName) + odigosConfigSelector := fields.AndSelectors(nsSelector, nameSelector) + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, Metrics: metricsserver.Options{ BindAddress: metricsAddr, }, + Cache: cache.Options{ + DefaultTransform: cache.TransformStripManagedFields(), + ByObject: map[client.Object]cache.ByObject{ + &corev1.ConfigMap{}: { + Field: odigosConfigSelector, + }, + }, + }, HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "ce024640.odigos.io", @@ -85,13 +104,6 @@ func main() { os.Exit(1) } - if err = (&controllers.CollectorsGroupReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "CollectorsGroup") - os.Exit(1) - } if err = (&controllers.DestinationReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -99,7 +111,7 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Destination") os.Exit(1) } - if err = (&controllers.InstrumentationConfigReconciler{ + if err = (&controllers.NodeCollectorsGroupReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil {
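
A minimal sketch of how the new setting is consumed end to end (the port value
shown is hypothetical; the `collectorNode.collectorOwnMetricsPort` key and the
`55682` default are the ones introduced by this PR):

    # helm values.yaml, or the equivalent entry in the odigos-config ConfigMap
    collectorNode:
      # must be a free port in the host network namespace of every node,
      # since the node collector DaemonSet binds it directly on the node
      collectorOwnMetricsPort: 55700

The scheduler reads this value (falling back to `55682`), writes it to the node
collectors group CRD, and the autoscaler then renders it into the collector's
own telemetry address (`0.0.0.0:<port>`) and the prometheus self-scrape target
(`127.0.0.1:<port>`).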