diff --git a/cloudapi/util.go b/cloudapi/util.go index 9afbc97c06b..bacd2bd6ffc 100644 --- a/cloudapi/util.go +++ b/cloudapi/util.go @@ -20,12 +20,7 @@ package cloudapi -import "fmt" - +// URLForResults returns the cloud URL with the test run results. func URLForResults(refID string, config Config) string { - path := "runs" - if config.Token.String == "" { - path = "anonymous" - } - return fmt.Sprintf("%s/%s/%s", config.WebAppURL.String, path, refID) + return config.WebAppURL.String + "/runs/" + refID } diff --git a/cmd/login_cloud.go b/cmd/login_cloud.go index a22e81e40d7..f2e472e79c2 100644 --- a/cmd/login_cloud.go +++ b/cmd/login_cloud.go @@ -129,6 +129,9 @@ This will set the default token used when just "k6 run -o cloud" is passed.`, newCloudConf.Token = null.StringFrom(res.Token) } + if currentDiskConf.Collectors == nil { + currentDiskConf.Collectors = make(map[string]json.RawMessage) + } currentDiskConf.Collectors["cloud"], err = json.Marshal(newCloudConf) if err != nil { return err diff --git a/cmd/login_influxdb.go b/cmd/login_influxdb.go index 264d5083b33..009cded451f 100644 --- a/cmd/login_influxdb.go +++ b/cmd/login_influxdb.go @@ -121,6 +121,9 @@ This will set the default server used when just "-o influxdb" is passed.`, return err } + if config.Collectors == nil { + config.Collectors = make(map[string]json.RawMessage) + } config.Collectors["influxdb"], err = json.Marshal(conf) if err != nil { return err diff --git a/cmd/outputs.go b/cmd/outputs.go index 02fb2ed9de6..25950fc6b56 100644 --- a/cmd/outputs.go +++ b/cmd/outputs.go @@ -29,14 +29,12 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/afero" - "github.com/loadimpact/k6/cloudapi" "github.com/loadimpact/k6/lib" - "github.com/loadimpact/k6/lib/consts" "github.com/loadimpact/k6/loader" "github.com/loadimpact/k6/output" + "github.com/loadimpact/k6/output/cloud" "github.com/loadimpact/k6/output/json" "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/stats/cloud" "github.com/loadimpact/k6/stats/csv" "github.com/loadimpact/k6/stats/datadog" "github.com/loadimpact/k6/stats/influxdb" @@ -49,7 +47,8 @@ import ( func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) { // Start with the built-in outputs result := map[string]func(output.Params) (output.Output, error){ - "json": json.New, + "json": json.New, + "cloud": cloud.New, // TODO: remove all of these "influxdb": func(params output.Params) (output.Output, error) { @@ -61,20 +60,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, if err != nil { return nil, err } - return newCollectorAdapter(params, influxc) - }, - "cloud": func(params output.Params) (output.Output, error) { - conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) - if err != nil { - return nil, err - } - cloudc, err := cloud.New( - params.Logger, conf, params.ScriptPath, params.ScriptOptions, params.ExecutionPlan, consts.Version, - ) - if err != nil { - return nil, err - } - return newCollectorAdapter(params, cloudc) + return newCollectorAdapter(params, influxc), nil }, "kafka": func(params output.Params) (output.Output, error) { conf, err := kafka.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) @@ -85,7 +71,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, if err != nil { return nil, err } - return newCollectorAdapter(params, kafkac) + return newCollectorAdapter(params, kafkac), nil }, "statsd": func(params output.Params) (output.Output, error) { conf, err := statsd.GetConsolidatedConfig(params.JSONConfig, params.Environment) @@ -96,7 +82,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, if err != nil { return nil, err } - return newCollectorAdapter(params, statsdc) + return newCollectorAdapter(params, statsdc), nil }, "datadog": func(params output.Params) (output.Output, error) { conf, err := datadog.GetConsolidatedConfig(params.JSONConfig, params.Environment) @@ -107,7 +93,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, if err != nil { return nil, err } - return newCollectorAdapter(params, datadogc) + return newCollectorAdapter(params, datadogc), nil }, "csv": func(params output.Params) (output.Output, error) { conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) @@ -118,7 +104,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, if err != nil { return nil, err } - return newCollectorAdapter(params, csvc) + return newCollectorAdapter(params, csvc), nil }, } @@ -202,27 +188,12 @@ func parseOutputArgument(s string) (t, arg string) { // TODO: remove this after we transition every collector to the output interface -func newCollectorAdapter(params output.Params, collector lib.Collector) (output.Output, error) { - // Check if all required tags are present - missingRequiredTags := []string{} - requiredTags := collector.GetRequiredSystemTags() - for _, tag := range stats.SystemTagSetValues() { - if requiredTags.Has(tag) && !params.ScriptOptions.SystemTags.Has(tag) { - missingRequiredTags = append(missingRequiredTags, tag.String()) - } - } - if len(missingRequiredTags) > 0 { - return nil, fmt.Errorf( - "the specified output '%s' needs the following system tags enabled: %s", - params.OutputType, strings.Join(missingRequiredTags, ", "), - ) - } - +func newCollectorAdapter(params output.Params, collector lib.Collector) output.Output { return &collectorAdapter{ outputType: params.OutputType, collector: collector, stopCh: make(chan struct{}), - }, nil + } } // collectorAdapter is a _temporary_ fix until we move all of the old @@ -259,15 +230,9 @@ func (ca *collectorAdapter) AddMetricSamples(samples []stats.SampleContainer) { ca.collector.Collect(samples) } -func (ca *collectorAdapter) SetRunStatus(latestStatus lib.RunStatus) { - ca.collector.SetRunStatus(latestStatus) -} - // Stop implements the new output interface. func (ca *collectorAdapter) Stop() error { ca.runCtxCancel() <-ca.stopCh return nil } - -var _ output.WithRunStatusUpdates = &collectorAdapter{} diff --git a/lib/collector.go b/lib/collector.go index 408f517002e..fbcf8dafb0d 100644 --- a/lib/collector.go +++ b/lib/collector.go @@ -67,10 +67,4 @@ type Collector interface { // Optionally return a link that is shown to the user. Link() string - - // Return the required system sample tags for the specific collector - GetRequiredSystemTags() stats.SystemTagSet - - // Set run status - SetRunStatus(status RunStatus) } diff --git a/stats/cloud/bench_test.go b/output/cloud/bench_test.go similarity index 87% rename from stats/cloud/bench_test.go rename to output/cloud/bench_test.go index bd125189ad8..253daa7c0fc 100644 --- a/stats/cloud/bench_test.go +++ b/output/cloud/bench_test.go @@ -23,6 +23,7 @@ package cloud import ( "bytes" "compress/gzip" + json "encoding/json" "fmt" "io" "io/ioutil" @@ -36,29 +37,28 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "github.com/loadimpact/k6/cloudapi" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/netext/httpext" "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/types" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" ) func BenchmarkAggregateHTTP(b *testing.B) { - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - NoCompress: null.BoolFrom(true), - AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200), - AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(b), + JSONConfig: json.RawMessage(`{"noCompress": true, "aggregationCalcInterval": "200ms","aggregationPeriod": "200ms"}`), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, }) - collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(b, err) now := time.Now() - collector.referenceID = "something" + out.referenceID = "something" containersCount := 500000 for _, tagCount := range []int{1, 5, 35, 315, 3645} { @@ -79,10 +79,10 @@ func BenchmarkAggregateHTTP(b *testing.B) { tags := generateTags(i, tagCount, map[string]string{"status": status}) container[i-1] = generateHTTPExtTrail(now, time.Duration(i), tags) } - collector.Collect(container) + out.AddMetricSamples(container) b.StartTimer() - collector.aggregateHTTPTrails(time.Millisecond * 200) - collector.bufferSamples = nil + out.aggregateHTTPTrails(time.Millisecond * 200) + out.bufferSamples = nil } }) } @@ -289,9 +289,6 @@ func generateHTTPExtTrail(now time.Time, i time.Duration, tags *stats.SampleTags } func BenchmarkHTTPPush(b *testing.B) { - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } tb := httpmultibin.NewHTTPMultiBin(b) tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, err := fmt.Fprint(w, `{ @@ -307,14 +304,22 @@ func BenchmarkHTTPPush(b *testing.B) { }, ) - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - Host: null.StringFrom(tb.ServerHTTP.URL), - AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200), - AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(b), + JSONConfig: json.RawMessage(fmt.Sprintf(`{ + "host": "%s", + "noCompress": true, + "aggregationCalcInterval": "200ms", + "aggregationPeriod": "200ms" + }`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, }) - collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(b, err) - collector.referenceID = "fake" + out.referenceID = "fake" for _, count := range []int{1000, 5000, 50000, 100000, 250000} { count := count @@ -325,7 +330,7 @@ func BenchmarkHTTPPush(b *testing.B) { b.StopTimer() toSend := append([]*Sample{}, samples...) b.StartTimer() - require.NoError(b, collector.PushMetric("fake", false, toSend)) + require.NoError(b, out.PushMetric("fake", false, toSend)) } }) } diff --git a/stats/cloud/cloud_easyjson.go b/output/cloud/cloud_easyjson.go similarity index 100% rename from stats/cloud/cloud_easyjson.go rename to output/cloud/cloud_easyjson.go diff --git a/stats/cloud/data.go b/output/cloud/data.go similarity index 97% rename from stats/cloud/data.go rename to output/cloud/data.go index ee6ce33e8f8..807b742415c 100644 --- a/stats/cloud/data.go +++ b/output/cloud/data.go @@ -245,7 +245,9 @@ func (d durations) Less(i, j int) bool { return d[i] < d[j] } // Used when there are fewer samples in the bucket (so we can interpolate) // and for benchmark comparisons and verification of the quickselect // algorithm (it should return exactly the same results if interpolation isn't used). -func (d durations) SortGetNormalBounds(radius, iqrLowerCoef, iqrUpperCoef float64, interpolate bool) (min, max time.Duration) { +func (d durations) SortGetNormalBounds( + radius, iqrLowerCoef, iqrUpperCoef float64, interpolate bool, +) (min, max time.Duration) { if len(d) == 0 { return } @@ -276,7 +278,7 @@ func (d durations) SortGetNormalBounds(radius, iqrLowerCoef, iqrUpperCoef float6 min = q1 - time.Duration(iqrLowerCoef*iqr) // lower fence, anything below this is an outlier max = q3 + time.Duration(iqrUpperCoef*iqr) // upper fence, anything above this is an outlier - return + return min, max } // Reworked and translated in Go from: @@ -288,7 +290,7 @@ func (d durations) SortGetNormalBounds(radius, iqrLowerCoef, iqrUpperCoef float6 // that only depends on the sort.Interface methods, but that would // probably introduce some performance overhead because of the // dynamic dispatch. -func (d durations) quickSelect(k int) time.Duration { +func (d durations) quickSelect(k int) time.Duration { //nolint:gocognit n := len(d) l := 0 ir := n - 1 diff --git a/stats/cloud/data_test.go b/output/cloud/data_test.go similarity index 94% rename from stats/cloud/data_test.go rename to output/cloud/data_test.go index 02c1c63535f..52697c091fb 100644 --- a/stats/cloud/data_test.go +++ b/output/cloud/data_test.go @@ -233,10 +233,10 @@ func TestMetricAggregation(t *testing.T) { // I've not used that after the initial tests because it's a big // external dependency that's not really needed for the tests at // this point. -func getDurations(count int, min, multiplier float64) durations { +func getDurations(r *rand.Rand, count int, min, multiplier float64) durations { data := make(durations, count) for j := 0; j < count; j++ { - data[j] = time.Duration(min + rand.Float64()*multiplier) + data[j] = time.Duration(min + r.Float64()*multiplier) //nolint:gosec } return data } @@ -246,13 +246,18 @@ func BenchmarkDurationBounds(b *testing.B) { iqrLowerCoef := 1.5 iqrUpperCoef := 1.5 + seed := time.Now().UnixNano() + r := rand.New(rand.NewSource(seed)) //nolint:gosec + b.Logf("Random source seeded with %d\n", seed) + getData := func(b *testing.B, count int) durations { b.StopTimer() defer b.StartTimer() - return getDurations(count, 0.1*float64(time.Second), float64(time.Second)) + return getDurations(r, count, 0.1*float64(time.Second), float64(time.Second)) } for count := 100; count <= 5000; count += 500 { + count := count b.Run(fmt.Sprintf("Sort-no-interp-%d-elements", count), func(b *testing.B) { for i := 0; i < b.N; i++ { data := getData(b, count) @@ -276,11 +281,15 @@ func BenchmarkDurationBounds(b *testing.B) { func TestQuickSelectAndBounds(t *testing.T) { t.Parallel() + + seed := time.Now().UnixNano() + r := rand.New(rand.NewSource(seed)) //nolint:gosec + t.Logf("Random source seeded with %d\n", seed) + mult := time.Millisecond - for _, count := range []int{1, 2, 3, 4, 5, 10, 15, 20, 25, 50, 100, 250 + rand.Intn(100)} { + for _, count := range []int{1, 2, 3, 4, 5, 10, 15, 20, 25, 50, 100, 250 + r.Intn(100)} { count := count t.Run(fmt.Sprintf("simple-%d", count), func(t *testing.T) { - t.Parallel() data := make(durations, count) for i := 0; i < count; i++ { data[i] = time.Duration(i) * mult @@ -289,13 +298,11 @@ func TestQuickSelectAndBounds(t *testing.T) { for i := 0; i < 10; i++ { dataCopy := make(durations, count) assert.Equal(t, count, copy(dataCopy, data)) - k := rand.Intn(count) + k := r.Intn(count) assert.Equal(t, dataCopy.quickSelect(k), time.Duration(k)*mult) } }) t.Run(fmt.Sprintf("random-%d", count), func(t *testing.T) { - t.Parallel() - testCases := []struct{ r, l, u float64 }{ {0.25, 1.5, 1.5}, // Textbook {0.25, 1.5, 1.3}, // Defaults @@ -305,7 +312,7 @@ func TestQuickSelectAndBounds(t *testing.T) { for tcNum, tc := range testCases { tc := tc - data := getDurations(count, 0.3*float64(time.Second), 2*float64(time.Second)) + data := getDurations(r, count, 0.3*float64(time.Second), 2*float64(time.Second)) dataForSort := make(durations, count) dataForSelect := make(durations, count) assert.Equal(t, count, copy(dataForSort, data)) @@ -313,13 +320,12 @@ func TestQuickSelectAndBounds(t *testing.T) { assert.Equal(t, dataForSort, dataForSelect) t.Run(fmt.Sprintf("bounds-tc%d", tcNum), func(t *testing.T) { - t.Parallel() sortMin, sortMax := dataForSort.SortGetNormalBounds(tc.r, tc.l, tc.u, false) selectMin, selectMax := dataForSelect.SelectGetNormalBounds(tc.r, tc.l, tc.u) assert.Equal(t, sortMin, selectMin) assert.Equal(t, sortMax, selectMax) - k := rand.Intn(count) + k := r.Intn(count) assert.Equal(t, dataForSort[k], dataForSelect.quickSelect(k)) assert.Equal(t, dataForSort[k], data.quickSelect(k)) }) diff --git a/stats/cloud/collector.go b/output/cloud/output.go similarity index 59% rename from stats/cloud/collector.go rename to output/cloud/output.go index a7c752e1ea6..96ff06e8be9 100644 --- a/stats/cloud/collector.go +++ b/output/cloud/output.go @@ -30,6 +30,7 @@ import ( "net/http" "path/filepath" "strconv" + "strings" "sync" "time" @@ -39,8 +40,10 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/cloudapi" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/consts" "github.com/loadimpact/k6/lib/metrics" "github.com/loadimpact/k6/lib/netext" "github.com/loadimpact/k6/lib/netext/httpext" @@ -50,8 +53,8 @@ import ( // TestName is the default Load Impact Cloud test name const TestName = "k6 test" -// Collector sends result data to the Load Impact cloud service. -type Collector struct { +// Output sends result data to the Load Impact cloud service. +type Output struct { config cloudapi.Config referenceID string @@ -61,7 +64,6 @@ type Collector struct { client *cloudapi.Client pushBufferPool sync.Pool - anonymous bool runStatus lib.RunStatus bufferMutex sync.Mutex @@ -81,27 +83,48 @@ type Collector struct { // don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated aggrBuckets map[int64]map[[3]string]aggregationBucket - stopSendingMetricsCh chan struct{} + stopSendingMetrics chan struct{} + stopAggregation chan struct{} + aggregationDone *sync.WaitGroup + stopOutput chan struct{} + outputDone *sync.WaitGroup } -// Verify that Collector implements lib.Collector -var _ lib.Collector = &Collector{} +// Verify that Output implements the wanted interfaces +var _ interface { + output.WithRunStatusUpdates + output.WithThresholds +} = &Output{} -// New creates a new cloud collector -func New( - logger logrus.FieldLogger, - conf cloudapi.Config, scriptURL fmt.Stringer, opts lib.Options, executionPlan []lib.ExecutionStep, version string, -) (*Collector, error) { - if err := cloudapi.MergeFromExternal(opts.External, &conf); err != nil { +// New creates a new cloud output. +func New(params output.Params) (output.Output, error) { + return newOutput(params) +} + +// New creates a new cloud output. +func newOutput(params output.Params) (*Output, error) { + conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) + if err != nil { return nil, err } - if conf.AggregationPeriod.Duration > 0 && (opts.SystemTags.Has(stats.TagVU) || opts.SystemTags.Has(stats.TagIter)) { + if err := validateRequiredSystemTags(params.ScriptOptions.SystemTags); err != nil { + return nil, err + } + + logger := params.Logger.WithFields(logrus.Fields{"output": "cloud"}) + + if err := cloudapi.MergeFromExternal(params.ScriptOptions.External, &conf); err != nil { + return nil, err + } + + if conf.AggregationPeriod.Duration > 0 && + (params.ScriptOptions.SystemTags.Has(stats.TagVU) || params.ScriptOptions.SystemTags.Has(stats.TagIter)) { return nil, errors.New("aggregation cannot be enabled if the 'vu' or 'iter' system tag is also enabled") } if !conf.Name.Valid || conf.Name.String == "" { - scriptPath := scriptURL.String() + scriptPath := params.ScriptPath.String() if scriptPath == "" { // Script from stdin without a name, likely from stdin return nil, errors.New("script name not set, please specify K6_CLOUD_NAME or options.ext.loadimpact.name") @@ -113,12 +136,7 @@ func New( conf.Name = null.StringFrom(TestName) } - thresholds := make(map[string][]*stats.Threshold) - for name, t := range opts.Thresholds { - thresholds[name] = append(thresholds[name], t.Thresholds...) - } - - duration, testEnds := lib.GetEndOffset(executionPlan) + duration, testEnds := lib.GetEndOffset(params.ExecutionPlan) if !testEnds { return nil, errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") } @@ -138,130 +156,179 @@ func New( conf.MaxMetricSamplesPerPackage.Int64) } - return &Collector{ - config: conf, - thresholds: thresholds, - client: cloudapi.NewClient(logger, conf.Token.String, conf.Host.String, version), - anonymous: !conf.Token.Valid, - executionPlan: executionPlan, - duration: int64(duration / time.Second), - opts: opts, - aggrBuckets: map[int64]map[[3]string]aggregationBucket{}, - stopSendingMetricsCh: make(chan struct{}), - logger: logger, + return &Output{ + config: conf, + client: cloudapi.NewClient(logger, conf.Token.String, conf.Host.String, consts.Version), + executionPlan: params.ExecutionPlan, + duration: int64(duration / time.Second), + opts: params.ScriptOptions, + aggrBuckets: map[int64]map[[3]string]aggregationBucket{}, + logger: logger, pushBufferPool: sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, }, + stopSendingMetrics: make(chan struct{}), + stopAggregation: make(chan struct{}), + aggregationDone: &sync.WaitGroup{}, + stopOutput: make(chan struct{}), + outputDone: &sync.WaitGroup{}, }, nil } -// Init is called between the collector's creation and the call to Run(). -// You should do any lengthy setup here rather than in New. -func (c *Collector) Init() error { - if c.config.PushRefID.Valid { - c.referenceID = c.config.PushRefID.String - c.logger.WithField("referenceId", c.referenceID).Debug("Cloud: directly pushing metrics without init") +// validateRequiredSystemTags checks if all required tags are present. +func validateRequiredSystemTags(scriptTags *stats.SystemTagSet) error { + missingRequiredTags := []string{} + requiredTags := stats.TagName | stats.TagMethod | stats.TagStatus | stats.TagError | stats.TagCheck | stats.TagGroup + for _, tag := range stats.SystemTagSetValues() { + if requiredTags.Has(tag) && !scriptTags.Has(tag) { + missingRequiredTags = append(missingRequiredTags, tag.String()) + } + } + if len(missingRequiredTags) > 0 { + return fmt.Errorf( + "the cloud output needs the following system tags enabled: %s", + strings.Join(missingRequiredTags, ", "), + ) + } + return nil +} + +// Start calls the k6 Cloud API to initialize the test run, and then starts the +// goroutine that would listen for metric samples and send them to the cloud. +func (out *Output) Start() error { + if out.config.PushRefID.Valid { + out.referenceID = out.config.PushRefID.String + out.logger.WithField("referenceId", out.referenceID).Debug("directly pushing metrics without init") return nil } thresholds := make(map[string][]string) - for name, t := range c.thresholds { + for name, t := range out.thresholds { for _, threshold := range t { thresholds[name] = append(thresholds[name], threshold.Source) } } - maxVUs := lib.GetMaxPossibleVUs(c.executionPlan) + maxVUs := lib.GetMaxPossibleVUs(out.executionPlan) testRun := &cloudapi.TestRun{ - Name: c.config.Name.String, - ProjectID: c.config.ProjectID.Int64, + Name: out.config.Name.String, + ProjectID: out.config.ProjectID.Int64, VUsMax: int64(maxVUs), Thresholds: thresholds, - Duration: c.duration, + Duration: out.duration, } - response, err := c.client.CreateTestRun(testRun) + response, err := out.client.CreateTestRun(testRun) if err != nil { return err } - c.referenceID = response.ReferenceID + out.referenceID = response.ReferenceID if response.ConfigOverride != nil { - c.logger.WithFields(logrus.Fields{ + out.logger.WithFields(logrus.Fields{ "override": response.ConfigOverride, - }).Debug("Cloud: overriding config options") - c.config = c.config.Apply(*response.ConfigOverride) + }).Debug("overriding config options") + out.config = out.config.Apply(*response.ConfigOverride) } - c.logger.WithFields(logrus.Fields{ - "name": c.config.Name, - "projectId": c.config.ProjectID, - "duration": c.duration, - "referenceId": c.referenceID, - }).Debug("Cloud: Initialized") - return nil -} + out.startBackgroundProcesses() -// Link return a link that is shown to the user. -func (c *Collector) Link() string { - return cloudapi.URLForResults(c.referenceID, c.config) + out.logger.WithFields(logrus.Fields{ + "name": out.config.Name, + "projectId": out.config.ProjectID, + "duration": out.duration, + "referenceId": out.referenceID, + }).Debug("Started!") + return nil } -// Run is called in a goroutine and starts the collector. Should commit samples to the backend -// at regular intervals and when the context is terminated. -func (c *Collector) Run(ctx context.Context) { - wg := sync.WaitGroup{} - quit := ctx.Done() - aggregationPeriod := time.Duration(c.config.AggregationPeriod.Duration) +func (out *Output) startBackgroundProcesses() { + aggregationPeriod := time.Duration(out.config.AggregationPeriod.Duration) // If enabled, start periodically aggregating the collected HTTP trails if aggregationPeriod > 0 { - wg.Add(1) - aggregationTicker := time.NewTicker(aggregationPeriod) - aggregationWaitPeriod := time.Duration(c.config.AggregationWaitPeriod.Duration) - signalQuit := make(chan struct{}) - quit = signalQuit - + out.aggregationDone.Add(1) go func() { - defer wg.Done() + defer out.aggregationDone.Done() + aggregationWaitPeriod := time.Duration(out.config.AggregationWaitPeriod.Duration) + aggregationTicker := time.NewTicker(aggregationPeriod) + defer aggregationTicker.Stop() + for { select { - case <-c.stopSendingMetricsCh: + case <-out.stopSendingMetrics: return case <-aggregationTicker.C: - c.aggregateHTTPTrails(aggregationWaitPeriod) - case <-ctx.Done(): - c.aggregateHTTPTrails(0) - c.flushHTTPTrails() - close(signalQuit) + out.aggregateHTTPTrails(aggregationWaitPeriod) + case <-out.stopAggregation: + out.aggregateHTTPTrails(0) + out.flushHTTPTrails() return } } }() } - defer func() { - wg.Wait() - c.testFinished() + out.outputDone.Add(1) + go func() { + defer out.outputDone.Done() + pushTicker := time.NewTicker(time.Duration(out.config.MetricPushInterval.Duration)) + defer pushTicker.Stop() + for { + select { + case <-out.stopSendingMetrics: + return + default: + } + select { + case <-out.stopOutput: + out.pushMetrics() + return + case <-pushTicker.C: + out.pushMetrics() + } + } }() +} - pushTicker := time.NewTicker(time.Duration(c.config.MetricPushInterval.Duration)) - for { - select { - case <-c.stopSendingMetricsCh: - return - default: - } - select { - case <-quit: - c.pushMetrics() - return - case <-pushTicker.C: - c.pushMetrics() - } +// Stop gracefully stops all metric emission from the output and when all metric +// samples are emitted, it sends an API to the cloud to finish the test run. +func (out *Output) Stop() error { + out.logger.Debug("Stopping the cloud output...") + close(out.stopAggregation) + out.aggregationDone.Wait() // could be a no-op, if we have never started the aggregation + out.logger.Debug("Aggregation stopped, stopping metric emission...") + close(out.stopOutput) + out.outputDone.Wait() + out.logger.Debug("Metric emission stopped, calling cloud API...") + err := out.testFinished() + if err != nil { + out.logger.WithFields(logrus.Fields{"error": err}).Warn("Failed to send test finished to the cloud") + } else { + out.logger.Debug("Cloud output successfully stopped!") } + return err +} + +// Description returns the URL with the test run results. +func (out *Output) Description() string { + return fmt.Sprintf("cloud (%s)", cloudapi.URLForResults(out.referenceID, out.config)) +} + +// SetRunStatus receives the latest run status from the Engine. +func (out *Output) SetRunStatus(status lib.RunStatus) { + out.runStatus = status +} + +// SetThresholds receives the thresholds before the output is Start()-ed. +func (out *Output) SetThresholds(scriptThresholds map[string]stats.Thresholds) { + thresholds := make(map[string][]*stats.Threshold) + for name, t := range scriptThresholds { + thresholds[name] = append(thresholds[name], t.Thresholds...) + } + out.thresholds = thresholds } func useCloudTags(source *httpext.Trail) *httpext.Trail { @@ -282,16 +349,17 @@ func useCloudTags(source *httpext.Trail) *httpext.Trail { return dest } -// Collect receives a set of samples. This method is never called concurrently, and only while -// the context for Run() is valid, but should defer as much work as possible to Run(). -func (c *Collector) Collect(sampleContainers []stats.SampleContainer) { +// AddMetricSamples receives a set of metric samples. This method is never +// called concurrently, so it defers as much of the work as possible to the +// asynchronous goroutines initialized in Start(). +func (out *Output) AddMetricSamples(sampleContainers []stats.SampleContainer) { select { - case <-c.stopSendingMetricsCh: + case <-out.stopSendingMetrics: return default: } - if c.referenceID == "" { + if out.referenceID == "" { return } @@ -303,7 +371,7 @@ func (c *Collector) Collect(sampleContainers []stats.SampleContainer) { case *httpext.Trail: sc = useCloudTags(sc) // Check if aggregation is enabled, - if c.config.AggregationPeriod.Duration > 0 { + if out.config.AggregationPeriod.Duration > 0 { newHTTPTrails = append(newHTTPTrails, sc) } else { newSamples = append(newSamples, NewSampleFromTrail(sc)) @@ -346,21 +414,21 @@ func (c *Collector) Collect(sampleContainers []stats.SampleContainer) { } if len(newSamples) > 0 || len(newHTTPTrails) > 0 { - c.bufferMutex.Lock() - c.bufferSamples = append(c.bufferSamples, newSamples...) - c.bufferHTTPTrails = append(c.bufferHTTPTrails, newHTTPTrails...) - c.bufferMutex.Unlock() + out.bufferMutex.Lock() + out.bufferSamples = append(out.bufferSamples, newSamples...) + out.bufferHTTPTrails = append(out.bufferHTTPTrails, newHTTPTrails...) + out.bufferMutex.Unlock() } } //nolint:funlen,nestif,gocognit -func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { - c.bufferMutex.Lock() - newHTTPTrails := c.bufferHTTPTrails - c.bufferHTTPTrails = nil - c.bufferMutex.Unlock() +func (out *Output) aggregateHTTPTrails(waitPeriod time.Duration) { + out.bufferMutex.Lock() + newHTTPTrails := out.bufferHTTPTrails + out.bufferHTTPTrails = nil + out.bufferMutex.Unlock() - aggrPeriod := int64(c.config.AggregationPeriod.Duration) + aggrPeriod := int64(out.config.AggregationPeriod.Duration) // Distribute all newly buffered HTTP trails into buckets and sub-buckets @@ -372,10 +440,10 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { bucketID := trail.GetTime().UnixNano() / aggrPeriod // Get or create a time bucket for that trail period - bucket, ok := c.aggrBuckets[bucketID] + bucket, ok := out.aggrBuckets[bucketID] if !ok { bucket = make(map[[3]string]aggregationBucket) - c.aggrBuckets[bucketID] = bucket + out.aggrBuckets[bucketID] = bucket } subBucketKey[0], _ = trailTags.Get("name") subBucketKey[1], _ = trailTags.Get("group") @@ -403,13 +471,13 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { // Which buckets are still new and we'll wait for trails to accumulate before aggregating bucketCutoffID := time.Now().Add(-waitPeriod).UnixNano() / aggrPeriod - iqrRadius := c.config.AggregationOutlierIqrRadius.Float64 - iqrLowerCoef := c.config.AggregationOutlierIqrCoefLower.Float64 - iqrUpperCoef := c.config.AggregationOutlierIqrCoefUpper.Float64 + iqrRadius := out.config.AggregationOutlierIqrRadius.Float64 + iqrLowerCoef := out.config.AggregationOutlierIqrCoefLower.Float64 + iqrUpperCoef := out.config.AggregationOutlierIqrCoefUpper.Float64 newSamples := []*Sample{} // Handle all aggregation buckets older than bucketCutoffID - for bucketID, subBuckets := range c.aggrBuckets { + for bucketID, subBuckets := range out.aggrBuckets { if bucketID > bucketCutoffID { continue } @@ -418,7 +486,7 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { for tags, httpTrails := range subBucket { // start := time.Now() // this is in a combination with the log at the end trailCount := int64(len(httpTrails)) - if trailCount < c.config.AggregationMinSamples.Int64 { + if trailCount < out.config.AggregationMinSamples.Int64 { for _, trail := range httpTrails { newSamples = append(newSamples, NewSampleFromTrail(trail)) } @@ -431,7 +499,7 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { Tags: tags, } - if c.config.AggregationSkipOutlierDetection.Bool { + if out.config.AggregationSkipOutlierDetection.Bool { // Simply add up all HTTP trails, no outlier detection for _, trail := range httpTrails { aggrData.Add(trail) @@ -445,7 +513,7 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { } var minConnDur, maxConnDur, minReqDur, maxReqDur time.Duration - if trailCount < c.config.AggregationOutlierAlgoThreshold.Int64 { + if trailCount < out.config.AggregationOutlierAlgoThreshold.Int64 { // Since there are fewer samples, we'll use the interpolation-enabled and // more precise sorting-based algorithm minConnDur, maxConnDur = connDurations.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) @@ -473,7 +541,7 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { if aggrData.Count > 0 { /* - c.logger.WithFields(logrus.Fields{ + out.logger.WithFields(logrus.Fields{ "http_samples": aggrData.Count, "ratio": fmt.Sprintf("%.2f", float64(aggrData.Count)/float64(trailCount)), "t": time.Since(start), @@ -487,25 +555,25 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) { } } } - delete(c.aggrBuckets, bucketID) + delete(out.aggrBuckets, bucketID) } if len(newSamples) > 0 { - c.bufferMutex.Lock() - c.bufferSamples = append(c.bufferSamples, newSamples...) - c.bufferMutex.Unlock() + out.bufferMutex.Lock() + out.bufferSamples = append(out.bufferSamples, newSamples...) + out.bufferMutex.Unlock() } } -func (c *Collector) flushHTTPTrails() { - c.bufferMutex.Lock() - defer c.bufferMutex.Unlock() +func (out *Output) flushHTTPTrails() { + out.bufferMutex.Lock() + defer out.bufferMutex.Unlock() newSamples := []*Sample{} - for _, trail := range c.bufferHTTPTrails { + for _, trail := range out.bufferHTTPTrails { newSamples = append(newSamples, NewSampleFromTrail(trail)) } - for _, bucket := range c.aggrBuckets { + for _, bucket := range out.aggrBuckets { for _, subBucket := range bucket { for _, trails := range subBucket { for _, trail := range trails { @@ -515,12 +583,12 @@ func (c *Collector) flushHTTPTrails() { } } - c.bufferHTTPTrails = nil - c.aggrBuckets = map[int64]map[[3]string]aggregationBucket{} - c.bufferSamples = append(c.bufferSamples, newSamples...) + out.bufferHTTPTrails = nil + out.aggrBuckets = map[int64]map[[3]string]aggregationBucket{} + out.bufferSamples = append(out.bufferSamples, newSamples...) } -func (c *Collector) shouldStopSendingMetrics(err error) bool { +func (out *Output) shouldStopSendingMetrics(err error) bool { if err == nil { return false } @@ -546,24 +614,24 @@ func ceilDiv(a, b int) int { return r } -func (c *Collector) pushMetrics() { - c.bufferMutex.Lock() - if len(c.bufferSamples) == 0 { - c.bufferMutex.Unlock() +func (out *Output) pushMetrics() { + out.bufferMutex.Lock() + if len(out.bufferSamples) == 0 { + out.bufferMutex.Unlock() return } - buffer := c.bufferSamples - c.bufferSamples = nil - c.bufferMutex.Unlock() + buffer := out.bufferSamples + out.bufferSamples = nil + out.bufferMutex.Unlock() count := len(buffer) - c.logger.WithFields(logrus.Fields{ + out.logger.WithFields(logrus.Fields{ "samples": count, }).Debug("Pushing metrics to cloud") start := time.Now() - numberOfPackages := ceilDiv(len(buffer), int(c.config.MaxMetricSamplesPerPackage.Int64)) - numberOfWorkers := int(c.config.MetricPushConcurrency.Int64) + numberOfPackages := ceilDiv(len(buffer), int(out.config.MaxMetricSamplesPerPackage.Int64)) + numberOfWorkers := int(out.config.MetricPushConcurrency.Int64) if numberOfWorkers > numberOfPackages { numberOfWorkers = numberOfPackages } @@ -572,9 +640,9 @@ func (c *Collector) pushMetrics() { for i := 0; i < numberOfWorkers; i++ { go func() { for job := range ch { - err := c.PushMetric(c.referenceID, c.config.NoCompress.Bool, job.samples) + err := out.PushMetric(out.referenceID, out.config.NoCompress.Bool, job.samples) job.done <- err - if c.shouldStopSendingMetrics(err) { + if out.shouldStopSendingMetrics(err) { return } } @@ -585,8 +653,8 @@ func (c *Collector) pushMetrics() { for len(buffer) > 0 { size := len(buffer) - if size > int(c.config.MaxMetricSamplesPerPackage.Int64) { - size = int(c.config.MaxMetricSamplesPerPackage.Int64) + if size > int(out.config.MaxMetricSamplesPerPackage.Int64) { + size = int(out.config.MaxMetricSamplesPerPackage.Int64) } job := pushJob{done: make(chan error, 1), samples: buffer[:size]} ch <- job @@ -599,28 +667,28 @@ func (c *Collector) pushMetrics() { for _, job := range jobs { err := <-job.done if err != nil { - if c.shouldStopSendingMetrics(err) { - c.logger.WithError(err).Warn("Stopped sending metrics to cloud due to an error") - close(c.stopSendingMetricsCh) + if out.shouldStopSendingMetrics(err) { + out.logger.WithError(err).Warn("Stopped sending metrics to cloud due to an error") + close(out.stopSendingMetrics) break } - c.logger.WithError(err).Warn("Failed to send metrics to cloud") + out.logger.WithError(err).Warn("Failed to send metrics to cloud") } } - c.logger.WithFields(logrus.Fields{ + out.logger.WithFields(logrus.Fields{ "samples": count, "t": time.Since(start), }).Debug("Pushing metrics to cloud finished") } -func (c *Collector) testFinished() { - if c.referenceID == "" || c.config.PushRefID.Valid { - return +func (out *Output) testFinished() error { + if out.referenceID == "" || out.config.PushRefID.Valid { + return nil } testTainted := false thresholdResults := make(cloudapi.ThresholdResult) - for name, thresholds := range c.thresholds { + for name, thresholds := range out.thresholds { thresholdResults[name] = make(map[string]bool) for _, t := range thresholds { thresholdResults[name][t.Source] = t.LastFailed @@ -630,40 +698,25 @@ func (c *Collector) testFinished() { } } - c.logger.WithFields(logrus.Fields{ - "ref": c.referenceID, + out.logger.WithFields(logrus.Fields{ + "ref": out.referenceID, "tainted": testTainted, }).Debug("Sending test finished") runStatus := lib.RunStatusFinished - if c.runStatus != lib.RunStatusQueued { - runStatus = c.runStatus + if out.runStatus != lib.RunStatusQueued { + runStatus = out.runStatus } - err := c.client.TestFinished(c.referenceID, thresholdResults, testTainted, runStatus) - if err != nil { - c.logger.WithFields(logrus.Fields{ - "error": err, - }).Warn("Failed to send test finished to cloud") - } -} - -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.TagName | stats.TagMethod | stats.TagStatus | stats.TagError | stats.TagCheck | stats.TagGroup -} - -// SetRunStatus Set run status -func (c *Collector) SetRunStatus(status lib.RunStatus) { - c.runStatus = status + return out.client.TestFinished(out.referenceID, thresholdResults, testTainted, runStatus) } const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate // PushMetric pushes the provided metric samples for the given referenceID -func (c *Collector) PushMetric(referenceID string, noCompress bool, s []*Sample) error { +func (out *Output) PushMetric(referenceID string, noCompress bool, s []*Sample) error { start := time.Now() - url := fmt.Sprintf("%s/v1/metrics/%s", c.config.Host.String, referenceID) + url := fmt.Sprintf("%s/v1/metrics/%s", out.config.Host.String, referenceID) jsonStart := time.Now() b, err := easyjson.Marshal(samples(s)) @@ -682,9 +735,9 @@ func (c *Collector) PushMetric(referenceID string, noCompress bool, s []*Sample) var additionalFields logrus.Fields if !noCompress { - buf := c.pushBufferPool.Get().(*bytes.Buffer) + buf := out.pushBufferPool.Get().(*bytes.Buffer) buf.Reset() - defer c.pushBufferPool.Put(buf) + defer out.pushBufferPool.Put(buf) unzippedSize := len(b) buf.Grow(unzippedSize / expectedGzipRatio) gzipStart := time.Now() @@ -717,9 +770,9 @@ func (c *Collector) PushMetric(referenceID string, noCompress bool, s []*Sample) return ioutil.NopCloser(bytes.NewReader(b)), nil } - err = c.client.Do(req, nil) + err = out.client.Do(req, nil) - c.logger.WithFields(logrus.Fields{ + out.logger.WithFields(logrus.Fields{ "t": time.Since(start), "json_t": jsonTime, "part_size": len(s), diff --git a/stats/cloud/collector_test.go b/output/cloud/output_test.go similarity index 66% rename from stats/cloud/collector_test.go rename to output/cloud/output_test.go index c8b65b936c0..5bbd9b9ead6 100644 --- a/stats/cloud/collector_test.go +++ b/output/cloud/output_test.go @@ -17,13 +17,11 @@ * along with this program. If not, see . * */ - package cloud import ( "bytes" "compress/gzip" - "context" "encoding/json" "fmt" "io" @@ -40,7 +38,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/cloudapi" "github.com/loadimpact/k6/lib" @@ -50,6 +47,7 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/testutils/httpmultibin" "github.com/loadimpact/k6/lib/types" + "github.com/loadimpact/k6/output" "github.com/loadimpact/k6/stats" ) @@ -128,8 +126,8 @@ func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc } } -func skewTrail(t httpext.Trail, minCoef, maxCoef float64) httpext.Trail { - coef := minCoef + rand.Float64()*(maxCoef-minCoef) +func skewTrail(r *rand.Rand, t httpext.Trail, minCoef, maxCoef float64) httpext.Trail { + coef := minCoef + r.Float64()*(maxCoef-minCoef) addJitter := func(d *time.Duration) { *d = time.Duration(float64(*d) * coef) } @@ -144,13 +142,13 @@ func skewTrail(t httpext.Trail, minCoef, maxCoef float64) httpext.Trail { return t } -func TestCloudCollector(t *testing.T) { +func TestCloudOutput(t *testing.T) { t.Parallel() getTestRunner := func(minSamples int) func(t *testing.T) { return func(t *testing.T) { t.Parallel() - runCloudCollectorTestCase(t, minSamples) + runCloudOutputTestCase(t, minSamples) } } @@ -159,7 +157,11 @@ func TestCloudCollector(t *testing.T) { } } -func runCloudCollectorTestCase(t *testing.T, minSamples int) { +func runCloudOutputTestCase(t *testing.T, minSamples int) { + seed := time.Now().UnixNano() + r := rand.New(rand.NewSource(seed)) //nolint:gosec + t.Logf("Random source seeded with %d\n", seed) + tb := httpmultibin.NewHTTPMultiBin(t) tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, err := fmt.Fprintf(w, `{ @@ -176,33 +178,33 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { })) defer tb.Cleanup() - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - Host: null.StringFrom(tb.ServerHTTP.URL), - NoCompress: null.BoolFrom(true), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, }) - collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) - assert.True(t, collector.config.Host.Valid) - assert.Equal(t, tb.ServerHTTP.URL, collector.config.Host.String) - assert.True(t, collector.config.NoCompress.Valid) - assert.True(t, collector.config.NoCompress.Bool) - assert.False(t, collector.config.MetricPushInterval.Valid) - assert.False(t, collector.config.AggregationPeriod.Valid) - assert.False(t, collector.config.AggregationWaitPeriod.Valid) - - require.NoError(t, collector.Init()) - assert.Equal(t, "123", collector.referenceID) - assert.True(t, collector.config.MetricPushInterval.Valid) - assert.Equal(t, types.Duration(10*time.Millisecond), collector.config.MetricPushInterval.Duration) - assert.True(t, collector.config.AggregationPeriod.Valid) - assert.Equal(t, types.Duration(30*time.Millisecond), collector.config.AggregationPeriod.Duration) - assert.True(t, collector.config.AggregationWaitPeriod.Valid) - assert.Equal(t, types.Duration(5*time.Millisecond), collector.config.AggregationWaitPeriod.Duration) + assert.True(t, out.config.Host.Valid) + assert.Equal(t, tb.ServerHTTP.URL, out.config.Host.String) + assert.True(t, out.config.NoCompress.Valid) + assert.True(t, out.config.NoCompress.Bool) + assert.False(t, out.config.MetricPushInterval.Valid) + assert.False(t, out.config.AggregationPeriod.Valid) + assert.False(t, out.config.AggregationWaitPeriod.Valid) + + require.NoError(t, out.Start()) + assert.Equal(t, "123", out.referenceID) + assert.True(t, out.config.MetricPushInterval.Valid) + assert.Equal(t, types.Duration(10*time.Millisecond), out.config.MetricPushInterval.Duration) + assert.True(t, out.config.AggregationPeriod.Valid) + assert.Equal(t, types.Duration(30*time.Millisecond), out.config.AggregationPeriod.Duration) + assert.True(t, out.config.AggregationWaitPeriod.Valid) + assert.Equal(t, types.Duration(5*time.Millisecond), out.config.AggregationWaitPeriod.Duration) now := time.Now() tagMap := map[string]string{"test": "mest", "a": "b", "name": "name", "url": "url"} @@ -213,20 +215,12 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { expSamples := make(chan []Sample) defer close(expSamples) - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), getSampleChecker(t, expSamples)) - tb.Mux.HandleFunc(fmt.Sprintf("/v1/tests/%s", collector.referenceID), func(rw http.ResponseWriter, _ *http.Request) { + tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), getSampleChecker(t, expSamples)) + tb.Mux.HandleFunc(fmt.Sprintf("/v1/tests/%s", out.referenceID), func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) // silence a test warning }) - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - collector.Run(ctx) - wg.Done() - }() - - collector.Collect([]stats.SampleContainer{stats.Sample{ + out.AddMetricSamples([]stats.SampleContainer{stats.Sample{ Time: now, Metric: metrics.VUs, Tags: tags, @@ -256,15 +250,15 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { Duration: 1500 * time.Millisecond, Tags: tags, } - collector.Collect([]stats.SampleContainer{&simpleTrail}) + out.AddMetricSamples([]stats.SampleContainer{&simpleTrail}) expSamples <- []Sample{*NewSampleFromTrail(&simpleTrail)} smallSkew := 0.02 trails := []stats.SampleContainer{} durations := make([]time.Duration, len(trails)) - for i := int64(0); i < collector.config.AggregationMinSamples.Int64; i++ { - similarTrail := skewTrail(simpleTrail, 1.0, 1.0+smallSkew) + for i := int64(0); i < out.config.AggregationMinSamples.Int64; i++ { + similarTrail := skewTrail(r, simpleTrail, 1.0, 1.0+smallSkew) trails = append(trails, &similarTrail) durations = append(durations, similarTrail.Duration) } @@ -279,9 +273,9 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { assert.InEpsilon(t, normal, stats.ToD(aggr.Max), smallSkew) } - outlierTrail := skewTrail(simpleTrail, 2.0+smallSkew, 3.0+smallSkew) + outlierTrail := skewTrail(r, simpleTrail, 2.0+smallSkew, 3.0+smallSkew) trails = append(trails, &outlierTrail) - collector.Collect(trails) + out.AddMetricSamples(trails) expSamples <- []Sample{ *NewSampleFromTrail(&outlierTrail), { @@ -291,9 +285,9 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { aggrData, ok := data.(*SampleDataAggregatedHTTPReqs) assert.True(t, ok) assert.True(t, aggrData.Tags.IsEqual(expectedTags)) - assert.Equal(t, collector.config.AggregationMinSamples.Int64, int64(aggrData.Count)) + assert.Equal(t, out.config.AggregationMinSamples.Int64, int64(aggrData.Count)) assert.Equal(t, "aggregated_trend", aggrData.Type) - assert.InDelta(t, now.UnixNano(), aggrData.Time*1000, float64(collector.config.AggregationPeriod.Duration)) + assert.InDelta(t, now.UnixNano(), aggrData.Time*1000, float64(out.config.AggregationPeriod.Duration)) checkAggrMetric(simpleTrail.Duration, aggrData.Values.Duration) checkAggrMetric(simpleTrail.Blocked, aggrData.Values.Blocked) @@ -306,11 +300,10 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) { }, } - cancel() - wg.Wait() + require.NoError(t, out.Stop()) } -func TestCloudCollectorMaxPerPacket(t *testing.T) { +func TestCloudOutputMaxPerPacket(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) maxMetricSamplesPerPackage := 20 @@ -327,24 +320,26 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) { }`, maxMetricSamplesPerPackage) require.NoError(t, err) })) + tb.Mux.HandleFunc("/v1/tests/12", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) }) defer tb.Cleanup() - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - Host: null.StringFrom(tb.ServerHTTP.URL), - NoCompress: null.BoolFrom(true), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, }) - collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") + require.NoError(t, err) require.NoError(t, err) now := time.Now() tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"}) gotTheLimit := false var m sync.Mutex - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), + tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), func(_ http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -358,16 +353,9 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) { } }) - require.NoError(t, collector.Init()) - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - collector.Run(ctx) - wg.Done() - }() + require.NoError(t, out.Start()) - collector.Collect([]stats.SampleContainer{stats.Sample{ + out.AddMetricSamples([]stats.SampleContainer{stats.Sample{ Time: now, Metric: metrics.VUs, Tags: stats.NewSampleTags(tags.CloneTags()), @@ -390,15 +378,14 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) { Tags: stats.NewSampleTags(tags.CloneTags()), }) } - collector.Collect(container) + out.AddMetricSamples(container) } - cancel() - wg.Wait() + require.NoError(t, out.Stop()) require.True(t, gotTheLimit) } -func TestCloudCollectorStopSendingMetric(t *testing.T) { +func TestCloudOutputStopSendingMetric(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { @@ -421,26 +408,32 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) { }`) require.NoError(t, err) })) + tb.Mux.HandleFunc("/v1/tests/12", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) }) defer tb.Cleanup() - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - Host: null.StringFrom(tb.ServerHTTP.URL), - NoCompress: null.BoolFrom(true), - MaxMetricSamplesPerPackage: null.IntFrom(50), - Name: null.StringFrom("my-custom-name"), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{ + "host": "%s", "noCompress": true, + "maxMetricSamplesPerPackage": 50, + "name": "something-that-should-be-overwritten" + }`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + External: map[string]json.RawMessage{ + "loadimpact": json.RawMessage(`{"name": "my-custom-name"}`), + }, + }, + ScriptPath: &url.URL{Path: "/script.js"}, }) - collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: ""}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) now := time.Now() tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"}) count := 1 max := 5 - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), + tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), func(w http.ResponseWriter, r *http.Request) { count++ if count == max { @@ -464,16 +457,9 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) { assert.NoError(t, json.Unmarshal(body, &receivedSamples)) }) - require.NoError(t, collector.Init()) - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - collector.Run(ctx) - wg.Done() - }() + require.NoError(t, out.Start()) - collector.Collect([]stats.SampleContainer{stats.Sample{ + out.AddMetricSamples([]stats.SampleContainer{stats.Sample{ Time: now, Metric: metrics.VUs, Tags: stats.NewSampleTags(tags.CloneTags()), @@ -496,49 +482,48 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) { Tags: stats.NewSampleTags(tags.CloneTags()), }) } - collector.Collect(container) + out.AddMetricSamples(container) } - cancel() - wg.Wait() - require.Equal(t, lib.RunStatusQueued, collector.runStatus) + require.NoError(t, out.Stop()) + + require.Equal(t, lib.RunStatusQueued, out.runStatus) select { - case <-collector.stopSendingMetricsCh: + case <-out.stopSendingMetrics: // all is fine default: t.Fatal("sending metrics wasn't stopped") } require.Equal(t, max, count) - nBufferSamples := len(collector.bufferSamples) - nBufferHTTPTrails := len(collector.bufferHTTPTrails) - collector.Collect([]stats.SampleContainer{stats.Sample{ + nBufferSamples := len(out.bufferSamples) + nBufferHTTPTrails := len(out.bufferHTTPTrails) + out.AddMetricSamples([]stats.SampleContainer{stats.Sample{ Time: now, Metric: metrics.VUs, Tags: stats.NewSampleTags(tags.CloneTags()), Value: 1.0, }}) - if nBufferSamples != len(collector.bufferSamples) || nBufferHTTPTrails != len(collector.bufferHTTPTrails) { - t.Errorf("Collector still collects data after stop sending metrics") + if nBufferSamples != len(out.bufferSamples) || nBufferHTTPTrails != len(out.bufferHTTPTrails) { + t.Errorf("Output still collects data after stop sending metrics") } } -func TestCloudCollectorRequireScriptName(t *testing.T) { +func TestCloudOutputRequireScriptName(t *testing.T) { t.Parallel() - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - NoCompress: null.BoolFrom(true), - MaxMetricSamplesPerPackage: null.IntFrom(50), + _, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: ""}, }) - _, err := New(testutils.NewLogger(t), config, &url.URL{Path: ""}, options, []lib.ExecutionStep{}, "1.0") require.Error(t, err) assert.Contains(t, err.Error(), "script name not set") } -func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) { +func TestCloudOutputAggregationPeriodZeroNoBlock(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -553,54 +538,49 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) { }`) require.NoError(t, err) })) + tb.Mux.HandleFunc("/v1/tests/123", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) }) defer tb.Cleanup() - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - Host: null.StringFrom(tb.ServerHTTP.URL), - NoCompress: null.BoolFrom(true), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{ + "host": "%s", "noCompress": true, + "maxMetricSamplesPerPackage": 50 + }`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, }) - collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) - assert.True(t, collector.config.Host.Valid) - assert.Equal(t, tb.ServerHTTP.URL, collector.config.Host.String) - assert.True(t, collector.config.NoCompress.Valid) - assert.True(t, collector.config.NoCompress.Bool) - assert.False(t, collector.config.MetricPushInterval.Valid) - assert.False(t, collector.config.AggregationPeriod.Valid) - assert.False(t, collector.config.AggregationWaitPeriod.Valid) - - require.NoError(t, collector.Init()) - assert.Equal(t, "123", collector.referenceID) - assert.True(t, collector.config.MetricPushInterval.Valid) - assert.Equal(t, types.Duration(10*time.Millisecond), collector.config.MetricPushInterval.Duration) - assert.True(t, collector.config.AggregationPeriod.Valid) - assert.Equal(t, types.Duration(0), collector.config.AggregationPeriod.Duration) - assert.True(t, collector.config.AggregationWaitPeriod.Valid) - assert.Equal(t, types.Duration(5*time.Millisecond), collector.config.AggregationWaitPeriod.Duration) + assert.True(t, out.config.Host.Valid) + assert.Equal(t, tb.ServerHTTP.URL, out.config.Host.String) + assert.True(t, out.config.NoCompress.Valid) + assert.True(t, out.config.NoCompress.Bool) + assert.False(t, out.config.MetricPushInterval.Valid) + assert.False(t, out.config.AggregationPeriod.Valid) + assert.False(t, out.config.AggregationWaitPeriod.Valid) + + require.NoError(t, out.Start()) + assert.Equal(t, "123", out.referenceID) + assert.True(t, out.config.MetricPushInterval.Valid) + assert.Equal(t, types.Duration(10*time.Millisecond), out.config.MetricPushInterval.Duration) + assert.True(t, out.config.AggregationPeriod.Valid) + assert.Equal(t, types.Duration(0), out.config.AggregationPeriod.Duration) + assert.True(t, out.config.AggregationWaitPeriod.Valid) + assert.Equal(t, types.Duration(5*time.Millisecond), out.config.AggregationWaitPeriod.Duration) expSamples := make(chan []Sample) defer close(expSamples) - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), getSampleChecker(t, expSamples)) - - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - collector.Run(ctx) - wg.Done() - }() - - cancel() - wg.Wait() - require.Equal(t, lib.RunStatusQueued, collector.runStatus) + tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), getSampleChecker(t, expSamples)) + + require.NoError(t, out.Stop()) + require.Equal(t, lib.RunStatusQueued, out.runStatus) } -func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { +func TestCloudOutputRecvIterLIAllIterations(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { @@ -614,17 +594,21 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { _, err = fmt.Fprintf(resp, `{"reference_id": "123"}`) require.NoError(t, err) })) + tb.Mux.HandleFunc("/v1/tests/123", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) }) defer tb.Cleanup() - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - Host: null.StringFrom(tb.ServerHTTP.URL), - NoCompress: null.BoolFrom(true), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{ + "host": "%s", "noCompress": true, + "maxMetricSamplesPerPackage": 50 + }`, tb.ServerHTTP.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "path/to/script.js"}, }) - collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "path/to/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) gotIterations := false @@ -636,7 +620,7 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { "iterations": 1, } - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), + tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), func(_ http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -656,14 +640,7 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { m.Unlock() }) - require.NoError(t, collector.Init()) - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - collector.Run(ctx) - wg.Done() - }() + require.NoError(t, out.Start()) now := time.Now() simpleNetTrail := netext.NetTrail{ @@ -691,10 +668,8 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) { }, } - collector.Collect([]stats.SampleContainer{&simpleNetTrail}) - - cancel() - wg.Wait() + out.AddMetricSamples([]stats.SampleContainer{&simpleNetTrail}) + require.NoError(t, out.Stop()) require.True(t, gotIterations) } @@ -734,11 +709,16 @@ func TestNewName(t *testing.T) { testCase := testCase t.Run(testCase.url.String(), func(t *testing.T) { - collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), testCase.url, lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - }, []lib.ExecutionStep{}, "1.0") + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: testCase.url, + }) require.NoError(t, err) - require.Equal(t, collector.config.Name.String, testCase.expected) + require.Equal(t, out.config.Name.String, testCase.expected) }) } } @@ -767,14 +747,15 @@ func TestPublishMetric(t *testing.T) { })) defer server.Close() - options := lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - } - config := cloudapi.NewConfig().Apply(cloudapi.Config{ - Host: null.StringFrom(server.URL), - NoCompress: null.BoolFrom(true), + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, server.URL)), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &stats.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "script.js"}, }) - collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0") require.NoError(t, err) samples := []*Sample{ @@ -788,7 +769,7 @@ func TestPublishMetric(t *testing.T) { }, }, } - err = collector.PushMetric("1", false, samples) + err = out.PushMetric("1", false, samples) assert.Nil(t, err) } diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 31431532d7d..7506e9fd12b 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -130,9 +130,6 @@ func (c *Collector) Init() error { return nil } -// SetRunStatus does nothing -func (c *Collector) SetRunStatus(status lib.RunStatus) {} - // Run just blocks until the context is done func (c *Collector) Run(ctx context.Context) { ticker := time.NewTicker(c.saveInterval) @@ -245,8 +242,3 @@ func IsStringInSlice(slice []string, str string) bool { } return true } - -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // There are no required tags for this collector -} diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 6044b5d3789..f4a1e36f10c 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -462,18 +462,6 @@ func TestNew(t *testing.T) { } } -func TestGetRequiredSystemTags(t *testing.T) { - collector, err := New( - testutils.NewLogger(t), - afero.NewMemMapFs(), - stats.TagSet{"tag1": true, "tag2": false, "tag3": true}, - Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, - ) - assert.NoError(t, err) - assert.NotNil(t, collector) - assert.Equal(t, stats.SystemTagSet(0), collector.GetRequiredSystemTags()) -} - func TestLink(t *testing.T) { collector, err := New( testutils.NewLogger(t), diff --git a/stats/influxdb/collector.go b/stats/influxdb/collector.go index 13c1ffdbb8e..56f6d7f6a0a 100644 --- a/stats/influxdb/collector.go +++ b/stats/influxdb/collector.go @@ -235,11 +235,3 @@ func (c *Collector) Format(samples []stats.Sample) ([]string, error) { return metrics, nil } - -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // There are no required tags for this collector -} - -// SetRunStatus does nothing in the InfluxDB collector -func (c *Collector) SetRunStatus(status lib.RunStatus) {} diff --git a/stats/kafka/collector.go b/stats/kafka/collector.go index db402cf553a..18ac9c47f01 100644 --- a/stats/kafka/collector.go +++ b/stats/kafka/collector.go @@ -29,7 +29,6 @@ import ( "github.com/Shopify/sarama" "github.com/sirupsen/logrus" - "github.com/loadimpact/k6/lib" jsono "github.com/loadimpact/k6/output/json" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/stats/influxdb" @@ -101,14 +100,6 @@ func (c *Collector) Link() string { return "" } -// GetRequiredSystemTags returns which sample tags are needed by this collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // There are no required tags for this collector -} - -// SetRunStatus does nothing in the Kafka collector -func (c *Collector) SetRunStatus(status lib.RunStatus) {} - func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) { var metrics []string diff --git a/stats/statsd/common/collector.go b/stats/statsd/common/collector.go index 6abde5f1a60..5d307c3d91b 100644 --- a/stats/statsd/common/collector.go +++ b/stats/statsd/common/collector.go @@ -110,14 +110,6 @@ func (c *Collector) Run(ctx context.Context) { } } -// GetRequiredSystemTags Return the required system sample tags for the specific collector -func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet { - return stats.SystemTagSet(0) // no tags are required -} - -// SetRunStatus does nothing in statsd collector -func (c *Collector) SetRunStatus(status lib.RunStatus) {} - // Collect metrics func (c *Collector) Collect(containers []stats.SampleContainer) { var pointSamples []*Sample diff --git a/stats/statsd/common/collector_test.go b/stats/statsd/common/collector_test.go index 53b0423ccdf..78469a67587 100644 --- a/stats/statsd/common/collector_test.go +++ b/stats/statsd/common/collector_test.go @@ -28,7 +28,6 @@ import ( "github.com/loadimpact/k6/lib/testutils" "github.com/loadimpact/k6/lib/types" - "github.com/loadimpact/k6/stats" ) type config struct { @@ -84,8 +83,3 @@ func TestLinkReturnAddress(t *testing.T) { } require.Equal(t, bogusValue, c.Link()) } - -func TestGetRequiredSystemTags(t *testing.T) { - c := &Collector{} - require.Equal(t, stats.SystemTagSet(0), c.GetRequiredSystemTags()) -}