Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: add per tenant compaction delete enabled flag #6410

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling.
* [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields.
* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the Promtail journal target.
* [6099](https://github.com/grafana/loki/pull/6099) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage.
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2372,6 +2372,10 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# This also determines how cache keys are chosen when result caching is enabled
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 30m]

# When true, access to the deletion API is enabled.
# CLI flag: -compactor.allow_deletes
[allow_deletes: <boolean> | default = false]
```

## sigv4_config
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/operations/storage/logs-deletion.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ With `whole-stream-deletion`, all the log entries matching the query given in th
With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks.
With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks.


A delete request may be canceled within a configurable cancellation period. Set the `delete_request_cancel_period` in the Compactor's YAML configuration or on the command line when invoking Loki. Its default value is 24h.

Access to the deletion API can be enabled per tenant via the `allow_deletes` setting.

## Compactor endpoints

The Compactor exposes endpoints to allow for the deletion of log entries from specified streams.
Expand Down
1 change: 1 addition & 0 deletions integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-boltdb.shipper.compactor.deletion-mode=filter-and-delete",
// By default a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.allow-deletes=true",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
Expand Down
8 changes: 4 additions & 4 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,10 +873,10 @@ func (t *Loki) initCompactor() (services.Service, error) {
if t.Cfg.CompactorConfig.RetentionEnabled {
switch t.compactor.DeleteMode() {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
default:
break
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(
c.deleteRequestsStore,
c.cfg.DeleteRequestCancelPeriod,
limits,
r,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,40 @@ import (

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)

const deletionNotAvailableMsg = "deletion is not available for this tenant"

// DeleteRequestHandler provides handlers for delete requests
type DeleteRequestHandler struct {
deleteRequestsStore DeleteRequestsStore
metrics *deleteRequestHandlerMetrics
limits retention.Limits
deleteRequestCancelPeriod time.Duration
}

// NewDeleteRequestHandler creates a DeleteRequestHandler
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler {
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestHandler {
deleteMgr := DeleteRequestHandler{
deleteRequestsStore: deleteStore,
deleteRequestCancelPeriod: deleteRequestCancelPeriod,
limits: limits,
metrics: newDeleteRequestHandlerMetrics(registerer),
}

return &deleteMgr
}

// AddDeleteRequestHandler handles addition of a new delete request
func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) AddDeleteRequestHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.addDeleteRequestHandler))
}

// AddDeleteRequestHandler handles addition of a new delete request
func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down Expand Up @@ -98,7 +108,12 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
}

// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.getAllDeleteRequestsHandler))
}

// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
Expand All @@ -120,7 +135,12 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
}

// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.cancelDeleteRequestHandler))
}

// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down Expand Up @@ -163,7 +183,12 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter
}

// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.getCacheGenerationNumberHandler))
}

// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
Expand All @@ -183,3 +208,30 @@ func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseW
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
}
}

func (dm *DeleteRequestHandler) deletionMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

allLimits := dm.limits.AllByUserID()
userLimits, ok := allLimits[userID]
if ok {
if !userLimits.CompactorDeletionEnabled {
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
return
}
} else {
if !dm.limits.DefaultLimits().CompactorDeletionEnabled {
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
return
}
}

next.ServeHTTP(w, r)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package deletion

import (
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/validation"
)

type retentionLimit struct {
compactorDeletionEnabled bool
retentionPeriod time.Duration
streamRetention []validation.StreamRetention
}

func (r retentionLimit) convertToValidationLimit() *validation.Limits {
return &validation.Limits{
CompactorDeletionEnabled: r.compactorDeletionEnabled,
RetentionPeriod: model.Duration(r.retentionPeriod),
StreamRetention: r.streamRetention,
}
}

type fakeLimits struct {
defaultLimit retentionLimit
perTenant map[string]retentionLimit
}

func (f fakeLimits) RetentionPeriod(userID string) time.Duration {
return f.perTenant[userID].retentionPeriod
}

func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention {
return f.perTenant[userID].streamRetention
}

func (f fakeLimits) CompactorDeletionEnabled(userID string) bool {
return f.perTenant[userID].compactorDeletionEnabled
}

func (f fakeLimits) DefaultLimits() *validation.Limits {
return f.defaultLimit.convertToValidationLimit()
}

func (f fakeLimits) AllByUserID() map[string]*validation.Limits {
res := make(map[string]*validation.Limits)
for userID, ret := range f.perTenant {
res[userID] = ret.convertToValidationLimit()
}
return res
}

func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) {
// build the store
tempDir := t.TempDir()

workingDir := filepath.Join(tempDir, "working-dir")
objectStorePath := filepath.Join(tempDir, "object-store")

objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: objectStorePath,
})
require.NoError(t, err)
testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
require.NoError(t, err)

// limits
fl := &fakeLimits{
perTenant: map[string]retentionLimit{
"1": {compactorDeletionEnabled: true},
"2": {compactorDeletionEnabled: false},
},
}

// Setup handler
drh := NewDeleteRequestHandler(testDeleteRequestsStore, 10*time.Second, fl, nil)
middle := drh.deletionMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))

// User that has deletion enabled
req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "1"))

res := httptest.NewRecorder()
middle.ServeHTTP(res, req)

require.Equal(t, http.StatusOK, res.Result().StatusCode)

// User that does not have deletion enabled
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "2"))

res = httptest.NewRecorder()
middle.ServeHTTP(res, req)

require.Equal(t, http.StatusForbidden, res.Result().StatusCode)

// User without override, this should use the default value which is false
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))

res = httptest.NewRecorder()
middle.ServeHTTP(res, req)

require.Equal(t, http.StatusForbidden, res.Result().StatusCode)

// User without override, after the default value is set to true
fl.defaultLimit.compactorDeletionEnabled = true

req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))

res = httptest.NewRecorder()
middle.ServeHTTP(res, req)

require.Equal(t, http.StatusOK, res.Result().StatusCode)

// User header is not given
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)

res = httptest.NewRecorder()
middle.ServeHTTP(res, req)

require.Equal(t, http.StatusBadRequest, res.Result().StatusCode)
}
8 changes: 8 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ type Limits struct {
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`
RulerRemoteWriteSigV4Config *sigv4.SigV4Config `yaml:"ruler_remote_write_sigv4_config" json:"ruler_remote_write_sigv4_config"`

CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes"`

// Global and per tenant retention
RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"`
StreamRetention []StreamRetention `yaml:"retention_stream,omitempty" json:"retention_stream,omitempty"`
Expand Down Expand Up @@ -193,6 +195,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

_ = l.QuerySplitDuration.Set("30m")
f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled")

f.BoolVar(&l.CompactorDeletionEnabled, "compactor.allow-deletes", false, "Enable access to the deletion API.")
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand Down Expand Up @@ -532,6 +536,10 @@ func (o *Overrides) UnorderedWrites(userID string) bool {
return o.getOverridesForUser(userID).UnorderedWrites
}

func (o *Overrides) CompactorDeletionEnabled(userID string) bool {
return o.getOverridesForUser(userID).CompactorDeletionEnabled
}

func (o *Overrides) DefaultLimits() *Limits {
return o.defaultLimits
}
Expand Down