Skip to content

Commit

Permalink
Implement /-/reload (#493)
Browse files Browse the repository at this point in the history
* implement /-/reload

* move reloading server to a new server util package

* documentation

* more docs

* fix bug in util/server

* move unregisterer to util/server

* document logger propagation

* remove weird log propagation hack

* remove go-cmp and defer to yaml comparison

* document reload server constraints
  • Loading branch information
rfratto authored Mar 31, 2021
1 parent 3692097 commit f98ff9f
Show file tree
Hide file tree
Showing 28 changed files with 516 additions and 842 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ cross-compilation issue, but will return in v0.13.0.

- [FEATURE] Add support for running as a Windows service (@mattdurham)

- [FEATURE] (beta) Add /-/reload support. It is not recommended to invoke
`/-/reload` against the main HTTP server. Instead, two new command-line flags
have been added: `--reload-addr` and `--reload-port`. These will launch a
`/-/reload`-only HTTP server that can be used to safely reload the Agent's
state. (@rfratto)

- [ENHANCEMENT] Support compression for trace export. (@mdisibio)

- [ENHANCEMENT] Allow Prometheus URL configuration to propagate to instances and integrations, if not given. (@mattdurham)
Expand All @@ -35,7 +41,7 @@ cross-compilation issue, but will return in v0.13.0.

- [CHANGE] The User-Agent header sent for logs will now be
`GrafanaAgent/<version>` (@rfratto)

- [ENHANCEMENT] Upgrade `go.opentelemetry.io/collector` to v0.21.0 (@mapno)

- [ENHANCEMENT] Add kafka trace receiver (@mapno)
Expand Down
194 changes: 137 additions & 57 deletions cmd/agent/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ package main

import (
"fmt"
"net"
"net/http"
"sync"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/gorilla/mux"
"github.com/grafana/agent/pkg/integrations"
"github.com/grafana/agent/pkg/loki"
"github.com/grafana/agent/pkg/tempo"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/agent/pkg/util/server"
"github.com/oklog/run"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/grafana/agent/pkg/config"
"github.com/grafana/agent/pkg/prom"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/signals"

"github.com/go-kit/kit/log/level"
)
Expand All @@ -25,6 +28,8 @@ import (
type Entrypoint struct {
mut sync.Mutex

reloader Reloader

log *util.Logger
cfg config.Config

Expand All @@ -33,15 +38,37 @@ type Entrypoint struct {
lokiLogs *loki.Loki
tempoTraces *tempo.Tempo
manager *integrations.Manager

reloadListener net.Listener
reloadServer *http.Server
}

// Reloader is any function that returns a new config.
type Reloader = func() (*config.Config, error)

// NewEntrypoint creates a new Entrypoint.
func NewEntrypoint(logger *util.Logger, cfg *config.Config) (*Entrypoint, error) {
func NewEntrypoint(logger *util.Logger, cfg *config.Config, reloader Reloader) (*Entrypoint, error) {
var (
ep = Entrypoint{log: logger}
ep = &Entrypoint{
log: logger,
reloader: reloader,
}
err error
)

if cfg.ReloadPort != 0 {
ep.reloadListener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ReloadAddress, cfg.ReloadPort))
if err != nil {
return nil, fmt.Errorf("failed to listen on address for secondary /-/reload server: %w", err)
}

reloadMux := mux.NewRouter()
reloadMux.HandleFunc("/-/reload", ep.reloadHandler)
ep.reloadServer = &http.Server{Handler: reloadMux}
}

ep.srv = server.New(prometheus.DefaultRegisterer, logger)

ep.promMetrics, err = prom.New(prometheus.DefaultRegisterer, cfg.Prometheus, logger)
if err != nil {
return nil, err
Expand All @@ -67,52 +94,24 @@ func NewEntrypoint(logger *util.Logger, cfg *config.Config) (*Entrypoint, error)
if err := ep.ApplyConfig(*cfg); err != nil {
return nil, err
}
return &ep, nil
return ep, nil
}

// ApplyConfig applies changes to the subsystems of the Agent.
func (srv *Entrypoint) ApplyConfig(cfg config.Config) error {
srv.mut.Lock()
defer srv.mut.Unlock()

// The server config uses some unexported fields which can't be compared by
// default. Since only exported fields are used by YAML, we'll only compare
// those here.
var ignoreUnexported = cmpopts.IgnoreUnexported(logging.Format{}, logging.Level{})

if cmp.Equal(srv.cfg, cfg, ignoreUnexported) {
return nil
}

var (
// wireServer indicates a new server and that all API endpoints
// (HTTP and gRPC) need to be re-created.
wireServer bool

failed bool
err error
)
var failed bool

if err := srv.log.ApplyConfig(&cfg.Server); err != nil {
level.Error(srv.log).Log("msg", "failed to update logger", "err", err)
failed = true
}

// Server doesn't have an ApplyConfig method so we need to do a full
// restart of it here.
if !cmp.Equal(srv.cfg.Server, cfg.Server, ignoreUnexported) {
if srv.srv != nil {
srv.srv.Shutdown()
}

srv.srv, err = server.New(cfg.Server)
if err != nil {
level.Error(srv.log).Log("msg", "failed to reload server", "err", err)
failed = true
}

// New server, so everything needs re-wiring.
wireServer = true
if err := srv.srv.ApplyConfig(cfg.Server, srv.wire); err != nil {
level.Error(srv.log).Log("msg", "failed to update server", "err", err)
failed = true
}

// Go through each component and update it.
Expand All @@ -136,40 +135,121 @@ func (srv *Entrypoint) ApplyConfig(cfg config.Config) error {
failed = true
}

if wireServer {
srv.promMetrics.WireAPI(srv.srv.HTTP)
srv.promMetrics.WireGRPC(srv.srv.GRPC)

srv.manager.WireAPI(srv.srv.HTTP)

srv.srv.HTTP.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Healthy.\n")
})
srv.srv.HTTP.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Ready.\n")
})
}

srv.cfg = cfg
if failed {
return fmt.Errorf("changes did not apply successfully")
}

return nil
}

