Skip to content

Commit

Permalink
[v2]adds consumer offset reset policy option to keda kafka scaler (#925)
Browse files Browse the repository at this point in the history
* adds consumer offset reset policy option to keda kafka scaler
Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
  • Loading branch information
grassiale committed Jul 20, 2020
1 parent 9295280 commit e107f26
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 20 deletions.
37 changes: 29 additions & 8 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type kafkaScaler struct {
}

type kafkaMetadata struct {
bootstrapServers []string
group string
topic string
lagThreshold int64
bootstrapServers []string
group string
topic string
lagThreshold int64
offsetResetPolicy offsetResetPolicy

// auth
authMode kafkaAuthMode
Expand All @@ -42,6 +43,13 @@ type kafkaMetadata struct {
ca string
}

type offsetResetPolicy string

const (
latest offsetResetPolicy = "latest"
earliest offsetResetPolicy = "earliest"
)

type kafkaAuthMode string

const (
Expand All @@ -57,6 +65,7 @@ const (
lagThresholdMetricName = "lagThreshold"
kafkaMetricType = "External"
defaultKafkaLagThreshold = 10
defaultOffsetResetPolicy = latest
)

var kafkaLog = logf.Log.WithName("kafka_scaler")
Expand Down Expand Up @@ -100,6 +109,16 @@ func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (ka
}
meta.topic = metadata["topic"]

meta.offsetResetPolicy = defaultOffsetResetPolicy

if metadata["offsetResetPolicy"] != "" {
policy := offsetResetPolicy(metadata["offsetResetPolicy"])
if policy != earliest && policy != latest {
return meta, fmt.Errorf("err offsetResetPolicy policy %s given", policy)
}
meta.offsetResetPolicy = policy
}

meta.lagThreshold = defaultKafkaLagThreshold

if val, ok := metadata[lagThresholdMetricName]; ok {
Expand Down Expand Up @@ -295,11 +314,13 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset
}

var lag int64
// For now, assume a consumer group that has no committed
// offset will read all messages from the topic. This may be
// something we want to allow users to configure.

if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest {
lag = latestOffset
if s.metadata.offsetResetPolicy == latest {
lag = 0
} else {
lag = latestOffset
}
} else {
lag = latestOffset - consumerOffset
}
Expand Down
36 changes: 24 additions & 12 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
)

type parseKafkaMetadataTestData struct {
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic string
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic string
offsetResetPolicy offsetResetPolicy
}

// A complete valid metadata example for reference
Expand All @@ -33,16 +34,21 @@ var validWithoutAuthParams = map[string]string{}

var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
// failure, no bootstrapServers
{map[string]string{}, true, 0, nil, "", ""},

{map[string]string{}, true, 0, nil, "", "", ""},
// failure, no consumer group
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""},
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest"},
// failure, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", ""},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest")},
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic"},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")},
// success, more brokers
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic"},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")},
// success, offsetResetPolicy policy latest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")},
// failure, offsetResetPolicy policy wrong
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", ""},
// success, offsetResetPolicy policy earliest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest")},
}

func TestGetBrokers(t *testing.T) {
Expand All @@ -67,6 +73,9 @@ func TestGetBrokers(t *testing.T) {
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}

meta, err = parseKafkaMetadata(nil, testData.metadata, validWithoutAuthParams)

Expand All @@ -88,5 +97,8 @@ func TestGetBrokers(t *testing.T) {
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
}
}

0 comments on commit e107f26

Please sign in to comment.