Commit

feat tools: bucket replicate allows to filter multiple compaction and resolution lvls (#2671)

* feat tools: bucket replicate allows to filter multiple compaction and resolution lvls

Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

* chore: update changelog

Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

* Update cmd/thanos/tools_bucket.go

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

* Update cmd/thanos/tools_bucket.go

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

* fix: revert of unrelated swift ENV variable fix

Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

* refactor tools: changed check of valid retention and compaction lvls to use map

Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

* fix: update docs

Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

* feat tools: bucket replicate update flag defaults and input format of resolution

Signed-off-by: Martin Chodur <m.chodur@seznam.cz>

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
FUSAKLA and bwplotka authored Jun 8, 2020
1 parent 63bcedb commit 8c3f029
Showing 6 changed files with 58 additions and 53 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,13 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Store: removed support to the legacy `index.cache.json`. The hidden flag `--store.disable-index-header` was removed.
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed.
- [#2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks return exactly overlapping chunks (e.g. Store GW and sidecar, or brute-force Store Gateway HA).
- [#2671](https://github.com/thanos-io/thanos/pull/2671) *breaking* Tools: bucket replicate flag `--resolution` is now in Go duration format.
- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now replicates all blocks by default.


### Added

- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags.

## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS

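A minimal sketch of what the *breaking* `--resolution` format change above means in practice: the flag value is now a Go duration string rather than an integer number of milliseconds, so the old millisecond values map over as below (standalone illustration, not part of this commit).

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Previously --resolution took an integer resolution in milliseconds (e.g. 300000);
	// it now takes a Go duration string (e.g. "5m") that resolves to the same value.
	d, err := time.ParseDuration("5m")
	if err != nil {
		panic(err)
	}
	fmt.Println(d.Milliseconds()) // 300000, i.e. downsample.ResLevel1
}
```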
21 changes: 12 additions & 9 deletions cmd/thanos/tools_bucket.go
@@ -414,19 +414,17 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
// Provide a list of resolution, can not use Enum directly, since string does not implement int64 function.
func listResLevel() []string {
return []string{
strconv.FormatInt(downsample.ResLevel0, 10),
strconv.FormatInt(downsample.ResLevel1, 10),
strconv.FormatInt(downsample.ResLevel2, 10)}
time.Duration(downsample.ResLevel0).String(),
time.Duration(downsample.ResLevel1).String(),
time.Duration(downsample.ResLevel2).String()}
}

func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {
cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename))
httpBindAddr, httpGracePeriod := regHTTPFlags(cmd)
toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.")
// TODO(bwplotka): Allow to replicate many resolution levels.
resolution := cmd.Flag("resolution", "Only blocks with this resolution will be replicated.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64()
// TODO(bwplotka): Allow to replicate many compaction levels.
compaction := cmd.Flag("compaction", "Only blocks with this compaction level will be replicated.").Default("1").Int()
resolutions := cmd.Flag("resolution", "Only blocks with these resolutions will be replicated. Repeated flag.").Default("0s", "5m", "1h").HintAction(listResLevel).DurationList()
compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints()
matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings()
singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool()

@@ -436,6 +434,11 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
return errors.Wrap(err, "parse block label matchers")
}

var resolutionLevels []compact.ResolutionLevel
for _, lvl := range *resolutions {
resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl.Milliseconds()))
}

return replicate.RunReplicate(
g,
logger,
@@ -444,8 +447,8 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
*httpBindAddr,
time.Duration(*httpGracePeriod),
matchers,
compact.ResolutionLevel(*resolution),
*compaction,
resolutionLevels,
*compactions,
objStoreConfig,
toObjStoreConfig,
*singleRun,
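A standalone sketch of the flag wiring introduced above, outside the Thanos codebase: it declares repeated flags with kingpin's `DurationList()` and `Ints()` in the same way and converts the parsed durations to millisecond resolution levels. The local `ResolutionLevel` type is an assumed stand-in for `compact.ResolutionLevel`, used only to keep the example self-contained.

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

// ResolutionLevel stands in for compact.ResolutionLevel: a block resolution in milliseconds.
type ResolutionLevel int64

func main() {
	app := kingpin.New("replicate-demo", "Demo of repeated --resolution and --compaction flags.")
	resolutions := app.Flag("resolution", "Only blocks with these resolutions will be replicated. Repeated flag.").
		Default("0s", "5m", "1h").DurationList()
	compactions := app.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").
		Default("1", "2", "3", "4").Ints()

	// e.g. go run . --resolution=0s --resolution=5m --compaction=1 --compaction=2
	kingpin.MustParse(app.Parse(os.Args[1:]))

	// Convert each duration to a millisecond resolution level, as registerBucketReplicate does.
	var levels []ResolutionLevel
	for _, d := range *resolutions {
		levels = append(levels, ResolutionLevel(d.Milliseconds()))
	}
	fmt.Println(levels, *compactions) // with defaults: [0 300000 3600000] [1 2 3 4]
}
```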
8 changes: 4 additions & 4 deletions docs/components/tools.md
@@ -451,10 +451,10 @@ Flags:
format details:
https://thanos.io/storage.md/#configuration The
object storage which replicate data to.
--resolution=0 Only blocks with this resolution will be
replicated.
--compaction=1 Only blocks with this compaction level will be
replicated.
--resolution=0s... ... Only blocks with these resolutions will be
replicated. Repeated flag.
--compaction=1... ... Only blocks with these compaction levels will
be replicated. Repeated flag.
--matcher=key="value" ... Only blocks whose external labels exactly match
this matcher will be replicated.
--single-run Run replication only one time, then exit.
8 changes: 4 additions & 4 deletions pkg/replicate/replicator.go
@@ -69,8 +69,8 @@ func RunReplicate(
httpBindAddr string,
httpGracePeriod time.Duration,
labelSelector labels.Selector,
resolution compact.ResolutionLevel,
compaction int,
resolutions []compact.ResolutionLevel,
compactions []int,
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
@@ -159,8 +159,8 @@ func RunReplicate(
blockFilter := NewBlockFilter(
logger,
labelSelector,
resolution,
compaction,
resolutions,
compactions,
).Filter
metrics := newReplicationMetrics(reg)
ctx, cancel := context.WithCancel(context.Background())
65 changes: 30 additions & 35 deletions pkg/replicate/scheme.go
@@ -6,6 +6,7 @@ package replicate
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"path"
@@ -27,24 +28,32 @@ import (

// BlockFilter is block filter that filters out compacted and unselected blocks.
type BlockFilter struct {
logger log.Logger
labelSelector labels.Selector
resolutionLevel compact.ResolutionLevel
compactionLevel int
logger log.Logger
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
}

// NewBlockFilter returns block filter.
func NewBlockFilter(
logger log.Logger,
labelSelector labels.Selector,
resolutionLevel compact.ResolutionLevel,
compactionLevel int,
resolutionLevels []compact.ResolutionLevel,
compactionLevels []int,
) *BlockFilter {
allowedResolutions := make(map[compact.ResolutionLevel]struct{})
for _, resolutionLevel := range resolutionLevels {
allowedResolutions[resolutionLevel] = struct{}{}
}
allowedCompactions := make(map[int]struct{})
for _, compactionLevel := range compactionLevels {
allowedCompactions[compactionLevel] = struct{}{}
}
return &BlockFilter{
labelSelector: labelSelector,
logger: logger,
resolutionLevel: resolutionLevel,
compactionLevel: compactionLevel,
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
}
}

@@ -77,20 +86,14 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
}

gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution)
expectedResolution := bf.resolutionLevel

resolutionMatch := gotResolution == expectedResolution
if !resolutionMatch {
level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolutions don't match", "got_resolution", gotResolution, "expected_resolution", expectedResolution)
if _, ok := bf.resolutionLevels[gotResolution]; !ok {
level.Info(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", fmt.Sprintf("%v", bf.resolutionLevels))
return false
}

gotCompactionLevel := b.BlockMeta.Compaction.Level
expectedCompactionLevel := bf.compactionLevel

compactionMatch := gotCompactionLevel == expectedCompactionLevel
if !compactionMatch {
level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction levels don't match", "got_compaction_level", gotCompactionLevel, "expected_compaction_level", expectedCompactionLevel)
if _, ok := bf.compactionLevels[gotCompactionLevel]; !ok {
level.Info(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", fmt.Sprintf("%v", bf.compactionLevels))
return false
}

@@ -211,31 +214,23 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
return nil
}

level.Debug(rs.logger).Log("msg", "adding block to available blocks", "block_uuid", id.String())

availableBlocks = append(availableBlocks, meta)
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}

return nil
}); err != nil {
return errors.Wrap(err, "iterate over origin bucket")
}

candidateBlocks := []*metadata.Meta{}

for _, b := range availableBlocks {
if rs.blockFilter(b) {
level.Debug(rs.logger).Log("msg", "adding block to candidate blocks", "block_uuid", b.BlockMeta.ULID.String())
candidateBlocks = append(candidateBlocks, b)
}
}

// In order to prevent races in compactions by the target environment, we
// need to replicate oldest start timestamp first.
sort.Slice(candidateBlocks, func(i, j int) bool {
return candidateBlocks[i].BlockMeta.MinTime < candidateBlocks[j].BlockMeta.MinTime
sort.Slice(availableBlocks, func(i, j int) bool {
return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime
})

for _, b := range candidateBlocks {
for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
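As a small illustration of the ordering kept above (a standalone sketch with a simplified stand-in for `metadata.Meta`), filtered blocks are replicated oldest start timestamp first so compactions in the target environment are not raced.

```go
package main

import (
	"fmt"
	"sort"
)

// blockMeta is a simplified stand-in for metadata.Meta, keeping only what the sort needs.
type blockMeta struct {
	ULID    string
	MinTime int64
}

func main() {
	// Blocks that already passed the filter, in arbitrary iteration order.
	blocks := []blockMeta{
		{ULID: "block-c", MinTime: 300},
		{ULID: "block-a", MinTime: 100},
		{ULID: "block-b", MinTime: 200},
	}

	// Replicate the oldest start timestamp first, as execute() does with availableBlocks.
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].MinTime < blocks[j].MinTime
	})

	for _, b := range blocks {
		fmt.Println("replicating", b.ULID)
	}
}
```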
2 changes: 1 addition & 1 deletion pkg/replicate/scheme_test.go
@@ -311,7 +311,7 @@ func TestReplicationSchemeAll(t *testing.T) {
selector = c.selector
}

filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter
filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}).Filter
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
testutil.Ok(t, err)

