Remove the unnecessary execution.Scheduler.Init()
There is no need to have 2 separate methods; Run() can start the VU initialization itself. As a bonus, this immediately makes the error handling around init errors much more in line with other error handling, allowing us to respect --linger and to try to execute handleSummary() if there were problems. The exception is the first init that is used to get the exported options; of course, that one is still special.
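For callers, the change collapses two steps into one. The k6 types are more involved than this, so the following is only a toy sketch of the API shape the commit moves to (the scheduler type and initVUs helper here are illustrative, not k6 code); the eventloop_test.go diff below shows the real call-site simplification:

package main

import (
	"context"
	"errors"
	"fmt"
)

// toy stand-in for execution.Scheduler, to show the shape of the change
type scheduler struct{}

func (s *scheduler) initVUs(ctx context.Context) error {
	return errors.New("error while initializing VU #2 (script exception)") // pretend a VU failed to init
}

// Run performs the initialization itself, so an init failure is returned
// like any other run error instead of needing special handling upfront.
func (s *scheduler) Run(ctx context.Context) error {
	if err := s.initVUs(ctx); err != nil {
		return err
	}
	// ... executors would run here ...
	return nil
}

func main() {
	errCh := make(chan error, 1)
	go func() { errCh <- (&scheduler{}).Run(context.Background()) }()
	fmt.Println(<-errCh) // init errors arrive on the same path as run errors
}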
na-- committed Dec 8, 2022
1 parent 195c2ab commit 38e9082
Showing 6 changed files with 56 additions and 131 deletions.
cmd/integration_test.go (18 additions, 18 deletions)
@@ -976,11 +976,18 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) {
 export default function () {};
 // Should not be called, since error is in the init context
 export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};}
 `

-	testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger)
+	t.Run("noLinger", func(t *testing.T) {
+		t.Parallel()
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
+	})
+
+	t.Run("withLinger", func(t *testing.T) {
+		t.Parallel()
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
+	})
 }

 func TestAbortedByScriptAbortInVUCode(t *testing.T) {
@@ -995,12 +1002,12 @@ func TestAbortedByScriptAbortInVUCode(t *testing.T) {

 	t.Run("noLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
 	})

 	t.Run("withLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
 	})
 }

@@ -1017,12 +1024,12 @@ func TestAbortedByScriptAbortInSetup(t *testing.T) {

 	t.Run("noLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
 	})

 	t.Run("withLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
 	})
 }

@@ -1039,18 +1046,16 @@ func TestAbortedByScriptAbortInTeardown(t *testing.T) {

 	t.Run("noLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
 	})

 	t.Run("withLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
 	})
 }

-func testAbortedByScriptTestAbort(
-	t *testing.T, shouldHaveMetrics bool, script string, runTest func(*testing.T, *globalTestState),
-) *globalTestState {
+func testAbortedByScriptTestAbort(t *testing.T, script string, runTest func(*testing.T, *globalTestState)) {
 	ts := getSimpleCloudOutputTestState(
 		t, script, nil, cloudapi.RunStatusAbortedUser, cloudapi.ResultStatusPassed, exitcodes.ScriptAborted,
 	)
@@ -1061,13 +1066,8 @@ func testAbortedByScriptTestAbort(
 	assert.Contains(t, stdOut, "test aborted: foo")
 	assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
 	assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
-	if shouldHaveMetrics {
-		assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`)
-		assert.Contains(t, stdOut, "bogus summary")
-	} else {
-		assert.NotContains(t, stdOut, "bogus summary")
-	}
-	return ts
+	assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`)
+	assert.Contains(t, stdOut, "bogus summary")
 }

 func TestAbortedByScriptInitError(t *testing.T) {
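The tests above now exercise both linger modes through Go subtests. For reference, a minimal self-contained example of the t.Run/t.Parallel pattern they use (the names here are illustrative, not from k6):

package example

import "testing"

func TestBothModes(t *testing.T) {
	t.Parallel()

	assertCommonBehavior := func(t *testing.T, withLinger bool) {
		t.Helper()
		_ = withLinger // shared assertions would go here
	}

	t.Run("noLinger", func(t *testing.T) {
		t.Parallel() // the two subtests may run concurrently
		assertCommonBehavior(t, false)
	})

	t.Run("withLinger", func(t *testing.T) {
		t.Parallel()
		assertCommonBehavior(t, true)
	})
}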
cmd/integration_tests/eventloop/eventloop_test.go (0 additions, 2 deletions)
@@ -71,8 +71,6 @@ func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context,
 		}
 	}()

-	require.NoError(t, execScheduler.Init(ctx, samples))
-
 	errCh := make(chan error, 1)
 	go func() { errCh <- execScheduler.Run(ctx, ctx, samples) }()
core/engine.go (0 additions, 5 deletions)
@@ -95,11 +95,6 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o
 // - The second returned lambda can be used to wait for that process to finish.
 func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait func(), err error) {
 	e.logger.Debug("Initialization starting...")
-	// TODO: if we ever need metrics processing in the init context, we can move
-	// this below the other components... or even start them concurrently?
-	if err := e.ExecutionScheduler.Init(runCtx, e.Samples); err != nil {
-		return nil, nil, err
-	}

 	// TODO: move all of this in a separate struct? see main TODO above
 	processMetricsAfterRun := make(chan struct{})
core/engine_test.go (0 additions, 51 deletions)
@@ -893,57 +893,6 @@ func TestSetupException(t *testing.T) {
 	}
 }

-// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the
-// test functionality was duplicated in cmd/integration_test.go
-func TestVuInitException(t *testing.T) {
-	t.Parallel()
-
-	script := []byte(`
-	export let options = {
-		vus: 3,
-		iterations: 5,
-	};
-	export default function() {};
-	if (__VU == 2) {
-		throw new Error('oops in ' + __VU);
-	}
-	`)
-
-	piState := getTestPreInitState(t)
-	runner, err := js.New(
-		piState,
-		&loader.SourceData{URL: &url.URL{Scheme: "file", Path: "/script.js"}, Data: script},
-		nil,
-	)
-	require.NoError(t, err)
-
-	opts, err := executor.DeriveScenariosFromShortcuts(runner.GetOptions(), nil)
-	require.NoError(t, err)
-
-	testState := getTestRunState(t, piState, opts, runner)
-
-	execScheduler, err := execution.NewScheduler(testState)
-	require.NoError(t, err)
-	engine, err := NewEngine(testState, execScheduler, nil)
-	require.NoError(t, err)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	_, _, err = engine.Init(ctx, ctx) // no need for 2 different contexts
-
-	require.Error(t, err)
-
-	var exception errext.Exception
-	require.ErrorAs(t, err, &exception)
-	assert.Equal(t, "Error: oops in 2\n\tat file:///script.js:10:9(29)\n\tat native\n", err.Error())
-
-	var errWithHint errext.HasHint
-	require.ErrorAs(t, err, &errWithHint)
-	assert.Equal(t, "error while initializing VU #2 (script exception)", errWithHint.Hint())
-}

 func TestEmittedMetricsWhenScalingDown(t *testing.T) {
 	t.Parallel()
 	tb := httpmultibin.NewHTTPMultiBin(t)
execution/scheduler.go (34 additions, 37 deletions)
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -26,10 +27,6 @@ type Scheduler struct {
 	maxDuration    time.Duration // cached value derived from the execution plan
 	maxPossibleVUs uint64        // cached value derived from the execution plan
 	state          *lib.ExecutionState
-
-	// TODO: remove these when we don't have separate Init() and Run() methods
-	// and can use a context + a WaitGroup (or something like that)
-	stopVUsEmission, vusEmissionStopped chan struct{}
 }

 // NewScheduler creates and returns a new Scheduler instance, without
@@ -84,9 +81,6 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
 		maxDuration:    maxDuration,
 		maxPossibleVUs: maxPossibleVUs,
 		state:          executionState,
-
-		stopVUsEmission:    make(chan struct{}),
-		vusEmissionStopped: make(chan struct{}),
 	}, nil
 }
@@ -199,9 +193,11 @@ func (e *Scheduler) initVUsConcurrently(
 	return doneInits
 }

-func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) {
+func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) func() {
 	e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...")
 	tags := e.state.Test.RunTags
+	wg := &sync.WaitGroup{}
+	wg.Add(1)

 	emitMetrics := func() {
 		t := time.Now()
@@ -234,7 +230,7 @@ func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.Sam
 		defer func() {
 			ticker.Stop()
 			e.state.Test.Logger.Debug("Metrics emission of VUs and VUsMax metrics stopped")
-			close(e.vusEmissionStopped)
+			wg.Done()
 		}()

 		for {
@@ -243,23 +239,17 @@ func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.Sam
 				emitMetrics()
 			case <-ctx.Done():
 				return
-			case <-e.stopVUsEmission:
-				return
 			}
 		}
 	}()
+
+	return wg.Wait
 }

-// Init concurrently initializes all of the planned VUs and then sequentially
-// initializes all of the configured executors.
-func (e *Scheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
-	e.emitVUsAndVUsMax(ctx, samplesOut)
-	defer func() {
-		if err != nil {
-			close(e.stopVUsEmission)
-			<-e.vusEmissionStopped
-		}
-	}()
+// initVusAndExecutors concurrently initializes all of the planned VUs and then
+// sequentially initializes all of the configured executors.
+func (e *Scheduler) initVusAndExecutors(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
+	e.initProgress.Modify(pb.WithConstProgress(0, "Init VUs..."))

 	logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")
 	vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan)
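The hunk above replaces the stopVUsEmission/vusEmissionStopped channel pair with a sync.WaitGroup: cancelling the passed-in context asks the emission goroutine to stop, and the returned wg.Wait blocks until it has actually exited. A self-contained sketch of that pattern under illustrative names (startEmitter is not a k6 function):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startEmitter launches a background ticker goroutine and returns a
// function that blocks until the goroutine has fully exited. Shutdown is
// driven purely by context cancellation, so no extra stop channel and no
// separate "stopped" acknowledgment channel are needed.
func startEmitter(ctx context.Context) func() {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		ticker := time.NewTicker(10 * time.Millisecond)
		defer func() {
			ticker.Stop()
			wg.Done() // signals that cleanup has finished
		}()
		for {
			select {
			case <-ticker.C:
				fmt.Println("emit")
			case <-ctx.Done():
				return
			}
		}
	}()
	return wg.Wait
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wait := startEmitter(ctx)
	time.Sleep(35 * time.Millisecond)
	cancel() // ask the goroutine to stop...
	wait()   // ...and block until it actually has
}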
Expand Down Expand Up @@ -386,15 +376,19 @@ func (e *Scheduler) runExecutor(
// out channel.
//
//nolint:funlen
func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error {
defer func() {
close(e.stopVUsEmission)
<-e.vusEmissionStopped
}()
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) error {
execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
defer waitForVUsMetricPush()
defer execSchedRunCancel()

if err := e.initVusAndExecutors(execSchedRunCtx, samplesOut); err != nil {
return err
}

executorsCount := len(e.executors)
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")
e.initProgress.Modify(pb.WithConstLeft("Run"))
e.initProgress.Modify(pb.WithConstLeft("Run"), pb.WithConstProgress(0, "Starting test..."))
var interrupted bool
defer func() {
e.state.MarkEnded()
@@ -410,7 +404,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 		select {
 		case <-e.state.ResumeNotify():
 			// continue
-		case <-runCtx.Done():
+		case <-execSchedRunCtx.Done():
 			return nil
 		}
 	}
@@ -422,15 +416,16 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 	runResults := make(chan error, executorsCount) // nil values are successful runs

-	runCtx = lib.WithExecutionState(runCtx, e.state)
-	runSubCtx, cancel := context.WithCancel(runCtx)
-	defer cancel() // just in case, and to shut up go vet...
+	// TODO: get rid of this context, pass the e.state directly to VUs when they
+	// are initialized by e.initVusAndExecutors(). This will also give access to
+	// its properties in their init context executions.
+	withExecStateCtx := lib.WithExecutionState(execSchedRunCtx, e.state)

 	// Run setup() before any executors, if it's not disabled
 	if !e.state.Test.Options.NoSetup.Bool {
 		e.state.SetExecutionStatus(lib.ExecutionStatusSetup)
 		e.initProgress.Modify(pb.WithConstProgress(1, "setup()"))
-		if err := e.state.Test.Runner.Setup(runSubCtx, engineOut); err != nil {
+		if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
 			logger.WithField("error", err).Debug("setup() aborted by error")
 			return err
 		}
@@ -441,8 +436,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 	logger.Debug("Start all executors...")
 	e.state.SetExecutionStatus(lib.ExecutionStatusRunning)

+	executorsRunCtx, executorsRunCancel := context.WithCancel(withExecStateCtx)
+	defer executorsRunCancel()
 	for _, exec := range e.executors {
-		go e.runExecutor(runSubCtx, runResults, engineOut, exec)
+		go e.runExecutor(executorsRunCtx, runResults, samplesOut, exec)
 	}

 	// Wait for all executors to finish
@@ -452,7 +449,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 		if err != nil && firstErr == nil {
 			logger.WithError(err).Debug("Executor returned with an error, cancelling test run...")
 			firstErr = err
-			cancel()
+			executorsRunCancel()
 		}
 	}

@@ -462,13 +459,13 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 		e.initProgress.Modify(pb.WithConstProgress(1, "teardown()"))

 		// We run teardown() with the global context, so it isn't interrupted by
-		// aborts caused by thresholds or even Ctrl+C (unless used twice).
-		if err := e.state.Test.Runner.Teardown(globalCtx, engineOut); err != nil {
+		// thresholds or test.abort() or even Ctrl+C (unless used twice).
+		if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
 			logger.WithField("error", err).Debug("teardown() aborted by error")
 			return err
 		}
 	}
-	if err := GetCancelReasonIfTestAborted(runSubCtx); err != nil {
+	if err := GetCancelReasonIfTestAborted(executorsRunCtx); err != nil {
 		interrupted = true
 		return err
 	}
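Run() now layers several contexts: execSchedRunCtx (derived from runCtx) scopes metrics emission and VU init, withExecStateCtx carries the execution state, executorsRunCtx (cancelled on the first executor error) scopes only the executors, and teardown() deliberately keeps using globalCtx. A self-contained sketch of why layering cancellation this way helps; the names and the runExecutors stub are illustrative, not k6 code:

package main

import (
	"context"
	"errors"
	"fmt"
)

func runExecutors(ctx context.Context) error {
	_ = ctx
	return errors.New("executor failed") // pretend one executor errored
}

func main() {
	globalCtx := context.Background() // teardown-style work uses this

	// Everything the run owns hangs off one cancellable context...
	runCtx, cancelRun := context.WithCancel(globalCtx)
	defer cancelRun()

	// ...and the executors get a narrower child, so aborting them does
	// not cancel cleanup work that should still happen afterwards.
	executorsCtx, cancelExecutors := context.WithCancel(runCtx)
	defer cancelExecutors()

	if err := runExecutors(executorsCtx); err != nil {
		cancelExecutors() // stop only the other executors
	}

	// globalCtx is still alive here, so teardown can run to completion
	// even though the executors' context was cancelled.
	fmt.Println("teardown, globalCtx cancelled?", globalCtx.Err() != nil)
}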