From c40f1cc5dd6c450a13d594957075f2d271cff729 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 30 Mar 2023 10:53:32 +0200 Subject: [PATCH] NETOBSERV-963, revert most of cert watching Reverting most of certificate watching (NETOBSERV-684) as it generates a lot of pods restart. We did not necessarily have to do this certificate watching as CM/secrets are updated within volumes. We might however monitor carefully if new (or old) problems arise, potentially due to the kubelet sync delay for updating volumes And also make sure certificates aren't cached in our different workloads --- .../consoleplugin/consoleplugin_objects.go | 9 +- .../consoleplugin/consoleplugin_reconciler.go | 6 +- .../consoleplugin/consoleplugin_test.go | 20 ++--- controllers/ebpf/agent_controller.go | 7 +- controllers/flowcollector_controller.go | 3 +- controllers/flowcollector_controller_test.go | 37 ++------ .../flowlogspipeline/flp_common_objects.go | 11 +-- .../flowlogspipeline/flp_ingest_objects.go | 5 +- .../flowlogspipeline/flp_ingest_reconciler.go | 6 +- .../flowlogspipeline/flp_monolith_objects.go | 5 +- .../flp_monolith_reconciler.go | 6 +- controllers/flowlogspipeline/flp_test.go | 90 +++++++++---------- .../flowlogspipeline/flp_transfo_objects.go | 5 +- .../flp_transfo_reconciler.go | 7 +- controllers/reconcilers/client_helper.go | 2 - pkg/helper/certificates.go | 14 ++- 16 files changed, 83 insertions(+), 150 deletions(-) diff --git a/controllers/consoleplugin/consoleplugin_objects.go b/controllers/consoleplugin/consoleplugin_objects.go index 23d01b3da..b1e5c3518 100644 --- a/controllers/consoleplugin/consoleplugin_objects.go +++ b/controllers/consoleplugin/consoleplugin_objects.go @@ -18,7 +18,6 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) const secretName = "console-serving-cert" @@ -39,10 +38,9 @@ type builder struct { selector map[string]string desired *flowslatest.FlowCollectorSpec imageName string - cWatcher *watchers.CertificatesWatcher } -func newBuilder(ns, imageName string, desired *flowslatest.FlowCollectorSpec, cWatcher *watchers.CertificatesWatcher) builder { +func newBuilder(ns, imageName string, desired *flowslatest.FlowCollectorSpec) builder { version := helper.ExtractVersion(imageName) return builder{ namespace: ns, @@ -55,7 +53,6 @@ func newBuilder(ns, imageName string, desired *flowslatest.FlowCollectorSpec, cW }, desired: desired, imageName: imageName, - cWatcher: cWatcher, } } @@ -236,12 +233,12 @@ func (b *builder) podTemplate(cmDigest string) *corev1.PodTemplateSpec { args := buildArgs(b.desired) if b.desired != nil && b.desired.Loki.TLS.Enable && !b.desired.Loki.TLS.InsecureSkipVerify { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts) } statusTLS := helper.GetLokiStatusTLS(&b.desired.Loki) if b.desired != nil && statusTLS.Enable && !statusTLS.InsecureSkipVerify { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &statusTLS, lokiStatusCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &statusTLS, lokiStatusCerts) } if helper.LokiUseHostToken(&b.desired.Loki) { diff --git a/controllers/consoleplugin/consoleplugin_reconciler.go b/controllers/consoleplugin/consoleplugin_reconciler.go index 8cde07b81..3a6fe86af 100644 --- a/controllers/consoleplugin/consoleplugin_reconciler.go +++ b/controllers/consoleplugin/consoleplugin_reconciler.go @@ -83,7 +83,7 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC } // Create object builder - builder := newBuilder(ns, r.image, &desired.Spec, r.CertWatcher) + builder := newBuilder(ns, r.image, &desired.Spec) if err := r.reconcilePermissions(ctx, &builder); err != nil { return err @@ -196,10 +196,6 @@ func (r *CPReconciler) reconcileDeployment(ctx context.Context, builder builder, defer report.LogIfNeeded(ctx) newDepl := builder.deployment(cmDigest) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &newDepl.Spec.Template, lokiCerts, lokiStatusCerts); err != nil { - return err - } if !r.nobjMngr.Exists(r.owned.deployment) { if err := r.CreateOwned(ctx, newDepl); err != nil { return err diff --git a/controllers/consoleplugin/consoleplugin_test.go b/controllers/consoleplugin/consoleplugin_test.go index e66793285..eda1de52b 100644 --- a/controllers/consoleplugin/consoleplugin_test.go +++ b/controllers/consoleplugin/consoleplugin_test.go @@ -13,7 +13,6 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" promConfig "github.com/prometheus/common/config" ) @@ -28,7 +27,6 @@ var testResources = corev1.ResourceRequirements{ corev1.ResourceMemory: resource.MustParse("512Mi"), }, } -var certWatcher = watchers.NewCertificatesWatcher() func getPluginConfig() flowslatest.FlowCollectorConsolePlugin { return flowslatest.FlowCollectorConsolePlugin{ @@ -111,7 +109,7 @@ func TestContainerUpdateCheck(t *testing.T) { plugin := getPluginConfig() loki := flowslatest.FlowCollectorLoki{URL: "http://loki:3100/", TenantID: "netobserv"} spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder := newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder := newBuilder(testNamespace, testImage, &spec) old := builder.deployment("digest") new := builder.deployment("digest") report := helper.NewChangeReport("") @@ -163,7 +161,7 @@ func TestContainerUpdateCheck(t *testing.T) { }, }} spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -173,7 +171,7 @@ func TestContainerUpdateCheck(t *testing.T) { //new loki cert name loki.TLS.CACert.Name = "cm-name-2" spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -183,7 +181,7 @@ func TestContainerUpdateCheck(t *testing.T) { //test again no change loki.TLS.CACert.Name = "cm-name-2" spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.False(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -195,7 +193,7 @@ func TestContainerUpdateCheck(t *testing.T) { loki.StatusTLS.Enable = true spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -210,7 +208,7 @@ func TestContainerUpdateCheck(t *testing.T) { } spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -226,7 +224,7 @@ func TestContainerUpdateCheck(t *testing.T) { } spec = flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder = newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder = newBuilder(testNamespace, testImage, &spec) new = builder.deployment("digest") report = helper.NewChangeReport("") assert.True(helper.PodChanged(&old.Spec.Template, &new.Spec.Template, constants.PluginName, &report)) @@ -264,7 +262,7 @@ func TestBuiltService(t *testing.T) { plugin := getPluginConfig() loki := flowslatest.FlowCollectorLoki{URL: "http://foo:1234"} spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder := newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder := newBuilder(testNamespace, testImage, &spec) newService := builder.service(nil) report := helper.NewChangeReport("") assert.Equal(serviceNeedsUpdate(newService, &plugin, &report), false) @@ -277,7 +275,7 @@ func TestLabels(t *testing.T) { plugin := getPluginConfig() loki := flowslatest.FlowCollectorLoki{URL: "http://foo:1234"} spec := flowslatest.FlowCollectorSpec{ConsolePlugin: plugin, Loki: loki} - builder := newBuilder(testNamespace, testImage, &spec, &certWatcher) + builder := newBuilder(testNamespace, testImage, &spec) // Deployment depl := builder.deployment("digest") diff --git a/controllers/ebpf/agent_controller.go b/controllers/ebpf/agent_controller.go index 98e637f10..9c233d1ea 100644 --- a/controllers/ebpf/agent_controller.go +++ b/controllers/ebpf/agent_controller.go @@ -131,11 +131,6 @@ func (c *AgentController) Reconcile( } desired := c.desired(target) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := c.client.CertWatcher.AnnotatePod(ctx, c.client, &desired.Spec.Template, kafkaCerts); err != nil { - return err - } - switch c.requiredAction(current, desired) { case actionCreate: rlog.Info("action: create agent") @@ -175,7 +170,7 @@ func (c *AgentController) desired(coll *flowslatest.FlowCollector) *v1.DaemonSet if helper.UseKafka(&coll.Spec) && coll.Spec.Kafka.TLS.Enable { // NOTE: secrets need to be copied from the base netobserv namespace to the privileged one. // This operation must currently be performed manually (run "make fix-ebpf-kafka-tls"). It could be automated here. - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &coll.Spec.Kafka.TLS, kafkaCerts, c.client.CertWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &coll.Spec.Kafka.TLS, kafkaCerts) } return &v1.DaemonSet{ diff --git a/controllers/flowcollector_controller.go b/controllers/flowcollector_controller.go index 8309bf2a0..fe5b0d7d3 100644 --- a/controllers/flowcollector_controller.go +++ b/controllers/flowcollector_controller.go @@ -368,8 +368,7 @@ func (r *FlowCollectorReconciler) finalize(ctx context.Context, desired *flowsla func (r *FlowCollectorReconciler) newClientHelper(desired *flowslatest.FlowCollector) reconcilers.ClientHelper { return reconcilers.ClientHelper{ - CertWatcher: r.certWatcher, - Client: r.Client, + Client: r.Client, SetControllerReference: func(obj client.Object) error { return ctrl.SetControllerReference(desired, obj, r.Scheme) }, diff --git a/controllers/flowcollector_controller_test.go b/controllers/flowcollector_controller_test.go index 30c975507..6fbacf6c2 100644 --- a/controllers/flowcollector_controller_test.go +++ b/controllers/flowcollector_controller_test.go @@ -593,9 +593,8 @@ func flowCollectorControllerSpecs() { }) }) - Context("Using and watching certificates", func() { + Context("Using certificates", func() { flpDS := appsv1.DaemonSet{} - var certStamp1, certStamp2 string It("Should update Loki to use TLS", func() { // Create CM certificate Expect(k8sClient.Create(ctx, &v1.ConfigMap{ @@ -622,33 +621,8 @@ func flowCollectorControllerSpecs() { if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { return err } - certStamp1 = flpDS.Spec.Template.Annotations["flows.netobserv.io/cert-loki-certs-ca"] - return certStamp1 - }, timeout, interval).Should(Not(BeEmpty())) - Expect(flpDS.Spec.Template.Spec.Volumes).To(HaveLen(2)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) - }) - - It("Should watch certificate update", func() { - By("Updating certificate") - Expect(k8sClient.Update(ctx, &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "loki-ca", - Namespace: operatorNamespace, - }, - Data: map[string]string{"test": "test"}, - })).Should(Succeed()) - - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - certStamp2 = flpDS.Spec.Template.Annotations["flows.netobserv.io/cert-loki-certs-ca"] - return certStamp2 - }, timeout, interval).Should(Not(Equal(certStamp1))) - Expect(certStamp2).To(Not(BeEmpty())) - Expect(flpDS.Spec.Template.Spec.Volumes).To(HaveLen(2)) + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(2)) Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) }) @@ -663,9 +637,8 @@ func flowCollectorControllerSpecs() { if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { return err } - return flpDS.Spec.Template.Annotations - }, timeout, interval).Should(Not(HaveKey("flows.netobserv.io/cert-loki-certs-ca"))) - Expect(flpDS.Spec.Template.Spec.Volumes).To(HaveLen(1)) + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(1)) Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) }) }) diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go index b654c5c19..44cb8fbd4 100644 --- a/controllers/flowlogspipeline/flp_common_objects.go +++ b/controllers/flowlogspipeline/flp_common_objects.go @@ -24,7 +24,6 @@ import ( "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/filters" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) const ( @@ -80,10 +79,9 @@ type builder struct { confKind ConfKind useOpenShiftSCC bool image string - cWatcher *watchers.CertificatesWatcher } -func newBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, ck ConfKind, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) builder { +func newBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, ck ConfKind, useOpenShiftSCC bool) builder { version := helper.ExtractVersion(image) name := name(ck) var promTLS flowslatest.CertificateReference @@ -112,7 +110,6 @@ func newBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, ck Con useOpenShiftSCC: useOpenShiftSCC, promTLS: &promTLS, image: image, - cWatcher: cWatcher, } } @@ -185,12 +182,12 @@ func (b *builder) podTemplate(hasHostPort, hasLokiInterface, hostNetwork bool, c }} if helper.UseKafka(b.desired) && b.desired.Kafka.TLS.Enable { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Kafka.TLS, kafkaCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Kafka.TLS, kafkaCerts) } if hasLokiInterface { if b.desired.Loki.TLS.Enable && !b.desired.Loki.TLS.InsecureSkipVerify { - volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendCertVolumes(volumes, volumeMounts, &b.desired.Loki.TLS, lokiCerts) } if helper.LokiUseHostToken(&b.desired.Loki) || helper.LokiForwardUserToken(&b.desired.Loki) { volumes, volumeMounts = helper.AppendTokenVolume(volumes, volumeMounts, lokiToken, constants.FLPName) @@ -198,7 +195,7 @@ func (b *builder) podTemplate(hasHostPort, hasLokiInterface, hostNetwork bool, c } if b.desired.Processor.Metrics.Server.TLS.Type != flowslatest.ServerTLSDisabled { - volumes, volumeMounts = helper.AppendSingleCertVolumes(volumes, volumeMounts, b.promTLS, promCerts, b.cWatcher) + volumes, volumeMounts = helper.AppendSingleCertVolumes(volumes, volumeMounts, b.promTLS, promCerts) } var envs []corev1.EnvVar diff --git a/controllers/flowlogspipeline/flp_ingest_objects.go b/controllers/flowlogspipeline/flp_ingest_objects.go index 175cfe834..860fc2b07 100644 --- a/controllers/flowlogspipeline/flp_ingest_objects.go +++ b/controllers/flowlogspipeline/flp_ingest_objects.go @@ -10,15 +10,14 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) type ingestBuilder struct { generic builder } -func newIngestBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) ingestBuilder { - gen := newBuilder(ns, image, desired, ConfKafkaIngester, useOpenShiftSCC, cWatcher) +func newIngestBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool) ingestBuilder { + gen := newBuilder(ns, image, desired, ConfKafkaIngester, useOpenShiftSCC) return ingestBuilder{ generic: gen, } diff --git a/controllers/flowlogspipeline/flp_ingest_reconciler.go b/controllers/flowlogspipeline/flp_ingest_reconciler.go index d807ca2c0..5896ff373 100644 --- a/controllers/flowlogspipeline/flp_ingest_reconciler.go +++ b/controllers/flowlogspipeline/flp_ingest_reconciler.go @@ -85,7 +85,7 @@ func (r *flpIngesterReconciler) reconcile(ctx context.Context, desired *flowslat return nil } - builder := newIngestBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC, r.CertWatcher) + builder := newIngestBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC) newCM, configDigest, err := builder.configMap() if err != nil { return err @@ -147,10 +147,6 @@ func (r *flpIngesterReconciler) reconcileDaemonSet(ctx context.Context, desiredD report := helper.NewChangeReport("FLP DaemonSet") defer report.LogIfNeeded(ctx) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &desiredDS.Spec.Template, lokiCerts, kafkaCerts); err != nil { - return err - } if !r.nobjMngr.Exists(r.owned.daemonSet) { return r.CreateOwned(ctx, desiredDS) } else if helper.PodChanged(&r.owned.daemonSet.Spec.Template, &desiredDS.Spec.Template, constants.FLPName, &report) { diff --git a/controllers/flowlogspipeline/flp_monolith_objects.go b/controllers/flowlogspipeline/flp_monolith_objects.go index da6941661..92351bd53 100644 --- a/controllers/flowlogspipeline/flp_monolith_objects.go +++ b/controllers/flowlogspipeline/flp_monolith_objects.go @@ -10,15 +10,14 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) type monolithBuilder struct { generic builder } -func newMonolithBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) monolithBuilder { - gen := newBuilder(ns, image, desired, ConfMonolith, useOpenShiftSCC, cWatcher) +func newMonolithBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool) monolithBuilder { + gen := newBuilder(ns, image, desired, ConfMonolith, useOpenShiftSCC) return monolithBuilder{ generic: gen, } diff --git a/controllers/flowlogspipeline/flp_monolith_reconciler.go b/controllers/flowlogspipeline/flp_monolith_reconciler.go index 1ab7d2280..17317d5a8 100644 --- a/controllers/flowlogspipeline/flp_monolith_reconciler.go +++ b/controllers/flowlogspipeline/flp_monolith_reconciler.go @@ -88,7 +88,7 @@ func (r *flpMonolithReconciler) reconcile(ctx context.Context, desired *flowslat return nil } - builder := newMonolithBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC, r.CertWatcher) + builder := newMonolithBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC) newCM, configDigest, dbConfigMap, err := builder.configMap() if err != nil { return err @@ -155,10 +155,6 @@ func (r *flpMonolithReconciler) reconcileDaemonSet(ctx context.Context, desiredD report := helper.NewChangeReport("FLP DaemonSet") defer report.LogIfNeeded(ctx) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &desiredDS.Spec.Template, lokiCerts, kafkaCerts); err != nil { - return err - } if !r.nobjMngr.Exists(r.owned.daemonSet) { return r.CreateOwned(ctx, desiredDS) } else if helper.PodChanged(&r.owned.daemonSet.Spec.Template, &desiredDS.Spec.Template, constants.FLPName, &report) { diff --git a/controllers/flowlogspipeline/flp_test.go b/controllers/flowlogspipeline/flp_test.go index d227d5b06..58c077a3a 100644 --- a/controllers/flowlogspipeline/flp_test.go +++ b/controllers/flowlogspipeline/flp_test.go @@ -33,7 +33,6 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) var resources = corev1.ResourceRequirements{ @@ -48,7 +47,6 @@ var pullPolicy = corev1.PullIfNotPresent var minReplicas = int32(1) var maxReplicas = int32(5) var targetCPU = int32(75) -var certWatcher = watchers.NewCertificatesWatcher() var outputRecordTypes = flowslatest.LogTypeAll const testNamespace = "flp" @@ -157,14 +155,14 @@ func TestDaemonSetNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.daemonSet(digest) // Check no change cfg = getConfig() - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.daemonSet(digest) @@ -180,14 +178,14 @@ func TestDaemonSetChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.daemonSet(digest) // Check probes enabled change cfg.Processor.EnableKubeProbes = true - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.daemonSet(digest) @@ -218,7 +216,7 @@ func TestDaemonSetChanged(t *testing.T) { // Check log level change cfg.Processor.LogLevel = "info" - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) third := b.daemonSet(digest) @@ -232,7 +230,7 @@ func TestDaemonSetChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("500m"), corev1.ResourceMemory: resource.MustParse("500Gi"), } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fourth := b.daemonSet(digest) @@ -246,7 +244,7 @@ func TestDaemonSetChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("512Mi"), } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fifth := b.daemonSet(digest) @@ -267,7 +265,7 @@ func TestDaemonSetChanged(t *testing.T) { CertFile: "ca.crt", }, } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) sixth := b.daemonSet(digest) @@ -285,7 +283,7 @@ func TestDaemonSetChanged(t *testing.T) { CertFile: "ca.crt", }, } - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) seventh := b.daemonSet(digest) @@ -301,14 +299,14 @@ func TestDeploymentNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b := newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.deployment(digest) // Check no change cfg = getConfig() - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.deployment(digest) @@ -324,14 +322,14 @@ func TestDeploymentChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b := newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.deployment(digest) // Check probes enabled change cfg.Processor.EnableKubeProbes = true - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.deployment(digest) @@ -346,7 +344,7 @@ func TestDeploymentChanged(t *testing.T) { // Check log level change cfg.Processor.LogLevel = "info" - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) third := b.deployment(digest) @@ -360,7 +358,7 @@ func TestDeploymentChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("500m"), corev1.ResourceMemory: resource.MustParse("500Gi"), } - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fourth := b.deployment(digest) @@ -374,7 +372,7 @@ func TestDeploymentChanged(t *testing.T) { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("512Mi"), } - b = newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err = b.configMap() assert.NoError(err) fifth := b.deployment(digest) @@ -389,7 +387,7 @@ func TestDeploymentChanged(t *testing.T) { // Check replicas didn't change because HPA is used cfg2 := cfg cfg2.Processor.KafkaConsumerReplicas = 5 - b = newTransfoBuilder(ns, image, &cfg2, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg2, true) _, digest, _, err = b.configMap() assert.NoError(err) sixth := b.deployment(digest) @@ -405,7 +403,7 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) { // Get first ns := "namespace" cfg := getConfigNoHPA() - b := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + b := newTransfoBuilder(ns, image, &cfg, true) _, digest, _, err := b.configMap() assert.NoError(err) first := b.deployment(digest) @@ -413,7 +411,7 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) { // Check replicas changed (need to copy flp, as Spec.Replicas stores a pointer) cfg2 := cfg cfg2.Processor.KafkaConsumerReplicas = 5 - b = newTransfoBuilder(ns, image, &cfg2, true, &certWatcher) + b = newTransfoBuilder(ns, image, &cfg2, true) _, digest, _, err = b.configMap() assert.NoError(err) second := b.deployment(digest) @@ -429,7 +427,7 @@ func TestServiceNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.newPromService() // Check no change @@ -446,12 +444,12 @@ func TestServiceChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.newPromService() // Check port changed cfg.Processor.Metrics.Server.Port = 9999 - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) second := b.fromPromService(first) report := helper.NewChangeReport("") @@ -460,7 +458,7 @@ func TestServiceChanged(t *testing.T) { // Make sure non-service settings doesn't trigger service update cfg.Processor.LogLevel = "error" - b = newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b = newMonolithBuilder(ns, image, &cfg, true) third := b.fromPromService(first) report = helper.NewChangeReport("") @@ -474,7 +472,7 @@ func TestServiceMonitorNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.generic.serviceMonitor() // Check no change @@ -491,11 +489,11 @@ func TestServiceMonitorChanged(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.generic.serviceMonitor() // Check namespace change - b = newMonolithBuilder("namespace2", image, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace2", image, &cfg, true) second := b.generic.serviceMonitor() report := helper.NewChangeReport("") @@ -503,7 +501,7 @@ func TestServiceMonitorChanged(t *testing.T) { assert.Contains(report.String(), "ServiceMonitor spec changed") // Check labels change - b = newMonolithBuilder("namespace2", image2, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace2", image2, &cfg, true) third := b.generic.serviceMonitor() report = helper.NewChangeReport("") @@ -517,7 +515,7 @@ func TestPrometheusRuleNoChange(t *testing.T) { // Get first ns := "namespace" cfg := getConfig() - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) first := b.generic.prometheusRule() // Check no change @@ -533,12 +531,12 @@ func TestPrometheusRuleChanged(t *testing.T) { // Get first cfg := getConfig() - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) first := b.generic.prometheusRule() // Check namespace change cfg.Processor.Metrics.DisableAlerts = []flowslatest.FLPAlert{flowslatest.AlertNoFlows} - b = newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace", image, &cfg, true) second := b.generic.prometheusRule() report := helper.NewChangeReport("") @@ -546,7 +544,7 @@ func TestPrometheusRuleChanged(t *testing.T) { assert.Contains(report.String(), "PrometheusRule spec changed") // Check labels change - b = newMonolithBuilder("namespace2", image2, &cfg, true, &certWatcher) + b = newMonolithBuilder("namespace2", image2, &cfg, true) third := b.generic.prometheusRule() report = helper.NewChangeReport("") @@ -560,7 +558,7 @@ func TestConfigMapShouldDeserializeAsJSON(t *testing.T) { ns := "namespace" cfg := getConfig() loki := cfg.Loki - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) cm, digest, _, err := b.configMap() assert.NoError(err) assert.NotEmpty(t, digest) @@ -628,9 +626,9 @@ func TestLabels(t *testing.T) { assert := assert.New(t) cfg := getConfig() - builder := newMonolithBuilder("ns", image, &cfg, true, &certWatcher) - tBuilder := newTransfoBuilder("ns", image, &cfg, true, &certWatcher) - iBuilder := newIngestBuilder("ns", image, &cfg, true, &certWatcher) + builder := newMonolithBuilder("ns", image, &cfg, true) + tBuilder := newTransfoBuilder("ns", image, &cfg, true) + iBuilder := newIngestBuilder("ns", image, &cfg, true) // Deployment depl := tBuilder.deployment("digest") @@ -699,7 +697,7 @@ func TestPipelineConfig(t *testing.T) { ns := "namespace" cfg := getConfig() cfg.Processor.LogLevel = "info" - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -708,7 +706,7 @@ func TestPipelineConfig(t *testing.T) { // Kafka Ingester cfg.DeploymentModel = flowslatest.DeploymentModelKafka - bi := newIngestBuilder(ns, image, &cfg, true, &certWatcher) + bi := newIngestBuilder(ns, image, &cfg, true) stages, parameters, err = bi.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -716,7 +714,7 @@ func TestPipelineConfig(t *testing.T) { assert.Equal(`[{"name":"ipfix"},{"name":"kafka-write","follows":"ipfix"}]`, string(jsonStages)) // Kafka Transformer - bt := newTransfoBuilder(ns, image, &cfg, true, &certWatcher) + bt := newTransfoBuilder(ns, image, &cfg, true) stages, parameters, _, err = bt.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -732,7 +730,7 @@ func TestPipelineConfigDropUnused(t *testing.T) { cfg := getConfig() cfg.Processor.LogLevel = "info" cfg.Processor.DropUnusedFields = true - b := newMonolithBuilder(ns, image, &cfg, true, &certWatcher) + b := newMonolithBuilder(ns, image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -750,7 +748,7 @@ func TestPipelineTraceStage(t *testing.T) { cfg := getConfig() - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) @@ -763,7 +761,7 @@ func TestMergeMetricsConfigurationNoIgnore(t *testing.T) { cfg := getConfig() - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, cm, err := b.buildPipelineConfig() assert.NoError(err) assert.NotNil(cm) @@ -792,7 +790,7 @@ func TestMergeMetricsConfigurationWithIgnore(t *testing.T) { cfg := getConfig() cfg.Processor.Metrics.IgnoreTags = []string{"nodes"} - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, cm, err := b.buildPipelineConfig() assert.NoError(err) assert.NotNil(cm) @@ -815,7 +813,7 @@ func TestMergeMetricsConfigurationIgnoreAll(t *testing.T) { cfg := getConfig() cfg.Processor.Metrics.IgnoreTags = []string{"nodes", "namespaces", "workloads"} - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, cm, err := b.buildPipelineConfig() assert.NoError(err) assert.Nil(cm) @@ -834,7 +832,7 @@ func TestPipelineWithExporter(t *testing.T) { Kafka: flowslatest.FlowCollectorKafka{Address: "kafka-test", Topic: "topic-test"}, }) - b := newMonolithBuilder("namespace", image, &cfg, true, &certWatcher) + b := newMonolithBuilder("namespace", image, &cfg, true) stages, parameters, _, err := b.buildPipelineConfig() assert.NoError(err) assert.True(validatePipelineConfig(stages, parameters)) diff --git a/controllers/flowlogspipeline/flp_transfo_objects.go b/controllers/flowlogspipeline/flp_transfo_objects.go index f029a2aa7..dba3712b2 100644 --- a/controllers/flowlogspipeline/flp_transfo_objects.go +++ b/controllers/flowlogspipeline/flp_transfo_objects.go @@ -11,15 +11,14 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" ) type transfoBuilder struct { generic builder } -func newTransfoBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool, cWatcher *watchers.CertificatesWatcher) transfoBuilder { - gen := newBuilder(ns, image, desired, ConfKafkaTransformer, useOpenShiftSCC, cWatcher) +func newTransfoBuilder(ns, image string, desired *flowslatest.FlowCollectorSpec, useOpenShiftSCC bool) transfoBuilder { + gen := newBuilder(ns, image, desired, ConfKafkaTransformer, useOpenShiftSCC) return transfoBuilder{ generic: gen, } diff --git a/controllers/flowlogspipeline/flp_transfo_reconciler.go b/controllers/flowlogspipeline/flp_transfo_reconciler.go index c12b9d7e7..d98f7db67 100644 --- a/controllers/flowlogspipeline/flp_transfo_reconciler.go +++ b/controllers/flowlogspipeline/flp_transfo_reconciler.go @@ -89,7 +89,7 @@ func (r *flpTransformerReconciler) reconcile(ctx context.Context, desired *flows return nil } - builder := newTransfoBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC, r.CertWatcher) + builder := newTransfoBuilder(r.nobjMngr.Namespace, r.image, &desired.Spec, r.useOpenShiftSCC) newCM, configDigest, dbConfigMap, err := builder.configMap() if err != nil { return err @@ -126,11 +126,6 @@ func (r *flpTransformerReconciler) reconcileDeployment(ctx context.Context, desi new := builder.deployment(configDigest) - // Annotate pod with certificate reference so that it is reloaded if modified - if err := r.CertWatcher.AnnotatePod(ctx, r.Client, &new.Spec.Template, lokiCerts, kafkaCerts); err != nil { - return err - } - if !r.nobjMngr.Exists(r.owned.deployment) { if err := r.CreateOwned(ctx, new); err != nil { return err diff --git a/controllers/reconcilers/client_helper.go b/controllers/reconcilers/client_helper.go index 6b408c1d2..35cfdf0de 100644 --- a/controllers/reconcilers/client_helper.go +++ b/controllers/reconcilers/client_helper.go @@ -6,7 +6,6 @@ import ( "reflect" "github.com/netobserv/network-observability-operator/pkg/helper" - "github.com/netobserv/network-observability-operator/pkg/watchers" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -22,7 +21,6 @@ type ClientHelper struct { SetControllerReference func(client.Object) error changed bool deplInProgress bool - CertWatcher *watchers.CertificatesWatcher } // CreateOwned is an helper function that creates an object, sets owner reference and writes info & errors logs diff --git a/pkg/helper/certificates.go b/pkg/helper/certificates.go index 4c6495cdc..8efea2ea1 100644 --- a/pkg/helper/certificates.go +++ b/pkg/helper/certificates.go @@ -5,40 +5,39 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" "github.com/netobserv/network-observability-operator/controllers/constants" - "github.com/netobserv/network-observability-operator/pkg/watchers" corev1 "k8s.io/api/core/v1" ) // AppendCertVolumes will add a volume + volume mount for a CA cert if defined, and another volume + volume mount for a user cert if defined. // It does nothing if neither is defined. -func AppendCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.ClientTLS, name string, cWatcher *watchers.CertificatesWatcher) ([]corev1.Volume, []corev1.VolumeMount) { +func AppendCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.ClientTLS, name string) ([]corev1.Volume, []corev1.VolumeMount) { volOut := volumes vmOut := volumeMounts if config.CACert.Name != "" { - vol, vm := buildVolume(config.CACert, constants.CertCAName(name), cWatcher) + vol, vm := buildVolume(config.CACert, constants.CertCAName(name)) volOut = append(volOut, vol) vmOut = append(vmOut, vm) } if config.UserCert.Name != "" { - vol, vm := buildVolume(config.UserCert, constants.CertUserName(name), cWatcher) + vol, vm := buildVolume(config.UserCert, constants.CertUserName(name)) volOut = append(volOut, vol) vmOut = append(vmOut, vm) } return volOut, vmOut } -func AppendSingleCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.CertificateReference, name string, cWatcher *watchers.CertificatesWatcher) ([]corev1.Volume, []corev1.VolumeMount) { +func AppendSingleCertVolumes(volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, config *flowslatest.CertificateReference, name string) ([]corev1.Volume, []corev1.VolumeMount) { volOut := volumes vmOut := volumeMounts if config.Name != "" { - vol, vm := buildVolume(*config, name, cWatcher) + vol, vm := buildVolume(*config, name) volOut = append(volOut, vol) vmOut = append(vmOut, vm) } return volOut, vmOut } -func buildVolume(ref flowslatest.CertificateReference, name string, cWatcher *watchers.CertificatesWatcher) (corev1.Volume, corev1.VolumeMount) { +func buildVolume(ref flowslatest.CertificateReference, name string) (corev1.Volume, corev1.VolumeMount) { var vol corev1.Volume if ref.Type == flowslatest.CertRefTypeConfigMap { vol = corev1.Volume{ @@ -61,7 +60,6 @@ func buildVolume(ref flowslatest.CertificateReference, name string, cWatcher *wa }, } } - cWatcher.SetWatchedCertificate(name, &ref) return vol, corev1.VolumeMount{ Name: name, ReadOnly: true,