Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add common ring configuration #4617

Merged
merged 5 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/util"
)

// Config holds common config that can be shared between multiple other config sections
type Config struct {
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
Ring util.RingConfig `yaml:"ring"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand Down
245 changes: 165 additions & 80 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/cfg"

loki_storage "github.com/grafana/loki/pkg/storage"
Expand Down Expand Up @@ -76,11 +77,15 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
r.QueryScheduler.UseSchedulerRing = true
}

applyPathPrefixDefaults(r, defaults)
if err := applyIngesterRingConfig(r, &defaults); err != nil {
applyPathPrefixDefaults(r, &defaults)
Copy link
Contributor

@DylanGuedes DylanGuedes Nov 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, since applyPathPrefixDefaults is being invoked before applyDynamicRingConfigs, the check reflect.DeepEqual(r.CompactorConfig.CompactorRing, defaults.CompactorConfig.CompactorRing) will always be false I think

edit: my bad, I meant applyTokensFilePath instead of applyPathPrefixDefaults

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does applyPathPrefixDefaults modify the compactor's ring? I see it modifying the working directory, but I'm missing the impact on the ring?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I completely confused the function names 😂 I was referring to applyTokensFilePath


applyDynamicRingConfigs(r, &defaults)

appendLoopbackInterface(r, &defaults)

if err := applyTokensFilePath(r); err != nil {
return err
}
applyMemberlistConfig(r)

if err := applyStorageConfig(r, &defaults); err != nil {
return err
Expand All @@ -96,85 +101,138 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}

// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else
// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically
// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's
// and is the most central ring config for Loki.
// applyDynamicRingConfigs checks the current config and, depending on the given values, reuse ring configs accordingly.
//
// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
// the loopback interface at the end of it.
func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
appendLoopbackInterface(r)
// 1. Gives preference to any explicit ring config set. For instance, if the user explicitly configures Distributor's ring,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be explicit > common > memberlist. Using the ingester's ring was a stopgap put in place recently and meant to accomplish what the new common.ring will, so it's not something we'll need to retain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I can imagine users being confused due to their different rings not using default values listed in the documentation (instead, such rings would be using the ingester config, which they might not even realize).

// that config will prevail. This rule is enforced by the fact that the config file and command line args are parsed
// again after the dynamic config has been applied, so will take higher precedence.
// 2. If no explicit ring config is set, use the common ring configured if provided.
// 3. If no common ring was provided, use the memberlist config if provided.
// 4. If no common ring or memberlist were provided, use the ingester's ring configuration.

// When using the ingester or common ring config, the loopback interface will be appended to the end of
// the list of default interface names
func applyDynamicRingConfigs(r, defaults *ConfigWrapper) {
if !reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) {
// common ring is provided, use that for all rings, merging with
// any specific configs provided for each ring
applyConfigToRings(r, defaults, r.Common.Ring, true)
return
}

lc := r.Ingester.LifecyclerConfig
rc := r.Ingester.LifecyclerConfig.RingConfig
s := rc.KVStore.Store
sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig

f, err := tokensFile(r, "ingester.tokens")
if err != nil {
return err
// common ring not set, so use memberlist for all rings if set
if len(r.MemberlistKV.JoinMembers) > 0 {
applyMemberlistConfig(r)
} else {
// neither common ring nor memberlist set, use ingester ring configuration for all rings
// that have not been configured. Don't merge any ingester ring configurations for rings
// that deviate from the default in any way.
ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig)
applyConfigToRings(r, defaults, ingesterRingCfg, false)
}
r.Ingester.LifecyclerConfig.TokensFilePath = f
}

// This gets ugly because we use a separate struct for configuring each ring...
//applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured.
//`mergeWithExisting` will be true when applying the common config, false when applying the ingester
//config. This decision was made since the ingester ring copying behavior is likely to be less intuitive,
//and was added as a stop-gap to prevent the new rings in 2.4 from breaking existing configs before 2.4 that only had an ingester
//ring defined. When `mergeWithExisting` is false, we will not apply any of the ring config to a ring that has
//any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified
//derivations, with the derivations taking precedence.
func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) {
//Ingester - mergeWithExisting is false when applying the ingester config, and we only want to
//change ingester ring values when applying the common config, so there's no need for the DeepEqual
//check here.
if mergeWithExisting {
r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
r.Ingester.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ingester.LifecyclerConfig.TokensFilePath = rc.TokensFilePath
r.Ingester.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Ingester.LifecyclerConfig.ID = rc.InstanceID
r.Ingester.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames
r.Ingester.LifecyclerConfig.Port = rc.InstancePort
r.Ingester.LifecyclerConfig.Addr = rc.InstanceAddr
r.Ingester.LifecyclerConfig.Zone = rc.InstanceZone
r.Ingester.LifecyclerConfig.ListenPort = rc.ListenPort
r.Ingester.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
}

// Distributor
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.Distributor.DistributorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.Distributor.DistributorRing.InstancePort = lc.Port
r.Distributor.DistributorRing.InstanceAddr = lc.Addr
r.Distributor.DistributorRing.InstanceID = lc.ID
r.Distributor.DistributorRing.InstanceInterfaceNames = lc.InfNames
r.Distributor.DistributorRing.KVStore.Store = s
r.Distributor.DistributorRing.KVStore.StoreConfig = sc
if mergeWithExisting || reflect.DeepEqual(r.Distributor.DistributorRing, defaults.Distributor.DistributorRing) {
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.Distributor.DistributorRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.Distributor.DistributorRing.InstancePort = rc.InstancePort
r.Distributor.DistributorRing.InstanceAddr = rc.InstanceAddr
r.Distributor.DistributorRing.InstanceID = rc.InstanceID
r.Distributor.DistributorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Distributor.DistributorRing.KVStore.Store = rc.KVStore.Store
r.Distributor.DistributorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig
}

// Ruler
r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ruler.Ring.HeartbeatPeriod = lc.HeartbeatPeriod
r.Ruler.Ring.InstancePort = lc.Port
r.Ruler.Ring.InstanceAddr = lc.Addr
r.Ruler.Ring.InstanceID = lc.ID
r.Ruler.Ring.InstanceInterfaceNames = lc.InfNames
r.Ruler.Ring.NumTokens = lc.NumTokens
r.Ruler.Ring.KVStore.Store = s
r.Ruler.Ring.KVStore.StoreConfig = sc
if mergeWithExisting || reflect.DeepEqual(r.Ruler.Ring, defaults.Ruler.Ring) {
r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ruler.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Ruler.Ring.InstancePort = rc.InstancePort
r.Ruler.Ring.InstanceAddr = rc.InstanceAddr
r.Ruler.Ring.InstanceID = rc.InstanceID
r.Ruler.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Ruler.Ring.KVStore.Store = rc.KVStore.Store
r.Ruler.Ring.KVStore.StoreConfig = rc.KVStore.StoreConfig
}

// Query Scheduler
r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.QueryScheduler.SchedulerRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.QueryScheduler.SchedulerRing.InstancePort = lc.Port
r.QueryScheduler.SchedulerRing.InstanceAddr = lc.Addr
r.QueryScheduler.SchedulerRing.InstanceID = lc.ID
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames
r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.KVStore.Store = s
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "scheduler.tokens")
if mergeWithExisting || reflect.DeepEqual(r.QueryScheduler.SchedulerRing, defaults.QueryScheduler.SchedulerRing) {
r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.QueryScheduler.SchedulerRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.QueryScheduler.SchedulerRing.InstancePort = rc.InstancePort
r.QueryScheduler.SchedulerRing.InstanceAddr = rc.InstanceAddr
r.QueryScheduler.SchedulerRing.InstanceID = rc.InstanceID
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.QueryScheduler.SchedulerRing.InstanceZone = rc.InstanceZone
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.KVStore.Store = rc.KVStore.Store
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = rc.KVStore.StoreConfig
}

// Compactor
if mergeWithExisting || reflect.DeepEqual(r.CompactorConfig.CompactorRing, defaults.CompactorConfig.CompactorRing) {
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.CompactorConfig.CompactorRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.CompactorConfig.CompactorRing.InstancePort = rc.InstancePort
r.CompactorConfig.CompactorRing.InstanceAddr = rc.InstanceAddr
r.CompactorConfig.CompactorRing.InstanceID = rc.InstanceID
r.CompactorConfig.CompactorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.CompactorConfig.CompactorRing.InstanceZone = rc.InstanceZone
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = rc.KVStore.Store
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig
}
}

func applyTokensFilePath(cfg *ConfigWrapper) error {
// Ingester
f, err := tokensFile(cfg, "ingester.tokens")
if err != nil {
return err
}
r.QueryScheduler.SchedulerRing.TokensFilePath = f
cfg.Ingester.LifecyclerConfig.TokensFilePath = f

// Compactor
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.CompactorConfig.CompactorRing.InstancePort = lc.Port
r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr
r.CompactorConfig.CompactorRing.InstanceID = lc.ID
r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames
r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = s
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "compactor.tokens")
f, err = tokensFile(cfg, "compactor.tokens")
if err != nil {
return err
}
cfg.CompactorConfig.CompactorRing.TokensFilePath = f

// Query Scheduler
f, err = tokensFile(cfg, "scheduler.tokens")
if err != nil {
return err
}
r.CompactorConfig.CompactorRing.TokensFilePath = f
cfg.QueryScheduler.SchedulerRing.TokensFilePath = f

return nil
}

Expand All @@ -193,7 +251,7 @@ func tokensFile(cfg *ConfigWrapper, file string) (string, error) {
return cfg.Common.PathPrefix + "/" + file, nil
}

func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
func applyPathPrefixDefaults(r, defaults *ConfigWrapper) {
if r.Common.PathPrefix != "" {
prefix := strings.TrimSuffix(r.Common.PathPrefix, "/")

Expand All @@ -211,26 +269,53 @@ func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
}
}

func appendLoopbackInterface(r *ConfigWrapper) {
if loopbackIface, err := loki_net.LoopbackInterfaceName(); err == nil {
r.Ingester.LifecyclerConfig.InfNames = append(r.Ingester.LifecyclerConfig.InfNames, loopbackIface)
r.Config.Frontend.FrontendV2.InfNames = append(r.Config.Frontend.FrontendV2.InfNames, loopbackIface)
// appendLoopbackInterface will append the loopback interface to the interface names used for the ingester ring,
// v2 frontend, and common ring config unless an explicit list of names was provided.
func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
loopbackIface, err := loki_net.LoopbackInterfaceName()
if err != nil {
return
}

if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Frontend.FrontendV2.InfNames, defaults.Frontend.FrontendV2.InfNames) {
cfg.Frontend.FrontendV2.InfNames = append(cfg.Config.Frontend.FrontendV2.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Distributor.DistributorRing.InstanceInterfaceNames, defaults.Distributor.DistributorRing.InstanceInterfaceNames) {
cfg.Distributor.DistributorRing.InstanceInterfaceNames = append(cfg.Distributor.DistributorRing.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) {
cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, defaults.CompactorConfig.CompactorRing.InstanceInterfaceNames) {
cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaults.QueryScheduler.SchedulerRing.InstanceInterfaceNames) {
cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames = append(cfg.QueryScheduler.SchedulerRing.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Ruler.Ring.InstanceInterfaceNames, defaults.Ruler.Ring.InstanceInterfaceNames) {
cfg.Ruler.Ring.InstanceInterfaceNames = append(cfg.Ruler.Ring.InstanceInterfaceNames, loopbackIface)
}
}

// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist
// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the
// memberlist configuration section, they probably want to be using memberlist for all their ring configurations.
// Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor),
// it seems harmless to take a guess at better defaults here.
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist.
// The idea here is that if a user explicitly configured the memberlist configuration section, they probably want to be using memberlist
// for all their ring configurations. Since a user can still explicitly override a specific ring configuration
// (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here.
func applyMemberlistConfig(r *ConfigWrapper) {
if len(r.MemberlistKV.JoinMembers) > 0 {
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
}
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
}

var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")
Expand Down
Loading