diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go index fc9667fa28e0..26c9c4b3f295 100644 --- a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go @@ -34,10 +34,9 @@ type blockDownloader struct { limits Limits activeUsersService *util.ActiveUsersCleanupService - ctx context.Context - manager *services.Manager - onWorkerStopCallback func() - wg sync.WaitGroup + ctx context.Context + manager *services.Manager + wg sync.WaitGroup } func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) { @@ -57,17 +56,16 @@ func newBlockDownloader(config config.Config, blockClient BlockClient, limits Li } b := &blockDownloader{ - ctx: ctx, - logger: logger, - workingDirectory: config.WorkingDirectory, - queueMetrics: queueMetrics, - queue: downloadingQueue, - blockClient: blockClient, - activeUsersService: activeUsersService, - limits: limits, - manager: manager, - onWorkerStopCallback: onWorkerStopNoopCallback, - wg: sync.WaitGroup{}, + ctx: ctx, + logger: logger, + workingDirectory: config.WorkingDirectory, + queueMetrics: queueMetrics, + queue: downloadingQueue, + blockClient: blockClient, + activeUsersService: activeUsersService, + limits: limits, + manager: manager, + wg: sync.WaitGroup{}, } for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ { @@ -95,18 +93,15 @@ func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- b } } -// noop implementation -var onWorkerStopNoopCallback = func() {} - func (d *blockDownloader) serveDownloadingTasks(workerID string) { + // defer first, so it gets executed as last of the deferred functions + defer d.wg.Done() + logger := log.With(d.logger, "worker", workerID) level.Debug(logger).Log("msg", "starting worker") d.queue.RegisterConsumerConnection(workerID) defer d.queue.UnregisterConsumerConnection(workerID) - //this callback is used only in the tests to assert that worker is stopped - defer d.onWorkerStopCallback() - defer d.wg.Done() idx := queue.StartIndexWithLocalQueue diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go index aa6f7249b8ee..9eb7bc0e66c0 100644 --- a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go @@ -15,7 +15,6 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" - "go.uber.org/atomic" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" @@ -37,10 +36,6 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) { MaxTasksEnqueuedPerTenant: 20, }, }, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer) - stoppedWorkersCount := atomic.NewInt32(0) - downloader.onWorkerStopCallback = func() { - stoppedWorkersCount.Inc() - } require.NoError(t, err) blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences) downloadedBlocks := make(map[string]any, len(blockReferences)) @@ -63,8 +58,13 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) { } require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded") + // We want all workers to be connected to the queue + require.Equal(t, workersCount, int(downloader.queue.GetConnectedConsumersMetric())) + downloader.stop() - require.Equal(t, int32(workersCount), stoppedWorkersCount.Load()) + + // We want all workers to be disconnected from the queue + require.Equal(t, 0, int(downloader.queue.GetConnectedConsumersMetric())) } // creates fake blocks and returns map[block-path]Block and mockBlockClient