feat(blooms): bloom integration in query planning (#12208)
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
owen-d authored Mar 26, 2024
1 parent d5ecf9a commit a36483b
Showing 121 changed files with 6,070 additions and 876 deletions.
35 changes: 21 additions & 14 deletions docs/sources/configure/_index.md
@@ -2695,18 +2695,18 @@ ring:
# CLI flag: -bloom-compactor.compaction-interval
[compaction_interval: <duration> | default = 10m]
# How many index periods (days) to wait before building bloom filters for a
# table. This can be used to lower cost by not re-writing data to object storage
# too frequently since recent data changes more often.
# CLI flag: -bloom-compactor.min-table-compaction-period
[min_table_compaction_period: <int> | default = 1]
# The maximum number of index periods (days) to build bloom filters for a table.
# This can be used to lower cost by not trying to compact older data which
# doesn't change. This can be optimized by aligning it with the maximum
# `reject_old_samples_max_age` setting of any tenant.
# CLI flag: -bloom-compactor.max-table-compaction-period
[max_table_compaction_period: <int> | default = 7]
# Newest day-table offset (from today, inclusive) to compact. Increase to lower
# cost by not re-writing data to object storage too frequently since recent data
# changes more often at the cost of not having blooms available as quickly.
# CLI flag: -bloom-compactor.min-table-offset
[min_table_offset: <int> | default = 1]
# Oldest day-table offset (from today, inclusive) to compact. This can be used
# to lower cost by not trying to compact older data which doesn't change. This
# can be optimized by aligning it with the maximum `reject_old_samples_max_age`
# setting of any tenant.
# CLI flag: -bloom-compactor.max-table-offset
[max_table_offset: <int> | default = 2]

# Number of workers to run in parallel for compaction.
# CLI flag: -bloom-compactor.worker-parallelism
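
For orientation on the renamed settings above, here is a hedged sketch of a bloom-compactor config using the new offsets (block name and layout assumed to match the rest of Loki's YAML config; values are the new defaults). With `min_table_offset: 1` and `max_table_offset: 2`, each run covers the day-tables from two days ago through yesterday, and skips today's still-changing table.

```yaml
# Hypothetical example, not part of this commit: compact yesterday's and the
# day before's day-tables; skip today's (still changing) and anything older.
bloom_compactor:
  enabled: true
  compaction_interval: 10m
  min_table_offset: 1   # newest day-table to compact: today - 1
  max_table_offset: 2   # oldest day-table to compact: today - 2
  worker_parallelism: 1
```
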
@@ -2871,11 +2871,18 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -querier.tsdb-max-query-parallelism
[tsdb_max_query_parallelism: <int> | default = 128]

# Maximum number of bytes assigned to a single sharded query. Also expressible
# in human readable forms (1GB, etc).
# Target maximum number of bytes assigned to a single sharded query. Also
# expressible in human readable forms (1GB, etc). Note: This is a _target_ and
# not an absolute limit. The actual limit can be higher, but the query planner
# will try to build shards up to this limit.
# CLI flag: -querier.tsdb-max-bytes-per-shard
[tsdb_max_bytes_per_shard: <int> | default = 600MB]

# Sharding strategy to use in query planning. Suggested to use `bounded` once
# all nodes can recognize it.
# CLI flag: -limits.tsdb-sharding-strategy
[tsdb_sharding_strategy: <string> | default = "power_of_two"]

# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
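
A hedged sketch of how the two new `limits_config` options above might be set together (values are the documented defaults, shown only for illustration):

```yaml
limits_config:
  # Target, not hard, ceiling on bytes assigned to a single shard; the query
  # planner builds shards up to roughly this size.
  tsdb_max_bytes_per_shard: 600MB
  # Keep the default until every node in the cluster understands "bounded",
  # then switch to take advantage of the new planning strategy.
  tsdb_sharding_strategy: power_of_two
```
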
20 changes: 8 additions & 12 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -55,8 +55,7 @@ type Compactor struct {

sharding util_ring.TenantSharding

metrics *Metrics
btMetrics *v1.Metrics
metrics *Metrics
}

func New(
@@ -67,7 +66,7 @@ func New(
fetcherProvider stores.ChunkFetcherProvider,
sharding util_ring.TenantSharding,
limits Limits,
store bloomshipper.Store,
store bloomshipper.StoreWithMetrics,
logger log.Logger,
r prometheus.Registerer,
) (*Compactor, error) {
@@ -78,6 +77,7 @@ func New(
sharding: sharding,
limits: limits,
bloomStore: store,
metrics: NewMetrics(r, store.BloomMetrics()),
}

tsdbStore, err := NewTSDBStores(schemaCfg, storeCfg, clientMetrics, logger)
@@ -86,10 +86,6 @@
}
c.tsdbStore = tsdbStore

// initialize metrics
c.btMetrics = v1.NewMetrics(prometheus.WrapRegistererWithPrefix("loki_bloom_tokenizer_", r))
c.metrics = NewMetrics(r, c.btMetrics)

chunkLoader := NewStoreChunkLoader(
fetcherProvider,
c.metrics,
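
Hedged sketch of the metrics wiring change in this file, using hypothetical stand-in types rather than the real Loki packages: the bloom store (now a `bloomshipper.StoreWithMetrics`) owns the v1 bloom metrics, and the compactor builds its `Metrics` around `store.BloomMetrics()` instead of registering a second, prefixed copy.

```go
// Stand-in types only; the real constructors live in the Loki packages above.
package main

import "fmt"

type BloomMetrics struct{ name string }

// StoreWithMetrics mirrors the new bloomshipper.StoreWithMetrics dependency:
// a store that can hand back the bloom metrics it was constructed with.
type StoreWithMetrics interface {
	BloomMetrics() *BloomMetrics
}

type fakeStore struct{ m *BloomMetrics }

func (s fakeStore) BloomMetrics() *BloomMetrics { return s.m }

// CompactorMetrics mirrors bloomcompactor.Metrics: it wraps the shared bloom
// metrics rather than creating its own.
type CompactorMetrics struct{ bloom *BloomMetrics }

func NewMetrics(bloom *BloomMetrics) *CompactorMetrics {
	return &CompactorMetrics{bloom: bloom}
}

func main() {
	var store StoreWithMetrics = fakeStore{m: &BloomMetrics{name: "shared"}}
	metrics := NewMetrics(store.BloomMetrics())
	fmt.Println(metrics.bloom == store.BloomMetrics()) // true: one shared instance
}
```
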
@@ -258,12 +254,12 @@ func (c *Compactor) runOne(ctx context.Context) error {
func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
// adjust the minimum by one to make it inclusive, which is more intuitive
// for a configuration variable
adjustedMin := min(c.cfg.MinTableCompactionPeriod - 1)
minCompactionPeriod := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
maxCompactionPeriod := time.Duration(c.cfg.MaxTableCompactionPeriod) * config.ObjectStorageIndexRequiredPeriod
adjustedMin := min(c.cfg.MinTableOffset - 1)
minCompactionDelta := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
maxCompactionDelta := time.Duration(c.cfg.MaxTableOffset) * config.ObjectStorageIndexRequiredPeriod

from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
from := ts.Add(-maxCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
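
Hedged sketch of the day-window arithmetic in `tables()`, using `time.Time` in place of Loki's config/model types and assuming a 24h index period with an exclusive `through` bound (which is why the comment above adjusts the minimum by one to make it inclusive).

```go
package main

import (
	"fmt"
	"time"
)

const indexPeriod = 24 * time.Hour

// tableWindow mirrors from/through in tables(): subtract the offsets from ts,
// then floor each bound to the start of its index period (day).
func tableWindow(ts time.Time, minOffset, maxOffset int) (from, through time.Time) {
	adjustedMin := minOffset - 1 // make min_table_offset inclusive for operators
	from = ts.Add(-time.Duration(maxOffset) * indexPeriod).Truncate(indexPeriod)
	through = ts.Add(-time.Duration(adjustedMin) * indexPeriod).Truncate(indexPeriod)
	return from, through
}

func main() {
	now := time.Date(2024, 3, 26, 15, 0, 0, 0, time.UTC)
	from, through := tableWindow(now, 1, 2) // the new defaults
	fmt.Println(from.Format("2006-01-02"), through.Format("2006-01-02"))
	// 2024-03-24 2024-03-26 -> day-tables for Mar 24 and Mar 25 get compacted
}
```
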
25 changes: 12 additions & 13 deletions pkg/bloomcompactor/config.go
@@ -22,14 +22,14 @@ type Config struct {
// section and the ingester configuration by default).
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinTableCompactionPeriod int `yaml:"min_table_compaction_period"`
MaxTableCompactionPeriod int `yaml:"max_table_compaction_period"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`

MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
}
@@ -40,15 +40,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.IntVar(&cfg.WorkerParallelism, "bloom-compactor.worker-parallelism", 1, "Number of workers to run in parallel for compaction.")
// TODO(owen-d): This is a confusing name. Rename it to `min_table_offset`
f.IntVar(&cfg.MinTableCompactionPeriod, "bloom-compactor.min-table-compaction-period", 1, "How many index periods (days) to wait before building bloom filters for a table. This can be used to lower cost by not re-writing data to object storage too frequently since recent data changes more often.")
f.IntVar(&cfg.MinTableOffset, "bloom-compactor.min-table-offset", 1, "Newest day-table offset (from today, inclusive) to compact. Increase to lower cost by not re-writing data to object storage too frequently since recent data changes more often at the cost of not having blooms available as quickly.")
// TODO(owen-d): ideally we'd set this per tenant based on their `reject_old_samples_max_age` setting,
// but due to how we need to discover tenants, we can't do that yet. Tenant+Period discovery is done by
// iterating the table periods in object storage and looking for tenants within that period.
// In order to have this done dynamically, we'd need to account for tenant specific overrides, which are also
// dynamically reloaded.
// I'm doing it the simple way for now.
// TODO(owen-d): This is a confusing name. Rename it to `max_table_offset`
f.IntVar(&cfg.MaxTableCompactionPeriod, "bloom-compactor.max-table-compaction-period", 7, "The maximum number of index periods (days) to build bloom filters for a table. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxTableOffset, "bloom-compactor.max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.")
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
@@ -67,8 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}

func (cfg *Config) Validate() error {
if cfg.MinTableCompactionPeriod > cfg.MaxTableCompactionPeriod {
return fmt.Errorf("min_compaction_age must be less than or equal to max_compaction_age")
if cfg.MinTableOffset > cfg.MaxTableOffset {
return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset)
}
if cfg.Ring.ReplicationFactor != ringReplicationFactor {
return errors.New("Replication factor must not be changed as it will not take effect")
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/spec.go
@@ -217,7 +217,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

b.curr = v1.NewBlock(reader)
b.curr = v1.NewBlock(reader, b.metrics.bloomMetrics)
return true
}

6 changes: 3 additions & 3 deletions pkg/bloomcompactor/spec_test.go
@@ -50,7 +50,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

res = append(res, v1.NewBlock(reader))
res = append(res, v1.NewBlock(reader, v1.NewMetrics(nil)))
ref := genBlockRef(data[minIdx].Series.Fingerprint, data[maxIdx-1].Series.Fingerprint)
t.Log("create block", ref)
refs = append(refs, ref)
@@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b),
BlockQuerier: v1.NewBlockQuerier(b, false),
})
}

@@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := block.Querier()
bq := v1.NewBlockQuerier(block, false)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
24 changes: 7 additions & 17 deletions pkg/bloomcompactor/tsdb.go
@@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
)

const (
@@ -121,34 +122,22 @@ func (b *BloomTSDBStore) LoadTSDB(
}
}()

return NewTSDBSeriesIter(ctx, idx, bounds)
return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
}

// TSDBStore is an interface for interacting with the TSDB,
// modeled off a relevant subset of the `tsdb.TSDBIndex` struct
type forSeries interface {
ForSeries(
ctx context.Context,
fpFilter index.FingerprintFilter,
from model.Time,
through model.Time,
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
matchers ...*labels.Matcher,
) error
}

func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)

if err := f.ForSeries(
ctx,
user,
bounds,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
select {
case <-ctx.Done():
return
return true
default:
res := &v1.Series{
Fingerprint: fp,
@@ -163,6 +152,7 @@ func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBo
}

series = append(series, res)
return false
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
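
Hedged sketch of the new `ForSeries` callback contract adopted here (simplified types, not the real `sharding.ForSeries` interface): the callback now receives the tenant and returns `stop bool`, which is how `NewTSDBSeriesIter` bails out early when the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

type series struct{ fp uint64 }

type memIndex []series

// ForSeries walks every series for a tenant until fn returns stop=true,
// mirroring the simplified signature used by forSeriesTestImpl below.
func (m memIndex) ForSeries(_ context.Context, _ string, fn func(s series) (stop bool)) error {
	for _, s := range m {
		if fn(s) {
			return nil
		}
	}
	return nil
}

func collect(ctx context.Context, idx memIndex, tenant string) ([]series, error) {
	var out []series
	err := idx.ForSeries(ctx, tenant, func(s series) (stop bool) {
		select {
		case <-ctx.Done():
			return true // stop iterating, like the `return true` in the diff
		default:
			out = append(out, s)
			return false
		}
	})
	return out, err
}

func main() {
	idx := memIndex{{1}, {2}, {3}}
	got, _ := collect(context.Background(), idx, "tenant-a")
	fmt.Println(len(got)) // 3
}
```
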
9 changes: 5 additions & 4 deletions pkg/bloomcompactor/tsdb_test.go
@@ -17,10 +17,11 @@ type forSeriesTestImpl []*v1.Series

func (f forSeriesTestImpl) ForSeries(
_ context.Context,
_ string,
_ index.FingerprintFilter,
_ model.Time,
_ model.Time,
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) bool,
_ ...*labels.Matcher,
) error {
for i := range f {
@@ -61,7 +62,7 @@ func TestTSDBSeriesIter(t *testing.T) {
},
}
srcItr := v1.NewSliceIter(input)
itr, err := NewTSDBSeriesIter(context.Background(), forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators[*v1.Series](
@@ -78,7 +79,7 @@ func TestTSDBSeriesIter_Expiry(t *testing.T) {
t.Run("expires on creation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{
{}, // a single entry
}, v1.NewBounds(0, math.MaxUint64))
require.Error(t, err)
@@ -87,7 +88,7 @@

t.Run("expires during consumption", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{
{},
{},
}, v1.NewBounds(0, math.MaxUint64))
26 changes: 14 additions & 12 deletions pkg/bloomgateway/bloomgateway.go
@@ -51,7 +51,6 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
@@ -63,6 +62,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/spanlogger"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
@@ -196,15 +196,17 @@ func (g *Gateway) stopping(_ error) error {

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "bloomgateway.FilterChunkRefs")
defer sp.Finish()

tenantID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

logger := log.With(g.logger, "tenant", tenantID)
sp, ctx := spanlogger.NewWithLogger(
ctx,
log.With(g.logger, "tenant", tenantID),
"bloomgateway.FilterChunkRefs",
)
defer sp.Finish()

// start time == end time --> empty response
if req.From.Equal(req.Through) {
@@ -237,7 +239,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}

sp.LogKV(
sp.Log(
"filters", len(filters),
"days", len(seriesByDay),
"series_requested", len(req.Refs),
@@ -254,7 +256,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))

level.Debug(g.logger).Log(
level.Debug(sp).Log(
"msg", "created task for day",
"task", task.ID,
"day", seriesForDay.day,
@@ -276,7 +278,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
for _, task := range tasks {
task := task
task.enqueueTime = time.Now()
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))
level.Info(sp).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))

// TODO(owen-d): gracefully handle full queues
if err := g.queue.Enqueue(tenantID, nil, task, func() {
Expand All @@ -289,7 +291,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
go g.consumeTask(ctx, task, tasksCh)
}

sp.LogKV("enqueue_duration", time.Since(queueStart).String())
sp.Log("enqueue_duration", time.Since(queueStart).String())

remaining := len(tasks)

@@ -305,7 +307,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err())
level.Info(sp).Log("msg", "task done", "task", task.ID, "err", task.Err())
if task.Err() != nil {
return nil, errors.Wrap(task.Err(), "request failed")
}
@@ -314,7 +316,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}
}

sp.LogKV("msg", "received all responses")
sp.Log("msg", "received all responses")

filtered := filterChunkRefs(req, responses)

@@ -333,7 +335,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
g.metrics.requestedChunks.Observe(float64(preFilterChunks))
g.metrics.filteredChunks.Observe(float64(preFilterChunks - postFilterChunks))

level.Info(logger).Log(
level.Info(sp).Log(
"msg", "return filtered chunk refs",
"requested_series", preFilterSeries,
"filtered_series", preFilterSeries-postFilterSeries,