diff --git a/charts/elasti/templates/operator-additional-access-rbac.yaml b/charts/elasti/templates/operator-additional-access-rbac.yaml index 04abaa0..0fe5b9c 100644 --- a/charts/elasti/templates/operator-additional-access-rbac.yaml +++ b/charts/elasti/templates/operator-additional-access-rbac.yaml @@ -14,6 +14,9 @@ rules: - apiGroups: [""] resources: ["services"] verbs: ["get", "list", "watch", "update", "patch", "delete", "create"] +- apiGroups: [""] + resources: ["events"] + verbs: ["create", "patch", "update"] - apiGroups: ["argoproj.io"] resources: ["rollouts"] verbs: ["get", "list", "watch", "update", "patch"] diff --git a/operator/internal/elastiserver/elastiServer.go b/operator/internal/elastiserver/elastiServer.go index 669e957..2cf0a7a 100644 --- a/operator/internal/elastiserver/elastiServer.go +++ b/operator/internal/elastiserver/elastiServer.go @@ -164,16 +164,11 @@ func (s *Server) scaleTargetForService(ctx context.Context, serviceName, namespa } } - if err := s.scaleHandler.ScaleTargetFromZero(namespacedName, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil { + if err := s.scaleHandler.ScaleTargetFromZero(namespacedName, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas, crd.CRDName); err != nil { prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, err.Error()).Inc() return fmt.Errorf("scaleTargetForService - error: %w, targetRefKind: %s, targetRefName: %s", err, crd.Spec.ScaleTargetRef.Kind, crd.Spec.ScaleTargetRef.Name) } prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, "success").Inc() - if err := s.scaleHandler.UpdateLastScaledUpTime(ctx, crd.CRDName, namespace); err != nil { - // not returning an error as scale up has been successful - s.logger.Error("failed to update LastScaledUpTime", zap.String("namespacedName", namespacedName.String()), zap.Error(err)) - } - return nil } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index c0b25b2..a90df02 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -13,8 +13,8 @@ import ( "github.com/truefoundry/elasti/pkg/scaling/scalers" "github.com/truefoundry/elasti/pkg/values" "go.uber.org/zap" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" @@ -176,7 +176,7 @@ func (h *ScaleHandler) handleScaleToZero(ctx context.Context, es *v1alpha1.Elast } } - if err := h.ScaleTargetToZero(namespacedName, es.Spec.ScaleTargetRef.Kind); err != nil { + if err := h.ScaleTargetToZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Name); err != nil { return fmt.Errorf("failed to scale target to zero: %w", err) } return nil @@ -228,15 +228,10 @@ func (h *ScaleHandler) handleScaleFromZero(ctx context.Context, es *v1alpha1.Ela } } - if err := h.ScaleTargetFromZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Spec.MinTargetReplicas); err != nil { + if err := h.ScaleTargetFromZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Spec.MinTargetReplicas, es.Name); err != nil { return fmt.Errorf("failed to scale target from zero: %w", err) } - if err := h.UpdateLastScaledUpTime(ctx, es.Name, es.Namespace); err != nil { - // not returning an error as scale up has been successful - h.logger.Error("failed to update LastScaledUpTime", zap.String("namespacedName", namespacedName.String()), zap.Error(err)) - } - return nil } @@ -258,51 +253,74 @@ func (h *ScaleHandler) createScalerForTrigger(trigger *v1alpha1.ScaleTrigger) (s } // ScaleTargetFromZero scales the TargetRef to the provided replicas when it's at 0 -func (h *ScaleHandler) ScaleTargetFromZero(namespacedName types.NamespacedName, targetKind string, replicas int32) error { +func (h *ScaleHandler) ScaleTargetFromZero(namespacedName types.NamespacedName, targetKind string, replicas int32, elastiServiceName string) error { mutex := h.getMutexForScale(namespacedName.String()) mutex.Lock() defer mutex.Unlock() h.logger.Info("Scaling up from zero", zap.String("kind", targetKind), zap.String("namespacedName", namespacedName.String()), zap.Int32("replicas", replicas)) + + var err error switch strings.ToLower(targetKind) { case values.KindDeployments: - err := h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, replicas) - if err != nil { - return fmt.Errorf("ScaleTargetFromZero - Deployment: %w", err) - } + err = h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, replicas) case values.KindRollout: - err := h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, replicas) - if err != nil { - return fmt.Errorf("ScaleTargetFromZero - Rollout: %w", err) - } + err = h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, replicas) default: return fmt.Errorf("unsupported target kind: %s", targetKind) } + + if err != nil { + eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Warning", "ScaleFromZeroFailed", fmt.Sprintf("Failed to scale %s from zero to %d replicas: %v", targetKind, replicas, err)) + if eventErr != nil { + h.logger.Error("Failed to create failure event", zap.Error(eventErr)) + } + return fmt.Errorf("ScaleTargetFromZero - %s: %w", targetKind, err) + } + + eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Normal", "ScaledUpFromZero", fmt.Sprintf("Successfully scaled %s from zero to %d replicas", targetKind, replicas)) + if eventErr != nil { + h.logger.Error("Failed to create success event", zap.Error(eventErr)) + } + + if err := h.UpdateLastScaledUpTime(context.Background(), elastiServiceName, namespacedName.Namespace); err != nil { + h.logger.Error("Failed to update LastScaledUpTime", zap.Error(err), zap.String("namespacedName", namespacedName.String())) + } + return nil } // ScaleTargetToZero scales the target to zero -// TODO: Emit k8s events -func (h *ScaleHandler) ScaleTargetToZero(namespacedName types.NamespacedName, targetKind string) error { +func (h *ScaleHandler) ScaleTargetToZero(namespacedName types.NamespacedName, targetKind string, elastiServiceName string) error { mutex := h.getMutexForScale(namespacedName.String()) mutex.Lock() defer mutex.Unlock() h.logger.Info("Scaling down to zero", zap.String("kind", targetKind), zap.String("namespacedName", namespacedName.String())) + + var err error switch strings.ToLower(targetKind) { case values.KindDeployments: - err := h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, 0) - if err != nil { - return fmt.Errorf("ScaleTargetToZero - Deployment: %w", err) - } + err = h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, 0) case values.KindRollout: - err := h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, 0) - if err != nil { - return fmt.Errorf("ScaleTargetToZero - Rollout: %w", err) - } + err = h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, 0) default: return fmt.Errorf("unsupported target kind: %s", targetKind) } + + if err != nil { + eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Warning", "ScaleToZeroFailed", fmt.Sprintf("Failed to scale %s to zero: %v", targetKind, err)) + if eventErr != nil { + h.logger.Error("Failed to create failure event", zap.Error(eventErr)) + } + return fmt.Errorf("ScaleTargetToZero - %s: %w", targetKind, err) + } + + eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Normal", "ScaledDownToZero", fmt.Sprintf("Successfully scaled %s to zero", targetKind)) + if eventErr != nil { + h.logger.Error("Failed to create success event", zap.Error(eventErr)) + } + return nil } @@ -354,53 +372,53 @@ func (h *ScaleHandler) ScaleArgoRollout(ns, targetName string, replicas int32) e } func (h *ScaleHandler) UpdateKedaScaledObjectPausedState(ctx context.Context, scaledObjectName, namespace string, paused bool) error { - scaledObject, err := h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Get(ctx, scaledObjectName, metav1.GetOptions{}) + patchBytes := []byte(fmt.Sprintf(`{"metadata": {"annotations": {"%s": "%s"}}}`, kedaPausedAnnotation, strconv.FormatBool(paused))) + _, err := h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Patch(ctx, scaledObjectName, types.MergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { - return fmt.Errorf("failed to get ScaledObject: %w", err) - } - - annotations := scaledObject.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string) + return fmt.Errorf("failed to patch ScaledObject: %w", err) } - - annotations[kedaPausedAnnotation] = strconv.FormatBool(paused) - scaledObject.SetAnnotations(annotations) - - _, err = h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Update(ctx, scaledObject, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("failed to update ScaledObject: %w", err) - } - return nil } func (h *ScaleHandler) UpdateLastScaledUpTime(ctx context.Context, crdName, namespace string) error { - elastiService, err := h.kDynamicClient.Resource(values.ElastiServiceGVR). - Namespace(namespace). - Get(ctx, crdName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get ElastiService: %w", err) - } - - currentES := &v1alpha1.ElastiService{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(elastiService.Object, currentES); err != nil { - return fmt.Errorf("failed to convert unstructured to ElastiService: %w", err) - } now := metav1.Now() - currentES.Status.LastScaledUpTime = &now + patchBytes := []byte(fmt.Sprintf(`{"status": {"lastScaledUpTime": "%s"}}`, now.Format(time.RFC3339Nano))) - obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentES) + _, err := h.kDynamicClient.Resource(values.ElastiServiceGVR). + Namespace(namespace). + Patch(ctx, crdName, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") if err != nil { - return fmt.Errorf("failed to convert ElastiService to unstructured: %w", err) + return fmt.Errorf("failed to patch ElastiService status: %w", err) } + return nil +} - _, err = h.kDynamicClient.Resource(values.ElastiServiceGVR). - Namespace(currentES.Namespace). - UpdateStatus(ctx, &unstructured.Unstructured{Object: obj}, metav1.UpdateOptions{}) +// createEvent creates a new event on scaling up or down +func (h *ScaleHandler) createEvent(namespace, name, eventType, reason, message string) error { + h.logger.Info("createEvent", zap.String("eventType", eventType), zap.String("reason", reason), zap.String("message", message)) + event := &v1.Event{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: name + "-", + Namespace: namespace, + }, + InvolvedObject: v1.ObjectReference{ + APIVersion: "elasti.truefoundry.com/v1alpha1", + Kind: "ElastiService", + Name: name, + Namespace: namespace, + }, + Type: eventType, // Normal or Warning + Reason: reason, + Message: message, + Action: "Scale", + Source: v1.EventSource{ + Component: "elasti-operator", + }, + } + + _, err := h.kClient.CoreV1().Events(namespace).Create(context.TODO(), event, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("failed to update ElastiService status: %w", err) + return fmt.Errorf("failed to create event: %w", err) } - return nil }