Skip to content

Commit

Permalink
Redis Streams Scaler (#765)
Browse files Browse the repository at this point in the history
Signed-off-by: abhishek <abhirockzz@gmail.com>
  • Loading branch information
abhirockzz authored Jun 17, 2020
1 parent 85794e9 commit c4acdc9
Show file tree
Hide file tree
Showing 4 changed files with 602 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams)
case "azure-monitor":
return scalers.NewAzureMonitorScaler(resolvedEnv, triggerMetadata, authParams)
case "redis-streams":
return scalers.NewRedisStreamsScaler(resolvedEnv, triggerMetadata, authParams)
default:
return nil, fmt.Errorf("no scaler found for type: %s", triggerType)
}
Expand Down
244 changes: 244 additions & 0 deletions pkg/scalers/redis_streams_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package scalers

import (
"context"
"crypto/tls"
"fmt"
"strconv"

"github.com/go-redis/redis"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
pendingEntriesCountMetricName = "RedisStreamPendingEntriesCount"

// defaults
defaultTargetPendingEntriesCount = 5
defaultAddress = "redis-master.default.svc.cluster.local:6379"
defaultPassword = ""
defaultDbIndex = 0
defaultTLS = false
defaultRedisHost = ""
defaultRedisPort = ""

// metadata names
pendingEntriesCountMetadata = "pendingEntriesCount"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
addressMetadata = "address"
hostMetadata = "host"
portMetadata = "port"
passwordMetadata = "password"
databaseIndexMetadata = "databaseIndex"
enableTLSMetadata = "enableTLS"

// error
missingRedisAddressOrHostPortInfo = "address or host missing. please provide redis address should in host:port format or set the host/port values"
)

type redisStreamsScaler struct {
metadata *redisStreamsMetadata
conn *redis.Client
}

type redisStreamsMetadata struct {
targetPendingEntriesCount int
streamName string
consumerGroupName string
address string
password string
host string
port string
databaseIndex int
enableTLS bool
}

var redisStreamsLog = logf.Log.WithName("redis_streams_scaler")

// NewRedisStreamsScaler creates a new redisStreamsScaler
func NewRedisStreamsScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := parseRedisStreamsMetadata(metadata, resolvedEnv, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing redis streams metadata: %s", err)
}

c, err := getRedisConnection(meta)
if err != nil {
return nil, fmt.Errorf("redis connection failed: %s", err)
}

return &redisStreamsScaler{
metadata: meta,
conn: c,
}, nil
}

func getRedisConnection(metadata *redisStreamsMetadata) (*redis.Client, error) {
options := &redis.Options{
Addr: metadata.address,
Password: metadata.password,
DB: metadata.databaseIndex,
}

if metadata.enableTLS == true {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}

// this does not guarentee successful connection
c := redis.NewClient(options)

// confirm if connected
err := c.Ping().Err()
if err != nil {
return nil, err
}
return c, nil
}

func parseRedisStreamsMetadata(metadata, resolvedEnv, authParams map[string]string) (*redisStreamsMetadata, error) {
meta := redisStreamsMetadata{}
meta.targetPendingEntriesCount = defaultTargetPendingEntriesCount

if val, ok := metadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count %v", err)
}
meta.targetPendingEntriesCount = pendingEntriesCount
} else {
return nil, fmt.Errorf("missing pending entries count")
}

if val, ok := metadata[streamNameMetadata]; ok {
meta.streamName = val
} else {
return nil, fmt.Errorf("missing redis stream name")
}

if val, ok := metadata[consumerGroupNameMetadata]; ok {
meta.consumerGroupName = val
} else {
return nil, fmt.Errorf("missing redis stream consumer group name")
}

address := defaultAddress
host := defaultRedisHost
port := defaultRedisPort
if val, ok := metadata[addressMetadata]; ok && val != "" {
address = val
} else {
if val, ok := metadata[hostMetadata]; ok && val != "" {
host = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}
if val, ok := metadata[portMetadata]; ok && val != "" {
port = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}
}

if val, ok := resolvedEnv[address]; ok {
meta.address = val
} else {
if val, ok := resolvedEnv[host]; ok {
meta.host = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}

if val, ok := resolvedEnv[port]; ok {
meta.port = val
} else {
return nil, fmt.Errorf(missingRedisAddressOrHostPortInfo)
}
meta.address = fmt.Sprintf("%s:%s", meta.host, meta.port)
}

meta.password = defaultPassword
if val, ok := authParams[passwordMetadata]; ok {
meta.password = val
} else if val, ok := metadata[passwordMetadata]; ok && val != "" {
if passd, ok := resolvedEnv[val]; ok {
meta.password = passd
}
}

meta.databaseIndex = defaultDbIndex
if val, ok := metadata[databaseIndexMetadata]; ok {
dbIndex, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing redis database index %v", err)
}
meta.databaseIndex = int(dbIndex)
}

