
Add Kafka Collector #617

Merged
merged 28 commits into grafana:master on May 17, 2018

Conversation

@jmccann (Contributor) commented May 10, 2018

Adding the ability to output samples to Kafka. This is in support of #344

Sample command:
k6 run --out kafka=brokers=some.broker:8000,topic=k6,format=influxdb script.js

The above will make k6 connect to a Kafka instance listening on port 8000 at some.broker, and insert all test result data into a topic named k6 (which will be created if it doesn't exist), formatted in the InfluxDB line protocol (per format=influxdb).

This would allow Kafka to sit in front of an InfluxDB instance that ingests metrics from that same Kafka instance/topic. I have tested this setup and it worked!! 😸

Also supports format=json, which is the default if nothing is specified.

Additional Notes

I used the github.com/Shopify/sarama Kafka library (per @na--'s suggestion, thanks!), which is pure Go. I originally used github.com/confluentinc/confluent-kafka-go, which required librdkafka and CGO to be enabled.

I also included github.com/kubernetes/helm/pkg/strvals to allow easy parsing of command line arguments. This allows an array of brokers to be passed via the command line, e.g.

k6 --out kafka=brokers={broker1,broker2},topic=k6... or k6 --out kafka=brokers[0]=broker1,brokers[1]=broker2,topic=k6...
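The brokers={...} syntax above is handled by the strvals library in the PR itself. As a rough stdlib-only illustration of the shape being parsed (parseBrokers is a hypothetical helper, not code from this PR):

```go
package main

import (
	"fmt"
	"strings"
)

// parseBrokers is a simplified, stdlib-only illustration of the
// brokers={broker1,broker2} syntax; the real parsing in this PR is
// done by github.com/kubernetes/helm/pkg/strvals.
func parseBrokers(arg string) []string {
	v := strings.TrimPrefix(arg, "brokers=")
	v = strings.Trim(v, "{}")
	var brokers []string
	for _, b := range strings.Split(v, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	fmt.Println(parseBrokers("brokers={broker1,broker2:9092}")) // [broker1 broker2:9092]
}
```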

@jmccann (Contributor Author) commented May 10, 2018

Looks like I need to prep the CircleCI env with librdkafka and its dependencies. I'll be adding multiple commits as I work through that. Will rebase/squash when I get it working.

Please review what I have so far though and let me know of any comments/changes! 😄

@jmccann (Contributor Author) commented May 10, 2018

Since converting to github.com/Shopify/sarama, I think CircleCI will be happier now. 😄

@codecov-io commented May 10, 2018

Codecov Report

Merging #617 into master will decrease coverage by 0.32%.
The diff coverage is 42.85%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #617      +/-   ##
==========================================
- Coverage   64.51%   64.19%   -0.33%     
==========================================
  Files          98      100       +2     
  Lines        7829     7976     +147     
==========================================
+ Hits         5051     5120      +69     
- Misses       2488     2550      +62     
- Partials      290      306      +16
Impacted Files Coverage Δ
cmd/config.go 37.5% <ø> (ø) ⬆️
cmd/collectors.go 0% <0%> (ø) ⬆️
stats/influxdb/collector.go 0% <0%> (ø) ⬆️
stats/kafka/collector.go 51.38% <51.38%> (ø)
stats/kafka/config.go 58.97% <58.97%> (ø)
stats/influxdb/config.go 39.18% <69.23%> (+6.4%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ee5a9c3...e45ed9c.

@na-- (Member) left a comment

Besides the minor nitpicks I've commented on inline, this looks very good to me! Thank you very much for contributing it!

I'd never used the sarama library, I just knew about it, so I'm happy that it's working fine. It brings a bunch of dependencies, but since they're native Go, I'm very satisfied with it! 😄

Brokers []string `json:"brokers" envconfig:"KAFKA_BROKERS"`

// Samples.
Topic string `json:"topic" envconfig:"KAFKA_TOPIC"`
Member

Can you switch to null-able fields for the config fields, something like this or this? I know that the current config for the influxdb collector is not that way, but that's something that we'll soon fix.

return err
}

for key, value := range params {
Member

This seems very verbose if we're already using a library to parse the actual string. I'm not sure, maybe there's a library to directly inject the values in the Config struct (by using some sort of struct field tags like json and other encodings), but maybe the mapstructure library (which is already a k6 dependency) will work well with the strvals.Parse() result.

)

const (
pushInterval = 1 * time.Second
Member

Can you make this configurable as well, with a default of 1 second, something like this? I know that, like the comment below says about the null-able config values, it's not currently configurable in the influxdb collector, but we'll soon fix it there as well.

jsonCollector "github.com/loadimpact/k6/stats/json"
)

func formatSamples(format string, samples []stats.Sample) ([]string, error) {
Member

Please add a very simple test for this. I doubt we'll have any issues with the influxdb line-protocol changing any time soon, but since this collector and the json and influxdb ones will be now linked through this function, it's possible for some future refactoring of the other collectors to inadvertently break this.
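A self-contained sketch of the function under discussion, using a pared-down stand-in for stats.Sample and only the json branch (the real helper also dispatches to the influxdb collector's formatting):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Sample is a pared-down stand-in for stats.Sample.
type Sample struct {
	Metric string            `json:"metric"`
	Time   time.Time         `json:"time"`
	Value  float64           `json:"value"`
	Tags   map[string]string `json:"tags"`
}

// formatSamples sketches only the json branch: one JSON line per
// sample. An unknown format is an error.
func formatSamples(format string, samples []Sample) ([]string, error) {
	switch format {
	case "json":
		out := make([]string, 0, len(samples))
		for _, s := range samples {
			b, err := json.Marshal(s)
			if err != nil {
				return nil, err
			}
			out = append(out, string(b))
		}
		return out, nil
	default:
		return nil, fmt.Errorf("unknown output format %q", format)
	}
}

func main() {
	lines, err := formatSamples("json", []Sample{{Metric: "vus", Value: 1}})
	fmt.Println(lines, err)
}
```

A simple test, as requested in the review, would assert on the exact formatted lines so a future refactor of the linked collectors can't silently change the output.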


switch format {
case "influx":
i, err := influxdb.New(influxdb.Config{})
Member

It'd be super cool if we can honour the "InfluxDB tags as fields setting" (PR) here as well.

It should be possible if you make formatSamples() a method of kafka.Collector and somehow pass to the constructor the whole Collectors config struct and save it.

Member

Hmm but then this would become even more complicated... Not sure how much we can abuse the strvals library and if we can encode the influxdb config in it as well.

Contributor Author

@na-- So I think I was able to get this to work locally ... but because of how the influx collector parses the args you need to provide it a fake config string to have a query parameter for tags ... e.g.

brokers={broker2,broker3:9092},topic=someTopic2,format=influxdb,influxdb=http://influx:20?tagsAsFields=vu

So even though I'm not really sending to an InfluxDB instance, I need to specify a "url" (influxdb=http://influx:20?tagsAsFields=vu) to pass the parameter, even though it's not an actual InfluxDB parameter and is actually meant for k6.

I can still proceed with this ... but maybe an issue on how to parse influxdb config should be created and addressed first?

Member

Hmm I'd be okay if we support this but require that a dummy url is specified at this time, and then in the future fix it properly as part of the changes needed for #586.
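For illustration, the tagsAsFields query parameter can be pulled out of such a dummy URL with stdlib net/url; extractTagsAsFields is a hypothetical helper, not this PR's actual parsing code:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// extractTagsAsFields shows how the tagsAsFields setting could be
// read from the dummy URL discussed above, e.g.
// influxdb=http://influx:20?tagsAsFields=vu — the query parameter is
// meant for k6 itself, not for an actual InfluxDB server.
func extractTagsAsFields(raw string) ([]string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	v := u.Query().Get("tagsAsFields")
	if v == "" {
		return nil, nil
	}
	return strings.Split(v, ","), nil
}

func main() {
	fields, err := extractTagsAsFields("http://influx:20?tagsAsFields=vu")
	fmt.Println(fields, err) // [vu] <nil>
}
```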

@na-- na-- requested a review from luizbafilho May 11, 2018 10:54
@jmccann (Contributor Author) commented May 11, 2018

I will get to work on the updates starting today. May not finish until sometime Monday. Thanks for looking through it!

@jmccann (Contributor Author) commented May 14, 2018

Assuming CI passes I think this is mostly ready. I still am not sure how I should handle the influxDB config that could be passed to Kafka.

I did rebase from master and made some necessary updates due to some "breaking" changes as well. 😃

@na-- (Member) left a comment

Read through this again and left a few more comments. And sorry for merging huge changes in master while you were doing this, I had to do a lot of refactoring of how samples were generated and propagated to do the cloud aggregation in a relatively sane way...

log "github.com/sirupsen/logrus"
)

// Collector implements the lib.Collector interface and should be used only for testing
Member

This looks like a copy-pasted comment 😄. I really like how the kafka collector is shaping up, so I sincerely hope that people use it a lot for actual work 😁

Contributor Author

Did you want me to remove this and the below comment or update their text?

Member

Just update the text so it reflects the reality. The dummy collector from which this was copy-pasted is supposed to be used only for unit testing, that's why it says should be used only for testing

// detect incorrect usage.
// Also, theoretically the collector doesn't have to actually Run() before samples start
// being collected, it only has to be initialized.
func (c *Collector) Collect(scs []stats.SampleContainer) {

This comment was marked as resolved.

Contributor Author

I can look into this more. When I was working on running multiple collectors there were panics coming from sending in Run(), but when I moved it to Collect() (after seeing that's where the json collector writes its file) I had no issues.

Member

Yeah, but the json generally won't have to wait for network requests. And it's mostly meant for debugging, not for actual production usage.

@jmccann (Contributor Author) May 15, 2018

So I put this back to how it was before. Will test some more to see if it has issues again with multiple collectors or not.

Contributor Author

Seems to be running fine! 👍

Member

Hmm seems like I can hide only an individual comment but not the whole inline thread... Anyway, this has been resolved, though the method comment could probably be shortened, since most of the stuff after the first line is useful only for the dummy collector and how it's used in the automated tests.

Format null.String `json:"format" envconfig:"KAFKA_FORMAT"`
}

// config is a duplicate of ConfigFields as we can not mapstructure.Decode into
Member

Argh, forgot about that annoying mapstructure limitation... Don't have a better solution though, besides finding something better than strvals that actually understands struct tags...




type ConfigFields struct {
// Connection.
Brokers []null.String `json:"brokers" envconfig:"KAFKA_BROKERS"`
Member

This is not strictly required to be []null.String, since []string actually has a null value - nil 😄. The issue with the normal config string/int/bool/etc. values is that there was no way to differentiate between an empty ""/0/false/etc. value and "there is no set value" / "the user hasn't supplied anything" (a.k.a. null/nil). But for []string we can have an empty slice for a zero value and nil for "no value".
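A quick demonstration of the distinction being described:

```go
package main

import "fmt"

func main() {
	var unset []string // zero value: nil — "the user hasn't supplied anything"
	set := []string{}  // explicitly set, but empty

	fmt.Println(unset == nil, set == nil) // true false
	fmt.Println(len(unset), len(set))     // 0 0
}
```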

Contributor Author

ah, OK ... thanks for leveling up my go-fu and I'll get this updated. :)

Member

👍 and yeah, there's a lot of benefits to Go's simplicity, but its type system leaves much to be desired...

@jmccann jmccann force-pushed the kafka branch 2 times, most recently from 3afdd13 to 0a2f82f Compare May 15, 2018 13:27
@jmccann (Contributor Author) commented May 15, 2018

I added the ability to pass influx config via Kafka. It would look something like

brokers={broker2,broker3:9092},topic=someTopic,format=influxdb,influxdb.tagsAsFields={fake,anotherFake}

I also added ParseArg and ParseMap functions to influxdb. I use ParseMap in the kafka package, but figured it was most of the way to allowing influxdb args to be passed without using a purely query-string format, so I added ParseArg with some tests as well. (scope creep! 😨 )

@na-- (Member) left a comment

LGTM, besides the small issue with c.Config.PushInterval in Run() I noted. @luizbafilho, please take a look as well when you have time.

// Run just blocks until the context is done
func (c *Collector) Run(ctx context.Context) {
log.Debug("Kafka: Running!")
d, err := time.ParseDuration(c.Config.PushInterval.String())
@na-- (Member) May 16, 2018

I think this should be c.Config.PushInterval.Duration, there's no reason to serialize the PushInterval to string only to parse it back to duration again. Besides, validating options shouldn't be done in Run() - instead New() or Init() are much more appropriate places for that.

Contributor Author

will work on this

)

func TestRun(t *testing.T) {
broker := sarama.NewMockBroker(t, 1)
Member

Nice!

Contributor Author

yea, I was really happy when I stumbled across this!

}

// ParseArg takes an arg string and converts it to a config
func ParseArg(arg string) (Config, error) {
Member

I don't quite like how complex the configuration parsing is turning out to be, but I think that's more of a reflection of the limitations we have with the current configuration approach in k6 and the libraries we use (nil-able types, mapstructure, strvals, envconfig, etc. don't play very nice together). So it's fine for now, but definitely something I'd like to revisit in the future.

@jmccann (Contributor Author) May 16, 2018

I totally agree ... each has its strengths for a purpose, but they don't work well together. 😢 Thanks for allowing this to be worked on in a separate issue/PR.

@luizbafilho (Contributor) left a comment

LGTM

@na-- na-- merged commit 073d7ea into grafana:master May 17, 2018
@jmccann jmccann deleted the kafka branch May 17, 2018 18:20
@na-- na-- mentioned this pull request Aug 18, 2020