Skip to content

Commit

Permalink
Adding Lag-dependent implementation of Redis streams scaler (kedacore…
Browse files Browse the repository at this point in the history
…#4592)

Signed-off-by: mikelam-us <michael.lam@aixplain.com>
Signed-off-by: mikelam-us <69995279+mikelam-us@users.noreply.github.com>
Signed-off-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
Co-authored-by: mikelam-us <michael.lam@aixplain.com>
Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
  • Loading branch information
3 people authored and SpiritZhou committed Jun 30, 2023
1 parent daaaff1 commit 26b0afd
Show file tree
Hide file tree
Showing 9 changed files with 1,917 additions and 99 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

- **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269))
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))
- **General**: Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))
- **Redis Scalers**: Allow scaling using consumer group lag ([#3127](https://github.com/kedacore/keda/issues/3127))
- **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))

### Improvements

Expand Down
146 changes: 124 additions & 22 deletions pkg/scalers/redis_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/redis/go-redis/v9"
Expand All @@ -19,22 +20,27 @@ type scaleFactor int8
const (
xPendingFactor scaleFactor = iota + 1
xLengthFactor
lagFactor
)

const (
// defaults
defaultDBIndex = 0
defaultTargetEntries = 5
defaultDBIndex = 0
defaultTargetEntries = 5
defaultTargetLag = 5
defaultActivationLagCount = 0

// metadata names
pendingEntriesCountMetadata = "pendingEntriesCount"
streamLengthMetadata = "streamLength"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
usernameMetadata = "username"
passwordMetadata = "password"
databaseIndexMetadata = "databaseIndex"
enableTLSMetadata = "enableTLS"
lagMetadata = "lagCount"
pendingEntriesCountMetadata = "pendingEntriesCount"
streamLengthMetadata = "streamLength"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
usernameMetadata = "username"
passwordMetadata = "password"
databaseIndexMetadata = "databaseIndex"
enableTLSMetadata = "enableTLS"
activationValueTriggerConfigName = "activationLagCount"
)

type redisStreamsScaler struct {
Expand All @@ -49,11 +55,13 @@ type redisStreamsMetadata struct {
scaleFactor scaleFactor
targetPendingEntriesCount int64
targetStreamLength int64
targetLag int64
streamName string
consumerGroupName string
databaseIndex int
connectionInfo redisConnectionInfo
scalerIndex int
activationLagCount int64
}

// NewRedisStreamsScaler creates a new redisStreamsScaler
Expand Down Expand Up @@ -87,6 +95,7 @@ func NewRedisStreamsScaler(ctx context.Context, isClustered, isSentinel bool, co

func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
client, err := getRedisClusterClient(ctx, meta.connectionInfo)

if err != nil {
return nil, fmt.Errorf("connection to redis cluster failed: %w", err)
}
Expand Down Expand Up @@ -166,6 +175,76 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent
}
return entriesLength, nil
}
case lagFactor:
entriesCountFn = func(ctx context.Context) (int64, error) {
// Make sure that redis is version 7+, which is required for xinfo lag
info, err := client.Info(ctx).Result()
if err != nil {
err := errors.New("could not find Redis version")
return -1, err
}
infoLines := strings.Split(info, "\n")
versionFound := false
for i := 0; i < len(infoLines); i++ {
line := infoLines[i]
lineSplit := strings.Split(line, ":")
if len(lineSplit) > 1 {
fieldName := lineSplit[0]
fieldValue := lineSplit[1]
if fieldName == "redis_version" {
versionFound = true
versionNumString := strings.Split(fieldValue, ".")[0]
versionNum, err := strconv.ParseInt(versionNumString, 10, 64)
if err != nil {
err := errors.New("redis version could not be converted to number")
return -1, err
}
if versionNum < int64(7) {
err := errors.New("redis version 7+ required for lag")
return -1, err
}
break
}
}
}
if !versionFound {
err := errors.New("could not find Redis version number")
return -1, err
}
groups, err := client.XInfoGroups(ctx, meta.streamName).Result()

// If XINFO GROUPS can't find the stream key, it hasn't been created
// yet. In that case, we return a lag of 0.
if fmt.Sprint(err) == "ERR no such key" {
return 0, nil
}

// If the stream has been created, then we find the consumer group
// associated with this scaler and return its lag.
numGroups := len(groups)
for i := 0; i < numGroups; i++ {
group := groups[i]
if group.Name == meta.consumerGroupName {
return group.Lag, nil
}
}

// There is an edge case where the Redis producer has set up the
// stream [meta.streamName], but the consumer group [meta.consumerGroupName]
// for that stream isn't registered with Redis. In other words, the
// producer has created messages for the stream, but the consumer group
// hasn't yet registered itself on Redis because scaling starts with 0
// consumers. In this case, it's necessary to use XLEN to return what
// the lag would have been if the consumer group had been created since
// it's not possible to obtain the lag for a nonexistent consumer
// group. From here, the consumer group gets instantiated, and scaling
// again occurs according to XINFO GROUP lag.
entriesLength, err := client.XLen(ctx, meta.streamName).Result()
if err != nil {
return -1, err
}
return entriesLength, nil
}
default:
err = fmt.Errorf("unrecognized scale factor %v", meta.scaleFactor)
}
Expand Down Expand Up @@ -210,16 +289,38 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser)
return nil, ErrRedisMissingStreamName
}

meta.activationLagCount = defaultActivationLagCount

if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok {
meta.consumerGroupName = val
meta.scaleFactor = xPendingFactor
meta.targetPendingEntriesCount = defaultTargetEntries
if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.ParseInt(val, 10, 64)
if val, ok := config.TriggerMetadata[lagMetadata]; ok {
meta.scaleFactor = lagFactor
lag, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count: %w", err)
return nil, fmt.Errorf("error parsing lag: %w", err)
}
meta.targetLag = lag

if val, ok := config.TriggerMetadata[activationValueTriggerConfigName]; ok {
activationVal, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, errors.New("error while parsing activation lag value")
}
meta.activationLagCount = activationVal
} else {
err := errors.New("activationLagCount required for Redis lag")
return nil, err
}
} else {
meta.scaleFactor = xPendingFactor
meta.targetPendingEntriesCount = defaultTargetEntries
if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count: %w", err)
}
meta.targetPendingEntriesCount = pendingEntriesCount
}
meta.targetPendingEntriesCount = pendingEntriesCount
}
} else {
meta.scaleFactor = xLengthFactor
Expand Down Expand Up @@ -259,6 +360,8 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.Metri
metricValue = s.metadata.targetPendingEntriesCount
case xLengthFactor:
metricValue = s.metadata.targetStreamLength
case lagFactor:
metricValue = s.metadata.targetLag
}

externalMetric := &v2.ExternalMetricSource{
Expand All @@ -271,16 +374,15 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.Metri
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity fetches the number of pending entries for a consumer group in a stream
// GetMetricsAndActivity fetches the metric value for a consumer group in a stream
func (s *redisStreamsScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
pendingEntriesCount, err := s.getEntriesCountFn(ctx)
metricCount, err := s.getEntriesCountFn(ctx)

if err != nil {
s.logger.Error(err, "error fetching pending entries count")
s.logger.Error(err, "error fetching metric count")
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, float64(pendingEntriesCount))

return []external_metrics.ExternalMetricValue{metric}, pendingEntriesCount > 0, nil
metric := GenerateMetricInMili(metricName, float64(metricCount))
return []external_metrics.ExternalMetricValue{metric}, metricCount > s.metadata.activationLagCount, nil
}
Loading

0 comments on commit 26b0afd

Please sign in to comment.