Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.kafka_consumer): Use metric name from message header #14320

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ to use them.
## are not available
# msg_headers_to_tags = []

## The name of kafka message header which value should override the metric name.
## In case when the same header specified in current option and in msg_headers_to_tags
## option, it will be excluded from the msg_headers_to_tags list.
# msg_header_as_metric_name = ""

## Optional Client id
# client_id = "Telegraf"

Expand Down
23 changes: 17 additions & 6 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type KafkaConsumer struct {
TopicRegexps []string `toml:"topic_regexps"`
TopicTag string `toml:"topic_tag"`
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`

Expand Down Expand Up @@ -321,11 +322,14 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
handler.MsgHeaderToMetricName = k.MsgHeaderAsMetricName
//if message headers list specified, put it as map to handler
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
if len(k.MsgHeadersAsTags) > 0 {
for _, header := range k.MsgHeadersAsTags {
msgHeadersMap[header] = true
if k.MsgHeaderAsMetricName != header {
msgHeadersMap[header] = true
}
}
}
handler.MsgHeadersToTags = msgHeadersMap
Expand Down Expand Up @@ -390,9 +394,10 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parse

// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
type ConsumerGroupHandler struct {
MaxMessageLen int
TopicTag string
MsgHeadersToTags map[string]bool
MaxMessageLen int
TopicTag string
MsgHeadersToTags map[string]bool
MsgHeaderToMetricName string

acc telegraf.TrackingAccumulator
sem semaphore
Expand Down Expand Up @@ -482,9 +487,9 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
return err
}

// Check if any message header should be pass as tag
headerKey := ""
if len(h.MsgHeadersToTags) > 0 {
// Check if any message header should override metric name or should be pass as tag
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {
for _, header := range msg.Headers {
//convert to a string as the header and value are byte arrays.
headerKey = string(header.Key)
Expand All @@ -493,6 +498,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
for _, metric := range metrics {
metric.AddTag(headerKey, string(header.Value))
}
} else {
if h.MsgHeaderToMetricName == headerKey {
for _, metric := range metrics {
metric.SetName(string(header.Value))
}
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/kafka_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
## works only for Kafka version 0.11+, on lower versions the message headers
## are not available
# msg_headers_to_tags = []

## The name of kafka message header which value should override the metric name.
## In case when the same header specified in current option and in msg_headers_to_tags
## option, it will be excluded from the msg_headers_to_tags list.
# msg_header_as_metric_name = ""

## Optional Client id
# client_id = "Telegraf"
Expand Down
Loading