From 28afbaa2ed916516c21d10c995872859a883967e Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Fri, 26 Feb 2021 12:11:24 -0500 Subject: [PATCH] pkg/prom/instance: create a ModalManager for ApplyConfig (#427) * pkg/prom/instance: create a ModalManager for ApplyConfig ApplyConfig for the pkg/prom will need to be able to mutate the instance grouping mode and other settings. This PR enables that by doing the following: 1. Create a ModalManager that wraps around a GroupManager or a (new) DistinctManager. The ModalManager can change modes at the cost of needing to re-apply all instances. 2. Remove CountingManager and migrate its logic to ModalManager 3. Separate out storage of the ModalManager and BasicManager to be able to control both independently * remove DistinctManager, use underlying manager directly * simplify GroupManager.Stop * removed unused logger from GroupManager * document active/wrapped managers a bit better in ModalManager --- pkg/prom/agent.go | 70 +++----- pkg/prom/agent_test.go | 11 +- pkg/prom/http.go | 4 +- pkg/prom/http_test.go | 7 +- pkg/prom/instance/counting_manager.go | 80 --------- pkg/prom/instance/counting_manager_test.go | 68 ------- pkg/prom/instance/group_manager.go | 9 +- pkg/prom/instance/manager.go | 22 ++- pkg/prom/instance/modal_manager.go | 197 +++++++++++++++++++++ 9 files changed, 259 insertions(+), 209 deletions(-) delete mode 100644 pkg/prom/instance/counting_manager.go delete mode 100644 pkg/prom/instance/counting_manager_test.go create mode 100644 pkg/prom/instance/modal_manager.go diff --git a/pkg/prom/agent.go b/pkg/prom/agent.go index 69bdd9ae80db..3c6e1390c567 100644 --- a/pkg/prom/agent.go +++ b/pkg/prom/agent.go @@ -27,43 +27,10 @@ var ( WALCleanupPeriod: DefaultCleanupPeriod, ServiceConfig: ha.DefaultConfig, ServiceClientConfig: client.DefaultConfig, - InstanceMode: DefaultInstanceMode, + InstanceMode: instance.DefaultMode, } ) -// InstanceMode controls how instances are created. 
-type InstanceMode string - -// Types of instance modes -var ( - InstanceModeDistinct InstanceMode = "distinct" - InstanceModeShared InstanceMode = "shared" - - DefaultInstanceMode = InstanceModeShared -) - -// UnmarshalYAML unmarshals a string to an InstanceMode. Fails if the string is -// unrecognized. -func (m *InstanceMode) UnmarshalYAML(unmarshal func(interface{}) error) error { - *m = DefaultInstanceMode - - var plain string - if err := unmarshal(&plain); err != nil { - return err - } - - switch plain { - case string(InstanceModeDistinct): - *m = InstanceModeDistinct - return nil - case string(InstanceModeShared): - *m = InstanceModeShared - return nil - default: - return fmt.Errorf("unsupported instance_mode '%s'. supported values 'shared', 'distinct'", plain) - } -} - // Config defines the configuration for the entire set of Prometheus client // instances, along with a global configuration. type Config struct { @@ -78,7 +45,7 @@ type Config struct { ServiceClientConfig client.Config `yaml:"scraping_service_client"` Configs []instance.Config `yaml:"configs,omitempty"` InstanceRestartBackoff time.Duration `yaml:"instance_restart_backoff,omitempty"` - InstanceMode InstanceMode `yaml:"instance_mode"` + InstanceMode instance.Mode `yaml:"instance_mode"` } // UnmarshalYAML implements yaml.Unmarshaler. @@ -147,7 +114,11 @@ type Agent struct { logger log.Logger reg prometheus.Registerer - cm instance.Manager + // Store both the basic manager and the modal manager so we can update their + // settings independently. Only the ModalManager should be used for mutating + // configs. 
+ bm *instance.BasicManager + mm *instance.ModalManager cleaner *WALCleaner instanceFactory instanceFactory @@ -168,23 +139,21 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins reg: reg, } - a.cm = instance.NewBasicManager(instance.BasicManagerConfig{ + a.bm = instance.NewBasicManager(instance.BasicManagerConfig{ InstanceRestartBackoff: cfg.InstanceRestartBackoff, }, a.logger, a.newInstance, a.validateInstance) - if cfg.InstanceMode == InstanceModeShared { - a.cm = instance.NewGroupManager(a.cm) + var err error + a.mm, err = instance.NewModalManager(a.reg, a.logger, a.bm, cfg.InstanceMode) + if err != nil { + return nil, fmt.Errorf("failed to create modal instance manager: %w", err) } - // Regardless of the instance mode, wrap the manager in a CountingManager so we can - // collect metrics on the number of active configs. - a.cm = instance.NewCountingManager(reg, a.cm) - // Periodically attempt to clean up WALs from instances that aren't being run by // this agent anymore. 
a.cleaner = NewWALCleaner( a.logger, - a.cm, + a.mm, cfg.WALDir, cfg.WALCleanupAge, cfg.WALCleanupPeriod, @@ -192,7 +161,7 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins allConfigsValid := true for _, c := range cfg.Configs { - if err := a.cm.ApplyConfig(c); err != nil { + if err := a.mm.ApplyConfig(c); err != nil { level.Error(logger).Log("msg", "failed to apply config", "name", c.Name, "err", err) allConfigsValid = false } @@ -203,7 +172,7 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins if cfg.ServiceConfig.Enabled { var err error - a.ha, err = ha.New(reg, cfg.ServiceConfig, &cfg.Global, cfg.ServiceClientConfig, a.logger, a.cm) + a.ha, err = ha.New(reg, cfg.ServiceConfig, &cfg.Global, cfg.ServiceClientConfig, a.logger, a.mm) if err != nil { return nil, err } @@ -216,7 +185,7 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error) { // Controls the label instanceLabel := "instance_name" - if a.cfg.InstanceMode == InstanceModeShared { + if a.cfg.InstanceMode == instance.ModeShared { instanceLabel = "instance_group_name" } @@ -238,7 +207,7 @@ func (a *Agent) WireGRPC(s *grpc.Server) { } func (a *Agent) Config() Config { return a.cfg } -func (a *Agent) InstanceManager() instance.Manager { return a.cm } +func (a *Agent) InstanceManager() instance.Manager { return a.mm } // Stop stops the agent and all its instances. func (a *Agent) Stop() { @@ -248,7 +217,10 @@ func (a *Agent) Stop() { } } a.cleaner.Stop() - a.cm.Stop() + + // Only need to stop the ModalManager, which will pass through everything to the + // BasicManager. 
+ a.mm.Stop() } type instanceFactory = func(reg prometheus.Registerer, global config.GlobalConfig, cfg instance.Config, walDir string, logger log.Logger) (instance.ManagedInstance, error) diff --git a/pkg/prom/agent_test.go b/pkg/prom/agent_test.go index ab5036da2e46..1575470ef263 100644 --- a/pkg/prom/agent_test.go +++ b/pkg/prom/agent_test.go @@ -25,7 +25,7 @@ func TestConfig_Validate(t *testing.T) { Configs: []instance.Config{ makeInstanceConfig("instance"), }, - InstanceMode: DefaultInstanceMode, + InstanceMode: instance.DefaultMode, } tt := []struct { @@ -121,6 +121,7 @@ func TestAgent(t *testing.T) { makeInstanceConfig("instance_b"), }, InstanceRestartBackoff: time.Duration(0), + InstanceMode: instance.ModeDistinct, } fact := newFakeInstanceFactory() @@ -132,7 +133,7 @@ func TestAgent(t *testing.T) { if fact.created == nil { return false } - return fact.created.Load() == 2 && len(a.cm.ListInstances()) == 2 + return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2 }) t.Run("instances should be running", func(t *testing.T) { @@ -179,6 +180,7 @@ func TestAgent_NormalInstanceExits(t *testing.T) { makeInstanceConfig("instance_b"), }, InstanceRestartBackoff: time.Duration(0), + InstanceMode: instance.ModeDistinct, } for _, tc := range tt { @@ -192,7 +194,7 @@ func TestAgent_NormalInstanceExits(t *testing.T) { if fact.created == nil { return false } - return fact.created.Load() == 2 && len(a.cm.ListInstances()) == 2 + return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2 }) for _, mi := range fact.Mocks() { @@ -224,6 +226,7 @@ func TestAgent_Stop(t *testing.T) { makeInstanceConfig("instance_b"), }, InstanceRestartBackoff: time.Duration(0), + InstanceMode: instance.ModeDistinct, } fact := newFakeInstanceFactory() @@ -235,7 +238,7 @@ func TestAgent_Stop(t *testing.T) { if fact.created == nil { return false } - return fact.created.Load() == 2 && len(a.cm.ListInstances()) == 2 + return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2 }) 
a.Stop() diff --git a/pkg/prom/http.go b/pkg/prom/http.go index 2b25a8636cc7..74c00bde84f5 100644 --- a/pkg/prom/http.go +++ b/pkg/prom/http.go @@ -24,7 +24,7 @@ func (a *Agent) WireAPI(r *mux.Router) { // ListInstances writes the set of currently running instances to the http.ResponseWriter. func (a *Agent) ListInstancesHandler(w http.ResponseWriter, _ *http.Request) { - cfgs := a.cm.ListConfigs() + cfgs := a.mm.ListConfigs() instanceNames := make([]string, 0, len(cfgs)) for k := range cfgs { instanceNames = append(instanceNames, k) @@ -40,7 +40,7 @@ func (a *Agent) ListInstancesHandler(w http.ResponseWriter, _ *http.Request) { // ListTargetsHandler retrieves the full set of targets across all instances and shows // information on them. func (a *Agent) ListTargetsHandler(w http.ResponseWriter, _ *http.Request) { - instances := a.cm.ListInstances() + instances := a.mm.ListInstances() resp := ListTargetsResponse{} for instName, inst := range instances { diff --git a/pkg/prom/http_test.go b/pkg/prom/http_test.go index f31e0c69f76b..df1a30e28e9e 100644 --- a/pkg/prom/http_test.go +++ b/pkg/prom/http_test.go @@ -36,8 +36,8 @@ func TestAgent_ListInstancesHandler(t *testing.T) { }) t.Run("non-empty", func(t *testing.T) { - require.NoError(t, a.cm.ApplyConfig(makeInstanceConfig("foo"))) - require.NoError(t, a.cm.ApplyConfig(makeInstanceConfig("bar"))) + require.NoError(t, a.mm.ApplyConfig(makeInstanceConfig("foo"))) + require.NoError(t, a.mm.ApplyConfig(makeInstanceConfig("bar"))) expect := `{"status":"success","data":["bar","foo"]}` test.Poll(t, time.Second, true, func() interface{} { @@ -62,7 +62,8 @@ func TestAgent_ListTargetsHandler(t *testing.T) { DeleteConfigFunc: func(name string) error { return nil }, StopFunc: func() {}, } - a.cm = mockManager + a.mm, err = instance.NewModalManager(prometheus.NewRegistry(), a.logger, mockManager, instance.ModeDistinct) + require.NoError(t, err) r := httptest.NewRequest("GET", "/agent/api/v1/targets", nil) diff --git 
a/pkg/prom/instance/counting_manager.go b/pkg/prom/instance/counting_manager.go deleted file mode 100644 index a90693e8785a..000000000000 --- a/pkg/prom/instance/counting_manager.go +++ /dev/null @@ -1,80 +0,0 @@ -package instance - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -// CountingManager counts how many unique configs pass through a Manager. -// It may be distinct from the set of instances depending on available -// Managers. -// -// CountingManager implements Manager. -type CountingManager struct { - currentActiveConfigs prometheus.Gauge - - cache map[string]struct{} - inner Manager -} - -// NewCountingManager creates a new CountingManager. The Manager provided -// by inner will be wrapped by CountingManager and will handle requests to -// apply configs. -func NewCountingManager(reg prometheus.Registerer, inner Manager) *CountingManager { - currentActiveConfigs := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "agent_prometheus_active_configs", - Help: "Current number of active configs being used by the agent.", - }) - if reg != nil { - reg.MustRegister(currentActiveConfigs) - } - - return &CountingManager{ - currentActiveConfigs: currentActiveConfigs, - cache: make(map[string]struct{}), - inner: inner, - } -} - -// ListInstances implements Manager. -func (cm *CountingManager) ListInstances() map[string]ManagedInstance { - return cm.inner.ListInstances() -} - -// ListConfigs implements Manager. -func (cm *CountingManager) ListConfigs() map[string]Config { - return cm.inner.ListConfigs() -} - -// ApplyConfig implements Manager. -func (cm *CountingManager) ApplyConfig(c Config) error { - err := cm.inner.ApplyConfig(c) - if err != nil { - return err - } - - // If the config isn't in the cache, add it and increment the counter. - if _, ok := cm.cache[c.Name]; !ok { - cm.cache[c.Name] = struct{}{} - cm.currentActiveConfigs.Inc() - } - - return nil -} - -// DeleteConfig implements Manager. 
-func (cm *CountingManager) DeleteConfig(name string) error { - err := cm.inner.DeleteConfig(name) - if err != nil { - return err - } - - // Remove the config from the cache and decrement the counter. - delete(cm.cache, name) - cm.currentActiveConfigs.Dec() - return nil -} - -// Stop implements Manager. -func (cm *CountingManager) Stop() { - cm.inner.Stop() -} diff --git a/pkg/prom/instance/counting_manager_test.go b/pkg/prom/instance/counting_manager_test.go deleted file mode 100644 index 0ee55edd1282..000000000000 --- a/pkg/prom/instance/counting_manager_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package instance - -import ( - "errors" - "fmt" - "strings" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/require" -) - -func TestCountingManager(t *testing.T) { - mockConfigs := make(map[string]Config) - - mock := &MockManager{ - ListInstancesFunc: func() map[string]ManagedInstance { return nil }, - ListConfigsFunc: func() map[string]Config { - return mockConfigs - }, - ApplyConfigFunc: func(c Config) error { - mockConfigs[c.Name] = c - return nil - }, - DeleteConfigFunc: func(name string) error { - if _, ok := mockConfigs[name]; !ok { - return errors.New("config does not exist") - } - delete(mockConfigs, name) - return nil - }, - } - - reg := prometheus.NewRegistry() - cm := NewCountingManager(reg, mock) - - // Pull values from the registry and assert that our gauge has the - // expected value. - requireGaugeValue := func(value int) { - expect := fmt.Sprintf(` - # HELP agent_prometheus_active_configs Current number of active configs being used by the agent. - # TYPE agent_prometheus_active_configs gauge - agent_prometheus_active_configs %d - `, value) - - r := strings.NewReader(expect) - require.NoError(t, testutil.GatherAndCompare(reg, r)) - } - - requireGaugeValue(0) - - // Apply two different configs, but each config twice. 
The gauge should - // only be set to 2. - _ = cm.ApplyConfig(Config{Name: "config-a"}) - _ = cm.ApplyConfig(Config{Name: "config-a"}) - _ = cm.ApplyConfig(Config{Name: "config-b"}) - _ = cm.ApplyConfig(Config{Name: "config-b"}) - requireGaugeValue(2) - - // Deleting a config that doesn't exist shouldn't change the gauge. - _ = cm.DeleteConfig("config-nil") - requireGaugeValue(2) - - // Deleting a config that does exist _should_ change the gauge. - _ = cm.DeleteConfig("config-b") - requireGaugeValue(1) -} diff --git a/pkg/prom/instance/group_manager.go b/pkg/prom/instance/group_manager.go index 1e70ccf4db4c..a6c2c7777ce1 100644 --- a/pkg/prom/instance/group_manager.go +++ b/pkg/prom/instance/group_manager.go @@ -227,7 +227,14 @@ func (m *GroupManager) deleteConfig(name string) error { } // Stop stops the Manager and all of its managed instances. -func (m *GroupManager) Stop() { m.inner.Stop() } +func (m *GroupManager) Stop() { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.inner.Stop() + m.groupLookup = make(map[string]string) + m.groups = make(map[string]groupedConfigs) +} // hashConfig determines the hash of a Config used for grouping. It ignores // the name and scrape_configs and also orders remote_writes by name prior to diff --git a/pkg/prom/instance/manager.go b/pkg/prom/instance/manager.go index 655d8a3699a8..b939960d8a81 100644 --- a/pkg/prom/instance/manager.go +++ b/pkg/prom/instance/manager.go @@ -72,6 +72,7 @@ type BasicManagerConfig struct { // // Other implementations of Manager usually wrap a BasicManager. type BasicManager struct { + cfgMut sync.Mutex cfg BasicManagerConfig logger log.Logger @@ -126,6 +127,13 @@ func NewBasicManager(cfg BasicManagerConfig, logger log.Logger, launch Factory, } } +// UpdateManagerConfig updates the BasicManagerConfig. +func (m *BasicManager) UpdateManagerConfig(c BasicManagerConfig) { + m.cfgMut.Lock() + defer m.cfgMut.Unlock() + m.cfg = c +} + // ListInstances returns the current active instances managed by BasicManager. 
func (m *BasicManager) ListInstances() map[string]ManagedInstance { m.mut.Lock() @@ -249,9 +257,11 @@ func (m *BasicManager) runProcess(ctx context.Context, name string, inst Managed for { err := inst.Run(ctx) if err != nil && err != context.Canceled { + backoff := m.instanceRestartBackoff() + instanceAbnormalExits.WithLabelValues(name).Inc() - level.Error(m.logger).Log("msg", "instance stopped abnormally, restarting after backoff period", "err", err, "backoff", m.cfg.InstanceRestartBackoff, "instance", name) - time.Sleep(m.cfg.InstanceRestartBackoff) + level.Error(m.logger).Log("msg", "instance stopped abnormally, restarting after backoff period", "err", err, "backoff", backoff, "instance", name) + time.Sleep(backoff) } else { level.Info(m.logger).Log("msg", "stopped instance", "instance", name) break @@ -259,6 +269,12 @@ func (m *BasicManager) runProcess(ctx context.Context, name string, inst Managed } } +func (m *BasicManager) instanceRestartBackoff() time.Duration { + m.cfgMut.Lock() + defer m.cfgMut.Unlock() + return m.cfg.InstanceRestartBackoff +} + // DeleteConfig removes a managed instance by its config name. Returns an error // if there is no such managed instance with the given name. func (m *BasicManager) DeleteConfig(name string) error { @@ -280,6 +296,8 @@ func (m *BasicManager) DeleteConfig(name string) error { func (m *BasicManager) Stop() { var wg sync.WaitGroup + // We don't need to change m.processes here; processes remove themselves + // from the map (in spawnProcess). 
m.mut.Lock() wg.Add(len(m.processes)) for _, proc := range m.processes { diff --git a/pkg/prom/instance/modal_manager.go b/pkg/prom/instance/modal_manager.go new file mode 100644 index 000000000000..9483a879d58c --- /dev/null +++ b/pkg/prom/instance/modal_manager.go @@ -0,0 +1,197 @@ +package instance + +import ( + "fmt" + "sync" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Mode controls how instances are created. +type Mode string + +// Types of instance modes +var ( + ModeDistinct Mode = "distinct" + ModeShared Mode = "shared" + + DefaultMode = ModeShared +) + +// UnmarshalYAML unmarshals a string to a Mode. Fails if the string is +// unrecognized. +func (m *Mode) UnmarshalYAML(unmarshal func(interface{}) error) error { + *m = DefaultMode + + var plain string + if err := unmarshal(&plain); err != nil { + return err + } + + switch plain { + case string(ModeDistinct): + *m = ModeDistinct + return nil + case string(ModeShared): + *m = ModeShared + return nil + default: + return fmt.Errorf("unsupported instance_mode '%s'. supported values 'shared', 'distinct'", plain) + } +} + +// ModalManager runs instances by either grouping them or running them fully +// separately. +type ModalManager struct { + mut sync.RWMutex + mode Mode + configs map[string]Config + + currentActiveConfigs prometheus.Gauge + + log log.Logger + + // The ModalManager wraps around a "final" Manager that is intended to + // launch and manage instances based on Configs. This is specified here by the + // "wrapped" Manager. + // + // However, there may be another manager performing transformations on the configs + // before they are passed through to wrapped. This is specified by the "active" + // Manager. + // + // If no transformations on Configs are needed, active will be identical to + // wrapped. 
+ wrapped, active Manager +} + +// NewModalManager creates a new ModalManager. +func NewModalManager(reg prometheus.Registerer, l log.Logger, next Manager, mode Mode) (*ModalManager, error) { + if mode == "" { + mode = DefaultMode + } + + currentActiveConfigs := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "agent_prometheus_active_configs", + Help: "Current number of active configs being used by the agent.", + }) + + mm := ModalManager{ + wrapped: next, + log: l, + currentActiveConfigs: currentActiveConfigs, + configs: make(map[string]Config), + } + if err := mm.SetMode(mode); err != nil { + return nil, err + } + return &mm, nil +} + +// SetMode updates the mode ModalManager is running in. Changing the mode is +// an expensive operation; all underlying configs must be stopped and then +// reapplied. +func (m *ModalManager) SetMode(newMode Mode) error { + m.mut.Lock() + defer m.mut.Unlock() + + var ( + prevMode = m.mode + prevActive = m.active + ) + + if prevMode == newMode { + return nil + } + + // Set the active Manager based on the new mode. "distinct" means no transformations + // need to be applied and we can use the wrapped Manager directly. Otherwise, we need + // to create a new Manager to apply transformations. + switch newMode { + case ModeDistinct: + m.active = m.wrapped + case ModeShared: + m.active = NewGroupManager(m.wrapped) + default: + panic("unknown mode " + m.mode) + } + m.mode = newMode + + // Remove all configs from the previous active Manager. + if prevActive != nil { + prevActive.Stop() + } + + // Re-apply configs to the new active Manager. + var firstError error + for name, cfg := range m.configs { + err := m.active.ApplyConfig(cfg) + if err != nil { + level.Error(m.log).Log("msg", "failed to apply config when changing modes", "name", name, "prev_mode", prevMode, "new_mode", newMode, "err", err) + } + if firstError == nil && err != nil { + firstError = err + } + } + + return firstError +} + +// ListInstances implements Manager. 
+func (m *ModalManager) ListInstances() map[string]ManagedInstance { + m.mut.RLock() + defer m.mut.RUnlock() + return m.active.ListInstances() +} + +// ListConfigs implements Manager. +func (m *ModalManager) ListConfigs() map[string]Config { + m.mut.RLock() + defer m.mut.RUnlock() + return m.active.ListConfigs() +} + +// ApplyConfig implements Manager. +func (m *ModalManager) ApplyConfig(c Config) error { + m.mut.Lock() + defer m.mut.Unlock() + + if err := m.active.ApplyConfig(c); err != nil { + return err + } + + if _, existingConfig := m.configs[c.Name]; !existingConfig { + m.currentActiveConfigs.Inc() + } + m.configs[c.Name] = c + + return nil +} + +// DeleteConfig implements Manager. +func (m *ModalManager) DeleteConfig(name string) error { + m.mut.Lock() + defer m.mut.Unlock() + + if err := m.active.DeleteConfig(name); err != nil { + return err + } + + if _, existingConfig := m.configs[name]; existingConfig { + m.currentActiveConfigs.Dec() + delete(m.configs, name) + } + return nil +} + +// Stop implements Manager. +func (m *ModalManager) Stop() { + m.mut.Lock() + defer m.mut.Unlock() + + m.active.Stop() + m.currentActiveConfigs.Set(0) + m.configs = make(map[string]Config) +}