Skip to content

Commit

Permalink
pkg/prom/instance: create a ModalManager for ApplyConfig (#427)
Browse files Browse the repository at this point in the history
* pkg/prom/instance: create a ModalManager for ApplyConfig

ApplyConfig for the pkg/prom will need to be able to mutate the instance
grouping mode and other settings. This PR enables that by doing the
following:

1. Create a ModalManager that wraps around a GroupManager or a (new)
   DistinctManager. The ModalManager can change modes at the cost of
   needing to re-apply all instances.

2. Remove CountingManager and migrate its logic to ModalManager

3. Separate out storage of the ModalManager and BasicManager to be able
   to control both independently

* remove DistinctManager, use underlying manager directly

* simplify GroupManager.Stop

* removed unused logger from GroupManager

* document active/wrapped managers a bit better in ModalManager
  • Loading branch information
rfratto authored Feb 26, 2021
1 parent 4273523 commit 28afbaa
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 209 deletions.
70 changes: 21 additions & 49 deletions pkg/prom/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,10 @@ var (
WALCleanupPeriod: DefaultCleanupPeriod,
ServiceConfig: ha.DefaultConfig,
ServiceClientConfig: client.DefaultConfig,
InstanceMode: DefaultInstanceMode,
InstanceMode: instance.DefaultMode,
}
)

// InstanceMode controls how instances are created.
type InstanceMode string

// Types of instance modes
var (
InstanceModeDistinct InstanceMode = "distinct"
InstanceModeShared InstanceMode = "shared"

DefaultInstanceMode = InstanceModeShared
)

// UnmarshalYAML unmarshals a string to an InstanceMode. Fails if the string is
// unrecognized.
func (m *InstanceMode) UnmarshalYAML(unmarshal func(interface{}) error) error {
*m = DefaultInstanceMode

var plain string
if err := unmarshal(&plain); err != nil {
return err
}

switch plain {
case string(InstanceModeDistinct):
*m = InstanceModeDistinct
return nil
case string(InstanceModeShared):
*m = InstanceModeShared
return nil
default:
return fmt.Errorf("unsupported instance_mode '%s'. supported values 'shared', 'distinct'", plain)
}
}

