perf(blooms): Resolve bloom blocks on index gateway and shard by block address #12720

Merged Apr 23, 2024

28 commits
6cf356a
Add metric that observes time spent resolving blocks
chaudum Apr 17, 2024
a4c5b7e
Remove deprecated field from FilterChunkRefsRequest payload
chaudum Apr 17, 2024
1042f31
Add Blocks field to FilterChunkRefRequest message
chaudum Apr 17, 2024
882694e
Partition filter requests by day on client side
chaudum Apr 17, 2024
533eb09
Reject queries on bloom gw when they span across multiple days
chaudum Apr 17, 2024
d511035
Do not split request by day in bloom gateway
chaudum Apr 18, 2024
9f98240
Pass down blocks from requests to tasks
chaudum Apr 18, 2024
be1fa4f
Add processor test case with providing blocks
chaudum Apr 18, 2024
4e6f11d
Resolve blocks and shard by block address on index gateways
chaudum Apr 19, 2024
1c0393b
Add BloomStore as dependency for IndexGateway
chaudum Apr 19, 2024
43b046d
Use interval for block resolver
chaudum Apr 20, 2024
16fe790
Cleanup
chaudum Apr 20, 2024
b103e27
Test and benchmark for chunk grouping
chaudum Apr 20, 2024
15d69aa
Additional test for partitioning request
chaudum Apr 22, 2024
ec54d21
Add span to bloom querier
chaudum Apr 22, 2024
6971505
Debugging
chaudum Apr 22, 2024
ba56649
fixup! Debugging
chaudum Apr 22, 2024
fa8b97d
Sort input series before sending to bloom gateway
chaudum Apr 22, 2024
8938f97
Fix linter error
chaudum Apr 22, 2024
4d70f88
Remove unused test
chaudum Apr 22, 2024
529c553
Revert some gateway changes to keep PR minimal
chaudum Apr 22, 2024
8f977d3
Make linter happy
chaudum Apr 22, 2024
736a77c
fixup! Make linter happy
chaudum Apr 22, 2024
5ae6596
Return unfiltered list if no block are given
chaudum Apr 22, 2024
e53ee29
Remove old code path for resolving blocks on bloom gateways
chaudum Apr 22, 2024
4724cca
Assume sorted chunk when grouping by fingerprint
chaudum Apr 22, 2024
b8dd578
Define index gateway service dependencies statically
chaudum Apr 22, 2024
a04b558
fixup! Define index gateway service dependencies statically
chaudum Apr 23, 2024
32 changes: 25 additions & 7 deletions pkg/bloomgateway/bloomgateway.go
@@ -238,31 +238,49 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}

seriesByDay := partitionRequest(req)
stats.NumTasks = len(seriesByDay)
blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
for _, key := range req.Blocks {
block, err := bloomshipper.BlockRefFromKey(key)
if err != nil {
stats.Status = labelFailure
return nil, errors.New("could not parse block key")
}
blocks = append(blocks, block)
}

// no tasks --> empty response
if len(seriesByDay) == 0 {
// Shortcut if request does not contain blocks
if len(blocks) == 0 {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
ChunkRefs: req.Refs,
}, nil
}

// TODO(chaudum): I intentionally keep the logic for handling multiple tasks,
// so that the PR does not explode in size. This should be cleaned up at some point.

seriesByDay := partitionRequest(req)
stats.NumTasks = len(seriesByDay)

sp.LogKV(
"filters", len(filters),
"days", len(seriesByDay),
"blocks", len(req.Blocks),
"series_requested", len(req.Refs),
)

if len(seriesByDay) != 1 {
stats.Status = labelFailure
return nil, errors.New("request time range must span exactly one day")
}

tasks := make([]Task, 0, len(seriesByDay))
responses := make([][]v1.Output, 0, len(seriesByDay))
for _, seriesForDay := range seriesByDay {
task, err := NewTask(ctx, tenantID, seriesForDay, filters)
task, err := NewTask(ctx, tenantID, seriesForDay, filters, blocks)
if err != nil {
return nil, err
}

// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))
tasks = append(tasks, task)
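Reviewer note: with this change the gateway rejects any request whose time range spans more than one day ("request time range must span exactly one day"), so callers are expected to partition requests by day before calling FilterChunkRefs. The sketch below illustrates what that partitioning means; it is an illustration only, not the actual partitionRequest implementation: the helper name partitionByDay, the UTC day truncation, and the clamping are assumptions, and the real code also splits the series and chunk refs per interval.

package bloomgateway

import (
	"time"

	"github.com/prometheus/common/model"

	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

// partitionByDay (hypothetical) splits [from, through] into per-day intervals,
// clamped to the requested range, so that each downstream FilterChunkRefRequest
// spans at most a single day.
func partitionByDay(from, through model.Time) []bloomshipper.Interval {
	const day = 24 * time.Hour
	var intervals []bloomshipper.Interval
	cur := from.Time().UTC().Truncate(day)
	for cur.Before(through.Time()) {
		interval := bloomshipper.Interval{
			Start: model.TimeFromUnixNano(cur.UnixNano()),
			End:   model.TimeFromUnixNano(cur.Add(day).UnixNano()),
		}
		// clamp the day boundaries to the requested range
		if interval.Start < from {
			interval.Start = from
		}
		if interval.End > through {
			interval.End = through
		}
		intervals = append(intervals, interval)
		cur = cur.Add(day)
	}
	return intervals
}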
73 changes: 66 additions & 7 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -30,16 +30,24 @@ import (
"github.com/grafana/loki/v3/pkg/validation"
)

func stringSlice[T fmt.Stringer](s []T) []string {
res := make([]string, len(s))
for i := range res {
res[i] = s[i].String()
}
return res
}

func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
t.Helper()
grouped := make([]*logproto.GroupedChunkRefs, 0, len(chunkRefs))
return groupChunkRefs(chunkRefs, grouped)
return groupChunkRefs(chunkRefs, nil)
}

func newLimits() *validation.Overrides {
limits := validation.Limits{}
flagext.DefaultValues(&limits)
limits.BloomGatewayEnabled = true
limits.BloomGatewayShardSize = 1

overrides, _ := validation.NewOverrides(limits, nil)
return overrides
@@ -129,11 +137,46 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
MaxOutstandingPerTenant: 1024,
}

t.Run("shipper error is propagated", func(t *testing.T) {
t.Run("request fails when providing invalid block", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})

chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)

expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
require.NoError(t, err)

req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: []string{"bloom/invalid/block.tar.gz"},
}

ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
require.ErrorContainsf(t, err, "could not parse block key", "%+v", res)
})

t.Run("shipper error is propagated", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore.err = errors.New("request failed")

reg := prometheus.NewRegistry()
@@ -160,6 +203,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}

ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
@@ -175,7 +219,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2024-01-25 10:00")

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore.delay = 2000 * time.Millisecond

Expand Down Expand Up @@ -203,6 +247,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}

ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond)
Expand All @@ -228,11 +273,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.NoError(t, err)
})

// input chunks need to be sorted by their fingerprint
chunkRefs := []*logproto.ChunkRef{
{Fingerprint: 3000, UserID: tenantID, From: now.Add(-24 * time.Hour), Through: now.Add(-23 * time.Hour), Checksum: 1},
{Fingerprint: 1000, UserID: tenantID, From: now.Add(-22 * time.Hour), Through: now.Add(-21 * time.Hour), Checksum: 2},
{Fingerprint: 2000, UserID: tenantID, From: now.Add(-20 * time.Hour), Through: now.Add(-19 * time.Hour), Checksum: 3},
{Fingerprint: 1000, UserID: tenantID, From: now.Add(-23 * time.Hour), Through: now.Add(-22 * time.Hour), Checksum: 4},
{Fingerprint: 2000, UserID: tenantID, From: now.Add(-20 * time.Hour), Through: now.Add(-19 * time.Hour), Checksum: 3},
{Fingerprint: 3000, UserID: tenantID, From: now.Add(-24 * time.Hour), Through: now.Add(-23 * time.Hour), Checksum: 1},
}
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
@@ -284,13 +330,24 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Checksum: uint32(idx),
},
}
ref := bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: tenantID,
TableName: "table_1",
Bounds: v1.NewBounds(0, 10000),
StartTimestamp: now.Add(-24 * time.Hour),
EndTimestamp: now,
Checksum: uint32(idx),
},
}
expr, err := syntax.ParseExpr(`{foo="bar"} |= "foo"`)
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice([]bloomshipper.BlockRef{ref}),
}
ctx := user.InjectOrgID(context.Background(), tenantID)
_, err = gw.FilterChunkRefs(ctx, req)
@@ -303,7 +360,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

reg := prometheus.NewRegistry()
store := newMockBloomStore(queriers, metas)
@@ -329,6 +386,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: inputChunkRefs,
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}
ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
Expand Down Expand Up @@ -361,6 +419,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Through: now,
Refs: inputChunkRefs,
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}
ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
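For context, a minimal sketch of the block-key round trip these tests rely on: the client serializes each bloomshipper.BlockRef to a string key (the representation stringSlice produces via String()), and the gateway parses it back with bloomshipper.BlockRefFromKey. All field values below are illustrative placeholders, not values from the test fixtures, and the exact key layout is whatever String() produces.

package bloomgateway

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

func exampleBlockKeyRoundTrip() {
	ref := bloomshipper.BlockRef{
		Ref: bloomshipper.Ref{
			TenantID:       "tenant-a",
			TableName:      "table_1",
			Bounds:         v1.NewBounds(0, 0xffff),
			StartTimestamp: model.TimeFromUnix(time.Now().Add(-1 * time.Hour).Unix()),
			EndTimestamp:   model.TimeFromUnix(time.Now().Unix()),
			Checksum:       42,
		},
	}

	key := ref.String() // a well-formed key, accepted by the gateway
	if _, err := bloomshipper.BlockRefFromKey(key); err != nil {
		fmt.Println("unexpected parse failure:", err)
	}

	// An arbitrary string like the one in the test above does not parse,
	// so FilterChunkRefs fails with "could not parse block key".
	if _, err := bloomshipper.BlockRefFromKey("bloom/invalid/block.tar.gz"); err != nil {
		fmt.Println("expected parse failure:", err)
	}
}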
4 changes: 2 additions & 2 deletions pkg/bloomgateway/cache_test.go
@@ -450,14 +450,14 @@ func TestCache(t *testing.T) {
res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 2, *calls)
require.Equal(t, expectedRes, res)
require.ElementsMatch(t, expectedRes.ChunkRefs, res.ChunkRefs)

// Doing a request again should only hit the cache
*calls = 0
res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 0, *calls)
require.Equal(t, expectedRes, res)
require.ElementsMatch(t, expectedRes.ChunkRefs, res.ChunkRefs)
}

type mockServer struct {
75 changes: 47 additions & 28 deletions pkg/bloomgateway/client.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -14,7 +15,6 @@ import (
ringclient "github.com/grafana/dskit/ring/client"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

@@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/discovery"
)
@@ -111,12 +112,11 @@ func (i *ClientConfig) Validate() error {
}

type Client interface {
FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
}

type GatewayClient struct {
cfg ClientConfig
limits Limits
logger log.Logger
metrics *clientMetrics
pool *JumpHashClientPool
@@ -188,7 +188,6 @@ func NewClient(
return &GatewayClient{
cfg: cfg,
logger: logger,
limits: limits,
metrics: metrics,
pool: pool,
dnsProvider: dnsProvider, // keep reference so we can stop it when the client is closed
Expand All @@ -201,26 +200,41 @@ func (c *GatewayClient) Close() {
}

// FilterChunkRefs implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) {
if !c.limits.BloomGatewayEnabled(tenant) || len(groups) == 0 {
return groups, nil
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) {
// no block and therefore no series with chunks
if len(blocks) == 0 {
return nil, nil
}

clients := make(map[string][]*logproto.GroupedChunkRefs)
for _, g := range groups {
addr, err := c.pool.AddrForFingerprint(g.Fingerprint)
firstFp, lastFp := uint64(math.MaxUint64), uint64(0)
pos := make(map[string]int)
servers := make([]addrWithGroups, 0, len(blocks))
for _, blockWithSeries := range blocks {
addr, err := c.pool.Addr(blockWithSeries.block.String())
if err != nil {
return nil, errors.Wrap(err, "server address for fingerprint")
return nil, errors.Wrapf(err, "server address for block: %s", blockWithSeries.block)
}
clients[addr] = append(clients[addr], g)
}

servers := make([]addrWithGroups, 0, len(clients))
for k, v := range clients {
servers = append(servers, addrWithGroups{
groups: v,
addr: k,
})
// min/max fingerprint needed for the cache locality score
first, last := getFirstLast(blockWithSeries.series)
if first.Fingerprint < firstFp {
firstFp = first.Fingerprint
}
if last.Fingerprint > lastFp {
lastFp = last.Fingerprint
}

if idx, found := pos[addr]; found {
servers[idx].groups = append(servers[idx].groups, blockWithSeries.series...)
servers[idx].blocks = append(servers[idx].blocks, blockWithSeries.block.String())
} else {
pos[addr] = len(servers)
servers = append(servers, addrWithGroups{
addr: addr,
blocks: []string{blockWithSeries.block.String()},
groups: blockWithSeries.series,
})
}
}

if len(servers) > 0 {
Expand All @@ -229,7 +243,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
// but can be less if the keyspace is not evenly distributed across instances. Ideal operation will see the range of
// `1-2/num_instances` -> `1`, where the former represents slight
// overlap on instances to the left and right of the range.
firstFp, lastFp := groups[0].Fingerprint, groups[len(groups)-1].Fingerprint
pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64)
pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs())))
cacheLocalityScore := pctKeyspace / pctInstances
@@ -241,22 +254,27 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]

sort.Slice(rs.groups, func(i, j int) bool {
return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint
})

level.Info(c.logger).Log(
"msg", "do FilterChunkRefs for addresses",
"progress", fmt.Sprintf("%d/%d", i+1, len(servers)),
"part", fmt.Sprintf("%d/%d", i+1, len(servers)),
"addr", rs.addr,
"from", from.Time(),
"through", through.Time(),
"num_refs", len(rs.groups),
"plan", plan.String(),
"plan_hash", plan.Hash(),
"from", interval.Start.Time(),
"through", interval.End.Time(),
"series", len(rs.groups),
"blocks", len(rs.blocks),
"tenant", tenant,
)

return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
req := &logproto.FilterChunkRefRequest{
From: from,
Through: through,
From: interval.Start,
Through: interval.End,
Refs: rs.groups,
Blocks: rs.blocks,
Plan: plan,
}
resp, err := client.FilterChunkRefs(ctx, req)
@@ -308,5 +326,6 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway

type addrWithGroups struct {
addr string
blocks []string
groups []*logproto.GroupedChunkRefs
}
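The "cache locality score" logged above is the fraction of the fingerprint keyspace the request covers divided by the fraction of known gateway instances it fans out to; per the comment in the diff, healthy requests land roughly between 1 - 2/num_instances and 1, while much smaller values mean the request hits more instances than its keyspace share would suggest. A small, self-contained sketch of that calculation follows; the standalone helper name is made up here, the real code computes the score inline.

package bloomgateway

import "math"

// cacheLocalityScore (hypothetical helper) mirrors the inline calculation:
// pctKeyspace / pctInstances, where pctKeyspace is the share of the uint64
// fingerprint space between the first and last requested fingerprint and
// pctInstances is the share of known gateway instances this request hits.
func cacheLocalityScore(firstFp, lastFp uint64, serversHit, knownInstances int) float64 {
	if knownInstances < 1 {
		knownInstances = 1
	}
	pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64)
	pctInstances := float64(serversHit) / float64(knownInstances)
	return pctKeyspace / pctInstances
}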