Skip to content

Commit

Permalink
emit events on scaling (#82)
Browse files Browse the repository at this point in the history
* emit events on scaling

* update event reason on scale

* add events to ElastiService

* update lastScaledUpTime with patch instead to avoid object has been modified

* update keda scaledobject with patch
  • Loading branch information
Maanas-23 authored Feb 17, 2025
1 parent 20e020e commit 072ac9b
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 68 deletions.
3 changes: 3 additions & 0 deletions charts/elasti/templates/operator-additional-access-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "list", "watch", "update", "patch", "delete", "create"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch", "update"]
- apiGroups: ["argoproj.io"]
resources: ["rollouts"]
verbs: ["get", "list", "watch", "update", "patch"]
Expand Down
7 changes: 1 addition & 6 deletions operator/internal/elastiserver/elastiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,11 @@ func (s *Server) scaleTargetForService(ctx context.Context, serviceName, namespa
}
}

if err := s.scaleHandler.ScaleTargetFromZero(namespacedName, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil {
if err := s.scaleHandler.ScaleTargetFromZero(namespacedName, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas, crd.CRDName); err != nil {
prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, err.Error()).Inc()
return fmt.Errorf("scaleTargetForService - error: %w, targetRefKind: %s, targetRefName: %s", err, crd.Spec.ScaleTargetRef.Kind, crd.Spec.ScaleTargetRef.Name)
}
prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, "success").Inc()

if err := s.scaleHandler.UpdateLastScaledUpTime(ctx, crd.CRDName, namespace); err != nil {
// not returning an error as scale up has been successful
s.logger.Error("failed to update LastScaledUpTime", zap.String("namespacedName", namespacedName.String()), zap.Error(err))
}

return nil
}
142 changes: 80 additions & 62 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/truefoundry/elasti/pkg/scaling/scalers"
"github.com/truefoundry/elasti/pkg/values"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -176,7 +176,7 @@ func (h *ScaleHandler) handleScaleToZero(ctx context.Context, es *v1alpha1.Elast
}
}

if err := h.ScaleTargetToZero(namespacedName, es.Spec.ScaleTargetRef.Kind); err != nil {
if err := h.ScaleTargetToZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Name); err != nil {
return fmt.Errorf("failed to scale target to zero: %w", err)
}
return nil
Expand Down Expand Up @@ -228,15 +228,10 @@ func (h *ScaleHandler) handleScaleFromZero(ctx context.Context, es *v1alpha1.Ela
}
}

if err := h.ScaleTargetFromZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Spec.MinTargetReplicas); err != nil {
if err := h.ScaleTargetFromZero(namespacedName, es.Spec.ScaleTargetRef.Kind, es.Spec.MinTargetReplicas, es.Name); err != nil {
return fmt.Errorf("failed to scale target from zero: %w", err)
}

if err := h.UpdateLastScaledUpTime(ctx, es.Name, es.Namespace); err != nil {
// not returning an error as scale up has been successful
h.logger.Error("failed to update LastScaledUpTime", zap.String("namespacedName", namespacedName.String()), zap.Error(err))
}

return nil
}

Expand All @@ -258,51 +253,74 @@ func (h *ScaleHandler) createScalerForTrigger(trigger *v1alpha1.ScaleTrigger) (s
}

// ScaleTargetFromZero scales the TargetRef to the provided replicas when it's at 0
func (h *ScaleHandler) ScaleTargetFromZero(namespacedName types.NamespacedName, targetKind string, replicas int32) error {
func (h *ScaleHandler) ScaleTargetFromZero(namespacedName types.NamespacedName, targetKind string, replicas int32, elastiServiceName string) error {
mutex := h.getMutexForScale(namespacedName.String())
mutex.Lock()
defer mutex.Unlock()

h.logger.Info("Scaling up from zero", zap.String("kind", targetKind), zap.String("namespacedName", namespacedName.String()), zap.Int32("replicas", replicas))

var err error
switch strings.ToLower(targetKind) {
case values.KindDeployments:
err := h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, replicas)
if err != nil {
return fmt.Errorf("ScaleTargetFromZero - Deployment: %w", err)
}
err = h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, replicas)
case values.KindRollout:
err := h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, replicas)
if err != nil {
return fmt.Errorf("ScaleTargetFromZero - Rollout: %w", err)
}
err = h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, replicas)
default:
return fmt.Errorf("unsupported target kind: %s", targetKind)
}

if err != nil {
eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Warning", "ScaleFromZeroFailed", fmt.Sprintf("Failed to scale %s from zero to %d replicas: %v", targetKind, replicas, err))
if eventErr != nil {
h.logger.Error("Failed to create failure event", zap.Error(eventErr))
}
return fmt.Errorf("ScaleTargetFromZero - %s: %w", targetKind, err)
}

eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Normal", "ScaledUpFromZero", fmt.Sprintf("Successfully scaled %s from zero to %d replicas", targetKind, replicas))
if eventErr != nil {
h.logger.Error("Failed to create success event", zap.Error(eventErr))
}

if err := h.UpdateLastScaledUpTime(context.Background(), elastiServiceName, namespacedName.Namespace); err != nil {
h.logger.Error("Failed to update LastScaledUpTime", zap.Error(err), zap.String("namespacedName", namespacedName.String()))
}

