diff --git a/controllers/consoleplugin/consoleplugin_objects.go b/controllers/consoleplugin/consoleplugin_objects.go index 23d01b3da..b1e5c3518 100644 --- a/controllers/consoleplugin/consoleplugin_objects.go +++ b/controllers/consoleplugin/consoleplugin_objects.go @@ -18,7 +18,6 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) const secretName = "console-serving-cert" @@ -39,10 +38,9 @@ type builder struct { selector map[string]string desired *flowslatest.FlowCollectorSpec imageName string - cWatcher *watchers.CertificatesWatcher } -func newBuilder(ns, imageName string, desired *flowslatest.FlowCollectorSpec, cWatcher *watchers.CertificatesWatcher) builder { +func newBuilder(ns, imageName string, desired *flowslatest.FlowCollectorSpec) builder { version := helper.ExtractVersion(imageName) return builder{ namespace: ns, @@ -55,7 +53,6 @@ func newBuilder(ns, imageName string, desired *flowslatest.FlowCollectorSpec, cW }, desired: desired, imageName: imageName, - cWatcher: cWatcher, } } @@ -236,12 +233,12 @@ func (b *builder) podTemplate(cmDigest string) *corev1.PodTemplateSpec { args := buildArgs(b.desired) if b.desired != nil && b.desired.Loki.TLS.Enable && !b.desired.Loki.TLS.InsecureSkipVerify { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts) } statusTLS := helper.GetLokiStatusTLS(&b.desired.Loki) if b.desired != nil && statusTLS.Enable && !statusTLS.InsecureSkipVerify { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &statusTLS, lokiStatusCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &statusTLS, lokiStatusCerts) } if helper.LokiUseHostToken(&b.desired.Loki) { diff --git a/controllers/consoleplugin/consoleplugin_reconciler.go b/controllers/consoleplugin/consoleplugin_reconciler.go index 8cde07b81..3a6fe86af 100644 --- a/controllers/consoleplugin/consoleplugin_reconciler.go +++ b/controllers/consoleplugin/consoleplugin_reconciler.go @@ -83,7 +83,7 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC } // Create object builder - builder := newBuilder(ns, r.image, &desired.Spec, r.CertWatcher) + builder := newBuilder(ns, r.image, &desired.Spec) if err := r.reconcilePermissions(ctx, &builder); err != nil { return err @@ -196,10 +196,6 @@ func (r *CPReconciler) reconcileDeployment(ctx context.Context, builder builder, defer report.LogIfNeeded(ctx) newDepl := builder.deployment(cmDigest) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &newDepl.Spec.Template, lokiCerts, lokiStatusCerts); err != nil { - return err - } if !r.nobjMngr.Exists(r.owned.deployment) { if err := r.CreateOwned(ctx, newDepl); err != nil { return err diff --git a/controllers/consoleplugin/consoleplugin_test.go b/controllers/consoleplugin/consoleplugin_test.go index e66793285..eda1de52b 100644 --- a/controllers/consoleplugin/consoleplugin_test.go +++ b/controllers/consoleplugin/consoleplugin_test.go @@ -13,7 +13,6 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" promConfig "github.com/prometheus/common/config" ) @@ -28,7 +27,6 @@ var testResources = corev1.ResourceRequirements{ corev1.ResourceMemory: resource.MustParse("512Mi"), }, } -var certWatcher = watchers.NewCertificatesWatcher() func getPluginConfig() flowslatest.FlowCollectorConsolePlugin { return flowslatest.FlowCollectorConsolePlugin{ @@ -111,7 +109,7 @@ func TestContainerUpdateCheck(t *testing.T) { plugin := getPluginConfig() loki := flowslatest.FlowCollectorLoki{URL: "http://loki:3100/", TenantID: "netobserv"} spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder := newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder := newBuilder(testNamespace, testImage, &spec) old := builder.deployment("digest") new := builder.deployment("digest") report := helper.NewChangeReport("") @@ -163,7 +161,7 @@ func TestContainerUpdateCheck(t *testing.T) { }, }} spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -173,7 +171,7 @@ func TestContainerUpdateCheck(t *testing.T) { //new loki cert name loki.TLS.CACert.Name = "cm-name-2" spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -183,7 +181,7 @@ func TestContainerUpdateCheck(t *testing.T) { //test again no change loki.TLS.CACert.Name = "cm-name-2" spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.False(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -195,7 +193,7 @@ func TestContainerUpdateCheck(t *testing.T) { loki.StatusTLS.Enable = true spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -210,7 +208,7 @@ func TestContainerUpdateCheck(t *testing.T) { } spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -226,7 +224,7 @@ func TestContainerUpdateCheck(t *testing.T) { } spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -264,7 +262,7 @@ func TestBuiltService(t *testing.T) { plugin := getPluginConfig() loki := flowslatest.FlowCollectorLoki{URL: "http://foo:1234"} spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder := newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder := newBuilder(testNamespace, testImage, &spec) newService := builder.service(nil) report := helper.NewChangeReport("") assert.Equal(serviceNeedsUpdate(newService, &plugin, &report), false) @@ -277,7 +275,7 @@ func TestLabels(t *testing.T) { plugin := getPluginConfig() loki := flowslatest.FlowCollectorLoki{URL: "http://foo:1234"} spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder := newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder := newBuilder(testNamespace, testImage, &spec) // Deployment depl := builder.deployment("digest") diff --git a/controllers/ebpf/agent_controller.go b/controllers/ebpf/agent_controller.go index 98e637f10..9c233d1ea 100644 --- a/controllers/ebpf/agent_controller.go +++ b/controllers/ebpf/agent_controller.go @@ -131,11 +131,6 @@ func (c *AgentController) Reconcile( } desired := c.desired(target) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := c.client.CertWatcher.AnnotatePod(ctx, c.client, &desired.Spec.Template, kafkaCerts); err != nil { - return err - } - switch c.requiredAction(current, desired) { case actionCreate: rlog.Info("action: create agent") @@ -175,7 +170,7 @@ func (c *AgentController) desired(coll *flowslatest.FlowCollector) *v1.DaemonSet if helper.UseKafka(&coll.Spec) && coll.Spec.Kafka.TLS.Enable { // NOTE: secrets need to be copied from the base netobserv namespace to the privileged one. // This operation must currently be performed manually (run "make fix-ebpf-kafka-tls"). It could be automated here. - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &coll.Spec.Kafka.TLS, kafkaCerts, c.client.CertWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &coll.Spec.Kafka.TLS, kafkaCerts) } return &v1.DaemonSet{ diff --git a/controllers/flowcollector_controller.go b/controllers/flowcollector_controller.go index 8309bf2a0..fe5b0d7d3 100644 --- a/controllers/flowcollector_controller.go +++ b/controllers/flowcollector_controller.go @@ -368,8 +368,7 @@ func (r *FlowCollectorReconciler) finalize(ctx context.Context, desired *flowsla func (r *FlowCollectorReconciler) newClientHelper(desired *flowslatest.FlowCollector) reconcilers.ClientHelper { return reconcilers.ClientHelper{ - CertWatcher: r.certWatcher, - Client: r.Client, + Client: r.Client, SetControllerReference: func(obj client.Object) error { return ctrl.SetControllerReference(desired, obj, r.Scheme) }, diff --git a/controllers/flowcollector_controller_test.go b/controllers/flowcollector_controller_test.go index 30c975507..6fbacf6c2 100644 --- a/controllers/flowcollector_controller_test.go +++ b/controllers/flowcollector_controller_test.go @@ -593,9 +593,8 @@ func flowCollectorControllerSpecs() { }) }) - Context("Using and watching certificates", func() { + Context("Using certificates", func() { flpDS := appsv1.DaemonSet{} - var certStamp1, certStamp2 string It("Should update Loki to use TLS", func() { // Create CM certificate Expect(k8sClient.Create(ctx, &v1.ConfigMap{ @@ -622,33 +621,8 @@ func flowCollectorControllerSpecs() { if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { return err } - certStamp1 = flpDS.Spec.Template.Annotations["flows.netobserv.io/cert-loki-certs-ca"] - return certStamp1 - }, timeout, interval).Should(Not(BeEmpty())) - Expect(flpDS.Spec.Template.Spec.Volumes).To(HaveLen(2)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) - }) - - It("Should watch certificate update", func() { - By("Updating certificate") - Expect(k8sClient.Update(ctx, &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "loki-ca", - Namespace: operatorNamespace, - }, - Data: map[string]string{"test": "test"}, - })).Should(Succeed()) - - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - certStamp2 = flpDS.Spec.Template.Annotations["flows.netobserv.io/cert-loki-certs-ca"] - return certStamp2 - }, timeout, interval).Should(Not(Equal(certStamp1))) - Expect(certStamp2).To(Not(BeEmpty())) - Expect(flpDS.Spec.Template.Spec.Volumes).To(HaveLen(2)) + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(2)) Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) }) @@ -663,9 +637,8 @@ func flowCollectorControllerSpecs() { if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { return err } - return flpDS.Spec.Template.Annotations - }, timeout, interval).Should(Not(HaveKey("flows.netobserv.io/cert-loki-certs-ca"))) - Expect(flpDS.Spec.Template.Spec.Volumes).To(HaveLen(1)) + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(1)) Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) }) }) diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go index b654c5c19..44cb8fbd4 100644 --- a/controllers/flowlogspipeline/flp_common_objects.go +++ b/controllers/flowlogspipeline/flp_common_objects.go @@ -24,7 +24,6 @@ import ( "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/filters" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) const ( @@ -80,10 +79,9 @@ type builder struct { confKind ConfKind useOpenShiftSCC bool image string - cWatcher *watchers.CertificatesWatcher } -func newBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, ck ConfKind, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) builder { +func newBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, ck ConfKind, useOpenShiftSCC bool) builder { version := helper.ExtractVersion(image) name := name(ck) var promTLS flowslatest.CertificateReference @@ -112,7 +110,6 @@ func newBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, ck Con useOpenShiftSCC: useOpenShiftSCC, promTLS: &promTLS, image: image, - cWatcher: cWatcher, } } @@ -185,12 +182,12 @@ func (b *builder) podTemplate(hasHostPort, hasLokiInterface, hostNetwork bool, c }} if helper.UseKafka(b.desired) && b.desired.Kafka.TLS.Enable { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Kafka.TLS, kafkaCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Kafka.TLS, kafkaCerts) } if hasLokiInterface { if b.desired.Loki.TLS.Enable && !b.desired.Loki.TLS.InsecureSkipVerify { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts) } if helper.LokiUseHostToken(&b.desired.Loki) || helper.LokiForwardUserToken(&b.desired.Loki) { volumes, volumeMounts = helper.AppendTokenVolume(volumes, volumeMounts, lokiToken, constants.FLPName) @@ -198,7 +195,7 @@ func (b *builder) podTemplate(hasHostPort, hasLokiInterface, hostNetwork bool, c } if b.desired.Processor.Metrics.Server.TLS.Type != flowslatest.ServerTLSDisabled { - volumes, volumeMounts = helper.AppendSingleCertVolumes(volumes, volumeMounts, b.promTLS, promCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendSingleCertVolumes(volumes, volumeMounts, b.promTLS, promCerts) } var envs []corev1.EnvVar diff --git a/controllers/flowlogspipeline/flp_ingest_objects.go b/controllers/flowlogspipeline/flp_ingest_objects.go index 175cfe834..860fc2b07 100644 --- a/controllers/flowlogspipeline/flp_ingest_objects.go +++ b/controllers/flowlogspipeline/flp_ingest_objects.go @@ -10,15 +10,14 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) type ingestBuilder struct { generic builder } -func newIngestBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) ingestBuilder { - gen := newBuilder(ns, image, desired, ConfKafkaIngester, useOpenShiftSCC, cWatcher) +func newIngestBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool) ingestBuilder { + gen := newBuilder(ns, image, desired, ConfKafkaIngester, useOpenShiftSCC) return ingestBuilder{ generic: gen, } diff --git a/controllers/flowlogspipeline/flp_ingest_reconciler.go b/controllers/flowlogspipeline/flp_ingest_reconciler.go index d807ca2c0..5896ff373 100644 --- a/controllers/flowlogspipeline/flp_ingest_reconciler.go +++ b/controllers/flowlogspipeline/flp_ingest_reconciler.go @@ -85,7 +85,7 @@ func (r *flpIngesterReconciler) reconcile(ctx context.Context, desired *flowslat return nil } - builder := newIngestBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC, r.CertWatcher) + builder := newIngestBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC) newCM, configDigest, err := builder.configMap() if err != nil { return err @@ -147,10 +147,6 @@ func (r *flpIngesterReconciler) reconcileDaemonSet(ctx context.Context, desiredD report := helper.NewChangeReport("FLP DaemonSet") defer report.LogIfNeeded(ctx) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &desiredDS.Spec.Template, lokiCerts, kafkaCerts); err != nil { - return err - } if !r.nobjMngr.Exists(r.owned.daemonSet) { return r.CreateOwned(ctx, desiredDS) } else if helper.PodChanged(&r.owned.daemonSet.Spec.Template, &desiredDS.Spec.Template, constants.FLPName, &report) { diff --git a/controllers/flowlogspipeline/flp_monolith_objects.go b/controllers/flowlogspipeline/flp_monolith_objects.go index da6941661..92351bd53 100644 --- a/controllers/flowlogspipeline/flp_monolith_objects.go +++ b/controllers/flowlogspipeline/flp_monolith_objects.go @@ -10,15 +10,14 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) type monolithBuilder struct { generic builder } -func newMonolithBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) monolithBuilder { - gen := newBuilder(ns, image, desired, ConfMonolith, useOpenShiftSCC, cWatcher) +func newMonolithBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool) monolithBuilder { + gen := newBuilder(ns, image, desired, ConfMonolith, useOpenShiftSCC) return monolithBuilder{ generic: gen, } diff --git a/controllers/flowlogspipeline/flp_monolith_reconciler.go b/controllers/flowlogspipeline/flp_monolith_reconciler.go index 1ab7d2280..17317d5a8 100644 --- a/controllers/flowlogspipeline/flp_monolith_reconciler.go +++ b/controllers/flowlogspipeline/flp_monolith_reconciler.go @@ -88,7 +88,7 @@ func (r *flpMonolithReconciler) reconcile(ctx context.Context, desired *flowslat return nil } - builder := newMonolithBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC, r.CertWatcher) + builder := newMonolithBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC) newCM, configDigest, dbConfigMap, err := builder.configMap() if err != nil { return err @@ -155,10 +155,6 @@ func (r *flpMonolithReconciler) reconcileDaemonSet(ctx context.Context, desiredD report := helper.NewChangeReport("FLP DaemonSet") defer report.LogIfNeeded(ctx) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &desiredDS.Spec.Template, lokiCerts, kafkaCerts); err != nil { - return err - } if !r.nobjMngr.Exists(r.owned.daemonSet) { return r.CreateOwned(ctx, desiredDS) } else if helper.PodChanged(&r.owned.daemonSet.Spec.Template, &desiredDS.Spec.Template, constants.FLPName, &report) { diff --git a/controllers/flowlogspipeline/flp_test.go b/controllers/flowlogspipeline/flp_test.go index d227d5b06..58c077a3a 100644 --- a/controllers/flowlogspipeline/flp_test.go +++ b/controllers/flowlogspipeline/flp_test.go @@ -33,7 +33,6 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) var resources = corev1.ResourceRequirements{ @@ -48,7 +47,6 @@ var pullPolicy = corev1.PullIfNotPresent var minReplicas = int32(1) var maxReplicas = int32(5) var targetCPU = int32(75) -var certWatcher = watchers.NewCertificatesWatcher() var outputRecordTypes = flowslatest.LogTypeAll const testNamespace = "flp" @@ -157,14 +155,14 @@ func TestDaemonSetNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.daemonSet(digest) // Check no change cfg = getConfig() - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.daemonSet(digest) @@ -180,14 +178,14 @@ func TestDaemonSetChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.daemonSet(digest) // Check probes enabled change cfg.Processor.EnableKubeProbes = true - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.daemonSet(digest) @@ -218,7 +216,7 @@ func TestDaemonSetChanged(t *testing.T) { // Check log level change cfg.Processor.LogLevel = "info" - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) third := b.daemonSet(digest) @@ -232,7 +230,7 @@ func TestDaemonSetChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("500m"), corev1.ResourceMemory: resource.MustParse("500Gi"), } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fourth := b.daemonSet(digest) @@ -246,7 +244,7 @@ func TestDaemonSetChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("512Mi"), } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fifth := b.daemonSet(digest) @@ -267,7 +265,7 @@ func TestDaemonSetChanged(t *testing.T) { CertFile: "ca.crt", }, } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) sixth := b.daemonSet(digest) @@ -285,7 +283,7 @@ func TestDaemonSetChanged(t *testing.T) { CertFile: "ca.crt", }, } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) seventh := b.daemonSet(digest) @@ -301,14 +299,14 @@ func TestDeploymentNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b := newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.deployment(digest) // Check no change cfg = getConfig() - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.deployment(digest) @@ -324,14 +322,14 @@ func TestDeploymentChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b := newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.deployment(digest) // Check probes enabled change cfg.Processor.EnableKubeProbes = true - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.deployment(digest) @@ -346,7 +344,7 @@ func TestDeploymentChanged(t *testing.T) { // Check log level change cfg.Processor.LogLevel = "info" - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) third := b.deployment(digest) @@ -360,7 +358,7 @@ func TestDeploymentChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("500m"), corev1.ResourceMemory: resource.MustParse("500Gi"), } - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fourth := b.deployment(digest) @@ -374,7 +372,7 @@ func TestDeploymentChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("512Mi"), } - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fifth := b.deployment(digest) @@ -389,7 +387,7 @@ func TestDeploymentChanged(t *testing.T) { // Check replicas didn't change because HPA is used cfg2 := cfg cfg2.Processor.KafkaConsumerReplicas = 5 - b = newTransfoBuilder(ns, image, &cfg2, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg2, true) _, digest, _, err = b.configMap() assert.NoError(err) sixth := b.deployment(digest) @@ -405,7 +403,7 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) { // Get first ns := "namespace" cfg := getConfigNoHPA() - b := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b := newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.deployment(digest) @@ -413,7 +411,7 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) { // Check replicas changed (need to copy flp, as Spec.Replicas stores a pointer) cfg2 := cfg cfg2.Processor.KafkaConsumerReplicas = 5 - b = newTransfoBuilder(ns, image, &cfg2, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg2, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.deployment(digest) @@ -429,7 +427,7 @@ func TestServiceNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.newPromService() // Check no change @@ -446,12 +444,12 @@ func TestServiceChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.newPromService() // Check port changed cfg.Processor.Metrics.Server.Port = 9999 - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) second := b.fromPromService(first) report := helper.NewChangeReport("") @@ -460,7 +458,7 @@ func TestServiceChanged(t *testing.T) { // Make sure non-service settings doesn't trigger service update cfg.Processor.LogLevel = "error" - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) third := b.fromPromService(first) report = helper.NewChangeReport("") @@ -474,7 +472,7 @@ func TestServiceMonitorNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.generic.serviceMonitor() // Check no change @@ -491,11 +489,11 @@ func TestServiceMonitorChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.generic.serviceMonitor() // Check namespace change - b = newMonolithBuilder("namespace2", image, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace2", image, &cfg, true) second := b.generic.serviceMonitor() report := helper.NewChangeReport("") @@ -503,7 +501,7 @@ func TestServiceMonitorChanged(t *testing.T) { assert.Contains(report.String(), "ServiceMonitor spec changed") // Check labels change - b = newMonolithBuilder("namespace2", image2, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace2", image2, &cfg, true) third := b.generic.serviceMonitor() report = helper.NewChangeReport("") @@ -517,7 +515,7 @@ func TestPrometheusRuleNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.generic.prometheusRule() // Check no change @@ -533,12 +531,12 @@ func TestPrometheusRuleChanged(t *testing.T) { // Get first cfg := getConfig() - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) first := b.generic.prometheusRule() // Check namespace change cfg.Processor.Metrics.DisableAlerts = []flowslatest.FLPAlert{flowslatest.AlertNoFlows} - b = newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace", image, &cfg, true) second := b.generic.prometheusRule() report := helper.NewChangeReport("") @@ -546,7 +544,7 @@ func TestPrometheusRuleChanged(t *testing.T) { assert.Contains(report.String(), "PrometheusRule spec changed") // Check labels change - b = newMonolithBuilder("namespace2", image2, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace2", image2, &cfg, true) third := b.generic.prometheusRule() report = helper.NewChangeReport("") @@ -560,7 +558,7 @@ func TestConfigMapShouldDeserializeAsJSON(t *testing.T) { ns := "namespace" cfg := getConfig() loki := cfg.Loki - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) cm, digest, _, err := b.configMap() assert.NoError(err) assert.NotEmpty(t, digest) @@ -628,9 +626,9 @@ func TestLabels(t *testing.T) { assert := assert.New(t) cfg := getConfig() - builder := newMonolithBuilder("ns", image, &cfg, true, &certWatcher) - tBuilder := newTransfoBuilder("ns", image, &cfg, true, &certWatcher) - iBuilder := newIngestBuilder("ns", image, &cfg, true, &certWatcher) + builder := newMonolithBuilder("ns", image, &cfg, true) + tBuilder := newTransfoBuilder("ns", image, &cfg, true) + iBuilder := newIngestBuilder("ns", image, &cfg, true) // Deployment depl := tBuilder.deployment("digest") @@ -699,7 +697,7 @@ func TestPipelineConfig(t *testing.T) { ns := "namespace" cfg := getConfig() cfg.Processor.LogLevel = "info" - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -708,7 +706,7 @@ func TestPipelineConfig(t *testing.T) { // Kafka Ingester cfg.DeploymentModel = flowslatest.DeploymentModelKafka - bi := newIngestBuilder(ns, image, &cfg, true, &certWatcher) + bi := newIngestBuilder(ns, image, &cfg, true) stages, parameters, err = bi.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -716,7 +714,7 @@ func TestPipelineConfig(t *testing.T) { assert.Equal(`[{"name":"ipfix"},{"name":"kafka-write","follows":"ipfix"}]`, string(jsonStages)) // Kafka Transformer - bt := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + bt := newTransfoBuilder(ns, image, &cfg, true) stages, parameters, _, err = bt.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -732,7 +730,7 @@ func TestPipelineConfigDropUnused(t *testing.T) { cfg := getConfig() cfg.Processor.LogLevel = "info" cfg.Processor.DropUnusedFields = true - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -750,7 +748,7 @@ func TestPipelineTraceStage(t *testing.T) { cfg := getConfig() - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -763,7 +761,7 @@ func TestMergeMetricsConfigurationNoIgnore(t *testing.T) { cfg := getConfig() - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, cm, err := b.buildPipelineConfig() assert.NoError(err) assert.NotNil(cm) @@ -792,7 +790,7 @@ func TestMergeMetricsConfigurationWithIgnore(t *testing.T) { cfg := getConfig() cfg.Processor.Metrics.IgnoreTags = []string{"nodes"} - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, cm, err := b.buildPipelineConfig() assert.NoError(err) assert.NotNil(cm) @@ -815,7 +813,7 @@ func TestMergeMetricsConfigurationIgnoreAll(t *testing.T) { cfg := getConfig() cfg.Processor.Metrics.IgnoreTags = []string{"nodes", "namespaces", "workloads"} - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, cm, err := b.buildPipelineConfig() assert.NoError(err) assert.Nil(cm) @@ -834,7 +832,7 @@ func TestPipelineWithExporter(t *testing.T) { Kafka: flowslatest.FlowCollectorKafka{Address: "kafka-test", Topic: "topic-test"}, }) - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) diff --git a/controllers/flowlogspipeline/flp_transfo_objects.go b/controllers/flowlogspipeline/flp_transfo_objects.go index f029a2aa7..dba3712b2 100644 --- a/controllers/flowlogspipeline/flp_transfo_objects.go +++ b/controllers/flowlogspipeline/flp_transfo_objects.go @@ -11,15 +11,14 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) type transfoBuilder struct { generic builder } -func newTransfoBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) transfoBuilder { - gen := newBuilder(ns, image, desired, ConfKafkaTransformer, useOpenShiftSCC, cWatcher) +func newTransfoBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool) transfoBuilder { + gen := newBuilder(ns, image, desired, ConfKafkaTransformer, useOpenShiftSCC) return transfoBuilder{ generic: gen, } diff --git a/controllers/flowlogspipeline/flp_transfo_reconciler.go b/controllers/flowlogspipeline/flp_transfo_reconciler.go index c12b9d7e7..d98f7db67 100644 --- a/controllers/flowlogspipeline/flp_transfo_reconciler.go +++ b/controllers/flowlogspipeline/flp_transfo_reconciler.go @@ -89,7 +89,7 @@ func (r *flpTransformerReconciler) reconcile(ctx context.Context, desired *flows return nil } - builder := newTransfoBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC, r.CertWatcher) + builder := newTransfoBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC) newCM, configDigest, dbConfigMap, err := builder.configMap() if err != nil { return err @@ -126,11 +126,6 @@ func (r *flpTransformerReconciler) reconcileDeployment(ctx context.Context, desi new := builder.deployment(configDigest) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &new.Spec.Template, lokiCerts, kafkaCerts); err != nil { - return err - } - if !r.nobjMngr.Exists(r.owned.deployment) { if err := r.CreateOwned(ctx, new); err != nil { return err diff --git a/controllers/reconcilers/client_helper.go b/controllers/reconcilers/client_helper.go index 6b408c1d2..35cfdf0de 100644 --- a/controllers/reconcilers/client_helper.go +++ b/controllers/reconcilers/client_helper.go @@ -6,7 +6,6 @@ import ( "reflect" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -22,7 +21,6 @@ type ClientHelper struct { SetControllerReference func(client.Object) error changed bool deplInProgress bool - CertWatcher *watchers.CertificatesWatcher } // CreateOwned is an helper function that creates an object, sets owner reference and writes info & errors logs diff --git a/pkg/helper/certificates.go b/pkg/helper/certificates.go index 4c6495cdc..8efea2ea1 100644 --- a/pkg/helper/certificates.go +++ b/pkg/helper/certificates.go @@ -5,40 +5,39 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" - "github.com/netobserv/network-observability-operator/pkg/watchers" corev1 "k8s.io/api/core/v1" ) // AppendCertVolumes will add a volume + volume mount for a CA cert if defined, and another volume + volume mount for a user cert if defined. // It does nothing if neither is defined. -func AppendCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.ClientTLS, name string, cWatcher *watchers.CertificatesWatcher) ([]corev1.Volume, []corev1.VolumeMount) { +func AppendCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.ClientTLS, name string) ([]corev1.Volume, []corev1.VolumeMount) { volOut := volumes vmOut := volumeMounts if config.CACert.Name != "" { - vol, vm := buildVolume(config.CACert, constants.CertCAName(name), cWatcher) + vol, vm := buildVolume(config.CACert, constants.CertCAName(name)) volOut = append(volOut, vol) vmOut = append(vmOut, vm) } if config.UserCert.Name != "" { - vol, vm := buildVolume(config.UserCert, constants.CertUserName(name), cWatcher) + vol, vm := buildVolume(config.UserCert, constants.CertUserName(name)) volOut = append(volOut, vol) vmOut = append(vmOut, vm) } return volOut, vmOut } -func AppendSingleCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.CertificateReference, name string, cWatcher *watchers.CertificatesWatcher) ([]corev1.Volume, []corev1.VolumeMount) { +func AppendSingleCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.CertificateReference, name string) ([]corev1.Volume, []corev1.VolumeMount) { volOut := volumes vmOut := volumeMounts if config.Name != "" { - vol, vm := buildVolume(*config, name, cWatcher) + vol, vm := buildVolume(*config, name) volOut = append(volOut, vol) vmOut = append(vmOut, vm) } return volOut, vmOut } -func buildVolume(ref flowslatest.CertificateReference, name string, cWatcher *watchers.CertificatesWatcher) (corev1.Volume, corev1.VolumeMount) { +func buildVolume(ref flowslatest.CertificateReference, name string) (corev1.Volume, corev1.VolumeMount) { var vol corev1.Volume if ref.Type == flowslatest.CertRefTypeConfigMap { vol = corev1.Volume{ @@ -61,7 +60,6 @@ func buildVolume(ref flowslatest.CertificateReference, name string, cWatcher *wa }, } } - cWatcher.SetWatchedCertificate(name, &ref) return vol, corev1.VolumeMount{ Name: name, ReadOnly: true,