Skip to content

Commit

Permalink
Custom Retention (#3642)
Browse files Browse the repository at this point in the history
* Playing around with a POC based on the design doc.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Getting series ID for label matchers is now working.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* wip/

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* We're parsing the index label now.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Starting to extract interfaces to make the code testable.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Work in progress will try to add labels to chunk ref iterator

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Iterator for chunks ref with all labels !!!.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Optimize code away.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* with delete into the mix

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Cleaner but not yet working for v10 and v11.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes series cleaner.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hooking into the compactor.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Hooking limit retention config.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Finishing off the marker processor.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Working on sweeper and fixing tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Adding more tests and finding more bugs along the way.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Bug with path once boltdb is closed.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixing more bugs and making the test more robust.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* More test and cleanup getting close.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes more bugs with regard to the period schema.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix a flaky test caused by boltdb still being open.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more metrics.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Adding metrics.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve benchmark.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes issue.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint code.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* more logs.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Save files without using table key

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve logging and ability to use more goroutines.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Removes duplicate metrics since histogram contains total too.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more logs.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a deadlock bug when too many workers are trying to update the mark file.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a deadlock when reading and updating db at the same time.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes default config test of boltdb.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* PR review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Remove the use of assert so that a test does not fail if it's incorrect on the first attempt.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add experimental notice to the flag documentation

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes empty index detection and table deletion.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Do not delete the folder; it's not necessary with an object store.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Better working path cleanup

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* got linted.
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Apr 28, 2021
1 parent 771d33b commit 806d6a5
Show file tree
Hide file tree
Showing 27 changed files with 2,825 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/logql/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestMappingEquivalence(t *testing.T) {
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds, shards, []string{"a", "b", "c", "d"})
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
start = time.Unix(0, 0)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
step = time.Second
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (t *Loki) setupModuleManager() error {
QueryFrontend: {Server, Overrides, TenantConfigs},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs},
TableManager: {Server},
Compactor: {Server},
Compactor: {Server, Overrides},
IngesterQuerier: {Ring},
All: {Querier, Ingester, Distributor, TableManager, Ruler},
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,11 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
}

func (t *Loki) initCompactor() (services.Service, error) {
var err error
t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, prometheus.DefaultRegisterer)
err := t.Cfg.SchemaConfig.Load()
if err != nil {
return nil, err
}
t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/loki/runtime_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loki

