From da379395e2cecb7656f1ac52cd1dcc4b0cd3dfdb Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Fri, 30 Aug 2024 21:18:04 +0200 Subject: [PATCH] feat(inputs.kafka_consumer): Allow to select the metric time source (#15790) --- plugins/inputs/kafka_consumer/README.md | 7 ++ .../inputs/kafka_consumer/kafka_consumer.go | 26 ++++++- .../kafka_consumer/kafka_consumer_test.go | 76 +++++++++++++++++++ plugins/inputs/kafka_consumer/sample.conf | 7 ++ 4 files changed, 114 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 38b50595b397e..3a2ea0fcaab17 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -70,6 +70,13 @@ to use them. ## option, it will be excluded from the msg_headers_as_tags list. # msg_header_as_metric_name = "" + ## Set metric(s) timestamp using the given source. + ## Available options are: + ## metric -- do not modify the metric timestamp + ## inner -- use the inner message timestamp (Kafka v0.10+) + ## outer -- use the outer (compressed) block timestamp (Kafka v0.10+) + # timestamp_source = "metric" + ## Optional Client id # client_id = "Telegraf" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 429490805c348..3d4eaa62d2b5c 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -49,6 +49,7 @@ type KafkaConsumer struct { TopicTag string `toml:"topic_tag"` MsgHeadersAsTags []string `toml:"msg_headers_as_tags"` MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"` + TimestampSource string `toml:"timestamp_source"` ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` ConnectionStrategy string `toml:"connection_strategy"` ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"` @@ -108,6 +109,14 @@ func (k 
*KafkaConsumer) Init() error { k.ConsumerGroup = defaultConsumerGroup } + switch k.TimestampSource { + case "": + k.TimestampSource = "metric" + case "metric", "inner", "outer": + default: + return fmt.Errorf("invalid timestamp source %q", k.TimestampSource) + } + cfg := sarama.NewConfig() // Kafka version 0.10.2.0 is required for consumer groups. @@ -334,6 +343,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { } } handler.MsgHeadersToTags = msgHeadersMap + handler.TimestampSource = k.TimestampSource // We need to copy allWantedTopics; the Consume() is // long-running and we can easily deadlock if our @@ -399,6 +409,7 @@ type ConsumerGroupHandler struct { TopicTag string MsgHeadersToTags map[string]bool MsgHeaderToMetricName string + TimestampSource string acc telegraf.TrackingAccumulator sem semaphore @@ -495,12 +506,11 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg * }) } - headerKey := "" // Check if any message header should override metric name or should be pass as tag if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" { for _, header := range msg.Headers { //convert to a string as the header and value are byte arrays. 
- headerKey = string(header.Key) + headerKey := string(header.Key) if _, exists := h.MsgHeadersToTags[headerKey]; exists { // If message header should be pass as tag then add it to the metrics for _, metric := range metrics { @@ -523,6 +533,18 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg * } } + // Do override the metric timestamp if required + switch h.TimestampSource { + case "inner": + for _, metric := range metrics { + metric.SetTime(msg.Timestamp) + } + case "outer": + for _, metric := range metrics { + metric.SetTime(msg.BlockTimestamp) + } + } + h.mu.Lock() id := h.acc.AddTrackingMetricGroup(metrics) h.undelivered[id] = Message{session: session, message: msg} diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 608f89b58f5cd..96aa736f17390 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" @@ -549,6 +550,85 @@ func TestKafkaRoundTripIntegration(t *testing.T) { } } +// TestKafkaTimestampSourceIntegration writes one metric through the kafka output and +// reads it back with the consumer once per timestamp source, then checks the metric time. +func TestKafkaTimestampSourceIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + metrics := []telegraf.Metric{ + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(1704067200, 0), + ), + } + + for _, source := range []string{"metric", "inner", "outer"} { + t.Run(source, func(t *testing.T) { + ctx := context.Background() + kafkaContainer, err := kafkacontainer.Run(ctx, "confluentinc/confluent-local:7.5.0") + require.NoError(t, err) + defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored + + brokers, err := 
kafkaContainer.Brokers(ctx) + require.NoError(t, err) + + // Make kafka output + creator := outputs.Outputs["kafka"] + output, ok := creator().(*kafkaOutput.Kafka) + require.True(t, ok) + + s := &influxSerializer.Serializer{} + require.NoError(t, s.Init()) + output.SetSerializer(s) + output.Brokers = brokers + output.Topic = "Test" + output.Log = &testutil.Logger{} + + require.NoError(t, output.Init()) + require.NoError(t, output.Connect()) + defer output.Close() + + // Make kafka input + input := KafkaConsumer{ + Brokers: brokers, + Log: testutil.Logger{}, + Topics: []string{"Test"}, + MaxUndeliveredMessages: 1, + // Use the source under test; if left empty, Init() falls back to "metric" for every subtest + TimestampSource: source, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + input.SetParser(parser) + require.NoError(t, input.Init()) + + var acc testutil.Accumulator + require.NoError(t, input.Start(&acc)) + defer input.Stop() + + // Send the metrics and check that we got it back + sendTimestamp := time.Now().Unix() + require.NoError(t, output.Write(metrics)) + require.Eventually(t, func() bool { return acc.NMetrics() > 0 }, 5*time.Second, 100*time.Millisecond) + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, metrics, actual, testutil.IgnoreTime()) + + // Check the timestamp + m := actual[0] + switch source { + case "metric": + require.EqualValues(t, 1704067200, m.Time().Unix()) + case "inner", "outer": + require.GreaterOrEqual(t, sendTimestamp, m.Time().Unix()) + } + }) + } +} + func TestExponentialBackoff(t *testing.T) { var err error diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf index 00e6a9489074c..0097b2c6570ae 100644 --- a/plugins/inputs/kafka_consumer/sample.conf +++ b/plugins/inputs/kafka_consumer/sample.conf @@ -33,6 +33,13 @@ ## option, it will be excluded from the msg_headers_as_tags list. # msg_header_as_metric_name = "" + ## Set metric(s) timestamp using the given source. 
+ ## Available options are: + ## metric -- do not modify the metric timestamp + ## inner -- use the inner message timestamp (Kafka v0.10+) + ## outer -- use the outer (compressed) block timestamp (Kafka v0.10+) + # timestamp_source = "metric" + ## Optional Client id # client_id = "Telegraf"