Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly log and set correct status if any of ScaleObject's triggers are in error state #2604

Merged
merged 1 commit into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ issues:
- path: scale_handler.go
linters:
- gocyclo
- path: scale_scaledobjects.go
linters:
- gocyclo
# Exclude for clustertriggerauthentication_controller and triggerauthentication_controller, reason:
# controllers/clustertriggerauthentication_controller.go:1: 1-59 lines are duplicate of `controllers/triggerauthentication_controller.go:1-58` (dupl)
- path: triggerauthentication_controller.go
Expand Down
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@

### Improvements

- **General:** Fix generation of metric names if any of ScaledObject's triggers is unavailable ([#2592](https://github.com/kedacore/keda/issues/2592))
- **General:** Fix failing tests based on the scale to zero bug ([#2603](https://github.com/kedacore/keda/issues/2603))
- **General**: Fix generation of metric names if any of ScaledObject's triggers is unavailable ([#2592](https://github.com/kedacore/keda/issues/2592))
- **General**: Fix logging in KEDA operator and properly set `ScaledObject.Status` in case there is a problem in a ScaledObject's trigger ([#2603](https://github.com/kedacore/keda/issues/2603))

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- **General:** Fix failing tests based on the scale to zero bug ([#2603](https://github.com/kedacore/keda/issues/2603))

## v2.6.0

Expand Down
7 changes: 7 additions & 0 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
ConditionFallback ConditionType = "Fallback"
)

const (
// ScaledObjectConditionReadySucccesReason defines the default Reason for correct ScaledObject
ScaledObjectConditionReadySucccesReason = "ScaledObjectReady"
// ScaledObjectConditionReadySuccessMessage defines the default Message for correct ScaledObject
ScaledObjectConditionReadySuccessMessage = "ScaledObject is defined correctly and is ready for scaling"
)

// Condition to store the condition state
type Condition struct {
// Type of condition
Expand Down
4 changes: 2 additions & 2 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, msg)
}

if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}
return "ScaledObject is defined correctly and is ready for scaling", nil
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}

// ensureScaledObjectLabel ensures that scaledobject.keda.sh/name=<scaledObject.Name> label exist in the ScaledObject
Expand Down
11 changes: 7 additions & 4 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, id int, metricNa
func (c *ScalersCache) IsScaledObjectActive(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, []external_metrics.ExternalMetricValue) {
isActive := false
isError := false
// Let's collect status of all scalers, no matter if any scaler raises error or is active
for i, s := range c.Scalers {
isTriggerActive, err := s.Scaler.IsActive(ctx)
if err != nil {
Expand All @@ -92,19 +93,21 @@ func (c *ScalersCache) IsScaledObjectActive(ctx context.Context, scaledObject *k
}
}

logger := c.Logger.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace,
"scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)

if err != nil {
c.Logger.V(1).Info("Error getting scale decision", "Error", err)
isError = true
logger.Error(err, "Error getting scale decision")
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
} else if isTriggerActive {
isActive = true
if externalMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].External; externalMetricsSpec != nil {
c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name)
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name)
}
if resourceMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].Resource; resourceMetricsSpec != nil {
c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name)
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name)
}
break
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ func (e *scaleExecutor) setCondition(ctx context.Context, logger logr.Logger, ob
return err
}

func (e *scaleExecutor) setReadyCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
active := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) {
conditions.SetReadyCondition(status, reason, message)
}
return e.setCondition(ctx, logger, object, status, reason, message, active)
}

func (e *scaleExecutor) setActiveCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
active := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) {
conditions.SetActiveCondition(status, reason, message)
Expand Down
34 changes: 34 additions & 0 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
minReplicas = *scaledObject.Spec.MinReplicaCount
}

// if the ScaledObject's triggers aren't in the error state,
// but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true'
readyCondition := scaledObject.Status.Conditions.GetReadyCondition()
if !isError && !readyCondition.IsTrue() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
}

if isActive {
switch {
case scaledObject.Spec.IdleReplicaCount != nil && currentReplicas < minReplicas,
Expand All @@ -89,6 +99,17 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al

// Scale the ScaleTarget up
e.scaleFromZeroOrIdle(ctx, logger, scaledObject, currentScale)
case isError:
// some triggers are active, but some responded with error

// Set ScaledObject.Status.ReadyCondition to Unknown
msg := "Some triggers defined in ScaledObject are not working correctly"
logger.V(1).Info(msg)
if !readyCondition.IsUnknown() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown, "PartialTriggerError", msg); err != nil {
logger.Error(err, "error setting ready condition")
}
}
default:
// triggers are active, but we didn't need to scale (replica count > 0)

