Skip to content

Commit

Permalink
compactor: adds tunable to control upload parallelism (#6817)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
This can help with compaction latency. I patched the compactor to set this to 100.

**Which issue(s) this PR fixes**:
Helps with #6775

**Checklist**
- [x] Documentation added
  • Loading branch information
afayngelerindbx authored Aug 26, 2022
1 parent 11caa49 commit 4b5b165
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 11 deletions.
9 changes: 8 additions & 1 deletion docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2132,6 +2132,14 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.compaction-interval
[compaction_interval: <duration> | default = 10m]
# Number of upload/remove operations to execute in parallel when finalizing a compaction.
# CLI flag: -boltdb.shipper.compactor.upload-parallelism
#
# NOTE: This setting applies per compaction operation. Because compaction
# operations can themselves run in parallel, the upper bound on the number
# of concurrent uploads is upload_parallelism * max_compaction_parallelism.
[upload_parallelism: <int> | default = 10]
# (Experimental) Activate custom (per-stream,per-tenant) retention.
# CLI flag: -boltdb.shipper.compactor.retention-enabled
[retention_enabled: <boolean> | default = false]
Expand Down Expand Up @@ -2200,7 +2208,6 @@ compacts index shards to more performant forms.
# The hash ring configuration used by compactors to elect a single instance for running compactions
# The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
[compactor_ring: <ring>]
```

## limits_config
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/stores/indexshipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Config struct {
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
DeleteMaxInterval time.Duration `yaml:"delete_max_interval"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
UploadParallelism int `yaml:"upload_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"`
RunOnce bool `yaml:"_"`
TablesToCompact int `yaml:"tables_to_compact"`
Expand All @@ -109,6 +110,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.DeleteMaxInterval, "boltdb.shipper.compactor.delete-max-interval", 0, "Constrain the size of any single delete request. When a delete request > delete_max_query_range is input, the request is sharded into smaller requests of no more than delete_max_interval")
f.DurationVar(&cfg.RetentionTableTimeout, "boltdb.shipper.compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.IntVar(&cfg.UploadParallelism, "boltdb.shipper.compactor.upload-parallelism", 10, "Number of upload/remove operations to execute in parallel when finalizing a compaction. ")
f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)")

// Deprecated
Expand Down Expand Up @@ -505,7 +507,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRet
}

table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, indexCompactor,
schemaCfg, c.tableMarker, c.expirationChecker)
schemaCfg, c.tableMarker, c.expirationChecker, c.cfg.UploadParallelism)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err)
return err
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/stores/indexshipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
)

const (
uploadIndexSetsConcurrency = 10
gzipExtension = ".gz"
gzipExtension = ".gz"
)

var errRetentionFileCountNotOne = fmt.Errorf("can't apply retention when index file count is not one")
Expand Down Expand Up @@ -71,6 +70,7 @@ type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)
type table struct {
name string
workingDirectory string
uploadConcurrency int
indexStorageClient storage.Client
indexCompactor IndexCompactor
tableMarker retention.TableMarker
Expand All @@ -89,6 +89,7 @@ type table struct {
func newTable(ctx context.Context, workingDirectory string, indexStorageClient storage.Client,
indexCompactor IndexCompactor, periodConfig config.PeriodConfig,
tableMarker retention.TableMarker, expirationChecker tableExpirationChecker,
uploadConcurrency int,
) (*table, error) {
err := chunk_util.EnsureDirectory(workingDirectory)
if err != nil {
Expand All @@ -107,6 +108,7 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s
indexSets: map[string]*indexSet{},
baseUserIndexSet: storage.NewIndexSet(indexStorageClient, true),
baseCommonIndexSet: storage.NewIndexSet(indexStorageClient, false),
uploadConcurrency: uploadConcurrency,
}
table.logger = log.With(util_log.Logger, "table-name", table.name)

Expand Down Expand Up @@ -195,7 +197,7 @@ func (t *table) done() error {
userIDs = append(userIDs, userID)
}

err := concurrency.ForEachJob(t.ctx, len(userIDs), uploadIndexSetsConcurrency, func(ctx context.Context, idx int) error {
err := concurrency.ForEachJob(t.ctx, len(userIDs), t.uploadConcurrency, func(ctx context.Context, idx int) error {
return t.indexSets[userIDs[idx]].done()
})
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/storage/stores/indexshipper/compactor/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type indexSetState struct {
}

func TestTable_Compaction(t *testing.T) {
for _, numUsers := range []int{uploadIndexSetsConcurrency / 2, uploadIndexSetsConcurrency, uploadIndexSetsConcurrency * 2} {
// user counts are aligned with default upload parallelism
for _, numUsers := range []int{5, 10, 20} {
t.Run(fmt.Sprintf("numUsers=%d", numUsers), func(t *testing.T) {
for _, tc := range []struct {
numUnCompactedCommonDBs int
Expand Down Expand Up @@ -199,7 +200,7 @@ func TestTable_Compaction(t *testing.T) {
require.NoError(t, err)

table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil)
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)

require.NoError(t, table.compact(false))
Expand Down Expand Up @@ -236,7 +237,7 @@ func TestTable_Compaction(t *testing.T) {

// running compaction again should not do anything.
table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil)
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)

require.NoError(t, table.compact(false))
Expand Down Expand Up @@ -379,7 +380,7 @@ func TestTable_CompactionRetention(t *testing.T) {
newTestIndexCompactor(), config.PeriodConfig{},
tt.tableMarker, IntervalMayHaveExpiredChunksFunc(func(interval model.Interval, userID string) bool {
return true
}))
}), 10)
require.NoError(t, err)

require.NoError(t, table.compact(true))
Expand Down Expand Up @@ -452,7 +453,7 @@ func TestTable_CompactionFailure(t *testing.T) {
require.NoError(t, err)

table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil)
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)

// compaction should fail due to a non-boltdb file.
Expand All @@ -470,7 +471,7 @@ func TestTable_CompactionFailure(t *testing.T) {
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, "fail.gz")))

table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil)
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)
require.NoError(t, table.compact(false))

Expand Down

0 comments on commit 4b5b165

Please sign in to comment.