diff --git a/cloudapi/util.go b/cloudapi/util.go
index 9afbc97c06b..70d16203b58 100644
--- a/cloudapi/util.go
+++ b/cloudapi/util.go
@@ -22,10 +22,7 @@ package cloudapi
 
 import "fmt"
 
+// URLForResults returns the cloud URL with the test run results.
 func URLForResults(refID string, config Config) string {
-	path := "runs"
-	if config.Token.String == "" {
-		path = "anonymous"
-	}
-	return fmt.Sprintf("%s/%s/%s", config.WebAppURL.String, path, refID)
+	return fmt.Sprintf("%s/runs/%s", config.WebAppURL.String, refID)
 }
diff --git a/cmd/outputs.go b/cmd/outputs.go
index 02fb2ed9de6..25950fc6b56 100644
--- a/cmd/outputs.go
+++ b/cmd/outputs.go
@@ -29,14 +29,12 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/afero"
 
-	"github.com/loadimpact/k6/cloudapi"
 	"github.com/loadimpact/k6/lib"
-	"github.com/loadimpact/k6/lib/consts"
 	"github.com/loadimpact/k6/loader"
 	"github.com/loadimpact/k6/output"
+	"github.com/loadimpact/k6/output/cloud"
 	"github.com/loadimpact/k6/output/json"
 	"github.com/loadimpact/k6/stats"
-	"github.com/loadimpact/k6/stats/cloud"
 	"github.com/loadimpact/k6/stats/csv"
 	"github.com/loadimpact/k6/stats/datadog"
 	"github.com/loadimpact/k6/stats/influxdb"
@@ -49,7 +47,8 @@ import (
 func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) {
 	// Start with the built-in outputs
 	result := map[string]func(output.Params) (output.Output, error){
-		"json": json.New,
+		"json":  json.New,
+		"cloud": cloud.New,
 
 		// TODO: remove all of these
 		"influxdb": func(params output.Params) (output.Output, error) {
@@ -61,20 +60,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
 			if err != nil {
 				return nil, err
 			}
-			return newCollectorAdapter(params, influxc)
-		},
-		"cloud": func(params output.Params) (output.Output, error) {
-			conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
-			if err != nil {
-				return nil, err
-			}
-			cloudc, err := cloud.New(
-				params.Logger, conf, params.ScriptPath, params.ScriptOptions, params.ExecutionPlan, consts.Version,
-			)
-			if err != nil {
-				return nil, err
-			}
-			return newCollectorAdapter(params, cloudc)
+			return newCollectorAdapter(params, influxc), nil
 		},
 		"kafka": func(params output.Params) (output.Output, error) {
 			conf, err := kafka.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
@@ -85,7 +71,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
 			if err != nil {
 				return nil, err
 			}
-			return newCollectorAdapter(params, kafkac)
+			return newCollectorAdapter(params, kafkac), nil
 		},
 		"statsd": func(params output.Params) (output.Output, error) {
 			conf, err := statsd.GetConsolidatedConfig(params.JSONConfig, params.Environment)
@@ -96,7 +82,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
 			if err != nil {
 				return nil, err
 			}
-			return newCollectorAdapter(params, statsdc)
+			return newCollectorAdapter(params, statsdc), nil
 		},
 		"datadog": func(params output.Params) (output.Output, error) {
 			conf, err := datadog.GetConsolidatedConfig(params.JSONConfig, params.Environment)
@@ -107,7 +93,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
 			if err != nil {
 				return nil, err
 			}
-			return newCollectorAdapter(params, datadogc)
+			return newCollectorAdapter(params, datadogc), nil
 		},
 		"csv": func(params output.Params) (output.Output, error) {
 			conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
@@ -118,7 +104,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
 			if err != nil {
 				return nil, err
 			}
-			return newCollectorAdapter(params, csvc)
+			return newCollectorAdapter(params, csvc), nil
 		},
 	}
 
@@ -202,27 +188,12 @@ func parseOutputArgument(s string) (t, arg string) {
 // TODO: remove this after we transition every collector to the output interface
-func newCollectorAdapter(params output.Params, collector lib.Collector) (output.Output, error) {
-	// Check if all required tags are present
-	missingRequiredTags := []string{}
-	requiredTags := collector.GetRequiredSystemTags()
-	for _, tag := range stats.SystemTagSetValues() {
-		if requiredTags.Has(tag) && !params.ScriptOptions.SystemTags.Has(tag) {
-			missingRequiredTags = append(missingRequiredTags, tag.String())
-		}
-	}
-	if len(missingRequiredTags) > 0 {
-		return nil, fmt.Errorf(
-			"the specified output '%s' needs the following system tags enabled: %s",
-			params.OutputType, strings.Join(missingRequiredTags, ", "),
-		)
-	}
-
+func newCollectorAdapter(params output.Params, collector lib.Collector) output.Output {
 	return &collectorAdapter{
 		outputType: params.OutputType,
 		collector:  collector,
 		stopCh:     make(chan struct{}),
-	}, nil
+	}
 }
 
 // collectorAdapter is a _temporary_ fix until we move all of the old
@@ -259,15 +230,9 @@ func (ca *collectorAdapter) AddMetricSamples(samples []stats.SampleContainer) {
 	ca.collector.Collect(samples)
 }
 
-func (ca *collectorAdapter) SetRunStatus(latestStatus lib.RunStatus) {
-	ca.collector.SetRunStatus(latestStatus)
-}
-
 // Stop implements the new output interface.
 func (ca *collectorAdapter) Stop() error {
 	ca.runCtxCancel()
 	<-ca.stopCh
 	return nil
 }
-
-var _ output.WithRunStatusUpdates = &collectorAdapter{}
diff --git a/lib/collector.go b/lib/collector.go
index 408f517002e..fbcf8dafb0d 100644
--- a/lib/collector.go
+++ b/lib/collector.go
@@ -67,10 +67,4 @@ type Collector interface {
 
 	// Optionally return a link that is shown to the user.
 	Link() string
-
-	// Return the required system sample tags for the specific collector
-	GetRequiredSystemTags() stats.SystemTagSet
-
-	// Set run status
-	SetRunStatus(status RunStatus)
 }
diff --git a/stats/cloud/bench_test.go b/output/cloud/bench_test.go
similarity index 88%
rename from stats/cloud/bench_test.go
rename to output/cloud/bench_test.go
index 27a674feb46..5e1c23c0ec5 100644
--- a/stats/cloud/bench_test.go
+++ b/output/cloud/bench_test.go
@@ -23,6 +23,7 @@ package cloud
 import (
 	"bytes"
 	"compress/gzip"
+	json "encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -34,28 +35,26 @@ import (
 	"github.com/mailru/easyjson"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"gopkg.in/guregu/null.v3"
 
-	"github.com/loadimpact/k6/cloudapi"
 	"github.com/loadimpact/k6/lib"
 	"github.com/loadimpact/k6/lib/netext/httpext"
 	"github.com/loadimpact/k6/lib/testutils"
 	"github.com/loadimpact/k6/lib/testutils/httpmultibin"
 	"github.com/loadimpact/k6/lib/types"
+	"github.com/loadimpact/k6/output"
 	"github.com/loadimpact/k6/stats"
 )
 
 func BenchmarkAggregateHTTP(b *testing.B) {
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		NoCompress:              null.BoolFrom(true),
-		AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
-		AggregationPeriod:       types.NullDurationFrom(time.Millisecond * 200),
+	collector, err := newOutput(output.Params{
+		Logger:     testutils.NewLogger(b),
+		JSONConfig: json.RawMessage(`{"noCompress": true, "aggregationCalcInterval": "200ms","aggregationPeriod": "200ms"}`),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: "/script.js"},
 	})
-	collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
 	require.NoError(b, err)
 	now := time.Now()
 	collector.referenceID = "something"
@@ -79,7 +78,7 @@ func BenchmarkAggregateHTTP(b *testing.B) {
 			tags := generateTags(i, tagCount, map[string]string{"status": status})
 			container[i-1] = generateHTTPExtTrail(now, time.Duration(i), tags)
 		}
-		collector.Collect(container)
+		collector.AddMetricSamples(container)
 		b.StartTimer()
 		collector.aggregateHTTPTrails(time.Millisecond * 200)
 		collector.bufferSamples = nil
@@ -289,9 +288,6 @@ func generateHTTPExtTrail(now time.Time, i time.Duration, tags *stats.SampleTags
 }
 
 func BenchmarkHTTPPush(b *testing.B) {
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
 	tb := httpmultibin.NewHTTPMultiBin(b)
 	tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		_, err := fmt.Fprint(w, `{
@@ -307,12 +303,20 @@ func BenchmarkHTTPPush(b *testing.B) {
 		},
 	)
 
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		Host:                    null.StringFrom(tb.ServerHTTP.URL),
-		AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
-		AggregationPeriod:       types.NullDurationFrom(time.Millisecond * 200),
+	collector, err := newOutput(output.Params{
+		Logger: testutils.NewLogger(b),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{
+			"host": "%s",
+			"noCompress": true,
+			"aggregationCalcInterval": "200ms",
+			"aggregationPeriod": "200ms"
+		}`, tb.ServerHTTP.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: "/script.js"},
 	})
-	collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
 	require.NoError(b, err)
 	collector.referenceID = "fake"
diff --git a/stats/cloud/cloud_easyjson.go b/output/cloud/cloud_easyjson.go
similarity index 100%
rename from stats/cloud/cloud_easyjson.go
rename to output/cloud/cloud_easyjson.go
diff --git a/stats/cloud/data.go b/output/cloud/data.go
similarity index 100%
rename from stats/cloud/data.go
rename to output/cloud/data.go
diff --git a/stats/cloud/data_test.go b/output/cloud/data_test.go
similarity index 100%
rename from stats/cloud/data_test.go
rename to output/cloud/data_test.go
diff --git a/stats/cloud/collector.go b/output/cloud/output.go
similarity index 76%
rename from stats/cloud/collector.go
rename to output/cloud/output.go
index a7c752e1ea6..ba5f0020328 100644
--- a/stats/cloud/collector.go
+++ b/output/cloud/output.go
@@ -30,6 +30,7 @@ import (
 	"net/http"
 	"path/filepath"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
@@ -39,8 +40,10 @@ import (
 	"gopkg.in/guregu/null.v3"
 
 	"github.com/loadimpact/k6/cloudapi"
+	"github.com/loadimpact/k6/output"
 
 	"github.com/loadimpact/k6/lib"
+	"github.com/loadimpact/k6/lib/consts"
 	"github.com/loadimpact/k6/lib/metrics"
 	"github.com/loadimpact/k6/lib/netext"
 	"github.com/loadimpact/k6/lib/netext/httpext"
@@ -61,7 +64,6 @@ type Collector struct {
 	client         *cloudapi.Client
 	pushBufferPool sync.Pool
 
-	anonymous bool
 	runStatus lib.RunStatus
 
 	bufferMutex sync.Mutex
@@ -81,27 +83,48 @@ type Collector struct {
 	// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated
 	aggrBuckets map[int64]map[[3]string]aggregationBucket
 
-	stopSendingMetricsCh chan struct{}
+	stopSendingMetrics chan struct{}
+	stopAggregation    chan struct{}
+	aggregationDone    *sync.WaitGroup
+	stopOutput         chan struct{}
+	outputDone         *sync.WaitGroup
 }
 
-// Verify that Collector implements lib.Collector
-var _ lib.Collector = &Collector{}
+// Verify that Output implements the wanted interfaces
+var _ interface {
+	output.WithRunStatusUpdates
+	output.WithThresholds
+} = &Collector{}
 
-// New creates a new cloud collector
-func New(
-	logger logrus.FieldLogger,
-	conf cloudapi.Config, scriptURL fmt.Stringer, opts lib.Options, executionPlan []lib.ExecutionStep, version string,
-) (*Collector, error) {
-	if err := cloudapi.MergeFromExternal(opts.External, &conf); err != nil {
+// New creates a new cloud collector.
+func New(params output.Params) (output.Output, error) {
+	return newOutput(params)
+}
+
+// newOutput creates a new cloud output.
+func newOutput(params output.Params) (*Collector, error) {
+	conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := validateRequiredSystemTags(params.ScriptOptions.SystemTags); err != nil {
+		return nil, err
+	}
+
+	logger := params.Logger.WithFields(logrus.Fields{"output": "cloud"})
+
+	if err := cloudapi.MergeFromExternal(params.ScriptOptions.External, &conf); err != nil {
 		return nil, err
 	}
 
-	if conf.AggregationPeriod.Duration > 0 && (opts.SystemTags.Has(stats.TagVU) || opts.SystemTags.Has(stats.TagIter)) {
+	if conf.AggregationPeriod.Duration > 0 &&
+		(params.ScriptOptions.SystemTags.Has(stats.TagVU) || params.ScriptOptions.SystemTags.Has(stats.TagIter)) {
 		return nil, errors.New("aggregation cannot be enabled if the 'vu' or 'iter' system tag is also enabled")
 	}
 
 	if !conf.Name.Valid || conf.Name.String == "" {
-		scriptPath := scriptURL.String()
+		scriptPath := params.ScriptPath.String()
 		if scriptPath == "" {
 			// Script from stdin without a name, likely from stdin
 			return nil, errors.New("script name not set, please specify K6_CLOUD_NAME or options.ext.loadimpact.name")
@@ -113,12 +136,7 @@ func New(
 		conf.Name = null.StringFrom(TestName)
 	}
 
-	thresholds := make(map[string][]*stats.Threshold)
-	for name, t := range opts.Thresholds {
-		thresholds[name] = append(thresholds[name], t.Thresholds...)
-	}
-
-	duration, testEnds := lib.GetEndOffset(executionPlan)
+	duration, testEnds := lib.GetEndOffset(params.ExecutionPlan)
 	if !testEnds {
 		return nil, errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud")
 	}
@@ -139,30 +157,50 @@ func New(
 	}
 
 	return &Collector{
-		config:               conf,
-		thresholds:           thresholds,
-		client:               cloudapi.NewClient(logger, conf.Token.String, conf.Host.String, version),
-		anonymous:            !conf.Token.Valid,
-		executionPlan:        executionPlan,
-		duration:             int64(duration / time.Second),
-		opts:                 opts,
-		aggrBuckets:          map[int64]map[[3]string]aggregationBucket{},
-		stopSendingMetricsCh: make(chan struct{}),
-		logger:               logger,
+		config:        conf,
+		client:        cloudapi.NewClient(logger, conf.Token.String, conf.Host.String, consts.Version),
+		executionPlan: params.ExecutionPlan,
+		duration:      int64(duration / time.Second),
+		opts:          params.ScriptOptions,
+		aggrBuckets:   map[int64]map[[3]string]aggregationBucket{},
+		logger:        logger,
 		pushBufferPool: sync.Pool{
 			New: func() interface{} {
 				return &bytes.Buffer{}
 			},
 		},
+		stopSendingMetrics: make(chan struct{}),
+		stopAggregation:    make(chan struct{}),
+		aggregationDone:    &sync.WaitGroup{},
+		stopOutput:         make(chan struct{}),
+		outputDone:         &sync.WaitGroup{},
 	}, nil
 }
 
-// Init is called between the collector's creation and the call to Run().
-// You should do any lengthy setup here rather than in New.
-func (c *Collector) Init() error {
+// validateRequiredSystemTags checks if all required tags are present.
+func validateRequiredSystemTags(scriptTags *stats.SystemTagSet) error {
+	missingRequiredTags := []string{}
+	requiredTags := stats.TagName | stats.TagMethod | stats.TagStatus | stats.TagError | stats.TagCheck | stats.TagGroup
+	for _, tag := range stats.SystemTagSetValues() {
+		if requiredTags.Has(tag) && !scriptTags.Has(tag) {
+			missingRequiredTags = append(missingRequiredTags, tag.String())
+		}
+	}
+	if len(missingRequiredTags) > 0 {
+		return fmt.Errorf(
+			"the cloud output needs the following system tags enabled: %s",
+			strings.Join(missingRequiredTags, ", "),
+		)
+	}
+	return nil
+}
+
+// Start calls the k6 Cloud API to initialize the test run, and then starts
+// the goroutines that listen for metric samples and send them to the cloud.
+func (c *Collector) Start() error {
 	if c.config.PushRefID.Valid {
 		c.referenceID = c.config.PushRefID.String
-		c.logger.WithField("referenceId", c.referenceID).Debug("Cloud: directly pushing metrics without init")
+		c.logger.WithField("referenceId", c.referenceID).Debug("directly pushing metrics without init")
 		return nil
 	}
 
@@ -192,76 +230,105 @@ func (c *Collector) Init() error {
 	if response.ConfigOverride != nil {
 		c.logger.WithFields(logrus.Fields{
 			"override": response.ConfigOverride,
-		}).Debug("Cloud: overriding config options")
+		}).Debug("overriding config options")
 		c.config = c.config.Apply(*response.ConfigOverride)
 	}
 
+	c.startBackgroundProcesses()
+
 	c.logger.WithFields(logrus.Fields{
 		"name":        c.config.Name,
 		"projectId":   c.config.ProjectID,
 		"duration":    c.duration,
 		"referenceId": c.referenceID,
-	}).Debug("Cloud: Initialized")
+	}).Debug("Started!")
 	return nil
 }
 
-// Link return a link that is shown to the user.
-func (c *Collector) Link() string {
-	return cloudapi.URLForResults(c.referenceID, c.config)
-}
-
-// Run is called in a goroutine and starts the collector. Should commit samples to the backend
-// at regular intervals and when the context is terminated.
-func (c *Collector) Run(ctx context.Context) {
-	wg := sync.WaitGroup{}
-	quit := ctx.Done()
+func (c *Collector) startBackgroundProcesses() {
 	aggregationPeriod := time.Duration(c.config.AggregationPeriod.Duration)
 	// If enabled, start periodically aggregating the collected HTTP trails
 	if aggregationPeriod > 0 {
-		wg.Add(1)
-		aggregationTicker := time.NewTicker(aggregationPeriod)
-		aggregationWaitPeriod := time.Duration(c.config.AggregationWaitPeriod.Duration)
-		signalQuit := make(chan struct{})
-		quit = signalQuit
-
+		c.aggregationDone.Add(1)
 		go func() {
-			defer wg.Done()
+			defer c.aggregationDone.Done()
+			aggregationWaitPeriod := time.Duration(c.config.AggregationWaitPeriod.Duration)
+			aggregationTicker := time.NewTicker(aggregationPeriod)
+			defer aggregationTicker.Stop()
+
 			for {
 				select {
-				case <-c.stopSendingMetricsCh:
+				case <-c.stopSendingMetrics:
 					return
 				case <-aggregationTicker.C:
 					c.aggregateHTTPTrails(aggregationWaitPeriod)
-				case <-ctx.Done():
+				case <-c.stopAggregation:
 					c.aggregateHTTPTrails(0)
 					c.flushHTTPTrails()
-					close(signalQuit)
 					return
 				}
 			}
 		}()
 	}
 
-	defer func() {
-		wg.Wait()
-		c.testFinished()
+	c.outputDone.Add(1)
+	go func() {
+		defer c.outputDone.Done()
+		pushTicker := time.NewTicker(time.Duration(c.config.MetricPushInterval.Duration))
+		defer pushTicker.Stop()
+		for {
+			select {
+			case <-c.stopSendingMetrics:
+				return
+			default:
+			}
+			select {
+			case <-c.stopOutput:
+				c.pushMetrics()
+				return
+			case <-pushTicker.C:
+				c.pushMetrics()
+			}
+		}
 	}()
+}
 
-	pushTicker := time.NewTicker(time.Duration(c.config.MetricPushInterval.Duration))
-	for {
-		select {
-		case <-c.stopSendingMetricsCh:
-			return
-		default:
-		}
-		select {
-		case <-quit:
-			c.pushMetrics()
-			return
-		case <-pushTicker.C:
-			c.pushMetrics()
-		}
+// Stop gracefully stops all metric emission from the output and, when all metric
+// samples are emitted, calls the cloud API to finish the test run.
+func (c *Collector) Stop() error {
+	c.logger.Debug("Stopping the cloud output...")
+	close(c.stopAggregation)
+	c.aggregationDone.Wait() // could be a no-op, if we have never started the aggregation
+	c.logger.Debug("Aggregation stopped, stopping metric emission...")
+	close(c.stopOutput)
+	c.outputDone.Wait()
+	c.logger.Debug("Metric emission stopped, calling cloud API...")
+	err := c.testFinished()
+	if err != nil {
+		c.logger.WithFields(logrus.Fields{"error": err}).Warn("Failed to send test finished to the cloud")
+	} else {
+		c.logger.Debug("Cloud output successfully stopped!")
+	}
+	return err
+}
+
+// Description returns the URL with the test run results.
+func (c *Collector) Description() string {
+	return fmt.Sprintf("cloud (%s)", cloudapi.URLForResults(c.referenceID, c.config))
+}
+
+// SetRunStatus receives the latest run status from the Engine.
+func (c *Collector) SetRunStatus(status lib.RunStatus) {
+	c.runStatus = status
+}
+
+// SetThresholds receives the thresholds before the output is Start()-ed.
+func (c *Collector) SetThresholds(scriptThresholds map[string]stats.Thresholds) {
+	thresholds := make(map[string][]*stats.Threshold)
+	for name, t := range scriptThresholds {
+		thresholds[name] = append(thresholds[name], t.Thresholds...)
 	}
+	c.thresholds = thresholds
 }
 
 func useCloudTags(source *httpext.Trail) *httpext.Trail {
@@ -282,11 +349,12 @@
 	return dest
 }
 
-// Collect receives a set of samples. This method is never called concurrently, and only while
-// the context for Run() is valid, but should defer as much work as possible to Run().
-func (c *Collector) Collect(sampleContainers []stats.SampleContainer) {
+// AddMetricSamples receives a set of metric samples. This method is never
+// called concurrently, so it defers as much of the work as possible to the
+// asynchronous goroutines initialized in Start().
+func (c *Collector) AddMetricSamples(sampleContainers []stats.SampleContainer) {
 	select {
-	case <-c.stopSendingMetricsCh:
+	case <-c.stopSendingMetrics:
 		return
 	default:
 	}
@@ -601,7 +669,7 @@ func (c *Collector) pushMetrics() {
 		if err != nil {
 			if c.shouldStopSendingMetrics(err) {
 				c.logger.WithError(err).Warn("Stopped sending metrics to cloud due to an error")
-				close(c.stopSendingMetricsCh)
+				close(c.stopSendingMetrics)
 				break
 			}
 			c.logger.WithError(err).Warn("Failed to send metrics to cloud")
@@ -613,9 +681,9 @@ func (c *Collector) pushMetrics() {
 	}).Debug("Pushing metrics to cloud finished")
 }
 
-func (c *Collector) testFinished() {
+func (c *Collector) testFinished() error {
 	if c.referenceID == "" || c.config.PushRefID.Valid {
-		return
+		return nil
 	}
 
 	testTainted := false
@@ -640,22 +708,7 @@ func (c *Collector) testFinished() {
 		runStatus = c.runStatus
 	}
 
-	err := c.client.TestFinished(c.referenceID, thresholdResults, testTainted, runStatus)
-	if err != nil {
-		c.logger.WithFields(logrus.Fields{
-			"error": err,
-		}).Warn("Failed to send test finished to cloud")
-	}
-}
-
-// GetRequiredSystemTags returns which sample tags are needed by this collector
-func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
-	return stats.TagName | stats.TagMethod | stats.TagStatus | stats.TagError | stats.TagCheck | stats.TagGroup
-}
-
-// SetRunStatus Set run status
-func (c *Collector) SetRunStatus(status lib.RunStatus) {
-	c.runStatus = status
+	return c.client.TestFinished(c.referenceID, thresholdResults, testTainted, runStatus)
 }
 
 const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate
diff --git a/stats/cloud/collector_test.go b/output/cloud/output_test.go
similarity index 81%
rename from stats/cloud/collector_test.go
rename to output/cloud/output_test.go
index c8b65b936c0..9bfc50b95a7 100644
--- a/stats/cloud/collector_test.go
+++ b/output/cloud/output_test.go
@@ -17,13 +17,11 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  *
  */
-
 package cloud
 
 import (
 	"bytes"
 	"compress/gzip"
-	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -40,7 +38,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"gopkg.in/guregu/null.v3"
 
 	"github.com/loadimpact/k6/cloudapi"
 	"github.com/loadimpact/k6/lib"
@@ -50,6 +47,7 @@ import (
 	"github.com/loadimpact/k6/lib/testutils"
 	"github.com/loadimpact/k6/lib/testutils/httpmultibin"
 	"github.com/loadimpact/k6/lib/types"
+	"github.com/loadimpact/k6/output"
 	"github.com/loadimpact/k6/stats"
 )
 
@@ -176,15 +174,15 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
 	}))
 	defer tb.Cleanup()
 
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		Host:       null.StringFrom(tb.ServerHTTP.URL),
-		NoCompress: null.BoolFrom(true),
+	collector, err := newOutput(output.Params{
+		Logger:     testutils.NewLogger(t),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: "/script.js"},
 	})
-	collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
 	require.NoError(t, err)
 
 	assert.True(t, collector.config.Host.Valid)
@@ -195,7 +193,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
 	assert.False(t, collector.config.AggregationPeriod.Valid)
 	assert.False(t, collector.config.AggregationWaitPeriod.Valid)
 
-	require.NoError(t, collector.Init())
+	require.NoError(t, collector.Start())
 	assert.Equal(t, "123", collector.referenceID)
 	assert.True(t, collector.config.MetricPushInterval.Valid)
 	assert.Equal(t, types.Duration(10*time.Millisecond), collector.config.MetricPushInterval.Duration)
@@ -218,15 +216,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
 		rw.WriteHeader(http.StatusOK) // silence a test warning
 	})
 
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		collector.Run(ctx)
-		wg.Done()
-	}()
-
-	collector.Collect([]stats.SampleContainer{stats.Sample{
+	collector.AddMetricSamples([]stats.SampleContainer{stats.Sample{
 		Time:   now,
 		Metric: metrics.VUs,
 		Tags:   tags,
@@ -256,7 +246,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
 		Duration: 1500 * time.Millisecond,
 		Tags:     tags,
 	}
-	collector.Collect([]stats.SampleContainer{&simpleTrail})
+	collector.AddMetricSamples([]stats.SampleContainer{&simpleTrail})
 	expSamples <- []Sample{*NewSampleFromTrail(&simpleTrail)}
 
 	smallSkew := 0.02
@@ -281,7 +271,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
 	outlierTrail := skewTrail(simpleTrail, 2.0+smallSkew, 3.0+smallSkew)
 	trails = append(trails, &outlierTrail)
-	collector.Collect(trails)
+	collector.AddMetricSamples(trails)
 	expSamples <- []Sample{
 		*NewSampleFromTrail(&outlierTrail),
 		{
@@ -306,8 +296,7 @@ func runCloudCollectorTestCase(t *testing.T, minSamples int) {
 		},
 	}
 
-	cancel()
-	wg.Wait()
+	require.NoError(t, collector.Stop())
 }
 
 func TestCloudCollectorMaxPerPacket(t *testing.T) {
@@ -327,17 +316,19 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
 		}`, maxMetricSamplesPerPackage)
 		require.NoError(t, err)
 	}))
+	tb.Mux.HandleFunc("/v1/tests/12", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) })
 	defer tb.Cleanup()
 
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		Host:       null.StringFrom(tb.ServerHTTP.URL),
-		NoCompress: null.BoolFrom(true),
+	collector, err := newOutput(output.Params{
+		Logger:     testutils.NewLogger(t),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: "/script.js"},
 	})
-	collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
+	require.NoError(t, err)
 	require.NoError(t, err)
 	now := time.Now()
 	tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
@@ -358,16 +349,9 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
 		}
 	})
 
-	require.NoError(t, collector.Init())
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		collector.Run(ctx)
-		wg.Done()
-	}()
+	require.NoError(t, collector.Start())
 
-	collector.Collect([]stats.SampleContainer{stats.Sample{
+	collector.AddMetricSamples([]stats.SampleContainer{stats.Sample{
 		Time:   now,
 		Metric: metrics.VUs,
 		Tags:   stats.NewSampleTags(tags.CloneTags()),
@@ -390,11 +374,10 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
 				Tags:   stats.NewSampleTags(tags.CloneTags()),
 			})
 		}
-		collector.Collect(container)
+		collector.AddMetricSamples(container)
 	}
 
-	cancel()
-	wg.Wait()
+	require.NoError(t, collector.Stop())
 	require.True(t, gotTheLimit)
 }
 
@@ -421,19 +404,25 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) {
 		}`)
 		require.NoError(t, err)
 	}))
+	tb.Mux.HandleFunc("/v1/tests/12", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) })
 	defer tb.Cleanup()
 
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		Host:                       null.StringFrom(tb.ServerHTTP.URL),
-		NoCompress:                 null.BoolFrom(true),
-		MaxMetricSamplesPerPackage: null.IntFrom(50),
-		Name:                       null.StringFrom("my-custom-name"),
+	collector, err := newOutput(output.Params{
+		Logger: testutils.NewLogger(t),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{
+			"host": "%s", "noCompress": true,
+			"maxMetricSamplesPerPackage": 50,
+			"name": "something-that-should-be-overwritten"
+		}`, tb.ServerHTTP.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+			External: map[string]json.RawMessage{
+				"loadimpact": json.RawMessage(`{"name": "my-custom-name"}`),
+			},
+		},
+		ScriptPath: &url.URL{Path: "/script.js"},
 	})
-	collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: ""}, options, []lib.ExecutionStep{}, "1.0")
 	require.NoError(t, err)
 	now := time.Now()
 	tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
@@ -464,16 +453,9 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) {
 		assert.NoError(t, json.Unmarshal(body, &receivedSamples))
 	})
 
-	require.NoError(t, collector.Init())
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		collector.Run(ctx)
-		wg.Done()
-	}()
+	require.NoError(t, collector.Start())
 
-	collector.Collect([]stats.SampleContainer{stats.Sample{
+	collector.AddMetricSamples([]stats.SampleContainer{stats.Sample{
 		Time:   now,
 		Metric: metrics.VUs,
 		Tags:   stats.NewSampleTags(tags.CloneTags()),
@@ -496,14 +478,14 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) {
 				Tags:   stats.NewSampleTags(tags.CloneTags()),
 			})
 		}
-		collector.Collect(container)
+		collector.AddMetricSamples(container)
 	}
 
-	cancel()
-	wg.Wait()
+	require.NoError(t, collector.Stop())
+	require.Equal(t, lib.RunStatusQueued, collector.runStatus)
 
 	select {
-	case <-collector.stopSendingMetricsCh:
+	case <-collector.stopSendingMetrics:
 		// all is fine
 	default:
 		t.Fatal("sending metrics wasn't stopped")
@@ -512,7 +494,7 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) {
 	nBufferSamples := len(collector.bufferSamples)
 	nBufferHTTPTrails := len(collector.bufferHTTPTrails)
-	collector.Collect([]stats.SampleContainer{stats.Sample{
+	collector.AddMetricSamples([]stats.SampleContainer{stats.Sample{
 		Time:   now,
 		Metric: metrics.VUs,
 		Tags:   stats.NewSampleTags(tags.CloneTags()),
@@ -525,15 +507,14 @@ func TestCloudCollectorStopSendingMetric(t *testing.T) {
 
 func TestCloudCollectorRequireScriptName(t *testing.T) {
 	t.Parallel()
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		NoCompress:                 null.BoolFrom(true),
-		MaxMetricSamplesPerPackage: null.IntFrom(50),
+	_, err := newOutput(output.Params{
+		Logger: testutils.NewLogger(t),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: ""},
 	})
-	_, err := New(testutils.NewLogger(t), config, &url.URL{Path: ""}, options, []lib.ExecutionStep{}, "1.0")
 	require.Error(t, err)
 	assert.Contains(t, err.Error(), "script name not set")
 }
@@ -553,17 +534,21 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) {
 		}`)
 		require.NoError(t, err)
 	}))
+	tb.Mux.HandleFunc("/v1/tests/123", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) })
 	defer tb.Cleanup()
 
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		Host:       null.StringFrom(tb.ServerHTTP.URL),
-		NoCompress: null.BoolFrom(true),
+	collector, err := newOutput(output.Params{
+		Logger: testutils.NewLogger(t),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{
+			"host": "%s", "noCompress": true,
+			"maxMetricSamplesPerPackage": 50
+		}`, tb.ServerHTTP.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: "/script.js"},
 	})
-	collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
 	require.NoError(t, err)
 
 	assert.True(t, collector.config.Host.Valid)
@@ -574,7 +559,7 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) {
 	assert.False(t, collector.config.AggregationPeriod.Valid)
 	assert.False(t, collector.config.AggregationWaitPeriod.Valid)
 
-	require.NoError(t, collector.Init())
+	require.NoError(t, collector.Start())
 	assert.Equal(t, "123", collector.referenceID)
 	assert.True(t, collector.config.MetricPushInterval.Valid)
 	assert.Equal(t, types.Duration(10*time.Millisecond), collector.config.MetricPushInterval.Duration)
@@ -587,16 +572,7 @@ func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) {
 	defer close(expSamples)
 	tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), getSampleChecker(t, expSamples))
 
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		collector.Run(ctx)
-		wg.Done()
-	}()
-
-	cancel()
-	wg.Wait()
+	require.NoError(t, collector.Stop())
 	require.Equal(t, lib.RunStatusQueued, collector.runStatus)
 }
 
@@ -614,17 +590,21 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) {
 		_, err = fmt.Fprintf(resp, `{"reference_id": "123"}`)
 		require.NoError(t, err)
 	}))
+	tb.Mux.HandleFunc("/v1/tests/123", func(rw http.ResponseWriter, _ *http.Request) { rw.WriteHeader(http.StatusOK) })
 	defer tb.Cleanup()
 
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		Host:       null.StringFrom(tb.ServerHTTP.URL),
-		NoCompress: null.BoolFrom(true),
+	collector, err := newOutput(output.Params{
+		Logger: testutils.NewLogger(t),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{
+			"host": "%s", "noCompress": true,
+			"maxMetricSamplesPerPackage": 50
+		}`, tb.ServerHTTP.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: "path/to/script.js"},
 	})
-	collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "path/to/script.js"}, options, []lib.ExecutionStep{}, "1.0")
 	require.NoError(t, err)
 
 	gotIterations := false
@@ -656,14 +636,7 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) {
 		m.Unlock()
 	})
 
-	require.NoError(t, collector.Init())
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		collector.Run(ctx)
-		wg.Done()
-	}()
+	require.NoError(t, collector.Start())
 
 	now := time.Now()
 	simpleNetTrail := netext.NetTrail{
@@ -691,10 +664,8 @@ func TestCloudCollectorRecvIterLIAllIterations(t *testing.T) {
 		},
 	}
 
-	collector.Collect([]stats.SampleContainer{&simpleNetTrail})
-
-	cancel()
-	wg.Wait()
+	collector.AddMetricSamples([]stats.SampleContainer{&simpleNetTrail})
+	require.NoError(t, collector.Stop())
 	require.True(t, gotIterations)
 }
 
@@ -734,9 +705,14 @@ func TestNewName(t *testing.T) {
 		testCase := testCase
 		t.Run(testCase.url.String(), func(t *testing.T) {
-			collector, err := New(testutils.NewLogger(t), cloudapi.NewConfig(), testCase.url, lib.Options{
-				Duration: types.NullDurationFrom(1 * time.Second),
-			}, []lib.ExecutionStep{}, "1.0")
+			collector, err := newOutput(output.Params{
+				Logger: testutils.NewLogger(t),
+				ScriptOptions: lib.Options{
+					Duration:   types.NullDurationFrom(1 * time.Second),
+					SystemTags: &stats.DefaultSystemTagSet,
+				},
+				ScriptPath: testCase.url,
+			})
 			require.NoError(t, err)
 			require.Equal(t, collector.config.Name.String, testCase.expected)
 		})
@@ -767,14 +743,15 @@ func TestPublishMetric(t *testing.T) {
 	}))
 	defer server.Close()
 
-	options := lib.Options{
-		Duration: types.NullDurationFrom(1 * time.Second),
-	}
-	config := cloudapi.NewConfig().Apply(cloudapi.Config{
-		Host:       null.StringFrom(server.URL),
-		NoCompress: null.BoolFrom(true),
+	collector, err := newOutput(output.Params{
+		Logger:     testutils.NewLogger(t),
+		JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, server.URL)),
+		ScriptOptions: lib.Options{
+			Duration:   types.NullDurationFrom(1 * time.Second),
+			SystemTags: &stats.DefaultSystemTagSet,
+		},
+		ScriptPath: &url.URL{Path: "script.js"},
 	})
-	collector, err := New(testutils.NewLogger(t), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
 	require.NoError(t, err)
 
 	samples := []*Sample{
diff --git a/stats/csv/collector.go b/stats/csv/collector.go
index 31431532d7d..7506e9fd12b 100644
--- a/stats/csv/collector.go
+++ b/stats/csv/collector.go
@@ -130,9 +130,6 @@ func (c *Collector) Init() error {
 	return nil
 }
 
-// SetRunStatus does nothing
-func (c *Collector) SetRunStatus(status lib.RunStatus) {}
-
 // Run just blocks until the context is done
 func (c *Collector) Run(ctx context.Context) {
 	ticker := time.NewTicker(c.saveInterval)
@@ -245,8 +242,3 @@ func IsStringInSlice(slice []string, str string) bool {
 	}
 	return true
 }
-
-// GetRequiredSystemTags returns which sample tags are needed by this collector
-func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
-	return stats.SystemTagSet(0) // There are no required tags for this collector
-}
diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go
index 6044b5d3789..f4a1e36f10c 100644
--- a/stats/csv/collector_test.go
+++ b/stats/csv/collector_test.go
@@ -462,18 +462,6 @@ func TestNew(t *testing.T) {
 	}
 }
 
-func TestGetRequiredSystemTags(t *testing.T) {
-	collector, err := New(
-		testutils.NewLogger(t),
-		afero.NewMemMapFs(),
-		stats.TagSet{"tag1": true, "tag2": false, "tag3": true},
-		Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)},
-	)
-	assert.NoError(t, err)
-	assert.NotNil(t, collector)
-	assert.Equal(t, stats.SystemTagSet(0), collector.GetRequiredSystemTags())
-}
-
 func TestLink(t *testing.T) {
 	collector, err := New(
 		testutils.NewLogger(t),
diff --git a/stats/influxdb/collector.go b/stats/influxdb/collector.go
index 13c1ffdbb8e..56f6d7f6a0a 100644
--- a/stats/influxdb/collector.go
+++ b/stats/influxdb/collector.go
@@ -235,11 +235,3 @@ func (c *Collector) Format(samples []stats.Sample) ([]string, error) {
 
 	return metrics, nil
 }
-
-// GetRequiredSystemTags returns which sample tags are needed by this collector
-func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
-	return stats.SystemTagSet(0) // There are no required tags for this collector
-}
-
-// SetRunStatus does nothing in the InfluxDB collector
-func (c *Collector) SetRunStatus(status lib.RunStatus) {}
diff --git a/stats/kafka/collector.go b/stats/kafka/collector.go
index db402cf553a..18ac9c47f01 100644
--- a/stats/kafka/collector.go
+++ b/stats/kafka/collector.go
@@ -29,7 +29,6 @@ import (
 	"github.com/Shopify/sarama"
 	"github.com/sirupsen/logrus"
 
-	"github.com/loadimpact/k6/lib"
 	jsono "github.com/loadimpact/k6/output/json"
 	"github.com/loadimpact/k6/stats"
 	"github.com/loadimpact/k6/stats/influxdb"
@@ -101,14 +100,6 @@ func (c *Collector) Link() string {
 	return ""
 }
 
-// GetRequiredSystemTags returns which sample tags are needed by this collector
-func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
-	return stats.SystemTagSet(0) // There are no required tags for this collector
-}
-
-// SetRunStatus does nothing in the Kafka collector
-func (c *Collector) SetRunStatus(status lib.RunStatus) {}
-
 func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) {
 	var metrics []string
 
diff --git a/stats/statsd/common/collector.go b/stats/statsd/common/collector.go
index 6abde5f1a60..5d307c3d91b 100644
--- a/stats/statsd/common/collector.go
+++ b/stats/statsd/common/collector.go
@@ -110,14 +110,6 @@ func (c *Collector) Run(ctx context.Context) {
 	}
 }
 
-// GetRequiredSystemTags Return the required system sample tags for the specific collector
-func (c *Collector) GetRequiredSystemTags() stats.SystemTagSet {
-	return stats.SystemTagSet(0) // no tags are required
-}
-
-// SetRunStatus does nothing in statsd collector
-func (c *Collector) SetRunStatus(status lib.RunStatus) {}
-
 // Collect metrics
 func (c *Collector) Collect(containers []stats.SampleContainer) {
 	var pointSamples []*Sample
diff --git a/stats/statsd/common/collector_test.go b/stats/statsd/common/collector_test.go
index 53b0423ccdf..78469a67587 100644
--- a/stats/statsd/common/collector_test.go
+++ b/stats/statsd/common/collector_test.go
@@ -28,7 +28,6 @@ import (
 	"github.com/loadimpact/k6/lib/testutils"
 	"github.com/loadimpact/k6/lib/types"
-	"github.com/loadimpact/k6/stats"
 )
 
 type config struct {
@@ -84,8 +83,3 @@ func TestLinkReturnAddress(t *testing.T) {
 	}
 	require.Equal(t, bogusValue, c.Link())
 }
-
-func TestGetRequiredSystemTags(t *testing.T) {
-	c := &Collector{}
-	require.Equal(t, stats.SystemTagSet(0), c.GetRequiredSystemTags())
-}
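
Note for reviewers (not part of the patch): the sketch below shows how a caller is expected to drive the new cloud output after this change, using only names that appear in the diff (`cloud.New`, `output.Params`, `output.WithThresholds`, `Start()`, `AddMetricSamples()`, `Stop()`). The surrounding wiring, such as the `main` function, the empty thresholds map, and the nil sample slice, is illustrative only.

```go
package main

import (
	json "encoding/json"
	"net/url"
	"time"

	"github.com/sirupsen/logrus"

	"github.com/loadimpact/k6/lib"
	"github.com/loadimpact/k6/lib/types"
	"github.com/loadimpact/k6/output"
	"github.com/loadimpact/k6/output/cloud"
	"github.com/loadimpact/k6/stats"
)

func main() {
	// The output now consolidates its JSON/env/CLI config itself,
	// instead of the caller doing that in cmd/outputs.go.
	out, err := cloud.New(output.Params{
		Logger:     logrus.New(),
		JSONConfig: json.RawMessage(`{"noCompress": true}`),
		ScriptOptions: lib.Options{
			Duration:   types.NullDurationFrom(10 * time.Second),
			SystemTags: &stats.DefaultSystemTagSet,
		},
		ScriptPath: &url.URL{Path: "/script.js"},
	})
	if err != nil {
		panic(err)
	}

	// Thresholds are handed over before Start() via the optional
	// output.WithThresholds interface (replaces the old constructor argument).
	if wt, ok := out.(output.WithThresholds); ok {
		wt.SetThresholds(map[string]stats.Thresholds{}) // empty map, purely illustrative
	}

	// Start() replaces the old Init()+Run(ctx) pair; it registers the test run
	// with the cloud API and spawns the background aggregation/push goroutines.
	if err := out.Start(); err != nil {
		panic(err)
	}

	// AddMetricSamples() replaces Collect(); a nil slice is just a placeholder here.
	out.AddMetricSamples(nil)

	// Stop() flushes buffered samples and then marks the test run as finished.
	if err := out.Stop(); err != nil {
		panic(err)
	}
}
```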