Skip to content

Commit

Permalink
Implement Applyconfig for Entrypoint (#490)
Browse files Browse the repository at this point in the history
* implement applyconfig for entrypoint

* require WAL when scraping service is enabled

* s/NewEntryPoint/NewEntrypoint

* address review feedback
  • Loading branch information
rfratto authored Mar 25, 2021
1 parent 11a74e6 commit b21ae0c
Show file tree
Hide file tree
Showing 16 changed files with 958 additions and 186 deletions.
119 changes: 0 additions & 119 deletions cmd/agent/agent.go

This file was deleted.

170 changes: 170 additions & 0 deletions cmd/agent/entrypoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package main

import (
"fmt"
"net/http"
"sync"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/grafana/agent/pkg/integrations"
"github.com/grafana/agent/pkg/loki"
"github.com/grafana/agent/pkg/tempo"

"github.com/grafana/agent/pkg/config"
"github.com/grafana/agent/pkg/prom"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)

// Entrypoint is the entrypoint of the application that starts all subsystems.
type Entrypoint struct {
mut sync.Mutex

log log.Logger
cfg config.Config

srv *server.Server
promMetrics *prom.Agent
lokiLogs *loki.Loki
tempoTraces *tempo.Tempo
manager *integrations.Manager
}

// NewEntrypoint creates a new Entrypoint.
func NewEntrypoint(logger log.Logger, cfg *config.Config) (*Entrypoint, error) {
var (
ep = Entrypoint{log: logger}
err error
)

ep.promMetrics, err = prom.New(prometheus.DefaultRegisterer, cfg.Prometheus, logger)
if err != nil {
return nil, err
}

ep.lokiLogs, err = loki.New(prometheus.DefaultRegisterer, cfg.Loki, logger)
if err != nil {
return nil, err
}

ep.tempoTraces, err = tempo.New(prometheus.DefaultRegisterer, cfg.Tempo, cfg.Server.LogLevel)
if err != nil {
return nil, err
}

ep.manager, err = integrations.NewManager(cfg.Integrations, logger, ep.promMetrics.InstanceManager(), ep.promMetrics.Validate)
if err != nil {
return nil, err
}

// Mostly everything should be up to date except for the server, which hasn't
// been created yet.
if err := ep.ApplyConfig(*cfg); err != nil {
return nil, err
}
return &ep, nil
}

// ApplyConfig applies changes to the subsystems of the Agent.
func (srv *Entrypoint) ApplyConfig(cfg config.Config) error {
srv.mut.Lock()
defer srv.mut.Unlock()

// The server config uses some unexported fields which can't be compared by
// default. Since only exported fields are used by YAML, we'll only compare
// those here.
var ignoreUnexported = cmpopts.IgnoreUnexported(logging.Format{}, logging.Level{})

if cmp.Equal(srv.cfg, cfg, ignoreUnexported) {
return nil
}

var (
// wireServer indicates a new server and that all API endpoints
// (HTTP and gRPC) need to be re-created.
wireServer bool

failed bool
err error
)

// Server doesn't have an ApplyConfig method so we need to do a full
// restart of it here.
if !cmp.Equal(srv.cfg.Server, cfg.Server, ignoreUnexported) {
if srv.srv != nil {
srv.srv.Shutdown()
}

srv.srv, err = server.New(cfg.Server)
if err != nil {
level.Error(srv.log).Log("msg", "failed to reload server", "err", err)
failed = true
}

// New server, so everything needs re-wiring.
wireServer = true
}

// Go through each component and update it.
if err := srv.promMetrics.ApplyConfig(cfg.Prometheus); err != nil {
level.Error(srv.log).Log("msg", "failed to update prometheus", "err", err)
failed = true
}

if err := srv.lokiLogs.ApplyConfig(cfg.Loki); err != nil {
level.Error(srv.log).Log("msg", "failed to update loki", "err", err)
failed = true
}

if err := srv.tempoTraces.ApplyConfig(cfg.Tempo); err != nil {
level.Error(srv.log).Log("msg", "failed to update tempo", "err", err)
failed = true
}

if err := srv.manager.ApplyConfig(cfg.Integrations); err != nil {
level.Error(srv.log).Log("msg", "failed to update integrations", "err", err)
failed = true
}

if wireServer {
srv.promMetrics.WireAPI(srv.srv.HTTP)
srv.promMetrics.WireGRPC(srv.srv.GRPC)

srv.manager.WireAPI(srv.srv.HTTP)

srv.srv.HTTP.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Healthy.\n")
})
srv.srv.HTTP.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Ready.\n")
})
}

