Merge 307ae74 into f86938d
na-- authored Jul 13, 2023
2 parents f86938d + 307ae74 commit df5f688
Showing 18 changed files with 196 additions and 200 deletions.
2 changes: 1 addition & 1 deletion api/v1/group_routes_test.go
@@ -44,7 +44,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(testState)
me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
2 changes: 1 addition & 1 deletion api/v1/setup_teardown_routes_test.go
@@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
2 changes: 1 addition & 1 deletion api/v1/status_routes_test.go
@@ -115,7 +115,7 @@ func TestPatchStatus(t *testing.T) {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
25 changes: 19 additions & 6 deletions cmd/run.go
@@ -147,17 +147,28 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
return err
}

executionState := execScheduler.GetState()
metricsEngine, err := engine.NewMetricsEngine(executionState.Test)
metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, logger)
if err != nil {
return err
}
if !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool {

// We'll need to pipe metrics to the MetricsEngine and process them if any
// of these are enabled: thresholds, end-of-test summary
shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool ||
!testRunState.RuntimeOptions.NoThresholds.Bool)
var metricsIngester *engine.OutputIngester
if shouldProcessMetrics {
err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool)
if err != nil {
return err
}
// We'll need to pipe metrics to the MetricsEngine if either the
// thresholds or the end-of-test summary are enabled.
outputs = append(outputs, metricsEngine.CreateIngester())
metricsIngester = metricsEngine.CreateIngester()
outputs = append(outputs, metricsIngester)
}

executionState := execScheduler.GetState()
if !testRunState.RuntimeOptions.NoSummary.Bool {
defer func() {
logger.Debug("Generating the end-of-test summary...")
@@ -208,7 +219,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()

if !testRunState.RuntimeOptions.NoThresholds.Bool {
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration)
finalizeThresholds := metricsEngine.StartThresholdCalculations(
metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,
)
handleFinalThresholdCalculation := func() {
// This gets called after the Samples channel has been closed and
// the OutputManager has flushed all of the cached samples to
@@ -283,7 +296,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}

printExecutionDescription(
c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs,
c.gs, "local", args[0], "", conf, executionState.ExecutionTuple, executionPlan, outputs,
)

// Trap Interrupts, SIGINTs and SIGTERMs.
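Taken together, the cmd/run.go changes mean the caller now owns the metrics engine's lifecycle: it builds the engine from just a registry and a logger, initializes thresholds and submetrics explicitly, and hands the ingester to the threshold-calculation loop. Below is a condensed sketch of that wiring under the new API; the setupMetricsEngine helper and its parameter list are illustrative only, not code from this commit.

package metricswiring

import (
    "time"

    "github.com/sirupsen/logrus"

    "go.k6.io/k6/lib"
    "go.k6.io/k6/metrics"
    "go.k6.io/k6/metrics/engine"
    "go.k6.io/k6/output"
)

// setupMetricsEngine shows the call sequence introduced by this commit:
// construct, initialize thresholds, create the ingester, start the
// threshold loop. It is an illustrative helper, not part of k6.
func setupMetricsEngine(
    registry *metrics.Registry, logger logrus.FieldLogger, opts lib.Options,
    noSummary, noThresholds bool, outputs []output.Output,
    runAbort func(error), testRunDuration func() time.Duration,
) ([]output.Output, func() []string, error) {
    // The engine no longer needs the whole lib.TestRunState.
    metricsEngine, err := engine.NewMetricsEngine(registry, logger)
    if err != nil {
        return nil, nil, err
    }

    // Thresholds and submetrics are now initialized by the caller,
    // not inside the constructor.
    var ingester *engine.OutputIngester
    if !noSummary || !noThresholds {
        if err := metricsEngine.InitSubMetricsAndThresholds(opts, noThresholds); err != nil {
            return nil, nil, err
        }
        ingester = metricsEngine.CreateIngester()
        outputs = append(outputs, ingester)
    }

    var finalize func() []string
    if !noThresholds {
        // The ingester is passed to the threshold loop, which stops it when
        // the returned finalize callback runs; finalize is nil when no
        // thresholds were defined.
        finalize = metricsEngine.StartThresholdCalculations(ingester, runAbort, testRunDuration)
    }
    return outputs, finalize, nil
}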
14 changes: 7 additions & 7 deletions execution/scheduler.go
@@ -377,7 +377,7 @@ func (e *Scheduler) runExecutor(
// and emission of the `vus` and `vus_max` metrics.
func (e *Scheduler) Init(
runCtx context.Context, samplesOut chan<- metrics.SampleContainer,
) (stopVUEmission func(), err error) {
) (stopVUEmission func(), initErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
@@ -390,11 +390,11 @@ func (e *Scheduler) Init(

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, initErr)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
err = interruptErr
initErr = interruptErr
}
if err != nil {
if initErr != nil {
stopVUEmission()
}
}()
@@ -406,14 +406,14 @@ func (e *Scheduler) Init(
// out channel.
//
//nolint:funlen
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
err = interruptErr
runErr = interruptErr
}
}()

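The only change in scheduler.go is renaming the named error returns (err to initErr and runErr), which makes it explicit that the deferred closures rewrite the function's result when the test run was interrupted. A minimal standalone illustration of that pattern, with simplified stand-in types rather than k6's own:

package example

import (
    "context"
    "errors"
)

var errInterrupted = errors.New("test run was interrupted")

// run demonstrates a named return value (runErr) being overwritten by a
// deferred closure after the function body has already returned.
func run(ctx context.Context) (runErr error) {
    defer func() {
        // Report the interruption instead of whatever error (possibly nil)
        // the body produced, mirroring GetCancelReasonIfTestAborted above.
        if ctx.Err() != nil {
            runErr = errInterrupted
        }
    }()

    // ... the main body may return early with its own error ...
    return nil
}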
9 changes: 1 addition & 8 deletions js/runner_test.go
@@ -47,7 +47,6 @@ import (
"go.k6.io/k6/lib/testutils/mockoutput"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

@@ -387,23 +386,17 @@ func TestDataIsolation(t *testing.T) {
execScheduler, err := execution.NewScheduler(testRunState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testRunState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger)

mockOutput := mockoutput.New()
outputManager := output.NewManager([]output.Output{mockOutput, metricsEngine.CreateIngester()}, testRunState.Logger, runAbort)
outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration)
require.Nil(t, finalizeThresholds)

require.Empty(t, runner.defaultGroup.Groups)

stopEmission, err := execScheduler.Init(runCtx, samples)
2 changes: 1 addition & 1 deletion js/summary_test.go
@@ -150,7 +150,7 @@ func createTestMetrics(t *testing.T) (map[string]*metrics.Metric, *lib.Group) {
require.NoError(t, err)
checksMetric.Tainted = null.BoolFrom(false)
checksMetric.Thresholds = metrics.Thresholds{Thresholds: []*metrics.Threshold{{Source: "rate>70", LastFailed: false}}}
sink := &metrics.TrendSink{}
sink := metrics.NewTrendSink()

samples := []float64{10.0, 15.0, 20.0}
for _, s := range samples {
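The summary test now obtains the sink from metrics.NewTrendSink() instead of a zero-value &metrics.TrendSink{} literal, presumably so the constructor takes care of the sink's internal state. A small usage sketch, assuming the sink's long-standing Add and P methods and mirroring the sample values used in the test:

package example

import (
    "fmt"
    "time"

    "go.k6.io/k6/metrics"
)

func trendSinkExample() {
    // Construct the sink instead of using a zero-value struct literal.
    sink := metrics.NewTrendSink()

    // Feed in the same values the test uses.
    for _, v := range []float64{10.0, 15.0, 20.0} {
        sink.Add(metrics.Sample{Time: time.Now(), Value: v})
    }

    // Query an aggregate, e.g. the 95th percentile.
    fmt.Println(sink.P(0.95))
}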
48 changes: 20 additions & 28 deletions metrics/engine/engine.go
@@ -15,7 +15,6 @@ import (
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
"gopkg.in/guregu/null.v3"
)

@@ -25,13 +24,11 @@ const thresholdsRate = 2 * time.Second
// aggregated metric sample values. They are used to generate the end-of-test
// summary and to evaluate the test thresholds.
type MetricsEngine struct {
logger logrus.FieldLogger
test *lib.TestRunState
outputIngester *outputIngester
registry *metrics.Registry
logger logrus.FieldLogger

// These can be both top-level metrics or sub-metrics
metricsWithThresholds []*metrics.Metric

metricsWithThresholds []*metrics.Metric
breachedThresholdsCount uint32

// TODO: completely refactor:
@@ -44,39 +41,31 @@ type MetricsEngine struct {
}

// NewMetricsEngine creates a new metrics Engine with the given parameters.
func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
func NewMetricsEngine(registry *metrics.Registry, logger logrus.FieldLogger) (*MetricsEngine, error) {
me := &MetricsEngine{
test: runState,
logger: runState.Logger.WithField("component", "metrics-engine"),
registry: registry,
logger: logger.WithField("component", "metrics-engine"),
ObservedMetrics: make(map[string]*metrics.Metric),
}

if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) {
err := me.initSubMetricsAndThresholds()
if err != nil {
return nil, err
}
}

return me, nil
}

// CreateIngester returns a pseudo-Output that uses the given metric samples to
// update the engine's inner state.
func (me *MetricsEngine) CreateIngester() output.Output {
me.outputIngester = &outputIngester{
func (me *MetricsEngine) CreateIngester() *OutputIngester {
return &OutputIngester{
logger: me.logger.WithField("component", "metrics-engine-ingester"),
metricsEngine: me,
cardinality: newCardinalityControl(),
}
return me.outputIngester
}

func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Metric, error) {
// TODO: replace with strings.Cut after Go 1.18
nameParts := strings.SplitN(name, "{", 2)

metric := me.test.Registry.Get(nameParts[0])
metric := me.registry.Get(nameParts[0])
if metric == nil {
return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
}
@@ -125,11 +114,14 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) {
}
}

func (me *MetricsEngine) initSubMetricsAndThresholds() error {
for metricName, thresholds := range me.test.Options.Thresholds {
// InitSubMetricsAndThresholds parses the thresholds from the test Options and
// initializes both the thresholds themselves, as well as any submetrics that
// were referenced in them.
func (me *MetricsEngine) InitSubMetricsAndThresholds(options lib.Options, onlyLogErrors bool) error {
for metricName, thresholds := range options.Thresholds {
metric, err := me.getThresholdMetricOrSubmetric(metricName)

if me.test.RuntimeOptions.NoThresholds.Bool {
if onlyLogErrors {
if err != nil {
me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
}
Expand All @@ -154,7 +146,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {

// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
// lands and there is a better way to enable a metric with tag
if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
if options.SystemTags.Has(metrics.TagExpectedResponse) {
_, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}")
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
@@ -167,10 +159,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalizes calculations.
func (me *MetricsEngine) StartThresholdCalculations(
ingester *OutputIngester,
abortRun func(error),
getCurrentTestRunDuration func() time.Duration,
) (finalize func() (breached []string),
) {
) (finalize func() (breached []string)) {
if len(me.metricsWithThresholds) == 0 {
return nil // no thresholds were defined
}
@@ -205,9 +197,9 @@ }()
}()

return func() []string {
if me.outputIngester != nil {
if ingester != nil {
// Stop the ingester so we don't get any more metrics
err := me.outputIngester.Stop()
err := ingester.Stop()
if err != nil {
me.logger.WithError(err).Warnf("There was a problem stopping the output ingester.")
}
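For context on the registry lookups above: threshold keys may name either a plain metric or a submetric such as http_req_duration{expected_response:true}, and getThresholdMetricOrSubmetric splits the parent metric name from the tag selector before resolving it in the registry. A standalone sketch of that parsing idea (an illustration of the name format, not the engine's actual code):

package example

import (
    "fmt"
    "strings"
)

// splitThresholdName splits a threshold key like
// "http_req_duration{expected_response:true}" into the parent metric name
// and the raw tag selector (empty for plain metric names).
func splitThresholdName(name string) (metricName, tagSelector string, err error) {
    parent, rest, found := strings.Cut(name, "{")
    if !found {
        return parent, "", nil
    }
    if !strings.HasSuffix(rest, "}") {
        return "", "", fmt.Errorf("submetric %q is missing a closing '}'", name)
    }
    return parent, strings.TrimSuffix(rest, "}"), nil
}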
40 changes: 13 additions & 27 deletions metrics/engine/engine_test.go
@@ -32,10 +32,12 @@ func TestNewMetricsEngineWithThresholds(t *testing.T) {
_, err = trs.Registry.NewMetric("metric2", metrics.Counter)
require.NoError(t, err)

me, err := NewMetricsEngine(trs)
me, err := NewMetricsEngine(trs.Registry, trs.Logger)
require.NoError(t, err)
require.NotNil(t, me)

require.NoError(t, me.InitSubMetricsAndThresholds(trs.Options, false))

assert.Len(t, me.metricsWithThresholds, 2)
}

@@ -57,7 +59,7 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) {
t.Parallel()

me := newTestMetricsEngine(t)
_, err := me.test.Registry.NewMetric("metric1", metrics.Counter)
_, err := me.registry.NewMetric("metric1", metrics.Counter)
require.NoError(t, err)

_, err = me.getThresholdMetricOrSubmetric(tc.metricDefinition)
Expand All @@ -69,16 +71,8 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) {
func TestNewMetricsEngineNoThresholds(t *testing.T) {
t.Parallel()

trs := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: testutils.NewLogger(t),
},
}

me, err := NewMetricsEngine(trs)
require.NoError(t, err)
me := newTestMetricsEngine(t)
require.NotNil(t, me)

assert.Empty(t, me.metricsWithThresholds)
}

@@ -113,9 +107,9 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) {
t.Parallel()
me := newTestMetricsEngine(t)

m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
m1, err := me.registry.NewMetric("m1", metrics.Counter)
require.NoError(t, err)
m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
m2, err := me.registry.NewMetric("m2", metrics.Counter)
require.NoError(t, err)

ths := metrics.NewThresholds([]string{tc.threshold})
@@ -138,9 +132,9 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {

me := newTestMetricsEngine(t)

m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
m1, err := me.registry.NewMetric("m1", metrics.Counter)
require.NoError(t, err)
m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
m2, err := me.registry.NewMetric("m2", metrics.Counter)
require.NoError(t, err)

ths := metrics.NewThresholds([]string{"count>5"})
@@ -159,18 +153,10 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {
assert.Empty(t, breached)
}

func newTestMetricsEngine(t *testing.T) MetricsEngine {
trs := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: testutils.NewLogger(t),
Registry: metrics.NewRegistry(),
},
}

return MetricsEngine{
logger: trs.Logger,
test: trs,
}
func newTestMetricsEngine(t *testing.T) *MetricsEngine {
m, err := NewMetricsEngine(metrics.NewRegistry(), testutils.NewLogger(t))
require.NoError(t, err)
return m
}

func zeroTestRunDuration() time.Duration {