// Config defines the configuration for the entire set of Prometheus client
// instances, along with a global configuration.
type Config struct {
Expand All @@ -78,7 +45,7 @@ type Config struct {
ServiceClientConfig client.Config `yaml:"scraping_service_client"`
Configs []instance.Config `yaml:"configs,omitempty"`
InstanceRestartBackoff time.Duration `yaml:"instance_restart_backoff,omitempty"`
InstanceMode InstanceMode `yaml:"instance_mode"`
InstanceMode instance.Mode `yaml:"instance_mode"`
}

// UnmarshalYAML implements yaml.Unmarshaler.
Expand Down Expand Up @@ -147,7 +114,11 @@ type Agent struct {
logger log.Logger
reg prometheus.Registerer

cm instance.Manager
// Store both the basic manager and the modal manager so we can update their
// settings independently. Only the ModalManager should be used for mutating
// configs.
bm *instance.BasicManager
mm *instance.ModalManager
cleaner *WALCleaner

instanceFactory instanceFactory
Expand All @@ -168,31 +139,29 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins
reg: reg,
}

a.cm = instance.NewBasicManager(instance.BasicManagerConfig{
a.bm = instance.NewBasicManager(instance.BasicManagerConfig{
InstanceRestartBackoff: cfg.InstanceRestartBackoff,
}, a.logger, a.newInstance, a.validateInstance)

if cfg.InstanceMode == InstanceModeShared {
a.cm = instance.NewGroupManager(a.cm)
var err error
a.mm, err = instance.NewModalManager(a.reg, a.logger, a.bm, cfg.InstanceMode)
if err != nil {
return nil, fmt.Errorf("failed to create modal instance manager: %w", err)
}

// Regardless of the instance mode, wrap the manager in a CountingManager so we can
// collect metrics on the number of active configs.
a.cm = instance.NewCountingManager(reg, a.cm)

// Periodically attempt to clean up WALs from instances that aren't being run by
// this agent anymore.
a.cleaner = NewWALCleaner(
a.logger,
a.cm,
a.mm,
cfg.WALDir,
cfg.WALCleanupAge,
cfg.WALCleanupPeriod,
)

allConfigsValid := true
for _, c := range cfg.Configs {
if err := a.cm.ApplyConfig(c); err != nil {
if err := a.mm.ApplyConfig(c); err != nil {
level.Error(logger).Log("msg", "failed to apply config", "name", c.Name, "err", err)
allConfigsValid = false
}
Expand All @@ -203,7 +172,7 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins

if cfg.ServiceConfig.Enabled {
var err error
a.ha, err = ha.New(reg, cfg.ServiceConfig, &cfg.Global, cfg.ServiceClientConfig, a.logger, a.cm)
a.ha, err = ha.New(reg, cfg.ServiceConfig, &cfg.Global, cfg.ServiceClientConfig, a.logger, a.mm)
if err != nil {
return nil, err
}
Expand All @@ -216,7 +185,7 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins
func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error) {
// Pick the label name: "instance_name" by default, "instance_group_name" in shared mode.
instanceLabel := "instance_name"
if a.cfg.InstanceMode == InstanceModeShared {
if a.cfg.InstanceMode == instance.ModeShared {
instanceLabel = "instance_group_name"
}

Expand All @@ -238,7 +207,7 @@ func (a *Agent) WireGRPC(s *grpc.Server) {
}

func (a *Agent) Config() Config { return a.cfg }
func (a *Agent) InstanceManager() instance.Manager { return a.cm }
func (a *Agent) InstanceManager() instance.Manager { return a.mm }

// Stop stops the agent and all its instances.
func (a *Agent) Stop() {
Expand All @@ -248,7 +217,10 @@ func (a *Agent) Stop() {
}
}
a.cleaner.Stop()
a.cm.Stop()

// Only need to stop the ModalManager, which will pass everything through to the
// BasicManager.
a.mm.Stop()
}

type instanceFactory = func(reg prometheus.Registerer, global config.GlobalConfig, cfg instance.Config, walDir string, logger log.Logger) (instance.ManagedInstance, error)
Expand Down
11 changes: 7 additions & 4 deletions pkg/prom/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestConfig_Validate(t *testing.T) {
Configs: []instance.Config{
makeInstanceConfig("instance"),
},
InstanceMode: DefaultInstanceMode,
InstanceMode: instance.DefaultMode,
}

tt := []struct {
Expand Down Expand Up @@ -121,6 +121,7 @@ func TestAgent(t *testing.T) {
makeInstanceConfig("instance_b"),
},
InstanceRestartBackoff: time.Duration(0),
InstanceMode: instance.ModeDistinct,
}

fact := newFakeInstanceFactory()
Expand All @@ -132,7 +133,7 @@ func TestAgent(t *testing.T) {
if fact.created == nil {
return false
}
return fact.created.Load() == 2 && len(a.cm.ListInstances()) == 2
return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2
})

t.Run("instances should be running", func(t *testing.T) {
Expand Down Expand Up @@ -179,6 +180,7 @@ func TestAgent_NormalInstanceExits(t *testing.T) {
makeInstanceConfig("instance_b"),
},
InstanceRestartBackoff: time.Duration(0),
InstanceMode: instance.ModeDistinct,
}

for _, tc := range tt {
Expand All @@ -192,7 +194,7 @@ func TestAgent_NormalInstanceExits(t *testing.T) {
if fact.created == nil {
return false
}
return fact.created.Load() == 2 && len(a.cm.ListInstances()) == 2
return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2
})

for _, mi := range fact.Mocks() {
Expand Down Expand Up @@ -224,6 +226,7 @@ func TestAgent_Stop(t *testing.T) {
makeInstanceConfig("instance_b"),
},
InstanceRestartBackoff: time.Duration(0),
InstanceMode: instance.ModeDistinct,
}

fact := newFakeInstanceFactory()
Expand All @@ -235,7 +238,7 @@ func TestAgent_Stop(t *testing.T) {
if fact.created == nil {
return false
}
return fact.created.Load() == 2 && len(a.cm.ListInstances()) == 2
return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2
})

a.Stop()
Expand Down
4 changes: 2 additions & 2 deletions pkg/prom/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (a *Agent) WireAPI(r *mux.Router) {

// ListInstancesHandler writes the set of currently running instances to the http.ResponseWriter.
func (a *Agent) ListInstancesHandler(w http.ResponseWriter, _ *http.Request) {
cfgs := a.cm.ListConfigs()
cfgs := a.mm.ListConfigs()
instanceNames := make([]string, 0, len(cfgs))
for k := range cfgs {
instanceNames = append(instanceNames, k)
Expand All @@ -40,7 +40,7 @@ func (a *Agent) ListInstancesHandler(w http.ResponseWriter, _ *http.Request) {
// ListTargetsHandler retrieves the full set of targets across all instances and shows
// information on them.
func (a *Agent) ListTargetsHandler(w http.ResponseWriter, _ *http.Request) {
instances := a.cm.ListInstances()
instances := a.mm.ListInstances()
resp := ListTargetsResponse{}

for instName, inst := range instances {
Expand Down
7 changes: 4 additions & 3 deletions pkg/prom/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func TestAgent_ListInstancesHandler(t *testing.T) {
})

t.Run("non-empty", func(t *testing.T) {
require.NoError(t, a.cm.ApplyConfig(makeInstanceConfig("foo")))
require.NoError(t, a.cm.ApplyConfig(makeInstanceConfig("bar")))
require.NoError(t, a.mm.ApplyConfig(makeInstanceConfig("foo")))
require.NoError(t, a.mm.ApplyConfig(makeInstanceConfig("bar")))

expect := `{"status":"success","data":["bar","foo"]}`
test.Poll(t, time.Second, true, func() interface{} {
Expand All @@ -62,7 +62,8 @@ func TestAgent_ListTargetsHandler(t *testing.T) {
DeleteConfigFunc: func(name string) error { return nil },
StopFunc: func() {},
}
a.cm = mockManager
a.mm, err = instance.NewModalManager(prometheus.NewRegistry(), a.logger, mockManager, instance.ModeDistinct)
require.NoError(t, err)

r := httptest.NewRequest("GET", "/agent/api/v1/targets", nil)

Expand Down
80 changes: 0 additions & 80 deletions pkg/prom/instance/counting_manager.go

This file was deleted.

68 changes: 0 additions & 68 deletions pkg/prom/instance/counting_manager_test.go

This file was deleted.

Loading

0 comments on commit 28afbaa

Please sign in to comment.