Skip to content

Commit

Permalink
feat(inputs.kafka_consumer): Allow to select the metric time source (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored and asaharn committed Oct 16, 2024
1 parent e816630 commit da37939
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
7 changes: 7 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ to use them.
## option, it will be excluded from the msg_headers_as_tags list.
# msg_header_as_metric_name = ""

## Set metric(s) timestamp using the given source.
## Available options are:
## metric -- do not modify the metric timestamp
## inner -- use the inner message timestamp (Kafka v0.10+)
## outer -- use the outer (compressed) block timestamp (Kafka v0.10+)
# timestamp_source = "metric"

## Optional Client id
# client_id = "Telegraf"

Expand Down
26 changes: 24 additions & 2 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type KafkaConsumer struct {
TopicTag string `toml:"topic_tag"`
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
TimestampSource string `toml:"timestamp_source"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"`
Expand Down Expand Up @@ -108,6 +109,14 @@ func (k *KafkaConsumer) Init() error {
k.ConsumerGroup = defaultConsumerGroup
}

switch k.TimestampSource {
case "":
k.TimestampSource = "metric"
case "metric", "inner", "outer":
default:
return fmt.Errorf("invalid timestamp source %q", k.TimestampSource)
}

cfg := sarama.NewConfig()

// Kafka version 0.10.2.0 is required for consumer groups.
Expand Down Expand Up @@ -334,6 +343,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
}
}
handler.MsgHeadersToTags = msgHeadersMap
handler.TimestampSource = k.TimestampSource

// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
Expand Down Expand Up @@ -399,6 +409,7 @@ type ConsumerGroupHandler struct {
TopicTag string
MsgHeadersToTags map[string]bool
MsgHeaderToMetricName string
TimestampSource string

acc telegraf.TrackingAccumulator
sem semaphore
Expand Down Expand Up @@ -495,12 +506,11 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
})
}

headerKey := ""
// Check if any message header should override metric name or should be pass as tag
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {
for _, header := range msg.Headers {
//convert to a string as the header and value are byte arrays.
headerKey = string(header.Key)
headerKey := string(header.Key)
if _, exists := h.MsgHeadersToTags[headerKey]; exists {
// If message header should be pass as tag then add it to the metrics
for _, metric := range metrics {
Expand All @@ -523,6 +533,18 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
}
}

// Do override the metric timestamp if required
switch h.TimestampSource {
case "inner":
for _, metric := range metrics {
metric.SetTime(msg.Timestamp)
}
case "outer":
for _, metric := range metrics {
metric.SetTime(msg.BlockTimestamp)
}
}

h.mu.Lock()
id := h.acc.AddTrackingMetricGroup(metrics)
h.undelivered[id] = Message{session: session, message: msg}
Expand Down
76 changes: 76 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -549,6 +550,81 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
}
}

func TestKafkaTimestampSourceIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

metrics := []telegraf.Metric{
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1704067200, 0),
),
}

for _, source := range []string{"metric", "inner", "outer"} {
t.Run(source, func(t *testing.T) {
ctx := context.Background()
kafkaContainer, err := kafkacontainer.Run(ctx, "confluentinc/confluent-local:7.5.0")
require.NoError(t, err)
defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored

brokers, err := kafkaContainer.Brokers(ctx)
require.NoError(t, err)

// Make kafka output
creator := outputs.Outputs["kafka"]
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)

s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = "Test"
output.Log = &testutil.Logger{}

require.NoError(t, output.Init())
require.NoError(t, output.Connect())
defer output.Close()

// Make kafka input
input := KafkaConsumer{
Brokers: brokers,
Log: testutil.Logger{},
Topics: []string{"Test"},
MaxUndeliveredMessages: 1,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
require.NoError(t, input.Init())

var acc testutil.Accumulator
require.NoError(t, input.Start(&acc))
defer input.Stop()

// Send the metrics and check that we got it back
sendTimestamp := time.Now().Unix()
require.NoError(t, output.Write(metrics))
require.Eventually(t, func() bool { return acc.NMetrics() > 0 }, 5*time.Second, 100*time.Millisecond)
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, metrics, actual, testutil.IgnoreTime())

// Check the timestamp
m := actual[0]
switch source {
case "metric":
require.EqualValues(t, 1704067200, m.Time().Unix())
case "inner", "outer":
require.GreaterOrEqual(t, sendTimestamp, m.Time().Unix())
}
})
}
}

func TestExponentialBackoff(t *testing.T) {
var err error

Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/kafka_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
## option, it will be excluded from the msg_headers_as_tags list.
# msg_header_as_metric_name = ""

## Set metric(s) timestamp using the given source.
## Available options are:
## metric -- do not modify the metric timestamp
## inner -- use the inner message timestamp (Kafka v0.10+)
## outer -- use the outer (compressed) block timestamp (Kafka v0.10+)
# timestamp_source = "metric"

## Optional Client id
# client_id = "Telegraf"

Expand Down

0 comments on commit da37939

Please sign in to comment.