diff --git a/CHANGELOG.md b/CHANGELOG.md index 00ef055277..c6b7d0b884 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ instructions below to upgrade your Postgres. * [FEATURE] Added readiness probe endpoint`/ready` to queriers. #1934 * [FEATURE] EXPERIMENTAL: Added `/series` API endpoint support with TSDB blocks storage. #1830 * [FEATURE] Added "multi" KV store that can interact with two other KV stores, primary one for all reads and writes, and secondary one, which only receives writes. Primary/secondary store can be modified in runtime via runtime-config mechanism (previously "overrides"). #1749 +* [FEATURE] EXPERIMENTAL: Added TSDB blocks `compactor` component, which iterates over users blocks stored in the bucket and compact them according to the configured block ranges. #1942 * [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled. #1978 * [ENHANCEMENT] Added `password` and `enable_tls` options to redis cache configuration. Enables usage of Microsoft Azure Cache for Redis service. #1923 * [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917 diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go new file mode 100644 index 0000000000..7c87f9e099 --- /dev/null +++ b/pkg/compactor/compactor.go @@ -0,0 +1,263 @@ +package compactor + +import ( + "context" + "flag" + "fmt" + "path" + "strings" + "sync" + "time" + + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/relabel" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/objstore" +) + +// Config holds the Compactor config. +type Config struct { + BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + ConsistencyDelay time.Duration `yaml:"consistency_delay"` + DataDir string `yaml:"data_dir"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + CompactionRetries int `yaml:"compaction_retries"` + + // No need to add options to customize the retry backoff, + // given the defaults should be fine, but allow to override + // it in tests. + retryMinBackoff time.Duration `yaml:"-"` + retryMaxBackoff time.Duration `yaml:"-"` +} + +// RegisterFlags registers the Compactor flags. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.BlockRanges = cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour} + cfg.retryMinBackoff = 10 * time.Second + cfg.retryMaxBackoff = time.Minute + + f.Var(&cfg.BlockRanges, "compactor.block-ranges", "Comma separated list of compaction ranges expressed in the time duration format") + f.DurationVar(&cfg.ConsistencyDelay, "compactor.consistency-delay", 30*time.Minute, fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. 
Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)) + f.IntVar(&cfg.BlockSyncConcurrency, "compactor.block-sync-concurrency", 20, "Number of goroutines to use when syncing block metadata from object storage") + f.StringVar(&cfg.DataDir, "compactor.data-dir", "./data", "Data directory in which to cache blocks and process compactions") + f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs") + f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval") +} + +// Compactor is a multi-tenant TSDB blocks compactor based on Thanos. +type Compactor struct { + compactorCfg Config + storageCfg cortex_tsdb.Config + logger log.Logger + + // Underlying compactor used to compact TSDB blocks. + tsdbCompactor tsdb.Compactor + + // Client used to run operations on the bucket storing blocks. + bucketClient objstore.Bucket + + // Wait group used to wait until the internal go routine completes. + runner sync.WaitGroup + + // Context used to run compaction and its cancel function to + // safely interrupt it on shutdown. + ctx context.Context + cancelCtx context.CancelFunc + + // Metrics. + compactionRunsStarted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionRunsFailed prometheus.Counter +} + +// NewCompactor makes a new Compactor. +func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.Config, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) { + ctx, cancelCtx := context.WithCancel(context.Background()) + + bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg, "compactor", logger) + if err != nil { + cancelCtx() + return nil, errors.Wrap(err, "failed to create the bucket client") + } + + tsdbCompactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMilliseconds(), downsample.NewPool()) + if err != nil { + cancelCtx() + return nil, errors.Wrap(err, "failed to create TSDB compactor") + } + + return newCompactor(ctx, cancelCtx, compactorCfg, storageCfg, bucketClient, tsdbCompactor, logger, registerer) +} + +func newCompactor( + ctx context.Context, + cancelCtx context.CancelFunc, + compactorCfg Config, + storageCfg cortex_tsdb.Config, + bucketClient objstore.Bucket, + tsdbCompactor tsdb.Compactor, + logger log.Logger, + registerer prometheus.Registerer, +) (*Compactor, error) { + c := &Compactor{ + compactorCfg: compactorCfg, + storageCfg: storageCfg, + logger: logger, + bucketClient: bucketClient, + tsdbCompactor: tsdbCompactor, + ctx: ctx, + cancelCtx: cancelCtx, + compactionRunsStarted: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_runs_started_total", + Help: "Total number of compaction runs started.", + }), + compactionRunsCompleted: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_runs_completed_total", + Help: "Total number of compaction runs successfully completed.", + }), + compactionRunsFailed: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_runs_failed_total", + Help: "Total number of compaction runs failed.", + }), + } + + // Register metrics. + if registerer != nil { + registerer.MustRegister(c.compactionRunsStarted, c.compactionRunsCompleted, c.compactionRunsFailed) + } + + // Start the compactor loop. 
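+	// run() performs an initial compaction as soon as the compactor starts and
+	// then repeats it every CompactionInterval. Shutdown() cancels the shared
+	// context and waits on this WaitGroup, so the background goroutine always
+	// exits before Shutdown() returns.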
+ c.runner.Add(1) + go c.run() + + return c, nil +} + +// Shutdown the compactor and waits until done. This may take some time +// if there's a on-going compaction. +func (c *Compactor) Shutdown() { + c.cancelCtx() + c.runner.Wait() +} + +func (c *Compactor) run() { + defer c.runner.Done() + + // Run an initial compaction before starting the interval. + c.compactUsersWithRetries(c.ctx) + + ticker := time.NewTicker(c.compactorCfg.CompactionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.compactUsersWithRetries(c.ctx) + case <-c.ctx.Done(): + return + } + } +} + +func (c *Compactor) compactUsersWithRetries(ctx context.Context) { + retries := util.NewBackoff(ctx, util.BackoffConfig{ + MinBackoff: c.compactorCfg.retryMinBackoff, + MaxBackoff: c.compactorCfg.retryMaxBackoff, + MaxRetries: c.compactorCfg.CompactionRetries, + }) + + c.compactionRunsStarted.Inc() + + for retries.Ongoing() { + if success := c.compactUsers(ctx); success { + c.compactionRunsCompleted.Inc() + return + } + + retries.Wait() + } + + c.compactionRunsFailed.Inc() +} + +func (c *Compactor) compactUsers(ctx context.Context) bool { + level.Info(c.logger).Log("msg", "discovering users from bucket") + users, err := c.discoverUsers(ctx) + if err != nil { + level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err) + return false + } + level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users)) + + for _, userID := range users { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if ctx.Err() != nil { + level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", err) + return false + } + + level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID) + + if err = c.compactUser(ctx, userID); err != nil { + level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err) + continue + } + + level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID) + } + + return true +} + +func (c *Compactor) compactUser(ctx context.Context, userID string) error { + bucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient) + + syncer, err := compact.NewSyncer( + c.logger, + nil, // TODO(pracucci) we should pass the prometheus registerer, but we would need to inject the user label to each metric, otherwise we have clashing metrics + bucket, + c.compactorCfg.ConsistencyDelay, + c.compactorCfg.BlockSyncConcurrency, + false, // Do not accept malformed indexes + true, // Enable vertical compaction + []*relabel.Config{}) + if err != nil { + return errors.Wrap(err, "failed to create syncer") + } + + compactor, err := compact.NewBucketCompactor( + c.logger, + syncer, + c.tsdbCompactor, + path.Join(c.compactorCfg.DataDir, "compact"), + bucket, + // No compaction concurrency. Due to how Cortex works we don't + // expect to have multiple block groups per tenant, so setting + // a value higher than 1 would be useless. 
+ 1, + ) + if err != nil { + return errors.Wrap(err, "failed to create bucket compactor") + } + + return compactor.Compact(ctx) +} + +func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) { + var users []string + + err := c.bucketClient.Iter(ctx, "", func(entry string) error { + users = append(users, strings.TrimSuffix(entry, "/")) + return nil + }) + + return users, err +} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go new file mode 100644 index 0000000000..74abf5100a --- /dev/null +++ b/pkg/compactor/compactor_test.go @@ -0,0 +1,262 @@ +package compactor + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "flag" + "strings" + "testing" + "time" + + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/flagext" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestConfig_ShouldSupportYamlConfig(t *testing.T) { + yamlCfg := ` +block_ranges: [2h, 48h] +consistency_delay: 1h +block_sync_concurrency: 123 +data_dir: /tmp +compaction_interval: 15m +compaction_retries: 123 +` + + cfg := Config{} + flagext.DefaultValues(&cfg) + assert.NoError(t, yaml.Unmarshal([]byte(yamlCfg), &cfg)) + assert.Equal(t, cortex_tsdb.DurationList{2 * time.Hour, 48 * time.Hour}, cfg.BlockRanges) + assert.Equal(t, time.Hour, cfg.ConsistencyDelay) + assert.Equal(t, 123, cfg.BlockSyncConcurrency) + assert.Equal(t, "/tmp", cfg.DataDir) + assert.Equal(t, 15*time.Minute, cfg.CompactionInterval) + assert.Equal(t, 123, cfg.CompactionRetries) +} + +func TestConfig_ShouldSupportCliFlags(t *testing.T) { + fs := flag.NewFlagSet("", flag.PanicOnError) + cfg := Config{} + cfg.RegisterFlags(fs) + require.NoError(t, fs.Parse([]string{ + "-compactor.block-ranges=2h,48h", + "-compactor.consistency-delay=1h", + "-compactor.block-sync-concurrency=123", + "-compactor.data-dir=/tmp", + "-compactor.compaction-interval=15m", + "-compactor.compaction-retries=123", + })) + + assert.Equal(t, cortex_tsdb.DurationList{2 * time.Hour, 48 * time.Hour}, cfg.BlockRanges) + assert.Equal(t, time.Hour, cfg.ConsistencyDelay) + assert.Equal(t, 123, cfg.BlockSyncConcurrency) + assert.Equal(t, "/tmp", cfg.DataDir) + assert.Equal(t, 15*time.Minute, cfg.CompactionInterval) + assert.Equal(t, 123, cfg.CompactionRetries) +} + +func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { + t.Parallel() + + c, bucketClient, _, logs, registry := prepare(t) + + // No user blocks stored in the bucket. + bucketClient.MockIter("", []string{}, nil) + + // Wait until a run has completed. + cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + + c.Shutdown() + + assert.Equal(t, []string{ + `level=info msg="discovering users from bucket"`, + `level=info msg="discovered users from bucket" users=0`, + }, strings.Split(strings.TrimSpace(logs.String()), "\n")) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # TYPE cortex_compactor_runs_started_total counter + # HELP cortex_compactor_runs_started_total Total number of compaction runs started. 
+ cortex_compactor_runs_started_total 1 + + # TYPE cortex_compactor_runs_completed_total counter + # HELP cortex_compactor_runs_completed_total Total number of compaction runs successfully completed. + cortex_compactor_runs_completed_total 1 + + # TYPE cortex_compactor_runs_failed_total counter + # HELP cortex_compactor_runs_failed_total Total number of compaction runs failed. + cortex_compactor_runs_failed_total 0 + `))) +} + +func TestCompactor_ShouldRetryOnFailureWhileDiscoveringUsersFromBucket(t *testing.T) { + t.Parallel() + + c, bucketClient, _, logs, registry := prepare(t) + + // Fail to iterate over the bucket while discovering users. + bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) + + // Wait until all retry attempts have completed. + cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsFailed) + }) + + c.Shutdown() + + // Ensure the bucket iteration has been retried the configured number of times. + bucketClient.AssertNumberOfCalls(t, "Iter", 3) + + assert.Equal(t, []string{ + `level=info msg="discovering users from bucket"`, + `level=error msg="failed to discover users from bucket" err="failed to iterate the bucket"`, + `level=info msg="discovering users from bucket"`, + `level=error msg="failed to discover users from bucket" err="failed to iterate the bucket"`, + `level=info msg="discovering users from bucket"`, + `level=error msg="failed to discover users from bucket" err="failed to iterate the bucket"`, + }, strings.Split(strings.TrimSpace(logs.String()), "\n")) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # TYPE cortex_compactor_runs_started_total counter + # HELP cortex_compactor_runs_started_total Total number of compaction runs started. + cortex_compactor_runs_started_total 1 + + # TYPE cortex_compactor_runs_completed_total counter + # HELP cortex_compactor_runs_completed_total Total number of compaction runs successfully completed. + cortex_compactor_runs_completed_total 0 + + # TYPE cortex_compactor_runs_failed_total counter + # HELP cortex_compactor_runs_failed_total Total number of compaction runs failed. + cortex_compactor_runs_failed_total 1 + `))) +} + +func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { + t.Parallel() + + c, bucketClient, tsdbCompactor, logs, registry := prepare(t) + + // Mock the bucket to contain two users, each one with one block. + bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) + bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) + + // Mock the compactor as if there's no compaction to do, + // in order to simplify tests (all in all, we just want to + // test our logic and not TSDB compactor which we expect to + // be already tested). + tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil) + + // Wait until a run has completed. + cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + + c.Shutdown() + + // Ensure a plan has been executed for the blocks of each user. 
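+	// Plan() is mocked to return an empty plan, so Compact()/Write() are never
+	// reached: the test exercises only the per-user iteration and metadata sync.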
+ tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2) + + assert.Equal(t, []string{ + `level=info msg="discovering users from bucket"`, + `level=info msg="discovered users from bucket" users=2`, + `level=info msg="starting compaction of user blocks" user=user-1`, + `level=info msg="start sync of metas"`, + `level=debug msg="download meta" block=01DTVP434PA9VFXSW2JKB3392D`, + `level=info msg="start of GC"`, + `level=info msg="start of compaction"`, + `level=info msg="successfully compacted user blocks" user=user-1`, + `level=info msg="starting compaction of user blocks" user=user-2`, + `level=info msg="start sync of metas"`, + `level=debug msg="download meta" block=01DTW0ZCPDDNV4BV83Q2SV4QAZ`, + `level=info msg="start of GC"`, + `level=info msg="start of compaction"`, + `level=info msg="successfully compacted user blocks" user=user-2`, + }, strings.Split(strings.TrimSpace(logs.String()), "\n")) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # TYPE cortex_compactor_runs_started_total counter + # HELP cortex_compactor_runs_started_total Total number of compaction runs started. + cortex_compactor_runs_started_total 1 + + # TYPE cortex_compactor_runs_completed_total counter + # HELP cortex_compactor_runs_completed_total Total number of compaction runs successfully completed. + cortex_compactor_runs_completed_total 1 + + # TYPE cortex_compactor_runs_failed_total counter + # HELP cortex_compactor_runs_failed_total Total number of compaction runs failed. + cortex_compactor_runs_failed_total 0 + `))) +} + +func prepare(t *testing.T) (*Compactor, *cortex_tsdb.BucketClientMock, *tsdbCompactorMock, *bytes.Buffer, prometheus.Gatherer) { + compactorCfg := Config{} + storageCfg := cortex_tsdb.Config{} + flagext.DefaultValues(&compactorCfg, &storageCfg) + compactorCfg.retryMinBackoff = 0 + compactorCfg.retryMaxBackoff = 0 + + bucketClient := &cortex_tsdb.BucketClientMock{} + tsdbCompactor := &tsdbCompactorMock{} + logs := &bytes.Buffer{} + logger := log.NewLogfmtLogger(logs) + registry := prometheus.NewRegistry() + + ctx, cancelCtx := context.WithCancel(context.Background()) + c, err := newCompactor(ctx, cancelCtx, compactorCfg, storageCfg, bucketClient, tsdbCompactor, logger, registry) + require.NoError(t, err) + + return c, bucketClient, tsdbCompactor, logs, registry +} + +type tsdbCompactorMock struct { + mock.Mock +} + +func (m *tsdbCompactorMock) Plan(dir string) ([]string, error) { + args := m.Called(dir) + return args.Get(0).([]string), args.Error(1) +} + +func (m *tsdbCompactorMock) Write(dest string, b tsdb.BlockReader, mint, maxt int64, parent *tsdb.BlockMeta) (ulid.ULID, error) { + args := m.Called(dest, b, mint, maxt, parent) + return args.Get(0).(ulid.ULID), args.Error(1) +} + +func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Block) (ulid.ULID, error) { + args := m.Called(dest, dirs, open) + return args.Get(0).(ulid.ULID), args.Error(1) +} + +func mockBlockMetaJSON(id string) string { + meta := tsdb.BlockMeta{ + ULID: ulid.MustParse(id), + MinTime: 1574776800000, + MaxTime: 1574784000000, + Compaction: tsdb.BlockMetaCompaction{ + Level: 1, + Sources: []ulid.ULID{ulid.MustParse(id)}, + }, + } + + content, err := json.Marshal(meta) + if err != nil { + panic("failed to marshal mocked block meta") + } + + return string(content) +} diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index f2d934d0be..5373cc65f8 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -18,6 +18,7 @@ import ( 
"github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/chunk/storage" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/cortexproject/cortex/pkg/compactor" "github.com/cortexproject/cortex/pkg/configs/api" config_client "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/configs/db" @@ -75,6 +76,7 @@ type Config struct { TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"` Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. TSDB tsdb.Config `yaml:"tsdb" doc:"hidden"` + Compactor compactor.Config `yaml:"compactor,omitempty" doc:"hidden"` Ruler ruler.Config `yaml:"ruler,omitempty"` ConfigDB db.Config `yaml:"configdb,omitempty"` @@ -109,6 +111,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.TableManager.RegisterFlags(f) c.Encoding.RegisterFlags(f) c.TSDB.RegisterFlags(f) + c.Compactor.RegisterFlags(f) c.Ruler.RegisterFlags(f) c.ConfigDB.RegisterFlags(f) @@ -165,6 +168,7 @@ type Cortex struct { configAPI *api.API configDB db.DB alertmanager *alertmanager.MultitenantAlertmanager + compactor *compactor.Compactor } // New makes a new Cortex. diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 27675823b0..271e962696 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/alertmanager" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/compactor" "github.com/cortexproject/cortex/pkg/configs/api" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" @@ -53,6 +54,7 @@ const ( Ruler Configs AlertManager + Compactor All ) @@ -84,6 +86,8 @@ func (m moduleName) String() string { return "configs" case AlertManager: return "alertmanager" + case Compactor: + return "compactor" case All: return "all" default: @@ -129,6 +133,9 @@ func (m *moduleName) Set(s string) error { case "alertmanager": *m = AlertManager return nil + case "compactor": + *m = Compactor + return nil case "all": *m = All return nil @@ -458,6 +465,16 @@ func (t *Cortex) stopAlertmanager() error { return nil } +func (t *Cortex) initCompactor(cfg *Config) (err error) { + t.compactor, err = compactor.NewCompactor(cfg.Compactor, cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) + return err +} + +func (t *Cortex) stopCompactor() error { + t.compactor.Shutdown() + return nil +} + type module struct { deps []moduleName init func(t *Cortex, cfg *Config) error @@ -539,6 +556,12 @@ var modules = map[moduleName]module{ stop: (*Cortex).stopAlertmanager, }, + Compactor: { + deps: []moduleName{Server}, + init: (*Cortex).initCompactor, + stop: (*Cortex).stopCompactor, + }, + All: { deps: []moduleName{Querier, Ingester, Distributor, TableManager}, }, diff --git a/pkg/ingester/bucket.go b/pkg/ingester/bucket.go deleted file mode 100644 index cd1dfbe40a..0000000000 --- a/pkg/ingester/bucket.go +++ /dev/null @@ -1,73 +0,0 @@ -package ingester - -import ( - "fmt" - "io" - "strings" - - "github.com/thanos-io/thanos/pkg/objstore" - "golang.org/x/net/context" -) - -// Bucket is a wrapper around a objstore.Bucket that prepends writes with a userID -type Bucket struct { - UserID string - Bucket objstore.Bucket -} - -func (b *Bucket) fullName(name string) string { - return fmt.Sprintf("%s/%s", b.UserID, name) -} - -// Close implements io.Closer -func (b *Bucket) Close() error { return b.Bucket.Close() 
} - -// Upload the contents of the reader as an object into the bucket. -func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { - return b.Bucket.Upload(ctx, b.fullName(name), r) -} - -// Delete removes the object with the given name. -func (b *Bucket) Delete(ctx context.Context, name string) error { - return b.Bucket.Delete(ctx, b.fullName(name)) -} - -// Name returns the bucket name for the provider. -func (b *Bucket) Name() string { return b.Bucket.Name() } - -// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error { - return b.Bucket.Iter(ctx, b.fullName(dir), func(s string) error { - /* - Since all objects are prefixed with the userID we need to strip the userID - upon passing to the processing function - */ - return f(strings.Join(strings.Split(s, "/")[1:], "/")) - }) -} - -// Get returns a reader for the given object name. -func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return b.Bucket.Get(ctx, b.fullName(name)) -} - -// GetRange returns a new range reader for the given object name and range. -func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - return b.Bucket.GetRange(ctx, b.fullName(name), off, length) -} - -// Exists checks if the given object exists in the bucket. -func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { - return b.Bucket.Exists(ctx, b.fullName(name)) -} - -// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. -func (b *Bucket) IsObjNotFoundErr(err error) bool { - return b.Bucket.IsObjNotFoundErr(err) -} - -// ObjectSize returns the size of the specified object. 
-func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - return b.Bucket.ObjectSize(ctx, b.fullName(name)) -} diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index ada51bec5a..309a2e2ce2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -53,7 +53,7 @@ type TSDBState struct { func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) { bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "cortex", util.Logger) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create the bucket client") } i := &Ingester{ @@ -426,7 +426,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) { // Create a new user database db, err := tsdb.Open(udir, util.Logger, tsdbPromReg, &tsdb.Options{ RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond), - BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(), + BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMilliseconds(), NoLockfile: true, }) if err != nil { @@ -445,7 +445,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) { // Create a new shipper for this database if i.cfg.TSDBConfig.ShipInterval > 0 { - s := shipper.New(util.Logger, tsdbPromReg, udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource) + s := shipper.New(util.Logger, tsdbPromReg, udir, cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), func() labels.Labels { return l }, metadata.ReceiveSource) i.done.Add(1) go func() { defer i.done.Done() diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 898b44cebd..d2efba4669 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -337,12 +336,6 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) level.Info(u.logger).Log("msg", "creating user bucket store", "user", userID) - // Bucket with the user wrapper - userBkt := &ingester.Bucket{ - UserID: userID, - Bucket: u.bucket, - } - reg := prometheus.NewRegistry() indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes maxItemSizeBytes := indexCacheSizeBytes / 2 @@ -357,7 +350,7 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) bs, err = store.NewBucketStore( u.logger, reg, - userBkt, + tsdb.NewUserBucketClient(userID, u.bucket), filepath.Join(u.cfg.BucketStore.SyncDir, userID), indexCache, uint64(u.cfg.BucketStore.MaxChunkPoolBytes), diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 55d3f96c7d..7516085d61 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -48,31 +48,38 @@ type Config struct { // DurationList is the block ranges for a tsdb type DurationList []time.Duration -// String implements the flag.Var interface -func (d DurationList) String() string { return "RangeList is the block ranges for a tsdb" } - -// Set implements the flag.Var interface -func (d DurationList) Set(s string) error { - blocks := strings.Split(s, ",") - d = make([]time.Duration, 0, len(blocks)) // flag.Parse may be called twice, so overwrite instead of append - for _, blk := range blocks { - t, err := time.ParseDuration(blk) +// String implements the flag.Value 
interface +func (d *DurationList) String() string { + values := make([]string, 0, len(*d)) + for _, v := range *d { + values = append(values, v.String()) + } + + return strings.Join(values, ",") +} + +// Set implements the flag.Value interface +func (d *DurationList) Set(s string) error { + values := strings.Split(s, ",") + *d = make([]time.Duration, 0, len(values)) // flag.Parse may be called twice, so overwrite instead of append + for _, v := range values { + t, err := time.ParseDuration(v) if err != nil { return err } - d = append(d, t) + *d = append(*d, t) } return nil } -// ToMillisecondRanges returns the duration list in milliseconds -func (d DurationList) ToMillisecondRanges() []int64 { - ranges := make([]int64, 0, len(d)) - for _, t := range d { - ranges = append(ranges, int64(t/time.Millisecond)) +// ToMilliseconds returns the duration list in milliseconds +func (d *DurationList) ToMilliseconds() []int64 { + values := make([]int64, 0, len(*d)) + for _, t := range *d { + values = append(values, t.Milliseconds()) } - return ranges + return values } // RegisterFlags registers the TSDB flags @@ -86,7 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } f.StringVar(&cfg.Dir, "experimental.tsdb.dir", "tsdb", "directory to place all TSDB's into") - f.Var(cfg.BlockRanges, "experimental.tsdb.block-ranges-period", "comma separated list of TSDB block ranges in time.Duration format") + f.Var(&cfg.BlockRanges, "experimental.tsdb.block-ranges-period", "comma separated list of TSDB block ranges in time.Duration format") f.DurationVar(&cfg.Retention, "experimental.tsdb.retention-period", 6*time.Hour, "TSDB block retention") f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 30*time.Second, "the frequency at which tsdb blocks are scanned for shipping. 0 means shipping is disabled.") f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use") diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 5aeeeba3a6..3d0edd74aa 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -86,7 +86,7 @@ func TestConfig_DurationList(t *testing.T) { t.Run(name, func(t *testing.T) { testdata.f(&testdata.cfg) - assert.Equal(t, testdata.expectedRanges, testdata.cfg.BlockRanges.ToMillisecondRanges()) + assert.Equal(t, testdata.expectedRanges, testdata.cfg.BlockRanges.ToMilliseconds()) }) } } diff --git a/pkg/storage/tsdb/user_bucket_client.go b/pkg/storage/tsdb/user_bucket_client.go new file mode 100644 index 0000000000..48fee1e2c2 --- /dev/null +++ b/pkg/storage/tsdb/user_bucket_client.go @@ -0,0 +1,81 @@ +package tsdb + +import ( + "fmt" + "io" + "strings" + + "github.com/thanos-io/thanos/pkg/objstore" + "golang.org/x/net/context" +) + +// UserBucketClient is a wrapper around a objstore.Bucket that prepends writes with a userID +type UserBucketClient struct { + userID string + bucket objstore.Bucket +} + +// NewUserBucketClient makes a new UserBucketClient. +func NewUserBucketClient(userID string, bucket objstore.Bucket) *UserBucketClient { + return &UserBucketClient{ + userID: userID, + bucket: bucket, + } +} + +func (b *UserBucketClient) fullName(name string) string { + return fmt.Sprintf("%s/%s", b.userID, name) +} + +// Close implements io.Closer +func (b *UserBucketClient) Close() error { return b.bucket.Close() } + +// Upload the contents of the reader as an object into the bucket. 
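+// For example, with userID "user-1", Upload(ctx, "01ABC/meta.json", r) stores the
+// object under the key "user-1/01ABC/meta.json" in the wrapped bucket.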
+func (b *UserBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { + return b.bucket.Upload(ctx, b.fullName(name), r) +} + +// Delete removes the object with the given name. +func (b *UserBucketClient) Delete(ctx context.Context, name string) error { + return b.bucket.Delete(ctx, b.fullName(name)) +} + +// Name returns the bucket name for the provider. +func (b *UserBucketClient) Name() string { return b.bucket.Name() } + +// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *UserBucketClient) Iter(ctx context.Context, dir string, f func(string) error) error { + return b.bucket.Iter(ctx, b.fullName(dir), func(s string) error { + /* + Since all objects are prefixed with the userID we need to strip the userID + upon passing to the processing function + */ + return f(strings.Join(strings.Split(s, "/")[1:], "/")) + }) +} + +// Get returns a reader for the given object name. +func (b *UserBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.bucket.Get(ctx, b.fullName(name)) +} + +// GetRange returns a new range reader for the given object name and range. +func (b *UserBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + return b.bucket.GetRange(ctx, b.fullName(name), off, length) +} + +// Exists checks if the given object exists in the bucket. +func (b *UserBucketClient) Exists(ctx context.Context, name string) (bool, error) { + return b.bucket.Exists(ctx, b.fullName(name)) +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. +func (b *UserBucketClient) IsObjNotFoundErr(err error) bool { + return b.bucket.IsObjNotFoundErr(err) +} + +// ObjectSize returns the size of the specified object. +func (b *UserBucketClient) ObjectSize(ctx context.Context, name string) (uint64, error) { + return b.bucket.ObjectSize(ctx, b.fullName(name)) +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go new file mode 100644 index 0000000000..b853ff5be1 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go @@ -0,0 +1,1165 @@ +package compact + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "sort" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + "github.com/prometheus/prometheus/tsdb" + terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/objstore" +) + +type ResolutionLevel int64 + +const ( + ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0) + ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) + ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) + + MinimumAgeForRemoval = time.Duration(30 * time.Minute) +) + +var blockTooFreshSentinelError = errors.New("Block too fresh") + +// Syncer syncronizes block metas from a bucket into a local directory. +// It sorts them into compaction groups based on equal label sets. 
+type Syncer struct { + logger log.Logger + reg prometheus.Registerer + bkt objstore.Bucket + consistencyDelay time.Duration + mtx sync.Mutex + blocks map[ulid.ULID]*metadata.Meta + blocksMtx sync.Mutex + blockSyncConcurrency int + metrics *syncerMetrics + acceptMalformedIndex bool + enableVerticalCompaction bool + relabelConfig []*relabel.Config +} + +type syncerMetrics struct { + syncMetas prometheus.Counter + syncMetaFailures prometheus.Counter + syncMetaDuration prometheus.Histogram + garbageCollectedBlocks prometheus.Counter + garbageCollections prometheus.Counter + garbageCollectionFailures prometheus.Counter + garbageCollectionDuration prometheus.Histogram + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec +} + +func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { + var m syncerMetrics + + m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_sync_meta_total", + Help: "Total number of sync meta operations.", + }) + m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_sync_meta_failures_total", + Help: "Total number of failed sync meta operations.", + }) + m.syncMetaDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_compact_sync_meta_duration_seconds", + Help: "Time it took to sync meta files.", + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, + }) + + m.garbageCollectedBlocks = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_garbage_collected_blocks_total", + Help: "Total number of deleted blocks by compactor.", + }) + m.garbageCollections = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_garbage_collection_total", + Help: "Total number of garbage collection operations.", + }) + m.garbageCollectionFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_garbage_collection_failures_total", + Help: "Total number of failed garbage collection operations.", + }) + m.garbageCollectionDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_compact_garbage_collection_duration_seconds", + Help: "Time it took to perform garbage collection iteration.", + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, + }) + + m.compactions = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block.", + }, []string{"group"}) + m.compactionRunsStarted = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compaction_runs_started_total", + Help: "Total number of group compaction attempts.", + }, []string{"group"}) + m.compactionRunsCompleted = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compaction_runs_completed_total", + Help: "Total number of group completed compaction runs. 
This also includes compactor group runs that resulted with no compaction.", + }, []string{"group"}) + m.compactionFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compactions_failures_total", + Help: "Total number of failed group compactions.", + }, []string{"group"}) + m.verticalCompactions = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_vertical_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", + }, []string{"group"}) + + if reg != nil { + reg.MustRegister( + m.syncMetas, + m.syncMetaFailures, + m.syncMetaDuration, + m.garbageCollectedBlocks, + m.garbageCollections, + m.garbageCollectionFailures, + m.garbageCollectionDuration, + m.compactions, + m.compactionRunsStarted, + m.compactionRunsCompleted, + m.compactionFailures, + m.verticalCompactions, + ) + } + return &m +} + +// NewSyncer returns a new Syncer for the given Bucket and directory. +// Blocks must be at least as old as the sync delay for being considered. +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool, relabelConfig []*relabel.Config) (*Syncer, error) { + if logger == nil { + logger = log.NewNopLogger() + } + return &Syncer{ + logger: logger, + reg: reg, + consistencyDelay: consistencyDelay, + blocks: map[ulid.ULID]*metadata.Meta{}, + bkt: bkt, + metrics: newSyncerMetrics(reg), + blockSyncConcurrency: blockSyncConcurrency, + acceptMalformedIndex: acceptMalformedIndex, + relabelConfig: relabelConfig, + // The syncer offers an option to enable vertical compaction, even if it's + // not currently used by Thanos, because the compactor is also used by Cortex + // which needs vertical compaction. + enableVerticalCompaction: enableVerticalCompaction, + }, nil +} + +// SyncMetas synchronizes all meta files from blocks in the bucket into +// the memory. It removes any partial blocks older than the max of +// consistencyDelay and MinimumAgeForRemoval from the bucket. +func (c *Syncer) SyncMetas(ctx context.Context) error { + c.mtx.Lock() + defer c.mtx.Unlock() + + begin := time.Now() + + err := c.syncMetas(ctx) + if err != nil { + c.metrics.syncMetaFailures.Inc() + } + c.metrics.syncMetas.Inc() + c.metrics.syncMetaDuration.Observe(time.Since(begin).Seconds()) + return err +} + +// UntilNextDownsampling calculates how long it will take until the next downsampling operation. +// Returns an error if there will be no downsampling. 
+func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { + timeRange := time.Duration((m.MaxTime - m.MinTime) * int64(time.Millisecond)) + switch m.Thanos.Downsample.Resolution { + case downsample.ResLevel2: + return time.Duration(0), errors.New("no downsampling") + case downsample.ResLevel1: + return time.Duration(downsample.DownsampleRange1*time.Millisecond) - timeRange, nil + case downsample.ResLevel0: + return time.Duration(downsample.DownsampleRange0*time.Millisecond) - timeRange, nil + default: + panic(fmt.Errorf("invalid resolution %v", m.Thanos.Downsample.Resolution)) + } +} + +func (c *Syncer) syncMetas(ctx context.Context) error { + var wg sync.WaitGroup + defer wg.Wait() + + metaIDsChan := make(chan ulid.ULID) + errChan := make(chan error, c.blockSyncConcurrency) + + workCtx, cancel := context.WithCancel(ctx) + defer cancel() + for i := 0; i < c.blockSyncConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for id := range metaIDsChan { + // Check if we already have this block cached locally. + c.blocksMtx.Lock() + _, seen := c.blocks[id] + c.blocksMtx.Unlock() + if seen { + continue + } + + meta, err := c.downloadMeta(workCtx, id) + if err == blockTooFreshSentinelError { + continue + } + + if err != nil { + if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored { + continue + } + errChan <- err + return + } + + // Check for block labels by relabeling. + // If output is empty, the block will be dropped. + lset := labels.FromMap(meta.Thanos.Labels) + processedLabels := relabel.Process(lset, c.relabelConfig...) + if processedLabels == nil { + level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id) + continue + } + + c.blocksMtx.Lock() + c.blocks[id] = meta + c.blocksMtx.Unlock() + } + }() + } + + // Read back all block metas so we can detect deleted blocks. + remote := map[ulid.ULID]struct{}{} + + err := c.bkt.Iter(ctx, "", func(name string) error { + id, ok := block.IsBlockDir(name) + if !ok { + return nil + } + + remote[id] = struct{}{} + + select { + case <-ctx.Done(): + case metaIDsChan <- id: + } + + return nil + }) + close(metaIDsChan) + if err != nil { + return retry(errors.Wrap(err, "retrieve bucket block metas")) + } + + wg.Wait() + close(errChan) + + if err := <-errChan; err != nil { + return retry(err) + } + + // Delete all local block dirs that no longer exist in the bucket. + for id := range c.blocks { + if _, ok := remote[id]; !ok { + delete(c.blocks, id) + } + } + + return nil +} + +func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { + level.Debug(c.logger).Log("msg", "download meta", "block", id) + + meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) + if err != nil { + if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) { + level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) + return nil, blockTooFreshSentinelError + } + return nil, errors.Wrapf(err, "downloading meta.json for %s", id) + } + + // ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to + // avoid races when a block is only partially uploaded. This relates to all blocks, excluding: + // - repair created blocks + // - compactor created blocks + // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. + // TODO(bplotka): https://github.com/thanos-io/thanos/issues/377. 
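+	// In Cortex this delay comes from the -compactor.consistency-delay flag, so
+	// blocks uploaded very recently by ingesters are skipped until they can be
+	// assumed to be fully uploaded.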
+ if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) && + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { + + level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) + return nil, blockTooFreshSentinelError + } + + return &meta, nil +} + +// removeIfMalformed removes a block from the bucket if that block does not have a meta file. It ignores blocks that +// are younger than MinimumAgeForRemoval. +func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) { + metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename)) + if err != nil { + level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err) + return false + } + if metaExists { + // Meta exists, block is not malformed. + return false + } + + if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) { + // Minimum delay has not expired, ignore for now. + return true + } + + if err := block.Delete(ctx, c.logger, c.bkt, id); err != nil { + level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err) + return false + } + level.Info(c.logger).Log("msg", "deleted malformed block", "block", id) + + return true +} + +// GroupKey returns a unique identifier for the group the block belongs to. It considers +// the downsampling resolution and the block's labels. +func GroupKey(meta metadata.Thanos) string { + return groupKey(meta.Downsample.Resolution, labels.FromMap(meta.Labels)) +} + +func groupKey(res int64, lbls labels.Labels) string { + return fmt.Sprintf("%d@%v", res, lbls.Hash()) +} + +// Groups returns the compaction groups for all blocks currently known to the syncer. +// It creates all groups from the scratch on every call. +func (c *Syncer) Groups() (res []*Group, err error) { + c.mtx.Lock() + defer c.mtx.Unlock() + + groups := map[string]*Group{} + for _, m := range c.blocks { + g, ok := groups[GroupKey(m.Thanos)] + if !ok { + g, err = newGroup( + log.With(c.logger, "compactionGroup", GroupKey(m.Thanos)), + c.bkt, + labels.FromMap(m.Thanos.Labels), + m.Thanos.Downsample.Resolution, + c.acceptMalformedIndex, + c.enableVerticalCompaction, + c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)), + c.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)), + c.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)), + c.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)), + c.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)), + c.metrics.garbageCollectedBlocks, + ) + if err != nil { + return nil, errors.Wrap(err, "create compaction group") + } + groups[GroupKey(m.Thanos)] = g + res = append(res, g) + } + if err := g.Add(m); err != nil { + return nil, errors.Wrap(err, "add compaction group") + } + } + sort.Slice(res, func(i, j int) bool { + return res[i].Key() < res[j].Key() + }) + return res, nil +} + +// GarbageCollect deletes blocks from the bucket if their data is available as part of a +// block with a higher compaction level. +func (c *Syncer) GarbageCollect(ctx context.Context) error { + c.mtx.Lock() + defer c.mtx.Unlock() + + begin := time.Now() + + // Run a separate round of garbage collections for each valid resolution. 
+ for _, res := range []int64{ + downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2, + } { + err := c.garbageCollect(ctx, res) + if err != nil { + c.metrics.garbageCollectionFailures.Inc() + } + c.metrics.garbageCollections.Inc() + c.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds()) + + if err != nil { + return errors.Wrapf(err, "garbage collect resolution %d", res) + } + } + return nil +} + +func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { + // Map each block to its highest priority parent. Initial blocks have themselves + // in their source section, i.e. are their own parent. + parents := map[ulid.ULID]ulid.ULID{} + + for id, meta := range c.blocks { + + // Skip any block that has a different resolution. + if meta.Thanos.Downsample.Resolution != resolution { + continue + } + + // For each source block we contain, check whether we are the highest priority parent block. + for _, sid := range meta.Compaction.Sources { + pid, ok := parents[sid] + // No parents for the source block so far. + if !ok { + parents[sid] = id + continue + } + pmeta, ok := c.blocks[pid] + if !ok { + return nil, errors.Errorf("previous parent block %s not found", pid) + } + // The current block is the higher priority parent for the source if its + // compaction level is higher than that of the previously set parent. + // If compaction levels are equal, the more recent ULID wins. + // + // The ULID recency alone is not sufficient since races, e.g. induced + // by downtime of garbage collection, may re-compact blocks that are + // were already compacted into higher-level blocks multiple times. + level, plevel := meta.Compaction.Level, pmeta.Compaction.Level + + if level > plevel || (level == plevel && id.Compare(pid) > 0) { + parents[sid] = id + } + } + } + + // A block can safely be deleted if they are not the highest priority parent for + // any source block. + topParents := map[ulid.ULID]struct{}{} + for _, pid := range parents { + topParents[pid] = struct{}{} + } + + for id, meta := range c.blocks { + // Skip any block that has a different resolution. + if meta.Thanos.Downsample.Resolution != resolution { + continue + } + if _, ok := topParents[id]; ok { + continue + } + + ids = append(ids, id) + } + return ids, nil +} + +func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { + garbageIds, err := c.GarbageBlocks(resolution) + if err != nil { + return err + } + + for _, id := range garbageIds { + if ctx.Err() != nil { + return ctx.Err() + } + + // Spawn a new context so we always delete a block in full on shutdown. + delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + + level.Info(c.logger).Log("msg", "deleting outdated block", "block", id) + + err := block.Delete(delCtx, c.logger, c.bkt, id) + cancel() + if err != nil { + return retry(errors.Wrapf(err, "delete block %s from bucket", id)) + } + + // Immediately update our in-memory state so no further call to SyncMetas is needed + // after running garbage collection. + delete(c.blocks, id) + c.metrics.garbageCollectedBlocks.Inc() + } + return nil +} + +// Group captures a set of blocks that have the same origin labels and downsampling resolution. +// Those blocks generally contain the same series and can thus efficiently be compacted. 
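+// In Cortex's usage each tenant's blocks are expected to share the same labels
+// and resolution, so a tenant bucket normally maps to a single group.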
+type Group struct { + logger log.Logger + bkt objstore.Bucket + labels labels.Labels + resolution int64 + mtx sync.Mutex + blocks map[ulid.ULID]*metadata.Meta + acceptMalformedIndex bool + enableVerticalCompaction bool + compactions prometheus.Counter + compactionRunsStarted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionFailures prometheus.Counter + verticalCompactions prometheus.Counter + groupGarbageCollectedBlocks prometheus.Counter +} + +// newGroup returns a new compaction group. +func newGroup( + logger log.Logger, + bkt objstore.Bucket, + lset labels.Labels, + resolution int64, + acceptMalformedIndex bool, + enableVerticalCompaction bool, + compactions prometheus.Counter, + compactionRunsStarted prometheus.Counter, + compactionRunsCompleted prometheus.Counter, + compactionFailures prometheus.Counter, + verticalCompactions prometheus.Counter, + groupGarbageCollectedBlocks prometheus.Counter, +) (*Group, error) { + if logger == nil { + logger = log.NewNopLogger() + } + g := &Group{ + logger: logger, + bkt: bkt, + labels: lset, + resolution: resolution, + blocks: map[ulid.ULID]*metadata.Meta{}, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + compactions: compactions, + compactionRunsStarted: compactionRunsStarted, + compactionRunsCompleted: compactionRunsCompleted, + compactionFailures: compactionFailures, + verticalCompactions: verticalCompactions, + groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, + } + return g, nil +} + +// Key returns an identifier for the group. +func (cg *Group) Key() string { + return groupKey(cg.resolution, cg.labels) +} + +// Add the block with the given meta to the group. +func (cg *Group) Add(meta *metadata.Meta) error { + cg.mtx.Lock() + defer cg.mtx.Unlock() + + if !labels.Equal(cg.labels, labels.FromMap(meta.Thanos.Labels)) { + return errors.New("block and group labels do not match") + } + if cg.resolution != meta.Thanos.Downsample.Resolution { + return errors.New("block and group resolution do not match") + } + cg.blocks[meta.ULID] = meta + return nil +} + +// IDs returns all sorted IDs of blocks in the group. +func (cg *Group) IDs() (ids []ulid.ULID) { + cg.mtx.Lock() + defer cg.mtx.Unlock() + + for id := range cg.blocks { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { + return ids[i].Compare(ids[j]) < 0 + }) + return ids +} + +// Labels returns the labels that all blocks in the group share. +func (cg *Group) Labels() labels.Labels { + return cg.labels +} + +// Resolution returns the common downsampling resolution of blocks in the group. +func (cg *Group) Resolution() int64 { + return cg.resolution +} + +// Compact plans and runs a single compaction against the group. The compacted result +// is uploaded into the bucket the blocks were retrieved from. 
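+// It reports whether the caller should run compaction for this group again, the
+// ULID of the block written by this pass (zero when nothing was compacted), and
+// any error encountered.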
+func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) { + cg.compactionRunsStarted.Inc() + + subDir := filepath.Join(dir, cg.Key()) + + defer func() { + if err := os.RemoveAll(subDir); err != nil { + level.Error(cg.logger).Log("msg", "failed to remove compaction group work directory", "path", subDir, "err", err) + } + }() + + if err := os.RemoveAll(subDir); err != nil { + return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir") + } + if err := os.MkdirAll(subDir, 0777); err != nil { + return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") + } + + shouldRerun, compID, err := cg.compact(ctx, subDir, comp) + if err != nil { + cg.compactionFailures.Inc() + return false, ulid.ULID{}, err + } + cg.compactionRunsCompleted.Inc() + return shouldRerun, compID, nil +} + +// Issue347Error is a type wrapper for errors that should invoke repair process for broken block. +type Issue347Error struct { + err error + + id ulid.ULID +} + +func issue347Error(err error, brokenBlock ulid.ULID) Issue347Error { + return Issue347Error{err: err, id: brokenBlock} +} + +func (e Issue347Error) Error() string { + return e.err.Error() +} + +// IsIssue347Error returns true if the base error is a Issue347Error. +func IsIssue347Error(err error) bool { + _, ok := errors.Cause(err).(Issue347Error) + return ok +} + +// HaltError is a type wrapper for errors that should halt any further progress on compactions. +type HaltError struct { + err error +} + +func halt(err error) HaltError { + return HaltError{err: err} +} + +func (e HaltError) Error() string { + return e.err.Error() +} + +// IsHaltError returns true if the base error is a HaltError. +// If a multierror is passed, any halt error will return true. +func IsHaltError(err error) bool { + if multiErr, ok := err.(terrors.MultiError); ok { + for _, err := range multiErr { + if _, ok := errors.Cause(err).(HaltError); ok { + return true + } + } + return false + } + + _, ok := errors.Cause(err).(HaltError) + return ok +} + +// RetryError is a type wrapper for errors that should trigger warning log and retry whole compaction loop, but aborting +// current compaction further progress. +type RetryError struct { + err error +} + +func retry(err error) error { + if IsHaltError(err) { + return err + } + return RetryError{err: err} +} + +func (e RetryError) Error() string { + return e.err.Error() +} + +// IsRetryError returns true if the base error is a RetryError. +// If a multierror is passed, all errors must be retriable. 
+func IsRetryError(err error) bool {
+	if multiErr, ok := err.(terrors.MultiError); ok {
+		for _, err := range multiErr {
+			if _, ok := errors.Cause(err).(RetryError); !ok {
+				return false
+			}
+		}
+		return true
+	}
+
+	_, ok := errors.Cause(err).(RetryError)
+	return ok
+}
+
+func (cg *Group) areBlocksOverlapping(include *metadata.Meta, excludeDirs ...string) error {
+	var (
+		metas   []tsdb.BlockMeta
+		exclude = map[ulid.ULID]struct{}{}
+	)
+
+	for _, e := range excludeDirs {
+		id, err := ulid.Parse(filepath.Base(e))
+		if err != nil {
+			return errors.Wrapf(err, "overlaps find dir %s", e)
+		}
+		exclude[id] = struct{}{}
+	}
+
+	for _, m := range cg.blocks {
+		if _, ok := exclude[m.ULID]; ok {
+			continue
+		}
+		metas = append(metas, m.BlockMeta)
+	}
+
+	if include != nil {
+		metas = append(metas, include.BlockMeta)
+	}
+
+	sort.Slice(metas, func(i, j int) bool {
+		return metas[i].MinTime < metas[j].MinTime
+	})
+	if overlaps := tsdb.OverlappingBlocks(metas); len(overlaps) > 0 {
+		return errors.Errorf("overlaps found while gathering blocks. %s", overlaps)
+	}
+	return nil
+}
+
+// RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
+func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, issue347Err error) error {
+	ie, ok := errors.Cause(issue347Err).(Issue347Error)
+	if !ok {
+		return errors.Errorf("Given error is not an issue347 error: %v", issue347Err)
+	}
+
+	level.Info(logger).Log("msg", "Repairing block broken by https://github.com/prometheus/tsdb/issues/347", "id", ie.id, "err", issue347Err)
+
+	tmpdir, err := ioutil.TempDir("", fmt.Sprintf("repair-issue-347-id-%s-", ie.id))
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := os.RemoveAll(tmpdir); err != nil {
+			level.Warn(logger).Log("msg", "failed to remove tmpdir", "err", err, "tmpdir", tmpdir)
+		}
+	}()
+
+	bdir := filepath.Join(tmpdir, ie.id.String())
+	if err := block.Download(ctx, logger, bkt, ie.id, bdir); err != nil {
+		return retry(errors.Wrapf(err, "download block %s", ie.id))
+	}
+
+	meta, err := metadata.Read(bdir)
+	if err != nil {
+		return errors.Wrapf(err, "read meta from %s", bdir)
+	}
+
+	resid, err := block.Repair(logger, tmpdir, ie.id, metadata.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
+	if err != nil {
+		return errors.Wrapf(err, "repair failed for block %s", ie.id)
+	}
+
+	// Verify repaired id before uploading it.
+	if err := block.VerifyIndex(logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
+		return errors.Wrapf(err, "repaired block is invalid %s", resid)
+	}
+
+	level.Info(logger).Log("msg", "uploading repaired block", "newID", resid)
+	if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String())); err != nil {
+		return retry(errors.Wrapf(err, "upload of %s failed", resid))
+	}
+
+	level.Info(logger).Log("msg", "deleting broken block", "id", ie.id)
+
+	// Spawn a new context so we always delete a block in full on shutdown.
+	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+
+	// TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this).
+	if err := block.Delete(delCtx, logger, bkt, ie.id); err != nil {
+		return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id)
+	}
+
+	return nil
+}
+
+func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) {
+	cg.mtx.Lock()
+	defer cg.mtx.Unlock()
+
+	// Check for overlapping blocks.
+	overlappingBlocks := false
+	if err := cg.areBlocksOverlapping(nil); err != nil {
+		if !cg.enableVerticalCompaction {
+			return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
+		}
+
+		overlappingBlocks = true
+	}
+
+	// Planning a compaction works purely based on the meta.json files in our future group's dir.
+	// So we first dump all our memory block metas into the directory.
+	for _, meta := range cg.blocks {
+		bdir := filepath.Join(dir, meta.ULID.String())
+		if err := os.MkdirAll(bdir, 0777); err != nil {
+			return false, ulid.ULID{}, errors.Wrap(err, "create planning block dir")
+		}
+		if err := metadata.Write(cg.logger, bdir, meta); err != nil {
+			return false, ulid.ULID{}, errors.Wrap(err, "write planning meta file")
+		}
+	}
+
+	// Plan against the written meta.json files.
+	plan, err := comp.Plan(dir)
+	if err != nil {
+		return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
+	}
+	if len(plan) == 0 {
+		// Nothing to do.
+		return false, ulid.ULID{}, nil
+	}
+
+	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
+	// This is one potential source of how we could end up with duplicated chunks.
+	uniqueSources := map[ulid.ULID]struct{}{}
+
+	// Once we have a plan we need to download the actual data.
+	begin := time.Now()
+
+	for _, pdir := range plan {
+		meta, err := metadata.Read(pdir)
+		if err != nil {
+			return false, ulid.ULID{}, errors.Wrapf(err, "read meta from %s", pdir)
+		}
+
+		if cg.Key() != GroupKey(meta.Thanos) {
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(meta.Thanos)))
+		}
+
+		for _, s := range meta.Compaction.Sources {
+			if _, ok := uniqueSources[s]; ok {
+				return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
+			}
+			uniqueSources[s] = struct{}{}
+		}
+
+		id, err := ulid.Parse(filepath.Base(pdir))
+		if err != nil {
+			return false, ulid.ULID{}, errors.Wrapf(err, "plan dir %s", pdir)
+		}
+
+		if meta.ULID.Compare(id) != 0 {
+			return false, ulid.ULID{}, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
+		}
+
+		if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil {
+			return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id))
+		}
+
+		// Ensure all input blocks are valid.
+		stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+		if err != nil {
+			return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", pdir)
+		}
+
+		if err := stats.CriticalErr(); err != nil {
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
+		}
+
+		if err := stats.Issue347OutsideChunksErr(); err != nil {
+			return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
+		}
+
+		if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
+			return false, ulid.ULID{}, errors.Wrapf(err,
+				"block id %s, try running with --debug.accept-malformed-index", id)
+		}
+	}
+	level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
+		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
+
+	begin = time.Now()
+
+	compID, err = comp.Compact(dir, plan, nil)
+	if err != nil {
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", plan))
+	}
+	if compID == (ulid.ULID{}) {
+		// Prometheus compactor found that the compacted block would have no samples.
+		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", plan))
+		for _, block := range plan {
+			meta, err := metadata.Read(block)
+			if err != nil {
+				level.Warn(cg.logger).Log("msg", "failed to read meta for block", "block", block)
+				continue
+			}
+			if meta.Stats.NumSamples == 0 {
+				if err := cg.deleteBlock(block); err != nil {
+					level.Warn(cg.logger).Log("msg", "failed to delete empty block found during compaction", "block", block)
+				}
+			}
+		}
+		// Even though this block was empty, there may be more work to do.
+		return true, ulid.ULID{}, nil
+	}
+	cg.compactions.Inc()
+	if overlappingBlocks {
+		cg.verticalCompactions.Inc()
+	}
+	level.Debug(cg.logger).Log("msg", "compacted blocks",
+		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks)
+
+	bdir := filepath.Join(dir, compID.String())
+	index := filepath.Join(bdir, block.IndexFilename)
+	indexCache := filepath.Join(bdir, block.IndexCacheFilename)
+
+	newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
+		Labels:     cg.labels.Map(),
+		Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
+		Source:     metadata.CompactorSource,
+	}, nil)
+	if err != nil {
+		return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
+	}
+
+	if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
+		return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
+	}
+
+	// Ensure the output block is valid.
+	if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
+	}
+
+	// Ensure the output block is not overlapping with anything else,
+	// unless vertical compaction is enabled.
+	if !cg.enableVerticalCompaction {
+		if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+		}
+	}
+
+	if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil {
+		return false, ulid.ULID{}, errors.Wrap(err, "write index cache")
+	}
+
+	begin = time.Now()
+
+	if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
+		return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
+	}
+	level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))
+
+	// Delete the blocks we just compacted from the group and bucket so they do not get included
+	// in the next planning cycle.
+	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
+	for _, b := range plan {
+		if err := cg.deleteBlock(b); err != nil {
+			return false, ulid.ULID{}, retry(errors.Wrapf(err, "delete old block from bucket"))
+		}
+		cg.groupGarbageCollectedBlocks.Inc()
+	}
+
+	return true, compID, nil
+}
+
+func (cg *Group) deleteBlock(b string) error {
+	id, err := ulid.Parse(filepath.Base(b))
+	if err != nil {
+		return errors.Wrapf(err, "plan dir %s", b)
+	}
+
+	if err := os.RemoveAll(b); err != nil {
+		return errors.Wrapf(err, "remove old block dir %s", id)
+	}
+
+	// Spawn a new context so we always delete a block in full on shutdown.
+	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+	level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id)
+	if err := block.Delete(delCtx, cg.logger, cg.bkt, id); err != nil {
+		return errors.Wrapf(err, "delete block %s from bucket", id)
+	}
+	return nil
+}
+
+// BucketCompactor compacts blocks in a bucket.
+type BucketCompactor struct {
+	logger      log.Logger
+	sy          *Syncer
+	comp        tsdb.Compactor
+	compactDir  string
+	bkt         objstore.Bucket
+	concurrency int
+}
+
+// NewBucketCompactor creates a new bucket compactor.
+func NewBucketCompactor(
+	logger log.Logger,
+	sy *Syncer,
+	comp tsdb.Compactor,
+	compactDir string,
+	bkt objstore.Bucket,
+	concurrency int,
+) (*BucketCompactor, error) {
+	if concurrency <= 0 {
+		return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
+	}
+	return &BucketCompactor{
+		logger:      logger,
+		sy:          sy,
+		comp:        comp,
+		compactDir:  compactDir,
+		bkt:         bkt,
+		concurrency: concurrency,
+	}, nil
+}
+
+// Compact runs compaction over the bucket.
+func (c *BucketCompactor) Compact(ctx context.Context) error {
+	defer func() {
+		if err := os.RemoveAll(c.compactDir); err != nil {
+			level.Error(c.logger).Log("msg", "failed to remove compaction work directory", "path", c.compactDir, "err", err)
+		}
+	}()
+
+	// Loop over the bucket and compact until there's no work left.
+	for {
+		var (
+			wg                     sync.WaitGroup
+			workCtx, workCtxCancel = context.WithCancel(ctx)
+			groupChan              = make(chan *Group)
+			errChan                = make(chan error, c.concurrency)
+			finishedAllGroups      = true
+			mtx                    sync.Mutex
+		)
+		defer workCtxCancel()
+
+		// Set up workers that will compact the groups when the groups are ready.
+		// They will compact available groups until they encounter an error, after which they will stop.
+		for i := 0; i < c.concurrency; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for g := range groupChan {
+					shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp)
+					if err == nil {
+						if shouldRerunGroup {
+							mtx.Lock()
+							finishedAllGroups = false
+							mtx.Unlock()
+						}
+						continue
+					}
+
+					if IsIssue347Error(err) {
+						if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil {
+							mtx.Lock()
+							finishedAllGroups = false
+							mtx.Unlock()
+							continue
+						}
+					}
+					errChan <- errors.Wrap(err, fmt.Sprintf("compaction failed for group %s", g.Key()))
+					return
+				}
+			}()
+		}
+
+		// Clean up the compaction temporary directory at the beginning of every compaction loop.
+		if err := os.RemoveAll(c.compactDir); err != nil {
+			return errors.Wrap(err, "clean up the compaction temporary directory")
+		}
+
+		level.Info(c.logger).Log("msg", "start sync of metas")
+
+		if err := c.sy.SyncMetas(ctx); err != nil {
+			return errors.Wrap(err, "sync")
+		}
+
+		level.Info(c.logger).Log("msg", "start of GC")
+
+		// Blocks that were compacted are garbage collected after each compaction.
+		// However, if the compactor crashes, we need to resolve those on startup.
+		if err := c.sy.GarbageCollect(ctx); err != nil {
+			return errors.Wrap(err, "garbage")
+		}
+
+		level.Info(c.logger).Log("msg", "start of compaction")
+
+		groups, err := c.sy.Groups()
+		if err != nil {
+			return errors.Wrap(err, "build compaction groups")
+		}
+
+		// Send all groups found during this pass to the compaction workers.
+		var groupErrs terrors.MultiError
+
+	groupLoop:
+		for _, g := range groups {
+			select {
+			case groupErr := <-errChan:
+				groupErrs.Add(groupErr)
+				break groupLoop
+			case groupChan <- g:
+			}
+		}
+		close(groupChan)
+		wg.Wait()
+
+		// Collect any other error reported by the workers, or any error reported
+		// while we were waiting for the last batch of groups to run the compaction.
+		close(errChan)
+		for groupErr := range errChan {
+			groupErrs.Add(groupErr)
+		}
+
+		workCtxCancel()
+		if len(groupErrs) > 0 {
+			return groupErrs
+		}
+
+		if finishedAllGroups {
+			break
+		}
+	}
+	return nil
+}
diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/retention.go b/vendor/github.com/thanos-io/thanos/pkg/compact/retention.go
new file mode 100644
index 0000000000..9021677f2b
--- /dev/null
+++ b/vendor/github.com/thanos-io/thanos/pkg/compact/retention.go
@@ -0,0 +1,48 @@
+package compact
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/objstore"
+)
+
+// ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution, based on the blocks' MaxTime.
+// A value of 0 disables the retention for its resolution.
+func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, retentionByResolution map[ResolutionLevel]time.Duration) error {
+	level.Info(logger).Log("msg", "start optional retention")
+	if err := bkt.Iter(ctx, "", func(name string) error {
+		id, ok := block.IsBlockDir(name)
+		if !ok {
+			return nil
+		}
+		m, err := block.DownloadMeta(ctx, logger, bkt, id)
+		if err != nil {
+			return errors.Wrap(err, "download metadata")
+		}
+
+		retentionDuration := retentionByResolution[ResolutionLevel(m.Thanos.Downsample.Resolution)]
+		if retentionDuration.Seconds() == 0 {
+			return nil
+		}
+
+		maxTime := time.Unix(m.MaxTime/1000, 0)
+		if time.Now().After(maxTime.Add(retentionDuration)) {
+			level.Info(logger).Log("msg", "applying retention: deleting block", "id", id, "maxTime", maxTime.String())
+			if err := block.Delete(ctx, logger, bkt, id); err != nil {
+				return errors.Wrap(err, "delete block")
+			}
+		}
+
+		return nil
+	}); err != nil {
+		return errors.Wrap(err, "retention")
+	}
+
+	level.Info(logger).Log("msg", "optional retention apply done")
+	return nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index c1421e20d3..6ebdca1ee1 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -521,6 +521,7 @@ github.com/stretchr/testify/require
 # github.com/thanos-io/thanos v0.8.1-0.20200102143048-a37ac093a67a
 github.com/thanos-io/thanos/pkg/block
 github.com/thanos-io/thanos/pkg/block/metadata
+github.com/thanos-io/thanos/pkg/compact
 github.com/thanos-io/thanos/pkg/compact/downsample
 github.com/thanos-io/thanos/pkg/component
 github.com/thanos-io/thanos/pkg/extprom
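For context, here is a minimal sketch (outside the diff, not taken from the PR) of how a caller might wire the vendored helpers above together: drive `BucketCompactor` until no group reports more work, classify halting vs. retriable failures, then apply optional retention. It assumes an already configured `objstore.Bucket`, `*compact.Syncer` and `tsdb.Compactor`; the data directory, concurrency, and the `ResolutionLevelRaw` constant from the Thanos `compact` package are illustrative assumptions.

```go
// Illustrative sketch only; names and values are placeholders.
package example

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// runCompactionPass drives one pass of bucket compaction followed by optional
// retention. The syncer, bucket and TSDB compactor are assumed to be built
// elsewhere.
func runCompactionPass(ctx context.Context, logger log.Logger, bkt objstore.Bucket, sy *compact.Syncer, comp tsdb.Compactor) error {
	bc, err := compact.NewBucketCompactor(logger, sy, comp, "./data/compact", bkt, 1)
	if err != nil {
		return err
	}

	// Compact loops over the bucket until no group reports more work.
	if err := bc.Compact(ctx); err != nil {
		if compact.IsHaltError(err) {
			// Halting errors (e.g. unexpected overlaps) mean retrying would
			// make things worse, so surface them instead of retrying.
			return err
		}
		// Retry errors (e.g. transient object storage failures) can be
		// retried on the next compaction interval.
		return err
	}

	// Optionally apply per-resolution retention; a zero duration disables
	// retention for that resolution. ResolutionLevelRaw is assumed to be the
	// raw-resolution constant exposed by the Thanos compact package.
	retention := map[compact.ResolutionLevel]time.Duration{
		compact.ResolutionLevelRaw: 0,
	}
	return compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retention)
}
```

The halt/retry/issue-347 error classification shown in the vendored code is what makes such a loop safe to re-run: halting errors stop the run, retriable ones are deferred to the next interval, and issue-347 errors trigger the repair path before compaction is attempted again.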