diff --git a/cloudapi/config.go b/cloudapi/config.go index 2e0ca2bb34d..fe451101321 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -28,6 +28,7 @@ type Config struct { TestRunDetails null.String `json:"testRunDetails" envconfig:"K6_CLOUD_TEST_RUN_DETAILS"` NoCompress null.Bool `json:"noCompress" envconfig:"K6_CLOUD_NO_COMPRESS"` StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"` + APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"` MaxMetricSamplesPerPackage null.Int `json:"maxMetricSamplesPerPackage" envconfig:"K6_CLOUD_MAX_METRIC_SAMPLES_PER_PACKAGE"` @@ -149,6 +150,7 @@ func NewConfig() Config { MetricPushConcurrency: null.NewInt(1, false), MaxMetricSamplesPerPackage: null.NewInt(100000, false), Timeout: types.NewNullDuration(1*time.Minute, false), + APIVersion: null.NewInt(1, false), // Aggregation is disabled by default, since AggregationPeriod has no default value // but if it's enabled manually or from the cloud service, those are the default values it will use: AggregationCalcInterval: types.NewNullDuration(3*time.Second, false), @@ -200,6 +202,9 @@ func (c Config) Apply(cfg Config) Config { if cfg.Timeout.Valid { c.Timeout = cfg.Timeout } + if cfg.APIVersion.Valid { + c.APIVersion = cfg.APIVersion + } if cfg.MaxMetricSamplesPerPackage.Valid { c.MaxMetricSamplesPerPackage = cfg.MaxMetricSamplesPerPackage } @@ -209,7 +214,6 @@ func (c Config) Apply(cfg Config) Config { if cfg.MetricPushConcurrency.Valid { c.MetricPushConcurrency = cfg.MetricPushConcurrency } - if cfg.AggregationPeriod.Valid { c.AggregationPeriod = cfg.AggregationPeriod } diff --git a/cloudapi/config_test.go b/cloudapi/config_test.go index b25cbff9ac0..4dac5fc768a 100644 --- a/cloudapi/config_test.go +++ b/cloudapi/config_test.go @@ -34,6 +34,7 @@ func TestConfigApply(t *testing.T) { NoCompress: null.NewBool(true, true), StopOnError: null.NewBool(true, true), Timeout: types.NewNullDuration(5*time.Second, true), + APIVersion: null.NewInt(2, true), MaxMetricSamplesPerPackage: null.NewInt(2, true), MetricPushInterval: types.NewNullDuration(1*time.Second, true), MetricPushConcurrency: null.NewInt(3, true), diff --git a/output/cloud/output.go b/output/cloud/output.go index dc4419893a0..f1fc7e3d5b1 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -39,6 +39,14 @@ type versionedOutput interface { AddMetricSamples(samples []metrics.SampleContainer) } +type apiVersion int64 + +const ( + apiVersionUndefined apiVersion = iota + apiVersion1 + // apiVersion2 // TODO: add version 2 +) + // Output sends result data to the k6 Cloud service. type Output struct { versionedOutput @@ -159,12 +167,11 @@ func (out *Output) Start() error { thresholds[name] = append(thresholds[name], threshold.Source) } } - maxVUs := lib.GetMaxPossibleVUs(out.executionPlan) testRun := &cloudapi.TestRun{ Name: out.config.Name.String, ProjectID: out.config.ProjectID.Int64, - VUsMax: int64(maxVUs), + VUsMax: int64(lib.GetMaxPossibleVUs(out.executionPlan)), Thresholds: thresholds, Duration: out.duration, } @@ -323,9 +330,14 @@ func (out *Output) startVersionedOutput() error { if out.referenceID == "" { return errors.New("ReferenceID is required") } - var err error - out.versionedOutput, err = cloudv1.New(out.logger, out.config, out.client) + switch out.config.APIVersion.Int64 { + case int64(apiVersion1): + out.versionedOutput, err = cloudv1.New(out.logger, out.config, out.client) + default: + err = fmt.Errorf("v%d is an unexpected version", out.config.APIVersion.Int64) + } + if err != nil { return err } diff --git a/output/cloud/output_test.go b/output/cloud/output_test.go index 62bdec23a91..0bf9da2dc20 100644 --- a/output/cloud/output_test.go +++ b/output/cloud/output_test.go @@ -1,7 +1,9 @@ package cloud import ( + "errors" "fmt" + "io" "net/http" "net/http/httptest" "net/url" @@ -10,11 +12,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.k6.io/k6/cloudapi" + "go.k6.io/k6/errext" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" "go.k6.io/k6/output" + cloudv1 "go.k6.io/k6/output/cloud/v1" + "gopkg.in/guregu/null.v3" ) func TestNewOutputNameResolution(t *testing.T) { @@ -127,3 +133,211 @@ func TestOutputCreateTestWithConfigOverwrite(t *testing.T) { require.NoError(t, out.StopWithTestError(nil)) } + +func TestOutputStartVersionError(t *testing.T) { + t.Parallel() + o, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + ScriptOptions: lib.Options{ + Duration: types.NullDurationFrom(1 * time.Second), + SystemTags: &metrics.DefaultSystemTagSet, + }, + Environment: map[string]string{ + "K6_CLOUD_API_VERSION": "99", + }, + ScriptPath: &url.URL{Path: "/script.js"}, + }) + require.NoError(t, err) + + o.referenceID = "123" + err = o.startVersionedOutput() + require.ErrorContains(t, err, "v99 is an unexpected version") +} + +func TestOutputStartVersionedOutputV1(t *testing.T) { + t.Parallel() + + o := Output{ + referenceID: "123", + config: cloudapi.Config{ + APIVersion: null.IntFrom(1), + // Here, we are mostly silencing the flushing op + MetricPushInterval: types.NullDurationFrom(1 * time.Hour), + }, + } + + err := o.startVersionedOutput() + require.NoError(t, err) + + _, ok := o.versionedOutput.(*cloudv1.Output) + assert.True(t, ok) +} + +func TestOutputStartWithReferenceID(t *testing.T) { + t.Parallel() + + handler := func(w http.ResponseWriter, r *http.Request) { + // no calls are expected to the cloud service when + // the reference ID is passed + t.Error("got unexpected call") + } + ts := httptest.NewServer(http.HandlerFunc(handler)) + defer ts.Close() + + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + Environment: map[string]string{ + "K6_CLOUD_HOST": ts.URL, + "K6_CLOUD_PUSH_REF_ID": "my-passed-id", + }, + ScriptOptions: lib.Options{ + SystemTags: &metrics.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, + }) + require.NoError(t, err) + require.NoError(t, out.Start()) + require.NoError(t, out.Stop()) +} + +func TestCloudOutputDescription(t *testing.T) { + t.Parallel() + + t.Run("WithTestRunDetails", func(t *testing.T) { + t.Parallel() + o := Output{referenceID: "74"} + o.config.TestRunDetails = null.StringFrom("my-custom-string") + assert.Equal(t, "cloud (my-custom-string)", o.Description()) + }) + t.Run("WithWebAppURL", func(t *testing.T) { + t.Parallel() + o := Output{referenceID: "74"} + o.config.WebAppURL = null.StringFrom("mywebappurl.com") + assert.Equal(t, "cloud (mywebappurl.com/runs/74)", o.Description()) + }) +} + +func TestOutputStopWithTestError(t *testing.T) { + t.Parallel() + + done := make(chan struct{}) + + handler := func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/v1/tests/test-ref-id-1234": + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + // aborted by system status + expB := `{"result_status":0, "run_status":6, "thresholds":{}}` + require.JSONEq(t, expB, string(b)) + + w.WriteHeader(http.StatusOK) + close(done) + default: + http.Error(w, "not expected path", http.StatusInternalServerError) + } + } + ts := httptest.NewServer(http.HandlerFunc(handler)) + defer ts.Close() + + out, err := newOutput(output.Params{ + Logger: testutils.NewLogger(t), + Environment: map[string]string{ + "K6_CLOUD_HOST": ts.URL, + }, + ScriptOptions: lib.Options{ + SystemTags: &metrics.DefaultSystemTagSet, + }, + ScriptPath: &url.URL{Path: "/script.js"}, + }) + require.NoError(t, err) + + calledStopFn := false + out.referenceID = "test-ref-id-1234" + out.versionedOutput = versionedOutputMock{ + callback: func(fn string) { + if fn == "StopWithTestError" { + calledStopFn = true + } + }, + } + + fakeErr := errors.New("this is my error") + require.NoError(t, out.StopWithTestError(fakeErr)) + assert.True(t, calledStopFn) + + select { + case <-time.After(1 * time.Second): + t.Error("timed out") + case <-done: + } +} + +func TestOutputGetStatusRun(t *testing.T) { + t.Parallel() + + t.Run("Success", func(t *testing.T) { + t.Parallel() + o := Output{} + assert.Equal(t, cloudapi.RunStatusFinished, o.getRunStatus(nil)) + }) + t.Run("WithErrorNoAbortReason", func(t *testing.T) { + t.Parallel() + o := Output{logger: testutils.NewLogger(t)} + assert.Equal(t, cloudapi.RunStatusAbortedSystem, o.getRunStatus(errors.New("my-error"))) + }) + t.Run("WithAbortReason", func(t *testing.T) { + t.Parallel() + o := Output{} + errWithReason := errext.WithAbortReasonIfNone( + errors.New("my-original-error"), + errext.AbortedByOutput, + ) + assert.Equal(t, cloudapi.RunStatusAbortedSystem, o.getRunStatus(errWithReason)) + }) +} + +func TestOutputProxyAddMetricSamples(t *testing.T) { + t.Parallel() + + called := false + o := &Output{ + versionedOutput: versionedOutputMock{ + callback: func(fn string) { + if fn != "AddMetricSamples" { + return + } + called = true + }, + }, + } + o.AddMetricSamples([]metrics.SampleContainer{}) + assert.True(t, called) +} + +type versionedOutputMock struct { + callback func(name string) +} + +func (o versionedOutputMock) Start() error { + o.callback("Start") + return nil +} + +func (o versionedOutputMock) StopWithTestError(testRunErr error) error { + o.callback("StopWithTestError") + return nil +} + +func (o versionedOutputMock) SetTestRunStopCallback(_ func(error)) { + o.callback("SetTestRunStopCallback") +} + +func (o versionedOutputMock) SetReferenceID(id string) { + o.callback("SetReferenceID") +} + +func (o versionedOutputMock) AddMetricSamples(samples []metrics.SampleContainer) { + o.callback("AddMetricSamples") +}