diff --git a/metric/metricexport/reader.go b/metric/metricexport/reader.go index b920bacd8..8a09d0f00 100644 --- a/metric/metricexport/reader.go +++ b/metric/metricexport/reader.go @@ -130,7 +130,7 @@ func (ir *IntervalReader) Start() error { reportingInterval = ir.ReportingInterval } - if ir.done != nil { + if ir.quit != nil { return errAlreadyStarted } ir.timer = time.NewTicker(reportingInterval) @@ -172,6 +172,19 @@ func (ir *IntervalReader) Stop() { ir.quit = nil } +// Flush flushes the metrics if IntervalReader is stopped, otherwise no-op. +func (ir *IntervalReader) Flush() { + ir.mu.Lock() + defer ir.mu.Unlock() + + // No-op if IntervalReader is not stopped + if ir.quit != nil { + return + } + + ir.reader.ReadAndExport(ir.exporter) +} + // ReadAndExport reads metrics from all producer registered with // producer manager and then exports them using provided exporter. func (r *Reader) ReadAndExport(exporter Exporter) { diff --git a/metric/metricexport/reader_test.go b/metric/metricexport/reader_test.go index 33313dcf2..61c915b7f 100644 --- a/metric/metricexport/reader_test.go +++ b/metric/metricexport/reader_test.go @@ -117,6 +117,69 @@ func TestManualReadForIntervalReader(t *testing.T) { resetExporter(exporter1) } +func TestFlushNoOpForIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Set(1) + + // since IR is not stopped, flush does nothing + ir1.Flush() + + // expect no data points + checkExportedCount(exporter1, 0, t) + checkExportedMetricDesc(exporter1, "active_request", t) + ir1.Stop() + resetExporter(exporter1) +} + +func TestFlushAllowMultipleForIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Set(1) + + ir1.Stop() + ir1.Flush() + + // metric is still coming in + gaugeEntry.Add(1) + + // one more flush after IR stopped + ir1.Flush() + + // expect 2 data point, one from each flush + checkExportedCount(exporter1, 2, t) + checkExportedValues(exporter1, []int64{1, 2}, t) + checkExportedMetricDesc(exporter1, "active_request", t) + + resetExporter(exporter1) +} + +func TestFlushRestartForIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Set(1) + ir1.Stop() + ir1.Flush() + + // restart the IR + err := ir1.Start() + if err != nil { + t.Fatalf("error starting reader %v\n", err) + } + + gaugeEntry.Add(1) + + ir1.Stop() + ir1.Flush() + + // expect 2 data point, one from each flush + checkExportedCount(exporter1, 2, t) + checkExportedValues(exporter1, []int64{1, 2}, t) + checkExportedMetricDesc(exporter1, "active_request", t) + + resetExporter(exporter1) +} + func TestProducerWithIntervalReaderStop(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) ir1.Stop() @@ -166,7 +229,10 @@ func TestIntervalReaderMultipleStop(t *testing.T) { func TestIntervalReaderMultipleStart(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) - ir1.Start() + err := ir1.Start() + if err == nil { + t.Fatalf("expected error but got nil\n") + } gaugeEntry.Add(1)