From f8bf4aed7ba22e0f270170b7b0a02e303bb7d6ae Mon Sep 17 00:00:00 2001
From: Cuong Manh Le
Date: Fri, 30 Aug 2019 14:34:49 +0700
Subject: [PATCH] stats/cloud: stop sending cloud metrics when limit reached

Once the cloud test gets aborted because a limit was reached, we
should stop sending metrics.

Close #1074
---
 stats/cloud/collector.go      |  39 +++++++++++--
 stats/cloud/collector_test.go | 103 ++++++++++++++++++++++++++++++++++
 2 files changed, 138 insertions(+), 4 deletions(-)

diff --git a/stats/cloud/collector.go b/stats/cloud/collector.go
index ec2c90e26ed8..b018a0358e94 100644
--- a/stats/cloud/collector.go
+++ b/stats/cloud/collector.go
@@ -23,6 +23,7 @@ package cloud
 import (
 	"context"
 	"encoding/json"
+	"net/http"
 	"path/filepath"
 	"sync"
 	"time"
@@ -69,6 +70,8 @@ type Collector struct {
 	// checks basically O(1). And even if for some reason there are occasional metrics with past times that
 	// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated
 	aggrBuckets map[int64]aggregationBucket
+
+	stopCh chan struct{}
 }
 
 // Verify that Collector implements lib.Collector
@@ -143,6 +146,7 @@ func New(conf Config, src *loader.SourceData, opts lib.Options, version string)
 		duration:    duration,
 		opts:        opts,
 		aggrBuckets: map[int64]aggregationBucket{},
+		stopCh:      make(chan struct{}),
 	}, nil
 }
 
@@ -204,6 +208,12 @@ func (c *Collector) Run(ctx context.Context) {
 
 		go func() {
 			for {
+				select {
+				case <-c.stopCh:
+					return
+				default:
+				}
+
 				select {
 				case <-aggregationTicker.C:
 					c.aggregateHTTPTrails(time.Duration(c.config.AggregationWaitPeriod.Duration))
@@ -225,6 +235,11 @@ func (c *Collector) Run(ctx context.Context) {
 
 	pushTicker := time.NewTicker(time.Duration(c.config.MetricPushInterval.Duration))
 	for {
+		select {
+		case <-c.stopCh:
+			return
+		default:
+		}
 		select {
 		case <-pushTicker.C:
 			c.pushMetrics()
@@ -441,6 +456,19 @@ func (c *Collector) flushHTTPTrails() {
 	c.aggrBuckets = map[int64]aggregationBucket{}
 	c.bufferSamples = append(c.bufferSamples, newSamples...)
 }
+
+func (c *Collector) shouldStopSendingMetrics(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	if errResp, ok := err.(ErrorResponse); ok && errResp.Response != nil {
+		return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4
+	}
+
+	return false
+}
+
 func (c *Collector) pushMetrics() {
 	c.bufferMutex.Lock()
 	if len(c.bufferSamples) == 0 {
@@ -462,9 +490,12 @@
 		}
 		err := c.client.PushMetric(c.referenceID, c.config.NoCompress.Bool, buffer[:size])
 		if err != nil {
-			logrus.WithFields(logrus.Fields{
-				"error": err,
-			}).Warn("Failed to send metrics to cloud")
+			if c.shouldStopSendingMetrics(err) {
+				logrus.Warn("Stopped sending metrics to cloud due to an error.")
+				close(c.stopCh)
+				break
+			}
+			logrus.WithError(err).Warn("Failed to send metrics to cloud")
 		}
 		buffer = buffer[size:]
 	}
@@ -493,7 +524,7 @@ func (c *Collector) testFinished() {
 	}).Debug("Sending test finished")
 
 	runStatus := lib.RunStatusFinished
-	if c.runStatus != 0 {
+	if c.runStatus != lib.RunStatusQueued {
 		runStatus = c.runStatus
 	}
 
diff --git a/stats/cloud/collector_test.go b/stats/cloud/collector_test.go
index a0f926271c09..80ab621ae71a 100644
--- a/stats/cloud/collector_test.go
+++ b/stats/cloud/collector_test.go
@@ -355,3 +355,106 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
 	wg.Wait()
 	require.True(t, gotTheLimit)
 }
+
+func TestCloudCollectorStopSendingMetric(t *testing.T) {
+	t.Parallel()
+	tb := testutils.NewHTTPMultiBin(t)
+	tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, err := fmt.Fprint(w, `{
+			"reference_id": "12",
+			"config": {
+				"metricPushInterval": "200ms",
+				"aggregationPeriod": "100ms",
+				"maxMetricSamplesPerPackage": 20,
+				"aggregationCalcInterval": "100ms",
+				"aggregationWaitPeriod": "100ms"
+			}
+		}`)
+		require.NoError(t, err)
+	}))
+	defer tb.Cleanup()
+
+	script := &loader.SourceData{
+		Data: []byte(""),
+		URL:  &url.URL{Path: "/script.js"},
+	}
+
+	options := lib.Options{
+		Duration: types.NullDurationFrom(1 * time.Second),
+	}
+
+	config := NewConfig().Apply(Config{
+		Host:       null.StringFrom(tb.ServerHTTP.URL),
+		NoCompress: null.BoolFrom(true),
+	})
+	collector, err := New(config, script, options, "1.0")
+	require.NoError(t, err)
+	now := time.Now()
+	tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
+
+	count := 1
+	max := 5
+	tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID),
+		func(w http.ResponseWriter, r *http.Request) {
+			count++
+			if count == max {
+				type payload struct {
+					Error ErrorResponse `json:"error"`
+				}
+				res := &payload{}
+				res.Error = ErrorResponse{Code: 4}
+				w.Header().Set("Content-Type", "application/json")
+				data, err := json.Marshal(res)
+				if err != nil {
+					t.Fatal(err)
+				}
+				w.WriteHeader(http.StatusForbidden)
+				_, _ = w.Write(data)
+				return
+			}
+			body, err := ioutil.ReadAll(r.Body)
+			assert.NoError(t, err)
+			receivedSamples := []Sample{}
+			assert.NoError(t, json.Unmarshal(body, &receivedSamples))
+		})
+
+	require.NoError(t, collector.Init())
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		collector.Run(ctx)
+		wg.Done()
+	}()
+
+	collector.Collect([]stats.SampleContainer{stats.Sample{
+		Time:   now,
+		Metric: metrics.VUs,
+		Tags:   stats.NewSampleTags(tags.CloneTags()),
+		Value:  1.0,
+	}})
+	for j := time.Duration(1); j <= 200; j++ {
+		var container = make([]stats.SampleContainer, 0, 500)
+		for i := time.Duration(1); i <= 50; i++ {
+			container = append(container, &httpext.Trail{
+				Blocked:        i % 200 * 100 * time.Millisecond,
+				Connecting:     i % 200 * 200 * time.Millisecond,
+				TLSHandshaking: i % 200 * 300 * time.Millisecond,
+				Sending:        i * i * 400 * time.Millisecond,
+				Waiting:        500 * time.Millisecond,
+				Receiving:      600 * time.Millisecond,
+
+				EndTime:      now.Add(i * 100),
+				ConnDuration: 500 * time.Millisecond,
+				Duration:     j * i * 1500 * time.Millisecond,
+				Tags:         stats.NewSampleTags(tags.CloneTags()),
+			})
+		}
+		collector.Collect(container)
+	}
+
+	cancel()
+	wg.Wait()
+	require.Equal(t, lib.RunStatusQueued, collector.runStatus)
+	require.Equal(t, struct{}{}, <-collector.stopCh)
+}
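
Note (not part of the patch): the change relies on closing a channel being a one-shot, broadcast-style signal, so a single close(c.stopCh) makes both the aggregation goroutine and the metric-push loop wind down on their next iteration, while the added select/default keeps that check non-blocking. Below is a minimal standalone Go sketch of that pattern; the names (pusher, stopCh, push) and the simulated "rejected after a few batches" error are illustrative only and are not k6 code or API.

// Standalone sketch, not k6 code: closing a channel is a one-shot,
// broadcast-style signal, so a single close() is enough to make every
// loop that checks the channel exit on its next iteration.
package main

import (
	"fmt"
	"sync"
	"time"
)

// pusher mimics the shape of the collector's push loop: a ticker drives
// periodic work, and a non-blocking select on stopCh exits the loop once
// the channel has been closed.
type pusher struct {
	stopOnce sync.Once
	stopCh   chan struct{}
	calls    int
}

func newPusher() *pusher {
	return &pusher{stopCh: make(chan struct{})}
}

// stop closes stopCh exactly once; closing a channel twice panics, so the
// sketch guards it with sync.Once (the patch avoids the issue by closing
// c.stopCh from a single code path in pushMetrics).
func (p *pusher) stop() {
	p.stopOnce.Do(func() { close(p.stopCh) })
}

// push simulates the backend permanently rejecting the test after a couple
// of batches, standing in for the HTTP 403 / error code 4 response that
// shouldStopSendingMetrics() looks for.
func (p *pusher) push() error {
	p.calls++
	if p.calls >= 3 {
		return fmt.Errorf("test aborted by limit (simulated 403, code 4)")
	}
	fmt.Println("pushed batch", p.calls)
	return nil
}

// run keeps pushing until either done fires (the ctx.Done() analogue) or
// stop() has been called.
func (p *pusher) run(done <-chan struct{}) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		// Non-blocking check, mirroring the select/default block the patch
		// adds at the top of both loops in Run().
		select {
		case <-p.stopCh:
			fmt.Println("stop signal received, not sending any more metrics")
			return
		default:
		}

		select {
		case <-ticker.C:
			if err := p.push(); err != nil {
				// A fatal backend response means retrying is pointless,
				// so broadcast the stop signal instead of just logging.
				fmt.Println("push failed permanently:", err)
				p.stop()
			}
		case <-done:
			return
		}
	}
}

func main() {
	p := newPusher()
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		p.run(done)
	}()
	wg.Wait() // returns once the simulated rejection triggers stop()
	close(done)
}

Closing the channel (rather than sending a value or flipping a flag) is what lets one signal reach any number of goroutines without extra locking, since receives from a closed channel never block; that is also why the test can assert on <-collector.stopCh after Run returns.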