From e5f4ae501ae3379f6fc2b185e242a27b21b6dc8a Mon Sep 17 00:00:00 2001 From: Andrea Tarocchi Date: Tue, 16 Jan 2024 15:38:19 +0100 Subject: [PATCH 1/2] fix(#4948): Move handling of IntegrationPlatform CR to a separate operator. Add a separate platformcontroller subcommand to kamel, amend install command and other installations (OLM, kustomize, helm) as needed. The platformcontroller works as the operator command but runs an operator that handles just the IntegrationPlatform crd; the operator dose not manage IntegrationPlatform crd any more. --- e2e/advanced/operator_metrics_test.go | 110 ++++---- e2e/support/test_support.go | 10 +- .../camel-k/templates/platformcontroller.yaml | 111 ++++++++ pkg/cmd/platformcontroller.go | 70 +++++ .../platformcontroller/platformcontroller.go | 244 ++++++++++++++++++ pkg/cmd/platformcontroller_test.go | 84 ++++++ pkg/cmd/root.go | 3 +- pkg/controller/add_integrationplatform.go | 2 +- pkg/controller/platformcontroller.go | 39 +++ pkg/install/common.go | 4 + pkg/install/operator.go | 30 ++- pkg/platform/operator.go | 16 ++ pkg/platform/platformcontroller.go | 35 +++ .../config/manager/kustomization.yaml | 1 + .../platformcontroller-deployment.yaml | 89 +++++++ .../config/prometheus/kustomization.yaml | 2 + .../platformcontroller-pod-monitor.yaml | 31 +++ .../platformcontroller-prometheus-rule.yaml | 55 ++++ 18 files changed, 879 insertions(+), 57 deletions(-) create mode 100644 helm/camel-k/templates/platformcontroller.yaml create mode 100644 pkg/cmd/platformcontroller.go create mode 100644 pkg/cmd/platformcontroller/platformcontroller.go create mode 100644 pkg/cmd/platformcontroller_test.go create mode 100644 pkg/controller/platformcontroller.go create mode 100644 pkg/platform/platformcontroller.go create mode 100644 pkg/resources/config/manager/platformcontroller-deployment.yaml create mode 100644 pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml create mode 100644 pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml diff --git a/e2e/advanced/operator_metrics_test.go b/e2e/advanced/operator_metrics_test.go index 5efa4c1638..803691efa1 100644 --- a/e2e/advanced/operator_metrics_test.go +++ b/e2e/advanced/operator_metrics_test.go @@ -72,23 +72,24 @@ func TestMetrics(t *testing.T) { Should(Equal(corev1.ConditionTrue)) g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) - pod := OperatorPod(t, ctx, ns)() - g.Expect(pod).NotTo(BeNil()) + operatorPod := OperatorPod(t, ctx, ns)() + g.Expect(operatorPod).NotTo(BeNil()) // pod.Namespace could be different from ns if using global operator - fmt.Printf("Fetching logs for operator pod %s in namespace %s", pod.Name, pod.Namespace) + fmt.Printf("Fetching logs for operator pod %s in namespace %s", operatorPod.Name, operatorPod.Namespace) logOptions := &corev1.PodLogOptions{ Container: "camel-k-operator", } - logs, err := StructuredLogs(t, ctx, pod.Namespace, pod.Name, logOptions, false) + logs, err := StructuredLogs(t, ctx, operatorPod.Namespace, operatorPod.Name, logOptions, false) g.Expect(err).To(BeNil()) g.Expect(logs).NotTo(BeEmpty()) - response, err := TestClient(t).CoreV1().RESTClient().Get(). - AbsPath(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/proxy/metrics", pod.Namespace, pod.Name)).DoRaw(ctx) - g.Expect(err).To(BeNil()) - metrics, err := parsePrometheusData(response) - g.Expect(err).To(BeNil()) + platformcontrollerPod := PlatformcontrollerPod(t, ctx, ns)() + Expect(platformcontrollerPod).NotTo(BeNil()) + + operatorLogs, operatorMetrics := getLogsAndMetrics(t, ctx, operatorPod, "camel-k-operator") + + platformcontrollerLogs, platformcontrollerMetrics := getLogsAndMetrics(t, ctx, platformcontrollerPod, "camel-k-platformcontroller") it := Integration(t, ctx, ns, name)() g.Expect(it).NotTo(BeNil()) @@ -100,9 +101,9 @@ func TestMetrics(t *testing.T) { duration, err := time.ParseDuration(build.Status.Duration) g.Expect(err).To(BeNil()) - // Check it's consistent with the duration observed from logs + // Check it's consistent with the duration observed from operatorLogs var ts1, ts2 time.Time - err = NewLogWalker(&logs). + err = NewLogWalker(&operatorLogs). AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("State transition"), @@ -132,8 +133,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs((durationFromLogs - duration).Seconds())).To(BeNumerically("<", 10)) // Check the duration is observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_build_duration_seconds")) - g.Expect(metrics["camel_k_build_duration_seconds"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_duration_seconds")) + g.Expect(operatorMetrics["camel_k_build_duration_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_duration_seconds"), Help: stringP("Camel K build duration"), @@ -159,8 +160,8 @@ func TestMetrics(t *testing.T) { // Check there are no failures reported in the Build status g.Expect(build.Status.Failure).To(BeNil()) - // Check no recovery attempts are reported in the logs - recoveryAttempts, err := NewLogCounter(&logs).Count(MatchFields(IgnoreExtras, Fields{ + // Check no recovery attempts are reported in the operatorLogs + recoveryAttempts, err := NewLogCounter(&operatorLogs).Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": HavePrefix("Recovery attempt"), "Kind": Equal("Build"), @@ -170,8 +171,8 @@ func TestMetrics(t *testing.T) { g.Expect(recoveryAttempts).To(BeNumerically("==", 0)) // Check no recovery attempts are observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_build_recovery_attempts")) - g.Expect(metrics["camel_k_build_recovery_attempts"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_recovery_attempts")) + g.Expect(operatorMetrics["camel_k_build_recovery_attempts"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_recovery_attempts"), Help: stringP("Camel K build recovery attempts"), @@ -194,8 +195,8 @@ func TestMetrics(t *testing.T) { }) t.Run("reconciliation duration metric", func(t *testing.T) { - g.Expect(metrics).To(HaveKey("camel_k_reconciliation_duration_seconds")) - g.Expect(metrics["camel_k_reconciliation_duration_seconds"]).To(PointTo(MatchFields(IgnoreExtras, + g.Expect(platformcontrollerMetrics).To(HaveKey("camel_k_reconciliation_duration_seconds")) + g.Expect(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"]).To(PointTo(MatchFields(IgnoreExtras, Fields{ "Name": EqualP("camel_k_reconciliation_duration_seconds"), "Help": EqualP("Camel K reconciliation loop duration"), @@ -203,10 +204,10 @@ func TestMetrics(t *testing.T) { }, ))) - counter := NewLogCounter(&logs) + platformcontrollerCounter := NewLogCounter(&platformcontrollerLogs) // Count the number of IntegrationPlatform reconciliations - platformReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + platformReconciliations, err := platformcontrollerCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integrationplatform"), "Message": Equal("Reconciling IntegrationPlatform"), "RequestNamespace": Equal(ns), @@ -215,7 +216,7 @@ func TestMetrics(t *testing.T) { g.Expect(err).To(BeNil()) // Check it matches the observation in the corresponding metric - platformReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformReconciled := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -230,7 +231,7 @@ func TestMetrics(t *testing.T) { platformReconciledCount := *platformReconciled.Histogram.SampleCount g.Expect(platformReconciledCount).To(BeNumerically(">", 0)) - platformRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformRequeued := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -246,7 +247,7 @@ func TestMetrics(t *testing.T) { platformRequeuedCount = *platformRequeued.Histogram.SampleCount } - platformErrored := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformErrored := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -267,8 +268,9 @@ func TestMetrics(t *testing.T) { g.Expect(platformReconciliations).To(BeNumerically("==", platformReconciledCount+platformRequeuedCount+platformErroredCount)) + operatorCounter := NewLogCounter(&operatorLogs) // Count the number of Integration reconciliations - integrationReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + integrationReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integration"), "Message": Equal("Reconciling Integration"), "RequestNamespace": Equal(it.Namespace), @@ -278,7 +280,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationReconciliations).To(BeNumerically(">", 0)) // Check it matches the observation in the corresponding metric - integrationReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -293,7 +295,7 @@ func TestMetrics(t *testing.T) { integrationReconciledCount := *integrationReconciled.Histogram.SampleCount g.Expect(integrationReconciledCount).To(BeNumerically(">", 0)) - integrationRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -309,7 +311,7 @@ func TestMetrics(t *testing.T) { integrationRequeuedCount = *integrationRequeued.Histogram.SampleCount } - integrationErrored := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationErrored := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -331,7 +333,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationReconciliations).To(BeNumerically("==", integrationReconciledCount+integrationRequeuedCount+integrationErroredCount)) // Count the number of IntegrationKit reconciliations - integrationKitReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + integrationKitReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integrationkit"), "Message": Equal("Reconciling IntegrationKit"), "RequestNamespace": Equal(it.Status.IntegrationKit.Namespace), @@ -341,7 +343,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciliations).To(BeNumerically(">", 0)) // Check it matches the observation in the corresponding metric - integrationKitReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationKitReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -357,7 +359,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciledCount).To(BeNumerically(">", 0)) // Kit can be requeued, above all when a catalog needs to be built - integrationKitRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationKitRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -380,7 +382,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciliations).To(BeNumerically("==", integrationKitReconciledCount+integrationKitRequeuedCount)) // Count the number of Build reconciliations - buildReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + buildReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("Reconciling Build"), "RequestNamespace": Equal(build.Namespace), @@ -389,7 +391,7 @@ func TestMetrics(t *testing.T) { g.Expect(err).To(BeNil()) // Check it matches the observation in the corresponding metric - buildReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + buildReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -404,7 +406,7 @@ func TestMetrics(t *testing.T) { buildReconciledCount := *buildReconciled.Histogram.SampleCount g.Expect(buildReconciledCount).To(BeNumerically(">", 0)) - buildRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + buildRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -431,8 +433,8 @@ func TestMetrics(t *testing.T) { // The start queuing time is taken from the creation time ts1 = build.CreationTimestamp.Time - // Retrieve the end queuing time from the logs - err = NewLogWalker(&logs). + // Retrieve the end queuing time from the operatorLogs + err := NewLogWalker(&operatorLogs). AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("State transition"), @@ -448,8 +450,8 @@ func TestMetrics(t *testing.T) { durationFromLogs := ts2.Sub(ts1) // Retrieve the queuing duration from the metric - g.Expect(metrics).To(HaveKey("camel_k_build_queue_duration_seconds")) - metric := metrics["camel_k_build_queue_duration_seconds"].Metric + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_queue_duration_seconds")) + metric := operatorMetrics["camel_k_build_queue_duration_seconds"].Metric g.Expect(metric).To(HaveLen(1)) histogram := metric[0].Histogram g.Expect(histogram).NotTo(BeNil()) @@ -461,7 +463,7 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs(durationFromLogs.Seconds() - duration)).To(BeNumerically("<", 1)) // Check the queuing duration is correctly observed in the corresponding metric - g.Expect(metrics["camel_k_build_queue_duration_seconds"]).To(EqualP( + g.Expect(operatorMetrics["camel_k_build_queue_duration_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_queue_duration_seconds"), Help: stringP("Camel K build queue duration"), @@ -494,8 +496,8 @@ func TestMetrics(t *testing.T) { duration := ts2.Sub(ts1) - // Retrieve these start and end times from the logs - err = NewLogWalker(&logs). + // Retrieve these start and end times from the operatorLogs + err := NewLogWalker(&operatorLogs). AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integration"), "Message": Equal("Reconciling Integration"), @@ -519,8 +521,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs((durationFromLogs - duration).Seconds())).To(BeNumerically("<=", 1)) // Retrieve the first readiness duration from the metric - g.Expect(metrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) - metric := metrics["camel_k_integration_first_readiness_seconds"].Metric + g.Expect(operatorMetrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) + metric := operatorMetrics["camel_k_integration_first_readiness_seconds"].Metric g.Expect(metric).To(HaveLen(1)) histogram := metric[0].Histogram g.Expect(histogram).NotTo(BeNil()) @@ -530,8 +532,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs(*histogram.SampleSum - d)).To(BeNumerically("<=", 1)) // Check the duration is correctly observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) - g.Expect(metrics["camel_k_integration_first_readiness_seconds"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) + g.Expect(operatorMetrics["camel_k_integration_first_readiness_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_integration_first_readiness_seconds"), Help: stringP("Camel K integration time to first readiness"), @@ -554,6 +556,24 @@ func TestMetrics(t *testing.T) { }) } +func getLogsAndMetrics(t *testing.T, ctx context.Context, componentPod *corev1.Pod, containerName string) ([]LogEntry, map[string]*prometheus.MetricFamily) { + // componentPod.Namespace could be different from ns if using global operator + fmt.Printf("Fetching logs for component pod %s in namespace %s", componentPod.Name, componentPod.Namespace) + logOptions := &corev1.PodLogOptions{ + Container: containerName, + } + logs, err := StructuredLogs(t, ctx, componentPod.Namespace, componentPod.Name, logOptions, false) + Expect(err).To(BeNil()) + Expect(logs).NotTo(BeEmpty()) + + response, err := TestClient(t).CoreV1().RESTClient().Get(). + AbsPath(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/proxy/metrics", componentPod.Namespace, componentPod.Name)).DoRaw(ctx) + Expect(err).To(BeNil()) + metrics, err := parsePrometheusData(response) + Expect(err).To(BeNil()) + return logs, metrics +} + func getMetric(family *prometheus.MetricFamily, matcher types.GomegaMatcher) *prometheus.Metric { for _, metric := range family.Metric { if match, err := matcher.Match(metric); err != nil { diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 3cde54a750..e91a8819bc 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -2420,6 +2420,14 @@ func ConsoleCLIDownload(t *testing.T, ctx context.Context, name string) func() * } func OperatorPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Pod { + return componentPod(t, ctx, ns, "operator") +} + +func PlatformcontrollerPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Pod { + return componentPod(t, ctx, ns, "platformcontroller") +} + +func componentPod(t *testing.T, ctx context.Context, ns string, componentLabelValue string) func() *corev1.Pod { return func() *corev1.Pod { lst := corev1.PodList{ TypeMeta: metav1.TypeMeta{ @@ -2430,7 +2438,7 @@ func OperatorPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Po if err := TestClient(t).List(ctx, &lst, ctrl.InNamespace(ns), ctrl.MatchingLabels{ - "camel.apache.org/component": "operator", + "camel.apache.org/component": componentLabelValue, }); err != nil { failTest(t, err) } diff --git a/helm/camel-k/templates/platformcontroller.yaml b/helm/camel-k/templates/platformcontroller.yaml new file mode 100644 index 0000000000..bb50211c2f --- /dev/null +++ b/helm/camel-k/templates/platformcontroller.yaml @@ -0,0 +1,111 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: camel-k + camel.apache.org/component: platformcontroller + {{- include "camel-k.labels" . | nindent 4 }} + {{- with .Values.operator.annotations }} + annotations: + {{ toYaml . | nindent 4 }} + {{- end }} + name: camel-k-platformcontroller +spec: + replicas: 1 + selector: + matchLabels: + name: camel-k-platformcontroller + strategy: + type: Recreate + template: + metadata: + labels: + app: camel-k + camel.apache.org/component: platformcontroller + name: camel-k-platformcontroller + spec: + {{- if .Values.operator.imagePullSecrets }} + imagePullSecrets: +{{ toYaml .Values.operator.imagePullSecrets | indent 8 }} + {{- end }} + + containers: + - command: + - kamel + - platformcontroller + env: + - name: WATCH_NAMESPACE + {{- if eq .Values.operator.global "false" }} + valueFrom: + fieldRef: + fieldPath: metadata.namespace + {{- else }} + value: "" + {{- end }} + - name: LOG_LEVEL + value: {{ .Values.operator.logLevel }} + - name: OPERATOR_NAME + value: camel-k-platformcontroller + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: KAMEL_OPERATOR_ID + value: {{ .Values.operator.operatorId }} + image: {{ .Values.operator.image }} + imagePullPolicy: IfNotPresent + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 20 + periodSeconds: 10 + name: camel-k-platformcontroller + ports: + - containerPort: 8080 + name: metrics + {{- with .Values.operator.resources }} + resources: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- if .Values.operator.securityContext }} + {{- with .Values.operator.securityContext }} + securityContext: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- else }} + securityContext: + runAsNonRoot: true + seccompProfile: + type: RuntimeDefault + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + {{- end }} + serviceAccountName: camel-k-operator + {{- with .Values.operator.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/pkg/cmd/platformcontroller.go b/pkg/cmd/platformcontroller.go new file mode 100644 index 0000000000..e2815b82a2 --- /dev/null +++ b/pkg/cmd/platformcontroller.go @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "github.com/apache/camel-k/v2/pkg/cmd/platformcontroller" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/defaults" + "github.com/spf13/cobra" +) + +const ( + platformcontrollerCommand = "platformcontroller" +) + +func newCmdPlatformController(rootCmdOptions *RootCmdOptions) (*cobra.Command, *platformcontrollerCmdOptions) { + options := platformcontrollerCmdOptions{} + + cmd := cobra.Command{ + Use: "platformcontroller", + Short: "Run the Camel K platform controller", + Long: `Run the Camel K platform controller`, + Hidden: true, + PreRunE: decode(&options, rootCmdOptions.Flags), + Run: options.run, + } + + cmd.Flags().Int32("health-port", defaultHealthPort, "The port of the health endpoint") + cmd.Flags().Int32("monitoring-port", defaultMonitoringPort, "The port of the metrics endpoint") + cmd.Flags().Bool("leader-election", true, "Use leader election") + cmd.Flags().String("leader-election-id", "", "Use the given ID as the leader election Lease name") + + return &cmd, &options +} + +type platformcontrollerCmdOptions struct { + HealthPort int32 `mapstructure:"health-port"` + MonitoringPort int32 `mapstructure:"monitoring-port"` + LeaderElection bool `mapstructure:"leader-election"` + LeaderElectionID string `mapstructure:"leader-election-id"` +} + +func (o *platformcontrollerCmdOptions) run(_ *cobra.Command, _ []string) { + + leaderElectionID := o.LeaderElectionID + if leaderElectionID == "" { + if defaults.OperatorID() != "" { + leaderElectionID = platform.GetPlatformControllerLockName(defaults.OperatorID()) + } else { + leaderElectionID = platform.PlatformControllerLockName + } + } + + platformcontroller.Run(o.HealthPort, o.MonitoringPort, o.LeaderElection, leaderElectionID) +} diff --git a/pkg/cmd/platformcontroller/platformcontroller.go b/pkg/cmd/platformcontroller/platformcontroller.go new file mode 100644 index 0000000000..27a8605adc --- /dev/null +++ b/pkg/cmd/platformcontroller/platformcontroller.go @@ -0,0 +1,244 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package platformcontroller + +import ( + "context" + "flag" + "fmt" + "os" + "runtime" + "strconv" + "strings" + "time" + + "k8s.io/klog/v2" + + "go.uber.org/automaxprocs/maxprocs" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + coordination "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/cache" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/healthz" + logf "sigs.k8s.io/controller-runtime/pkg/log" + zapctrl "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + + "github.com/apache/camel-k/v2/pkg/apis" + "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/controller" + "github.com/apache/camel-k/v2/pkg/event" + "github.com/apache/camel-k/v2/pkg/install" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/defaults" + "github.com/apache/camel-k/v2/pkg/util/kubernetes" + logutil "github.com/apache/camel-k/v2/pkg/util/log" +) + +var log = logutil.Log.WithName("cmd") + +func printVersion() { + log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + log.Info(fmt.Sprintf("Camel K Platform Controller Version: %v", defaults.Version)) + log.Info(fmt.Sprintf("Camel K Default Runtime Version: %v", defaults.DefaultRuntimeVersion)) + log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) + log.Info(fmt.Sprintf("Camel K Platform Controller ID: %v", defaults.OperatorID())) + + // Will only appear if DEBUG level has been enabled using the env var LOG_LEVEL + log.Debug("*** DEBUG level messages will be logged ***") +} + +// Run starts the Camel K platform controller. +func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) { + + flag.Parse() + + // The logger instantiated here can be changed to any logger + // implementing the logr.Logger interface. This logger will + // be propagated through the whole operator, generating + // uniform and structured logs. + + // The constants specified here are zap specific + var logLevel zapcore.Level + logLevelVal, ok := os.LookupEnv("LOG_LEVEL") + if ok { + switch strings.ToLower(logLevelVal) { + case "error": + logLevel = zapcore.ErrorLevel + case "info": + logLevel = zapcore.InfoLevel + case "debug": + logLevel = zapcore.DebugLevel + default: + customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal)) + exitOnError(err, "Invalid log-level") + // Need to multiply by -1 to turn logr expected level into zap level + logLevel = zapcore.Level(int8(customLevel) * -1) + } + } else { + logLevel = zapcore.InfoLevel + } + + // Use and set atomic level that all following log events are compared with + // in order to evaluate if a given log level on the event is enabled. + logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) { + o.Development = false + o.Level = zap.NewAtomicLevelAt(logLevel) + })) + + klog.SetLogger(log.AsLogger()) + + _, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a)) })) + if err != nil { + log.Error(err, "failed to set GOMAXPROCS from cgroups") + } + + printVersion() + + watchNamespace, err := getWatchNamespace() + exitOnError(err, "failed to get watch namespace") + + ctx := signals.SetupSignalHandler() + + cfg, err := config.GetConfig() + exitOnError(err, "cannot get client config") + // Increase maximum burst that is used by client-side throttling, + // to prevent the requests made to apply the bundled Kamelets + // from being throttled. + cfg.QPS = 20 + cfg.Burst = 200 + bootstrapClient, err := client.NewClientWithConfig(false, cfg) + exitOnError(err, "cannot initialize client") + + // We do not rely on the event broadcaster managed by controller runtime, + // so that we can check the operator has been granted permission to create + // Events. This is required for the operator to be installable by standard + // admin users, that are not granted create permission on Events by default. + broadcaster := record.NewBroadcaster() + defer broadcaster.Shutdown() + + if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, corev1.GroupName, "events", watchNamespace, "", "create"); err != nil || !ok { + // Do not sink Events to the server as they'll be rejected + broadcaster = event.NewSinkLessBroadcaster(broadcaster) + exitOnError(err, "cannot check permissions for creating Events") + log.Info("Event broadcasting is disabled because of missing permissions to create Events") + } + + platformcontrollerNamespace := platform.GetPlatformControllerNamespace() + if platformcontrollerNamespace == "" { + // Fallback to using the watch namespace when the platform controller is not in-cluster. + // It does not support local (off-cluster) platform controller watching resources globally, + // in which case it's not possible to determine a namespace. + platformcontrollerNamespace = watchNamespace + if platformcontrollerNamespace == "" { + leaderElection = false + log.Info("unable to determine namespace for leader election") + } + } + + // Set the platform controller container image if it runs in-container + platform.PlatformControllerImage, err = getPlatformControllerImage(ctx, bootstrapClient) + exitOnError(err, "cannot get platform controller container image") + + if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, coordination.GroupName, "leases", platformcontrollerNamespace, "", "create"); err != nil || !ok { + leaderElection = false + exitOnError(err, "cannot check permissions for creating Leases") + log.Info("The platform controller is not granted permissions to create Leases") + } + + if !leaderElection { + log.Info("Leader election is disabled!") + } + + options := cache.Options{ + Namespaces: []string{watchNamespace}, + } + + mgr, err := manager.New(cfg, manager.Options{ + EventBroadcaster: broadcaster, + LeaderElection: leaderElection, + LeaderElectionNamespace: platformcontrollerNamespace, + LeaderElectionID: leaderElectionID, + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaderElectionReleaseOnCancel: true, + HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), + MetricsBindAddress: ":" + strconv.Itoa(int(monitoringPort)), + Cache: options, + }) + exitOnError(err, "") + + log.Info("Configuring manager") + exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable add liveness check") + exitOnError(apis.AddToScheme(mgr.GetScheme()), "") + ctrlClient, err := client.FromManager(mgr) + exitOnError(err, "") + exitOnError(controller.AddToPlatformManager(ctx, mgr, ctrlClient), "") + + log.Info("Installing platform manager resources") + installCtx, installCancel := context.WithTimeout(ctx, 1*time.Minute) + defer installCancel() + install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, platformcontrollerNamespace, log) + + log.Info("Starting the platform manager") + exitOnError(mgr.Start(ctx), "platform manager exited non-zero") +} + +// getWatchNamespace returns the Namespace the platform controller should be watching for changes. +func getWatchNamespace() (string, error) { + ns, found := os.LookupEnv(platform.PlatformControllerWatchNamespaceEnvVariable) + if !found { + return "", fmt.Errorf("%s must be set", platform.PlatformControllerWatchNamespaceEnvVariable) + } + return ns, nil +} + +// getPlatformControllerImage returns the image currently used by the running platform controller if present (when running out of cluster, it may be absent). +func getPlatformControllerImage(ctx context.Context, c ctrl.Reader) (string, error) { + ns := platform.GetPlatformControllerNamespace() + name := platform.GetOperatorPodName() + if ns == "" || name == "" { + return "", nil + } + + pod := corev1.Pod{} + if err := c.Get(ctx, ctrl.ObjectKey{Namespace: ns, Name: name}, &pod); err != nil && k8serrors.IsNotFound(err) { + return "", nil + } else if err != nil { + return "", err + } + if len(pod.Spec.Containers) == 0 { + return "", fmt.Errorf("no containers found in platform controller pod") + } + return pod.Spec.Containers[0].Image, nil +} + +func exitOnError(err error, msg string) { + if err != nil { + log.Error(err, msg) + os.Exit(1) + } +} diff --git a/pkg/cmd/platformcontroller_test.go b/pkg/cmd/platformcontroller_test.go new file mode 100644 index 0000000000..1037d1901a --- /dev/null +++ b/pkg/cmd/platformcontroller_test.go @@ -0,0 +1,84 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "testing" + + "github.com/apache/camel-k/v2/pkg/util/test" + "github.com/spf13/cobra" + + "github.com/stretchr/testify/assert" +) + +const cmdPlatformcontroller = "platformcontroller" + +// nolint: unparam +func initializePlatformcontrollerCmdOptions(t *testing.T) (*platformcontrollerCmdOptions, *cobra.Command, RootCmdOptions) { + t.Helper() + + options, rootCmd := kamelTestPreAddCommandInit() + platformcontrollerCmdOptions := addTestPlatformcontrollerCmd(*options, rootCmd) + kamelTestPostAddCommandInit(t, rootCmd, options) + + return platformcontrollerCmdOptions, rootCmd, *options +} + +// nolint: unparam +func addTestPlatformcontrollerCmd(options RootCmdOptions, rootCmd *cobra.Command) *platformcontrollerCmdOptions { + // add a testing version of operator Command + platformcontrollerCmd, platformcontrollerOptions := newCmdPlatformController(&options) + platformcontrollerCmd.RunE = func(c *cobra.Command, args []string) error { + return nil + } + platformcontrollerCmd.PostRunE = func(c *cobra.Command, args []string) error { + return nil + } + platformcontrollerCmd.Args = test.ArbitraryArgs + rootCmd.AddCommand(platformcontrollerCmd) + return platformcontrollerOptions +} + +func TestPlatformcontrollerNoFlag(t *testing.T) { + operatorCmdOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller) + assert.Nil(t, err) + // Check default expected values + assert.Equal(t, int32(8081), operatorCmdOptions.HealthPort) + assert.Equal(t, int32(8080), operatorCmdOptions.MonitoringPort) +} + +func TestPlatformcontrollerNonExistingFlag(t *testing.T) { + _, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--nonExistingFlag") + assert.NotNil(t, err) +} + +func TestPlatformcontrollerHealthPortFlag(t *testing.T) { + operatorCmdOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--health-port", "7171") + assert.Nil(t, err) + assert.Equal(t, int32(7171), operatorCmdOptions.HealthPort) +} + +func TestPlatformcontrollerMonitoringPortFlag(t *testing.T) { + operatorCmdOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--monitoring-port", "7172") + assert.Nil(t, err) + assert.Equal(t, int32(7172), operatorCmdOptions.MonitoringPort) +} diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go index e7b9b83131..c8290e169e 100644 --- a/pkg/cmd/root.go +++ b/pkg/cmd/root.go @@ -146,6 +146,7 @@ func addKamelSubcommands(cmd *cobra.Command, options *RootCmdOptions) { cmd.AddCommand(newCmdDescribe(options)) cmd.AddCommand(cmdOnly(newCmdRebuild(options))) cmd.AddCommand(cmdOnly(newCmdOperator(options))) + cmd.AddCommand(cmdOnly(newCmdPlatformController(options))) cmd.AddCommand(cmdOnly(newCmdBuilder(options))) cmd.AddCommand(cmdOnly(newCmdDebug(options))) cmd.AddCommand(cmdOnly(newCmdDump(options))) @@ -200,7 +201,7 @@ func (command *RootCmdOptions) preRun(cmd *cobra.Command, _ []string) error { // reconciled. Hence the compatibility check is skipped for the install and the operator command. // Furthermore, there can be any incompatibilities, as the install command deploys // the operator version it's compatible with. - if cmd.Use != builderCommand && cmd.Use != installCommand && cmd.Use != operatorCommand { + if cmd.Use != builderCommand && cmd.Use != installCommand && cmd.Use != operatorCommand && cmd.Use != platformcontrollerCommand { checkAndShowCompatibilityWarning(command.Context, cmd, c, command.Namespace) } } diff --git a/pkg/controller/add_integrationplatform.go b/pkg/controller/add_integrationplatform.go index 16b88f91aa..b0348bc525 100644 --- a/pkg/controller/add_integrationplatform.go +++ b/pkg/controller/add_integrationplatform.go @@ -22,5 +22,5 @@ import ( ) func init() { - addToManager = append(addToManager, integrationplatform.Add) + addToPlatformManager = append(addToPlatformManager, integrationplatform.Add) } diff --git a/pkg/controller/platformcontroller.go b/pkg/controller/platformcontroller.go new file mode 100644 index 0000000000..661e048bc6 --- /dev/null +++ b/pkg/controller/platformcontroller.go @@ -0,0 +1,39 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/apache/camel-k/v2/pkg/client" +) + +// addToPlatformManager is a list of functions to add Controllers to the PlatformManager. +var addToPlatformManager []func(context.Context, ctrl.Manager, client.Client) error + +// AddToPlatformManager adds Controllers to the PlatformManager. +func AddToPlatformManager(ctx context.Context, manager ctrl.Manager, client client.Client) error { + for _, f := range addToPlatformManager { + if err := f(ctx, manager, client); err != nil { + return err + } + } + return nil +} diff --git a/pkg/install/common.go b/pkg/install/common.go index 78b1601496..a856063b5d 100644 --- a/pkg/install/common.go +++ b/pkg/install/common.go @@ -36,6 +36,10 @@ import ( const serviceAccountName = "camel-k-operator" +func getComponentsNames() []string { + return []string{"operator", "platformcontroller"} +} + // ResourceCustomizer can be used to inject code that changes the objects before they are created. type ResourceCustomizer func(object ctrl.Object) ctrl.Object diff --git a/pkg/install/operator.go b/pkg/install/operator.go index 4dba69bb8e..b9804a3aac 100644 --- a/pkg/install/operator.go +++ b/pkg/install/operator.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" + "github.com/apache/camel-k/v2/pkg/util" "github.com/spf13/cobra" appsv1 "k8s.io/api/apps/v1" @@ -97,7 +98,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, customizer := func(o ctrl.Object) ctrl.Object { if cfg.CustomImage != "" { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { d.Spec.Template.Spec.Containers[0].Image = cfg.CustomImage } } @@ -105,7 +106,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.CustomImagePullPolicy != "" { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { d.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullPolicy(cfg.CustomImagePullPolicy) } } @@ -113,7 +114,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.Tolerations != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { tolerations, err := kubernetes.NewTolerations(cfg.Tolerations) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured tolerations!") @@ -125,7 +126,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.ResourcesRequirements != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { resourceReq, err := kubernetes.NewResourceRequirements(cfg.ResourcesRequirements) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured resources requests!") @@ -139,7 +140,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.EnvVars != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { envVars, _, _, err := env.ParseEnv(cfg.EnvVars, nil) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse environment variables!") @@ -155,7 +156,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.NodeSelectors != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { nodeSelector, err := kubernetes.NewNodeSelectors(cfg.NodeSelectors) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured node selectors!") @@ -166,7 +167,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, } if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { // Metrics endpoint port d.Spec.Template.Spec.Containers[0].Args = append(d.Spec.Template.Spec.Containers[0].Args, fmt.Sprintf("--monitoring-port=%d", cfg.Monitoring.Port)) @@ -179,7 +180,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, } if cfg.Debugging.Enabled { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { d.Spec.Template.Spec.Containers[0].Command = []string{"dlv", fmt.Sprintf("--listen=:%d", cfg.Debugging.Port), "--headless=true", "--api-version=2", "exec", cfg.Debugging.Path, "--", "operator", "--leader-election=false"} @@ -196,7 +197,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.Global { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { // Make the operator watch all namespaces envvar.SetVal(&d.Spec.Template.Spec.Containers[0].Env, "WATCH_NAMESPACE", "") } @@ -273,6 +274,11 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, return err } + // Deploy the platformcontroller + if err := installPlatformcontroller(ctx, c, cfg.Namespace, customizer, collection, force); err != nil { + return err + } + if err = installEvents(ctx, c, cfg.Namespace, customizer, collection, force, cfg.Global); err != nil { if k8serrors.IsAlreadyExists(err) { return err @@ -499,6 +505,12 @@ func installOperator(ctx context.Context, c client.Client, namespace string, cus ) } +func installPlatformcontroller(ctx context.Context, c client.Client, namespace string, customizer ResourceCustomizer, collection *kubernetes.Collection, force bool) error { + return ResourcesOrCollect(ctx, c, namespace, collection, force, customizer, + "/config/manager/platformcontroller-deployment.yaml", + ) +} + func installKnativeBindings(ctx context.Context, c client.Client, namespace string, customizer ResourceCustomizer, collection *kubernetes.Collection, force bool, global bool) error { if global { return ResourcesOrCollect(ctx, c, namespace, collection, force, customizer, diff --git a/pkg/platform/operator.go b/pkg/platform/operator.go index e9bebd0093..9579ae7f99 100644 --- a/pkg/platform/operator.go +++ b/pkg/platform/operator.go @@ -94,6 +94,14 @@ func GetOperatorNamespace() string { return "" } +// GetPlatformControllerNamespace returns the namespace where the current platform controller is located (if set). +func GetPlatformControllerNamespace() string { + if podNamespace, envSet := os.LookupEnv(platformControllerNamespaceEnvVariable); envSet { + return podNamespace + } + return "" +} + // GetOperatorPodName returns the pod that is running the current operator (if any). func GetOperatorPodName() string { if podName, envSet := os.LookupEnv(operatorPodNameEnvVariable); envSet { @@ -102,6 +110,14 @@ func GetOperatorPodName() string { return "" } +// GetPlatformControllerPodName returns the pod that is running the current platform controller (if any). +func GetPlatformControllerPodName() string { + if podName, envSet := os.LookupEnv(platformControllerPodNameEnvVariable); envSet { + return podName + } + return "" +} + // GetOperatorLockName returns the name of the lock lease that is electing a leader on the particular namespace. func GetOperatorLockName(operatorID string) string { return fmt.Sprintf("%s-lock", operatorID) diff --git a/pkg/platform/platformcontroller.go b/pkg/platform/platformcontroller.go new file mode 100644 index 0000000000..8ab6a47e82 --- /dev/null +++ b/pkg/platform/platformcontroller.go @@ -0,0 +1,35 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package platform + +import "fmt" + +const ( + PlatformControllerWatchNamespaceEnvVariable = "WATCH_NAMESPACE" + platformControllerNamespaceEnvVariable = "NAMESPACE" + platformControllerPodNameEnvVariable = "POD_NAME" +) + +const PlatformControllerLockName = "camel-k-platform-controller-lock" + +var PlatformControllerImage string + +// GetPlatformControllerLockName returns the name of the lock lease that is electing a leader on the particular namespace. +func GetPlatformControllerLockName(platformControllerID string) string { + return fmt.Sprintf("camel-k-platform-controller-%s-lock", platformControllerID) +} diff --git a/pkg/resources/config/manager/kustomization.yaml b/pkg/resources/config/manager/kustomization.yaml index 38c67427fe..34dfd42419 100644 --- a/pkg/resources/config/manager/kustomization.yaml +++ b/pkg/resources/config/manager/kustomization.yaml @@ -20,6 +20,7 @@ kind: Kustomization resources: - operator-deployment.yaml +- platformcontroller-deployment.yaml - operator-service-account.yaml patchesStrategicMerge: diff --git a/pkg/resources/config/manager/platformcontroller-deployment.yaml b/pkg/resources/config/manager/platformcontroller-deployment.yaml new file mode 100644 index 0000000000..0311cfbd80 --- /dev/null +++ b/pkg/resources/config/manager/platformcontroller-deployment.yaml @@ -0,0 +1,89 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: camel-k-platformcontroller + labels: + app: "camel-k" + camel.apache.org/component: platformcontroller + name: camel-k-platformcontroller + app.kubernetes.io/component: platformcontroller + app.kubernetes.io/name: camel-k-platformcontroller + app.kubernetes.io/version: "2.3.0-SNAPSHOT" +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + name: camel-k-platformcontroller + template: + metadata: + labels: + name: camel-k-platformcontroller + camel.apache.org/component: platformcontroller + app: "camel-k" + app.kubernetes.io/component: platformcontroller + app.kubernetes.io/name: camel-k-platformcontroller + app.kubernetes.io/version: "2.3.0-SNAPSHOT" + spec: + serviceAccountName: camel-k-operator + containers: + - name: camel-k-platformcontroller + image: docker.io/apache/camel-k:2.3.0-SNAPSHOT + imagePullPolicy: IfNotPresent + command: + - kamel + - platformcontroller + ports: + - containerPort: 8080 + name: metrics + env: + - name: WATCH_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OPERATOR_NAME + value: "camel-k-platformcontroller" + - name: OPERATOR_ID + value: "camel-k" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + # NAMESPACE is always the operator namespace, independently of WATCH_NAMESPACE + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 20 + periodSeconds: 10 + securityContext: + runAsNonRoot: true + seccompProfile: + type: RuntimeDefault + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + diff --git a/pkg/resources/config/prometheus/kustomization.yaml b/pkg/resources/config/prometheus/kustomization.yaml index a14ee154ca..c92fa62b23 100644 --- a/pkg/resources/config/prometheus/kustomization.yaml +++ b/pkg/resources/config/prometheus/kustomization.yaml @@ -18,3 +18,5 @@ resources: - operator-pod-monitor.yaml - operator-prometheus-rule.yaml +- platformcontroller-pod-monitor.yaml +- platformcontroller-prometheus-rule.yaml diff --git a/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml b/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml new file mode 100644 index 0000000000..42b1ac5e9e --- /dev/null +++ b/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml @@ -0,0 +1,31 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: camel-k-platformcontroller + labels: + app: "camel-k" + camel.apache.org/component: platformcontroller +spec: + selector: + matchLabels: + app: "camel-k" + camel.apache.org/component: platformcontroller + podMetricsEndpoints: + - port: metrics diff --git a/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml b/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml new file mode 100644 index 0000000000..6aae49fd24 --- /dev/null +++ b/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml @@ -0,0 +1,55 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: monitoring.coreos.com/v1 +kind: PrometheusRule +metadata: + name: camel-k-platformcontroller +spec: + groups: + - name: camel-k-platformcontroller + rules: + - alert: CamelKReconciliationDuration + expr: | + ( + 1 - sum(rate(camel_k_reconciliation_duration_seconds_bucket{le="0.5"}[5m])) by (job) + / + sum(rate(camel_k_reconciliation_duration_seconds_count[5m])) by (job) + ) + * 100 + > 10 + for: 1m + labels: + severity: warning + annotations: + message: | + {{ printf "%0.0f" $value }}% of the reconciliation requests + for {{ $labels.job }} have their duration above 0.5s. + - alert: CamelKReconciliationFailure + expr: | + sum(rate(camel_k_reconciliation_duration_seconds_count{result="Errored"}[5m])) by (job) + / + sum(rate(camel_k_reconciliation_duration_seconds_count[5m])) by (job) + * 100 + > 1 + for: 10m + labels: + severity: warning + annotations: + message: | + {{ printf "%0.0f" $value }}% of the reconciliation requests + for {{ $labels.job }} have failed. From 09017913766418e6cced0ed6d066809434dd5676 Mon Sep 17 00:00:00 2001 From: Andrea Tarocchi Date: Tue, 27 Feb 2024 12:47:36 +0100 Subject: [PATCH 2/2] fix(#4948): Refactor common logic between operator and platformcontroller --- e2e/common/cli/config_test.go | 2 +- pkg/cmd/manager/controller.go | 193 +++++++++++++++ pkg/cmd/manager/util.go | 55 +++++ pkg/cmd/operator/operator.go | 227 +++++------------ .../platformcontroller/platformcontroller.go | 230 ++++-------------- pkg/install/optional.go | 14 +- pkg/platform/operator.go | 16 -- pkg/platform/platformcontroller.go | 21 +- .../platformcontroller-deployment.yaml | 6 +- .../bases/camel-k.clusterserviceversion.yaml | 2 +- pkg/util/log/log.go | 52 +++- 11 files changed, 434 insertions(+), 384 deletions(-) create mode 100644 pkg/cmd/manager/controller.go create mode 100644 pkg/cmd/manager/util.go diff --git a/e2e/common/cli/config_test.go b/e2e/common/cli/config_test.go index e29297616d..653d7cef10 100644 --- a/e2e/common/cli/config_test.go +++ b/e2e/common/cli/config_test.go @@ -28,6 +28,7 @@ import ( "strings" "testing" + "github.com/apache/camel-k/v2/pkg/cmd" corev1 "k8s.io/api/core/v1" . "github.com/onsi/gomega" @@ -36,7 +37,6 @@ import ( . "github.com/apache/camel-k/v2/e2e/support" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" - "github.com/apache/camel-k/v2/pkg/cmd" ) func TestKamelCLIConfig(t *testing.T) { diff --git a/pkg/cmd/manager/controller.go b/pkg/cmd/manager/controller.go new file mode 100644 index 0000000000..9a84f167e3 --- /dev/null +++ b/pkg/cmd/manager/controller.go @@ -0,0 +1,193 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "context" + "strconv" + "time" + + "github.com/apache/camel-k/v2/pkg/apis" + "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/util/kubernetes" + logutil "github.com/apache/camel-k/v2/pkg/util/log" + coordination "k8s.io/api/coordination/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" +) + +type Manager interface { + PrintVersion() + CreateBootstrapClient(cfg *rest.Config) (client.Client, string, error) + GetControllerNamespaceAndLeaderElection(ctx context.Context, bootstrapClient client.Client, leaderElection bool) (string, bool, string, error) + GetManagerOptions(bootstrapClient client.Client) (cache.Options, string, error) + CreateManager(ctx context.Context, healthPort int32, monitoringPort int32, leaderElection bool, leaderElectionID string, cfg *rest.Config, controllerNamespace string, options cache.Options) (manager.Manager, client.Client, string, error) + ControllerPreStartResourcesInit(ctx context.Context, initCtx context.Context, bootstrapClient client.Client, controllerNamespace string, ctrlClient client.Client, mgr manager.Manager) (string, error) +} + +func NewControllerCmd(controllerManager Manager, log logutil.Logger) *ControllerCmd { + return &ControllerCmd{ + controllerManager: controllerManager, + log: log, + } +} + +type ControllerCmd struct { + controllerManager Manager + log logutil.Logger +} + +func (c ControllerCmd) Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) (string, error) { + errMessage, err := setMaxprocs(c.log) + if err != nil { + return errMessage, err + } + + c.controllerManager.PrintVersion() + // Will only appear if DEBUG level has been enabled using the env var LOG_LEVEL + c.log.Debug("*** DEBUG level messages will be logged ***") + + cfg, err := config.GetConfig() + if err != nil { + return "cannot get client config", err + } + bootstrapClient, errMessage, err := c.controllerManager.CreateBootstrapClient(cfg) + if err != nil { + return errMessage, err + } + + ctx := signals.SetupSignalHandler() + + controllerNamespace, leaderElection, errMessage, err := c.controllerManager.GetControllerNamespaceAndLeaderElection(ctx, bootstrapClient, leaderElection) + if err != nil { + return errMessage, err + } + if !leaderElection { + c.log.Info("Leader election is disabled!") + } + + errMessage, err = setOperatorImage(ctx, bootstrapClient, controllerNamespace) + if err != nil { + return errMessage, err + } + + options, errMessage, err := c.controllerManager.GetManagerOptions(bootstrapClient) + if err != nil { + return errMessage, err + } + + mgr, ctrlClient, errMessage, err := c.controllerManager.CreateManager(ctx, healthPort, monitoringPort, leaderElection, leaderElectionID, cfg, controllerNamespace, options) + if err != nil { + return errMessage, err + } + + initCtx, initCancel := context.WithTimeout(ctx, 1*time.Minute) + defer initCancel() + + errMessage, err = c.controllerManager.ControllerPreStartResourcesInit(ctx, initCtx, bootstrapClient, controllerNamespace, ctrlClient, mgr) + if err != nil { + return errMessage, err + } + + c.log.Info("Starting the manager") + return "manager exited non-zero", mgr.Start(ctx) +} + +type BaseManager struct { + Log logutil.Logger + WatchNamespace string + ControllerNamespace string + AddToManager func(ctx context.Context, manager manager.Manager, client client.Client) error +} + +func (bm BaseManager) CreateBootstrapClient(cfg *rest.Config) (client.Client, string, error) { + // Increase maximum burst that is used by client-side throttling, + // to prevent the requests made to apply the bundled Kamelets + // from being throttled. + cfg.QPS = 20 + cfg.Burst = 200 + bootstrapClient, err := client.NewClientWithConfig(false, cfg) + if err != nil { + return nil, "cannot initialize client", err + } + + return bootstrapClient, "", nil +} + +func (bm BaseManager) GetControllerNamespaceAndLeaderElection(ctx context.Context, bootstrapClient client.Client, leaderElection bool) (string, bool, string, error) { + controllerNamespace := bm.ControllerNamespace + if controllerNamespace == "" { + // Fallback to using the watch namespace when the operator is not in-cluster. + // It does not support local (off-cluster) operator watching resources globally, + // in which case it's not possible to determine a namespace. + controllerNamespace = bm.WatchNamespace + if controllerNamespace == "" { + leaderElection = false + bm.Log.Info("unable to determine namespace for leader election") + } + } + + if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, coordination.GroupName, "leases", controllerNamespace, "", "create"); err != nil || !ok { + leaderElection = false + if err != nil { + return controllerNamespace, leaderElection, "cannot check permissions for creating Leases", err + } + bm.Log.Info("The operator is not granted permissions to create Leases") + } + + return controllerNamespace, leaderElection, "", nil +} + +func (bm BaseManager) CreateManager(ctx context.Context, healthPort int32, monitoringPort int32, leaderElection bool, leaderElectionID string, cfg *rest.Config, controllerNamespace string, options cache.Options) (manager.Manager, client.Client, string, error) { + mgr, err := manager.New(cfg, manager.Options{ + LeaderElection: leaderElection, + LeaderElectionNamespace: controllerNamespace, + LeaderElectionID: leaderElectionID, + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaderElectionReleaseOnCancel: true, + HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), + Metrics: metricsserver.Options{BindAddress: ":" + strconv.Itoa(int(monitoringPort))}, + Cache: options, + }) + if err != nil { + return nil, nil, "", err + } + + bm.Log.Info("Configuring manager") + if err := mgr.AddHealthzCheck("health-probe", healthz.Ping); err != nil { + return nil, nil, "Unable add liveness check", err + } + if err := apis.AddToScheme(mgr.GetScheme()); err != nil { + return nil, nil, "", err + } + ctrlClient, err := client.FromManager(mgr) + if err != nil { + return nil, nil, "", err + } + if err := bm.AddToManager(ctx, mgr, ctrlClient); err != nil { + return nil, nil, "", err + } + + return mgr, ctrlClient, "", nil +} diff --git a/pkg/cmd/manager/util.go b/pkg/cmd/manager/util.go new file mode 100644 index 0000000000..1c32092acc --- /dev/null +++ b/pkg/cmd/manager/util.go @@ -0,0 +1,55 @@ +package manager + +import ( + "context" + "fmt" + "os" + + "github.com/apache/camel-k/v2/pkg/platform" + logutil "github.com/apache/camel-k/v2/pkg/util/log" + "go.uber.org/automaxprocs/maxprocs" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// setMaxprocs set go maxprocs according to the container environment. +func setMaxprocs(log logutil.Logger) (string, error) { + _, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a)) })) + + return "failed to set GOMAXPROCS from cgroups", err +} + +// setOperatorImage set the operator container image if it runs in-container. +func setOperatorImage(ctx context.Context, bootstrapClient ctrl.Client, controllerNamespace string) (string, error) { + var err error + platform.OperatorImage, err = getOperatorImage(controllerNamespace, platform.GetOperatorPodName(), ctx, bootstrapClient) + return "cannot get operator container image", err +} + +// getOperatorImage returns the image currently used by the running operator if present (when running out of cluster, it may be absent). +func getOperatorImage(namespace string, podName string, ctx context.Context, c ctrl.Reader) (string, error) { + if namespace == "" || podName == "" { + return "", nil + } + + pod := corev1.Pod{} + if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: podName}, &pod); err != nil && k8serrors.IsNotFound(err) { + return "", nil + } else if err != nil { + return "", err + } + if len(pod.Spec.Containers) == 0 { + return "", fmt.Errorf("no containers found in operator pod") + } + return pod.Spec.Containers[0].Image, nil +} + +// GetWatchNamespace returns the Namespace the operator should be watching for changes. +func GetWatchNamespace(watchNamespaceEnvVar string) (string, error) { + ns, found := os.LookupEnv(watchNamespaceEnvVar) + if !found { + return "", fmt.Errorf("%s must be set", watchNamespaceEnvVar) + } + return ns, nil +} diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index ad892e41eb..9d03f636dd 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -24,16 +24,9 @@ import ( "os" "reflect" "runtime" - "strconv" - "strings" - "time" - - "k8s.io/klog/v2" - - "go.uber.org/automaxprocs/maxprocs" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/apache/camel-k/v2/pkg/cmd/manager" + "github.com/apache/camel-k/v2/pkg/controller" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -41,23 +34,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" - "k8s.io/client-go/tools/leaderelection/resourcelock" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" "sigs.k8s.io/controller-runtime/pkg/cache" ctrl "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" - "sigs.k8s.io/controller-runtime/pkg/healthz" - logf "sigs.k8s.io/controller-runtime/pkg/log" - zapctrl "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - - servingv1 "knative.dev/serving/pkg/apis/serving/v1" + ctrlManager "sigs.k8s.io/controller-runtime/pkg/manager" - "github.com/apache/camel-k/v2/pkg/apis" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/client" - "github.com/apache/camel-k/v2/pkg/controller" "github.com/apache/camel-k/v2/pkg/controller/synthetic" "github.com/apache/camel-k/v2/pkg/install" "github.com/apache/camel-k/v2/pkg/platform" @@ -66,104 +49,59 @@ import ( logutil "github.com/apache/camel-k/v2/pkg/util/log" ) -var log = logutil.Log.WithName("cmd") - -func printVersion() { - log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) - log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) - log.Info(fmt.Sprintf("Camel K Operator Version: %v", defaults.Version)) - log.Info(fmt.Sprintf("Camel K Default Runtime Version: %v", defaults.DefaultRuntimeVersion)) - log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) - log.Info(fmt.Sprintf("Camel K Operator ID: %v", defaults.OperatorID())) +var log = logutil.Log.WithName("operator") - // Will only appear if DEBUG level has been enabled using the env var LOG_LEVEL - log.Debug("*** DEBUG level messages will be logged ***") +type operatorManager struct { + manager.BaseManager } // Run starts the Camel K operator. func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) { - flag.Parse() - // The logger instantiated here can be changed to any logger - // implementing the logr.Logger interface. This logger will - // be propagated through the whole operator, generating - // uniform and structured logs. - - // The constants specified here are zap specific - var logLevel zapcore.Level - logLevelVal, ok := os.LookupEnv("LOG_LEVEL") - if ok { - switch strings.ToLower(logLevelVal) { - case "error": - logLevel = zapcore.ErrorLevel - case "info": - logLevel = zapcore.InfoLevel - case "debug": - logLevel = zapcore.DebugLevel - default: - customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal)) - exitOnError(err, "Invalid log-level") - // Need to multiply by -1 to turn logr expected level into zap level - logLevel = zapcore.Level(int8(customLevel) * -1) - } - } else { - logLevel = zapcore.InfoLevel + errMessage, err := logutil.LoggerSetup(&log) + if err != nil { + log.Error(err, errMessage) + os.Exit(1) } - - // Use and set atomic level that all following log events are compared with - // in order to evaluate if a given log level on the event is enabled. - logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) { - o.Development = false - o.Level = zap.NewAtomicLevelAt(logLevel) - })) - - klog.SetLogger(log.AsLogger()) - - _, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a)) })) + watchNamespace, err := manager.GetWatchNamespace(platform.OperatorWatchNamespaceEnvVariable) if err != nil { - log.Error(err, "failed to set GOMAXPROCS from cgroups") + log.Error(err, "failed to get watch namespace") + os.Exit(1) } - printVersion() - - watchNamespace, err := getWatchNamespace() - exitOnError(err, "failed to get watch namespace") - - ctx := signals.SetupSignalHandler() - - cfg, err := config.GetConfig() - exitOnError(err, "cannot get client config") - // Increase maximum burst that is used by client-side throttling, - // to prevent the requests made to apply the bundled Kamelets - // from being throttled. - cfg.QPS = 20 - cfg.Burst = 200 - bootstrapClient, err := client.NewClientWithConfig(false, cfg) - exitOnError(err, "cannot initialize client") - - operatorNamespace := platform.GetOperatorNamespace() - if operatorNamespace == "" { - // Fallback to using the watch namespace when the operator is not in-cluster. - // It does not support local (off-cluster) operator watching resources globally, - // in which case it's not possible to determine a namespace. - operatorNamespace = watchNamespace - if operatorNamespace == "" { - leaderElection = false - log.Info("unable to determine namespace for leader election") - } + om := operatorManager{ + manager.BaseManager{ + Log: log, + WatchNamespace: watchNamespace, + ControllerNamespace: platform.GetOperatorNamespace(), + AddToManager: controller.AddToManager, + }, } - // Set the operator container image if it runs in-container - platform.OperatorImage, err = getOperatorImage(ctx, bootstrapClient) - exitOnError(err, "cannot get operator container image") + controllerCmd := manager.NewControllerCmd(om, log) - if !leaderElection { - log.Info("Leader election is disabled!") + errMessage, err = controllerCmd.Run(healthPort, monitoringPort, leaderElection, leaderElectionID) + if err != nil { + log.Error(err, errMessage) + os.Exit(1) } +} + +func (om operatorManager) PrintVersion() { + om.Log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + om.Log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + om.Log.Info(fmt.Sprintf("Camel K Operator Version: %v", defaults.Version)) + om.Log.Info(fmt.Sprintf("Camel K Default Runtime Version: %v", defaults.DefaultRuntimeVersion)) + om.Log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) + om.Log.Info(fmt.Sprintf("Camel K Operator ID: %v", defaults.OperatorID())) +} +func (om operatorManager) GetManagerOptions(bootstrapClient client.Client) (cache.Options, string, error) { hasIntegrationLabel, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{}) - exitOnError(err, "cannot create Integration label selector") + if err != nil { + return cache.Options{}, "cannot create Integration label selector", err + } labelsSelector := labels.NewSelector().Add(*hasIntegrationLabel) selector := cache.ByObject{ @@ -173,7 +111,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID if !platform.IsCurrentOperatorGlobal() { selector = cache.ByObject{ Label: labelsSelector, - Namespaces: getNamespacesSelector(operatorNamespace, watchNamespace), + Namespaces: getNamespacesSelector(om.ControllerNamespace, om.WatchNamespace), } } @@ -196,43 +134,30 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID } if !platform.IsCurrentOperatorGlobal() { - options.DefaultNamespaces = getNamespacesSelector(operatorNamespace, watchNamespace) + options.DefaultNamespaces = getNamespacesSelector(om.ControllerNamespace, om.WatchNamespace) } - mgr, err := manager.New(cfg, manager.Options{ - LeaderElection: leaderElection, - LeaderElectionNamespace: operatorNamespace, - LeaderElectionID: leaderElectionID, - LeaderElectionResourceLock: resourcelock.LeasesResourceLock, - LeaderElectionReleaseOnCancel: true, - HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), - Metrics: metricsserver.Options{BindAddress: ":" + strconv.Itoa(int(monitoringPort))}, - Cache: options, - }) - exitOnError(err, "") - - log.Info("Configuring manager") - exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable add liveness check") - exitOnError(apis.AddToScheme(mgr.GetScheme()), "") - ctrlClient, err := client.FromManager(mgr) - exitOnError(err, "") - exitOnError(controller.AddToManager(ctx, mgr, ctrlClient), "") - - log.Info("Installing operator resources") - installCtx, installCancel := context.WithTimeout(ctx, 1*time.Minute) - defer installCancel() - install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log) - exitOnError(findOrCreateIntegrationPlatform(installCtx, bootstrapClient, operatorNamespace), "failed to create integration platform") + return options, "", nil +} + +func (om operatorManager) ControllerPreStartResourcesInit(ctx context.Context, initCtx context.Context, bootstrapClient client.Client, controllerNamespace string, ctrlClient client.Client, mgr ctrlManager.Manager) (string, error) { + om.Log.Info("Installing operator resources") + install.TryRegisterOpenShiftConsoleDownloadLink(initCtx, bootstrapClient, log) + if err := findOrCreateIntegrationPlatform(initCtx, bootstrapClient, controllerNamespace); err != nil { + return "failed to create integration platform", err + } synthEnvVal, synth := os.LookupEnv("CAMEL_K_SYNTHETIC_INTEGRATIONS") if synth && synthEnvVal == "true" { - log.Info("Starting the synthetic Integration manager") - exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()), "synthetic Integration manager error") + om.Log.Info("Starting the synthetic Integration manager") + if err := synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()); err != nil { + return "synthetic Integration manager error", err + } } else { - log.Info("Synthetic Integration manager not configured, skipping") + om.Log.Info("Synthetic Integration manager not configured, skipping") } - log.Info("Starting the manager") - exitOnError(mgr.Start(ctx), "manager exited non-zero") + + return "", nil } func getNamespacesSelector(operatorNamespace string, watchNamespace string) map[string]cache.Config { @@ -282,39 +207,3 @@ func findOrCreateIntegrationPlatform(ctx context.Context, c client.Client, opera return nil } - -// getWatchNamespace returns the Namespace the operator should be watching for changes. -func getWatchNamespace() (string, error) { - ns, found := os.LookupEnv(platform.OperatorWatchNamespaceEnvVariable) - if !found { - return "", fmt.Errorf("%s must be set", platform.OperatorWatchNamespaceEnvVariable) - } - return ns, nil -} - -// getOperatorImage returns the image currently used by the running operator if present (when running out of cluster, it may be absent). -func getOperatorImage(ctx context.Context, c ctrl.Reader) (string, error) { - ns := platform.GetOperatorNamespace() - name := platform.GetOperatorPodName() - if ns == "" || name == "" { - return "", nil - } - - pod := corev1.Pod{} - if err := c.Get(ctx, ctrl.ObjectKey{Namespace: ns, Name: name}, &pod); err != nil && k8serrors.IsNotFound(err) { - return "", nil - } else if err != nil { - return "", err - } - if len(pod.Spec.Containers) == 0 { - return "", fmt.Errorf("no containers found in operator pod") - } - return pod.Spec.Containers[0].Image, nil -} - -func exitOnError(err error, msg string) { - if err != nil { - log.Error(err, msg) - os.Exit(1) - } -} diff --git a/pkg/cmd/platformcontroller/platformcontroller.go b/pkg/cmd/platformcontroller/platformcontroller.go index 27a8605adc..12ac21cd25 100644 --- a/pkg/cmd/platformcontroller/platformcontroller.go +++ b/pkg/cmd/platformcontroller/platformcontroller.go @@ -23,222 +23,80 @@ import ( "fmt" "os" "runtime" - "strconv" - "strings" - "time" - "k8s.io/klog/v2" - - "go.uber.org/automaxprocs/maxprocs" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - - coordination "k8s.io/api/coordination/v1" - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/cache" - ctrl "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" - "sigs.k8s.io/controller-runtime/pkg/healthz" - logf "sigs.k8s.io/controller-runtime/pkg/log" - zapctrl "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" - - "github.com/apache/camel-k/v2/pkg/apis" "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/cmd/manager" "github.com/apache/camel-k/v2/pkg/controller" - "github.com/apache/camel-k/v2/pkg/event" "github.com/apache/camel-k/v2/pkg/install" "github.com/apache/camel-k/v2/pkg/platform" "github.com/apache/camel-k/v2/pkg/util/defaults" - "github.com/apache/camel-k/v2/pkg/util/kubernetes" logutil "github.com/apache/camel-k/v2/pkg/util/log" + "sigs.k8s.io/controller-runtime/pkg/cache" + ctrlManager "sigs.k8s.io/controller-runtime/pkg/manager" ) -var log = logutil.Log.WithName("cmd") +var log = logutil.Log.WithName("platformcontroller") -func printVersion() { - log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) - log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) - log.Info(fmt.Sprintf("Camel K Platform Controller Version: %v", defaults.Version)) - log.Info(fmt.Sprintf("Camel K Default Runtime Version: %v", defaults.DefaultRuntimeVersion)) - log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) - log.Info(fmt.Sprintf("Camel K Platform Controller ID: %v", defaults.OperatorID())) - - // Will only appear if DEBUG level has been enabled using the env var LOG_LEVEL - log.Debug("*** DEBUG level messages will be logged ***") +type platformControllerManager struct { + manager.BaseManager } // Run starts the Camel K platform controller. func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) { - flag.Parse() - // The logger instantiated here can be changed to any logger - // implementing the logr.Logger interface. This logger will - // be propagated through the whole operator, generating - // uniform and structured logs. - - // The constants specified here are zap specific - var logLevel zapcore.Level - logLevelVal, ok := os.LookupEnv("LOG_LEVEL") - if ok { - switch strings.ToLower(logLevelVal) { - case "error": - logLevel = zapcore.ErrorLevel - case "info": - logLevel = zapcore.InfoLevel - case "debug": - logLevel = zapcore.DebugLevel - default: - customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal)) - exitOnError(err, "Invalid log-level") - // Need to multiply by -1 to turn logr expected level into zap level - logLevel = zapcore.Level(int8(customLevel) * -1) - } - } else { - logLevel = zapcore.InfoLevel - } - - // Use and set atomic level that all following log events are compared with - // in order to evaluate if a given log level on the event is enabled. - logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) { - o.Development = false - o.Level = zap.NewAtomicLevelAt(logLevel) - })) - - klog.SetLogger(log.AsLogger()) - - _, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a)) })) + errMessage, err := logutil.LoggerSetup(&log) if err != nil { - log.Error(err, "failed to set GOMAXPROCS from cgroups") - } - - printVersion() - - watchNamespace, err := getWatchNamespace() - exitOnError(err, "failed to get watch namespace") - - ctx := signals.SetupSignalHandler() - - cfg, err := config.GetConfig() - exitOnError(err, "cannot get client config") - // Increase maximum burst that is used by client-side throttling, - // to prevent the requests made to apply the bundled Kamelets - // from being throttled. - cfg.QPS = 20 - cfg.Burst = 200 - bootstrapClient, err := client.NewClientWithConfig(false, cfg) - exitOnError(err, "cannot initialize client") - - // We do not rely on the event broadcaster managed by controller runtime, - // so that we can check the operator has been granted permission to create - // Events. This is required for the operator to be installable by standard - // admin users, that are not granted create permission on Events by default. - broadcaster := record.NewBroadcaster() - defer broadcaster.Shutdown() - - if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, corev1.GroupName, "events", watchNamespace, "", "create"); err != nil || !ok { - // Do not sink Events to the server as they'll be rejected - broadcaster = event.NewSinkLessBroadcaster(broadcaster) - exitOnError(err, "cannot check permissions for creating Events") - log.Info("Event broadcasting is disabled because of missing permissions to create Events") + log.Error(err, errMessage) + os.Exit(1) } - - platformcontrollerNamespace := platform.GetPlatformControllerNamespace() - if platformcontrollerNamespace == "" { - // Fallback to using the watch namespace when the platform controller is not in-cluster. - // It does not support local (off-cluster) platform controller watching resources globally, - // in which case it's not possible to determine a namespace. - platformcontrollerNamespace = watchNamespace - if platformcontrollerNamespace == "" { - leaderElection = false - log.Info("unable to determine namespace for leader election") - } + watchNamespace, err := manager.GetWatchNamespace(platform.PlatformControllerWatchNamespaceEnvVariable) + if err != nil { + log.Error(err, "failed to get watch namespace") + os.Exit(1) } - // Set the platform controller container image if it runs in-container - platform.PlatformControllerImage, err = getPlatformControllerImage(ctx, bootstrapClient) - exitOnError(err, "cannot get platform controller container image") - - if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, coordination.GroupName, "leases", platformcontrollerNamespace, "", "create"); err != nil || !ok { - leaderElection = false - exitOnError(err, "cannot check permissions for creating Leases") - log.Info("The platform controller is not granted permissions to create Leases") + pcm := platformControllerManager{ + manager.BaseManager{ + Log: log, + WatchNamespace: watchNamespace, + ControllerNamespace: platform.GetPlatformControllerNamespace(), + AddToManager: controller.AddToPlatformManager, + }, } - if !leaderElection { - log.Info("Leader election is disabled!") - } + controllerCmd := manager.NewControllerCmd(pcm, log) - options := cache.Options{ - Namespaces: []string{watchNamespace}, + errMessage, err = controllerCmd.Run(healthPort, monitoringPort, leaderElection, leaderElectionID) + if err != nil { + log.Error(err, errMessage) + os.Exit(1) } - - mgr, err := manager.New(cfg, manager.Options{ - EventBroadcaster: broadcaster, - LeaderElection: leaderElection, - LeaderElectionNamespace: platformcontrollerNamespace, - LeaderElectionID: leaderElectionID, - LeaderElectionResourceLock: resourcelock.LeasesResourceLock, - LeaderElectionReleaseOnCancel: true, - HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), - MetricsBindAddress: ":" + strconv.Itoa(int(monitoringPort)), - Cache: options, - }) - exitOnError(err, "") - - log.Info("Configuring manager") - exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable add liveness check") - exitOnError(apis.AddToScheme(mgr.GetScheme()), "") - ctrlClient, err := client.FromManager(mgr) - exitOnError(err, "") - exitOnError(controller.AddToPlatformManager(ctx, mgr, ctrlClient), "") - - log.Info("Installing platform manager resources") - installCtx, installCancel := context.WithTimeout(ctx, 1*time.Minute) - defer installCancel() - install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, platformcontrollerNamespace, log) - - log.Info("Starting the platform manager") - exitOnError(mgr.Start(ctx), "platform manager exited non-zero") } -// getWatchNamespace returns the Namespace the platform controller should be watching for changes. -func getWatchNamespace() (string, error) { - ns, found := os.LookupEnv(platform.PlatformControllerWatchNamespaceEnvVariable) - if !found { - return "", fmt.Errorf("%s must be set", platform.PlatformControllerWatchNamespaceEnvVariable) - } - return ns, nil +func (pcm platformControllerManager) PrintVersion() { + pcm.Log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + pcm.Log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + pcm.Log.Info(fmt.Sprintf("Camel K Platform Controller Version: %v", defaults.Version)) + pcm.Log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) + pcm.Log.Info(fmt.Sprintf("Camel K Operator ID: %v", defaults.OperatorID())) } -// getPlatformControllerImage returns the image currently used by the running platform controller if present (when running out of cluster, it may be absent). -func getPlatformControllerImage(ctx context.Context, c ctrl.Reader) (string, error) { - ns := platform.GetPlatformControllerNamespace() - name := platform.GetOperatorPodName() - if ns == "" || name == "" { - return "", nil - } +func (pcm platformControllerManager) GetManagerOptions(bootstrapClient client.Client) (cache.Options, string, error) { + options := cache.Options{} - pod := corev1.Pod{} - if err := c.Get(ctx, ctrl.ObjectKey{Namespace: ns, Name: name}, &pod); err != nil && k8serrors.IsNotFound(err) { - return "", nil - } else if err != nil { - return "", err - } - if len(pod.Spec.Containers) == 0 { - return "", fmt.Errorf("no containers found in platform controller pod") + if !platform.IsCurrentOperatorGlobal() { + options = cache.Options{ + DefaultNamespaces: map[string]cache.Config{pcm.WatchNamespace: {}, pcm.ControllerNamespace: {}}, + } } - return pod.Spec.Containers[0].Image, nil + + return options, "", nil } -func exitOnError(err error, msg string) { - if err != nil { - log.Error(err, msg) - os.Exit(1) - } +func (pcm platformControllerManager) ControllerPreStartResourcesInit(ctx context.Context, initCtx context.Context, bootstrapClient client.Client, controllerNamespace string, ctrlClient client.Client, mgr ctrlManager.Manager) (string, error) { + pcm.Log.Info("Installing platform controller resources") + install.OperatorStartupOptionalTools(initCtx, bootstrapClient, pcm.WatchNamespace, controllerNamespace, log) + + return "", nil } diff --git a/pkg/install/optional.go b/pkg/install/optional.go index eaa5999f74..100e652382 100644 --- a/pkg/install/optional.go +++ b/pkg/install/optional.go @@ -28,11 +28,7 @@ import ( // OperatorStartupOptionalTools tries to install optional tools at operator startup and warns if something goes wrong. func OperatorStartupOptionalTools(ctx context.Context, c client.Client, namespace string, operatorNamespace string, log logutil.Logger) { - // Try to register the OpenShift CLI Download link if possible - if err := OpenShiftConsoleDownloadLink(ctx, c); err != nil { - log.Info("Cannot install OpenShift CLI download link: skipping.") - log.Debug("Error while installing OpenShift CLI download link", "error", err) - } + TryRegisterOpenShiftConsoleDownloadLink(ctx, c, log) // Try to install Kamelet Catalog automatically var kameletNamespace string @@ -63,3 +59,11 @@ func OperatorStartupOptionalTools(ctx context.Context, c client.Client, namespac } } } + +func TryRegisterOpenShiftConsoleDownloadLink(ctx context.Context, c client.Client, log logutil.Logger) { + // Try to register the OpenShift CLI Download link if possible + if err := OpenShiftConsoleDownloadLink(ctx, c); err != nil { + log.Info("Cannot install OpenShift CLI download link: skipping.") + log.Debug("Error while installing OpenShift CLI download link", "error", err) + } +} diff --git a/pkg/platform/operator.go b/pkg/platform/operator.go index 9579ae7f99..e9bebd0093 100644 --- a/pkg/platform/operator.go +++ b/pkg/platform/operator.go @@ -94,14 +94,6 @@ func GetOperatorNamespace() string { return "" } -// GetPlatformControllerNamespace returns the namespace where the current platform controller is located (if set). -func GetPlatformControllerNamespace() string { - if podNamespace, envSet := os.LookupEnv(platformControllerNamespaceEnvVariable); envSet { - return podNamespace - } - return "" -} - // GetOperatorPodName returns the pod that is running the current operator (if any). func GetOperatorPodName() string { if podName, envSet := os.LookupEnv(operatorPodNameEnvVariable); envSet { @@ -110,14 +102,6 @@ func GetOperatorPodName() string { return "" } -// GetPlatformControllerPodName returns the pod that is running the current platform controller (if any). -func GetPlatformControllerPodName() string { - if podName, envSet := os.LookupEnv(platformControllerPodNameEnvVariable); envSet { - return podName - } - return "" -} - // GetOperatorLockName returns the name of the lock lease that is electing a leader on the particular namespace. func GetOperatorLockName(operatorID string) string { return fmt.Sprintf("%s-lock", operatorID) diff --git a/pkg/platform/platformcontroller.go b/pkg/platform/platformcontroller.go index 8ab6a47e82..fc016a748c 100644 --- a/pkg/platform/platformcontroller.go +++ b/pkg/platform/platformcontroller.go @@ -17,7 +17,10 @@ limitations under the License. package platform -import "fmt" +import ( + "fmt" + "os" +) const ( PlatformControllerWatchNamespaceEnvVariable = "WATCH_NAMESPACE" @@ -29,6 +32,22 @@ const PlatformControllerLockName = "camel-k-platform-controller-lock" var PlatformControllerImage string +// GetPlatformControllerNamespace returns the namespace where the current platform controller is located (if set). +func GetPlatformControllerNamespace() string { + if podNamespace, envSet := os.LookupEnv(platformControllerNamespaceEnvVariable); envSet { + return podNamespace + } + return "" +} + +// GetPlatformControllerPodName returns the pod that is running the current platform controller (if any). +func GetPlatformControllerPodName() string { + if podName, envSet := os.LookupEnv(platformControllerPodNameEnvVariable); envSet { + return podName + } + return "" +} + // GetPlatformControllerLockName returns the name of the lock lease that is electing a leader on the particular namespace. func GetPlatformControllerLockName(platformControllerID string) string { return fmt.Sprintf("camel-k-platform-controller-%s-lock", platformControllerID) diff --git a/pkg/resources/config/manager/platformcontroller-deployment.yaml b/pkg/resources/config/manager/platformcontroller-deployment.yaml index 0311cfbd80..89691ffe68 100644 --- a/pkg/resources/config/manager/platformcontroller-deployment.yaml +++ b/pkg/resources/config/manager/platformcontroller-deployment.yaml @@ -25,7 +25,7 @@ metadata: name: camel-k-platformcontroller app.kubernetes.io/component: platformcontroller app.kubernetes.io/name: camel-k-platformcontroller - app.kubernetes.io/version: "2.3.0-SNAPSHOT" + app.kubernetes.io/version: "2.4.0-SNAPSHOT" spec: replicas: 1 strategy: @@ -41,12 +41,12 @@ spec: app: "camel-k" app.kubernetes.io/component: platformcontroller app.kubernetes.io/name: camel-k-platformcontroller - app.kubernetes.io/version: "2.3.0-SNAPSHOT" + app.kubernetes.io/version: "2.4.0-SNAPSHOT" spec: serviceAccountName: camel-k-operator containers: - name: camel-k-platformcontroller - image: docker.io/apache/camel-k:2.3.0-SNAPSHOT + image: docker.io/apache/camel-k:2.4.0-SNAPSHOT imagePullPolicy: IfNotPresent command: - kamel diff --git a/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml b/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml index af4f2305b3..b2a660002f 100644 --- a/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml +++ b/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml @@ -23,7 +23,7 @@ metadata: categories: Integration & Delivery certified: "false" containerImage: docker.io/apache/camel-k:2.4.0-SNAPSHOT - createdAt: 2024-04-09T18:29:41Z + createdAt: 2024-04-12T16:59:16Z description: Apache Camel K is a lightweight integration platform, born on Kubernetes, with serverless superpowers. operators.operatorframework.io/builder: operator-sdk-v1.16.0 diff --git a/pkg/util/log/log.go b/pkg/util/log/log.go index c1eb490961..32244535a9 100644 --- a/pkg/util/log/log.go +++ b/pkg/util/log/log.go @@ -19,12 +19,18 @@ package log import ( "fmt" + "os" + "strconv" + "strings" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1" "github.com/go-logr/logr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "k8s.io/klog/v2" logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" + zapctrl "sigs.k8s.io/controller-runtime/pkg/log/zap" ) // Log --. @@ -36,9 +42,51 @@ func init() { } } +// LoggerSetup setup a common logging for operators. +func LoggerSetup(log *Logger) (string, error) { + // The logger instantiated here can be changed to any logger + // implementing the logr.Logger interface. This logger will + // be propagated through the whole operator, generating + // uniform and structured logs. + + // The constants specified here are zap specific + var logLevel zapcore.Level + logLevelVal, ok := os.LookupEnv("LOG_LEVEL") + if ok { + switch strings.ToLower(logLevelVal) { + case "error": + logLevel = zapcore.ErrorLevel + case "info": + logLevel = zapcore.InfoLevel + case "debug": + logLevel = zapcore.DebugLevel + default: + customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal)) + if err != nil { + return "Invalid log-level", err + } + // Need to multiply by -1 to turn logr expected level into zap level + logLevel = zapcore.Level(int8(customLevel) * -1) + } + } else { + logLevel = zapcore.InfoLevel + } + + // Use and set atomic level that all following log events are compared with + // in order to evaluate if a given log level on the event is enabled. + logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) { + o.Development = false + o.Level = zap.NewAtomicLevelAt(logLevel) + })) + + klog.SetLogger(log.AsLogger()) + + return "Invalid log-level", nil +} + // InitForCmd is required to avoid nil pointer exceptions from command line. func InitForCmd() { - logf.SetLogger(zap.New(zap.UseDevMode(true))) + logf.SetLogger(zapctrl.New(zapctrl.UseDevMode(true))) } // Injectable identifies objects that can receive a Logger.