Skip to content

Commit

Permalink
compact: Added support for no-compact markers in planner.
Browse files Browse the repository at this point in the history
The planner algo was adapted to avoid unnecessary changes to
blocks caused by excluded blocks, so we can quickly switch to
different planning logic in next iteration.

Fixes: #1424

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Nov 4, 2020
1 parent cb727ab commit 8cf0af5
Show file tree
Hide file tree
Showing 9 changed files with 583 additions and 192 deletions.
4 changes: 3 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func runCompact(
component,
)
api := blocksAPI.NewBlocksAPI(logger, conf.label, flagsMap)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, bkt)
var sy *compact.Syncer
{
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
Expand All @@ -229,6 +230,7 @@ func runCompact(
block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels)},
)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
Expand Down Expand Up @@ -280,7 +282,7 @@ func runCompact(

grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewTSDBBasedPlanner(logger, levels), comp, compactDir, bkt, conf.compactionConcurrency)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
Expand Down
27 changes: 16 additions & 11 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ const (
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)
Expand Down Expand Up @@ -111,6 +114,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
[]string{MarkedForNoCompactionMeta},
)
m.modified = extprom.NewTxGaugeVec(
reg,
Expand Down Expand Up @@ -782,19 +786,20 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)

for id := range metas {
deletionMark, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String())
if err == metadata.ErrorDeletionMarkNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalDeletionMark {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
if err != nil {
m := &metadata.DeletionMark{}
if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil {
if errors.Cause(err) == metadata.ErrorMarkerNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalMarker {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
return err
}
f.deletionMarkMap[id] = deletionMark
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {

f.deletionMarkMap[id] = m
if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
delete(metas, id)
}
Expand Down
76 changes: 0 additions & 76 deletions pkg/block/metadata/deletionmark.go

This file was deleted.

81 changes: 0 additions & 81 deletions pkg/block/metadata/deletionmark_test.go

This file was deleted.

115 changes: 115 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"context"
"encoding/json"
"io/ioutil"
"path"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
)

const (
// DeletionMarkFilename is the known json filename for optional file storing details about when block is marked for deletion.
// If such file is present in block dir, it means the block is meant to be deleted after certain delay.
DeletionMarkFilename = "deletion-mark.json"
// NoCompactMarkFilename is the known json filename for optional file storing details about why block has to be excluded from compaction.
// If such file is present in block dir, it means the block has to excluded from compaction (both vertical and horizontal) or rewrite (e.g deletions).
NoCompactMarkFilename = "no-compact-mark.json"

// DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos.
DeletionMarkVersion1 = 1
// NoCompactMarkVersion1 is the version of no-compact-mark file supported by Thanos.
NoCompactMarkVersion1 = 1
)

var (
// ErrorMarkerNotFound is the error when marker file is not found.
ErrorMarkerNotFound = errors.New("marker not found")
// ErrorUnmarshalMarker is the error when unmarshalling marker JSON file.
// This error can occur because marker has been partially uploaded to block storage
// or the marker file is not a valid json file.
ErrorUnmarshalMarker = errors.New("unmarshal marker JSON")
)

type Marker interface {
markerFilename() string
}

// DeletionMark stores block id and when block was marked for deletion.
type DeletionMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

// DeletionTime is a unix timestamp of when the block was marked to be deleted.
DeletionTime int64 `json:"deletion_time"`
}

func (m *DeletionMark) markerFilename() string { return DeletionMarkFilename }

// NoCompactReason is a reason for a block to be excluded from compaction.
type NoCompactReason string

const (
// ManualNoCompactReason is a custom reason of excluding from compaction that should be added when no-compact mark is added for unknown/user specified reason.
ManualNoCompactReason NoCompactReason = "manual"
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
type NoCompactMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
}

func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename }

// ReadMarker reads the given mark file from <dir>/<marker filename>.json in bucket.
func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, marker Marker) error {
markerFile := path.Join(dir, marker.markerFilename())
r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, markerFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return ErrorMarkerNotFound
}
return errors.Wrapf(err, "get file: %s", markerFile)
}
defer runutil.CloseWithLogOnErr(logger, r, "close bkt marker reader")

metaContent, err := ioutil.ReadAll(r)
if err != nil {
return errors.Wrapf(err, "read file: %s", markerFile)
}

if err := json.Unmarshal(metaContent, marker); err != nil {
return errors.Wrapf(ErrorUnmarshalMarker, "file: %s; err: %v", markerFile, err.Error())
}
switch marker.markerFilename() {
case NoCompactMarkFilename:
if version := marker.(*NoCompactMark).Version; version != NoCompactMarkVersion1 {
return errors.Errorf("unexpected no-compact-mark file version %d, expected %d", version, NoCompactMarkVersion1)
}
case DeletionMarkFilename:
if version := marker.(*DeletionMark).Version; version != DeletionMarkVersion1 {
return errors.Errorf("unexpected deletion-mark file version %d, expected %d", version, DeletionMarkVersion1)
}
}
return nil
}
Loading

0 comments on commit 8cf0af5

Please sign in to comment.