Add bloom planner and bloom builder to backend target in simple scalable deployment

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Sep 2, 2024
1 parent ef1df0e commit be9eb50
Showing 8 changed files with 222 additions and 15 deletions.
4 changes: 2 additions & 2 deletions docs/sources/operations/query-acceleration-blooms.md
@@ -43,8 +43,8 @@ and querying the bloom filters that only pays off at large scale deployments.
{{< /admonition >}}

To start building and using blooms you need to:
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components (as [microservices][microservices] or via the [SSD][ssd] `backend` target) and enable the components in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] `backend` target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Enable blooms building and filtering for each tenant individually, or for all of them by default.

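A minimal sketch of the kind of configuration the steps above refer to; the `bloom_build`, `bloom_gateway`, and `limits_config` field names are assumed from the Loki configuration reference and may differ between versions:

```yaml
# Sketch only: field names assumed from the Loki configuration reference.
bloom_build:
  enabled: true
  planner:
    planning_interval: 8h
  builder:
    # In SSD mode the builder discovers the leader planner via the ring;
    # the address below is the fallback used in microservice mode.
    planner_address: <bloom-planner-address>:9095

bloom_gateway:
  enabled: true
  client:
    addresses: dns+<backend-service-address>:9095

limits_config:
  bloom_creation_enabled: true
  bloom_gateway_enable_filtering: true
```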
42 changes: 37 additions & 5 deletions pkg/bloombuild/builder/builder.go
@@ -30,6 +30,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)

type Builder struct {
@@ -47,6 +48,10 @@ type Builder struct {
chunkLoader ChunkLoader

client protos.PlannerForBuilderClient

// used only in SSD mode, where the builder needs to find the leader planner among the backend replicas
// therefore is nil when the builder is run in microservice mode (default)
ringWatcher *common.RingWatcher
}

func New(
@@ -59,6 +64,7 @@ func New(
bloomStore bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Builder, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)

@@ -82,18 +88,33 @@
logger: logger,
}

if rm != nil {
b.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
}

b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
return b, nil
}

func (b *Builder) starting(_ context.Context) error {
func (b *Builder) starting(ctx context.Context) error {
if b.ringWatcher != nil {
if err := services.StartAndAwaitRunning(ctx, b.ringWatcher); err != nil {
return fmt.Errorf("error starting builder subservices: %w", err)
}
}
b.metrics.running.Set(1)
return nil
}

func (b *Builder) stopping(_ error) error {
defer b.metrics.running.Set(0)

if b.ringWatcher != nil {
if err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher); err != nil {
return fmt.Errorf("error stopping builder subservices: %w", err)
}
}

if b.client != nil {
// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
@@ -137,16 +158,27 @@ func (b *Builder) running(ctx context.Context) error {
return nil
}

func (b *Builder) connectAndBuild(
ctx context.Context,
) error {
func (b *Builder) plannerAddress() string {
if b.ringWatcher == nil {
return b.cfg.PlannerAddress
}

addr, err := b.ringWatcher.GetLeaderAddress()
if err != nil {
return b.cfg.PlannerAddress
}

return addr
}

func (b *Builder) connectAndBuild(ctx context.Context) error {
opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
}
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder_test.go
@@ -88,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
}
flagext.DefaultValues(&cfg.GrpcConfig)

builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), builder)
119 changes: 119 additions & 0 deletions pkg/bloombuild/common/ringwatcher.go
@@ -0,0 +1,119 @@
package common

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)

const (
RingKeyOfLeader = 0xffff
)

type RingWatcher struct {
services.Service
id string
ring *ring.Ring
leader *ring.InstanceDesc
lookupPeriod time.Duration
logger log.Logger
}

// NewRingWatcher creates a service.Service that watches a ring for a leader instance.
// The leader instance is the instance that owns the key `RingKeyOfLeader`.
// It provides functions to get the leader's address, and to check whether a given instance in the ring is the leader.
// Bloom planner and bloom builder use this ring watcher to hook into the index gateway ring when they are run as
// part of the `backend` target of the Simple Scalable Deployment (SSD).
// It should not be used by any components outside of the bloombuild package.
func NewRingWatcher(id string, ring *ring.Ring, lookupPeriod time.Duration, logger log.Logger) *RingWatcher {
w := &RingWatcher{
id: id,
ring: ring,
lookupPeriod: lookupPeriod,
logger: logger,
}
w.Service = services.NewBasicService(nil, w.updateLoop, nil)
return w
}

func (w *RingWatcher) waitForInitialLeader(ctx context.Context) error {
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-syncTicker.C:
w.lookupAddresses()
if w.leader != nil {
return nil
}
}
}
}

func (w *RingWatcher) updateLoop(ctx context.Context) error {
_ = w.waitForInitialLeader(ctx)

syncTicker := time.NewTicker(w.lookupPeriod)
defer syncTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-syncTicker.C:
w.lookupAddresses()
}
}
}

func (w *RingWatcher) lookupAddresses() {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
if err != nil {
level.Error(w.logger).Log("msg", "failed to get replicationset for key", "key", RingKeyOfLeader, "err", err)
w.leader = nil
return
}

for i := range rs.Instances {
inst := rs.Instances[i]
state, err := w.ring.GetInstanceState(inst.Id)
if err != nil || state != ring.ACTIVE {
return
}
tr, err := w.ring.GetTokenRangesForInstance(inst.Id)
if err != nil && (len(tr) == 0 || tr.IncludesKey(RingKeyOfLeader)) {
if w.leader == nil || w.leader.Id != inst.Id {
level.Info(w.logger).Log("msg", "updated leader", "new_leader", inst)
}
w.leader = &inst
return
}
}

w.leader = nil
}

func (w *RingWatcher) IsLeader() bool {
return w.IsInstanceLeader(w.id)
}

func (w *RingWatcher) IsInstanceLeader(instanceID string) bool {
res := w.leader != nil && w.leader.Id == instanceID
level.Debug(w.logger).Log("msg", "check if instance is leader", "inst", instanceID, "curr_leader", w.leader, "is_leader", res)
return res
}

func (w *RingWatcher) GetLeaderAddress() (string, error) {
if w.leader == nil {
return "", ring.ErrEmptyRing
}
return w.leader.Addr, nil
}
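As a usage note, here is a minimal sketch of how a backend-target component can gate its periodic work on this watcher, mirroring the wiring the planner and builder changes below add; the `runIfLeader`, `interval`, and `doWork` names are illustrative and not part of this commit.

```go
// Sketch only: illustrative use of RingWatcher, not part of this commit.
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/services"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
	"github.com/grafana/loki/v3/pkg/util/ring"
)

// runIfLeader runs doWork on every tick, but only on the backend replica that
// currently owns RingKeyOfLeader according to the ring watcher.
func runIfLeader(ctx context.Context, rm *ring.RingManager, interval time.Duration, logger log.Logger, doWork func(context.Context) error) error {
	watcher := common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
	if err := services.StartAndAwaitRunning(ctx, watcher); err != nil {
		return fmt.Errorf("error starting ring watcher: %w", err)
	}
	defer func() {
		_ = services.StopAndAwaitTerminated(context.Background(), watcher)
	}()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if !watcher.IsLeader() {
				continue // another backend replica is the leader
			}
			if err := doWork(ctx); err != nil {
				level.Error(logger).Log("msg", "leader work failed", "err", err)
			}
		}
	}
}
```

In this commit, the planner gates `runOne` and `BuilderLoop` on `IsLeader()`, while the builder uses `GetLeaderAddress()` to dial the leader planner.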
48 changes: 43 additions & 5 deletions pkg/bloombuild/planner/planner.go
@@ -27,9 +27,13 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)

var errPlannerIsNotRunning = errors.New("planner is not running")
var (
errPlannerIsNotRunning = errors.New("planner is not running")
errPlannerIsNotLeader = errors.New("planner is not leader")
)

type Planner struct {
services.Service
@@ -52,6 +56,10 @@ type Planner struct {

metrics *Metrics
logger log.Logger

// used only in SSD mode, where only a single planner among the backend replicas may act as leader and enqueue tasks
// therefore is nil when the planner is run in microservice mode (default)
ringWatcher *common.RingWatcher
}

func New(
@@ -63,6 +71,7 @@ func New(
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)

@@ -101,6 +110,12 @@ )
)

svcs := []services.Service{p.tasksQueue, p.activeUsers}

if rm != nil {
p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
svcs = append(svcs, p.ringWatcher)
}

p.subservices, err = services.NewManager(svcs...)
if err != nil {
return nil, fmt.Errorf("error creating subservices manager: %w", err)
@@ -112,6 +127,15 @@ return p, nil
return p, nil
}

func (p *Planner) isLeader() bool {
if p.ringWatcher == nil {
// when the planner runs as a standalone service in microservice mode, there is no ringWatcher
// therefore we can safely assume that the planner is a singleton
return true
}
return p.ringWatcher.IsLeader()
}

func (p *Planner) starting(ctx context.Context) (err error) {
if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
return fmt.Errorf("error starting planner subservices: %w", err)
@@ -135,10 +159,9 @@ func (p *Planner) stopping(_ error) error {
func (p *Planner) running(ctx context.Context) error {
go p.trackInflightRequests(ctx)

// run once at beginning
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
}
// run once at beginning, but delay by 1m to allow ring consolidation when running in SSD mode
initialPlanningTimer := time.NewTimer(time.Minute)
defer initialPlanningTimer.Stop()

planningTicker := time.NewTicker(p.cfg.PlanningInterval)
defer planningTicker.Stop()
@@ -154,6 +177,12 @@
level.Debug(p.logger).Log("msg", "planner context done")
return nil

case <-initialPlanningTimer.C:
level.Info(p.logger).Log("msg", "starting initial bloom build iteration")
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "initial bloom build iteration failed", "err", err)
}

case <-planningTicker.C:
level.Info(p.logger).Log("msg", "starting bloom build iteration")
if err := p.runOne(ctx); err != nil {
@@ -192,6 +221,10 @@ type tenantTable struct {
}

func (p *Planner) runOne(ctx context.Context) error {
if !p.isLeader() {
return errPlannerIsNotLeader
}

var (
wg sync.WaitGroup
start = time.Now()
@@ -901,6 +934,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer

builderID := resp.GetBuilderID()
logger := log.With(p.logger, "builder", builderID)

if !p.isLeader() {
return errPlannerIsNotLeader
}

level.Debug(logger).Log("msg", "builder connected")

p.tasksQueue.RegisterConsumerConnection(builderID)
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -532,7 +532,7 @@ func createPlanner(
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)

planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg, nil)
require.NoError(t, err)

return planner
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
@@ -760,7 +760,7 @@ func (t *Loki) setupModuleManager() error {

Read: {QueryFrontend, Querier},
Write: {Ingester, IngesterRF1, Distributor, PatternIngester, IngesterKafka},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},

All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore, IngesterKafka},
}