diff --git a/e2e/advanced/operator_metrics_test.go b/e2e/advanced/operator_metrics_test.go index 5efa4c1638..803691efa1 100644 --- a/e2e/advanced/operator_metrics_test.go +++ b/e2e/advanced/operator_metrics_test.go @@ -72,23 +72,24 @@ func TestMetrics(t *testing.T) { Should(Equal(corev1.ConditionTrue)) g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) - pod := OperatorPod(t, ctx, ns)() - g.Expect(pod).NotTo(BeNil()) + operatorPod := OperatorPod(t, ctx, ns)() + g.Expect(operatorPod).NotTo(BeNil()) // pod.Namespace could be different from ns if using global operator - fmt.Printf("Fetching logs for operator pod %s in namespace %s", pod.Name, pod.Namespace) + fmt.Printf("Fetching logs for operator pod %s in namespace %s", operatorPod.Name, operatorPod.Namespace) logOptions := &corev1.PodLogOptions{ Container: "camel-k-operator", } - logs, err := StructuredLogs(t, ctx, pod.Namespace, pod.Name, logOptions, false) + logs, err := StructuredLogs(t, ctx, operatorPod.Namespace, operatorPod.Name, logOptions, false) g.Expect(err).To(BeNil()) g.Expect(logs).NotTo(BeEmpty()) - response, err := TestClient(t).CoreV1().RESTClient().Get(). - AbsPath(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/proxy/metrics", pod.Namespace, pod.Name)).DoRaw(ctx) - g.Expect(err).To(BeNil()) - metrics, err := parsePrometheusData(response) - g.Expect(err).To(BeNil()) + platformcontrollerPod := PlatformcontrollerPod(t, ctx, ns)() + Expect(platformcontrollerPod).NotTo(BeNil()) + + operatorLogs, operatorMetrics := getLogsAndMetrics(t, ctx, operatorPod, "camel-k-operator") + + platformcontrollerLogs, platformcontrollerMetrics := getLogsAndMetrics(t, ctx, platformcontrollerPod, "camel-k-platformcontroller") it := Integration(t, ctx, ns, name)() g.Expect(it).NotTo(BeNil()) @@ -100,9 +101,9 @@ func TestMetrics(t *testing.T) { duration, err := time.ParseDuration(build.Status.Duration) g.Expect(err).To(BeNil()) - // Check it's consistent with the duration observed from logs + // Check it's consistent with the duration observed from operatorLogs var ts1, ts2 time.Time - err = NewLogWalker(&logs). + err = NewLogWalker(&operatorLogs). AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("State transition"), @@ -132,8 +133,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs((durationFromLogs - duration).Seconds())).To(BeNumerically("<", 10)) // Check the duration is observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_build_duration_seconds")) - g.Expect(metrics["camel_k_build_duration_seconds"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_duration_seconds")) + g.Expect(operatorMetrics["camel_k_build_duration_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_duration_seconds"), Help: stringP("Camel K build duration"), @@ -159,8 +160,8 @@ func TestMetrics(t *testing.T) { // Check there are no failures reported in the Build status g.Expect(build.Status.Failure).To(BeNil()) - // Check no recovery attempts are reported in the logs - recoveryAttempts, err := NewLogCounter(&logs).Count(MatchFields(IgnoreExtras, Fields{ + // Check no recovery attempts are reported in the operatorLogs + recoveryAttempts, err := NewLogCounter(&operatorLogs).Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": HavePrefix("Recovery attempt"), "Kind": Equal("Build"), @@ -170,8 +171,8 @@ func TestMetrics(t *testing.T) { g.Expect(recoveryAttempts).To(BeNumerically("==", 0)) // Check no recovery attempts are observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_build_recovery_attempts")) - g.Expect(metrics["camel_k_build_recovery_attempts"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_recovery_attempts")) + g.Expect(operatorMetrics["camel_k_build_recovery_attempts"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_recovery_attempts"), Help: stringP("Camel K build recovery attempts"), @@ -194,8 +195,8 @@ func TestMetrics(t *testing.T) { }) t.Run("reconciliation duration metric", func(t *testing.T) { - g.Expect(metrics).To(HaveKey("camel_k_reconciliation_duration_seconds")) - g.Expect(metrics["camel_k_reconciliation_duration_seconds"]).To(PointTo(MatchFields(IgnoreExtras, + g.Expect(platformcontrollerMetrics).To(HaveKey("camel_k_reconciliation_duration_seconds")) + g.Expect(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"]).To(PointTo(MatchFields(IgnoreExtras, Fields{ "Name": EqualP("camel_k_reconciliation_duration_seconds"), "Help": EqualP("Camel K reconciliation loop duration"), @@ -203,10 +204,10 @@ func TestMetrics(t *testing.T) { }, ))) - counter := NewLogCounter(&logs) + platformcontrollerCounter := NewLogCounter(&platformcontrollerLogs) // Count the number of IntegrationPlatform reconciliations - platformReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + platformReconciliations, err := platformcontrollerCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integrationplatform"), "Message": Equal("Reconciling IntegrationPlatform"), "RequestNamespace": Equal(ns), @@ -215,7 +216,7 @@ func TestMetrics(t *testing.T) { g.Expect(err).To(BeNil()) // Check it matches the observation in the corresponding metric - platformReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformReconciled := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -230,7 +231,7 @@ func TestMetrics(t *testing.T) { platformReconciledCount := *platformReconciled.Histogram.SampleCount g.Expect(platformReconciledCount).To(BeNumerically(">", 0)) - platformRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformRequeued := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -246,7 +247,7 @@ func TestMetrics(t *testing.T) { platformRequeuedCount = *platformRequeued.Histogram.SampleCount } - platformErrored := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformErrored := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -267,8 +268,9 @@ func TestMetrics(t *testing.T) { g.Expect(platformReconciliations).To(BeNumerically("==", platformReconciledCount+platformRequeuedCount+platformErroredCount)) + operatorCounter := NewLogCounter(&operatorLogs) // Count the number of Integration reconciliations - integrationReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + integrationReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integration"), "Message": Equal("Reconciling Integration"), "RequestNamespace": Equal(it.Namespace), @@ -278,7 +280,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationReconciliations).To(BeNumerically(">", 0)) // Check it matches the observation in the corresponding metric - integrationReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -293,7 +295,7 @@ func TestMetrics(t *testing.T) { integrationReconciledCount := *integrationReconciled.Histogram.SampleCount g.Expect(integrationReconciledCount).To(BeNumerically(">", 0)) - integrationRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -309,7 +311,7 @@ func TestMetrics(t *testing.T) { integrationRequeuedCount = *integrationRequeued.Histogram.SampleCount } - integrationErrored := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationErrored := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -331,7 +333,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationReconciliations).To(BeNumerically("==", integrationReconciledCount+integrationRequeuedCount+integrationErroredCount)) // Count the number of IntegrationKit reconciliations - integrationKitReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + integrationKitReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integrationkit"), "Message": Equal("Reconciling IntegrationKit"), "RequestNamespace": Equal(it.Status.IntegrationKit.Namespace), @@ -341,7 +343,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciliations).To(BeNumerically(">", 0)) // Check it matches the observation in the corresponding metric - integrationKitReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationKitReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -357,7 +359,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciledCount).To(BeNumerically(">", 0)) // Kit can be requeued, above all when a catalog needs to be built - integrationKitRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationKitRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -380,7 +382,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciliations).To(BeNumerically("==", integrationKitReconciledCount+integrationKitRequeuedCount)) // Count the number of Build reconciliations - buildReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + buildReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("Reconciling Build"), "RequestNamespace": Equal(build.Namespace), @@ -389,7 +391,7 @@ func TestMetrics(t *testing.T) { g.Expect(err).To(BeNil()) // Check it matches the observation in the corresponding metric - buildReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + buildReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -404,7 +406,7 @@ func TestMetrics(t *testing.T) { buildReconciledCount := *buildReconciled.Histogram.SampleCount g.Expect(buildReconciledCount).To(BeNumerically(">", 0)) - buildRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + buildRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -431,8 +433,8 @@ func TestMetrics(t *testing.T) { // The start queuing time is taken from the creation time ts1 = build.CreationTimestamp.Time - // Retrieve the end queuing time from the logs - err = NewLogWalker(&logs). + // Retrieve the end queuing time from the operatorLogs + err := NewLogWalker(&operatorLogs). AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("State transition"), @@ -448,8 +450,8 @@ func TestMetrics(t *testing.T) { durationFromLogs := ts2.Sub(ts1) // Retrieve the queuing duration from the metric - g.Expect(metrics).To(HaveKey("camel_k_build_queue_duration_seconds")) - metric := metrics["camel_k_build_queue_duration_seconds"].Metric + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_queue_duration_seconds")) + metric := operatorMetrics["camel_k_build_queue_duration_seconds"].Metric g.Expect(metric).To(HaveLen(1)) histogram := metric[0].Histogram g.Expect(histogram).NotTo(BeNil()) @@ -461,7 +463,7 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs(durationFromLogs.Seconds() - duration)).To(BeNumerically("<", 1)) // Check the queuing duration is correctly observed in the corresponding metric - g.Expect(metrics["camel_k_build_queue_duration_seconds"]).To(EqualP( + g.Expect(operatorMetrics["camel_k_build_queue_duration_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_queue_duration_seconds"), Help: stringP("Camel K build queue duration"), @@ -494,8 +496,8 @@ func TestMetrics(t *testing.T) { duration := ts2.Sub(ts1) - // Retrieve these start and end times from the logs - err = NewLogWalker(&logs). + // Retrieve these start and end times from the operatorLogs + err := NewLogWalker(&operatorLogs). AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integration"), "Message": Equal("Reconciling Integration"), @@ -519,8 +521,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs((durationFromLogs - duration).Seconds())).To(BeNumerically("<=", 1)) // Retrieve the first readiness duration from the metric - g.Expect(metrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) - metric := metrics["camel_k_integration_first_readiness_seconds"].Metric + g.Expect(operatorMetrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) + metric := operatorMetrics["camel_k_integration_first_readiness_seconds"].Metric g.Expect(metric).To(HaveLen(1)) histogram := metric[0].Histogram g.Expect(histogram).NotTo(BeNil()) @@ -530,8 +532,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs(*histogram.SampleSum - d)).To(BeNumerically("<=", 1)) // Check the duration is correctly observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) - g.Expect(metrics["camel_k_integration_first_readiness_seconds"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) + g.Expect(operatorMetrics["camel_k_integration_first_readiness_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_integration_first_readiness_seconds"), Help: stringP("Camel K integration time to first readiness"), @@ -554,6 +556,24 @@ func TestMetrics(t *testing.T) { }) } +func getLogsAndMetrics(t *testing.T, ctx context.Context, componentPod *corev1.Pod, containerName string) ([]LogEntry, map[string]*prometheus.MetricFamily) { + // componentPod.Namespace could be different from ns if using global operator + fmt.Printf("Fetching logs for component pod %s in namespace %s", componentPod.Name, componentPod.Namespace) + logOptions := &corev1.PodLogOptions{ + Container: containerName, + } + logs, err := StructuredLogs(t, ctx, componentPod.Namespace, componentPod.Name, logOptions, false) + Expect(err).To(BeNil()) + Expect(logs).NotTo(BeEmpty()) + + response, err := TestClient(t).CoreV1().RESTClient().Get(). + AbsPath(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/proxy/metrics", componentPod.Namespace, componentPod.Name)).DoRaw(ctx) + Expect(err).To(BeNil()) + metrics, err := parsePrometheusData(response) + Expect(err).To(BeNil()) + return logs, metrics +} + func getMetric(family *prometheus.MetricFamily, matcher types.GomegaMatcher) *prometheus.Metric { for _, metric := range family.Metric { if match, err := matcher.Match(metric); err != nil { diff --git a/e2e/common/cli/config_test.go b/e2e/common/cli/config_test.go index e29297616d..653d7cef10 100644 --- a/e2e/common/cli/config_test.go +++ b/e2e/common/cli/config_test.go @@ -28,6 +28,7 @@ import ( "strings" "testing" + "github.com/apache/camel-k/v2/pkg/cmd" corev1 "k8s.io/api/core/v1" . "github.com/onsi/gomega" @@ -36,7 +37,6 @@ import ( . "github.com/apache/camel-k/v2/e2e/support" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" - "github.com/apache/camel-k/v2/pkg/cmd" ) func TestKamelCLIConfig(t *testing.T) { diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 3cde54a750..e91a8819bc 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -2420,6 +2420,14 @@ func ConsoleCLIDownload(t *testing.T, ctx context.Context, name string) func() * } func OperatorPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Pod { + return componentPod(t, ctx, ns, "operator") +} + +func PlatformcontrollerPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Pod { + return componentPod(t, ctx, ns, "platformcontroller") +} + +func componentPod(t *testing.T, ctx context.Context, ns string, componentLabelValue string) func() *corev1.Pod { return func() *corev1.Pod { lst := corev1.PodList{ TypeMeta: metav1.TypeMeta{ @@ -2430,7 +2438,7 @@ func OperatorPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Po if err := TestClient(t).List(ctx, &lst, ctrl.InNamespace(ns), ctrl.MatchingLabels{ - "camel.apache.org/component": "operator", + "camel.apache.org/component": componentLabelValue, }); err != nil { failTest(t, err) } diff --git a/helm/camel-k/templates/platformcontroller.yaml b/helm/camel-k/templates/platformcontroller.yaml new file mode 100644 index 0000000000..bb50211c2f --- /dev/null +++ b/helm/camel-k/templates/platformcontroller.yaml @@ -0,0 +1,111 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: camel-k + camel.apache.org/component: platformcontroller + {{- include "camel-k.labels" . | nindent 4 }} + {{- with .Values.operator.annotations }} + annotations: + {{ toYaml . | nindent 4 }} + {{- end }} + name: camel-k-platformcontroller +spec: + replicas: 1 + selector: + matchLabels: + name: camel-k-platformcontroller + strategy: + type: Recreate + template: + metadata: + labels: + app: camel-k + camel.apache.org/component: platformcontroller + name: camel-k-platformcontroller + spec: + {{- if .Values.operator.imagePullSecrets }} + imagePullSecrets: +{{ toYaml .Values.operator.imagePullSecrets | indent 8 }} + {{- end }} + + containers: + - command: + - kamel + - platformcontroller + env: + - name: WATCH_NAMESPACE + {{- if eq .Values.operator.global "false" }} + valueFrom: + fieldRef: + fieldPath: metadata.namespace + {{- else }} + value: "" + {{- end }} + - name: LOG_LEVEL + value: {{ .Values.operator.logLevel }} + - name: OPERATOR_NAME + value: camel-k-platformcontroller + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: KAMEL_OPERATOR_ID + value: {{ .Values.operator.operatorId }} + image: {{ .Values.operator.image }} + imagePullPolicy: IfNotPresent + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 20 + periodSeconds: 10 + name: camel-k-platformcontroller + ports: + - containerPort: 8080 + name: metrics + {{- with .Values.operator.resources }} + resources: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- if .Values.operator.securityContext }} + {{- with .Values.operator.securityContext }} + securityContext: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- else }} + securityContext: + runAsNonRoot: true + seccompProfile: + type: RuntimeDefault + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + {{- end }} + serviceAccountName: camel-k-operator + {{- with .Values.operator.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/pkg/cmd/manager/controller.go b/pkg/cmd/manager/controller.go new file mode 100644 index 0000000000..9a84f167e3 --- /dev/null +++ b/pkg/cmd/manager/controller.go @@ -0,0 +1,193 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "context" + "strconv" + "time" + + "github.com/apache/camel-k/v2/pkg/apis" + "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/util/kubernetes" + logutil "github.com/apache/camel-k/v2/pkg/util/log" + coordination "k8s.io/api/coordination/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" +) + +type Manager interface { + PrintVersion() + CreateBootstrapClient(cfg *rest.Config) (client.Client, string, error) + GetControllerNamespaceAndLeaderElection(ctx context.Context, bootstrapClient client.Client, leaderElection bool) (string, bool, string, error) + GetManagerOptions(bootstrapClient client.Client) (cache.Options, string, error) + CreateManager(ctx context.Context, healthPort int32, monitoringPort int32, leaderElection bool, leaderElectionID string, cfg *rest.Config, controllerNamespace string, options cache.Options) (manager.Manager, client.Client, string, error) + ControllerPreStartResourcesInit(ctx context.Context, initCtx context.Context, bootstrapClient client.Client, controllerNamespace string, ctrlClient client.Client, mgr manager.Manager) (string, error) +} + +func NewControllerCmd(controllerManager Manager, log logutil.Logger) *ControllerCmd { + return &ControllerCmd{ + controllerManager: controllerManager, + log: log, + } +} + +type ControllerCmd struct { + controllerManager Manager + log logutil.Logger +} + +func (c ControllerCmd) Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) (string, error) { + errMessage, err := setMaxprocs(c.log) + if err != nil { + return errMessage, err + } + + c.controllerManager.PrintVersion() + // Will only appear if DEBUG level has been enabled using the env var LOG_LEVEL + c.log.Debug("*** DEBUG level messages will be logged ***") + + cfg, err := config.GetConfig() + if err != nil { + return "cannot get client config", err + } + bootstrapClient, errMessage, err := c.controllerManager.CreateBootstrapClient(cfg) + if err != nil { + return errMessage, err + } + + ctx := signals.SetupSignalHandler() + + controllerNamespace, leaderElection, errMessage, err := c.controllerManager.GetControllerNamespaceAndLeaderElection(ctx, bootstrapClient, leaderElection) + if err != nil { + return errMessage, err + } + if !leaderElection { + c.log.Info("Leader election is disabled!") + } + + errMessage, err = setOperatorImage(ctx, bootstrapClient, controllerNamespace) + if err != nil { + return errMessage, err + } + + options, errMessage, err := c.controllerManager.GetManagerOptions(bootstrapClient) + if err != nil { + return errMessage, err + } + + mgr, ctrlClient, errMessage, err := c.controllerManager.CreateManager(ctx, healthPort, monitoringPort, leaderElection, leaderElectionID, cfg, controllerNamespace, options) + if err != nil { + return errMessage, err + } + + initCtx, initCancel := context.WithTimeout(ctx, 1*time.Minute) + defer initCancel() + + errMessage, err = c.controllerManager.ControllerPreStartResourcesInit(ctx, initCtx, bootstrapClient, controllerNamespace, ctrlClient, mgr) + if err != nil { + return errMessage, err + } + + c.log.Info("Starting the manager") + return "manager exited non-zero", mgr.Start(ctx) +} + +type BaseManager struct { + Log logutil.Logger + WatchNamespace string + ControllerNamespace string + AddToManager func(ctx context.Context, manager manager.Manager, client client.Client) error +} + +func (bm BaseManager) CreateBootstrapClient(cfg *rest.Config) (client.Client, string, error) { + // Increase maximum burst that is used by client-side throttling, + // to prevent the requests made to apply the bundled Kamelets + // from being throttled. + cfg.QPS = 20 + cfg.Burst = 200 + bootstrapClient, err := client.NewClientWithConfig(false, cfg) + if err != nil { + return nil, "cannot initialize client", err + } + + return bootstrapClient, "", nil +} + +func (bm BaseManager) GetControllerNamespaceAndLeaderElection(ctx context.Context, bootstrapClient client.Client, leaderElection bool) (string, bool, string, error) { + controllerNamespace := bm.ControllerNamespace + if controllerNamespace == "" { + // Fallback to using the watch namespace when the operator is not in-cluster. + // It does not support local (off-cluster) operator watching resources globally, + // in which case it's not possible to determine a namespace. + controllerNamespace = bm.WatchNamespace + if controllerNamespace == "" { + leaderElection = false + bm.Log.Info("unable to determine namespace for leader election") + } + } + + if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, coordination.GroupName, "leases", controllerNamespace, "", "create"); err != nil || !ok { + leaderElection = false + if err != nil { + return controllerNamespace, leaderElection, "cannot check permissions for creating Leases", err + } + bm.Log.Info("The operator is not granted permissions to create Leases") + } + + return controllerNamespace, leaderElection, "", nil +} + +func (bm BaseManager) CreateManager(ctx context.Context, healthPort int32, monitoringPort int32, leaderElection bool, leaderElectionID string, cfg *rest.Config, controllerNamespace string, options cache.Options) (manager.Manager, client.Client, string, error) { + mgr, err := manager.New(cfg, manager.Options{ + LeaderElection: leaderElection, + LeaderElectionNamespace: controllerNamespace, + LeaderElectionID: leaderElectionID, + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaderElectionReleaseOnCancel: true, + HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), + Metrics: metricsserver.Options{BindAddress: ":" + strconv.Itoa(int(monitoringPort))}, + Cache: options, + }) + if err != nil { + return nil, nil, "", err + } + + bm.Log.Info("Configuring manager") + if err := mgr.AddHealthzCheck("health-probe", healthz.Ping); err != nil { + return nil, nil, "Unable add liveness check", err + } + if err := apis.AddToScheme(mgr.GetScheme()); err != nil { + return nil, nil, "", err + } + ctrlClient, err := client.FromManager(mgr) + if err != nil { + return nil, nil, "", err + } + if err := bm.AddToManager(ctx, mgr, ctrlClient); err != nil { + return nil, nil, "", err + } + + return mgr, ctrlClient, "", nil +} diff --git a/pkg/cmd/manager/util.go b/pkg/cmd/manager/util.go new file mode 100644 index 0000000000..1c32092acc --- /dev/null +++ b/pkg/cmd/manager/util.go @@ -0,0 +1,55 @@ +package manager + +import ( + "context" + "fmt" + "os" + + "github.com/apache/camel-k/v2/pkg/platform" + logutil "github.com/apache/camel-k/v2/pkg/util/log" + "go.uber.org/automaxprocs/maxprocs" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// setMaxprocs set go maxprocs according to the container environment. +func setMaxprocs(log logutil.Logger) (string, error) { + _, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a)) })) + + return "failed to set GOMAXPROCS from cgroups", err +} + +// setOperatorImage set the operator container image if it runs in-container. +func setOperatorImage(ctx context.Context, bootstrapClient ctrl.Client, controllerNamespace string) (string, error) { + var err error + platform.OperatorImage, err = getOperatorImage(controllerNamespace, platform.GetOperatorPodName(), ctx, bootstrapClient) + return "cannot get operator container image", err +} + +// getOperatorImage returns the image currently used by the running operator if present (when running out of cluster, it may be absent). +func getOperatorImage(namespace string, podName string, ctx context.Context, c ctrl.Reader) (string, error) { + if namespace == "" || podName == "" { + return "", nil + } + + pod := corev1.Pod{} + if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: podName}, &pod); err != nil && k8serrors.IsNotFound(err) { + return "", nil + } else if err != nil { + return "", err + } + if len(pod.Spec.Containers) == 0 { + return "", fmt.Errorf("no containers found in operator pod") + } + return pod.Spec.Containers[0].Image, nil +} + +// GetWatchNamespace returns the Namespace the operator should be watching for changes. +func GetWatchNamespace(watchNamespaceEnvVar string) (string, error) { + ns, found := os.LookupEnv(watchNamespaceEnvVar) + if !found { + return "", fmt.Errorf("%s must be set", watchNamespaceEnvVar) + } + return ns, nil +} diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index ad892e41eb..9d03f636dd 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -24,16 +24,9 @@ import ( "os" "reflect" "runtime" - "strconv" - "strings" - "time" - - "k8s.io/klog/v2" - - "go.uber.org/automaxprocs/maxprocs" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/apache/camel-k/v2/pkg/cmd/manager" + "github.com/apache/camel-k/v2/pkg/controller" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -41,23 +34,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" - "k8s.io/client-go/tools/leaderelection/resourcelock" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" "sigs.k8s.io/controller-runtime/pkg/cache" ctrl "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" - "sigs.k8s.io/controller-runtime/pkg/healthz" - logf "sigs.k8s.io/controller-runtime/pkg/log" - zapctrl "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - - servingv1 "knative.dev/serving/pkg/apis/serving/v1" + ctrlManager "sigs.k8s.io/controller-runtime/pkg/manager" - "github.com/apache/camel-k/v2/pkg/apis" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/client" - "github.com/apache/camel-k/v2/pkg/controller" "github.com/apache/camel-k/v2/pkg/controller/synthetic" "github.com/apache/camel-k/v2/pkg/install" "github.com/apache/camel-k/v2/pkg/platform" @@ -66,104 +49,59 @@ import ( logutil "github.com/apache/camel-k/v2/pkg/util/log" ) -var log = logutil.Log.WithName("cmd") - -func printVersion() { - log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) - log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) - log.Info(fmt.Sprintf("Camel K Operator Version: %v", defaults.Version)) - log.Info(fmt.Sprintf("Camel K Default Runtime Version: %v", defaults.DefaultRuntimeVersion)) - log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) - log.Info(fmt.Sprintf("Camel K Operator ID: %v", defaults.OperatorID())) +var log = logutil.Log.WithName("operator") - // Will only appear if DEBUG level has been enabled using the env var LOG_LEVEL - log.Debug("*** DEBUG level messages will be logged ***") +type operatorManager struct { + manager.BaseManager } // Run starts the Camel K operator. func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) { - flag.Parse() - // The logger instantiated here can be changed to any logger - // implementing the logr.Logger interface. This logger will - // be propagated through the whole operator, generating - // uniform and structured logs. - - // The constants specified here are zap specific - var logLevel zapcore.Level - logLevelVal, ok := os.LookupEnv("LOG_LEVEL") - if ok { - switch strings.ToLower(logLevelVal) { - case "error": - logLevel = zapcore.ErrorLevel - case "info": - logLevel = zapcore.InfoLevel - case "debug": - logLevel = zapcore.DebugLevel - default: - customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal)) - exitOnError(err, "Invalid log-level") - // Need to multiply by -1 to turn logr expected level into zap level - logLevel = zapcore.Level(int8(customLevel) * -1) - } - } else { - logLevel = zapcore.InfoLevel + errMessage, err := logutil.LoggerSetup(&log) + if err != nil { + log.Error(err, errMessage) + os.Exit(1) } - - // Use and set atomic level that all following log events are compared with - // in order to evaluate if a given log level on the event is enabled. - logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) { - o.Development = false - o.Level = zap.NewAtomicLevelAt(logLevel) - })) - - klog.SetLogger(log.AsLogger()) - - _, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a)) })) + watchNamespace, err := manager.GetWatchNamespace(platform.OperatorWatchNamespaceEnvVariable) if err != nil { - log.Error(err, "failed to set GOMAXPROCS from cgroups") + log.Error(err, "failed to get watch namespace") + os.Exit(1) } - printVersion() - - watchNamespace, err := getWatchNamespace() - exitOnError(err, "failed to get watch namespace") - - ctx := signals.SetupSignalHandler() - - cfg, err := config.GetConfig() - exitOnError(err, "cannot get client config") - // Increase maximum burst that is used by client-side throttling, - // to prevent the requests made to apply the bundled Kamelets - // from being throttled. - cfg.QPS = 20 - cfg.Burst = 200 - bootstrapClient, err := client.NewClientWithConfig(false, cfg) - exitOnError(err, "cannot initialize client") - - operatorNamespace := platform.GetOperatorNamespace() - if operatorNamespace == "" { - // Fallback to using the watch namespace when the operator is not in-cluster. - // It does not support local (off-cluster) operator watching resources globally, - // in which case it's not possible to determine a namespace. - operatorNamespace = watchNamespace - if operatorNamespace == "" { - leaderElection = false - log.Info("unable to determine namespace for leader election") - } + om := operatorManager{ + manager.BaseManager{ + Log: log, + WatchNamespace: watchNamespace, + ControllerNamespace: platform.GetOperatorNamespace(), + AddToManager: controller.AddToManager, + }, } - // Set the operator container image if it runs in-container - platform.OperatorImage, err = getOperatorImage(ctx, bootstrapClient) - exitOnError(err, "cannot get operator container image") + controllerCmd := manager.NewControllerCmd(om, log) - if !leaderElection { - log.Info("Leader election is disabled!") + errMessage, err = controllerCmd.Run(healthPort, monitoringPort, leaderElection, leaderElectionID) + if err != nil { + log.Error(err, errMessage) + os.Exit(1) } +} + +func (om operatorManager) PrintVersion() { + om.Log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + om.Log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + om.Log.Info(fmt.Sprintf("Camel K Operator Version: %v", defaults.Version)) + om.Log.Info(fmt.Sprintf("Camel K Default Runtime Version: %v", defaults.DefaultRuntimeVersion)) + om.Log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) + om.Log.Info(fmt.Sprintf("Camel K Operator ID: %v", defaults.OperatorID())) +} +func (om operatorManager) GetManagerOptions(bootstrapClient client.Client) (cache.Options, string, error) { hasIntegrationLabel, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{}) - exitOnError(err, "cannot create Integration label selector") + if err != nil { + return cache.Options{}, "cannot create Integration label selector", err + } labelsSelector := labels.NewSelector().Add(*hasIntegrationLabel) selector := cache.ByObject{ @@ -173,7 +111,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID if !platform.IsCurrentOperatorGlobal() { selector = cache.ByObject{ Label: labelsSelector, - Namespaces: getNamespacesSelector(operatorNamespace, watchNamespace), + Namespaces: getNamespacesSelector(om.ControllerNamespace, om.WatchNamespace), } } @@ -196,43 +134,30 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID } if !platform.IsCurrentOperatorGlobal() { - options.DefaultNamespaces = getNamespacesSelector(operatorNamespace, watchNamespace) + options.DefaultNamespaces = getNamespacesSelector(om.ControllerNamespace, om.WatchNamespace) } - mgr, err := manager.New(cfg, manager.Options{ - LeaderElection: leaderElection, - LeaderElectionNamespace: operatorNamespace, - LeaderElectionID: leaderElectionID, - LeaderElectionResourceLock: resourcelock.LeasesResourceLock, - LeaderElectionReleaseOnCancel: true, - HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), - Metrics: metricsserver.Options{BindAddress: ":" + strconv.Itoa(int(monitoringPort))}, - Cache: options, - }) - exitOnError(err, "") - - log.Info("Configuring manager") - exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable add liveness check") - exitOnError(apis.AddToScheme(mgr.GetScheme()), "") - ctrlClient, err := client.FromManager(mgr) - exitOnError(err, "") - exitOnError(controller.AddToManager(ctx, mgr, ctrlClient), "") - - log.Info("Installing operator resources") - installCtx, installCancel := context.WithTimeout(ctx, 1*time.Minute) - defer installCancel() - install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log) - exitOnError(findOrCreateIntegrationPlatform(installCtx, bootstrapClient, operatorNamespace), "failed to create integration platform") + return options, "", nil +} + +func (om operatorManager) ControllerPreStartResourcesInit(ctx context.Context, initCtx context.Context, bootstrapClient client.Client, controllerNamespace string, ctrlClient client.Client, mgr ctrlManager.Manager) (string, error) { + om.Log.Info("Installing operator resources") + install.TryRegisterOpenShiftConsoleDownloadLink(initCtx, bootstrapClient, log) + if err := findOrCreateIntegrationPlatform(initCtx, bootstrapClient, controllerNamespace); err != nil { + return "failed to create integration platform", err + } synthEnvVal, synth := os.LookupEnv("CAMEL_K_SYNTHETIC_INTEGRATIONS") if synth && synthEnvVal == "true" { - log.Info("Starting the synthetic Integration manager") - exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()), "synthetic Integration manager error") + om.Log.Info("Starting the synthetic Integration manager") + if err := synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()); err != nil { + return "synthetic Integration manager error", err + } } else { - log.Info("Synthetic Integration manager not configured, skipping") + om.Log.Info("Synthetic Integration manager not configured, skipping") } - log.Info("Starting the manager") - exitOnError(mgr.Start(ctx), "manager exited non-zero") + + return "", nil } func getNamespacesSelector(operatorNamespace string, watchNamespace string) map[string]cache.Config { @@ -282,39 +207,3 @@ func findOrCreateIntegrationPlatform(ctx context.Context, c client.Client, opera return nil } - -// getWatchNamespace returns the Namespace the operator should be watching for changes. -func getWatchNamespace() (string, error) { - ns, found := os.LookupEnv(platform.OperatorWatchNamespaceEnvVariable) - if !found { - return "", fmt.Errorf("%s must be set", platform.OperatorWatchNamespaceEnvVariable) - } - return ns, nil -} - -// getOperatorImage returns the image currently used by the running operator if present (when running out of cluster, it may be absent). -func getOperatorImage(ctx context.Context, c ctrl.Reader) (string, error) { - ns := platform.GetOperatorNamespace() - name := platform.GetOperatorPodName() - if ns == "" || name == "" { - return "", nil - } - - pod := corev1.Pod{} - if err := c.Get(ctx, ctrl.ObjectKey{Namespace: ns, Name: name}, &pod); err != nil && k8serrors.IsNotFound(err) { - return "", nil - } else if err != nil { - return "", err - } - if len(pod.Spec.Containers) == 0 { - return "", fmt.Errorf("no containers found in operator pod") - } - return pod.Spec.Containers[0].Image, nil -} - -func exitOnError(err error, msg string) { - if err != nil { - log.Error(err, msg) - os.Exit(1) - } -} diff --git a/pkg/cmd/platformcontroller.go b/pkg/cmd/platformcontroller.go new file mode 100644 index 0000000000..e2815b82a2 --- /dev/null +++ b/pkg/cmd/platformcontroller.go @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "github.com/apache/camel-k/v2/pkg/cmd/platformcontroller" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/defaults" + "github.com/spf13/cobra" +) + +const ( + platformcontrollerCommand = "platformcontroller" +) + +func newCmdPlatformController(rootCmdOptions *RootCmdOptions) (*cobra.Command, *platformcontrollerCmdOptions) { + options := platformcontrollerCmdOptions{} + + cmd := cobra.Command{ + Use: "platformcontroller", + Short: "Run the Camel K platform controller", + Long: `Run the Camel K platform controller`, + Hidden: true, + PreRunE: decode(&options, rootCmdOptions.Flags), + Run: options.run, + } + + cmd.Flags().Int32("health-port", defaultHealthPort, "The port of the health endpoint") + cmd.Flags().Int32("monitoring-port", defaultMonitoringPort, "The port of the metrics endpoint") + cmd.Flags().Bool("leader-election", true, "Use leader election") + cmd.Flags().String("leader-election-id", "", "Use the given ID as the leader election Lease name") + + return &cmd, &options +} + +type platformcontrollerCmdOptions struct { + HealthPort int32 `mapstructure:"health-port"` + MonitoringPort int32 `mapstructure:"monitoring-port"` + LeaderElection bool `mapstructure:"leader-election"` + LeaderElectionID string `mapstructure:"leader-election-id"` +} + +func (o *platformcontrollerCmdOptions) run(_ *cobra.Command, _ []string) { + + leaderElectionID := o.LeaderElectionID + if leaderElectionID == "" { + if defaults.OperatorID() != "" { + leaderElectionID = platform.GetPlatformControllerLockName(defaults.OperatorID()) + } else { + leaderElectionID = platform.PlatformControllerLockName + } + } + + platformcontroller.Run(o.HealthPort, o.MonitoringPort, o.LeaderElection, leaderElectionID) +} diff --git a/pkg/cmd/platformcontroller/platformcontroller.go b/pkg/cmd/platformcontroller/platformcontroller.go new file mode 100644 index 0000000000..12ac21cd25 --- /dev/null +++ b/pkg/cmd/platformcontroller/platformcontroller.go @@ -0,0 +1,102 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package platformcontroller + +import ( + "context" + "flag" + "fmt" + "os" + "runtime" + + "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/cmd/manager" + "github.com/apache/camel-k/v2/pkg/controller" + "github.com/apache/camel-k/v2/pkg/install" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/defaults" + logutil "github.com/apache/camel-k/v2/pkg/util/log" + "sigs.k8s.io/controller-runtime/pkg/cache" + ctrlManager "sigs.k8s.io/controller-runtime/pkg/manager" +) + +var log = logutil.Log.WithName("platformcontroller") + +type platformControllerManager struct { + manager.BaseManager +} + +// Run starts the Camel K platform controller. +func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) { + flag.Parse() + + errMessage, err := logutil.LoggerSetup(&log) + if err != nil { + log.Error(err, errMessage) + os.Exit(1) + } + watchNamespace, err := manager.GetWatchNamespace(platform.PlatformControllerWatchNamespaceEnvVariable) + if err != nil { + log.Error(err, "failed to get watch namespace") + os.Exit(1) + } + + pcm := platformControllerManager{ + manager.BaseManager{ + Log: log, + WatchNamespace: watchNamespace, + ControllerNamespace: platform.GetPlatformControllerNamespace(), + AddToManager: controller.AddToPlatformManager, + }, + } + + controllerCmd := manager.NewControllerCmd(pcm, log) + + errMessage, err = controllerCmd.Run(healthPort, monitoringPort, leaderElection, leaderElectionID) + if err != nil { + log.Error(err, errMessage) + os.Exit(1) + } +} + +func (pcm platformControllerManager) PrintVersion() { + pcm.Log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + pcm.Log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + pcm.Log.Info(fmt.Sprintf("Camel K Platform Controller Version: %v", defaults.Version)) + pcm.Log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) + pcm.Log.Info(fmt.Sprintf("Camel K Operator ID: %v", defaults.OperatorID())) +} + +func (pcm platformControllerManager) GetManagerOptions(bootstrapClient client.Client) (cache.Options, string, error) { + options := cache.Options{} + + if !platform.IsCurrentOperatorGlobal() { + options = cache.Options{ + DefaultNamespaces: map[string]cache.Config{pcm.WatchNamespace: {}, pcm.ControllerNamespace: {}}, + } + } + + return options, "", nil +} + +func (pcm platformControllerManager) ControllerPreStartResourcesInit(ctx context.Context, initCtx context.Context, bootstrapClient client.Client, controllerNamespace string, ctrlClient client.Client, mgr ctrlManager.Manager) (string, error) { + pcm.Log.Info("Installing platform controller resources") + install.OperatorStartupOptionalTools(initCtx, bootstrapClient, pcm.WatchNamespace, controllerNamespace, log) + + return "", nil +} diff --git a/pkg/cmd/platformcontroller_test.go b/pkg/cmd/platformcontroller_test.go new file mode 100644 index 0000000000..1037d1901a --- /dev/null +++ b/pkg/cmd/platformcontroller_test.go @@ -0,0 +1,84 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "testing" + + "github.com/apache/camel-k/v2/pkg/util/test" + "github.com/spf13/cobra" + + "github.com/stretchr/testify/assert" +) + +const cmdPlatformcontroller = "platformcontroller" + +// nolint: unparam +func initializePlatformcontrollerCmdOptions(t *testing.T) (*platformcontrollerCmdOptions, *cobra.Command, RootCmdOptions) { + t.Helper() + + options, rootCmd := kamelTestPreAddCommandInit() + platformcontrollerCmdOptions := addTestPlatformcontrollerCmd(*options, rootCmd) + kamelTestPostAddCommandInit(t, rootCmd, options) + + return platformcontrollerCmdOptions, rootCmd, *options +} + +// nolint: unparam +func addTestPlatformcontrollerCmd(options RootCmdOptions, rootCmd *cobra.Command) *platformcontrollerCmdOptions { + // add a testing version of operator Command + platformcontrollerCmd, platformcontrollerOptions := newCmdPlatformController(&options) + platformcontrollerCmd.RunE = func(c *cobra.Command, args []string) error { + return nil + } + platformcontrollerCmd.PostRunE = func(c *cobra.Command, args []string) error { + return nil + } + platformcontrollerCmd.Args = test.ArbitraryArgs + rootCmd.AddCommand(platformcontrollerCmd) + return platformcontrollerOptions +} + +func TestPlatformcontrollerNoFlag(t *testing.T) { + operatorCmdOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller) + assert.Nil(t, err) + // Check default expected values + assert.Equal(t, int32(8081), operatorCmdOptions.HealthPort) + assert.Equal(t, int32(8080), operatorCmdOptions.MonitoringPort) +} + +func TestPlatformcontrollerNonExistingFlag(t *testing.T) { + _, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--nonExistingFlag") + assert.NotNil(t, err) +} + +func TestPlatformcontrollerHealthPortFlag(t *testing.T) { + operatorCmdOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--health-port", "7171") + assert.Nil(t, err) + assert.Equal(t, int32(7171), operatorCmdOptions.HealthPort) +} + +func TestPlatformcontrollerMonitoringPortFlag(t *testing.T) { + operatorCmdOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t) + _, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--monitoring-port", "7172") + assert.Nil(t, err) + assert.Equal(t, int32(7172), operatorCmdOptions.MonitoringPort) +} diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go index e7b9b83131..c8290e169e 100644 --- a/pkg/cmd/root.go +++ b/pkg/cmd/root.go @@ -146,6 +146,7 @@ func addKamelSubcommands(cmd *cobra.Command, options *RootCmdOptions) { cmd.AddCommand(newCmdDescribe(options)) cmd.AddCommand(cmdOnly(newCmdRebuild(options))) cmd.AddCommand(cmdOnly(newCmdOperator(options))) + cmd.AddCommand(cmdOnly(newCmdPlatformController(options))) cmd.AddCommand(cmdOnly(newCmdBuilder(options))) cmd.AddCommand(cmdOnly(newCmdDebug(options))) cmd.AddCommand(cmdOnly(newCmdDump(options))) @@ -200,7 +201,7 @@ func (command *RootCmdOptions) preRun(cmd *cobra.Command, _ []string) error { // reconciled. Hence the compatibility check is skipped for the install and the operator command. // Furthermore, there can be any incompatibilities, as the install command deploys // the operator version it's compatible with. - if cmd.Use != builderCommand && cmd.Use != installCommand && cmd.Use != operatorCommand { + if cmd.Use != builderCommand && cmd.Use != installCommand && cmd.Use != operatorCommand && cmd.Use != platformcontrollerCommand { checkAndShowCompatibilityWarning(command.Context, cmd, c, command.Namespace) } } diff --git a/pkg/controller/add_integrationplatform.go b/pkg/controller/add_integrationplatform.go index 16b88f91aa..b0348bc525 100644 --- a/pkg/controller/add_integrationplatform.go +++ b/pkg/controller/add_integrationplatform.go @@ -22,5 +22,5 @@ import ( ) func init() { - addToManager = append(addToManager, integrationplatform.Add) + addToPlatformManager = append(addToPlatformManager, integrationplatform.Add) } diff --git a/pkg/controller/platformcontroller.go b/pkg/controller/platformcontroller.go new file mode 100644 index 0000000000..661e048bc6 --- /dev/null +++ b/pkg/controller/platformcontroller.go @@ -0,0 +1,39 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/apache/camel-k/v2/pkg/client" +) + +// addToPlatformManager is a list of functions to add Controllers to the PlatformManager. +var addToPlatformManager []func(context.Context, ctrl.Manager, client.Client) error + +// AddToPlatformManager adds Controllers to the PlatformManager. +func AddToPlatformManager(ctx context.Context, manager ctrl.Manager, client client.Client) error { + for _, f := range addToPlatformManager { + if err := f(ctx, manager, client); err != nil { + return err + } + } + return nil +} diff --git a/pkg/install/common.go b/pkg/install/common.go index 78b1601496..a856063b5d 100644 --- a/pkg/install/common.go +++ b/pkg/install/common.go @@ -36,6 +36,10 @@ import ( const serviceAccountName = "camel-k-operator" +func getComponentsNames() []string { + return []string{"operator", "platformcontroller"} +} + // ResourceCustomizer can be used to inject code that changes the objects before they are created. type ResourceCustomizer func(object ctrl.Object) ctrl.Object diff --git a/pkg/install/operator.go b/pkg/install/operator.go index 4dba69bb8e..b9804a3aac 100644 --- a/pkg/install/operator.go +++ b/pkg/install/operator.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" + "github.com/apache/camel-k/v2/pkg/util" "github.com/spf13/cobra" appsv1 "k8s.io/api/apps/v1" @@ -97,7 +98,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, customizer := func(o ctrl.Object) ctrl.Object { if cfg.CustomImage != "" { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { d.Spec.Template.Spec.Containers[0].Image = cfg.CustomImage } } @@ -105,7 +106,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.CustomImagePullPolicy != "" { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { d.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullPolicy(cfg.CustomImagePullPolicy) } } @@ -113,7 +114,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.Tolerations != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { tolerations, err := kubernetes.NewTolerations(cfg.Tolerations) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured tolerations!") @@ -125,7 +126,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.ResourcesRequirements != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { resourceReq, err := kubernetes.NewResourceRequirements(cfg.ResourcesRequirements) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured resources requests!") @@ -139,7 +140,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.EnvVars != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { envVars, _, _, err := env.ParseEnv(cfg.EnvVars, nil) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse environment variables!") @@ -155,7 +156,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.NodeSelectors != nil { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { nodeSelector, err := kubernetes.NewNodeSelectors(cfg.NodeSelectors) if err != nil { fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured node selectors!") @@ -166,7 +167,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, } if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { // Metrics endpoint port d.Spec.Template.Spec.Containers[0].Args = append(d.Spec.Template.Spec.Containers[0].Args, fmt.Sprintf("--monitoring-port=%d", cfg.Monitoring.Port)) @@ -179,7 +180,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, } if cfg.Debugging.Enabled { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { d.Spec.Template.Spec.Containers[0].Command = []string{"dlv", fmt.Sprintf("--listen=:%d", cfg.Debugging.Port), "--headless=true", "--api-version=2", "exec", cfg.Debugging.Path, "--", "operator", "--leader-election=false"} @@ -196,7 +197,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, if cfg.Global { if d, ok := o.(*appsv1.Deployment); ok { - if d.Labels["camel.apache.org/component"] == "operator" { + if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) { // Make the operator watch all namespaces envvar.SetVal(&d.Spec.Template.Spec.Containers[0].Env, "WATCH_NAMESPACE", "") } @@ -273,6 +274,11 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client, return err } + // Deploy the platformcontroller + if err := installPlatformcontroller(ctx, c, cfg.Namespace, customizer, collection, force); err != nil { + return err + } + if err = installEvents(ctx, c, cfg.Namespace, customizer, collection, force, cfg.Global); err != nil { if k8serrors.IsAlreadyExists(err) { return err @@ -499,6 +505,12 @@ func installOperator(ctx context.Context, c client.Client, namespace string, cus ) } +func installPlatformcontroller(ctx context.Context, c client.Client, namespace string, customizer ResourceCustomizer, collection *kubernetes.Collection, force bool) error { + return ResourcesOrCollect(ctx, c, namespace, collection, force, customizer, + "/config/manager/platformcontroller-deployment.yaml", + ) +} + func installKnativeBindings(ctx context.Context, c client.Client, namespace string, customizer ResourceCustomizer, collection *kubernetes.Collection, force bool, global bool) error { if global { return ResourcesOrCollect(ctx, c, namespace, collection, force, customizer, diff --git a/pkg/install/optional.go b/pkg/install/optional.go index eaa5999f74..100e652382 100644 --- a/pkg/install/optional.go +++ b/pkg/install/optional.go @@ -28,11 +28,7 @@ import ( // OperatorStartupOptionalTools tries to install optional tools at operator startup and warns if something goes wrong. func OperatorStartupOptionalTools(ctx context.Context, c client.Client, namespace string, operatorNamespace string, log logutil.Logger) { - // Try to register the OpenShift CLI Download link if possible - if err := OpenShiftConsoleDownloadLink(ctx, c); err != nil { - log.Info("Cannot install OpenShift CLI download link: skipping.") - log.Debug("Error while installing OpenShift CLI download link", "error", err) - } + TryRegisterOpenShiftConsoleDownloadLink(ctx, c, log) // Try to install Kamelet Catalog automatically var kameletNamespace string @@ -63,3 +59,11 @@ func OperatorStartupOptionalTools(ctx context.Context, c client.Client, namespac } } } + +func TryRegisterOpenShiftConsoleDownloadLink(ctx context.Context, c client.Client, log logutil.Logger) { + // Try to register the OpenShift CLI Download link if possible + if err := OpenShiftConsoleDownloadLink(ctx, c); err != nil { + log.Info("Cannot install OpenShift CLI download link: skipping.") + log.Debug("Error while installing OpenShift CLI download link", "error", err) + } +} diff --git a/pkg/platform/platformcontroller.go b/pkg/platform/platformcontroller.go new file mode 100644 index 0000000000..fc016a748c --- /dev/null +++ b/pkg/platform/platformcontroller.go @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package platform + +import ( + "fmt" + "os" +) + +const ( + PlatformControllerWatchNamespaceEnvVariable = "WATCH_NAMESPACE" + platformControllerNamespaceEnvVariable = "NAMESPACE" + platformControllerPodNameEnvVariable = "POD_NAME" +) + +const PlatformControllerLockName = "camel-k-platform-controller-lock" + +var PlatformControllerImage string + +// GetPlatformControllerNamespace returns the namespace where the current platform controller is located (if set). +func GetPlatformControllerNamespace() string { + if podNamespace, envSet := os.LookupEnv(platformControllerNamespaceEnvVariable); envSet { + return podNamespace + } + return "" +} + +// GetPlatformControllerPodName returns the pod that is running the current platform controller (if any). +func GetPlatformControllerPodName() string { + if podName, envSet := os.LookupEnv(platformControllerPodNameEnvVariable); envSet { + return podName + } + return "" +} + +// GetPlatformControllerLockName returns the name of the lock lease that is electing a leader on the particular namespace. +func GetPlatformControllerLockName(platformControllerID string) string { + return fmt.Sprintf("camel-k-platform-controller-%s-lock", platformControllerID) +} diff --git a/pkg/resources/config/manager/kustomization.yaml b/pkg/resources/config/manager/kustomization.yaml index 38c67427fe..34dfd42419 100644 --- a/pkg/resources/config/manager/kustomization.yaml +++ b/pkg/resources/config/manager/kustomization.yaml @@ -20,6 +20,7 @@ kind: Kustomization resources: - operator-deployment.yaml +- platformcontroller-deployment.yaml - operator-service-account.yaml patchesStrategicMerge: diff --git a/pkg/resources/config/manager/platformcontroller-deployment.yaml b/pkg/resources/config/manager/platformcontroller-deployment.yaml new file mode 100644 index 0000000000..89691ffe68 --- /dev/null +++ b/pkg/resources/config/manager/platformcontroller-deployment.yaml @@ -0,0 +1,89 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: camel-k-platformcontroller + labels: + app: "camel-k" + camel.apache.org/component: platformcontroller + name: camel-k-platformcontroller + app.kubernetes.io/component: platformcontroller + app.kubernetes.io/name: camel-k-platformcontroller + app.kubernetes.io/version: "2.4.0-SNAPSHOT" +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + name: camel-k-platformcontroller + template: + metadata: + labels: + name: camel-k-platformcontroller + camel.apache.org/component: platformcontroller + app: "camel-k" + app.kubernetes.io/component: platformcontroller + app.kubernetes.io/name: camel-k-platformcontroller + app.kubernetes.io/version: "2.4.0-SNAPSHOT" + spec: + serviceAccountName: camel-k-operator + containers: + - name: camel-k-platformcontroller + image: docker.io/apache/camel-k:2.4.0-SNAPSHOT + imagePullPolicy: IfNotPresent + command: + - kamel + - platformcontroller + ports: + - containerPort: 8080 + name: metrics + env: + - name: WATCH_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OPERATOR_NAME + value: "camel-k-platformcontroller" + - name: OPERATOR_ID + value: "camel-k" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + # NAMESPACE is always the operator namespace, independently of WATCH_NAMESPACE + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 20 + periodSeconds: 10 + securityContext: + runAsNonRoot: true + seccompProfile: + type: RuntimeDefault + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + diff --git a/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml b/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml index af4f2305b3..b2a660002f 100644 --- a/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml +++ b/pkg/resources/config/manifests/bases/camel-k.clusterserviceversion.yaml @@ -23,7 +23,7 @@ metadata: categories: Integration & Delivery certified: "false" containerImage: docker.io/apache/camel-k:2.4.0-SNAPSHOT - createdAt: 2024-04-09T18:29:41Z + createdAt: 2024-04-12T16:59:16Z description: Apache Camel K is a lightweight integration platform, born on Kubernetes, with serverless superpowers. operators.operatorframework.io/builder: operator-sdk-v1.16.0 diff --git a/pkg/resources/config/prometheus/kustomization.yaml b/pkg/resources/config/prometheus/kustomization.yaml index a14ee154ca..c92fa62b23 100644 --- a/pkg/resources/config/prometheus/kustomization.yaml +++ b/pkg/resources/config/prometheus/kustomization.yaml @@ -18,3 +18,5 @@ resources: - operator-pod-monitor.yaml - operator-prometheus-rule.yaml +- platformcontroller-pod-monitor.yaml +- platformcontroller-prometheus-rule.yaml diff --git a/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml b/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml new file mode 100644 index 0000000000..42b1ac5e9e --- /dev/null +++ b/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml @@ -0,0 +1,31 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: camel-k-platformcontroller + labels: + app: "camel-k" + camel.apache.org/component: platformcontroller +spec: + selector: + matchLabels: + app: "camel-k" + camel.apache.org/component: platformcontroller + podMetricsEndpoints: + - port: metrics diff --git a/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml b/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml new file mode 100644 index 0000000000..6aae49fd24 --- /dev/null +++ b/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml @@ -0,0 +1,55 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: monitoring.coreos.com/v1 +kind: PrometheusRule +metadata: + name: camel-k-platformcontroller +spec: + groups: + - name: camel-k-platformcontroller + rules: + - alert: CamelKReconciliationDuration + expr: | + ( + 1 - sum(rate(camel_k_reconciliation_duration_seconds_bucket{le="0.5"}[5m])) by (job) + / + sum(rate(camel_k_reconciliation_duration_seconds_count[5m])) by (job) + ) + * 100 + > 10 + for: 1m + labels: + severity: warning + annotations: + message: | + {{ printf "%0.0f" $value }}% of the reconciliation requests + for {{ $labels.job }} have their duration above 0.5s. + - alert: CamelKReconciliationFailure + expr: | + sum(rate(camel_k_reconciliation_duration_seconds_count{result="Errored"}[5m])) by (job) + / + sum(rate(camel_k_reconciliation_duration_seconds_count[5m])) by (job) + * 100 + > 1 + for: 10m + labels: + severity: warning + annotations: + message: | + {{ printf "%0.0f" $value }}% of the reconciliation requests + for {{ $labels.job }} have failed. diff --git a/pkg/util/log/log.go b/pkg/util/log/log.go index c1eb490961..32244535a9 100644 --- a/pkg/util/log/log.go +++ b/pkg/util/log/log.go @@ -19,12 +19,18 @@ package log import ( "fmt" + "os" + "strconv" + "strings" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1" "github.com/go-logr/logr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "k8s.io/klog/v2" logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" + zapctrl "sigs.k8s.io/controller-runtime/pkg/log/zap" ) // Log --. @@ -36,9 +42,51 @@ func init() { } } +// LoggerSetup setup a common logging for operators. +func LoggerSetup(log *Logger) (string, error) { + // The logger instantiated here can be changed to any logger + // implementing the logr.Logger interface. This logger will + // be propagated through the whole operator, generating + // uniform and structured logs. + + // The constants specified here are zap specific + var logLevel zapcore.Level + logLevelVal, ok := os.LookupEnv("LOG_LEVEL") + if ok { + switch strings.ToLower(logLevelVal) { + case "error": + logLevel = zapcore.ErrorLevel + case "info": + logLevel = zapcore.InfoLevel + case "debug": + logLevel = zapcore.DebugLevel + default: + customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal)) + if err != nil { + return "Invalid log-level", err + } + // Need to multiply by -1 to turn logr expected level into zap level + logLevel = zapcore.Level(int8(customLevel) * -1) + } + } else { + logLevel = zapcore.InfoLevel + } + + // Use and set atomic level that all following log events are compared with + // in order to evaluate if a given log level on the event is enabled. + logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) { + o.Development = false + o.Level = zap.NewAtomicLevelAt(logLevel) + })) + + klog.SetLogger(log.AsLogger()) + + return "Invalid log-level", nil +} + // InitForCmd is required to avoid nil pointer exceptions from command line. func InitForCmd() { - logf.SetLogger(zap.New(zap.UseDevMode(true))) + logf.SetLogger(zapctrl.New(zapctrl.UseDevMode(true))) } // Injectable identifies objects that can receive a Logger.