From 4c09c5e1d23c702ad6b319ccb7f6f48c517ac777 Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Fri, 12 May 2023 11:11:59 +0200 Subject: [PATCH] Handle the interrupt case when it gets an error --- output/cloud/expv2/output.go | 55 ++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index 296048dfe162..7cfa06f01d01 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -4,9 +4,12 @@ package expv2 import ( "context" + "net/http" "time" "go.k6.io/k6/cloudapi" + "go.k6.io/k6/errext" + "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/metrics" "go.k6.io/k6/output" "go.k6.io/k6/output/cloud/expv2/pbcloud" @@ -40,6 +43,9 @@ type Output struct { // a sequential ID for metrics // then we could reuse the same strategy here activeSeries map[*metrics.Metric]aggregatedSamples + + testStopFunc func(error) + stopSendingMetrics chan struct{} } // New creates a new cloud output. @@ -57,10 +63,11 @@ func New(logger logrus.FieldLogger, conf cloudapi.Config) (*Output, error) { // } return &Output{ - config: conf, - metricsFlusher: mc, - logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}), - activeSeries: make(map[*metrics.Metric]aggregatedSamples), + config: conf, + metricsFlusher: mc, + logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}), + activeSeries: make(map[*metrics.Metric]aggregatedSamples), + stopSendingMetrics: make(chan struct{}), }, nil } @@ -93,6 +100,11 @@ func (o *Output) SetReferenceID(refID string) { o.referenceID = refID } +// SetTestRunStopCallback receives the function that stops the engine on error. +func (o *Output) SetTestRunStopCallback(stopFunc func(error)) { + o.testStopFunc = stopFunc +} + // AddMetricSamples receives a set of metric samples.
func (o *Output) collectMetrics() { if o.referenceID == "" { @@ -130,13 +142,46 @@ func (o *Output) collectMetrics() { err := o.metricsFlusher.Push(ctx, o.referenceID, &pbcloud.MetricSet{Metrics: metricSet}) if err != nil { - o.logger.WithError(err).Error("failed to push metrics to the cloud") + o.logger.WithError(err).Error("Failed to push metrics to the cloud") + + if o.shouldStopSendingMetrics(err) { + o.logger.WithError(err).Warn("Interrupt sending metrics to cloud due to an error") + serr := errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), + errext.AbortedByOutput, + ) + if o.config.StopOnError.Bool { + o.testStopFunc(serr) + } + close(o.stopSendingMetrics) + } return } o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered samples to the cloud") } +// shouldStopSendingMetrics returns true if the output should interrupt the metric flush. +// +// note: The actual test execution should continue, +// since for local k6 run tests the end-of-test summary (or any other outputs) will still work, +// but the cloud output doesn't send any more metrics. +// Instead, if cloudapi.Config.StopOnError is enabled +// the cloud output should stop the whole test run too. +// This logic should be handled by the caller. +func (o *Output) shouldStopSendingMetrics(err error) bool { + if err == nil { + return false + } + if errResp, ok := err.(cloudapi.ErrorResponse); ok && errResp.Response != nil { //nolint:errorlint + // The Cloud service returns the error code 4 when it doesn't accept any more metrics. + // So, when k6 sees that, the cloud output just stops prematurely. + return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4 + } + + return false +} + // collectSamples drain the buffer and collect all the samples func (o *Output) collectSamples(containers []metrics.SampleContainer) { var (