Skip to content

Commit

Permalink
Support partition assignement strategy configuration in kafka_consumer (
Browse files Browse the repository at this point in the history
  • Loading branch information
elohmeier authored and idohalevi committed Sep 23, 2020
1 parent 0fc462f commit 8a61f1b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 5 deletions.
20 changes: 17 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@

[[constraint]]
name = "github.com/Shopify/sarama"
revision = "b12709e6ca29240128c89fe0b30b6a76be42b457"
source = "https://github.com/influxdata/sarama.git"
version = "1.24.0"

[[constraint]]
name = "github.com/soniah/gosnmp"
Expand Down
3 changes: 3 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5116,6 +5116,9 @@
# ## Initial offset position; one of "oldest" or "newest".
# # offset = "oldest"
#
# ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# # balance_strategy = "range"
#
# ## Maximum length of a message to consume, in bytes (default 0/unlimited);
# ## larger messages are dropped
# max_message_len = 1000000
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ and use the old zookeeper connection method.
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"

## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"

## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
Expand Down
15 changes: 15 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const sampleConfig = `
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
Expand Down Expand Up @@ -86,6 +89,7 @@ type KafkaConsumer struct {
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
Version string `toml:"version"`
Expand Down Expand Up @@ -185,6 +189,17 @@ func (k *KafkaConsumer) Init() error {
return fmt.Errorf("invalid offset %q", k.Offset)
}

switch strings.ToLower(k.BalanceStrategy) {
case "range", "":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
default:
return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy)
}

if k.ConsumerCreator == nil {
k.ConsumerCreator = &SaramaCreator{}
}
Expand Down

0 comments on commit 8a61f1b

Please sign in to comment.