diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ac8fa028..1a0d91904 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. - New experimental `aws_bedrock_embeddings` processor. (@rockwotj) - New experimental `cohere_chat` and `cohere_embeddings` processors. (@rockwotj) - New experimental `questdb` output. (@sklarsa) +- Field `metadata_max_age` added to the `kafka_franz` input. (@Scarjit) ## 4.36.0 - 2024-09-11 diff --git a/docs/modules/components/pages/inputs/kafka_franz.adoc b/docs/modules/components/pages/inputs/kafka_franz.adoc index d4343aa98..824fc8197 100644 --- a/docs/modules/components/pages/inputs/kafka_franz.adoc +++ b/docs/modules/components/pages/inputs/kafka_franz.adoc @@ -81,6 +81,7 @@ input: period: "" check: "" processors: [] # No default (optional) + metadata_max_age: 5m ``` -- @@ -694,4 +695,13 @@ processors: format: json_array ``` +=== `metadata_max_age` + +The maximum age of metadata before it is refreshed. + + +*Type*: `string` + +*Default*: `"5m"` + diff --git a/docs/modules/components/pages/inputs/ockam_kafka.adoc b/docs/modules/components/pages/inputs/ockam_kafka.adoc index 049e26712..363b21bf2 100644 --- a/docs/modules/components/pages/inputs/ockam_kafka.adoc +++ b/docs/modules/components/pages/inputs/ockam_kafka.adoc @@ -90,6 +90,7 @@ input: period: "" check: "" processors: [] # No default (optional) + metadata_max_age: 5m seed_brokers: [] # No default (optional) disable_content_encryption: false enrollment_ticket: "" # No default (optional) @@ -701,6 +702,15 @@ processors: format: json_array ``` +=== `kafka.metadata_max_age` + +The maximum age of metadata before it is refreshed. + + +*Type*: `string` + +*Default*: `"5m"` + === `kafka.seed_brokers` A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas it will be expanded into multiple addresses. diff --git a/internal/impl/kafka/input_kafka_franz.go b/internal/impl/kafka/input_kafka_franz.go index 6e6b1d999..ee3d9b4f4 100644 --- a/internal/impl/kafka/input_kafka_franz.go +++ b/internal/impl/kafka/input_kafka_franz.go @@ -126,6 +126,10 @@ Finally, it's also possible to specify an explicit offset to consume from by add service.NewBatchPolicyField("batching"). Description("Allows you to configure a xref:configuration:batching.adoc[batching policy] that applies to individual topic partitions in order to batch messages together before flushing them for processing. Batching can be beneficial for performance as well as useful for windowed processing, and doing so this way preserves the ordering of topic partitions."). Advanced(), + service.NewDurationField("metadata_max_age"). + Description("The maximum age of metadata before it is refreshed."). + Default("5m"). + Advanced(), } } @@ -166,6 +170,7 @@ type FranzKafkaReader struct { regexPattern bool multiHeader bool batchPolicy service.BatchPolicy + metadataMaxAge time.Duration batchChan atomic.Value res *service.Resources @@ -269,6 +274,9 @@ func NewFranzKafkaReaderFromConfig(conf *service.ParsedConfig, res *service.Reso if f.saslConfs, err = SASLMechanismsFromConfig(conf); err != nil { return nil, err } + if f.metadataMaxAge, err = conf.FieldDuration("metadata_max_age"); err != nil { + return nil, err + } return &f, nil } @@ -642,6 +650,7 @@ func (f *FranzKafkaReader) Connect(ctx context.Context) error { kgo.ConsumerGroup(f.consumerGroup), kgo.ClientID(f.clientID), kgo.Rack(f.rackID), + kgo.MetadataMaxAge(f.metadataMaxAge), } if f.consumerGroup != "" {