Add filter parameter to rebound so lines can be deleted by the compactor #5879

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## Main
* [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters.
+* [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode.
* [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9.
* [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config
* [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist.
3 changes: 2 additions & 1 deletion pkg/chunkenc/dumb_chunk.go
@@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/filter"
)

const (
@@ -122,7 +123,7 @@ func (c *dumbChunk) Close() error {
return nil
}

-func (c *dumbChunk) Rebound(start, end time.Time) (Chunk, error) {
+func (c *dumbChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) {
return nil, nil
}

5 changes: 3 additions & 2 deletions pkg/chunkenc/facade.go
@@ -6,6 +6,7 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/filter"
)

// GzipLogChunk is a cortex encoding type for our chunks.
@@ -86,8 +87,8 @@ func (f Facade) LokiChunk() Chunk {
return f.c
}

-func (f Facade) Rebound(start, end model.Time) (chunk.Data, error) {
-newChunk, err := f.c.Rebound(start.Time(), end.Time())
+func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error) {
+newChunk, err := f.c.Rebound(start.Time(), end.Time(), filter)
if err != nil {
return nil, err
}
3 changes: 2 additions & 1 deletion pkg/chunkenc/interface.go
@@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/filter"
)

// Errors returned by the chunk interface.
@@ -127,7 +128,7 @@ type Chunk interface {
CompressedSize() int
Close() error
Encoding() Encoding
-Rebound(start, end time.Time) (Chunk, error)
+Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}

// Block is a chunk block.
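The `filter.Func` type referenced throughout these changes lives in the new `pkg/util/filter` package, which is not itself part of this diff. Judging from the call sites here and the test further down (which passes a `func(in string) bool`), it is presumably a plain line predicate, along these lines:

```go
// Sketch of the presumed pkg/util/filter package; the real file is not shown
// in this diff, so the exact declaration is an assumption inferred from usage.
package filter

// Func reports whether a log line should be dropped while a chunk is being
// rebuilt: Rebound skips any entry for which the filter returns true, and a
// nil Func keeps every line.
type Func func(line string) bool
```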
8 changes: 6 additions & 2 deletions pkg/chunkenc/memchunk.go
@@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
)

@@ -713,7 +714,7 @@ func (c *MemChunk) reorder() error {

// Otherwise, we need to rebuild the blocks
from, to := c.Bounds()
-newC, err := c.Rebound(from, to)
+newC, err := c.Rebound(from, to, nil)
if err != nil {
return err
}
@@ -910,7 +911,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {
}

// Rebound builds a smaller chunk with logs having timestamp from start and end(both inclusive)
-func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {
+func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) {
// add a millisecond to end time because the Chunk.Iterator considers end time to be non-inclusive.
itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
if err != nil {
@@ -931,6 +932,9 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {

for itr.Next() {
entry := itr.Entry()
+if filter != nil && filter(entry.Line) {
+continue
+}
if err := newChunk.Append(&entry); err != nil {
return nil, err
}
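With the plumbing above in place, rebounding a chunk can now drop individual lines. The following is a minimal usage sketch, not code from this PR; the `password=` condition and the `reboundWithoutSecrets` helper are hypothetical stand-ins for a real delete-request expression:

```go
package sketch

import (
	"strings"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/storage/chunk"
)

// reboundWithoutSecrets rebuilds c between start and end, dropping every line
// that contains "password=". Passing a nil filter instead, as reorder() does
// above, keeps all lines.
func reboundWithoutSecrets(c chunkenc.Chunk, start, end time.Time) (chunkenc.Chunk, error) {
	dropLine := func(line string) bool {
		return strings.Contains(line, "password=") // true means: remove this line
	}
	newChunk, err := c.Rebound(start, end, dropLine)
	if err == chunk.ErrSliceNoDataInRange {
		// Every entry in the range was filtered out; the "all lines match"
		// test case below exercises exactly this path.
		return nil, err
	}
	return newChunk, err
}
```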
96 changes: 95 additions & 1 deletion pkg/chunkenc/memchunk_test.go
@@ -1188,7 +1188,7 @@ func TestMemChunk_Rebound(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
-newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo)
+newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo, nil)
if tc.err != nil {
require.Equal(t, tc.err, err)
return
@@ -1231,3 +1231,97 @@ func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {

return chk
}

+func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
+chkFrom := time.Unix(1, 0) // headBlock.Append treats Unix time 0 as not set so we have to use a later time
+chkFromPlus5 := chkFrom.Add(5 * time.Second)
+chkThrough := chkFrom.Add(10 * time.Second)
+chkThroughPlus1 := chkThrough.Add(1 * time.Second)
+
+filterFunc := func(in string) bool {
+return strings.HasPrefix(in, "matching")
+}
+
+for _, tc := range []struct {
+name string
+matchingSliceFrom, matchingSliceTo *time.Time
+err error
+nrMatching int
+nrNotMatching int
+}{
+{
+name: "no matches",
+nrMatching: 0,
+nrNotMatching: 10,
+},
+{
+name: "some lines removed",
+matchingSliceFrom: &chkFrom,
+matchingSliceTo: &chkFromPlus5,
+nrMatching: 5,
+nrNotMatching: 5,
+},
+{
+name: "all lines match",
+err: chunk.ErrSliceNoDataInRange,
+matchingSliceFrom: &chkFrom,
+matchingSliceTo: &chkThroughPlus1,
+},
+} {
+t.Run(tc.name, func(t *testing.T) {
+originalChunk := buildFilterableTestMemChunk(t, chkFrom, chkThrough, tc.matchingSliceFrom, tc.matchingSliceTo)
+newChunk, err := originalChunk.Rebound(chkFrom, chkThrough, filterFunc)
+if tc.err != nil {
+require.Equal(t, tc.err, err)
+return
+}
+require.NoError(t, err)
+
+// iterate originalChunk from slice start to slice end + nanosecond. Adding a nanosecond here to be inclusive of sample at end time.
+originalChunkItr, err := originalChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
+require.NoError(t, err)
+originalChunkSamples := 0
+for originalChunkItr.Next() {
+originalChunkSamples++
+}
+require.Equal(t, tc.nrMatching+tc.nrNotMatching, originalChunkSamples)
+
+// iterate newChunk for whole chunk interval which should include all the samples in the chunk and hence align it with expected values.
+newChunkItr, err := newChunk.Iterator(context.Background(), chkFrom, chkThrough.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
+require.NoError(t, err)
+newChunkSamples := 0
+for newChunkItr.Next() {
+newChunkSamples++
+}
+require.Equal(t, tc.nrNotMatching, newChunkSamples)
+})
+}
+}
+
+func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time) *MemChunk {
+chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0)
+t.Logf("from : %v", from.String())
+t.Logf("through: %v", through.String())
+for from.Before(through) {
+// If a line is between matchingFrom and matchingTo add the prefix "matching"
+if matchingFrom != nil && matchingTo != nil &&
+(from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) {
+t.Logf("%v matching line", from.String())
+err := chk.Append(&logproto.Entry{
+Line: fmt.Sprintf("matching %v", from.String()),
+Timestamp: from,
+})
+require.NoError(t, err)
+} else {
+t.Logf("%v non-match line", from.String())
+err := chk.Append(&logproto.Entry{
+Line: from.String(),
+Timestamp: from,
+})
+require.NoError(t, err)
+}
+from = from.Add(time.Second)
+}
+
+return chk
+}
15 changes: 9 additions & 6 deletions pkg/loki/modules.go
@@ -795,12 +795,15 @@ func (t *Loki) initCompactor() (services.Service, error) {

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)

-if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() != deletion.Disabled {
-t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
-t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
-t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
-
-t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)))
+if t.Cfg.CompactorConfig.RetentionEnabled {
+switch t.compactor.DeleteMode() {
+case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
+t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
+t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
+t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
+default:
+break
+}
}

return t.compactor, nil
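With this change the delete API routes are registered for every deletion mode except `deletion.Disabled`, and only when retention is enabled; the `default: break` branch is a no-op in Go that simply makes the Disabled case explicit. The deletion package is not part of this diff, so the mode constants presumably have roughly the shape below; only the four names are taken from the diff, while the underlying type and value order are assumptions:

```go
// Presumed sketch of the deletion modes referenced above; only the four names
// appear in this diff, so the type and the ordering are assumptions.
package deletion

type Mode int

const (
	Disabled Mode = iota
	WholeStreamDeletion
	FilterOnly
	FilterAndDelete
)
```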
4 changes: 3 additions & 1 deletion pkg/storage/chunk/bigchunk.go
@@ -8,6 +8,8 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/grafana/loki/pkg/util/filter"
)

const samplesPerChunk = 120
@@ -85,7 +87,7 @@ func (b *bigchunk) addNextChunk(start model.Time) error {
return nil
}

-func (b *bigchunk) Rebound(start, end model.Time) (Data, error) {
+func (b *bigchunk) Rebound(start, end model.Time, filter filter.Func) (Data, error) {
return nil, errors.New("not implemented")
}

4 changes: 3 additions & 1 deletion pkg/storage/chunk/interface.go
@@ -23,6 +23,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
errs "github.com/weaveworks/common/errors"

"github.com/grafana/loki/pkg/util/filter"
)

const (
@@ -47,7 +49,7 @@ type Data interface {
// Rebound returns a smaller chunk that includes all samples between start and end (inclusive).
// We do not want to change existing Slice implementations because
// it is built specifically for query optimization and is a noop for some of the encodings.
-Rebound(start, end model.Time) (Data, error)
+Rebound(start, end model.Time, filter filter.Func) (Data, error)
// Size returns the approximate length of the chunk in bytes.
Size() int
Utilization() float64
14 changes: 10 additions & 4 deletions pkg/storage/stores/shipper/compactor/compactor.go
@@ -229,17 +229,23 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem
return err
}

-if c.deleteMode != deletion.Disabled {
+switch c.deleteMode {
+case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
}
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
-c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
+c.deleteRequestsManager = deletion.NewDeleteRequestsManager(
+c.deleteRequestsStore,
+c.cfg.DeleteRequestCancelPeriod,
+r,
+c.deleteMode,
+)
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
-} else {
+default:
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
@@ -567,7 +573,7 @@ func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retentio
return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker}
}

-func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []model.Interval) {
+func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []retention.IntervalFilter) {
if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired {
return expired, nonDeletedIntervals
}
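The second hunk changes the return type of `Expired` from `[]model.Interval` to `[]retention.IntervalFilter`. That type is not shown in this diff; given the filter plumbing above, it presumably pairs each interval that must be retained with an optional line filter, roughly:

```go
// Presumed shape of retention.IntervalFilter, inferred from the signature
// change above; the retention package itself is not part of this diff.
package retention

import (
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/util/filter"
)

// IntervalFilter couples a time interval that must be kept with an optional
// filter.Func; a non-nil Filter would tell the compactor which individual
// lines to drop when it rewrites the chunks overlapping that interval.
type IntervalFilter struct {
	Interval model.Interval
	Filter   filter.Func
}
```

This is presumably the piece that lets a "filter-and-delete" delete request carry its line filter down to `Chunk.Rebound` during compaction.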