From 620fba559e57d0e47c5e804fd7a29d9268f043ac Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Tue, 5 Nov 2019 09:16:51 -0500 Subject: [PATCH] coordinate closing of doneCh, use interface to simplify callers --- client/agent_endpoint.go | 5 +-- command/agent/monitor/monitor.go | 59 +++++++++++++++++++++------ command/agent/monitor/monitor_test.go | 9 ++-- nomad/client_agent_endpoint.go | 5 +-- 4 files changed, 55 insertions(+), 23 deletions(-) diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index d22291cef471..19df4732abfd 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -61,9 +61,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { return } - stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) - defer close(stopCh) defer cancel() monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{ @@ -93,7 +91,8 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { } }() - logCh := monitor.Start(stopCh) + logCh := monitor.Start() + defer monitor.Stop() initialOffset := int64(0) // receive logs and build frames diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index 60397b54491d..053592b0b3e9 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -8,18 +8,34 @@ import ( log "github.com/hashicorp/go-hclog" ) +type Monitor interface { + // Start returns a channel of log messages which are sent + // ever time a log message occurs + Start() <-chan []byte + + // Stop de-registers the sink from the InterceptLogger + // and closes the log channels + Stop() +} + // Monitor provides a mechanism to stream logs using go-hclog // InterceptLogger and SinkAdapter. It allows streaming of logs // at a different log level than what is set on the logger. -type Monitor struct { - // sync.Mutex protects droppedCount and logCh +type monitor struct { + // protects droppedCount and logCh sync.Mutex sink log.SinkAdapter // logger is the logger we will be monitoring logger log.InterceptLogger - logCh chan []byte + + // logCh is a buffered chan where we send logs when streaming + logCh chan []byte + + // doneCh coordinates the shutdown of logCh + doneCh chan struct{} + // droppedCount is the current count of messages // that were dropped from the logCh buffer. // only access under lock @@ -33,10 +49,15 @@ type Monitor struct { // New creates a new Monitor. Start must be called in order to actually start // streaming logs -func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor { - sw := &Monitor{ +func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) Monitor { + return new(buf, logger, opts) +} + +func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor { + sw := &monitor{ logger: logger, logCh: make(chan []byte, buf), + doneCh: make(chan struct{}, 1), bufSize: buf, droppedDuration: 3 * time.Second, } @@ -48,10 +69,14 @@ func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor return sw } +// Stop stops the monitoring process +func (d *monitor) Stop() { + close(d.doneCh) +} + // Start registers a sink on the monitor's logger and starts sending -// received log messages over the returned channel. A non-nil -// stopCh can be used to de-register the sink and stop log streaming -func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { +// received log messages over the returned channel. +func (d *monitor) Start() <-chan []byte { d.logger.RegisterSink(d.sink) streamCh := make(chan []byte, d.bufSize) @@ -61,13 +86,13 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { select { case log := <-d.logCh: select { - case <-stopCh: + case <-d.doneCh: d.logger.DeregisterSink(d.sink) close(d.logCh) return case streamCh <- log: } - case <-stopCh: + case <-d.doneCh: d.Lock() defer d.Unlock() @@ -83,7 +108,7 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { LOOP: for { select { - case <-stopCh: + case <-d.doneCh: break LOOP case <-time.After(d.droppedDuration): d.Lock() @@ -92,6 +117,8 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { if d.droppedCount > 0 { dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) select { + case <-d.doneCh: + break LOOP // Try sending dropped message count to logCh in case // there is room in the buffer now. case d.logCh <- []byte(dropped): @@ -107,6 +134,7 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { } d.droppedCount = 0 } + // unlock after handling dropped message d.Unlock() } } @@ -117,10 +145,17 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { // Write attempts to send latest log to logCh // it drops the log if channel is unavailable to receive -func (d *Monitor) Write(p []byte) (n int, err error) { +func (d *monitor) Write(p []byte) (n int, err error) { d.Lock() defer d.Unlock() + // ensure logCh is still open + select { + case <-d.doneCh: + return + default: + } + bytes := make([]byte, len(p)) copy(bytes, p) diff --git a/command/agent/monitor/monitor_test.go b/command/agent/monitor/monitor_test.go index 2aa91d1bd62d..697b5bdc58c4 100644 --- a/command/agent/monitor/monitor_test.go +++ b/command/agent/monitor/monitor_test.go @@ -21,9 +21,8 @@ func TestMonitor_Start(t *testing.T) { Level: log.Debug, }) - closeCh := make(chan struct{}) - - logCh := m.Start(closeCh) + logCh := m.Start() + defer m.Stop() go func() { logger.Debug("test log") @@ -49,7 +48,7 @@ func TestMonitor_DroppedMessages(t *testing.T) { Level: log.Warn, }) - m := New(5, logger, &log.LoggerOptions{ + m := new(5, logger, &log.LoggerOptions{ Level: log.Debug, }) m.droppedDuration = 5 * time.Millisecond @@ -57,7 +56,7 @@ func TestMonitor_DroppedMessages(t *testing.T) { doneCh := make(chan struct{}) defer close(doneCh) - logCh := m.Start(doneCh) + logCh := m.Start() for i := 0; i <= 100; i++ { logger.Debug(fmt.Sprintf("test message %d", i)) diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 3c4c15a5355f..bb9281441f13 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -65,9 +65,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { } // NodeID was empty, so monitor this current server - stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) - defer close(stopCh) defer cancel() monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{ @@ -97,7 +95,8 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { } }() - logCh := monitor.Start(stopCh) + logCh := monitor.Start() + defer monitor.Stop() initialOffset := int64(0) // receive logs and build frames