Skip to content

Commit

Permalink
k6: group special async function treatment
Browse files Browse the repository at this point in the history
A group provided with an async function will now:
1. Treat the whole time it take for the async function promise to finisher
   the duration of the group.
2. It will also use goja's AsyncContextTracker to make it so that the code
   using `await` within the group will still continue to be tagged with the
   group after `await` returns.
3. Instead of directly calling the async function it schedules it to be
   called the next time a promise job will work.

The current AsyncContextTracker is only used for this and as such is
directly changed in the `k6` module. In the future there likely will be
API so that multiple modules can use it simultaneously, but that seems
way too involved to be included in this change and also currently only
`group` needs this.

fixes #2848 #2728
  • Loading branch information
mstoykov committed Jan 19, 2023
1 parent 0b563d4 commit cba2e6f
Show file tree
Hide file tree
Showing 3 changed files with 384 additions and 6 deletions.
75 changes: 75 additions & 0 deletions js/modules/k6/asyncGroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package k6

import (
"context"
"time"

"github.com/dop251/goja"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
)

type groupInstance struct {
*lib.Group
startTime time.Time
}

func (g *groupInstance) start() {
g.startTime = time.Now()
}

func (g *groupInstance) finalize(ctx context.Context, state *lib.State) {
t := time.Now()
ctm := state.Tags.GetCurrentValues()
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: state.BuiltinMetrics.GroupDuration,
Tags: ctm.Tags,
},
Time: t,
Value: metrics.D(t.Sub(g.startTime)),
Metadata: ctm.Metadata,
})
}

func (mi *K6) asyncGroup(name string, fn goja.Callable) (goja.Value, error) {
rt := mi.vu.Runtime()
state := mi.vu.State()
p, res, _ := rt.NewPromise()
promiseObject := rt.ToValue(p).ToObject(rt)
baseGroup := state.Group
then, _ := goja.AssertFunction(promiseObject.Get("then"))
res(nil)
return then(promiseObject, rt.ToValue(func(result goja.Value) (goja.Value, error) {
g, err := baseGroup.Group(name)
if err != nil {
return goja.Undefined(), err
}

gi := &groupInstance{Group: g}
mi.groupInstance = gi
setGroup(g, state)
gi.start()
if err != nil {
return nil, err // actually return a promise ?!?
}
return fn(goja.Undefined())
}))
}

func setGroup(g *lib.Group, state *lib.State) {
state.Group = g

if state.Options.SystemTags.Has(metrics.TagGroup) {
state.Tags.Modify(func(tagsAndMeta *metrics.TagsAndMeta) {
tagsAndMeta.SetSystemTagOrMeta(metrics.TagGroup, g.Path)
})
}
}

func rootGroup(g *lib.Group) *lib.Group {
for g.Parent != nil {
g = g.Parent
}
return g
}
255 changes: 255 additions & 0 deletions js/modules/k6/asyncGroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package k6

import (
"fmt"
"testing"
"time"

"github.com/dop251/goja"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js/modulestest"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
)

//nolint:unparam // I am returning *K6 here as this will be used in other tests.
func testSetup(tb testing.TB) (*modulestest.Runtime, *K6) {
runtime := modulestest.NewRuntime(tb)
m, ok := New().NewModuleInstance(runtime.VU).(*K6)
require.True(tb, ok)
require.NoError(tb, runtime.VU.Runtime().Set("k6", m.Exports().Named))
return runtime, m
}

// TODO try to get this in modulesTest
func moveToVUCode(tb testing.TB, runtime *modulestest.Runtime) chan metrics.SampleContainer {
root, err := lib.NewGroup("", nil)
assert.NoError(tb, err)
samples := make(chan metrics.SampleContainer, 1000)
state := &lib.State{
Samples: samples,
Tags: lib.NewVUStateTags(runtime.VU.InitEnvField.Registry.RootTagSet()),
Options: lib.Options{
SystemTags: metrics.NewSystemTagSet(metrics.TagGroup),
},
BuiltinMetrics: runtime.BuiltinMetrics,
}
setGroup(root, state)
runtime.MoveToVUContext(state)
return samples
}

