From 07ea1ed99c4ef10cbafc4212f589500afa85c59d Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 3 Nov 2021 14:14:17 -0600 Subject: [PATCH] Add common ring configuration (#4617) * add ring section to common config Co-Authored-By: Dylan Guedes * fix compactor test to append loopback interface it ifnames in test if present * add loopback interface to compactor ring iface names * clean-up from PR review Co-authored-by: Dylan Guedes * only appy ingester config to rings with no non-default properties Co-authored-by: Dylan Guedes --- pkg/loki/common/common.go | 8 +- pkg/loki/config_wrapper.go | 245 ++++++++++++------ pkg/loki/config_wrapper_test.go | 223 ++++++++++++++-- .../shipper/compactor/compactor_test.go | 5 + pkg/util/ring_config.go | 21 ++ 5 files changed, 402 insertions(+), 100 deletions(-) diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index f791cac2fe24..dc872c88f645 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -8,13 +8,15 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/gcp" "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/openstack" + "github.com/grafana/loki/pkg/util" ) // Config holds common config that can be shared between multiple other config sections type Config struct { - PathPrefix string `yaml:"path_prefix"` - Storage Storage `yaml:"storage"` - PersistTokens bool `yaml:"persist_tokens"` + PathPrefix string `yaml:"path_prefix"` + Storage Storage `yaml:"storage"` + PersistTokens bool `yaml:"persist_tokens"` + Ring util.RingConfig `yaml:"ring"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index c29e49ff69dd..56ef7262e5b0 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/cfg" loki_storage "github.com/grafana/loki/pkg/storage" @@ -78,11 +79,15 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { r.QueryScheduler.UseSchedulerRing = true } - applyPathPrefixDefaults(r, defaults) - if err := applyIngesterRingConfig(r, &defaults); err != nil { + applyPathPrefixDefaults(r, &defaults) + + applyDynamicRingConfigs(r, &defaults) + + appendLoopbackInterface(r, &defaults) + + if err := applyTokensFilePath(r); err != nil { return err } - applyMemberlistConfig(r) if err := applyStorageConfig(r, &defaults); err != nil { return err @@ -99,85 +104,138 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { } } -// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else -// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically -// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's -// and is the most central ring config for Loki. +// applyDynamicRingConfigs checks the current config and, depending on the given values, reuse ring configs accordingly. // -// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append -// the loopback interface at the end of it. -func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error { - if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) { - appendLoopbackInterface(r) +// 1. Gives preference to any explicit ring config set. For instance, if the user explicitly configures Distributor's ring, +// that config will prevail. This rule is enforced by the fact that the config file and command line args are parsed +// again after the dynamic config has been applied, so will take higher precedence. +// 2. If no explicit ring config is set, use the common ring configured if provided. +// 3. If no common ring was provided, use the memberlist config if provided. +// 4. If no common ring or memberlist were provided, use the ingester's ring configuration. + +// When using the ingester or common ring config, the loopback interface will be appended to the end of +// the list of default interface names +func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { + if !reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) { + // common ring is provided, use that for all rings, merging with + // any specific configs provided for each ring + applyConfigToRings(r, defaults, r.Common.Ring, true) + return } - lc := r.Ingester.LifecyclerConfig - rc := r.Ingester.LifecyclerConfig.RingConfig - s := rc.KVStore.Store - sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig - - f, err := tokensFile(r, "ingester.tokens") - if err != nil { - return err + // common ring not set, so use memberlist for all rings if set + if len(r.MemberlistKV.JoinMembers) > 0 { + applyMemberlistConfig(r) + } else { + // neither common ring nor memberlist set, use ingester ring configuration for all rings + // that have not been configured. Don't merge any ingester ring configurations for rings + // that deviate from the default in any way. + ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig) + applyConfigToRings(r, defaults, ingesterRingCfg, false) } - r.Ingester.LifecyclerConfig.TokensFilePath = f +} - // This gets ugly because we use a separate struct for configuring each ring... +//applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured. +//`mergeWithExisting` will be true when applying the common config, false when applying the ingester +//config. This decision was made since the ingester ring copying behavior is likely to be less intuitive, +//and was added as a stop-gap to prevent the new rings in 2.4 from breaking existing configs before 2.4 that only had an ingester +//ring defined. When `mergeWithExisting` is false, we will not apply any of the ring config to a ring that has +//any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified +//derivations, with the derivations taking precedence. +func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) { + //Ingester - mergeWithExisting is false when applying the ingester config, and we only want to + //change ingester ring values when applying the common config, so there's no need for the DeepEqual + //check here. + if mergeWithExisting { + r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore + r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod + r.Ingester.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout + r.Ingester.LifecyclerConfig.TokensFilePath = rc.TokensFilePath + r.Ingester.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.Ingester.LifecyclerConfig.ID = rc.InstanceID + r.Ingester.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames + r.Ingester.LifecyclerConfig.Port = rc.InstancePort + r.Ingester.LifecyclerConfig.Addr = rc.InstanceAddr + r.Ingester.LifecyclerConfig.Zone = rc.InstanceZone + r.Ingester.LifecyclerConfig.ListenPort = rc.ListenPort + r.Ingester.LifecyclerConfig.ObservePeriod = rc.ObservePeriod + } // Distributor - r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.Distributor.DistributorRing.HeartbeatPeriod = lc.HeartbeatPeriod - r.Distributor.DistributorRing.InstancePort = lc.Port - r.Distributor.DistributorRing.InstanceAddr = lc.Addr - r.Distributor.DistributorRing.InstanceID = lc.ID - r.Distributor.DistributorRing.InstanceInterfaceNames = lc.InfNames - r.Distributor.DistributorRing.KVStore.Store = s - r.Distributor.DistributorRing.KVStore.StoreConfig = sc + if mergeWithExisting || reflect.DeepEqual(r.Distributor.DistributorRing, defaults.Distributor.DistributorRing) { + r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.Distributor.DistributorRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.Distributor.DistributorRing.InstancePort = rc.InstancePort + r.Distributor.DistributorRing.InstanceAddr = rc.InstanceAddr + r.Distributor.DistributorRing.InstanceID = rc.InstanceID + r.Distributor.DistributorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Distributor.DistributorRing.KVStore.Store = rc.KVStore.Store + r.Distributor.DistributorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + } // Ruler - r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.Ruler.Ring.HeartbeatPeriod = lc.HeartbeatPeriod - r.Ruler.Ring.InstancePort = lc.Port - r.Ruler.Ring.InstanceAddr = lc.Addr - r.Ruler.Ring.InstanceID = lc.ID - r.Ruler.Ring.InstanceInterfaceNames = lc.InfNames - r.Ruler.Ring.NumTokens = lc.NumTokens - r.Ruler.Ring.KVStore.Store = s - r.Ruler.Ring.KVStore.StoreConfig = sc + if mergeWithExisting || reflect.DeepEqual(r.Ruler.Ring, defaults.Ruler.Ring) { + r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.Ruler.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.Ruler.Ring.InstancePort = rc.InstancePort + r.Ruler.Ring.InstanceAddr = rc.InstanceAddr + r.Ruler.Ring.InstanceID = rc.InstanceID + r.Ruler.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Ruler.Ring.KVStore.Store = rc.KVStore.Store + r.Ruler.Ring.KVStore.StoreConfig = rc.KVStore.StoreConfig + } // Query Scheduler - r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.QueryScheduler.SchedulerRing.HeartbeatPeriod = lc.HeartbeatPeriod - r.QueryScheduler.SchedulerRing.InstancePort = lc.Port - r.QueryScheduler.SchedulerRing.InstanceAddr = lc.Addr - r.QueryScheduler.SchedulerRing.InstanceID = lc.ID - r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames - r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone - r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.QueryScheduler.SchedulerRing.KVStore.Store = s - r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc - f, err = tokensFile(r, "scheduler.tokens") + if mergeWithExisting || reflect.DeepEqual(r.QueryScheduler.SchedulerRing, defaults.QueryScheduler.SchedulerRing) { + r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.QueryScheduler.SchedulerRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.QueryScheduler.SchedulerRing.InstancePort = rc.InstancePort + r.QueryScheduler.SchedulerRing.InstanceAddr = rc.InstanceAddr + r.QueryScheduler.SchedulerRing.InstanceID = rc.InstanceID + r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.QueryScheduler.SchedulerRing.InstanceZone = rc.InstanceZone + r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.QueryScheduler.SchedulerRing.KVStore.Store = rc.KVStore.Store + r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + } + + // Compactor + if mergeWithExisting || reflect.DeepEqual(r.CompactorConfig.CompactorRing, defaults.CompactorConfig.CompactorRing) { + r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout + r.CompactorConfig.CompactorRing.HeartbeatPeriod = rc.HeartbeatPeriod + r.CompactorConfig.CompactorRing.InstancePort = rc.InstancePort + r.CompactorConfig.CompactorRing.InstanceAddr = rc.InstanceAddr + r.CompactorConfig.CompactorRing.InstanceID = rc.InstanceID + r.CompactorConfig.CompactorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.CompactorConfig.CompactorRing.InstanceZone = rc.InstanceZone + r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.CompactorConfig.CompactorRing.KVStore.Store = rc.KVStore.Store + r.CompactorConfig.CompactorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig + } +} + +func applyTokensFilePath(cfg *ConfigWrapper) error { + // Ingester + f, err := tokensFile(cfg, "ingester.tokens") if err != nil { return err } - r.QueryScheduler.SchedulerRing.TokensFilePath = f + cfg.Ingester.LifecyclerConfig.TokensFilePath = f // Compactor - r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout - r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod - r.CompactorConfig.CompactorRing.InstancePort = lc.Port - r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr - r.CompactorConfig.CompactorRing.InstanceID = lc.ID - r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames - r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone - r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.CompactorConfig.CompactorRing.KVStore.Store = s - r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc - f, err = tokensFile(r, "compactor.tokens") + f, err = tokensFile(cfg, "compactor.tokens") + if err != nil { + return err + } + cfg.CompactorConfig.CompactorRing.TokensFilePath = f + + // Query Scheduler + f, err = tokensFile(cfg, "scheduler.tokens") if err != nil { return err } - r.CompactorConfig.CompactorRing.TokensFilePath = f + cfg.QueryScheduler.SchedulerRing.TokensFilePath = f + return nil } @@ -196,7 +254,7 @@ func tokensFile(cfg *ConfigWrapper, file string) (string, error) { return cfg.Common.PathPrefix + "/" + file, nil } -func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) { +func applyPathPrefixDefaults(r, defaults *ConfigWrapper) { if r.Common.PathPrefix != "" { prefix := strings.TrimSuffix(r.Common.PathPrefix, "/") @@ -214,26 +272,53 @@ func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) { } } -func appendLoopbackInterface(r *ConfigWrapper) { - if loopbackIface, err := loki_net.LoopbackInterfaceName(); err == nil { - r.Ingester.LifecyclerConfig.InfNames = append(r.Ingester.LifecyclerConfig.InfNames, loopbackIface) - r.Config.Frontend.FrontendV2.InfNames = append(r.Config.Frontend.FrontendV2.InfNames, loopbackIface) +// appendLoopbackInterface will append the loopback interface to the interface names used for the ingester ring, +// v2 frontend, and common ring config unless an explicit list of names was provided. +func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { + loopbackIface, err := loki_net.LoopbackInterfaceName() + if err != nil { + return + } + + if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) { + cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.Frontend.FrontendV2.InfNames, defaults.Frontend.FrontendV2.InfNames) { + cfg.Frontend.FrontendV2.InfNames = append(cfg.Config.Frontend.FrontendV2.InfNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.Distributor.DistributorRing.InstanceInterfaceNames, defaults.Distributor.DistributorRing.InstanceInterfaceNames) { + cfg.Distributor.DistributorRing.InstanceInterfaceNames = append(cfg.Distributor.DistributorRing.InstanceInterfaceNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) { + cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, defaults.CompactorConfig.CompactorRing.InstanceInterfaceNames) { + cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaults.QueryScheduler.SchedulerRing.InstanceInterfaceNames) { + cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames = append(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, loopbackIface) + } + + if reflect.DeepEqual(cfg.Ruler.Ring.InstanceInterfaceNames, defaults.Ruler.Ring.InstanceInterfaceNames) { + cfg.Ruler.Ring.InstanceInterfaceNames = append(cfg.Ruler.Ring.InstanceInterfaceNames, loopbackIface) } } -// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist -// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the -// memberlist configuration section, they probably want to be using memberlist for all their ring configurations. -// Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor), -// it seems harmless to take a guess at better defaults here. +// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist. +// The idea here is that if a user explicitly configured the memberlist configuration section, they probably want to be using memberlist +// for all their ring configurations. Since a user can still explicitly override a specific ring configuration +// (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here. func applyMemberlistConfig(r *ConfigWrapper) { - if len(r.MemberlistKV.JoinMembers) > 0 { - r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr - r.Distributor.DistributorRing.KVStore.Store = memberlistStr - r.Ruler.Ring.KVStore.Store = memberlistStr - r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr - r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr - } + r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr + r.Distributor.DistributorRing.KVStore.Store = memberlistStr + r.Ruler.Ring.KVStore.Store = memberlistStr + r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr + r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 4081b4f53f92..fb84e5ed863a 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -27,6 +27,13 @@ import ( loki_net "github.com/grafana/loki/pkg/util/net" ) +// Can't use a totally empty yaml file or it causes weird behavior in the unmarshalling. +const minimalConfig = `--- +schema_config: + configs: + - from: 2021-08-01 + schema: v11` + func configWrapperFromYAML(t *testing.T, configFileString string, args []string) (ConfigWrapper, ConfigWrapper, error) { config := ConfigWrapper{} fs := flag.NewFlagSet(t.Name(), flag.PanicOnError) @@ -897,17 +904,6 @@ query_range: }) } -// Can't use a totally empty yaml file or it causes weird behavior in the unmarhsalling. -const minimalConfig = `--- -schema_config: - configs: - - from: 2021-08-01 - schema: v11 - -memberlist: - join_members: - - loki.loki-dev-single-binary.svc.cluster.local` - func TestDefaultUnmarshal(t *testing.T) { t.Run("with a minimal config file and no command line args, defaults are use", func(t *testing.T) { file, err := ioutil.TempFile("", "config.yaml") @@ -932,12 +928,15 @@ func TestDefaultUnmarshal(t *testing.T) { } func Test_applyIngesterRingConfig(t *testing.T) { - t.Run("Attempt to catch changes to a RingConfig", func(t *testing.T) { msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test." - assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String())) - assert.Equal(t, 12, reflect.TypeOf(util.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String())) + assert.Equal(t, 8, + reflect.TypeOf(distributor.RingConfig{}).NumField(), + fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String())) + assert.Equal(t, 12, + reflect.TypeOf(util.RingConfig{}).NumField(), + fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String())) }) t.Run("compactor and scheduler tokens file should not be configured if persist_tokens is false", func(t *testing.T) { @@ -967,7 +966,7 @@ common: assert.Equal(t, "/loki/scheduler.tokens", config.QueryScheduler.SchedulerRing.TokensFilePath) }) - t.Run("common config ignored if actual values set", func(t *testing.T) { + t.Run("ingester config not applied to other rings if actual values set", func(t *testing.T) { yamlContent := ` ingester: lifecycler: @@ -990,6 +989,45 @@ common: assert.Equal(t, "/sched/tokes", config.QueryScheduler.SchedulerRing.TokensFilePath) }) + t.Run("ingester ring configuration is used for other rings when no common ring or memberlist config is provided", func(t *testing.T) { + yamlContent := ` +ingester: + lifecycler: + ring: + kvstore: + store: etcd` + + config, _, err := configWrapperFromYAML(t, yamlContent, []string{}) + assert.NoError(t, err) + + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("memberlist configuration takes precedence over copying ingester config", func(t *testing.T) { + yamlContent := ` +memberlist: + join_members: + - 127.0.0.1 +ingester: + lifecycler: + ring: + kvstore: + store: etcd` + + config, _, err := configWrapperFromYAML(t, yamlContent, []string{}) + assert.NoError(t, err) + + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + + assert.Equal(t, "memberlist", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "memberlist", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "memberlist", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "memberlist", config.CompactorConfig.CompactorRing.KVStore.Store) + }) } func TestRingInterfaceNames(t *testing.T) { @@ -1001,13 +1039,14 @@ func TestRingInterfaceNames(t *testing.T) { config, _, err := configWrapperFromYAML(t, minimalConfig, []string{}) assert.NoError(t, err) + assert.Contains(t, config.Common.Ring.InstanceInterfaceNames, defaultIface) assert.Contains(t, config.Ingester.LifecyclerConfig.InfNames, defaultIface) assert.Contains(t, config.Distributor.DistributorRing.InstanceInterfaceNames, defaultIface) assert.Contains(t, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaultIface) assert.Contains(t, config.Ruler.Ring.InstanceInterfaceNames, defaultIface) }) - t.Run("if ingestor interface is set, it overrides other rings default interfaces", func(t *testing.T) { + t.Run("if ingester interface is set, it overrides other rings default interfaces", func(t *testing.T) { yamlContent := `ingester: lifecycler: interface_names: @@ -1075,10 +1114,12 @@ func TestLoopbackAppendingToFrontendV2(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, defaultIface) - t.Run("by default, loopback should be in FrontendV2 interface names", func(t *testing.T) { + t.Run("when using common or ingester ring configs, loopback should be added to interface names", func(t *testing.T) { config, _, err := configWrapperFromYAML(t, minimalConfig, []string{}) assert.NoError(t, err) assert.Equal(t, []string{"eth0", "en0", defaultIface}, config.Frontend.FrontendV2.InfNames) + assert.Equal(t, []string{"eth0", "en0", defaultIface}, config.Ingester.LifecyclerConfig.InfNames) + assert.Equal(t, []string{defaultIface}, config.Common.Ring.InstanceInterfaceNames) }) t.Run("loopback shouldn't be in FrontendV2 interface names if set by user", func(t *testing.T) { @@ -1117,3 +1158,151 @@ func Test_tokensFile(t *testing.T) { }) } } + +func TestCommonRingConfigSection(t *testing.T) { + t.Run("if only common ring is provided, reuse it for all rings", func(t *testing.T) { + yamlContent := `common: + ring: + kvstore: + store: etcd` + + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("if common ring is provided, reuse it for all rings that aren't explicitly set", func(t *testing.T) { + yamlContent := `common: + ring: + kvstore: + store: etcd +ingester: + lifecycler: + ring: + kvstore: + store: inmemory` + + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("if only ingester ring is provided, reuse it for all rings", func(t *testing.T) { + yamlContent := `ingester: + lifecycler: + ring: + kvstore: + store: etcd` + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("if a ring is explicitly configured, don't override any part of it with ingester config", func(t *testing.T) { + yamlContent := ` +distributor: + ring: + kvstore: + store: etcd +ingester: + lifecycler: + heartbeat_period: 5m + ring: + kvstore: + store: inmemory` + + config, defaults, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, defaults.Distributor.DistributorRing.HeartbeatPeriod, config.Distributor.DistributorRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ingester.LifecyclerConfig.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ruler.Ring.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.QueryScheduler.SchedulerRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.CompactorConfig.CompactorRing.HeartbeatPeriod) + }) + + t.Run("if a ring is explicitly configured, merge common config with unconfigured parts of explicitly configured ring", func(t *testing.T) { + yamlContent := ` +common: + ring: + heartbeat_period: 5m + kvstore: + store: inmemory +distributor: + ring: + kvstore: + store: etcd` + + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Distributor.DistributorRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ingester.LifecyclerConfig.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.Ruler.Ring.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.QueryScheduler.SchedulerRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.CompactorConfig.CompactorRing.HeartbeatPeriod) + }) + + t.Run("ring configs provided via command line take precedence", func(t *testing.T) { + yamlContent := `common: + ring: + kvstore: + store: consul` + config, _, err := configWrapperFromYAML(t, yamlContent, []string{ + "--distributor.ring.store", "etcd", + }) + assert.NoError(t, err) + assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "consul", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store) + }) + + t.Run("common ring config take precedence over common memberlist config", func(t *testing.T) { + yamlContent := `memberlist: + join_members: + - 127.0.0.1 +common: + ring: + kvstore: + store: consul` + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, "consul", config.Distributor.DistributorRing.KVStore.Store) + assert.Equal(t, "consul", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) + assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store) + assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store) + assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store) + }) +} diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index 88208ce3b12a..b10bd78fbb74 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" + loki_net "github.com/grafana/loki/pkg/util/net" ) func setupTestCompactor(t *testing.T, tempDir string) *Compactor { @@ -24,6 +25,10 @@ func setupTestCompactor(t *testing.T, tempDir string) *Compactor { cfg.SharedStoreType = "filesystem" cfg.RetentionEnabled = false + if loopbackIFace, err := loki_net.LoopbackInterfaceName(); err == nil { + cfg.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorRing.InstanceInterfaceNames, loopbackIFace) + } + require.NoError(t, cfg.Validate()) c, err := NewCompactor(cfg, storage.Config{FSConfig: local.FSConfig{Directory: tempDir}}, loki_storage.SchemaConfig{}, nil, nil) diff --git a/pkg/util/ring_config.go b/pkg/util/ring_config.go index a0ae0e6e3e30..111caf8b0c27 100644 --- a/pkg/util/ring_config.go +++ b/pkg/util/ring_config.go @@ -82,6 +82,27 @@ func (cfg *RingConfig) ToLifecyclerConfig(numTokens int) (ring.BasicLifecyclerCo }, nil } +func CortexLifecyclerConfigToRingConfig(cfg ring.LifecyclerConfig) RingConfig { + return RingConfig{ + KVStore: kv.Config{ + Store: cfg.RingConfig.KVStore.Store, + Prefix: cfg.RingConfig.KVStore.Prefix, + StoreConfig: cfg.RingConfig.KVStore.StoreConfig, + }, + HeartbeatPeriod: cfg.HeartbeatPeriod, + HeartbeatTimeout: cfg.RingConfig.HeartbeatTimeout, + TokensFilePath: cfg.TokensFilePath, + ZoneAwarenessEnabled: cfg.RingConfig.ZoneAwarenessEnabled, + InstanceID: cfg.ID, + InstanceInterfaceNames: cfg.InfNames, + InstancePort: cfg.Port, + InstanceAddr: cfg.Addr, + InstanceZone: cfg.Zone, + ListenPort: cfg.ListenPort, + ObservePeriod: cfg.ObservePeriod, + } +} + func (cfg *RingConfig) ToRingConfig(replicationFactor int) ring.Config { rc := ring.Config{} flagext.DefaultValues(&rc)