Skip to content

Commit

Permalink
WIP: Implement a ring common config.
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Nov 1, 2021
1 parent 84220c4 commit 717812e
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 34 deletions.
8 changes: 5 additions & 3 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/util"
)

// Config holds common config that can be shared between multiple other config sections
type Config struct {
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
Ring util.RingConfig `yaml:"ring"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand Down
113 changes: 83 additions & 30 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

cortexcache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"

Expand Down Expand Up @@ -77,9 +78,11 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}

applyPathPrefixDefaults(r, defaults)
if err := applyIngesterRingConfig(r, &defaults); err != nil {
return err
}

applyDynamicRingConfigs(r, &defaults)

applyTokensFilePath(r)

applyMemberlistConfig(r)

if err := applyStorageConfig(r, &defaults); err != nil {
Expand All @@ -96,30 +99,54 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}

// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else
// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically
// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's
// and is the most central ring config for Loki.
// applyDynamicRingConfigs checks the current config and, depending on the given values, reuse ring configs accordingly.
//
// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
// the loopback interface at the end of it.
func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
appendLoopbackInterface(r)
// 1. Gives preference to any explicit ring config set. For instance, if the user explicitly configures Distributor's ring,
// that config will prevail.
// 2. If no explicit ring config is set, reuse the common ring configured if it was provided.
// 3. If no common ring was provided, reuse whatever was set by the ingester ring.
func applyDynamicRingConfigs(r, defaults *ConfigWrapper) error {
// if common/ring is NOT set, reuse ingester ring config.
if reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) {
reuseIngesterConfig(r, defaults)
return nil
}

lc := r.Ingester.LifecyclerConfig
rc := r.Ingester.LifecyclerConfig.RingConfig
s := rc.KVStore.Store
sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig
return reuseCommonConfig(r, defaults)
}

f, err := tokensFile(r, "ingester.tokens")
func reuseCommonConfig(r, defaults *ConfigWrapper) error {
lifecyclerCfg, err := r.Common.Ring.ToCortexLifecyclerConfig()
if err != nil {
return err
}
r.Ingester.LifecyclerConfig.TokensFilePath = f

// This gets ugly because we use a separate struct for configuring each ring...
applyConfigToRings(r, defaults, lifecyclerCfg)
return nil
}

func reuseIngesterConfig(r, defaults *ConfigWrapper) {
appendLoopbackInterface(r, defaults)
applyConfigToRings(r, defaults, r.Ingester.LifecyclerConfig)
}

// applyConfigToRings will reuse a given LifecyclerConfig everywhere else we have a ring configured.
//
// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
// the loopback interface at the end of it.
func applyConfigToRings(r, defaults *ConfigWrapper, reuse ring.LifecyclerConfig) {
shouldOverrideIngester := reflect.DeepEqual(r.Ingester.LifecyclerConfig, defaults.Ingester.LifecyclerConfig)
appendLoopbackInterface(r, defaults)

lc := reuse
rc := reuse.RingConfig
s := rc.KVStore.Store
sc := rc.KVStore.StoreConfig

if shouldOverrideIngester {
r.Ingester.LifecyclerConfig = reuse
r.Ingester.LifecyclerConfig.RingConfig = reuse.RingConfig
}

// Distributor
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
Expand Down Expand Up @@ -153,11 +180,6 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.KVStore.Store = s
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "scheduler.tokens")
if err != nil {
return err
}
r.QueryScheduler.SchedulerRing.TokensFilePath = f

// Compactor
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
Expand All @@ -170,11 +192,30 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = s
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "compactor.tokens")
}

func applyTokensFilePath(cfg *ConfigWrapper) error {
// Ingester
f, err := tokensFile(cfg, "ingester.tokens")
if err != nil {
return err
}
cfg.Ingester.LifecyclerConfig.TokensFilePath = f

// Compactor
f, err = tokensFile(cfg, "compactor.tokens")
if err != nil {
return err
}
cfg.CompactorConfig.CompactorRing.TokensFilePath = f

// Query Scheduler
f, err = tokensFile(cfg, "scheduler.tokens")
if err != nil {
return err
}
r.CompactorConfig.CompactorRing.TokensFilePath = f
cfg.QueryScheduler.SchedulerRing.TokensFilePath = f

return nil
}

Expand Down Expand Up @@ -211,10 +252,22 @@ func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
}
}

func appendLoopbackInterface(r *ConfigWrapper) {
if loopbackIface, err := loki_net.LoopbackInterfaceName(); err == nil {
r.Ingester.LifecyclerConfig.InfNames = append(r.Ingester.LifecyclerConfig.InfNames, loopbackIface)
r.Config.Frontend.FrontendV2.InfNames = append(r.Config.Frontend.FrontendV2.InfNames, loopbackIface)
func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
loopbackIface, err := loki_net.LoopbackInterfaceName()
if err != nil {
return
}

if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Frontend.FrontendV2.InfNames, defaults.Frontend.FrontendV2.InfNames) {
cfg.Frontend.FrontendV2.InfNames = append(cfg.Config.Frontend.FrontendV2.InfNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Common.Ring.InstanceInterfaceNames, defaults.Common.Ring.InstanceInterfaceNames) {
cfg.Common.Ring.InstanceInterfaceNames = append(cfg.Common.Ring.InstanceInterfaceNames, loopbackIface)
}
}

Expand Down
80 changes: 79 additions & 1 deletion pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,13 +980,14 @@ func TestRingInterfaceNames(t *testing.T) {
config, _, err := configWrapperFromYAML(t, minimalConfig, []string{})
assert.NoError(t, err)

assert.Contains(t, config.Common.Ring.InstanceInterfaceNames, defaultIface)
assert.Contains(t, config.Ingester.LifecyclerConfig.InfNames, defaultIface)
assert.Contains(t, config.Distributor.DistributorRing.InstanceInterfaceNames, defaultIface)
assert.Contains(t, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaultIface)
assert.Contains(t, config.Ruler.Ring.InstanceInterfaceNames, defaultIface)
})

t.Run("if ingestor interface is set, it overrides other rings default interfaces", func(t *testing.T) {
t.Run("if ingester interface is set, it overrides other rings default interfaces", func(t *testing.T) {
yamlContent := `ingester:
lifecycler:
interface_names:
Expand Down Expand Up @@ -1096,3 +1097,80 @@ func Test_tokensFile(t *testing.T) {
})
}
}

