Skip to content

Commit

Permalink
allow passing influx config to kafka collector
Browse files Browse the repository at this point in the history
  • Loading branch information
jmccann committed May 15, 2018
1 parent d8211d4 commit aff7ab9
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 49 deletions.
26 changes: 26 additions & 0 deletions stats/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"strings"

"github.com/kubernetes/helm/pkg/strvals"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -84,6 +86,30 @@ func (c Config) Apply(cfg Config) Config {
return c
}

// ParseArg parses an argument string into a Config
func ParseArg(arg string) (Config, error) {
c := Config{}
params, err := strvals.Parse(arg)

if err != nil {
return c, err
}

c, err = ParseMap(params)
return c, err
}

// ParseMap parses a map[string]interface{} into a Config
func ParseMap(m map[string]interface{}) (Config, error) {
c := Config{}
if v, ok := m["tagsAsFields"].(string); ok {
m["tagsAsFields"] = []string{v}
}

err := mapstructure.Decode(m, &c)
return c, err
}

func ParseURL(text string) (Config, error) {
c := Config{}
u, err := url.Parse(text)
Expand Down
21 changes: 20 additions & 1 deletion stats/influxdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,26 @@ import (
"github.com/stretchr/testify/assert"
)

func TestConfigText(t *testing.T) {
func TestParseArg(t *testing.T) {
testdata := map[string]Config{
"": {},
"db=dbname": {DB: "dbname"},
"addr=http://localhost:8086": {Addr: "http://localhost:8086"},
"addr=http://localhost:8086,db=dbname": {Addr: "http://localhost:8086", DB: "dbname"},
"addr=http://localhost:8086,db=dbname,insecure=false,payloadSize=69": {Addr: "http://localhost:8086", DB: "dbname", Insecure: false, PayloadSize: 69},
}

for str, expConfig := range testdata {
t.Run(str, func(t *testing.T) {
config, err := ParseArg(str)

assert.NoError(t, err)
assert.Equal(t, expConfig, config)
})
}
}

func TestParseURL(t *testing.T) {
testdata := map[string]Config{
"": {},
"dbname": {DB: "dbname"},
Expand Down
34 changes: 33 additions & 1 deletion stats/kafka/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ package kafka

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/influxdb"
jsonc "github.com/loadimpact/k6/stats/json"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -104,6 +107,35 @@ func (c *Collector) GetRequiredSystemTags() lib.TagSet {
return lib.TagSet{} // There are no required tags for this collector
}

func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) {
var metrics []string

switch c.Config.Format.String {
case "influxdb":
i, err := influxdb.New(c.Config.InfluxDBConfig)
if err != nil {
return nil, err
}

metrics, err = i.Format(samples)
if err != nil {
return nil, err
}
default:
for _, sample := range samples {
env := jsonc.WrapSample(&sample)
metric, err := json.Marshal(env)
if err != nil {
return nil, err
}

metrics = append(metrics, string(metric))
}
}

return metrics, nil
}

func (c *Collector) pushMetrics() {
startTime := time.Now()

Expand All @@ -113,7 +145,7 @@ func (c *Collector) pushMetrics() {
c.lock.Unlock()

// Format the samples
formattedSamples, err := formatSamples(c.Config.Format.String, samples)
formattedSamples, err := c.formatSamples(samples)
if err != nil {
log.WithError(err).Error("Kafka: Couldn't format the samples")
return
Expand Down
8 changes: 6 additions & 2 deletions stats/kafka/utils_test.go → stats/kafka/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,25 @@ import (

"github.com/loadimpact/k6/stats"
"github.com/stretchr/testify/assert"
"gopkg.in/guregu/null.v3"
)

func TestFormatSamples(t *testing.T) {
c := Collector{}
metric := stats.New("my_metric", stats.Gauge)
samples := stats.Samples{
{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
{Metric: metric, Value: 2, Tags: stats.IntoSampleTags(&map[string]string{"b": "2"})},
}

fmtdSamples, err := formatSamples("influx", samples)
c.Config.Format = null.NewString("influxdb", false)
fmtdSamples, err := c.formatSamples(samples)

assert.Nil(t, err)
assert.Equal(t, []string{"my_metric,a=1 value=1.25", "my_metric,b=2 value=2"}, fmtdSamples)

fmtdSamples, err = formatSamples("json", samples)
c.Config.Format = null.NewString("json", false)
fmtdSamples, err = c.formatSamples(samples)

expJSON1 := "{\"type\":\"Point\",\"data\":{\"time\":\"0001-01-01T00:00:00Z\",\"value\":1.25,\"tags\":{\"a\":\"1\"}},\"metric\":\"my_metric\"}"
expJSON2 := "{\"type\":\"Point\",\"data\":{\"time\":\"0001-01-01T00:00:00Z\",\"value\":2,\"tags\":{\"b\":\"2\"}},\"metric\":\"my_metric\"}"
Expand Down
18 changes: 13 additions & 5 deletions stats/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package kafka

import (
"github.com/kubernetes/helm/pkg/strvals"
"github.com/loadimpact/k6/stats/influxdb"
"github.com/mitchellh/mapstructure"
"gopkg.in/guregu/null.v3"
)
Expand All @@ -33,6 +34,8 @@ type ConfigFields struct {
// Samples.
Topic null.String `json:"topic" envconfig:"KAFKA_TOPIC"`
Format null.String `json:"format" envconfig:"KAFKA_FORMAT"`

InfluxDBConfig influxdb.Config `json:"influxdb"`
}

// config is a duplicate of ConfigFields as we can not mapstructure.Decode into
Expand All @@ -41,6 +44,8 @@ type config struct {
Brokers []string `json:"brokers" mapstructure:"brokers" envconfig:"KAFKA_BROKERS"`
Topic string `json:"topic" mapstructure:"topic" envconfig:"KAFKA_TOPIC"`
Format string `json:"format" mapstructure:"format" envconfig:"KAFKA_FORMAT"`

InfluxDBConfig influxdb.Config `json:"influxdb" mapstructure:"influxdb"`
}

type Config ConfigFields
Expand Down Expand Up @@ -78,14 +83,17 @@ func ParseArg(arg string) (Config, error) {
params["brokers"] = []string{v}
}

input := map[string]interface{}{
"brokers": params["brokers"],
"topic": params["topic"],
"format": params["format"],
if v, ok := params["influxdb"].(map[string]interface{}); ok {
influxConfig, err := influxdb.ParseMap(v)
if err != nil {
return c, err
}
c.InfluxDBConfig = influxConfig
}
delete(params, "influxdb")

var cfg config
err = mapstructure.Decode(input, &cfg)
err = mapstructure.Decode(params, &cfg)
if err != nil {
return c, err
}
Expand Down
27 changes: 25 additions & 2 deletions stats/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,43 @@ package kafka
import (
"testing"

"github.com/loadimpact/k6/stats/influxdb"
"github.com/stretchr/testify/assert"
"gopkg.in/guregu/null.v3"
)

func TestConfigParseArg(t *testing.T) {
c, err := ParseArg("brokers=broker1,topic=someTopic,format=influx")
c, err := ParseArg("brokers=broker1,topic=someTopic,format=influxdb")
expInfluxConfig := influxdb.Config{}
assert.Nil(t, err)
assert.Equal(t, []string{"broker1"}, c.Brokers)
assert.Equal(t, null.StringFrom("someTopic"), c.Topic)
assert.Equal(t, null.StringFrom("influx"), c.Format)
assert.Equal(t, null.StringFrom("influxdb"), c.Format)
assert.Equal(t, expInfluxConfig, c.InfluxDBConfig)

c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic2,format=json")
assert.Nil(t, err)
assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers)
assert.Equal(t, null.StringFrom("someTopic2"), c.Topic)
assert.Equal(t, null.StringFrom("json"), c.Format)

c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=influxdb,influxdb.tagsAsFields=fake")
expInfluxConfig = influxdb.Config{
TagsAsFields: []string{"fake"},
}
assert.Nil(t, err)
assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers)
assert.Equal(t, null.StringFrom("someTopic"), c.Topic)
assert.Equal(t, null.StringFrom("influxdb"), c.Format)
assert.Equal(t, expInfluxConfig, c.InfluxDBConfig)

c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=influxdb,influxdb.tagsAsFields={fake,anotherFake}")
expInfluxConfig = influxdb.Config{
TagsAsFields: []string{"fake", "anotherFake"},
}
assert.Nil(t, err)
assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers)
assert.Equal(t, null.StringFrom("someTopic"), c.Topic)
assert.Equal(t, null.StringFrom("influxdb"), c.Format)
assert.Equal(t, expInfluxConfig, c.InfluxDBConfig)
}
38 changes: 0 additions & 38 deletions stats/kafka/utils.go

This file was deleted.

0 comments on commit aff7ab9

Please sign in to comment.