Skip to content

Commit

Permalink
Remove worker callback function that is only used in tests (#11356)
Browse files Browse the repository at this point in the history
The queue already exposes the functionality to track connected consumers
(workers), so there is no need to have callback that is only used in
tests.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored and pull[bot] committed Feb 6, 2024
1 parent d8038cb commit 3271363
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 27 deletions.
37 changes: 16 additions & 21 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ type blockDownloader struct {
limits Limits
activeUsersService *util.ActiveUsersCleanupService

ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
wg sync.WaitGroup
ctx context.Context
manager *services.Manager
wg sync.WaitGroup
}

func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
Expand All @@ -57,17 +56,16 @@ func newBlockDownloader(config config.Config, blockClient BlockClient, limits Li
}

b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
wg: sync.WaitGroup{},
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
wg: sync.WaitGroup{},
}

for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
Expand Down Expand Up @@ -95,18 +93,15 @@ func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- b
}
}

// noop implementation
var onWorkerStopNoopCallback = func() {}

func (d *blockDownloader) serveDownloadingTasks(workerID string) {
// defer first, so it gets executed as last of the deferred functions
defer d.wg.Done()

logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")

d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
//this callback is used only in the tests to assert that worker is stopped
defer d.onWorkerStopCallback()
defer d.wg.Done()

idx := queue.StartIndexWithLocalQueue

Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
Expand All @@ -37,10 +36,6 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
MaxTasksEnqueuedPerTenant: 20,
},
}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
stoppedWorkersCount := atomic.NewInt32(0)
downloader.onWorkerStopCallback = func() {
stoppedWorkersCount.Inc()
}
require.NoError(t, err)
blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
Expand All @@ -63,8 +58,13 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
}
require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded")

// We want all workers to be connected to the queue
require.Equal(t, workersCount, int(downloader.queue.GetConnectedConsumersMetric()))

downloader.stop()
require.Equal(t, int32(workersCount), stoppedWorkersCount.Load())

// We want all workers to be disconnected from the queue
require.Equal(t, 0, int(downloader.queue.GetConnectedConsumersMetric()))
}

// creates fake blocks and returns map[block-path]Block and mockBlockClient
Expand Down

0 comments on commit 3271363

Please sign in to comment.