Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement /-/reload #493

Merged
merged 10 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ cross-compilation issue, but will return in v0.13.0.

- [FEATURE] Add support for running as a Windows service (@mattdurham)

- [FEATURE] (beta) Add /-/reload support. It is not recommended to invoke
`/-/reload` against the main HTTP server. Instead, two new command-line flags
have been added: `--reload-addr` and `--reload-port`. These will launch a
`/-/reload`-only HTTP server that can be used to safely reload the Agent's
state. (@rfratto)

- [ENHANCEMENT] Support compression for trace export. (@mdisibio)

- [ENHANCEMENT] Allow Prometheus URL configuration to propagate to instances and integrations, if not given. (@mattdurham)
Expand All @@ -35,7 +41,7 @@ cross-compilation issue, but will return in v0.13.0.

- [CHANGE] The User-Agent header sent for logs will now be
`GrafanaAgent/<version>` (@rfratto)

- [ENHANCEMENT] Upgrade `go.opentelemetry.io/collector` to v0.21.0 (@mapno)

# v0.13.0 (2021-02-25)
Expand Down
194 changes: 139 additions & 55 deletions cmd/agent/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ package main

import (
"fmt"
"net"
"net/http"
"sync"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/gorilla/mux"
"github.com/grafana/agent/pkg/integrations"
"github.com/grafana/agent/pkg/loki"
"github.com/grafana/agent/pkg/tempo"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/agent/pkg/util/server"
"github.com/oklog/run"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/grafana/agent/pkg/config"
"github.com/grafana/agent/pkg/prom"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/signals"

"github.com/go-kit/kit/log/level"
)
Expand All @@ -25,6 +28,8 @@ import (
type Entrypoint struct {
mut sync.Mutex

reloader Reloader

log *util.Logger
cfg config.Config

Expand All @@ -33,15 +38,37 @@ type Entrypoint struct {
lokiLogs *loki.Loki
tempoTraces *tempo.Tempo
manager *integrations.Manager

reloadListener net.Listener
reloadServer *http.Server
}

// Reloader is any function that returns a new config.
type Reloader = func() (*config.Config, error)

// NewEntrypoint creates a new Entrypoint.
func NewEntrypoint(logger *util.Logger, cfg *config.Config) (*Entrypoint, error) {
func NewEntrypoint(logger *util.Logger, cfg *config.Config, reloader Reloader) (*Entrypoint, error) {
var (
ep = Entrypoint{log: logger}
ep = &Entrypoint{
log: logger,
reloader: reloader,
}
err error
)

if cfg.ReloadPort != 0 {
ep.reloadListener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ReloadAddress, cfg.ReloadPort))
if err != nil {
return nil, fmt.Errorf("failed to listen on address for secondary /-/reload server: %w", err)
}

reloadMux := mux.NewRouter()
reloadMux.HandleFunc("/-/reload", ep.reloadHandler)
ep.reloadServer = &http.Server{Handler: reloadMux}
}

ep.srv = server.New(prometheus.DefaultRegisterer, logger)

ep.promMetrics, err = prom.New(prometheus.DefaultRegisterer, cfg.Prometheus, logger)
if err != nil {
return nil, err
Expand All @@ -67,52 +94,28 @@ func NewEntrypoint(logger *util.Logger, cfg *config.Config) (*Entrypoint, error)
if err := ep.ApplyConfig(*cfg); err != nil {
return nil, err
}
return &ep, nil
return ep, nil
}

// ApplyConfig applies changes to the subsystems of the Agent.
func (srv *Entrypoint) ApplyConfig(cfg config.Config) error {
srv.mut.Lock()
defer srv.mut.Unlock()

// The server config uses some unexported fields which can't be compared by
// default. Since only exported fields are used by YAML, we'll only compare
// those here.
var ignoreUnexported = cmpopts.IgnoreUnexported(logging.Format{}, logging.Level{})

if cmp.Equal(srv.cfg, cfg, ignoreUnexported) {
return nil
if cfg.Server.Log == nil {
rfratto marked this conversation as resolved.
Show resolved Hide resolved
cfg.Server.Log = srv.cfg.Server.Log
}

var (
// wireServer indicates a new server and that all API endpoints
// (HTTP and gRPC) need to be re-created.
wireServer bool

failed bool
err error
)
var failed bool

if err := srv.log.ApplyConfig(&cfg.Server); err != nil {
level.Error(srv.log).Log("msg", "failed to update logger", "err", err)
failed = true
}

// Server doesn't have an ApplyConfig method so we need to do a full
// restart of it here.
if !cmp.Equal(srv.cfg.Server, cfg.Server, ignoreUnexported) {
if srv.srv != nil {
srv.srv.Shutdown()
}

srv.srv, err = server.New(cfg.Server)
if err != nil {
level.Error(srv.log).Log("msg", "failed to reload server", "err", err)
failed = true
}

// New server, so everything needs re-wiring.
wireServer = true
if err := srv.srv.ApplyConfig(cfg.Server, srv.wire); err != nil {
level.Error(srv.log).Log("msg", "failed to update server", "err", err)
failed = true
}

// Go through each component and update it.
Expand All @@ -136,40 +139,121 @@ func (srv *Entrypoint) ApplyConfig(cfg config.Config) error {
failed = true
}

if wireServer {
srv.promMetrics.WireAPI(srv.srv.HTTP)
srv.promMetrics.WireGRPC(srv.srv.GRPC)

srv.manager.WireAPI(srv.srv.HTTP)

srv.srv.HTTP.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Healthy.\n")
})
srv.srv.HTTP.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Ready.\n")
})
}

srv.cfg = cfg
if failed {
return fmt.Errorf("changes did not apply successfully")
}

return nil
}