meta.enableTLS = defaultTLS
if val, ok := metadata[enableTLSMetadata]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing enableTLS %v", err)
}
meta.enableTLS = tls
}
return &meta, nil
}

// IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream
func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) {
count, err := s.getPendingEntriesCount()

if err != nil {
redisStreamsLog.Error(err, "error")
return false, err
}

return count > 0, nil
}

func (s *redisStreamsScaler) Close() error {
return s.conn.Close()
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *redisStreamsScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {

targetPendingEntriesCount := resource.NewQuantity(int64(s.metadata.targetPendingEntriesCount), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: pendingEntriesCountMetricName, TargetAverageValue: targetPendingEntriesCount}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

// GetMetrics fetches the number of pending entries for a consumer group in a stream
func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
pendingEntriesCount, err := s.getPendingEntriesCount()

if err != nil {
redisStreamsLog.Error(err, "error fetching pending entries count")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(pendingEntriesCount, resource.DecimalSI),
Timestamp: metav1.Now(),
}
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *redisStreamsScaler) getPendingEntriesCount() (int64, error) {
pendingEntries, err := s.conn.XPending(s.metadata.streamName, s.metadata.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
105 changes: 105 additions & 0 deletions pkg/scalers/redis_streams_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package scalers

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseRedisStreamsMetadata(t *testing.T) {

type testCase struct {
name string
metadata map[string]string
resolvedEnv map[string]string
authParams map[string]string
}

authParams := map[string]string{"password": "foobarred"}

testCases := []testCase{
{"with address", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "5", "address": "REDIS_SERVICE", "password": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "true"}, map[string]string{
"REDIS_SERVICE": "myredis:6379",
"REDIS_PASSWORD": "foobarred",
}, nil},

{"with host and port", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, map[string]string{
"REDIS_HOST": "myredis",
"REDIS_PORT": "6379",
"REDIS_PASSWORD": "foobarred",
}, authParams},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(te *testing.T) {
m, err := parseRedisStreamsMetadata(tc.metadata, tc.resolvedEnv, tc.authParams)
assert.Nil(t, err)
assert.Equal(t, m.streamName, tc.metadata[streamNameMetadata])
assert.Equal(t, m.consumerGroupName, tc.metadata[consumerGroupNameMetadata])
assert.Equal(t, strconv.Itoa(m.targetPendingEntriesCount), tc.metadata[pendingEntriesCountMetadata])
if authParams != nil {
//if authParam is used
assert.Equal(t, m.password, authParams[passwordMetadata])
} else {
//if metadata is used to pass password env var name
assert.Equal(t, m.password, tc.resolvedEnv[tc.metadata[passwordMetadata]])
}
assert.Equal(t, strconv.Itoa(m.databaseIndex), tc.metadata[databaseIndexMetadata])
b, err := strconv.ParseBool(tc.metadata[enableTLSMetadata])
assert.Nil(t, err)
assert.Equal(t, m.enableTLS, b)
})
}
}

func TestParseRedisStreamsMetadataForInvalidCases(t *testing.T) {
resolvedEnvMap := map[string]string{
"REDIS_SERVER": "myredis:6379",
"REDIS_HOST": "myredis",
"REDIS_PORT": "6379",
"REDIS_PASSWORD": "",
}
type testCase struct {
name string
metadata map[string]string
resolvedEnv map[string]string
}

testCases := []testCase{
//missing mandatory metadata
{"missing address as well as host/port", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group"}, resolvedEnvMap},

{"host present but missing port", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "host": "REDIS_HOST"}, resolvedEnvMap},

{"port present but missing host", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "port": "REDIS_PORT"}, resolvedEnvMap},

{"missing stream", map[string]string{"pendingEntriesCount": "5", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap},

{"missing consumerGroup", map[string]string{"stream": "my-stream", "pendingEntriesCount": "5", "address": "REDIS_HOST"}, resolvedEnvMap},

{"missing pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "address": "REDIS_HOST"}, resolvedEnvMap},

//invalid value for respective fields
{"invalid pendingEntriesCount", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "junk", "host": "REDIS_HOST", "port": "REDIS_PORT", "databaseIndex": "0", "enableTLS": "false"}, resolvedEnvMap},

{"invalid databaseIndex", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "junk", "enableTLS": "false"}, resolvedEnvMap},

{"invalid enableTLS", map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "address": "REDIS_SERVER", "databaseIndex": "1", "enableTLS": "no"}, resolvedEnvMap},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(te *testing.T) {
_, err := parseRedisStreamsMetadata(tc.metadata, tc.resolvedEnv, map[string]string{})
assert.NotNil(t, err)
})
}
}

type redisStreamsTestMetadata struct {
metadata map[string]string
isError bool
authParams map[string]string
}
Loading

0 comments on commit c4acdc9

Please sign in to comment.