Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis Streams Scaler #765

Merged
merged 3 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams)
case "azure-monitor":
return scalers.NewAzureMonitorScaler(resolvedEnv, triggerMetadata, authParams)
case "redis-streams":
return scalers.NewRedisStreamsScaler(resolvedEnv, triggerMetadata, authParams)
default:
return nil, fmt.Errorf("no scaler found for type: %s", triggerType)
}
Expand Down
244 changes: 244 additions & 0 deletions pkg/scalers/redis_streams_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package scalers

import (
"context"
"crypto/tls"
"fmt"
"strconv"

"github.com/go-redis/redis"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
pendingEntriesCountMetricName = "RedisStreamPendingEntriesCount"

// defaults
defaultTargetPendingEntriesCount = 5
defaultAddress = "redis-master.default.svc.cluster.local:6379"
defaultPassword = ""
defaultDbIndex = 0
defaultTLS = false
defaultRedisHost = ""
defaultRedisPort = ""

// metadata names
pendingEntriesCountMetadata = "pendingEntriesCount"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
addressMetadata = "address"
hostMetadata = "host"
portMetadata = "port"
passwordMetadata = "password"
databaseIndexMetadata = "databaseIndex"
enableTLSMetadata = "enableTLS"

// error
missingRedisAddressOrHostPortInfo = "address or host missing. please provide redis address should in host:port format or set the host/port values"
)

type redisStreamsScaler struct {
metadata *redisStreamsMetadata
conn *redis.Client
}

type redisStreamsMetadata struct {
targetPendingEntriesCount int
streamName string
consumerGroupName string
address string
password string
host string
port string
databaseIndex int
enableTLS bool
}

var redisStreamsLog = logf.Log.WithName("redis_streams_scaler")

// NewRedisStreamsScaler creates a new redisStreamsScaler
func NewRedisStreamsScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := parseRedisStreamsMetadata(metadata, resolvedEnv, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing redis streams metadata: %s", err)
}

c, err := getRedisConnection(meta)
if err != nil {
return nil, fmt.Errorf("redis connection failed: %s", err)
}

return &redisStreamsScaler{
metadata: meta,
conn: c,
}, nil
}

func getRedisConnection(metadata *redisStreamsMetadata) (*redis.Client, error) {
options := &redis.Options{
Addr: metadata.address,
Password: metadata.password,
DB: metadata.databaseIndex,
}

if metadata.enableTLS == true {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}

// this does not guarentee successful connection
c := redis.NewClient(options)

// confirm if connected
err := c.Ping().Err()
if err != nil {
return nil, err
}
return c, nil
}

func parseRedisStreamsMetadata(metadata, resolvedEnv, authParams map[string]string) (*redisStreamsMetadata, error) {
meta := redisStreamsMetadata{}
meta.targetPendingEntriesCount = defaultTargetPendingEntriesCount

if val, ok := metadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count %v", err)
}
meta.targetPendingEntriesCount = pendingEntriesCount
} else {
return nil, fmt.Errorf("missing pending entries count")
}

if val, ok := metadata[streamNameMetadata]; ok {
meta.streamName = val
} else {
return nil, fmt.Errorf("missing redis stream name")
}

if val, ok := metadata[consumerGroupNameMetadata]; ok {
meta.consumerGroupName = val
} else {
return nil, fmt.Errorf("missing redis stream consumer group name")
}

address := defaultAddress
host := defaultRedisHost
port := defaultRedisPort
if val, ok := metadata[addressMetadata]; ok && val != "" {
address = val
} else {
if val, ok := metadata[hostMetadata]; ok && val != "" {
host = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}
if val, ok := metadata[portMetadata]; ok && val != "" {
port = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}
}

if val, ok := resolvedEnv[address]; ok {
meta.address = val
} else {
if val, ok := resolvedEnv[host]; ok {
meta.host = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}

if val, ok := resolvedEnv[port]; ok {
meta.port = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}
meta.address = fmt.Sprintf("%s:%s", meta.host, meta.port)
}

meta.password = defaultPassword
if val, ok := authParams[passwordMetadata]; ok {
meta.password = val
} else if val, ok := metadata[passwordMetadata]; ok && val != "" {
if passd, ok := resolvedEnv[val]; ok {
meta.password = passd
}
}

meta.databaseIndex = defaultDbIndex
if val, ok := metadata[databaseIndexMetadata]; ok {
dbIndex, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing redis database index %v", err)
}
meta.databaseIndex = int(dbIndex)
}

