Skip to content

Commit

Permalink
implement scale down to zero (#73)
Browse files Browse the repository at this point in the history
* implement scale down to zero

* create new package scaling and refactor

* remove unused function

* resolve comments

* update manifests

* rename scaler method and cleanup

* implement prometheus scaler

* update keda annotations to pause and implement scale up based on trigger

* undo go mod tidy

* undo changes to go.work.sum

* move polling interval to env variable and add scaledobject rbac

* add prometheus query to error message

* undo Dockerfile go version changes

* add lastScaledUpTime to status

* update lastScaledUpTime after scale up

* set cooldownPeriod to int and move pollingInterval to env
  • Loading branch information
Maanas-23 authored Feb 6, 2025
1 parent c97f0e1 commit df2114e
Show file tree
Hide file tree
Showing 15 changed files with 687 additions and 126 deletions.
2 changes: 2 additions & 0 deletions charts/elasti/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ spec:
env:
- name: KUBERNETES_CLUSTER_DOMAIN
value: {{ .Values.global.kubernetesClusterDomain }}
- name: POLLING_INTERVAL
value: {{ .Values.elastiController.manager.env.pollingInterval | quote }}
{{- if .Values.elastiController.manager.sentry.enabled }}
- name: SENTRY_DSN
valueFrom:
Expand Down
3 changes: 3 additions & 0 deletions charts/elasti/templates/operator-additional-access-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ rules:
verbs: ["get", "list", "watch", "update", "patch", "delete", "create"]
- apiGroups: ["argoproj.io"]
resources: ["rollouts"]
verbs: ["get", "list", "watch", "update", "patch"]
- apiGroups: ["keda.sh"]
resources: ["scaledobjects"]
verbs: ["get", "list", "watch", "update", "patch"]
2 changes: 2 additions & 0 deletions charts/elasti/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ elastiController:
sentry:
enabled: false
environment: ""
env:
pollingInterval: 30
replicas: 1
serviceAccount:
annotations: {}
Expand Down
26 changes: 21 additions & 5 deletions operator/api/v1alpha1/elastiservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ const (
// ElastiServiceSpec defines the desired state of an ElastiService.
type ElastiServiceSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// ScaleTargetRef identifies the workload to scale (its Kind and Name are
	// passed to the scale handler when scaling up from zero).
	ScaleTargetRef ScaleTargetRef `json:"scaleTargetRef,omitempty"`
	// Service is the name of the Kubernetes Service fronting the target.
	Service string `json:"service,omitempty"`
	// MinTargetReplicas is the replica count requested when the target is
	// scaled up from zero.
	MinTargetReplicas int32 `json:"minTargetReplicas,omitempty" default:"1"`
	// CooldownPeriod is the period to wait after the last scale-up before the
	// target may be scaled back down to zero (units presumably seconds —
	// TODO confirm against the scaling package).
	CooldownPeriod int32 `json:"cooldownPeriod,omitempty"`
	// Triggers are the conditions evaluated to decide scale up/down.
	Triggers []ScaleTrigger `json:"triggers,omitempty"`
	// Autoscaler optionally references an external autoscaler (keda/hpa)
	// managing the same target, so it can be paused/unpaused around
	// scale-to-zero transitions. Nil when no external autoscaler is used.
	Autoscaler *AutoscalerSpec `json:"autoscaler,omitempty"`
}

type ScaleTargetRef struct {
Expand All @@ -46,8 +49,9 @@ type ScaleTargetRef struct {
// ElastiServiceStatus defines the observed state of an ElastiService.
type ElastiServiceStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// LastReconciledTime is the last time the controller reconciled this resource.
	LastReconciledTime metav1.Time `json:"lastReconciledTime,omitempty"`
	// LastScaledUpTime records the most recent scale-up from zero; nil if the
	// target has never been scaled up by elasti.
	LastScaledUpTime *metav1.Time `json:"lastScaledUpTime,omitempty"`
	// Mode is the current operating mode of the service
	// (exact set of values not visible here — TODO confirm).
	Mode string `json:"mode,omitempty"`
}

//+kubebuilder:object:root=true
Expand All @@ -71,6 +75,18 @@ type ElastiServiceList struct {
Items []ElastiService `json:"items"`
}

// ScaleTrigger describes one condition used to decide scaling of the target
// (e.g. a metric query — supported types not visible here, TODO confirm).
type ScaleTrigger struct {
	// Type is the trigger kind, e.g. "prometheus" per the commit description —
	// TODO confirm the full set of supported values.
	Type string `json:"type"`
	// Metadata carries trigger-specific, free-form configuration. It is kept
	// schemaless so each trigger type can define its own fields; the CRD
	// preserves unknown fields instead of pruning them.
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	Metadata any `json:"metadata,omitempty"`
}

// AutoscalerSpec references an external autoscaler that manages the same
// scale target, so elasti can coordinate with it (e.g. pausing a KEDA
// ScaledObject while the target is scaled to zero).
type AutoscalerSpec struct {
	Type string `json:"type"` // keda/hpa
	Name string `json:"name"` // Name of the ScaledObject/HorizontalPodAutoscaler
}

// init registers the ElastiService API types with the scheme builder so the
// controller-runtime manager can serialize and watch them.
func init() {
	SchemeBuilder.Register(&ElastiService{}, &ElastiServiceList{})
}
7 changes: 6 additions & 1 deletion operator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/getsentry/sentry-go"
"github.com/truefoundry/elasti/pkg/scaling"

"truefoundry/elasti/operator/internal/elastiserver"

Expand Down Expand Up @@ -176,12 +177,16 @@ func mainWithError() error {
informerManager.Start()
defer informerManager.Stop()

// Initiate and start the shared scaleHandler
scaleHandler := scaling.NewScaleHandler(zapLogger, mgr.GetConfig())

// Set up the ElastiService controller
reconciler := &controller.ElastiServiceReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Logger: zapLogger,
InformerManager: informerManager,
ScaleHandler: scaleHandler,
}
if err = reconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ElastiService")
Expand All @@ -190,7 +195,7 @@ func mainWithError() error {
}

// Start the elasti server
eServer := elastiserver.NewServer(zapLogger, mgr.GetConfig(), 30*time.Second)
eServer := elastiserver.NewServer(zapLogger, scaleHandler, 30*time.Second)
errChan := make(chan error, 1)
go func() {
if err := eServer.Start(elastiServerPort); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ spec:
spec:
description: ElastiServiceSpec defines the desired state of ElastiService
properties:
autoscaler:
properties:
name:
type: string
type:
type: string
required:
- name
- type
type: object
cooldownPeriod:
format: int32
type: integer
minTargetReplicas:
format: int32
type: integer
Expand All @@ -56,6 +69,17 @@ spec:
type: object
service:
type: string
triggers:
items:
properties:
metadata:
x-kubernetes-preserve-unknown-fields: true
type:
type: string
required:
- type
type: object
type: array
type: object
status:
description: ElastiServiceStatus defines the observed state of ElastiService
Expand All @@ -66,6 +90,9 @@ spec:
Important: Run "make" to regenerate code after modifying this file
format: date-time
type: string
lastScaledUpTime:
format: date-time
type: string
mode:
type: string
type: object
Expand Down
7 changes: 3 additions & 4 deletions operator/internal/controller/elastiservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/getsentry/sentry-go"
"github.com/truefoundry/elasti/pkg/scaling"
"k8s.io/apimachinery/pkg/types"

"truefoundry/elasti/operator/internal/crddirectory"
Expand All @@ -32,6 +33,7 @@ type (
Logger *zap.Logger
InformerManager *informer.Manager
SwitchModeLocks sync.Map
ScaleHandler *scaling.ScaleHandler
InformerStartLocks sync.Map
ReconcileLocks sync.Map
}
Expand Down Expand Up @@ -85,14 +87,12 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return res, nil
}
r.Logger.Error("Failed to get ElastiService in Reconcile", zap.String("es", req.String()), zap.Error(esErr))
sentry.CaptureException(esErr)
return res, esErr
}

// If the ElastiService is being deleted, we need to clean up the resources
if isDeleted, err := r.finalizeCRDIfDeleted(ctx, es, req); err != nil {
r.Logger.Error("Failed to check if CRD is deleted", zap.String("es", req.String()), zap.Error(err))
sentry.CaptureException(err)
return res, err
} else if isDeleted {
r.Logger.Info("[CRD is deleted successfully]", zap.String("es", req.String()))
Expand All @@ -102,15 +102,13 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// We also check if the CRD has finalizer, and if not, we add the finalizer
if err := r.addCRDFinalizer(ctx, es); err != nil {
r.Logger.Error("Failed to finalize CRD", zap.String("es", req.String()), zap.Error(err))
sentry.CaptureException(err)
return res, err
}
r.Logger.Info("Finalizer added to CRD", zap.String("es", req.String()))

// Add watch for public service, so when the public service is modified, we can update the private service
if err := r.watchScaleTargetRef(ctx, es, req); err != nil {
r.Logger.Error("Failed to add watch for ScaleTargetRef", zap.String("es", req.String()), zap.Any("scaleTargetRef", es.Spec.ScaleTargetRef), zap.Error(err))
sentry.CaptureException(err)
return res, err
}
r.Logger.Info("Watch added for ScaleTargetRef", zap.String("es", req.String()), zap.Any("scaleTargetRef", es.Spec.ScaleTargetRef))
Expand Down Expand Up @@ -149,6 +147,7 @@ func (r *ElastiServiceReconciler) Initialize(ctx context.Context) error {
if err := r.InformerManager.InitializeResolverInformer(r.getResolverChangeHandler(ctx)); err != nil {
return fmt.Errorf("failed to initialize resolver informer: %w", err)
}
r.ScaleHandler.StartScaleDownWatcher(ctx)
return nil
}

Expand Down
1 change: 0 additions & 1 deletion operator/internal/controller/opsCRD.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"strings"
"sync"

"truefoundry/elasti/operator/api/v1alpha1"
"truefoundry/elasti/operator/internal/crddirectory"
"truefoundry/elasti/operator/internal/informer"
Expand Down
70 changes: 26 additions & 44 deletions operator/internal/elastiserver/elastiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@ package elastiserver
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/truefoundry/elasti/pkg/scaling"
"k8s.io/apimachinery/pkg/types"

"k8s.io/client-go/rest"

"truefoundry/elasti/operator/internal/crddirectory"
"truefoundry/elasti/operator/internal/prom"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/truefoundry/elasti/pkg/k8shelper"
"github.com/truefoundry/elasti/pkg/messages"
"go.uber.org/zap"
)
Expand All @@ -35,20 +33,18 @@ type (
// It is used by components about certain events, like when resolver receive the request
// for a service, that service is scaled up if it's at 0 replicas
Server struct {
	logger *zap.Logger
	// scaleHandler performs the actual scale-up and Keda pause/unpause
	// operations; shared with the controller.
	scaleHandler *scaling.ScaleHandler
	// rescaleDuration is the duration to wait before checking to rescaling the target
	rescaleDuration time.Duration
}
)

func NewServer(logger *zap.Logger, config *rest.Config, rescaleDuration time.Duration) *Server {
func NewServer(logger *zap.Logger, scaleHandler *scaling.ScaleHandler, rescaleDuration time.Duration) *Server {
// Get Ops client
k8sUtil := k8shelper.NewOps(logger, config)
return &Server{
logger: logger.Named("elastiServer"),
k8shelper: k8sUtil,
logger: logger.Named("elastiServer"),
scaleHandler: scaleHandler,
// rescaleDuration is the duration to wait before checking to rescaling the target
rescaleDuration: rescaleDuration,
}
Expand Down Expand Up @@ -88,7 +84,7 @@ func (s *Server) Start(port string) error {
}()

s.logger.Info("Starting ElastiServer", zap.String("port", port))
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.logger.Error("Failed to start ElastiServer", zap.Error(err))
return err
}
Expand Down Expand Up @@ -149,49 +145,35 @@ func (s *Server) resolverReqHandler(w http.ResponseWriter, req *http.Request) {
zap.String("namespace", body.Namespace))
}

func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace string) error {
namespacedName := (types.NamespacedName{Namespace: namespace, Name: serviceName}).String()
scaleMutex, loaded := s.getMutexForServiceScale(namespacedName)
if loaded {
s.logger.Debug("Scale target lock already exists", zap.String("service", namespacedName))
return nil
}
scaleMutex.Lock()
func (s *Server) scaleTargetForService(ctx context.Context, serviceName, namespace string) error {
namespacedName := types.NamespacedName{Namespace: namespace, Name: serviceName}

defer s.logger.Debug("Scale target lock released", zap.String("service", namespacedName))
s.logger.Debug("Scale target lock taken", zap.String("service", namespacedName))
defer s.logger.Debug("Scale target lock released", zap.String("service", namespacedName.String()))
s.logger.Debug("Scale target lock taken", zap.String("service", namespacedName.String()))

crd, found := crddirectory.GetCRD(namespacedName)
crd, found := crddirectory.GetCRD(namespacedName.String())
if !found {
s.releaseMutexForServiceScale(namespacedName)
return fmt.Errorf("scaleTargetForService - error: failed to get CRD details from directory, namespacedName: %s", namespacedName)
}

if err := s.k8shelper.ScaleTargetWhenAtZero(namespace, crd.Spec.ScaleTargetRef.Name, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil {
s.releaseMutexForServiceScale(namespacedName)
// Unpause the Keda ScaledObject if it's paused
if crd.Spec.Autoscaler != nil && strings.ToLower(crd.Spec.Autoscaler.Type) == "keda" {
err := s.scaleHandler.UpdateKedaScaledObjectPausedState(ctx, crd.Spec.Autoscaler.Name, namespace, false)
if err != nil {
return fmt.Errorf("failed to update Keda ScaledObject for service %s: %w", namespacedName.String(), err)
}
}

if err := s.scaleHandler.ScaleTargetFromZero(namespacedName, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil {
prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, err.Error()).Inc()
return fmt.Errorf("scaleTargetForService - error: %w, targetRefKind: %s, targetRefName: %s", err, crd.Spec.ScaleTargetRef.Kind, crd.Spec.ScaleTargetRef.Name)
}
prom.TargetScaleCounter.WithLabelValues(serviceName, namespace, crd.Spec.ScaleTargetRef.Kind+"-"+crd.Spec.ScaleTargetRef.Name, "success").Inc()

// If the target is scaled up, we will hold the lock for longer, to not scale up again
// TODO: Is there a better way to do this and why is it even needed?
time.AfterFunc(s.rescaleDuration, func() {
s.releaseMutexForServiceScale(namespacedName)
})
return nil
}

func (s *Server) releaseMutexForServiceScale(service string) {
lock, loaded := s.scaleLocks.Load(service)
if !loaded {
return
if err := s.scaleHandler.UpdateLastScaledUpTime(ctx, crd.CRDName, namespace); err != nil {
// not returning an error as scale up has been successful
s.logger.Error("failed to update LastScaledUpTime", zap.String("namespacedName", namespacedName.String()), zap.Error(err))
}
lock.(*sync.Mutex).Unlock()
s.scaleLocks.Delete(service)
}

func (s *Server) getMutexForServiceScale(serviceName string) (*sync.Mutex, bool) {
l, loaded := s.scaleLocks.LoadOrStore(serviceName, &sync.Mutex{})
return l.(*sync.Mutex), loaded
return nil
}
Loading

0 comments on commit df2114e

Please sign in to comment.