From 450fa1e379995b7394fb7df5dd9b39b98d00035a Mon Sep 17 00:00:00 2001
From: Cuong Manh Le
Date: Fri, 30 Aug 2019 14:34:49 +0700
Subject: [PATCH] stats/cloud: stop sending cloud metrics when limit reached

Once the cloud test has been aborted because a limit was reached, we
should stop sending metrics to the cloud.

Close #1074
---
 stats/cloud/collector.go      |  71 ++++++++++----
 stats/cloud/collector_test.go | 171 ++++++++++++++++++++++++++++++++++
 2 files changed, 222 insertions(+), 20 deletions(-)

diff --git a/stats/cloud/collector.go b/stats/cloud/collector.go
index ec2c90e26ed..de0f799e054 100644
--- a/stats/cloud/collector.go
+++ b/stats/cloud/collector.go
@@ -23,6 +23,7 @@ package cloud
 import (
 	"context"
 	"encoding/json"
+	"net/http"
 	"path/filepath"
 	"sync"
 	"time"
@@ -69,6 +70,8 @@ type Collector struct {
 	// checks basically O(1). And even if for some reason there are occasional metrics with past times that
 	// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated
 	aggrBuckets map[int64]aggregationBucket
+
+	stopSendingMetricsCh chan struct{}
 }
 
 // Verify that Collector implements lib.Collector
@@ -136,13 +139,14 @@ func New(conf Config, src *loader.SourceData, opts lib.Options, version string)
 	}
 
 	return &Collector{
-		config:      conf,
-		thresholds:  thresholds,
-		client:      NewClient(conf.Token.String, conf.Host.String, version),
-		anonymous:   !conf.Token.Valid,
-		duration:    duration,
-		opts:        opts,
-		aggrBuckets: map[int64]aggregationBucket{},
+		config:               conf,
+		thresholds:           thresholds,
+		client:               NewClient(conf.Token.String, conf.Host.String, version),
+		anonymous:            !conf.Token.Valid,
+		duration:             duration,
+		opts:                 opts,
+		aggrBuckets:          map[int64]aggregationBucket{},
+		stopSendingMetricsCh: make(chan struct{}),
 	}, nil
 }
 
@@ -196,22 +200,28 @@ func (c *Collector) Run(ctx context.Context) {
 	wg := sync.WaitGroup{}
-
+	quit := ctx.Done()
+	aggregationPeriod := time.Duration(c.config.AggregationPeriod.Duration)
 	// If enabled, start periodically aggregating the collected HTTP trails
-	if c.config.AggregationPeriod.Duration > 0 {
+	if aggregationPeriod > 0 {
 		wg.Add(1)
-		aggregationTicker := time.NewTicker(time.Duration(c.config.AggregationCalcInterval.Duration))
+		aggregationTicker := time.NewTicker(aggregationPeriod)
+		aggregationWaitPeriod := time.Duration(c.config.AggregationWaitPeriod.Duration)
+		signalQuit := make(chan struct{})
+		quit = signalQuit
 		go func() {
+			defer wg.Done()
 			for {
 				select {
+				case <-c.stopSendingMetricsCh:
+					return
 				case <-aggregationTicker.C:
-					c.aggregateHTTPTrails(time.Duration(c.config.AggregationWaitPeriod.Duration))
+					c.aggregateHTTPTrails(aggregationWaitPeriod)
 				case <-ctx.Done():
 					c.aggregateHTTPTrails(0)
 					c.flushHTTPTrails()
-					c.pushMetrics()
-					wg.Done()
+					close(signalQuit)
 					return
 				}
 			}
 		}()
 	}
 
@@ -226,11 +236,16 @@ func (c *Collector) Run(ctx context.Context) {
 	pushTicker := time.NewTicker(time.Duration(c.config.MetricPushInterval.Duration))
 	for {
 		select {
-		case <-pushTicker.C:
-			c.pushMetrics()
-		case <-ctx.Done():
+		case <-c.stopSendingMetricsCh:
+			return
+		default:
+		}
+		select {
+		case <-quit:
 			c.pushMetrics()
 			return
+		case <-pushTicker.C:
+			c.pushMetrics()
 		}
 	}
 }
@@ -441,6 +456,19 @@ func (c *Collector) flushHTTPTrails() {
 	c.aggrBuckets = map[int64]aggregationBucket{}
 	c.bufferSamples = append(c.bufferSamples, newSamples...)
 }
+
+func (c *Collector) shouldStopSendingMetrics(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	if errResp, ok := err.(ErrorResponse); ok && errResp.Response != nil {
+		return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4
+	}
+
+	return false
+}
+
 func (c *Collector) pushMetrics() {
 	c.bufferMutex.Lock()
 	if len(c.bufferSamples) == 0 {
@@ -462,9 +490,12 @@ func (c *Collector) pushMetrics() {
 	}
 		err := c.client.PushMetric(c.referenceID, c.config.NoCompress.Bool, buffer[:size])
 		if err != nil {
-			logrus.WithFields(logrus.Fields{
-				"error": err,
-			}).Warn("Failed to send metrics to cloud")
+			if c.shouldStopSendingMetrics(err) {
+				logrus.WithError(err).Warn("Stopped sending metrics to cloud due to an error")
+				close(c.stopSendingMetricsCh)
+				break
+			}
+			logrus.WithError(err).Warn("Failed to send metrics to cloud")
 		}
 		buffer = buffer[size:]
 	}
@@ -493,7 +524,7 @@ func (c *Collector) testFinished() {
 	}).Debug("Sending test finished")
 
 	runStatus := lib.RunStatusFinished
-	if c.runStatus != 0 {
+	if c.runStatus != lib.RunStatusQueued {
 		runStatus = c.runStatus
 	}
 
diff --git a/stats/cloud/collector_test.go b/stats/cloud/collector_test.go
index 6c6580f7bad..5c4dddb56ae 100644
--- a/stats/cloud/collector_test.go
+++ b/stats/cloud/collector_test.go
@@ -355,3 +355,174 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
 	wg.Wait()
 	require.True(t, gotTheLimit)
 }
+
+func TestCloudCollectorStopSendingMetric(t *testing.T) {
+	t.Parallel()
+	tb := httpmultibin.NewHTTPMultiBin(t)
+	tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, err := fmt.Fprint(w, `{
+			"reference_id": "12",
+			"config": {
+				"metricPushInterval": "200ms",
+				"aggregationPeriod": "100ms",
+				"maxMetricSamplesPerPackage": 20,
+				"aggregationCalcInterval": "100ms",
+				"aggregationWaitPeriod": "100ms"
+			}
+		}`)
+		require.NoError(t, err)
+	}))
+	defer tb.Cleanup()
+
+	script := &loader.SourceData{
+		Data: []byte(""),
+		URL:  &url.URL{Path: "/script.js"},
+	}
+
+	options := lib.Options{
+		Duration: types.NullDurationFrom(1 * time.Second),
+	}
+
+	config := NewConfig().Apply(Config{
+		Host:       null.StringFrom(tb.ServerHTTP.URL),
+		NoCompress: null.BoolFrom(true),
+	})
+	collector, err := New(config, script, options, "1.0")
+	require.NoError(t, err)
+	now := time.Now()
+	tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
+
+	count := 1
+	max := 5
+	tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID),
+		func(w http.ResponseWriter, r *http.Request) {
+			count++
+			if count == max {
+				type payload struct {
+					Error ErrorResponse `json:"error"`
+				}
+				res := &payload{}
+				res.Error = ErrorResponse{Code: 4}
+				w.Header().Set("Content-Type", "application/json")
+				data, err := json.Marshal(res)
+				if err != nil {
+					t.Fatal(err)
+				}
+				w.WriteHeader(http.StatusForbidden)
+				_, _ = w.Write(data)
+				return
+			}
+			body, err := ioutil.ReadAll(r.Body)
+			assert.NoError(t, err)
+			receivedSamples := []Sample{}
+			assert.NoError(t, json.Unmarshal(body, &receivedSamples))
+		})
+
+	require.NoError(t, collector.Init())
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		collector.Run(ctx)
+		wg.Done()
+	}()
+
+	collector.Collect([]stats.SampleContainer{stats.Sample{
+		Time:   now,
+		Metric: metrics.VUs,
+		Tags:   stats.NewSampleTags(tags.CloneTags()),
+		Value:  1.0,
+	}})
+	for j := time.Duration(1); j <= 200; j++ {
+		var container = make([]stats.SampleContainer, 0, 500)
+		for i := time.Duration(1); i <= 50; i++ {
+			container = append(container, &httpext.Trail{
+				Blocked:        i % 200 * 100 * time.Millisecond,
+				Connecting:     i % 200 * 200 * time.Millisecond,
+				TLSHandshaking: i % 200 * 300 * time.Millisecond,
+				Sending:        i * i * 400 * time.Millisecond,
+				Waiting:        500 * time.Millisecond,
+				Receiving:      600 * time.Millisecond,
+
+				EndTime:      now.Add(i * 100),
+				ConnDuration: 500 * time.Millisecond,
+				Duration:     j * i * 1500 * time.Millisecond,
+				Tags:         stats.NewSampleTags(tags.CloneTags()),
+			})
+		}
+		collector.Collect(container)
+	}
+
+	cancel()
+	wg.Wait()
+	require.Equal(t, lib.RunStatusQueued, collector.runStatus)
+	_, ok := <-collector.stopSendingMetricsCh
+	require.False(t, ok)
+	require.Equal(t, max, count)
+}
+
+func TestCloudCollectorAggregationPeriodZeroNoBlock(t *testing.T) {
+	t.Parallel()
+	tb := httpmultibin.NewHTTPMultiBin(t)
+	tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, err := fmt.Fprintf(w, `{
+			"reference_id": "123",
+			"config": {
+				"metricPushInterval": "10ms",
+				"aggregationPeriod": "0ms",
+				"aggregationCalcInterval": "40ms",
+				"aggregationWaitPeriod": "5ms"
+			}
+		}`)
+		require.NoError(t, err)
+	}))
+	defer tb.Cleanup()
+
+	script := &loader.SourceData{
+		Data: []byte(""),
+		URL:  &url.URL{Path: "/script.js"},
+	}
+
+	options := lib.Options{
+		Duration: types.NullDurationFrom(1 * time.Second),
+	}
+
+	config := NewConfig().Apply(Config{
+		Host:       null.StringFrom(tb.ServerHTTP.URL),
+		NoCompress: null.BoolFrom(true),
+	})
+	collector, err := New(config, script, options, "1.0")
+	require.NoError(t, err)
+
+	assert.True(t, collector.config.Host.Valid)
+	assert.Equal(t, tb.ServerHTTP.URL, collector.config.Host.String)
+	assert.True(t, collector.config.NoCompress.Valid)
+	assert.True(t, collector.config.NoCompress.Bool)
+	assert.False(t, collector.config.MetricPushInterval.Valid)
+	assert.False(t, collector.config.AggregationPeriod.Valid)
+	assert.False(t, collector.config.AggregationWaitPeriod.Valid)
+
+	require.NoError(t, collector.Init())
+	assert.Equal(t, "123", collector.referenceID)
+	assert.True(t, collector.config.MetricPushInterval.Valid)
+	assert.Equal(t, types.Duration(10*time.Millisecond), collector.config.MetricPushInterval.Duration)
+	assert.True(t, collector.config.AggregationPeriod.Valid)
+	assert.Equal(t, types.Duration(0), collector.config.AggregationPeriod.Duration)
+	assert.True(t, collector.config.AggregationWaitPeriod.Valid)
+	assert.Equal(t, types.Duration(5*time.Millisecond), collector.config.AggregationWaitPeriod.Duration)
+
+	expSamples := make(chan []Sample)
+	tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), getSampleChecker(t, expSamples))
+
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		collector.Run(ctx)
+		wg.Done()
+	}()
+
+	cancel()
+	wg.Wait()
+	require.Equal(t, lib.RunStatusQueued, collector.runStatus)
+}
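
--
Reviewer notes (not part of the commit):

The abort detection above hinges on one specific backend response: HTTP 403
whose payload carries the cloud error code 4, which the backend uses for
"test aborted by limit". Every other push failure is treated as transient and
only logged. The standalone sketch below mirrors that predicate so it can be
exercised outside of k6; errorResponse here is a simplified, hypothetical
stand-in for the real ErrorResponse type in stats/cloud, not the actual k6
implementation.

package main

import (
	"fmt"
	"net/http"
)

// errorResponse is a simplified stand-in for the cloud API error type;
// the real ErrorResponse in stats/cloud carries more fields.
type errorResponse struct {
	Response *http.Response `json:"-"`
	Code     int            `json:"code"`
	Message  string         `json:"message"`
}

func (e errorResponse) Error() string { return e.Message }

// shouldStopSendingMetrics mirrors the helper added in this patch: only a
// 403 whose body carries error code 4 should permanently stop the push
// loop; nil errors and all other failures are treated as transient.
func shouldStopSendingMetrics(err error) bool {
	if err == nil {
		return false
	}
	if errResp, ok := err.(errorResponse); ok && errResp.Response != nil {
		return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4
	}
	return false
}

func main() {
	aborted := errorResponse{
		Response: &http.Response{StatusCode: http.StatusForbidden},
		Code:     4,
		Message:  "test aborted by limit",
	}
	transient := errorResponse{
		Response: &http.Response{StatusCode: http.StatusInternalServerError},
		Message:  "temporary backend hiccup",
	}
	fmt.Println(shouldStopSendingMetrics(aborted))   // true: stop pushing
	fmt.Println(shouldStopSendingMetrics(transient)) // false: keep retrying
}

Checking errResp.Response != nil first matters: the client can also return
errors that were never tied to an HTTP response (e.g. connection failures),
and those must not silence the collector permanently.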
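The other half of the change is how the push loop consumes the closed
stopSendingMetricsCh. Below is a minimal, self-contained sketch of that
two-select shape; runPushLoop and its pushMetrics callback are hypothetical
stand-ins, and where the patched loop waits on a quit channel that the
aggregation goroutine closes after its final flush, this sketch waits on ctx
directly.

package main

import (
	"context"
	"fmt"
	"time"
)

func runPushLoop(ctx context.Context, stopSendingMetricsCh <-chan struct{}, pushMetrics func()) {
	pushTicker := time.NewTicker(50 * time.Millisecond)
	defer pushTicker.Stop()
	for {
		// The non-blocking check runs first on every iteration, so the
		// loop exits promptly after an abort even if a tick is pending.
		select {
		case <-stopSendingMetricsCh:
			return
		default:
		}
		select {
		case <-ctx.Done():
			pushMetrics() // final flush on a normal shutdown
			return
		case <-pushTicker.C:
			pushMetrics()
		}
	}
}

func main() {
	stop := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pushes := 0
	go func() {
		time.Sleep(120 * time.Millisecond)
		close(stop) // simulate the 403/code-4 response arriving
	}()
	runPushLoop(ctx, stop, func() { pushes++ })
	fmt.Println("pushes before abort:", pushes)
}

Splitting one select into two gives the abort signal priority over the
ticker; at most one already-pending push can still fire before the next
iteration notices the closed channel, which also holds for the patched loop.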