Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka plugin refactor #375

Merged
merged 1 commit into from
Nov 19, 2015
Merged

Kafka plugin refactor #375

merged 1 commit into from
Nov 19, 2015

Conversation

sparrc
Copy link
Contributor

@sparrc sparrc commented Nov 16, 2015

@panda87, this is a work in progress, any feedback appreciated

}

func init() {
plugins.Add("kafka", func() plugins.Plugin {
plugins.Add("kafka_consumer", func() plugins.Plugin {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a breaking change, but this is what the plugin was supposed to be called in the 1st place. The original author only changed the directory name but not the plugin name.

I think it's OK to make this breaking change because this plugin was completely broken anyways 😷

@panda87
Copy link

panda87 commented Nov 16, 2015

Thanks @sparrc!
My only feedback is if you can add the consuming partitions amount to as a parameter, as I said previously for cases which using more than one partition per topic is a big deal, since it's consume from multiple threads instead of one.
But if it's too much and this plugin is working I said 👍

@sparrc
Copy link
Contributor Author

sparrc commented Nov 16, 2015

@panda87 do you happen to know how to do that? I can't find any configuration options for specifying consume partitions, (see: https://godoc.org/github.com/wvanbergen/kafka/consumergroup#Config and https://godoc.org/github.com/Shopify/sarama#Config)

@sparrc
Copy link
Contributor Author

sparrc commented Nov 16, 2015

@panda87 are you just asking for the consumer to have multiple goroutines reading the channel? https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

@panda87
Copy link

panda87 commented Nov 16, 2015

The answer is yes, but not from the Go channel but the kafka channel (topic).

I can see 2 types which implement what I asked:
https://godoc.org/github.com/Shopify/sarama#Consumer
https://godoc.org/github.com/Shopify/sarama#ConsumerMessage

@panda87
Copy link

panda87 commented Nov 16, 2015

Kafka are supporting consume topic from multiple threads. in Go it's called goroutines in others it's just another threads management methods.
Think that you have one kafka topic with 5 shards (5 different brokers), if you will use the default config which is 1 thread, you will need to connect each broker and pull the logs, and if you will work in parallel you will have 5 threads that each of them consume from a different broker.

In regarding to the offset management, the ability to choose the offset to start consume with, is for the cases you want to start consume from offset 1 or continue from the latest offset number.

Hope I explained myself better now.

btw, thanks for adding this plugin to be able to consume from multiple topic in a single conf file.

@sparrc
Copy link
Contributor Author

sparrc commented Nov 16, 2015

@panda87 From what I can tell, the consumergroup implementation is already doing this via zookeeper, see here: https://github.com/wvanbergen/kafka/blob/master/consumergroup/consumer_group.go#L276-L324

@panda87
Copy link

panda87 commented Nov 16, 2015

Yeah, it looks like it fetch the number of partitions,
If it's using this list so this answered my question.
What about the offset? what do you think?

@sparrc sparrc force-pushed the kafka-bug branch 2 times, most recently from c84b42d to 7c69dcb Compare November 16, 2015 23:17
@sparrc
Copy link
Contributor Author

sparrc commented Nov 16, 2015

@panda87 I think I can do the offset too. From what I understand, there are only two offset options, correct? OffsetOldest (default) or OffsetNewest?

@panda87
Copy link

panda87 commented Nov 17, 2015

Yes. Correct

@sparrc sparrc force-pushed the kafka-bug branch 8 times, most recently from a3ed5e5 to 42f3961 Compare November 18, 2015 20:51
@sparrc
Copy link
Contributor Author

sparrc commented Nov 18, 2015

@panda87 This is pretty much ready to go, let me know what you think, you can see that I've added facility for consuming multiple topics and setting the offset

@sparrc sparrc changed the title [WIP] Kafka plugin refactor Kafka plugin refactor Nov 18, 2015
@panda87
Copy link

panda87 commented Nov 19, 2015

@sparrc this is look very good!!
The only way to test it is only to clone and build the package or I can download it from the circleCI?

@sparrc
Copy link
Contributor Author

sparrc commented Nov 19, 2015

@panda87 currently there are no build artifacts being stored in CircleCI, I am going to work on getting nightly builds going for Telegraf soon

@sparrc sparrc merged commit 970bfce into master Nov 19, 2015
@sparrc sparrc deleted the kafka-bug branch November 30, 2015 18:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants