Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Adds CoudEvents when a ScaledObject scales a workload from/to zero or one #5632

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func main() {
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())
scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister(), eventEmitter)

if err = (&kedacontrollers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func init() {

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create and pass a new event emitter as well so that it doesn't need to check nil later.

r.scaledJobGenerations = &sync.Map{}
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down
6 changes: 4 additions & 2 deletions controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ var _ = BeforeSuite(func() {
scaleClient, _, err := k8s.InitScaleClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

eventEmitter := eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default")

err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil, eventEmitter),
ScaleClient: scaleClient,
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default"),
EventEmitter: eventEmitter,
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

Expand Down
4 changes: 4 additions & 0 deletions pkg/common/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ const (
ScaleTargetNotFoundMsg = "Target resource doesn't exist"

ScaleTargetNoSubresourceMsg = "Target resource doesn't expose /scale subresource"

ScaleTargetFromZero = "Target resource is scaling up from zero number of replicas"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ScaleTargetFromZero = "Target resource is scaling up from zero number of replicas"
ScaleTargetFromZero = "Target resource is scaling out from zero"


ScaleTargetToZero = "Target resource is scaling down to zero number of replicas"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ScaleTargetToZero = "Target resource is scaling down to zero number of replicas"
ScaleTargetToZero = "Target resource is scaling in to zero replicas"

)
6 changes: 6 additions & 0 deletions pkg/eventemitter/eventtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ const (

// ScaledObjectFailedType is for event when creating ScaledObject failed
ScaledObjectFailedType = "keda.scaledobject.failed.v1"

// ScaleToZeroType is for event when a workload scales to zero
ScaleToZeroType = "keda.workload.scaletozero.v1"

// ScaleFromZeroType is for event when a workload scales from zero
ScaleFromZeroType = "keda.workload.scalefromzero.v1"
)
5 changes: 4 additions & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventemitter"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)

Expand All @@ -49,16 +50,18 @@ type scaleExecutor struct {
reconcilerScheme *runtime.Scheme
logger logr.Logger
recorder record.EventRecorder
eventEmitter eventemitter.EventHandler
}

// NewScaleExecutor creates a ScaleExecutor object
func NewScaleExecutor(client runtimeclient.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor {
func NewScaleExecutor(client runtimeclient.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder, eventEmitter eventemitter.EventHandler) ScaleExecutor {
return &scaleExecutor{
client: client,
scaleClient: scaleClient,
reconcilerScheme: reconcilerScheme,
logger: logf.Log.WithName("scaleexecutor"),
recorder: recorder,
eventEmitter: eventEmitter,
}
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/eventreason"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)
Expand Down Expand Up @@ -272,6 +275,13 @@ func (e *scaleExecutor) scaleToZeroOrIdle(ctx context.Context, logger logr.Logge

e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetDeactivated,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one and the new one are duplicated. We should merge these two into one event. eventEmitter.Emit() includes emitting k8s event.

"Deactivated %s %s/%s from %d to %d", scaledObject.Status.ScaleTargetKind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, scaleToReplicas)

if scaleToReplicas == 0 && e.eventEmitter != nil {
e.eventEmitter.Emit(scaledObject, types.NamespacedName{Namespace: scaledObject.Namespace},
corev1.EventTypeNormal, eventemitter.ScaleToZeroType, eventreason.KEDAScaleTargetActivated,
message.ScaleTargetToZero)
}

if err := e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
logger.Error(err, "Error in setting active condition")
return
Expand Down Expand Up @@ -309,6 +319,13 @@ func (e *scaleExecutor) scaleFromZeroOrIdle(ctx context.Context, logger logr.Log
logger.Info("Successfully updated ScaleTarget",
"Original Replicas Count", currentReplicas,
"New Replicas Count", replicas)

if currentReplicas == 0 && e.eventEmitter != nil {
e.eventEmitter.Emit(scaledObject, types.NamespacedName{Namespace: scaledObject.Namespace},
corev1.EventTypeNormal, eventemitter.ScaleFromZeroType, eventreason.KEDAScaleTargetActivated,
message.ScaleTargetFromZero)
}

e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetActivated, "Scaled %s %s/%s from %d to %d", scaledObject.Status.ScaleTargetKind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one and the new one are duplicated. We should merge these two into one event.


// Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject
Expand Down
14 changes: 7 additions & 7 deletions pkg/scaling/executor/scale_scaledobjects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestScaleToFallbackReplicasWhenNotActiveAndIsError(t *testing.T) {
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)
scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder, nil)

scaledObject := v1alpha1.ScaledObject{
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestScaleToMinReplicasWhenNotActive(t *testing.T) {
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)
scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder, nil)

minReplicas := int32(0)

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestScaleToMinReplicasFromLowerInitialReplicaCount(t *testing.T) {
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)
scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder, nil)

minReplicas := int32(5)

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestScaleFromMinReplicasWhenActive(t *testing.T) {
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)
scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder, nil)

minReplicas := int32(0)

Expand Down Expand Up @@ -284,7 +284,7 @@ func TestScaleToIdleReplicasWhenNotActive(t *testing.T) {
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)
scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder, nil)

idleReplicas := int32(0)
minReplicas := int32(5)
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestScaleFromIdleToMinReplicasWhenActive(t *testing.T) {
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)
scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder, nil)

idleReplicas := int32(0)
minReplicas := int32(5)
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestScaleToPausedReplicasCount(t *testing.T) {
mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl)
statusWriter := mock_client.NewMockStatusWriter(ctrl)

scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder)
scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder, nil)

pausedReplicaCount := int32(0)
replicaCount := int32(2)
Expand Down
5 changes: 3 additions & 2 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/fallback"
"github.com/kedacore/keda/v2/pkg/metricscollector"
Expand Down Expand Up @@ -77,11 +78,11 @@ type scaleHandler struct {
}

// NewScaleHandler creates a ScaleHandler object
func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder, secretsLister corev1listers.SecretLister) ScaleHandler {
func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder, secretsLister corev1listers.SecretLister, eventEmitter eventemitter.EventHandler) ScaleHandler {
return &scaleHandler{
client: client,
scaleLoopContexts: &sync.Map{},
scaleExecutor: executor.NewScaleExecutor(client, scaleClient, reconcilerScheme, recorder),
scaleExecutor: executor.NewScaleExecutor(client, scaleClient, reconcilerScheme, recorder, eventEmitter),
globalHTTPTimeout: globalHTTPTimeout,
recorder: recorder,
scalerCaches: map[string]*cache.ScalersCache{},
Expand Down
Loading