Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add abort event #3929

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js"
Expand All @@ -26,6 +27,14 @@ import (
"go.k6.io/k6/output"
)

type eventAbortEmitterMock struct{}

func (e *eventAbortEmitterMock) Emit(event *event.Event) (wait func(context.Context) error) {
return func(ctx context.Context) error {
return nil
}
}

func TestSetupData(t *testing.T) {
t.Parallel()
testCases := []struct {
Expand Down Expand Up @@ -147,9 +156,12 @@ func TestSetupData(t *testing.T) {
globalCtx, globalCancel := context.WithCancel(context.Background())
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))
ebe := &eventAbortEmitterMock{}
defer runAbort(globalCtx, ebe, fmt.Errorf("unexpected abort"))

outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, func(err error) {
runAbort(globalCtx, ebe, err)
})
samples := make(chan metrics.SampleContainer, 1000)
_, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions api/v1/status_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func handlePatchStatus(cs *ControlSurface, rw http.ResponseWriter, r *http.Reque
if status.Stopped { //nolint:nestif
execution.AbortTestRun( //nolint:contextcheck // false-positive cs.RunCtx a right way of passing context there
cs.RunCtx,
cs.RunState.Events,
errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(fmt.Errorf("test run stopped from REST API"), exitcodes.ScriptStoppedFromRESTAPI),
errext.AbortedByUser,
Expand Down
9 changes: 6 additions & 3 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ func TestPatchStatus(t *testing.T) {
globalCtx, globalCancel := context.WithCancel(context.Background())
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))
ebe := &eventAbortEmitterMock{}
defer runAbort(globalCtx, ebe, fmt.Errorf("unexpected abort"))

outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, func(err error) {
runAbort(globalCtx, ebe, err)
})
samples := make(chan metrics.SampleContainer, 1000)
waitMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
Expand All @@ -147,7 +150,7 @@ func TestPatchStatus(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
defer func() {
runAbort(fmt.Errorf("custom cancel signal"))
runAbort(globalCtx, ebe, fmt.Errorf("custom cancel signal"))
waitMetricsFlushed()
wg.Wait()
}()
Expand Down
5 changes: 3 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
logger.WithError(err).Error("Received error to stop from output")
}
// TODO: attach run status and exit code?
runAbort(err)
runAbort(globalCtx, c.gs.Events, err)
})
samples := make(chan metrics.SampleContainer, test.derivedConfig.MetricSamplesBufferSize.Int64)
waitOutputsFlushed, stopOutputs, err := outputManager.Start(samples)
Expand All @@ -240,6 +240,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {

if !testRunState.RuntimeOptions.NoThresholds.Bool {
finalizeThresholds := metricsEngine.StartThresholdCalculations(
globalCtx, c.gs.Events,
metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,
)
handleFinalThresholdCalculation := func() {
Expand Down Expand Up @@ -349,7 +350,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
gracefulStop := func(sig os.Signal) {
logger.WithField("sig", sig).Debug("Stopping k6 in response to signal...")
// first abort the test run this way, to propagate the error
runAbort(errext.WithAbortReasonIfNone(
runAbort(globalCtx, c.gs.Events, errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(
fmt.Errorf("test run was aborted because k6 received a '%s' signal", sig), exitcodes.ExternalAbort,
), errext.AbortedByUser,
Expand Down
1 change: 1 addition & 0 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2106,6 +2106,7 @@ func TestEventSystemError(t *testing.T) {
"got event TestStart with data '<nil>'",
"got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'",
"got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:test aborted: oops! at default (file:///-:11:16(5))}'",
"got event Abort with data '<nil>'",
"got event TestEnd with data '<nil>'",
"got event Exit with data '&{Error:test aborted: oops! at default (file:///-:11:16(5))}'",
"test aborted: oops! at default (file:///-:11:16(5))",
Expand Down
2 changes: 1 addition & 1 deletion event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *System) Emit(event *Event) (wait func(context.Context) error) {
s.logger.WithFields(logrus.Fields{
"subscribers": totalSubs,
"event": event.Type,
}).Trace("Emitted event")
}).Info("Emitted event")

return func(ctx context.Context) error {
var doneCount int
Expand Down
2 changes: 1 addition & 1 deletion event/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestEventSystem(t *testing.T) {
var (
doneMx sync.RWMutex
processed = make(map[Type]int)
emitEvents = []Type{Init, TestStart, IterStart, IterEnd, TestEnd, Exit}
emitEvents = []Type{Init, TestStart, IterStart, IterEnd, TestEnd, Exit, Abort}
data int
)
for _, et := range emitEvents {
Expand Down
4 changes: 3 additions & 1 deletion event/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ const (
IterEnd
// Exit is emitted when the k6 process is about to exit.
Exit
// Abort is a temporary event that will be emitted when a gracefulStop occurs.
Abort
)

//nolint:gochecknoglobals
var (
// GlobalEvents are emitted once per test run.
GlobalEvents = []Type{Init, TestStart, TestEnd, Exit}
GlobalEvents = []Type{Init, TestStart, TestEnd, Exit, Abort}
// VUEvents are emitted multiple times per each VU.
VUEvents = []Type{IterStart, IterEnd}
)
Expand Down
7 changes: 4 additions & 3 deletions event/type_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 30 additions & 4 deletions execution/abort.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package execution
import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/event"
)

// testAbortKey is the key used to store the abort function for the context of
Expand All @@ -13,6 +15,13 @@ import (
// details for why they cancelled it via the attached error.
type testAbortKey struct{}

// EventAbortEmitter is used to abstract the event.System abort method.
// It especially helps for tests where we can hide the implementation
// and use a mock instead.
type EventAbortEmitter interface {
Emit(event *event.Event) (wait func(context.Context) error)
}

type testAbortController struct {
cancel context.CancelFunc

Expand All @@ -21,7 +30,24 @@ type testAbortController struct {
reason error // see errext package, you can wrap errors to attach exit status, run status, etc.
}

func (tac *testAbortController) abort(err error) {
func (tac *testAbortController) abort(ctx context.Context, events EventAbortEmitter, err error) {
// This is a temporary abort signal. It should be removed once
// https:/grafana/xk6-browser/issues/1410 is complete.
waitDone := events.Emit(&event.Event{
Type: event.Abort,
})
// Unlike in run.go where the timeout is 30 minutes, it is being set to
// 5 seconds here. Since this is a temporary event, it doesn't need to
// abide by the same rules as the other events. This is only being used
// by the browser module, and we know it can process the abort event within
// 5 seconds, this is good enough in the short term.
waitCtx, waitCancel := context.WithTimeout(ctx, 5*time.Second)
defer waitCancel()
tac.logger.Infof("abort event fired due to '%s'", err)
if werr := waitDone(waitCtx); werr != nil {
tac.logger.WithError(werr).Warn()
}

tac.lock.Lock()
defer tac.lock.Unlock()
if tac.reason != nil {
Expand All @@ -48,7 +74,7 @@ func (tac *testAbortController) getReason() error {
// API test stopping both work.
func NewTestRunContext(
ctx context.Context, logger logrus.FieldLogger,
) (newCtx context.Context, abortTest func(reason error)) {
) (newCtx context.Context, abortTest func(ctx context.Context, events EventAbortEmitter, err error)) {
ctx, cancel := context.WithCancel(ctx)

controller := &testAbortController{
Expand All @@ -61,10 +87,10 @@ func NewTestRunContext(

// AbortTestRun will cancel the test run context with the given reason if the
// provided context is actually a TestRuncontext or a child of one.
func AbortTestRun(ctx context.Context, err error) bool {
func AbortTestRun(ctx context.Context, events EventAbortEmitter, err error) bool {
if x := ctx.Value(testAbortKey{}); x != nil {
if v, ok := x.(*testAbortController); ok {
v.abort(err)
v.abort(ctx, events, err)
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/go-sourcemap/sourcemap v2.1.4+incompatible
github.com/golang/protobuf v1.5.4
github.com/gorilla/websocket v1.5.1
github.com/grafana/xk6-browser v1.7.1
github.com/grafana/xk6-browser v1.7.2-0.20240830125845-1cc2738ebb94
github.com/grafana/xk6-dashboard v0.7.5
github.com/grafana/xk6-output-opentelemetry v0.1.1
github.com/grafana/xk6-output-prometheus-remote v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grafana/sobek v0.0.0-20240816075701-fd55381ddfc3 h1:mJ1DN6EDA5MlRtcspUjVTsfINUVJMd4Yw70RdFoKN8E=
github.com/grafana/sobek v0.0.0-20240816075701-fd55381ddfc3/go.mod h1:14YTHWUwjApKs5kzRn+4akwbvPMRsXEmjfozc5OmT0I=
github.com/grafana/xk6-browser v1.7.1 h1:RKCcoFyKT97iGgbnK76WwxcXnkB23ijlO1LghqjHQ0E=
github.com/grafana/xk6-browser v1.7.1/go.mod h1:sZO7cT7/XQf2mz+rXkp6poswhOCA0JKA8isj3fQrfaU=
github.com/grafana/xk6-browser v1.7.2-0.20240830125845-1cc2738ebb94 h1:ACgftcYIgQGYfIRJGNajUDqQIxpt2Irykn/JRYA8kZs=
github.com/grafana/xk6-browser v1.7.2-0.20240830125845-1cc2738ebb94/go.mod h1:l1vbyRsaAqIeDiq87Ne/bDDbVQ8ukmBuR0zvdUWG+bs=
github.com/grafana/xk6-dashboard v0.7.5 h1:TcILyffT/Ea/XD7xG1jMA5lwtusOPRbEQsQDHmO30Mk=
github.com/grafana/xk6-dashboard v0.7.5/go.mod h1:Y75F8xmgCraKT+pBzFH6me9AyH5PkXD+Bxo1dm6Il/M=
github.com/grafana/xk6-output-opentelemetry v0.1.1 h1:kLfzKkL9olhmMO+Kmr7ObhX3LknSAbUbzFaDG6mQVeg=
Expand Down
16 changes: 14 additions & 2 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/errext"
"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js/modules/k6"
Expand Down Expand Up @@ -282,6 +283,14 @@ func TestMetricName(t *testing.T) {
require.Error(t, err)
}

type eventAbortEmitterMock struct{}

func (e *eventAbortEmitterMock) Emit(event *event.Event) (wait func(context.Context) error) {
return func(ctx context.Context) error {
return nil
}
}

func TestDataIsolation(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -374,7 +383,10 @@ func TestDataIsolation(t *testing.T) {
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger)

mockOutput := mockoutput.New()
outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, runAbort)
ebe := &eventAbortEmitterMock{}
outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, func(err error) {
runAbort(globalCtx, ebe, err)
})
samples := make(chan metrics.SampleContainer, 1000)
waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
Expand All @@ -388,7 +400,7 @@ func TestDataIsolation(t *testing.T) {

select {
case <-time.After(20 * time.Second):
runAbort(fmt.Errorf("unexpected abort"))
runAbort(globalCtx, ebe, fmt.Errorf("unexpected abort"))
t.Fatal("Test timed out")
case err := <-errC:
stopEmission()
Expand Down
7 changes: 4 additions & 3 deletions lib/executor/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/sirupsen/logrus"

"go.k6.io/k6/errext"
"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
Expand Down Expand Up @@ -86,10 +87,10 @@ func validateStages(stages []Stage) []error {

// handleInterrupt returns true if err is InterruptError and if so it
// cancels the executor context passed with ctx.
func handleInterrupt(ctx context.Context, err error) bool {
func handleInterrupt(ctx context.Context, events *event.System, err error) bool {
if err != nil {
if errext.IsInterruptError(err) {
execution.AbortTestRun(ctx, err)
execution.AbortTestRun(ctx, events, err)
return true
}
}
Expand Down Expand Up @@ -118,7 +119,7 @@ func getIterationRunner(
return false
default:
if err != nil {
if handleInterrupt(ctx, err) {
if handleInterrupt(ctx, executionState.Test.Events, err) {
executionState.AddInterruptedIterations(1)
return false
}
Expand Down
7 changes: 5 additions & 2 deletions metrics/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package engine

import (
"context"
"fmt"
"sort"
"strings"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/sirupsen/logrus"
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"gopkg.in/guregu/null.v3"
Expand Down Expand Up @@ -159,8 +161,9 @@ func (me *MetricsEngine) InitSubMetricsAndThresholds(options lib.Options, onlyLo
// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalizes calculations.
func (me *MetricsEngine) StartThresholdCalculations(
ctx context.Context, events execution.EventAbortEmitter,
ingester *OutputIngester,
abortRun func(error),
abortRun func(context.Context, execution.EventAbortEmitter, error),
getCurrentTestRunDuration func() time.Duration,
) (finalize func() (breached []string)) {
if len(me.metricsWithThresholds) == 0 {
Expand Down Expand Up @@ -188,7 +191,7 @@ func (me *MetricsEngine) StartThresholdCalculations(
err = errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed), errext.AbortedByThreshold,
)
abortRun(err)
abortRun(ctx, events, err)
}
case <-stop:
return
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ github.com/grafana/sobek/ftoa/internal/fast
github.com/grafana/sobek/parser
github.com/grafana/sobek/token
github.com/grafana/sobek/unistring
# github.com/grafana/xk6-browser v1.7.1
## explicit; go 1.20
# github.com/grafana/xk6-browser v1.7.2-0.20240830125845-1cc2738ebb94
## explicit; go 1.21
github.com/grafana/xk6-browser/browser
github.com/grafana/xk6-browser/chromium
github.com/grafana/xk6-browser/common
Expand Down