Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add common ring configuration #4617

Merged
merged 5 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/util"
)

// Config holds common config that can be shared between multiple other config sections
type Config struct {
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
Ring util.RingConfig `yaml:"ring"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand Down
226 changes: 150 additions & 76 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/cfg"

loki_storage "github.com/grafana/loki/pkg/storage"
Expand Down Expand Up @@ -76,11 +77,13 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
r.QueryScheduler.UseSchedulerRing = true
}

applyPathPrefixDefaults(r, defaults)
if err := applyIngesterRingConfig(r, &defaults); err != nil {
applyPathPrefixDefaults(r, &defaults)
Copy link
Contributor

@DylanGuedes DylanGuedes Nov 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, since applyPathPrefixDefaults is being invoked before applyDynamicRingConfigs, the check reflect.DeepEqual(r.CompactorConfig.CompactorRing, defaults.CompactorConfig.CompactorRing) will always be false I think

edit: my bad, I meant applyTokensFilePath instead of applyPathPrefixDefaults

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does applyPathPrefixDefaults modify the compactor's ring? I see it modifying the working directory, but I'm missing the impact on the ring?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I completely confused the function names 😂 I was referring to applyTokensFilePath


applyDynamicRingConfigs(r, &defaults)

if err := applyTokensFilePath(r); err != nil {
return err
}
applyMemberlistConfig(r)

if err := applyStorageConfig(r, &defaults); err != nil {
return err
Expand All @@ -96,85 +99,141 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}

// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else
// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically
// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's
// and is the most central ring config for Loki.
// applyDynamicRingConfigs checks the current config and, depending on the given values, reuse ring configs accordingly.
//
// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
// the loopback interface at the end of it.
func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
appendLoopbackInterface(r)
// 1. Gives preference to any explicit ring config set. For instance, if the user explicitly configures Distributor's ring,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be explicit > common > memberlist. Using the ingester's ring was a stopgap put in place recently and meant to accomplish what the new common.ring will, so it's not something we'll need to retain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I can imagine users being confused due to their different rings not using default values listed in the documentation (instead, such rings would be using the ingester config, which they might not even realize).

// that config will prevail. This rule is enforced by the fact that the config file and command line args are parsed
// again after the dynamic config has been applied, so will take higher precedence.
// 2. If no explicit ring config is set, use the common ring configured if provided.
// 3. If no common ring was provided, use the memberlist config if provided.
// 4. If no common ring or memberlist were provided, use the ingester's ring configuration.

// When using the ingester or common ring config, the loopback interface will be appended to the end of
// the list of default interface names
func applyDynamicRingConfigs(r, defaults *ConfigWrapper) {
if reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) {
// common ring not set, so use memberlist for all rings if set
if len(r.MemberlistKV.JoinMembers) > 0 {
applyMemberlistConfig(r)
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
} else {
// neither common ring nor memberlist set, use ingester ring configuration for all rings
useIngesterRingConfig(r, defaults)
}
} else {
// common ring is provided, use that for all rings
useCommonRingConfig(r, defaults)
}
trevorwhitney marked this conversation as resolved.
Show resolved Hide resolved

lc := r.Ingester.LifecyclerConfig
rc := r.Ingester.LifecyclerConfig.RingConfig
s := rc.KVStore.Store
sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig
}

f, err := tokensFile(r, "ingester.tokens")
if err != nil {
return err
}
r.Ingester.LifecyclerConfig.TokensFilePath = f
func useCommonRingConfig(r, defaults *ConfigWrapper) {
// if the default ingester config is detected, then it's safe to override with the common config
shouldOverrideIngester := reflect.DeepEqual(r.Ingester.LifecyclerConfig, defaults.Ingester.LifecyclerConfig)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@owen-d per your comment on @DylanGuedes's original PR, would you like to see this DeepEqual check applied to all ring overrides? The reason why we may not need to do that is because after this ApplyDynamicConfig function runs, we the parse the config file and command line args again. So, if someone is not using the default, they must have provided configs in one of those 2 places, and thus they will get applied and overrides any dynamic configs we applied.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I don't think it's necessary 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, since in such a scenario the configs would be applied again, maybe this whole shouldOverrideIngester isn't necessary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, and just always override the ingester. that's a good point. I believe we have test coverage for that case so I'll try it out


// append the loopback after checking if the default is enabled, as it mutates the current
appendLoopbackInterface(r, defaults)

numTokens := 128 //this is the default used by the ingester and ruler
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@owen-d per your comment on @DylanGuedes's original PR, do you think we should use 512 here instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be overwriting these at all. Extra fields (i.e. num_tokens) can still be reconfigured by setting the ingester ring tokens manually, but I don't think we should be overriding them here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so just leave that one blank for now, I'm in agreement with that.

applyConfigToRings(r, r.Common.Ring, numTokens, shouldOverrideIngester)
}

// This gets ugly because we use a separate struct for configuring each ring...
//applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured.
func applyConfigToRings(r *ConfigWrapper, rc util.RingConfig, defaultNumTokens int, overrideIngester bool) {
//Ingester
if overrideIngester {
r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
r.Ingester.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ingester.LifecyclerConfig.TokensFilePath = rc.TokensFilePath
r.Ingester.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Ingester.LifecyclerConfig.ID = rc.InstanceID
r.Ingester.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames
r.Ingester.LifecyclerConfig.Port = rc.InstancePort
r.Ingester.LifecyclerConfig.Addr = rc.InstanceAddr
r.Ingester.LifecyclerConfig.Zone = rc.InstanceZone
r.Ingester.LifecyclerConfig.ListenPort = rc.ListenPort
r.Ingester.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
}

// Distributor
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.Distributor.DistributorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.Distributor.DistributorRing.InstancePort = lc.Port
r.Distributor.DistributorRing.InstanceAddr = lc.Addr
r.Distributor.DistributorRing.InstanceID = lc.ID
r.Distributor.DistributorRing.InstanceInterfaceNames = lc.InfNames
r.Distributor.DistributorRing.KVStore.Store = s
r.Distributor.DistributorRing.KVStore.StoreConfig = sc
r.Distributor.DistributorRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.Distributor.DistributorRing.InstancePort = rc.InstancePort
r.Distributor.DistributorRing.InstanceAddr = rc.InstanceAddr
r.Distributor.DistributorRing.InstanceID = rc.InstanceID
r.Distributor.DistributorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Distributor.DistributorRing.KVStore.Store = rc.KVStore.Store
r.Distributor.DistributorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig

// Ruler
r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ruler.Ring.HeartbeatPeriod = lc.HeartbeatPeriod
r.Ruler.Ring.InstancePort = lc.Port
r.Ruler.Ring.InstanceAddr = lc.Addr
r.Ruler.Ring.InstanceID = lc.ID
r.Ruler.Ring.InstanceInterfaceNames = lc.InfNames
r.Ruler.Ring.NumTokens = lc.NumTokens
r.Ruler.Ring.KVStore.Store = s
r.Ruler.Ring.KVStore.StoreConfig = sc
r.Ruler.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Ruler.Ring.InstancePort = rc.InstancePort
r.Ruler.Ring.InstanceAddr = rc.InstanceAddr
r.Ruler.Ring.InstanceID = rc.InstanceID
r.Ruler.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Ruler.Ring.NumTokens = defaultNumTokens
r.Ruler.Ring.KVStore.Store = rc.KVStore.Store
r.Ruler.Ring.KVStore.StoreConfig = rc.KVStore.StoreConfig

// Query Scheduler
r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.QueryScheduler.SchedulerRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.QueryScheduler.SchedulerRing.InstancePort = lc.Port
r.QueryScheduler.SchedulerRing.InstanceAddr = lc.Addr
r.QueryScheduler.SchedulerRing.InstanceID = lc.ID
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames
r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone
r.QueryScheduler.SchedulerRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.QueryScheduler.SchedulerRing.InstancePort = rc.InstancePort
r.QueryScheduler.SchedulerRing.InstanceAddr = rc.InstanceAddr
r.QueryScheduler.SchedulerRing.InstanceID = rc.InstanceID
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.QueryScheduler.SchedulerRing.InstanceZone = rc.InstanceZone
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.KVStore.Store = s
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "scheduler.tokens")
r.QueryScheduler.SchedulerRing.KVStore.Store = rc.KVStore.Store
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = rc.KVStore.StoreConfig

// Compactor
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.CompactorConfig.CompactorRing.HeartbeatPeriod = rc.HeartbeatPeriod
r.CompactorConfig.CompactorRing.InstancePort = rc.InstancePort
r.CompactorConfig.CompactorRing.InstanceAddr = rc.InstanceAddr
r.CompactorConfig.CompactorRing.InstanceID = rc.InstanceID
r.CompactorConfig.CompactorRing.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.CompactorConfig.CompactorRing.InstanceZone = rc.InstanceZone
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = rc.KVStore.Store
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = rc.KVStore.StoreConfig
}

// useIngesterRingConfig will override all other rings to use the ingester's ring configuration
func useIngesterRingConfig(r, defaults *ConfigWrapper) {
appendLoopbackInterface(r, defaults)

ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig)
numTokens := r.Ingester.LifecyclerConfig.NumTokens

// we're using the ingester config here, so never need to override it
Copy link
Contributor

@DylanGuedes DylanGuedes Nov 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I was thinking since you are passing the ingester, wouldn't it be fine to allow overriding the ingester? In such a scenario you'd be overriding the ingester with the ingester itself, so looks fine.

edit: In conclusion, maybe we can remove the whole third param

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I like that idea, I'm going to look into removing the whole third param

applyConfigToRings(r, ingesterRingCfg, numTokens, false)
}

func applyTokensFilePath(cfg *ConfigWrapper) error {
// Ingester
f, err := tokensFile(cfg, "ingester.tokens")
if err != nil {
return err
}
r.QueryScheduler.SchedulerRing.TokensFilePath = f
cfg.Ingester.LifecyclerConfig.TokensFilePath = f

// Compactor
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.CompactorConfig.CompactorRing.InstancePort = lc.Port
r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr
r.CompactorConfig.CompactorRing.InstanceID = lc.ID
r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames
r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = s
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "compactor.tokens")
f, err = tokensFile(cfg, "compactor.tokens")
if err != nil {
return err
}
cfg.CompactorConfig.CompactorRing.TokensFilePath = f

// Query Scheduler
f, err = tokensFile(cfg, "scheduler.tokens")
if err != nil {
return err
}
r.CompactorConfig.CompactorRing.TokensFilePath = f
cfg.QueryScheduler.SchedulerRing.TokensFilePath = f

return nil
}

Expand All @@ -193,7 +252,7 @@ func tokensFile(cfg *ConfigWrapper, file string) (string, error) {
return cfg.Common.PathPrefix + "/" + file, nil
}

func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
func applyPathPrefixDefaults(r, defaults *ConfigWrapper) {
if r.Common.PathPrefix != "" {
prefix := strings.TrimSuffix(r.Common.PathPrefix, "/")

Expand All @@ -211,26 +270,41 @@ func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
}
}

func appendLoopbackInterface(r *ConfigWrapper) {
if loopbackIface, err := loki_net.LoopbackInterfaceName(); err == nil {
r.Ingester.LifecyclerConfig.InfNames = append(r.Ingester.LifecyclerConfig.InfNames, loopbackIface)
r.Config.Frontend.FrontendV2.InfNames = append(r.Config.Frontend.FrontendV2.InfNames, loopbackIface)
// appendLoopbackInterface will append the loopback interface to the interface names used for the ingester ring,
// v2 frontend, and common ring config unless an explicit list of names was provided.
func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
loopbackIface, err := loki_net.LoopbackInterfaceName()
if err != nil {
return
}

if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Frontend.FrontendV2.InfNames, defaults.Frontend.FrontendV2.InfNames) {
cfg.Frontend.FrontendV2.InfNames = append(cfg.Config.Frontend.FrontendV2.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) {
cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, defaults.CompactorConfig.CompactorRing.InstanceInterfaceNames) {
cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames = append(cfg.CompactorConfig.CompactorRing.InstanceInterfaceNames, loopbackIface)
}
}

// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist
// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the
// memberlist configuration section, they probably want to be using memberlist for all their ring configurations.
// Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor),
// it seems harmless to take a guess at better defaults here.
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist.
// The idea here is that if a user explicitly configured the memberlist configuration section, they probably want to be using memberlist
// for all their ring configurations. Since a user can still explicitly override a specific ring configuration
trevorwhitney marked this conversation as resolved.
Show resolved Hide resolved
// (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here.
func applyMemberlistConfig(r *ConfigWrapper) {
if len(r.MemberlistKV.JoinMembers) > 0 {
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
}
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
}

var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")
Expand Down
Loading