Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Filtering -- Querier #5482

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
696 changes: 576 additions & 120 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,21 @@ message QueryRequest {
Direction direction = 5;
reserved 6;
repeated string shards = 7 [(gogoproto.jsontag) = "shards,omitempty"];
repeated Delete deletes = 8;
}

message SampleQueryRequest {
string selector = 1;
google.protobuf.Timestamp start = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
repeated string shards = 4 [(gogoproto.jsontag) = "shards,omitempty"];
repeated Delete deletes = 5;
}

message Delete {
string selector = 1;
int64 start = 2;
int64 end = 3;
}

message QueryResponse {
Expand Down
48 changes: 48 additions & 0 deletions pkg/loki/delete_store_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package loki

import (
"github.com/grafana/dskit/services"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
)

func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener {
return &listener{d}
}

type listener struct {
deleteRequestsStore deletion.DeleteRequestsStore
}

// Starting is called when the service transitions from NEW to STARTING.
func (l *listener) Starting() {}

// Running is called when the service transitions from STARTING to RUNNING.
func (l *listener) Running() {}

// Stopping is called when the service transitions to the STOPPING state.
func (l *listener) Stopping(from services.State) {
if from == services.Stopping || from == services.Terminated || from == services.Failed {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
}

// Terminated is called when the service transitions to the TERMINATED state.
func (l *listener) Terminated(from services.State) {
if from == services.Stopping || from == services.Terminated || from == services.Failed {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
}

// Failed is called when the service transitions to the FAILED state.
func (l *listener) Failed(from services.State, failure error) {
if from == services.Stopping || from == services.Terminated || from == services.Failed {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
}
52 changes: 46 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -221,9 +223,12 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

logger := log.With(util_log.Logger, "component", "querier")
var err error
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
deleteStore, err := t.deleteRequestsStore()
if err != nil {
return nil, err
}

q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides, deleteStore)
if err != nil {
return nil, err
}
Expand All @@ -249,6 +254,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
httpreq.ExtractQueryMetricsMiddleware(),
)

logger := log.With(util_log.Logger, "component", "querier")
t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Querier, t.overrides, logger)
queryHandlers := map[string]http.Handler{
"/loki/api/v1/query_range": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.RangeQueryHandler)),
Expand Down Expand Up @@ -278,9 +284,23 @@ func (t *Loki) initQuerier() (services.Service, error) {
"/api/prom/tail": http.HandlerFunc(t.querierAPI.TailHandler),
}

return querier.InitWorkerService(
querierWorkerServiceConfig, prometheus.DefaultRegisterer, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
svc, err := querier.InitWorkerService(
querierWorkerServiceConfig,
prometheus.DefaultRegisterer,
queryHandlers,
alwaysExternalHandlers,
t.Server.HTTP,
t.Server.HTTPServer.Handler,
t.HTTPAuthMiddleware,
)
if err != nil {
return nil, err
}

if svc != nil {
svc.AddListener(deleteRequestsStoreListener(deleteStore))
}
return svc, nil
}

func (t *Loki) initIngester() (_ services.Service, err error) {
Expand Down Expand Up @@ -625,7 +645,13 @@ func (t *Loki) initRuler() (_ services.Service, err error) {

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)

deleteStore, err := t.deleteRequestsStore()
if err != nil {
return nil, err
}

q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides, deleteStore)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -674,6 +700,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteRuleGroup)))
}

t.ruler.AddListener(deleteRequestsStoreListener(deleteStore))
return t.ruler, nil
}

Expand Down Expand Up @@ -791,6 +818,19 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
deleteStore := deletion.NewNoOpDeleteRequestsStore()
if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
indexClient, err := chunk_storage.NewIndexClient(shipper.BoltDBShipperType, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient)
}
return deleteStore, nil
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
Expand Down
56 changes: 51 additions & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -86,18 +88,22 @@ type SingleTenantQuerier struct {
store storage.Store
limits *validation.Overrides
ingesterQuerier *IngesterQuerier
deleteGetter deleteGetter
}

type deleteGetter interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
}

// New makes a new Querier.
func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits *validation.Overrides) (*SingleTenantQuerier, error) {
querier := SingleTenantQuerier{
func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits *validation.Overrides, d deleteGetter) (*SingleTenantQuerier, error) {
return &SingleTenantQuerier{
cfg: cfg,
store: store,
ingesterQuerier: ingesterQuerier,
limits: limits,
}

return &querier, nil
deleteGetter: d,
}, nil
}

