Skip to content

Commit

Permalink
Refactor pipeline setup to util
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle committed Apr 6, 2022
1 parent d10c2c5 commit 560a42e
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 107 deletions.
56 changes: 3 additions & 53 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"syscall"

"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/deletion"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -325,7 +325,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
return nil, err
}

pipeline, err = setupPipelineDelete(req, pipeline)
pipeline, err = deletion.SetupPipeline(req, pipeline)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -358,43 +358,6 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
return iter.NewSortEntryIterator(iters, req.Direction), nil
}

func setupPipelineDelete(req logql.SelectLogParams, p log.Pipeline) (log.Pipeline, error) {
if len(req.Deletes) == 0 {
return p, nil
}

filters, err := deleteFilters(req.Deletes)
if err != nil {
return nil, err
}

return log.NewFilteringPipeline(filters, p), nil
}

func deleteFilters(deletes []*logproto.Delete) ([]log.PipelineFilter, error) {
var filters []log.PipelineFilter
for _, d := range deletes {
expr, err := syntax.ParseLogSelector(d.Selector, true)
if err != nil {
return nil, err
}

pipeline, err := expr.Pipeline()
if err != nil {
return nil, err
}

filters = append(filters, log.PipelineFilter{
Start: d.Start,
End: d.End,
Matchers: expr.Matchers(),
Pipeline: pipeline,
})
}

return filters, nil
}

func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
expr, err := req.Expr()
if err != nil {
Expand All @@ -406,7 +369,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
return nil, err
}

extractor, err = setupExtractorDelete(req, extractor)
extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -446,19 +409,6 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
return iter.NewSortSampleIterator(iters), nil
}

func setupExtractorDelete(req logql.SelectSampleParams, se log.SampleExtractor) (log.SampleExtractor, error) {
if len(req.Deletes) == 0 {
return se, nil
}

filters, err := deleteFilters(req.Deletes)
if err != nil {
return nil, err
}

return log.NewFilteringSampleExtractor(filters, se), nil
}

// Label returns the label names or values depending on the given request
// Without label matchers the label names and values are retrieved from the index directly.
// If label matchers are given only the matching streams are fetched from the index.
Expand Down
57 changes: 3 additions & 54 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sort"
"time"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/util/deletion"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -21,7 +21,6 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
logql_log "github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/chunk"
Expand Down Expand Up @@ -369,7 +368,7 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter
return nil, err
}

pipeline, err = setupPipelineDelete(req, pipeline)
pipeline, err = deletion.SetupPipeline(req, pipeline)
if err != nil {
return nil, err
}
Expand All @@ -382,43 +381,6 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter
return newLogBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, pipeline, req.Direction, req.Start, req.End, chunkFilterer)
}

func setupPipelineDelete(req logql.SelectLogParams, p logql_log.Pipeline) (logql_log.Pipeline, error) {
if len(req.Deletes) == 0 {
return p, nil
}

filters, err := deleteFilters(req.Deletes)
if err != nil {
return nil, err
}

return logql_log.NewFilteringPipeline(filters, p), nil
}

func deleteFilters(deletes []*logproto.Delete) ([]logql_log.PipelineFilter, error) {
var filters []logql_log.PipelineFilter
for _, d := range deletes {
expr, err := syntax.ParseLogSelector(d.Selector, true)
if err != nil {
return nil, err
}

pipeline, err := expr.Pipeline()
if err != nil {
return nil, err
}

filters = append(filters, logql_log.PipelineFilter{
Start: d.Start,
End: d.End,
Matchers: expr.Matchers(),
Pipeline: pipeline,
})
}

return filters, nil
}

func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
matchers, from, through, err := decodeReq(req)
if err != nil {
Expand All @@ -444,7 +406,7 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams)
return nil, err
}

extractor, err = setupExtractorDelete(req, extractor)
extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}
Expand All @@ -457,19 +419,6 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams)
return newSampleBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End, chunkFilterer)
}

func setupExtractorDelete(req logql.SelectSampleParams, se logql_log.SampleExtractor) (logql_log.SampleExtractor, error) {
if len(req.Deletes) == 0 {
return se, nil
}

filters, err := deleteFilters(req.Deletes)
if err != nil {
return nil, err
}

return logql_log.NewFilteringSampleExtractor(filters, se), nil
}

func (s *store) GetSchemaConfigs() []chunk.PeriodConfig {
return s.schemaCfg.Configs
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/util/deletion/deletion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package deletion

import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/syntax"
)

func SetupPipeline(req logql.SelectLogParams, p log.Pipeline) (log.Pipeline, error) {
if len(req.Deletes) == 0 {
return p, nil
}

filters, err := deleteFilters(req.Deletes)
if err != nil {
return nil, err
}

return log.NewFilteringPipeline(filters, p), nil
}

func SetupExtractor(req logql.SelectSampleParams, se log.SampleExtractor) (log.SampleExtractor, error) {
if len(req.Deletes) == 0 {
return se, nil
}

filters, err := deleteFilters(req.Deletes)
if err != nil {
return nil, err
}

return log.NewFilteringSampleExtractor(filters, se), nil
}

func deleteFilters(deletes []*logproto.Delete) ([]log.PipelineFilter, error) {
var filters []log.PipelineFilter
for _, d := range deletes {
expr, err := syntax.ParseLogSelector(d.Selector, true)
if err != nil {
return nil, err
}

pipeline, err := expr.Pipeline()
if err != nil {
return nil, err
}

filters = append(filters, log.PipelineFilter{
Start: d.Start,
End: d.End,
Matchers: expr.Matchers(),
Pipeline: pipeline,
})
}

return filters, nil
}

0 comments on commit 560a42e

Please sign in to comment.