Skip to content

Commit

Permalink
implement ApplyConfig for pkg/prom (#473)
Browse files Browse the repository at this point in the history
* implement ApplyConfig for pkg/prom

* clean up comments
  • Loading branch information
rfratto authored Mar 17, 2021
1 parent edb137c commit 5937472
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 26 deletions.
147 changes: 125 additions & 22 deletions pkg/prom/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"errors"
"flag"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/google/go-cmp/cmp"
"github.com/grafana/agent/pkg/prom/cluster"
"github.com/grafana/agent/pkg/prom/cluster/client"
"github.com/grafana/agent/pkg/prom/instance"
Expand Down Expand Up @@ -117,6 +119,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
// components of Prometheus. It is broken down into a series of Instances, each
// of which perform metric collection.
type Agent struct {
mut sync.RWMutex
cfg Config
logger log.Logger
reg prometheus.Registerer
Expand All @@ -131,6 +134,10 @@ type Agent struct {
instanceFactory instanceFactory

cluster *cluster.Cluster

stopped bool
stopOnce sync.Once
actor chan func()
}

// New creates and starts a new Agent.
Expand All @@ -140,10 +147,10 @@ func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (*Agent, erro

func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact instanceFactory) (*Agent, error) {
a := &Agent{
cfg: cfg,
logger: log.With(logger, "agent", "prometheus"),
instanceFactory: fact,
reg: reg,
actor: make(chan func(), 1),
}

a.bm = instance.NewBasicManager(instance.BasicManagerConfig{
Expand All @@ -156,37 +163,23 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins
return nil, fmt.Errorf("failed to create modal instance manager: %w", err)
}

// Periodically attempt to clean up WALs from instances that aren't being run by
// this agent anymore.
a.cleaner = NewWALCleaner(
a.logger,
a.mm,
cfg.WALDir,
cfg.WALCleanupAge,
cfg.WALCleanupPeriod,
)

allConfigsValid := true
for _, c := range cfg.Configs {
if err := a.mm.ApplyConfig(c); err != nil {
level.Error(logger).Log("msg", "failed to apply config", "name", c.Name, "err", err)
allConfigsValid = false
}
}
if !allConfigsValid {
return nil, fmt.Errorf("one or more configs was found to be invalid")
}

a.cluster, err = cluster.New(a.logger, reg, cfg.ServiceConfig, a.mm, a.Validate)
if err != nil {
return nil, err
}

if err := a.ApplyConfig(cfg); err != nil {
return nil, err
}
go a.run()
return a, nil
}

// newInstance creates a new Instance given a config.
func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error) {
a.mut.RLock()
defer a.mut.RUnlock()

// Controls the label
instanceLabel := "instance_name"
if a.cfg.InstanceMode == instance.ModeShared {
Expand All @@ -202,13 +195,113 @@ func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error)

// Validate will validate the incoming Config and mutate it to apply defaults.
func (a *Agent) Validate(c *instance.Config) error {
a.mut.RLock()
defer a.mut.RUnlock()

if err := c.ApplyDefaults(&a.cfg.Global, a.cfg.RemoteWrite); err != nil {
return fmt.Errorf("failed to apply defaults to %q: %w", c.Name, err)
}

return nil
}

// ApplyConfig applies config changes to the Agent.
func (a *Agent) ApplyConfig(cfg Config) error {
a.mut.Lock()
defer a.mut.Unlock()

if cmp.Equal(a.cfg, cfg) {
return nil
}

if a.stopped {
return fmt.Errorf("agent stopped")
}

// The ordering here is done to minimze the number of instances that need to
// be restarted. We update components from lowest to highest level:
//
// 1. WAL Cleaner
// 2. Basic manager
// 3. Modal Manager
// 4. Cluster
// 5. Local configs

if a.cleaner != nil {
a.cleaner.Stop()
}
a.cleaner = NewWALCleaner(
a.logger,
a.mm,
cfg.WALDir,
cfg.WALCleanupAge,
cfg.WALCleanupPeriod,
)

a.bm.UpdateManagerConfig(instance.BasicManagerConfig{
InstanceRestartBackoff: cfg.InstanceRestartBackoff,
})

if err := a.mm.SetMode(cfg.InstanceMode); err != nil {
return err
}

if err := a.cluster.ApplyConfig(cfg.ServiceConfig); err != nil {
return fmt.Errorf("failed to apply cluster config: %w", err)
}

// Queue an actor in the background to sync the instances. This is required
// because creating both this function and newInstance grab the mutex.
oldConfig := a.cfg

a.actor <- func() {
a.syncInstances(oldConfig, cfg)
}

a.cfg = cfg
return nil
}

// syncInstances syncs the state of the instance manager to newConfig by
// applying all configs from newConfig and deleting any configs from oldConfig
// that are not in newConfig.
func (a *Agent) syncInstances(oldConfig, newConfig Config) {
a.mut.RLock()
defer a.mut.RUnlock()

// Apply the new configs
for _, c := range newConfig.Configs {
if err := a.mm.ApplyConfig(c); err != nil {
level.Error(a.logger).Log("msg", "failed to apply config", "name", c.Name, "err", err)
}
}

// Remove any configs from oldConfig that aren't in newConfig.
for _, oc := range oldConfig.Configs {
foundConfig := false
for _, nc := range newConfig.Configs {
if nc.Name == oc.Name {
foundConfig = true
break
}
}
if foundConfig {
continue
}

if err := a.mm.DeleteConfig(oc.Name); err != nil {
level.Error(a.logger).Log("msg", "failed to delete old config", "name", oc.Name, "err", err)
}
}
}

// run calls received actor functions in the background.
func (a *Agent) run() {
for f := range a.actor {
f()
}
}

func (a *Agent) WireGRPC(s *grpc.Server) {
a.cluster.WireGRPC(s)
}
Expand All @@ -218,13 +311,23 @@ func (a *Agent) InstanceManager() instance.Manager { return a.mm }

// Stop stops the agent and all its instances.
func (a *Agent) Stop() {
a.mut.Lock()
defer a.mut.Unlock()

// Close the actor channel to stop run.
a.stopOnce.Do(func() {
close(a.actor)
})

a.cluster.Stop()

a.cleaner.Stop()

// Only need to stop the ModalManager, which will passthrough everything to the
// BasicManager.
a.mm.Stop()

a.stopped = true
}

type instanceFactory = func(reg prometheus.Registerer, global config.GlobalConfig, cfg instance.Config, walDir string, logger log.Logger) (instance.ManagedInstance, error)
Expand Down
8 changes: 4 additions & 4 deletions pkg/prom/instance/modal_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ type ModalManager struct {

// NewModalManager creates a new ModalManager.
func NewModalManager(reg prometheus.Registerer, l log.Logger, next Manager, mode Mode) (*ModalManager, error) {
if mode == "" {
mode = DefaultMode
}

currentActiveConfigs := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "agent_prometheus_active_configs",
Help: "Current number of active configs being used by the agent.",
Expand All @@ -94,6 +90,10 @@ func NewModalManager(reg prometheus.Registerer, l log.Logger, next Manager, mode
// an expensive operation; all underlying configs must be stopped and then
// reapplied.
func (m *ModalManager) SetMode(newMode Mode) error {
if newMode == "" {
newMode = DefaultMode
}

m.mut.Lock()
defer m.mut.Unlock()

Expand Down

0 comments on commit 5937472

Please sign in to comment.