// wire is used to hook up API endpoints to components, and is called every
// time a new Weaveworks server is creatd.
func (srv *Entrypoint) wire(mux *mux.Router, grpc *grpc.Server) {
srv.promMetrics.WireAPI(mux)
srv.promMetrics.WireGRPC(grpc)

srv.manager.WireAPI(mux)

mux.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Healthy.\n")
})

mux.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Agent is Ready.\n")
})

mux.HandleFunc("/agent/api/v1/config", func(rw http.ResponseWriter, r *http.Request) {
srv.mut.Lock()
bb, err := yaml.Marshal(srv.cfg)
srv.mut.Unlock()

if err != nil {
http.Error(rw, fmt.Sprintf("failed to marshal config: %s", err), http.StatusInternalServerError)
} else {
_, _ = rw.Write(bb)
}
})

mux.HandleFunc("/-/reload", srv.reloadHandler)
}

func (srv *Entrypoint) reloadHandler(rw http.ResponseWriter, r *http.Request) {
success := srv.TriggerReload()
if success {
rw.WriteHeader(http.StatusOK)
} else {
rw.WriteHeader(http.StatusBadRequest)
}
}

// TriggerReload will cause the Entrypoint to re-request the config file and
// apply the latest config. TriggerReload returns true if the reload was
// successful.
func (srv *Entrypoint) TriggerReload() bool {
level.Info(srv.log).Log("msg", "reload of config file requested")

cfg, err := srv.reloader()
if err != nil {
level.Error(srv.log).Log("msg", "failed to reload config file", "err", err)
return false
}

err = srv.ApplyConfig(*cfg)
if err != nil {
level.Error(srv.log).Log("msg", "failed to reload config file", "err", err)
return false
}
return true
}

// Stop stops the Entrypoint and all subsystems.
func (srv *Entrypoint) Stop() {
srv.mut.Lock()
defer srv.mut.Unlock()

srv.manager.Stop()
srv.lokiLogs.Stop()
srv.promMetrics.Stop()
srv.tempoTraces.Stop()
srv.srv.Shutdown()
srv.srv.Close()

if srv.reloadServer != nil {
srv.reloadServer.Close()
}
}

// Start starts the server used by the Entrypoint, and will block until a
// termination signal is sent to the process.
func (srv *Entrypoint) Start() error {
return srv.srv.Run()
var g run.Group

// Create a signal handler that will stop the Entrypoint once a termination
// signal is received.
signalHandler := signals.NewHandler(srv.cfg.Server.Log)

g.Add(func() error {
signalHandler.Loop()
return nil
}, func(e error) {
signalHandler.Stop()
})

if srv.reloadServer != nil && srv.reloadListener != nil {
g.Add(func() error {
return srv.reloadServer.Serve(srv.reloadListener)
}, func(e error) {
srv.reloadServer.Close()
})
}

g.Add(func() error {
return srv.srv.Run()
}, func(e error) {
srv.srv.Close()
})

return g.Run()
}
14 changes: 9 additions & 5 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func init() {
}

func main() {
// If this is a windows service then run it until if finishes
// If Windows is trying to run us as a service, go through that
// path instead.
if IsWindowsService() {
err := RunService()
if err != nil {
Expand All @@ -37,9 +38,11 @@ func main() {
return
}

// This is not a windows service so proceed normally
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
cfg, err := config.Load(fs, os.Args[1:])
reloader := func() (*config.Config, error) {
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
return config.Load(fs, os.Args[1:])
}
cfg, err := reloader()
if err != nil {
log.Fatalln(err)
}
Expand All @@ -48,11 +51,12 @@ func main() {
logger := util.NewLogger(&cfg.Server)
util_log.Logger = logger

ep, err := NewEntrypoint(logger, cfg)
ep, err := NewEntrypoint(logger, cfg, reloader)
if err != nil {
level.Error(logger).Log("msg", "error creating the agent server entrypoint", "err", err)
os.Exit(1)
}

if err = ep.Start(); err != nil {
level.Error(logger).Log("msg", "error running agent", "err", err)
// Don't os.Exit here; we want to do cleanup by stopping promMetrics
Expand Down
10 changes: 7 additions & 3 deletions cmd/agent/service_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ func (m *AgentService) Execute(args []string, serviceRequests <-chan svc.ChangeR
// Executable name and any command line parameters will be placed into os.args, this comes from
// registry key `Computer\HKEY_LOCAL_MACHINE\SYSTEM\ControlSet001\Services\<servicename>\ImagePath`
// oddly enough args is blank
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
cfg, err := config.Load(fs, os.Args[1:])

reloader := func() (*config.Config, error) {
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
return config.Load(fs, os.Args[1:])
}
cfg, err := reloader()
if err != nil {
log.Fatalln(err)
}
Expand All @@ -38,7 +42,7 @@ func (m *AgentService) Execute(args []string, serviceRequests <-chan svc.ChangeR
logger := util.NewLogger(&cfg.Server)
util_log.Logger = logger

ep, err := NewEntrypoint(logger, cfg)
ep, err := NewEntrypoint(logger, cfg, reloader)
if err != nil {
level.Error(logger).Log("msg", "error creating the agent server entrypoint", "err", err)
os.Exit(1)
Expand Down
Loading