Skip to content

Commit

Permalink
Merge pull request #6841 from hashicorp/f-agent-pprof-acl
Browse files Browse the repository at this point in the history
Remote agent pprof endpoints
  • Loading branch information
drewbailey committed Jan 10, 2020
2 parents 19711d1 + a58b8a5 commit ac0fef1
Show file tree
Hide file tree
Showing 24 changed files with 1,680 additions and 80 deletions.
81 changes: 81 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"strconv"
)

// Agent encapsulates an API client which talks to Nomad's
Expand Down Expand Up @@ -288,6 +290,85 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Stream
return frames, errCh
}

// PprofOptions holds the parameters of a profiling request made against a
// server or a node.
type PprofOptions struct {
	// ServerID selects the server the profile runs on. It may be a server
	// ID, a server name, or the special value "leader".
	ServerID string

	// NodeID selects the node the profile runs on.
	NodeID string

	// Seconds is how long the profile should run. It is only meaningful
	// for certain runtime profiles such as CPU and Trace.
	Seconds int

	// GC controls whether runtime.GC() is called before a heap profile is
	// taken.
	GC int

	// Debug controls whether a lookup profile is returned in human
	// readable form rather than binary.
	Debug int
}

// CPUProfile requests a runtime/pprof CPU profile from the given server or
// node. The profile runs for opts.Seconds seconds, or defaults to 1. When
// neither a server ID nor a node ID is set, the current agent's server is
// used.
//
// The call blocks until the profile has finished and returns the raw bytes
// of the profile.
func (a *Agent) CPUProfile(opts PprofOptions, q *QueryOptions) ([]byte, error) {
	const endpoint = "profile"
	return a.pprofRequest(endpoint, opts, q)
}

// Trace requests a runtime/pprof trace from the given server or node. The
// trace runs for opts.Seconds seconds, or defaults to 1. When neither a
// server ID nor a node ID is set, the current agent's server is used.
//
// The call blocks until the trace has finished and returns its raw bytes.
func (a *Agent) Trace(opts PprofOptions, q *QueryOptions) ([]byte, error) {
	const endpoint = "trace"
	return a.pprofRequest(endpoint, opts, q)
}

// Lookup requests the named runtime/pprof profile, which is resolved with
// pprof.Lookup. Either a client ID or a server ID may be supplied, but not
// both at the same time.
//
// The call blocks until the profile has finished and returns the raw bytes
// of the profile, unless debug is set.
func (a *Agent) Lookup(profile string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
	data, err := a.pprofRequest(profile, opts, q)
	return data, err
}

// pprofRequest queries /v1/agent/pprof/<req> and returns the complete
// response body.
//
// The profiling options are sent as the "seconds", "debug", "gc", "node_id"
// and "server_id" query parameters. They are written into q.Params, so a
// caller-supplied q is modified by this call. A nil q is replaced by an
// empty QueryOptions.
//
// An error from the query or from reading the body is returned as-is with a
// nil byte slice.
func (a *Agent) pprofRequest(req string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
	if q == nil {
		q = &QueryOptions{}
	}
	if q.Params == nil {
		q.Params = make(map[string]string)
	}

	q.Params["seconds"] = strconv.Itoa(opts.Seconds)
	q.Params["debug"] = strconv.Itoa(opts.Debug)
	q.Params["gc"] = strconv.Itoa(opts.GC)
	q.Params["node_id"] = opts.NodeID
	q.Params["server_id"] = opts.ServerID

	body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", req), q)
	if err != nil {
		return nil, err
	}
	// The body is fully consumed below; close it on every return path so
	// the response stream is not leaked.
	defer body.Close()

	resp, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	return resp, nil
}

// joinResponse is used to decode the response we get while
// sending a member join request.
type joinResponse struct {
Expand Down
80 changes: 80 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,83 @@ OUTER:
}
}
}

// TestAgentCPUProfile checks Agent.CPUProfile for a request with no target
// set and for a request naming a server ID that does not exist.
func TestAgentCPUProfile(t *testing.T) {
	t.Parallel()

	client, srv, tok := makeACLClient(t, nil, nil)
	defer srv.Stop()

	var (
		agent = client.Agent()
		query = &QueryOptions{AuthToken: tok.SecretID}
	)

	// Valid local request: a one second profile with no server or node ID.
	{
		profile, err := agent.CPUProfile(PprofOptions{Seconds: 1}, query)
		require.NoError(t, err)
		require.NotNil(t, profile)
	}

	// Invalid server request: the server ID is not known.
	{
		badOpts := PprofOptions{
			ServerID: "unknown.global",
			Seconds:  1,
		}
		profile, err := agent.CPUProfile(badOpts, query)
		require.Error(t, err)
		require.Contains(t, err.Error(), "500 (unknown nomad server unknown.global)")
		require.Nil(t, profile)
	}
}

