Introduce TSDB blocks compactor #1942

Merged (9 commits) on Jan 21, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@ instructions below to upgrade your Postgres.
* [FEATURE] Added readiness probe endpoint `/ready` to queriers. #1934
* [FEATURE] EXPERIMENTAL: Added `/series` API endpoint support with TSDB blocks storage. #1830
* [FEATURE] Added "multi" KV store that can interact with two other KV stores, primary one for all reads and writes, and secondary one, which only receives writes. Primary/secondary store can be modified in runtime via runtime-config mechanism (previously "overrides"). #1749
* [FEATURE] EXPERIMENTAL: Added TSDB blocks `compactor` component, which iterates over users' blocks stored in the bucket and compacts them according to the configured block ranges. #1942
* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled. #1978
* [ENHANCEMENT] Added `password` and `enable_tls` options to redis cache configuration. Enables usage of Microsoft Azure Cache for Redis service. #1923
* [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917
263 changes: 263 additions & 0 deletions pkg/compactor/compactor.go
@@ -0,0 +1,263 @@
package compactor

import (
"context"
"flag"
"fmt"
"path"
"strings"
"sync"
"time"

cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
)

// Config holds the Compactor config.
type Config struct {
BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
DataDir string `yaml:"data_dir"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
CompactionRetries int `yaml:"compaction_retries"`

// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow overriding
// them in tests.
retryMinBackoff time.Duration `yaml:"-"`
retryMaxBackoff time.Duration `yaml:"-"`
}

// RegisterFlags registers the Compactor flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.BlockRanges = cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
cfg.retryMinBackoff = 10 * time.Second
cfg.retryMaxBackoff = time.Minute

f.Var(&cfg.BlockRanges, "compactor.block-ranges", "Comma separated list of compaction ranges expressed in the time duration format")
Contributor: Could you add the default values to the help text?

Contributor Author: There's actually no need. When the --help is generated, the defaults are displayed by reading them from the values already set:

 -compactor.block-ranges value
        Comma separated list of compaction ranges expressed in the time duration format (default 2h0m0s,12h0m0s,24h0m0s)

Contributor: Awesome

f.DurationVar(&cfg.ConsistencyDelay, "compactor.consistency-delay", 30*time.Minute, fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval))
f.IntVar(&cfg.BlockSyncConcurrency, "compactor.block-sync-concurrency", 20, "Number of goroutines to use when syncing block metadata from object storage")
f.StringVar(&cfg.DataDir, "compactor.data-dir", "./data", "Data directory in which to cache blocks and process compactions")
f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
}
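
As discussed in the thread above, the defaults shown by --help are taken from the values already set on the config when the flags are registered. A minimal, illustrative sketch of registering and parsing these flags (the parsed values are examples, not defaults, and the import path is assumed from this PR's package location):

```go
package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/compactor"
)

// Sketch only: register the compactor flags on a fresh FlagSet and parse a
// few example values. Because cfg.BlockRanges is populated before f.Var is
// called in RegisterFlags, PrintDefaults renders it as the default.
func main() {
	var cfg compactor.Config
	fs := flag.NewFlagSet("cortex", flag.ContinueOnError)
	cfg.RegisterFlags(fs)

	fs.PrintDefaults() // shows "(default 2h0m0s,12h0m0s,24h0m0s)" for -compactor.block-ranges

	// Per the flag help text, block ranges are a comma separated list of durations.
	if err := fs.Parse([]string{
		"-compactor.block-ranges=2h,12h,24h",
		"-compactor.compaction-interval=30m",
		"-compactor.data-dir=/data/compactor",
	}); err != nil {
		panic(err)
	}
	fmt.Println(cfg.BlockRanges, cfg.CompactionInterval, cfg.DataDir)
}
```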

// Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
type Compactor struct {
compactorCfg Config
storageCfg cortex_tsdb.Config
logger log.Logger

// Underlying compactor used to compact TSDB blocks.
tsdbCompactor tsdb.Compactor

// Client used to run operations on the bucket storing blocks.
bucketClient objstore.Bucket

// Wait group used to wait until the internal goroutine completes.
runner sync.WaitGroup

// Context used to run compaction and its cancel function to
// safely interrupt it on shutdown.
ctx context.Context
cancelCtx context.CancelFunc

// Metrics.
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionRunsFailed prometheus.Counter
}

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.Config, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
ctx, cancelCtx := context.WithCancel(context.Background())

bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg, "compactor", logger)
if err != nil {
cancelCtx()
return nil, errors.Wrap(err, "failed to create the bucket client")
}

tsdbCompactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMilliseconds(), downsample.NewPool())
if err != nil {
cancelCtx()
return nil, errors.Wrap(err, "failed to create TSDB compactor")
}

return newCompactor(ctx, cancelCtx, compactorCfg, storageCfg, bucketClient, tsdbCompactor, logger, registerer)
}

func newCompactor(
ctx context.Context,
cancelCtx context.CancelFunc,
compactorCfg Config,
storageCfg cortex_tsdb.Config,
bucketClient objstore.Bucket,
tsdbCompactor tsdb.Compactor,
logger log.Logger,
registerer prometheus.Registerer,
) (*Compactor, error) {
c := &Compactor{
compactorCfg: compactorCfg,
storageCfg: storageCfg,
logger: logger,
bucketClient: bucketClient,
tsdbCompactor: tsdbCompactor,
ctx: ctx,
cancelCtx: cancelCtx,
compactionRunsStarted: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_started_total",
Help: "Total number of compaction runs started.",
}),
compactionRunsCompleted: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_completed_total",
Help: "Total number of compaction runs successfully completed.",
}),
compactionRunsFailed: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_failed_total",
Help: "Total number of compaction runs failed.",
}),
}

// Register metrics.
if registerer != nil {
registerer.MustRegister(c.compactionRunsStarted, c.compactionRunsCompleted, c.compactionRunsFailed)
}

// Start the compactor loop.
c.runner.Add(1)
go c.run()

return c, nil
}

// Shutdown stops the compactor and waits until done. This may take some time
// if there's an ongoing compaction.
func (c *Compactor) Shutdown() {
c.cancelCtx()
c.runner.Wait()
}
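
A hedged usage sketch of the lifecycle above (the wiring and zero-value configs are illustrative, not taken from this PR): NewCompactor starts the background compaction loop itself, so a caller only needs to keep the instance around and call Shutdown on termination.

```go
package main

import (
	"os"
	"os/signal"
	"syscall"

	"github.com/cortexproject/cortex/pkg/compactor"
	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative only: a real caller would populate the configs via
// RegisterFlags or YAML before constructing the compactor.
func main() {
	var (
		compactorCfg compactor.Config
		storageCfg   cortex_tsdb.Config
	)
	logger := log.NewLogfmtLogger(os.Stderr)

	c, err := compactor.NewCompactor(compactorCfg, storageCfg, logger, prometheus.DefaultRegisterer)
	if err != nil {
		logger.Log("msg", "failed to create compactor", "err", err)
		os.Exit(1)
	}

	// Block until the process is asked to terminate, then shut down cleanly.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	c.Shutdown() // cancels the context and waits for any in-flight compaction
}
```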

func (c *Compactor) run() {
defer c.runner.Done()

// Run an initial compaction before starting the interval.
c.compactUsersWithRetries(c.ctx)

ticker := time.NewTicker(c.compactorCfg.CompactionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.compactUsersWithRetries(c.ctx)
case <-c.ctx.Done():
return
}
}
}

func (c *Compactor) compactUsersWithRetries(ctx context.Context) {
retries := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: c.compactorCfg.retryMinBackoff,
MaxBackoff: c.compactorCfg.retryMaxBackoff,
MaxRetries: c.compactorCfg.CompactionRetries,
})

c.compactionRunsStarted.Inc()

for retries.Ongoing() {
if success := c.compactUsers(ctx); success {
c.compactionRunsCompleted.Inc()
return
}

retries.Wait()
}

c.compactionRunsFailed.Inc()
}

func (c *Compactor) compactUsers(ctx context.Context) bool {
level.Info(c.logger).Log("msg", "discovering users from bucket")
users, err := c.discoverUsers(ctx)
if err != nil {
level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
return false
}
level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users))

for _, userID := range users {
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if ctx.Err() != nil {
Contributor: This statement looks funny to my eyes. Can this be a select with <-ctx.Done()?

Contributor Author: You mean changing it to something like the following? If yes, what's the real benefit? To me it looks like we're writing the same thing using more lines of code.

select {
case <-c.ctx.Done():
  return c.ctx.Err()
default:
}

Contributor: Yea it just felt more idiomatic to me to write it that way. But I have no qualms leaving it the way it is.

Contributor (@pstibrany, Jan 7, 2020): Personally I would only use the select version if there were more things to select on. ctx.Err() returns a non-nil value exactly when the context is finished, so I think this is fine.

level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", ctx.Err())
return false
}

level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)

if err = c.compactUser(ctx, userID); err != nil {
level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
continue
}

level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID)
}

return true
}
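
To illustrate the point from the review thread above: ctx.Err() is non-nil exactly once the context is done, so the plain check behaves the same as a non-blocking select with a default case. A tiny standalone demonstration (not part of this PR):

```go
package main

import (
	"context"
	"fmt"
)

// Shows that ctx.Err() and a non-blocking select on ctx.Done() agree both
// before and after cancellation.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	check := func() {
		select {
		case <-ctx.Done():
			fmt.Println("select: done, ctx.Err():", ctx.Err())
		default:
			fmt.Println("select: not done, ctx.Err():", ctx.Err())
		}
	}

	check() // select: not done, ctx.Err(): <nil>
	cancel()
	check() // select: done, ctx.Err(): context canceled
}
```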

func (c *Compactor) compactUser(ctx context.Context, userID string) error {
bucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)

syncer, err := compact.NewSyncer(
c.logger,
nil, // TODO(pracucci) we should pass the prometheus registerer, but we would need to inject the user label to each metric, otherwise we have clashing metrics
Contributor: I've found this to be troublesome, since we likely don't want per-user metrics anyway but rather a rollup of all the metrics across users.

Contributor Author: I see your point. I don't have an answer yet. As stated in the PR description, I would prefer to defer it to a subsequent PR, so this doesn't block the compactor.

What's your take? Do you have any idea how to solve this?

Contributor: Oh, definitely don't block this PR for that. I too punted on a solution for the other user-wrapped Thanos components. Just wanted to add my thoughts to the comment.

I'm not sure there can be a good solution for this without changing the Thanos code to only call the register function once.

Contributor Author (@pracucci, Jan 7, 2020):

> Oh definitely don't block this PR for that

👍

> I'm not sure there can be a good solution for this without changing the Thanos code

An option in Thanos would be to expose compact.syncerMetrics and accept an instance of it as input to compact.NewSyncer(), so that we could create it once in Cortex and pass the same syncerMetrics instance to multiple Syncers.

@bwplotka would you consider such a refactoring in Thanos feasible, just to help Cortex? Or do you see a better way to do it?

Contributor: We want to treat any Thanos package as a library, so if the use case of a struct in our package makes sense, then we are ok with it. (:

We can definitely allow passing metrics to the syncer. Also, note that we changed the syncer a bit and introduced block.MetadataFetcher. I assume you use different syncers because of many buckets?

bucket,
c.compactorCfg.ConsistencyDelay,
c.compactorCfg.BlockSyncConcurrency,
false, // Do not accept malformed indexes
true, // Enable vertical compaction
[]*relabel.Config{})
if err != nil {
return errors.Wrap(err, "failed to create syncer")
}

compactor, err := compact.NewBucketCompactor(
c.logger,
syncer,
c.tsdbCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
bucket,
// No compaction concurrency. Due to how Cortex works we don't
// expect to have multiple block groups per tenant, so setting
// a value higher than 1 would be useless.
1,
)
if err != nil {
return errors.Wrap(err, "failed to create bucket compactor")
}

return compactor.Compact(ctx)
}
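
On the nil registerer TODO above: one direction raised in the thread is labelling per-user metrics rather than rolling them up. A hedged sketch of that option using client_golang's registerer wrapping (a hypothetical helper, not something this PR adds):

```go
package compactor

import (
	"github.com/prometheus/client_golang/prometheus"
)

// perUserRegisterer is a hypothetical helper: it wraps the root registerer so
// that metrics registered by a per-tenant syncer carry a "user" label, which
// avoids duplicate registration across tenants. Note the reviewers leaned
// towards a rollup of metrics across users instead, which this does not give.
func perUserRegisterer(reg prometheus.Registerer, userID string) prometheus.Registerer {
	if reg == nil {
		return nil
	}
	return prometheus.WrapRegistererWith(prometheus.Labels{"user": userID}, reg)
}
```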

func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
var users []string

err := c.bucketClient.Iter(ctx, "", func(entry string) error {
users = append(users, strings.TrimSuffix(entry, "/"))
return nil
})

return users, err
}
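
A possible test sketch for discoverUsers (assuming the vendored Thanos version exposes objstore.NewInMemBucket, and that each tenant's blocks live under a top-level <userID>/ prefix in the bucket):

```go
package compactor

import (
	"bytes"
	"context"
	"testing"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// Sketch only: uploading one object per tenant prefix should make
// discoverUsers return both tenant IDs with the trailing "/" trimmed.
func TestDiscoverUsers(t *testing.T) {
	ctx := context.Background()
	bkt := objstore.NewInMemBucket()

	for _, obj := range []string{"user-1/01BLOCK/meta.json", "user-2/01BLOCK/meta.json"} {
		if err := bkt.Upload(ctx, obj, bytes.NewReader([]byte("{}"))); err != nil {
			t.Fatal(err)
		}
	}

	c := &Compactor{bucketClient: bkt}
	users, err := c.discoverUsers(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if len(users) != 2 {
		t.Fatalf("expected 2 users, got %v", users)
	}
}
```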