
[new feature] API: Introduce exemplar api #7974

Closed
wants to merge 18 commits into from
141 changes: 141 additions & 0 deletions docs/sources/api/_index.md
@@ -29,6 +29,7 @@ These endpoints are exposed by the querier and the query frontend:

- [`GET /loki/api/v1/query`](#query-loki)
- [`GET /loki/api/v1/query_range`](#query-loki-over-a-range-of-time)
- [`GET /loki/api/v1/query_exemplars`](#query-exemplars-over-a-range-of-time)
- [`GET /loki/api/v1/labels`](#list-labels-within-a-range-of-time)
- [`GET /loki/api/v1/label/<name>/values`](#list-label-values-within-a-range-of-time)
- [`GET /loki/api/v1/series`](#list-series)
@@ -427,6 +428,146 @@ $ curl -G -s "http://localhost:3100/loki/api/v1/query_range" --data-urlencode '
}
```

## Query Exemplars over a range of time

```
GET /loki/api/v1/query_exemplars
```

`/loki/api/v1/query_exemplars` queries exemplars over a range of time and
accepts the following query parameters in the URL:

- `query`: The [LogQL](../logql/) query to perform
- `limit`: The max number of entries to return. Defaults to `100`. Only applies to query types which produce a
  stream (log lines) response.
- `start`: The start time for the query as a nanosecond Unix epoch or another [supported format](#timestamp-formats).
Defaults to one hour ago.
- `end`: The end time for the query as a nanosecond Unix epoch or another [supported format](#timestamp-formats).
Defaults to now.
- `step`: Query resolution step width in `duration` format or float number of seconds. `duration` refers to Prometheus
  duration strings of the form `[0-9]+[smhdwy]`. For example, `5m` refers to a duration of 5 minutes. Defaults to a
  dynamic value based on `start` and `end`. Only applies to query types which produce a matrix response.
- `interval`: <span style="background-color:#f3f973;">This parameter is experimental; see the explanation under Step
versus interval.</span> Only return entries at (or greater than) the specified interval, can be a `duration` format or
float number of seconds. Only applies to queries which produce a stream response.
- `direction`: Determines the sort order of logs. Supported values are `forward` or `backward`. Defaults to `backward`.

In microservices mode, `/loki/api/v1/query_exemplars` is exposed by the querier and the query frontend.

### Step versus interval

Use the `step` parameter when making metric queries to Loki, or queries which return a matrix response. It is evaluated
in exactly the same way Prometheus evaluates `step`. First the query will be evaluated at `start` and then evaluated
again at `start + step` and again at `start + step + step` until `end` is reached. The result will be a matrix of the
query result evaluated at each step.

Use the `interval` parameter when making log queries to Loki, or queries which return a stream response. It is evaluated
by returning a log entry at `start`, then the next returned entry is the first with a timestamp >= `start + interval`,
then the first at least `interval` later, and so on until `end` is reached. It does not fill in missing entries.
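One plausible reading of the interval behavior, as a sketch (the helper name and the exact skip rule are assumptions for illustration, not Loki's implementation):

```go
package main

import "fmt"

// filterByInterval returns the entry at or after start, then skips entries
// until one is at least `interval` after the previously returned entry.
// Gaps are not filled with synthetic entries.
func filterByInterval(timestamps []int64, start, interval int64) []int64 {
	var out []int64
	next := start
	for _, ts := range timestamps {
		if ts >= next {
			out = append(out, ts)
			next = ts + interval
		}
	}
	return out
}

func main() {
	// Entries at seconds 0,1,2,5,9,10 with interval=3 keep 0, 5, and 9.
	fmt.Println(filterByInterval([]int64{0, 1, 2, 5, 9, 10}, 0, 3))
}
```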

<span style="background-color:#f3f973;">Note about the experimental nature of the interval parameter:</span> This flag
may be removed in the future; if so, it will likely be replaced by a LogQL expression providing similar behavior,
though that is uncertain at this time. [Issue 1779](https://github.com/grafana/loki/issues/1779) tracks the
discussion; if you are using `interval`, please add your use case and thoughts to that issue.

Response:

```
{
  "status": "success",
  "data": {
    "resultType": "exemplars",
    "result": [<exemplar value>],
    "stats": [<statistics>]
  }
}
```

Where `<exemplar value>` is:

```
{
  "metric": {
    <label key-value pairs>
  },
  "values": [
    {
      "labels": {
        <label key-value pairs>
      },
      "timestamp": <number: second unix epoch>,
      "value": <string: value>
    },
    ...
  ]
}
```

The items in the `values` array are sorted by timestamp, and the oldest item is first.

See [statistics](#statistics) for information about the statistics returned by Loki.

### Examples

```bash
$ curl -G -s "http://localhost:3100/loki/api/v1/query_exemplars" --data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' --data-urlencode 'step=300' | jq
{
  "status": "success",
  "data": {
    "resultType": "exemplars",
    "result": [
      {
        "metric": {
          "level": "info"
        },
        "values": [
          {
            "labels": {
              "_line": "[Info]2022/12/19 20:36:07 main.go:43: Ali cloud print updater check again at 2022-12-19 20:36:07.363764 +0800 CST m=+295201.049795879",
              "level": "info"
            },
            "value": "0",
            "timestamp": 1671452983
          },
          {
            "labels": {
              "_line": "[Info]2022/12/19 20:36:07 main.go:43: at 2022-12-19 20:36:07.363764 +0800 CST m=+295201.049795879",
              "level": "info"
            },
            "value": "0",
            "timestamp": 1671452985
          }
        ]
      },
      {
        "metric": {
          "level": "warn"
        },
        "values": [
          {
            "labels": {
              "_line": "[Warn]2022/12/19 20:36:07 main.go:43: check again at 2022-12-19 20:36:07.363764 +0800 CST m=+295201.049795879",
              "level": "warn"
            },
            "value": "0",
            "timestamp": 1671452983
          },
          {
            "labels": {
              "_line": "[Warn]2022/12/19 20:36:07 main.go:43: Ali cloud print updater check again at 2022-12-19 20:36:07.363764 +0800 CST m=+295201.049795879",
              "level": "warn"
            },
            "value": "0",
            "timestamp": 1671452985
          }
        ]
      }
    ],
    "stats": {
      ...
    }
  }
}
```

## List labels within a range of time

```
Expand Down
1 change: 1 addition & 0 deletions pkg/chunkenc/interface.go
@@ -161,4 +161,5 @@ type Block interface
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
// ExemplarIterator returns an exemplar iterator for the block.
ExemplarIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.ExemplarIterator
}
143 changes: 143 additions & 0 deletions pkg/chunkenc/memchunk.go
@@ -896,6 +896,47 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
)
}

func (c *MemChunk) ExemplarIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.ExemplarIterator {
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.ExemplarIterator, 0, len(c.blocks)+1)

var lastMax int64 // placeholder to check order across blocks
ordered := true
for _, b := range c.blocks {
// skip this block
if maxt < b.mint || b.maxt < mint {
continue
}

if b.mint < lastMax {
ordered = false
}
lastMax = b.maxt
its = append(its, encBlock{c.encoding, b}.ExemplarIterator(ctx, extractor))
}

if !c.head.IsEmpty() {
from, _ := c.head.Bounds()
if from < lastMax {
ordered = false
}
its = append(its, c.head.ExemplarIterator(ctx, mint, maxt, extractor))
}

var it iter.ExemplarIterator
if ordered {
it = iter.NewNonOverlappingExemplarIterator(its)
} else {
it = iter.NewSortExemplarIterator(its)
}

return iter.NewTimeRangedExemplarIterator(
it,
mint,
maxt,
)
}
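The block-ordering check in `MemChunk.ExemplarIterator` above decides between a cheap concatenation and a sorting merge. A reduced, self-contained sketch of that check (the `span` type and `blocksOrdered` name are stand-ins invented for illustration):

```go
package main

import "fmt"

// span is a simplified stand-in for a block's time bounds.
type span struct{ mint, maxt int64 }

// blocksOrdered reports whether every block starts at or after the previous
// block's max timestamp; if so, per-block iterators can simply be
// concatenated (NewNonOverlappingExemplarIterator), otherwise a sorting
// merge (NewSortExemplarIterator) is required.
func blocksOrdered(blocks []span) bool {
	var lastMax int64
	for _, b := range blocks {
		if b.mint < lastMax {
			return false
		}
		lastMax = b.maxt
	}
	return true
}

func main() {
	fmt.Println(blocksOrdered([]span{{0, 10}, {10, 20}})) // true
	fmt.Println(blocksOrdered([]span{{0, 10}, {5, 20}}))  // false
}
```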

// Blocks implements Chunk
func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
@@ -959,6 +1000,13 @@ type encBlock struct
block
}

func (b encBlock) ExemplarIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.ExemplarIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newExemplarIterator(ctx, getReaderPool(b.enc), b.b, extractor)
}

func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator {
if len(b.b) == 0 {
return iter.NoopIterator
@@ -1101,6 +1149,59 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
})
}

func (hb *headBlock) ExemplarIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.ExemplarIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
stats := stats.FromContext(ctx)
stats.AddHeadChunkLines(int64(len(hb.entries)))
series := map[string]*logproto.Series{}
baseHash := extractor.BaseLabels().Hash()

for _, e := range hb.entries {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, parsedLabels, ok := extractor.ProcessString(e.t, e.s)
if !ok {
continue
}
var (
found bool
s *logproto.Series
)

lbs := parsedLabels.String()
if s, found = series[lbs]; !found {
s = &logproto.Series{
Labels: lbs,
Exemplars: ExemplarsPool.Get(len(hb.entries)).([]logproto.Exemplar)[:0],
StreamHash: baseHash,
}
series[lbs] = s
}

s.Exemplars = append(s.Exemplars, logproto.Exemplar{
TimestampMs: e.t,
Value: value,
Labels: logproto.FromExemplarLabelsToLabelAdapters(parsedLabels.Labels(), []byte(e.s)),
Hash: xxhash.Sum64(unsafeGetBytes(e.s)),
})
}

if len(series) == 0 {
return iter.NoopIterator
}
seriesRes := make([]logproto.Series, 0, len(series))
for _, s := range series {
seriesRes = append(seriesRes, *s)
}
return iter.ExemplarIteratorWithClose(iter.NewMultiExemplarSeriesIterator(seriesRes), func() error {
for _, s := range series {
ExemplarsPool.Put(s.Exemplars)
}
return nil
})
}
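The grouping step in `headBlock.ExemplarIterator` above keys per-series slices by the parsed label set's string form. A simplified sketch of that grouping, with stand-in types invented here in place of `logproto.Series`/`logproto.Exemplar`:

```go
package main

import "fmt"

// entry and exemplar are simplified stand-ins for illustration only.
type entry struct {
	labels string // parsed label set, e.g. `{level="info"}`
	ts     int64
	val    float64
}

type exemplar struct {
	timestampMs int64
	value       float64
}

// groupBySeries appends each processed entry to a per-series slice keyed
// by its label string, mirroring the series map built in the head block.
func groupBySeries(entries []entry) map[string][]exemplar {
	series := map[string][]exemplar{}
	for _, e := range entries {
		series[e.labels] = append(series[e.labels], exemplar{e.ts, e.val})
	}
	return series
}

func main() {
	got := groupBySeries([]entry{
		{`{level="info"}`, 1, 0},
		{`{level="warn"}`, 2, 0},
		{`{level="info"}`, 3, 0},
	})
	fmt.Println(len(got), len(got[`{level="info"}`])) // 2 series, 2 info exemplars
}
```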

func unsafeGetBytes(s string) []byte {
var buf []byte
p := unsafe.Pointer(&buf)
@@ -1332,3 +1433,45 @@ func (e *sampleBufferedIterator) StreamHash() uint64 { return e.extractor.BaseLa
func (e *sampleBufferedIterator) Sample() logproto.Sample {
return e.cur
}

func newExemplarIterator(ctx context.Context, pool ReaderPool, b []byte, extractor log.StreamSampleExtractor) iter.ExemplarIterator {
it := &exemplarBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b),
extractor: extractor,
}
return it
}

type exemplarBufferedIterator struct {
*bufferedIterator

extractor log.StreamSampleExtractor

cur logproto.Exemplar
currLabels log.LabelsResult
}

func (e *exemplarBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, lbs, ok := e.extractor.Process(e.currTs, e.currLine)
if !ok {
continue
}
e.currLabels = lbs
e.cur.Value = val
e.cur.Hash = xxhash.Sum64(e.currLine)
e.cur.TimestampMs = e.currTs
e.cur.Labels = logproto.FromExemplarLabelsToLabelAdapters(lbs.Labels(), e.currLine)
return true
}
return false
}
func (e *exemplarBufferedIterator) Labels() string { return e.currLabels.String() }

func (e *exemplarBufferedIterator) StreamHash() uint64 { return e.extractor.BaseLabels().Hash() }

func (e *exemplarBufferedIterator) Exemplar() logproto.Exemplar {
return e.cur
}
2 changes: 2 additions & 0 deletions pkg/chunkenc/pool.go
@@ -50,6 +50,8 @@ var (

// SamplesPool pooling array of samples [512,1024,...,16k]
SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })
// ExemplarsPool pooling array of exemplars [512,1024,...,16k]
ExemplarsPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Exemplar, 0, size) })

// Pool of crc32 hash
crc32HashPool = sync.Pool{
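The `pool.New(1<<9, 1<<14, 2, ...)` call above builds buckets whose capacities double from 512 up to 16384. This sketch reproduces only the bucket sizing; the `bucketSizes` helper is invented here for illustration and is not part of the pool package:

```go
package main

import "fmt"

// bucketSizes lists the bucket capacities a pool created with
// pool.New(min, max, factor, ...) would use: min, min*factor, ..., max.
func bucketSizes(min, max, factor int) []int {
	var sizes []int
	for s := min; s <= max; s *= factor {
		sizes = append(sizes, s)
	}
	return sizes
}

func main() {
	fmt.Println(bucketSizes(1<<9, 1<<14, 2))
	// [512 1024 2048 4096 8192 16384]
}
```

A `Get` on such a pool returns a slice from the smallest bucket whose capacity covers the requested length, which is why the head-block code above requests `len(hb.entries)` and truncates to zero length.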