import (
"fmt"
"io"

"github.com/cortexproject/cortex/pkg/ring/kv"
Expand All @@ -21,15 +22,26 @@ type runtimeConfigValues struct {
Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`
}

// validate runs Validate on every entry of TenantLimits and returns the first
// failure encountered, wrapped with the ID of the tenant whose override is
// invalid. It returns nil when no entry reports an error.
func (r runtimeConfigValues) validate() error {
	for tenantID, limits := range r.TenantLimits {
		err := limits.Validate()
		if err == nil {
			continue
		}
		return fmt.Errorf("invalid override for tenant %s: %w", tenantID, err)
	}
	return nil
}

func loadRuntimeConfig(r io.Reader) (interface{}, error) {
var overrides = &runtimeConfigValues{}
overrides := &runtimeConfigValues{}

decoder := yaml.NewDecoder(r)
decoder.SetStrict(true)
if err := decoder.Decode(&overrides); err != nil {
return nil, err
}

if err := overrides.validate(); err != nil {
return nil, err
}
return overrides, nil
}

Expand Down
117 changes: 117 additions & 0 deletions pkg/loki/runtime_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package loki

import (
"context"
"flag"
"io"
"io/ioutil"
"strings"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/validation"
)

// Test_LoadRetentionRules loads a runtime configuration with two tenants and
// checks the retention settings exposed through the resulting overrides:
// tenant "1" sets no retention and must report the default period and no
// per-stream rules, while tenant "29" overrides both the global period and
// the per-stream rules (including their parsed label matchers).
func Test_LoadRetentionRules(t *testing.T) {
	// The fixture must keep its nesting: tenant IDs live under `overrides`
	// and each rule is a list item under `retention_stream`.
	overrides := newTestOverrides(t,
		`
overrides:
    "1":
        creation_grace_period: 48h
    "29":
        creation_grace_period: 48h
        ingestion_burst_size_mb: 140
        ingestion_rate_mb: 120
        max_concurrent_tail_requests: 1000
        max_global_streams_per_user: 100000
        max_label_names_per_series: 30
        max_query_parallelism: 256
        split_queries_by_interval: 15m
        retention_period: 1440h
        retention_stream:
            - selector: '{app="foo"}'
              period: 48h
              priority: 10
            - selector: '{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}'
              period: 24h
              priority: 5
`)
	require.Equal(t, 31*24*time.Hour, overrides.RetentionPeriod("1"))    // default
	require.Equal(t, 2*30*24*time.Hour, overrides.RetentionPeriod("29")) // overrides
	require.Equal(t, []validation.StreamRetention(nil), overrides.StreamRetention("1"))
	require.Equal(t, []validation.StreamRetention{
		{Period: 48 * time.Hour, Priority: 10, Selector: `{app="foo"}`, Matchers: []*labels.Matcher{
			labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
		}},
		{Period: 24 * time.Hour, Priority: 5, Selector: `{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}`, Matchers: []*labels.Matcher{
			labels.MustNewMatcher(labels.MatchEqual, "namespace", "bar"),
			labels.MustNewMatcher(labels.MatchRegexp, "cluster", "fo.*|b.+|[1-2]"),
		}},
	}, overrides.StreamRetention("29"))
}

// Test_ValidateRules checks that loadRuntimeConfig rejects invalid per-stream
// retention overrides and reports which tenant is at fault: first a selector
// that is not a valid label matcher, then a retention period below 24h.
func Test_ValidateRules(t *testing.T) {
	_, err := loadRuntimeConfig(strings.NewReader(
		`
overrides:
    "29":
        retention_stream:
            - selector: '{app=foo"}'
              period: 48h
              priority: 10
            - selector: '{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}'
              period: 24h
              priority: 10
`))
	// EqualError fails the test cleanly when err is nil, whereas calling
	// err.Error() directly would panic on a nil error.
	require.EqualError(t, err, "invalid override for tenant 29: invalid labels matchers: parse error at line 1, col 6: syntax error: unexpected IDENTIFIER, expecting STRING")

	_, err = loadRuntimeConfig(strings.NewReader(
		`
overrides:
    "29":
        retention_stream:
            - selector: '{app="foo"}'
              period: 5h
              priority: 10
`))
	require.EqualError(t, err, "invalid override for tenant 29: retention period must be >= 24h was 5h0m0s")
}

// newTestOverrides builds a *validation.Overrides backed by a runtime config
// manager whose loader parses the given yaml string instead of reading a file.
//
// The manager still needs a load path, so an empty temporary file is created
// for it. The manager is started, awaited until running, and stopped again
// before this function returns. The test fails immediately on any setup error.
func newTestOverrides(t *testing.T, yaml string) *validation.Overrides {
	t.Helper()
	f, err := ioutil.TempFile(t.TempDir(), "bar")
	require.NoError(t, err)
	path := f.Name()
	// Only the path is needed; close the handle right away so it is not leaked
	// and cannot get in the way of the temporary directory cleanup.
	require.NoError(t, f.Close())

	// fake loader to load from string instead of file.
	loader := func(_ io.Reader) (interface{}, error) {
		return loadRuntimeConfig(strings.NewReader(yaml))
	}
	cfg := runtimeconfig.ManagerConfig{
		ReloadPeriod: 1 * time.Second,
		Loader:       loader,
		LoadPath:     path,
	}

	// Populate the default limits from their flag defaults (no arguments are
	// parsed) and use them as the base for YAML unmarshalling of overrides.
	flagset := flag.NewFlagSet("", flag.PanicOnError)
	var defaults validation.Limits
	defaults.RegisterFlags(flagset)
	require.NoError(t, flagset.Parse(nil))
	validation.SetDefaultLimitsForYAMLUnmarshalling(defaults)

	runtimeConfig, err := runtimeconfig.NewRuntimeConfigManager(cfg, prometheus.DefaultRegisterer)
	require.NoError(t, err)

	require.NoError(t, runtimeConfig.StartAsync(context.Background()))
	require.NoError(t, runtimeConfig.AwaitRunning(context.Background()))
	defer func() {
		runtimeConfig.StopAsync()
		require.NoError(t, runtimeConfig.AwaitTerminated(context.Background()))
	}()

	overrides, err := validation.NewOverrides(defaults, tenantLimitsFromRuntimeConfig(runtimeConfig))
	require.NoError(t, err)
	return overrides
}
140 changes: 122 additions & 18 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
Expand All @@ -17,17 +18,24 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"

loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
errUtil "github.com/grafana/loki/pkg/util"
)

const delimiter = "/"

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionInterval time.Duration `yaml:"retention_interval"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
}

// RegisterFlags registers flags.
Expand All @@ -36,6 +44,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.")
f.DurationVar(&cfg.RetentionInterval, "boltdb.shipper.compactor.retention-interval", 10*time.Minute, "Interval at which to re-run the retention operation.")
f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
}

