Skip to content

Commit

Permalink
feat: Boilerplate for new bloom build planner and worker components. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored May 20, 2024
1 parent 88e545f commit 8978ecf
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 1 deletion.
10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,16 @@ pattern_ingester:
# merging them as bloom blocks.
[bloom_compactor: <bloom_compactor>]

bloom_build:
# Flag to enable or disable the usage of the bloom-planner and bloom-builder
# components.
# CLI flag: -bloom-build.enabled
[enabled: <boolean> | default = false]

planner:

builder:

# Experimental: The bloom_gateway block configures the Loki bloom gateway
# server, responsible for serving queries for filtering chunks based on filter
# expressions.
Expand Down
50 changes: 50 additions & 0 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package builder

import (
"context"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"

utillog "github.com/grafana/loki/v3/pkg/util/log"
)

type Worker struct {
services.Service

cfg Config
metrics *Metrics
logger log.Logger
}

func New(
cfg Config,
logger log.Logger,
r prometheus.Registerer,
) (*Worker, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)

w := &Worker{
cfg: cfg,
metrics: NewMetrics(r),
logger: logger,
}

w.Service = services.NewBasicService(w.starting, w.running, w.stopping)
return w, nil
}

func (w *Worker) starting(_ context.Context) (err error) {
w.metrics.running.Set(1)
return err
}

func (w *Worker) stopping(_ error) error {
w.metrics.running.Set(0)
return nil
}

func (w *Worker) running(_ context.Context) error {
return nil
}
21 changes: 21 additions & 0 deletions pkg/bloombuild/builder/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package builder

import "flag"

// Config configures the bloom-builder component.
type Config struct {
// TODO: Add config
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) {
// TODO: Register flags with flagsPrefix
}

func (cfg *Config) Validate() error {
return nil
}

type Limits interface {
// TODO: Add limits
}
26 changes: 26 additions & 0 deletions pkg/bloombuild/builder/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package builder

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
metricsNamespace = "loki"
metricsSubsystem = "bloombuilder"
)

type Metrics struct {
running prometheus.Gauge
}

func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "running",
Help: "Value will be 1 if the bloom builder is currently running on this instance",
}),
}
}
40 changes: 40 additions & 0 deletions pkg/bloombuild/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package bloombuild

import (
"flag"
"fmt"

"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
)

// Config configures the bloom-planner component.
type Config struct {
Enabled bool `yaml:"enabled"`

Planner planner.Config `yaml:"planner"`
Builder builder.Config `yaml:"builder"`
}

// RegisterFlags registers flags for the bloom building configuration.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "bloom-build.enabled", false, "Flag to enable or disable the usage of the bloom-planner and bloom-builder components.")
cfg.Planner.RegisterFlagsWithPrefix("bloom-build.planner", f)
cfg.Builder.RegisterFlagsWithPrefix("bloom-build.builder", f)
}

func (cfg *Config) Validate() error {
if !cfg.Enabled {
return nil
}

if err := cfg.Planner.Validate(); err != nil {
return fmt.Errorf("invalid bloom planner configuration: %w", err)
}

if err := cfg.Builder.Validate(); err != nil {
return fmt.Errorf("invalid bloom builder configuration: %w", err)
}

return nil
}
21 changes: 21 additions & 0 deletions pkg/bloombuild/planner/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package planner

import "flag"

// Config configures the bloom-planner component.
type Config struct {
// TODO: Add config
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) {
// TODO: Register flags with flagsPrefix
}

func (cfg *Config) Validate() error {
return nil
}

type Limits interface {
// TODO: Add limits
}
26 changes: 26 additions & 0 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package planner

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
metricsNamespace = "loki"
metricsSubsystem = "bloomplanner"
)

type Metrics struct {
running prometheus.Gauge
}

func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "running",
Help: "Value will be 1 if bloom planner is currently running on this instance",
}),
}
}
50 changes: 50 additions & 0 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package planner

import (
"context"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"

utillog "github.com/grafana/loki/v3/pkg/util/log"
)

type Planner struct {
services.Service

cfg Config
metrics *Metrics
logger log.Logger
}

func New(
cfg Config,
logger log.Logger,
r prometheus.Registerer,
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)

p := &Planner{
cfg: cfg,
metrics: NewMetrics(r),
logger: logger,
}

