diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index 0e523817dc70a..c92d6fad30f73 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -2,6 +2,7 @@ package bloomgateway import ( "context" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -13,6 +14,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/querier/plan" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/spanlogger" @@ -61,19 +63,26 @@ func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem st } } +type QuerierConfig struct { + // MinTableOffset is derived from the compactor's MinTableOffset + MinTableOffset int +} + // BloomQuerier is a store-level abstraction on top of Client // It is used by the index gateway to filter ChunkRefs based on given line fiter expression. type BloomQuerier struct { c Client + cfg QuerierConfig logger log.Logger metrics *querierMetrics limits Limits blockResolver BlockResolver } -func NewQuerier(c Client, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier { +func NewQuerier(c Client, cfg QuerierConfig, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier { return &BloomQuerier{ c: c, + cfg: cfg, logger: logger, metrics: newQuerierMetrics(r, constants.Loki, querierMetricsSubsystem), limits: limits, @@ -101,6 +110,33 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from preFilterChunks := len(chunkRefs) preFilterSeries := len(grouped) + // Do not attempt to filter chunks for which there are no blooms + if bq.cfg.MinTableOffset > 0 { + minAge := truncateDay(model.Now()).Add(-1 * config.ObjectStorageIndexRequiredPeriod * time.Duration(bq.cfg.MinTableOffset-1)) + if through.After(minAge) { + level.Debug(logger).Log( + "msg", "skip too recent chunks", + "tenant", tenant, + "from", from.Time(), + "through", through.Time(), + "responses", 0, + "preFilterChunks", preFilterChunks, + "postFilterChunks", preFilterChunks, + "filteredChunks", 0, + "preFilterSeries", preFilterSeries, + "postFilterSeries", preFilterSeries, + "filteredSeries", 0, + ) + + bq.metrics.chunksTotal.Add(float64(preFilterChunks)) + bq.metrics.chunksFiltered.Add(0) + bq.metrics.seriesTotal.Add(float64(preFilterSeries)) + bq.metrics.seriesFiltered.Add(0) + + return chunkRefs, nil + } + } + responses := make([][]*logproto.GroupedChunkRefs, 0, 2) // We can perform requests sequentially, because most of the time the request // only covers a single day, and if not, it's at most two days. @@ -153,7 +189,6 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from "preFilterSeries", preFilterSeries, "postFilterSeries", postFilterSeries, "filteredSeries", preFilterSeries-postFilterSeries, - "operation", "bloomquerier.FilterChunkRefs", ) bq.metrics.chunksTotal.Add(float64(preFilterChunks)) diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index 516f1cd403bb3..d4b24447ae124 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -61,12 +61,13 @@ var _ BlockResolver = &mockBlockResolver{} func TestBloomQuerier(t *testing.T) { logger := log.NewNopLogger() limits := newLimits() + cfg := QuerierConfig{} resolver := &mockBlockResolver{} tenant := "fake" t.Run("client not called when filters are empty", func(t *testing.T) { c := &noopClient{} - bq := NewQuerier(c, limits, resolver, nil, logger) + bq := NewQuerier(c, cfg, limits, resolver, nil, logger) ctx := context.Background() through := model.Now() @@ -86,7 +87,7 @@ func TestBloomQuerier(t *testing.T) { t.Run("client not called when chunkRefs are empty", func(t *testing.T) { c := &noopClient{} - bq := NewQuerier(c, limits, resolver, nil, logger) + bq := NewQuerier(c, cfg, limits, resolver, nil, logger) ctx := context.Background() through := model.Now() @@ -102,7 +103,7 @@ func TestBloomQuerier(t *testing.T) { t.Run("querier propagates error from client", func(t *testing.T) { c := &noopClient{err: errors.New("something went wrong")} - bq := NewQuerier(c, limits, resolver, nil, logger) + bq := NewQuerier(c, cfg, limits, resolver, nil, logger) ctx := context.Background() through := model.Now() @@ -121,7 +122,7 @@ func TestBloomQuerier(t *testing.T) { t.Run("client called once for each day of the interval", func(t *testing.T) { c := &noopClient{} - bq := NewQuerier(c, limits, resolver, nil, logger) + bq := NewQuerier(c, cfg, limits, resolver, nil, logger) ctx := context.Background() from := mktime("2024-04-16 22:00") diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 458d7c9e3f5c8..0854f4bf42a02 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1445,7 +1445,10 @@ func (t *Loki) initIndexGateway() (services.Service, error) { return nil, err } resolver := bloomgateway.NewBlockResolver(t.BloomStore, logger) - bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, t.Overrides, resolver, prometheus.DefaultRegisterer, logger) + querierCfg := bloomgateway.QuerierConfig{ + MinTableOffset: t.Cfg.BloomCompactor.MinTableOffset, + } + bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, querierCfg, t.Overrides, resolver, prometheus.DefaultRegisterer, logger) } gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, t.Overrides, logger, prometheus.DefaultRegisterer, t.Store, indexClients, bloomQuerier)