// Select Implements logql.Querier which select logs via matchers and regex filters.
Expand All @@ -108,6 +114,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
return nil, err
}

params.QueryRequest.Deletes, err = q.deletesForUser(ctx, params.Start, params.End)
if err != nil {
return nil, err
}

ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

iters := []iter.EntryIterator{}
Expand Down Expand Up @@ -157,6 +168,11 @@ func (q *SingleTenantQuerier) SelectSamples(ctx context.Context, params logql.Se
return nil, err
}

params.SampleQueryRequest.Deletes, err = q.deletesForUser(ctx, params.Start, params.End)
if err != nil {
return nil, err
}

ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

iters := []iter.SampleIterator{}
Expand Down Expand Up @@ -192,6 +208,36 @@ func (q *SingleTenantQuerier) SelectSamples(ctx context.Context, params logql.Se
return iter.NewMergeSampleIterator(ctx, iters), nil
}

func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT time.Time) ([]*logproto.Delete, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

d, err := q.deleteGetter.GetAllDeleteRequestsForUser(ctx, userID)
if err != nil {
return nil, err
}

start := startT.UnixNano()
end := endT.UnixNano()

var deletes []*logproto.Delete
for _, del := range d {
if int64(del.StartTime) <= end && int64(del.EndTime) >= start {
for _, selector := range del.Selectors {
deletes = append(deletes, &logproto.Delete{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a future PR it would be nice to have this as a metrics somewhere so we can see if the different instances are in sync.

Selector: selector,
Start: int64(del.StartTime),
End: int64(del.EndTime),
})
}
}
}

return deletes, nil
}

func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
limitQueryInterval := false
Expand Down
55 changes: 55 additions & 0 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (c *querierClientMock) Query(ctx context.Context, in *logproto.QueryRequest
return args.Get(0).(logproto.Querier_QueryClient), args.Error(1)
}

func (c *querierClientMock) QuerySample(ctx context.Context, in *logproto.SampleQueryRequest, opts ...grpc.CallOption) (logproto.Querier_QuerySampleClient, error) {
args := c.Called(ctx, in, opts)
return args.Get(0).(logproto.Querier_QuerySampleClient), args.Error(1)
}

func (c *querierClientMock) Label(ctx context.Context, in *logproto.LabelRequest, opts ...grpc.CallOption) (*logproto.LabelResponse, error) {
args := c.Called(ctx, in, opts)
return args.Get(0).(*logproto.LabelResponse), args.Error(1)
Expand Down Expand Up @@ -141,6 +146,49 @@ func (c *queryClientMock) Context() context.Context {
return context.Background()
}

// queryClientMock is a mockable version of Querier_QueryClient
type querySampleClientMock struct {
util.ExtendedMock
logproto.Querier_QueryClient
}

func newQuerySampleClientMock() *querySampleClientMock {
return &querySampleClientMock{}
}

func (c *querySampleClientMock) Recv() (*logproto.SampleQueryResponse, error) {
args := c.Called()
res := args.Get(0)
if res == nil {
return (*logproto.SampleQueryResponse)(nil), args.Error(1)
}
return res.(*logproto.SampleQueryResponse), args.Error(1)
}

func (c *querySampleClientMock) Header() (grpc_metadata.MD, error) {
return nil, nil
}

func (c *querySampleClientMock) Trailer() grpc_metadata.MD {
return nil
}

func (c *querySampleClientMock) CloseSend() error {
return nil
}

func (c *querySampleClientMock) SendMsg(m interface{}) error {
return nil
}

func (c *querySampleClientMock) RecvMsg(m interface{}) error {
return nil
}

func (c *querySampleClientMock) Context() context.Context {
return context.Background()
}

// tailClientMock is mockable version of Querier_TailClient
type tailClientMock struct {
util.ExtendedMock
Expand Down Expand Up @@ -381,6 +429,13 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}

// mockSampleIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from
func mockSampleIterator(client iter.QuerySampleClient) iter.SampleIterator {
return iter.NewSampleQueryClientIterator(client)
}

// mockStream return a stream with quantity entries, where entries timestamp and
// line string are constructed as sequential numbers starting at from
func mockStream(from int, quantity int) logproto.Stream {
Expand Down
Loading