Skip to content

Commit

Permalink
coordinate closing of doneCh, use interface to simplify callers
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Nov 5, 2019
1 parent 33ba36a commit 620fba5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 23 deletions.
5 changes: 2 additions & 3 deletions client/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
return
}

stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer close(stopCh)
defer cancel()

monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
Expand Down Expand Up @@ -93,7 +91,8 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}
}()

logCh := monitor.Start(stopCh)
logCh := monitor.Start()
defer monitor.Stop()
initialOffset := int64(0)

// receive logs and build frames
Expand Down
59 changes: 47 additions & 12 deletions command/agent/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,34 @@ import (
log "github.com/hashicorp/go-hclog"
)

type Monitor interface {
// Start returns a channel of log messages which are sent
// ever time a log message occurs
Start() <-chan []byte

// Stop de-registers the sink from the InterceptLogger
// and closes the log channels
Stop()
}

// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor struct {
// sync.Mutex protects droppedCount and logCh
type monitor struct {
// protects droppedCount and logCh
sync.Mutex

sink log.SinkAdapter

// logger is the logger we will be monitoring
logger log.InterceptLogger
logCh chan []byte

// logCh is a buffered chan where we send logs when streaming
logCh chan []byte

// doneCh coordinates the shutdown of logCh
doneCh chan struct{}

// droppedCount is the current count of messages
// that were dropped from the logCh buffer.
// only access under lock
Expand All @@ -33,10 +49,15 @@ type Monitor struct {

// New creates a new Monitor. Start must be called in order to actually start
// streaming logs
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor {
sw := &Monitor{
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) Monitor {
return new(buf, logger, opts)
}

func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor {
sw := &monitor{
logger: logger,
logCh: make(chan []byte, buf),
doneCh: make(chan struct{}, 1),
bufSize: buf,
droppedDuration: 3 * time.Second,
}
Expand All @@ -48,10 +69,14 @@ func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor
return sw
}

// Stop stops the monitoring process
func (d *monitor) Stop() {
close(d.doneCh)
}

// Start registers a sink on the monitor's logger and starts sending
// received log messages over the returned channel. A non-nil
// stopCh can be used to de-register the sink and stop log streaming
func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
// received log messages over the returned channel.
func (d *monitor) Start() <-chan []byte {
d.logger.RegisterSink(d.sink)

streamCh := make(chan []byte, d.bufSize)
Expand All @@ -61,13 +86,13 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
select {
case log := <-d.logCh:
select {
case <-stopCh:
case <-d.doneCh:
d.logger.DeregisterSink(d.sink)
close(d.logCh)
return
case streamCh <- log:
}
case <-stopCh:
case <-d.doneCh:
d.Lock()
defer d.Unlock()

Expand All @@ -83,7 +108,7 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
LOOP:
for {
select {
case <-stopCh:
case <-d.doneCh:
break LOOP
case <-time.After(d.droppedDuration):
d.Lock()
Expand All @@ -92,6 +117,8 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
if d.droppedCount > 0 {
dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
select {
case <-d.doneCh:
break LOOP
// Try sending dropped message count to logCh in case
// there is room in the buffer now.
case d.logCh <- []byte(dropped):
Expand All @@ -107,6 +134,7 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
}
d.droppedCount = 0
}
// unlock after handling dropped message
d.Unlock()
}
}
Expand All @@ -117,10 +145,17 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {

// Write attempts to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *Monitor) Write(p []byte) (n int, err error) {
func (d *monitor) Write(p []byte) (n int, err error) {
d.Lock()
defer d.Unlock()

// ensure logCh is still open
select {
case <-d.doneCh:
return
default:
}

bytes := make([]byte, len(p))
copy(bytes, p)

Expand Down
9 changes: 4 additions & 5 deletions command/agent/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ func TestMonitor_Start(t *testing.T) {
Level: log.Debug,
})

closeCh := make(chan struct{})

logCh := m.Start(closeCh)
logCh := m.Start()
defer m.Stop()

go func() {
logger.Debug("test log")
Expand All @@ -49,15 +48,15 @@ func TestMonitor_DroppedMessages(t *testing.T) {
Level: log.Warn,
})

m := New(5, logger, &log.LoggerOptions{
m := new(5, logger, &log.LoggerOptions{
Level: log.Debug,
})
m.droppedDuration = 5 * time.Millisecond

doneCh := make(chan struct{})
defer close(doneCh)

logCh := m.Start(doneCh)
logCh := m.Start()

for i := 0; i <= 100; i++ {
logger.Debug(fmt.Sprintf("test message %d", i))
Expand Down
5 changes: 2 additions & 3 deletions nomad/client_agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}

// NodeID was empty, so monitor this current server
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer close(stopCh)
defer cancel()

monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
Expand Down Expand Up @@ -97,7 +95,8 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}
}()

logCh := monitor.Start(stopCh)
logCh := monitor.Start()
defer monitor.Stop()
initialOffset := int64(0)

// receive logs and build frames
Expand Down

0 comments on commit 620fba5

Please sign in to comment.