Skip to content

Commit

Permalink
core,cmd: Operations for init the new Atlas-based types
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Aug 2, 2022
1 parent fb3970f commit 1589807
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 48 deletions.
2 changes: 2 additions & 0 deletions cmd/integration_tests/eventloop/eventloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context,
Logger: logger,
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
RootTagSet: metrics.NewTagSet(nil),
}

script = []byte("import {setTimeout} from 'k6/x/events';\n" + string(script))
Expand All @@ -54,6 +55,7 @@ func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context,
TestPreInitState: piState,
Options: newOpts,
Runner: runner,
RunTags: piState.RunTags(newOpts),
}

execScheduler, err := local.NewExecutionScheduler(testState)
Expand Down
9 changes: 7 additions & 2 deletions cmd/test_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func loadTest(gs *globalState, cmd *cobra.Command, args []string) (*loadedTest,
)

gs.logger.Debugf("Gathering k6 runtime options...")

runtimeOptions, err := getRuntimeOptions(cmd.Flags(), gs.envVars)
if err != nil {
return nil, err
Expand All @@ -67,6 +68,11 @@ func loadTest(gs *globalState, cmd *cobra.Command, args []string) (*loadedTest,
RuntimeOptions: runtimeOptions,
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
// The Atlas initialization: where the root node is created.
// All the new tag sets must branch out from the root,
// so the underhood structure can minimize the memory footprint
// by allocating the tag pair only once.
RootTagSet: metrics.NewTagSet(nil),
}

test := &loadedTest{
Expand Down Expand Up @@ -248,12 +254,11 @@ func (lct *loadedAndConfiguredTest) buildTestRunState(
return nil, err
}

// TODO: init atlas root node, etc.

return &lib.TestRunState{
TestPreInitState: lct.preInitState,
Runner: lct.initRunner,
Options: lct.derivedConfig.Options, // we will always run with the derived options
RunTags: lct.preInitState.RunTags(configToReinject),
}, nil
}

Expand Down
61 changes: 46 additions & 15 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func getTestPreInitState(tb testing.TB) *lib.TestPreInitState {
RuntimeOptions: lib.RuntimeOptions{},
Registry: reg,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg),
RootTagSet: metrics.NewTagSet(nil),
}
}

Expand All @@ -80,6 +81,7 @@ func getTestRunState(
TestPreInitState: piState,
Options: options,
Runner: runner,
RunTags: piState.RunTags(options),
}
}

