From 61b60092af67dad4f7439b3b92f71fbdc6990181 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Mon, 31 May 2021 11:16:41 +0530 Subject: [PATCH] initialize retention and deletion components only when they are enabled (#3772) --- .../stores/shipper/compactor/compactor.go | 85 +++++++++++-------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index da9ea46d651a..0d7f6760c2c1 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -81,59 +81,72 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st return nil, errors.New("Must specify compactor config") } - objectClient, err := storage.NewObjectClient(cfg.SharedStoreType, storageConfig) - if err != nil { - return nil, err + compactor := &Compactor{ + cfg: cfg, } - err = chunk_util.EnsureDirectory(cfg.WorkingDirectory) - if err != nil { + if err := compactor.init(storageConfig, schemaConfig, limits, r); err != nil { return nil, err } - prefixedClient := util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix) - - var encoder objectclient.KeyEncoder - if _, ok := objectClient.(*local.FSObjectClient); ok { - encoder = objectclient.Base64Encoder - } - chunkClient := objectclient.NewClient(objectClient, encoder) + compactor.Service = services.NewBasicService(nil, compactor.loop, nil) + return compactor, nil +} - retentionWorkDir := filepath.Join(cfg.WorkingDirectory, "retention") - sweeper, err := retention.NewSweeper(retentionWorkDir, chunkClient, cfg.RetentionDeleteWorkCount, cfg.RetentionDeleteDelay, r) +func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) error { + objectClient, err := storage.NewObjectClient(c.cfg.SharedStoreType, storageConfig) if err != nil { - return nil, err + return err } - deletionWorkDir := filepath.Join(cfg.WorkingDirectory, "deletion") - - deletesStore, err := deletion.NewDeleteStore(deletionWorkDir, prefixedClient) + err = chunk_util.EnsureDirectory(c.cfg.WorkingDirectory) if err != nil { - return nil, err + return err } + c.objectClient = util.NewPrefixedObjectClient(objectClient, c.cfg.SharedStoreKeyPrefix) + c.metrics = newMetrics(r) - compactor := &Compactor{ - cfg: cfg, - objectClient: prefixedClient, - metrics: newMetrics(r), - sweeper: sweeper, - deleteRequestsStore: deletesStore, - DeleteRequestsHandler: deletion.NewDeleteRequestHandler(deletesStore, time.Hour, r), - deleteRequestsManager: deletion.NewDeleteRequestsManager(deletesStore, cfg.DeleteRequestCancelPeriod, r), - } + if c.cfg.RetentionEnabled { + var encoder objectclient.KeyEncoder + if _, ok := objectClient.(*local.FSObjectClient); ok { + encoder = objectclient.Base64Encoder + } - expirationChecker := newExpirationChecker(retention.NewExpirationChecker(limits), compactor.deleteRequestsManager) + chunkClient := objectclient.NewClient(objectClient, encoder) - marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, expirationChecker, chunkClient, r) - if err != nil { - return nil, err + retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention") + c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r) + if err != nil { + return err + } + + deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") + + c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.objectClient) + if err != nil { + return err + } + + c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r) + c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r) + + expirationChecker := newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) + + c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, expirationChecker, chunkClient, r) + if err != nil { + return err + } } - compactor.tableMarker = marker - compactor.Service = services.NewBasicService(nil, compactor.loop, nil) - return compactor, nil + + return nil } func (c *Compactor) loop(ctx context.Context) error { + if c.cfg.RetentionEnabled { + defer c.deleteRequestsStore.Stop() + defer c.deleteRequestsManager.Stop() + } + runCompaction := func() { err := c.RunCompaction(ctx) if err != nil { @@ -172,8 +185,6 @@ func (c *Compactor) loop(ctx context.Context) error { } wg.Wait() - c.deleteRequestsManager.Stop() - c.deleteRequestsStore.Stop() return nil }