Skip to content

Commit

Permalink
Loki: Adds an interval paramater to query_range queries allowing a …
Browse files Browse the repository at this point in the history
…sampling of events to be returned based on the provided interval (#1965)

* Adds an `interval` paramater to query_range queries allowing a sampling of events to be returned based on the provided interval

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* lint

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* extract function for parsing parameter

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* Clarify the experimental nature of interval in the docs.

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* make min time a constant

Signed-off-by: Ed Welch <edward.welch@grafana.com>

* Expand the docs around the experimental nature of interval a little more

Signed-off-by: Ed Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean authored Apr 27, 2020
1 parent 21d4ca4 commit c7a1a1f
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 75 deletions.
3 changes: 2 additions & 1 deletion cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
cmd.Flag("since", "Lookback window.").Default("1h").DurationVar(&since)
cmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").StringVar(&from)
cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to)
cmd.Flag("step", "Query resolution step width").DurationVar(&q.Step)
cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step)
cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval)
}

cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward)
Expand Down
18 changes: 12 additions & 6 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,22 @@ accepts the following query parameters in the URL:
- `limit`: The max number of entries to return
- `start`: The start time for the query as a nanosecond Unix epoch. Defaults to one hour ago.
- `end`: The end time for the query as a nanosecond Unix epoch. Defaults to now.
- `step`: Query resolution step width in `duration` format or float number of seconds. `duration` refers to Prometheus duration strings of the form `[0-9]+[smhdwy]`. For example, 5m refers to a duration of 5 minutes. Defaults to a dynamic value based on `start` and `end`.
- `step`: Query resolution step width in `duration` format or float number of seconds. `duration` refers to Prometheus duration strings of the form `[0-9]+[smhdwy]`. For example, 5m refers to a duration of 5 minutes. Defaults to a dynamic value based on `start` and `end`. Only applies to query types which produce a matrix response.
- `interval`: **Experimental, See Below** Only return entries at (or greater than) the specified interval, can be a `duration` format or float number of seconds. Only applies to queries which produce a stream response.
- `direction`: Determines the sort order of logs. Supported values are `forward` or `backward`. Defaults to `backward.`

Requests against this endpoint require Loki to query the index store in order to
find log streams for particular labels. Because the index store is spread out by
time, the time span covered by `start` and `end`, if large, may cause additional
load against the index server and result in a slow query.

In microservices mode, `/loki/api/v1/query_range` is exposed by the querier and the frontend.

##### Step vs Interval

Use the `step` parameter when making metric queries to Loki, or queries which return a matrix response. It is evaluated in exactly the same way Prometheus evaluates `step`. First the query will be evaluated at `start` and then evaluated again at `start + step` and again at `start + step + step` until `end` is reached. The result will be a matrix of the query result evaluated at each step.

Use the `interval` parameter when making log queries to Loki, or queries which return a stream response. It is evaluated by returning a log entry at `start`, then the next entry will be returned an entry with timestampe >= `start + interval`, and again at `start + interval + interval` and so on until `end` is reached. It does not fill missing entries.

