Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge branch 'no_panic_for_cons'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Jan 31, 2018
2 parents 70ecd54 + 1621a55 commit 1f9b24d
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 16 deletions.
11 changes: 7 additions & 4 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,10 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
}

func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
res := s.getSeriesAggMetrics(ctx)
res, err := s.getSeriesAggMetrics(ctx)
if err != nil {
return res, err
}
select {
case <-ctx.ctx.Done():
//request canceled
Expand Down Expand Up @@ -468,22 +471,22 @@ func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema
return points
}

func (s *Server) getSeriesAggMetrics(ctx *requestContext) mdata.Result {
func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error) {
_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesAggMetrics")
defer span.Finish()
metric, ok := s.MemoryStore.Get(ctx.Key)
if !ok {
return mdata.Result{
Oldest: ctx.Req.To,
}
}, nil
}

if ctx.Cons != consolidation.None {
logLoad("memory", ctx.AggKey, ctx.From, ctx.To)
return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To)
} else {
logLoad("memory", ctx.Req.Key, ctx.From, ctx.To)
return metric.Get(ctx.From, ctx.To)
return metric.Get(ctx.From, ctx.To), nil
}
}

Expand Down
5 changes: 4 additions & 1 deletion api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,10 @@ func TestGetSeriesAggMetrics(t *testing.T) {
metric.Add(i, float64(i^2))
}

res := srv.getSeriesAggMetrics(ctx)
res, err := srv.getSeriesAggMetrics(ctx)
if err != nil {
t.Errorf("Got unexpected error %q", err)
}
timestamps := make([]uint32, 0)
values := make([]float64, 0)

Expand Down
39 changes: 29 additions & 10 deletions mdata/aggmetric.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mdata

import (
"errors"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -137,30 +138,48 @@ func (a *AggMetric) getChunk(pos int) *chunk.Chunk {
return a.Chunks[pos]
}

func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) Result {
func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error) {
// no lock needed cause aggregators don't change at runtime
for _, a := range a.aggregators {
if a.span == aggSpan {
var agg *AggMetric
switch consolidator {
case consolidation.None:
panic("cannot get an archive for no consolidation")
err := errors.New("internal error: AggMetric.GetAggregated(): cannot get an archive for no consolidation")
log.Error(3, "AM: %s", err.Error())
badConsolidator.Inc()
return Result{}, err
case consolidation.Avg:
panic("avg consolidator has no matching Archive(). you need sum and cnt")
err := errors.New("internal error: AggMetric.GetAggregated(): avg consolidator has no matching Archive(). you need sum and cnt")
log.Error(3, "AM: %s", err.Error())
badConsolidator.Inc()
return Result{}, err
case consolidation.Cnt:
return a.cntMetric.Get(from, to)
agg = a.cntMetric
case consolidation.Lst:
return a.lstMetric.Get(from, to)
agg = a.lstMetric
case consolidation.Min:
return a.minMetric.Get(from, to)
agg = a.minMetric
case consolidation.Max:
return a.maxMetric.Get(from, to)
agg = a.maxMetric
case consolidation.Sum:
return a.sumMetric.Get(from, to)
agg = a.sumMetric
default:
err := fmt.Errorf("internal error: AggMetric.GetAggregated(): unknown consolidator %q", consolidator)
log.Error(3, "AM: %s", err.Error())
badConsolidator.Inc()
return Result{}, err
}
if agg == nil {
return Result{}, fmt.Errorf("Consolidator %q not configured", consolidator)
}
panic(fmt.Sprintf("AggMetric.GetAggregated(): unknown consolidator %q", consolidator))
return agg.Get(from, to), nil
}
}
panic(fmt.Sprintf("GetAggregated called with unknown aggSpan %d", aggSpan))
err := fmt.Errorf("internal error: AggMetric.GetAggregated(): unknown aggSpan %d", aggSpan)
log.Error(3, "AM: %s", err.Error())
badAggSpan.Inc()
return Result{}, err
}

// Get all data between the requested time ranges. From is inclusive, to is exclusive. from <= x < to
Expand Down
2 changes: 1 addition & 1 deletion mdata/ifaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ type Metrics interface {
type Metric interface {
Add(ts uint32, val float64)
Get(from, to uint32) Result
GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) Result
GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
}
8 changes: 8 additions & 0 deletions mdata/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ var (
// metric tank.gc_metric is the number of times the metrics GC is about to inspect a metric (series)
gcMetric = stats.NewCounter32("tank.gc_metric")

// metric recovered_errors.aggmetric.getaggregated.bad-consolidator is how many times we detected an GetAggregated call
// with an incorrect consolidator specified
badConsolidator = stats.NewCounter32("recovered_errors.aggmetric.getaggregated.bad-consolidator")

// metric recovered_errors.aggmetric.getaggregated.bad-aggspan is how many times we detected an GetAggregated call
// with an incorrect aggspan specified
badAggSpan = stats.NewCounter32("recovered_errors.aggmetric.getaggregated.bad-aggspan")

// set either via ConfigProcess or from the unit tests. other code should not touch
Schemas conf.Schemas
Aggregations conf.Aggregations
Expand Down

0 comments on commit 1f9b24d

Please sign in to comment.