diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8945f3425a8..098a22d186e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information.
 - [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down.
 - [#1712](https://github.com/thanos-io/thanos/pull/1712) Rename flag on bucket web component from `--listen` to `--http-address` to match other components.
+- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.

 ### Fixed

diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go
index 3a7646501c9..b1bb34ce0ab 100644
--- a/cmd/thanos/bucket.go
+++ b/cmd/thanos/bucket.go
@@ -316,8 +316,6 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
 	label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

 	m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
-		ctx, cancel := context.WithCancel(context.Background())
-
 		statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
 		// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
 		srv := httpserver.New(logger, reg, component.Bucket, statusProber,
@@ -342,8 +340,20 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
 			level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout")
 		}

+		confContentYaml, err := objStoreConfig.Content()
+		if err != nil {
+			return err
+		}
+
+		bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
+		if err != nil {
+			return errors.Wrap(err, "bucket client")
+		}
+
+		ctx, cancel := context.WithCancel(context.Background())
 		g.Add(func() error {
-			return refresh(ctx, logger, bucketUI, *interval, *timeout, name, reg, objStoreConfig)
+			defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
+			return refresh(ctx, logger, bucketUI, *interval, *timeout, bkt, block.MetaDownloaderFn(block.DownloadMeta))
 		}, func(error) {
 			cancel()
 		})
@@ -354,25 +364,14 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
 	}
 }

-// refresh metadata from remote storage periodically and update UI.
-func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error {
-	confContentYaml, err := objStoreConfig.Content()
-	if err != nil {
-		return err
-	}
-
-	bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
-	if err != nil {
-		return errors.Wrap(err, "bucket client")
-	}
-
-	defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
+// refresh metadata from remote storage periodically and update the UI.
+func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, bkt objstore.Bucket, metaDownloader block.MetaDownloader) error {
 	return runutil.Repeat(duration, ctx.Done(), func() error {
 		return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
 			iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
 			defer iterCancel()

-			blocks, err := download(iterCtx, logger, bkt)
+			blocks, err := download(iterCtx, logger, bkt, metaDownloader)
 			if err != nil {
 				bucketUI.Set("[]", err)
 				return err
@@ -389,7 +388,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
 	})
 }

-func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (blocks []metadata.Meta, err error) {
+func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket, metaDownloader block.MetaDownloader) (blocks []metadata.Meta, err error) {
 	level.Info(logger).Log("msg", "synchronizing block metadata")

 	if err = bkt.Iter(ctx, "", func(name string) error {
@@ -398,7 +397,7 @@ func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (bloc
 			return nil
 		}

-		meta, err := block.DownloadMeta(ctx, logger, bkt, id)
+		meta, err := metaDownloader.DownloadMeta(ctx, logger, bkt, id)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index ba099949131..8f263065a80 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -16,6 +16,7 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/route"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -23,11 +24,13 @@ import (
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/component"
 	"github.com/thanos-io/thanos/pkg/extflag"
+	extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
 	"github.com/thanos-io/thanos/pkg/prober"
 	"github.com/thanos-io/thanos/pkg/runutil"
 	httpserver "github.com/thanos-io/thanos/pkg/server/http"
+	"github.com/thanos-io/thanos/pkg/ui"
 	"gopkg.in/alecthomas/kingpin.v2"
 )
@@ -113,6 +116,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

 	selectorRelabelConf := regSelectorRelabelFlags(cmd)

+	label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String()
+
 	m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
 		return runCompact(g, logger, reg, *httpAddr,
@@ -135,6 +140,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
 			*blockSyncConcurrency,
 			*compactionConcurrency,
 			selectorRelabelConf,
+			*label,
 		)
 	}
 }
@@ -159,6 +165,7 @@ func runCompact(
 	blockSyncConcurrency int,
 	concurrency int,
 	selectorRelabelConf *extflag.PathOrContent,
+	label string,
 ) error {
 	halted := prometheus.NewGauge(prometheus.GaugeOpts{
 		Name: "thanos_compactor_halted",
@@ -211,8 +218,9 @@ func runCompact(
 		}
 	}()

+	metaDownloader := block.NewCachingMetaDownloader(block.MetaDownloaderFn(block.DownloadMeta))
 	sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
-		blockSyncConcurrency, acceptMalformedIndex, relabelConfig)
+		blockSyncConcurrency, acceptMalformedIndex, relabelConfig, metaDownloader)
 	if err != nil {
 		return errors.Wrap(err, "create syncer")
 	}
@@ -295,6 +303,9 @@ func runCompact(
 		return nil
 	}

+	// Compaction and bucket refresh interval.
+	interval := 5 * time.Minute
+
 	g.Add(func() error {
 		defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

@@ -310,7 +321,7 @@ func runCompact(
 		}

 		// --wait=true is specified.
-		return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
+		return runutil.Repeat(interval, ctx.Done(), func() error {
 			err := f()
 			if err == nil {
 				return nil
@@ -343,6 +354,19 @@ func runCompact(
 		cancel()
 	})

+	if wait {
+		router := route.New()
+		bucketUI := ui.NewBucketUI(logger, label)
+		bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
+		srv.Handle("/", router)
+
+		g.Add(func() error {
+			return refresh(ctx, logger, bucketUI, interval, time.Minute, bkt, metaDownloader)
+		}, func(error) {
+			cancel()
+		})
+	}
+
 	level.Info(logger).Log("msg", "starting compact node")
 	statusProber.SetReady()
 	return nil
diff --git a/docs/components/compact.md b/docs/components/compact.md
index 4a710a6773b..b6b0c9e8504 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -135,5 +135,8 @@ Flags:
                                  selecting blocks. It follows native Prometheus
                                  relabel-config syntax. See format details:
                                  https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
+      --bucket-web-label=BUCKET-WEB-LABEL
+                                 Prometheus label to use as timeline title in the
+                                 bucket web UI

 ```
diff --git a/pkg/block/block.go b/pkg/block/block.go
index 4a0115ad218..8d2275cda48 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -10,6 +10,7 @@ import (
 	"path"
 	"path/filepath"
 	"strings"
+	"sync"

 	"github.com/go-kit/kit/log/level"

@@ -188,3 +189,47 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
 	id, err := ulid.Parse(filepath.Base(path))
 	return id, err == nil
 }
+
+// MetaDownloader abstracts anything that can download block metas.
+type MetaDownloader interface {
+	DownloadMeta(context.Context, log.Logger, objstore.Bucket, ulid.ULID) (metadata.Meta, error)
+}
+
+// MetaDownloaderFn turns any func that downloads metas into a MetaDownloader.
+type MetaDownloaderFn func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error)
+
+// DownloadMeta implements the MetaDownloader interface.
+func (m MetaDownloaderFn) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
+	return m(ctx, logger, bkt, id)
+}
+
+// CachingMetaDownloader is a MetaDownloader that can cache metas.
+type CachingMetaDownloader struct {
+	sync.Mutex
+	next  MetaDownloader
+	metas map[ulid.ULID]metadata.Meta
+}
+
+// NewCachingMetaDownloader creates a new MetaDownloader that can cache metas
+// and uses the given MetaDownloader when something is not in the cache.
+func NewCachingMetaDownloader(next MetaDownloader) MetaDownloader {
+	return &CachingMetaDownloader{
+		next:  next,
+		metas: make(map[ulid.ULID]metadata.Meta),
+	}
+}
+
+// DownloadMeta implements the MetaDownloader interface.
+func (c *CachingMetaDownloader) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
+	c.Lock()
+	defer c.Unlock()
+	m, ok := c.metas[id]
+	if ok {
+		return m, nil
+	}
+	m, err := c.next.DownloadMeta(ctx, logger, bkt, id)
+	if err == nil {
+		c.metas[id] = m
+	}
+	return m, err
+}
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 01e6f78f892..77188d42613 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -53,6 +53,7 @@ type Syncer struct {
 	metrics              *syncerMetrics
 	acceptMalformedIndex bool
 	relabelConfig        []*relabel.Config
+	metaDownloader       block.MetaDownloader
 }

 type syncerMetrics struct {
@@ -145,7 +146,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

 // NewSyncer returns a new Syncer for the given Bucket and directory.
 // Blocks must be at least as old as the sync delay for being considered.
-func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) {
+func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config, md block.MetaDownloader) (*Syncer, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -155,6 +156,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
 		consistencyDelay:     consistencyDelay,
 		blocks:               map[ulid.ULID]*metadata.Meta{},
 		bkt:                  bkt,
+		metaDownloader:       md,
 		metrics:              newSyncerMetrics(reg),
 		blockSyncConcurrency: blockSyncConcurrency,
 		acceptMalformedIndex: acceptMalformedIndex,
@@ -291,7 +293,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
 func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) {
 	level.Debug(c.logger).Log("msg", "download meta", "block", id)

-	meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
+	meta, err := c.metaDownloader.DownloadMeta(ctx, c.logger, c.bkt, id)
 	if err != nil {
 		if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) {
 			level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
@@ -356,7 +358,7 @@ func groupKey(res int64, lbls labels.Labels) string {
 }

 // Groups returns the compaction groups for all blocks currently known to the syncer.
-// It creates all groups from the scratch on every call.
+// It creates all groups from scratch on every call.
 func (c *Syncer) Groups() (res []*Group, err error) {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 7513a944e4c..810dc653d32 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -37,7 +37,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
 		defer cancel()

 		relabelConfig := make([]*relabel.Config, 0)
-		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
+		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
 		testutil.Ok(t, err)

 		// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
@@ -140,7 +140,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 		}

 		// Do one initial synchronization with the bucket.
-		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
+		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
 		testutil.Ok(t, err)
 		testutil.Ok(t, sy.SyncMetas(ctx))

@@ -209,7 +209,7 @@ func TestGroup_Compact_e2e(t *testing.T) {

 		reg := prometheus.NewRegistry()

-		sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil)
+		sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil, block.MetaDownloaderFn(block.DownloadMeta))
 		testutil.Ok(t, err)

 		comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
@@ -515,7 +515,7 @@ func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
 		defer cancel()

-		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
+		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
 		testutil.Ok(t, err)

 		var ids []ulid.ULID
diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go
index 85d0f332fdc..24c1c36f23e 100644
--- a/pkg/compact/compact_test.go
+++ b/pkg/compact/compact_test.go
@@ -7,6 +7,7 @@ import (
 	"testing"
 	"time"

+	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"

 	"github.com/oklog/ulid"
@@ -79,7 +80,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
 	bkt := inmem.NewBucket()
 	relabelConfig := make([]*relabel.Config, 0)

-	sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig)
+	sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
 	testutil.Ok(t, err)

 	// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.
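
For reference, a minimal sketch (not part of the patch) of how the `MetaDownloader` abstraction and the `CachingMetaDownloader` decorator added in `pkg/block/block.go` can be exercised in isolation. The counting downloader, the in-memory bucket, and the generated ULID are stand-ins for real object storage; the `pkg/objstore/inmem` import path is assumed from the test usage above.

```go
package main

import (
	"context"
	"crypto/rand"
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	var calls int

	// Stand-in downloader that counts invocations instead of reading meta.json
	// from object storage; real callers pass block.DownloadMeta here.
	counting := block.MetaDownloaderFn(func(_ context.Context, _ log.Logger, _ objstore.Bucket, _ ulid.ULID) (metadata.Meta, error) {
		calls++
		return metadata.Meta{}, nil
	})

	// Wrap it in the caching decorator so repeated requests for the same block
	// ID are served from the in-memory map rather than the wrapped downloader.
	md := block.NewCachingMetaDownloader(counting)

	bkt := inmem.NewBucket()                   // stand-in bucket
	id := ulid.MustNew(ulid.Now(), rand.Reader) // stand-in block ID

	// Only the first call reaches the underlying downloader.
	_, _ = md.DownloadMeta(context.Background(), log.NewNopLogger(), bkt, id)
	_, _ = md.DownloadMeta(context.Background(), log.NewNopLogger(), bkt, id)

	fmt.Println("underlying downloads:", calls) // prints 1
}
```

In the patch itself the same decorator instance is handed both to `compact.NewSyncer` and to the bucket web `refresh` loop, so a `meta.json` fetched during compaction does not need to be downloaded again to render the UI.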