// TestAgentTrace checks that Agent.Trace succeeds with default options and
// returns a non-nil result.
func TestAgentTrace(t *testing.T) {
	t.Parallel()

	client, srv, tok := makeACLClient(t, nil, nil)
	defer srv.Stop()

	agent := client.Agent()
	query := &QueryOptions{AuthToken: tok.SecretID}

	var defaults PprofOptions
	trace, err := agent.Trace(defaults, query)
	require.NoError(t, err)
	require.NotNil(t, trace)
}

// TestAgentProfile checks Agent.Lookup for the "heap" profile and for a
// profile name that is expected to produce a 404.
func TestAgentProfile(t *testing.T) {
	t.Parallel()

	client, srv, tok := makeACLClient(t, nil, nil)
	defer srv.Stop()

	agent := client.Agent()
	query := &QueryOptions{AuthToken: tok.SecretID}

	// known profile
	{
		profile, err := agent.Lookup("heap", PprofOptions{}, query)
		require.NoError(t, err)
		require.NotNil(t, profile)
	}

	// unknown profile
	{
		profile, err := agent.Lookup("invalid", PprofOptions{}, query)
		require.Error(t, err)
		require.Contains(t, err.Error(), "Unexpected response code: 404")
		require.Nil(t, profile)
	}
}
61 changes: 55 additions & 6 deletions client/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
Expand All @@ -23,12 +24,59 @@ type Agent struct {
}

// NewAgentEndpoint builds the Agent endpoint for client c and registers its
// monitor method as the handler for the "Agent.Monitor" streaming RPC.
//
// NOTE(review): this span appears to carry both sides of a diff hunk (the
// m-named and the a-named statements). As written, everything after
// `return m` is unreachable.
func NewAgentEndpoint(c *Client) *Agent {
m := &Agent{c: c}
m.c.streamingRpcs.Register("Agent.Monitor", m.monitor)
return m
a := &Agent{c: c}
a.c.streamingRpcs.Register("Agent.Monitor", a.monitor)
return a
}

func (m *Agent) monitor(conn io.ReadWriteCloser) {
// Profile runs the profile selected by args.ReqType and copies the result
// into reply.
//
// Access control: when the token resolves to an ACL object, that object must
// allow agent write. When it resolves to nil, the client config must have
// EnableDebug set. Otherwise structs.ErrPermissionDenied is returned. An
// error from resolving the token is returned unchanged.
//
// A profiling error is returned as a coded RPC error: 404 when
// pprof.IsErrProfileNotFound reports true, 500 otherwise. On success reply
// receives the payload, the HTTP headers and this client's node ID.
func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPprofResponse) error {
	tokenACL, err := a.c.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	}

	switch {
	case tokenACL != nil:
		// ACLs resolved: agent write is required.
		if !tokenACL.AllowAgentWrite() {
			return structs.ErrPermissionDenied
		}
	case !a.c.config.EnableDebug:
		// No ACL object: only allowed when EnableDebug is set.
		return structs.ErrPermissionDenied
	}

	var (
		payload []byte
		hdrs    map[string]string
	)

	// Generate the requested profile. The CPU and trace requests are given
	// args.Seconds. The RPC endpoints currently don't support context or
	// request cancellation, so context.TODO() is passed as a stub.
	switch args.ReqType {
	case pprof.CPUReq:
		payload, hdrs, err = pprof.CPUProfile(context.TODO(), args.Seconds)
	case pprof.CmdReq:
		payload, hdrs, err = pprof.Cmdline()
	case pprof.LookupReq:
		payload, hdrs, err = pprof.Profile(args.Profile, args.Debug, args.GC)
	case pprof.TraceReq:
		payload, hdrs, err = pprof.Trace(context.TODO(), args.Seconds)
	}

	if err != nil {
		msg := err.Error()
		if !pprof.IsErrProfileNotFound(err) {
			return structs.NewErrRPCCoded(500, msg)
		}
		return structs.NewErrRPCCoded(404, msg)
	}

	// Hand the profile back to the caller.
	reply.Payload = payload
	reply.AgentID = a.c.NodeID()
	reply.HTTPHeaders = hdrs

	return nil
}

func (a *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()

Expand All @@ -43,7 +91,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}

// Check acl
if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil {
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
return
} else if aclObj != nil && !aclObj.AllowAgentRead() {
Expand All @@ -64,7 +112,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
monitor := monitor.New(512, a.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
})
Expand All @@ -76,6 +124,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {

framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()

defer framer.Destroy()

// goroutine to detect remote side closing
Expand Down
Loading

0 comments on commit ac0fef1

Please sign in to comment.