// wire is used to hook up API endpoints to components, and is called every
// time a new Weaveworks server is creatd.
func (srv *Entrypoint) wire(mux *mux.Router, grpc *grpc.Server) {
srv.promMetrics.WireAPI(mux)
srv.promMetrics.WireGRPC(grpc)

srv.manager.WireAPI(mux)

mux.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Healthy.\n")
})

mux.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Ready.\n")
})

mux.HandleFunc("/agent/api/v1/config", func(rw http.ResponseWriter, r *http.Request) {
srv.mut.Lock()
bb, err := yaml.Marshal(srv.cfg)
srv.mut.Unlock()

if err != nil {
http.Error(rw, fmt.Sprintf("failed to marshal config: %s", err), http.StatusInternalServerError)
} else {
_, _ = rw.Write(bb)
}
})

mux.HandleFunc("/-/reload", srv.reloadHandler)
}

func (srv *Entrypoint) reloadHandler(rw http.ResponseWriter, r *http.Request) {
success := srv.TriggerReload()
if success {
rw.WriteHeader(http.StatusOK)
} else {
rw.WriteHeader(http.StatusBadRequest)
}
}

// TriggerReload will cause the Entrypoint to re-request the config file and
// apply the latest config. TriggerReload returns true if the reload was
// successful.
func (srv *Entrypoint) TriggerReload() bool {
level.Info(srv.log).Log("msg", "reload of config file requested")

cfg, err := srv.reloader()
if err != nil {
level.Error(srv.log).Log("msg", "failed to reload config file", "err", err)
return false
}

err = srv.ApplyConfig(*cfg)
if err != nil {
level.Error(srv.log).Log("msg", "failed to reload config file", "err", err)
return false
}
return true
}

// Stop stops the Entrypoint and all subsystems.
func (srv *Entrypoint) Stop() {
srv.mut.Lock()
defer srv.mut.Unlock()

srv.manager.Stop()
srv.lokiLogs.Stop()
srv.promMetrics.Stop()
srv.tempoTraces.Stop()
srv.srv.Shutdown()
srv.srv.Close()

if srv.reloadServer != nil {
srv.reloadServer.Close()
}
}

