Skip to content

Commit

Permalink
Enables Series API in loki (#1419)
Browse files Browse the repository at this point in the history
* helpers for parsing /series requests

* logproto.SeriesResponse impl

* passes unparsed matchers in protobuf for simplicity

* ingester support for series queries

* labels map coercion

* fixes label match loghttp test

* removes labelmatcher protobuf adapters

* series api impl

* tests matchers for dispatching series queries

* series looked up serially for /series api

* splits series api async code and remove unnecessary type assertion

* adds series handle to /api/prom/series and includes docs

* compatibility changes w/ upstream merge
  • Loading branch information
owen-d authored and cyriltovena committed Dec 20, 2019
1 parent fb76416 commit dafb9d8
Show file tree
Hide file tree
Showing 19 changed files with 1,911 additions and 98 deletions.
76 changes: 76 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ The HTTP API includes the following endpoints:
- [`GET /loki/api/v1/labels`](#get-lokiapiv1labels)
- [`GET /loki/api/v1/label/<name>/values`](#get-lokiapiv1labelnamevalues)
- [`GET /loki/api/v1/tail`](#get-lokiapiv1tail)
- [`GET /loki/api/v1/series`](#series)
- [`POST /loki/api/v1/series`](#series)
- [`POST /loki/api/v1/push`](#post-lokiapiv1push)
- [`GET /api/prom/tail`](#get-apipromtail)
- [`GET /api/prom/query`](#get-apipromquery)
- [`GET /api/prom/label`](#get-apipromlabel)
- [`GET /api/prom/label/<name>/values`](#get-apipromlabelnamevalues)
- [`GET /api/prom/series`](#series)
- [`POST /api/prom/series`](#series)
- [`POST /api/prom/push`](#post-apiprompush)
- [`GET /ready`](#get-ready)
- [`POST /flush`](#post-flush)
Expand Down Expand Up @@ -743,3 +747,75 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester.
for a list of exported metrics.

In microservices mode, the `/metrics` endpoint is exposed by all components.

## Series

The Series API is available under the following:
- `GET /loki/api/v1/series`
- `POST /loki/api/v1/series`
- `GET /api/prom/series`
- `POST /api/prom/series`

This endpoint returns the list of time series that match a certain label set.

URL query parameters:

- `match[]=<series_selector>`: Repeated log stream selector argument that selects the streams to return. At least one `match[]` argument must be provided.
- `start=<nanosecond Unix epoch>`: Start timestamp.
- `end=<nanosecond Unix epoch>`: End timestamp.

You can URL-encode these parameters directly in the request body by using the POST method and `Content-Type: application/x-www-form-urlencoded` header. This is useful when specifying a large or dynamic number of stream selectors that may breach server-side URL character limits.

In microservices mode, these endpoints are exposed by the querier.

### Examples

``` bash
$ curl -s "http://localhost:3100/loki/api/v1/series" --data-urlencode 'match={container_name=~"prometheus.*", component="server"}' --data-urlencode 'match={app="loki"}' | jq '.'
{
"status": "success",
"data": [
{
"container_name": "loki",
"app": "loki",
"stream": "stderr",
"filename": "/var/log/pods/default_loki-stack-0_50835643-1df0-11ea-ba79-025000000001/loki/0.log",
"name": "loki",
"job": "default/loki",
"controller_revision_hash": "loki-stack-757479754d",
"statefulset_kubernetes_io_pod_name": "loki-stack-0",
"release": "loki-stack",
"namespace": "default",
"instance": "loki-stack-0"
},
{
"chart": "prometheus-9.3.3",
"container_name": "prometheus-server-configmap-reload",
"filename": "/var/log/pods/default_loki-stack-prometheus-server-696cc9ddff-87lmq_507b1db4-1df0-11ea-ba79-025000000001/prometheus-server-configmap-reload/0.log",
"instance": "loki-stack-prometheus-server-696cc9ddff-87lmq",
"pod_template_hash": "696cc9ddff",
"app": "prometheus",
"component": "server",
"heritage": "Tiller",
"job": "default/prometheus",
"namespace": "default",
"release": "loki-stack",
"stream": "stderr"
},
{
"app": "prometheus",
"component": "server",
"filename": "/var/log/pods/default_loki-stack-prometheus-server-696cc9ddff-87lmq_507b1db4-1df0-11ea-ba79-025000000001/prometheus-server/0.log",
"release": "loki-stack",
"namespace": "default",
"pod_template_hash": "696cc9ddff",
"stream": "stderr",
"chart": "prometheus-9.3.3",
"container_name": "prometheus-server",
"heritage": "Tiller",
"instance": "loki-stack-prometheus-server-696cc9ddff-87lmq",
"job": "default/prometheus"
}
]
}
```
11 changes: 11 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return instance.Label(ctx, req)
}

// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

instance := i.getOrCreateInstance(instanceID)
return instance.Series(ctx, req)
}

// Check implements grpc_health_v1.HealthCheck.
func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
Expand Down
85 changes: 85 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,91 @@ func TestIngester(t *testing.T) {
require.Len(t, result.resps, 1)
require.Len(t, result.resps[0].Streams, 1)
require.Equal(t, `{bar="baz2", foo="bar"}`, result.resps[0].Streams[0].Labels)

// Series

// empty matchers
_, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
})
require.Error(t, err)

// wrong matchers fmt
_, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{a="b`},
})
require.Error(t, err)

// no selectors
_, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`, `{}`},
})
require.Error(t, err)

// foo=bar
resp, err := i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`},
})
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz1",
},
},
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())

// foo=bar, bar=~"baz[2-9]"
resp, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar", bar=~"baz[2-9]"}`},
})
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())

// foo=bar, bar=~"baz[2-9]" in different groups should OR the results
resp, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`, `{bar=~"baz[2-9]"}`},
})
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz1",
},
},
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())
}

func TestIngesterStreamLimitExceeded(t *testing.T) {
Expand Down
67 changes: 59 additions & 8 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -190,7 +191,20 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
if err != nil {
return err
}
iters, err := i.lookupStreams(queryServer.Context(), req, expr.Matchers(), filter)

var iters []iter.EntryIterator

err = i.forMatchingStreams(
expr.Matchers(),
func(stream *stream) error {
iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter)
if err != nil {
return err
}
iters = append(iters, iter)
return nil
},
)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,32 +235,69 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro
}, nil
}

func (i *instance) lookupStreams(ctx context.Context, req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) {
func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
groups, err := loghttp.Match(req.GetGroups())
if err != nil {
return nil, err
}

dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
// exit early when this stream was added by an earlier group
key := stream.labels.Hash()
if _, found := dedupedSeries[key]; found {
return nil
}

dedupedSeries[key] = logproto.SeriesIdentifier{
Labels: stream.labels.Map(),
}
return nil
})

if err != nil {
return nil, err
}
}
series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
for _, v := range dedupedSeries {
series = append(series, v)

}
return &logproto.SeriesResponse{Series: series}, nil
}

// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
// It uses a function in order to enable generic stream acces without accidentally leaking streams under the mutex.
func (i *instance) forMatchingStreams(
matchers []*labels.Matcher,
fn func(*stream) error,
) error {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()

filters, matchers := cutil.SplitFiltersAndMatchers(matchers)
ids := i.index.Lookup(matchers)
iterators := make([]iter.EntryIterator, 0, len(ids))

outer:
for _, streamID := range ids {
stream, ok := i.streams[streamID]
if !ok {
return nil, ErrStreamMissing
return ErrStreamMissing
}
for _, filter := range filters {
if !filter.Matches(stream.labels.Get(filter.Name)) {
continue outer
}
}
iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter)

err := fn(stream)
if err != nil {
return nil, err
return err
}
iterators = append(iterators, iter)
}
return iterators, nil
return nil
}

func (i *instance) addNewTailer(t *tailer) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/loghttp/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type LabelResponse struct {
// LabelSet is a key/value pair mapping of labels
type LabelSet map[string]string

// Map coerces LabelSet into a map[string]string. This is useful for working with adapter types.
func (l LabelSet) Map() map[string]string {
return l
}

// String implements the Stringer interface. It returns a formatted/sorted set of label key/value pairs.
func (l LabelSet) String() string {
var b bytes.Buffer
Expand Down
16 changes: 16 additions & 0 deletions pkg/loghttp/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ func TestParseLabelQuery(t *testing.T) {
}
}

func TestLabelsMap(t *testing.T) {
ls := LabelSet{
"a": "1",
"b": "2",
}

require.Equal(
t,
map[string]string{
"a": "1",
"b": "2",
},
ls.Map(),
)
}

func timePtr(t time.Time) *time.Time {
return &t
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)

const (
Expand Down Expand Up @@ -71,6 +73,27 @@ func step(r *http.Request, start, end time.Time) (time.Duration, error) {
return 0, errors.Errorf("cannot parse %q to a valid duration", value)
}

// Match extracts and parses multiple matcher groups from a slice of strings
func Match(xs []string) ([][]*labels.Matcher, error) {
if len(xs) == 0 {
return nil, errors.New("0 matcher groups supplied")
}

groups := make([][]*labels.Matcher, 0, len(xs))
for _, x := range xs {
ms, err := logql.ParseMatchers(x)
if err != nil {
return nil, err
}
if len(ms) == 0 {
return nil, errors.Errorf("0 matchers in group: %s", x)
}
groups = append(groups, ms)
}

return groups, nil
}

// defaultQueryRangeStep returns the default step used in the query range API,
// which is dinamically calculated based on the time range
func defaultQueryRangeStep(start time.Time, end time.Time) int {
Expand Down
Loading

0 comments on commit dafb9d8

Please sign in to comment.