Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove worker callback function that is only used in tests #11356

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ type blockDownloader struct {
limits Limits
activeUsersService *util.ActiveUsersCleanupService

ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
wg sync.WaitGroup
ctx context.Context
manager *services.Manager
wg sync.WaitGroup
}

func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
Expand All @@ -57,17 +56,16 @@ func newBlockDownloader(config config.Config, blockClient BlockClient, limits Li
}

b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
wg: sync.WaitGroup{},
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
wg: sync.WaitGroup{},
}

for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
Expand Down Expand Up @@ -95,18 +93,15 @@ func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- b
}
}

// noop implementation
var onWorkerStopNoopCallback = func() {}

func (d *blockDownloader) serveDownloadingTasks(workerID string) {
// defer first, so it gets executed as last of the deferred functions
defer d.wg.Done()

logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")

d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
//this callback is used only in the tests to assert that worker is stopped
defer d.onWorkerStopCallback()
defer d.wg.Done()

idx := queue.StartIndexWithLocalQueue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
Expand All @@ -37,10 +36,6 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
MaxTasksEnqueuedPerTenant: 20,
},
}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
stoppedWorkersCount := atomic.NewInt32(0)
downloader.onWorkerStopCallback = func() {
stoppedWorkersCount.Inc()
}
require.NoError(t, err)
blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
Expand All @@ -63,8 +58,13 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
}
require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded")

// We want all workers to be connected to the queue
require.Equal(t, workersCount, int(downloader.queue.GetConnectedConsumersMetric()))

downloader.stop()
require.Equal(t, int32(workersCount), stoppedWorkersCount.Load())

// We want all workers to be disconnected from the queue
require.Equal(t, 0, int(downloader.queue.GetConnectedConsumersMetric()))
}

// creates fake blocks and returns map[block-path]Block and mockBlockClient
Expand Down
Loading