Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: allow for multiple targets #3807

Merged
merged 1 commit into from
Jun 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ import (

// Config is the root config for Loki.
type Config struct {
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
HTTPPrefix string `yaml:"http_prefix"`
Target flagext.StringSliceCSV `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
HTTPPrefix string `yaml:"http_prefix"`

Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Expand All @@ -80,7 +80,10 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Server.MetricsNamespace = "loki"
c.Server.ExcludeRequestInLog = true

f.StringVar(&c.Target, "target", All, "target module (default All)")
// Set the default module list to 'all'
c.Target = []string{All}
f.Var(&c.Target, "target", "Comma-separated list of Loki modules to load. "+
"The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. ")
f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")

c.Server.RegisterFlags(f)
Expand Down Expand Up @@ -148,6 +151,10 @@ func (c *Config) Validate() error {
return nil
}

func (c *Config) isModuleEnabled(m string) bool {
return util.StringsContain(c.Target, m)
}

// Loki is the root datastructure for Loki.
type Loki struct {
Cfg Config
Expand Down Expand Up @@ -231,7 +238,7 @@ func newDefaultConfig() *Config {

// Run starts Loki running, and blocks until a Loki stops.
func (t *Loki) Run() error {
serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target)
serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target...)
if err != nil {
return err
}
Expand Down Expand Up @@ -393,7 +400,7 @@ func (t *Loki) setupModuleManager() error {
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
if t.Cfg.Target == Querier || t.Cfg.Target == Ruler {
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) {
deps[Store] = append(deps[Store], IngesterQuerier)
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
return nil, err
}

if t.Cfg.Target != All {
if !t.Cfg.isModuleEnabled(All) {
logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)
}

Expand Down Expand Up @@ -291,8 +291,8 @@ func (t *Loki) initStore() (_ services.Service, err error) {

if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
switch t.Cfg.Target {
case Ingester:
switch true {
case t.Cfg.isModuleEnabled(Ingester):
// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory.
Expand All @@ -308,7 +308,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
},
}
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
case Querier, Ruler:
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
Expand All @@ -324,8 +324,8 @@ func (t *Loki) initStore() (_ services.Service, err error) {

if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch t.Cfg.Target {
case Querier, Ruler:
switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
if t.Cfg.Querier.QueryStoreOnly {
break
Expand All @@ -335,7 +335,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier,
calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration),
)
case All:
case t.Cfg.isModuleEnabled(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
Expand Down Expand Up @@ -498,7 +498,7 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() {
if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util_log.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
return
}
Expand Down