Telegraf Kafka input plugin issue with latest Kafka (2.12-2.3.0) when configuring multiple Kafka consumers #6136

Closed
adithyamk opened this issue Jul 18, 2019 · 12 comments · Fixed by #6181
Labels: area/kafka, bug (unexpected problem or unintended behavior)

@adithyamk

Relevant telegraf.conf:


# OUTPUTS
[outputs]
[[outputs.influxdb]]
    namepass = ["KI1"]
    # The full HTTP endpoint URL for your InfluxDB instance
    url = "http://localhost:8086" # EDIT THIS LINE
    # The target database for metrics. This database must already exist
    database = "telegraf" # required.
    skip_database_creation = true
    database_tag = "KI1"

[[outputs.influxdb]]
    namepass = ["KI2"]
    # The full HTTP endpoint URL for your InfluxDB instance
    url = "http://localhost:8086" # EDIT THIS LINE
    # The target database for metrics. This database must already exist
    database = "telegraf" # required.
    skip_database_creation = true
    database_tag = "KI2"

#INPUTS
[inputs]
[[inputs.kafka_consumer]]
    name_override = "KI1"
    ## kafka servers
    brokers = ["localhost:9092"]
    ## topic(s) to consume
    topics = ["sample_topic"]
    ## Add topic as tag if topic_tag is not empty
    # topic_tag = ""
    consumer_group = "sample"
    offset = "oldest"
    max_message_len = 65536
    data_format = "value"
    data_type = "integer"
    

[[inputs.kafka_consumer]]
    name_override = "KI2"
    ## kafka servers
    brokers = ["localhost:9092"]
    ## topic(s) to consume
    topics = ["sample_topic2"]
    ## Add topic as tag if topic_tag is not empty
    # topic_tag = ""
    consumer_group = "sample"
    offset = "oldest"
    max_message_len = 65536
    data_format = "value"
    data_type = "integer"


System info:

Telegraf 1.10.4
kafka_2.12-2.3.0

Steps to reproduce:

./telegraf -config telegraf.toml

Expected behavior:

2019-07-18T15:56:11Z I! Starting Telegraf 1.10.4
2019-07-18T15:56:11Z I! Loaded inputs: kafka_consumer kafka_consumer
2019-07-18T15:56:11Z I! Loaded aggregators:
2019-07-18T15:56:11Z I! Loaded processors:
2019-07-18T15:56:11Z I! Loaded outputs: influxdb influxdb
2019-07-18T15:56:11Z I! Tags enabled: host=xxxxx
2019-07-18T15:56:11Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"xxxxx", Flush Interval:10s
2019-07-18T15:56:11Z I! Started the kafka consumer service, brokers: [localhost:9092], topics: [sample_topic]
2019-07-18T15:56:11Z I! Started the kafka consumer service, brokers: [localhost:9092], topics: [sample_topic2]

Actual behavior:

2019-07-18T15:56:11Z I! Starting Telegraf 1.10.4
2019-07-18T15:56:11Z I! Loaded inputs: kafka_consumer kafka_consumer
2019-07-18T15:56:11Z I! Loaded aggregators:
2019-07-18T15:56:11Z I! Loaded processors:
2019-07-18T15:56:11Z I! Loaded outputs: influxdb influxdb
2019-07-18T15:56:11Z I! Tags enabled: host=xxxxx
2019-07-18T15:56:11Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"xxxxx", Flush Interval:10s
2019-07-18T15:56:11Z I! Started the kafka consumer service, brokers: [localhost:9092], topics: [sample_topic]
2019-07-18T15:56:11Z I! Started the kafka consumer service, brokers: [localhost:9092], topics: [sample_topic2]
2019-07-18T15:56:11Z E! [inputs.kafka_consumer]: Error in plugin: kafka server: The provided member is not known in the current generation.
2019-07-18T15:56:12Z E! [inputs.kafka_consumer]: Error in plugin: kafka server: The provided member is not known in the current generation.
2019-07-18T15:56:13Z E! [inputs.kafka_consumer]: Error in plugin: kafka server: The provided member is not known in the current generation.

Additional info:

This error occurs when two kafka_consumer plugins are enabled, each listening to a different topic, and both belong to the same consumer group.
Workaround: place the consumers in different consumer groups, and the error no longer occurs.
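
A minimal sketch of that workaround, reusing the two inputs from the config above and changing only `consumer_group`. The group names `sample_ki1` and `sample_ki2` are hypothetical; any two distinct names should behave the same way.

```toml
[[inputs.kafka_consumer]]
    name_override = "KI1"
    brokers = ["localhost:9092"]
    topics = ["sample_topic"]
    ## Hypothetical group name, used only by this input
    consumer_group = "sample_ki1"
    offset = "oldest"
    max_message_len = 65536
    data_format = "value"
    data_type = "integer"

[[inputs.kafka_consumer]]
    name_override = "KI2"
    brokers = ["localhost:9092"]
    topics = ["sample_topic2"]
    ## Hypothetical group name, distinct from the one above
    consumer_group = "sample_ki2"
    offset = "oldest"
    max_message_len = 65536
    data_format = "value"
    data_type = "integer"
```

With separate groups, each input tracks its own offsets, so the two consumers no longer take part in the same group rebalance.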

@danielnelson danielnelson self-assigned this Jul 19, 2019
@danielnelson danielnelson added area/kafka bug unexpected problem or unintended behavior labels Jul 19, 2019
@danielnelson
Contributor

@adithyamk
Author

@danielnelson

I run into this error.
Is it expected? Or am I missing some configuration setting somewhere?

./telegraf -config telegraf3.toml
2019-07-26T19:52:40Z I! Starting Telegraf
2019-07-26T19:52:40Z I! Loaded inputs: kafka_consumer kafka_consumer
2019-07-26T19:52:40Z I! Loaded aggregators:
2019-07-26T19:52:40Z I! Loaded processors:
2019-07-26T19:52:40Z I! Loaded outputs: influxdb influxdb
2019-07-26T19:52:40Z I! Tags enabled: host=xxxxx
2019-07-26T19:52:40Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"xxxxx", Flush Interval:10s
2019-07-26T19:52:40Z E! [agent] Service for input inputs.kafka_consumer failed to start: kafka: invalid configuration (consumer groups require Version to be >= V0_10_2_0)
2019-07-26T19:52:40Z E! [telegraf] Error running agent: kafka: invalid configuration (consumer groups require Version to be >= V0_10_2_0)

@danielnelson
Contributor

Try setting version = "2.0.0" in the kafka_consumer.
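
As a rough sketch of where that setting would go, here is the first input from the original config with `version` added. This assumes the same brokers and topics as above, and that the setting is repeated in every `[[inputs.kafka_consumer]]` block that uses a consumer group.

```toml
[[inputs.kafka_consumer]]
    name_override = "KI1"
    brokers = ["localhost:9092"]
    ## Kafka protocol version the client should speak;
    ## here matched to a 2.x broker
    version = "2.0.0"
    topics = ["sample_topic"]
    consumer_group = "sample"
    offset = "oldest"
    max_message_len = 65536
    data_format = "value"
    data_type = "integer"
```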

@adithyamk
Author

@danielnelson

kafka_2.12-2.3.0
telegraf-1.12.0~9eb7c8dd
With version = "2.0.0" added to the Telegraf kafka_consumer plugin, this configuration works without any issues.

So this fixes the issue.

But I want to point out that this is not backward compatible, which makes sense.

With an older version of Kafka (kafka_2.11-0.11.0.1) and telegraf-1.12.0~9eb7c8dd, this configuration does not work, which would make sense if the Kafka library has been updated in Telegraf.

@danielnelson
Contributor

Can you clarify what happens when you try to use it with Kafka 2.11-0.11.0.1?

@adithyamk
Author

./telegraf -config telegraf3.toml
2019-07-26T20:44:20Z I! Starting Telegraf
2019-07-26T20:44:20Z I! Loaded inputs: kafka_consumer kafka_consumer
2019-07-26T20:44:20Z I! Loaded aggregators:
2019-07-26T20:44:20Z I! Loaded processors:
2019-07-26T20:44:20Z I! Loaded outputs: influxdb influxdb
2019-07-26T20:44:20Z I! Tags enabled: host=xxxxx
2019-07-26T20:44:20Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"xxxxx", Flush Interval:10s
2019-07-26T20:44:21Z E! [agent] Service for input inputs.kafka_consumer failed to start: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2019-07-26T20:44:21Z E! [telegraf] Error running agent: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Kafka error

[2019-07-26 16:02:19,290] ERROR Closing socket for xx.xx.xx.xx:9092-xx.xx.xx.xx:52456 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 5
Caused by: java.lang.IllegalArgumentException: Invalid version for API key METADATA: 5
at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:173)
at org.apache.kafka.common.protocol.ApiKeys.requestSchema(ApiKeys.java:141)
at org.apache.kafka.common.protocol.ApiKeys.parseRequest(ApiKeys.java:149)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:112)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:99)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:93)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:518)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:511)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:511)
at kafka.network.Processor.run(SocketServer.scala:436)
at java.lang.Thread.run(Thread.java:748)
[2019-07-26 16:02:19,543] ERROR Closing socket for xx.xx.xx.xx:9092-xx.xx.xx.xx:52458 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 5
Caused by: java.lang.IllegalArgumentException: Invalid version for API key METADATA: 5
at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:173)
at org.apache.kafka.common.protocol.ApiKeys.requestSchema(ApiKeys.java:141)
at org.apache.kafka.common.protocol.ApiKeys.parseRequest(ApiKeys.java:149)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:112)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:99)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:93)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:518)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:511)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:511)
at kafka.network.Processor.run(SocketServer.scala:436)
at java.lang.Thread.run(Thread.java:748)
[2019-07-26 16:02:19,796] ERROR Closing socket for xx.xx.xx.xx:9092-xx.xx.xx.xx:52460 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 5
Caused by: java.lang.IllegalArgumentException: Invalid version for API key METADATA: 5
at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:173)
at org.apache.kafka.common.protocol.ApiKeys.requestSchema(ApiKeys.java:141)
at org.apache.kafka.common.protocol.ApiKeys.parseRequest(ApiKeys.java:149)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:112)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:99)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:93)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:518)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:511)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:511)
at kafka.network.Processor.run(SocketServer.scala:436)
at java.lang.Thread.run(Thread.java:748)
[2019-07-26 16:02:20,048] ERROR Closing socket for xx.xx.xx.xx:9092-xx.xx.xx.xx:52462 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 5
Caused by: java.lang.IllegalArgumentException: Invalid version for API key METADATA: 5
at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:173)
at org.apache.kafka.common.protocol.ApiKeys.requestSchema(ApiKeys.java:141)
at org.apache.kafka.common.protocol.ApiKeys.parseRequest(ApiKeys.java:149)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:112)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:99)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:93)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:518)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:511)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:511)
at kafka.network.Processor.run(SocketServer.scala:436)
at java.lang.Thread.run(Thread.java:748)

@danielnelson
Contributor

Does it help for that instance if `version = "0.11.0.1"` is set?

@adithyamk
Author

When I set the version to 0.11.0.1, it works. So, looks like I need to add version in the telegraf kafka consumer plugin.
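
For reference, a minimal sketch of the setting that worked against the older broker. It assumes the rest of each `[[inputs.kafka_consumer]]` block is unchanged from the original config; only `version` is new.

```toml
[[inputs.kafka_consumer]]
    name_override = "KI1"
    brokers = ["localhost:9092"]
    ## Matches the kafka_2.11-0.11.0.1 broker used in this test
    version = "0.11.0.1"
    topics = ["sample_topic"]
    consumer_group = "sample"
    offset = "oldest"
    max_message_len = 65536
    data_format = "value"
    data_type = "integer"
```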

@danielnelson
Contributor

What I think I'll do then is automatically set the version to at least 0.10.2.0 if consumer groups are being used, because it wasn't previously required to have this set to use consumer groups. As long as the version is set to a value no higher than your Kafka version, it should work, but some functionality may not be available and older APIs are used in some cases.

Let's keep the issue open until the PR is merged, and I still need to do a bit of performance verification to make sure things are still running as expected.

@danielnelson danielnelson reopened this Jul 26, 2019
@adithyamk
Author

Yes, that should work. Thanks Daniel.

@danielnelson
Contributor

Just found a bug in my new code, but I have to run. It's probably not safe to use the builds for long under load. I'll work on an update early next week.

@adithyamk
Author

Thanks Daniel.
