Skip to content

Commit

Permalink
change the default application semantics of instance configs (#442)
Browse files Browse the repository at this point in the history
This commit reduces the number of places where defaults are applied to
instance configs. Defaults are now applied only in these two places:

1. When the Agent YAML is unmarshaled
2. When the instance config is loaded from the KV store
  • Loading branch information
rfratto authored Mar 2, 2021
1 parent 0e9c20f commit 77c67d6
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 95 deletions.
45 changes: 7 additions & 38 deletions pkg/integrations/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/grafana/agent/pkg/integrations/config"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/prometheus/client_golang/prometheus/promhttp"
prom_config "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -55,7 +54,7 @@ test:

func TestConfig_AddressRelabels(t *testing.T) {
cfgText := `
agent:
agent:
enabled: true
`

Expand All @@ -82,35 +81,11 @@ agent:
require.Equal(t, result.Get("instance"), expectHostname+":12345")
}

// TestManager_ValidInstanceConfigs checks that the instance configs the
// integrations manager applies to the instance manager are valid.
func TestManager_ValidInstanceConfigs(t *testing.T) {
	integration := newMockIntegration()
	integrations := map[Config]Integration{
		mockConfig{integration: integration}: integration,
	}

	// Validator handed to the instance manager: applies defaults taken from
	// a copy of Prometheus' default global config.
	applyDefaults := func(c *instance.Config) error {
		globalConfig := prom_config.DefaultConfig.GlobalConfig
		return c.ApplyDefaults(&globalConfig, nil)
	}
	im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, applyDefaults)

	m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
	require.NoError(t, err)
	defer m.Stop()

	// A config that is not valid never shows up in ListConfigs.
	// NOTE(review): presumably test.Poll waits up to one second for the
	// number of listed configs to equal 1 - confirm against the test helper.
	test.Poll(t, time.Second, 1, func() interface{} {
		return len(im.ListConfigs())
	})
}

func TestManager_instanceConfigForIntegration(t *testing.T) {
mock := newMockIntegration()
icfg := mockConfig{integration: mock}

im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, func(c *instance.Config) error {
globalConfig := prom_config.DefaultConfig.GlobalConfig
return c.ApplyDefaults(&globalConfig, nil)
})
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, nil)
require.NoError(t, err)
defer m.Stop()
Expand All @@ -129,10 +104,7 @@ func TestManager_NoIntegrationsScrape(t *testing.T) {
icfg := mockConfig{integration: mock}

integrations := map[Config]Integration{icfg: mock}
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, func(c *instance.Config) error {
globalConfig := prom_config.DefaultConfig.GlobalConfig
return c.ApplyDefaults(&globalConfig, nil)
})
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)

cfg := mockManagerConfig()
cfg.ScrapeIntegrations = false
Expand All @@ -158,10 +130,7 @@ func TestManager_NoIntegrationScrape(t *testing.T) {
mock.commonCfg.ScrapeIntegration = &noScrape

integrations := map[Config]Integration{icfg: mock}
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, func(c *instance.Config) error {
globalConfig := prom_config.DefaultConfig.GlobalConfig
return c.ApplyDefaults(&globalConfig, nil)
})
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)

m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
require.NoError(t, err)
Expand All @@ -179,7 +148,7 @@ func TestManager_StartsIntegrations(t *testing.T) {

integrations := map[Config]Integration{icfg: mock}

im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, nil)
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
require.NoError(t, err)
defer m.Stop()
Expand All @@ -199,7 +168,7 @@ func TestManager_RestartsIntegrations(t *testing.T) {
icfg := mockConfig{integration: mock}

integrations := map[Config]Integration{icfg: mock}
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, nil)
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
require.NoError(t, err)
defer m.Stop()
Expand All @@ -216,7 +185,7 @@ func TestManager_GracefulStop(t *testing.T) {
icfg := mockConfig{integration: mock}

integrations := map[Config]Integration{icfg: mock}
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, nil)
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
require.NoError(t, err)

Expand Down
6 changes: 1 addition & 5 deletions pkg/prom/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins

a.bm = instance.NewBasicManager(instance.BasicManagerConfig{
InstanceRestartBackoff: cfg.InstanceRestartBackoff,
}, a.logger, a.newInstance, a.validateInstance)
}, a.logger, a.newInstance)

var err error
a.mm, err = instance.NewModalManager(a.reg, a.logger, a.bm, cfg.InstanceMode)
Expand Down Expand Up @@ -197,10 +197,6 @@ func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error)
return a.instanceFactory(reg, a.cfg.Global, c, a.cfg.WALDir, a.logger)
}

// validateInstance applies defaults to c, using the Agent's global config
// together with c's own RemoteWrite value. c is passed by pointer, so
// ApplyDefaults may modify it. The error from ApplyDefaults is returned as-is.
func (a *Agent) validateInstance(c *instance.Config) error {
	global := &a.cfg.Global
	return c.ApplyDefaults(global, c.RemoteWrite)
}

