Skip to content

Commit

Permalink
moving endpoints over to frames
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Nov 1, 2019
1 parent 522c45f commit 334df12
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 27 deletions.
2 changes: 1 addition & 1 deletion api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Stream
}
}()

return frames, nil
return frames, errCh
}

// joinResponse is used to decode the response we get while
Expand Down
25 changes: 16 additions & 9 deletions api/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"bytes"
"fmt"
"reflect"
"sort"
Expand Down Expand Up @@ -311,22 +312,24 @@ func TestAgent_MonitorWithNode(t *testing.T) {
},
}

logCh, err := agent.Monitor(doneCh, q)
require.NoError(t, err)
frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)

// make a request to generate some logs
_, err = agent.NodeName()
_, err := agent.NodeName()
require.NoError(t, err)

// Wait for a log message
var result bytes.Buffer
OUTER:
for {
select {
case log := <-logCh:
if strings.Contains(string(log.Data), "[DEBUG]") {
case f := <-frames:
if strings.Contains(string(f.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Errorf("Error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
Expand All @@ -350,22 +353,26 @@ func TestAgent_Monitor(t *testing.T) {
}

doneCh := make(chan struct{})
logCh, err := agent.Monitor(doneCh, q)
require.NoError(t, err)
frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)

// make a request to generate some logs
_, err = agent.Region()
_, err := agent.Region()
require.NoError(t, err)

// Wait for a log message
OUTER:
for {
select {
case log := <-logCh:
case log := <-frames:
if log == nil {
continue
}
if strings.Contains(string(log.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Fatalf("error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
Expand Down
7 changes: 2 additions & 5 deletions command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,13 @@ func TestHTTP_AgentMonitor(t *testing.T) {
tried := 0
testutil.WaitForResult(func() (bool, error) {
if tried < maxLogAttempts {
s.Server.logger.Debug("log that should not be sent")
s.Server.logger.Warn("log that should be sent")
tried++
}

got := resp.Body.String()
want := "[WARN] http: log that should be sent"
want := `{"Data":"`
if strings.Contains(got, want) {
require.NotContains(t, resp.Body.String(), "[DEBUG]")
return true, nil
}

Expand Down Expand Up @@ -344,9 +342,8 @@ func TestHTTP_AgentMonitor(t *testing.T) {
}

out += string(output)
want := "[WARN] http: log that should be sent"
want := `{"Data":"`
if strings.Contains(out, want) {
require.NotContains(t, resp.Body.String(), "[DEBUG]")
return true, nil
}

Expand Down
2 changes: 2 additions & 0 deletions command/agent_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ func TestMonitorCommand_Implements(t *testing.T) {

func TestMonitorCommand_Fails(t *testing.T) {
t.Parallel()
srv, _, _ := testServer(t, false, nil)
defer srv.Shutdown()

ui := new(cli.MockUi)
cmd := &MonitorCommand{Meta: Meta{Ui: ui}}
Expand Down
71 changes: 62 additions & 9 deletions nomad/client_agent_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package nomad

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"time"

log "github.com/hashicorp/go-hclog"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -138,9 +141,20 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
JSONFormat: args.LogJSON,
})

frames := make(chan *sframer.StreamFrame, 32)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)

// framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()

// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe closed, exit
// One end of the pipe explicitly closed, exit
cancel()
return
}
Expand All @@ -151,14 +165,59 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}()

logCh := monitor.Start(stopCh)
initialOffset := int64(0)

// receive logs and build frames
go func() {
defer framer.Destroy()
LOOP:
for {
select {
case log := <-logCh:
if err := framer.Send("", "log", log, initialOffset); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
}()

var streamErr error
OUTER:
for {
select {
case log := <-logCh:
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}

break OUTER
}

var resp cstructs.StreamErrWrapper
resp.Payload = log
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}

resp.Payload = buf.Bytes()
buf.Reset()
}

if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
Expand All @@ -174,11 +233,5 @@ OUTER:
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}

// Attempt to send the error
encoder.Encode(&cstructs.StreamErrWrapper{
Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)),
})
return
}
}
17 changes: 14 additions & 3 deletions nomad/client_agent_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"encoding/json"
"fmt"
"io"
"net"
Expand All @@ -11,10 +12,12 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
Expand Down Expand Up @@ -85,7 +88,7 @@ func TestMonitor_Monitor_Remote_Server(t *testing.T) {
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))

timeout := time.After(1 * time.Second)
timeout := time.After(3 * time.Second)
expected := "[DEBUG]"
received := ""

Expand All @@ -101,7 +104,11 @@ OUTER:
t.Fatalf("Got error: %v", msg.Error.Error())
}

received += string(msg.Payload)
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)

received += string(frame.Data)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
Expand Down Expand Up @@ -181,7 +188,11 @@ OUTER:
t.Fatalf("Got error: %v", msg.Error.Error())
}

received += string(msg.Payload)
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)

received += string(frame.Data)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
Expand Down

0 comments on commit 334df12

Please sign in to comment.