// TODO(dylanguedes): Add missing tests scenarios.
func TestCommonRingConfigSection(t *testing.T) {
t.Run("if only common ring is provided, reuse it for all rings", func(t *testing.T) {
yamlContent := `common:
ring:
kvstore:
store: etcd`

config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
})

t.Run("if common ring is provided, reuse it for all rings that aren't explictly set", func(t *testing.T) {
yamlContent := `common:
ring:
kvstore:
store: etcd
ingester:
lifecycler:
ring:
kvstore:
store: inmemory`

config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
})

t.Run("if only ingester ring is provided, reuse it for all rings", func(t *testing.T) {
yamlContent := `ingester:
lifecycler:
ring:
kvstore:
store: etcd`
config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
})

t.Run("if a ring is explicitly configured, doesn't override it", func(t *testing.T) {
yamlContent := `common:
ring:
kvstore:
store: consul
distributor:
ring:
kvstore:
store: etcd
ingester:
lifecycler:
ring:
kvstore:
store: inmemory`

config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store)
})
}
21 changes: 21 additions & 0 deletions pkg/util/ring_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,27 @@ func (cfg *RingConfig) ToLifecyclerConfig(numTokens int) (ring.BasicLifecyclerCo
}, nil
}

func (cfg *RingConfig) ToCortexLifecyclerConfig() (ring.LifecyclerConfig, error) {
// TODO(dylanguedes): Make sure no important config is missed.
return ring.LifecyclerConfig{
RingConfig: ring.Config{KVStore: cfg.KVStore},
NumTokens: 128, // default ingester num tokens
HeartbeatPeriod: cfg.HeartbeatPeriod,
ObservePeriod: cfg.ObservePeriod,

// MinReadyDuration: cfg.MinReady
InfNames: cfg.InstanceInterfaceNames,
// FinalSleep: ,
TokensFilePath: cfg.TokensFilePath,
Zone: cfg.InstanceZone,
// UnregisterOnShutdown: cfg

Addr: cfg.InstanceAddr,
Port: cfg.InstancePort,
ID: cfg.InstanceID,
}, nil
}

func (cfg *RingConfig) ToRingConfig(replicationFactor int) ring.Config {
rc := ring.Config{}
flagext.DefaultValues(&rc)
Expand Down

0 comments on commit 717812e

Please sign in to comment.