diff --git a/cmd/agent/main.go b/cmd/agent/main.go index a60736923851..ec76e0ee7a97 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -9,6 +9,7 @@ import ( // Adds version information _ "github.com/grafana/agent/pkg/build" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/loki" "github.com/grafana/agent/pkg/tempo" @@ -23,6 +24,9 @@ import ( // Register Prometheus SD components _ "github.com/prometheus/prometheus/discovery/install" + + // Register integrations + _ "github.com/grafana/agent/pkg/integrations/install" ) func init() { diff --git a/go.mod b/go.mod index 4bbf85bea974..8811f96c23f5 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( google.golang.org/grpc v1.33.2 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 ) // Needed for Cortex's dependencies to work properly. diff --git a/pkg/config/config.go b/pkg/config/config.go index 460b9ad6e2d6..d29ac3088762 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -19,11 +19,11 @@ import ( // Config contains underlying configurations for the agent type Config struct { - Server server.Config `yaml:"server"` - Prometheus prom.Config `yaml:"prometheus,omitempty"` - Loki loki.Config `yaml:"loki,omitempty"` - Integrations integrations.Config `yaml:"integrations"` - Tempo tempo.Config `yaml:"tempo,omitempty"` + Server server.Config `yaml:"server"` + Prometheus prom.Config `yaml:"prometheus,omitempty"` + Loki loki.Config `yaml:"loki,omitempty"` + Integrations integrations.ManagerConfig `yaml:"integrations"` + Tempo tempo.Config `yaml:"tempo,omitempty"` } // ApplyDefaults sets default values in the config diff --git a/pkg/integrations/agent/agent.go b/pkg/integrations/agent/agent.go index c893e23f7abe..a2621a107f96 100644 --- a/pkg/integrations/agent/agent.go +++ b/pkg/integrations/agent/agent.go @@ -6,35 +6,44 @@ package agent import ( "context" + "github.com/go-kit/kit/log" "github.com/gorilla/mux" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" "github.com/prometheus/client_golang/prometheus/promhttp" ) // Config controls the Agent integration. type Config struct { - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` +} + +func (c *Config) Name() string { + return "agent" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} - // Enabled enables the Agent integration. - Enabled bool +func (c *Config) NewIntegration(_ log.Logger) (integrations.Integration, error) { + return New(c), nil +} + +func init() { + integrations.RegisterIntegration(&Config{}) } // Integration is the Agent integration. The Agent integration scrapes the // Agent's own metrics. type Integration struct { - c Config + c *Config } -func New(c Config) *Integration { +func New(c *Config) *Integration { return &Integration{c: c} } -// CommonConfig satisfies Integration.CommonConfig. -func (i *Integration) CommonConfig() config.Common { return i.c.CommonConfig } - -// Name satisfies Integration.Name. -func (i *Integration) Name() string { return "agent" } - // RegisterRoutes satisfies Integration.RegisterRoutes. func (i *Integration) RegisterRoutes(r *mux.Router) error { // Note that if the weaveworks common server is set to not register @@ -48,7 +57,7 @@ func (i *Integration) RegisterRoutes(r *mux.Router) error { // ScrapeConfigs satisfies Integration.ScrapeConfigs. func (i *Integration) ScrapeConfigs() []config.ScrapeConfig { return []config.ScrapeConfig{{ - JobName: i.Name(), + JobName: i.c.Name(), MetricsPath: "/metrics", }} } diff --git a/pkg/integrations/common/collector_integration.go b/pkg/integrations/collector_integration.go similarity index 79% rename from pkg/integrations/common/collector_integration.go rename to pkg/integrations/collector_integration.go index a5620ad0f7b4..0ecb2d7fb463 100644 --- a/pkg/integrations/common/collector_integration.go +++ b/pkg/integrations/collector_integration.go @@ -1,6 +1,4 @@ -// Package common implements a bare-bones Integration that can be used by -// exporters that have no logic associated with them. -package common +package integrations import ( "context" @@ -18,28 +16,20 @@ import ( // collector. type CollectorIntegration struct { name string - cfg config.Common c prometheus.Collector includeExporterMetrics bool } // NewCollectorIntegration creates a basic integration that exposes metrics // from a prometheus.Collector. -func NewCollectorIntegration(name string, cfg config.Common, c prometheus.Collector, includeExporterMetrics bool) *CollectorIntegration { +func NewCollectorIntegration(name string, c prometheus.Collector, includeExporterMetrics bool) *CollectorIntegration { return &CollectorIntegration{ name: name, - cfg: cfg, c: c, includeExporterMetrics: includeExporterMetrics, } } -// CommonConfig satisfies Integration.CommonConfig. -func (i *CollectorIntegration) CommonConfig() config.Common { return i.cfg } - -// Name satisfies Integration.Name. -func (i *CollectorIntegration) Name() string { return i.name } - // RegisterRoutes satisfies Integration.RegisterRoutes. The mux.Router provided // here is expected to be a subrouter, where all registered paths will be // registered within that subroute. @@ -84,7 +74,7 @@ func (i *CollectorIntegration) handler() (http.Handler, error) { // ScrapeConfigs satisfies Integration.ScrapeConfigs. func (i *CollectorIntegration) ScrapeConfigs() []config.ScrapeConfig { return []config.ScrapeConfig{{ - JobName: i.Name(), + JobName: i.name, MetricsPath: "/metrics", }} } diff --git a/pkg/integrations/common/integration.go b/pkg/integrations/common/integration.go deleted file mode 100644 index c8cd3f468a0e..000000000000 --- a/pkg/integrations/common/integration.go +++ /dev/null @@ -1,36 +0,0 @@ -package common - -import ( - "context" - - "github.com/gorilla/mux" - "github.com/grafana/agent/pkg/integrations/config" -) - -type Integration interface { - // Name returns the name of the integration. Each registered integration must - // have a unique name. - Name() string - - // CommonConfig returns the set of common configuration values present across - // all integrations. - CommonConfig() config.Common - - // RegisterRoutes should register any HTTP handlers used for the integration. - // - // The router provided to RegisterRoutes is a subrouter for the path - // /integrations/. All routes should register to the - // relative root path and will be automatically combined to the subroute. For - // example, if a metric "database" registers a /metrics endpoint, it will - // be exposed as /integrations/database/metrics. - RegisterRoutes(r *mux.Router) error - - // ScrapeConfigs should return a set of integration scrape configs that inform - // the integration how samples should be collected. - ScrapeConfigs() []config.ScrapeConfig - - // Run should start the integration and do any required tasks. Run should *not* - // exit until context is canceled. If an integration doesn't need to do anything, - // it should simply wait for ctx to be canceled. - Run(ctx context.Context) error -} diff --git a/pkg/integrations/config/config.go b/pkg/integrations/config/config.go index b02c3de6142e..9f14dfd9ce4a 100644 --- a/pkg/integrations/config/config.go +++ b/pkg/integrations/config/config.go @@ -11,10 +11,11 @@ import ( // Common is a set of common options shared by all integrations. It should be // utilised by an integration's config by inlining the common options: // -// type IntegrationConfig struct { -// Common config.Common `yaml:",inline"` -// } +// type IntegrationConfig struct { +// Common config.Common `yaml:",inline"` +// } type Common struct { + Enabled bool `yaml:"enabled"` ScrapeIntegration *bool `yaml:"scrape_integration"` ScrapeInterval time.Duration `yaml:"scrape_interval"` ScrapeTimeout time.Duration `yaml:"scrape_timeout"` diff --git a/pkg/integrations/consul_exporter/consul_exporter.go b/pkg/integrations/consul_exporter/consul_exporter.go index e761acb34a4e..d9f43bda2c46 100644 --- a/pkg/integrations/consul_exporter/consul_exporter.go +++ b/pkg/integrations/consul_exporter/consul_exporter.go @@ -5,7 +5,7 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/grafana/agent/pkg/integrations/common" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" consul_api "github.com/hashicorp/consul/api" "github.com/prometheus/consul_exporter/pkg/exporter" @@ -21,10 +21,7 @@ var DefaultConfig = Config{ // Config controls the consul_exporter integration. type Config struct { - // Enabled enables the integration. - Enabled bool `yaml:"enabled"` - - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` Server string `yaml:"server"` CAFile string `yaml:"ca_file"` @@ -50,9 +47,25 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func (c *Config) Name() string { + return "consul_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} + // New creates a new consul_exporter integration. The integration scrapes // metrics from a consul process. -func New(log log.Logger, c Config) (common.Integration, error) { +func New(log log.Logger, c *Config) (integrations.Integration, error) { var ( consulOpts = exporter.ConsulOpts{ CAFile: c.CAFile, @@ -75,10 +88,5 @@ func New(log log.Logger, c Config) (common.Integration, error) { return nil, err } - return common.NewCollectorIntegration( - "consul_exporter", - c.CommonConfig, - e, - false, - ), nil + return integrations.NewCollectorIntegration(c.Name(), e, false), nil } diff --git a/pkg/integrations/dnsmasq_exporter/dnsmasq_exporter.go b/pkg/integrations/dnsmasq_exporter/dnsmasq_exporter.go index 383bb3d43c66..15ff18b9e829 100644 --- a/pkg/integrations/dnsmasq_exporter/dnsmasq_exporter.go +++ b/pkg/integrations/dnsmasq_exporter/dnsmasq_exporter.go @@ -4,7 +4,7 @@ package dnsmasq_exporter //nolint:golint import ( "github.com/go-kit/kit/log" "github.com/google/dnsmasq_exporter/collector" - "github.com/grafana/agent/pkg/integrations/common" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" "github.com/miekg/dns" ) @@ -17,10 +17,7 @@ var DefaultConfig Config = Config{ // Config controls the dnsmasq_exporter integration. type Config struct { - // Enabled enables the integration. - Enabled bool `yaml:"enabled"` - - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` // DnsmasqAddress is the address of the dnsmasq server (host:port). DnsmasqAddress string `yaml:"dnsmasq_address"` @@ -29,6 +26,18 @@ type Config struct { LeasesPath string `yaml:"leases_path"` } +func (c *Config) Name() string { + return "dnsmasq_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + // UnmarshalYAML implements yaml.Unmarshaler for Config. func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { *c = DefaultConfig @@ -37,17 +46,16 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func init() { + integrations.RegisterIntegration(&Config{}) +} + // New creates a new dnsmasq_exporter integration. The integration scrapes metrics // from a dnsmasq server. -func New(log log.Logger, c Config) (common.Integration, error) { +func New(log log.Logger, c *Config) (integrations.Integration, error) { exporter := collector.New(&dns.Client{ SingleInflight: true, }, c.DnsmasqAddress, c.LeasesPath) - return common.NewCollectorIntegration( - "dnsmasq_exporter", - c.CommonConfig, - exporter, - false, - ), nil + return integrations.NewCollectorIntegration(c.Name(), exporter, false), nil } diff --git a/pkg/integrations/install/install.go b/pkg/integrations/install/install.go new file mode 100644 index 000000000000..b0829c947431 --- /dev/null +++ b/pkg/integrations/install/install.go @@ -0,0 +1,15 @@ +// Package install registers all in-source integrations for use. +package install + +import ( + _ "github.com/grafana/agent/pkg/integrations/agent" // register agent + _ "github.com/grafana/agent/pkg/integrations/consul_exporter" // register consul_exporter + _ "github.com/grafana/agent/pkg/integrations/dnsmasq_exporter" // register dnsmasq_exporter + _ "github.com/grafana/agent/pkg/integrations/memcached_exporter" // register memcached_exporter + _ "github.com/grafana/agent/pkg/integrations/mysqld_exporter" // register mysqld_exporter + _ "github.com/grafana/agent/pkg/integrations/node_exporter" // register node_exporter + _ "github.com/grafana/agent/pkg/integrations/postgres_exporter" // register postgres_exporter + _ "github.com/grafana/agent/pkg/integrations/process_exporter" // register process_exporter + _ "github.com/grafana/agent/pkg/integrations/redis_exporter" // register redis_exporter + _ "github.com/grafana/agent/pkg/integrations/statsd_exporter" // register statsd_exporter +) diff --git a/pkg/integrations/integration.go b/pkg/integrations/integration.go new file mode 100644 index 000000000000..c9bf3e81fcd5 --- /dev/null +++ b/pkg/integrations/integration.go @@ -0,0 +1,43 @@ +package integrations + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/gorilla/mux" + "github.com/grafana/agent/pkg/integrations/config" +) + +// Config provides the configuration and constructor for an integration. +type Config interface { + // Name returns the name of the integration and the key that will be used to + // pull the configuration from the Agent config YAML. + Name() string + + // CommonConfig returns the set of common configuration values present across + // all integrations. + CommonConfig() config.Common + + // NewIntegration returns an integration for the given with the given logger. + NewIntegration(l log.Logger) (Integration, error) +} + +// An Integration is a process that integrates with some external system and +// pulls telemetry data. +type Integration interface { + // RegisterRoutes should register any HTTP handlers needed for the + // integrations. The mux router provided will be a subrouter for the path + // /integrations/, where the integration name is retrieved + // by the config that created this integration. + RegisterRoutes(r *mux.Router) error + + // ScrapeConfigs returns a set of scrape configs that determine where metrics + // can be scraped. + ScrapeConfigs() []config.ScrapeConfig + + // Run should start the integration and do any required tasks, if necessary. + // For example, an Integration that requires a persistent connection to a + // database would establish that connection here. If the integration doesn't + // need to do anything, it should wait for the ctx to be canceled. + Run(ctx context.Context) error +} diff --git a/pkg/integrations/manager.go b/pkg/integrations/manager.go index 357da18d0203..7d4cf3c50abe 100644 --- a/pkg/integrations/manager.go +++ b/pkg/integrations/manager.go @@ -10,17 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gorilla/mux" - "github.com/grafana/agent/pkg/integrations/agent" - "github.com/grafana/agent/pkg/integrations/common" - "github.com/grafana/agent/pkg/integrations/consul_exporter" - "github.com/grafana/agent/pkg/integrations/dnsmasq_exporter" - "github.com/grafana/agent/pkg/integrations/memcached_exporter" - "github.com/grafana/agent/pkg/integrations/mysqld_exporter" - "github.com/grafana/agent/pkg/integrations/node_exporter" - "github.com/grafana/agent/pkg/integrations/postgres_exporter" - "github.com/grafana/agent/pkg/integrations/process_exporter" - "github.com/grafana/agent/pkg/integrations/redis_exporter" - "github.com/grafana/agent/pkg/integrations/statsd_exporter" "github.com/grafana/agent/pkg/prom/instance" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -38,7 +27,7 @@ var ( ) var ( - DefaultConfig = Config{ + DefaultManagerConfig = ManagerConfig{ ScrapeIntegrations: true, IntegrationRestartBackoff: 5 * time.Second, UseHostnameLabel: true, @@ -46,33 +35,26 @@ var ( } ) -// Config holds the configuration for all integrations. -type Config struct { +// ManagerConfig holds the configuration for all integrations. +type ManagerConfig struct { // Whether the Integration subsystem should be enabled. Enabled bool `yaml:"-"` // When true, scrapes metrics from integrations. - ScrapeIntegrations bool `yaml:"scrape_integrations"` + ScrapeIntegrations bool `yaml:"scrape_integrations,omitempty"` // When true, replaces the instance label with the agent hostname. - ReplaceInstanceLabel bool `yaml:"replace_instance_label"` + ReplaceInstanceLabel bool `yaml:"replace_instance_label,omitempty"` // DEPRECATED. When true, adds an agent_hostname label to all samples from integrations. // ReplaceInstanceLabel should be used instead. - UseHostnameLabel bool `yaml:"use_hostname_label"` - - Agent agent.Config `yaml:"agent"` - NodeExporter node_exporter.Config `yaml:"node_exporter"` - ProcessExporter process_exporter.Config `yaml:"process_exporter"` - MysqldExporter mysqld_exporter.Config `yaml:"mysqld_exporter"` - RedisExporter redis_exporter.Config `yaml:"redis_exporter"` - DnsmasqExporter dnsmasq_exporter.Config `yaml:"dnsmasq_exporter"` - MemcachedExporter memcached_exporter.Config `yaml:"memcached_exporter"` - PostgresExporter postgres_exporter.Config `yaml:"postgres_exporter"` - StatsdExporter statsd_exporter.Config `yaml:"statsd_exporter"` - ConsulExporter consul_exporter.Config `yaml:"consul_exporter"` + UseHostnameLabel bool `yaml:"use_hostname_label,omitempty"` + + // The integration configs is merged with the manager config struct so we + // don't want to export it here; we'll manually unmarshal it in UnmarshalYAML. + Integrations Configs `yaml:"-"` // Extra labels to add for all integration samples - Labels model.LabelSet `yaml:"labels"` + Labels model.LabelSet `yaml:"labels,omitempty"` // Prometheus RW configs to use for all integrations. PrometheusRemoteWrite []*config.RemoteWriteConfig `yaml:"prometheus_remote_write,omitempty"` @@ -84,21 +66,25 @@ type Config struct { ListenPort *int `yaml:"-"` } -// UnmarshalYAML implements yaml.Unmarshaler for Config. -func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { - *c = DefaultConfig +// MarshalYAML implements yaml.Marshaler for ManagerConfig. +func (c ManagerConfig) MarshalYAML() (interface{}, error) { + return MarshalYAML(c) +} + +// UnmarshalYAML implements yaml.Unmarshaler for ManagerConfig. +func (c *ManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultManagerConfig - // If the Config is unmarshaled, it's present in the config and should be + // If the ManagerConfig is unmarshaled, it's present in the config and should be // enabled. c.Enabled = true - type plain Config - return unmarshal((*plain)(c)) + return UnmarshalYAML(c, unmarshal) } // DefaultRelabelConfigs returns the set of relabel configs that should be // prepended to all RelabelConfigs for an integration. -func (c *Config) DefaultRelabelConfigs() ([]*relabel.Config, error) { +func (c *ManagerConfig) DefaultRelabelConfigs() ([]*relabel.Config, error) { var cfgs []*relabel.Config if c.ReplaceInstanceLabel { @@ -124,9 +110,9 @@ func (c *Config) DefaultRelabelConfigs() ([]*relabel.Config, error) { // Manager manages a set of integrations and runs them. type Manager struct { - c Config + c ManagerConfig logger log.Logger - integrations []common.Integration + integrations map[Config]Integration hostname string defaultRelabelConfigs []*relabel.Config @@ -138,89 +124,24 @@ type Manager struct { // NewManager creates a new integrations manager. NewManager must be given an // InstanceManager which is responsible for accepting instance configs to // scrape and send metrics from running integrations. -func NewManager(c Config, logger log.Logger, im instance.Manager) (*Manager, error) { - var integrations []common.Integration - - if c.Agent.Enabled { - integrations = append(integrations, agent.New(c.Agent)) - } - if c.NodeExporter.Enabled { - l := log.With(logger, "integration", "node_exporter") - i, err := node_exporter.New(l, c.NodeExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.ProcessExporter.Enabled { - l := log.With(logger, "integration", "process_exporter") - i, err := process_exporter.New(l, c.ProcessExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.MysqldExporter.Enabled { - l := log.With(logger, "integration", "mysqld_exporter") - i, err := mysqld_exporter.New(l, c.MysqldExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.RedisExporter.Enabled { - l := log.With(logger, "integration", "redis_exporter") - i, err := redis_exporter.New(l, c.RedisExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.DnsmasqExporter.Enabled { - l := log.With(logger, "integration", "dnsmasq_exporter") - i, err := dnsmasq_exporter.New(l, c.DnsmasqExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.MemcachedExporter.Enabled { - l := log.With(logger, "integration", "memcached_exporter") - i, err := memcached_exporter.New(l, c.MemcachedExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.PostgresExporter.Enabled { - l := log.With(logger, "integration", "postgres_exporter") - i, err := postgres_exporter.New(l, c.PostgresExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.StatsdExporter.Enabled { - l := log.With(logger, "integration", "statsd_exporter") - i, err := statsd_exporter.New(l, c.StatsdExporter) - if err != nil { - return nil, err - } - integrations = append(integrations, i) - } - if c.ConsulExporter.Enabled { - l := log.With(logger, "integration", "consul_exporter") - i, err := consul_exporter.New(l, c.ConsulExporter) - if err != nil { - return nil, err +func NewManager(c ManagerConfig, logger log.Logger, im instance.Manager) (*Manager, error) { + integrations := make(map[Config]Integration) + + for _, integrationCfg := range c.Integrations { + if integrationCfg.CommonConfig().Enabled { + l := log.With(logger, "integration", integrationCfg.Name()) + i, err := integrationCfg.NewIntegration(l) + if err != nil { + return nil, err + } + integrations[integrationCfg] = i } - integrations = append(integrations, i) } return newManager(c, logger, im, integrations) } -func newManager(c Config, logger log.Logger, im instance.Manager, integrations []common.Integration) (*Manager, error) { +func newManager(c ManagerConfig, logger log.Logger, im instance.Manager, integrations map[Config]Integration) (*Manager, error) { defaultRelabels, err := c.DefaultRelabelConfigs() if err != nil { return nil, fmt.Errorf("cannot get default relabel configs: %w", err) @@ -254,34 +175,34 @@ func (m *Manager) run(ctx context.Context) { var wg sync.WaitGroup wg.Add(len(m.integrations)) - for _, i := range m.integrations { - go func(i common.Integration) { - m.runIntegration(ctx, i) + for cfg, i := range m.integrations { + go func(cfg Config, i Integration) { + m.runIntegration(ctx, cfg, i) wg.Done() - }(i) + }(cfg, i) } wg.Wait() close(m.done) } -func (m *Manager) runIntegration(ctx context.Context, i common.Integration) { +func (m *Manager) runIntegration(ctx context.Context, cfg Config, i Integration) { defer func() { if r := recover(); r != nil { err := fmt.Errorf("%v", r) - level.Error(m.logger).Log("msg", "integration has panicked. THIS IS A BUG!", "err", err, "integration", i.Name()) + level.Error(m.logger).Log("msg", "integration has panicked. THIS IS A BUG!", "err", err, "integration", cfg.Name()) } }() shouldCollect := m.c.ScrapeIntegrations - if common := i.CommonConfig(); common.ScrapeIntegration != nil { + if common := cfg.CommonConfig(); common.ScrapeIntegration != nil { shouldCollect = *common.ScrapeIntegration } if shouldCollect { // Apply the config so an instance is launched to scrape our integration. - instanceConfig := m.instanceConfigForIntegration(i) + instanceConfig := m.instanceConfigForIntegration(cfg, i) if err := m.im.ApplyConfig(instanceConfig); err != nil { - level.Error(m.logger).Log("msg", "failed to apply integration. integration will not run. THIS IS A BUG!", "err", err, "integration", i.Name()) + level.Error(m.logger).Log("msg", "failed to apply integration. integration will not run. THIS IS A BUG!", "err", err, "integration", cfg.Name()) return } } @@ -289,27 +210,27 @@ func (m *Manager) runIntegration(ctx context.Context, i common.Integration) { for { err := i.Run(ctx) if err != nil && err != context.Canceled { - integrationAbnormalExits.WithLabelValues(i.Name()).Inc() - level.Error(m.logger).Log("msg", "integration stopped abnormally, restarting after backoff", "err", err, "integration", i.Name(), "backoff", m.c.IntegrationRestartBackoff) + integrationAbnormalExits.WithLabelValues(cfg.Name()).Inc() + level.Error(m.logger).Log("msg", "integration stopped abnormally, restarting after backoff", "err", err, "integration", cfg.Name(), "backoff", m.c.IntegrationRestartBackoff) time.Sleep(m.c.IntegrationRestartBackoff) } else { - level.Info(m.logger).Log("msg", "stopped integration", "integration", i.Name()) + level.Info(m.logger).Log("msg", "stopped integration", "integration", cfg.Name()) break } } } -func (m *Manager) instanceConfigForIntegration(i common.Integration) instance.Config { - prometheusName := fmt.Sprintf("integration/%s", i.Name()) +func (m *Manager) instanceConfigForIntegration(cfg Config, i Integration) instance.Config { + prometheusName := fmt.Sprintf("integration/%s", cfg.Name()) - common := i.CommonConfig() + common := cfg.CommonConfig() relabelConfigs := append(m.defaultRelabelConfigs, common.RelabelConfigs...) var scrapeConfigs []*config.ScrapeConfig - for _, cfg := range i.ScrapeConfigs() { + for _, isc := range i.ScrapeConfigs() { sc := &config.ScrapeConfig{ - JobName: fmt.Sprintf("integrations/%s", cfg.JobName), - MetricsPath: path.Join("/integrations", i.Name(), cfg.MetricsPath), + JobName: fmt.Sprintf("integrations/%s", isc.JobName), + MetricsPath: path.Join("/integrations", cfg.Name(), isc.MetricsPath), Scheme: "http", HonorLabels: false, HonorTimestamps: true, @@ -350,8 +271,8 @@ func (m *Manager) scrapeServiceDiscovery() discovery.Configs { } func (m *Manager) WireAPI(r *mux.Router) error { - for _, i := range m.integrations { - integrationsRoot := fmt.Sprintf("/integrations/%s", i.Name()) + for c, i := range m.integrations { + integrationsRoot := fmt.Sprintf("/integrations/%s", c.Name()) subRouter := r.PathPrefix(integrationsRoot).Subrouter() err := i.RegisterRoutes(subRouter) diff --git a/pkg/integrations/manager_test.go b/pkg/integrations/manager_test.go index 8f6baba6fdfb..9e1d7ae7e7d7 100644 --- a/pkg/integrations/manager_test.go +++ b/pkg/integrations/manager_test.go @@ -9,7 +9,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/test" "github.com/go-kit/kit/log" "github.com/gorilla/mux" - "github.com/grafana/agent/pkg/integrations/common" "github.com/grafana/agent/pkg/integrations/config" "github.com/grafana/agent/pkg/prom/instance" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -21,6 +20,36 @@ import ( "gopkg.in/yaml.v2" ) +// Test that embedded integration fields in the struct can be unmarshaled and +// remarshaled back out to text. +func TestConfig_Remarshal(t *testing.T) { + RegisterIntegration(&testIntegrationA{}) + + cfgText := ` +scrape_integrations: true +replace_instance_label: true +integration_restart_backoff: 5s +use_hostname_label: true +test: + text: Hello, world! + truth: true +` + var ( + cfg ManagerConfig + listenPort int = 12345 + ) + require.NoError(t, yaml.Unmarshal([]byte(cfgText), &cfg)) + + // Listen port must be set before applying defaults. Normally applied by the + // config package. + cfg.ListenPort = &listenPort + + outBytes, err := yaml.Marshal(cfg) + require.NoError(t, err) + fmt.Println(string(outBytes)) + require.YAMLEq(t, cfgText, string(outBytes)) +} + func TestConfig_AddressRelabels(t *testing.T) { cfgText := ` agent: @@ -28,7 +57,7 @@ agent: ` var ( - cfg Config + cfg ManagerConfig listenPort int = 12345 ) require.NoError(t, yaml.Unmarshal([]byte(cfgText), &cfg)) @@ -52,8 +81,9 @@ agent: // applied to the instance manager are valid. func TestManager_ValidInstanceConfigs(t *testing.T) { mock := newMockIntegration() + icfg := mockConfig{integration: mock} - integrations := []common.Integration{mock} + integrations := map[Config]Integration{icfg: mock} im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, func(c *instance.Config) error { globalConfig := prom_config.DefaultConfig.GlobalConfig return c.ApplyDefaults(&globalConfig) @@ -70,16 +100,17 @@ func TestManager_ValidInstanceConfigs(t *testing.T) { func TestManager_instanceConfigForIntegration(t *testing.T) { mock := newMockIntegration() + icfg := mockConfig{integration: mock} im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, func(c *instance.Config) error { globalConfig := prom_config.DefaultConfig.GlobalConfig return c.ApplyDefaults(&globalConfig) }) - m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, []common.Integration{}) + m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, nil) require.NoError(t, err) defer m.Stop() - cfg := m.instanceConfigForIntegration(mock) + cfg := m.instanceConfigForIntegration(icfg, mock) // Validate that the generated MetricsPath is a valid URL path require.Len(t, cfg.ScrapeConfigs, 1) @@ -90,8 +121,9 @@ func TestManager_instanceConfigForIntegration(t *testing.T) { // when the ScrapeIntegrations flag is disabled. func TestManager_NoIntegrationsScrape(t *testing.T) { mock := newMockIntegration() + icfg := mockConfig{integration: mock} - integrations := []common.Integration{mock} + integrations := map[Config]Integration{icfg: mock} im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, func(c *instance.Config) error { globalConfig := prom_config.DefaultConfig.GlobalConfig return c.ApplyDefaults(&globalConfig) @@ -115,11 +147,12 @@ func TestManager_NoIntegrationsScrape(t *testing.T) { // when the ScrapeIntegration flag is disabled on the integration. func TestManager_NoIntegrationScrape(t *testing.T) { mock := newMockIntegration() + icfg := mockConfig{integration: mock} noScrape := false mock.commonCfg.ScrapeIntegration = &noScrape - integrations := []common.Integration{mock} + integrations := map[Config]Integration{icfg: mock} im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, func(c *instance.Config) error { globalConfig := prom_config.DefaultConfig.GlobalConfig return c.ApplyDefaults(&globalConfig) @@ -137,8 +170,9 @@ func TestManager_NoIntegrationScrape(t *testing.T) { // launch, TestManager applies a config and runs the integration. func TestManager_StartsIntegrations(t *testing.T) { mock := newMockIntegration() + icfg := mockConfig{integration: mock} - integrations := []common.Integration{mock} + integrations := map[Config]Integration{icfg: mock} im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, nil) m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations) @@ -157,8 +191,9 @@ func TestManager_StartsIntegrations(t *testing.T) { func TestManager_RestartsIntegrations(t *testing.T) { mock := newMockIntegration() + icfg := mockConfig{integration: mock} - integrations := []common.Integration{mock} + integrations := map[Config]Integration{icfg: mock} im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, nil) m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations) require.NoError(t, err) @@ -173,8 +208,9 @@ func TestManager_RestartsIntegrations(t *testing.T) { func TestManager_GracefulStop(t *testing.T) { mock := newMockIntegration() + icfg := mockConfig{integration: mock} - integrations := []common.Integration{mock} + integrations := map[Config]Integration{icfg: mock} im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory, nil) m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations) require.NoError(t, err) @@ -193,6 +229,16 @@ func TestManager_GracefulStop(t *testing.T) { }) } +type mockConfig struct { + integration *mockIntegration +} + +func (c mockConfig) Name() string { return "mock" } +func (c mockConfig) CommonConfig() config.Common { return c.integration.commonCfg } +func (c mockConfig) NewIntegration(_ log.Logger) (Integration, error) { + return c.integration, nil +} + type mockIntegration struct { commonCfg config.Common startedCount *atomic.Uint32 @@ -208,18 +254,18 @@ func newMockIntegration() *mockIntegration { } } -func (i *mockIntegration) Name() string { return "mock" } -func (i *mockIntegration) CommonConfig() config.Common { return i.commonCfg } func (i *mockIntegration) RegisterRoutes(r *mux.Router) error { r.Handle("/metrics", promhttp.Handler()) return nil } + func (i *mockIntegration) ScrapeConfigs() []config.ScrapeConfig { return []config.ScrapeConfig{{ JobName: "mock", MetricsPath: "/metrics", }} } + func (i *mockIntegration) Run(ctx context.Context) error { i.startedCount.Inc() i.running.Store(true) @@ -237,9 +283,9 @@ func mockInstanceFactory(_ instance.Config) (instance.ManagedInstance, error) { return instance.NoOpInstance{}, nil } -func mockManagerConfig() Config { +func mockManagerConfig() ManagerConfig { listenPort := 0 - return Config{ + return ManagerConfig{ ScrapeIntegrations: true, IntegrationRestartBackoff: 0, ListenPort: &listenPort, diff --git a/pkg/integrations/memcached_exporter/memcached_exporter.go b/pkg/integrations/memcached_exporter/memcached_exporter.go index 3b971a133397..554755d3a948 100644 --- a/pkg/integrations/memcached_exporter/memcached_exporter.go +++ b/pkg/integrations/memcached_exporter/memcached_exporter.go @@ -5,7 +5,7 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/grafana/agent/pkg/integrations/common" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" "github.com/prometheus/memcached_exporter/pkg/exporter" ) @@ -18,10 +18,7 @@ var DefaultConfig Config = Config{ // Config controls the memcached_exporter integration. type Config struct { - // Enabled enables the integration. - Enabled bool `yaml:"enabled"` - - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` // MemcachedAddress is the address of the memcached server (host:port). MemcachedAddress string `yaml:"memcached_address"` @@ -38,12 +35,27 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func (c *Config) Name() string { + return "memcached_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} + // New creates a new memcached_exporter integration. The integration scrapes metrics // from a memcached server. -func New(log log.Logger, c Config) (common.Integration, error) { - return common.NewCollectorIntegration( - "memcached_exporter", - c.CommonConfig, +func New(log log.Logger, c *Config) (integrations.Integration, error) { + return integrations.NewCollectorIntegration( + c.Name(), exporter.New(c.MemcachedAddress, c.Timeout, log), false, ), nil diff --git a/pkg/integrations/mysqld_exporter/mysqld-exporter.go b/pkg/integrations/mysqld_exporter/mysqld-exporter.go index 377adaaae8fb..c8fee81559be 100644 --- a/pkg/integrations/mysqld_exporter/mysqld-exporter.go +++ b/pkg/integrations/mysqld_exporter/mysqld-exporter.go @@ -8,7 +8,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/grafana/agent/pkg/integrations/common" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" "github.com/prometheus/mysqld_exporter/collector" ) @@ -32,10 +32,7 @@ var DefaultConfig = Config{ // Config controls the mysqld_exporter integration. type Config struct { - // Enabled enables the integration. - Enabled bool `yaml:"enabled"` - - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` // DataSourceName to use to connect to MySQL. DataSourceName string `yaml:"data_source_name"` @@ -76,9 +73,25 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func (c *Config) Name() string { + return "mysqld_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} + // New creates a new mysqld_exporter integration. The integration scrapes // metrics from a mysqld process. -func New(log log.Logger, c Config) (common.Integration, error) { +func New(log log.Logger, c *Config) (integrations.Integration, error) { dsn := c.DataSourceName if len(dsn) == 0 { dsn = os.Getenv("MYSQLD_EXPORTER_DATA_SOURCE_NAME") @@ -98,9 +111,8 @@ func New(log log.Logger, c Config) (common.Integration, error) { level.Debug(log).Log("scraper", scraper.Name()) } - return common.NewCollectorIntegration( - "mysqld_exporter", - c.CommonConfig, + return integrations.NewCollectorIntegration( + c.Name(), exporter, false, ), nil @@ -109,7 +121,7 @@ func New(log log.Logger, c Config) (common.Integration, error) { // GetScrapers returns the set of *enabled* scrapers from the config. // Configurable scrapers will have their configuration filled out matching the // Config's settings. -func GetScrapers(c Config) []collector.Scraper { +func GetScrapers(c *Config) []collector.Scraper { scrapers := map[collector.Scraper]bool{ &collector.ScrapeAutoIncrementColumns{}: false, &collector.ScrapeBinlogSize{}: false, diff --git a/pkg/integrations/node_exporter/config.go b/pkg/integrations/node_exporter/config.go index f0521c9c4d66..afc960255b48 100644 --- a/pkg/integrations/node_exporter/config.go +++ b/pkg/integrations/node_exporter/config.go @@ -7,6 +7,8 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/go-kit/kit/log" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" "github.com/prometheus/procfs" "github.com/prometheus/procfs/sysfs" @@ -61,10 +63,7 @@ func init() { // Config controls the node_exporter integration. type Config struct { - CommonConfig config.Common `yaml:",inline"` - - // Enabled enables the node_exporter integration. - Enabled bool `yaml:"enabled"` + Common config.Common `yaml:",inline"` IncludeExporterMetrics bool `yaml:"include_exporter_metrics"` @@ -119,6 +118,22 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func (c *Config) Name() string { + return "node_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} + // MapConfigToNodeExporterFlags takes in a node_exporter Config and converts // it to the set of flags that node_exporter usually expects when running as a // separate binary. diff --git a/pkg/integrations/node_exporter/node_exporter.go b/pkg/integrations/node_exporter/node_exporter.go index 28efb2287f06..58d220d24ad5 100644 --- a/pkg/integrations/node_exporter/node_exporter.go +++ b/pkg/integrations/node_exporter/node_exporter.go @@ -21,7 +21,7 @@ import ( // Integration is the node_exporter integration. The integration scrapes metrics // from the host Linux-based system. type Integration struct { - c Config + c *Config logger log.Logger nc *collector.NodeCollector @@ -29,12 +29,12 @@ type Integration struct { } // New creates a new node_exporter integration. -func New(log log.Logger, c Config) (*Integration, error) { +func New(log log.Logger, c *Config) (*Integration, error) { // NOTE(rfratto): this works as long as node_exporter is the only thing using // kingpin across the codebase. node_exporter may need a PR eventually to pass // in a custom kingpin application or expose methods to explicitly enable/disable // collectors that we can use instead of this command line hack. - flags, _ := MapConfigToNodeExporterFlags(&c) + flags, _ := MapConfigToNodeExporterFlags(c) level.Debug(log).Log("msg", "initializing node_exporter with flags converted from agent config", "flags", strings.Join(flags, " ")) _, err := kingpin.CommandLine.Parse(flags) @@ -66,12 +66,6 @@ func New(log log.Logger, c Config) (*Integration, error) { }, nil } -// CommonConfig satisfies Integration.CommonConfig. -func (i *Integration) CommonConfig() config.Common { return i.c.CommonConfig } - -// Name satisfies Integration.Name. -func (i *Integration) Name() string { return "node_exporter" } - // RegisterRoutes satisfies Integration.RegisterRoutes. The mux.Router provided // here is expected to be a subrouter, where all registered paths will be // registered within that subroute. @@ -101,8 +95,8 @@ func (i *Integration) handler() (http.Handler, error) { // Register node_exporter_build_info metrics, generally useful for // dashboards that depend on them for discovering targets. - if err := r.Register(version.NewCollector("node_exporter")); err != nil { - return nil, fmt.Errorf("couldn't register node_exporter: %w", err) + if err := r.Register(version.NewCollector(i.c.Name())); err != nil { + return nil, fmt.Errorf("couldn't register %s: %w", i.c.Name(), err) } if i.c.IncludeExporterMetrics { @@ -117,7 +111,7 @@ func (i *Integration) handler() (http.Handler, error) { // ScrapeConfigs satisfies Integration.ScrapeConfigs. func (i *Integration) ScrapeConfigs() []config.ScrapeConfig { return []config.ScrapeConfig{{ - JobName: i.Name(), + JobName: i.c.Name(), MetricsPath: "/metrics", }} } diff --git a/pkg/integrations/node_exporter/node_exporter_test.go b/pkg/integrations/node_exporter/node_exporter_test.go index 122731737fb2..5352772eed4e 100644 --- a/pkg/integrations/node_exporter/node_exporter_test.go +++ b/pkg/integrations/node_exporter/node_exporter_test.go @@ -39,7 +39,7 @@ func TestNodeExporter(t *testing.T) { // Check that the flags convert and the integration initiailizes logger := log.NewNopLogger() - integration, err := New(logger, cfg) + integration, err := New(logger, &cfg) require.NoError(t, err, "failed to setup node_exporter") r := mux.NewRouter() diff --git a/pkg/integrations/postgres_exporter/postgres_exporter.go b/pkg/integrations/postgres_exporter/postgres_exporter.go index 52ad69ce0efd..632352c52553 100644 --- a/pkg/integrations/postgres_exporter/postgres_exporter.go +++ b/pkg/integrations/postgres_exporter/postgres_exporter.go @@ -7,7 +7,7 @@ import ( "strings" "github.com/go-kit/kit/log" - "github.com/grafana/agent/pkg/integrations/common" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" "github.com/wrouesnel/postgres_exporter/exporter" ) @@ -16,10 +16,7 @@ var DefaultConfig = Config{} // Config controls the postgres_exporter integration. type Config struct { - // Enabled enables the integration. - Enabled bool `yaml:"enabled"` - - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` // DataSourceNames to use to connect to Postgres. DataSourceNames []string `yaml:"data_source_names"` @@ -37,9 +34,25 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func (c *Config) Name() string { + return "postgres_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} + // New creates a new postgres_exporter integration. The integration scrapes // metrics from a postgres process. -func New(log log.Logger, c Config) (common.Integration, error) { +func New(log log.Logger, c *Config) (integrations.Integration, error) { dsn := c.DataSourceNames if len(dsn) == 0 { dsn = strings.Split(os.Getenv("POSTGRES_EXPORTER_DATA_SOURCE_NAME"), ",") @@ -56,10 +69,5 @@ func New(log log.Logger, c Config) (common.Integration, error) { exporter.ExcludeDatabases(strings.Join(c.ExcludeDatabases, ",")), ) - return common.NewCollectorIntegration( - "postgres_exporter", - c.CommonConfig, - e, - false, - ), nil + return integrations.NewCollectorIntegration(c.Name(), e, false), nil } diff --git a/pkg/integrations/process_exporter/config.go b/pkg/integrations/process_exporter/config.go index 3048bfa9a0f3..fbeec551e3b1 100644 --- a/pkg/integrations/process_exporter/config.go +++ b/pkg/integrations/process_exporter/config.go @@ -2,6 +2,8 @@ package process_exporter //nolint:golint import ( + "github.com/go-kit/kit/log" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" exporter_config "github.com/ncabatoff/process-exporter/config" @@ -19,10 +21,7 @@ var ( // Config controls the process_exporter integration. type Config struct { - // Enabled enables the process_exporter integration. - Enabled bool `yaml:"enabled"` - - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` ProcessExporter exporter_config.MatcherRules `yaml:"process_names"` ProcFSPath string `yaml:"procfs_path"` @@ -38,3 +37,19 @@ func (c *Config) UnmarshalYAML(unmarshal func(v interface{}) error) error { type plain Config return unmarshal((*plain)(c)) } + +func (c *Config) Name() string { + return "process_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} diff --git a/pkg/integrations/process_exporter/process-exporter.go b/pkg/integrations/process_exporter/process-exporter.go index b832cab5c2f3..0bf60116106c 100644 --- a/pkg/integrations/process_exporter/process-exporter.go +++ b/pkg/integrations/process_exporter/process-exporter.go @@ -15,20 +15,14 @@ import ( // Integration is the process_exporter integration. On non-Linux platforms, // this integration does nothing and will print a warning if enabled. type Integration struct { - c Config + c *Config } -func New(logger log.Logger, c Config) (*Integration, error) { +func New(logger log.Logger, c *Config) (*Integration, error) { level.Warn(logger).Log("msg", "the process_exporter only works on Linux; enabling it otherwise will do nothing") return &Integration{c: c}, nil } -// CommonConfig satisfies Integration.CommonConfig. -func (i *Integration) CommonConfig() config.Common { return i.c.CommonConfig } - -// Name satisfies Integration.Name. -func (i *Integration) Name() string { return "process_exporter" } - // RegisterRoutes satisfies Integration.RegisterRoutes. func (i *Integration) RegisterRoutes(r *mux.Router) error { return nil diff --git a/pkg/integrations/process_exporter/process-exporter_linux.go b/pkg/integrations/process_exporter/process-exporter_linux.go index 246ead35412b..38741fad83e2 100644 --- a/pkg/integrations/process_exporter/process-exporter_linux.go +++ b/pkg/integrations/process_exporter/process-exporter_linux.go @@ -20,11 +20,11 @@ import ( // metrics based on information in the /proc filesystem for Linux. // Agent's own metrics. type Integration struct { - c Config + c *Config collector *collector.NamedProcessCollector } -func New(logger log.Logger, c Config) (*Integration, error) { +func New(logger log.Logger, c *Config) (*Integration, error) { cfg, err := c.ProcessExporter.ToConfig() if err != nil { return nil, fmt.Errorf("process_names is invalid: %w", err) @@ -46,12 +46,6 @@ func New(logger log.Logger, c Config) (*Integration, error) { return &Integration{c: c, collector: pc}, nil } -// CommonConfig satisfies Integration.CommonConfig. -func (i *Integration) CommonConfig() config.Common { return i.c.CommonConfig } - -// Name satisfies Integration.Name. -func (i *Integration) Name() string { return "process_exporter" } - // RegisterRoutes satisfies Integration.RegisterRoutes. func (i *Integration) RegisterRoutes(r *mux.Router) error { handler, err := i.handler() @@ -88,7 +82,7 @@ func (i *Integration) handler() (http.Handler, error) { // ScrapeConfigs satisfies Integration.ScrapeConfigs. func (i *Integration) ScrapeConfigs() []config.ScrapeConfig { return []config.ScrapeConfig{{ - JobName: i.Name(), + JobName: i.c.Name(), MetricsPath: "/metrics", }} } diff --git a/pkg/integrations/redis_exporter/redis_exporter.go b/pkg/integrations/redis_exporter/redis_exporter.go index cc8434b2087a..185d05c1c8f9 100644 --- a/pkg/integrations/redis_exporter/redis_exporter.go +++ b/pkg/integrations/redis_exporter/redis_exporter.go @@ -14,7 +14,7 @@ import ( re "github.com/oliver006/redis_exporter/exporter" - "github.com/grafana/agent/pkg/integrations/common" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" ) @@ -29,8 +29,7 @@ var DefaultConfig = Config{ // Config controls the redis_exporter integration. type Config struct { - Enabled bool `yaml:"enabled"` - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` IncludeExporterMetrics bool `yaml:"include_exporter_metrics"` @@ -96,9 +95,25 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func (c *Config) Name() string { + return "redis_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} + // New creates a new redis_exporter integration. The integration queries // a redis instance's INFO and exposes the results as metrics. -func New(log log.Logger, c Config) (common.Integration, error) { +func New(log log.Logger, c *Config) (integrations.Integration, error) { level.Debug(log).Log("msg", "initialising redis_exporer with config %v", c) exporterConfig := c.GetExporterOptions() @@ -157,9 +172,8 @@ func New(log log.Logger, c Config) (common.Integration, error) { return nil, fmt.Errorf("failed to create redis exporter: %w", err) } - return common.NewCollectorIntegration( - "redis_exporter", - c.CommonConfig, + return integrations.NewCollectorIntegration( + c.Name(), exporter, c.IncludeExporterMetrics, ), nil diff --git a/pkg/integrations/redis_exporter/redis_exporter_test.go b/pkg/integrations/redis_exporter/redis_exporter_test.go index eb4b4f25e9c0..2d0fe620bd6f 100644 --- a/pkg/integrations/redis_exporter/redis_exporter_test.go +++ b/pkg/integrations/redis_exporter/redis_exporter_test.go @@ -104,7 +104,7 @@ func TestRedisCases(t *testing.T) { for _, test := range tt { t.Run(test.name, func(t *testing.T) { - integration, err := New(logger, test.cfg) + integration, err := New(logger, &test.cfg) if test.expectConstructorError { require.Error(t, err, "expected failure when setting up redis_exporter") diff --git a/pkg/integrations/register.go b/pkg/integrations/register.go new file mode 100644 index 000000000000..f98e2900b5f0 --- /dev/null +++ b/pkg/integrations/register.go @@ -0,0 +1,240 @@ +package integrations + +import ( + "fmt" + "reflect" + "strings" + + "gopkg.in/yaml.v2" +) + +var ( + registeredIntegrations = []Config{} + configFieldNames = make(map[reflect.Type]string) + + emptyStructType = reflect.TypeOf(struct{}{}) + configsType = reflect.TypeOf(Configs{}) +) + +// RegisterIntegration dynamically registers a new integration. The Config +// will represent the configuration that controls the specific integration. +// Registered Configs may be loaded using UnmarshalYAML or manually +// constructed. +// +// RegisterIntegration panics if cfg is not a pointer. +func RegisterIntegration(cfg Config) { + if reflect.TypeOf(cfg).Kind() != reflect.Ptr { + panic(fmt.Sprintf("RegisterIntegration must be given a pointer, got %T", cfg)) + } + registeredIntegrations = append(registeredIntegrations, cfg) + configFieldNames[reflect.TypeOf(cfg)] = cfg.Name() +} + +// Configs is a list of integrations. +type Configs []Config + +func (c *Configs) UnmarshalYAML(unmarshal func(interface{}) error) error { + return c.unmarshalWithIntegrations(registeredIntegrations, unmarshal) +} + +func (c *Configs) unmarshalWithIntegrations(integrations []Config, unmarshal func(interface{}) error) error { + // Create a dynamic struct type full of our registered integrations and + // unmarshal to it. + var fields []reflect.StructField + for _, cfg := range integrations { + fields = append(fields, reflect.StructField{ + Name: "Config_" + cfg.Name(), + Tag: reflect.StructTag(fmt.Sprintf(`yaml:"%s"`, cfg.Name())), + Type: reflect.TypeOf(cfg), + }) + } + + var ( + structType = reflect.StructOf(fields) + structVal = reflect.New(structType) + ) + if err := unmarshal(structVal.Interface()); err != nil { + return err + } + + // Go over all non-nil fields in structVal and append them to c. + structVal = structVal.Elem() + for i := 0; i < structVal.NumField(); i++ { + if structVal.Field(i).IsNil() { + continue + } + + val := structVal.Field(i).Interface().(Config) + *c = append(*c, val) + } + + return nil +} + +// MarshalYAML helps implement yaml.Marshaller for structs that have a Configs +// field that should be inlined in the YAML string. +func MarshalYAML(v interface{}) (interface{}, error) { + inVal := reflect.ValueOf(v) + for inVal.Kind() == reflect.Ptr { + inVal = inVal.Elem() + } + if inVal.Kind() != reflect.Struct { + return nil, fmt.Errorf("integrations: can only marshal a struct, got %T", v) + } + inType := inVal.Type() + + var ( + cfgType = getConfigTypeForIntegrations(registeredIntegrations, inType) + cfgPointer = reflect.New(cfgType) + cfgVal = cfgPointer.Elem() + ) + + // Copy over any existing value from inVal to cfgVal. + // + // The ordering of fields in inVal and cfgVal match identically up until the + // extra fields appended to the end of cfgVal. + var configs Configs + for i, n := 0, inType.NumField(); i < n; i++ { + if inType.Field(i).Type == configsType { + configs = inVal.Field(i).Interface().(Configs) + } + if cfgType.Field(i).PkgPath != "" { + continue // Field is unexported: ignore. + } + cfgVal.Field(i).Set(inVal.Field(i)) + } + if configs == nil { + return nil, fmt.Errorf("discovery: Configs field not found in type: %T", v) + } + + for _, c := range configs { + fieldName, ok := configFieldNames[reflect.TypeOf(c)] + if !ok { + return nil, fmt.Errorf("discovery: cannot marshal unregistered Config type: %T", c) + } + field := cfgVal.FieldByName("XXX_Config_" + fieldName) + field.Set(reflect.ValueOf(c)) + } + + return cfgPointer.Interface(), nil +} + +// UnmarshalYAML helps implement yaml.Unmarshaller for structs that have a +// Configs field that should be inlined in the YAML string. +func UnmarshalYAML(out interface{}, unmarshal func(interface{}) error) error { + return unmarshalIntegrationsWithList(registeredIntegrations, out, unmarshal) +} + +// unmarshalIntegrationsWithList unmarshals to a subtype of out that has a +// field added for every integration in integrations. Code adapted from +// Prometheus: +// +// https://github.com/prometheus/prometheus/blob/511511324adfc4f4178f064cc104c2deac3335de/discovery/registry.go#L111 +func unmarshalIntegrationsWithList(integrations []Config, out interface{}, unmarshal func(interface{}) error) error { + outVal := reflect.ValueOf(out) + if outVal.Kind() != reflect.Ptr { + return fmt.Errorf("integrations: can only unmarshal into a struct pointer, got %T", out) + } + outVal = outVal.Elem() + if outVal.Kind() != reflect.Struct { + return fmt.Errorf("integrations: can only unmarshal into a struct pointer, got %T", out) + } + outType := outVal.Type() + + var ( + cfgType = getConfigTypeForIntegrations(integrations, outType) + cfgPointer = reflect.New(cfgType) + cfgVal = cfgPointer.Elem() + ) + + // Copy over any existing value from outVal to cfgVal. + // + // The ordering of fields in outVal and cfgVal match identically up until the + // extra fields appended to the end of cfgVal. + var configs *Configs + for i := 0; i < outVal.NumField(); i++ { + if outType.Field(i).Type == configsType { + if configs != nil { + return fmt.Errorf("integrations: Multiple Configs fields found in %T", out) + } + configs = outVal.Field(i).Addr().Interface().(*Configs) + continue + } + if cfgType.Field(i).PkgPath != "" { + // Ignore unexported fields + continue + } + cfgVal.Field(i).Set(outVal.Field(i)) + } + if configs == nil { + return fmt.Errorf("integrations: No Configs field found in %T", out) + } + + // Unmarshal into our dynamic type. + if err := unmarshal(cfgPointer.Interface()); err != nil { + return replaceYAMLTypeError(err, cfgType, outType) + } + + // Copy back unmarshaled fields that were originally in outVal. + for i := 0; i < outVal.NumField(); i++ { + if cfgType.Field(i).PkgPath != "" { + // Ignore unexported fields + continue + } + outVal.Field(i).Set(cfgVal.Field(i)) + } + + // Iterate through the remainder of our fields, which should all be of + // type Config. + for i := outVal.NumField(); i < cfgVal.NumField(); i++ { + field := cfgVal.Field(i) + + if field.IsNil() { + continue + } + val := cfgVal.Field(i).Interface().(Config) + *configs = append(*configs, val) + } + + return nil +} + +// getConfigTypeForIntegrations returns a dynamic struct type that has all of +// the same fields as out including the fields for the provided integrations. +func getConfigTypeForIntegrations(integrations []Config, out reflect.Type) reflect.Type { + // Initial exported fields map one-to-one. + var fields []reflect.StructField + for i, n := 0, out.NumField(); i < n; i++ { + switch field := out.Field(i); { + case field.PkgPath == "" && field.Type != configsType: + fields = append(fields, field) + default: + fields = append(fields, reflect.StructField{ + Name: "_" + field.Name, // Field must be unexported. + PkgPath: out.PkgPath(), + Type: emptyStructType, + }) + } + } + for _, cfg := range integrations { + // Use a prefix that's unlikely to collide with anything else. + fieldName := "XXX_Config_" + cfg.Name() + fields = append(fields, reflect.StructField{ + Name: fieldName, + Tag: reflect.StructTag(fmt.Sprintf(`yaml:"%s"`, cfg.Name())), + Type: reflect.TypeOf(cfg), + }) + } + return reflect.StructOf(fields) +} + +func replaceYAMLTypeError(err error, oldTyp, newTyp reflect.Type) error { + if e, ok := err.(*yaml.TypeError); ok { + oldStr := oldTyp.String() + newStr := newTyp.String() + for i, s := range e.Errors { + e.Errors[i] = strings.Replace(s, oldStr, newStr, -1) + } + } + return err +} diff --git a/pkg/integrations/register_test.go b/pkg/integrations/register_test.go new file mode 100644 index 000000000000..80369034580a --- /dev/null +++ b/pkg/integrations/register_test.go @@ -0,0 +1,93 @@ +package integrations + +import ( + "fmt" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/grafana/agent/pkg/integrations/config" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestIntegrationRegistration(t *testing.T) { + // This test checks for a few things: + // + // 1. Registered integrations will be parseable + // 2. Registered integrations that are not present will not be unmarshaled to + // the list of configs + // 3. Registered integrations that have defaults may still be parsed + // 4. Strict parsing should still work as expected. + + var cfgToParse = ` +name: John Doe +duration: 500ms +test: + text: Hello, world! +` + + var fullCfg testFullConfig + err := yaml.UnmarshalStrict([]byte(cfgToParse), &fullCfg) + require.NoError(t, err) + + expect := testFullConfig{ + Name: "John Doe", + Duration: 500 * time.Millisecond, + Default: 12345, + Configs: []Config{ + &testIntegrationA{Text: "Hello, world!", Truth: true}, + }, + } + require.Equal(t, expect, fullCfg) +} + +type testIntegrationA struct { + Text string `yaml:"text"` + Truth bool `yaml:"truth"` +} + +func (i *testIntegrationA) Name() string { return "test" } +func (i *testIntegrationA) CommonConfig() config.Common { return config.Common{} } + +func (i *testIntegrationA) NewIntegration(l log.Logger) (Integration, error) { + return nil, fmt.Errorf("not implemented") +} + +func (i *testIntegrationA) UnmarshalYAML(unmarshal func(interface{}) error) error { + i.Truth = true + type plain testIntegrationA + return unmarshal((*plain)(i)) +} + +type testIntegrationB struct { + Text string `yaml:"text"` +} + +func (*testIntegrationB) Name() string { return "shouldnotbefound" } +func (*testIntegrationB) CommonConfig() config.Common { return config.Common{} } + +func (*testIntegrationB) NewIntegration(l log.Logger) (Integration, error) { + return nil, fmt.Errorf("not implemented") +} + +type testFullConfig struct { + // Some random fields that will also be exposed + Name string `yaml:"name"` + Duration time.Duration `yaml:"duration"` + Default int `yaml:"default"` + + Configs Configs `yaml:"-"` +} + +func (c *testFullConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + // This default value should not change. + c.Default = 12345 + + // Mock out registered integrations. + registered := []Config{ + &testIntegrationA{}, + &testIntegrationB{}, + } + return unmarshalIntegrationsWithList(registered, c, unmarshal) +} diff --git a/pkg/integrations/statsd_exporter/statsd_exporter.go b/pkg/integrations/statsd_exporter/statsd_exporter.go index 60fce10b75b8..d1b551fa2a2a 100644 --- a/pkg/integrations/statsd_exporter/statsd_exporter.go +++ b/pkg/integrations/statsd_exporter/statsd_exporter.go @@ -12,7 +12,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gorilla/mux" - "github.com/grafana/agent/pkg/integrations/common" + "github.com/grafana/agent/pkg/integrations" "github.com/grafana/agent/pkg/integrations/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -45,10 +45,7 @@ var DefaultConfig = Config{ // Config controls the statsd_exporter integration. type Config struct { - // Enabled enables the integration. - Enabled bool `yaml:"enabled"` - - CommonConfig config.Common `yaml:",inline"` + Common config.Common `yaml:",inline"` ListenUDP string `yaml:"listen_udp"` ListenTCP string `yaml:"listen_tcp"` @@ -77,9 +74,25 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(c)) } +func (c *Config) Name() string { + return "statsd_exporter" +} + +func (c *Config) CommonConfig() config.Common { + return c.Common +} + +func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) { + return New(l, c) +} + +func init() { + integrations.RegisterIntegration(&Config{}) +} + // Exporters defines the statsd_exporter integration. type Exporter struct { - cfg Config + cfg *Config reg *prometheus.Registry metrics *Metrics exporter *exporter.Exporter @@ -88,7 +101,7 @@ type Exporter struct { // New creates a new statsd_exporter integration. The integration scrapes // metrics from a statsd process. -func New(log log.Logger, c Config) (common.Integration, error) { +func New(log log.Logger, c *Config) (integrations.Integration, error) { reg := prometheus.NewRegistry() m, err := NewMetrics(reg) @@ -132,13 +145,7 @@ func New(log log.Logger, c Config) (common.Integration, error) { }, nil } -// Name satisfies common.Integration. -func (e *Exporter) Name() string { return "statsd_exporter" } - -// CommonConfig satisfies common.Integration. -func (e *Exporter) CommonConfig() config.Common { return e.cfg.CommonConfig } - -// RegisterRoutes satisfies common.Integration. The mux.Router provided +// RegisterRoutes satisfies integrations.Integration. The mux.Router provided // here is expected to be a subrouter, where all registered paths will be // registered within that subroute. func (e *Exporter) RegisterRoutes(r *mux.Router) error { @@ -152,7 +159,7 @@ func (e *Exporter) RegisterRoutes(r *mux.Router) error { // ScrapeConfigs satisfies Integration.ScrapeConfigs. func (e *Exporter) ScrapeConfigs() []config.ScrapeConfig { - return []config.ScrapeConfig{{JobName: e.Name(), MetricsPath: "/metrics"}} + return []config.ScrapeConfig{{JobName: e.cfg.Name(), MetricsPath: "/metrics"}} } // Run satisfies Run. diff --git a/vendor/github.com/grafana/loki/pkg/build/build.go b/vendor/github.com/grafana/loki/pkg/build/build.go deleted file mode 100644 index bab8f4d52eb1..000000000000 --- a/vendor/github.com/grafana/loki/pkg/build/build.go +++ /dev/null @@ -1,23 +0,0 @@ -package build - -import "github.com/prometheus/common/version" - -// Version information passed to Prometheus version package. -// Package path as used by linker changes based on vendoring being used or not, -// so it's easier just to use stable Loki path, and pass it to -// Prometheus in the code. -var ( - Version string - Revision string - Branch string - BuildUser string - BuildDate string -) - -func init() { - version.Version = Version - version.Revision = Revision - version.Branch = Branch - version.BuildUser = BuildUser - version.BuildDate = BuildDate -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 0166a4a69512..da719ece0904 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -269,7 +269,6 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket # github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 -github.com/grafana/loki/pkg/build github.com/grafana/loki/pkg/distributor github.com/grafana/loki/pkg/helpers github.com/grafana/loki/pkg/ingester/client