diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 1a8f5eed8df9..9893d87bfe48 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -44,7 +44,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa execScheduler, err := execution.NewScheduler(testState) require.NoError(tb, err) - me, err := engine.NewMetricsEngine(testState) + me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(tb, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 790b8e14647d..65917e0a8ab7 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) { execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testState) + metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 89365b23d489..f74f0ed5a201 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -115,7 +115,7 @@ func TestPatchStatus(t *testing.T) { execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testState) + metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/cmd/run.go b/cmd/run.go index cd69ed043a75..2c640513c179 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -147,17 +147,28 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { return err } - executionState := execScheduler.GetState() - metricsEngine, err := engine.NewMetricsEngine(executionState.Test) + metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, logger) if err != nil { return err } - if !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool { + + // We'll need to pipe metrics to the MetricsEngine and process them if any + // of these are enabled: thresholds, end-of-test summary + shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool || + !testRunState.RuntimeOptions.NoThresholds.Bool) + var metricsIngester *engine.OutputIngester + if shouldProcessMetrics { + err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool) + if err != nil { + return err + } // We'll need to pipe metrics to the MetricsEngine if either the // thresholds or the end-of-test summary are enabled. - outputs = append(outputs, metricsEngine.CreateIngester()) + metricsIngester = metricsEngine.CreateIngester() + outputs = append(outputs, metricsIngester) } + executionState := execScheduler.GetState() if !testRunState.RuntimeOptions.NoSummary.Bool { defer func() { logger.Debug("Generating the end-of-test summary...") @@ -208,7 +219,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() if !testRunState.RuntimeOptions.NoThresholds.Bool { - finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration) + finalizeThresholds := metricsEngine.StartThresholdCalculations( + metricsIngester, runAbort, executionState.GetCurrentTestRunDuration, + ) handleFinalThresholdCalculation := func() { // This gets called after the Samples channel has been closed and // the OutputManager has flushed all of the cached samples to @@ -283,7 +296,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { } printExecutionDescription( - c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs, + c.gs, "local", args[0], "", conf, executionState.ExecutionTuple, executionPlan, outputs, ) // Trap Interrupts, SIGINTs and SIGTERMs. diff --git a/execution/scheduler.go b/execution/scheduler.go index 8a5f18bef58b..5469e452d6c1 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -377,7 +377,7 @@ func (e *Scheduler) runExecutor( // and emission of the `vus` and `vus_max` metrics. func (e *Scheduler) Init( runCtx context.Context, samplesOut chan<- metrics.SampleContainer, -) (stopVUEmission func(), err error) { +) (stopVUEmission func(), initErr error) { logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init") execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx) @@ -390,11 +390,11 @@ func (e *Scheduler) Init( defer func() { if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil { - logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err) + logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, initErr) e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted) - err = interruptErr + initErr = interruptErr } - if err != nil { + if initErr != nil { stopVUEmission() } }() @@ -406,14 +406,14 @@ func (e *Scheduler) Init( // out channel. // //nolint:funlen -func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { +func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) { logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run") defer func() { if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil { - logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err) + logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr) e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted) - err = interruptErr + runErr = interruptErr } }() diff --git a/js/runner_test.go b/js/runner_test.go index ed72b9828d34..4b34dcd16412 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -47,7 +47,6 @@ import ( "go.k6.io/k6/lib/testutils/mockoutput" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" - "go.k6.io/k6/metrics/engine" "go.k6.io/k6/output" ) @@ -387,23 +386,17 @@ func TestDataIsolation(t *testing.T) { execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testRunState) - require.NoError(t, err) - globalCtx, globalCancel := context.WithCancel(context.Background()) defer globalCancel() runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger) mockOutput := mockoutput.New() - outputManager := output.NewManager([]output.Output{mockOutput, metricsEngine.CreateIngester()}, testRunState.Logger, runAbort) + outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, runAbort) samples := make(chan metrics.SampleContainer, 1000) waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples) require.NoError(t, err) defer stopOutputs(nil) - finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration) - require.Nil(t, finalizeThresholds) - require.Empty(t, runner.defaultGroup.Groups) stopEmission, err := execScheduler.Init(runCtx, samples) diff --git a/js/summary_test.go b/js/summary_test.go index 2f30669a0d43..a43f68aa4b0d 100644 --- a/js/summary_test.go +++ b/js/summary_test.go @@ -150,7 +150,7 @@ func createTestMetrics(t *testing.T) (map[string]*metrics.Metric, *lib.Group) { require.NoError(t, err) checksMetric.Tainted = null.BoolFrom(false) checksMetric.Thresholds = metrics.Thresholds{Thresholds: []*metrics.Threshold{{Source: "rate>70", LastFailed: false}}} - sink := &metrics.TrendSink{} + sink := metrics.NewTrendSink() samples := []float64{10.0, 15.0, 20.0} for _, s := range samples { diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go index 8aa81c11fad8..55f0b87fdb74 100644 --- a/metrics/engine/engine.go +++ b/metrics/engine/engine.go @@ -15,7 +15,6 @@ import ( "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" - "go.k6.io/k6/output" "gopkg.in/guregu/null.v3" ) @@ -25,13 +24,11 @@ const thresholdsRate = 2 * time.Second // aggregated metric sample values. They are used to generate the end-of-test // summary and to evaluate the test thresholds. type MetricsEngine struct { - logger logrus.FieldLogger - test *lib.TestRunState - outputIngester *outputIngester + registry *metrics.Registry + logger logrus.FieldLogger // These can be both top-level metrics or sub-metrics - metricsWithThresholds []*metrics.Metric - + metricsWithThresholds []*metrics.Metric breachedThresholdsCount uint32 // TODO: completely refactor: @@ -44,39 +41,31 @@ type MetricsEngine struct { } // NewMetricsEngine creates a new metrics Engine with the given parameters. -func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) { +func NewMetricsEngine(registry *metrics.Registry, logger logrus.FieldLogger) (*MetricsEngine, error) { me := &MetricsEngine{ - test: runState, - logger: runState.Logger.WithField("component", "metrics-engine"), + registry: registry, + logger: logger.WithField("component", "metrics-engine"), ObservedMetrics: make(map[string]*metrics.Metric), } - if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) { - err := me.initSubMetricsAndThresholds() - if err != nil { - return nil, err - } - } - return me, nil } // CreateIngester returns a pseudo-Output that uses the given metric samples to // update the engine's inner state. -func (me *MetricsEngine) CreateIngester() output.Output { - me.outputIngester = &outputIngester{ +func (me *MetricsEngine) CreateIngester() *OutputIngester { + return &OutputIngester{ logger: me.logger.WithField("component", "metrics-engine-ingester"), metricsEngine: me, cardinality: newCardinalityControl(), } - return me.outputIngester } func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Metric, error) { // TODO: replace with strings.Cut after Go 1.18 nameParts := strings.SplitN(name, "{", 2) - metric := me.test.Registry.Get(nameParts[0]) + metric := me.registry.Get(nameParts[0]) if metric == nil { return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0]) } @@ -125,11 +114,14 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) { } } -func (me *MetricsEngine) initSubMetricsAndThresholds() error { - for metricName, thresholds := range me.test.Options.Thresholds { +// InitSubMetricsAndThresholds parses the thresholds from the test Options and +// initializes both the thresholds themselves, as well as any submetrics that +// were referenced in them. +func (me *MetricsEngine) InitSubMetricsAndThresholds(options lib.Options, onlyLogErrors bool) error { + for metricName, thresholds := range options.Thresholds { metric, err := me.getThresholdMetricOrSubmetric(metricName) - if me.test.RuntimeOptions.NoThresholds.Bool { + if onlyLogErrors { if err != nil { me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName) } @@ -154,7 +146,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { // TODO: refactor out of here when https://github.com/grafana/k6/issues/1321 // lands and there is a better way to enable a metric with tag - if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) { + if options.SystemTags.Has(metrics.TagExpectedResponse) { _, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}") if err != nil { return err // shouldn't happen, but ¯\_(ツ)_/¯ @@ -167,10 +159,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { // StartThresholdCalculations spins up a new goroutine to crunch thresholds and // returns a callback that will stop the goroutine and finalizes calculations. func (me *MetricsEngine) StartThresholdCalculations( + ingester *OutputIngester, abortRun func(error), getCurrentTestRunDuration func() time.Duration, -) (finalize func() (breached []string), -) { +) (finalize func() (breached []string)) { if len(me.metricsWithThresholds) == 0 { return nil // no thresholds were defined } @@ -205,9 +197,9 @@ func (me *MetricsEngine) StartThresholdCalculations( }() return func() []string { - if me.outputIngester != nil { + if ingester != nil { // Stop the ingester so we don't get any more metrics - err := me.outputIngester.Stop() + err := ingester.Stop() if err != nil { me.logger.WithError(err).Warnf("There was a problem stopping the output ingester.") } diff --git a/metrics/engine/engine_test.go b/metrics/engine/engine_test.go index 06267cfa3c86..a941b4671b84 100644 --- a/metrics/engine/engine_test.go +++ b/metrics/engine/engine_test.go @@ -32,10 +32,12 @@ func TestNewMetricsEngineWithThresholds(t *testing.T) { _, err = trs.Registry.NewMetric("metric2", metrics.Counter) require.NoError(t, err) - me, err := NewMetricsEngine(trs) + me, err := NewMetricsEngine(trs.Registry, trs.Logger) require.NoError(t, err) require.NotNil(t, me) + require.NoError(t, me.InitSubMetricsAndThresholds(trs.Options, false)) + assert.Len(t, me.metricsWithThresholds, 2) } @@ -57,7 +59,7 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) { t.Parallel() me := newTestMetricsEngine(t) - _, err := me.test.Registry.NewMetric("metric1", metrics.Counter) + _, err := me.registry.NewMetric("metric1", metrics.Counter) require.NoError(t, err) _, err = me.getThresholdMetricOrSubmetric(tc.metricDefinition) @@ -69,16 +71,8 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) { func TestNewMetricsEngineNoThresholds(t *testing.T) { t.Parallel() - trs := &lib.TestRunState{ - TestPreInitState: &lib.TestPreInitState{ - Logger: testutils.NewLogger(t), - }, - } - - me, err := NewMetricsEngine(trs) - require.NoError(t, err) + me := newTestMetricsEngine(t) require.NotNil(t, me) - assert.Empty(t, me.metricsWithThresholds) } @@ -113,9 +107,9 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) { t.Parallel() me := newTestMetricsEngine(t) - m1, err := me.test.Registry.NewMetric("m1", metrics.Counter) + m1, err := me.registry.NewMetric("m1", metrics.Counter) require.NoError(t, err) - m2, err := me.test.Registry.NewMetric("m2", metrics.Counter) + m2, err := me.registry.NewMetric("m2", metrics.Counter) require.NoError(t, err) ths := metrics.NewThresholds([]string{tc.threshold}) @@ -138,9 +132,9 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) { me := newTestMetricsEngine(t) - m1, err := me.test.Registry.NewMetric("m1", metrics.Counter) + m1, err := me.registry.NewMetric("m1", metrics.Counter) require.NoError(t, err) - m2, err := me.test.Registry.NewMetric("m2", metrics.Counter) + m2, err := me.registry.NewMetric("m2", metrics.Counter) require.NoError(t, err) ths := metrics.NewThresholds([]string{"count>5"}) @@ -159,18 +153,10 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) { assert.Empty(t, breached) } -func newTestMetricsEngine(t *testing.T) MetricsEngine { - trs := &lib.TestRunState{ - TestPreInitState: &lib.TestPreInitState{ - Logger: testutils.NewLogger(t), - Registry: metrics.NewRegistry(), - }, - } - - return MetricsEngine{ - logger: trs.Logger, - test: trs, - } +func newTestMetricsEngine(t *testing.T) *MetricsEngine { + m, err := NewMetricsEngine(metrics.NewRegistry(), testutils.NewLogger(t)) + require.NoError(t, err) + return m } func zeroTestRunDuration() time.Duration { diff --git a/metrics/engine/ingester.go b/metrics/engine/ingester.go index 62ffee8293e6..496b6a6eaa74 100644 --- a/metrics/engine/ingester.go +++ b/metrics/engine/ingester.go @@ -13,16 +13,16 @@ const ( timeSeriesFirstLimit = 100_000 ) -var _ output.Output = &outputIngester{} +var _ output.Output = &OutputIngester{} // IngesterDescription is a short description for ingester. // This variable is used from a function in cmd/ui file for matching this output // and print a special text. const IngesterDescription = "Internal Metrics Ingester" -// outputIngester implements the output.Output interface and can be used to +// OutputIngester implements the output.Output interface and can be used to // "feed" the MetricsEngine data from a `k6 run` test run. -type outputIngester struct { +type OutputIngester struct { output.SampleBuffer logger logrus.FieldLogger @@ -32,12 +32,12 @@ type outputIngester struct { } // Description returns a human-readable description of the output. -func (oi *outputIngester) Description() string { +func (oi *OutputIngester) Description() string { return IngesterDescription } // Start the engine by initializing a new output.PeriodicFlusher -func (oi *outputIngester) Start() error { +func (oi *OutputIngester) Start() error { oi.logger.Debug("Starting...") pf, err := output.NewPeriodicFlusher(collectRate, oi.flushMetrics) @@ -51,7 +51,7 @@ func (oi *outputIngester) Start() error { } // Stop flushes any remaining metrics and stops the goroutine. -func (oi *outputIngester) Stop() error { +func (oi *OutputIngester) Stop() error { oi.logger.Debug("Stopping...") defer oi.logger.Debug("Stopped!") oi.periodicFlusher.Stop() @@ -59,7 +59,7 @@ func (oi *outputIngester) Stop() error { } // flushMetrics Writes samples to the MetricsEngine -func (oi *outputIngester) flushMetrics() { +func (oi *OutputIngester) flushMetrics() { sampleContainers := oi.GetBufferedSamples() if len(sampleContainers) == 0 { return diff --git a/metrics/engine/ingester_test.go b/metrics/engine/ingester_test.go index 71f7a7c84973..b299322e363b 100644 --- a/metrics/engine/ingester_test.go +++ b/metrics/engine/ingester_test.go @@ -19,7 +19,7 @@ func TestIngesterOutputFlushMetrics(t *testing.T) { testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend) require.NoError(t, err) - ingester := outputIngester{ + ingester := OutputIngester{ logger: piState.Logger, metricsEngine: &MetricsEngine{ ObservedMetrics: make(map[string]*metrics.Metric), @@ -55,9 +55,8 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) { require.NoError(t, err) me := &MetricsEngine{ - test: &lib.TestRunState{ - TestPreInitState: piState, - }, + logger: piState.Logger, + registry: piState.Registry, ObservedMetrics: make(map[string]*metrics.Metric), } _, err = me.getThresholdMetricOrSubmetric("test_metric{a:1}") @@ -66,7 +65,7 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) { // assert that observed metrics is empty before to start require.Empty(t, me.ObservedMetrics) - ingester := outputIngester{ + ingester := OutputIngester{ logger: piState.Logger, metricsEngine: me, cardinality: newCardinalityControl(), @@ -107,7 +106,7 @@ func TestOutputFlushMetricsTimeSeriesWarning(t *testing.T) { require.NoError(t, err) logger, hook := testutils.NewLoggerWithHook(nil) - ingester := outputIngester{ + ingester := OutputIngester{ logger: logger, metricsEngine: &MetricsEngine{ ObservedMetrics: make(map[string]*metrics.Metric), diff --git a/metrics/metric_test.go b/metrics/metric_test.go index 4b29ebe54e7a..cfd2d7510227 100644 --- a/metrics/metric_test.go +++ b/metrics/metric_test.go @@ -15,7 +15,7 @@ func TestNewMetric(t *testing.T) { }{ "Counter": {Counter, &CounterSink{}}, "Gauge": {Gauge, &GaugeSink{}}, - "Trend": {Trend, &TrendSink{}}, + "Trend": {Trend, NewTrendSink()}, "Rate": {Rate, &RateSink{}}, } diff --git a/metrics/sample.go b/metrics/sample.go index 81c1a942afdf..3d1e873821bd 100644 --- a/metrics/sample.go +++ b/metrics/sample.go @@ -140,11 +140,11 @@ func PushIfNotDone(ctx context.Context, output chan<- SampleContainer, sample Sa // the summary output and then returns a map of the corresponding resolvers. func GetResolversForTrendColumns(trendColumns []string) (map[string]func(s *TrendSink) float64, error) { staticResolvers := map[string]func(s *TrendSink) float64{ - "avg": func(s *TrendSink) float64 { return s.Avg }, - "min": func(s *TrendSink) float64 { return s.Min }, + "avg": func(s *TrendSink) float64 { return s.Avg() }, + "min": func(s *TrendSink) float64 { return s.Min() }, "med": func(s *TrendSink) float64 { return s.P(0.5) }, - "max": func(s *TrendSink) float64 { return s.Max }, - "count": func(s *TrendSink) float64 { return float64(s.Count) }, + "max": func(s *TrendSink) float64 { return s.Max() }, + "count": func(s *TrendSink) float64 { return float64(s.Count()) }, } dynamicResolver := func(percentile float64) func(s *TrendSink) float64 { return func(s *TrendSink) float64 { diff --git a/metrics/sample_test.go b/metrics/sample_test.go index 2c2042f7f323..f5654e7ea7a5 100644 --- a/metrics/sample_test.go +++ b/metrics/sample_test.go @@ -99,11 +99,11 @@ func TestGetResolversForTrendColumnsCalculation(t *testing.T) { } func createTestTrendSink(count int) *TrendSink { - sink := TrendSink{} + sink := NewTrendSink() for i := 0; i < count; i++ { sink.Add(Sample{Value: float64(i)}) } - return &sink + return sink } diff --git a/metrics/sink.go b/metrics/sink.go index e238dae78507..046a6bfee6ae 100644 --- a/metrics/sink.go +++ b/metrics/sink.go @@ -1,7 +1,6 @@ package metrics import ( - "errors" "fmt" "math" "sort" @@ -11,9 +10,8 @@ import ( var ( _ Sink = &CounterSink{} _ Sink = &GaugeSink{} - _ Sink = &TrendSink{} + _ Sink = NewTrendSink() _ Sink = &RateSink{} - _ Sink = &DummySink{} ) type Sink interface { @@ -32,7 +30,7 @@ func NewSink(mt MetricType) Sink { case Gauge: sink = &GaugeSink{} case Trend: - sink = &TrendSink{} + sink = NewTrendSink() case Rate: sink = &RateSink{} default: @@ -90,67 +88,101 @@ func (g *GaugeSink) Format(t time.Duration) map[string]float64 { return map[string]float64{"value": g.Value} } +// NewTrendSink makes a Trend sink with the OpenHistogram circllhist histogram. +func NewTrendSink() *TrendSink { + return &TrendSink{} +} + type TrendSink struct { - Values []float64 + values []float64 sorted bool - Count uint64 - Min, Max float64 - Sum, Avg float64 + count uint64 + min, max float64 + // TODO: unexport after this dependency is removed: + // https://github.com/grafana/xk6-output-prometheus-remote/blob/v0.2.1/pkg/remotewrite/remotewrite.go#L173 + Sum float64 } // IsEmpty indicates whether the TrendSink is empty. -func (t *TrendSink) IsEmpty() bool { return t.Count == 0 } +func (t *TrendSink) IsEmpty() bool { return t.count == 0 } func (t *TrendSink) Add(s Sample) { - if t.Count == 0 { - t.Max, t.Min = s.Value, s.Value + if t.count == 0 { + t.max, t.min = s.Value, s.Value } else { - if s.Value > t.Max { - t.Max = s.Value + if s.Value > t.max { + t.max = s.Value } - if s.Value < t.Min { - t.Min = s.Value + if s.Value < t.min { + t.min = s.Value } } - t.Values = append(t.Values, s.Value) + t.values = append(t.values, s.Value) t.sorted = false - t.Count += 1 + t.count++ t.Sum += s.Value - t.Avg = t.Sum / float64(t.Count) } // P calculates the given percentile from sink values. func (t *TrendSink) P(pct float64) float64 { - switch t.Count { + switch t.count { case 0: return 0 case 1: - return t.Values[0] + return t.values[0] default: if !t.sorted { - sort.Float64s(t.Values) + sort.Float64s(t.values) t.sorted = true } // If percentile falls on a value in Values slice, we return that value. // If percentile does not fall on a value in Values slice, we calculate (linear interpolation) // the value that would fall at percentile, given the values above and below that percentile. - i := pct * (float64(t.Count) - 1.0) - j := t.Values[int(math.Floor(i))] - k := t.Values[int(math.Ceil(i))] + i := pct * (float64(t.count) - 1.0) + j := t.values[int(math.Floor(i))] + k := t.values[int(math.Ceil(i))] f := i - math.Floor(i) return j + (k-j)*f } } +// Min returns the minimum value. +func (t *TrendSink) Min() float64 { + return t.min +} + +// Max returns the maximum value. +func (t *TrendSink) Max() float64 { + return t.max +} + +// Count returns the number of recorded values. +func (t *TrendSink) Count() uint64 { + return t.count +} + +// Avg returns the average (i.e. mean) value. +func (t *TrendSink) Avg() float64 { + if t.count > 0 { + return t.Sum / float64(t.count) + } + return 0 +} + +// Total returns the total (i.e. "sum") value for all measurements. +func (t *TrendSink) Total() float64 { + return t.Sum +} + func (t *TrendSink) Format(tt time.Duration) map[string]float64 { // TODO: respect the summaryTrendStats for REST API return map[string]float64{ - "min": t.Min, - "max": t.Max, - "avg": t.Avg, + "min": t.Min(), + "max": t.Max(), + "avg": t.Avg(), "med": t.P(0.5), "p(90)": t.P(0.90), "p(95)": t.P(0.95), @@ -180,16 +212,3 @@ func (r RateSink) Format(t time.Duration) map[string]float64 { return map[string]float64{"rate": rate} } - -type DummySink map[string]float64 - -// IsEmpty indicates whether the DummySink is empty. -func (d DummySink) IsEmpty() bool { return len(d) == 0 } - -func (d DummySink) Add(s Sample) { - panic(errors.New("you can't add samples to a dummy sink")) -} - -func (d DummySink) Format(t time.Duration) map[string]float64 { - return map[string]float64(d) -} diff --git a/metrics/sink_test.go b/metrics/sink_test.go index ddf7081267d3..5f872e70e9a1 100644 --- a/metrics/sink_test.go +++ b/metrics/sink_test.go @@ -19,7 +19,7 @@ func TestNewSink(t *testing.T) { {mt: Counter, sink: &CounterSink{}}, {mt: Gauge, sink: &GaugeSink{}}, {mt: Rate, sink: &RateSink{}}, - {mt: Trend, sink: &TrendSink{}}, + {mt: Trend, sink: NewTrendSink()}, } for _, tc := range tests { assert.Equal(t, tc.sink, NewSink(tc.mt)) @@ -101,56 +101,56 @@ func TestTrendSink(t *testing.T) { t.Run("one value", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 7.0}) - assert.Equal(t, uint64(1), sink.Count) - assert.Equal(t, 7.0, sink.Min) - assert.Equal(t, 7.0, sink.Max) - assert.Equal(t, 7.0, sink.Avg) - assert.Equal(t, 7.0, sink.Sum) - assert.Equal(t, []float64{7.0}, sink.Values) + assert.Equal(t, uint64(1), sink.Count()) + assert.Equal(t, 7.0, sink.Min()) + assert.Equal(t, 7.0, sink.Max()) + assert.Equal(t, 7.0, sink.Avg()) + assert.Equal(t, 7.0, sink.Total()) + assert.Equal(t, []float64{7.0}, sink.values) }) t.Run("values", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range unsortedSamples10 { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } - assert.Equal(t, uint64(len(unsortedSamples10)), sink.Count) - assert.Equal(t, 0.0, sink.Min) - assert.Equal(t, 100.0, sink.Max) - assert.Equal(t, 54.0, sink.Avg) - assert.Equal(t, 540.0, sink.Sum) - assert.Equal(t, unsortedSamples10, sink.Values) + assert.Equal(t, uint64(len(unsortedSamples10)), sink.Count()) + assert.Equal(t, 0.0, sink.Min()) + assert.Equal(t, 100.0, sink.Max()) + assert.Equal(t, 54.0, sink.Avg()) + assert.Equal(t, 540.0, sink.Total()) + assert.Equal(t, unsortedSamples10, sink.values) }) t.Run("negative", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range []float64{-10, -20} { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } - assert.Equal(t, uint64(2), sink.Count) - assert.Equal(t, -20.0, sink.Min) - assert.Equal(t, -10.0, sink.Max) - assert.Equal(t, -15.0, sink.Avg) - assert.Equal(t, -30.0, sink.Sum) - assert.Equal(t, []float64{-10, -20}, sink.Values) + assert.Equal(t, uint64(2), sink.Count()) + assert.Equal(t, -20.0, sink.Min()) + assert.Equal(t, -10.0, sink.Max()) + assert.Equal(t, -15.0, sink.Avg()) + assert.Equal(t, -30.0, sink.Total()) + assert.Equal(t, []float64{-10, -20}, sink.values) }) t.Run("mixed", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range []float64{1.4, 0, -1.2} { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } - assert.Equal(t, uint64(3), sink.Count) - assert.Equal(t, -1.2, sink.Min) - assert.Equal(t, 1.4, sink.Max) - assert.Equal(t, 0.067, math.Round(sink.Avg*1000)/1000) - assert.Equal(t, 0.199, math.Floor(sink.Sum*1000)/1000) - assert.Equal(t, []float64{1.4, 0, -1.2}, sink.Values) + assert.Equal(t, uint64(3), sink.Count()) + assert.Equal(t, -1.2, sink.Min()) + assert.Equal(t, 1.4, sink.Max()) + assert.Equal(t, 0.067, math.Round(sink.Avg()*1000)/1000) + assert.Equal(t, 0.199, math.Floor(sink.Total()*1000)/1000) + assert.Equal(t, []float64{1.4, 0, -1.2}, sink.values) }) }) @@ -159,7 +159,7 @@ func TestTrendSink(t *testing.T) { t.Run("no values", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for i := 1; i <= 100; i++ { assert.Equal(t, 0.0, sink.P(float64(i)/100.0)) } @@ -167,7 +167,7 @@ func TestTrendSink(t *testing.T) { t.Run("one value", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 10.0}) for i := 1; i <= 100; i++ { assert.Equal(t, 10.0, sink.P(float64(i)/100.0)) @@ -176,7 +176,7 @@ func TestTrendSink(t *testing.T) { t.Run("two values", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 5.0}) sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: 10.0}) assert.Equal(t, false, sink.sorted) @@ -190,7 +190,7 @@ func TestTrendSink(t *testing.T) { t.Run("more than 2", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range unsortedSamples10 { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } @@ -205,7 +205,7 @@ func TestTrendSink(t *testing.T) { t.Run("format", func(t *testing.T) { t.Parallel() - sink := TrendSink{} + sink := NewTrendSink() for _, s := range unsortedSamples10 { sink.Add(Sample{TimeSeries: TimeSeries{Metric: &Metric{}}, Value: s}) } @@ -259,13 +259,3 @@ func TestRateSink(t *testing.T) { assert.Equal(t, map[string]float64{"rate": 0.5}, sink.Format(0)) }) } - -func TestDummySinkAddPanics(t *testing.T) { - assert.Panics(t, func() { - DummySink{}.Add(Sample{}) - }) -} - -func TestDummySinkFormatReturnsItself(t *testing.T) { - assert.Equal(t, map[string]float64{"a": 1}, DummySink{"a": 1}.Format(0)) -} diff --git a/metrics/thresholds.go b/metrics/thresholds.go index a83a5999497c..44badb4a10df 100644 --- a/metrics/thresholds.go +++ b/metrics/thresholds.go @@ -185,9 +185,9 @@ func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) { case *GaugeSink: ts.sinked["value"] = sinkImpl.Value case *TrendSink: - ts.sinked["min"] = sinkImpl.Min - ts.sinked["max"] = sinkImpl.Max - ts.sinked["avg"] = sinkImpl.Avg + ts.sinked["min"] = sinkImpl.Min() + ts.sinked["max"] = sinkImpl.Max() + ts.sinked["avg"] = sinkImpl.Avg() ts.sinked["med"] = sinkImpl.P(0.5) // Parse the percentile thresholds and insert them in @@ -206,10 +206,6 @@ func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) { if sinkImpl.Total > 0 { ts.sinked["rate"] = float64(sinkImpl.Trues) / float64(sinkImpl.Total) } - case DummySink: - for k, v := range sinkImpl { - ts.sinked[k] = v - } default: return false, fmt.Errorf("unable to run Thresholds; reason: unknown sink type") } diff --git a/metrics/thresholds_test.go b/metrics/thresholds_test.go index 9ee02127d8c3..21faa24a607e 100644 --- a/metrics/thresholds_test.go +++ b/metrics/thresholds_test.go @@ -639,6 +639,14 @@ func TestThresholdsRunAll(t *testing.T) { } } +func getTrendSink(values ...float64) *TrendSink { + sink := NewTrendSink() + for _, v := range values { + sink.Add(Sample{Value: v}) + } + return sink +} + func TestThresholdsRun(t *testing.T) { t.Parallel() @@ -656,8 +664,8 @@ func TestThresholdsRun(t *testing.T) { { name: "Running thresholds of existing sink", args: args{ - sink: DummySink{"p(95)": 1234.5}, - thresholdExpressions: []string{"p(95)<2000"}, + sink: &CounterSink{Value: 1234.5}, + thresholdExpressions: []string{"count<2000"}, duration: 0, }, want: true, @@ -666,8 +674,8 @@ func TestThresholdsRun(t *testing.T) { { name: "Running thresholds of existing sink but failing threshold", args: args{ - sink: DummySink{"p(95)": 3000}, - thresholdExpressions: []string{"p(95)<2000"}, + sink: &CounterSink{Value: 3000}, + thresholdExpressions: []string{"count<2000"}, duration: 0, }, want: false, @@ -676,7 +684,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on non existing sink does not fail", args: args{ - sink: DummySink{"dummy": 0}, + sink: &CounterSink{}, thresholdExpressions: []string{"p(95)<2000"}, duration: 0, }, @@ -686,7 +694,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with no values and passing med statement succeeds", args: args{ - sink: &TrendSink{Values: []float64{}}, + sink: getTrendSink(), thresholdExpressions: []string{"med<39"}, duration: 0, }, @@ -696,7 +704,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with no values and non passing med statement fails", args: args{ - sink: &TrendSink{Values: []float64{}}, + sink: getTrendSink(), thresholdExpressions: []string{"med>39"}, duration: 0, }, @@ -706,7 +714,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with values and passing med statement succeeds", args: args{ - sink: &TrendSink{Values: []float64{70, 80, 90}, Count: 3}, + sink: getTrendSink(70, 80, 90), thresholdExpressions: []string{"med>39"}, duration: 0, }, @@ -716,7 +724,7 @@ func TestThresholdsRun(t *testing.T) { { name: "Running threshold on trend sink with values and failing med statement fails", args: args{ - sink: &TrendSink{Values: []float64{70, 80, 90}, Count: 3}, + sink: getTrendSink(70, 80, 90), thresholdExpressions: []string{"med<39"}, duration: 0, },