Skip to content

Commit

Permalink
Add a DeletionMode config variable (#5481)
Browse files Browse the repository at this point in the history
* Add deletionEnabled setting and remove delete request manager

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rebase

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove deletion handling from delete requests manager

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* change store so it stores a logql statement

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add validation code for logql statement

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Run deleteRequestsManager when deletion is enabled

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove unused variables

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert "Remove deletion handling from delete requests manager"

This reverts commit ce4f774497aa590caff86b0745ec81588592a9e1.

* Re-add IsDeleted method

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Re-add tests for IsDeleted

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix delete request store test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix linting issue

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert compactor changes

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add deletion mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add v1 mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename LogQLRequest to Query

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix linting issues

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use DeleteMode in compactor module

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename logql to query

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Put cancel under delete verb

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update documentation

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert only the API surface area while keeping everything else

* Use moved code in syntax package

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove duplicte import

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use renamed field in tests

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove duplicates and empty lines in changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update changelog description

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_request.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update CHANGELOG.md

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go

Co-authored-by: Christian Simon <simon@swine.de>

* Make DeletionMode struct member non public

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert change to docs re cancellation

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use same variable names

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add parameter validation to changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename v1 to WholeStreamDeletion

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix default value of deletion mode config setting

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* reimplement new api

* Add delete request handler when delete mode is set

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove unused variable

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add comment to change the code when other deletion modes are available

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* create expirationChecker if deletionMode is set

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Address review comments

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update pkg/storage/stores/shipper/compactor/compactor.go

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* Rename AddQuery to SetQuery

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

Co-authored-by: Travis Patterson <travis.patterson@grafana.com>
Co-authored-by: Christian Simon <simon@swine.de>
Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
4 people authored Apr 6, 2022
1 parent ece1fb5 commit b865b81
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable
* [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5481](https://github.com/grafana/loki/pull/5481) **MichelHollands**: Add a DeletionMode config variable to specify the delete mode and validate match parameters.
* [5356](https://github.com/grafana/loki/pull/5356) **jbschami**: Enhance lambda-promtail to support adding extra labels from an environment variable value
* [5409](https://github.com/grafana/loki/pull/5409) **ldb**: Enable best effort parsing for Syslog messages
* [5392](https://github.com/grafana/loki/pull/5392) **MichelHollands**: Etcd credentials are parsed as secrets instead of plain text now.
Expand Down
13 changes: 7 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -52,6 +50,7 @@ import (
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
Expand Down Expand Up @@ -749,10 +748,12 @@ func (t *Loki) initCompactor() (services.Service, error) {
}

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)
if t.Cfg.CompactorConfig.RetentionEnabled {
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/admin/cancel_delete_request").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))

// TODO: update this when the other deletion modes are available
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() == deletion.WholeStreamDeletion {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
}

return t.compactor, nil
Expand Down
12 changes: 5 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,11 @@ func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT t
var deletes []*logproto.Delete
for _, del := range d {
if int64(del.StartTime) <= end && int64(del.EndTime) >= start {
for _, selector := range del.Selectors {
deletes = append(deletes, &logproto.Delete{
Selector: selector,
Start: int64(del.StartTime),
End: int64(del.EndTime),
})
}
deletes = append(deletes, &logproto.Delete{
Selector: del.Query,
Start: int64(del.StartTime),
End: int64(del.EndTime),
})
}
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,11 +740,11 @@ func TestQuerier_SelectLogWithDeletes(t *testing.T) {

delGetter := &mockDeleteGettter{
results: []deletion.DeleteRequest{
{Selectors: []string{`0`}, StartTime: 0, EndTime: 100},
{Selectors: []string{`1`}, StartTime: 200, EndTime: 400},
{Selectors: []string{`2`}, StartTime: 400, EndTime: 500},
{Selectors: []string{`3`}, StartTime: 500, EndTime: 700},
{Selectors: []string{`4`}, StartTime: 700, EndTime: 900},
{Query: `0`, StartTime: 0, EndTime: 100},
{Query: `1`, StartTime: 200, EndTime: 400},
{Query: `2`, StartTime: 400, EndTime: 500},
{Query: `3`, StartTime: 500, EndTime: 700},
{Query: `4`, StartTime: 700, EndTime: 900},
},
}

Expand Down Expand Up @@ -802,11 +802,11 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) {

delGetter := &mockDeleteGettter{
results: []deletion.DeleteRequest{
{Selectors: []string{`0`}, StartTime: 0, EndTime: 100},
{Selectors: []string{`1`}, StartTime: 200, EndTime: 400},
{Selectors: []string{`2`}, StartTime: 400, EndTime: 500},
{Selectors: []string{`3`}, StartTime: 500, EndTime: 700},
{Selectors: []string{`4`}, StartTime: 700, EndTime: 900},
{Query: `0`, StartTime: 0, EndTime: 100},
{Query: `1`, StartTime: 200, EndTime: 400},
{Query: `2`, StartTime: 400, EndTime: 500},
{Query: `3`, StartTime: 500, EndTime: 700},
{Query: `4`, StartTime: 700, EndTime: 900},
},
}

Expand Down
50 changes: 39 additions & 11 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -67,6 +68,7 @@ type Config struct {
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
DeletionMode string `yaml:"deletion_mode"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"`
Expand All @@ -84,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
}

Expand All @@ -96,6 +99,10 @@ func (cfg *Config) Validate() error {
return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
}

if _, err := deletion.ParseMode(cfg.DeletionMode); err != nil {
return err
}

return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}

Expand All @@ -113,6 +120,7 @@ type Compactor struct {
metrics *metrics
running bool
wg sync.WaitGroup
deleteMode deletion.Mode

// Ring used for running a single compactor
ringLifecycler *ring.BasicLifecycler
Expand Down Expand Up @@ -180,6 +188,12 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
compactor.subservicesWatcher = services.NewFailureWatcher()
compactor.subservicesWatcher.WatchManager(compactor.subservices)

mode, err := deletion.ParseMode(cfg.DeletionMode)
if err != nil {
return nil, err
}
compactor.deleteMode = mode

if err := compactor.init(storageConfig, schemaConfig, limits, clientMetrics, r); err != nil {
return nil, err
}
Expand Down Expand Up @@ -215,18 +229,24 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
return err
}

deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
if c.deleteMode == deletion.WholeStreamDeletion {
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
}
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
} else {
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
retention.NeverExpiringExpirationChecker(limits),
)
}

c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)

c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)

c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r)
if err != nil {
return err
Expand Down Expand Up @@ -286,8 +306,12 @@ func (c *Compactor) starting(ctx context.Context) (err error) {

func (c *Compactor) loop(ctx context.Context) error {
if c.cfg.RetentionEnabled {
defer c.deleteRequestsStore.Stop()
defer c.deleteRequestsManager.Stop()
if c.deleteRequestsStore != nil {
defer c.deleteRequestsStore.Stop()
}
if c.deleteRequestsManager != nil {
defer c.deleteRequestsManager.Stop()
}
}

syncTicker := time.NewTicker(c.ringPollPeriod)
Expand Down Expand Up @@ -530,6 +554,10 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
return firstErr
}

func (c *Compactor) DeleteMode() deletion.Mode {
return c.deleteMode
}

type expirationChecker struct {
retentionExpiryChecker retention.ExpirationChecker
deletionExpiryChecker retention.ExpirationChecker
Expand Down
39 changes: 14 additions & 25 deletions pkg/storage/stores/shipper/compactor/deletion/delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,30 @@ package deletion
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)

// DeleteRequest holds all the details about a delete request.
type DeleteRequest struct {
RequestID string `json:"request_id"`
StartTime model.Time `json:"start_time"`
EndTime model.Time `json:"end_time"`
Selectors []string `json:"selectors"`
Query string `json:"query"`
Status DeleteRequestStatus `json:"status"`
CreatedAt model.Time `json:"created_at"`

UserID string `json:"-"`
Matchers [][]*labels.Matcher `json:"-"`
UserID string `json:"-"`
matchers []*labels.Matcher `json:"-"`
}

func (d *DeleteRequest) SetQuery(logQL string) error {
d.Query = logQL
matchers, err := parseDeletionQuery(logQL)
if err != nil {
return err
}
d.matchers = matchers
return nil
}

func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Interval) {
Expand All @@ -36,26 +44,7 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Int
return false, nil
}

matchers := make([][]*labels.Matcher, len(d.Selectors))
var err error

for j, selector := range d.Selectors {
matchers[j], err = parser.ParseMetricSelector(selector)

if err != nil {
return false, nil
}
}

matches := false
for _, matchers := range matchers {
if labels.Selector(matchers).Matches(entry.Labels) {
matches = true
break
}
}

if !matches {
if !labels.Selector(d.matchers).Matches(entry.Labels) {
return false, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
now := model.Now()
user1 := "user1"

lbls := `{foo="bar", fizz="buzz"}`
lbl := `{foo="bar", fizz="buzz"}`

chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
UserID: []byte(user1),
From: now.Add(-3 * time.Hour),
Through: now.Add(-time.Hour),
},
Labels: mustParseLabel(lbls),
Labels: mustParseLabel(lbl),
}

type resp struct {
Expand All @@ -43,7 +43,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -56,7 +56,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-2 * time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -74,7 +74,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -92,7 +92,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -110,7 +110,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-(2*time.Hour + 30*time.Minute)),
EndTime: now.Add(-(time.Hour + 30*time.Minute)),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
Expand All @@ -132,7 +132,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-10 * time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: false,
Expand All @@ -141,10 +141,10 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
{
name: "request not matching due to matchers",
deleteRequest: DeleteRequest{
UserID: user1,
UserID: "user1",
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{`{foo1="bar"}`, `{fizz1="buzz"}`},
Query: `{foo1="bar"}`,
},
expectedResp: resp{
isDeleted: false,
Expand All @@ -156,14 +156,15 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: "user2",
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: false,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query))
isDeleted, nonDeletedIntervals := tc.deleteRequest.IsDeleted(chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isDeleted)
require.Equal(t, tc.expectedResp.nonDeletedIntervals, nonDeletedIntervals)
Expand Down
Loading

0 comments on commit b865b81

Please sign in to comment.