return nil
}

// ScaleTargetToZero scales the target to zero
// TODO: Emit k8s events
func (h *ScaleHandler) ScaleTargetToZero(namespacedName types.NamespacedName, targetKind string) error {
func (h *ScaleHandler) ScaleTargetToZero(namespacedName types.NamespacedName, targetKind string, elastiServiceName string) error {
mutex := h.getMutexForScale(namespacedName.String())
mutex.Lock()
defer mutex.Unlock()

h.logger.Info("Scaling down to zero", zap.String("kind", targetKind), zap.String("namespacedName", namespacedName.String()))

var err error
switch strings.ToLower(targetKind) {
case values.KindDeployments:
err := h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, 0)
if err != nil {
return fmt.Errorf("ScaleTargetToZero - Deployment: %w", err)
}
err = h.ScaleDeployment(namespacedName.Namespace, namespacedName.Name, 0)
case values.KindRollout:
err := h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, 0)
if err != nil {
return fmt.Errorf("ScaleTargetToZero - Rollout: %w", err)
}
err = h.ScaleArgoRollout(namespacedName.Namespace, namespacedName.Name, 0)
default:
return fmt.Errorf("unsupported target kind: %s", targetKind)
}

if err != nil {
eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Warning", "ScaleToZeroFailed", fmt.Sprintf("Failed to scale %s to zero: %v", targetKind, err))
if eventErr != nil {
h.logger.Error("Failed to create failure event", zap.Error(eventErr))
}
return fmt.Errorf("ScaleTargetToZero - %s: %w", targetKind, err)
}

eventErr := h.createEvent(namespacedName.Namespace, elastiServiceName, "Normal", "ScaledDownToZero", fmt.Sprintf("Successfully scaled %s to zero", targetKind))
if eventErr != nil {
h.logger.Error("Failed to create success event", zap.Error(eventErr))
}

return nil
}

Expand Down Expand Up @@ -354,53 +372,53 @@ func (h *ScaleHandler) ScaleArgoRollout(ns, targetName string, replicas int32) e
}

func (h *ScaleHandler) UpdateKedaScaledObjectPausedState(ctx context.Context, scaledObjectName, namespace string, paused bool) error {
scaledObject, err := h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Get(ctx, scaledObjectName, metav1.GetOptions{})
patchBytes := []byte(fmt.Sprintf(`{"metadata": {"annotations": {"%s": "%s"}}}`, kedaPausedAnnotation, strconv.FormatBool(paused)))
_, err := h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Patch(ctx, scaledObjectName, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("failed to get ScaledObject: %w", err)
}

annotations := scaledObject.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
return fmt.Errorf("failed to patch ScaledObject: %w", err)
}

annotations[kedaPausedAnnotation] = strconv.FormatBool(paused)
scaledObject.SetAnnotations(annotations)

_, err = h.kDynamicClient.Resource(values.ScaledObjectGVR).Namespace(namespace).Update(ctx, scaledObject, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update ScaledObject: %w", err)
}

return nil
}

func (h *ScaleHandler) UpdateLastScaledUpTime(ctx context.Context, crdName, namespace string) error {
elastiService, err := h.kDynamicClient.Resource(values.ElastiServiceGVR).
Namespace(namespace).
Get(ctx, crdName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get ElastiService: %w", err)
}

currentES := &v1alpha1.ElastiService{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(elastiService.Object, currentES); err != nil {
return fmt.Errorf("failed to convert unstructured to ElastiService: %w", err)
}
now := metav1.Now()
currentES.Status.LastScaledUpTime = &now
patchBytes := []byte(fmt.Sprintf(`{"status": {"lastScaledUpTime": "%s"}}`, now.Format(time.RFC3339Nano)))

obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentES)
_, err := h.kDynamicClient.Resource(values.ElastiServiceGVR).
Namespace(namespace).
Patch(ctx, crdName, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
return fmt.Errorf("failed to convert ElastiService to unstructured: %w", err)
return fmt.Errorf("failed to patch ElastiService status: %w", err)
}
return nil
}

_, err = h.kDynamicClient.Resource(values.ElastiServiceGVR).
Namespace(currentES.Namespace).
UpdateStatus(ctx, &unstructured.Unstructured{Object: obj}, metav1.UpdateOptions{})
// createEvent creates a new event on scaling up or down
func (h *ScaleHandler) createEvent(namespace, name, eventType, reason, message string) error {
h.logger.Info("createEvent", zap.String("eventType", eventType), zap.String("reason", reason), zap.String("message", message))
event := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
GenerateName: name + "-",
Namespace: namespace,
},
InvolvedObject: v1.ObjectReference{
APIVersion: "elasti.truefoundry.com/v1alpha1",
Kind: "ElastiService",
Name: name,
Namespace: namespace,
},
Type: eventType, // Normal or Warning
Reason: reason,
Message: message,
Action: "Scale",
Source: v1.EventSource{
Component: "elasti-operator",
},
}

_, err := h.kClient.CoreV1().Events(namespace).Create(context.TODO(), event, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to update ElastiService status: %w", err)
return fmt.Errorf("failed to create event: %w", err)
}

return nil
}

0 comments on commit 072ac9b

Please sign in to comment.