Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into blooms/query-planning-i
Browse files Browse the repository at this point in the history

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Mar 22, 2024
2 parents b0350ae + 863acc0 commit 5b70e96
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 27 deletions.
4 changes: 4 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1939,6 +1939,10 @@ client:
# CLI flag: -bloom-gateway.worker-concurrency
[worker_concurrency: <int> | default = 4]
# Number of blocks processed concurrently on a single worker.
# CLI flag: -bloom-gateway.block-query-concurrency
[block_query_concurrency: <int> | default = 4]
# Maximum number of outstanding tasks per tenant.
# CLI flag: -bloom-gateway.max-outstanding-per-tenant
[max_outstanding_per_tenant: <int> | default = 1024]
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus
logger: logger,
metrics: newMetrics(reg, constants.Loki, metricsSubsystem),
workerConfig: workerConfig{
maxItems: cfg.NumMultiplexItems,
maxItems: cfg.NumMultiplexItems,
queryConcurrency: cfg.BlockQueryConcurrency,
},
pendingTasks: &atomic.Int64{},

Expand Down
18 changes: 8 additions & 10 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func TestBloomGateway_StartStopService(t *testing.T) {
func TestBloomGateway_FilterChunkRefs(t *testing.T) {
tenantID := "test"

store := setupBloomStore(t)
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

Expand All @@ -156,7 +155,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
ReplicationFactor: 1,
NumTokens: 16,
},
WorkerConcurrency: 4,
WorkerConcurrency: 2,
BlockQueryConcurrency: 2,
MaxOutstandingPerTenant: 1024,
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, store, logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, store, logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -333,15 +333,13 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("use fuse queriers to filter chunks", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

gw.bloomStore = newMockBloomStore(queriers, metas)
err = gw.initServices()
reg := prometheus.NewRegistry()
store := newMockBloomStore(queriers, metas)

gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down
2 changes: 2 additions & 0 deletions pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
Client ClientConfig `yaml:"client,omitempty" doc:""`

WorkerConcurrency int `yaml:"worker_concurrency"`
BlockQueryConcurrency int `yaml:"block_query_concurrency"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
NumMultiplexItems int `yaml:"num_multiplex_tasks"`
}
Expand All @@ -31,6 +32,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently.")
f.IntVar(&cfg.BlockQueryConcurrency, prefix+"block-query-concurrency", 4, "Number of blocks processed concurrently on a single worker.")
f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.")
f.IntVar(&cfg.NumMultiplexItems, prefix+"num-multiplex-tasks", 512, "How many tasks are multiplexed at once.")
// TODO(chaudum): Figure out what the better place is for registering flags
Expand Down
23 changes: 12 additions & 11 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
func newProcessor(id string, concurrency int, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
return &processor{
id: id,
store: store,
logger: logger,
metrics: metrics,
id: id,
concurrency: concurrency,
store: store,
logger: logger,
metrics: metrics,
}
}

type processor struct {
id string
store bloomshipper.Store
logger log.Logger
metrics *workerMetrics
id string
concurrency int // concurrency at which bloom blocks are processed
store bloomshipper.Store
logger log.Logger
metrics *workerMetrics
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
Expand Down Expand Up @@ -95,8 +97,7 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
return err
}

// TODO(chaudum): What's a good cocurrency value?
return concurrency.ForEachJob(ctx, len(bqs), 10, func(ctx context.Context, i int) error {
return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error {
bq := bqs[i]
if bq == nil {
// TODO(chaudum): Add metric for skipped blocks
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestProcessor(t *testing.T) {
_, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

mockStore := newMockBloomStore(queriers, metas)
p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithInterval{
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestProcessor(t *testing.T) {
mockStore := newMockBloomStore(queriers, metas)
mockStore.err = errors.New("store failed")

p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithInterval{
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
)

type workerConfig struct {
maxItems int
maxItems int
queryConcurrency int
}

// worker is a datastructure that consumes tasks from the request queue,
Expand Down Expand Up @@ -65,7 +66,7 @@ func (w *worker) starting(_ context.Context) error {
func (w *worker) running(_ context.Context) error {
idx := queue.StartIndexWithLocalQueue

p := newProcessor(w.id, w.store, w.logger, w.metrics)
p := newProcessor(w.id, w.cfg.queryConcurrency, w.store, w.logger, w.metrics)

for st := w.State(); st == services.Running || st == services.Stopping; {
taskCtx := context.Background()
Expand Down
9 changes: 8 additions & 1 deletion pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package logql

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/c2h5oh/datasize"
"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -388,6 +390,10 @@ func RecordShardsQueryMetrics(
latencyType = latencyTypeSlow
}

var bloomRatio float64
if stats.Index.TotalChunks > 0 {
bloomRatio = float64(stats.Index.PostFilterChunks) / float64(stats.Index.TotalChunks)
}
logValues := make([]interface{}, 0, 15)
logValues = append(logValues,
"latency", latencyType,
Expand All @@ -401,10 +407,11 @@ func RecordShardsQueryMetrics(
"status", status,
"query", query,
"query_hash", util.HashedQuery(query),
"target_bytes_per_shard", targetBytesPerShard,
"target_bytes_per_shard", datasize.ByteSize(targetBytesPerShard).HumanReadable(),
"shards", shards,
"total_chunks", stats.Index.TotalChunks,
"post_filter_chunks", stats.Index.PostFilterChunks,
"bloom_filter_ratio", fmt.Sprintf("%.2f", bloomRatio),
)

level.Info(logger).Log(logValues...)
Expand Down

0 comments on commit 5b70e96

Please sign in to comment.