Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to new cluster package #472

Merged
merged 4 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewEntryPoint(logger log.Logger, cfg *config.Config) (*Entrypoint, error) {
}

if cfg.Integrations.Enabled {
manager, err = integrations.NewManager(cfg.Integrations, logger, promMetrics.InstanceManager())
manager, err = integrations.NewManager(cfg.Integrations, logger, promMetrics.InstanceManager(), promMetrics.Validate)
if err != nil {
return nil, err

Expand Down
27 changes: 16 additions & 11 deletions example/k3d/environment/main.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ local etcd = import 'etcd/main.libsonnet';
local agent_cluster = import 'grafana-agent/scraping-svc/main.libsonnet';
local k = import 'ksonnet-util/kausal.libsonnet';

local grafana_agent = import 'grafana-agent/v1/main.libsonnet';
local loki_config = import 'default/loki_config.libsonnet';
local grafana_agent = import 'grafana-agent/v1/main.libsonnet';

local service = k.core.v1.service;
local images = {
Expand Down Expand Up @@ -46,8 +46,9 @@ local images = {
// set by external_labels.
scrape_configs: std.map(function(config) config {
relabel_configs+: [{
target_label: 'cluster', replacement: cluster_label,
}]
target_label: 'cluster',
replacement: cluster_label,
}],
}, super.scrape_configs),
}) +
grafana_agent.withRemoteWrite([{
Expand Down Expand Up @@ -92,13 +93,17 @@ local images = {
},
},

// We want our cluster and agent labels to remain static
// for this deployment, so if they are overwritten by a metric
// we will change them to the values set by external_labels.
kubernetes_scrape_configs: std.map(function(config) config {
relabel_configs+: [
{ target_label: 'cluster', replacement: cluster_label },
],
}, super.deployment_scrape_configs + super.kubernetes_scrape_configs),
kubernetes_scrape_configs:
(grafana_agent.scrapeInstanceKubernetes {
// We want our cluster and label to remain static for this deployment, so
// if they are overwritten by a metric we will change them to the values
// set by external_labels.
scrape_configs: std.map(function(config) config {
relabel_configs+: [{
target_label: 'cluster',
replacement: cluster_label,
}],
}, super.scrape_configs),
}).scrape_configs,
}),
}
2 changes: 1 addition & 1 deletion pkg/agentctl/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"testing"

"github.com/grafana/agent/pkg/prom/ha/configapi"
"github.com/grafana/agent/pkg/prom/cluster/configapi"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/stretchr/testify/require"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"net/http"
"strings"

"github.com/grafana/agent/pkg/prom/ha/configapi"
"github.com/grafana/agent/pkg/prom/cluster/configapi"
"github.com/grafana/agent/pkg/prom/instance"
"gopkg.in/yaml.v2"
)
Expand Down
20 changes: 14 additions & 6 deletions pkg/integrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/grafana/agent/pkg/prom/instance/configstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -127,15 +128,16 @@ type Manager struct {
hostname string
defaultRelabelConfigs []*relabel.Config

im instance.Manager
cancel context.CancelFunc
done chan bool
im instance.Manager
cancel context.CancelFunc
done chan bool
validator configstore.Validator
}

