diff --git a/js/modules/k6/asyncGroup.go b/js/modules/k6/asyncGroup.go new file mode 100644 index 000000000000..c1ffc0a04a67 --- /dev/null +++ b/js/modules/k6/asyncGroup.go @@ -0,0 +1,78 @@ +package k6 + +import ( + "context" + "time" + + "github.com/dop251/goja" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" +) + +type groupInstance struct { + *lib.Group + startTime time.Time +} + +func (g *groupInstance) start() { + g.startTime = time.Now() +} + +func (g *groupInstance) finalize(ctx context.Context, state *lib.State) { + t := time.Now() + ctm := state.Tags.GetCurrentValues() + metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{ + TimeSeries: metrics.TimeSeries{ + Metric: state.BuiltinMetrics.GroupDuration, + Tags: ctm.Tags, + }, + Time: t, + Value: metrics.D(t.Sub(g.startTime)), + Metadata: ctm.Metadata, + }) +} + +func (mi *K6) asyncGroup(name string, fn goja.Callable) (goja.Value, error) { + rt := mi.vu.Runtime() + state := mi.vu.State() + p, res, _ := rt.NewPromise() + promiseObject := rt.ToValue(p).ToObject(rt) + baseGroup := state.Group + then, _ := goja.AssertFunction(promiseObject.Get("then")) + res(nil) + return then(promiseObject, rt.ToValue(func(result goja.Value) (goja.Value, error) { + g, err := baseGroup.Group(name) + if err != nil { + return goja.Undefined(), err + } + + gi := &groupInstance{Group: g} + mi.groupInstance = gi + setGroup(g, state) + gi.start() + if err != nil { + return nil, err // actually return a promise ?!? + } + return fn(goja.Undefined()) + })) +} + +func setGroup(g *lib.Group, state *lib.State) *lib.Group { + old := state.Group + state.Group = g + + if state.Options.SystemTags.Has(metrics.TagGroup) { + state.Tags.Modify(func(tagsAndMeta *metrics.TagsAndMeta) { + tagsAndMeta.SetSystemTagOrMeta(metrics.TagGroup, g.Path) + }) + } + + return old +} + +func rootGroup(g *lib.Group) *lib.Group { + for g.Parent != nil { + g = g.Parent + } + return g +} diff --git a/js/modules/k6/asyncGroup_test.go b/js/modules/k6/asyncGroup_test.go new file mode 100644 index 000000000000..c43ff3ec4fbe --- /dev/null +++ b/js/modules/k6/asyncGroup_test.go @@ -0,0 +1,253 @@ +package k6 + +import ( + "fmt" + "testing" + "time" + + "github.com/dop251/goja" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js/modulestest" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" +) + +func testSetup(tb testing.TB) (*modulestest.Runtime, *K6) { + runtime := modulestest.NewRuntime(tb) + m, ok := New().NewModuleInstance(runtime.VU).(*K6) + require.True(tb, ok) + require.NoError(tb, runtime.VU.Runtime().Set("k6", m.Exports().Named)) + return runtime, m +} + +// TODO try to get this in modulesTest +func moveToVUCode(tb testing.TB, runtime *modulestest.Runtime) chan metrics.SampleContainer { + root, err := lib.NewGroup("", nil) + assert.NoError(tb, err) + samples := make(chan metrics.SampleContainer, 1000) + state := &lib.State{ + Samples: samples, + Tags: lib.NewVUStateTags(runtime.VU.InitEnvField.Registry.RootTagSet()), + Options: lib.Options{ + SystemTags: metrics.NewSystemTagSet(metrics.TagGroup), + }, + BuiltinMetrics: runtime.BuiltinMetrics, + } + setGroup(root, state) + runtime.MoveToVUContext(state) + return samples +} + +func TestAsyncGroup(t *testing.T) { + t.Parallel() + + cases := []string{ + ` + k6.group("my group", async () => { + fn("::my group", ""); + await fn("::my group", ""); + fn("::my group", ""); + Promise.resolve("").then( () => { + fn("") + }) + }).then(() => { + fn(""); + }) + fn(""); + `, + ` + k6.group("my group", async () => { + fn("::my group", ""); + await fn("::my group", ""); + fn("::my group", ""); + k6.group("second", async() => { + fn("::my group::second", "my group", ""); + await fn("::my group::second", "my group", ""); + fn("::my group::second", "my group", ""); + await fn("::my group::second", "my group", ""); + fn("::my group::second", "my group", ""); + }); + }).then(() => { + fn(""); + }) + fn(""); + `, + ` + k6.group("my group", async () => { + fn("::my group", ""); + await fn("::my group", ""); + fn("::my group", ""); + k6.group("second", async() => { + fn("::my group::second", "my group", ""); + await fn("::my group::second", "my group", ""); + fn("::my group::second", "my group", ""); + }); + }).then(() => { + fn(""); + }) + fn(""); + `, + ` + k6.group("my group", async () => { + fn("::my group", ""); + await fn("::my group", ""); + fn("::my group", ""); + k6.group("second", async() => { + fn("::my group::second", "my group", ""); + }); + }).then(() => { + fn(""); + }) + `, + ` + k6.group("my group", async () => { + fn("::my group", ""); + await fn("::my group", ""); + await k6.group("second", async() => { + await fn("::my group::second", "my group", ""); + }); + }).then(() => { + fn(""); + }) + `, + } + for i, c := range cases { + c := c + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Parallel() + + runtime, _ := testSetup(t) + moveToVUCode(t, runtime) + rt := runtime.VU.Runtime() + state := runtime.VU.State() + root := state.Group + require.NoError(t, rt.Set("fn", func(expectedGroupTag string, expectedParentNames ...string) *goja.Promise { + p, res, _ := rt.NewPromise() + groupTag, ok := state.Tags.GetCurrentValues().Tags.Get("group") + require.True(t, ok) + require.Equal(t, expectedGroupTag, groupTag) + parentGroup := state.Group.Parent + for _, expectedParentName := range expectedParentNames { + require.NotNil(t, parentGroup) + require.Equal(t, expectedParentName, parentGroup.Name) + parentGroup = parentGroup.Parent + } + require.Nil(t, parentGroup) + res("") + return p + })) + err := runtime.EventLoop.Start(func() error { + _, err := rt.RunScript("main.js", c) + return err + }) + require.NoError(t, err) + runtime.EventLoop.WaitOnRegistered() + assert.Equal(t, state.Group, root) + groupTag, ok := state.Tags.GetCurrentValues().Tags.Get("group") + require.True(t, ok) + assert.Equal(t, groupTag, root.Name) + }) + } +} + +func TestAsyncGroupDuration(t *testing.T) { + t.Parallel() + + runtime, _ := testSetup(t) + samples := moveToVUCode(t, runtime) + rt := runtime.VU.Runtime() + require.NoError(t, rt.Set("delay", func(ms float64) *goja.Promise { + p, res, _ := rt.NewPromise() + fn := runtime.VU.RegisterCallback() + time.AfterFunc(time.Duration(ms*float64(time.Millisecond)), func() { + fn(func() error { + res("") + return nil + }) + }) + return p + })) + err := runtime.EventLoop.Start(func() error { + _, err := rt.RunScript("main.js", ` + k6.group("1", async () => { + await delay(100); + await k6.group("2", async () => { + await delay(100); + }) + await delay(100); + })`) + return err + }) + + require.NoError(t, err) + runtime.EventLoop.WaitOnRegistered() + bufSamples := metrics.GetBufferedSamples(samples) + require.Len(t, bufSamples, 2) + { + firstSample := bufSamples[0].GetSamples()[0] + require.Equal(t, metrics.GroupDurationName, firstSample.Metric.Name) + require.Equal(t, "::1::2", firstSample.Tags.Map()[metrics.TagGroup.String()]) + require.InDelta(t, 100, firstSample.Value, 10) + } + + { + secondSample := bufSamples[1].GetSamples()[0] + require.Equal(t, metrics.GroupDurationName, secondSample.Metric.Name) + require.Equal(t, "::1", secondSample.Tags.Map()[metrics.TagGroup.String()]) + require.InDelta(t, 300, secondSample.Value, 10) + } +} + +func TestAsyncGroupOrder(t *testing.T) { + t.Parallel() + cases := []struct { + name string + expected []string + script string + }{ + { + name: "basic", + expected: []string{"C", "A", "B"}, + script: ` + group("somename", async () => { + console.log("A"); + await 5; + console.log("B"); + }) + console.log("C")`, + }, + { + name: "basic + promise", + expected: []string{"C", "A", "D", "B"}, + script: ` + k6.group("somename", async () => { + log("A"); + await 5; + log("B"); + }) + log("C") + Promise.resolve("D").then((s) => {log(s)});`, + }, + } + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + runtime, _ := testSetup(t) + moveToVUCode(t, runtime) + rt := runtime.VU.Runtime() + var s []string + require.NoError(t, rt.Set("log", func(line string) { + s = append(s, line) + })) + err := runtime.EventLoop.Start(func() error { + _, err := rt.RunScript("main.js", c.script) + return err + }) + + require.NoError(t, err) + runtime.EventLoop.WaitOnRegistered() + require.Equal(t, c.expected, s) + }) + } +} diff --git a/js/modules/k6/k6.go b/js/modules/k6/k6.go index 57d525c3c871..816301dc70b7 100644 --- a/js/modules/k6/k6.go +++ b/js/modules/k6/k6.go @@ -3,6 +3,7 @@ package k6 import ( "errors" + "fmt" "math/rand" "sync/atomic" "time" @@ -29,7 +30,8 @@ type ( // K6 represents an instance of the k6 module. K6 struct { - vu modules.VU + vu modules.VU + groupInstance *groupInstance // TODO this basically is a second copy of lib.State.Group + time info } ) @@ -46,7 +48,42 @@ func New() *RootModule { // NewModuleInstance implements the modules.Module interface to return // a new instance for each VU. func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { - return &K6{vu: vu} + k := &K6{vu: vu} + // TODO this is not ideal, it should probably be done through some compositable + // k6 specific API that we can add multiple trackers with + vu.Runtime().SetAsyncContextTracker(k) + return k +} + +func (mi *K6) Suspended() interface{} { + if state := mi.vu.State(); state != nil { + setGroup(rootGroup(state.Group), state) + result := mi.groupInstance + mi.groupInstance = nil + return result + } + return nil +} + +func (mi *K6) Completed() { + if state := mi.vu.State(); state != nil { + mi.groupInstance.finalize(mi.vu.Context(), state) + setGroup(rootGroup(mi.groupInstance.Group), state) + mi.groupInstance = nil + } +} + +func (mi *K6) Resumed(i interface{}) { + if state := mi.vu.State(); state != nil && i != nil { + gi, ok := i.(*groupInstance) + if !ok { + panic(fmt.Sprintf( + "couldn't cast %+v to groupInstance on async resume", + i)) + } + setGroup(gi.Group, state) + mi.groupInstance = gi + } } // Exports returns the exports of the k6 module. @@ -85,16 +122,25 @@ func (mi *K6) RandomSeed(seed int64) { } // Group wraps a function call and executes it within the provided group name. -func (mi *K6) Group(name string, fn goja.Callable) (goja.Value, error) { +func (mi *K6) Group(name string, val goja.Value) (goja.Value, error) { state := mi.vu.State() if state == nil { return nil, ErrGroupInInitContext } - if fn == nil { + if val == nil || goja.IsNull(val) { return nil, errors.New("group() requires a callback as a second argument") } - + fn, ok := goja.AssertFunction(val) + if !ok { + return nil, errors.New("group() requires a callback as a second argument") + } + rt := mi.vu.Runtime() + o := val.ToObject(rt) + async := o.ClassName() == "AsyncFunction" + if async { + return mi.asyncGroup(name, fn) + } g, err := state.Group.Group(name) if err != nil { return goja.Undefined(), err @@ -138,6 +184,7 @@ func (mi *K6) Group(name string, fn goja.Callable) (goja.Value, error) { } // Check will emit check metrics for the provided checks. +// //nolint:cyclop func (mi *K6) Check(arg0, checks goja.Value, extras ...goja.Value) (bool, error) { state := mi.vu.State()