diff --git a/pkg/bloombuild/planner/plannertest/utils.go b/pkg/bloombuild/planner/plannertest/utils.go
index f0c8f0ec70362..706e0abdf00a7 100644
--- a/pkg/bloombuild/planner/plannertest/utils.go
+++ b/pkg/bloombuild/planner/plannertest/utils.go
@@ -88,8 +88,12 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
 }
 
 func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
-	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
-	for i := bounds.Min; i <= bounds.Max; i++ {
+	return GenSeriesWithStep(bounds, 1)
+}
+
+func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
+	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
+	for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
 		series = append(series, &v1.Series{
 			Fingerprint: i,
 			Chunks: v1.ChunkRefs{
diff --git a/pkg/bloombuild/planner/strategies/chunksize_test.go b/pkg/bloombuild/planner/strategies/chunksize_test.go
index 9f46b95137414..3f023c9853fca 100644
--- a/pkg/bloombuild/planner/strategies/chunksize_test.go
+++ b/pkg/bloombuild/planner/strategies/chunksize_test.go
@@ -2,22 +2,22 @@ package strategies
 
 import (
 	"context"
-	"github.com/grafana/loki/v3/pkg/bloombuild/planner/test_utils"
-	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"testing"
 
 	"github.com/go-kit/log"
 	"github.com/stretchr/testify/require"
 
+	"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
 	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
+	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 )
 
 func taskForGap(bounds v1.FingerprintBounds) *protos.Task {
-	return protos.NewTask(test_utils.TestTable, "fake", bounds, test_utils.TsdbID(0), []protos.Gap{
+	return protos.NewTask(plannertest.TestTable, "fake", bounds, plannertest.TsdbID(0), []protos.Gap{
 		{
 			Bounds: bounds,
-			Series: test_utils.GenSeriesWithStep(bounds, 10),
+			Series: plannertest.GenSeriesWithStep(bounds, 10),
 			Blocks: nil,
 		},
 	})
@@ -37,7 +37,7 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {
 
 			// Each series will have 1 chunk of 100KB each
 			tsdbs: TSDBSet{
-				test_utils.TsdbID(0): newFakeForSeries(test_utils.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
+				plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
 			},
 
 			// We expect 5 tasks, each with 2 series each
@@ -56,30 +56,30 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {
 			// Original metas cover the entire range
 			// One meta for each 2 series w/ 1 block per series
 			originalMetas: []bloomshipper.Meta{
-				test_utils.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(0, 0),
-					test_utils.GenBlockRef(10, 10),
+				plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
 				}),
-				test_utils.GenMeta(20, 30, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(20, 20),
-					test_utils.GenBlockRef(30, 30),
+				plannertest.GenMeta(20, 30, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(20, 20),
+					plannertest.GenBlockRef(30, 30),
 				}),
-				test_utils.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(40, 40),
-					test_utils.GenBlockRef(50, 50),
+				plannertest.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(40, 40),
+					plannertest.GenBlockRef(50, 50),
 				}),
-				test_utils.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(60, 60),
-					test_utils.GenBlockRef(70, 70),
+				plannertest.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
 				}),
-				test_utils.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(80, 80),
-					test_utils.GenBlockRef(90, 90),
+				plannertest.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
 				}),
 			},
 
 			tsdbs: TSDBSet{
-				test_utils.TsdbID(0): newFakeForSeries(test_utils.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
+				plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
 			},
 
 			// We expect no tasks
@@ -93,30 +93,30 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {
 			// Original metas cover the entire range
 			// One meta for each 2 series w/ 1 block per series
 			originalMetas: []bloomshipper.Meta{
-				test_utils.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(0, 0),
-					test_utils.GenBlockRef(10, 10),
+				plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
 				}),
-				test_utils.GenMeta(20, 30, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(20, 20),
+				plannertest.GenMeta(20, 30, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(20, 20),
 					// Missing block for 30
 				}),
-				test_utils.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(40, 40),
-					test_utils.GenBlockRef(50, 50),
+				plannertest.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(40, 40),
+					plannertest.GenBlockRef(50, 50),
 				}),
-				test_utils.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(60, 60),
-					test_utils.GenBlockRef(70, 70),
+				plannertest.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
 				}),
-				test_utils.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(80, 80),
-					test_utils.GenBlockRef(90, 90),
+				plannertest.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
 				}),
 			},
 
 			tsdbs: TSDBSet{
-				test_utils.TsdbID(0): newFakeForSeries(test_utils.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
+				plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
 			},
 
 			// We expect 1 tasks for the missing series
@@ -132,27 +132,27 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {
 			// Original metas cover the entire range
 			// One meta for each 2 series w/ 1 block per series
 			originalMetas: []bloomshipper.Meta{
-				test_utils.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(0, 0),
-					test_utils.GenBlockRef(10, 10),
+				plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
 				}),
 				// Missing meta for 20-30
-				test_utils.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(40, 40),
-					test_utils.GenBlockRef(50, 50),
+				plannertest.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(40, 40),
+					plannertest.GenBlockRef(50, 50),
 				}),
-				test_utils.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(60, 60),
-					test_utils.GenBlockRef(70, 70),
+				plannertest.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
 				}),
-				test_utils.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
-					test_utils.GenBlockRef(80, 80),
-					test_utils.GenBlockRef(90, 90),
+				plannertest.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
 				}),
 			},
 
 			tsdbs: TSDBSet{
-				test_utils.TsdbID(0): newFakeForSeries(test_utils.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
+				plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series
 			},
 
 			// We expect 1 tasks for the missing series
@@ -168,7 +168,7 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {
 			strategy, err := NewChunkSizeStrategy(tc.limits, logger)
 			require.NoError(t, err)
 
-			actual, err := strategy.Plan(context.Background(), test_utils.TestTable, "fake", tc.tsdbs, tc.originalMetas)
+			actual, err := strategy.Plan(context.Background(), plannertest.TestTable, "fake", tc.tsdbs, tc.originalMetas)
 			require.NoError(t, err)
 
 			require.ElementsMatch(t, tc.expectedTasks, actual)
diff --git a/pkg/bloombuild/planner/test_utils/utils.go b/pkg/bloombuild/planner/test_utils/utils.go
deleted file mode 100644
index 2b759a4c93f3a..0000000000000
--- a/pkg/bloombuild/planner/test_utils/utils.go
+++ /dev/null
@@ -1,156 +0,0 @@
-package test_utils
-
-import (
-	"bytes"
-	"context"
-	"time"
-
-	"github.com/grafana/loki/v3/pkg/bloombuild/planner"
-	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
-	"github.com/grafana/loki/v3/pkg/compression"
-	"github.com/grafana/loki/v3/pkg/iter/v2"
-	"github.com/grafana/loki/v3/pkg/storage/bloom/v1"
-	"github.com/grafana/loki/v3/pkg/storage/config"
-	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
-	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
-	"github.com/prometheus/common/model"
-)
-
-var TestDay = ParseDayTime("2023-09-01")
-var TestTable = config.NewDayTable(TestDay, "index_")
-
-func TsdbID(n int) tsdb.SingleTenantTSDBIdentifier {
-	return tsdb.SingleTenantTSDBIdentifier{
-		TS: time.Unix(int64(n), 0),
-	}
-}
-
-func GenMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta {
-	m := bloomshipper.Meta{
-		MetaRef: bloomshipper.MetaRef{
-			Ref: bloomshipper.Ref{
-				TenantID:  "fakeTenant",
-				TableName: TestTable.Addr(),
-				Bounds:    v1.NewBounds(min, max),
-			},
-		},
-		Blocks: blocks,
-	}
-	for _, source := range sources {
-		m.Sources = append(m.Sources, TsdbID(source))
-	}
-	return m
-}
-
-func GenBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
-	startTS, endTS := TestDay.Bounds()
-	return bloomshipper.BlockRef{
-		Ref: bloomshipper.Ref{
-			TenantID:       "fakeTenant",
-			TableName:      TestTable.Addr(),
-			Bounds:         v1.NewBounds(min, max),
-			StartTimestamp: startTS,
-			EndTimestamp:   endTS,
-			Checksum:       0,
-		},
-	}
-}
-
-func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
-	indexBuf := bytes.NewBuffer(nil)
-	bloomsBuf := bytes.NewBuffer(nil)
-	writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
-	reader := v1.NewByteReader(indexBuf, bloomsBuf)
-
-	blockOpts := v1.NewBlockOptions(compression.None, 0, 0)
-
-	builder, err := v1.NewBlockBuilder(blockOpts, writer)
-	if err != nil {
-		return bloomshipper.Block{}, err
-	}
-
-	if _, err = builder.BuildFrom(v2.NewEmptyIter[v1.SeriesWithBlooms]()); err != nil {
-		return bloomshipper.Block{}, err
-	}
-
-	block := v1.NewBlock(reader, v1.NewMetrics(nil))
-
-	buf := bytes.NewBuffer(nil)
-	if err := v1.TarCompress(ref.Codec, buf, block.Reader()); err != nil {
-		return bloomshipper.Block{}, err
-	}
-
-	tarReader := bytes.NewReader(buf.Bytes())
-
-	return bloomshipper.Block{
-		BlockRef: ref,
-		Data:     bloomshipper.ClosableReadSeekerAdapter{ReadSeeker: tarReader},
-	}, nil
-}
-
-func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
-	return GenSeriesWithStep(bounds, 1)
-}
-
-func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
-	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
-	for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
-		series = append(series, &v1.Series{
-			Fingerprint: i,
-			Chunks: v1.ChunkRefs{
-				{
-					From:     0,
-					Through:  1,
-					Checksum: 1,
-				},
-			},
-		})
-	}
-	return series
-}
-
-func CreateTasks(n int, resultsCh chan *protos.TaskResult) []*planner.QueueTask {
-	tasks := make([]*planner.QueueTask, 0, n)
-	// Enqueue tasks
-	for i := 0; i < n; i++ {
-		task := planner.NewQueueTask(
-			context.Background(), time.Now(),
-			protos.NewTask(config.NewDayTable(TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), TsdbID(1), nil),
-			resultsCh,
-		)
-		tasks = append(tasks, task)
-	}
-	return tasks
-}
-
-func PutMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error {
-	for _, meta := range metas {
-		err := bloomClient.PutMeta(context.Background(), meta)
-		if err != nil {
-			return err
-		}
-
-		for _, block := range meta.Blocks {
-			writtenBlock, err := GenBlock(block)
-			if err != nil {
-				return err
-			}
-
-			err = bloomClient.PutBlock(context.Background(), writtenBlock)
-			if err != nil {
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-func ParseDayTime(s string) config.DayTime {
-	t, err := time.Parse("2006-01-02", s)
-	if err != nil {
-		panic(err)
-	}
-	return config.DayTime{
-		Time: model.TimeFromUnix(t.Unix()),
-	}
-}
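For reference, a minimal usage sketch of the relocated helpers (not part of the patch above). The package clause and test name below are hypothetical; the import paths and the plannertest.GenSeries/GenSeriesWithStep signatures come from the diff. GenSeriesWithStep walks the inclusive fingerprint bounds and emits one series every `step` fingerprints, and GenSeries is now a thin wrapper with step 1.

package plannertest_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

// Hypothetical test exercising the helpers as exported from the new plannertest package.
func TestGenSeriesWithStep(t *testing.T) {
	// Bounds are inclusive, so [0, 99] with step 10 yields fingerprints 0, 10, ..., 90.
	sparse := plannertest.GenSeriesWithStep(v1.NewBounds(0, 99), 10)
	require.Len(t, sparse, 10)

	// GenSeries delegates to GenSeriesWithStep with step 1: one series per fingerprint.
	dense := plannertest.GenSeries(v1.NewBounds(0, 9))
	require.Len(t, dense, 10)
}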