func (a *Agent) WireGRPC(s *grpc.Server) {
if a.cfg.ServiceConfig.Enabled {
a.ha.WireGRPC(s)
Expand Down
11 changes: 9 additions & 2 deletions pkg/prom/ha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,18 @@ func (s *Server) checkUnique(ctx context.Context, cfg *instance.Config) error {

for otherConfig := range cfgCh {
// Skip over the config if it's the same one we're about to apply.
if otherConfig.Name == cfg.Name {
if otherConfig.Key == cfg.Name {
continue
}

for _, otherScrape := range otherConfig.ScrapeConfigs {
// Unmarshal the config. If the config is invalid, skip over it, since it wouldn't
// be able to run anyway.
cfg, err := instance.UnmarshalConfig(strings.NewReader(otherConfig.Value))
if err != nil {
continue
}

for _, otherScrape := range cfg.ScrapeConfigs {
if _, exist := newJobNames[otherScrape.JobName]; exist {
return &httpError{
StatusCode: http.StatusBadRequest,
Expand Down
42 changes: 30 additions & 12 deletions pkg/prom/ha/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,19 +347,9 @@ func (s *Server) watchKV(ctx context.Context) {

// New config should be applied if we own it
case !isDeleted && owned:
cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
if err != nil {
level.Error(s.logger).Log("msg", "could not unmarshal stored config", "name", key, "err", err)
if s.applyConfig(key, v.(string)) {
s.configs[key] = struct{}{}
}

// Applying configs should only fail if the config is invalid
err = s.im.ApplyConfig(*cfg)
if err != nil {
level.Error(s.logger).Log("msg", "failed to apply config, will retry on next reshard", "name", key, "err", err)
return true
}

s.configs[key] = struct{}{}
}

return true
Expand All @@ -368,6 +358,34 @@ func (s *Server) watchKV(ctx context.Context) {
level.Info(s.logger).Log("msg", "stopped watching for changes to configs")
}

// applyConfig parses cfgText as an instance config, applies defaults to it
// using s.globalConfig and s.defaultRemoteWrite, and then hands the result to
// the InstanceManager. key is only used to identify the config in log
// messages.
//
// It returns true if the config was applied. Any failure is logged and
// reported as false.
func (s *Server) applyConfig(key string, cfgText string) bool {
	parsed, parseErr := instance.UnmarshalConfig(strings.NewReader(cfgText))
	if parseErr != nil {
		level.Error(s.logger).Log("msg", "could not unmarshal stored config", "name", key, "err", parseErr)
		return false
	}

	// A stored config cannot run as-is: it must first be given the global
	// config. Configs are validated against the global config when they are
	// uploaded, but the global config may have changed since then, so a
	// stored config can turn out to be invalid when it is read back.
	defaultsErr := parsed.ApplyDefaults(s.globalConfig, s.defaultRemoteWrite)
	if defaultsErr != nil {
		level.Error(s.logger).Log("msg", "failed to apply defaults to config. this config cannot run until the globals are adjusted or the config is updated with either explicit overrides to defaults or tweaked to operate within the globals", "name", key, "err", defaultsErr)
		return false
	}

	// An invalid config should be the only reason for ApplyConfig to fail.
	if applyErr := s.im.ApplyConfig(*parsed); applyErr != nil {
		level.Error(s.logger).Log("msg", "failed to apply config, will retry on next reshard", "name", key, "err", applyErr)
		return false
	}

	return true
}

func (s *Server) reshardLoop(ctx context.Context) {
level.Info(s.logger).Log("msg", "resharding agent on interval", "interval", s.cfg.ReshardInterval)
t := time.NewTicker(s.cfg.ReshardInterval)
Expand Down
30 changes: 13 additions & 17 deletions pkg/prom/ha/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"context"
"hash/fnv"
"net/http"
"strings"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/go-kit/kit/log/level"
"github.com/golang/protobuf/ptypes/empty"
"github.com/grafana/agent/pkg/agentproto"
"github.com/grafana/agent/pkg/prom/instance"
)

// Reshard initiates an entire reshard of the current HA scraping service instance.
Expand Down Expand Up @@ -45,14 +43,9 @@ func (s *Server) Reshard(ctx context.Context, _ *agentproto.ReshardRequest) (_ *
return nil, err
}
for ch := range configCh {
// Applying configs should only fail if the config is invalid
err := s.im.ApplyConfig(ch)
if err != nil {
level.Error(s.logger).Log("msg", "failed to apply config when resharding", "err", err)
continue
if s.applyConfig(ch.Key, ch.Value) {
discoveredConfigs[ch.Key] = struct{}{}
}

discoveredConfigs[ch.Name] = struct{}{}
}

// Find the set of configs that disappeared from AllConfigs from the last
Expand All @@ -77,13 +70,13 @@ func (s *Server) Reshard(ctx context.Context, _ *agentproto.ReshardRequest) (_ *
}

// AllConfigs gets all configs known to the KV store.
func (s *Server) AllConfigs(ctx context.Context) (<-chan instance.Config, error) {
func (s *Server) AllConfigs(ctx context.Context) (<-chan ConfigEntry, error) {
keys, err := s.kv.List(ctx, "")
if err != nil {
return nil, err
}

ch := make(chan instance.Config)
ch := make(chan ConfigEntry)

var wg sync.WaitGroup
wg.Add(len(keys))
Expand Down Expand Up @@ -115,17 +108,20 @@ func (s *Server) AllConfigs(ctx context.Context) (<-chan instance.Config, error)
return
}

cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
if err != nil {
level.Error(s.logger).Log("msg", "could not unmarshal stored config", "name", key, "err", err)
}

ch <- *cfg
ch <- ConfigEntry{Key: key, Value: v.(string)}
}(key)
}
return ch, nil
}

// ConfigEntry is a single config as read from the KV store: the key it is
// stored under and its raw, still-unparsed contents. Consumers are expected
// to parse Value themselves (e.g. with instance.UnmarshalConfig).
type ConfigEntry struct {
	// Key of the config in the KV store.
	Key string

	// Contents of the config, as the raw string stored in the KV store.
	Value string
}

// owns checks to see if a config name is owned by this Server. owns will
// return an error if the ring is empty or if there aren't enough
// healthy nodes.
Expand Down
20 changes: 2 additions & 18 deletions pkg/prom/instance/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ type BasicManager struct {
mut sync.Mutex
processes map[string]*managedProcess

launch Factory
validate ConfigValidator
launch Factory
}

// managedProcess represents a goroutine running a ManagedInstance. cancel
Expand All @@ -103,27 +102,19 @@ func (p managedProcess) Stop() {
// Factory should return an unstarted instance given some config.
type Factory func(c Config) (ManagedInstance, error)

// A ConfigValidator should validate a Config and return an error if there is
// a problem. ConfigValidator may mutate the Config.
type ConfigValidator func(c *Config) error

// NewBasicManager creates a new BasicManager. The launch function will be
// invoked any time a new Config is applied.
//
// The lifecycle of any ManagedInstance returned by the launch function will
// be handled by the BasicManager. Instances will be automatically restarted
// if stopped, updated if the config changes, or removed when the Config is
// deleted.
//
// The validate function will be called before creating an instance. If the
// config is not valid, launch will not be called. validate is optional.
func NewBasicManager(cfg BasicManagerConfig, logger log.Logger, launch Factory, validate ConfigValidator) *BasicManager {
func NewBasicManager(cfg BasicManagerConfig, logger log.Logger, launch Factory) *BasicManager {
return &BasicManager{
cfg: cfg,
logger: logger,
processes: make(map[string]*managedProcess),
launch: launch,
validate: validate,
}
}

Expand Down Expand Up @@ -163,13 +154,6 @@ func (m *BasicManager) ListConfigs() map[string]Config {
// uniquely identify the Config and determine whether the Config has an
// existing associated managed instance.
func (m *BasicManager) ApplyConfig(c Config) error {
if m.validate != nil {
err := m.validate(&c)
if err != nil {
return fmt.Errorf("failed to validate instance %s: %w", c.Name, err)
}
}

m.mut.Lock()
defer m.mut.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions pkg/prom/instance/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestBasicManager_ApplyConfig(t *testing.T) {
return &newMock, nil
}

cm := NewBasicManager(DefaultBasicManagerConfig, logger, spawner, nil)
cm := NewBasicManager(DefaultBasicManagerConfig, logger, spawner)

for i := 0; i < 10; i++ {
err := cm.ApplyConfig(Config{Name: "test"})
Expand All @@ -61,7 +61,7 @@ func TestBasicManager_ApplyConfig(t *testing.T) {
return &newMock, nil
}

cm := NewBasicManager(DefaultBasicManagerConfig, logger, spawner, nil)
cm := NewBasicManager(DefaultBasicManagerConfig, logger, spawner)

for i := 0; i < 10; i++ {
err := cm.ApplyConfig(Config{Name: "test"})
Expand All @@ -83,7 +83,7 @@ func TestBasicManager_ApplyConfig(t *testing.T) {
return &newMock, nil
}

cm := NewBasicManager(DefaultBasicManagerConfig, logger, spawner, nil)
cm := NewBasicManager(DefaultBasicManagerConfig, logger, spawner)

// Creation should succeed
err := cm.ApplyConfig(Config{Name: "test"})
Expand Down

0 comments on commit 77c67d6

Please sign in to comment.