diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b65e0516ab1..a895e73e6d41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling. * [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target * [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage * [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index f758ab5d8d51..e5032981b929 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2373,6 +2373,10 @@ The `limits_config` block configures global and per-tenant limits in Loki. # This also determines how cache keys are chosen when result caching is enabled # CLI flag: -querier.split-queries-by-interval [split_queries_by_interval: | default = 30m] + +# When true, access to the deletion API is enabled. +# CLI flag: -compactor.allow_deletes +[allow_deletes: | default = false] ``` ### grpc_client_config diff --git a/docs/sources/operations/storage/logs-deletion.md b/docs/sources/operations/storage/logs-deletion.md index 5c3bb31dfc8f..acc75a390d50 100644 --- a/docs/sources/operations/storage/logs-deletion.md +++ b/docs/sources/operations/storage/logs-deletion.md @@ -24,9 +24,10 @@ With `whole-stream-deletion`, all the log entries matching the query given in th With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks. With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks. - A delete request may be canceled within a configurable cancellation period. Set the `delete_request_cancel_period` in the Compactor's YAML configuration or on the command line when invoking Loki. Its default value is 24h. +Access to the deletion API can be enabled per tenant via the `allow_deletes` setting. + ## Compactor endpoints The Compactor exposes endpoints to allow for the deletion of log entries from specified streams. diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index dbd1b97c8dc1..1949d617ba4e 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -26,6 +26,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) { "-boltdb.shipper.compactor.deletion-mode=filter-and-delete", // By default a minute is added to the delete request start time. This compensates for that. "-boltdb.shipper.compactor.delete-request-cancel-period=-60s", + "-compactor.allow-deletes=true", ) tIndexGateway = clu.AddComponent( "index-gateway", diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b930746812f4..44abfb3daffd 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -867,9 +867,10 @@ func (t *Loki) initCompactor() (services.Service, error) { if t.Cfg.CompactorConfig.RetentionEnabled { switch t.compactor.DeleteMode() { case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler())) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler())) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler())) + t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler())) default: break } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 0c6673c52c05..aa55ba11d7e6 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -265,6 +265,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler( c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, + limits, r, ) diff --git a/pkg/storage/stores/shipper/compactor/deletion/request_handler.go b/pkg/storage/stores/shipper/compactor/deletion/request_handler.go index 4566a7c0f457..acead9175ba1 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/request_handler.go +++ b/pkg/storage/stores/shipper/compactor/deletion/request_handler.go @@ -12,22 +12,27 @@ import ( "github.com/grafana/dskit/tenant" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) +const deletionNotAvailableMsg = "deletion is not available for this tenant" + // DeleteRequestHandler provides handlers for delete requests type DeleteRequestHandler struct { deleteRequestsStore DeleteRequestsStore metrics *deleteRequestHandlerMetrics + limits retention.Limits deleteRequestCancelPeriod time.Duration } // NewDeleteRequestHandler creates a DeleteRequestHandler -func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler { +func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestHandler { deleteMgr := DeleteRequestHandler{ deleteRequestsStore: deleteStore, deleteRequestCancelPeriod: deleteRequestCancelPeriod, + limits: limits, metrics: newDeleteRequestHandlerMetrics(registerer), } @@ -35,7 +40,12 @@ func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCance } // AddDeleteRequestHandler handles addition of a new delete request -func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) AddDeleteRequestHandler() http.Handler { + return dm.deletionMiddleware(http.HandlerFunc(dm.addDeleteRequestHandler)) +} + +// AddDeleteRequestHandler handles addition of a new delete request +func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -98,7 +108,12 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r } // GetAllDeleteRequestsHandler handles get all delete requests -func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler() http.Handler { + return dm.deletionMiddleware(http.HandlerFunc(dm.getAllDeleteRequestsHandler)) +} + +// GetAllDeleteRequestsHandler handles get all delete requests +func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -120,7 +135,12 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite } // CancelDeleteRequestHandler handles delete request cancellation -func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) CancelDeleteRequestHandler() http.Handler { + return dm.deletionMiddleware(http.HandlerFunc(dm.cancelDeleteRequestHandler)) +} + +// CancelDeleteRequestHandler handles delete request cancellation +func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -163,7 +183,12 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter } // GetCacheGenerationNumberHandler handles requests for a user's cache generation number -func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler() http.Handler { + return dm.deletionMiddleware(http.HandlerFunc(dm.getCacheGenerationNumberHandler)) +} + +// GetCacheGenerationNumberHandler handles requests for a user's cache generation number +func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -183,3 +208,30 @@ func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseW http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError) } } + +func (dm *DeleteRequestHandler) deletionMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userID, err := tenant.TenantID(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + allLimits := dm.limits.AllByUserID() + userLimits, ok := allLimits[userID] + if ok { + if !userLimits.CompactorDeletionEnabled { + http.Error(w, deletionNotAvailableMsg, http.StatusForbidden) + return + } + } else { + if !dm.limits.DefaultLimits().CompactorDeletionEnabled { + http.Error(w, deletionNotAvailableMsg, http.StatusForbidden) + return + } + } + + next.ServeHTTP(w, r) + }) +} diff --git a/pkg/storage/stores/shipper/compactor/deletion/request_handler_test.go b/pkg/storage/stores/shipper/compactor/deletion/request_handler_test.go new file mode 100644 index 000000000000..aaecb4442873 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/request_handler_test.go @@ -0,0 +1,133 @@ +package deletion + +import ( + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/validation" +) + +type retentionLimit struct { + compactorDeletionEnabled bool + retentionPeriod time.Duration + streamRetention []validation.StreamRetention +} + +func (r retentionLimit) convertToValidationLimit() *validation.Limits { + return &validation.Limits{ + CompactorDeletionEnabled: r.compactorDeletionEnabled, + RetentionPeriod: model.Duration(r.retentionPeriod), + StreamRetention: r.streamRetention, + } +} + +type fakeLimits struct { + defaultLimit retentionLimit + perTenant map[string]retentionLimit +} + +func (f fakeLimits) RetentionPeriod(userID string) time.Duration { + return f.perTenant[userID].retentionPeriod +} + +func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention { + return f.perTenant[userID].streamRetention +} + +func (f fakeLimits) CompactorDeletionEnabled(userID string) bool { + return f.perTenant[userID].compactorDeletionEnabled +} + +func (f fakeLimits) DefaultLimits() *validation.Limits { + return f.defaultLimit.convertToValidationLimit() +} + +func (f fakeLimits) AllByUserID() map[string]*validation.Limits { + res := make(map[string]*validation.Limits) + for userID, ret := range f.perTenant { + res[userID] = ret.convertToValidationLimit() + } + return res +} + +func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) { + // build the store + tempDir := t.TempDir() + + workingDir := filepath.Join(tempDir, "working-dir") + objectStorePath := filepath.Join(tempDir, "object-store") + + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: objectStorePath, + }) + require.NoError(t, err) + testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, "")) + require.NoError(t, err) + + // limits + fl := &fakeLimits{ + perTenant: map[string]retentionLimit{ + "1": {compactorDeletionEnabled: true}, + "2": {compactorDeletionEnabled: false}, + }, + } + + // Setup handler + drh := NewDeleteRequestHandler(testDeleteRequestsStore, 10*time.Second, fl, nil) + middle := drh.deletionMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + + // User that has deletion enabled + req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + req = req.WithContext(user.InjectOrgID(req.Context(), "1")) + + res := httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusOK, res.Result().StatusCode) + + // User that does not have deletion enabled + req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + req = req.WithContext(user.InjectOrgID(req.Context(), "2")) + + res = httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusForbidden, res.Result().StatusCode) + + // User without override, this should use the default value which is false + req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + req = req.WithContext(user.InjectOrgID(req.Context(), "3")) + + res = httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusForbidden, res.Result().StatusCode) + + // User without override, after the default value is set to true + fl.defaultLimit.compactorDeletionEnabled = true + + req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + req = req.WithContext(user.InjectOrgID(req.Context(), "3")) + + res = httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusOK, res.Result().StatusCode) + + // User header is not given + req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + + res = httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusBadRequest, res.Result().StatusCode) +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 2cc1ad8e50e0..a9f7286ab618 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -109,6 +109,8 @@ type Limits struct { RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"` RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"` + CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes"` + // Global and per tenant retention RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"` StreamRetention []StreamRetention `yaml:"retention_stream,omitempty" json:"retention_stream,omitempty"` @@ -191,6 +193,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.QuerySplitDuration.Set("30m") f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled") + + f.BoolVar(&l.CompactorDeletionEnabled, "compactor.allow-deletes", false, "Enable access to the deletion API.") } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -526,6 +530,10 @@ func (o *Overrides) UnorderedWrites(userID string) bool { return o.getOverridesForUser(userID).UnorderedWrites } +func (o *Overrides) CompactorDeletionEnabled(userID string) bool { + return o.getOverridesForUser(userID).CompactorDeletionEnabled +} + func (o *Overrides) DefaultLimits() *Limits { return o.defaultLimits }