diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index e1773a7885e..cb57db287c5 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -26,10 +26,11 @@ type kafkaScaler struct { } type kafkaMetadata struct { - bootstrapServers []string - group string - topic string - lagThreshold int64 + bootstrapServers []string + group string + topic string + lagThreshold int64 + offsetResetPolicy offsetResetPolicy // auth authMode kafkaAuthMode @@ -42,6 +43,13 @@ type kafkaMetadata struct { ca string } +type offsetResetPolicy string + +const ( + latest offsetResetPolicy = "latest" + earliest offsetResetPolicy = "earliest" +) + type kafkaAuthMode string const ( @@ -57,6 +65,7 @@ const ( lagThresholdMetricName = "lagThreshold" kafkaMetricType = "External" defaultKafkaLagThreshold = 10 + defaultOffsetResetPolicy = latest ) var kafkaLog = logf.Log.WithName("kafka_scaler") @@ -100,6 +109,16 @@ func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (ka } meta.topic = metadata["topic"] + meta.offsetResetPolicy = defaultOffsetResetPolicy + + if metadata["offsetResetPolicy"] != "" { + policy := offsetResetPolicy(metadata["offsetResetPolicy"]) + if policy != earliest && policy != latest { + return meta, fmt.Errorf("err offsetResetPolicy policy %s given", policy) + } + meta.offsetResetPolicy = policy + } + meta.lagThreshold = defaultKafkaLagThreshold if val, ok := metadata[lagThresholdMetricName]; ok { @@ -295,11 +314,13 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset } var lag int64 - // For now, assume a consumer group that has no committed - // offset will read all messages from the topic. This may be - // something we want to allow users to configure. + if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest { - lag = latestOffset + if s.metadata.offsetResetPolicy == latest { + lag = 0 + } else { + lag = latestOffset + } } else { lag = latestOffset - consumerOffset } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 41365920064..e5680a8d67c 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -6,12 +6,13 @@ import ( ) type parseKafkaMetadataTestData struct { - metadata map[string]string - isError bool - numBrokers int - brokers []string - group string - topic string + metadata map[string]string + isError bool + numBrokers int + brokers []string + group string + topic string + offsetResetPolicy offsetResetPolicy } // A complete valid metadata example for reference @@ -33,16 +34,21 @@ var validWithoutAuthParams = map[string]string{} var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ // failure, no bootstrapServers - {map[string]string{}, true, 0, nil, "", ""}, - + {map[string]string{}, true, 0, nil, "", "", ""}, // failure, no consumer group - {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""}, + {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest"}, // failure, no topic - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", ""}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest")}, // success - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic"}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")}, // success, more brokers - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic"}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")}, + // success, offsetResetPolicy policy latest + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")}, + // failure, offsetResetPolicy policy wrong + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", ""}, + // success, offsetResetPolicy policy earliest + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest")}, } func TestGetBrokers(t *testing.T) { @@ -67,6 +73,9 @@ func TestGetBrokers(t *testing.T) { if meta.topic != testData.topic { t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic) } + if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { + t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) + } meta, err = parseKafkaMetadata(nil, testData.metadata, validWithoutAuthParams) @@ -88,5 +97,8 @@ func TestGetBrokers(t *testing.T) { if meta.topic != testData.topic { t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic) } + if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { + t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) + } } }