Expand All @@ -109,6 +130,19 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al

// Scale to the fallback replicas count
e.doFallbackScaling(ctx, scaledObject, currentScale, logger, currentReplicas)
case isError && scaledObject.Spec.Fallback == nil:
// there are no active triggers, but a scaler responded with an error
// AND
// there is not a fallback replicas count defined

// Set ScaledObject.Status.ReadyCondition to false
msg := "Triggers defined in ScaledObject are not working correctly"
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
logger.V(1).Info(msg)
if !readyCondition.IsFalse() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "TriggerError", msg); err != nil {
logger.Error(err, "error setting ready condition")
}
}
case scaledObject.Spec.IdleReplicaCount != nil && currentReplicas > *scaledObject.Spec.IdleReplicaCount,
// there are no active triggers, Idle Replicas mode is enabled
// AND
Expand Down
20 changes: 10 additions & 10 deletions pkg/scaling/executor/scale_scaledobjects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func TestScaleToMinReplicasWhenNotActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)

Expand Down Expand Up @@ -206,8 +206,8 @@ func TestScaleToMinReplicasFromLowerInitialReplicaCount(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)

Expand Down Expand Up @@ -265,8 +265,8 @@ func TestScaleFromMinReplicasWhenActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Times(2).Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)
client.EXPECT().Status().Times(2).Return(statusWriter).Times(3)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(3)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)

Expand Down Expand Up @@ -328,8 +328,8 @@ func TestScaleToIdleReplicasWhenNotActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)

Expand Down Expand Up @@ -389,8 +389,8 @@ func TestScaleFromIdleToMinReplicasWhenActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Times(2).Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)
client.EXPECT().Status().Times(2).Return(statusWriter).Times(3)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(3)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)

Expand Down
52 changes: 36 additions & 16 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) {
Name: "test",
Namespace: "test",
},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: "test",
},
},
}

cache := cache.ScalersCache{
Expand All @@ -71,34 +76,49 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) {
assert.Equal(t, true, isError)
}

func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) {
func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) {
ctrl := gomock.NewController(t)
recorder := record.NewFakeRecorder(1)
activeScaler := mock_scalers.NewMockScaler(ctrl)
failingScaler := mock_scalers.NewMockScaler(ctrl)

metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)}

activeFactory := func() (scalers.Scaler, error) {
scaler := mock_scalers.NewMockScaler(ctrl)
scaler.EXPECT().IsActive(gomock.Any()).Return(true, nil)
scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Times(2).Return(metricsSpecs)
scaler.EXPECT().Close(gomock.Any())
return scaler, nil
}
activeScaler, err := activeFactory()
assert.Nil(t, err)

failingFactory := func() (scalers.Scaler, error) {
scaler := mock_scalers.NewMockScaler(ctrl)
scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("some error"))
scaler.EXPECT().Close(gomock.Any())
return scaler, nil
}
failingScaler, err := failingFactory()
assert.Nil(t, err)

scaledObject := &kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: "test",
},
},
}

metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)}

activeScaler.EXPECT().IsActive(gomock.Any()).Return(true, nil)
activeScaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Times(2).Return(metricsSpecs)
activeScaler.EXPECT().Close(gomock.Any())
failingScaler.EXPECT().Close(gomock.Any())

factory := func() (scalers.Scaler, error) {
return mock_scalers.NewMockScaler(ctrl), nil
}
scalers := []cache.ScalerBuilder{{
Scaler: activeScaler,
Factory: factory,
Factory: activeFactory,
}, {
Scaler: failingScaler,
Factory: factory,
Factory: failingFactory,
}}

scalersCache := cache.ScalersCache{
Expand All @@ -111,7 +131,7 @@ func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) {
scalersCache.Close(context.Background())

assert.Equal(t, true, isActive)
assert.Equal(t, false, isError)
assert.Equal(t, true, isError)
}

func createMetricSpec(averageValue int) v2beta2.MetricSpec {
Expand Down