Skip to content

Commit

Permalink
Merge pull request grafana#1406 from sandlis/chunks-max-look-back
Browse files Browse the repository at this point in the history
Limiting chunks query start time with config
  • Loading branch information
khaines authored May 21, 2019
2 parents 9320843 + 0a5b0a7 commit 4085bfa
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 8 deletions.
25 changes: 18 additions & 7 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type StoreConfig struct {

MinChunkAge time.Duration `yaml:"min_chunk_age,omitempty"`
CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than,omitempty"`

// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -70,6 +73,7 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {

f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
f.DurationVar(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", 0, "Cache index entries older than this period. 0 to disable.")
f.DurationVar(&cfg.MaxLookBackPeriod, "store.max-look-back-period", 0, "Limit how long back data can be queried")

// Deprecated.
flagext.DeprecatedFlag(f, "store.cardinality-cache-size", "DEPRECATED. Use store.index-cache-read.enable-fifocache and store.index-cache-read.fifocache.size instead.")
Expand Down Expand Up @@ -173,7 +177,7 @@ func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers .
level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers))

// Validate the query is within reasonable bounds.
metricName, matchers, shortcut, err := c.validateQuery(ctx, from, &through, allMatchers)
metricName, matchers, shortcut, err := c.validateQuery(ctx, &from, &through, allMatchers)
if err != nil {
return nil, err
} else if shortcut {
Expand All @@ -199,7 +203,7 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode
return nil, err
}

shortcut, err := c.validateQueryTimeRange(ctx, from, &through)
shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
Expand Down Expand Up @@ -230,11 +234,11 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode
return result, nil
}

func (c *store) validateQueryTimeRange(ctx context.Context, from model.Time, through *model.Time) (bool, error) {
func (c *store) validateQueryTimeRange(ctx context.Context, from *model.Time, through *model.Time) (bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange")
defer log.Span.Finish()

if *through < from {
if *through < *from {
return false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
}

Expand All @@ -244,8 +248,8 @@ func (c *store) validateQueryTimeRange(ctx context.Context, from model.Time, thr
}

maxQueryLength := c.limits.MaxQueryLength(userID)
if maxQueryLength > 0 && (*through).Sub(from) > maxQueryLength {
return false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(from), maxQueryLength)
if maxQueryLength > 0 && (*through).Sub(*from) > maxQueryLength {
return false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(*from), maxQueryLength)
}

now := model.Now()
Expand All @@ -261,6 +265,13 @@ func (c *store) validateQueryTimeRange(ctx context.Context, from model.Time, thr
return true, nil
}

if c.cfg.MaxLookBackPeriod != 0 {
oldestStartTime := model.Now().Add(-c.cfg.MaxLookBackPeriod)
if oldestStartTime.After(*from) {
*from = oldestStartTime
}
}

if through.After(now.Add(5 * time.Minute)) {
// time-span end is in future ... regard as legal
level.Error(log).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now)
Expand All @@ -270,7 +281,7 @@ func (c *store) validateQueryTimeRange(ctx context.Context, from model.Time, thr
return false, nil
}

func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) {
func (c *store) validateQuery(ctx context.Context, from *model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQuery")
defer log.Span.Finish()

Expand Down
53 changes: 53 additions & 0 deletions chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,56 @@ func TestChunkStoreError(t *testing.T) {
}
}
}

func TestStoreMaxLookBack(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
metric := labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "baz"},
}
storeMaker := stores[0]
storeCfg := storeMaker.configFn()

// Creating 2 stores, One with no look back limit and another with 30 Mins look back limit
storeWithoutLookBackLimit := newTestChunkStoreConfig(t, "v9", storeCfg)
defer storeWithoutLookBackLimit.Stop()

storeCfg.MaxLookBackPeriod = 30 * time.Minute
storeWithLookBackLimit := newTestChunkStoreConfig(t, "v9", storeCfg)
defer storeWithLookBackLimit.Stop()

now := model.Now()

// Populating both stores with chunks
fooChunk1 := dummyChunkFor(now, metric)
err := fooChunk1.Encode()
require.NoError(t, err)
err = storeWithoutLookBackLimit.Put(ctx, []Chunk{fooChunk1})
require.NoError(t, err)
err = storeWithLookBackLimit.Put(ctx, []Chunk{fooChunk1})
require.NoError(t, err)

fooChunk2 := dummyChunkFor(now.Add(-time.Hour*1), metric)
err = fooChunk2.Encode()
require.NoError(t, err)
err = storeWithoutLookBackLimit.Put(ctx, []Chunk{fooChunk2})
require.NoError(t, err)
err = storeWithLookBackLimit.Put(ctx, []Chunk{fooChunk2})
require.NoError(t, err)

matchers, err := promql.ParseMetricSelector(`foo{bar="baz"}`)
if err != nil {
t.Fatal(err)
}

// Both the chunks should be returned
chunks, err := storeWithoutLookBackLimit.Get(ctx, now.Add(-time.Hour), now, matchers...)
require.NoError(t, err)
require.Equal(t, 2, len(chunks))

// Single chunk should be returned with newer timestamp
chunks, err = storeWithLookBackLimit.Get(ctx, now.Add(-time.Hour), now, matchers...)
require.NoError(t, err)
require.Equal(t, 1, len(chunks))
chunks[0].Through.Equal(now)
}
2 changes: 1 addition & 1 deletion series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time
}

// Validate the query is within reasonable bounds.
metricName, matchers, shortcut, err := c.validateQuery(ctx, from, &through, allMatchers)
metricName, matchers, shortcut, err := c.validateQuery(ctx, &from, &through, allMatchers)
if err != nil {
return nil, nil, err
} else if shortcut {
Expand Down

0 comments on commit 4085bfa

Please sign in to comment.