diff --git a/stats/influxdb/config.go b/stats/influxdb/config.go index 131f72d1861..c9b443abeb5 100644 --- a/stats/influxdb/config.go +++ b/stats/influxdb/config.go @@ -25,6 +25,8 @@ import ( "strconv" "strings" + "github.com/kubernetes/helm/pkg/strvals" + "github.com/mitchellh/mapstructure" "github.com/pkg/errors" ) @@ -84,6 +86,30 @@ func (c Config) Apply(cfg Config) Config { return c } +// ParseArg parses an argument string into a Config +func ParseArg(arg string) (Config, error) { + c := Config{} + params, err := strvals.Parse(arg) + + if err != nil { + return c, err + } + + c, err = ParseMap(params) + return c, err +} + +// ParseMap parses a map[string]interface{} into a Config +func ParseMap(m map[string]interface{}) (Config, error) { + c := Config{} + if v, ok := m["tagsAsFields"].(string); ok { + m["tagsAsFields"] = []string{v} + } + + err := mapstructure.Decode(m, &c) + return c, err +} + func ParseURL(text string) (Config, error) { c := Config{} u, err := url.Parse(text) diff --git a/stats/influxdb/config_test.go b/stats/influxdb/config_test.go index ee712d21f02..414875c1bda 100644 --- a/stats/influxdb/config_test.go +++ b/stats/influxdb/config_test.go @@ -26,7 +26,26 @@ import ( "github.com/stretchr/testify/assert" ) -func TestConfigText(t *testing.T) { +func TestParseArg(t *testing.T) { + testdata := map[string]Config{ + "": {}, + "db=dbname": {DB: "dbname"}, + "addr=http://localhost:8086": {Addr: "http://localhost:8086"}, + "addr=http://localhost:8086,db=dbname": {Addr: "http://localhost:8086", DB: "dbname"}, + "addr=http://localhost:8086,db=dbname,insecure=false,payloadSize=69": {Addr: "http://localhost:8086", DB: "dbname", Insecure: false, PayloadSize: 69}, + } + + for str, expConfig := range testdata { + t.Run(str, func(t *testing.T) { + config, err := ParseArg(str) + + assert.NoError(t, err) + assert.Equal(t, expConfig, config) + }) + } +} + +func TestParseURL(t *testing.T) { testdata := map[string]Config{ "": {}, "dbname": {DB: "dbname"}, diff --git a/stats/kafka/collector.go b/stats/kafka/collector.go index ef430cee813..77b66566b52 100644 --- a/stats/kafka/collector.go +++ b/stats/kafka/collector.go @@ -22,12 +22,15 @@ package kafka import ( "context" + "encoding/json" "sync" "time" "github.com/Shopify/sarama" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/stats/influxdb" + jsonc "github.com/loadimpact/k6/stats/json" log "github.com/sirupsen/logrus" ) @@ -104,6 +107,35 @@ func (c *Collector) GetRequiredSystemTags() lib.TagSet { return lib.TagSet{} // There are no required tags for this collector } +func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) { + var metrics []string + + switch c.Config.Format.String { + case "influxdb": + i, err := influxdb.New(c.Config.InfluxDBConfig) + if err != nil { + return nil, err + } + + metrics, err = i.Format(samples) + if err != nil { + return nil, err + } + default: + for _, sample := range samples { + env := jsonc.WrapSample(&sample) + metric, err := json.Marshal(env) + if err != nil { + return nil, err + } + + metrics = append(metrics, string(metric)) + } + } + + return metrics, nil +} + func (c *Collector) pushMetrics() { startTime := time.Now() @@ -113,7 +145,7 @@ func (c *Collector) pushMetrics() { c.lock.Unlock() // Format the samples - formattedSamples, err := formatSamples(c.Config.Format.String, samples) + formattedSamples, err := c.formatSamples(samples) if err != nil { log.WithError(err).Error("Kafka: Couldn't format the samples") return diff --git a/stats/kafka/utils_test.go b/stats/kafka/collector_test.go similarity index 87% rename from stats/kafka/utils_test.go rename to stats/kafka/collector_test.go index c9359e5d2ca..5b836aa3c44 100644 --- a/stats/kafka/utils_test.go +++ b/stats/kafka/collector_test.go @@ -25,21 +25,25 @@ import ( "github.com/loadimpact/k6/stats" "github.com/stretchr/testify/assert" + "gopkg.in/guregu/null.v3" ) func TestFormatSamples(t *testing.T) { + c := Collector{} metric := stats.New("my_metric", stats.Gauge) samples := stats.Samples{ {Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}, {Metric: metric, Value: 2, Tags: stats.IntoSampleTags(&map[string]string{"b": "2"})}, } - fmtdSamples, err := formatSamples("influx", samples) + c.Config.Format = null.NewString("influxdb", false) + fmtdSamples, err := c.formatSamples(samples) assert.Nil(t, err) assert.Equal(t, []string{"my_metric,a=1 value=1.25", "my_metric,b=2 value=2"}, fmtdSamples) - fmtdSamples, err = formatSamples("json", samples) + c.Config.Format = null.NewString("json", false) + fmtdSamples, err = c.formatSamples(samples) expJSON1 := "{\"type\":\"Point\",\"data\":{\"time\":\"0001-01-01T00:00:00Z\",\"value\":1.25,\"tags\":{\"a\":\"1\"}},\"metric\":\"my_metric\"}" expJSON2 := "{\"type\":\"Point\",\"data\":{\"time\":\"0001-01-01T00:00:00Z\",\"value\":2,\"tags\":{\"b\":\"2\"}},\"metric\":\"my_metric\"}" diff --git a/stats/kafka/config.go b/stats/kafka/config.go index 234b1020fbd..6fd31042e59 100644 --- a/stats/kafka/config.go +++ b/stats/kafka/config.go @@ -22,6 +22,7 @@ package kafka import ( "github.com/kubernetes/helm/pkg/strvals" + "github.com/loadimpact/k6/stats/influxdb" "github.com/mitchellh/mapstructure" "gopkg.in/guregu/null.v3" ) @@ -33,6 +34,8 @@ type ConfigFields struct { // Samples. Topic null.String `json:"topic" envconfig:"KAFKA_TOPIC"` Format null.String `json:"format" envconfig:"KAFKA_FORMAT"` + + InfluxDBConfig influxdb.Config `json:"influxdb"` } // config is a duplicate of ConfigFields as we can not mapstructure.Decode into @@ -41,6 +44,8 @@ type config struct { Brokers []string `json:"brokers" mapstructure:"brokers" envconfig:"KAFKA_BROKERS"` Topic string `json:"topic" mapstructure:"topic" envconfig:"KAFKA_TOPIC"` Format string `json:"format" mapstructure:"format" envconfig:"KAFKA_FORMAT"` + + InfluxDBConfig influxdb.Config `json:"influxdb" mapstructure:"influxdb"` } type Config ConfigFields @@ -78,14 +83,17 @@ func ParseArg(arg string) (Config, error) { params["brokers"] = []string{v} } - input := map[string]interface{}{ - "brokers": params["brokers"], - "topic": params["topic"], - "format": params["format"], + if v, ok := params["influxdb"].(map[string]interface{}); ok { + influxConfig, err := influxdb.ParseMap(v) + if err != nil { + return c, err + } + c.InfluxDBConfig = influxConfig } + delete(params, "influxdb") var cfg config - err = mapstructure.Decode(input, &cfg) + err = mapstructure.Decode(params, &cfg) if err != nil { return c, err } diff --git a/stats/kafka/config_test.go b/stats/kafka/config_test.go index 76f184bcd78..cbced1f7e5d 100644 --- a/stats/kafka/config_test.go +++ b/stats/kafka/config_test.go @@ -23,20 +23,43 @@ package kafka import ( "testing" + "github.com/loadimpact/k6/stats/influxdb" "github.com/stretchr/testify/assert" "gopkg.in/guregu/null.v3" ) func TestConfigParseArg(t *testing.T) { - c, err := ParseArg("brokers=broker1,topic=someTopic,format=influx") + c, err := ParseArg("brokers=broker1,topic=someTopic,format=influxdb") + expInfluxConfig := influxdb.Config{} assert.Nil(t, err) assert.Equal(t, []string{"broker1"}, c.Brokers) assert.Equal(t, null.StringFrom("someTopic"), c.Topic) - assert.Equal(t, null.StringFrom("influx"), c.Format) + assert.Equal(t, null.StringFrom("influxdb"), c.Format) + assert.Equal(t, expInfluxConfig, c.InfluxDBConfig) c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic2,format=json") assert.Nil(t, err) assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers) assert.Equal(t, null.StringFrom("someTopic2"), c.Topic) assert.Equal(t, null.StringFrom("json"), c.Format) + + c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=influxdb,influxdb.tagsAsFields=fake") + expInfluxConfig = influxdb.Config{ + TagsAsFields: []string{"fake"}, + } + assert.Nil(t, err) + assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers) + assert.Equal(t, null.StringFrom("someTopic"), c.Topic) + assert.Equal(t, null.StringFrom("influxdb"), c.Format) + assert.Equal(t, expInfluxConfig, c.InfluxDBConfig) + + c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=influxdb,influxdb.tagsAsFields={fake,anotherFake}") + expInfluxConfig = influxdb.Config{ + TagsAsFields: []string{"fake", "anotherFake"}, + } + assert.Nil(t, err) + assert.Equal(t, []string{"broker2", "broker3:9092"}, c.Brokers) + assert.Equal(t, null.StringFrom("someTopic"), c.Topic) + assert.Equal(t, null.StringFrom("influxdb"), c.Format) + assert.Equal(t, expInfluxConfig, c.InfluxDBConfig) } diff --git a/stats/kafka/utils.go b/stats/kafka/utils.go deleted file mode 100644 index 689709558a7..00000000000 --- a/stats/kafka/utils.go +++ /dev/null @@ -1,38 +0,0 @@ -package kafka - -import ( - "encoding/json" - - "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/influxdb" - jsonCollector "github.com/loadimpact/k6/stats/json" -) - -func formatSamples(format string, samples stats.Samples) ([]string, error) { - var metrics []string - - switch format { - case "influx": - i, err := influxdb.New(influxdb.Config{}) - if err != nil { - return nil, err - } - - metrics, err = i.Format(samples) - if err != nil { - return nil, err - } - default: - for _, sample := range samples { - env := jsonCollector.WrapSample(&sample) - metric, err := json.Marshal(env) - if err != nil { - return nil, err - } - - metrics = append(metrics, string(metric)) - } - } - - return metrics, nil -}