diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 0548f380a4d67..48fd923a5ff22 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -76,6 +76,9 @@ pub struct KafkaSourceConfig { /// The consumer group name to be used to consume events from Kafka. group_id: String, + /// Override dynamic membership and broker assignment behavior with static membership, using a group instance (member) id. + group_instance_id: Option, + /// If offsets for consumer group do not exist, set them using this strategy. /// /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for the `auto.offset.reset` option for further clarification. @@ -616,6 +619,10 @@ fn create_consumer(config: &KafkaSourceConfig) -> crate::Result>(CustomContext::default()) .context(KafkaCreateSnafu)?; diff --git a/website/cue/reference/components/sources/base/kafka.cue b/website/cue/reference/components/sources/base/kafka.cue index fdad509606adf..d7727da1f3e49 100644 --- a/website/cue/reference/components/sources/base/kafka.cue +++ b/website/cue/reference/components/sources/base/kafka.cue @@ -179,6 +179,11 @@ base: components: sources: kafka: configuration: { required: true type: string: {} } + group_instance_id: { + description: "Override dynamic membership and broker assignment behavior with static membership, using a group instance (member) id." + required: false + type: string: {} + } headers_key: { description: """ Overrides the name of the log field used to add the headers to each event.