From 60a2c4e9ffede57a0a0cb00cf5026a13a460fc16 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Tue, 2 Nov 2021 16:30:51 -0600 Subject: [PATCH 1/5] add ring section to common config Co-Authored-By: Dylan Guedes --- pkg/loki/common/common.go | 8 +- pkg/loki/config_wrapper.go | 222 +++++++++++++++++++++----------- pkg/loki/config_wrapper_test.go | 185 +++++++++++++++++++++++--- pkg/util/ring_config.go | 21 +++ 4 files changed, 340 insertions(+), 96 deletions(-) diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index f791cac2fe24..dc872c88f645 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -8,13 +8,15 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/gcp" "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/openstack" + "github.com/grafana/loki/pkg/util" ) // Config holds common config that can be shared between multiple other config sections type Config struct { - PathPrefix string `yaml:"path_prefix"` - Storage Storage `yaml:"storage"` - PersistTokens bool `yaml:"persist_tokens"` + PathPrefix string `yaml:"path_prefix"` + Storage Storage `yaml:"storage"` + PersistTokens bool `yaml:"persist_tokens"` + Ring util.RingConfig `yaml:"ring"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index c6519a59d79d..f719bc1aefc6 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/cfg" loki_storage "github.com/grafana/loki/pkg/storage" @@ -76,11 +77,13 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { r.QueryScheduler.UseSchedulerRing = true } - applyPathPrefixDefaults(r, defaults) - if err := applyIngesterRingConfig(r, &defaults); err != nil { + applyPathPrefixDefaults(r, &defaults) + + applyDynamicRingConfigs(r, &defaults) + + if err := applyTokensFilePath(r); err != nil { return err } - applyMemberlistConfig(r) if err := applyStorageConfig(r, &defaults); err != nil { return err @@ -96,85 +99,141 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { } } -// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else -// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically -// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's -// and is the most central ring config for Loki. +// applyDynamicRingConfigs checks the current config and, depending on the given values, reuse ring configs accordingly. // -// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append -// the loopback interface at the end of it. -func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error { - if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) { - appendLoopbackInterface(r) +// 1. Gives preference to any explicit ring config set. For instance, if the user explicitly configures Distributor's ring, +// that config will prevail. This rule is enforced by the fact that the config file and command line args are parsed +// again after the dynamic config has been applied, so will take higher precedence. +// 2. If no explicit ring config is set, use the common ring configured if provided. +// 3. If no common ring was provided, use the memberlist config if provided. +// 4. If no common ring or memberlist were provided, use the ingester's ring configuration. + +// When using the ingester or common ring config, the loopback interface will be appended to the end of +// the list of default interface names +func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { + if reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) { + // common ring not set, so use memberlist for all rings if set + if len(r.MemberlistKV.JoinMembers) > 0 { + applyMemberlistConfig(r) + } else { + // neither common ring nor memberlist set, use ingester ring configuration for all rings + useIngesterRingConfig(r, defaults) + } + } else { + // common ring is provided, use that for all rings + useCommonRingConfig(r, defaults) } - lc := r.Ingester.LifecyclerConfig - rc := r.Ingester.LifecyclerConfig.RingConfig - s := rc.KVStore.Store - sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig +} - f, err := tokensFile(r, "ingester.tokens") - if err != nil { - return err - } - r.Ingester.LifecyclerConfig.TokensFilePath = f +func useCommonRingConfig(r, defaults *ConfigWrapper) { + // if the default ingester config is detected, then it's safe to override with the common config + shouldOverrideIngester := reflect.DeepEqual(r.Ingester.LifecyclerConfig, defaults.Ingester.LifecyclerConfig) - // This gets ugly because we use a separate struct for configuring each ring... + // append the loopback after checking if the default is enabled, as it mutates the current + appendLoopbackInterface(r, defaults) + + numTokens := 128 //this is the default used by the ingester and ruler + applyConfigToRings(r, r.Common.Ring, numTokens, shouldOverrideIngester) +} + +//applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured. +func applyConfigToRings(r *ConfigWrapper, rc util.RingConfig, defaultNumTokens int, overrideIngester bool) { + //Ingester + if overrideIngester { + r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore + r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod + r.Ingester.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout + r.Ingester.LifecyclerConfig.TokensFilePath = rc.TokensFilePath + r.Ingester.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.Ingester.LifecyclerConfig.ID = rc.InstanceID + r.Ingester.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames + r.Ingester.LifecyclerConfig.Port = rc.InstancePort + r.Ingester.LifecyclerConfig.Addr = rc.InstanceAddr + r.Ingester.LifecyclerConfig.Zone = rc.InstanceZone + r.Ingester.LifecyclerConfig.ListenPort = rc.ListenPort + r.Ingester.LifecyclerConfig.ObservePeriod = rc.ObservePeriod + } // Distributor r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.Distributor.DistributorRing.HeartbeatPeriod = lc.HeartbeatPeriod - r.Distributor.DistributorRing.InstancePort = lc.Port - r.Distributor.DistributorRing.InstanceAddr = lc.Addr - r.Distributor.DistributorRing.InstanceID = lc.ID - r.Distributor.DistributorRing.InstanceInterfaceNames = lc.InfNames - r.Distributor.DistributorRing.KVStore.Store = s - r.Distributor.DistributorRing.KVStore.StoreConfig = sc + r.Distributor.DistributorRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.Distributor.DistributorRing.InstancePort = rc.InstancePort + r.Distributor.DistributorRing.InstanceAddr = rc.InstanceAddr + r.Distributor.DistributorRing.InstanceID = rc.InstanceID + r.Distributor.DistributorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Distributor.DistributorRing.KVStore.Store = rc.KVStore.Store + r.Distributor.DistributorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig // Ruler r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.Ruler.Ring.HeartbeatPeriod = lc.HeartbeatPeriod - r.Ruler.Ring.InstancePort = lc.Port - r.Ruler.Ring.InstanceAddr = lc.Addr - r.Ruler.Ring.InstanceID = lc.ID - r.Ruler.Ring.InstanceInterfaceNames = lc.InfNames - r.Ruler.Ring.NumTokens = lc.NumTokens - r.Ruler.Ring.KVStore.Store = s - r.Ruler.Ring.KVStore.StoreConfig = sc + r.Ruler.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.Ruler.Ring.InstancePort = rc.InstancePort + r.Ruler.Ring.InstanceAddr = rc.InstanceAddr + r.Ruler.Ring.InstanceID = rc.InstanceID + r.Ruler.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Ruler.Ring.NumTokens = defaultNumTokens + r.Ruler.Ring.KVStore.Store = rc.KVStore.Store + r.Ruler.Ring.KVStore.StoreConfig = rc.KVStore.StoreConfig // Query Scheduler r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.QueryScheduler.SchedulerRing.HeartbeatPeriod = lc.HeartbeatPeriod - r.QueryScheduler.SchedulerRing.InstancePort = lc.Port - r.QueryScheduler.SchedulerRing.InstanceAddr = lc.Addr - r.QueryScheduler.SchedulerRing.InstanceID = lc.ID - r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames - r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone + r.QueryScheduler.SchedulerRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.QueryScheduler.SchedulerRing.InstancePort = rc.InstancePort + r.QueryScheduler.SchedulerRing.InstanceAddr = rc.InstanceAddr + r.QueryScheduler.SchedulerRing.InstanceID = rc.InstanceID + r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.QueryScheduler.SchedulerRing.InstanceZone = rc.InstanceZone r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.QueryScheduler.SchedulerRing.KVStore.Store = s - r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc - f, err = tokensFile(r, "scheduler.tokens") + r.QueryScheduler.SchedulerRing.KVStore.Store = rc.KVStore.Store + r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + + // Compactor + r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.CompactorConfig.CompactorRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.CompactorConfig.CompactorRing.InstancePort = rc.InstancePort + r.CompactorConfig.CompactorRing.InstanceAddr = rc.InstanceAddr + r.CompactorConfig.CompactorRing.InstanceID = rc.InstanceID + r.CompactorConfig.CompactorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.CompactorConfig.CompactorRing.InstanceZone = rc.InstanceZone + r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.CompactorConfig.CompactorRing.KVStore.Store = rc.KVStore.Store + r.CompactorConfig.CompactorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig +} + +// useIngesterRingConfig will override all other rings to use the ingester's ring configuration +func useIngesterRingConfig(r, defaults *ConfigWrapper) { + appendLoopbackInterface(r, defaults) + + ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig) + numTokens := r.Ingester.LifecyclerConfig.NumTokens + + // we're using the ingester config here, so never need to override it + applyConfigToRings(r, ingesterRingCfg, numTokens, false) +} + +func applyTokensFilePath(cfg *ConfigWrapper) error { + // Ingester + f, err := tokensFile(cfg, "ingester.tokens") if err != nil { return err } - r.QueryScheduler.SchedulerRing.TokensFilePath = f + cfg.Ingester.LifecyclerConfig.TokensFilePath = f // Compactor - r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod - r.CompactorConfig.CompactorRing.InstancePort = lc.Port - r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr - r.CompactorConfig.CompactorRing.InstanceID = lc.ID - r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames - r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone - r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.CompactorConfig.CompactorRing.KVStore.Store = s - r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc - f, err = tokensFile(r, "compactor.tokens") + f, err = tokensFile(cfg, "compactor.tokens") + if err != nil { + return err + } + cfg.CompactorConfig.CompactorRing.TokensFilePath = f + + // Query Scheduler + f, err = tokensFile(cfg, "scheduler.tokens") if err != nil { return err } - r.CompactorConfig.CompactorRing.TokensFilePath = f + cfg.QueryScheduler.SchedulerRing.TokensFilePath = f + return nil } @@ -193,7 +252,7 @@ func tokensFile(cfg *ConfigWrapper, file string) (string, error) { return cfg.Common.PathPrefix + "/" + file, nil } -func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) { +func applyPathPrefixDefaults(r, defaults *ConfigWrapper) { if r.Common.PathPrefix != "" { prefix := strings.TrimSuffix(r.Common.PathPrefix, "/") @@ -211,26 +270,37 @@ func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) { } } -func appendLoopbackInterface(r *ConfigWrapper) { - if loopbackIface, err := loki_net.LoopbackInterfaceName(); err == nil { - r.Ingester.LifecyclerConfig.InfNames = append(r.Ingester.LifecyclerConfig.InfNames, loopbackIface) - r.Config.Frontend.FrontendV2.InfNames = append(r.Config.Frontend.FrontendV2.InfNames, loopbackIface) +// appendLoopbackInterface will append the loopback interface to the interface names used for the ingester ring, +// v2 frontend, and common ring config unless an explicit list of names was provided. +func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { + loopbackIface, err := loki_net.LoopbackInterfaceName() + if err != nil { + return + } + + if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) { + cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.Frontend.FrontendV2.InfNames, defaults.Frontend.FrontendV2.InfNames) { + cfg.Frontend.FrontendV2.InfNames = append(cfg.Config.Frontend.FrontendV2.InfNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) { + cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface) } } -// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist -// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the -// memberlist configuration section, they probably want to be using memberlist for all their ring configurations. -// Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor), -// it seems harmless to take a guess at better defaults here. +// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist. +// The idea here is that if a user explicitly configured the memberlist configuration section, they probably want to be using memberlist +// for all their ring configurations. Since a user can still explicitly override a specific ring configuration +// (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here. func applyMemberlistConfig(r *ConfigWrapper) { - if len(r.MemberlistKV.JoinMembers) > 0 { - r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr - r.Distributor.DistributorRing.KVStore.Store = memberlistStr - r.Ruler.Ring.KVStore.Store = memberlistStr - r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr - r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr - } + r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr + r.Distributor.DistributorRing.KVStore.Store = memberlistStr + r.Ruler.Ring.KVStore.Store = memberlistStr + r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr + r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 0a103a91c5e3..710aa0cf20d1 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -27,6 +27,13 @@ import ( loki_net "github.com/grafana/loki/pkg/util/net" ) +// Can't use a totally empty yaml file or it causes weird behavior in the unmarhsalling. +const minimalConfig = `--- +schema_config: + configs: + - from: 2021-08-01 + schema: v11` + func configWrapperFromYAML(t *testing.T, configFileString string, args []string) (ConfigWrapper, ConfigWrapper, error) { config := ConfigWrapper{} fs := flag.NewFlagSet(t.Name(), flag.PanicOnError) @@ -876,17 +883,6 @@ query_range: }) } -// Can't use a totally empty yaml file or it causes weird behavior in the unmarhsalling. -const minimalConfig = `--- -schema_config: - configs: - - from: 2021-08-01 - schema: v11 - -memberlist: - join_members: - - loki.loki-dev-single-binary.svc.cluster.local` - func TestDefaultUnmarshal(t *testing.T) { t.Run("with a minimal config file and no command line args, defaults are use", func(t *testing.T) { file, err := ioutil.TempFile("", "config.yaml") @@ -911,12 +907,15 @@ func TestDefaultUnmarshal(t *testing.T) { } func Test_applyIngesterRingConfig(t *testing.T) { - t.Run("Attempt to catch changes to a RingConfig", func(t *testing.T) { msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test." - assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String())) - assert.Equal(t, 12, reflect.TypeOf(util.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String())) + assert.Equal(t, 8, + reflect.TypeOf(distributor.RingConfig{}).NumField(), + fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String())) + assert.Equal(t, 12, + reflect.TypeOf(util.RingConfig{}).NumField(), + fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String())) }) t.Run("compactor and scheduler tokens file should not be configured if persist_tokens is false", func(t *testing.T) { @@ -946,7 +945,7 @@ common: assert.Equal(t, "/loki/scheduler.tokens", config.QueryScheduler.SchedulerRing.TokensFilePath) }) - t.Run("common config ignored if actual values set", func(t *testing.T) { + t.Run("ingester config not applied to other rings if actual values set", func(t *testing.T) { yamlContent := ` ingester: lifecycler: @@ -969,6 +968,45 @@ common: assert.Equal(t, "/sched/tokes", config.QueryScheduler.SchedulerRing.TokensFilePath) }) + t.Run("ingester ring configuration is used for other rings when no common ring or memberlist config is provided", func(t *testing.T) { + yamlContent := ` +ingester: + lifecycler: + ring: + kvstore: + store: etcd` + + config, _, err := configWrapperFromYAML(t, yamlContent, []string{}) + assert.NoError(t, err) + + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("memberlist configuration takes precedence over copying ingester config", func(t *testing.T) { + yamlContent := ` +memberlist: + join_members: + - 127.0.0.1 +ingester: + lifecycler: + ring: + kvstore: + store: etcd` + + config, _, err := configWrapperFromYAML(t, yamlContent, []string{}) + assert.NoError(t, err) + + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + + assert.Equal(t, "memberlist", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "memberlist", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "memberlist", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "memberlist", config.CompactorConfig.CompactorRing.KVStore.Store) + }) } func TestRingInterfaceNames(t *testing.T) { @@ -980,13 +1018,14 @@ func TestRingInterfaceNames(t *testing.T) { config, _, err := configWrapperFromYAML(t, minimalConfig, []string{}) assert.NoError(t, err) + assert.Contains(t, config.Common.Ring.InstanceInterfaceNames, defaultIface) assert.Contains(t, config.Ingester.LifecyclerConfig.InfNames, defaultIface) assert.Contains(t, config.Distributor.DistributorRing.InstanceInterfaceNames, defaultIface) assert.Contains(t, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaultIface) assert.Contains(t, config.Ruler.Ring.InstanceInterfaceNames, defaultIface) }) - t.Run("if ingestor interface is set, it overrides other rings default interfaces", func(t *testing.T) { + t.Run("if ingester interface is set, it overrides other rings default interfaces", func(t *testing.T) { yamlContent := `ingester: lifecycler: interface_names: @@ -1054,10 +1093,12 @@ func TestLoopbackAppendingToFrontendV2(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, defaultIface) - t.Run("by default, loopback should be in FrontendV2 interface names", func(t *testing.T) { + t.Run("when using common or ingester ring configs, loopback should be added interface names", func(t *testing.T) { config, _, err := configWrapperFromYAML(t, minimalConfig, []string{}) assert.NoError(t, err) assert.Equal(t, []string{"eth0", "en0", defaultIface}, config.Frontend.FrontendV2.InfNames) + assert.Equal(t, []string{"eth0", "en0", defaultIface}, config.Ingester.LifecyclerConfig.InfNames) + assert.Equal(t, []string{defaultIface}, config.Common.Ring.InstanceInterfaceNames) }) t.Run("loopback shouldn't be in FrontendV2 interface names if set by user", func(t *testing.T) { @@ -1096,3 +1137,113 @@ func Test_tokensFile(t *testing.T) { }) } } + +func TestCommonRingConfigSection(t *testing.T) { + t.Run("if only common ring is provided, reuse it for all rings", func(t *testing.T) { + yamlContent := `common: + ring: + kvstore: + store: etcd` + + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("if common ring is provided, reuse it for all rings that aren't explicitly set", func(t *testing.T) { + yamlContent := `common: + ring: + kvstore: + store: etcd +ingester: + lifecycler: + ring: + kvstore: + store: inmemory` + + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("if only ingester ring is provided, reuse it for all rings", func(t *testing.T) { + yamlContent := `ingester: + lifecycler: + ring: + kvstore: + store: etcd` + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("if a ring is explicitly configured, don't override it", func(t *testing.T) { + yamlContent := `common: + ring: + kvstore: + store: consul +distributor: + ring: + kvstore: + store: etcd +ingester: + lifecycler: + ring: + kvstore: + store: inmemory` + + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("ring configs provided via command line take precedence", func(t *testing.T) { + yamlContent := `common: + ring: + kvstore: + store: consul` + config, _, err := configWrapperFromYAML(t, yamlContent, []string{ + "--distributor.ring.store", "etcd", + }) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "consul", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("common ring config take precedence over common memberlist config", func(t *testing.T) { + yamlContent := `memberlist: + join_members: + - 127.0.0.1 +common: + ring: + kvstore: + store: consul` + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "consul", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "consul", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store) + }) +} diff --git a/pkg/util/ring_config.go b/pkg/util/ring_config.go index a0ae0e6e3e30..111caf8b0c27 100644 --- a/pkg/util/ring_config.go +++ b/pkg/util/ring_config.go @@ -82,6 +82,27 @@ func (cfg *RingConfig) ToLifecyclerConfig(numTokens int) (ring.BasicLifecyclerCo }, nil } +func CortexLifecyclerConfigToRingConfig(cfg ring.LifecyclerConfig) RingConfig { + return RingConfig{ + KVStore: kv.Config{ + Store: cfg.RingConfig.KVStore.Store, + Prefix: cfg.RingConfig.KVStore.Prefix, + StoreConfig: cfg.RingConfig.KVStore.StoreConfig, + }, + HeartbeatPeriod: cfg.HeartbeatPeriod, + HeartbeatTimeout: cfg.RingConfig.HeartbeatTimeout, + TokensFilePath: cfg.TokensFilePath, + ZoneAwarenessEnabled: cfg.RingConfig.ZoneAwarenessEnabled, + InstanceID: cfg.ID, + InstanceInterfaceNames: cfg.InfNames, + InstancePort: cfg.Port, + InstanceAddr: cfg.Addr, + InstanceZone: cfg.Zone, + ListenPort: cfg.ListenPort, + ObservePeriod: cfg.ObservePeriod, + } +} + func (cfg *RingConfig) ToRingConfig(replicationFactor int) ring.Config { rc := ring.Config{} flagext.DefaultValues(&rc) From afd361a6394c10166e70c6607feeeb2cdd65b293 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Tue, 2 Nov 2021 16:47:39 -0600 Subject: [PATCH 2/5] fix compactor test to append loopback interface it ifnames in test if present --- pkg/storage/stores/shipper/compactor/compactor_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index 88208ce3b12a..b10bd78fbb74 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" + loki_net "github.com/grafana/loki/pkg/util/net" ) func setupTestCompactor(t *testing.T, tempDir string) *Compactor { @@ -24,6 +25,10 @@ func setupTestCompactor(t *testing.T, tempDir string) *Compactor { cfg.SharedStoreType = "filesystem" cfg.RetentionEnabled = false + if loopbackIFace, err := loki_net.LoopbackInterfaceName(); err == nil { + cfg.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorRing.InstanceInterfaceNames, loopbackIFace) + } + require.NoError(t, cfg.Validate()) c, err := NewCompactor(cfg, storage.Config{FSConfig: local.FSConfig{Directory: tempDir}}, loki_storage.SchemaConfig{}, nil, nil) From b59bb0d62f13d03b0cf5324fd754e8b43e43073b Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Tue, 2 Nov 2021 16:50:58 -0600 Subject: [PATCH 3/5] add loopback interface to compactor ring iface names --- pkg/loki/config_wrapper.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index f719bc1aefc6..4d994ed65a97 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -289,6 +289,10 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) { cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface) } + + if reflect.DeepEqual(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, defaults.CompactorConfig.CompactorRing.InstanceInterfaceNames) { + cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, loopbackIface) + } } // applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist. From 82fe79e7cacb5842c55a4721442ff21fcdd63544 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 3 Nov 2021 08:51:06 -0600 Subject: [PATCH 4/5] clean-up from PR review Co-authored-by: Dylan Guedes --- pkg/loki/config_wrapper.go | 23 ++++++++++++----------- pkg/loki/config_wrapper_test.go | 4 ++-- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 4d994ed65a97..5b1eb5b61b55 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -111,17 +111,18 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { // When using the ingester or common ring config, the loopback interface will be appended to the end of // the list of default interface names func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { - if reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) { - // common ring not set, so use memberlist for all rings if set - if len(r.MemberlistKV.JoinMembers) > 0 { - applyMemberlistConfig(r) - } else { - // neither common ring nor memberlist set, use ingester ring configuration for all rings - useIngesterRingConfig(r, defaults) - } - } else { - // common ring is provided, use that for all rings + if !reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) { + // common ring is provided, use that for all rings useCommonRingConfig(r, defaults) + return + } + + // common ring not set, so use memberlist for all rings if set + if len(r.MemberlistKV.JoinMembers) > 0 { + applyMemberlistConfig(r) + } else { + // neither common ring nor memberlist set, use ingester ring configuration for all rings + useIngesterRingConfig(r, defaults) } } @@ -297,7 +298,7 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { // applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist. // The idea here is that if a user explicitly configured the memberlist configuration section, they probably want to be using memberlist -// for all their ring configurations. Since a user can still explicitly override a specific ring configuration +// for all their ring configurations. Since a user can still explicitly override a specific ring configuration // (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here. func applyMemberlistConfig(r *ConfigWrapper) { r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 710aa0cf20d1..04193ba5b293 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -27,7 +27,7 @@ import ( loki_net "github.com/grafana/loki/pkg/util/net" ) -// Can't use a totally empty yaml file or it causes weird behavior in the unmarhsalling. +// Can't use a totally empty yaml file or it causes weird behavior in the unmarshalling. const minimalConfig = `--- schema_config: configs: @@ -1093,7 +1093,7 @@ func TestLoopbackAppendingToFrontendV2(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, defaultIface) - t.Run("when using common or ingester ring configs, loopback should be added interface names", func(t *testing.T) { + t.Run("when using common or ingester ring configs, loopback should be added to interface names", func(t *testing.T) { config, _, err := configWrapperFromYAML(t, minimalConfig, []string{}) assert.NoError(t, err) assert.Equal(t, []string{"eth0", "en0", defaultIface}, config.Frontend.FrontendV2.InfNames) From fc752a45d7027bef6bf0a6bdda8063af7c034df1 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 3 Nov 2021 13:07:46 -0600 Subject: [PATCH 5/5] only appy ingester config to rings with no non-default properties --- pkg/loki/config_wrapper.go | 142 +++++++++++++++++--------------- pkg/loki/config_wrapper_test.go | 54 ++++++++++-- 2 files changed, 122 insertions(+), 74 deletions(-) diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 5b1eb5b61b55..bd117f480756 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -81,6 +81,8 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { applyDynamicRingConfigs(r, &defaults) + appendLoopbackInterface(r, &defaults) + if err := applyTokensFilePath(r); err != nil { return err } @@ -112,8 +114,9 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { // the list of default interface names func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { if !reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) { - // common ring is provided, use that for all rings - useCommonRingConfig(r, defaults) + // common ring is provided, use that for all rings, merging with + // any specific configs provided for each ring + applyConfigToRings(r, defaults, r.Common.Ring, true) return } @@ -122,26 +125,25 @@ func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { applyMemberlistConfig(r) } else { // neither common ring nor memberlist set, use ingester ring configuration for all rings - useIngesterRingConfig(r, defaults) + // that have not been configured. Don't merge any ingester ring configurations for rings + // that deviate from the default in any way. + ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig) + applyConfigToRings(r, defaults, ingesterRingCfg, false) } - -} - -func useCommonRingConfig(r, defaults *ConfigWrapper) { - // if the default ingester config is detected, then it's safe to override with the common config - shouldOverrideIngester := reflect.DeepEqual(r.Ingester.LifecyclerConfig, defaults.Ingester.LifecyclerConfig) - - // append the loopback after checking if the default is enabled, as it mutates the current - appendLoopbackInterface(r, defaults) - - numTokens := 128 //this is the default used by the ingester and ruler - applyConfigToRings(r, r.Common.Ring, numTokens, shouldOverrideIngester) } //applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured. -func applyConfigToRings(r *ConfigWrapper, rc util.RingConfig, defaultNumTokens int, overrideIngester bool) { - //Ingester - if overrideIngester { +//`mergeWithExisting` will be true when applying the common config, false when applying the ingester +//config. This decision was made since the ingester ring copying behavior is likely to be less intuitive, +//and was added as a stop-gap to prevent the new rings in 2.4 from breaking existing configs before 2.4 that only had an ingester +//ring defined. When `mergeWithExisting` is false, we will not apply any of the ring config to a ring that has +//any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified +//derivations, with the derivations taking precedence. +func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) { + //Ingester - mergeWithExisting is false when applying the ingester config, and we only want to + //change ingester ring values when applying the common config, so there's no need for the DeepEqual + //check here. + if mergeWithExisting { r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod r.Ingester.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout @@ -157,60 +159,56 @@ func applyConfigToRings(r *ConfigWrapper, rc util.RingConfig, defaultNumTokens i } // Distributor - r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.Distributor.DistributorRing.HeartbeatPeriod = rc.HeartbeatPeriod - r.Distributor.DistributorRing.InstancePort = rc.InstancePort - r.Distributor.DistributorRing.InstanceAddr = rc.InstanceAddr - r.Distributor.DistributorRing.InstanceID = rc.InstanceID - r.Distributor.DistributorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.Distributor.DistributorRing.KVStore.Store = rc.KVStore.Store - r.Distributor.DistributorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + if mergeWithExisting || reflect.DeepEqual(r.Distributor.DistributorRing, defaults.Distributor.DistributorRing) { + r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.Distributor.DistributorRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.Distributor.DistributorRing.InstancePort = rc.InstancePort + r.Distributor.DistributorRing.InstanceAddr = rc.InstanceAddr + r.Distributor.DistributorRing.InstanceID = rc.InstanceID + r.Distributor.DistributorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Distributor.DistributorRing.KVStore.Store = rc.KVStore.Store + r.Distributor.DistributorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + } // Ruler - r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.Ruler.Ring.HeartbeatPeriod = rc.HeartbeatPeriod - r.Ruler.Ring.InstancePort = rc.InstancePort - r.Ruler.Ring.InstanceAddr = rc.InstanceAddr - r.Ruler.Ring.InstanceID = rc.InstanceID - r.Ruler.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.Ruler.Ring.NumTokens = defaultNumTokens - r.Ruler.Ring.KVStore.Store = rc.KVStore.Store - r.Ruler.Ring.KVStore.StoreConfig = rc.KVStore.StoreConfig + if mergeWithExisting || reflect.DeepEqual(r.Ruler.Ring, defaults.Ruler.Ring) { + r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.Ruler.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.Ruler.Ring.InstancePort = rc.InstancePort + r.Ruler.Ring.InstanceAddr = rc.InstanceAddr + r.Ruler.Ring.InstanceID = rc.InstanceID + r.Ruler.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Ruler.Ring.KVStore.Store = rc.KVStore.Store + r.Ruler.Ring.KVStore.StoreConfig = rc.KVStore.StoreConfig + } // Query Scheduler - r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.QueryScheduler.SchedulerRing.HeartbeatPeriod = rc.HeartbeatPeriod - r.QueryScheduler.SchedulerRing.InstancePort = rc.InstancePort - r.QueryScheduler.SchedulerRing.InstanceAddr = rc.InstanceAddr - r.QueryScheduler.SchedulerRing.InstanceID = rc.InstanceID - r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.QueryScheduler.SchedulerRing.InstanceZone = rc.InstanceZone - r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.QueryScheduler.SchedulerRing.KVStore.Store = rc.KVStore.Store - r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + if mergeWithExisting || reflect.DeepEqual(r.QueryScheduler.SchedulerRing, defaults.QueryScheduler.SchedulerRing) { + r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.QueryScheduler.SchedulerRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.QueryScheduler.SchedulerRing.InstancePort = rc.InstancePort + r.QueryScheduler.SchedulerRing.InstanceAddr = rc.InstanceAddr + r.QueryScheduler.SchedulerRing.InstanceID = rc.InstanceID + r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.QueryScheduler.SchedulerRing.InstanceZone = rc.InstanceZone + r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.QueryScheduler.SchedulerRing.KVStore.Store = rc.KVStore.Store + r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + } // Compactor - r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.CompactorConfig.CompactorRing.HeartbeatPeriod = rc.HeartbeatPeriod - r.CompactorConfig.CompactorRing.InstancePort = rc.InstancePort - r.CompactorConfig.CompactorRing.InstanceAddr = rc.InstanceAddr - r.CompactorConfig.CompactorRing.InstanceID = rc.InstanceID - r.CompactorConfig.CompactorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.CompactorConfig.CompactorRing.InstanceZone = rc.InstanceZone - r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.CompactorConfig.CompactorRing.KVStore.Store = rc.KVStore.Store - r.CompactorConfig.CompactorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig -} - -// useIngesterRingConfig will override all other rings to use the ingester's ring configuration -func useIngesterRingConfig(r, defaults *ConfigWrapper) { - appendLoopbackInterface(r, defaults) - - ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig) - numTokens := r.Ingester.LifecyclerConfig.NumTokens - - // we're using the ingester config here, so never need to override it - applyConfigToRings(r, ingesterRingCfg, numTokens, false) + if mergeWithExisting || reflect.DeepEqual(r.CompactorConfig.CompactorRing, defaults.CompactorConfig.CompactorRing) { + r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.CompactorConfig.CompactorRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.CompactorConfig.CompactorRing.InstancePort = rc.InstancePort + r.CompactorConfig.CompactorRing.InstanceAddr = rc.InstanceAddr + r.CompactorConfig.CompactorRing.InstanceID = rc.InstanceID + r.CompactorConfig.CompactorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.CompactorConfig.CompactorRing.InstanceZone = rc.InstanceZone + r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.CompactorConfig.CompactorRing.KVStore.Store = rc.KVStore.Store + r.CompactorConfig.CompactorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + } } func applyTokensFilePath(cfg *ConfigWrapper) error { @@ -287,6 +285,10 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { cfg.Frontend.FrontendV2.InfNames = append(cfg.Config.Frontend.FrontendV2.InfNames, loopbackIface) } + if reflect.DeepEqual(cfg.Distributor.DistributorRing.InstanceInterfaceNames, defaults.Distributor.DistributorRing.InstanceInterfaceNames) { + cfg.Distributor.DistributorRing.InstanceInterfaceNames = append(cfg.Distributor.DistributorRing.InstanceInterfaceNames, loopbackIface) + } + if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) { cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface) } @@ -294,6 +296,14 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { if reflect.DeepEqual(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, defaults.CompactorConfig.CompactorRing.InstanceInterfaceNames) { cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, loopbackIface) } + + if reflect.DeepEqual(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaults.QueryScheduler.SchedulerRing.InstanceInterfaceNames) { + cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames = append(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.Ruler.Ring.InstanceInterfaceNames, defaults.Ruler.Ring.InstanceInterfaceNames) { + cfg.Ruler.Ring.InstanceInterfaceNames = append(cfg.Ruler.Ring.InstanceInterfaceNames, loopbackIface) + } } // applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist. diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 04193ba5b293..08414a89d85c 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -1190,28 +1190,66 @@ ingester: assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) }) - t.Run("if a ring is explicitly configured, don't override it", func(t *testing.T) { - yamlContent := `common: - ring: - kvstore: - store: consul + t.Run("if a ring is explicitly configured, don't override any part of it with ingester config", func(t *testing.T) { + yamlContent := ` distributor: ring: kvstore: store: etcd ingester: lifecycler: + heartbeat_period: 5m ring: kvstore: store: inmemory` + config, defaults, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, defaults.Distributor.DistributorRing.HeartbeatPeriod, config.Distributor.DistributorRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ingester.LifecyclerConfig.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ruler.Ring.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.QueryScheduler.SchedulerRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.CompactorConfig.CompactorRing.HeartbeatPeriod) + }) + + t.Run("if a ring is explicitly configured, merge common config with unconfigured parts of explicitly configured ring", func(t *testing.T) { + yamlContent := ` +common: + ring: + heartbeat_period: 5m + kvstore: + store: inmemory +distributor: + ring: + kvstore: + store: etcd` + config, _, err := configWrapperFromYAML(t, yamlContent, nil) assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Distributor.DistributorRing.HeartbeatPeriod) + assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) - assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store) - assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store) - assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ingester.LifecyclerConfig.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ruler.Ring.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.QueryScheduler.SchedulerRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.CompactorConfig.CompactorRing.HeartbeatPeriod) }) t.Run("ring configs provided via command line take precedence", func(t *testing.T) {