Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>
  • Loading branch information
SpiritZhou committed Jul 4, 2024
1 parent d977a1e commit cfdbaef
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 325 deletions.
58 changes: 12 additions & 46 deletions apis/eventing/v1alpha1/cloudeventsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

// +kubebuilder:object:generate=false
type CloudEventSourceInterface interface {
GetKind() string
GetName() string
GetNamespace() string
GetSpec() CloudEventSourceSpec
GetStatus() CloudEventSourceStatus
GetGeneration() int64
client.Object
GenerateIdentifier() string
GetSpec() *CloudEventSourceSpec
GetStatus() *CloudEventSourceStatus
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -129,57 +127,25 @@ func init() {
SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}, &ClusterCloudEventSource{}, &ClusterCloudEventSourceList{})
}

func (ces *CloudEventSource) GetKind() string {
return ces.Kind
func (ces *CloudEventSource) GetSpec() *CloudEventSourceSpec {
return &ces.Spec
}

func (ces *CloudEventSource) GetName() string {
return ces.Name
}

func (ces *CloudEventSource) GetNamespace() string {
return ces.Namespace
}

func (ces *CloudEventSource) GetSpec() CloudEventSourceSpec {
return ces.Spec
}

func (ces *CloudEventSource) GetStatus() CloudEventSourceStatus {
return *ces.Status.DeepCopy()
}

func (ces *CloudEventSource) GetGeneration() int64 {
return ces.Generation
func (ces *CloudEventSource) GetStatus() *CloudEventSourceStatus {
return &ces.Status
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (ces *CloudEventSource) GenerateIdentifier() string {
return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name)
}

func (cces *ClusterCloudEventSource) GetKind() string {
return cces.Kind
}

func (cces *ClusterCloudEventSource) GetName() string {
return cces.Name
}

func (cces *ClusterCloudEventSource) GetNamespace() string {
return cces.Namespace
}

func (cces *ClusterCloudEventSource) GetSpec() CloudEventSourceSpec {
return cces.Spec
}

func (cces *ClusterCloudEventSource) GetStatus() CloudEventSourceStatus {
return *cces.Status.DeepCopy()
func (cces *ClusterCloudEventSource) GetSpec() *CloudEventSourceSpec {
return &cces.Spec
}

func (cces *ClusterCloudEventSource) GetGeneration() int64 {
return cces.Generation
func (cces *ClusterCloudEventSource) GetStatus() *CloudEventSourceStatus {
return &cces.Status
}

// GenerateIdentifier returns identifier for the object in for "kind.cluster-scoped.name"
Expand Down
117 changes: 13 additions & 104 deletions controllers/eventing/cloudeventsource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

//
//nolint:dupl
package eventing

import (
"context"
"sync"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -32,7 +30,6 @@ import (
eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/metricscollector"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
"github.com/kedacore/keda/v2/pkg/util"
)

Expand Down Expand Up @@ -60,57 +57,11 @@ func NewCloudEventSourceReconciler(c client.Client, e eventemitter.EventHandler)
// +kubebuilder:rbac:groups=eventing.keda.sh,resources=cloudeventsources;cloudeventsources/status,verbs="*"

// Reconcile performs reconciliation on the identified EventSource resource based on the request information passed, returns the result and an error (if any).
//
//nolint:dupl

func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLogger := log.FromContext(ctx)
// Fetch the EventSource instance
cloudEventSource := &eventingv1alpha1.CloudEventSource{}
err := r.Client.Get(ctx, req.NamespacedName, cloudEventSource)
if err != nil {
if errors.IsNotFound(err) {
// Request eventSource not found, could have been deleted after reconcile request.
// Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "failed to get EventSource")
return ctrl.Result{}, err
}

reqLogger.Info("Reconciling CloudEventSource")

if !cloudEventSource.GetDeletionTimestamp().IsZero() {
return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, cloudEventSource, req.NamespacedName.String())
}
r.updatePromMetrics(cloudEventSource, req.NamespacedName.String())

// ensure finalizer is set on this CR
if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, cloudEventSource); err != nil {
return ctrl.Result{}, err
}

// ensure Status Conditions are initialized
if !cloudEventSource.Status.Conditions.AreInitialized() {
conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions()
if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, cloudEventSource, conditions); err != nil {
return ctrl.Result{}, err
}
}

eventSourceChanged, err := r.cloudEventSourceGenerationChanged(reqLogger, cloudEventSource)
if err != nil {
return ctrl.Result{}, err
}

if eventSourceChanged {
if r.requestEventLoop(ctx, reqLogger, cloudEventSource) != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
return Reconcile(ctx, reqLogger, r, req, cloudEventSource)
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -121,70 +72,28 @@ func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

// requestEventLoop tries to start EventLoop handler for the respective EventSource
func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error {
logger.V(1).Info("Notify eventHandler of an update in eventSource")

key, err := cache.MetaNamespaceKeyFunc(eventSource)
if err != nil {
logger.Error(err, "error getting key for eventSource")
return err
}

if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil {
return err
}

// store CloudEventSource's current Generation
r.cloudEventSourceGenerations.Store(key, eventSource.Generation)

return nil
func (r *CloudEventSourceReconciler) GetClient() client.Client {
return r.Client
}

// stopEventLoop stops EventLoop handler for the respective EventSource
func (r *CloudEventSourceReconciler) StopEventLoop(logger logr.Logger, obj client.Object) error {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
logger.Error(err, "error getting key for eventSource")
return err
}

if err := r.eventEmitter.DeleteCloudEventSource(obj.(*eventingv1alpha1.CloudEventSource)); err != nil {
return err
}
// delete CloudEventSource's current Generation
r.cloudEventSourceGenerations.Delete(key)
return nil
func (r *CloudEventSourceReconciler) GetEventEmitter() eventemitter.EventHandler {
return r.eventEmitter
}

// eventSourceGenerationChanged returns true if CloudEventSource's Generation was changed, ie. EventSource.Spec was changed
func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(eventSource)
if err != nil {
logger.Error(err, "error getting key for eventSource")
return true, err
}

value, loaded := r.cloudEventSourceGenerations.Load(key)
if loaded {
generation := value.(int64)
if generation == eventSource.Generation {
return false, nil
}
}
return true, nil
func (r *CloudEventSourceReconciler) GetCloudEventSourceGeneration() *sync.Map {
return r.cloudEventSourceGenerations
}

func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.CloudEventSource, namespacedName string) {
func (r *CloudEventSourceReconciler) UpdatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) {
r.eventSourcePromMetricsLock.Lock()
defer r.eventSourcePromMetricsLock.Unlock()

if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok {
metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns)
}

metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace)
r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace
metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.GetNamespace())
r.eventSourcePromMetricsMap[namespacedName] = eventSource.GetNamespace()
}

// UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects
Expand Down
Loading

0 comments on commit cfdbaef

Please sign in to comment.