Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.kafka_consumer): Allow to select the metric time source #15790

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ to use them.
## option, it will be excluded from the msg_headers_as_tags list.
# msg_header_as_metric_name = ""

## Set metric(s) timestamp using the given source.
## Available options are:
## metric -- do not modify the metric timestamp
## inner -- use the inner message timestamp (Kafka v0.10+)
## outer -- use the outer (compressed) block timestamp (Kafka v0.10+)
# timestamp_source = "metric"

## Optional Client id
# client_id = "Telegraf"

Expand Down
26 changes: 24 additions & 2 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type KafkaConsumer struct {
TopicTag string `toml:"topic_tag"`
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
TimestampSource string `toml:"timestamp_source"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"`
Expand Down Expand Up @@ -108,6 +109,14 @@ func (k *KafkaConsumer) Init() error {
k.ConsumerGroup = defaultConsumerGroup
}

switch k.TimestampSource {
case "":
k.TimestampSource = "metric"
case "metric", "inner", "outer":
default:
return fmt.Errorf("invalid timestamp source %q", k.TimestampSource)
}

cfg := sarama.NewConfig()

// Kafka version 0.10.2.0 is required for consumer groups.
Expand Down Expand Up @@ -334,6 +343,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
}
}
handler.MsgHeadersToTags = msgHeadersMap
handler.TimestampSource = k.TimestampSource

// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
Expand Down Expand Up @@ -399,6 +409,7 @@ type ConsumerGroupHandler struct {
TopicTag string
MsgHeadersToTags map[string]bool
MsgHeaderToMetricName string
TimestampSource string

acc telegraf.TrackingAccumulator
sem semaphore
Expand Down Expand Up @@ -495,12 +506,11 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
})
}

headerKey := ""
// Check if any message header should override metric name or should be pass as tag
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {
for _, header := range msg.Headers {
//convert to a string as the header and value are byte arrays.
headerKey = string(header.Key)
headerKey := string(header.Key)
if _, exists := h.MsgHeadersToTags[headerKey]; exists {
// If message header should be pass as tag then add it to the metrics
for _, metric := range metrics {
Expand All @@ -523,6 +533,18 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
}
}

// Do override the metric timestamp if required
switch h.TimestampSource {
case "inner":
for _, metric := range metrics {
metric.SetTime(msg.Timestamp)
}
case "outer":
for _, metric := range metrics {
metric.SetTime(msg.BlockTimestamp)
}
}

h.mu.Lock()
id := h.acc.AddTrackingMetricGroup(metrics)
h.undelivered[id] = Message{session: session, message: msg}
Expand Down
76 changes: 76 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -549,6 +550,81 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
}
}

func TestKafkaTimestampSourceIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

metrics := []telegraf.Metric{
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1704067200, 0),
),
}

for _, source := range []string{"metric", "inner", "outer"} {
t.Run(source, func(t *testing.T) {
ctx := context.Background()
kafkaContainer, err := kafkacontainer.Run(ctx, "confluentinc/confluent-local:7.5.0")
require.NoError(t, err)
defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored

brokers, err := kafkaContainer.Brokers(ctx)
require.NoError(t, err)

// Make kafka output
creator := outputs.Outputs["kafka"]
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)

s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = "Test"
output.Log = &testutil.Logger{}

require.NoError(t, output.Init())
require.NoError(t, output.Connect())
defer output.Close()

// Make kafka input
input := KafkaConsumer{
Brokers: brokers,
Log: testutil.Logger{},
Topics: []string{"Test"},
MaxUndeliveredMessages: 1,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
require.NoError(t, input.Init())

var acc testutil.Accumulator
require.NoError(t, input.Start(&acc))
defer input.Stop()

// Send the metrics and check that we got it back
sendTimestamp := time.Now().Unix()
require.NoError(t, output.Write(metrics))
require.Eventually(t, func() bool { return acc.NMetrics() > 0 }, 5*time.Second, 100*time.Millisecond)
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, metrics, actual, testutil.IgnoreTime())

// Check the timestamp
m := actual[0]
switch source {
case "metric":
require.EqualValues(t, 1704067200, m.Time().Unix())
case "inner", "outer":
require.GreaterOrEqual(t, sendTimestamp, m.Time().Unix())
}
})
}
}

func TestExponentialBackoff(t *testing.T) {
var err error

Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/kafka_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
## option, it will be excluded from the msg_headers_as_tags list.
# msg_header_as_metric_name = ""

## Set metric(s) timestamp using the given source.
## Available options are:
## metric -- do not modify the metric timestamp
## inner -- use the inner message timestamp (Kafka v0.10+)
## outer -- use the outer (compressed) block timestamp (Kafka v0.10+)
# timestamp_source = "metric"

## Optional Client id
# client_id = "Telegraf"

Expand Down
Loading