Skip to content

Commit

Permalink
Merge pull request #6499 from hashicorp/f-nomad-monitor
Browse files Browse the repository at this point in the history
Nomad Monitor
  • Loading branch information
drewbailey committed Nov 5, 2019
2 parents 1848073 + 03f0aff commit eb51141
Show file tree
Hide file tree
Showing 52 changed files with 3,098 additions and 674 deletions.
51 changes: 51 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,57 @@ func (a *Agent) Health() (*AgentHealthResponse, error) {
return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
}

// Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
if err != nil {
errCh <- err
return nil, errCh
}

r.setQueryOptions(q)
_, resp, err := requireOK(a.client.doRequest(r))
if err != nil {
errCh <- err
return nil, errCh
}

frames := make(chan *StreamFrame, 10)
go func() {
defer resp.Body.Close()

dec := json.NewDecoder(resp.Body)

for {
select {
case <-stopCh:
close(frames)
return
default:
}

// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
errCh <- err
return
}

// Discard heartbeat frame
if frame.IsHeartbeat() {
continue
}

frames <- &frame
}
}()

return frames, errCh
}

// joinResponse is used to decode the response we get while
// sending a member join request.
type joinResponse struct {
Expand Down
119 changes: 119 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package api

import (
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"

"github.com/kr/pretty"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -257,3 +263,116 @@ func TestAgent_Health(t *testing.T) {
assert.Nil(err)
assert.True(health.Server.Ok)
}

// TestAgent_MonitorWithNode tests the Monitor endpoint
// passing in a log level and node ie, which tests monitor
// functionality for a specific client node
func TestAgent_MonitorWithNode(t *testing.T) {
t.Parallel()
rpcPort := 0
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
rpcPort = c.Ports.RPC
c.Client = &testutil.ClientConfig{
Enabled: true,
}
})
defer s.Stop()

require.NoError(t, c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))

agent := c.Agent()

index := uint64(0)
var node *NodeListStub
// grab a node
testutil.WaitForResult(func() (bool, error) {
nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
index = qm.LastIndex
if len(nodes) != 1 {
return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
}
if nodes[0].Status != "ready" {
return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
}
node = nodes[0]
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

doneCh := make(chan struct{})
q := &QueryOptions{
Params: map[string]string{
"log_level": "debug",
"node_id": node.ID,
},
}

frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)

// make a request to generate some logs
_, err := agent.NodeName()
require.NoError(t, err)

// Wait for a log message
OUTER:
for {
select {
case f := <-frames:
if strings.Contains(string(f.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Errorf("Error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}

// TestAgent_Monitor tests the Monitor endpoint
// passing in only a log level, which tests the servers
// monitor functionality
func TestAgent_Monitor(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()

agent := c.Agent()

q := &QueryOptions{
Params: map[string]string{
"log_level": "debug",
},
}

doneCh := make(chan struct{})
frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)

// make a request to generate some logs
_, err := agent.Region()
require.NoError(t, err)

// Wait for a log message
OUTER:
for {
select {
case log := <-frames:
if log == nil {
continue
}
if strings.Contains(string(log.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Fatalf("error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}
163 changes: 163 additions & 0 deletions client/agent_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package client

import (
"bytes"
"context"
"errors"
"io"
"time"

"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)

type Agent struct {
c *Client
}

func NewAgentEndpoint(c *Client) *Agent {
m := &Agent{c: c}
m.c.streamingRpcs.Register("Agent.Monitor", m.monitor)
return m
}

func (m *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()

// Decode arguments
var args cstructs.MonitorRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}

// Check acl
if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
return
} else if aclObj != nil && !aclObj.AllowAgentRead() {
handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}

logLevel := log.LevelFromString(args.LogLevel)
if args.LogLevel == "" {
logLevel = log.LevelFromString("INFO")
}

if logLevel == log.NoLevel {
handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder)
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
})

frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)

framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()

// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
select {
case <-ctx.Done():
return
}
}()

logCh := monitor.Start()
defer monitor.Stop()
initialOffset := int64(0)

// receive logs and build frames
go func() {
defer framer.Destroy()
LOOP:
for {
select {
case log := <-logCh:
if err := framer.Send("", "log", log, initialOffset); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
}()

var streamErr error
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}

break OUTER
}

var resp cstructs.StreamErrWrapper
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}

resp.Payload = buf.Bytes()
buf.Reset()
}

if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
}

if streamErr != nil {
handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
}
Loading

0 comments on commit eb51141

Please sign in to comment.