Skip to content

Commit

Permalink
Add common ring configuration (#4617)
Browse files Browse the repository at this point in the history
* add ring section to common config

Co-Authored-By: Dylan Guedes <djmgguedes@gmail.com>

* fix compactor test to append loopback interface it ifnames in test if present

* add loopback interface to compactor ring iface names

* clean-up from PR review

Co-authored-by: Dylan Guedes <djmgguedes@gmail.com>

* only appy ingester config to rings with no non-default properties

Co-authored-by: Dylan Guedes <djmgguedes@gmail.com>
  • Loading branch information
trevorwhitney and DylanGuedes authored Nov 3, 2021
1 parent 0d66319 commit 07ea1ed
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 100 deletions.
8 changes: 5 additions & 3 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/util"
)

// Config holds common config that can be shared between multiple other config sections
type Config struct {
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
Ring util.RingConfig `yaml:"ring"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand Down
245 changes: 165 additions & 80 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/cfg"

loki_storage "github.com/grafana/loki/pkg/storage"
Expand Down Expand Up @@ -78,11 +79,15 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
r.QueryScheduler.UseSchedulerRing = true
}

applyPathPrefixDefaults(r, defaults)
if err := applyIngesterRingConfig(r, &defaults); err != nil {
applyPathPrefixDefaults(r, &defaults)

applyDynamicRingConfigs(r, &defaults)

appendLoopbackInterface(r, &defaults)

if err := applyTokensFilePath(r); err != nil {
return err
}
applyMemberlistConfig(r)

if err := applyStorageConfig(r, &defaults); err != nil {
return err
Expand All @@ -99,85 +104,138 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}

// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else
// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically
// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's
// and is the most central ring config for Loki.
// applyDynamicRingConfigs checks the current config and, depending on the given values, reuse ring configs accordingly.
//
// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
// the loopback interface at the end of it.
func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
appendLoopbackInterface(r)
// 1. Gives preference to any explicit ring config set. For instance, if the user explicitly configures Distributor's ring,
// that config will prevail. This rule is enforced by the fact that the config file and command line args are parsed
// again after the dynamic config has been applied, so will take higher precedence.
// 2. If no explicit ring config is set, use the common ring configured if provided.
// 3. If no common ring was provided, use the memberlist config if provided.
// 4. If no common ring or memberlist were provided, use the ingester's ring configuration.

// When using the ingester or common ring config, the loopback interface will be appended to the end of
// the list of default interface names
func applyDynamicRingConfigs(r, defaults *ConfigWrapper) {
if !reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) {
// common ring is provided, use that for all rings, merging with
// any specific configs provided for each ring
applyConfigToRings(r, defaults, r.Common.Ring, true)
return
}

lc := r.Ingester.LifecyclerConfig
rc := r.Ingester.LifecyclerConfig.RingConfig
s := rc.KVStore.Store
sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig

f, err := tokensFile(r, "ingester.tokens")
if err != nil {
return err
// common ring not set, so use memberlist for all rings if set
if len(r.MemberlistKV.JoinMembers) > 0 {
applyMemberlistConfig(r)
} else {
// neither common ring nor memberlist set, use ingester ring configuration for all rings
// that have not been configured. Don't merge any ingester ring configurations for rings
// that deviate from the default in any way.
ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig)
applyConfigToRings(r, defaults, ingesterRingCfg, false)
}
r.Ingester.LifecyclerConfig.TokensFilePath = f
}

// This gets ugly because we use a separate struct for configuring each ring...
//applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured.
//`mergeWithExisting` will be true when applying the common config, false when applying the ingester
//config. This decision was made since the ingester ring copying behavior is likely to be less intuitive,
//and was added as a stop-gap to prevent the new rings in 2.4 from breaking existing configs before 2.4 that only had an ingester
//ring defined. When `mergeWithExisting` is false, we will not apply any of the ring config to a ring that has
//any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified
//derivations, with the derivations taking precedence.
func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) {
//Ingester - mergeWithExisting is false when applying the ingester config, and we only want to
//change ingester ring values when applying the common config, so there's no need for the DeepEqual
//check here.
if mergeWithExisting {
r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
r.Ingester.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ingester.LifecyclerConfig.TokensFilePath = rc.TokensFilePath
r.Ingester.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Ingester.LifecyclerConfig.ID = rc.InstanceID
r.Ingester.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames
r.Ingester.LifecyclerConfig.Port = rc.InstancePort
r.Ingester.LifecyclerConfig.Addr = rc.InstanceAddr
r.Ingester.LifecyclerConfig.Zone = rc.InstanceZone
r.Ingester.LifecyclerConfig.ListenPort = rc.ListenPort
r.Ingester.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
}

// Distributor
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.Distributor.DistributorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.Distributor.DistributorRing.InstancePort = lc.Port
r.Distributor.DistributorRing.InstanceAddr = lc.Addr
r.Distributor.DistributorRing.InstanceID = lc.ID
r.Distributor.DistributorRing.InstanceInterfaceNames = lc.InfNames
r.Distributor.DistributorRing.KVStore.Store = s
r.Distributor.DistributorRing.KVStore.StoreConfig = sc
if mergeWithExisting || reflect.DeepEqual(r.Distributor.DistributorRing, defaults.Distributor.DistributorRing) {
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.Distributor.DistributorRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.Distributor.DistributorRing.InstancePort = rc.InstancePort
r.Distributor.DistributorRing.InstanceAddr = rc.InstanceAddr
r.Distributor.DistributorRing.InstanceID = rc.InstanceID
r.Distributor.DistributorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Distributor.DistributorRing.KVStore.Store = rc.KVStore.Store
r.Distributor.DistributorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig
}

// Ruler
r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ruler.Ring.HeartbeatPeriod = lc.HeartbeatPeriod
r.Ruler.Ring.InstancePort = lc.Port
r.Ruler.Ring.InstanceAddr = lc.Addr
r.Ruler.Ring.InstanceID = lc.ID
r.Ruler.Ring.InstanceInterfaceNames = lc.InfNames
r.Ruler.Ring.NumTokens = lc.NumTokens
r.Ruler.Ring.KVStore.Store = s
r.Ruler.Ring.KVStore.StoreConfig = sc
if mergeWithExisting || reflect.DeepEqual(r.Ruler.Ring, defaults.Ruler.Ring) {
r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ruler.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Ruler.Ring.InstancePort = rc.InstancePort
r.Ruler.Ring.InstanceAddr = rc.InstanceAddr
r.Ruler.Ring.InstanceID = rc.InstanceID
r.Ruler.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Ruler.Ring.KVStore.Store = rc.KVStore.Store
r.Ruler.Ring.KVStore.StoreConfig = rc.KVStore.StoreConfig
}

// Query Scheduler
r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.QueryScheduler.SchedulerRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.QueryScheduler.SchedulerRing.InstancePort = lc.Port
r.QueryScheduler.SchedulerRing.InstanceAddr = lc.Addr
r.QueryScheduler.SchedulerRing.InstanceID = lc.ID
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames
r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.KVStore.Store = s
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "scheduler.tokens")
if mergeWithExisting || reflect.DeepEqual(r.QueryScheduler.SchedulerRing, defaults.QueryScheduler.SchedulerRing) {
r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.QueryScheduler.SchedulerRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.QueryScheduler.SchedulerRing.InstancePort = rc.InstancePort
r.QueryScheduler.SchedulerRing.InstanceAddr = rc.InstanceAddr
r.QueryScheduler.SchedulerRing.InstanceID = rc.InstanceID
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.QueryScheduler.SchedulerRing.InstanceZone = rc.InstanceZone
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.KVStore.Store = rc.KVStore.Store
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = rc.KVStore.StoreConfig
}

// Compactor
if mergeWithExisting || reflect.DeepEqual(r.CompactorConfig.CompactorRing, defaults.CompactorConfig.CompactorRing) {
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.CompactorConfig.CompactorRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.CompactorConfig.CompactorRing.InstancePort = rc.InstancePort
r.CompactorConfig.CompactorRing.InstanceAddr = rc.InstanceAddr
r.CompactorConfig.CompactorRing.InstanceID = rc.InstanceID
r.CompactorConfig.CompactorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.CompactorConfig.CompactorRing.InstanceZone = rc.InstanceZone
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = rc.KVStore.Store
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig
}
}

func applyTokensFilePath(cfg *ConfigWrapper) error {
// Ingester
f, err := tokensFile(cfg, "ingester.tokens")
if err != nil {
return err
}
r.QueryScheduler.SchedulerRing.TokensFilePath = f
cfg.Ingester.LifecyclerConfig.TokensFilePath = f

// Compactor
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.CompactorConfig.CompactorRing.InstancePort = lc.Port
r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr
r.CompactorConfig.CompactorRing.InstanceID = lc.ID
r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames
r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = s
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "compactor.tokens")
f, err = tokensFile(cfg, "compactor.tokens")
if err != nil {
return err
}
cfg.CompactorConfig.CompactorRing.TokensFilePath = f

// Query Scheduler
f, err = tokensFile(cfg, "scheduler.tokens")
if err != nil {
return err
}
r.CompactorConfig.CompactorRing.TokensFilePath = f
cfg.QueryScheduler.SchedulerRing.TokensFilePath = f

return nil
}

Expand All @@ -196,7 +254,7 @@ func tokensFile(cfg *ConfigWrapper, file string) (string, error) {
return cfg.Common.PathPrefix + "/" + file, nil
}

func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
func applyPathPrefixDefaults(r, defaults *ConfigWrapper) {
if r.Common.PathPrefix != "" {
prefix := strings.TrimSuffix(r.Common.PathPrefix, "/")

Expand All @@ -214,26 +272,53 @@ func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
}
}

func appendLoopbackInterface(r *ConfigWrapper) {
if loopbackIface, err := loki_net.LoopbackInterfaceName(); err == nil {
r.Ingester.LifecyclerConfig.InfNames = append(r.Ingester.LifecyclerConfig.InfNames, loopbackIface)
r.Config.Frontend.FrontendV2.InfNames = append(r.Config.Frontend.FrontendV2.InfNames, loopbackIface)
// appendLoopbackInterface will append the loopback interface to the interface names used for the ingester ring,
// v2 frontend, and common ring config unless an explicit list of names was provided.
func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
loopbackIface, err := loki_net.LoopbackInterfaceName()
if err != nil {
return
}

if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Frontend.FrontendV2.InfNames, defaults.Frontend.FrontendV2.InfNames) {
cfg.Frontend.FrontendV2.InfNames = append(cfg.Config.Frontend.FrontendV2.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Distributor.DistributorRing.InstanceInterfaceNames, defaults.Distributor.DistributorRing.InstanceInterfaceNames) {
cfg.Distributor.DistributorRing.InstanceInterfaceNames = append(cfg.Distributor.DistributorRing.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) {
cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, defaults.CompactorConfig.CompactorRing.InstanceInterfaceNames) {
cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaults.QueryScheduler.SchedulerRing.InstanceInterfaceNames) {
cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames = append(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Ruler.Ring.InstanceInterfaceNames, defaults.Ruler.Ring.InstanceInterfaceNames) {
cfg.Ruler.Ring.InstanceInterfaceNames = append(cfg.Ruler.Ring.InstanceInterfaceNames, loopbackIface)
}
}

// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist
// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the
// memberlist configuration section, they probably want to be using memberlist for all their ring configurations.
// Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor),
// it seems harmless to take a guess at better defaults here.
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist.
// The idea here is that if a user explicitly configured the memberlist configuration section, they probably want to be using memberlist
// for all their ring configurations. Since a user can still explicitly override a specific ring configuration
// (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here.
func applyMemberlistConfig(r *ConfigWrapper) {
if len(r.MemberlistKV.JoinMembers) > 0 {
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
}
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
}

var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")
Expand Down
Loading

0 comments on commit 07ea1ed

Please sign in to comment.