Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invalidate caches on delete #5661

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ to include only the most relevant.
* [5543](https://github.com/grafana/loki/pull/5543) **cyriltovena**: update loki go version to 1.17.8
* [5450](https://github.com/grafana/loki/pull/5450) **BenoitKnecht**: pkg/ruler/base: Add external_labels option
* [5484](https://github.com/grafana/loki/pull/5450) **sandeepsukhani**: Add support for per user index query readiness with limits overrides
* [5661](https://github.com/grafana/loki/pull/5450) **masslessparticle**: Invalidate caches on deletes
* [5358](https://github.com/grafana/loki/pull/5358) **DylanGuedes**: Add `RingMode` support to `IndexGateway`
* [5435](https://github.com/grafana/loki/pull/5435) **slim-bean**: set match_max_concurrent true by default
* [5361](https://github.com/grafana/loki/pull/5361) **cyriltovena**: Add usage report into Loki.
Expand Down
9 changes: 9 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.downstream-url
[downstream_url: <string> | default = ""]

# Address, including port, where the compactor api is served
# CLI flag: -frontend.compactor-address
[compactor_address: <string> | default = ""]

# Log queries that are slower than the specified duration. Set to 0 to disable.
# Set to < 0 to enable on all queries.
# CLI flag: -frontend.log-queries-longer-than
Expand Down Expand Up @@ -2041,6 +2045,11 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]

# Which deletion mode to use. Supported values are: disabled,
# whole-stream-deletion, filter-only, filter-and-delete
# CLI flag: -boltdb.shipper.compactor.deletion-mode
[deletion_mode: <string> | default = "whole-stream-deletion"]

# Maximum number of tables to compact in parallel.
# While increasing this value, please make sure compactor has enough disk space
# allocated to be able to store and compact as many tables.
Expand Down
42 changes: 36 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,17 @@ import (
"os"
"time"

"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/tenant"
gerrors "github.com/pkg/errors"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/runtimeconfig"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
gerrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
Expand Down Expand Up @@ -53,6 +52,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/generationnumber"
cstyan marked this conversation as resolved.
Show resolved Hide resolved
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
Expand Down Expand Up @@ -494,11 +494,17 @@ func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { ret
func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")

genNumberGetter, err := t.frontendGenerationNumberGetter()
if err != nil {
return nil, err
}

tripperware, stopper, err := queryrange.NewTripperware(
t.Cfg.QueryRange,
util_log.Logger,
t.overrides,
t.Cfg.SchemaConfig,
generationnumber.NewGenNumberLoader(genNumberGetter, prometheus.DefaultRegisterer),
prometheus.DefaultRegisterer,
)
if err != nil {
Expand All @@ -510,6 +516,29 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
return services.NewIdleService(nil, nil), nil
}

func (t *Loki) frontendGenerationNumberGetter() (generationnumber.CacheGenClient, error) {
MasslessParticle marked this conversation as resolved.
Show resolved Hide resolved
filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
}

if !filteringEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress := t.Cfg.Frontend.CompactorAddress
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort)
}

if compactorAddress == "" {
return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}), nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend))

Expand Down Expand Up @@ -766,11 +795,12 @@ func (t *Loki) initCompactor() (services.Service, error) {

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)

// TODO: update this when the other deletion modes are available
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() == deletion.WholeStreamDeletion {
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() != deletion.Disabled {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))

t.Server.HTTP.Path("/loki/api/v1/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)))
MasslessParticle marked this conversation as resolved.
Show resolved Hide resolved
}

return t.compactor, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/lokifrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {

CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
CompactorAddress string `yaml:"compactor_address"`

TailProxyURL string `yaml:"tail_proxy_url"`
}
Expand All @@ -27,6 +28,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")

f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg.CacheResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
maxQueryParallelism: 1,
}, config.SchemaConfig{}, nil)
}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewTripperware(
log log.Logger,
limits Limits,
schema config.SchemaConfig,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
metrics := NewMetrics(registerer)
Expand All @@ -61,7 +62,7 @@ func NewTripperware(
}

metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c,
PrometheusExtractor{}, metrics, registerer)
cacheGenNumLoader, PrometheusExtractor{}, metrics, registerer)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -387,6 +388,7 @@ func NewMetricTripperware(
schema config.SchemaConfig,
codec queryrangebase.Codec,
c cache.Cache,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
extractor queryrangebase.Extractor,
metrics *Metrics,
registerer prometheus.Registerer,
Expand Down Expand Up @@ -414,7 +416,7 @@ func NewMetricTripperware(
limits,
codec,
extractor,
nil,
cacheGenNumLoader,
func(r queryrangebase.Request) bool {
return !r.GetCachingOptions().Disabled
},
Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ var (
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestMetricsTripperware(t *testing.T) {
}

func TestLogFilterTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestLogFilterTripperware(t *testing.T) {
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestInstantQueryTripperware(t *testing.T) {
}

func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestSeriesTripperware(t *testing.T) {
}

func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestLabelsTripperware(t *testing.T) {
}

func TestLogNoRegex(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestLogNoRegex(t *testing.T) {
}

func TestUnhandledPath(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand All @@ -403,7 +403,7 @@ func TestUnhandledPath(t *testing.T) {

func TestRegexpParamsSupport(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestPostQueries(t *testing.T) {
}

func TestEntriesLimitsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestEntriesLimitsTripperware(t *testing.T) {
}

func TestEntriesLimitWithZeroTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem
return err
}

if c.deleteMode == deletion.WholeStreamDeletion {
if c.deleteMode != deletion.Disabled {
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func (m mockDeleteRequestsStore) Stop() {
panic("implement me")
}

func (m mockDeleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) {
panic("implement me")
}
MasslessParticle marked this conversation as resolved.
Show resolved Hide resolved

func (m mockDeleteRequestsStore) Name() string {
return ""
}

func TestDeleteRequestsManager_Expired(t *testing.T) {
type resp struct {
isExpired bool
Expand Down
Loading