// NewManager creates a new integrations manager. NewManager must be given an
// InstanceManager which is responsible for accepting instance configs to
// scrape and send metrics from running integrations.
func NewManager(c ManagerConfig, logger log.Logger, im instance.Manager) (*Manager, error) {
func NewManager(c ManagerConfig, logger log.Logger, im instance.Manager, validate configstore.Validator) (*Manager, error) {
integrations := make(map[Config]Integration)

for _, integrationCfg := range c.Integrations {
Expand All @@ -149,10 +151,10 @@ func NewManager(c ManagerConfig, logger log.Logger, im instance.Manager) (*Manag
}
}

return newManager(c, logger, im, integrations)
return newManager(c, logger, im, integrations, validate)
}

func newManager(c ManagerConfig, logger log.Logger, im instance.Manager, integrations map[Config]Integration) (*Manager, error) {
func newManager(c ManagerConfig, logger log.Logger, im instance.Manager, integrations map[Config]Integration, validate configstore.Validator) (*Manager, error) {
defaultRelabels, err := c.DefaultRelabelConfigs()
if err != nil {
return nil, fmt.Errorf("cannot get default relabel configs: %w", err)
Expand All @@ -168,6 +170,7 @@ func newManager(c ManagerConfig, logger log.Logger, im instance.Manager, integra
im: im,
cancel: cancel,
done: make(chan bool),
validator: validate,
}

if c.UseHostnameLabel {
Expand Down Expand Up @@ -213,6 +216,11 @@ func (m *Manager) runIntegration(ctx context.Context, cfg Config, i Integration)
// Apply the config so an instance is launched to scrape our integration.
instanceConfig := m.instanceConfigForIntegration(cfg, i)

if err := m.validator(&instanceConfig); err != nil {
level.Error(m.logger).Log("msg", "failed to apply integration. integration will not run", "err", err, "integration", cfg.Name())
return
}

if err := m.im.ApplyConfig(instanceConfig); err != nil {
level.Error(m.logger).Log("msg", "failed to apply integration. integration will not run", "err", err, "integration", cfg.Name())
return
Expand Down
14 changes: 8 additions & 6 deletions pkg/integrations/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"gopkg.in/yaml.v2"
)

// noOpValidator is a validator stub for tests: it accepts every instance
// config unconditionally, matching the validator parameter newManager expects.
func noOpValidator(_ *instance.Config) error {
	return nil
}

// Test that embedded integration fields in the struct can be unmarshaled and
// remarshaled back out to text.
func TestConfig_Remarshal(t *testing.T) {
Expand Down Expand Up @@ -86,7 +88,7 @@ func TestManager_instanceConfigForIntegration(t *testing.T) {
icfg := mockConfig{integration: mock}

im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, nil)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, nil, noOpValidator)
require.NoError(t, err)
defer m.Stop()

Expand All @@ -109,7 +111,7 @@ func TestManager_NoIntegrationsScrape(t *testing.T) {
cfg := mockManagerConfig()
cfg.ScrapeIntegrations = false

m, err := newManager(cfg, log.NewNopLogger(), im, integrations)
m, err := newManager(cfg, log.NewNopLogger(), im, integrations, noOpValidator)
require.NoError(t, err)
defer m.Stop()

Expand All @@ -132,7 +134,7 @@ func TestManager_NoIntegrationScrape(t *testing.T) {
integrations := map[Config]Integration{icfg: mock}
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)

m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations, noOpValidator)
require.NoError(t, err)
defer m.Stop()

Expand All @@ -149,7 +151,7 @@ func TestManager_StartsIntegrations(t *testing.T) {
integrations := map[Config]Integration{icfg: mock}

im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations, noOpValidator)
require.NoError(t, err)
defer m.Stop()

Expand All @@ -169,7 +171,7 @@ func TestManager_RestartsIntegrations(t *testing.T) {

integrations := map[Config]Integration{icfg: mock}
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations, noOpValidator)
require.NoError(t, err)
defer m.Stop()

Expand All @@ -186,7 +188,7 @@ func TestManager_GracefulStop(t *testing.T) {

integrations := map[Config]Integration{icfg: mock}
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations)
m, err := newManager(mockManagerConfig(), log.NewNopLogger(), im, integrations, noOpValidator)
require.NoError(t, err)

test.Poll(t, time.Second, 1, func() interface{} {
Expand Down
47 changes: 27 additions & 20 deletions pkg/prom/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/agent/pkg/prom/ha"
"github.com/grafana/agent/pkg/prom/ha/client"
"github.com/grafana/agent/pkg/prom/cluster"
"github.com/grafana/agent/pkg/prom/cluster/client"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
Expand All @@ -25,7 +25,7 @@ var (
InstanceRestartBackoff: instance.DefaultBasicManagerConfig.InstanceRestartBackoff,
WALCleanupAge: DefaultCleanupAge,
WALCleanupPeriod: DefaultCleanupPeriod,
ServiceConfig: ha.DefaultConfig,
ServiceConfig: cluster.DefaultConfig,
ServiceClientConfig: client.DefaultConfig,
InstanceMode: instance.DefaultMode,
}
Expand All @@ -41,7 +41,7 @@ type Config struct {
WALDir string `yaml:"wal_directory"`
WALCleanupAge time.Duration `yaml:"wal_cleanup_age"`
WALCleanupPeriod time.Duration `yaml:"wal_cleanup_period"`
ServiceConfig ha.Config `yaml:"scraping_service"`
ServiceConfig cluster.Config `yaml:"scraping_service"`
ServiceClientConfig client.Config `yaml:"scraping_service_client"`
Configs []instance.Config `yaml:"configs,omitempty"`
InstanceRestartBackoff time.Duration `yaml:"instance_restart_backoff,omitempty"`
Expand All @@ -58,7 +58,13 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
c.Enabled = true

type plain Config
return unmarshal((*plain)(c))
err := unmarshal((*plain)(c))
if err != nil {
return err
}

c.ServiceConfig.Client = c.ServiceClientConfig
return nil
}

// ApplyDefaults applies default values to the Config and validates it.
Expand Down Expand Up @@ -124,7 +130,7 @@ type Agent struct {

instanceFactory instanceFactory

ha *ha.Server
cluster *cluster.Cluster
}

// New creates and starts a new Agent.
Expand Down Expand Up @@ -171,12 +177,9 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins
return nil, fmt.Errorf("one or more configs was found to be invalid")
}

if cfg.ServiceConfig.Enabled {
var err error
a.ha, err = ha.New(reg, cfg.ServiceConfig, &cfg.Global, cfg.ServiceClientConfig, a.logger, a.mm, cfg.RemoteWrite)
if err != nil {
return nil, err
}
a.cluster, err = cluster.New(a.logger, reg, cfg.ServiceConfig, a.mm, a.Validate)
if err != nil {
return nil, err
}

return a, nil
Expand All @@ -197,22 +200,26 @@ func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error)
return a.instanceFactory(reg, a.cfg.Global, c, a.cfg.WALDir, a.logger)
}

func (a *Agent) WireGRPC(s *grpc.Server) {
if a.cfg.ServiceConfig.Enabled {
a.ha.WireGRPC(s)
// Validate will validate the incoming Config and mutate it to apply defaults.
// The defaults come from the agent's global config and remote-write settings;
// an error is returned when they cannot be applied to the named config.
func (a *Agent) Validate(c *instance.Config) error {
	err := c.ApplyDefaults(&a.cfg.Global, a.cfg.RemoteWrite)
	if err != nil {
		return fmt.Errorf("failed to apply defaults to %q: %w", c.Name, err)
	}
	return nil
}

// WireGRPC registers the agent's cluster service with the given gRPC server
// by delegating to the cluster's own WireGRPC.
func (a *Agent) WireGRPC(s *grpc.Server) {
a.cluster.WireGRPC(s)
}

// Config returns the configuration the Agent was created with.
func (a *Agent) Config() Config { return a.cfg }
// InstanceManager returns the manager (a.mm) used for the agent's instances.
func (a *Agent) InstanceManager() instance.Manager { return a.mm }

// Stop stops the agent and all its instances.
func (a *Agent) Stop() {
if a.ha != nil {
if err := a.ha.Stop(); err != nil {
level.Error(a.logger).Log("msg", "failed to stop scraping service server", "err", err)
}
}
a.cluster.Stop()

a.cleaner.Stop()

// Only need to stop the ModalManager, which will passthrough everything to the
Expand Down
File renamed without changes.
Loading