Skip to content

Commit

Permalink
refactor HTTP server to make it easier to call agent methods
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Nov 17, 2020
1 parent f9ff705 commit b6d36ea
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 122 deletions.
22 changes: 4 additions & 18 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"os/signal"
"syscall"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad-autoscaler/agent/config"
agentServer "github.com/hashicorp/nomad-autoscaler/agent/http"
"github.com/hashicorp/nomad-autoscaler/plugins/manager"
"github.com/hashicorp/nomad-autoscaler/policy"
filePolicy "github.com/hashicorp/nomad-autoscaler/policy/file"
Expand All @@ -26,7 +26,7 @@ type Agent struct {
nomadClient *api.Client
pluginManager *manager.PluginManager
policyManager *policy.Manager
httpServer *agentServer.Server
inMemSink *metrics.InmemSink
evalBroker *policyeval.Broker
}

Expand Down Expand Up @@ -59,16 +59,7 @@ func (a *Agent) Run() error {
if err != nil {
return fmt.Errorf("failed to setup telemetry: %v", err)
}

// Setup and start the HTTP server.
httpServer, err := agentServer.NewHTTPServer(a.config.HTTP, a.logger, inMem)
if err != nil {
return fmt.Errorf("failed to setup HTTP getHealth server: %v", err)
}

a.httpServer = httpServer
go a.httpServer.Start()
go a.handleHTTPRequests(ctx)
a.inMemSink = inMem

policyEvalCh := a.setupPolicyManager()
go a.policyManager.Run(ctx, policyEvalCh)
Expand Down Expand Up @@ -145,11 +136,6 @@ func (a *Agent) setupPolicyManager() chan *sdk.ScalingEvaluation {
}

func (a *Agent) stop() {
// Stop the health server.
if a.httpServer != nil {
a.httpServer.Stop()
}

// Kill all the plugins.
if a.pluginManager != nil {
a.pluginManager.KillPlugins()
Expand Down Expand Up @@ -215,7 +201,7 @@ func (a *Agent) generateNomadClient() error {

// reload triggers the reload of sub-routines based on the operator sending a
// SIGHUP signal to the agent.
func (a Agent) reload() {
func (a *Agent) reload() {
a.logger.Debug("reloading policy sources")
a.policyManager.ReloadSources()
}
Expand Down
47 changes: 1 addition & 46 deletions agent/http/agent.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,10 @@
package http

import (
"fmt"
"net/http"
"strings"
"time"
)

const (
AgentRequestTypeReload = iota
)

type AgentRequest struct {
Type int
Request *http.Request
ResponseCh chan interface{}
}

// sendAgentRequest wraps an AgentRequest into a synchronous request that will
// timeout if the agent doesn't reply back in time.
func (s *Server) sendAgentRequest(req AgentRequest) interface{} {
timeout := time.NewTimer(15 * time.Second)
timeoutErr := fmt.Errorf("request timeout")

if req.ResponseCh == nil {
req.ResponseCh = make(chan interface{})
}

select {
case <-timeout.C:
return timeoutErr
case s.agentCh <- req:
}

select {
case <-timeout.C:
return timeoutErr
case agentResp := <-req.ResponseCh:
return agentResp
}
}

func (s *Server) agentSpecificRequest(w http.ResponseWriter, r *http.Request) (interface{}, error) {
path := strings.TrimPrefix(r.URL.Path, "/v1/agent")
switch {
Expand All @@ -56,14 +20,5 @@ func (s *Server) agentReload(w http.ResponseWriter, r *http.Request) (interface{
return nil, newCodedError(http.StatusMethodNotAllowed, errInvalidMethod)
}

agentResp := s.sendAgentRequest(AgentRequest{
Type: AgentRequestTypeReload,
Request: r,
})

if err, ok := agentResp.(error); ok && err != nil {
return nil, newCodedError(http.StatusInternalServerError, err.Error())
}

return nil, nil
return s.agent.Reload(w, r)
}
41 changes: 41 additions & 0 deletions agent/http/agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package http

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestServer_agentReload(t *testing.T) {
testCases := []struct {
inputReq *http.Request
expectedRespCode int
name string
}{
{
inputReq: httptest.NewRequest("PUT", "/v1/agent/reload", nil),
expectedRespCode: 200,
name: "successfully reload",
},
{
inputReq: httptest.NewRequest("GET", "/v1/agent/reload", nil),
expectedRespCode: 405,
name: "incorrect request method",
},
}

srv, stopSrv := TestServer(t)
defer stopSrv()

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert := assert.New(t)

w := httptest.NewRecorder()
srv.mux.ServeHTTP(w, tc.inputReq)
assert.Equal(tc.expectedRespCode, w.Code)
})
}
}
7 changes: 2 additions & 5 deletions agent/http/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync/atomic"
"testing"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad-autoscaler/agent/config"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -43,9 +41,8 @@ func TestServer_getHealth(t *testing.T) {
}

// Create our HTTP server.
srv, err := NewHTTPServer(&config.HTTP{BindAddress: "127.0.0.1", BindPort: 8080}, hclog.NewNullLogger(), nil)
assert.Nil(t, err)
defer srv.ln.Close()
srv, stopSrv := TestServer(t)
defer stopSrv()

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion agent/http/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *Server) getMetrics(w http.ResponseWriter, r *http.Request) (interface{}
s.getPrometheusMetrics().ServeHTTP(w, r)
return nil, nil
}
return s.inMemSink.DisplayMetrics(w, r)
return s.agent.DisplayMetrics(w, r)
}

// getPrometheusMetrics is the getMetrics handler when the caller wishes to
Expand Down
13 changes: 2 additions & 11 deletions agent/http/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

metrics "github.com/armon/go-metrics"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad-autoscaler/agent/config"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -44,14 +40,9 @@ func TestServer_getMetrics(t *testing.T) {
},
}