**Note about the experimental nature of interval** This flag may be removed in the future, if so it will likely be in favor of a LogQL expression to perform similar behavior, however that is uncertain at this time. [Issue 1779](https://github.com/grafana/loki/issues/1779) was created to track the discussion, if you are using `interval` please go add your use case and thoughts to that issue.



Response:

```
Expand Down
6 changes: 5 additions & 1 deletion pkg/logcli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *Client) Query(queryStr string, limit int, time time.Time, direction log
// QueryRange uses the /api/v1/query_range endpoint to execute a range query
// excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method
// nolint:interfacer
func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
params := util.NewQueryStringBuilder()
params.SetString("query", queryStr)
params.SetInt32("limit", limit)
Expand All @@ -68,6 +68,10 @@ func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time,
params.SetInt("step", int64(step.Seconds()))
}

if interval != 0 {
params.SetInt("interval", int64(interval.Seconds()))
}

return c.doQuery(params.EncodeWithPath(queryRangePath), quiet)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Query struct {
Limit int
Forward bool
Step time.Duration
Interval time.Duration
Quiet bool
NoLabels bool
IgnoreLabelsKey []string
Expand Down Expand Up @@ -70,7 +71,7 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool)
if q.isInstant() {
resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet)
} else {
resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Quiet)
resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Interval, q.Quiet)
}

if err != nil {
Expand Down Expand Up @@ -121,7 +122,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
if q.isInstant() {
query = eng.NewInstantQuery(q.QueryString, q.Start, q.resultsDirection(), uint32(q.Limit))
} else {
query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.resultsDirection(), uint32(q.Limit))
query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.Interval, q.resultsDirection(), uint32(q.Limit))
}

// execute the query
Expand Down
31 changes: 21 additions & 10 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,15 @@ func step(r *http.Request, start, end time.Time) (time.Duration, error) {
if value == "" {
return time.Duration(defaultQueryRangeStep(start, end)) * time.Second, nil
}
return parseSecondsOrDuration(value)
}

if d, err := strconv.ParseFloat(value, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", value)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(value); err == nil {
return time.Duration(d), nil
func interval(r *http.Request) (time.Duration, error) {
value := r.Form.Get("interval")
if value == "" {
return 0, nil
}
return 0, errors.Errorf("cannot parse %q to a valid duration", value)
return parseSecondsOrDuration(value)
}

// Match extracts and parses multiple matcher groups from a slice of strings
Expand Down Expand Up @@ -160,3 +157,17 @@ func parseDirection(value string, def logproto.Direction) (logproto.Direction, e
}
return logproto.Direction(d), nil
}

func parseSecondsOrDuration(value string) (time.Duration, error) {
if d, err := strconv.ParseFloat(value, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, errors.Errorf("cannot parse %q to a valid duration. It overflows int64", value)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(value); err == nil {
return time.Duration(d), nil
}
return 0, errors.Errorf("cannot parse %q to a valid duration", value)
}
55 changes: 55 additions & 0 deletions pkg/loghttp/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,61 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
}
}

func Test_interval(t *testing.T) {
tests := []struct {
name string
reqPath string
expected time.Duration
wantErr bool
}{
{
"valid_step_int",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=5",
5 * time.Second,
false,
},
{
"valid_step_duration",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=5m",
5 * time.Minute,
false,
},
{
"invalid",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=a",
0,
true,
},
{
"valid_0",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000&interval=0",
0,
false,
},
{
"valid_not_included",
"/loki/api/v1/query_range?query={}&start=0&end=3600000000000",
0,
false,
},
}
for _, testData := range tests {
testData := testData

t.Run(testData.name, func(t *testing.T) {
req := httptest.NewRequest("GET", testData.reqPath, nil)
err := req.ParseForm()
require.Nil(t, err)
actual, err := interval(req)
if testData.wantErr {
require.Error(t, err)
} else {
assert.Equal(t, testData.expected, actual)
}
})
}
}

func Test_parseTimestamp(t *testing.T) {

now := time.Now()
Expand Down
17 changes: 14 additions & 3 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

var (
errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time")
errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time")
errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
errNegativeInterval = errors.New("interval must be >= 0")
)

// QueryStatus holds the status of a query
Expand Down Expand Up @@ -239,6 +240,7 @@ type RangeQuery struct {
Start time.Time
End time.Time
Step time.Duration
Interval time.Duration
Query string
Direction logproto.Direction
Limit uint32
Expand Down Expand Up @@ -284,5 +286,14 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
return nil, errStepTooSmall
}

result.Interval, err = interval(r)
if err != nil {
return nil, err
}

if result.Interval < 0 {
return nil, errNegativeInterval
}

return &result, nil
}
4 changes: 4 additions & 0 deletions pkg/loghttp/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func TestParseRangeQuery(t *testing.T) {
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1`),
}, nil, true},
{"negative interval",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1,interval=-1`),
}, nil, true},
{"good",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z&limit=1000&direction=BACKWARD&step=3600`),
Expand Down
39 changes: 28 additions & 11 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
Help: "LogQL query timings",
Buckets: prometheus.DefBuckets,
}, []string{"query_type"})
lastEntryMinTime = time.Unix(-100, 0)
)

// ValueTypeStreams promql.ValueType for log streams
Expand Down Expand Up @@ -68,7 +69,7 @@ func (opts *EngineOpts) applyDefault() {

// Engine interface used to construct queries
type Engine interface {
NewRangeQuery(qs string, start, end time.Time, step time.Duration, direction logproto.Direction, limit uint32) Query
NewRangeQuery(qs string, start, end time.Time, step, interval time.Duration, direction logproto.Direction, limit uint32) Query
NewInstantQuery(qs string, ts time.Time, direction logproto.Direction, limit uint32) Query
}

Expand Down Expand Up @@ -144,14 +145,15 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
// NewRangeQuery creates a new LogQL range query.
func (ng *engine) NewRangeQuery(
qs string,
start, end time.Time, step time.Duration,
start, end time.Time, step time.Duration, interval time.Duration,
direction logproto.Direction, limit uint32) Query {
return &query{
LiteralParams: LiteralParams{
qs: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
},
Expand All @@ -170,6 +172,7 @@ func (ng *engine) NewInstantQuery(
start: ts,
end: ts,
step: 0,
interval: 0,
direction: direction,
limit: limit,
},
Expand Down Expand Up @@ -199,7 +202,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
return nil, err
}
defer helpers.LogError("closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit)
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
return streams, err
}

Expand Down Expand Up @@ -297,19 +300,33 @@ func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.M
return promql.Matrix{series}
}

func readStreams(i iter.EntryIterator, size uint32) (Streams, error) {
func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, interval time.Duration) (Streams, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
for ; respSize < size && i.Next(); respSize++ {
// lastEntry should be a really old time so that the first comparison is always true, we use a negative
// value here because many unit tests start at time.Unix(0,0)
lastEntry := lastEntryMinTime
for respSize < size && i.Next() {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
forwardShouldOutput := dir == logproto.FORWARD &&
(i.Entry().Timestamp.Equal(lastEntry.Add(interval)) || i.Entry().Timestamp.After(lastEntry.Add(interval)))
backwardShouldOutput := dir == logproto.BACKWARD &&
(i.Entry().Timestamp.Equal(lastEntry.Add(-interval)) || i.Entry().Timestamp.Before(lastEntry.Add(-interval)))
// If step == 0 output every line.
// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
// Then check to see if the entry is equal to, or past a forward or reverse step
if interval == 0 || lastEntry.Unix() < 0 || forwardShouldOutput || backwardShouldOutput {
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
streams[labels] = stream
stream.Entries = append(stream.Entries, entry)
lastEntry = i.Entry().Timestamp
respSize++
}
stream.Entries = append(stream.Entries, entry)
}

result := make([]*logproto.Stream, 0, len(streams))
Expand Down
Loading

0 comments on commit c7a1a1f

Please sign in to comment.