srv.cfg = cfg
if failed {
return fmt.Errorf("changes did not apply successfully")
}
return nil
}

// Stop stops the Entrypoint and all subsystems.
func (srv *Entrypoint) Stop() {
srv.manager.Stop()
srv.lokiLogs.Stop()
srv.promMetrics.Stop()
srv.tempoTraces.Stop()
srv.srv.Shutdown()
}

// Start starts the server used by the Entrypoint, and will block until a
// termination signal is sent to the process.
func (srv *Entrypoint) Start() error {
return srv.srv.Run()
}
2 changes: 1 addition & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
util_log.InitLogger(&cfg.Server)
logger := util_log.Logger

ep, err := NewEntryPoint(logger, cfg)
ep, err := NewEntrypoint(logger, cfg)
if err != nil {
level.Error(logger).Log("msg", "error creating the agent server entrypoint", "err", err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/service_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (m *AgentService) Execute(args []string, serviceRequests <-chan svc.ChangeR
util_log.InitLogger(&cfg.Server)
logger := util_log.Logger

ep, err := NewEntryPoint(logger, cfg)
ep, err := NewEntrypoint(logger, cfg)
if err != nil {
level.Error(logger).Log("msg", "error creating the agent server entrypoint", "err", err)
os.Exit(1)
Expand Down
37 changes: 13 additions & 24 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,24 @@ type Config struct {

// ApplyDefaults sets default values in the config
func (c *Config) ApplyDefaults() error {
// The integration subsystem depends on Prometheus; so if it's enabled, force Prometheus
// to be enabled.
//
// TODO(rfratto): when Loki integrations are added, this line will no longer work; each
// integration will then have to be associated with a subsystem.
if c.Integrations.Enabled && !c.Prometheus.Enabled {
fmt.Println("NOTE: enabling Prometheus subsystem as Integrations are enabled")
c.Prometheus.Enabled = true
if err := c.Prometheus.ApplyDefaults(); err != nil {
return err
}

if c.Prometheus.Enabled {
if err := c.Prometheus.ApplyDefaults(); err != nil {
return err
}

// The default port exposed to the lifecycler should be the gRPC listen
// port since the agents will use gRPC for notifying other agents of
// resharding.
c.Prometheus.ServiceConfig.Lifecycler.ListenPort = c.Server.GRPCListenPort
if err := c.Integrations.ApplyDefaults(&c.Prometheus); err != nil {
return err
}

if c.Integrations.Enabled {
c.Integrations.ListenPort = &c.Server.HTTPListenPort
c.Integrations.ListenHost = &c.Server.HTTPListenAddress
c.Integrations.ServerUsingTLS = c.Server.HTTPTLSConfig.TLSKeyPath != "" && c.Server.HTTPTLSConfig.TLSCertPath != ""
if len(c.Integrations.PrometheusRemoteWrite) == 0 {
c.Integrations.PrometheusRemoteWrite = c.Prometheus.RemoteWrite
}
c.Prometheus.ServiceConfig.Lifecycler.ListenPort = c.Server.GRPCListenPort
c.Integrations.ListenPort = c.Server.HTTPListenPort
c.Integrations.ListenHost = c.Server.HTTPListenAddress

c.Integrations.ServerUsingTLS = c.Server.HTTPTLSConfig.TLSKeyPath != "" && c.Server.HTTPTLSConfig.TLSCertPath != ""

if len(c.Integrations.PrometheusRemoteWrite) == 0 {
c.Integrations.PrometheusRemoteWrite = c.Prometheus.RemoteWrite
}

return nil
}

Expand Down
Loading

0 comments on commit b21ae0c

Please sign in to comment.