Skip to content

Commit

Permalink
[aggregator] Sample timers completely (#3184)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Feb 7, 2021
1 parent 1c3debf commit 2d9019d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
32 changes: 20 additions & 12 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (agg *aggregator) AddUntimed(
metric unaggregated.MetricUnion,
metadatas metadata.StagedMetadatas,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addUntimed.SuccessLatencyStopwatch()
if err := agg.checkMetricType(metric); err != nil {
agg.metrics.addUntimed.ReportError(err)
return err
Expand All @@ -197,15 +197,16 @@ func (agg *aggregator) AddUntimed(
agg.metrics.addUntimed.ReportError(err)
return err
}
agg.metrics.addUntimed.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addUntimed.ReportSuccess()
sw.Stop()
return nil
}

func (agg *aggregator) AddTimed(
metric aggregated.Metric,
metadata metadata.TimedMetadata,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addTimed.SuccessLatencyStopwatch()
agg.metrics.timed.Inc(1)
shard, err := agg.shardFor(metric.ID)
if err != nil {
Expand All @@ -216,15 +217,16 @@ func (agg *aggregator) AddTimed(
agg.metrics.addTimed.ReportError(err)
return err
}
agg.metrics.addTimed.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addTimed.ReportSuccess()
sw.Stop()
return nil
}

func (agg *aggregator) AddTimedWithStagedMetadatas(
metric aggregated.Metric,
metas metadata.StagedMetadatas,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addTimed.SuccessLatencyStopwatch()
agg.metrics.timed.Inc(1)
shard, err := agg.shardFor(metric.ID)
if err != nil {
Expand All @@ -235,15 +237,16 @@ func (agg *aggregator) AddTimedWithStagedMetadatas(
agg.metrics.addTimed.ReportError(err)
return err
}
agg.metrics.addTimed.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addTimed.ReportSuccess()
sw.Stop()
return nil
}

func (agg *aggregator) AddForwarded(
metric aggregated.ForwardedMetric,
metadata metadata.ForwardMetadata,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addForwarded.SuccessLatencyStopwatch()
agg.metrics.forwarded.Inc(1)
shard, err := agg.shardFor(metric.ID)
if err != nil {
Expand All @@ -255,7 +258,8 @@ func (agg *aggregator) AddForwarded(
return err
}
callEnd := agg.nowFn()
agg.metrics.addForwarded.ReportSuccess(callEnd.Sub(callStart))
agg.metrics.addForwarded.ReportSuccess()
sw.Stop()
forwardingDelay := time.Duration(callEnd.UnixNano() - metric.TimeNanos)
agg.metrics.addForwarded.ReportForwardingLatency(
metadata.StoragePolicy.Resolution().Window,
Expand All @@ -269,7 +273,7 @@ func (agg *aggregator) AddPassthrough(
metric aggregated.Metric,
storagePolicy policy.StoragePolicy,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addPassthrough.SuccessLatencyStopwatch()
agg.metrics.passthrough.Inc(1)

if agg.electionManager.ElectionState() == FollowerState {
Expand Down Expand Up @@ -298,7 +302,8 @@ func (agg *aggregator) AddPassthrough(
agg.metrics.addPassthrough.ReportError(err)
return err
}
agg.metrics.addPassthrough.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addPassthrough.ReportSuccess()
sw.Stop()
return nil
}

Expand Down Expand Up @@ -758,9 +763,12 @@ func newAggregatorAddMetricMetrics(
}
}

func (m *aggregatorAddMetricMetrics) ReportSuccess(d time.Duration) {
func (m *aggregatorAddMetricMetrics) SuccessLatencyStopwatch() tally.Stopwatch {
return m.successLatency.Start()
}

func (m *aggregatorAddMetricMetrics) ReportSuccess() {
m.success.Inc(1)
m.successLatency.Record(d)
}

func (m *aggregatorAddMetricMetrics) ReportError(err error) {
Expand Down
14 changes: 7 additions & 7 deletions src/aggregator/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,9 @@ func TestAggregatorOwnedShards(t *testing.T) {

func TestAggregatorAddMetricMetrics(t *testing.T) {
s := tally.NewTestScope("testScope", nil)
m := newAggregatorAddUntimedMetrics(s, instrument.TimerOptions{})
m.ReportSuccess(time.Second)
m := newAggregatorAddUntimedMetrics(s, instrument.TimerOptions{StandardSampleRate: 0.001})
m.ReportSuccess()
m.SuccessLatencyStopwatch().Stop()
m.ReportError(errInvalidMetricType)
m.ReportError(errShardNotOwned)
m.ReportError(errAggregatorShardNotWriteable)
Expand Down Expand Up @@ -980,9 +981,8 @@ func TestAggregatorAddMetricMetrics(t *testing.T) {
for _, id := range []string{
"testScope.success-latency+",
} {
ti, exists := timers[id]
_, exists := timers[id]
require.True(t, exists)
require.Equal(t, []time.Duration{time.Second}, ti.Values())
}

// Validate we do not have any gauges.
Expand All @@ -992,7 +992,8 @@ func TestAggregatorAddMetricMetrics(t *testing.T) {
func TestAggregatorAddTimedMetrics(t *testing.T) {
s := tally.NewTestScope("testScope", nil)
m := newAggregatorAddTimedMetrics(s, instrument.TimerOptions{})
m.ReportSuccess(time.Second)
m.ReportSuccess()
m.SuccessLatencyStopwatch().Stop()
m.ReportError(errShardNotOwned)
m.ReportError(errAggregatorShardNotWriteable)
m.ReportError(errWriteNewMetricRateLimitExceeded)
Expand Down Expand Up @@ -1027,9 +1028,8 @@ func TestAggregatorAddTimedMetrics(t *testing.T) {
for _, id := range []string{
"testScope.success-latency+",
} {
ti, exists := timers[id]
_, exists := timers[id]
require.True(t, exists)
require.Equal(t, []time.Duration{time.Second}, ti.Values())
}

// Validate we do not have any gauges.
Expand Down

0 comments on commit 2d9019d

Please sign in to comment.