From c08c114c4dc155d44e1c79efe4de32d5ddae7330 Mon Sep 17 00:00:00 2001 From: honganan Date: Tue, 4 Jun 2024 17:39:26 +0800 Subject: [PATCH] fix duplicate enqueue items problem in bloom download queue when do sync download --- .../stores/shipper/bloomshipper/fetcher.go | 1 + .../shipper/bloomshipper/fetcher_test.go | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index 69715158950e0..c2a2939a805b3 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -502,6 +502,7 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R] func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) { if !t.async { q.queue <- t + return } // for async task we attempt to dedupe task already in progress. q.enqueuedMutex.Lock() diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index 847c5f69b6a29..fb802fd63b9a5 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -267,6 +267,59 @@ func TestFetcher_DownloadQueue(t *testing.T) { } }) + + t.Run("download multiple items and return in order", func(t *testing.T) { + ctx := context.Background() + + q, err := newDownloadQueue[bool, bool]( + 100, + 1, + func(_ context.Context, r downloadRequest[bool, bool]) { + r.results <- downloadResponse[bool]{ + key: r.key, + idx: r.idx, + item: true, + } + }, + log.NewNopLogger(), + ) + require.NoError(t, err) + + count := 10 + resultsCh := make(chan downloadResponse[bool], count) + errorsCh := make(chan error, count) + + reqs := buildDownloadRequest(ctx, count, resultsCh, errorsCh) + for _, r := range reqs { + q.enqueue(r) + } + + for i := 0; i < count; i++ { + select { + case err := <-errorsCh: + require.False(t, true, "got %+v should have received a response instead", err) + case res := <-resultsCh: + require.True(t, res.item) + require.Equal(t, reqs[i].key, res.key) + require.Equal(t, reqs[i].idx, res.idx) + } + } + }) +} + +func buildDownloadRequest(ctx context.Context, count int, resCh chan downloadResponse[bool], errCh chan error) []downloadRequest[bool, bool] { + requests := make([]downloadRequest[bool, bool], count) + for i := 0; i < count; i++ { + requests[i] = downloadRequest[bool, bool]{ + ctx: ctx, + item: false, + key: "test", + idx: i, + results: resCh, + errors: errCh, + } + } + return requests } func TestFetcher_LoadBlocksFromFS(t *testing.T) {