// Create a simple in-memory sink to use.
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
metrics.DefaultInmemSignal(inm)

// Create our HTTP server.
srv, err := NewHTTPServer(&config.HTTP{BindAddress: "127.0.0.1", BindPort: 8080}, hclog.NewNullLogger(), inm)
assert.Nil(t, err)
defer srv.ln.Close()
srv, stopSrv := TestServer(t)
defer stopSrv()

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
29 changes: 12 additions & 17 deletions agent/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync/atomic"
"time"

metrics "github.com/armon/go-metrics"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad-autoscaler/agent/config"
Expand All @@ -35,33 +34,34 @@ const (
healthAlivenessUnavailable
)

// AgentHTTP is the interface that defines the HTTP handlers that an Agent
// must implement in order to be accessible through the HTTP API.
type AgentHTTP interface {
DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error)
Reload(resp http.ResponseWriter, req *http.Request) (interface{}, error)
}

type Server struct {
log hclog.Logger
ln net.Listener
mux *http.ServeMux
srv *http.Server

// agentCh is used by the HTTP server to communicate back with the agent.
agentCh chan AgentRequest

// aliveness is used to describe the health response and should be set
// atomically using healthAlivenessReady and healthAlivenessUnavailable
// const declarations.
aliveness int32

// inMemSink is our in-memory telemetry sink used to server metrics
// endpoint requests.
inMemSink *metrics.InmemSink
agent AgentHTTP
}

// NewHTTPServer creates a new agent HTTP server.
func NewHTTPServer(cfg *config.HTTP, log hclog.Logger, inmSink *metrics.InmemSink) (*Server, error) {
func NewHTTPServer(cfg *config.HTTP, log hclog.Logger, agent AgentHTTP) (*Server, error) {

srv := &Server{
inMemSink: inmSink,
log: log.Named("http_server"),
mux: http.NewServeMux(),
agentCh: make(chan AgentRequest),
log: log.Named("http_server"),
mux: http.NewServeMux(),
agent: agent,
}

// Setup our handlers.
Expand Down Expand Up @@ -129,11 +129,6 @@ func (s *Server) Stop() {
}
}

// AgentCh returns a read-only channel used to listen for agent requests.
func (s *Server) AgentCh() <-chan AgentRequest {
return s.agentCh
}

// wrap is a helper for all HTTP handler functions providing common
// functionality including logging and error handling.
func (s *Server) wrap(handler func(w http.ResponseWriter, r *http.Request) (interface{}, error)) func(w http.ResponseWriter, r *http.Request) {
Expand Down
25 changes: 25 additions & 0 deletions agent/http/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package http

import (
"testing"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad-autoscaler/agent"
"github.com/hashicorp/nomad-autoscaler/agent/config"
)

func TestServer(t *testing.T) (*Server, func()) {
cfg := &config.HTTP{
BindAddress: "127.0.0.1",
BindPort: 0, // Use next available port.
}

s, err := NewHTTPServer(cfg, hclog.NewNullLogger(), &agent.MockAgentHTTP{})
if err != nil {
t.Fatalf("failed to start test server: %v", err)
}

return s, func() {
s.Stop()
}
}
27 changes: 6 additions & 21 deletions agent/http_handler.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,14 @@
package agent

import (
"context"
"fmt"
import "net/http"

"github.com/hashicorp/nomad-autoscaler/agent/http"
)
// The methods in this file implement in the http.AgentHTTP interface.

func (a *Agent) handleHTTPRequests(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case r := <-a.httpServer.AgentCh():
switch r.Type {
case http.AgentRequestTypeReload:
a.handleHTTPReload(r)
default:
r.ResponseCh <- fmt.Errorf("invalid request type %q", r.Type)
}
}
}
func (a *Agent) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
return a.inMemSink.DisplayMetrics(resp, req)
}

func (a *Agent) handleHTTPReload(r http.AgentRequest) {
func (a *Agent) Reload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
a.reload()
r.ResponseCh <- nil
return nil, nil
}
22 changes: 22 additions & 0 deletions agent/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package agent

import (
"net/http"

metrics "github.com/armon/go-metrics"
)

type MockAgentHTTP struct{}

func (m *MockAgentHTTP) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
return metrics.MetricsSummary{
Timestamp: "2020-11-17 00:17:50 +0000 UTC",
Counters: []metrics.SampledValue{},
Gauges: []metrics.GaugeValue{},
Points: []metrics.PointValue{},
Samples: []metrics.SampledValue{},
}, nil
}
func (m *MockAgentHTTP) Reload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
return nil, nil
}
Loading

0 comments on commit b6d36ea

Please sign in to comment.