diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index bad76cb85fd9..019b96dcacd9 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -229,7 +229,8 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query { cmd.Flag("since", "Lookback window.").Default("1h").DurationVar(&since) cmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").StringVar(&from) cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to) - cmd.Flag("step", "Query resolution step width").DurationVar(&q.Step) + cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step) + cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval) } cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward) diff --git a/docs/api.md b/docs/api.md index f8ba849a29d8..3856c180461b 100644 --- a/docs/api.md +++ b/docs/api.md @@ -217,16 +217,22 @@ accepts the following query parameters in the URL: - `limit`: The max number of entries to return - `start`: The start time for the query as a nanosecond Unix epoch. Defaults to one hour ago. - `end`: The end time for the query as a nanosecond Unix epoch. Defaults to now. -- `step`: Query resolution step width in `duration` format or float number of seconds. `duration` refers to Prometheus duration strings of the form `[0-9]+[smhdwy]`. For example, 5m refers to a duration of 5 minutes. Defaults to a dynamic value based on `start` and `end`. +- `step`: Query resolution step width in `duration` format or float number of seconds. `duration` refers to Prometheus duration strings of the form `[0-9]+[smhdwy]`. For example, 5m refers to a duration of 5 minutes. Defaults to a dynamic value based on `start` and `end`. Only applies to query types which produce a matrix response. +- `interval`: **Experimental, See Below** Only return entries at (or greater than) the specified interval, can be a `duration` format or float number of seconds. Only applies to queries which produce a stream response. - `direction`: Determines the sort order of logs. Supported values are `forward` or `backward`. Defaults to `backward.` -Requests against this endpoint require Loki to query the index store in order to -find log streams for particular labels. Because the index store is spread out by -time, the time span covered by `start` and `end`, if large, may cause additional -load against the index server and result in a slow query. - In microservices mode, `/loki/api/v1/query_range` is exposed by the querier and the frontend. +##### Step vs Interval + +Use the `step` parameter when making metric queries to Loki, or queries which return a matrix response. It is evaluated in exactly the same way Prometheus evaluates `step`. First the query will be evaluated at `start` and then evaluated again at `start + step` and again at `start + step + step` until `end` is reached. The result will be a matrix of the query result evaluated at each step. + +Use the `interval` parameter when making log queries to Loki, or queries which return a stream response. It is evaluated by returning a log entry at `start`, then the next entry will be returned an entry with timestampe >= `start + interval`, and again at `start + interval + interval` and so on until `end` is reached. It does not fill missing entries. + +**Note about the experimental nature of interval** This flag may be removed in the future, if so it will likely be in favor of a LogQL expression to perform similar behavior, however that is uncertain at this time. [Issue 1779](https://github.com/grafana/loki/issues/1779) was created to track the discussion, if you are using `interval` please go add your use case and thoughts to that issue. + + + Response: ``` diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index 60ccc06b3512..cf350563bb32 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -54,7 +54,7 @@ func (c *Client) Query(queryStr string, limit int, time time.Time, direction log // QueryRange uses the /api/v1/query_range endpoint to execute a range query // excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method // nolint:interfacer -func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step time.Duration, quiet bool) (*loghttp.QueryResponse, error) { +func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { params := util.NewQueryStringBuilder() params.SetString("query", queryStr) params.SetInt32("limit", limit) @@ -68,6 +68,10 @@ func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, params.SetInt("step", int64(step.Seconds())) } + if interval != 0 { + params.SetInt("interval", int64(interval.Seconds())) + } + return c.doQuery(params.EncodeWithPath(queryRangePath), quiet) } diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index 6700dd3bc662..f42738d2f291 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -43,6 +43,7 @@ type Query struct { Limit int Forward bool Step time.Duration + Interval time.Duration Quiet bool NoLabels bool IgnoreLabelsKey []string @@ -70,7 +71,7 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) if q.isInstant() { resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet) } else { - resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Quiet) + resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Interval, q.Quiet) } if err != nil { @@ -121,7 +122,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string if q.isInstant() { query = eng.NewInstantQuery(q.QueryString, q.Start, q.resultsDirection(), uint32(q.Limit)) } else { - query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.resultsDirection(), uint32(q.Limit)) + query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.Interval, q.resultsDirection(), uint32(q.Limit)) } // execute the query diff --git a/pkg/loghttp/params.go b/pkg/loghttp/params.go index eef0499f23af..5221420b4f00 100644 --- a/pkg/loghttp/params.go +++ b/pkg/loghttp/params.go @@ -62,18 +62,15 @@ func step(r *http.Request, start, end time.Time) (time.Duration, error) { if value == "" { return time.Duration(defaultQueryRangeStep(start, end)) * time.Second, nil } + return parseSecondsOrDuration(value) +} - if d, err := strconv.ParseFloat(value, 64); err == nil { - ts := d * float64(time.Second) - if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { - return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", value) - } - return time.Duration(ts), nil - } - if d, err := model.ParseDuration(value); err == nil { - return time.Duration(d), nil +func interval(r *http.Request) (time.Duration, error) { + value := r.Form.Get("interval") + if value == "" { + return 0, nil } - return 0, errors.Errorf("cannot parse %q to a valid duration", value) + return parseSecondsOrDuration(value) } // Match extracts and parses multiple matcher groups from a slice of strings @@ -160,3 +157,17 @@ func parseDirection(value string, def logproto.Direction) (logproto.Direction, e } return logproto.Direction(d), nil } + +func parseSecondsOrDuration(value string) (time.Duration, error) { + if d, err := strconv.ParseFloat(value, 64); err == nil { + ts := d * float64(time.Second) + if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { + return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", value) + } + return time.Duration(ts), nil + } + if d, err := model.ParseDuration(value); err == nil { + return time.Duration(d), nil + } + return 0, errors.Errorf("cannot parse %q to a valid duration", value) +} diff --git a/pkg/loghttp/params_test.go b/pkg/loghttp/params_test.go index e8401dd00150..ca6a632e8203 100644 --- a/pkg/loghttp/params_test.go +++ b/pkg/loghttp/params_test.go @@ -137,6 +137,61 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) { } } +func Test_interval(t *testing.T) { + tests := []struct { + name string + reqPath string + expected time.Duration + wantErr bool + }{ + { + "valid_step_int", + "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=5", + 5 * time.Second, + false, + }, + { + "valid_step_duration", + "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=5m", + 5 * time.Minute, + false, + }, + { + "invalid", + "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=a", + 0, + true, + }, + { + "valid_0", + "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=0", + 0, + false, + }, + { + "valid_not_included", + "/loki/api/v1/query_range?query={}&start=0&end=3600000000000", + 0, + false, + }, + } + for _, testData := range tests { + testData := testData + + t.Run(testData.name, func(t *testing.T) { + req := httptest.NewRequest("GET", testData.reqPath, nil) + err := req.ParseForm() + require.Nil(t, err) + actual, err := interval(req) + if testData.wantErr { + require.Error(t, err) + } else { + assert.Equal(t, testData.expected, actual) + } + }) + } +} + func Test_parseTimestamp(t *testing.T) { now := time.Now() diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go index a75783e98d03..ec6a5e21ebe6 100644 --- a/pkg/loghttp/query.go +++ b/pkg/loghttp/query.go @@ -16,9 +16,10 @@ import ( ) var ( - errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time") - errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") - errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") + errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time") + errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") + errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") + errNegativeInterval = errors.New("interval must be >= 0") ) // QueryStatus holds the status of a query @@ -239,6 +240,7 @@ type RangeQuery struct { Start time.Time End time.Time Step time.Duration + Interval time.Duration Query string Direction logproto.Direction Limit uint32 @@ -284,5 +286,14 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) { return nil, errStepTooSmall } + result.Interval, err = interval(r) + if err != nil { + return nil, err + } + + if result.Interval < 0 { + return nil, errNegativeInterval + } + return &result, nil } diff --git a/pkg/loghttp/query_test.go b/pkg/loghttp/query_test.go index 34c717130e80..54b180378ab6 100644 --- a/pkg/loghttp/query_test.go +++ b/pkg/loghttp/query_test.go @@ -42,6 +42,10 @@ func TestParseRangeQuery(t *testing.T) { &http.Request{ URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1`), }, nil, true}, + {"negative interval", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1,interval=-1`), + }, nil, true}, {"good", &http.Request{ URL: mustParseURL(`?query={foo="bar"}&start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z&limit=1000&direction=BACKWARD&step=3600`), diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 39d2618abee4..31404287deeb 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -26,6 +26,7 @@ var ( Help: "LogQL query timings", Buckets: prometheus.DefBuckets, }, []string{"query_type"}) + lastEntryMinTime = time.Unix(-100, 0) ) // ValueTypeStreams promql.ValueType for log streams @@ -68,7 +69,7 @@ func (opts *EngineOpts) applyDefault() { // Engine interface used to construct queries type Engine interface { - NewRangeQuery(qs string, start, end time.Time, step time.Duration, direction logproto.Direction, limit uint32) Query + NewRangeQuery(qs string, start, end time.Time, step, interval time.Duration, direction logproto.Direction, limit uint32) Query NewInstantQuery(qs string, ts time.Time, direction logproto.Direction, limit uint32) Query } @@ -144,7 +145,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) { // NewRangeQuery creates a new LogQL range query. func (ng *engine) NewRangeQuery( qs string, - start, end time.Time, step time.Duration, + start, end time.Time, step time.Duration, interval time.Duration, direction logproto.Direction, limit uint32) Query { return &query{ LiteralParams: LiteralParams{ @@ -152,6 +153,7 @@ func (ng *engine) NewRangeQuery( start: start, end: end, step: step, + interval: interval, direction: direction, limit: limit, }, @@ -170,6 +172,7 @@ func (ng *engine) NewInstantQuery( start: ts, end: ts, step: 0, + interval: 0, direction: direction, limit: limit, }, @@ -199,7 +202,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) { return nil, err } defer helpers.LogError("closing iterator", iter.Close) - streams, err := readStreams(iter, q.limit) + streams, err := readStreams(iter, q.limit, q.direction, q.interval) return streams, err } @@ -297,19 +300,33 @@ func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.M return promql.Matrix{series} } -func readStreams(i iter.EntryIterator, size uint32) (Streams, error) { +func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, interval time.Duration) (Streams, error) { streams := map[string]*logproto.Stream{} respSize := uint32(0) - for ; respSize < size && i.Next(); respSize++ { + // lastEntry should be a really old time so that the first comparison is always true, we use a negative + // value here because many unit tests start at time.Unix(0,0) + lastEntry := lastEntryMinTime + for respSize < size && i.Next() { labels, entry := i.Labels(), i.Entry() - stream, ok := streams[labels] - if !ok { - stream = &logproto.Stream{ - Labels: labels, + forwardShouldOutput := dir == logproto.FORWARD && + (i.Entry().Timestamp.Equal(lastEntry.Add(interval)) || i.Entry().Timestamp.After(lastEntry.Add(interval))) + backwardShouldOutput := dir == logproto.BACKWARD && + (i.Entry().Timestamp.Equal(lastEntry.Add(-interval)) || i.Entry().Timestamp.Before(lastEntry.Add(-interval))) + // If step == 0 output every line. + // If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line. + // Then check to see if the entry is equal to, or past a forward or reverse step + if interval == 0 || lastEntry.Unix() < 0 || forwardShouldOutput || backwardShouldOutput { + stream, ok := streams[labels] + if !ok { + stream = &logproto.Stream{ + Labels: labels, + } + streams[labels] = stream } - streams[labels] = stream + stream.Entries = append(stream.Entries, entry) + lastEntry = i.Entry().Timestamp + respSize++ } - stream.Entries = append(stream.Entries, entry) } result := make([]*logproto.Stream, 0, len(streams)) diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index c82245d98756..1add8d72bfe9 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -342,6 +342,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { start time.Time end time.Time step time.Duration + interval time.Duration direction logproto.Direction limit uint32 @@ -353,7 +354,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { expected promql.Value }{ { - `{app="foo"}`, time.Unix(0, 0), time.Unix(30, 0), time.Second, logproto.FORWARD, 10, + `{app="foo"}`, time.Unix(0, 0), time.Unix(30, 0), time.Second, 0, logproto.FORWARD, 10, [][]*logproto.Stream{ {newStream(testSize, identity, `{app="foo"}`)}, }, @@ -363,7 +364,27 @@ func TestEngine_NewRangeQuery(t *testing.T) { Streams([]*logproto.Stream{newStream(10, identity, `{app="foo"}`)}), }, { - `{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(0, 0), time.Unix(30, 0), time.Second, logproto.BACKWARD, 30, + `{app="food"}`, time.Unix(0, 0), time.Unix(30, 0), 0, 2 * time.Second, logproto.FORWARD, 10, + [][]*logproto.Stream{ + {newStream(testSize, identity, `{app="food"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="food"}`}}, + }, + Streams([]*logproto.Stream{newIntervalStream(10, 2*time.Second, identity, `{app="food"}`)}), + }, + { + `{app="fed"}`, time.Unix(0, 0), time.Unix(30, 0), 0, 2 * time.Second, logproto.BACKWARD, 10, + [][]*logproto.Stream{ + {newBackwardStream(testSize, identity, `{app="fed"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="fed"}`}}, + }, + Streams([]*logproto.Stream{newBackwardIntervalStream(testSize, 10, 2*time.Second, identity, `{app="fed"}`)}), + }, + { + `{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(0, 0), time.Unix(30, 0), time.Second, 0, logproto.BACKWARD, 30, [][]*logproto.Stream{ {newStream(testSize, identity, `{app="bar"}`)}, }, @@ -373,7 +394,17 @@ func TestEngine_NewRangeQuery(t *testing.T) { Streams([]*logproto.Stream{newStream(30, identity, `{app="bar"}`)}), }, { - `rate({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), time.Unix(120, 0), time.Minute, logproto.BACKWARD, 10, + `{app="barf"} |= "foo" |~ ".+bar"`, time.Unix(0, 0), time.Unix(30, 0), 0, 3 * time.Second, logproto.BACKWARD, 30, + [][]*logproto.Stream{ + {newBackwardStream(testSize, identity, `{app="barf"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="barf"}|="foo"|~".+bar"`}}, + }, + Streams([]*logproto.Stream{newBackwardIntervalStream(testSize, 30, 3*time.Second, identity, `{app="barf"}`)}), + }, + { + `rate({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), time.Unix(120, 0), time.Minute, 0, logproto.BACKWARD, 10, [][]*logproto.Stream{ {newStream(testSize, identity, `{app="foo"}`)}, }, @@ -388,7 +419,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `rate({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, logproto.FORWARD, 10, + `rate({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10, [][]*logproto.Stream{ {newStream(testSize, factor(2, identity), `{app="foo"}`)}, }, @@ -403,7 +434,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), time.Unix(120, 0), 30 * time.Second, logproto.BACKWARD, 10, + `count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), time.Unix(120, 0), 30 * time.Second, 0, logproto.BACKWARD, 10, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 60 = 6 total }, @@ -418,7 +449,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `count_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), time.Unix(5*120, 0), 30 * time.Second, logproto.BACKWARD, 10, + `count_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), time.Unix(5*120, 0), 30 * time.Second, 0, logproto.BACKWARD, 10, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 300 = 30 total }, @@ -445,7 +476,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(10, identity), `{app="bar"}`)}, }, @@ -460,7 +491,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `min(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `min(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(10, identity), `{app="bar"}`)}, }, @@ -475,7 +506,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `max by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `max by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, }, @@ -494,7 +525,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `max(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `max(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, }, @@ -509,7 +540,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(5, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, }, @@ -524,7 +555,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, }, @@ -543,7 +574,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `count(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) without (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `count(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) without (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(10, identity), `{app="bar"}`)}, }, @@ -558,7 +589,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `stdvar without (app) (count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `stdvar without (app) (count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, }, @@ -573,7 +604,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `stddev(count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `stddev(count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(2, identity), `{app="bar"}`)}, }, @@ -588,7 +619,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `rate(({app=~"foo|bar"} |~".+bar")[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `rate(({app=~"foo|bar"} |~".+bar")[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, }, @@ -607,7 +638,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`), newStream(testSize, factor(15, identity), `{app="boo"}`)}, }, @@ -626,7 +657,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, }, @@ -641,7 +672,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(15, identity), `{app="fuzz"}`), newStream(testSize, factor(5, identity), `{app="fuzz"}`), newStream(testSize, identity, `{app="buzz"}`)}, @@ -665,7 +696,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(20, identity), `{app="bar"}`), newStream(testSize, factor(5, identity), `{app="fuzz"}`), newStream(testSize, identity, `{app="buzz"}`)}, @@ -685,7 +716,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, }, { - `bottomk(3,rate(({app=~"foo|bar|fuzz|buzz"} |~".+bar")[1m])) without (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + `bottomk(3,rate(({app=~"foo|bar|fuzz|buzz"} |~".+bar")[1m])) without (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(10, identity), `{app="foo"}`), @@ -715,7 +746,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { // binops { `rate({app="foo"}[1m]) or rate({app="bar"}[1m])`, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -744,7 +775,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { rate({app=~"foo|bar"}[1m]) and rate({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -770,7 +801,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { rate({app=~"foo|bar"}[1m]) unless rate({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -796,7 +827,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { rate({app=~"foo|bar"}[1m]) + rate({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -822,7 +853,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { rate({app=~"foo|bar"}[1m]) - rate({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -848,7 +879,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { count_over_time({app=~"foo|bar"}[1m]) * count_over_time({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -874,7 +905,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { count_over_time({app=~"foo|bar"}[1m]) * count_over_time({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -900,7 +931,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { count_over_time({app=~"foo|bar"}[1m]) / count_over_time({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -926,7 +957,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { count_over_time({app=~"foo|bar"}[1m]) % count_over_time({app="bar"}[1m]) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -954,7 +985,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) / sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m])) `, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="foo"}`), @@ -977,7 +1008,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, { `1+1--1`, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{}, []SelectParams{}, promql.Matrix{ @@ -988,7 +1019,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, { `rate({app="bar"}[1m]) - 1`, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="bar"}`), @@ -1006,7 +1037,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, { `1 - rate({app="bar"}[1m])`, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="bar"}`), @@ -1024,7 +1055,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, { `rate({app="bar"}[1m]) - 1 / 2`, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="bar"}`), @@ -1042,7 +1073,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, { `count_over_time({app="bar"}[1m]) ^ count_over_time({app="bar"}[1m])`, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{ { newStream(testSize, factor(5, identity), `{app="bar"}`), @@ -1060,7 +1091,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { }, { `2`, - time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100, [][]*logproto.Stream{}, []SelectParams{}, promql.Matrix{ @@ -1076,7 +1107,7 @@ func TestEngine_NewRangeQuery(t *testing.T) { eng := NewEngine(EngineOpts{}, newQuerierRecorder(test.streams, test.params)) - q := eng.NewRangeQuery(test.qs, test.start, test.end, test.step, test.direction, test.limit) + q := eng.NewRangeQuery(test.qs, test.start, test.end, test.step, test.interval, test.direction, test.limit) res, err := q.Exec(context.Background()) if err != nil { t.Fatal(err) @@ -1148,7 +1179,7 @@ func benchmarkRangeQuery(testsize int64, b *testing.B) { {`bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD}, {`bottomk(3,rate(({app=~"foo|bar"} |~".+bar")[1m])) without (app)`, logproto.FORWARD}, } { - q := eng.NewRangeQuery(test.qs, start, end, 60*time.Second, test.direction, 1000) + q := eng.NewRangeQuery(test.qs, start, end, 60*time.Second, 0, test.direction, 1000) res, err := q.Exec(context.Background()) if err != nil { b.Fatal(err) @@ -1230,6 +1261,47 @@ func newStream(n int64, f generator, labels string) *logproto.Stream { } } +func newIntervalStream(n int64, step time.Duration, f generator, labels string) *logproto.Stream { + entries := []logproto.Entry{} + lastEntry := int64(-100) // Start with a really small value (negative) so we always output the first item + for i := int64(0); int64(len(entries)) < n; i++ { + if float64(lastEntry)+step.Seconds() <= float64(i) { + entries = append(entries, f(i)) + lastEntry = i + } + } + return &logproto.Stream{ + Entries: entries, + Labels: labels, + } +} + +func newBackwardStream(n int64, f generator, labels string) *logproto.Stream { + entries := []logproto.Entry{} + for i := n - 1; i > 0; i-- { + entries = append(entries, f(i)) + } + return &logproto.Stream{ + Entries: entries, + Labels: labels, + } +} + +func newBackwardIntervalStream(n, expectedResults int64, step time.Duration, f generator, labels string) *logproto.Stream { + entries := []logproto.Entry{} + lastEntry := int64(100000) //Start with some really big value so that we always output the first item + for i := n - 1; int64(len(entries)) < expectedResults; i-- { + if float64(lastEntry)-step.Seconds() >= float64(i) { + entries = append(entries, f(i)) + lastEntry = i + } + } + return &logproto.Stream{ + Entries: entries, + Labels: labels, + } +} + func identity(i int64) logproto.Entry { return logproto.Entry{ Timestamp: time.Unix(i, 0), diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 85659db9bef8..8c0a29063d1e 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -37,6 +37,7 @@ type LiteralParams struct { qs string start, end time.Time step time.Duration + interval time.Duration direction logproto.Direction limit uint32 } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 73b4a1208939..501f1cb66711 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -50,7 +50,7 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { return } - query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit) + query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Interval, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { writeError(err, w) @@ -127,7 +127,7 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) { return } - query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit) + query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Interval, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { writeError(err, w)