func TestAsyncGroup(t *testing.T) {
t.Parallel()

cases := []string{
`
k6.group("my group", async () => {
fn("::my group", "");
await fn("::my group", "");
fn("::my group", "");
Promise.resolve("").then( () => {
fn("")
})
}).then(() => {
fn("");
})
fn("");
`,
`
k6.group("my group", async () => {
fn("::my group", "");
await fn("::my group", "");
fn("::my group", "");
k6.group("second", async() => {
fn("::my group::second", "my group", "");
await fn("::my group::second", "my group", "");
fn("::my group::second", "my group", "");
await fn("::my group::second", "my group", "");
fn("::my group::second", "my group", "");
});
}).then(() => {
fn("");
})
fn("");
`,
`
k6.group("my group", async () => {
fn("::my group", "");
await fn("::my group", "");
fn("::my group", "");
k6.group("second", async() => {
fn("::my group::second", "my group", "");
await fn("::my group::second", "my group", "");
fn("::my group::second", "my group", "");
});
}).then(() => {
fn("");
})
fn("");
`,
`
k6.group("my group", async () => {
fn("::my group", "");
await fn("::my group", "");
fn("::my group", "");
k6.group("second", async() => {
fn("::my group::second", "my group", "");
});
}).then(() => {
fn("");
})
`,
`
k6.group("my group", async () => {
fn("::my group", "");
await fn("::my group", "");
await k6.group("second", async() => {
await fn("::my group::second", "my group", "");
});
}).then(() => {
fn("");
})
`,
}
for i, c := range cases {
c := c
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Parallel()

runtime, _ := testSetup(t)
moveToVUCode(t, runtime)
rt := runtime.VU.Runtime()
state := runtime.VU.State()
root := state.Group
require.NoError(t, rt.Set("fn", func(expectedGroupTag string, expectedParentNames ...string) *goja.Promise {
p, res, _ := rt.NewPromise()
groupTag, ok := state.Tags.GetCurrentValues().Tags.Get("group")
require.True(t, ok)
require.Equal(t, expectedGroupTag, groupTag)
parentGroup := state.Group.Parent
for _, expectedParentName := range expectedParentNames {
require.NotNil(t, parentGroup)
require.Equal(t, expectedParentName, parentGroup.Name)
parentGroup = parentGroup.Parent
}
require.Nil(t, parentGroup)
res("")
return p
}))
err := runtime.EventLoop.Start(func() error {
_, err := rt.RunScript("main.js", c)
return err
})
require.NoError(t, err)
runtime.EventLoop.WaitOnRegistered()
assert.Equal(t, state.Group, root)
groupTag, ok := state.Tags.GetCurrentValues().Tags.Get("group")
require.True(t, ok)
assert.Equal(t, groupTag, root.Name)
})
}
}

func TestAsyncGroupDuration(t *testing.T) {
t.Parallel()

runtime, _ := testSetup(t)
samples := moveToVUCode(t, runtime)
rt := runtime.VU.Runtime()
require.NoError(t, rt.Set("delay", func(ms float64) *goja.Promise {
p, res, _ := rt.NewPromise()
fn := runtime.VU.RegisterCallback()
time.AfterFunc(time.Duration(ms*float64(time.Millisecond)), func() {
fn(func() error {
res("")
return nil
})
})
return p
}))
err := runtime.EventLoop.Start(func() error {
_, err := rt.RunScript("main.js", `
k6.group("1", async () => {
await delay(100);
await k6.group("2", async () => {
await delay(100);
})
await delay(100);
})`)
return err
})

require.NoError(t, err)
runtime.EventLoop.WaitOnRegistered()
bufSamples := metrics.GetBufferedSamples(samples)
require.Len(t, bufSamples, 2)
{
firstSample := bufSamples[0].GetSamples()[0]
require.Equal(t, metrics.GroupDurationName, firstSample.Metric.Name)
require.Equal(t, "::1::2", firstSample.Tags.Map()[metrics.TagGroup.String()])
require.InDelta(t, 100, firstSample.Value, 10)
}

{
secondSample := bufSamples[1].GetSamples()[0]
require.Equal(t, metrics.GroupDurationName, secondSample.Metric.Name)
require.Equal(t, "::1", secondSample.Tags.Map()[metrics.TagGroup.String()])
require.InDelta(t, 300, secondSample.Value, 10)
}
}

func TestAsyncGroupOrder(t *testing.T) {
t.Parallel()
cases := []struct {
name string
expected []string
script string
}{
{
name: "basic",
expected: []string{"C", "A", "B"},
script: `
k6.group("somename", async () => {
log("A");
await 5;
log("B");
})
log("C")`,
},
{
name: "basic + promise",
expected: []string{"C", "A", "D", "B"},
script: `
k6.group("somename", async () => {
log("A");
await 5;
log("B");
})
log("C")
Promise.resolve("D").then((s) => {log(s)});`,
},
}
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()
runtime, _ := testSetup(t)
moveToVUCode(t, runtime)
rt := runtime.VU.Runtime()
var s []string
require.NoError(t, rt.Set("log", func(line string) {
s = append(s, line)
}))
err := runtime.EventLoop.Start(func() error {
_, err := rt.RunScript("main.js", c.script)
return err
})

require.NoError(t, err)
runtime.EventLoop.WaitOnRegistered()
require.Equal(t, c.expected, s)
})
}
}
Loading

0 comments on commit cba2e6f

Please sign in to comment.