feat(blooms): bloom integration in query planning #12208

Merged: 72 commits, Mar 26, 2024

Commits
b831d07
streaming GetShards on idx-gw + some refactoring
owen-d Mar 14, 2024
91ee79e
moves FingerprintBounds to logproto pkg to avoid circular imports
owen-d Mar 14, 2024
2db2852
proto alignment, wiring `GetShards()` through storage
owen-d Mar 14, 2024
6801fbb
tsdb index `ForSeries()` can signal a stop to iteration
owen-d Mar 14, 2024
c2200a0
ForSeries in tsdb.Index ifc
owen-d Mar 14, 2024
db1c830
pr feedback
owen-d Mar 14, 2024
693c48b
removes pointers in protos
owen-d Mar 14, 2024
808d88b
gateway `GetShards` impl
owen-d Mar 14, 2024
391093f
[wip] integrating bound ranges into `GetShards()`
owen-d Mar 14, 2024
42dccdd
generic result accumulator
owen-d Mar 15, 2024
cc105e0
indexclient getshards impl
owen-d Mar 15, 2024
b355b9f
comment
owen-d Mar 15, 2024
14e4815
thread plan+predicate
owen-d Mar 15, 2024
c42344e
[wip] HasForSeries wiring
owen-d Mar 15, 2024
3052e2f
finish HasForSeries impl, move ifc to index pkg
owen-d Mar 15, 2024
548fe30
moves HasForSeries to StatsReader
owen-d Mar 15, 2024
272e06f
new sharding pkg
owen-d Mar 15, 2024
cfb38a5
removes bounds from shardsrequest
owen-d Mar 15, 2024
8988232
HasForSeries accepts timerange + composite store impl
owen-d Mar 15, 2024
300d710
resultAccumulator doesnt run merge for single list
owen-d Mar 15, 2024
1ea82b1
sorts MultiIndex.GetChunkRefs during merge
owen-d Mar 15, 2024
6bad531
naive implementation of bloom-accelerated sharding calculation in gat…
owen-d Mar 16, 2024
09f1618
extracts & tests accumulating chunks to shards
owen-d Mar 16, 2024
6b8feac
moves common sharding logic to shared pkg, lets idx-gw fallback to St…
owen-d Mar 17, 2024
3629b7b
GatewayClient.poolDoWithStrategy to avoid cascading requests during r…
owen-d Mar 17, 2024
a6e0506
shardsrequest compat.go
owen-d Mar 18, 2024
1961eb8
shardsresponse extensions.go
owen-d Mar 18, 2024
27599b7
removes plan from ShardsRequest in favor of a query string which can …
owen-d Mar 18, 2024
fdbe1fa
queryrange+codec shardrequesting support + some cleanup to use query …
owen-d Mar 18, 2024
42fe9cf
ShardResolver.GetShardingRanges support
owen-d Mar 18, 2024
3704abc
querier, http wiring for shards request
owen-d Mar 18, 2024
8854fb4
json tags for shard protos
owen-d Mar 18, 2024
32d400d
[unrelated] fixes double-stats-requesting on quantiles
owen-d Mar 19, 2024
f8ff1fb
multiple logql shard reprs
owen-d Mar 19, 2024
8118844
logql sharding strategy
owen-d Mar 19, 2024
21a2edd
test alignment
owen-d Mar 19, 2024
b9f6121
shard parsing returns version
owen-d Mar 19, 2024
29cb72a
integrating new shards into label injection
owen-d Mar 19, 2024
7bb381f
aligns inclusivity expectations between sharding impls for Fingerprin…
owen-d Mar 19, 2024
b5f2501
integrates shard-bounds into ingesters
owen-d Mar 19, 2024
e54db18
tsdb sharding strategy
owen-d Mar 19, 2024
48258a0
ingester fixes
owen-d Mar 19, 2024
e7b4930
fix test + docs
owen-d Mar 19, 2024
f476128
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 19, 2024
f4b4e86
fix ifc alignment
owen-d Mar 19, 2024
189b4e3
gen files
owen-d Mar 19, 2024
91277c8
linting, circular imports, default impls for mocks
owen-d Mar 20, 2024
3f14f1a
preserve old indexreaderwriter behavior
owen-d Mar 20, 2024
eda6ef8
goimports -s
owen-d Mar 20, 2024
1f22cee
legacy index reader getshards
owen-d Mar 20, 2024
f24c768
adds pre+postfilter chunkref histograms to idx-gws bloom-accelerated …
owen-d Mar 20, 2024
9049d97
corrects shard routing in middleware
owen-d Mar 20, 2024
5d0d1a0
proto response wrapping for new shards types
owen-d Mar 20, 2024
66e5998
promauto for indexgateway metrics
owen-d Mar 20, 2024
5300648
proto roundtripping + lint
owen-d Mar 20, 2024
5f82560
s/table-compaction-period/table-offset/g
owen-d Mar 21, 2024
60b305b
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 21, 2024
37edc8b
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 21, 2024
b0350ae
index statistics
owen-d Mar 22, 2024
5b70e96
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 22, 2024
db506cb
stats cleanup and proper propagation
owen-d Mar 22, 2024
5ca8d2f
routing stats through store
owen-d Mar 22, 2024
d43b3f7
gatewayclient propagates stats
owen-d Mar 22, 2024
52df554
logging fixes
owen-d Mar 22, 2024
ce33787
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 22, 2024
3e96e75
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 22, 2024
1e77cee
skip (unhealthy?) bloom-gws with no token range
owen-d Mar 22, 2024
fed5262
correctly merges stats in collector middleware for bloom data, some a…
owen-d Mar 22, 2024
2805339
v1 metrics wired up through read path
owen-d Mar 23, 2024
11d8b42
optional pool support for bloom pages
owen-d Mar 26, 2024
ade14a9
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 26, 2024
463dafd
test signature
owen-d Mar 26, 2024
35 changes: 21 additions & 14 deletions docs/sources/configure/_index.md
@@ -2695,18 +2695,18 @@ ring:
# CLI flag: -bloom-compactor.compaction-interval
[compaction_interval: <duration> | default = 10m]

# How many index periods (days) to wait before building bloom filters for a
# table. This can be used to lower cost by not re-writing data to object storage
# too frequently since recent data changes more often.
# CLI flag: -bloom-compactor.min-table-compaction-period
[min_table_compaction_period: <int> | default = 1]

# The maximum number of index periods (days) to build bloom filters for a table.
# This can be used to lower cost by not trying to compact older data which
# doesn't change. This can be optimized by aligning it with the maximum
# `reject_old_samples_max_age` setting of any tenant.
# CLI flag: -bloom-compactor.max-table-compaction-period
[max_table_compaction_period: <int> | default = 7]
# Newest day-table offset (from today, inclusive) to compact. Increase to lower
# cost by not re-writing data to object storage too frequently since recent data
# changes more often at the cost of not having blooms available as quickly.
# CLI flag: -bloom-compactor.min-table-offset
[min_table_offset: <int> | default = 1]

# Oldest day-table offset (from today, inclusive) to compact. This can be used
# to lower cost by not trying to compact older data which doesn't change. This
# can be optimized by aligning it with the maximum `reject_old_samples_max_age`
# setting of any tenant.
# CLI flag: -bloom-compactor.max-table-offset
[max_table_offset: <int> | default = 2]

# Number of workers to run in parallel for compaction.
# CLI flag: -bloom-compactor.worker-parallelism
@@ -2871,11 +2871,18 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -querier.tsdb-max-query-parallelism
[tsdb_max_query_parallelism: <int> | default = 128]

# Maximum number of bytes assigned to a single sharded query. Also expressible
# in human readable forms (1GB, etc).
# Target maximum number of bytes assigned to a single sharded query. Also
# expressible in human readable forms (1GB, etc). Note: This is a _target_ and
# not an absolute limit. The actual limit can be higher, but the query planner
# will try to build shards up to this limit.
# CLI flag: -querier.tsdb-max-bytes-per-shard
[tsdb_max_bytes_per_shard: <int> | default = 600MB]

# sharding strategy to use in query planning. Suggested to use bounded once all
# nodes can recognize it.
# CLI flag: -limits.tsdb-sharding-strategy
[tsdb_sharding_strategy: <string> | default = "power_of_two"]

# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
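For context on the two limits documented above, the sketch below illustrates how a planner might turn an estimated query size plus `tsdb_max_bytes_per_shard` into a shard count under the `power_of_two` and `bounded` strategies. This is a minimal, hypothetical sketch: the function and package names (`planShards`, `nextPowerOfTwo`) are illustrative and do not mirror Loki's actual query-planner code.

```go
// Hypothetical sketch of the two sharding strategies described above.
// Names and structure are illustrative only.
package main

import "fmt"

// nextPowerOfTwo returns the smallest power of two >= n (and at least 1).
func nextPowerOfTwo(n uint64) uint64 {
	p := uint64(1)
	for p < n {
		p <<= 1
	}
	return p
}

// planShards picks a shard count for a query expected to scan queryBytes,
// targeting at most maxBytesPerShard bytes per shard.
func planShards(strategy string, queryBytes, maxBytesPerShard uint64) uint64 {
	if maxBytesPerShard == 0 {
		return 1
	}
	// Ceil division: the minimum number of shards that keeps each shard
	// at or below the target size.
	needed := (queryBytes + maxBytesPerShard - 1) / maxBytesPerShard
	if needed == 0 {
		needed = 1
	}
	switch strategy {
	case "bounded":
		// Use exactly as many shards as needed.
		return needed
	default: // "power_of_two"
		// Round up to a power of two; shards may end up smaller than the
		// target, which is why the limit is documented as a target only.
		return nextPowerOfTwo(needed)
	}
}

func main() {
	fmt.Println(planShards("power_of_two", 5<<30, 600<<20)) // 16
	fmt.Println(planShards("bounded", 5<<30, 600<<20))      // 9
}
```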
20 changes: 8 additions & 12 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -55,8 +55,7 @@ type Compactor struct {

sharding util_ring.TenantSharding

metrics *Metrics
btMetrics *v1.Metrics
metrics *Metrics
}

func New(
@@ -67,7 +66,7 @@ func New(
fetcherProvider stores.ChunkFetcherProvider,
sharding util_ring.TenantSharding,
limits Limits,
store bloomshipper.Store,
store bloomshipper.StoreWithMetrics,
logger log.Logger,
r prometheus.Registerer,
) (*Compactor, error) {
@@ -78,6 +77,7 @@ func New(
sharding: sharding,
limits: limits,
bloomStore: store,
metrics: NewMetrics(r, store.BloomMetrics()),
}

tsdbStore, err := NewTSDBStores(schemaCfg, storeCfg, clientMetrics, logger)
@@ -86,10 +86,6 @@ func New(
}
c.tsdbStore = tsdbStore

// initialize metrics
c.btMetrics = v1.NewMetrics(prometheus.WrapRegistererWithPrefix("loki_bloom_tokenizer_", r))
c.metrics = NewMetrics(r, c.btMetrics)

chunkLoader := NewStoreChunkLoader(
fetcherProvider,
c.metrics,
@@ -258,12 +254,12 @@ func (c *Compactor) runOne(ctx context.Context) error {
func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
// adjust the minimum by one to make it inclusive, which is more intuitive
// for a configuration variable
adjustedMin := min(c.cfg.MinTableCompactionPeriod - 1)
minCompactionPeriod := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
maxCompactionPeriod := time.Duration(c.cfg.MaxTableCompactionPeriod) * config.ObjectStorageIndexRequiredPeriod
adjustedMin := min(c.cfg.MinTableOffset - 1)
minCompactionDelta := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
maxCompactionDelta := time.Duration(c.cfg.MaxTableOffset) * config.ObjectStorageIndexRequiredPeriod

from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
from := ts.Add(-maxCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
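The `tables()` change above converts the new min/max table offsets into a day-aligned window. Below is a minimal, self-contained sketch of that arithmetic, assuming a 24h index period in place of `config.ObjectStorageIndexRequiredPeriod`; any names not present in the diff are illustrative.

```go
// Sketch of the offset-to-day-range arithmetic used in tables() above,
// assuming a 24h object-storage index period.
package main

import (
	"fmt"
	"time"
)

const indexPeriod = 24 * time.Hour // stand-in for config.ObjectStorageIndexRequiredPeriod

// tableRange returns the [from, through] day-table boundaries for the
// given min/max day-table offsets relative to ts.
func tableRange(ts time.Time, minOffset, maxOffset int) (from, through time.Time) {
	// Adjust the minimum by one to make the range inclusive, mirroring the
	// comment in the original code.
	minDelta := time.Duration(minOffset-1) * indexPeriod
	maxDelta := time.Duration(maxOffset) * indexPeriod

	// Truncate to the index period so both bounds land on table boundaries.
	fromNanos := ts.Add(-maxDelta).UnixNano() / int64(indexPeriod) * int64(indexPeriod)
	throughNanos := ts.Add(-minDelta).UnixNano() / int64(indexPeriod) * int64(indexPeriod)
	return time.Unix(0, fromNanos).UTC(), time.Unix(0, throughNanos).UTC()
}

func main() {
	now := time.Date(2024, 3, 26, 15, 0, 0, 0, time.UTC)
	from, through := tableRange(now, 1, 2) // the new defaults: min=1, max=2
	fmt.Println(from, through)
	// 2024-03-24 00:00:00 +0000 UTC 2024-03-26 00:00:00 +0000 UTC
}
```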
25 changes: 12 additions & 13 deletions pkg/bloomcompactor/config.go
@@ -22,14 +22,14 @@ type Config struct {
// section and the ingester configuration by default).
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinTableCompactionPeriod int `yaml:"min_table_compaction_period"`
MaxTableCompactionPeriod int `yaml:"max_table_compaction_period"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`

MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
}
@@ -40,15 +40,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.IntVar(&cfg.WorkerParallelism, "bloom-compactor.worker-parallelism", 1, "Number of workers to run in parallel for compaction.")
// TODO(owen-d): This is a confusing name. Rename it to `min_table_offset`
f.IntVar(&cfg.MinTableCompactionPeriod, "bloom-compactor.min-table-compaction-period", 1, "How many index periods (days) to wait before building bloom filters for a table. This can be used to lower cost by not re-writing data to object storage too frequently since recent data changes more often.")
f.IntVar(&cfg.MinTableOffset, "bloom-compactor.min-table-offset", 1, "Newest day-table offset (from today, inclusive) to compact. Increase to lower cost by not re-writing data to object storage too frequently since recent data changes more often at the cost of not having blooms available as quickly.")
// TODO(owen-d): ideally we'd set this per tenant based on their `reject_old_samples_max_age` setting,
// but due to how we need to discover tenants, we can't do that yet. Tenant+Period discovery is done by
// iterating the table periods in object storage and looking for tenants within that period.
// In order to have this done dynamically, we'd need to account for tenant specific overrides, which are also
// dynamically reloaded.
// I'm doing it the simple way for now.
// TODO(owen-d): This is a confusing name. Rename it to `max_table_offset`
f.IntVar(&cfg.MaxTableCompactionPeriod, "bloom-compactor.max-table-compaction-period", 7, "The maximum number of index periods (days) to build bloom filters for a table. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxTableOffset, "bloom-compactor.max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.")
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
@@ -67,8 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}

func (cfg *Config) Validate() error {
if cfg.MinTableCompactionPeriod > cfg.MaxTableCompactionPeriod {
return fmt.Errorf("min_compaction_age must be less than or equal to max_compaction_age")
if cfg.MinTableOffset > cfg.MaxTableOffset {
return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset)
}
if cfg.Ring.ReplicationFactor != ringReplicationFactor {
return errors.New("Replication factor must not be changed as it will not take effect")
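For reference, a small illustrative sketch of the renamed fields and the tightened `Validate` error above; the bare `Config` construction here is simplified and not how Loki wires the compactor up in production.

```go
// Illustrative only: exercising the Validate check shown above with the
// renamed offset fields.
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/bloomcompactor"
)

func main() {
	cfg := bloomcompactor.Config{
		MinTableOffset: 3, // newest table to compact
		MaxTableOffset: 2, // oldest table to compact — invalid: must be >= MinTableOffset
	}
	if err := cfg.Validate(); err != nil {
		// e.g. "min-table-offset (3) must be less than or equal to max-table-offset (2)"
		fmt.Println(err)
	}
}
```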
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/spec.go
@@ -217,7 +217,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

b.curr = v1.NewBlock(reader)
b.curr = v1.NewBlock(reader, b.metrics.bloomMetrics)
return true
}

6 changes: 3 additions & 3 deletions pkg/bloomcompactor/spec_test.go
@@ -50,7 +50,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

res = append(res, v1.NewBlock(reader))
res = append(res, v1.NewBlock(reader, v1.NewMetrics(nil)))
ref := genBlockRef(data[minIdx].Series.Fingerprint, data[maxIdx-1].Series.Fingerprint)
t.Log("create block", ref)
refs = append(refs, ref)
@@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b),
BlockQuerier: v1.NewBlockQuerier(b, false),
})
}

@@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := block.Querier()
bq := v1.NewBlockQuerier(block, false)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
24 changes: 7 additions & 17 deletions pkg/bloomcompactor/tsdb.go
@@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
)

const (
Expand Down Expand Up @@ -121,34 +122,22 @@ func (b *BloomTSDBStore) LoadTSDB(
}
}()

return NewTSDBSeriesIter(ctx, idx, bounds)
return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
}

// TSDBStore is an interface for interacting with the TSDB,
// modeled off a relevant subset of the `tsdb.TSDBIndex` struct
type forSeries interface {
ForSeries(
ctx context.Context,
fpFilter index.FingerprintFilter,
from model.Time,
through model.Time,
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
matchers ...*labels.Matcher,
) error
}

func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)

if err := f.ForSeries(
ctx,
user,
bounds,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
select {
case <-ctx.Done():
return
return true
default:
res := &v1.Series{
Fingerprint: fp,
@@ -163,6 +152,7 @@ func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBo
}

series = append(series, res)
return false
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
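The `ForSeries` callback now returns a `stop` boolean so the walk can abort early, as the context-cancellation branch above shows. A minimal, self-contained sketch of that contract follows, with placeholder types standing in for the real tsdb/index ones.

```go
// Simplified sketch of the new ForSeries contract used above: the callback
// returns stop=true to end iteration early. Types are stand-ins for the
// real tsdb/index ones.
package main

import "fmt"

type series struct {
	fingerprint uint64
}

type sliceIndex []series

// forSeries visits each series until fn asks to stop.
func (s sliceIndex) forSeries(fn func(fp uint64) (stop bool)) error {
	for _, sr := range s {
		if fn(sr.fingerprint) {
			return nil // early termination requested by the callback
		}
	}
	return nil
}

func main() {
	idx := sliceIndex{{1}, {2}, {3}, {4}}

	// Collect at most two fingerprints, then signal stop.
	var got []uint64
	_ = idx.forSeries(func(fp uint64) bool {
		got = append(got, fp)
		return len(got) >= 2
	})
	fmt.Println(got) // [1 2]
}
```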
9 changes: 5 additions & 4 deletions pkg/bloomcompactor/tsdb_test.go
@@ -17,10 +17,11 @@ type forSeriesTestImpl []*v1.Series

func (f forSeriesTestImpl) ForSeries(
_ context.Context,
_ string,
_ index.FingerprintFilter,
_ model.Time,
_ model.Time,
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) bool,
_ ...*labels.Matcher,
) error {
for i := range f {
@@ -61,7 +62,7 @@ func TestTSDBSeriesIter(t *testing.T) {
},
}
srcItr := v1.NewSliceIter(input)
itr, err := NewTSDBSeriesIter(context.Background(), forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators[*v1.Series](
@@ -78,7 +79,7 @@ func TestTSDBSeriesIter_Expiry(t *testing.T) {
t.Run("expires on creation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{
{}, // a single entry
}, v1.NewBounds(0, math.MaxUint64))
require.Error(t, err)

t.Run("expires during consumption", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{
{},
{},
}, v1.NewBounds(0, math.MaxUint64))
26 changes: 14 additions & 12 deletions pkg/bloomgateway/bloomgateway.go
@@ -51,7 +51,6 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
@@ -63,6 +62,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/spanlogger"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
@@ -196,15 +196,17 @@ func (g *Gateway) stopping(_ error) error {

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "bloomgateway.FilterChunkRefs")
defer sp.Finish()

tenantID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

logger := log.With(g.logger, "tenant", tenantID)
sp, ctx := spanlogger.NewWithLogger(
ctx,
log.With(g.logger, "tenant", tenantID),
"bloomgateway.FilterChunkRefs",
)
defer sp.Finish()

// start time == end time --> empty response
if req.From.Equal(req.Through) {
@@ -237,7 +239,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}

sp.LogKV(
sp.Log(
"filters", len(filters),
"days", len(seriesByDay),
"series_requested", len(req.Refs),
@@ -254,7 +256,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))

level.Debug(g.logger).Log(
level.Debug(sp).Log(
"msg", "created task for day",
"task", task.ID,
"day", seriesForDay.day,
@@ -276,7 +278,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
for _, task := range tasks {
task := task
task.enqueueTime = time.Now()
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))
level.Info(sp).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))

// TODO(owen-d): gracefully handle full queues
if err := g.queue.Enqueue(tenantID, nil, task, func() {
@@ -289,7 +291,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
go g.consumeTask(ctx, task, tasksCh)
}

sp.LogKV("enqueue_duration", time.Since(queueStart).String())
sp.Log("enqueue_duration", time.Since(queueStart).String())

remaining := len(tasks)

@@ -305,7 +307,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err())
level.Info(sp).Log("msg", "task done", "task", task.ID, "err", task.Err())
if task.Err() != nil {
return nil, errors.Wrap(task.Err(), "request failed")
}
@@ -314,7 +316,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}
}

sp.LogKV("msg", "received all responses")
sp.Log("msg", "received all responses")

filtered := filterChunkRefs(req, responses)

@@ -333,7 +335,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
g.metrics.requestedChunks.Observe(float64(preFilterChunks))
g.metrics.filteredChunks.Observe(float64(preFilterChunks - postFilterChunks))

level.Info(logger).Log(
level.Info(sp).Log(
"msg", "return filtered chunk refs",
"requested_series", preFilterSeries,
"filtered_series", preFilterSeries-postFilterSeries,
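The gateway above replaces the separate OpenTracing span and go-kit logger with a `spanlogger`, so one object both records span key/values and emits log lines. Below is a condensed sketch of that pattern, reusing only the calls that appear in the diff (`NewWithLogger`, `Log`, `Finish`, and passing the span logger to `level` helpers); the surrounding function and values are illustrative.

```go
// Condensed sketch of the spanlogger pattern adopted above; only calls
// visible in the diff are used, everything else is illustrative.
package main

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/loki/pkg/util/spanlogger"
)

func handleRequest(ctx context.Context, logger log.Logger, tenantID string) {
	// One object acts as both the tracing span and the request-scoped logger.
	sp, ctx := spanlogger.NewWithLogger(
		ctx,
		log.With(logger, "tenant", tenantID),
		"bloomgateway.FilterChunkRefs",
	)
	defer sp.Finish()

	// Key/value pairs are recorded on the span and emitted as log lines.
	sp.Log("msg", "received request")

	// The span logger satisfies go-kit's log.Logger, so it can be passed
	// straight to level helpers, as the handler above does.
	level.Info(sp).Log("msg", "done")

	_ = ctx // ctx now carries the span for downstream calls
}

func main() {
	handleRequest(context.Background(), log.NewNopLogger(), "tenant-a")
}
```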