diff --git a/agent/agent.go b/agent/agent.go index dfc5b4cb..9edeb70f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -7,9 +7,9 @@ import ( "os/signal" "syscall" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad-autoscaler/agent/config" - agentServer "github.com/hashicorp/nomad-autoscaler/agent/http" "github.com/hashicorp/nomad-autoscaler/plugins/manager" "github.com/hashicorp/nomad-autoscaler/policy" filePolicy "github.com/hashicorp/nomad-autoscaler/policy/file" @@ -26,7 +26,7 @@ type Agent struct { nomadClient *api.Client pluginManager *manager.PluginManager policyManager *policy.Manager - httpServer *agentServer.Server + inMemSink *metrics.InmemSink evalBroker *policyeval.Broker } @@ -59,16 +59,7 @@ func (a *Agent) Run() error { if err != nil { return fmt.Errorf("failed to setup telemetry: %v", err) } - - // Setup and start the HTTP server. - httpServer, err := agentServer.NewHTTPServer(a.config.HTTP, a.logger, inMem) - if err != nil { - return fmt.Errorf("failed to setup HTTP getHealth server: %v", err) - } - - a.httpServer = httpServer - go a.httpServer.Start() - go a.handleHTTPRequests(ctx) + a.inMemSink = inMem policyEvalCh := a.setupPolicyManager() go a.policyManager.Run(ctx, policyEvalCh) @@ -145,11 +136,6 @@ func (a *Agent) setupPolicyManager() chan *sdk.ScalingEvaluation { } func (a *Agent) stop() { - // Stop the health server. - if a.httpServer != nil { - a.httpServer.Stop() - } - // Kill all the plugins. if a.pluginManager != nil { a.pluginManager.KillPlugins() @@ -215,7 +201,7 @@ func (a *Agent) generateNomadClient() error { // reload triggers the reload of sub-routines based on the operator sending a // SIGHUP signal to the agent. -func (a Agent) reload() { +func (a *Agent) reload() { a.logger.Debug("reloading policy sources") a.policyManager.ReloadSources() } diff --git a/agent/http/agent.go b/agent/http/agent.go index 902904f9..58eff754 100644 --- a/agent/http/agent.go +++ b/agent/http/agent.go @@ -1,46 +1,10 @@ package http import ( - "fmt" "net/http" "strings" - "time" ) -const ( - AgentRequestTypeReload = iota -) - -type AgentRequest struct { - Type int - Request *http.Request - ResponseCh chan interface{} -} - -// sendAgentRequest wraps an AgentRequest into a synchronous request that will -// timeout if the agent doesn't reply back in time. -func (s *Server) sendAgentRequest(req AgentRequest) interface{} { - timeout := time.NewTimer(15 * time.Second) - timeoutErr := fmt.Errorf("request timeout") - - if req.ResponseCh == nil { - req.ResponseCh = make(chan interface{}) - } - - select { - case <-timeout.C: - return timeoutErr - case s.agentCh <- req: - } - - select { - case <-timeout.C: - return timeoutErr - case agentResp := <-req.ResponseCh: - return agentResp - } -} - func (s *Server) agentSpecificRequest(w http.ResponseWriter, r *http.Request) (interface{}, error) { path := strings.TrimPrefix(r.URL.Path, "/v1/agent") switch { @@ -56,14 +20,5 @@ func (s *Server) agentReload(w http.ResponseWriter, r *http.Request) (interface{ return nil, newCodedError(http.StatusMethodNotAllowed, errInvalidMethod) } - agentResp := s.sendAgentRequest(AgentRequest{ - Type: AgentRequestTypeReload, - Request: r, - }) - - if err, ok := agentResp.(error); ok && err != nil { - return nil, newCodedError(http.StatusInternalServerError, err.Error()) - } - - return nil, nil + return s.agent.Reload(w, r) } diff --git a/agent/http/agent_test.go b/agent/http/agent_test.go new file mode 100644 index 00000000..a553a092 --- /dev/null +++ b/agent/http/agent_test.go @@ -0,0 +1,41 @@ +package http + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestServer_agentReload(t *testing.T) { + testCases := []struct { + inputReq *http.Request + expectedRespCode int + name string + }{ + { + inputReq: httptest.NewRequest("PUT", "/v1/agent/reload", nil), + expectedRespCode: 200, + name: "successfully reload", + }, + { + inputReq: httptest.NewRequest("GET", "/v1/agent/reload", nil), + expectedRespCode: 405, + name: "incorrect request method", + }, + } + + srv, stopSrv := TestServer(t) + defer stopSrv() + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert := assert.New(t) + + w := httptest.NewRecorder() + srv.mux.ServeHTTP(w, tc.inputReq) + assert.Equal(tc.expectedRespCode, w.Code) + }) + } +} diff --git a/agent/http/health_test.go b/agent/http/health_test.go index 19361397..0f80537e 100644 --- a/agent/http/health_test.go +++ b/agent/http/health_test.go @@ -6,8 +6,6 @@ import ( "sync/atomic" "testing" - hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad-autoscaler/agent/config" "github.com/stretchr/testify/assert" ) @@ -43,9 +41,8 @@ func TestServer_getHealth(t *testing.T) { } // Create our HTTP server. - srv, err := NewHTTPServer(&config.HTTP{BindAddress: "127.0.0.1", BindPort: 8080}, hclog.NewNullLogger(), nil) - assert.Nil(t, err) - defer srv.ln.Close() + srv, stopSrv := TestServer(t) + defer stopSrv() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/agent/http/metrics.go b/agent/http/metrics.go index cb4246d9..a05d6252 100644 --- a/agent/http/metrics.go +++ b/agent/http/metrics.go @@ -28,7 +28,7 @@ func (s *Server) getMetrics(w http.ResponseWriter, r *http.Request) (interface{} s.getPrometheusMetrics().ServeHTTP(w, r) return nil, nil } - return s.inMemSink.DisplayMetrics(w, r) + return s.agent.DisplayMetrics(w, r) } // getPrometheusMetrics is the getMetrics handler when the caller wishes to diff --git a/agent/http/metrics_test.go b/agent/http/metrics_test.go index e5594685..e65587a8 100644 --- a/agent/http/metrics_test.go +++ b/agent/http/metrics_test.go @@ -4,11 +4,7 @@ import ( "net/http" "net/http/httptest" "testing" - "time" - metrics "github.com/armon/go-metrics" - hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad-autoscaler/agent/config" "github.com/stretchr/testify/assert" ) @@ -44,14 +40,9 @@ func TestServer_getMetrics(t *testing.T) { }, } - // Create a simple in-memory sink to use. - inm := metrics.NewInmemSink(10*time.Second, time.Minute) - metrics.DefaultInmemSignal(inm) - // Create our HTTP server. - srv, err := NewHTTPServer(&config.HTTP{BindAddress: "127.0.0.1", BindPort: 8080}, hclog.NewNullLogger(), inm) - assert.Nil(t, err) - defer srv.ln.Close() + srv, stopSrv := TestServer(t) + defer stopSrv() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/agent/http/server.go b/agent/http/server.go index b89ef212..e764a8a3 100644 --- a/agent/http/server.go +++ b/agent/http/server.go @@ -9,7 +9,6 @@ import ( "sync/atomic" "time" - metrics "github.com/armon/go-metrics" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/nomad-autoscaler/agent/config" @@ -35,33 +34,34 @@ const ( healthAlivenessUnavailable ) +// AgentHTTP is the interface that defines the HTTP handlers that an Agent +// must implement in order to be accessible through the HTTP API. +type AgentHTTP interface { + DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) + Reload(resp http.ResponseWriter, req *http.Request) (interface{}, error) +} + type Server struct { log hclog.Logger ln net.Listener mux *http.ServeMux srv *http.Server - // agentCh is used by the HTTP server to communicate back with the agent. - agentCh chan AgentRequest - // aliveness is used to describe the health response and should be set // atomically using healthAlivenessReady and healthAlivenessUnavailable // const declarations. aliveness int32 - // inMemSink is our in-memory telemetry sink used to server metrics - // endpoint requests. - inMemSink *metrics.InmemSink + agent AgentHTTP } // NewHTTPServer creates a new agent HTTP server. -func NewHTTPServer(cfg *config.HTTP, log hclog.Logger, inmSink *metrics.InmemSink) (*Server, error) { +func NewHTTPServer(cfg *config.HTTP, log hclog.Logger, agent AgentHTTP) (*Server, error) { srv := &Server{ - inMemSink: inmSink, - log: log.Named("http_server"), - mux: http.NewServeMux(), - agentCh: make(chan AgentRequest), + log: log.Named("http_server"), + mux: http.NewServeMux(), + agent: agent, } // Setup our handlers. @@ -129,11 +129,6 @@ func (s *Server) Stop() { } } -// AgentCh returns a read-only channel used to listen for agent requests. -func (s *Server) AgentCh() <-chan AgentRequest { - return s.agentCh -} - // wrap is a helper for all HTTP handler functions providing common // functionality including logging and error handling. func (s *Server) wrap(handler func(w http.ResponseWriter, r *http.Request) (interface{}, error)) func(w http.ResponseWriter, r *http.Request) { diff --git a/agent/http/testing.go b/agent/http/testing.go new file mode 100644 index 00000000..44589131 --- /dev/null +++ b/agent/http/testing.go @@ -0,0 +1,25 @@ +package http + +import ( + "testing" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad-autoscaler/agent" + "github.com/hashicorp/nomad-autoscaler/agent/config" +) + +func TestServer(t *testing.T) (*Server, func()) { + cfg := &config.HTTP{ + BindAddress: "127.0.0.1", + BindPort: 0, // Use next available port. + } + + s, err := NewHTTPServer(cfg, hclog.NewNullLogger(), &agent.MockAgentHTTP{}) + if err != nil { + t.Fatalf("failed to start test server: %v", err) + } + + return s, func() { + s.Stop() + } +} diff --git a/agent/http_handler.go b/agent/http_handler.go index e9e94691..d58b6b3e 100644 --- a/agent/http_handler.go +++ b/agent/http_handler.go @@ -1,29 +1,14 @@ package agent -import ( - "context" - "fmt" +import "net/http" - "github.com/hashicorp/nomad-autoscaler/agent/http" -) +// The methods in this file implement in the http.AgentHTTP interface. -func (a *Agent) handleHTTPRequests(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case r := <-a.httpServer.AgentCh(): - switch r.Type { - case http.AgentRequestTypeReload: - a.handleHTTPReload(r) - default: - r.ResponseCh <- fmt.Errorf("invalid request type %q", r.Type) - } - } - } +func (a *Agent) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + return a.inMemSink.DisplayMetrics(resp, req) } -func (a *Agent) handleHTTPReload(r http.AgentRequest) { +func (a *Agent) Reload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { a.reload() - r.ResponseCh <- nil + return nil, nil } diff --git a/agent/testing.go b/agent/testing.go new file mode 100644 index 00000000..364d7b38 --- /dev/null +++ b/agent/testing.go @@ -0,0 +1,22 @@ +package agent + +import ( + "net/http" + + metrics "github.com/armon/go-metrics" +) + +type MockAgentHTTP struct{} + +func (m *MockAgentHTTP) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + return metrics.MetricsSummary{ + Timestamp: "2020-11-17 00:17:50 +0000 UTC", + Counters: []metrics.SampledValue{}, + Gauges: []metrics.GaugeValue{}, + Points: []metrics.PointValue{}, + Samples: []metrics.SampledValue{}, + }, nil +} +func (m *MockAgentHTTP) Reload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + return nil, nil +} diff --git a/command/agent.go b/command/agent.go index 6f124d57..e93641ff 100644 --- a/command/agent.go +++ b/command/agent.go @@ -10,11 +10,15 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad-autoscaler/agent" "github.com/hashicorp/nomad-autoscaler/agent/config" + agentHTTP "github.com/hashicorp/nomad-autoscaler/agent/http" flaghelper "github.com/hashicorp/nomad-autoscaler/sdk/helper/flag" ) type AgentCommand struct { args []string + + agent *agent.Agent + httpServer *agentHTTP.Server } // Help should return long-form help text that includes the command-line @@ -226,9 +230,19 @@ func (c *AgentCommand) Run(args []string) int { JSONFormat: parsedConfig.LogJson, }) - // create and run agent - a := agent.NewAgent(parsedConfig, logger) - if err := a.Run(); err != nil { + // create and run agent and HTTP server + c.agent = agent.NewAgent(parsedConfig, logger) + httpServer, err := agentHTTP.NewHTTPServer(parsedConfig.HTTP, logger, c.agent) + if err != nil { + logger.Error("failed to setup HTTP getHealth server", "error", err) + return 1 + } + + c.httpServer = httpServer + go c.httpServer.Start() + defer c.httpServer.Stop() + + if err := c.agent.Run(); err != nil { logger.Error("failed to start agent", "error", err) return 1 }