From 39f337b0535fb629613873757f3fc3a5e91ba176 Mon Sep 17 00:00:00 2001 From: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> Date: Thu, 23 Jun 2022 13:28:54 +0100 Subject: [PATCH] [k102] Backport Remove whole stream deletion mode (#6435) (#6477) * Remove whole stream deletion mode (#6435) * Remove whole-stream-deletion mode Signed-off-by: Michel Hollands * Remove whole-stream-deletion from docs Signed-off-by: Michel Hollands * Update the changelog Signed-off-by: Michel Hollands * Sort changelog entries Signed-off-by: Michel Hollands * Remove link to wrong configuration Signed-off-by: Michel Hollands * Fix integration test Signed-off-by: Michel Hollands * Set default deletion mode to disabled Signed-off-by: Michel Hollands * Remove extra white line in documentation Signed-off-by: Michel Hollands * Fix default value in docs Signed-off-by: Michel Hollands * Fix changelog Signed-off-by: Michel Hollands * Add DeletionEnabled method on mode Signed-off-by: Michel Hollands * Rename test Signed-off-by: Michel Hollands (cherry picked from commit f80e487dcf106a6c4ea0a82d8afe7b0104addbc3) * Fix changelog Signed-off-by: Michel Hollands * Fix changelog link Signed-off-by: Michel Hollands (cherry picked from commit da3fbd965bd4d1f31130dacaa2d1e1dff388321f) --- CHANGELOG.md | 1 + docs/sources/configuration/_index.md | 11 +++------ .../operations/storage/logs-deletion.md | 4 +--- .../loki_micro_services_delete_test.go | 2 ++ pkg/loki/modules.go | 21 +++++++--------- .../stores/shipper/compactor/compactor.go | 7 +++--- .../deletion/delete_requests_manager_test.go | 16 ++++++------- .../stores/shipper/compactor/deletion/mode.go | 17 +++++++------ .../shipper/compactor/deletion/mode_test.go | 24 +++++++++++++++---- 9 files changed, 53 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a895e73e6d41..2ce591769d90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [6435](https://github.com/grafana/loki/pull/6435) **MichelHollands**: Remove the `whole-stream-deletion` mode. * [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling. * [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target * [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index e5032981b929..46750788441b 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2093,11 +2093,6 @@ compacts index shards to more performant forms. # CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period [delete_request_cancel_period: | default = 24h] -# Which deletion mode to use. Supported values are: disabled, -# whole-stream-deletion, filter-only, filter-and-delete -# CLI flag: -boltdb.shipper.compactor.deletion-mode -[deletion_mode: | default = "whole-stream-deletion"] - # Maximum number of tables to compact in parallel. # While increasing this value, please make sure compactor has enough disk space # allocated to be able to store and compact as many tables. @@ -2105,11 +2100,11 @@ compacts index shards to more performant forms. [max_compaction_parallelism: | default = 1] # Deletion mode. -# Can be one of "disabled", "whole-stream-deletion", "filter-only", or "filter-and-delete". -# When set to the default value of "whole-stream-deletion", and if +# Can be one of "disabled", "filter-only", or "filter-and-delete". +# When set to "filter-only" or "filter-and-delete", and if # retention_enabled is true, then the log entry deletion API endpoints are available. # CLI flag: -boltdb.shipper.compactor.deletion-mode -[deletion_mode: | default = "whole-stream-deletion"] +[deletion_mode: | default = "disabled"] # The hash ring configuration used by compactors to elect a single instance for running compactions # The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring diff --git a/docs/sources/operations/storage/logs-deletion.md b/docs/sources/operations/storage/logs-deletion.md index acc75a390d50..95675e185d79 100644 --- a/docs/sources/operations/storage/logs-deletion.md +++ b/docs/sources/operations/storage/logs-deletion.md @@ -9,7 +9,6 @@ Log entry deletion is supported _only_ for the BoltDB Shipper index store. Grafana Loki supports the deletion of log entries from a specified stream. Log entries that fall within a specified time window and match an optional line filter are those that will be deleted. - The Compactor component exposes REST endpoints that process delete requests. Hitting the endpoint specifies the streams and the time window. The deletion of the log entries takes place after a configurable cancellation time period expires. @@ -18,9 +17,8 @@ Log entry deletion relies on configuration of the custom logs retention workflow ## Configuration -Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `whole-stream-deletion`, `filter-only`, or `filter-and-delete` in the compactor's configuration. See the example in [Retention configuration](../retention#retention-configuration). +Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `filter-only` or `filter-and-delete` in the compactor's configuration. -With `whole-stream-deletion`, all the log entries matching the query given in the delete request are removed. With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks. With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks. diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index 1949d617ba4e..6af2f1034b5e 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -52,12 +52,14 @@ func TestMicroServicesDeleteRequest(t *testing.T) { "-frontend.scheduler-address="+tQueryScheduler.GRPCURL().Host, "-frontend.default-validity=0s", "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host, + "-common.compactor-address="+tCompactor.HTTPURL().String(), ) _ = clu.AddComponent( "querier", "-target=querier", "-querier.scheduler-address="+tQueryScheduler.GRPCURL().Host, "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host, + "-common.compactor-address="+tCompactor.HTTPURL().String(), ) ) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 44abfb3daffd..47d07eeca6f9 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -568,7 +568,7 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { } func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) { - filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode) + filteringEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode) if err != nil { return nil, err } @@ -864,16 +864,11 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor) - if t.Cfg.CompactorConfig.RetentionEnabled { - switch t.compactor.DeleteMode() { - case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler())) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler())) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler())) - t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler())) - default: - break - } + if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode().DeleteEnabled() { + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler())) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler())) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler())) + t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler())) } return t.compactor, nil @@ -971,12 +966,12 @@ func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) { return deletion.NewNoOpDeleteRequestsStore(), nil } - filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode) + deleteEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode) if err != nil { return nil, err } - if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled { + if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !deleteEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index aa55ba11d7e6..e523acc33025 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -87,7 +87,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") - f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|"))) + f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "disabled", fmt.Sprintf("Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|"))) cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f) f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)") } @@ -231,12 +231,11 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem return err } - switch c.deleteMode { - case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: + if c.deleteMode.DeleteEnabled() { if err := c.initDeletes(r, limits); err != nil { return err } - default: + } else { c.expirationChecker = newExpirationChecker( retention.NewExpirationChecker(limits), // This is a dummy deletion ExpirationChecker that never expires anything diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go index 2712084210d2..36e9e67a53af 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go @@ -41,7 +41,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }{ { name: "no delete requests", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, expectedResp: resp{ isExpired: false, nonDeletedIntervals: nil, @@ -49,7 +49,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "no relevant delete requests", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: "different-user", @@ -65,7 +65,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "whole chunk deleted by single request", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -81,7 +81,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "deleted interval out of range", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -97,7 +97,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple delete requests with one deleting the whole chunk", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -119,7 +119,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple delete requests causing multiple holes", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -172,7 +172,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple overlapping requests deleting the whole chunk", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -194,7 +194,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple non-overlapping requests deleting the whole chunk", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode.go b/pkg/storage/stores/shipper/compactor/deletion/mode.go index ca0dd624f8b9..733ce27fad1e 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/mode.go +++ b/pkg/storage/stores/shipper/compactor/deletion/mode.go @@ -11,8 +11,7 @@ var ( ) const ( - Disabled Mode = iota - WholeStreamDeletion // The existing log deletion that removes whole streams. + Disabled Mode = iota FilterOnly FilterAndDelete ) @@ -21,8 +20,6 @@ func (m Mode) String() string { switch m { case Disabled: return "disabled" - case WholeStreamDeletion: - return "whole-stream-deletion" case FilterOnly: return "filter-only" case FilterAndDelete: @@ -31,16 +28,18 @@ func (m Mode) String() string { return "unknown" } +func (m Mode) DeleteEnabled() bool { + return m == FilterOnly || m == FilterAndDelete +} + func AllModes() []string { - return []string{Disabled.String(), WholeStreamDeletion.String(), FilterOnly.String(), FilterAndDelete.String()} + return []string{Disabled.String(), FilterOnly.String(), FilterAndDelete.String()} } func ParseMode(in string) (Mode, error) { switch in { case "disabled": return Disabled, nil - case "whole-stream-deletion": - return WholeStreamDeletion, nil case "filter-only": return FilterOnly, nil case "filter-and-delete": @@ -49,11 +48,11 @@ func ParseMode(in string) (Mode, error) { return 0, errUnknownMode } -func FilteringEnabled(in string) (bool, error) { +func DeleteEnabled(in string) (bool, error) { deleteMode, err := ParseMode(in) if err != nil { return false, err } - return deleteMode == FilterOnly || deleteMode == FilterAndDelete, nil + return deleteMode.DeleteEnabled(), nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode_test.go b/pkg/storage/stores/shipper/compactor/deletion/mode_test.go index f8f2fb489990..de9145d9c284 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/mode_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/mode_test.go @@ -8,7 +8,7 @@ import ( func TestAllModes(t *testing.T) { modes := AllModes() - require.ElementsMatch(t, []string{"disabled", "whole-stream-deletion", "filter-only", "filter-and-delete"}, modes) + require.ElementsMatch(t, []string{"disabled", "filter-only", "filter-and-delete"}, modes) } func TestParseMode(t *testing.T) { @@ -16,10 +16,6 @@ func TestParseMode(t *testing.T) { require.NoError(t, err) require.Equal(t, Disabled, mode) - mode, err = ParseMode("whole-stream-deletion") - require.NoError(t, err) - require.Equal(t, WholeStreamDeletion, mode) - mode, err = ParseMode("filter-only") require.NoError(t, err) require.Equal(t, FilterOnly, mode) @@ -31,3 +27,21 @@ func TestParseMode(t *testing.T) { _, err = ParseMode("something-else") require.ErrorIs(t, errUnknownMode, err) } + +func TestDeleteEnabled(t *testing.T) { + enabled, err := DeleteEnabled("disabled") + require.NoError(t, err) + require.False(t, enabled) + + enabled, err = DeleteEnabled("filter-only") + require.NoError(t, err) + require.True(t, enabled) + + enabled, err = DeleteEnabled("filter-and-delete") + require.NoError(t, err) + require.True(t, enabled) + + enabled, err = DeleteEnabled("some other value") + require.Error(t, err) + require.False(t, enabled) +}