p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
return p, nil
}

func (p *Planner) starting(_ context.Context) (err error) {
p.metrics.running.Set(1)
return err
}

func (p *Planner) stopping(_ error) error {
p.metrics.running.Set(0)
return nil
}

func (p *Planner) running(_ context.Context) error {
return nil
}
7 changes: 7 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/bloombuild"
"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compactor"
Expand Down Expand Up @@ -90,6 +91,7 @@ type Config struct {
Pattern pattern.Config `yaml:"pattern_ingester,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
BloomCompactor bloomcompactor.Config `yaml:"bloom_compactor,omitempty" category:"experimental"`
BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"`
BloomGateway bloomgateway.Config `yaml:"bloom_gateway,omitempty" category:"experimental"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
Expand Down Expand Up @@ -173,6 +175,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Tracing.RegisterFlags(f)
c.CompactorConfig.RegisterFlags(f)
c.BloomCompactor.RegisterFlags(f)
c.BloomBuild.RegisterFlags(f)
c.QueryScheduler.RegisterFlags(f)
c.Analytics.RegisterFlags(f)
c.OperationalConfig.RegisterFlags(f)
Expand Down Expand Up @@ -649,6 +652,8 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(BloomStore, t.initBloomStore)
mm.RegisterModule(BloomCompactor, t.initBloomCompactor)
mm.RegisterModule(BloomCompactorRing, t.initBloomCompactorRing, modules.UserInvisibleModule)
mm.RegisterModule(BloomPlanner, t.initBloomPlanner)
mm.RegisterModule(BloomBuilder, t.initBloomBuilder)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(IndexGatewayInterceptors, t.initIndexGatewayInterceptors, modules.UserInvisibleModule)
Expand Down Expand Up @@ -686,6 +691,8 @@ func (t *Loki) setupModuleManager() error {
IndexGateway: {Server, Store, BloomStore, IndexGatewayRing, IndexGatewayInterceptors, Analytics},
BloomGateway: {Server, BloomStore, Analytics},
BloomCompactor: {Server, BloomStore, BloomCompactorRing, Analytics, Store},
BloomPlanner: {Server, BloomStore, Analytics, Store},
BloomBuilder: {Server, BloomStore, Analytics, Store},
PatternIngester: {Server, MemberlistKV, Analytics},
PatternRingClient: {Server, MemberlistKV, Analytics},
IngesterQuerier: {Ring},
Expand Down
34 changes: 33 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/grafana/loki/v3/pkg/storage/types"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compactor"
compactorclient "github.com/grafana/loki/v3/pkg/compactor/client"
Expand Down Expand Up @@ -122,6 +124,8 @@ const (
QuerySchedulerRing string = "query-scheduler-ring"
BloomCompactor string = "bloom-compactor"
BloomCompactorRing string = "bloom-compactor-ring"
BloomPlanner string = "bloom-planner"
BloomBuilder string = "bloom-builder"
BloomStore string = "bloom-store"
All string = "all"
Read string = "read"
Expand Down Expand Up @@ -803,7 +807,7 @@ func (t *Loki) updateConfigForShipperStore() {
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor):
case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor), t.Cfg.isTarget(BloomPlanner), t.Cfg.isTarget(BloomBuilder):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
Expand Down Expand Up @@ -1553,6 +1557,34 @@ func (t *Loki) initBloomCompactorRing() (services.Service, error) {
return t.bloomCompactorRingManager, nil
}

func (t *Loki) initBloomPlanner() (services.Service, error) {
if !t.Cfg.BloomBuild.Enabled {
return nil, nil
}

logger := log.With(util_log.Logger, "component", "bloom-planner")

return planner.New(
t.Cfg.BloomBuild.Planner,
logger,
prometheus.DefaultRegisterer,
)
}

func (t *Loki) initBloomBuilder() (services.Service, error) {
if !t.Cfg.BloomBuild.Enabled {
return nil, nil
}

logger := log.With(util_log.Logger, "component", "bloom-worker")

return builder.New(
t.Cfg.BloomBuild.Builder,
logger,
prometheus.DefaultRegisterer,
)
}

func (t *Loki) initQueryScheduler() (services.Service, error) {
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.querySchedulerRingManager, prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace)
if err != nil {
Expand Down

0 comments on commit 8978ecf

Please sign in to comment.