diff --git a/pkg/handler/scale_handler.go b/pkg/handler/scale_handler.go index 9faa2aa8cde..0d28c380faf 100644 --- a/pkg/handler/scale_handler.go +++ b/pkg/handler/scale_handler.go @@ -352,6 +352,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams) case "azure-monitor": return scalers.NewAzureMonitorScaler(resolvedEnv, triggerMetadata, authParams) + case "redis-streams": + return scalers.NewRedisStreamsScaler(resolvedEnv, triggerMetadata, authParams) default: return nil, fmt.Errorf("no scaler found for type: %s", triggerType) } diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go new file mode 100644 index 00000000000..8eb0e071d47 --- /dev/null +++ b/pkg/scalers/redis_streams_scaler.go @@ -0,0 +1,244 @@ +package scalers + +import ( + "context" + "crypto/tls" + "fmt" + "strconv" + + "github.com/go-redis/redis" + v2beta1 "k8s.io/api/autoscaling/v2beta1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + pendingEntriesCountMetricName = "RedisStreamPendingEntriesCount" + + // defaults + defaultTargetPendingEntriesCount = 5 + defaultAddress = "redis-master.default.svc.cluster.local:6379" + defaultPassword = "" + defaultDbIndex = 0 + defaultTLS = false + defaultRedisHost = "" + defaultRedisPort = "" + + // metadata names + pendingEntriesCountMetadata = "pendingEntriesCount" + streamNameMetadata = "stream" + consumerGroupNameMetadata = "consumerGroup" + addressMetadata = "address" + hostMetadata = "host" + portMetadata = "port" + passwordMetadata = "password" + databaseIndexMetadata = "databaseIndex" + enableTLSMetadata = "enableTLS" + + // error + missingRedisAddressOrHostPortInfo = "address or host missing. please provide redis address should in host:port format or set the host/port values" +) + +type redisStreamsScaler struct { + metadata *redisStreamsMetadata + conn *redis.Client +} + +type redisStreamsMetadata struct { + targetPendingEntriesCount int + streamName string + consumerGroupName string + address string + password string + host string + port string + databaseIndex int + enableTLS bool +} + +var redisStreamsLog = logf.Log.WithName("redis_streams_scaler") + +// NewRedisStreamsScaler creates a new redisStreamsScaler +func NewRedisStreamsScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) { + meta, err := parseRedisStreamsMetadata(metadata, resolvedEnv, authParams) + if err != nil { + return nil, fmt.Errorf("error parsing redis streams metadata: %s", err) + } + + c, err := getRedisConnection(meta) + if err != nil { + return nil, fmt.Errorf("redis connection failed: %s", err) + } + + return &redisStreamsScaler{ + metadata: meta, + conn: c, + }, nil +} + +func getRedisConnection(metadata *redisStreamsMetadata) (*redis.Client, error) { + options := &redis.Options{ + Addr: metadata.address, + Password: metadata.password, + DB: metadata.databaseIndex, + } + + if metadata.enableTLS == true { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } + + // this does not guarentee successful connection + c := redis.NewClient(options) + + // confirm if connected + err := c.Ping().Err() + if err != nil { + return nil, err + } + return c, nil +} + +func parseRedisStreamsMetadata(metadata, resolvedEnv, authParams map[string]string) (*redisStreamsMetadata, error) { + meta := redisStreamsMetadata{} + meta.targetPendingEntriesCount = defaultTargetPendingEntriesCount + + if val, ok := metadata[pendingEntriesCountMetadata]; ok { + pendingEntriesCount, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("error parsing pending entries count %v", err) + } + meta.targetPendingEntriesCount = pendingEntriesCount + } else { + return nil, fmt.Errorf("missing pending entries count") + } + + if val, ok := metadata[streamNameMetadata]; ok { + meta.streamName = val + } else { + return nil, fmt.Errorf("missing redis stream name") + } + + if val, ok := metadata[consumerGroupNameMetadata]; ok { + meta.consumerGroupName = val + } else { + return nil, fmt.Errorf("missing redis stream consumer group name") + } + + address := defaultAddress + host := defaultRedisHost + port := defaultRedisPort + if val, ok := metadata[addressMetadata]; ok && val != "" { + address = val + } else { + if val, ok := metadata[hostMetadata]; ok && val != "" { + host = val + } else { + return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo) + } + if val, ok := metadata[portMetadata]; ok && val != "" { + port = val + } else { + return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo) + } + } + + if val, ok := resolvedEnv[address]; ok { + meta.address = val + } else { + if val, ok := resolvedEnv[host]; ok { + meta.host = val + } else { + return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo) + } + + if val, ok := resolvedEnv[port]; ok { + meta.port = val + } else { + return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo) + } + meta.address = fmt.Sprintf("%s:%s", meta.host, meta.port) + } + + meta.password = defaultPassword + if val, ok := authParams[passwordMetadata]; ok { + meta.password = val + } else if val, ok := metadata[passwordMetadata]; ok && val != "" { + if passd, ok := resolvedEnv[val]; ok { + meta.password = passd + } + } + + meta.databaseIndex = defaultDbIndex + if val, ok := metadata[databaseIndexMetadata]; ok { + dbIndex, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing redis database index %v", err) + } + meta.databaseIndex = int(dbIndex) + } + + meta.enableTLS = defaultTLS + if val, ok := metadata[enableTLSMetadata]; ok { + tls, err := strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("error parsing enableTLS %v", err) + } + meta.enableTLS = tls + } + return &meta, nil +} + +// IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream +func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) { + count, err := s.getPendingEntriesCount() + + if err != nil { + redisStreamsLog.Error(err, "error") + return false, err + } + + return count > 0, nil +} + +func (s *redisStreamsScaler) Close() error { + return s.conn.Close() +} + +// GetMetricSpecForScaling returns the metric spec for the HPA +func (s *redisStreamsScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { + + targetPendingEntriesCount := resource.NewQuantity(int64(s.metadata.targetPendingEntriesCount), resource.DecimalSI) + externalMetric := &v2beta1.ExternalMetricSource{MetricName: pendingEntriesCountMetricName, TargetAverageValue: targetPendingEntriesCount} + metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta1.MetricSpec{metricSpec} +} + +// GetMetrics fetches the number of pending entries for a consumer group in a stream +func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + pendingEntriesCount, err := s.getPendingEntriesCount() + + if err != nil { + redisStreamsLog.Error(err, "error fetching pending entries count") + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(pendingEntriesCount, resource.DecimalSI), + Timestamp: metav1.Now(), + } + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +func (s *redisStreamsScaler) getPendingEntriesCount() (int64, error) { + pendingEntries, err := s.conn.XPending(s.metadata.streamName, s.metadata.consumerGroupName).Result() + if err != nil { + return -1, err + } + return pendingEntries.Count, nil +} diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go new file mode 100644 index 00000000000..b7bec841810 --- /dev/null +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -0,0 +1,105 @@ +package scalers + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseRedisStreamsMetadata(t *testing.T) { + + type testCase struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + } + + authParams := map[string]string{"password": "foobarred"} + + testCases := []testCase{ + {"with address", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "5", "address": "REDIS_SERVICE", "password": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "true"}, map[string]string{ + "REDIS_SERVICE": "myredis:6379", + "REDIS_PASSWORD": "foobarred", + }, nil}, + + {"with host and port", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, map[string]string{ + "REDIS_HOST": "myredis", + "REDIS_PORT": "6379", + "REDIS_PASSWORD": "foobarred", + }, authParams}, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(te *testing.T) { + m, err := parseRedisStreamsMetadata(tc.metadata, tc.resolvedEnv, tc.authParams) + assert.Nil(t, err) + assert.Equal(t, m.streamName, tc.metadata[streamNameMetadata]) + assert.Equal(t, m.consumerGroupName, tc.metadata[consumerGroupNameMetadata]) + assert.Equal(t, strconv.Itoa(m.targetPendingEntriesCount), tc.metadata[pendingEntriesCountMetadata]) + if authParams != nil { + //if authParam is used + assert.Equal(t, m.password, authParams[passwordMetadata]) + } else { + //if metadata is used to pass password env var name + assert.Equal(t, m.password, tc.resolvedEnv[tc.metadata[passwordMetadata]]) + } + assert.Equal(t, strconv.Itoa(m.databaseIndex), tc.metadata[databaseIndexMetadata]) + b, err := strconv.ParseBool(tc.metadata[enableTLSMetadata]) + assert.Nil(t, err) + assert.Equal(t, m.enableTLS, b) + }) + } +} + +func TestParseRedisStreamsMetadataForInvalidCases(t *testing.T) { + resolvedEnvMap := map[string]string{ + "REDIS_SERVER": "myredis:6379", + "REDIS_HOST": "myredis", + "REDIS_PORT": "6379", + "REDIS_PASSWORD": "", + } + type testCase struct { + name string + metadata map[string]string + resolvedEnv map[string]string + } + + testCases := []testCase{ + //missing mandatory metadata + {"missing address as well as host/port", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group"}, resolvedEnvMap}, + + {"host present but missing port", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "host": "REDIS_HOST"}, resolvedEnvMap}, + + {"port present but missing host", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "port": "REDIS_PORT"}, resolvedEnvMap}, + + {"missing stream", map[string]string{"pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap}, + + {"missing consumerGroup", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "address": "REDIS_HOST"}, resolvedEnvMap}, + + {"missing pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap}, + + //invalid value for respective fields + {"invalid pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "junk", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, resolvedEnvMap}, + + {"invalid databaseIndex", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "junk", "enableTLS": "false"}, resolvedEnvMap}, + + {"invalid enableTLS", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "1", "enableTLS": "no"}, resolvedEnvMap}, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(te *testing.T) { + _, err := parseRedisStreamsMetadata(tc.metadata, tc.resolvedEnv, map[string]string{}) + assert.NotNil(t, err) + }) + } +} + +type redisStreamsTestMetadata struct { + metadata map[string]string + isError bool + authParams map[string]string +} diff --git a/tests/scalers/redis-streams.test.ts b/tests/scalers/redis-streams.test.ts new file mode 100644 index 00000000000..1f425c0a4a8 --- /dev/null +++ b/tests/scalers/redis-streams.test.ts @@ -0,0 +1,251 @@ +import test from 'ava' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' + +const redisNamespace = 'redis-ns' +const testNamespace = 'redis-streams-ns' +const redisDeploymentName = 'redis' +const redisPassword = 'foobared' +const redisHost = `redis-service.${redisNamespace}.svc.cluster.local:6379` +const numMessages = 100 + +test.before(t => { + // setup Redis + sh.exec(`kubectl create namespace ${redisNamespace}`) + + const tmpFile1 = tmp.fileSync() + fs.writeFileSync(tmpFile1.name, redisDeployYaml.replace('{{REDIS_PASSWORD}}', redisPassword)) + + t.is(0, sh.exec(`kubectl apply --namespace ${redisNamespace} -f ${tmpFile1.name}`).code, 'creating a Redis deployment should work.') + + // wait for redis to be ready + let redisReplicaCount = '0' + for (let i = 0; i < 30; i++) { + redisReplicaCount = sh.exec(`kubectl get deploy/${redisDeploymentName} -n ${redisNamespace} -o jsonpath='{.spec.replicas}'`).stdout + if (redisReplicaCount != '1') { + sh.exec('sleep 2s') + } + } + t.is('1', redisReplicaCount, 'Redis is not in a ready state') + + sh.exec(`kubectl create namespace ${testNamespace}`) + + // deploy streams consumer app, scaled object etc. + const tmpFile = tmp.fileSync() + const base64Password = Buffer.from(redisPassword).toString('base64') + + fs.writeFileSync(tmpFile.name, redisStreamsDeployYaml.replace('{{REDIS_PASSWORD}}', base64Password).replace('{{REDIS_HOST}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) +}) + +test.serial('Deployment should have 1 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '1', 'replica count should start out as 1') +}) + +test.serial(`Deployment should scale to 5 with ${numMessages} messages and back to 1`, t => { + // publish messages + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, producerDeployYaml.replace('{{NUM_MESSAGES}}', numMessages.toString()) + .replace('{{REDIS_HOST}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'producer job should apply.' + ) + + // wait for the producer job to complete + for (let i = 0; i < 20; i++) { + const succeeded = sh.exec(`kubectl get job --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } + // with messages published, the consumer deployment should start receiving the messages + let replicaCount = '0' + for (let i = 0; i < 20 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 60 && replicaCount !== '1'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '1') { + sh.exec('sleep 10s') + } + } + + t.is('1', replicaCount, 'Replica count should be 1 within 10 minutes') +}) + + + +test.after.always.cb('clean up deployment', t => { + const resources = [ + 'secret/redis-password', + 'deployment/redis-streams-consumer', + 'scaledobject.keda.k8s.io/redis-streams-scaledobject', + 'triggerauthentications.keda.k8s.io/keda-redis-stream-triggerauth' + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + sh.exec(`kubectl delete namespace ${redisNamespace}`) + t.end() +}) + +const redisDeployYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis +spec: + selector: + matchLabels: + app: redis + replicas: 1 + template: + metadata: + labels: + app: redis + spec: + containers: + - name: master + image: redis + command: ["redis-server", "--requirepass", "{{REDIS_PASSWORD}}"] + ports: + - containerPort: 6379 +--- +apiVersion: v1 +kind: Service +metadata: + name: redis-service + labels: + app: redis +spec: + ports: + - port: 6379 + targetPort: 6379 + selector: + app: redis +` + +const redisStreamsDeployYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-password +type: Opaque +data: + password: {{REDIS_PASSWORD}} +--- +apiVersion: keda.k8s.io/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-stream-triggerauth +spec: + secretTargetRef: + - parameter: password + name: redis-password + key: password +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis-streams-consumer +spec: + replicas: 1 + selector: + matchLabels: + app: redis-streams-consumer + template: + metadata: + labels: + app: redis-streams-consumer + spec: + containers: + - name: redis-streams-consumer + image: abhirockzz/redis-streams-consumer + imagePullPolicy: Always + env: + - name: REDIS_HOST + value: {{REDIS_HOST}} + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password +--- +apiVersion: keda.k8s.io/v1alpha1 +kind: ScaledObject +metadata: + name: redis-streams-scaledobject + labels: + deploymentName: redis-streams-consumer +spec: + scaleTargetRef: + deploymentName: redis-streams-consumer + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 1 + maxReplicaCount: 5 + triggers: + - type: redis-streams + metadata: + address: REDIS_HOST + stream: my-stream + consumerGroup: consumer-group-1 + pendingEntriesCount: "10" + authenticationRef: + name: keda-redis-stream-triggerauth +` + +const producerDeployYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: redis-streams-producer +spec: + template: + spec: + containers: + - name: producer + image: abhirockzz/redis-streams-producer + imagePullPolicy: Always + env: + - name: REDIS_HOST + value: {{REDIS_HOST}} + - name: REDIS_STREAM_NAME + value: my-stream + - name: NUM_MESSAGES + value: "{{NUM_MESSAGES}}" + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password + restartPolicy: Never +` \ No newline at end of file