// Start starts the server used by the Entrypoint, and will block until a
// termination signal is sent to the process.
func (srv *Entrypoint) Start() error {
return srv.srv.Run()
var g run.Group

// Create a signal handler that will stop the Entrypoint once a termination
// signal is received.
signalHandler := signals.NewHandler(srv.cfg.Server.Log)

g.Add(func() error {
signalHandler.Loop()
return nil
}, func(e error) {
signalHandler.Stop()
})

if srv.reloadServer != nil && srv.reloadListener != nil {
g.Add(func() error {
return srv.reloadServer.Serve(srv.reloadListener)
}, func(e error) {
srv.reloadServer.Close()
})
}

g.Add(func() error {
return srv.srv.Run()
}, func(e error) {
srv.srv.Close()
})

return g.Run()
}
26 changes: 21 additions & 5 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/grafana/agent/pkg/config"
"github.com/grafana/agent/pkg/util"
"github.com/weaveworks/common/logging"

// Adds version information
_ "github.com/grafana/agent/pkg/build"
Expand All @@ -28,7 +29,8 @@ func init() {
}

func main() {
// If this is a windows service then run it until if finishes
// If Windows is trying to run us as a service, go through that
// path instead.
if IsWindowsService() {
err := RunService()
if err != nil {
Expand All @@ -37,9 +39,17 @@ func main() {
return
}

// This is not a windows service so proceed normally
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
cfg, err := config.Load(fs, os.Args[1:])
var cfgLogger logging.Interface

reloader := func() (*config.Config, error) {
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
cfg, err := config.Load(fs, os.Args[1:])
if cfg != nil {
cfg.Server.Log = cfgLogger
}
return cfg, err
}
cfg, err := reloader()
if err != nil {
log.Fatalln(err)
}
Expand All @@ -48,11 +58,17 @@ func main() {
logger := util.NewLogger(&cfg.Server)
util_log.Logger = logger

ep, err := NewEntrypoint(logger, cfg)
// We need to manually set the logger for the first call to reload.
// Subsequent reloads will use cfgLogger.
cfgLogger = util.GoKitLogger(logger)
cfg.Server.Log = cfgLogger

ep, err := NewEntrypoint(logger, cfg, reloader)
if err != nil {
level.Error(logger).Log("msg", "error creating the agent server entrypoint", "err", err)
os.Exit(1)
}

if err = ep.Start(); err != nil {
level.Error(logger).Log("msg", "error running agent", "err", err)
// Don't os.Exit here; we want to do cleanup by stopping promMetrics
Expand Down
22 changes: 19 additions & 3 deletions cmd/agent/service_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/grafana/agent/pkg/config"
"github.com/grafana/agent/pkg/util"
"github.com/weaveworks/common/logging"

"golang.org/x/sys/windows/svc"
)
Expand All @@ -28,8 +29,18 @@ func (m *AgentService) Execute(args []string, serviceRequests <-chan svc.ChangeR
// Executable name and any command line parameters will be placed into os.args, this comes from
// registry key `Computer\HKEY_LOCAL_MACHINE\SYSTEM\ControlSet001\Services\<servicename>\ImagePath`
// oddly enough args is blank
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
cfg, err := config.Load(fs, os.Args[1:])

var cfgLogger logging.Interface

reloader := func() (*config.Config, error) {
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
cfg, err := config.Load(fs, os.Args[1:])
if cfg != nil {
cfg.Server.Log = cfgLogger
}
return cfg, err
}
cfg, err := reloader()
if err != nil {
log.Fatalln(err)
}
Expand All @@ -38,7 +49,12 @@ func (m *AgentService) Execute(args []string, serviceRequests <-chan svc.ChangeR
logger := util.NewLogger(&cfg.Server)
util_log.Logger = logger

ep, err := NewEntrypoint(logger, cfg)
// We need to manually set the logger for the first call to reload.
// Subsequent reloads will use cfgLogger.
cfgLogger = util.GoKitLogger(logger)
cfg.Server.Log = cfgLogger

ep, err := NewEntrypoint(logger, cfg, reloader)
if err != nil {
level.Error(logger).Log("msg", "error creating the agent server entrypoint", "err", err)
os.Exit(1)
Expand Down
Loading

0 comments on commit f98ff9f

Please sign in to comment.