meta.enableTLS = defaultTLS
if val, ok := metadata[enableTLSMetadata]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing enableTLS %v", err)
}
meta.enableTLS = tls
}
return &meta, nil
}

// IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream
func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) {
count, err := s.getPendingEntriesCount()

if err != nil {
redisStreamsLog.Error(err, "error")
return false, err
}

return count > 0, nil
}

func (s *redisStreamsScaler) Close() error {
return s.conn.Close()
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *redisStreamsScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {

targetPendingEntriesCount := resource.NewQuantity(int64(s.metadata.targetPendingEntriesCount), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: pendingEntriesCountMetricName, TargetAverageValue: targetPendingEntriesCount}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

// GetMetrics fetches the number of pending entries for a consumer group in a stream
func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
pendingEntriesCount, err := s.getPendingEntriesCount()

if err != nil {
redisStreamsLog.Error(err, "error fetching pending entries count")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(pendingEntriesCount, resource.DecimalSI),
Timestamp: metav1.Now(),
}
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *redisStreamsScaler) getPendingEntriesCount() (int64, error) {
pendingEntries, err := s.conn.XPending(s.metadata.streamName, s.metadata.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
105 changes: 105 additions & 0 deletions pkg/scalers/redis_streams_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package scalers

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseRedisStreamsMetadata(t *testing.T) {

type testCase struct {
name string
metadata map[string]string
resolvedEnv map[string]string
authParams map[string]string
}

authParams := map[string]string{"password": "foobarred"}

testCases := []testCase{
{"with address", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "5", "address": "REDIS_SERVICE", "password": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "true"}, map[string]string{
"REDIS_SERVICE": "myredis:6379",
"REDIS_PASSWORD": "foobarred",
}, nil},

{"with host and port", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, map[string]string{
"REDIS_HOST": "myredis",
"REDIS_PORT": "6379",
"REDIS_PASSWORD": "foobarred",
}, authParams},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(te *testing.T) {
m, err := parseRedisStreamsMetadata(tc.metadata, tc.resolvedEnv, tc.authParams)
assert.Nil(t, err)
assert.Equal(t, m.streamName, tc.metadata[streamNameMetadata])
assert.Equal(t, m.consumerGroupName, tc.metadata[consumerGroupNameMetadata])
assert.Equal(t, strconv.Itoa(m.targetPendingEntriesCount), tc.metadata[pendingEntriesCountMetadata])
if authParams != nil {
//if authParam is used
assert.Equal(t, m.password, authParams[passwordMetadata])
} else {
//if metadata is used to pass password env var name
assert.Equal(t, m.password, tc.resolvedEnv[tc.metadata[passwordMetadata]])
}
assert.Equal(t, strconv.Itoa(m.databaseIndex), tc.metadata[databaseIndexMetadata])
b, err := strconv.ParseBool(tc.metadata[enableTLSMetadata])
assert.Nil(t, err)
assert.Equal(t, m.enableTLS, b)
})
}
}

func TestParseRedisStreamsMetadataForInvalidCases(t *testing.T) {
resolvedEnvMap := map[string]string{
"REDIS_SERVER": "myredis:6379",
"REDIS_HOST": "myredis",
"REDIS_PORT": "6379",
"REDIS_PASSWORD": "",
}
type testCase struct {
name string
metadata map[string]string
resolvedEnv map[string]string
}

testCases := []testCase{
//missing mandatory metadata
{"missing address as well as host/port", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group"}, resolvedEnvMap},

{"host present but missing port", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "host": "REDIS_HOST"}, resolvedEnvMap},

{"port present but missing host", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "port": "REDIS_PORT"}, resolvedEnvMap},

{"missing stream", map[string]string{"pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap},

{"missing consumerGroup", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "address": "REDIS_HOST"}, resolvedEnvMap},

{"missing pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap},

//invalid value for respective fields
{"invalid pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "junk", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, resolvedEnvMap},

{"invalid databaseIndex", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "junk", "enableTLS": "false"}, resolvedEnvMap},

{"invalid enableTLS", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "1", "enableTLS": "no"}, resolvedEnvMap},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(te *testing.T) {
_, err := parseRedisStreamsMetadata(tc.metadata, tc.resolvedEnv, map[string]string{})
assert.NotNil(t, err)
})
}
}

type redisStreamsTestMetadata struct {
metadata map[string]string
isError bool
authParams map[string]string
}