diff --git a/core/engine.go b/core/engine.go index df61f5a2380..765ab9df749 100644 --- a/core/engine.go +++ b/core/engine.go @@ -27,6 +27,7 @@ import ( "sync" "time" + "github.com/benbjohnson/clock" "github.com/sirupsen/logrus" "gopkg.in/guregu/null.v3" @@ -67,6 +68,9 @@ type Engine struct { Metrics map[string]*stats.Metric MetricsLock sync.Mutex + // clock are an interface to the functions in the standard library time package. + clock clock.Clock + builtinMetrics *metrics.BuiltinMetrics Samples chan stats.SampleContainer @@ -95,6 +99,7 @@ func NewEngine( runtimeOptions: rtOpts, outputs: outputs, Metrics: make(map[string]*stats.Metric), + clock: clock.New(), Samples: make(chan stats.SampleContainer, opts.MetricSamplesBufferSize.Int64), stopChan: make(chan struct{}), logger: logger.WithField("component", "engine"), @@ -292,7 +297,7 @@ func (e *Engine) startBackgroundProcesses( go func() { defer processes.Done() defer e.logger.Debug("Engine: Thresholds terminated") - ticker := time.NewTicker(thresholdsRate) + ticker := e.clock.Ticker(thresholdsRate) defer ticker.Stop() for { @@ -331,7 +336,7 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu } }() - ticker := time.NewTicker(collectRate) + ticker := e.clock.Ticker(collectRate) defer ticker.Stop() e.logger.Debug("Metrics processing started...") @@ -403,7 +408,7 @@ func (e *Engine) IsStopped() bool { } func (e *Engine) runMetricsEmission(ctx context.Context) { - ticker := time.NewTicker(metricsRate) + ticker := e.clock.Ticker(metricsRate) for { select { case <-ticker.C: @@ -415,26 +420,26 @@ func (e *Engine) runMetricsEmission(ctx context.Context) { } func (e *Engine) emitMetrics() { - t := time.Now() + now := e.clock.Now() executionState := e.ExecutionScheduler.GetState() // TODO: optimize and move this, it shouldn't call processSamples() directly e.processSamples([]stats.SampleContainer{stats.ConnectedSamples{ Samples: []stats.Sample{ { - Time: t, + Time: now, Metric: e.builtinMetrics.VUs, Value: float64(executionState.GetCurrentlyActiveVUsCount()), Tags: e.Options.RunTags, }, { - Time: t, + Time: now, Metric: e.builtinMetrics.VUsMax, Value: float64(executionState.GetInitializedVUsCount()), Tags: e.Options.RunTags, }, }, Tags: e.Options.RunTags, - Time: t, + Time: now, }}) } diff --git a/core/engine_test.go b/core/engine_test.go index 7e148ba5d6a..76adc85af7e 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -101,84 +101,99 @@ func TestNewEngine(t *testing.T) { newTestEngine(t, nil, nil, nil, lib.Options{}) } -func TestEngineRun(t *testing.T) { +func TestEngineRunExitsWithContextTimeout(t *testing.T) { t.Parallel() logrus.SetLevel(logrus.DebugLevel) - t.Run("exits with context", func(t *testing.T) { - t.Parallel() - done := make(chan struct{}) - runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { - <-ctx.Done() - close(done) - return nil - }} - duration := 100 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), duration) - defer cancel() + // Arrange + done := make(chan struct{}) + runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { + <-ctx.Done() + close(done) + return nil + }} + contextDuration := 100 * time.Millisecond + ctx, cancel := context.WithTimeout(context.Background(), contextDuration) + defer cancel() - _, run, wait := newTestEngine(t, ctx, runner, nil, lib.Options{}) - defer wait() + engine, run, wait := newTestEngine(t, ctx, runner, nil, lib.Options{}) + defer wait() - startTime := time.Now() - assert.NoError(t, run()) - assert.WithinDuration(t, startTime.Add(duration), time.Now(), 100*time.Millisecond) - <-done - }) - t.Run("exits with executor", func(t *testing.T) { - t.Parallel() - e, run, wait := newTestEngine(t, nil, nil, nil, lib.Options{ - VUs: null.IntFrom(10), - Iterations: null.IntFrom(100), - }) - defer wait() - assert.NoError(t, run()) - assert.Equal(t, uint64(100), e.ExecutionScheduler.GetState().GetFullIterationCount()) + // Act + runStartTime := engine.clock.Now() + runErr := run() + + // Assert + assert.NoError(t, runErr) + assert.WithinDuration(t, runStartTime.Add(contextDuration), engine.clock.Now(), 100*time.Millisecond) + <-done +} + +func TestEngineRunExitsWithExecutor(t *testing.T) { + t.Parallel() + logrus.SetLevel(logrus.DebugLevel) + + // Arrange + engine, run, wait := newTestEngine(t, nil, nil, nil, lib.Options{ + VUs: null.IntFrom(10), + Iterations: null.IntFrom(100), }) - // Make sure samples are discarded after context close (using "cutoff" timestamp in local.go) - t.Run("collects samples", func(t *testing.T) { - t.Parallel() - testMetric := stats.New("test_metric", stats.Trend) + defer wait() - signalChan := make(chan interface{}) + // Act + err := run() - runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { - stats.PushIfNotDone(ctx, out, stats.Sample{Metric: testMetric, Time: time.Now(), Value: 1}) - close(signalChan) - <-ctx.Done() - stats.PushIfNotDone(ctx, out, stats.Sample{Metric: testMetric, Time: time.Now(), Value: 1}) - return nil - }} + // Assert + assert.NoError(t, err) + assert.Equal(t, uint64(100), engine.ExecutionScheduler.GetState().GetFullIterationCount()) +} - mockOutput := mockoutput.New() - ctx, cancel := context.WithCancel(context.Background()) - _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{ - VUs: null.IntFrom(1), - Iterations: null.IntFrom(1), - }) +func TestEngineRunCollectsSamples(t *testing.T) { + t.Parallel() + logrus.SetLevel(logrus.DebugLevel) - errC := make(chan error) - go func() { errC <- run() }() - <-signalChan - cancel() - assert.NoError(t, <-errC) - wait() + // Arrange + testMetric := stats.New("test_metric", stats.Trend) + signalChan := make(chan interface{}) + runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { + stats.PushIfNotDone(ctx, out, stats.Sample{Metric: testMetric, Time: time.Now(), Value: 1}) + close(signalChan) + <-ctx.Done() + stats.PushIfNotDone(ctx, out, stats.Sample{Metric: testMetric, Time: time.Now(), Value: 1}) + return nil + }} - found := 0 - for _, s := range mockOutput.Samples { - if s.Metric != testMetric { - continue - } - found++ - assert.Equal(t, 1.0, s.Value, "wrong value") - } - assert.Equal(t, 1, found, "wrong number of samples") + mockOutput := mockoutput.New() + ctx, cancel := context.WithCancel(context.Background()) + _, run, wait := newTestEngine(t, ctx, runner, []output.Output{mockOutput}, lib.Options{ + VUs: null.IntFrom(1), + Iterations: null.IntFrom(1), }) + errChannel := make(chan error) + + // Act + go func() { errChannel <- run() }() + <-signalChan + cancel() + assert.NoError(t, <-errChannel) + wait() + + // Assert + found := 0 + for _, s := range mockOutput.Samples { + if s.Metric != testMetric { + continue + } + found++ + assert.Equal(t, 1.0, s.Value, "wrong value") + } + assert.Equal(t, 1, found, "wrong number of samples") } func TestEngineAtTime(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() _, run, wait := newTestEngine(t, ctx, nil, nil, lib.Options{ VUs: null.IntFrom(2), @@ -191,23 +206,31 @@ func TestEngineAtTime(t *testing.T) { func TestEngineStopped(t *testing.T) { t.Parallel() + + // Arrange ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() - e, run, wait := newTestEngine(t, ctx, nil, nil, lib.Options{ + engine, run, wait := newTestEngine(t, ctx, nil, nil, lib.Options{ VUs: null.IntFrom(1), Duration: types.NullDurationFrom(20 * time.Second), }) defer wait() - assert.NoError(t, run()) - assert.Equal(t, false, e.IsStopped(), "engine should be running") - e.Stop() - assert.Equal(t, true, e.IsStopped(), "engine should be stopped") - e.Stop() // test that a second stop doesn't panic + // Act + runErr := run() + initiallyStopped := engine.IsStopped() + engine.Stop() + engine.Stop() // Ensure that a second stop doesn't panic + + // Assert + assert.NoError(t, runErr) + assert.False(t, initiallyStopped, "engine should be running") + assert.True(t, engine.IsStopped(), "engine should be stopped") } func TestEngineOutput(t *testing.T) { t.Parallel() + testMetric := stats.New("test_metric", stats.Trend) runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { @@ -221,7 +244,8 @@ func TestEngineOutput(t *testing.T) { Iterations: null.IntFrom(1), }) - assert.NoError(t, run()) + err := run() + require.NoError(t, err) wait() cSamples := []stats.Sample{} @@ -241,76 +265,82 @@ func TestEngineOutput(t *testing.T) { } } -func TestEngine_processSamples(t *testing.T) { +func TestEngineProcessSamplesOfMetric(t *testing.T) { t.Parallel() + + // Arrange metric := stats.New("my_metric", stats.Gauge) + e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{}) + defer wait() - t.Run("metric", func(t *testing.T) { - t.Parallel() - e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{}) - defer wait() + // Act + e.processSamples( + []stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}}, + ) - e.processSamples( - []stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}}, - ) + // Assert + assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric"].Sink) +} - assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric"].Sink) - }) - t.Run("submetric", func(t *testing.T) { - t.Parallel() - ths, err := stats.NewThresholds([]string{`1+1==2`}) - assert.NoError(t, err) +func TestEngineProcessSamplesOfSubmetric(t *testing.T) { + t.Parallel() - e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{ - Thresholds: map[string]stats.Thresholds{ - "my_metric{a:1}": ths, - }, - }) - defer wait() + // Arrange + metric := stats.New("my_metric", stats.Gauge) + thresholds, err := stats.NewThresholds([]string{`1+1==2`}) + require.NoError(t, err) - sms := e.submetrics["my_metric"] - assert.Len(t, sms, 1) - assert.Equal(t, "my_metric{a:1}", sms[0].Name) - assert.EqualValues(t, map[string]string{"a": "1"}, sms[0].Tags.CloneTags()) + engine, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{ + Thresholds: map[string]stats.Thresholds{ + "my_metric{a:1}": thresholds, + }, + }) + defer wait() - e.processSamples( - []stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1", "b": "2"})}}, - ) + // Act + submetrics := engine.submetrics["my_metric"] + engine.processSamples( + []stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1", "b": "2"})}}, + ) + + // Assert + assert.Len(t, submetrics, 1) + assert.Equal(t, "my_metric{a:1}", submetrics[0].Name) + assert.EqualValues(t, map[string]string{"a": "1"}, submetrics[0].Tags.CloneTags()) + assert.IsType(t, &stats.GaugeSink{}, engine.Metrics["my_metric"].Sink) + assert.IsType(t, &stats.GaugeSink{}, engine.Metrics["my_metric{a:1}"].Sink) - assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric"].Sink) - assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric{a:1}"].Sink) - }) } func TestEngineThresholdsWillAbort(t *testing.T) { t.Parallel() - metric := stats.New("my_metric", stats.Gauge) - ths, err := stats.NewThresholds([]string{"1+1==3"}) - assert.NoError(t, err) - ths.Thresholds[0].AbortOnFail = true - - thresholds := map[string]stats.Thresholds{metric.Name: ths} - - e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{Thresholds: thresholds}) + // Arrange + metric := stats.New("my_metric", stats.Gauge) + thresholds, err := stats.NewThresholds([]string{"1+1==3"}) + require.NoError(t, err) + thresholds.Thresholds[0].AbortOnFail = true + engine, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{Thresholds: map[string]stats.Thresholds{metric.Name: thresholds}}) defer wait() - e.processSamples( + // Act + engine.processSamples( []stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}}, ) - assert.True(t, e.processThresholds()) + shouldAbort := engine.processThresholds() + + // Assert + assert.True(t, shouldAbort) } func TestEngineAbortedByThresholds(t *testing.T) { t.Parallel() - metric := stats.New("my_metric", stats.Gauge) - - ths, err := stats.NewThresholds([]string{"1+1==3"}) - assert.NoError(t, err) - ths.Thresholds[0].AbortOnFail = true - - thresholds := map[string]stats.Thresholds{metric.Name: ths} + // Arrange + metric := stats.New("my_metric", stats.Gauge) + thresholds, err := stats.NewThresholds([]string{"1+1==3"}) + require.NoError(t, err) + thresholds.Thresholds[0].AbortOnFail = true done := make(chan struct{}) runner := &minirunner.MiniRunner{Fn: func(ctx context.Context, out chan<- stats.SampleContainer) error { out <- stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})} @@ -319,13 +349,15 @@ func TestEngineAbortedByThresholds(t *testing.T) { return nil }} - _, run, wait := newTestEngine(t, nil, runner, nil, lib.Options{Thresholds: thresholds}) + _, run, wait := newTestEngine(t, nil, runner, nil, lib.Options{Thresholds: map[string]stats.Thresholds{metric.Name: thresholds}}) defer wait() + // Act go func() { assert.NoError(t, run()) }() + // Assert select { case <-done: return @@ -549,6 +581,8 @@ func TestSentReceivedMetrics(t *testing.T) { func TestRunTags(t *testing.T) { t.Parallel() + + // Arrange tb := httpmultibin.NewHTTPMultiBin(t) runTagsMap := map[string]string{"foo": "bar", "test": "mest", "over": "written"} @@ -627,17 +661,6 @@ func TestRunTags(t *testing.T) { InsecureSkipTLSVerify: null.BoolFrom(true), }) - errC := make(chan error) - go func() { errC <- run() }() - - select { - case <-time.After(10 * time.Second): - t.Fatal("Test timed out") - case err := <-errC: - require.NoError(t, err) - } - wait() - systemMetrics := []string{ metrics.VUsName, metrics.VUsMaxName, metrics.IterationsName, metrics.IterationDurationName, metrics.GroupDurationName, metrics.DataSentName, metrics.DataReceivedName, @@ -652,6 +675,19 @@ func TestRunTags(t *testing.T) { return "the rainbow" } + // Act + errC := make(chan error) + go func() { errC <- run() }() + + // Assert + select { + case <-time.After(10 * time.Second): + t.Fatal("Test timed out") + case err := <-errC: + require.NoError(t, err) + } + wait() + for _, s := range mockOutput.Samples { for key, expVal := range runTagsMap { val, ok := s.Tags.Get(key) @@ -668,6 +704,8 @@ func TestRunTags(t *testing.T) { func TestSetupTeardownThresholds(t *testing.T) { t.Parallel() + + // Arrange tb := httpmultibin.NewHTTPMultiBin(t) script := []byte(tb.Replacer.Replace(` @@ -720,9 +758,11 @@ func TestSetupTeardownThresholds(t *testing.T) { }) defer wait() + // Act errC := make(chan error) go func() { errC <- run() }() + // Assert select { case <-time.After(10 * time.Second): t.Fatal("Test timed out") @@ -735,7 +775,8 @@ func TestSetupTeardownThresholds(t *testing.T) { func TestSetupException(t *testing.T) { t.Parallel() - script := []byte(` + // Arrange + srcScript := []byte(` import bar from "./bar.js"; export function setup() { bar(); @@ -744,20 +785,22 @@ func TestSetupException(t *testing.T) { }; `) - memfs := afero.NewMemMapFs() - require.NoError(t, afero.WriteFile(memfs, "/bar.js", []byte(` + fsScript := []byte(` export default function () { baz(); } function baz() { throw new Error("baz"); } - `), 0x666)) + `) + memfs := afero.NewMemMapFs() + require.NoError(t, afero.WriteFile(memfs, "/bar.js", fsScript, 0x666)) + registry := metrics.NewRegistry() builtinMetrics := metrics.RegisterBuiltinMetrics(registry) runner, err := js.New( testutils.NewLogger(t), - &loader.SourceData{URL: &url.URL{Scheme: "file", Path: "/script.js"}, Data: script}, + &loader.SourceData{URL: &url.URL{Scheme: "file", Path: "/script.js"}, Data: srcScript}, map[string]afero.Fs{"file": memfs}, lib.RuntimeOptions{}, builtinMetrics, @@ -773,9 +816,11 @@ func TestSetupException(t *testing.T) { }) defer wait() + // Act errC := make(chan error) go func() { errC <- run() }() + // Assert select { case <-time.After(10 * time.Second): t.Fatal("Test timed out") @@ -792,6 +837,7 @@ func TestSetupException(t *testing.T) { func TestVuInitException(t *testing.T) { t.Parallel() + // Arrange script := []byte(` export let options = { vus: 3, @@ -829,8 +875,11 @@ func TestVuInitException(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + // Act _, _, err = engine.Init(ctx, ctx) // no need for 2 different contexts + // Assert require.Error(t, err) var exception errext.Exception @@ -844,6 +893,8 @@ func TestVuInitException(t *testing.T) { func TestEmittedMetricsWhenScalingDown(t *testing.T) { t.Parallel() + + // Arrange tb := httpmultibin.NewHTTPMultiBin(t) script := []byte(tb.Replacer.Replace(` @@ -892,9 +943,11 @@ func TestEmittedMetricsWhenScalingDown(t *testing.T) { mockOutput := mockoutput.New() engine, run, wait := newTestEngine(t, nil, runner, []output.Output{mockOutput}, lib.Options{}) + // Act errC := make(chan error) go func() { errC <- run() }() + // Assert select { case <-time.After(12 * time.Second): t.Fatal("Test timed out") @@ -1107,8 +1160,9 @@ func TestMinIterationDurationInSetupTeardownStage(t *testing.T) { func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { t.Parallel() - testMetric := stats.New("teardown_metric", stats.Counter) + // Arrange + testMetric := stats.New("teardown_metric", stats.Counter) ctx, cancel := context.WithCancel(context.Background()) runner := &minirunner.MiniRunner{ @@ -1127,7 +1181,9 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) - assert.NoError(t, run()) + // Act + err := run() + require.NoError(t, err) wait() var count float64 @@ -1136,12 +1192,15 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { count += sample.Value } } + + // Assert assert.Equal(t, 1.0, count) } func TestActiveVUsCount(t *testing.T) { t.Parallel() + // Arrange script := []byte(` var sleep = require('k6').sleep; @@ -1214,9 +1273,11 @@ func TestActiveVUsCount(t *testing.T) { run, waitFn, err := engine.Init(ctx, ctx) // no need for 2 different contexts require.NoError(t, err) + // Act errC := make(chan error) go func() { errC <- run() }() + // Assert select { case <-time.After(15 * time.Second): t.Fatal("Test timed out") diff --git a/go.mod b/go.mod index cfa2620507d..120a3da316b 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/PuerkitoBio/goquery v1.6.1 github.com/Soontao/goHttpDigestClient v0.0.0-20170320082612-6d28bb1415c5 github.com/andybalholm/brotli v1.0.3 + github.com/benbjohnson/clock v1.3.0 github.com/dop251/goja v0.0.0-20211115154819-26ebff68a7d5 github.com/fatih/color v1.12.0 github.com/gedex/inflector v0.0.0-20170307190818-16278e9db813 // indirect diff --git a/go.sum b/go.sum index d8a03478a66..8ee96395dbe 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= diff --git a/lib/executor/base_config.go b/lib/executor/base_config.go index 98c1c254191..186ad3c1d75 100644 --- a/lib/executor/base_config.go +++ b/lib/executor/base_config.go @@ -26,6 +26,7 @@ import ( "strings" "time" + "github.com/benbjohnson/clock" "gopkg.in/guregu/null.v3" "go.k6.io/k6/lib/consts" @@ -50,6 +51,13 @@ type BaseConfig struct { Exec null.String `json:"exec"` // function name, externally validated Tags map[string]string `json:"tags"` + // clock implements the Clock interface, and is a proxy for every + // request relative to time. The Clock interface exposes the standard + // time.Time methods, and allows us, for instance, to control the flow + // of time in tests by injecting a `clock.Mock`` implementation instance + // instead of the standard one. + clock clock.Clock + // TODO: future extensions like distribution, others? } @@ -59,6 +67,7 @@ func NewBaseConfig(name, configType string) BaseConfig { Name: name, Type: configType, GracefulStop: types.NewNullDuration(DefaultGracefulStopValue, false), + clock: clock.New(), } } diff --git a/lib/executor/constant_arrival_rate.go b/lib/executor/constant_arrival_rate.go index ce5c13f8cf8..7c192916b38 100644 --- a/lib/executor/constant_arrival_rate.go +++ b/lib/executor/constant_arrival_rate.go @@ -233,7 +233,12 @@ func (car ConstantArrivalRate) Run( activeVUsWg := &sync.WaitGroup{} returnedVUs := make(chan struct{}) - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts( + parentCtx, + car.config.clock, + duration, + gracefulStop, + ) vusPool := newActiveVUPool() defer func() { @@ -251,7 +256,7 @@ func (car ConstantArrivalRate) Run( progIters := fmt.Sprintf( pb.GetFixedLengthFloatFormat(arrivalRatePerSec, 0)+" iters/s", arrivalRatePerSec) progressFn := func() (float64, []string) { - spent := time.Since(startTime) + spent := car.config.clock.Since(startTime) currActiveVUs := atomic.LoadUint64(&activeVUsCount) progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", vusPool.Running(), currActiveVUs) @@ -324,7 +329,7 @@ func (car ConstantArrivalRate) Run( } start, offsets, _ := car.et.GetStripedOffsets() - timer := time.NewTimer(time.Hour * 24) + timer := car.config.clock.Timer(time.Hour * 24) // here the we need the not scaled one notScaledTickerPeriod := getTickerPeriod( big.NewRat( @@ -336,7 +341,7 @@ func (car ConstantArrivalRate) Run( shownWarning := false metricTags := car.getMetricTags(nil) for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] { - t := notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime) + t := notScaledTickerPeriod*time.Duration(gi) - car.config.clock.Since(startTime) timer.Reset(t) select { case <-timer.C: @@ -349,7 +354,7 @@ func (car ConstantArrivalRate) Run( stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: 1, Metric: droppedIterationMetric, - Tags: metricTags, Time: time.Now(), + Tags: metricTags, Time: car.config.clock.Now(), }) // We'll try to start allocating another VU in the background, diff --git a/lib/executor/constant_arrival_rate_test.go b/lib/executor/constant_arrival_rate_test.go index a7ce2ec2787..08347f2ee76 100644 --- a/lib/executor/constant_arrival_rate_test.go +++ b/lib/executor/constant_arrival_rate_test.go @@ -28,6 +28,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -58,7 +59,10 @@ func newExecutionSegmentSequenceFromString(str string) *lib.ExecutionSegmentSequ func getTestConstantArrivalRateConfig() *ConstantArrivalRateConfig { return &ConstantArrivalRateConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(1 * time.Second)}, + BaseConfig: BaseConfig{ + GracefulStop: types.NullDurationFrom(1 * time.Second), + clock: clock.NewMock(), + }, TimeUnit: types.NullDurationFrom(time.Second), Rate: null.IntFrom(50), Duration: types.NullDurationFrom(5 * time.Second), @@ -69,24 +73,29 @@ func getTestConstantArrivalRateConfig() *ConstantArrivalRateConfig { func TestConstantArrivalRateRunNotEnoughAllocatedVUsWarn(t *testing.T) { t.Parallel() - et, err := lib.NewExecutionTuple(nil, nil) + + // Arrange + executionTuple, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) - es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + executionState := lib.NewExecutionState(lib.Options{}, executionTuple, 10, 50) + vuFn := func(ctx context.Context) error { time.Sleep(time.Second); return nil } ctx, cancel, executor, logHook := setupExecutor( - t, getTestConstantArrivalRateConfig(), es, - simpleRunner(func(ctx context.Context) error { - time.Sleep(time.Second) - return nil - }), + t, + getTestConstantArrivalRateConfig(), + executionState, + simpleRunner(vuFn), ) defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + // Act err = executor.Run(ctx, engineOut, builtinMetrics) - require.NoError(t, err) entries := logHook.Drain() + + // Assert + require.NoError(t, err) require.NotEmpty(t, entries) for _, entry := range entries { require.Equal(t, @@ -98,18 +107,25 @@ func TestConstantArrivalRateRunNotEnoughAllocatedVUsWarn(t *testing.T) { func TestConstantArrivalRateRunCorrectRate(t *testing.T) { t.Parallel() + + // Arrange var count int64 - et, err := lib.NewExecutionTuple(nil, nil) + executionTuple, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) - es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + executionState := lib.NewExecutionState(lib.Options{}, executionTuple, 10, 50) + vuFn := func(ctx context.Context) error { atomic.AddInt64(&count, 1); return nil } ctx, cancel, executor, logHook := setupExecutor( - t, getTestConstantArrivalRateConfig(), es, - simpleRunner(func(ctx context.Context) error { - atomic.AddInt64(&count, 1) - return nil - }), + t, + getTestConstantArrivalRateConfig(), + executionState, + simpleRunner(vuFn), ) defer cancel() + + engineOut := make(chan stats.SampleContainer, 1000) + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + + // Act var wg sync.WaitGroup wg.Add(1) go func() { @@ -123,11 +139,10 @@ func TestConstantArrivalRateRunCorrectRate(t *testing.T) { require.InDelta(t, 50, currentCount, 1) } }() - engineOut := make(chan stats.SampleContainer, 1000) - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) err = executor.Run(ctx, engineOut, builtinMetrics) wg.Wait() + + // Assert require.NoError(t, err) require.Empty(t, logHook.Drain()) } @@ -194,43 +209,47 @@ func TestConstantArrivalRateRunCorrectTiming(t *testing.T) { t.Run(fmt.Sprintf("segment %s sequence %s", test.segment, test.sequence), func(t *testing.T) { t.Parallel() + et, err := lib.NewExecutionTuple(test.segment, test.sequence) require.NoError(t, err) + es := lib.NewExecutionState(lib.Options{ ExecutionSegment: test.segment, ExecutionSegmentSequence: test.sequence, }, et, 10, 50) - var count int64 + seconds := 2 config := getTestConstantArrivalRateConfig() config.Duration.Duration = types.Duration(time.Second * time.Duration(seconds)) + newET, err := es.ExecutionTuple.GetNewExecutionTupleFromValue(config.MaxVUs.Int64) require.NoError(t, err) rateScaled := newET.ScaleInt64(config.Rate.Int64) + + var iterationsCount int64 startTime := time.Now() expectedTimeInt64 := int64(test.start) - ctx, cancel, executor, logHook := setupExecutor( - t, config, es, - simpleRunner(func(ctx context.Context) error { - current := atomic.AddInt64(&count, 1) - - expectedTime := test.start - if current != 1 { - expectedTime = time.Duration(atomic.AddInt64(&expectedTimeInt64, - int64(time.Millisecond)*test.steps[(current-2)%int64(len(test.steps))])) - } - assert.WithinDuration(t, - startTime.Add(expectedTime), - time.Now(), - time.Millisecond*12, - "%d expectedTime %s", current, expectedTime, - ) - - return nil - }), - ) + runnerFn := func(ctx context.Context) error { + currentIterationCount := atomic.AddInt64(&iterationsCount, 1) + + expectedStartTime := test.start + if currentIterationCount != 1 { + iterationDuration := int64(time.Millisecond) * test.steps[(currentIterationCount-2)%int64(len(test.steps))] + expectedStartTime = time.Duration(atomic.AddInt64(&expectedTimeInt64, iterationDuration)) + } + assert.WithinDuration(t, + startTime.Add(expectedStartTime), + time.Now(), + time.Millisecond*12, + "%d expectedTime %s", currentIterationCount, expectedStartTime, + ) + + return nil + } + ctx, cancel, executor, logHook := setupExecutor(t, config, es, simpleRunner(runnerFn)) defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -240,14 +259,16 @@ func TestConstantArrivalRateRunCorrectTiming(t *testing.T) { for i := 0; i < seconds; i++ { time.Sleep(time.Second) - currentCount = atomic.LoadInt64(&count) + currentCount = atomic.LoadInt64(&iterationsCount) assert.InDelta(t, int64(i+1)*rateScaled, currentCount, 3) } }() + startTime = time.Now() engineOut := make(chan stats.SampleContainer, 1000) err = executor.Run(ctx, engineOut, builtinMetrics) wg.Wait() + require.NoError(t, err) require.Empty(t, logHook.Drain()) }) @@ -267,22 +288,27 @@ func TestArrivalRateCancel(t *testing.T) { config := config t.Run(name, func(t *testing.T) { t.Parallel() + + // Arrange ch := make(chan struct{}) errCh := make(chan error, 1) weAreDoneCh := make(chan struct{}) - et, err := lib.NewExecutionTuple(nil, nil) + + executionTuple, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) - es := lib.NewExecutionState(lib.Options{}, et, 10, 50) - ctx, cancel, executor, logHook := setupExecutor( - t, config, es, simpleRunner(func(ctx context.Context) error { - select { - case <-ch: - <-ch - default: - } - return nil - })) + executionState := lib.NewExecutionState(lib.Options{}, executionTuple, 10, 50) + vuFn := func(ctx context.Context) error { + select { + case <-ch: + <-ch + default: + } + return nil + } + ctx, cancel, executor, logHook := setupExecutor(t, config, executionState, simpleRunner(vuFn)) defer cancel() + + // Act var wg sync.WaitGroup wg.Add(1) go func() { @@ -292,7 +318,6 @@ func TestArrivalRateCancel(t *testing.T) { errCh <- executor.Run(ctx, engineOut, builtinMetrics) close(weAreDoneCh) }() - time.Sleep(time.Second) ch <- struct{}{} cancel() @@ -305,6 +330,8 @@ func TestArrivalRateCancel(t *testing.T) { close(ch) <-weAreDoneCh wg.Wait() + + // Assert require.NoError(t, <-errCh) require.Empty(t, logHook.Drain()) }) @@ -313,12 +340,16 @@ func TestArrivalRateCancel(t *testing.T) { func TestConstantArrivalRateDroppedIterations(t *testing.T) { t.Parallel() - var count int64 + + // Arrange et, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) config := &ConstantArrivalRateConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, + BaseConfig: BaseConfig{ + GracefulStop: types.NullDurationFrom(0 * time.Second), + clock: clock.NewMock(), + }, TimeUnit: types.NullDurationFrom(time.Second), Rate: null.IntFrom(10), Duration: types.NullDurationFrom(950 * time.Millisecond), @@ -326,22 +357,28 @@ func TestConstantArrivalRateDroppedIterations(t *testing.T) { MaxVUs: null.IntFrom(5), } + var count int64 + vuFn := func(ctx context.Context) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + } es := lib.NewExecutionState(lib.Options{}, et, 10, 50) ctx, cancel, executor, logHook := setupExecutor( t, config, es, - simpleRunner(func(ctx context.Context) error { - atomic.AddInt64(&count, 1) - <-ctx.Done() - return nil - }), + simpleRunner(vuFn), ) defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + + // Act err = executor.Run(ctx, engineOut, builtinMetrics) - require.NoError(t, err) logs := logHook.Drain() + + // Assert + require.NoError(t, err) require.Len(t, logs, 1) assert.Contains(t, logs[0].Message, "cannot initialize more") assert.Equal(t, int64(5), count) @@ -352,7 +389,10 @@ func TestConstantArrivalRateGlobalIters(t *testing.T) { t.Parallel() config := &ConstantArrivalRateConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(100 * time.Millisecond)}, + BaseConfig: BaseConfig{ + GracefulStop: types.NullDurationFrom(100 * time.Millisecond), + clock: clock.NewMock(), + }, TimeUnit: types.NullDurationFrom(950 * time.Millisecond), Rate: null.IntFrom(20), Duration: types.NullDurationFrom(1 * time.Second), @@ -375,16 +415,18 @@ func TestConstantArrivalRateGlobalIters(t *testing.T) { tc := tc t.Run(fmt.Sprintf("%s_%s", tc.seq, tc.seg), func(t *testing.T) { t.Parallel() + + // Arrange ess, err := lib.NewExecutionSegmentSequenceFromString(tc.seq) require.NoError(t, err) - seg, err := lib.NewExecutionSegmentFromString(tc.seg) + segment, err := lib.NewExecutionSegmentFromString(tc.seg) require.NoError(t, err) - et, err := lib.NewExecutionTuple(seg, &ess) + executionTuple, err := lib.NewExecutionTuple(segment, &ess) require.NoError(t, err) - es := lib.NewExecutionState(lib.Options{}, et, 5, 5) + executionState := lib.NewExecutionState(lib.Options{}, executionTuple, 5, 5) runner := &minirunner.MiniRunner{} - ctx, cancel, executor, _ := setupExecutor(t, config, es, runner) + ctx, cancel, executor, _ := setupExecutor(t, config, executionState, runner) defer cancel() gotIters := []uint64{} @@ -398,7 +440,11 @@ func TestConstantArrivalRateGlobalIters(t *testing.T) { } engineOut := make(chan stats.SampleContainer, 100) + + // Act err = executor.Run(ctx, engineOut, builtinMetrics) + + // Assert require.NoError(t, err) assert.Equal(t, tc.expIters, gotIters) }) diff --git a/lib/executor/constant_vus.go b/lib/executor/constant_vus.go index b8311dda64f..bde1f051d31 100644 --- a/lib/executor/constant_vus.go +++ b/lib/executor/constant_vus.go @@ -150,7 +150,12 @@ func (clv ConstantVUs) Run( duration := clv.config.Duration.TimeDuration() gracefulStop := clv.config.GetGracefulStop() - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts( + parentCtx, + clv.config.clock, + duration, + gracefulStop, + ) defer cancel() // Make sure the log and the progress bar have accurate information @@ -159,7 +164,7 @@ func (clv ConstantVUs) Run( ).Debug("Starting executor run...") progressFn := func() (float64, []string) { - spent := time.Since(startTime) + spent := clv.config.clock.Since(startTime) right := []string{fmt.Sprintf("%d VUs", numVUs)} if spent > duration { right = append(right, duration.String()) diff --git a/lib/executor/constant_vus_test.go b/lib/executor/constant_vus_test.go index 11aceaff43c..09233b1f6f8 100644 --- a/lib/executor/constant_vus_test.go +++ b/lib/executor/constant_vus_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" @@ -36,35 +37,49 @@ import ( func getTestConstantVUsConfig() ConstantVUsConfig { return ConstantVUsConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(100 * time.Millisecond)}, - VUs: null.IntFrom(10), - Duration: types.NullDurationFrom(1 * time.Second), + BaseConfig: BaseConfig{ + GracefulStop: types.NullDurationFrom(100 * time.Millisecond), + clock: clock.NewMock(), + }, + VUs: null.IntFrom(10), + Duration: types.NullDurationFrom(1 * time.Second), } } func TestConstantVUsRun(t *testing.T) { t.Parallel() + + // Arrange var result sync.Map - et, err := lib.NewExecutionTuple(nil, nil) + executionTuple, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) - es := lib.NewExecutionState(lib.Options{}, et, 10, 50) - ctx, cancel, executor, _ := setupExecutor( - t, getTestConstantVUsConfig(), es, - simpleRunner(func(ctx context.Context) error { - select { - case <-ctx.Done(): - return nil - default: - } - state := lib.GetState(ctx) - currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) - result.Store(state.VUID, currIter.(uint64)+1) - time.Sleep(210 * time.Millisecond) + executionState := lib.NewExecutionState(lib.Options{}, executionTuple, 10, 50) + vuFn := func(ctx context.Context) error { + select { + case <-ctx.Done(): return nil - }), + default: + } + state := lib.GetState(ctx) + currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) + result.Store(state.VUID, currIter.(uint64)+1) + time.Sleep(210 * time.Millisecond) + return nil + } + + config := getTestConstantVUsConfig() + ctx, cancel, executor, _ := setupExecutor( + t, + config, + executionState, + simpleRunner(vuFn), ) defer cancel() + + // Act err = executor.Run(ctx, nil, nil) + + // Assert require.NoError(t, err) var totalIters uint64 diff --git a/lib/executor/helpers.go b/lib/executor/helpers.go index 86c514325ad..cdb2763fed8 100644 --- a/lib/executor/helpers.go +++ b/lib/executor/helpers.go @@ -27,6 +27,7 @@ import ( "math/big" "time" + "github.com/benbjohnson/clock" "github.com/sirupsen/logrus" "go.k6.io/k6/errext" @@ -140,17 +141,17 @@ func getIterationRunner( // - If the whole test is aborted, the parent context will be cancelled, so // that will also cancel these contexts, thus the "general abort" case is // handled transparently. -func getDurationContexts(parentCtx context.Context, regularDuration, gracefulStop time.Duration) ( +func getDurationContexts(parentCtx context.Context, clock clock.Clock, regularDuration, gracefulStop time.Duration) ( startTime time.Time, maxDurationCtx, regDurationCtx context.Context, maxDurationCancel func(), ) { - startTime = time.Now() + startTime = clock.Now() maxEndTime := startTime.Add(regularDuration + gracefulStop) - maxDurationCtx, maxDurationCancel = context.WithDeadline(parentCtx, maxEndTime) + maxDurationCtx, maxDurationCancel = clock.WithDeadline(parentCtx, maxEndTime) if gracefulStop == 0 { return startTime, maxDurationCtx, maxDurationCtx, maxDurationCancel } - regDurationCtx, _ = context.WithDeadline(maxDurationCtx, startTime.Add(regularDuration)) //nolint:govet + regDurationCtx, _ = clock.WithDeadline(maxDurationCtx, startTime.Add(regularDuration)) //nolint:govet return startTime, maxDurationCtx, regDurationCtx, maxDurationCancel } diff --git a/lib/executor/per_vu_iterations.go b/lib/executor/per_vu_iterations.go index 681cd009349..8e45f4f77cd 100644 --- a/lib/executor/per_vu_iterations.go +++ b/lib/executor/per_vu_iterations.go @@ -158,9 +158,12 @@ func (pvi PerVUIterations) Run( numVUs := pvi.config.GetVUs(pvi.executionState.ExecutionTuple) iterations := pvi.config.GetIterations() duration := pvi.config.MaxDuration.TimeDuration() - gracefulStop := pvi.config.GetGracefulStop() - - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts( + parentCtx, + pvi.config.clock, + duration, + pvi.config.GetGracefulStop(), + ) defer cancel() // Make sure the log and the progress bar have accurate information @@ -172,12 +175,12 @@ func (pvi PerVUIterations) Run( doneIters := new(uint64) vusFmt := pb.GetFixedLengthIntFormat(numVUs) - itersFmt := pb.GetFixedLengthIntFormat(int64(totalIters)) progressFn := func() (float64, []string) { - spent := time.Since(startTime) + spent := pvi.config.clock.Since(startTime) progVUs := fmt.Sprintf(vusFmt+" VUs", numVUs) currentDoneIters := atomic.LoadUint64(doneIters) - progIters := fmt.Sprintf(itersFmt+"/"+itersFmt+" iters, %d per VU", + iterationsFmt := pb.GetFixedLengthIntFormat(int64(totalIters)) + progIters := fmt.Sprintf(iterationsFmt+"/"+iterationsFmt+" iters, %d per VU", currentDoneIters, totalIters, iterations) right := []string{progVUs, duration.String(), progIters} if spent > duration { @@ -230,7 +233,7 @@ func (pvi PerVUIterations) Run( case <-regDurationDone: stats.PushIfNotDone(parentCtx, out, stats.Sample{ Value: float64(iterations - i), Metric: droppedIterationMetric, - Tags: pvi.getMetricTags(&vuID), Time: time.Now(), + Tags: pvi.getMetricTags(&vuID), Time: pvi.config.clock.Now(), }) return // don't make more iterations default: diff --git a/lib/executor/per_vu_iterations_test.go b/lib/executor/per_vu_iterations_test.go index 9d222b38a6d..2b73de5f615 100644 --- a/lib/executor/per_vu_iterations_test.go +++ b/lib/executor/per_vu_iterations_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" @@ -39,7 +40,10 @@ import ( func getTestPerVUIterationsConfig() PerVUIterationsConfig { return PerVUIterationsConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(1 * time.Second)}, + BaseConfig: BaseConfig{ + GracefulStop: types.NullDurationFrom(1 * time.Second), + clock: clock.NewMock(), + }, VUs: null.IntFrom(10), Iterations: null.IntFrom(100), MaxDuration: types.NullDurationFrom(3 * time.Second), @@ -49,24 +53,36 @@ func getTestPerVUIterationsConfig() PerVUIterationsConfig { // Baseline test func TestPerVUIterationsRun(t *testing.T) { t.Parallel() + + // Arrange var result sync.Map - et, err := lib.NewExecutionTuple(nil, nil) + + executionTuple, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) - es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + + executionState := lib.NewExecutionState(lib.Options{}, executionTuple, 10, 50) + vuFn := func(ctx context.Context) error { + state := lib.GetState(ctx) + currentIteration, _ := result.LoadOrStore(state.VUID, uint64(0)) + result.Store(state.VUID, currentIteration.(uint64)+1) + return nil + } + ctx, cancel, executor, _ := setupExecutor( - t, getTestPerVUIterationsConfig(), es, - simpleRunner(func(ctx context.Context) error { - state := lib.GetState(ctx) - currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) - result.Store(state.VUID, currIter.(uint64)+1) - return nil - }), + t, + getTestPerVUIterationsConfig(), + executionState, + simpleRunner(vuFn), ) defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + + // Act err = executor.Run(ctx, engineOut, builtinMetrics) + + // Assert require.NoError(t, err) var totalIters uint64 @@ -83,77 +99,86 @@ func TestPerVUIterationsRun(t *testing.T) { // This is the reverse behavior of the SharedIterations executor. func TestPerVUIterationsRunVariableVU(t *testing.T) { t.Parallel() - var ( - result sync.Map - slowVUID = uint64(1) - ) - et, err := lib.NewExecutionTuple(nil, nil) + + // Arrange + var result sync.Map + var slowVUID uint64 = 1 + + executionTuple, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) - es := lib.NewExecutionState(lib.Options{}, et, 10, 50) - ctx, cancel, executor, _ := setupExecutor( - t, getTestPerVUIterationsConfig(), es, - simpleRunner(func(ctx context.Context) error { - state := lib.GetState(ctx) - if state.VUID == slowVUID { - time.Sleep(200 * time.Millisecond) - } - currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) - result.Store(state.VUID, currIter.(uint64)+1) - return nil - }), - ) + executionState := lib.NewExecutionState(lib.Options{}, executionTuple, 10, 50) + vuFn := func(ctx context.Context) error { + state := lib.GetState(ctx) + if state.VUID == slowVUID { + time.Sleep(200 * time.Millisecond) + } + currentIteration, _ := result.LoadOrStore(state.VUID, uint64(0)) + result.Store(state.VUID, currentIteration.(uint64)+1) + return nil + } + + ctx, cancel, executor, _ := setupExecutor(t, getTestPerVUIterationsConfig(), executionState, simpleRunner(vuFn)) defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) - err = executor.Run(ctx, engineOut, builtinMetrics) - require.NoError(t, err) + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + // Act + err = executor.Run(ctx, engineOut, builtinMetrics) val, ok := result.Load(slowVUID) + + // Assert + require.NoError(t, err) assert.True(t, ok) - var totalIters uint64 + var totalIterations uint64 result.Range(func(key, value interface{}) bool { vuIters := value.(uint64) if key != slowVUID { assert.Equal(t, uint64(100), vuIters) } - totalIters += vuIters + totalIterations += vuIters return true }) // The slow VU should complete 15 iterations given these timings, // while the rest should equally complete their assigned 100 iterations. assert.Equal(t, uint64(15), val) - assert.Equal(t, uint64(915), totalIters) + assert.Equal(t, uint64(915), totalIterations) } func TestPerVuIterationsEmitDroppedIterations(t *testing.T) { t.Parallel() + + // Arrange var count int64 + et, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) config := PerVUIterationsConfig{ + BaseConfig: BaseConfig{clock: clock.NewMock()}, VUs: null.IntFrom(5), Iterations: null.IntFrom(20), MaxDuration: types.NullDurationFrom(1 * time.Second), } es := lib.NewExecutionState(lib.Options{}, et, 10, 50) - ctx, cancel, executor, logHook := setupExecutor( - t, config, es, - simpleRunner(func(ctx context.Context) error { - atomic.AddInt64(&count, 1) - <-ctx.Done() - return nil - }), - ) + vuFn := func(ctx context.Context) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + } + ctx, cancel, executor, logHook := setupExecutor(t, config, es, simpleRunner(vuFn)) defer cancel() + engineOut := make(chan stats.SampleContainer, 1000) - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) + + // Act err = executor.Run(ctx, engineOut, builtinMetrics) + + // Assert require.NoError(t, err) assert.Empty(t, logHook.Drain()) assert.Equal(t, int64(5), count) diff --git a/lib/executor/ramping_arrival_rate.go b/lib/executor/ramping_arrival_rate.go index 7eaf7ef9338..2dcb4c686f4 100644 --- a/lib/executor/ramping_arrival_rate.go +++ b/lib/executor/ramping_arrival_rate.go @@ -345,7 +345,12 @@ func (varr RampingArrivalRate) Run( activeVUsWg := &sync.WaitGroup{} returnedVUs := make(chan struct{}) - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts( + parentCtx, + varr.config.clock, + duration, + gracefulStop, + ) vusPool := newActiveVUPool() diff --git a/lib/executor/ramping_arrival_rate_test.go b/lib/executor/ramping_arrival_rate_test.go index b21259d6f31..e69a1518bd0 100644 --- a/lib/executor/ramping_arrival_rate_test.go +++ b/lib/executor/ramping_arrival_rate_test.go @@ -29,6 +29,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -43,9 +44,12 @@ import ( func getTestRampingArrivalRateConfig() *RampingArrivalRateConfig { return &RampingArrivalRateConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(1 * time.Second)}, - TimeUnit: types.NullDurationFrom(time.Second), - StartRate: null.IntFrom(10), + BaseConfig: BaseConfig{ + GracefulStop: types.NullDurationFrom(1 * time.Second), + clock: clock.NewMock(), + }, + TimeUnit: types.NullDurationFrom(time.Second), + StartRate: null.IntFrom(10), Stages: []Stage{ { Duration: types.NullDurationFrom(time.Second * 1), diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 18a07b60306..d7126480a8c 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -517,7 +517,10 @@ func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration) } startTime, maxDurationCtx, regularDurationCtx, cancel := getDurationContexts( - ctx, regularDuration, maxDuration-regularDuration, + ctx, + vlv.config.clock, + regularDuration, + maxDuration-regularDuration, ) defer cancel() diff --git a/lib/executor/shared_iterations.go b/lib/executor/shared_iterations.go index e31936f01b6..ee0d1779c46 100644 --- a/lib/executor/shared_iterations.go +++ b/lib/executor/shared_iterations.go @@ -192,7 +192,12 @@ func (si SharedIterations) Run( duration := si.config.MaxDuration.TimeDuration() gracefulStop := si.config.GetGracefulStop() - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop) + startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts( + parentCtx, + si.config.clock, + duration, + gracefulStop, + ) defer cancel() // Make sure the log and the progress bar have accurate information diff --git a/vendor/github.com/benbjohnson/clock/LICENSE b/vendor/github.com/benbjohnson/clock/LICENSE new file mode 100644 index 00000000000..ce212cb1cee --- /dev/null +++ b/vendor/github.com/benbjohnson/clock/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Ben Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/benbjohnson/clock/README.md b/vendor/github.com/benbjohnson/clock/README.md new file mode 100644 index 00000000000..4f1f82fc6d7 --- /dev/null +++ b/vendor/github.com/benbjohnson/clock/README.md @@ -0,0 +1,105 @@ +clock +===== + +[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/mod/github.com/benbjohnson/clock) + +Clock is a small library for mocking time in Go. It provides an interface +around the standard library's [`time`][time] package so that the application +can use the realtime clock while tests can use the mock clock. + +The module is currently maintained by @djmitche. + +[time]: https://pkg.go.dev/github.com/benbjohnson/clock + +## Usage + +### Realtime Clock + +Your application can maintain a `Clock` variable that will allow realtime and +mock clocks to be interchangeable. For example, if you had an `Application` type: + +```go +import "github.com/benbjohnson/clock" + +type Application struct { + Clock clock.Clock +} +``` + +You could initialize it to use the realtime clock like this: + +```go +var app Application +app.Clock = clock.New() +... +``` + +Then all timers and time-related functionality should be performed from the +`Clock` variable. + + +### Mocking time + +In your tests, you will want to use a `Mock` clock: + +```go +import ( + "testing" + + "github.com/benbjohnson/clock" +) + +func TestApplication_DoSomething(t *testing.T) { + mock := clock.NewMock() + app := Application{Clock: mock} + ... +} +``` + +Now that you've initialized your application to use the mock clock, you can +adjust the time programmatically. The mock clock always starts from the Unix +epoch (midnight UTC on Jan 1, 1970). + + +### Controlling time + +The mock clock provides the same functions that the standard library's `time` +package provides. For example, to find the current time, you use the `Now()` +function: + +```go +mock := clock.NewMock() + +// Find the current time. +mock.Now().UTC() // 1970-01-01 00:00:00 +0000 UTC + +// Move the clock forward. +mock.Add(2 * time.Hour) + +// Check the time again. It's 2 hours later! +mock.Now().UTC() // 1970-01-01 02:00:00 +0000 UTC +``` + +Timers and Tickers are also controlled by this same mock clock. They will only +execute when the clock is moved forward: + +```go +mock := clock.NewMock() +count := 0 + +// Kick off a timer to increment every 1 mock second. +go func() { + ticker := mock.Ticker(1 * time.Second) + for { + <-ticker.C + count++ + } +}() +runtime.Gosched() + +// Move the clock forward 10 seconds. +mock.Add(10 * time.Second) + +// This prints 10. +fmt.Println(count) +``` diff --git a/vendor/github.com/benbjohnson/clock/clock.go b/vendor/github.com/benbjohnson/clock/clock.go new file mode 100644 index 00000000000..40555b3037b --- /dev/null +++ b/vendor/github.com/benbjohnson/clock/clock.go @@ -0,0 +1,378 @@ +package clock + +import ( + "context" + "sort" + "sync" + "time" +) + +// Re-export of time.Duration +type Duration = time.Duration + +// Clock represents an interface to the functions in the standard library time +// package. Two implementations are available in the clock package. The first +// is a real-time clock which simply wraps the time package's functions. The +// second is a mock clock which will only change when +// programmatically adjusted. +type Clock interface { + After(d time.Duration) <-chan time.Time + AfterFunc(d time.Duration, f func()) *Timer + Now() time.Time + Since(t time.Time) time.Duration + Until(t time.Time) time.Duration + Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time + Ticker(d time.Duration) *Ticker + Timer(d time.Duration) *Timer + WithDeadline(parent context.Context, d time.Time) (context.Context, context.CancelFunc) + WithTimeout(parent context.Context, t time.Duration) (context.Context, context.CancelFunc) +} + +// New returns an instance of a real-time clock. +func New() Clock { + return &clock{} +} + +// clock implements a real-time clock by simply wrapping the time package functions. +type clock struct{} + +func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) } + +func (c *clock) AfterFunc(d time.Duration, f func()) *Timer { + return &Timer{timer: time.AfterFunc(d, f)} +} + +func (c *clock) Now() time.Time { return time.Now() } + +func (c *clock) Since(t time.Time) time.Duration { return time.Since(t) } + +func (c *clock) Until(t time.Time) time.Duration { return time.Until(t) } + +func (c *clock) Sleep(d time.Duration) { time.Sleep(d) } + +func (c *clock) Tick(d time.Duration) <-chan time.Time { return time.Tick(d) } + +func (c *clock) Ticker(d time.Duration) *Ticker { + t := time.NewTicker(d) + return &Ticker{C: t.C, ticker: t} +} + +func (c *clock) Timer(d time.Duration) *Timer { + t := time.NewTimer(d) + return &Timer{C: t.C, timer: t} +} + +func (c *clock) WithDeadline(parent context.Context, d time.Time) (context.Context, context.CancelFunc) { + return context.WithDeadline(parent, d) +} + +func (c *clock) WithTimeout(parent context.Context, t time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(parent, t) +} + +// Mock represents a mock clock that only moves forward programmically. +// It can be preferable to a real-time clock when testing time-based functionality. +type Mock struct { + mu sync.Mutex + now time.Time // current time + timers clockTimers // tickers & timers +} + +// NewMock returns an instance of a mock clock. +// The current time of the mock clock on initialization is the Unix epoch. +func NewMock() *Mock { + return &Mock{now: time.Unix(0, 0)} +} + +// Add moves the current time of the mock clock forward by the specified duration. +// This should only be called from a single goroutine at a time. +func (m *Mock) Add(d time.Duration) { + // Calculate the final current time. + t := m.now.Add(d) + + // Continue to execute timers until there are no more before the new time. + for { + if !m.runNextTimer(t) { + break + } + } + + // Ensure that we end with the new time. + m.mu.Lock() + m.now = t + m.mu.Unlock() + + // Give a small buffer to make sure that other goroutines get handled. + gosched() +} + +// Set sets the current time of the mock clock to a specific one. +// This should only be called from a single goroutine at a time. +func (m *Mock) Set(t time.Time) { + // Continue to execute timers until there are no more before the new time. + for { + if !m.runNextTimer(t) { + break + } + } + + // Ensure that we end with the new time. + m.mu.Lock() + m.now = t + m.mu.Unlock() + + // Give a small buffer to make sure that other goroutines get handled. + gosched() +} + +// runNextTimer executes the next timer in chronological order and moves the +// current time to the timer's next tick time. The next time is not executed if +// its next time is after the max time. Returns true if a timer was executed. +func (m *Mock) runNextTimer(max time.Time) bool { + m.mu.Lock() + + // Sort timers by time. + sort.Sort(m.timers) + + // If we have no more timers then exit. + if len(m.timers) == 0 { + m.mu.Unlock() + return false + } + + // Retrieve next timer. Exit if next tick is after new time. + t := m.timers[0] + if t.Next().After(max) { + m.mu.Unlock() + return false + } + + // Move "now" forward and unlock clock. + m.now = t.Next() + m.mu.Unlock() + + // Execute timer. + t.Tick(m.now) + return true +} + +// After waits for the duration to elapse and then sends the current time on the returned channel. +func (m *Mock) After(d time.Duration) <-chan time.Time { + return m.Timer(d).C +} + +// AfterFunc waits for the duration to elapse and then executes a function. +// A Timer is returned that can be stopped. +func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer { + t := m.Timer(d) + t.C = nil + t.fn = f + return t +} + +// Now returns the current wall time on the mock clock. +func (m *Mock) Now() time.Time { + m.mu.Lock() + defer m.mu.Unlock() + return m.now +} + +// Since returns time since `t` using the mock clock's wall time. +func (m *Mock) Since(t time.Time) time.Duration { + return m.Now().Sub(t) +} + +// Until returns time until `t` using the mock clock's wall time. +func (m *Mock) Until(t time.Time) time.Duration { + return t.Sub(m.Now()) +} + +// Sleep pauses the goroutine for the given duration on the mock clock. +// The clock must be moved forward in a separate goroutine. +func (m *Mock) Sleep(d time.Duration) { + <-m.After(d) +} + +// Tick is a convenience function for Ticker(). +// It will return a ticker channel that cannot be stopped. +func (m *Mock) Tick(d time.Duration) <-chan time.Time { + return m.Ticker(d).C +} + +// Ticker creates a new instance of Ticker. +func (m *Mock) Ticker(d time.Duration) *Ticker { + m.mu.Lock() + defer m.mu.Unlock() + ch := make(chan time.Time, 1) + t := &Ticker{ + C: ch, + c: ch, + mock: m, + d: d, + next: m.now.Add(d), + } + m.timers = append(m.timers, (*internalTicker)(t)) + return t +} + +// Timer creates a new instance of Timer. +func (m *Mock) Timer(d time.Duration) *Timer { + m.mu.Lock() + defer m.mu.Unlock() + ch := make(chan time.Time, 1) + t := &Timer{ + C: ch, + c: ch, + mock: m, + next: m.now.Add(d), + stopped: false, + } + m.timers = append(m.timers, (*internalTimer)(t)) + return t +} + +func (m *Mock) removeClockTimer(t clockTimer) { + for i, timer := range m.timers { + if timer == t { + copy(m.timers[i:], m.timers[i+1:]) + m.timers[len(m.timers)-1] = nil + m.timers = m.timers[:len(m.timers)-1] + break + } + } + sort.Sort(m.timers) +} + +// clockTimer represents an object with an associated start time. +type clockTimer interface { + Next() time.Time + Tick(time.Time) +} + +// clockTimers represents a list of sortable timers. +type clockTimers []clockTimer + +func (a clockTimers) Len() int { return len(a) } +func (a clockTimers) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a clockTimers) Less(i, j int) bool { return a[i].Next().Before(a[j].Next()) } + +// Timer represents a single event. +// The current time will be sent on C, unless the timer was created by AfterFunc. +type Timer struct { + C <-chan time.Time + c chan time.Time + timer *time.Timer // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + fn func() // AfterFunc function, if set + stopped bool // True if stopped, false if running +} + +// Stop turns off the ticker. +func (t *Timer) Stop() bool { + if t.timer != nil { + return t.timer.Stop() + } + + t.mock.mu.Lock() + registered := !t.stopped + t.mock.removeClockTimer((*internalTimer)(t)) + t.stopped = true + t.mock.mu.Unlock() + return registered +} + +// Reset changes the expiry time of the timer +func (t *Timer) Reset(d time.Duration) bool { + if t.timer != nil { + return t.timer.Reset(d) + } + + t.mock.mu.Lock() + t.next = t.mock.now.Add(d) + defer t.mock.mu.Unlock() + + registered := !t.stopped + if t.stopped { + t.mock.timers = append(t.mock.timers, (*internalTimer)(t)) + } + + t.stopped = false + return registered +} + +type internalTimer Timer + +func (t *internalTimer) Next() time.Time { return t.next } +func (t *internalTimer) Tick(now time.Time) { + // a gosched() after ticking, to allow any consequences of the + // tick to complete + defer gosched() + + t.mock.mu.Lock() + if t.fn != nil { + // defer function execution until the lock is released, and + defer t.fn() + } else { + t.c <- now + } + t.mock.removeClockTimer((*internalTimer)(t)) + t.stopped = true + t.mock.mu.Unlock() +} + +// Ticker holds a channel that receives "ticks" at regular intervals. +type Ticker struct { + C <-chan time.Time + c chan time.Time + ticker *time.Ticker // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + d time.Duration // time between ticks +} + +// Stop turns off the ticker. +func (t *Ticker) Stop() { + if t.ticker != nil { + t.ticker.Stop() + } else { + t.mock.mu.Lock() + t.mock.removeClockTimer((*internalTicker)(t)) + t.mock.mu.Unlock() + } +} + +// Reset resets the ticker to a new duration. +func (t *Ticker) Reset(dur time.Duration) { + if t.ticker != nil { + t.ticker.Reset(dur) + return + } + + t.mock.mu.Lock() + defer t.mock.mu.Unlock() + + t.d = dur + t.next = t.mock.now.Add(dur) +} + +type internalTicker Ticker + +func (t *internalTicker) Next() time.Time { return t.next } +func (t *internalTicker) Tick(now time.Time) { + select { + case t.c <- now: + default: + } + t.next = now.Add(t.d) + gosched() +} + +// Sleep momentarily so that other goroutines can process. +func gosched() { time.Sleep(1 * time.Millisecond) } + +var ( + // type checking + _ Clock = &Mock{} +) diff --git a/vendor/github.com/benbjohnson/clock/context.go b/vendor/github.com/benbjohnson/clock/context.go new file mode 100644 index 00000000000..eb67594f2c3 --- /dev/null +++ b/vendor/github.com/benbjohnson/clock/context.go @@ -0,0 +1,86 @@ +package clock + +import ( + "context" + "fmt" + "sync" + "time" +) + +func (m *Mock) WithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + return m.WithDeadline(parent, m.Now().Add(timeout)) +} + +func (m *Mock) WithDeadline(parent context.Context, deadline time.Time) (context.Context, context.CancelFunc) { + if cur, ok := parent.Deadline(); ok && cur.Before(deadline) { + // The current deadline is already sooner than the new one. + return context.WithCancel(parent) + } + ctx := &timerCtx{clock: m, parent: parent, deadline: deadline, done: make(chan struct{})} + propagateCancel(parent, ctx) + dur := m.Until(deadline) + if dur <= 0 { + ctx.cancel(context.DeadlineExceeded) // deadline has already passed + return ctx, func() {} + } + ctx.Lock() + defer ctx.Unlock() + if ctx.err == nil { + ctx.timer = m.AfterFunc(dur, func() { + ctx.cancel(context.DeadlineExceeded) + }) + } + return ctx, func() { ctx.cancel(context.Canceled) } +} + +// propagateCancel arranges for child to be canceled when parent is. +func propagateCancel(parent context.Context, child *timerCtx) { + if parent.Done() == nil { + return // parent is never canceled + } + go func() { + select { + case <-parent.Done(): + child.cancel(parent.Err()) + case <-child.Done(): + } + }() +} + +type timerCtx struct { + sync.Mutex + + clock Clock + parent context.Context + deadline time.Time + done chan struct{} + + err error + timer *Timer +} + +func (c *timerCtx) cancel(err error) { + c.Lock() + defer c.Unlock() + if c.err != nil { + return // already canceled + } + c.err = err + close(c.done) + if c.timer != nil { + c.timer.Stop() + c.timer = nil + } +} + +func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { return c.deadline, true } + +func (c *timerCtx) Done() <-chan struct{} { return c.done } + +func (c *timerCtx) Err() error { return c.err } + +func (c *timerCtx) Value(key interface{}) interface{} { return c.parent.Value(key) } + +func (c *timerCtx) String() string { + return fmt.Sprintf("clock.WithDeadline(%s [%s])", c.deadline, c.deadline.Sub(c.clock.Now())) +} diff --git a/vendor/github.com/benbjohnson/clock/go.mod b/vendor/github.com/benbjohnson/clock/go.mod new file mode 100644 index 00000000000..758903a6830 --- /dev/null +++ b/vendor/github.com/benbjohnson/clock/go.mod @@ -0,0 +1,3 @@ +module github.com/benbjohnson/clock + +go 1.15 diff --git a/vendor/modules.txt b/vendor/modules.txt index ecc2a6390bb..7541397a819 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -15,6 +15,9 @@ github.com/Soontao/goHttpDigestClient github.com/andybalholm/brotli # github.com/andybalholm/cascadia v1.1.0 github.com/andybalholm/cascadia +# github.com/benbjohnson/clock v1.3.0 +## explicit +github.com/benbjohnson/clock # github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew/spew # github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91