From d3119e2cb6d6de665e8814387ffe7310f6a4fdd9 Mon Sep 17 00:00:00 2001 From: Ferdinand Linnenberg Date: Wed, 18 Sep 2024 15:58:33 +0200 Subject: [PATCH 1/6] feat: added metadataMaxAge field --- internal/impl/kafka/input_kafka_franz.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/impl/kafka/input_kafka_franz.go b/internal/impl/kafka/input_kafka_franz.go index 6e6b1d999..ee3d9b4f4 100644 --- a/internal/impl/kafka/input_kafka_franz.go +++ b/internal/impl/kafka/input_kafka_franz.go @@ -126,6 +126,10 @@ Finally, it's also possible to specify an explicit offset to consume from by add service.NewBatchPolicyField("batching"). Description("Allows you to configure a xref:configuration:batching.adoc[batching policy] that applies to individual topic partitions in order to batch messages together before flushing them for processing. Batching can be beneficial for performance as well as useful for windowed processing, and doing so this way preserves the ordering of topic partitions."). Advanced(), + service.NewDurationField("metadata_max_age"). + Description("The maximum age of metadata before it is refreshed."). + Default("5m"). + Advanced(), } } @@ -166,6 +170,7 @@ type FranzKafkaReader struct { regexPattern bool multiHeader bool batchPolicy service.BatchPolicy + metadataMaxAge time.Duration batchChan atomic.Value res *service.Resources @@ -269,6 +274,9 @@ func NewFranzKafkaReaderFromConfig(conf *service.ParsedConfig, res *service.Reso if f.saslConfs, err = SASLMechanismsFromConfig(conf); err != nil { return nil, err } + if f.metadataMaxAge, err = conf.FieldDuration("metadata_max_age"); err != nil { + return nil, err + } return &f, nil } @@ -642,6 +650,7 @@ func (f *FranzKafkaReader) Connect(ctx context.Context) error { kgo.ConsumerGroup(f.consumerGroup), kgo.ClientID(f.clientID), kgo.Rack(f.rackID), + kgo.MetadataMaxAge(f.metadataMaxAge), } if f.consumerGroup != "" { From bde8d9cb068997c5da5a9ddaa128d579cf0d20a6 Mon Sep 17 00:00:00 2001 From: Ferdinand Linnenberg Date: Wed, 18 Sep 2024 16:00:38 +0200 Subject: [PATCH 2/6] docs: added documentation for metadata_max_age --- docs/modules/components/pages/inputs/kafka_franz.adoc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/modules/components/pages/inputs/kafka_franz.adoc b/docs/modules/components/pages/inputs/kafka_franz.adoc index d4343aa98..131ff55ee 100644 --- a/docs/modules/components/pages/inputs/kafka_franz.adoc +++ b/docs/modules/components/pages/inputs/kafka_franz.adoc @@ -75,6 +75,7 @@ input: client_certs: [] sasl: [] # No default (optional) multi_header: false + metadata_max_age: 5m batching: count: 0 byte_size: 0 @@ -592,6 +593,15 @@ Decode headers into lists to allow handling of multiple values with the same key *Default*: `false` +=== `metadata_max_age` + +The maximum age of metadata before it is refreshed. This can help reduce the number of requests made to the broker or speed up the detection of new topics. + + +*Type*: `string` + +*Default*: `5m` + === `batching` Allows you to configure a xref:configuration:batching.adoc[batching policy] that applies to individual topic partitions in order to batch messages together before flushing them for processing. Batching can be beneficial for performance as well as useful for windowed processing, and doing so this way preserves the ordering of topic partitions. From 7786edaf24fffbb3f973322b29f57aa4e785b3d9 Mon Sep 17 00:00:00 2001 From: Ferdinand Linnenberg Date: Wed, 18 Sep 2024 16:07:12 +0200 Subject: [PATCH 3/6] chore: added changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ac8fa028..1a0d91904 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. - New experimental `aws_bedrock_embeddings` processor. (@rockwotj) - New experimental `cohere_chat` and `cohere_embeddings` processors. (@rockwotj) - New experimental `questdb` output. (@sklarsa) +- Field `metadata_max_age` added to the `kafka_franz` input. (@Scarjit) ## 4.36.0 - 2024-09-11 From 962b96e56445c600b6725cc5c57fee5408fe87b8 Mon Sep 17 00:00:00 2001 From: Ferdinand Linnenberg Date: Wed, 18 Sep 2024 16:11:19 +0200 Subject: [PATCH 4/6] chore: build docs --- .../components/pages/inputs/kafka_franz.adoc | 20 +++++++++---------- .../components/pages/inputs/ockam_kafka.adoc | 10 ++++++++++ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/docs/modules/components/pages/inputs/kafka_franz.adoc b/docs/modules/components/pages/inputs/kafka_franz.adoc index 131ff55ee..824fc8197 100644 --- a/docs/modules/components/pages/inputs/kafka_franz.adoc +++ b/docs/modules/components/pages/inputs/kafka_franz.adoc @@ -75,13 +75,13 @@ input: client_certs: [] sasl: [] # No default (optional) multi_header: false - metadata_max_age: 5m batching: count: 0 byte_size: 0 period: "" check: "" processors: [] # No default (optional) + metadata_max_age: 5m ``` -- @@ -593,15 +593,6 @@ Decode headers into lists to allow handling of multiple values with the same key *Default*: `false` -=== `metadata_max_age` - -The maximum age of metadata before it is refreshed. This can help reduce the number of requests made to the broker or speed up the detection of new topics. - - -*Type*: `string` - -*Default*: `5m` - === `batching` Allows you to configure a xref:configuration:batching.adoc[batching policy] that applies to individual topic partitions in order to batch messages together before flushing them for processing. Batching can be beneficial for performance as well as useful for windowed processing, and doing so this way preserves the ordering of topic partitions. @@ -704,4 +695,13 @@ processors: format: json_array ``` +=== `metadata_max_age` + +The maximum age of metadata before it is refreshed. + + +*Type*: `string` + +*Default*: `"5m"` + diff --git a/docs/modules/components/pages/inputs/ockam_kafka.adoc b/docs/modules/components/pages/inputs/ockam_kafka.adoc index 049e26712..363b21bf2 100644 --- a/docs/modules/components/pages/inputs/ockam_kafka.adoc +++ b/docs/modules/components/pages/inputs/ockam_kafka.adoc @@ -90,6 +90,7 @@ input: period: "" check: "" processors: [] # No default (optional) + metadata_max_age: 5m seed_brokers: [] # No default (optional) disable_content_encryption: false enrollment_ticket: "" # No default (optional) @@ -701,6 +702,15 @@ processors: format: json_array ``` +=== `kafka.metadata_max_age` + +The maximum age of metadata before it is refreshed. + + +*Type*: `string` + +*Default*: `"5m"` + === `kafka.seed_brokers` A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas it will be expanded into multiple addresses. From d5e344da2c306d22e1f0875dbeedab96c1000b90 Mon Sep 17 00:00:00 2001 From: Ferdinand Linnenberg Date: Wed, 18 Sep 2024 16:13:14 +0200 Subject: [PATCH 5/6] fix: remove ockam changes --- docs/modules/components/pages/inputs/ockam_kafka.adoc | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/docs/modules/components/pages/inputs/ockam_kafka.adoc b/docs/modules/components/pages/inputs/ockam_kafka.adoc index 363b21bf2..049e26712 100644 --- a/docs/modules/components/pages/inputs/ockam_kafka.adoc +++ b/docs/modules/components/pages/inputs/ockam_kafka.adoc @@ -90,7 +90,6 @@ input: period: "" check: "" processors: [] # No default (optional) - metadata_max_age: 5m seed_brokers: [] # No default (optional) disable_content_encryption: false enrollment_ticket: "" # No default (optional) @@ -702,15 +701,6 @@ processors: format: json_array ``` -=== `kafka.metadata_max_age` - -The maximum age of metadata before it is refreshed. - - -*Type*: `string` - -*Default*: `"5m"` - === `kafka.seed_brokers` A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas it will be expanded into multiple addresses. From e7a85d5230f1193e6d87a0b49cfd266c87686e2e Mon Sep 17 00:00:00 2001 From: Ferdinand Linnenberg Date: Wed, 18 Sep 2024 16:44:56 +0200 Subject: [PATCH 6/6] docs: include ockam again --- docs/modules/components/pages/inputs/ockam_kafka.adoc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/modules/components/pages/inputs/ockam_kafka.adoc b/docs/modules/components/pages/inputs/ockam_kafka.adoc index 049e26712..363b21bf2 100644 --- a/docs/modules/components/pages/inputs/ockam_kafka.adoc +++ b/docs/modules/components/pages/inputs/ockam_kafka.adoc @@ -90,6 +90,7 @@ input: period: "" check: "" processors: [] # No default (optional) + metadata_max_age: 5m seed_brokers: [] # No default (optional) disable_content_encryption: false enrollment_ticket: "" # No default (optional) @@ -701,6 +702,15 @@ processors: format: json_array ``` +=== `kafka.metadata_max_age` + +The maximum age of metadata before it is refreshed. + + +*Type*: `string` + +*Default*: `"5m"` + === `kafka.seed_brokers` A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas it will be expanded into multiple addresses.