func (cfg *Config) IsDefaults() bool {
Expand All @@ -53,11 +65,13 @@ type Compactor struct {

cfg Config
objectClient chunk.ObjectClient
tableMarker *retention.Marker
sweeper *retention.Sweeper

metrics *metrics
}

func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registerer) (*Compactor, error) {
func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
if cfg.IsDefaults() {
return nil, errors.New("Must specify compactor config")
}
Expand All @@ -71,11 +85,24 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe
if err != nil {
return nil, err
}
prefixedClient := util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix)

retentionWorkDir := filepath.Join(cfg.WorkingDirectory, "retention")

sweeper, err := retention.NewSweeper(retentionWorkDir, retention.NewDeleteClient(objectClient), cfg.RetentionDeleteWorkCount, cfg.RetentionDeleteDelay, r)
if err != nil {
return nil, err
}
marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, prefixedClient, retention.NewExpirationChecker(limits), r)
if err != nil {
return nil, err
}
compactor := Compactor{
cfg: cfg,
objectClient: util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix),
objectClient: prefixedClient,
metrics: newMetrics(r),
tableMarker: marker,
sweeper: sweeper,
}

compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
Expand All @@ -84,28 +111,68 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe

func (c *Compactor) loop(ctx context.Context) error {
runCompaction := func() {
err := c.Run(ctx)
err := c.RunCompaction(ctx)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
}
}
runRetention := func() {
err := c.RunRetention(ctx)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to run retention", "err", err)
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
runCompaction()

runCompaction()

ticker := time.NewTicker(c.cfg.CompactionInterval)
defer ticker.Stop()
ticker := time.NewTicker(c.cfg.CompactionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
runCompaction()
case <-ctx.Done():
return nil
for {
select {
case <-ticker.C:
runCompaction()
case <-ctx.Done():
return
}
}
}()
if c.cfg.RetentionEnabled {
wg.Add(2)
go func() {
// starts the chunk sweeper
defer func() {
c.sweeper.Stop()
wg.Done()
}()
c.sweeper.Start()
<-ctx.Done()
}()
go func() {
// start the index marker
defer wg.Done()
ticker := time.NewTicker(c.cfg.RetentionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
runRetention()
case <-ctx.Done():
return
}
}
}()
}

wg.Wait()
return nil
}

func (c *Compactor) Run(ctx context.Context) error {
func (c *Compactor) RunCompaction(ctx context.Context) error {
status := statusSuccess
start := time.Now()

Expand Down Expand Up @@ -152,3 +219,40 @@ func (c *Compactor) Run(ctx context.Context) error {

return nil
}

// RunRetention lists the table directories found at the root of the object
// client and calls tableMarker.MarkForDelete on each of them.
//
// A failure on one table is logged and recorded without preventing the
// remaining tables from being processed; all recorded failures are returned
// together. The run stops early, reporting the context error, once ctx is
// done. When the run ends the operation counter is incremented with the final
// status, and the duration and last-success metrics are updated on success
// only.
func (c *Compactor) RunRetention(ctx context.Context) error {
	status := statusSuccess
	start := time.Now()

	defer func() {
		level.Debug(util_log.Logger).Log("msg", "finished processing retention on all tables", "status", status, "duration", time.Since(start))
		c.metrics.retentionOperationTotal.WithLabelValues(status).Inc()
		if status == statusSuccess {
			c.metrics.retentionOperationDurationSeconds.Set(time.Since(start).Seconds())
			c.metrics.retentionOperationLastSuccess.SetToCurrentTime()
		}
	}()
	level.Debug(util_log.Logger).Log("msg", "starting to process retention on all tables")

	_, dirs, err := c.objectClient.List(ctx, "", delimiter)
	if err != nil {
		status = statusFailure
		return err
	}

	var errs errUtil.MultiError

	for _, dir := range dirs {
		// Stop promptly on shutdown rather than calling the marker for every
		// remaining table with an already-cancelled context.
		if err := ctx.Err(); err != nil {
			errs.Add(err)
			status = statusFailure
			break
		}

		// Listed directories carry a trailing delimiter; the table name does not.
		tableName := strings.TrimSuffix(string(dir), delimiter)
		if err := c.tableMarker.MarkForDelete(ctx, tableName); err != nil {
			level.Error(util_log.Logger).Log("msg", "failed to mark table for deletes", "table", tableName, "err", err)
			errs.Add(err)
			status = statusFailure
		}
	}
	return errs.Err()
}
Loading

0 comments on commit 806d6a5

Please sign in to comment.