feat(blooms): Add bloom planner and bloom builder to backend target #13997

Merged
4 changes: 2 additions & 2 deletions docs/sources/operations/query-acceleration-blooms.md
@@ -43,8 +43,8 @@ and querying the bloom filters that only pays off at large scale deployments.
{{< /admonition >}}

To start building and using blooms you need to:
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components (as [microservices][microservices] or via the [SSD][ssd] `backend` target) and enable the components in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] `backend` target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Enable blooms building and filtering for each tenant individually, or for all of them by default.

42 changes: 37 additions & 5 deletions pkg/bloombuild/builder/builder.go
@@ -30,6 +30,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)

type Builder struct {
@@ -47,6 +48,10 @@ type Builder struct {
chunkLoader ChunkLoader

client protos.PlannerForBuilderClient

// used only in SSD mode, where the builder uses the ring to locate the leader planner among the backend replicas
// therefore is nil when the builder is run in microservice mode (default)
ringWatcher *common.RingWatcher
}

func New(
@@ -59,6 +64,7 @@
bloomStore bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Builder, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)

@@ -82,18 +88,33 @@
logger: logger,
}

if rm != nil {
b.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
}

b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
return b, nil
}

func (b *Builder) starting(_ context.Context) error {
func (b *Builder) starting(ctx context.Context) error {
if b.ringWatcher != nil {
if err := services.StartAndAwaitRunning(ctx, b.ringWatcher); err != nil {
return fmt.Errorf("error starting builder subservices: %w", err)
}
}
b.metrics.running.Set(1)
return nil
}

func (b *Builder) stopping(_ error) error {
defer b.metrics.running.Set(0)

if b.ringWatcher != nil {
if err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher); err != nil {
return fmt.Errorf("error stopping builder subservices: %w", err)
}
}

if b.client != nil {
// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
@@ -137,16 +158,27 @@ func (b *Builder) running(ctx context.Context) error {
return nil
}

func (b *Builder) connectAndBuild(
ctx context.Context,
) error {
func (b *Builder) plannerAddress() string {
if b.ringWatcher == nil {
return b.cfg.PlannerAddress
}

addr, err := b.ringWatcher.GetLeaderAddress()
if err != nil {
return b.cfg.PlannerAddress
}

return addr
}

func (b *Builder) connectAndBuild(ctx context.Context) error {
opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
}
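
To make the intent of the new `plannerAddress()` and ring-watcher wiring easier to follow, here is a minimal sketch of the builder-side address resolution in SSD mode. Only the names visible in the diff (`common.NewRingWatcher`, `ring.RingManager`, `GetLeaderAddress`, `services.StartAndAwaitRunning`) come from the change; the function itself is illustrative, not code from the PR.

```go
package example

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
	"github.com/grafana/loki/v3/pkg/util/ring"
)

// resolvePlannerAddress mirrors the builder's fallback behaviour: in SSD mode it
// asks the ring watcher for the current leader's address and falls back to the
// statically configured planner address; in microservice mode (rm == nil) it
// always uses the configured address. Illustrative sketch only.
func resolvePlannerAddress(ctx context.Context, rm *ring.RingManager, configuredAddr string, logger log.Logger) (string, error) {
	if rm == nil {
		// Microservice mode: the planner address comes from the config.
		return configuredAddr, nil
	}

	// SSD mode: watch the backend ring and follow whichever replica owns the leader key.
	watcher := common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
	if err := services.StartAndAwaitRunning(ctx, watcher); err != nil {
		return "", err
	}

	addr, err := watcher.GetLeaderAddress()
	if err != nil {
		// No leader resolved yet; dial the configured address instead.
		return configuredAddr, nil
	}
	return addr, nil
}
```

In the actual change the watcher is started in `starting()` and stopped in `stopping()`, and `plannerAddress()` is consulted on every reconnect, so a leader change is picked up the next time the builder dials the planner.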
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder_test.go
@@ -88,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
}
flagext.DefaultValues(&cfg.GrpcConfig)

builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), builder)
119 changes: 119 additions & 0 deletions pkg/bloombuild/common/ringwatcher.go
@@ -0,0 +1,119 @@
package common

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)

const (
RingKeyOfLeader = 0xffff
)

type RingWatcher struct {
services.Service
id string
ring *ring.Ring
leader *ring.InstanceDesc
lookupPeriod time.Duration
logger log.Logger
}

// NewRingWatcher creates a service.Service that watches a ring for a leader instance.
// The leader instance is the instance that owns the key `RingKeyOfLeader`.
// It provides functions to get the leader's address, and to check whether a given instance in the ring is leader.
// Bloom planner and bloom builder use this ring watcher to hook into index gateway ring when they are run as
// part of the `backend` target of the Simple Scalable Deployment (SSD).
// It should not be used for any other components outside of the bloombuild package.
func NewRingWatcher(id string, ring *ring.Ring, lookupPeriod time.Duration, logger log.Logger) *RingWatcher {
w := &RingWatcher{
id: id,
ring: ring,
lookupPeriod: lookupPeriod,
logger: logger,
}
w.Service = services.NewBasicService(nil, w.updateLoop, nil)
return w
}

func (w *RingWatcher) waitForInitialLeader(ctx context.Context) error {
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-syncTicker.C:
w.lookupAddresses()
if w.leader != nil {
return nil
}
}
}
}

func (w *RingWatcher) updateLoop(ctx context.Context) error {
_ = w.waitForInitialLeader(ctx)

syncTicker := time.NewTicker(w.lookupPeriod)
defer syncTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-syncTicker.C:
w.lookupAddresses()
}
}
}

func (w *RingWatcher) lookupAddresses() {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
if err != nil {
level.Error(w.logger).Log("msg", "failed to get replicationset for key", "key", RingKeyOfLeader, "err", err)
w.leader = nil
return
}

for i := range rs.Instances {
inst := rs.Instances[i]
state, err := w.ring.GetInstanceState(inst.Id)
if err != nil || state != ring.ACTIVE {
return
}
tr, err := w.ring.GetTokenRangesForInstance(inst.Id)
if err != nil && (len(tr) == 0 || tr.IncludesKey(RingKeyOfLeader)) {
if w.leader == nil || w.leader.Id != inst.Id {
level.Info(w.logger).Log("msg", "updated leader", "new_leader", inst)
}
w.leader = &inst
return
}
}

w.leader = nil
}

func (w *RingWatcher) IsLeader() bool {
return w.IsInstanceLeader(w.id)
}

func (w *RingWatcher) IsInstanceLeader(instanceID string) bool {
res := w.leader != nil && w.leader.Id == instanceID
level.Debug(w.logger).Log("msg", "check if instance is leader", "inst", instanceID, "curr_leader", w.leader, "is_leader", res)
return res
}

func (w *RingWatcher) GetLeaderAddress() (string, error) {
if w.leader == nil {
return "", ring.ErrEmptyRing
}
return w.leader.Addr, nil
}
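
For context, a compact sketch of how a component can gate periodic work on this watcher's leader check, which is how the planner uses it in the diff below. Only the `RingWatcher` API comes from the file above; `runIfLeader`, `doWork`, and the loop shape are assumptions for illustration.

```go
package example

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
)

// runIfLeader runs doWork on every tick, but only on the replica that currently
// owns RingKeyOfLeader. With a nil watcher (microservice mode) it always runs.
// Illustrative sketch, not code from the PR.
func runIfLeader(ctx context.Context, w *common.RingWatcher, interval time.Duration, doWork func(context.Context) error, logger log.Logger) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if w != nil && !w.IsLeader() {
				// Non-leader replicas stay idle but keep watching the ring,
				// so they can take over if the leader key moves.
				level.Debug(logger).Log("msg", "not leader, skipping iteration")
				continue
			}
			if err := doWork(ctx); err != nil {
				level.Error(logger).Log("msg", "iteration failed", "err", err)
			}
		}
	}
}
```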
48 changes: 43 additions & 5 deletions pkg/bloombuild/planner/planner.go
@@ -27,9 +27,13 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)

var errPlannerIsNotRunning = errors.New("planner is not running")
var (
errPlannerIsNotRunning = errors.New("planner is not running")
errPlannerIsNotLeader = errors.New("planner is not leader")
)

type Planner struct {
services.Service
@@ -52,6 +56,10 @@ type Planner struct {

metrics *Metrics
logger log.Logger

// used only in SSD mode where a single planner of the backend replicas needs to create tasksQueue
// therefore is nil when planner is run in microservice mode (default)
ringWatcher *common.RingWatcher
}

func New(
@@ -63,6 +71,7 @@
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)

@@ -101,6 +110,12 @@
)

svcs := []services.Service{p.tasksQueue, p.activeUsers}

if rm != nil {
p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
svcs = append(svcs, p.ringWatcher)
}

p.subservices, err = services.NewManager(svcs...)
if err != nil {
return nil, fmt.Errorf("error creating subservices manager: %w", err)
@@ -112,6 +127,15 @@
return p, nil
}

func (p *Planner) isLeader() bool {
if p.ringWatcher == nil {
// when the planner runs as a standalone service in microservice mode, there is no ringWatcher
// therefore we can safely assume that the planner is a singleton
return true
}
return p.ringWatcher.IsLeader()
}

func (p *Planner) starting(ctx context.Context) (err error) {
if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
return fmt.Errorf("error starting planner subservices: %w", err)
@@ -135,10 +159,9 @@ func (p *Planner) stopping(_ error) error {
func (p *Planner) running(ctx context.Context) error {
go p.trackInflightRequests(ctx)

// run once at beginning
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
}
// run once at beginning, but delay by 1m to allow ring consolidation when running in SSD mode

Contributor: I think this ticker could be replaced with a simpler time.Sleep at the beginning of the function if the ringWatcher is not null.

Contributor (Author): time.Sleep, time.After, and time.NewTimer do essentially the same, but since we already have a select loop, I think it's cleaner to avoid time.Sleep.

Contributor: To me, a sleep is easier to understand as I don't need to think about which select will trigger earlier. But I don't have a strong opinion on this. Approved.
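
To illustrate the trade-off discussed above, here is a minimal sketch of both variants. The `runOne` helper and the `interval` parameter are stand-ins, not the PR's code; only the one-shot-timer shape matches what was merged.

```go
package example

import (
	"context"
	"time"
)

// runOne stands in for the planner's build iteration; illustrative only.
func runOne(ctx context.Context) error { return nil }

// Variant suggested in review: sleep before entering the loop. Simple, but the
// goroutine cannot observe ctx cancellation while it sleeps.
func runningWithSleep(ctx context.Context, interval time.Duration) error {
	time.Sleep(time.Minute)
	_ = runOne(ctx)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			_ = runOne(ctx)
		}
	}
}

// Variant merged in the PR: a one-shot timer handled by the same select loop,
// so the initial run is simply another case alongside the planning ticker.
func runningWithTimer(ctx context.Context, interval time.Duration) error {
	initial := time.NewTimer(time.Minute)
	defer initial.Stop()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-initial.C:
			_ = runOne(ctx)
		case <-ticker.C:
			_ = runOne(ctx)
		}
	}
}
```

One practical difference: the select-based variant can observe ctx cancellation during the initial one-minute delay, whereas a plain time.Sleep cannot.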

initialPlanningTimer := time.NewTimer(time.Minute)
defer initialPlanningTimer.Stop()

planningTicker := time.NewTicker(p.cfg.PlanningInterval)
defer planningTicker.Stop()
Expand All @@ -154,6 +177,12 @@ func (p *Planner) running(ctx context.Context) error {
level.Debug(p.logger).Log("msg", "planner context done")
return nil

case <-initialPlanningTimer.C:
level.Info(p.logger).Log("msg", "starting initial bloom build iteration")
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "initial bloom build iteration failed", "err", err)
}

case <-planningTicker.C:
level.Info(p.logger).Log("msg", "starting bloom build iteration")
if err := p.runOne(ctx); err != nil {
@@ -192,6 +221,10 @@ type tenantTable struct {
}

func (p *Planner) runOne(ctx context.Context) error {
if !p.isLeader() {
return errPlannerIsNotLeader
}

var (
wg sync.WaitGroup
start = time.Now()
@@ -901,6 +934,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer

builderID := resp.GetBuilderID()
logger := log.With(p.logger, "builder", builderID)

if !p.isLeader() {
return errPlannerIsNotLeader
}

level.Debug(logger).Log("msg", "builder connected")

p.tasksQueue.RegisterConsumerConnection(builderID)
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -532,7 +532,7 @@ func createPlanner(
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)

planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg, nil)
require.NoError(t, err)

return planner
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
@@ -760,7 +760,7 @@ func (t *Loki) setupModuleManager() error {

Read: {QueryFrontend, Querier},
Write: {Ingester, IngesterRF1, Distributor, PatternIngester, IngesterKafka},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},

All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore, IngesterKafka},
}