Skip to content

Commit

Permalink
improvements for boltdb-shipper compactor (#2640)
Browse files Browse the repository at this point in the history
1. Run the compaction process on startup instead of waiting for the first tick.
2. Run the compactor service when using loki with boltdb-shipper in single binary mode without clustering.
  • Loading branch information
sandeepsukhani authored Sep 18, 2020
1 parent 2f7e0e5 commit 5b361a0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
6 changes: 6 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ func (t *Loki) setupModuleManager() error {
deps[Store] = append(deps[Store], IngesterQuerier)
}

// If we are running Loki with boltdb-shipper as a single binary, without clustered mode(which should always be the case when using inmemory ring),
// we should start compactor as well for better user experience.
if storage.UsingBoltdbShipper(t.cfg.SchemaConfig) && t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" {
deps[All] = append(deps[All], Compactor)
}

for mod, targets := range deps {
if err := mm.AddDependency(mod, targets...); err != nil {
return err
Expand Down
31 changes: 28 additions & 3 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import (
)

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.WorkingDirectory, "boltdb.shipper.compactor.working-directory", "", "Directory where files can be downloaded for compaction.")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.")
}

type Compactor struct {
Expand Down Expand Up @@ -56,10 +58,33 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe
metrics: newMetrics(r),
}

compactor.Service = services.NewTimerService(4*time.Hour, nil, compactor.Run, nil)
compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
return &compactor, nil
}

func (c *Compactor) loop(ctx context.Context) error {
runCompaction := func() {
err := c.Run(ctx)
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "failed to run compaction", "err", err)
}
}

runCompaction()

ticker := time.NewTicker(c.cfg.CompactionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
runCompaction()
case <-ctx.Done():
return nil
}
}
}

func (c *Compactor) Run(ctx context.Context) error {
status := statusSuccess
start := time.Now()
Expand Down

0 comments on commit 5b361a0

Please sign in to comment.