diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a3d1d1b8a..3160a14858e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ ### New - Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994)) - Introducing InfluxDB scaler ([#1239](https://github.com/kedacore/keda/issues/1239)) +- Add Redis cluster support for Redis list and Redis streams scalers. ### Improvements - Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311)) diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index c3baddfada1..071616d2dfb 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "strconv" + "strings" "github.com/go-redis/redis" v2beta2 "k8s.io/api/autoscaling/v2beta2" @@ -23,16 +24,19 @@ const ( defaultEnableTLS = false ) +type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) + type redisScaler struct { - metadata *redisMetadata - client *redis.Client + metadata *redisMetadata + closeFn func() error + getListLengthFn func() (int64, error) } type redisConnectionInfo struct { - address string + addresses []string password string - host string - port string + hosts []string + ports []string enableTLS bool } @@ -46,31 +50,96 @@ type redisMetadata struct { var redisLog = logf.Log.WithName("redis_scaler") // NewRedisScaler creates a new redisScaler -func NewRedisScaler(config *ScalerConfig) (Scaler, error) { - meta, err := parseRedisMetadata(config) +func NewRedisScaler(isClustered bool, config *ScalerConfig) (Scaler, error) { + luaScript := ` + local listName = KEYS[1] + local listType = redis.call('type', listName).ok + local cmd = { + zset = 'zcard', + set = 'scard', + list = 'llen', + hash = 'hlen', + none = 'llen' + } + + return redis.call(cmd[listType], listName) + ` + if isClustered { + meta, err := parseRedisMetadata(config, parseRedisClusterAddress) + if err != nil { + return nil, fmt.Errorf("error parsing redis metadata: %s", err) + } + return createClusteredRedisScaler(meta, luaScript) + } + meta, err := parseRedisMetadata(config, parseRedisAddress) if err != nil { return nil, fmt.Errorf("error parsing redis metadata: %s", err) } - options := &redis.Options{ - Addr: meta.connectionInfo.address, - Password: meta.connectionInfo.password, - DB: meta.databaseIndex, + return createRedisScaler(meta, luaScript) +} + +func createClusteredRedisScaler(meta *redisMetadata, script string) (Scaler, error) { + client, err := getRedisClusterClient(meta.connectionInfo) + if err != nil { + return nil, fmt.Errorf("connection to redis cluster failed: %s", err) } - if meta.connectionInfo.enableTLS { - options.TLSConfig = &tls.Config{ - InsecureSkipVerify: meta.connectionInfo.enableTLS, + closeFn := func() error { + if err := client.Close(); err != nil { + redisLog.Error(err, "error closing redis client") + return err + } + return nil + } + + listLengthFn := func() (int64, error) { + cmd := client.Eval(script, []string{meta.listName}) + if cmd.Err() != nil { + return -1, cmd.Err() + } + + return cmd.Int64() + } + + return &redisScaler{ + metadata: meta, + closeFn: closeFn, + getListLengthFn: listLengthFn, + }, nil +} + +func createRedisScaler(meta *redisMetadata, script string) (Scaler, error) { + client, err := getRedisClient(meta.connectionInfo, meta.databaseIndex) + if err != nil { + return nil, fmt.Errorf("connection to redis failed: %s", err) + } + + closeFn := func() error { + if err := client.Close(); err != nil { + redisLog.Error(err, "error closing redis client") + return err + } + return nil + } + + listLengthFn := func() (int64, error) { + cmd := client.Eval(script, []string{meta.listName}) + if cmd.Err() != nil { + return -1, cmd.Err() } + + return cmd.Int64() } return &redisScaler{ - metadata: meta, - client: redis.NewClient(options), + metadata: meta, + closeFn: closeFn, + getListLengthFn: listLengthFn, }, nil } -func parseRedisMetadata(config *ScalerConfig) (*redisMetadata, error) { - connInfo, err := parseRedisAddress(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams) +func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*redisMetadata, error) { + connInfo, err := parserFn(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams) if err != nil { return nil, err } @@ -107,7 +176,7 @@ func parseRedisMetadata(config *ScalerConfig) (*redisMetadata, error) { // IsActive checks if there is any element in the Redis list func (s *redisScaler) IsActive(ctx context.Context) (bool, error) { - length, err := getRedisListLength(s.client, s.metadata.listName) + length, err := s.getListLengthFn() if err != nil { redisLog.Error(err, "error") @@ -118,15 +187,7 @@ func (s *redisScaler) IsActive(ctx context.Context) (bool, error) { } func (s *redisScaler) Close() error { - if s.client != nil { - err := s.client.Close() - if err != nil { - redisLog.Error(err, "error closing redis client") - return err - } - } - - return nil + return s.closeFn() } // GetMetricSpecForScaling returns the metric spec for the HPA @@ -149,7 +210,7 @@ func (s *redisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // GetMetrics connects to Redis and finds the length of the list func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - listLen, err := getRedisListLength(s.client, s.metadata.listName) + listLen, err := s.getListLengthFn() if err != nil { redisLog.Error(err, "error getting list length") @@ -165,63 +226,40 @@ func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricS return append([]external_metrics.ExternalMetricValue{}, metric), nil } -func getRedisListLength(client *redis.Client, listName string) (int64, error) { - luaScript := ` - local listName = KEYS[1] - local listType = redis.call('type', listName).ok - local cmd = { - zset = 'zcard', - set = 'scard', - list = 'llen', - hash = 'hlen', - none = 'llen' - } - - return redis.call(cmd[listType], listName) - ` - - cmd := client.Eval(luaScript, []string{listName}) - if cmd.Err() != nil { - return -1, cmd.Err() - } - - return cmd.Int64() -} - func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { info := redisConnectionInfo{} switch { case authParams["address"] != "": - info.address = authParams["address"] + info.addresses = append(info.addresses, authParams["address"]) case metadata["address"] != "": - info.address = metadata["address"] + info.addresses = append(info.addresses, metadata["address"]) case metadata["addressFromEnv"] != "": - info.address = resolvedEnv[metadata["addressFromEnv"]] + info.addresses = append(info.addresses, resolvedEnv[metadata["addressFromEnv"]]) default: switch { case authParams["host"] != "": - info.host = authParams["host"] + info.hosts = append(info.hosts, authParams["host"]) case metadata["host"] != "": - info.host = metadata["host"] + info.hosts = append(info.hosts, metadata["host"]) case metadata["hostFromEnv"] != "": - info.host = resolvedEnv[metadata["hostFromEnv"]] + info.hosts = append(info.hosts, resolvedEnv[metadata["hostFromEnv"]]) } switch { case authParams["port"] != "": - info.port = authParams["port"] + info.ports = append(info.ports, authParams["port"]) case metadata["port"] != "": - info.port = metadata["port"] + info.ports = append(info.ports, metadata["port"]) case metadata["portFromEnv"] != "": - info.port = resolvedEnv[metadata["portFromEnv"]] + info.ports = append(info.ports, resolvedEnv[metadata["portFromEnv"]]) } - if len(info.host) != 0 && len(info.port) != 0 { - info.address = fmt.Sprintf("%s:%s", info.host, info.port) + if len(info.hosts) != 0 && len(info.ports) != 0 { + info.addresses = append(info.addresses, fmt.Sprintf("%s:%s", info.hosts[0], info.ports[0])) } } - if len(info.address) == 0 { + if len(info.addresses) == 0 || len(info.addresses[0]) == 0 { return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values") } @@ -242,3 +280,113 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red return info, nil } + +func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { + info := redisConnectionInfo{} + switch { + case authParams["addresses"] != "": + info.addresses = splitAndTrim(authParams["addresses"]) + case metadata["addresses"] != "": + info.addresses = splitAndTrim(metadata["addresses"]) + case metadata["addressesFromEnv"] != "": + info.addresses = splitAndTrim(resolvedEnv[metadata["addressesFromEnv"]]) + default: + switch { + case authParams["hosts"] != "": + info.hosts = splitAndTrim(authParams["hosts"]) + case metadata["hosts"] != "": + info.hosts = splitAndTrim(metadata["hosts"]) + case metadata["hostsFromEnv"] != "": + info.hosts = splitAndTrim(resolvedEnv[metadata["hostsFromEnv"]]) + } + + switch { + case authParams["ports"] != "": + info.ports = splitAndTrim(authParams["ports"]) + case metadata["ports"] != "": + info.ports = splitAndTrim(metadata["ports"]) + case metadata["portsFromEnv"] != "": + info.ports = splitAndTrim(resolvedEnv[metadata["portsFromEnv"]]) + } + + if len(info.hosts) != 0 && len(info.ports) != 0 { + if len(info.hosts) != len(info.ports) { + return info, fmt.Errorf("not enough hosts or ports given. number of hosts should be equal to the number of ports") + } + for i := range info.hosts { + info.addresses = append(info.addresses, fmt.Sprintf("%s:%s", info.hosts[i], info.ports[i])) + } + } + } + + if len(info.addresses) == 0 { + return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values") + } + + if authParams["password"] != "" { + info.password = authParams["password"] + } else if metadata["passwordFromEnv"] != "" { + info.password = resolvedEnv[metadata["passwordFromEnv"]] + } + + info.enableTLS = defaultEnableTLS + if val, ok := metadata["enableTLS"]; ok { + tls, err := strconv.ParseBool(val) + if err != nil { + return info, fmt.Errorf("enableTLS parsing error %s", err.Error()) + } + info.enableTLS = tls + } + + return info, nil +} + +func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, error) { + options := &redis.ClusterOptions{ + Addrs: info.addresses, + Password: info.password, + } + if info.enableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: info.enableTLS, + } + } + + // confirm if connected + c := redis.NewClusterClient(options) + err := c.Ping().Err() + if err != nil { + return nil, err + } + return c, nil +} + +func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error) { + options := &redis.Options{ + Addr: info.addresses[0], + Password: info.password, + DB: dbIndex, + } + if info.enableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: info.enableTLS, + } + } + + // confirm if connected + c := redis.NewClient(options) + err := c.Ping().Err() + if err != nil { + return nil, err + } + return c, nil +} + +// Splits a string separated by comma and trims space from all the elements. +func splitAndTrim(s string) []string { + x := strings.Split(s, ",") + for i := range x { + x[i] = strings.Trim(x[i], " ") + } + return x +} diff --git a/pkg/scalers/redis_scaler_test.go b/pkg/scalers/redis_scaler_test.go index c9ca033bff9..aacae87a03e 100644 --- a/pkg/scalers/redis_scaler_test.go +++ b/pkg/scalers/redis_scaler_test.go @@ -1,9 +1,10 @@ package scalers import ( + "errors" "testing" - "github.com/go-redis/redis" + "github.com/stretchr/testify/assert" ) var testRedisResolvedEnv = map[string]string{ @@ -56,7 +57,7 @@ var redisMetricIdentifiers = []redisMetricIdentifier{ func TestRedisParseMetadata(t *testing.T) { testCaseNum := 1 for _, testData := range testRedisMetadata { - _, err := parseRedisMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.authParams}) + _, err := parseRedisMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.authParams}, parseRedisAddress) if err != nil && !testData.isError { t.Errorf("Expected success but got error for unit test # %v", testCaseNum) } @@ -69,11 +70,17 @@ func TestRedisParseMetadata(t *testing.T) { func TestRedisGetMetricSpecForScaling(t *testing.T) { for _, testData := range redisMetricIdentifiers { - meta, err := parseRedisMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.metadataTestData.authParams}) + meta, err := parseRedisMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.metadataTestData.authParams}, parseRedisAddress) if err != nil { t.Fatal("Could not parse metadata:", err) } - mockRedisScaler := redisScaler{meta, &redis.Client{}} + closeFn := func() error { return nil } + lengthFn := func() (int64, error) { return -1, nil } + mockRedisScaler := redisScaler{ + meta, + closeFn, + lengthFn, + } metricSpec := mockRedisScaler.GetMetricSpecForScaling() metricName := metricSpec[0].External.Metric.Name @@ -82,3 +89,105 @@ func TestRedisGetMetricSpecForScaling(t *testing.T) { } } } + +func TestParseRedisClusterMetadata(t *testing.T) { + cases := []struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + wantMeta *redisMetadata + wantErr error + }{ + { + name: "empty metadata", + wantMeta: nil, + wantErr: errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values"), + }, + { + name: "unequal number of hosts/ports", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2", + }, + wantMeta: nil, + wantErr: errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports"), + }, + { + name: "no list name", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listLength": "5", + }, + wantMeta: nil, + wantErr: errors.New("no list name given"), + }, + { + name: "invalid list length", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "listLength": "invalid", + }, + wantMeta: nil, + wantErr: errors.New("list length parsing error"), + }, + { + name: "address is defined in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + } + + for _, testCase := range cases { + c := testCase + t.Run(c.name, func(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: c.metadata, + ResolvedEnv: c.resolvedEnv, + AuthParams: c.authParams, + } + meta, err := parseRedisMetadata(config, parseRedisClusterAddress) + if c.wantErr != nil { + assert.Contains(t, err.Error(), c.wantErr.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.wantMeta, meta) + }) + } +} diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 46276a82221..7379da9ce44 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -2,11 +2,9 @@ package scalers import ( "context" - "crypto/tls" "fmt" "strconv" - "github.com/go-redis/redis" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,8 +30,9 @@ const ( ) type redisStreamsScaler struct { - metadata *redisStreamsMetadata - conn *redis.Client + metadata *redisStreamsMetadata + closeFn func() error + getPendingEntriesCountFn func() (int64, error) } type redisStreamsMetadata struct { @@ -47,49 +46,81 @@ type redisStreamsMetadata struct { var redisStreamsLog = logf.Log.WithName("redis_streams_scaler") // NewRedisStreamsScaler creates a new redisStreamsScaler -func NewRedisStreamsScaler(config *ScalerConfig) (Scaler, error) { - meta, err := parseRedisStreamsMetadata(config) +func NewRedisStreamsScaler(isClustered bool, config *ScalerConfig) (Scaler, error) { + if isClustered { + meta, err := parseRedisStreamsMetadata(config, parseRedisClusterAddress) + if err != nil { + return nil, fmt.Errorf("error parsing redis streams metadata: %s", err) + } + return createClusteredRedisStreamsScaler(meta) + } + meta, err := parseRedisStreamsMetadata(config, parseRedisAddress) if err != nil { return nil, fmt.Errorf("error parsing redis streams metadata: %s", err) } + return createRedisStreamsScaler(meta) +} - c, err := getRedisConnection(meta) +func createClusteredRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error) { + client, err := getRedisClusterClient(meta.connectionInfo) if err != nil { - return nil, fmt.Errorf("redis connection failed: %s", err) + return nil, fmt.Errorf("connection to redis cluster failed: %s", err) + } + + closeFn := func() error { + if err := client.Close(); err != nil { + redisStreamsLog.Error(err, "error closing redis client") + return err + } + return nil + } + + pendingEntriesCountFn := func() (int64, error) { + pendingEntries, err := client.XPending(meta.streamName, meta.consumerGroupName).Result() + if err != nil { + return -1, err + } + return pendingEntries.Count, nil } return &redisStreamsScaler{ - metadata: meta, - conn: c, + metadata: meta, + closeFn: closeFn, + getPendingEntriesCountFn: pendingEntriesCountFn, }, nil } -func getRedisConnection(metadata *redisStreamsMetadata) (*redis.Client, error) { - options := &redis.Options{ - Addr: metadata.connectionInfo.address, - Password: metadata.connectionInfo.password, - DB: metadata.databaseIndex, +func createRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error) { + client, err := getRedisClient(meta.connectionInfo, meta.databaseIndex) + if err != nil { + return nil, fmt.Errorf("connection to redis failed: %s", err) } - if metadata.connectionInfo.enableTLS { - options.TLSConfig = &tls.Config{ - InsecureSkipVerify: true, + closeFn := func() error { + if err := client.Close(); err != nil { + redisStreamsLog.Error(err, "error closing redis client") + return err } + return nil } - // this does not guarantee successful connection - c := redis.NewClient(options) - - // confirm if connected - err := c.Ping().Err() - if err != nil { - return nil, err + pendingEntriesCountFn := func() (int64, error) { + pendingEntries, err := client.XPending(meta.streamName, meta.consumerGroupName).Result() + if err != nil { + return -1, err + } + return pendingEntries.Count, nil } - return c, nil + + return &redisStreamsScaler{ + metadata: meta, + closeFn: closeFn, + getPendingEntriesCountFn: pendingEntriesCountFn, + }, nil } -func parseRedisStreamsMetadata(config *ScalerConfig) (*redisStreamsMetadata, error) { - connInfo, err := parseRedisAddress(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams) +func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) (*redisStreamsMetadata, error) { + connInfo, err := parseFn(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams) if err != nil { return nil, err } @@ -134,7 +165,7 @@ func parseRedisStreamsMetadata(config *ScalerConfig) (*redisStreamsMetadata, err // IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) { - count, err := s.getPendingEntriesCount() + count, err := s.getPendingEntriesCountFn() if err != nil { redisStreamsLog.Error(err, "error") @@ -145,7 +176,7 @@ func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) { } func (s *redisStreamsScaler) Close() error { - return s.conn.Close() + return s.closeFn() } // GetMetricSpecForScaling returns the metric spec for the HPA @@ -166,7 +197,7 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // GetMetrics fetches the number of pending entries for a consumer group in a stream func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - pendingEntriesCount, err := s.getPendingEntriesCount() + pendingEntriesCount, err := s.getPendingEntriesCountFn() if err != nil { redisStreamsLog.Error(err, "error fetching pending entries count") @@ -180,11 +211,3 @@ func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, } return append([]external_metrics.ExternalMetricValue{}, metric), nil } - -func (s *redisStreamsScaler) getPendingEntriesCount() (int64, error) { - pendingEntries, err := s.conn.XPending(s.metadata.streamName, s.metadata.consumerGroupName).Result() - if err != nil { - return -1, err - } - return pendingEntries.Count, nil -} diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index fd0067a242b..187d01bf89a 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -1,6 +1,7 @@ package scalers import ( + "errors" "strconv" "testing" @@ -43,7 +44,7 @@ func TestParseRedisStreamsMetadata(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(te *testing.T) { - m, err := parseRedisStreamsMetadata(&ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: tc.authParams}) + m, err := parseRedisStreamsMetadata(&ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: tc.authParams}, parseRedisAddress) assert.Nil(t, err) assert.Equal(t, m.streamName, tc.metadata[streamNameMetadata]) assert.Equal(t, m.consumerGroupName, tc.metadata[consumerGroupNameMetadata]) @@ -101,7 +102,7 @@ func TestParseRedisStreamsMetadataForInvalidCases(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(te *testing.T) { - _, err := parseRedisStreamsMetadata(&ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: map[string]string{}}) + _, err := parseRedisStreamsMetadata(&ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: map[string]string{}}, parseRedisAddress) assert.NotNil(t, err) }) } @@ -130,11 +131,13 @@ func TestRedisStreamsGetMetricSpecForScaling(t *testing.T) { } for _, testData := range redisStreamMetricIdentifiers { - meta, err := parseRedisStreamsMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: map[string]string{"REDIS_SERVICE": "my-address"}, AuthParams: testData.metadataTestData.authParams}) + meta, err := parseRedisStreamsMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: map[string]string{"REDIS_SERVICE": "my-address"}, AuthParams: testData.metadataTestData.authParams}, parseRedisAddress) if err != nil { t.Fatal("Could not parse metadata:", err) } - mockRedisStreamsScaler := redisStreamsScaler{meta, nil} + closeFn := func() error { return nil } + getPendingEntriesCountFn := func() (int64, error) { return -1, nil } + mockRedisStreamsScaler := redisStreamsScaler{meta, closeFn, getPendingEntriesCountFn} metricSpec := mockRedisStreamsScaler.GetMetricSpecForScaling() metricName := metricSpec[0].External.Metric.Name @@ -143,3 +146,120 @@ func TestRedisStreamsGetMetricSpecForScaling(t *testing.T) { } } } + +func TestParseRedisClusterStreamsMetadata(t *testing.T) { + cases := []struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + wantMeta *redisStreamsMetadata + wantErr error + }{ + { + name: "empty metadata", + wantMeta: nil, + wantErr: errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values"), + }, + { + name: "unequal number of hosts/ports", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2", + }, + wantMeta: nil, + wantErr: errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports"), + }, + { + name: "no stream name", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "pendingEntriesCount": "5", + }, + wantMeta: nil, + wantErr: errors.New("missing redis stream name"), + }, + { + name: "missing pending entries count", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + }, + wantMeta: nil, + wantErr: errors.New("missing pending entries count"), + }, + { + name: "invalid pending entries count", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "pendingEntriesCount": "invalid", + }, + wantMeta: nil, + wantErr: errors.New("error parsing pending entries count"), + }, + { + name: "address is defined in auth params", + metadata: map[string]string{ + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + } + + for _, testCase := range cases { + c := testCase + t.Run(c.name, func(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: c.metadata, + ResolvedEnv: c.resolvedEnv, + AuthParams: c.authParams, + } + meta, err := parseRedisStreamsMetadata(config, parseRedisClusterAddress) + if c.wantErr != nil { + assert.Contains(t, err.Error(), c.wantErr.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.wantMeta, meta) + }) + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index bba860fbf39..7662fc32140 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -459,9 +459,13 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal case "rabbitmq": return scalers.NewRabbitMQScaler(config) case "redis": - return scalers.NewRedisScaler(config) + return scalers.NewRedisScaler(false, config) + case "redis-cluster": + return scalers.NewRedisScaler(true, config) + case "redis-cluster-streams": + return scalers.NewRedisStreamsScaler(true, config) case "redis-streams": - return scalers.NewRedisStreamsScaler(config) + return scalers.NewRedisStreamsScaler(false, config) case "stan": return scalers.NewStanScaler(config) default: diff --git a/tests/scalers/redis-cluster-lists.test.ts b/tests/scalers/redis-cluster-lists.test.ts new file mode 100644 index 00000000000..0abb20b9ef3 --- /dev/null +++ b/tests/scalers/redis-cluster-lists.test.ts @@ -0,0 +1,532 @@ +import test from 'ava' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' + +const redisNamespace = 'redis-cluster' +const redisService = 'redis-cluster' +const testNamespace = 'redis-cluster-lists-test' +const redisStatefulSetName = 'redis-cluster' +const redisClusterName = 'redis-cluster' +const redisPassword = 'my-password' +let redisHost = '' +const redisPort = 6379 +let redisAddress = '' +const listNameForHostPortRef = 'my-test-list-host-port-ref' +const listNameForAddressRef = 'my-test-list-address-ref' +const listNameForHostPortTriggerAuth = 'my-test-list-host-port-trigger' +const redisWorkerHostPortRefDeploymentName = 'redis-worker-test-hostport' +const redisWorkerAddressRefDeploymentName = 'redis-worker-test-address' +const redisWorkerHostPortRefTriggerAuthDeploymentName = 'redis-worker-test-hostport-triggerauth' +const itemsToWrite = 200 +const deploymentContainerImage = 'goku321/redis-cluster-list:v1.4' +const writeJobNameForHostPortRef = 'redis-writer-host-port-ref' +const writeJobNameForAddressRef = 'redis-writer-address-ref' +const writeJobNameForHostPortInTriggerAuth = 'redis-writer-host-port-trigger-auth' + +test.before(t => { + // Deploy Redis cluster. + sh.exec(`kubectl create namespace ${redisNamespace}`) + sh.exec(`helm repo add bitnami https://charts.bitnami.com/bitnami`) + t.is(0, + sh.exec(`helm install ${redisClusterName} --namespace ${redisNamespace} --set "global.redis.password=${redisPassword}" bitnami/redis-cluster`).code, + 'creating a Redis cluster should work.' + ) + + // Wait for Redis cluster to be ready. + let redisReplicaCount = '0' + for (let i = 0; i < 30; i++) { + redisReplicaCount = sh.exec(`kubectl get statefulset/${redisStatefulSetName} -n ${redisNamespace} -o jsonpath='{.spec.replicas}'`).stdout + if (redisReplicaCount != '6') { + sh.exec('sleep 2s') + } + } + t.is('6', redisReplicaCount, 'Redis is not in a ready state') + + // Get Redis cluster address. + redisHost = sh.exec(`kubectl get svc ${redisService} -n ${redisNamespace} -o jsonpath='{.spec.clusterIP}'`) + redisAddress = `${redisHost}:${redisPort}` + + // Create test namespace. + sh.exec(`kubectl create namespace ${testNamespace}`) + + const triggerAuthTmpFile = tmp.fileSync() + const base64Password = Buffer.from(redisPassword).toString('base64') + fs.writeFileSync(triggerAuthTmpFile.name, scaledObjectTriggerAuthYaml.replace('{{REDIS_PASSWORD}}', base64Password)) + + t.is( + 0, + sh.exec(`kubectl apply -f ${triggerAuthTmpFile.name} --namespace ${testNamespace}`).code, + 'creating trigger auth should work..' + ) + + const triggerAuthHostPortTmpFile = tmp.fileSync() + + fs.writeFileSync(triggerAuthHostPortTmpFile.name, + scaledObjectTriggerAuthHostPortYaml.replace('{{REDIS_PASSWORD}}', base64Password) + .replace('{{REDIS_HOSTS}}', Buffer.from(redisHost).toString('base64')) + .replace('{{REDIS_PORTS}}', Buffer.from(redisPort.toString()).toString('base64')) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${triggerAuthHostPortTmpFile.name} --namespace ${testNamespace}`).code, + 'creating trigger auth with host port should work..' + ) + + // Create a deployment with host and port. + const deploymentHostPortRefTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentHostPortRefTmpFile.name, redisListDeployHostPortYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_HOSTS}}/g, redisHost) + .replace(/{{REDIS_PORTS}}/g, redisPort.toString()) + .replace(/{{LIST_NAME}}/g, listNameForHostPortRef) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerHostPortRefDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentHostPortRefTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis host and port envs should work..' + ) + + const deploymentAddressRefTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentAddressRefTmpFile.name, redisListDeployAddressYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_ADDRESSES}}/g, redisAddress) + .replace(/{{LIST_NAME}}/g, listNameForAddressRef) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerAddressRefDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentAddressRefTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis address var should work..' + ) + + + const deploymentHostPortRefTriggerAuthTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentHostPortRefTriggerAuthTmpFile.name, redisListDeployHostPortInTriggerAuhYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_HOSTS}}/g, redisHost) + .replace(/{{REDIS_PORTS}}/g, redisPort.toString()) + .replace(/{{LIST_NAME}}/g, listNameForHostPortTriggerAuth) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerHostPortRefTriggerAuthDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentHostPortRefTriggerAuthTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis host port in trigger auth should work..' + ) +}) + +test.serial('Deployment for redis host and port env vars should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + +test.serial(`Deployment using redis host port env vars should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + runWriteJob(t, writeJobNameForHostPortRef, listNameForHostPortRef) + + let replicaCount = '0' + for (let i = 0; i < 20 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + +test.serial('Deployment for redis address env var should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + + +test.serial(`Deployment using redis address env var should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + + runWriteJob(t, writeJobNameForAddressRef, listNameForAddressRef) + + let replicaCount = '0' + for (let i = 0; i < 20 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + + +test.serial('Deployment for redis host and port in the trigger auth should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + +test.serial(`Deployment using redis host port in triggerAuth should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + + runWriteJob(t, writeJobNameForHostPortInTriggerAuth, listNameForHostPortTriggerAuth) + + let replicaCount = '0' + for (let i = 0; i < 20 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + + +test.after.always.cb('clean up deployment', t => { + const resources = [ + `job/${writeJobNameForHostPortRef}`, + `job/${writeJobNameForAddressRef}`, + `job/${writeJobNameForHostPortInTriggerAuth}`, + `scaledobject.keda.sh/${redisWorkerHostPortRefDeploymentName}`, + `scaledobject.keda.sh/${redisWorkerAddressRefDeploymentName}`, + `scaledobject.keda.sh/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + 'triggerauthentication.keda.sh/keda-redis-list-triggerauth', + 'triggerauthentication.keda.sh/keda-redis-list-triggerauth-host-port', + `deployment/${redisWorkerAddressRefDeploymentName}`, + `deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + `deployment/${redisWorkerHostPortRefDeploymentName}`, + 'secret/redis-password', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + sh.exec(`kubectl delete namespace ${redisNamespace}`) + t.end() +}) + +function runWriteJob(t, jobName, listName) { + // write to list + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, writeJobYaml.replace('{{REDIS_ADDRESSES}}', redisAddress).replace('{{REDIS_PASSWORD}}', redisPassword) + .replace('{{LIST_NAME}}', listName) + .replace('{{NUMBER_OF_ITEMS_TO_WRITE}}', itemsToWrite.toString()) + .replace('{{CONTAINER_IMAGE}}', deploymentContainerImage) + .replace('{{JOB_NAME}}', jobName) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'list writer job should apply.' + ) + + // wait for the write job to complete + for (let i = 0; i < 20; i++) { + const succeeded = sh.exec(`kubectl get job ${writeJobNameForHostPortRef} --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } +} + +const redisListDeployHostPortYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "{{REDIS_PORTS}}" + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: READ_PROCESS_TIME + value: "200" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-cluster + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + listName: {{LIST_NAME}} + listLength: "5" + authenticationRef: + name: keda-redis-cluster-list-triggerauth +` + + +const redisListDeployAddressYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_ADDRESSES + value: {{REDIS_ADDRESSES}} + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: READ_PROCESS_TIME + value: "500" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-cluster + metadata: + addressesFromEnv: REDIS_ADDRESSES + listName: {{LIST_NAME}} + listLength: "5" + authenticationRef: + name: keda-redis-cluster-list-triggerauth +` + +const redisListDeployHostPortInTriggerAuhYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "{{REDIS_PORTS}}" + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: READ_PROCESS_TIME + value: "200" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-cluster + metadata: + listName: {{LIST_NAME}} + listLength: "5" + authenticationRef: + name: keda-redis-cluster-list-triggerauth-host-port +` + +const scaledObjectTriggerAuthHostPortYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-config +type: Opaque +data: + password: {{REDIS_PASSWORD}} + redisHost: {{REDIS_HOSTS}} + redisPort: {{REDIS_PORTS}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-cluster-list-triggerauth-host-port +spec: + secretTargetRef: + - parameter: password + name: redis-config + key: password + - parameter: hosts + name: redis-config + key: redisHost + - parameter: ports + name: redis-config + key: redisPort +` + +const scaledObjectTriggerAuthYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-password +type: Opaque +data: + password: {{REDIS_PASSWORD}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-cluster-list-triggerauth +spec: + secretTargetRef: + - parameter: password + name: redis-password + key: password +` + + +const writeJobYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: {{JOB_NAME}} +spec: + template: + spec: + containers: + - name: redis + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + env: + - name: REDIS_ADDRESSES + value: {{REDIS_ADDRESSES}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: LIST_NAME + value: {{LIST_NAME}} + - name: NO_LIST_ITEMS_TO_WRITE + value: "{{NUMBER_OF_ITEMS_TO_WRITE}}" + args: ["write"] + restartPolicy: Never + backoffLimit: 4 +` diff --git a/tests/scalers/redis-cluster-streams.test.ts b/tests/scalers/redis-cluster-streams.test.ts new file mode 100644 index 00000000000..37517facb67 --- /dev/null +++ b/tests/scalers/redis-cluster-streams.test.ts @@ -0,0 +1,232 @@ +import test from 'ava' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' + +const redisNamespace = 'redis-cluster' +const redisClusterName = 'redis-cluster' +const redisStatefulSetName = 'redis-cluster' +const redisService = 'redis-cluster' +const testNamespace = 'redis-cluster-streams' +const redisPassword = 'foobared' +let redisHost = '' +let redisAddress = '' +const redisPort = 6379 +const numMessages = 100 + +test.before(t => { + // Deploy Redis cluster. + sh.exec(`kubectl create namespace ${redisNamespace}`) + sh.exec(`helm repo add bitnami https://charts.bitnami.com/bitnami`) + t.is(0, + sh.exec(`helm install ${redisClusterName} --namespace ${redisNamespace} --set "global.redis.password=${redisPassword}" bitnami/redis-cluster`).code, + 'creating a Redis cluster should work.' + ) + + // Wait for Redis cluster to be ready. + let redisReplicaCount = '0' + for (let i = 0; i < 30; i++) { + redisReplicaCount = sh.exec(`kubectl get statefulset/${redisStatefulSetName} -n ${redisNamespace} -o jsonpath='{.spec.replicas}'`).stdout + if (redisReplicaCount != '6') { + sh.exec('sleep 2s') + } + } + t.is('6', redisReplicaCount, 'Redis is not in a ready state') + + // Get Redis cluster address. + redisHost = sh.exec(`kubectl get svc ${redisService} -n ${redisNamespace} -o jsonpath='{.spec.clusterIP}'`) + redisAddress = `${redisHost}:${redisPort}` + + // Create test namespace. + sh.exec(`kubectl create namespace ${testNamespace}`) + + // Deploy streams consumer app, scaled object etc. + const tmpFile = tmp.fileSync() + const base64Password = Buffer.from(redisPassword).toString('base64') + + fs.writeFileSync(tmpFile.name, redisStreamsDeployYaml.replace('{{REDIS_PASSWORD}}', base64Password).replace('{{REDIS_HOSTS}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) +}) + +test.serial('Deployment should have 1 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '1', 'replica count should start out as 1') +}) + +test.serial(`Deployment should scale to 5 with ${numMessages} messages and back to 1`, t => { + // Publish messages to redis streams. + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, producerDeployYaml.replace('{{NUM_MESSAGES}}', numMessages.toString()) + .replace('{{REDIS_HOSTS}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'producer job should apply.' + ) + + // Wait for producer job to finish. + for (let i = 0; i < 20; i++) { + const succeeded = sh.exec(`kubectl get job --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } + // With messages published, the consumer deployment should start receiving the messages. + let replicaCount = '0' + for (let i = 0; i < 20 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 60 && replicaCount !== '1'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '1') { + sh.exec('sleep 10s') + } + } + + t.is('1', replicaCount, 'Replica count should be 1 within 10 minutes') +}) + + + +test.after.always.cb('clean up deployment', t => { + const resources = [ + 'scaledobject.keda.sh/redis-streams-scaledobject', + 'triggerauthentications.keda.sh/keda-redis-stream-triggerauth', + 'secret/redis-password', + 'deployment/redis-streams-consumer', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + sh.exec(`kubectl delete namespace ${redisNamespace}`) + t.end() +}) + +const redisStreamsDeployYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-password +type: Opaque +data: + password: {{REDIS_PASSWORD}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-stream-triggerauth +spec: + secretTargetRef: + - parameter: password + name: redis-password + key: password +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis-streams-consumer +spec: + replicas: 1 + selector: + matchLabels: + app: redis-streams-consumer + template: + metadata: + labels: + app: redis-streams-consumer + spec: + containers: + - name: redis-streams-consumer + image: goku321/redis-cluster-streams:v2.2 + command: ["./main"] + args: ["consumer"] + imagePullPolicy: Always + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "6379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: redis-streams-scaledobject +spec: + scaleTargetRef: + name: redis-streams-consumer + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 1 + maxReplicaCount: 5 + triggers: + - type: redis-cluster-streams + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + stream: my-stream + consumerGroup: consumer-group-1 + pendingEntriesCount: "10" + authenticationRef: + name: keda-redis-stream-triggerauth +` + +const producerDeployYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: redis-streams-producer +spec: + template: + spec: + containers: + - name: producer + image: goku321/redis-cluster-streams:v2.2 + command: ["./main"] + args: ["producer"] + imagePullPolicy: Always + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "6379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: NUM_MESSAGES + value: "{{NUM_MESSAGES}}" + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password + restartPolicy: Never +` diff --git a/tests/setup.test.ts b/tests/setup.test.ts index ac277ce83da..3aeeaaaea7c 100644 --- a/tests/setup.test.ts +++ b/tests/setup.test.ts @@ -64,3 +64,16 @@ test.serial('verifyKeda', t => { t.true(success, 'expected keda deployments to start 2 pods successfully') }) + +test.serial('setup helm', t => { + // check if helm is already installed. + let result = sh.exec('helm version') + if(result.code == 0) { + t.pass('helm is already installed. skipping setup') + return + } + t.is(0, sh.exec(`curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3`).code, 'should be able to download helm script') + t.is(0, sh.exec(`chmod 700 get_helm.sh`).code, 'should be able to change helm script permissions') + t.is(0, sh.exec(`./get_helm.sh`).code, 'should be able to download helm') + t.is(0, sh.exec(`helm version`).code, 'should be able to get helm version') +})