From 68b4effbd4f48b88d497eed97e2eabb75ca5fc0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Thu, 31 Oct 2019 17:51:39 +0100 Subject: [PATCH] cmd/thanos/compact: add bucket UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit enhances the compact component so that it runs the bucket UI whenever the --wait flag is also passed. In order to reduce the overhead of running the UI in addition to the compactor, this commit also introduces an abstraction around downloading block meta files allowing the metadata to be downloaded once and cached. This ensures that the compactor does not unnecessarily download every metadata file twice. Signed-off-by: Lucas Servén Marín --- CHANGELOG.md | 1 + cmd/thanos/bucket.go | 37 +++++++++++++-------------- cmd/thanos/compact.go | 28 ++++++++++++++++++-- docs/components/compact.md | 3 +++ pkg/block/block.go | 45 +++++++++++++++++++++++++++++++++ pkg/compact/compact.go | 8 +++--- pkg/compact/compact_e2e_test.go | 8 +++--- pkg/compact/compact_test.go | 3 ++- 8 files changed, 104 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8945f3425a8..098a22d186e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information. - [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down. - [#1712](https://github.com/thanos-io/thanos/pull/1712) Rename flag on bucket web component from `--listen` to `--http-address` to match other components. +- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process. ### Fixed diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 3a7646501c9..b1bb34ce0ab 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -316,8 +316,6 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str label := cmd.Flag("label", "Prometheus label to use as timeline title").String() m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error { - ctx, cancel := context.WithCancel(context.Background()) - statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. srv := httpserver.New(logger, reg, component.Bucket, statusProber, @@ -342,8 +340,20 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout") } + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + + bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String()) + if err != nil { + return errors.Wrap(err, "bucket client") + } + + ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - return refresh(ctx, logger, bucketUI, *interval, *timeout, name, reg, objStoreConfig) + defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + return refresh(ctx, logger, bucketUI, *interval, *timeout, bkt, block.MetaDownloaderFn(block.DownloadMeta)) }, func(error) { cancel() }) @@ -354,25 +364,14 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str } } -// refresh metadata from remote storage periodically and update UI. -func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error { - confContentYaml, err := objStoreConfig.Content() - if err != nil { - return err - } - - bkt, err := client.NewBucket(logger, confContentYaml, reg, name) - if err != nil { - return errors.Wrap(err, "bucket client") - } - - defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") +// refresh metadata from remote storage periodically and update the UI. +func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, bkt objstore.Bucket, metaDownloader block.MetaDownloader) error { return runutil.Repeat(duration, ctx.Done(), func() error { return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error { iterCtx, iterCancel := context.WithTimeout(ctx, timeout) defer iterCancel() - blocks, err := download(iterCtx, logger, bkt) + blocks, err := download(iterCtx, logger, bkt, metaDownloader) if err != nil { bucketUI.Set("[]", err) return err @@ -389,7 +388,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati }) } -func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (blocks []metadata.Meta, err error) { +func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket, metaDownloader block.MetaDownloader) (blocks []metadata.Meta, err error) { level.Info(logger).Log("msg", "synchronizing block metadata") if err = bkt.Iter(ctx, "", func(name string) error { @@ -398,7 +397,7 @@ func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (bloc return nil } - meta, err := block.DownloadMeta(ctx, logger, bkt, id) + meta, err := metaDownloader.DownloadMeta(ctx, logger, bkt, id) if err != nil { return err } diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index ba099949131..8f263065a80 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -16,6 +16,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/route" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -23,11 +24,13 @@ import ( "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" + extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" httpserver "github.com/thanos-io/thanos/pkg/server/http" + "github.com/thanos-io/thanos/pkg/ui" "gopkg.in/alecthomas/kingpin.v2" ) @@ -113,6 +116,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf := regSelectorRelabelFlags(cmd) + label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String() + m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runCompact(g, logger, reg, *httpAddr, @@ -135,6 +140,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { *blockSyncConcurrency, *compactionConcurrency, selectorRelabelConf, + *label, ) } } @@ -159,6 +165,7 @@ func runCompact( blockSyncConcurrency int, concurrency int, selectorRelabelConf *extflag.PathOrContent, + label string, ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", @@ -211,8 +218,9 @@ func runCompact( } }() + metaDownloader := block.NewCachingMetaDownloader(block.MetaDownloaderFn(block.DownloadMeta)) sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay, - blockSyncConcurrency, acceptMalformedIndex, relabelConfig) + blockSyncConcurrency, acceptMalformedIndex, relabelConfig, metaDownloader) if err != nil { return errors.Wrap(err, "create syncer") } @@ -295,6 +303,9 @@ func runCompact( return nil } + // Compaction and bucket refresh interval. + interval := 5 * time.Minute + g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") @@ -310,7 +321,7 @@ func runCompact( } // --wait=true is specified. - return runutil.Repeat(5*time.Minute, ctx.Done(), func() error { + return runutil.Repeat(interval, ctx.Done(), func() error { err := f() if err == nil { return nil @@ -343,6 +354,19 @@ func runCompact( cancel() }) + if wait { + router := route.New() + bucketUI := ui.NewBucketUI(logger, label) + bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg)) + srv.Handle("/", router) + + g.Add(func() error { + return refresh(ctx, logger, bucketUI, interval, time.Minute, bkt, metaDownloader) + }, func(error) { + cancel() + }) + } + level.Info(logger).Log("msg", "starting compact node") statusProber.SetReady() return nil diff --git a/docs/components/compact.md b/docs/components/compact.md index 4a710a6773b..b6b0c9e8504 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -135,5 +135,8 @@ Flags: selecting blocks. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config + --bucket-web-label=BUCKET-WEB-LABEL + Prometheus label to use as timeline title in the + bucket web UI ``` diff --git a/pkg/block/block.go b/pkg/block/block.go index 4a0115ad218..8d2275cda48 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -10,6 +10,7 @@ import ( "path" "path/filepath" "strings" + "sync" "github.com/go-kit/kit/log/level" @@ -188,3 +189,47 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil } + +// MetaDownloader abstracts anything that can download block metas. +type MetaDownloader interface { + DownloadMeta(context.Context, log.Logger, objstore.Bucket, ulid.ULID) (metadata.Meta, error) +} + +// MetaDownloaderFn turns any func that downloads metas into a MetaDownloader. +type MetaDownloaderFn func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) + +// DownloadMeta implements the MetaDownloader interface. +func (m MetaDownloaderFn) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) { + return m(ctx, logger, bkt, id) +} + +// CachingMetaDownloader is a MetaDownloader that can cache metas. +type CachingMetaDownloader struct { + sync.Mutex + next MetaDownloader + metas map[ulid.ULID]metadata.Meta +} + +// NewCachingMetaDownloader creates a new MetaDownloader that can cache metas +// and uses the given MetaDownloader when something is not in the cache. +func NewCachingMetaDownloader(next MetaDownloader) MetaDownloader { + return &CachingMetaDownloader{ + next: next, + metas: make(map[ulid.ULID]metadata.Meta), + } +} + +// DownloadMeta implements the MetaDownloader interface. +func (c *CachingMetaDownloader) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) { + c.Lock() + defer c.Unlock() + m, ok := c.metas[id] + if ok { + return m, nil + } + m, err := c.next.DownloadMeta(ctx, logger, bkt, id) + if err == nil { + c.metas[id] = m + } + return m, err +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 01e6f78f892..77188d42613 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -53,6 +53,7 @@ type Syncer struct { metrics *syncerMetrics acceptMalformedIndex bool relabelConfig []*relabel.Config + metaDownloader block.MetaDownloader } type syncerMetrics struct { @@ -145,7 +146,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config, md block.MetaDownloader) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -155,6 +156,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket consistencyDelay: consistencyDelay, blocks: map[ulid.ULID]*metadata.Meta{}, bkt: bkt, + metaDownloader: md, metrics: newSyncerMetrics(reg), blockSyncConcurrency: blockSyncConcurrency, acceptMalformedIndex: acceptMalformedIndex, @@ -291,7 +293,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { level.Debug(c.logger).Log("msg", "download meta", "block", id) - meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) + meta, err := c.metaDownloader.DownloadMeta(ctx, c.logger, c.bkt, id) if err != nil { if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) { level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) @@ -356,7 +358,7 @@ func groupKey(res int64, lbls labels.Labels) string { } // Groups returns the compaction groups for all blocks currently known to the syncer. -// It creates all groups from the scratch on every call. +// It creates all groups from scratch on every call. func (c *Syncer) Groups() (res []*Group, err error) { c.mtx.Lock() defer c.mtx.Unlock() diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 7513a944e4c..810dc653d32 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -37,7 +37,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { defer cancel() relabelConfig := make([]*relabel.Config, 0) - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta)) testutil.Ok(t, err) // Generate 15 blocks. Initially the first 10 are synced into memory and only the last @@ -140,7 +140,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { } // Do one initial synchronization with the bucket. - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta)) testutil.Ok(t, err) testutil.Ok(t, sy.SyncMetas(ctx)) @@ -209,7 +209,7 @@ func TestGroup_Compact_e2e(t *testing.T) { reg := prometheus.NewRegistry() - sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil) + sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil, block.MetaDownloaderFn(block.DownloadMeta)) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil) @@ -515,7 +515,7 @@ func TestSyncer_SyncMetasFilter_e2e(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta)) testutil.Ok(t, err) var ids []ulid.ULID diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 85d0f332fdc..24c1c36f23e 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/oklog/ulid" @@ -79,7 +80,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { bkt := inmem.NewBucket() relabelConfig := make([]*relabel.Config, 0) - sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig) + sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta)) testutil.Ok(t, err) // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.