Skip to content

Commit

Permalink
Add Kafka Collector (#617)
Browse files Browse the repository at this point in the history
* write stats to kafka

* vendor dependencies for kafka

* update Dockerfile to include library required for kafka

* allow passing an array of brokers

* allow passing single string to brokers

* warn if not all messages got delivered in a timely fashion

* fix cli arg parsing

* add ability to output to kafka with json

* put formatting of samples into function

* remove commented code

* install librdkafka from alpine edge community repo

* convert from confluent-kafka-go to sarama

* revert Dockerfile to original now that crazy deps are gone

* revendor to add sarama and remove confluent-kafka-go

* gofmt

* use mapstructure to unwind arg string

* remove invalid test

* convert string to []string

* use null types

* add simple test for kafka.formatSamples

* updates after rebasing from master

* remove unused functions

* update test name

* do not need []null.String for []string

* allow passing influx config to kafka collector

* make PushInterval configurable

* convert PushInterval to types.NullDuration

* use the Duration directly
  • Loading branch information
jmccann authored and na-- committed May 17, 2018
1 parent d5f30cb commit 073d7ea
Show file tree
Hide file tree
Showing 194 changed files with 25,285 additions and 18 deletions.
62 changes: 61 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions cmd/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ import (
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/influxdb"
jsonc "github.com/loadimpact/k6/stats/json"
"github.com/loadimpact/k6/stats/kafka"
"github.com/pkg/errors"
"github.com/spf13/afero"
)

const (
collectorInfluxDB = "influxdb"
collectorJSON = "json"
collectorKafka = "kafka"
collectorCloud = "cloud"
)

Expand Down Expand Up @@ -78,6 +80,19 @@ func newCollector(collectorName, arg string, src *lib.SourceData, conf Config) (
config.Name = null.StringFrom(arg)
}
return cloud.New(config, src, conf.Options, Version)
case collectorKafka:
config := kafka.NewConfig().Apply(conf.Collectors.Kafka)
if err := envconfig.Process("k6", &config); err != nil {
return nil, err
}
if arg != "" {
cmdConfig, err := kafka.ParseArg(arg)
if err != nil {
return nil, err
}
config = config.Apply(cmdConfig)
}
return kafka.New(config)
default:
return nil, errors.Errorf("unknown output type: %s", collectorName)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/influxdb"
"github.com/loadimpact/k6/stats/kafka"
"github.com/shibukawa/configdir"
"github.com/spf13/afero"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -69,6 +70,7 @@ type Config struct {

Collectors struct {
InfluxDB influxdb.Config `json:"influxdb"`
Kafka kafka.Config `json:"kafka"`
Cloud cloud.Config `json:"cloud"`
} `json:"collectors"`
}
Expand Down
58 changes: 42 additions & 16 deletions stats/influxdb/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,36 @@ func (c *Collector) commit() {
c.bufferLock.Unlock()

log.Debug("InfluxDB: Committing...")

batch, err := c.batchFromSamples(samples)
if err != nil {
return
}

log.WithField("points", len(batch.Points())).Debug("InfluxDB: Writing...")
startTime := time.Now()
if err := c.Client.Write(batch); err != nil {
log.WithError(err).Error("InfluxDB: Couldn't write stats")
}
t := time.Since(startTime)
log.WithField("t", t).Debug("InfluxDB: Batch written!")
}

func (c *Collector) extractTagsToValues(tags map[string]string, values map[string]interface{}) map[string]interface{} {
for _, tag := range c.Config.TagsAsFields {
if val, ok := tags[tag]; ok {
values[tag] = val
delete(tags, tag)
}
}
return values
}

func (c *Collector) batchFromSamples(samples []stats.Sample) (client.BatchPoints, error) {
batch, err := client.NewBatchPoints(c.BatchConf)
if err != nil {
log.WithError(err).Error("InfluxDB: Couldn't make a batch")
return
return nil, err
}

type cacheItem struct {
Expand Down Expand Up @@ -137,28 +163,28 @@ func (c *Collector) commit() {
)
if err != nil {
log.WithError(err).Error("InfluxDB: Couldn't make point from sample!")
return
return nil, err
}
batch.AddPoint(p)
}

log.WithField("points", len(batch.Points())).Debug("InfluxDB: Writing...")
startTime := time.Now()
if err := c.Client.Write(batch); err != nil {
log.WithError(err).Error("InfluxDB: Couldn't write stats")
}
t := time.Since(startTime)
log.WithField("t", t).Debug("InfluxDB: Batch written!")
return batch, err
}

func (c *Collector) extractTagsToValues(tags map[string]string, values map[string]interface{}) map[string]interface{} {
for _, tag := range c.Config.TagsAsFields {
if val, ok := tags[tag]; ok {
values[tag] = val
delete(tags, tag)
}
// Format returns a string array of metrics in influx line-protocol
func (c *Collector) Format(samples []stats.Sample) ([]string, error) {
var metrics []string
batch, err := c.batchFromSamples(samples)

if err != nil {
return metrics, err
}
return values

for _, point := range batch.Points() {
metrics = append(metrics, point.String())
}

return metrics, nil
}

// GetRequiredSystemTags returns which sample tags are needed by this collector
Expand Down
26 changes: 26 additions & 0 deletions stats/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"strings"

"github.com/kubernetes/helm/pkg/strvals"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -84,6 +86,30 @@ func (c Config) Apply(cfg Config) Config {
return c
}

// ParseArg parses an argument string into a Config
func ParseArg(arg string) (Config, error) {
c := Config{}
params, err := strvals.Parse(arg)

if err != nil {
return c, err
}

c, err = ParseMap(params)
return c, err
}

// ParseMap parses a map[string]interface{} into a Config
func ParseMap(m map[string]interface{}) (Config, error) {
c := Config{}
if v, ok := m["tagsAsFields"].(string); ok {
m["tagsAsFields"] = []string{v}
}

err := mapstructure.Decode(m, &c)
return c, err
}

func ParseURL(text string) (Config, error) {
c := Config{}
u, err := url.Parse(text)
Expand Down
21 changes: 20 additions & 1 deletion stats/influxdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,26 @@ import (
"github.com/stretchr/testify/assert"
)

func TestConfigText(t *testing.T) {
func TestParseArg(t *testing.T) {
testdata := map[string]Config{
"": {},
"db=dbname": {DB: "dbname"},
"addr=http://localhost:8086": {Addr: "http://localhost:8086"},
"addr=http://localhost:8086,db=dbname": {Addr: "http://localhost:8086", DB: "dbname"},
"addr=http://localhost:8086,db=dbname,insecure=false,payloadSize=69": {Addr: "http://localhost:8086", DB: "dbname", Insecure: false, PayloadSize: 69},
}

for str, expConfig := range testdata {
t.Run(str, func(t *testing.T) {
config, err := ParseArg(str)

assert.NoError(t, err)
assert.Equal(t, expConfig, config)
})
}
}

func TestParseURL(t *testing.T) {
testdata := map[string]Config{
"": {},
"dbname": {DB: "dbname"},
Expand Down
Loading

0 comments on commit 073d7ea

Please sign in to comment.