Skip to content

Commit

Permalink
Remove worker callback function that is only used in tests (grafana#1…
Browse files Browse the repository at this point in the history
…1356)

The queue already exposes the functionality to track connected consumers
(workers), so there is no need to have callback that is only used in
tests.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored and rhnasc committed Apr 12, 2024
1 parent caf2b53 commit 1a89c06
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 27 deletions.
37 changes: 16 additions & 21 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ type blockDownloader struct {
limits Limits
activeUsersService *util.ActiveUsersCleanupService

ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
wg sync.WaitGroup
ctx context.Context
manager *services.Manager
wg sync.WaitGroup
}

func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
Expand All @@ -57,17 +56,16 @@ func newBlockDownloader(config config.Config, blockClient BlockClient, limits Li
}

b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
wg: sync.WaitGroup{},
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
wg: sync.WaitGroup{},
}

for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
Expand Down Expand Up @@ -95,18 +93,15 @@ func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- b
}
}

// noop implementation
var onWorkerStopNoopCallback = func() {}

func (d *blockDownloader) serveDownloadingTasks(workerID string) {
// defer first, so it gets executed as last of the deferred functions
defer d.wg.Done()

logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")

d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
//this callback is used only in the tests to assert that worker is stopped
defer d.onWorkerStopCallback()
defer d.wg.Done()

idx := queue.StartIndexWithLocalQueue

Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
Expand All @@ -37,10 +36,6 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
MaxTasksEnqueuedPerTenant: 20,
},
}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
stoppedWorkersCount := atomic.NewInt32(0)
downloader.onWorkerStopCallback = func() {
stoppedWorkersCount.Inc()
}
require.NoError(t, err)
blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
Expand All @@ -63,8 +58,13 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
}
require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded")

// We want all workers to be connected to the queue
require.Equal(t, workersCount, int(downloader.queue.GetConnectedConsumersMetric()))

downloader.stop()
require.Equal(t, int32(workersCount), stoppedWorkersCount.Load())

// We want all workers to be disconnected from the queue
require.Equal(t, 0, int(downloader.queue.GetConnectedConsumersMetric()))
}

// creates fake blocks and returns map[block-path]Block and mockBlockClient
Expand Down

0 comments on commit 1a89c06

Please sign in to comment.