-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
k6: group special async function treatment
A group provided with an async function will now: 1. Treat the whole time it take for the async function promise to finisher the duration of the group. 2. It will also use goja's AsyncContextTracker to make it so that the code using `await` within the group will still continue to be tagged with the group after `await` returns. 3. Instead of directly calling the async function it schedules it to be called the next time a promise job will work. The current AsyncContextTracker is only used for this and as such is directly changed in the `k6` module. In the future there likely will be API so that multiple modules can use it simultaneously, but that seems way too involved to be included in this change and also currently only `group` needs this. fixes #2848 #2728
- Loading branch information
Showing
3 changed files
with
383 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package k6 | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/dop251/goja" | ||
"go.k6.io/k6/lib" | ||
"go.k6.io/k6/metrics" | ||
) | ||
|
||
type groupInstance struct { | ||
*lib.Group | ||
startTime time.Time | ||
} | ||
|
||
func (g *groupInstance) start() { | ||
g.startTime = time.Now() | ||
} | ||
|
||
func (g *groupInstance) finalize(ctx context.Context, state *lib.State) { | ||
t := time.Now() | ||
ctm := state.Tags.GetCurrentValues() | ||
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{ | ||
TimeSeries: metrics.TimeSeries{ | ||
Metric: state.BuiltinMetrics.GroupDuration, | ||
Tags: ctm.Tags, | ||
}, | ||
Time: t, | ||
Value: metrics.D(t.Sub(g.startTime)), | ||
Metadata: ctm.Metadata, | ||
}) | ||
} | ||
|
||
func (mi *K6) asyncGroup(name string, fn goja.Callable) (goja.Value, error) { | ||
rt := mi.vu.Runtime() | ||
state := mi.vu.State() | ||
p, res, _ := rt.NewPromise() | ||
promiseObject := rt.ToValue(p).ToObject(rt) | ||
baseGroup := state.Group | ||
then, _ := goja.AssertFunction(promiseObject.Get("then")) | ||
res(nil) | ||
return then(promiseObject, rt.ToValue(func(result goja.Value) (goja.Value, error) { | ||
g, err := baseGroup.Group(name) | ||
if err != nil { | ||
return goja.Undefined(), err | ||
} | ||
|
||
gi := &groupInstance{Group: g} | ||
mi.groupInstance = gi | ||
setGroup(g, state) | ||
gi.start() | ||
if err != nil { | ||
return nil, err // actually return a promise ?!? | ||
} | ||
return fn(goja.Undefined()) | ||
})) | ||
} | ||
|
||
func setGroup(g *lib.Group, state *lib.State) *lib.Group { | ||
old := state.Group | ||
state.Group = g | ||
|
||
if state.Options.SystemTags.Has(metrics.TagGroup) { | ||
state.Tags.Modify(func(tagsAndMeta *metrics.TagsAndMeta) { | ||
tagsAndMeta.SetSystemTagOrMeta(metrics.TagGroup, g.Path) | ||
}) | ||
} | ||
|
||
return old | ||
} | ||
|
||
func rootGroup(g *lib.Group) *lib.Group { | ||
for g.Parent != nil { | ||
g = g.Parent | ||
} | ||
return g | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
package k6 | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/dop251/goja" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.k6.io/k6/js/modulestest" | ||
"go.k6.io/k6/lib" | ||
"go.k6.io/k6/metrics" | ||
) | ||
|
||
func testSetup(tb testing.TB) (*modulestest.Runtime, *K6) { | ||
runtime := modulestest.NewRuntime(tb) | ||
m, ok := New().NewModuleInstance(runtime.VU).(*K6) | ||
require.True(tb, ok) | ||
require.NoError(tb, runtime.VU.Runtime().Set("k6", m.Exports().Named)) | ||
return runtime, m | ||
} | ||
|
||
// TODO try to get this in modulesTest | ||
func moveToVUCode(tb testing.TB, runtime *modulestest.Runtime) chan metrics.SampleContainer { | ||
root, err := lib.NewGroup("", nil) | ||
assert.NoError(tb, err) | ||
samples := make(chan metrics.SampleContainer, 1000) | ||
state := &lib.State{ | ||
Samples: samples, | ||
Tags: lib.NewVUStateTags(runtime.VU.InitEnvField.Registry.RootTagSet()), | ||
Options: lib.Options{ | ||
SystemTags: metrics.NewSystemTagSet(metrics.TagGroup), | ||
}, | ||
BuiltinMetrics: runtime.BuiltinMetrics, | ||
} | ||
setGroup(root, state) | ||
runtime.MoveToVUContext(state) | ||
return samples | ||
} | ||
|
||
func TestAsyncGroup(t *testing.T) { | ||
t.Parallel() | ||
|
||
cases := []string{ | ||
` | ||
k6.group("my group", async () => { | ||
fn("::my group", ""); | ||
await fn("::my group", ""); | ||
fn("::my group", ""); | ||
Promise.resolve("").then( () => { | ||
fn("") | ||
}) | ||
}).then(() => { | ||
fn(""); | ||
}) | ||
fn(""); | ||
`, | ||
` | ||
k6.group("my group", async () => { | ||
fn("::my group", ""); | ||
await fn("::my group", ""); | ||
fn("::my group", ""); | ||
k6.group("second", async() => { | ||
fn("::my group::second", "my group", ""); | ||
await fn("::my group::second", "my group", ""); | ||
fn("::my group::second", "my group", ""); | ||
await fn("::my group::second", "my group", ""); | ||
fn("::my group::second", "my group", ""); | ||
}); | ||
}).then(() => { | ||
fn(""); | ||
}) | ||
fn(""); | ||
`, | ||
` | ||
k6.group("my group", async () => { | ||
fn("::my group", ""); | ||
await fn("::my group", ""); | ||
fn("::my group", ""); | ||
k6.group("second", async() => { | ||
fn("::my group::second", "my group", ""); | ||
await fn("::my group::second", "my group", ""); | ||
fn("::my group::second", "my group", ""); | ||
}); | ||
}).then(() => { | ||
fn(""); | ||
}) | ||
fn(""); | ||
`, | ||
` | ||
k6.group("my group", async () => { | ||
fn("::my group", ""); | ||
await fn("::my group", ""); | ||
fn("::my group", ""); | ||
k6.group("second", async() => { | ||
fn("::my group::second", "my group", ""); | ||
}); | ||
}).then(() => { | ||
fn(""); | ||
}) | ||
`, | ||
` | ||
k6.group("my group", async () => { | ||
fn("::my group", ""); | ||
await fn("::my group", ""); | ||
await k6.group("second", async() => { | ||
await fn("::my group::second", "my group", ""); | ||
}); | ||
}).then(() => { | ||
fn(""); | ||
}) | ||
`, | ||
} | ||
for i, c := range cases { | ||
c := c | ||
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { | ||
t.Parallel() | ||
|
||
runtime, _ := testSetup(t) | ||
moveToVUCode(t, runtime) | ||
rt := runtime.VU.Runtime() | ||
state := runtime.VU.State() | ||
root := state.Group | ||
require.NoError(t, rt.Set("fn", func(expectedGroupTag string, expectedParentNames ...string) *goja.Promise { | ||
p, res, _ := rt.NewPromise() | ||
groupTag, ok := state.Tags.GetCurrentValues().Tags.Get("group") | ||
require.True(t, ok) | ||
require.Equal(t, expectedGroupTag, groupTag) | ||
parentGroup := state.Group.Parent | ||
for _, expectedParentName := range expectedParentNames { | ||
require.NotNil(t, parentGroup) | ||
require.Equal(t, expectedParentName, parentGroup.Name) | ||
parentGroup = parentGroup.Parent | ||
} | ||
require.Nil(t, parentGroup) | ||
res("") | ||
return p | ||
})) | ||
err := runtime.EventLoop.Start(func() error { | ||
_, err := rt.RunScript("main.js", c) | ||
return err | ||
}) | ||
require.NoError(t, err) | ||
runtime.EventLoop.WaitOnRegistered() | ||
assert.Equal(t, state.Group, root) | ||
groupTag, ok := state.Tags.GetCurrentValues().Tags.Get("group") | ||
require.True(t, ok) | ||
assert.Equal(t, groupTag, root.Name) | ||
}) | ||
} | ||
} | ||
|
||
func TestAsyncGroupDuration(t *testing.T) { | ||
t.Parallel() | ||
|
||
runtime, _ := testSetup(t) | ||
samples := moveToVUCode(t, runtime) | ||
rt := runtime.VU.Runtime() | ||
require.NoError(t, rt.Set("delay", func(ms float64) *goja.Promise { | ||
p, res, _ := rt.NewPromise() | ||
fn := runtime.VU.RegisterCallback() | ||
time.AfterFunc(time.Duration(ms*float64(time.Millisecond)), func() { | ||
fn(func() error { | ||
res("") | ||
return nil | ||
}) | ||
}) | ||
return p | ||
})) | ||
err := runtime.EventLoop.Start(func() error { | ||
_, err := rt.RunScript("main.js", ` | ||
k6.group("1", async () => { | ||
await delay(100); | ||
await k6.group("2", async () => { | ||
await delay(100); | ||
}) | ||
await delay(100); | ||
})`) | ||
return err | ||
}) | ||
|
||
require.NoError(t, err) | ||
runtime.EventLoop.WaitOnRegistered() | ||
bufSamples := metrics.GetBufferedSamples(samples) | ||
require.Len(t, bufSamples, 2) | ||
{ | ||
firstSample := bufSamples[0].GetSamples()[0] | ||
require.Equal(t, metrics.GroupDurationName, firstSample.Metric.Name) | ||
require.Equal(t, "::1::2", firstSample.Tags.Map()[metrics.TagGroup.String()]) | ||
require.InDelta(t, 100, firstSample.Value, 10) | ||
} | ||
|
||
{ | ||
secondSample := bufSamples[1].GetSamples()[0] | ||
require.Equal(t, metrics.GroupDurationName, secondSample.Metric.Name) | ||
require.Equal(t, "::1", secondSample.Tags.Map()[metrics.TagGroup.String()]) | ||
require.InDelta(t, 300, secondSample.Value, 10) | ||
} | ||
} | ||
|
||
func TestAsyncGroupOrder(t *testing.T) { | ||
t.Parallel() | ||
cases := []struct { | ||
name string | ||
expected []string | ||
script string | ||
}{ | ||
{ | ||
name: "basic", | ||
expected: []string{"C", "A", "B"}, | ||
script: ` | ||
group("somename", async () => { | ||
console.log("A"); | ||
await 5; | ||
console.log("B"); | ||
}) | ||
console.log("C")`, | ||
}, | ||
{ | ||
name: "basic + promise", | ||
expected: []string{"C", "A", "D", "B"}, | ||
script: ` | ||
k6.group("somename", async () => { | ||
log("A"); | ||
await 5; | ||
log("B"); | ||
}) | ||
log("C") | ||
Promise.resolve("D").then((s) => {log(s)});`, | ||
}, | ||
} | ||
for _, c := range cases { | ||
c := c | ||
t.Run(c.name, func(t *testing.T) { | ||
runtime, _ := testSetup(t) | ||
moveToVUCode(t, runtime) | ||
rt := runtime.VU.Runtime() | ||
var s []string | ||
require.NoError(t, rt.Set("log", func(line string) { | ||
s = append(s, line) | ||
})) | ||
err := runtime.EventLoop.Start(func() error { | ||
_, err := rt.RunScript("main.js", c.script) | ||
return err | ||
}) | ||
|
||
require.NoError(t, err) | ||
runtime.EventLoop.WaitOnRegistered() | ||
require.Equal(t, c.expected, s) | ||
}) | ||
} | ||
} |
Oops, something went wrong.