diff --git a/apis/eventing/v1alpha1/cloudeventsource_types.go b/apis/eventing/v1alpha1/cloudeventsource_types.go index f1436e23f94..c872fc6acd4 100644 --- a/apis/eventing/v1alpha1/cloudeventsource_types.go +++ b/apis/eventing/v1alpha1/cloudeventsource_types.go @@ -18,19 +18,17 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) // +kubebuilder:object:generate=false type CloudEventSourceInterface interface { - GetKind() string - GetName() string - GetNamespace() string - GetSpec() CloudEventSourceSpec - GetStatus() CloudEventSourceStatus - GetGeneration() int64 + client.Object GenerateIdentifier() string + GetSpec() *CloudEventSourceSpec + GetStatus() *CloudEventSourceStatus } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -129,28 +127,12 @@ func init() { SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}, &ClusterCloudEventSource{}, &ClusterCloudEventSourceList{}) } -func (ces *CloudEventSource) GetKind() string { - return ces.Kind +func (ces *CloudEventSource) GetSpec() *CloudEventSourceSpec { + return &ces.Spec } -func (ces *CloudEventSource) GetName() string { - return ces.Name -} - -func (ces *CloudEventSource) GetNamespace() string { - return ces.Namespace -} - -func (ces *CloudEventSource) GetSpec() CloudEventSourceSpec { - return ces.Spec -} - -func (ces *CloudEventSource) GetStatus() CloudEventSourceStatus { - return *ces.Status.DeepCopy() -} - -func (ces *CloudEventSource) GetGeneration() int64 { - return ces.Generation +func (ces *CloudEventSource) GetStatus() *CloudEventSourceStatus { + return &ces.Status } // GenerateIdentifier returns identifier for the object in for "kind.namespace.name" @@ -158,28 +140,12 @@ func (ces *CloudEventSource) GenerateIdentifier() string { return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name) } -func (cces *ClusterCloudEventSource) GetKind() string { - return cces.Kind -} - -func (cces *ClusterCloudEventSource) GetName() string { - return cces.Name -} - -func (cces *ClusterCloudEventSource) GetNamespace() string { - return cces.Namespace -} - -func (cces *ClusterCloudEventSource) GetSpec() CloudEventSourceSpec { - return cces.Spec -} - -func (cces *ClusterCloudEventSource) GetStatus() CloudEventSourceStatus { - return *cces.Status.DeepCopy() +func (cces *ClusterCloudEventSource) GetSpec() *CloudEventSourceSpec { + return &cces.Spec } -func (cces *ClusterCloudEventSource) GetGeneration() int64 { - return cces.Generation +func (cces *ClusterCloudEventSource) GetStatus() *CloudEventSourceStatus { + return &cces.Status } // GenerateIdentifier returns identifier for the object in for "kind.cluster-scoped.name" diff --git a/controllers/eventing/cloudeventsource_controller.go b/controllers/eventing/cloudeventsource_controller.go index 965d1554a47..5bb78f5e9ca 100644 --- a/controllers/eventing/cloudeventsource_controller.go +++ b/controllers/eventing/cloudeventsource_controller.go @@ -13,16 +13,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - +// +//nolint:dupl package eventing import ( "context" "sync" - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,7 +30,6 @@ import ( eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventemitter" "github.com/kedacore/keda/v2/pkg/metricscollector" - kedastatus "github.com/kedacore/keda/v2/pkg/status" "github.com/kedacore/keda/v2/pkg/util" ) @@ -60,57 +57,11 @@ func NewCloudEventSourceReconciler(c client.Client, e eventemitter.EventHandler) // +kubebuilder:rbac:groups=eventing.keda.sh,resources=cloudeventsources;cloudeventsources/status,verbs="*" // Reconcile performs reconciliation on the identified EventSource resource based on the request information passed, returns the result and an error (if any). -// -//nolint:dupl + func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLogger := log.FromContext(ctx) - // Fetch the EventSource instance cloudEventSource := &eventingv1alpha1.CloudEventSource{} - err := r.Client.Get(ctx, req.NamespacedName, cloudEventSource) - if err != nil { - if errors.IsNotFound(err) { - // Request eventSource not found, could have been deleted after reconcile request. - // Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers. - // Return and don't requeue - return ctrl.Result{}, nil - } - // Error reading the object - requeue the request. - reqLogger.Error(err, "failed to get EventSource") - return ctrl.Result{}, err - } - - reqLogger.Info("Reconciling CloudEventSource") - - if !cloudEventSource.GetDeletionTimestamp().IsZero() { - return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, cloudEventSource, req.NamespacedName.String()) - } - r.updatePromMetrics(cloudEventSource, req.NamespacedName.String()) - - // ensure finalizer is set on this CR - if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, cloudEventSource); err != nil { - return ctrl.Result{}, err - } - - // ensure Status Conditions are initialized - if !cloudEventSource.Status.Conditions.AreInitialized() { - conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() - if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, cloudEventSource, conditions); err != nil { - return ctrl.Result{}, err - } - } - - eventSourceChanged, err := r.cloudEventSourceGenerationChanged(reqLogger, cloudEventSource) - if err != nil { - return ctrl.Result{}, err - } - - if eventSourceChanged { - if r.requestEventLoop(ctx, reqLogger, cloudEventSource) != nil { - return ctrl.Result{}, err - } - } - - return ctrl.Result{}, nil + return Reconcile(ctx, reqLogger, r, req, cloudEventSource) } // SetupWithManager sets up the controller with the Manager. @@ -121,61 +72,19 @@ func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -// requestEventLoop tries to start EventLoop handler for the respective EventSource -func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error { - logger.V(1).Info("Notify eventHandler of an update in eventSource") - - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil { - return err - } - - // store CloudEventSource's current Generation - r.cloudEventSourceGenerations.Store(key, eventSource.Generation) - - return nil +func (r *CloudEventSourceReconciler) GetClient() client.Client { + return r.Client } -// stopEventLoop stops EventLoop handler for the respective EventSource -func (r *CloudEventSourceReconciler) StopEventLoop(logger logr.Logger, obj client.Object) error { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err := r.eventEmitter.DeleteCloudEventSource(obj.(*eventingv1alpha1.CloudEventSource)); err != nil { - return err - } - // delete CloudEventSource's current Generation - r.cloudEventSourceGenerations.Delete(key) - return nil +func (r *CloudEventSourceReconciler) GetEventEmitter() eventemitter.EventHandler { + return r.eventEmitter } -// eventSourceGenerationChanged returns true if CloudEventSource's Generation was changed, ie. EventSource.Spec was changed -func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) (bool, error) { - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return true, err - } - - value, loaded := r.cloudEventSourceGenerations.Load(key) - if loaded { - generation := value.(int64) - if generation == eventSource.Generation { - return false, nil - } - } - return true, nil +func (r *CloudEventSourceReconciler) GetCloudEventSourceGeneration() *sync.Map { + return r.cloudEventSourceGenerations } -func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.CloudEventSource, namespacedName string) { +func (r *CloudEventSourceReconciler) UpdatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) { r.eventSourcePromMetricsLock.Lock() defer r.eventSourcePromMetricsLock.Unlock() @@ -183,8 +92,8 @@ func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1al metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) } - metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace) - r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace + metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.GetNamespace()) + r.eventSourcePromMetricsMap[namespacedName] = eventSource.GetNamespace() } // UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects diff --git a/controllers/eventing/clustercloudeventsource_controller.go b/controllers/eventing/clustercloudeventsource_controller.go index dae9beb0a44..0ccb26f811a 100644 --- a/controllers/eventing/clustercloudeventsource_controller.go +++ b/controllers/eventing/clustercloudeventsource_controller.go @@ -13,16 +13,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - +// +//nolint:dupl package eventing import ( "context" "sync" - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,7 +30,6 @@ import ( eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventemitter" "github.com/kedacore/keda/v2/pkg/metricscollector" - kedastatus "github.com/kedacore/keda/v2/pkg/status" "github.com/kedacore/keda/v2/pkg/util" ) @@ -60,58 +57,10 @@ func NewClusterCloudEventSourceReconciler(c client.Client, e eventemitter.EventH // +kubebuilder:rbac:groups=eventing.keda.sh,resources=clustercloudeventsources;clustercloudeventsources/status,verbs="*" // Reconcile performs reconciliation on the identified EventSource resource based on the request information passed, returns the result and an error (if any). -// -//nolint:dupl func (r *ClusterCloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLogger := log.FromContext(ctx) - - // Fetch the EventSource instance - clustercloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} - err := r.Client.Get(ctx, req.NamespacedName, clustercloudEventSource) - if err != nil { - if errors.IsNotFound(err) { - // Request eventSource not found, could have been deleted after reconcile request. - // Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers. - // Return and don't requeue - return ctrl.Result{}, nil - } - // Error reading the object - requeue the request. - reqLogger.Error(err, "failed to get EventSource") - return ctrl.Result{}, err - } - - reqLogger.Info("Reconciling ClusterCloudEventSource") - - if !clustercloudEventSource.GetDeletionTimestamp().IsZero() { - return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, clustercloudEventSource, req.NamespacedName.String()) - } - r.updatePromMetrics(clustercloudEventSource, req.NamespacedName.String()) - - // ensure finalizer is set on this CR - if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, clustercloudEventSource); err != nil { - return ctrl.Result{}, err - } - - // ensure Status Conditions are initialized - if !clustercloudEventSource.Status.Conditions.AreInitialized() { - conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() - if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, clustercloudEventSource, conditions); err != nil { - return ctrl.Result{}, err - } - } - - eventSourceChanged, err := r.cloudEventSourceGenerationChanged(reqLogger, clustercloudEventSource) - if err != nil { - return ctrl.Result{}, err - } - - if eventSourceChanged { - if r.requestEventLoop(ctx, reqLogger, clustercloudEventSource) != nil { - return ctrl.Result{}, err - } - } - - return ctrl.Result{}, nil + cloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} + return Reconcile(ctx, reqLogger, r, req, cloudEventSource) } // SetupWithManager sets up the controller with the Manager. @@ -122,61 +71,19 @@ func (r *ClusterCloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) e Complete(r) } -// requestEventLoop tries to start EventLoop handler for the respective EventSource -func (r *ClusterCloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) error { - logger.V(1).Info("Notify eventHandler of an update in eventSource", "name", eventSource.GetName()) - - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil { - return err - } - - // store ClusterCloudEventSource's current Generation - r.clusterCloudEventSourceGenerations.Store(key, eventSource.GetGeneration()) - - return nil +func (r *ClusterCloudEventSourceReconciler) GetClient() client.Client { + return r.Client } -// stopEventLoop stops EventLoop handler for the respective EventSource -func (r *ClusterCloudEventSourceReconciler) StopEventLoop(logger logr.Logger, obj client.Object) error { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err := r.eventEmitter.DeleteCloudEventSource(obj.(*eventingv1alpha1.ClusterCloudEventSource)); err != nil { - return err - } - // delete CloudEventSource's current Generation - r.clusterCloudEventSourceGenerations.Delete(key) - return nil +func (r *ClusterCloudEventSourceReconciler) GetEventEmitter() eventemitter.EventHandler { + return r.eventEmitter } -// eventSourceGenerationChanged returns true if ClusterCloudEventSource's Generation was changed, ie. EventSource.Spec was changed -func (r *ClusterCloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.ClusterCloudEventSource) (bool, error) { - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return true, err - } - - value, loaded := r.clusterCloudEventSourceGenerations.Load(key) - if loaded { - generation := value.(int64) - if generation == eventSource.Generation { - return false, nil - } - } - return true, nil +func (r *ClusterCloudEventSourceReconciler) GetCloudEventSourceGeneration() *sync.Map { + return r.clusterCloudEventSourceGenerations } -func (r *ClusterCloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.ClusterCloudEventSource, namespacedName string) { +func (r *ClusterCloudEventSourceReconciler) UpdatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) { r.eventSourcePromMetricsLock.Lock() defer r.eventSourcePromMetricsLock.Unlock() @@ -184,8 +91,8 @@ func (r *ClusterCloudEventSourceReconciler) updatePromMetrics(eventSource *event metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) } - metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace) - r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace + metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.GetNamespace()) + r.eventSourcePromMetricsMap[namespacedName] = eventSource.GetNamespace() } // UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects diff --git a/controllers/eventing/finalizer.go b/controllers/eventing/finalizer.go index 60526789129..5c8a8f75297 100644 --- a/controllers/eventing/finalizer.go +++ b/controllers/eventing/finalizer.go @@ -30,19 +30,13 @@ const ( cloudEventSourceFinalizer = "finalizer.keda.sh" ) -type cloudEventSourceResourceReconciler interface { - client.Client - UpdatePromMetricsOnDelete(string) - StopEventLoop(logger logr.Logger, obj client.Object) error -} - -func EnsureCloudEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, reconciler cloudEventSourceResourceReconciler, cloudEventSourceResource client.Object) error { +func EnsureCloudEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, r cloudEventSourceReconcilerInterface, cloudEventSourceResource client.Object) error { if !util.Contains(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer) { logger.Info(fmt.Sprintf("Adding Finalizer for the %s", cloudEventSourceResource.GetName())) cloudEventSourceResource.SetFinalizers(append(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer)) // Update CR - err := reconciler.Update(ctx, cloudEventSourceResource) + err := r.GetClient().Update(ctx, cloudEventSourceResource) if err != nil { logger.Error(err, fmt.Sprintf("Failed to update %s with a finalizer", cloudEventSourceResource.GetName()), "finalizer", cloudEventSourceFinalizer) return err @@ -51,18 +45,18 @@ func EnsureCloudEventSourceResourceFinalizer(ctx context.Context, logger logr.Lo return nil } -func FinalizeCloudEventSourceResource(ctx context.Context, logger logr.Logger, reconciler cloudEventSourceResourceReconciler, cloudEventSourceResource client.Object, namespacedName string) error { +func FinalizeCloudEventSourceResource(ctx context.Context, logger logr.Logger, r cloudEventSourceReconcilerInterface, cloudEventSourceResource client.Object, namespacedName string) error { if util.Contains(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer) { - if err := reconciler.StopEventLoop(logger, cloudEventSourceResource); err != nil { + if err := StopEventLoop(logger, r, cloudEventSourceResource); err != nil { return err } cloudEventSourceResource.SetFinalizers(util.Remove(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer)) - if err := reconciler.Update(ctx, cloudEventSourceResource); err != nil { + if err := r.GetClient().Update(ctx, cloudEventSourceResource); err != nil { logger.Error(err, fmt.Sprintf("Failed to update %s after removing a finalizer", cloudEventSourceResource.GetName()), "finalizer", cloudEventSourceFinalizer) return err } - reconciler.UpdatePromMetricsOnDelete(namespacedName) + r.UpdatePromMetricsOnDelete(namespacedName) } logger.Info(fmt.Sprintf("Successfully finalized %s", cloudEventSourceResource.GetName())) diff --git a/controllers/eventing/reconciler.go b/controllers/eventing/reconciler.go new file mode 100644 index 00000000000..6461ee16a11 --- /dev/null +++ b/controllers/eventing/reconciler.go @@ -0,0 +1,141 @@ +/* +Copyright 2024 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventing + +import ( + "context" + "sync" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/cache" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" + "github.com/kedacore/keda/v2/pkg/eventemitter" + kedastatus "github.com/kedacore/keda/v2/pkg/status" +) + +type cloudEventSourceReconcilerInterface interface { + GetClient() client.Client + GetEventEmitter() eventemitter.EventHandler + GetCloudEventSourceGeneration() *sync.Map + UpdatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) + UpdatePromMetricsOnDelete(namespacedName string) +} + +func Reconcile(ctx context.Context, reqLogger logr.Logger, r cloudEventSourceReconcilerInterface, req ctrl.Request, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) (ctrl.Result, error) { + err := r.GetClient().Get(ctx, req.NamespacedName, cloudEventSource) + if err != nil { + if errors.IsNotFound(err) { + // Request eventSource not found, could have been deleted after reconcile request. + // Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers. + // Return and don't requeue + return ctrl.Result{}, nil + } + // Error reading the object - requeue the request. + reqLogger.Error(err, "failed to get EventSource") + return ctrl.Result{}, err + } + + reqLogger.Info("Reconciling CloudEventSource") + + if !cloudEventSource.GetDeletionTimestamp().IsZero() { + return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, cloudEventSource, req.NamespacedName.String()) + } + r.UpdatePromMetrics(cloudEventSource, req.NamespacedName.String()) + + // ensure finalizer is set on this CR + if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, cloudEventSource); err != nil { + return ctrl.Result{}, err + } + + // ensure Status Conditions are initialized + if !cloudEventSource.GetStatus().Conditions.AreInitialized() { + conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() + if err := kedastatus.SetStatusConditions(ctx, r.GetClient(), reqLogger, cloudEventSource, conditions); err != nil { + return ctrl.Result{}, err + } + } + + eventSourceChanged, err := CloudEventSourceGenerationChanged(reqLogger, r, cloudEventSource) + if err != nil { + return ctrl.Result{}, err + } + + if eventSourceChanged { + if RequestEventLoop(ctx, reqLogger, r, cloudEventSource) != nil { + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} + +// requestEventLoop tries to start EventLoop handler for the respective EventSource +func RequestEventLoop(ctx context.Context, logger logr.Logger, r cloudEventSourceReconcilerInterface, eventSourceI eventingv1alpha1.CloudEventSourceInterface) error { + logger.V(1).Info("Notify eventHandler of an update in eventSource") + + key, err := cache.MetaNamespaceKeyFunc(eventSourceI) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return err + } + + if err = r.GetEventEmitter().HandleCloudEventSource(ctx, eventSourceI); err != nil { + return err + } + + // store CloudEventSource's current Generation + r.GetCloudEventSourceGeneration().Store(key, eventSourceI.GetGeneration()) + return nil +} + +// stopEventLoop stops EventLoop handler for the respective EventSource +func StopEventLoop(logger logr.Logger, r cloudEventSourceReconcilerInterface, obj client.Object) error { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return err + } + + if err := r.GetEventEmitter().DeleteCloudEventSource(obj.(eventingv1alpha1.CloudEventSourceInterface)); err != nil { + return err + } + // delete CloudEventSource's current Generation + r.GetCloudEventSourceGeneration().Delete(key) + return nil +} + +// eventSourceGenerationChanged returns true if CloudEventSource's Generation was changed, ie. EventSource.Spec was changed +func CloudEventSourceGenerationChanged(logger logr.Logger, r cloudEventSourceReconcilerInterface, eventSourceI eventingv1alpha1.CloudEventSourceInterface) (bool, error) { + key, err := cache.MetaNamespaceKeyFunc(eventSourceI) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return true, err + } + + value, loaded := r.GetCloudEventSourceGeneration().Load(key) + if loaded { + generation := value.(int64) + if generation == eventSourceI.GetGeneration() { + return false, nil + } + } + return true, nil +} diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 047683dbb63..3ad6e02898c 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -113,7 +113,7 @@ func NewEventEmitter(client client.Client, recorder record.EventRecorder, cluste } func initializeLogger(cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceEmitterName string) logr.Logger { - return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSourceI.GetKind(), "namespace", cloudEventSourceI.GetNamespace(), "name", cloudEventSourceI.GetName()) + return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSourceI.GetObjectKind(), "namespace", cloudEventSourceI.GetNamespace(), "name", cloudEventSourceI.GetName()) } // HandleCloudEventSource will create CloudEventSource handlers that defined in spec and start an event loop once handlers @@ -147,15 +147,8 @@ func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSou eventingMutex := &sync.Mutex{} // passing deep copy of CloudEventSource to the eventLoop go routines, it's a precaution to not have global objects shared between threads - switch obj := cloudEventSourceI.(type) { - case *eventingv1alpha1.CloudEventSource: - go e.startEventLoop(cancelCtx, obj.DeepCopy(), eventingMutex) - case *eventingv1alpha1.ClusterCloudEventSource: - go e.startClusterEventLoop(cancelCtx, obj.DeepCopy(), eventingMutex) - default: - return nil - } - + e.log.V(1).Info("Start CloudEventSource loop.") + go e.startEventLoop(cancelCtx, cloudEventSourceI.DeepCopyObject().(eventingv1alpha1.CloudEventSourceInterface), eventingMutex) return nil } @@ -283,35 +276,18 @@ func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource eventingv1alph return false } -func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { - e.log.V(1).Info("Start CloudEventSource loop.", "name", cloudEventSource.GetName()) +func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceMutex sync.Locker) { + e.log.V(1).Info("Start CloudEventSource loop.", "name", cloudEventSourceI.GetName()) for { select { case eventData := <-e.cloudEventProcessingChan: - e.log.V(1).Info("Consuming events from CloudEventSource.", "name", cloudEventSource.GetName()) + e.log.V(1).Info("Consuming events from CloudEventSource.", "name", cloudEventSourceI.GetName()) e.emitEventByHandler(eventData) - e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex) - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) + e.checkEventHandlers(ctx, cloudEventSourceI, cloudEventSourceMutex) + metricscollector.RecordCloudEventQueueStatus(cloudEventSourceI.GetNamespace(), len(e.cloudEventProcessingChan)) case <-ctx.Done(): e.log.V(1).Info("CloudEventSource loop has stopped.") - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) - return - } - } -} - -func (e *EventEmitter) startClusterEventLoop(ctx context.Context, clusterCloudEventSource *eventingv1alpha1.ClusterCloudEventSource, cloudEventSourceMutex sync.Locker) { - e.log.V(1).Info("Start CloudEventSource loop.", "name", clusterCloudEventSource.GetName()) - for { - select { - case eventData := <-e.cloudEventProcessingChan: - e.log.V(1).Info("Consuming events from ClusterCloudEventSource.", "name", clusterCloudEventSource.GetName()) - e.emitEventByHandler(eventData) - e.checkEventHandlers(ctx, clusterCloudEventSource, cloudEventSourceMutex) - metricscollector.RecordCloudEventQueueStatus(clusterCloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) - case <-ctx.Done(): - e.log.V(1).Info("ClusterCloudEventSource loop has stopped.") - metricscollector.RecordCloudEventQueueStatus(clusterCloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) + metricscollector.RecordCloudEventQueueStatus(cloudEventSourceI.GetNamespace(), len(e.cloudEventProcessingChan)) return } } @@ -319,35 +295,22 @@ func (e *EventEmitter) startClusterEventLoop(ctx context.Context, clusterCloudEv // checkEventHandlers will check each eventhandler active status func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceMutex sync.Locker) { - e.log.V(1).Info("Checking event handlers status.", "name", cloudEventSourceI.GetName()) + e.log.V(1).Info("Checking event handlers status.") cloudEventSourceMutex.Lock() defer cloudEventSourceMutex.Unlock() // Get the latest object - switch cloudEventSourceI.(type) { - case *eventingv1alpha1.CloudEventSource: - cloudEventSource := &eventingv1alpha1.CloudEventSource{} - err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, cloudEventSource) - if err != nil { - e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSource) - } - cloudEventSourceI = cloudEventSource - case *eventingv1alpha1.ClusterCloudEventSource: - clustercloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} - err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, clustercloudEventSource) - if err != nil { - e.log.Error(err, "error getting clustercloudEventSource", "clustercloudEventSource", clustercloudEventSource) - } - cloudEventSourceI = clustercloudEventSource - default: + err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, cloudEventSourceI) + if err != nil { + e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSourceI) + return } - keyPrefix := cloudEventSourceI.GenerateIdentifier() needUpdate := false - cloudEventSourceStatus := cloudEventSourceI.GetStatus() + cloudEventSourceStatus := cloudEventSourceI.GetStatus().DeepCopy() for k, v := range e.eventHandlersCache { - e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSourceStatus.Conditions.GetActiveCondition().Status) + e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSourceI.GetStatus().Conditions.GetActiveCondition().Status) if strings.Contains(k, keyPrefix) { - if v.GetActiveStatus() != cloudEventSourceStatus.Conditions.GetActiveCondition().Status { + if v.GetActiveStatus() != cloudEventSourceI.GetStatus().Conditions.GetActiveCondition().Status { needUpdate = true cloudEventSourceStatus.Conditions.SetActiveCondition( metav1.ConditionFalse, @@ -357,7 +320,6 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSourceI } } } - if needUpdate { if updateErr := e.updateCloudEventSourceStatus(ctx, cloudEventSourceI, cloudEventSourceStatus); updateErr != nil { e.log.Error(updateErr, "Failed to update CloudEventSource status") @@ -475,7 +437,7 @@ func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, clou return e.updateCloudEventSourceStatus(ctx, cloudEventSourceI, cloudEventSourceStatus) } -func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceStatus eventingv1alpha1.CloudEventSourceStatus) error { +func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceStatus *eventingv1alpha1.CloudEventSourceStatus) error { e.log.V(1).Info("Updating CloudEventSource status", "CloudEventSource", cloudEventSourceI.GetName()) transform := func(runtimeObj client.Object, target interface{}) error { status, ok := target.(eventingv1alpha1.CloudEventSourceStatus) @@ -494,7 +456,7 @@ func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEv return nil } - if err := kedastatus.TransformObject(ctx, e.client, e.log, cloudEventSourceI, cloudEventSourceStatus, transform); err != nil { + if err := kedastatus.TransformObject(ctx, e.client, e.log, cloudEventSourceI, *cloudEventSourceStatus, transform); err != nil { e.log.Error(err, "Failed to update CloudEventSourceStatus") return err }