Skip to content

Commit

Permalink
initialize retention and deletion components only when they are enabl…
Browse files Browse the repository at this point in the history
…ed (#3772)
  • Loading branch information
sandeepsukhani authored May 31, 2021
1 parent d765921 commit 61b6009
Showing 1 changed file with 48 additions and 37 deletions.
85 changes: 48 additions & 37 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,59 +81,72 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
return nil, errors.New("Must specify compactor config")
}

objectClient, err := storage.NewObjectClient(cfg.SharedStoreType, storageConfig)
if err != nil {
return nil, err
compactor := &Compactor{
cfg: cfg,
}

err = chunk_util.EnsureDirectory(cfg.WorkingDirectory)
if err != nil {
if err := compactor.init(storageConfig, schemaConfig, limits, r); err != nil {
return nil, err
}
prefixedClient := util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix)

var encoder objectclient.KeyEncoder
if _, ok := objectClient.(*local.FSObjectClient); ok {
encoder = objectclient.Base64Encoder
}

chunkClient := objectclient.NewClient(objectClient, encoder)
compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
return compactor, nil
}

retentionWorkDir := filepath.Join(cfg.WorkingDirectory, "retention")
sweeper, err := retention.NewSweeper(retentionWorkDir, chunkClient, cfg.RetentionDeleteWorkCount, cfg.RetentionDeleteDelay, r)
func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) error {
objectClient, err := storage.NewObjectClient(c.cfg.SharedStoreType, storageConfig)
if err != nil {
return nil, err
return err
}

deletionWorkDir := filepath.Join(cfg.WorkingDirectory, "deletion")

deletesStore, err := deletion.NewDeleteStore(deletionWorkDir, prefixedClient)
err = chunk_util.EnsureDirectory(c.cfg.WorkingDirectory)
if err != nil {
return nil, err
return err
}
c.objectClient = util.NewPrefixedObjectClient(objectClient, c.cfg.SharedStoreKeyPrefix)
c.metrics = newMetrics(r)

compactor := &Compactor{
cfg: cfg,
objectClient: prefixedClient,
metrics: newMetrics(r),
sweeper: sweeper,
deleteRequestsStore: deletesStore,
DeleteRequestsHandler: deletion.NewDeleteRequestHandler(deletesStore, time.Hour, r),
deleteRequestsManager: deletion.NewDeleteRequestsManager(deletesStore, cfg.DeleteRequestCancelPeriod, r),
}
if c.cfg.RetentionEnabled {
var encoder objectclient.KeyEncoder
if _, ok := objectClient.(*local.FSObjectClient); ok {
encoder = objectclient.Base64Encoder
}

expirationChecker := newExpirationChecker(retention.NewExpirationChecker(limits), compactor.deleteRequestsManager)
chunkClient := objectclient.NewClient(objectClient, encoder)

marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, expirationChecker, chunkClient, r)
if err != nil {
return nil, err
retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention")
c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r)
if err != nil {
return err
}

deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")

c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.objectClient)
if err != nil {
return err
}

c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)

expirationChecker := newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)

c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, expirationChecker, chunkClient, r)
if err != nil {
return err
}
}
compactor.tableMarker = marker
compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
return compactor, nil

return nil
}

func (c *Compactor) loop(ctx context.Context) error {
if c.cfg.RetentionEnabled {
defer c.deleteRequestsStore.Stop()
defer c.deleteRequestsManager.Stop()
}

runCompaction := func() {
err := c.RunCompaction(ctx)
if err != nil {
Expand Down Expand Up @@ -172,8 +185,6 @@ func (c *Compactor) loop(ctx context.Context) error {
}

wg.Wait()
c.deleteRequestsManager.Stop()
c.deleteRequestsStore.Stop()
return nil
}

Expand Down

0 comments on commit 61b6009

Please sign in to comment.