Skip to content

Commit

Permalink
Remove worker callback function that is only used in tests
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Dec 1, 2023
1 parent cd3a04d commit fec2f2c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 27 deletions.
37 changes: 16 additions & 21 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ type blockDownloader struct {
limits Limits
activeUsersService *util.ActiveUsersCleanupService

ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
wg sync.WaitGroup
ctx context.Context
manager *services.Manager
wg sync.WaitGroup
}

func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
Expand All @@ -57,17 +56,16 @@ func newBlockDownloader(config config.Config, blockClient BlockClient, limits Li
}

b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
wg: sync.WaitGroup{},
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
wg: sync.WaitGroup{},
}

for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
Expand Down Expand Up @@ -95,18 +93,15 @@ func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- b
}
}

// noop implementation
var onWorkerStopNoopCallback = func() {}

func (d *blockDownloader) serveDownloadingTasks(workerID string) {
// defer first, so it gets executed as last of the deferred functions
defer d.wg.Done()

logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")

d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
//this callback is used only in the tests to assert that worker is stopped
defer d.onWorkerStopCallback()
defer d.wg.Done()

idx := queue.StartIndexWithLocalQueue

Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
Expand All @@ -37,10 +36,6 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
MaxTasksEnqueuedPerTenant: 20,
},
}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
stoppedWorkersCount := atomic.NewInt32(0)
downloader.onWorkerStopCallback = func() {
stoppedWorkersCount.Inc()
}
require.NoError(t, err)
blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
Expand All @@ -63,8 +58,13 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
}
require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded")

// We want all workers to be connected to the queue
require.Equal(t, workersCount, int(downloader.queue.GetConnectedConsumersMetric()))

downloader.stop()
require.Equal(t, int32(workersCount), stoppedWorkersCount.Load())

// We want all workers to be disconnected from the queue
require.Equal(t, 0, int(downloader.queue.GetConnectedConsumersMetric()))
}

// creates fake blocks and returns map[block-path]Block and mockBlockClient
Expand Down

0 comments on commit fec2f2c

Please sign in to comment.