Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a DeletionMode config variable #5481

Merged
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
929cbbe
Add deletionEnabled setting and remove delete request manager
MichelHollands Feb 25, 2022
42bfebf
Rebase
MichelHollands Feb 25, 2022
a6ed435
Remove deletion handling from delete requests manager
MichelHollands Feb 25, 2022
3ce76d6
change store so it stores a logql statement
MichelHollands Feb 25, 2022
bb2b3eb
Add validation code for logql statement
MichelHollands Feb 25, 2022
9397b50
Run deleteRequestsManager when deletion is enabled
MichelHollands Feb 25, 2022
6f676aa
Remove unused variables
MichelHollands Feb 25, 2022
e1afcc3
Revert "Remove deletion handling from delete requests manager"
MichelHollands Mar 1, 2022
f6eb37e
Re-add IsDeleted method
MichelHollands Mar 1, 2022
0fbdbe1
Re-add tests for IsDeleted
MichelHollands Mar 1, 2022
5495668
Fix delete request store test
MichelHollands Mar 1, 2022
027adfd
Fix linting issue
MichelHollands Mar 1, 2022
587f6d2
Revert compactor changes
MichelHollands Mar 1, 2022
b10e121
Add deletion mode
MichelHollands Mar 1, 2022
b90811a
Add v1 mode
MichelHollands Mar 1, 2022
6a7ca3a
Rename LogQLRequest to Query
MichelHollands Mar 1, 2022
815c617
Fix linting issues
MichelHollands Mar 1, 2022
a9b8f73
Use DeleteMode in compactor module
MichelHollands Mar 1, 2022
65504d0
Rename logql to query
MichelHollands Mar 1, 2022
e75c36b
Put cancel under delete verb
MichelHollands Mar 2, 2022
1a5069b
Update documentation
MichelHollands Mar 2, 2022
dc4ab08
Update changelog
MichelHollands Mar 2, 2022
b00482c
Revert only the API surface area while keeping everything else
MasslessParticle Mar 11, 2022
38d819f
Use moved code in syntax package
MichelHollands Mar 15, 2022
3ae0aea
Remove duplicte import
MichelHollands Mar 15, 2022
b6b4734
Use renamed field in tests
MichelHollands Mar 15, 2022
beddf81
Remove duplicates and empty lines in changelog
MichelHollands Mar 15, 2022
e3dbe38
Update changelog description
MichelHollands Mar 15, 2022
1ba285a
Update pkg/storage/stores/shipper/compactor/deletion/delete_request.go
MichelHollands Mar 17, 2022
8152916
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
eb46a59
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
197e08b
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
ff27a1b
Update CHANGELOG.md
MichelHollands Mar 17, 2022
d249f18
Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_…
MichelHollands Mar 17, 2022
0b35252
Make DeletionMode struct member non public
MichelHollands Mar 17, 2022
0b49512
Revert change to docs re cancellation
MichelHollands Mar 17, 2022
1db21ca
Use same variable names
MichelHollands Mar 17, 2022
92aa117
Add parameter validation to changelog
MichelHollands Mar 17, 2022
a574d62
Rename v1 to WholeStreamDeletion
MichelHollands Mar 17, 2022
8098e1b
Fix default value of deletion mode config setting
MichelHollands Mar 18, 2022
86a2a53
reimplement new api
MasslessParticle Mar 18, 2022
16e9ab2
Add delete request handler when delete mode is set
MichelHollands Apr 4, 2022
9bec8a1
Remove unused variable
MichelHollands Apr 4, 2022
a10466b
Add comment to change the code when other deletion modes are available
MichelHollands Apr 5, 2022
f9addc8
create expirationChecker if deletionMode is set
MichelHollands Apr 5, 2022
85a32ea
Address review comments
MichelHollands Apr 6, 2022
a12b157
Update pkg/storage/stores/shipper/compactor/compactor.go
MichelHollands Apr 6, 2022
9d6dc0d
Rename AddQuery to SetQuery
MichelHollands Apr 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable
* [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5481](https://github.com/grafana/loki/pull/5481) **MichelHollands**: Add a DeletionMode config variable to specify the delete mode and validate match parameters.
* [5356](https://github.com/grafana/loki/pull/5356) **jbschami**: Enhance lambda-promtail to support adding extra labels from an environment variable value
* [5409](https://github.com/grafana/loki/pull/5409) **ldb**: Enable best effort parsing for Syslog messages
* [5392](https://github.com/grafana/loki/pull/5392) **MichelHollands**: Etcd credentials are parsed as secrets instead of plain text now.
Expand Down
13 changes: 7 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -52,6 +50,7 @@ import (
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
Expand Down Expand Up @@ -749,10 +748,12 @@ func (t *Loki) initCompactor() (services.Service, error) {
}

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)
if t.Cfg.CompactorConfig.RetentionEnabled {
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/admin/cancel_delete_request").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))

// TODO: update this when the other deletion modes are available
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() == deletion.WholeStreamDeletion {
MichelHollands marked this conversation as resolved.
Show resolved Hide resolved
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
MichelHollands marked this conversation as resolved.
Show resolved Hide resolved
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
}

return t.compactor, nil
Expand Down
12 changes: 5 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,11 @@ func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT t
var deletes []*logproto.Delete
for _, del := range d {
if int64(del.StartTime) <= end && int64(del.EndTime) >= start {
for _, selector := range del.Selectors {
deletes = append(deletes, &logproto.Delete{
Selector: selector,
Start: int64(del.StartTime),
End: int64(del.EndTime),
})
}
deletes = append(deletes, &logproto.Delete{
Selector: del.Query,
Start: int64(del.StartTime),
End: int64(del.EndTime),
})
}
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,11 +740,11 @@ func TestQuerier_SelectLogWithDeletes(t *testing.T) {

delGetter := &mockDeleteGettter{
results: []deletion.DeleteRequest{
{Selectors: []string{`0`}, StartTime: 0, EndTime: 100},
{Selectors: []string{`1`}, StartTime: 200, EndTime: 400},
{Selectors: []string{`2`}, StartTime: 400, EndTime: 500},
{Selectors: []string{`3`}, StartTime: 500, EndTime: 700},
{Selectors: []string{`4`}, StartTime: 700, EndTime: 900},
{Query: `0`, StartTime: 0, EndTime: 100},
{Query: `1`, StartTime: 200, EndTime: 400},
{Query: `2`, StartTime: 400, EndTime: 500},
{Query: `3`, StartTime: 500, EndTime: 700},
{Query: `4`, StartTime: 700, EndTime: 900},
},
}

Expand Down Expand Up @@ -802,11 +802,11 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) {

delGetter := &mockDeleteGettter{
results: []deletion.DeleteRequest{
{Selectors: []string{`0`}, StartTime: 0, EndTime: 100},
{Selectors: []string{`1`}, StartTime: 200, EndTime: 400},
{Selectors: []string{`2`}, StartTime: 400, EndTime: 500},
{Selectors: []string{`3`}, StartTime: 500, EndTime: 700},
{Selectors: []string{`4`}, StartTime: 700, EndTime: 900},
{Query: `0`, StartTime: 0, EndTime: 100},
{Query: `1`, StartTime: 200, EndTime: 400},
{Query: `2`, StartTime: 400, EndTime: 500},
{Query: `3`, StartTime: 500, EndTime: 700},
{Query: `4`, StartTime: 700, EndTime: 900},
},
}

Expand Down
50 changes: 39 additions & 11 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -67,6 +68,7 @@ type Config struct {
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
DeletionMode string `yaml:"deletion_enabled"`
MichelHollands marked this conversation as resolved.
Show resolved Hide resolved
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"`
Expand All @@ -84,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
MichelHollands marked this conversation as resolved.
Show resolved Hide resolved
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
}

Expand All @@ -96,6 +99,10 @@ func (cfg *Config) Validate() error {
return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
}

if _, err := deletion.ParseMode(cfg.DeletionMode); err != nil {
return err
}

return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}

Expand All @@ -113,6 +120,7 @@ type Compactor struct {
metrics *metrics
running bool
wg sync.WaitGroup
deleteMode deletion.Mode

// Ring used for running a single compactor
ringLifecycler *ring.BasicLifecycler
Expand Down Expand Up @@ -180,6 +188,12 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
compactor.subservicesWatcher = services.NewFailureWatcher()
compactor.subservicesWatcher.WatchManager(compactor.subservices)

mode, err := deletion.ParseMode(cfg.DeletionMode)
if err != nil {
return nil, err
}
compactor.deleteMode = mode

if err := compactor.init(storageConfig, schemaConfig, limits, clientMetrics, r); err != nil {
return nil, err
}
Expand Down Expand Up @@ -215,18 +229,24 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
return err
}

deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
if c.deleteMode == deletion.WholeStreamDeletion {
MichelHollands marked this conversation as resolved.
Show resolved Hide resolved
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
}
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
MichelHollands marked this conversation as resolved.
Show resolved Hide resolved
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
} else {
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
retention.NeverExpiringExpirationChecker(limits),
)
}

c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)

c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)

c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r)
if err != nil {
return err
Expand Down Expand Up @@ -286,8 +306,12 @@ func (c *Compactor) starting(ctx context.Context) (err error) {

func (c *Compactor) loop(ctx context.Context) error {
if c.cfg.RetentionEnabled {
defer c.deleteRequestsStore.Stop()
defer c.deleteRequestsManager.Stop()
if c.deleteRequestsStore != nil {
defer c.deleteRequestsStore.Stop()
}
if c.deleteRequestsManager != nil {
defer c.deleteRequestsManager.Stop()
}
}

syncTicker := time.NewTicker(c.ringPollPeriod)
Expand Down Expand Up @@ -530,6 +554,10 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
return firstErr
}

func (c *Compactor) DeleteMode() deletion.Mode {
return c.deleteMode
}

type expirationChecker struct {
retentionExpiryChecker retention.ExpirationChecker
deletionExpiryChecker retention.ExpirationChecker
Expand Down
39 changes: 14 additions & 25 deletions pkg/storage/stores/shipper/compactor/deletion/delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,30 @@ package deletion
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)

// DeleteRequest holds all the details about a delete request.
type DeleteRequest struct {
RequestID string `json:"request_id"`
StartTime model.Time `json:"start_time"`
EndTime model.Time `json:"end_time"`
Selectors []string `json:"selectors"`
Query string `json:"query"`
Status DeleteRequestStatus `json:"status"`
CreatedAt model.Time `json:"created_at"`

UserID string `json:"-"`
Matchers [][]*labels.Matcher `json:"-"`
UserID string `json:"-"`
matchers []*labels.Matcher `json:"-"`
}

func (d *DeleteRequest) AddQuery(logQL string) error {
MichelHollands marked this conversation as resolved.
Show resolved Hide resolved
d.Query = logQL
matchers, err := parseDeletionQuery(logQL)
if err != nil {
return err
}
d.matchers = matchers
return nil
}

func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Interval) {
Expand All @@ -36,26 +44,7 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Int
return false, nil
}

matchers := make([][]*labels.Matcher, len(d.Selectors))
var err error

for j, selector := range d.Selectors {
matchers[j], err = parser.ParseMetricSelector(selector)

if err != nil {
return false, nil
}
}

matches := false
for _, matchers := range matchers {
if labels.Selector(matchers).Matches(entry.Labels) {
matches = true
break
}
}

if !matches {
if !labels.Selector(d.matchers).Matches(entry.Labels) {
return false, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
now := model.Now()
user1 := "user1"

lbls := `{foo="bar", fizz="buzz"}`
lbl := `{foo="bar", fizz="buzz"}`

chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
UserID: []byte(user1),
From: now.Add(-3 * time.Hour),
Through: now.Add(-time.Hour),
},
Labels: mustParseLabel(lbls),
Labels: mustParseLabel(lbl),
}

type resp struct {
Expand All @@ -43,7 +43,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -56,7 +56,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-2 * time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -74,7 +74,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -92,7 +92,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -110,7 +110,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-(2*time.Hour + 30*time.Minute)),
EndTime: now.Add(-(time.Hour + 30*time.Minute)),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -132,7 +132,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-10 * time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: false,
Expand All @@ -141,10 +141,10 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
{
name: "request not matching due to matchers",
deleteRequest: DeleteRequest{
UserID: user1,
UserID: "user1",
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{`{foo1="bar"}`, `{fizz1="buzz"}`},
Query: `{foo1="bar"}`,
},
expectedResp: resp{
isDeleted: false,
Expand All @@ -156,14 +156,15 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: "user2",
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: false,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.deleteRequest.AddQuery(tc.deleteRequest.Query))
isDeleted, nonDeletedIntervals := tc.deleteRequest.IsDeleted(chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isDeleted)
require.Equal(t, tc.expectedResp.nonDeletedIntervals, nonDeletedIntervals)
Expand Down
Loading