Expand All @@ -89,7 +91,9 @@ func newTestEngineWithTestPreInitState( //nolint:golint
opts lib.Options, piState *lib.TestPreInitState,
) *testStruct {
if runner == nil {
runner = &minirunner.MiniRunner{}
runner = &minirunner.MiniRunner{
PreInitState: piState,
}
}

newOpts, err := executor.DeriveScenariosFromShortcuts(lib.Options{
Expand Down Expand Up @@ -296,7 +300,11 @@ func TestEngine_processSamples(t *testing.T) {
done := make(chan struct{})
runner := &minirunner.MiniRunner{
Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
out <- metrics.Sample{Metric: metric, Value: 1.25, Tags: metrics.IntoSampleTags(&map[string]string{"a": "1"})}
out <- metrics.Sample{
Metric: metric,
Value: 1.25,
Tags: metrics.NewTagSet(map[string]string{"a": "1"}).SampleTags(),
}
close(done)
return nil
},
Expand Down Expand Up @@ -332,7 +340,11 @@ func TestEngine_processSamples(t *testing.T) {
done := make(chan struct{})
runner := &minirunner.MiniRunner{
Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
out <- metrics.Sample{Metric: metric, Value: 1.25, Tags: metrics.IntoSampleTags(&map[string]string{"a": "1", "b": "2"})}
out <- metrics.Sample{
Metric: metric,
Value: 1.25,
Tags: metrics.NewTagSet(map[string]string{"a": "1", "b": "2"}).SampleTags(),
}
close(done)
return nil
},
Expand Down Expand Up @@ -384,7 +396,11 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
done := make(chan struct{})
runner := &minirunner.MiniRunner{
Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
out <- metrics.Sample{Metric: metric, Value: 1.25, Tags: metrics.IntoSampleTags(&map[string]string{"a": "1"})}
out <- metrics.Sample{
Metric: metric,
Value: 1.25,
Tags: metrics.NewTagSet(map[string]string{"a": "1"}).SampleTags(),
}
close(done)
return nil
},
Expand Down Expand Up @@ -426,7 +442,11 @@ func TestEngineAbortedByThresholds(t *testing.T) {
done := make(chan struct{})
runner := &minirunner.MiniRunner{
Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
out <- metrics.Sample{Metric: metric, Value: 1.25, Tags: metrics.IntoSampleTags(&map[string]string{"a": "1"})}
out <- metrics.Sample{
Metric: metric,
Value: 1.25,
Tags: metrics.NewTagSet(map[string]string{"a": "1"}).SampleTags(),
}
<-ctx.Done()
close(done)
return nil
Expand Down Expand Up @@ -499,15 +519,27 @@ func TestEngine_processThresholds(t *testing.T) {
thresholds[m] = ths
}

runner := &minirunner.MiniRunner{}
test := newTestEngineWithTestPreInitState(
t, nil, runner, nil, lib.Options{Thresholds: thresholds}, piState,
t, nil, nil, nil, lib.Options{Thresholds: thresholds}, piState,
)

tag1 := piState.RootTagSet.BranchOut()
tag1.AddTag("a", "1")
tag2 := piState.RootTagSet.BranchOut()
tag2.AddTag("b", "1")

test.engine.OutputManager.AddMetricSamples(
[]metrics.SampleContainer{
metrics.Sample{Metric: gaugeMetric, Value: 1.25, Tags: metrics.IntoSampleTags(&map[string]string{"a": "1"})},
metrics.Sample{Metric: counterMetric, Value: 2, Tags: metrics.IntoSampleTags(&map[string]string{"b": "1"})},
metrics.Sample{
Metric: gaugeMetric,
Value: 1.25,
Tags: tag1.SampleTags(),
},
metrics.Sample{
Metric: counterMetric,
Value: 2,
Tags: tag2.SampleTags(),
},
},
)

Expand Down Expand Up @@ -688,10 +720,9 @@ func TestSentReceivedMetrics(t *testing.T) {

func TestRunTags(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)

runTagsMap := map[string]string{"foo": "bar", "test": "mest", "over": "written"}
runTags := metrics.NewSampleTags(runTagsMap)
tb := httpmultibin.NewHTTPMultiBin(t)
runTags := map[string]string{"foo": "bar", "test": "mest", "over": "written"}

script := []byte(tb.Replacer.Replace(`
import http from "k6/http";
Expand Down Expand Up @@ -780,22 +811,22 @@ func TestRunTags(t *testing.T) {
getExpectedOverVal := func(metricName string) string {
for _, sysMetric := range systemMetrics {
if sysMetric == metricName {
return runTagsMap["over"]
return runTags["over"]
}
}
return "the rainbow"
}

for _, s := range mockOutput.Samples {
for key, expVal := range runTagsMap {
for key, expVal := range runTags {
val, ok := s.Tags.Get(key)

if key == "over" {
expVal = getExpectedOverVal(s.Metric.Name)
}

assert.True(t, ok)
assert.Equalf(t, expVal, val, "Wrong tag value in sample for metric %#v", s.Metric)
assert.Equalf(t, expVal, val, "Wrong tag value in sample - expected: %v got: %v metric %#v", expVal, val, s.Metric)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,21 +227,22 @@ func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- me

emitMetrics := func() {
t := time.Now()
tags := e.state.Test.RunTags.SampleTags()
samples := metrics.ConnectedSamples{
Samples: []metrics.Sample{
{
Time: t,
Metric: e.state.Test.BuiltinMetrics.VUs,
Value: float64(e.state.GetCurrentlyActiveVUsCount()),
Tags: e.state.Test.Options.RunTags,
Tags: tags,
}, {
Time: t,
Metric: e.state.Test.BuiltinMetrics.VUsMax,
Value: float64(e.state.GetInitializedVUsCount()),
Tags: e.state.Test.Options.RunTags,
Tags: tags,
},
},
Tags: e.state.Test.Options.RunTags,
Tags: tags,
Time: t,
}
metrics.PushIfNotDone(ctx, out, samples)
Expand Down
61 changes: 33 additions & 28 deletions core/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func getTestRunState(
TestPreInitState: piState,
Options: options,
Runner: runner,
RunTags: piState.RunTags(options),
}
}

Expand Down Expand Up @@ -355,29 +356,32 @@ func TestExecutionSchedulerSystemTags(t *testing.T) {
require.NoError(t, execScheduler.Run(ctx, ctx, samples))
}()

expCommonTrailTags := metrics.IntoSampleTags(&map[string]string{
"group": "",
"method": "GET",
"name": sr("HTTPBIN_IP_URL/"),
"url": sr("HTTPBIN_IP_URL/"),
"proto": "HTTP/1.1",
"status": "200",
"expected_response": "true",
})
expTrailPVUTagsRaw := expCommonTrailTags.CloneTags()
expTrailPVUTagsRaw["scenario"] = "per_vu_test"
expTrailPVUTags := metrics.IntoSampleTags(&expTrailPVUTagsRaw)
expTrailSITagsRaw := expCommonTrailTags.CloneTags()
expTrailSITagsRaw["scenario"] = "shared_test"
expTrailSITags := metrics.IntoSampleTags(&expTrailSITagsRaw)
expNetTrailPVUTags := metrics.IntoSampleTags(&map[string]string{
"group": "",
"scenario": "per_vu_test",
})
expNetTrailSITags := metrics.IntoSampleTags(&map[string]string{
"group": "",
"scenario": "shared_test",
})
expCommonTrailTags := testRunState.RunTags.BranchOut()
expCommonTrailTags.AddTag("group", "")
expCommonTrailTags.AddTag("method", "GET")
expCommonTrailTags.AddTag("name", sr("HTTPBIN_IP_URL/"))
expCommonTrailTags.AddTag("url", sr("HTTPBIN_IP_URL/"))
expCommonTrailTags.AddTag("proto", "HTTP/1.1")
expCommonTrailTags.AddTag("status", "200")
expCommonTrailTags.AddTag("expected_response", "true")

expTrailPVUTagsRaw := expCommonTrailTags.BranchOut()
expTrailPVUTagsRaw.AddTag("scenario", "per_vu_test")
expTrailPVUTags := expTrailPVUTagsRaw.SampleTags()

expTrailSITagsRaw := expCommonTrailTags.BranchOut()
expTrailSITagsRaw.AddTag("scenario", "shared_test")
expTrailSITags := expTrailSITagsRaw.SampleTags()

expNetTrailPVUTagsRaw := testRunState.RunTags.BranchOut()
expNetTrailPVUTagsRaw.AddTag("group", "")
expNetTrailPVUTagsRaw.AddTag("scenario", "per_vu_test")
expNetTrailPVUTags := expNetTrailPVUTagsRaw.SampleTags()

expNetTrailSITagsRaw := testRunState.RunTags.BranchOut()
expNetTrailSITagsRaw.AddTag("group", "")
expNetTrailSITagsRaw.AddTag("scenario", "shared_test")
expNetTrailSITags := expNetTrailSITagsRaw.SampleTags()

var gotCorrectTags int
for {
Expand All @@ -393,8 +397,9 @@ func TestExecutionSchedulerSystemTags(t *testing.T) {
gotCorrectTags++
}
}

case <-done:
require.Equal(t, 4, gotCorrectTags, "received wrong amount of samples with expected tags")
assert.Equal(t, 4, gotCorrectTags, "received wrong amount of samples with expected tags")
return
}
}
Expand Down Expand Up @@ -900,7 +905,7 @@ func TestExecutionSchedulerRuntimeErrors(t *testing.T) {
assert.True(t, isFinal)

startTime := time.Now()
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
require.NoError(t, execScheduler.Run(ctx, ctx, samples))
runTime := time.Since(startTime)
assert.True(t, runTime > 1*time.Second, "test did not take 1s")
assert.True(t, runTime < 10*time.Second, "took more than 10 seconds")
Expand Down Expand Up @@ -1239,11 +1244,11 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
}

getTags := func(args ...string) *metrics.SampleTags {
tags := map[string]string{}
tags := metrics.NewTagSet(nil)
for i := 0; i < len(args)-1; i += 2 {
tags[args[i]] = args[i+1]
tags.AddTag(args[i], args[i+1])
}
return metrics.IntoSampleTags(&tags)
return tags.SampleTags()
}
testCounter, err := piState.Registry.NewMetric("test_counter", metrics.Counter)
require.NoError(t, err)
Expand Down

0 comments on commit 1589807

Please sign in to comment.