diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 48e462c722b..aa9731d7961 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -74,6 +74,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff] - Add experimental Kafka output. {pull}942[942] - Add config file option to configure GOMAXPROCS. {pull}969[969] - Added a `fields` and `fields_under_root` as options available under the `shipper` configuration {pull}1092[1092] +- Ensure proper shutdown of libbeat. {pull}1075[1075] *Packetbeat* - Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803] diff --git a/libbeat/common/worker.go b/libbeat/common/worker.go new file mode 100644 index 00000000000..508dc513ab7 --- /dev/null +++ b/libbeat/common/worker.go @@ -0,0 +1,45 @@ +package common + +import ( + "sync" +) + +// WorkerSignal ensure all events have been +// treated before closing Go routines +type WorkerSignal struct { + Done chan struct{} + wgEvent sync.WaitGroup + wgWorker sync.WaitGroup +} + +func NewWorkerSignal() *WorkerSignal { + w := &WorkerSignal{} + w.Init() + return w +} + +func (ws *WorkerSignal) Init() { + ws.Done = make(chan struct{}) +} + +func (ws *WorkerSignal) AddEvent(delta int) { + ws.wgEvent.Add(delta) +} + +func (ws *WorkerSignal) DoneEvent() { + ws.wgEvent.Done() +} + +func (ws *WorkerSignal) WorkerStart() { + ws.wgWorker.Add(1) +} + +func (ws *WorkerSignal) WorkerFinished() { + ws.wgWorker.Done() +} + +func (ws *WorkerSignal) Stop() { + ws.wgEvent.Wait() // Wait for all events to be dealt with + close(ws.Done) // Ask Go routines to exit + ws.wgWorker.Wait() // Wait for Go routines to finish +} diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index aa73c948a9d..c0a3891a185 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -45,6 +45,11 @@ func writeBuffer(buf []byte) error { return nil } +// Implement Outputer +func (c *console) Close() error { + return nil +} + func (c *console) PublishEvent( s outputs.Signaler, opts outputs.Options, diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 550787f2a13..ec45557cf37 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -204,6 +204,10 @@ func makeClientFactory( } } +func (out *elasticsearchOutput) Close() error { + return out.mode.Close() +} + func (out *elasticsearchOutput) PublishEvent( signaler outputs.Signaler, opts outputs.Options, diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 720b42f2511..a63ffd2f9ea 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -64,6 +64,11 @@ func (out *fileOutput) init(config config) error { return nil } +// Implement Outputer +func (out *fileOutput) Close() error { + return nil +} + func (out *fileOutput) PublishEvent( trans outputs.Signaler, opts outputs.Options, diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index b3384465ce8..a2c1a8ebcf0 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -109,6 +109,10 @@ func (k *kafka) init(cfg *ucfg.Config) error { return nil } +func (k *kafka) Close() error { + return k.mode.Close() +} + func (k *kafka) PublishEvent( signal outputs.Signaler, opts outputs.Options, diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 0f5d17714b0..c4b3b624f90 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -112,6 +112,10 @@ func makeTLSClient(port int, tls *tls.Config) func(string) (TransportClient, err } } +func (lj *logstash) Close() error { + return lj.mode.Close() +} + // TODO: update Outputer interface to support multiple events for batch-like // processing (e.g. for filebeat). Batch like processing might reduce // send/receive overhead per event for other implementors too. diff --git a/libbeat/outputs/mode/balance.go b/libbeat/outputs/mode/balance.go index 98477672c8c..d3e61ba2a88 100644 --- a/libbeat/outputs/mode/balance.go +++ b/libbeat/outputs/mode/balance.go @@ -40,9 +40,8 @@ type LoadBalancerMode struct { // block until event has been successfully published. maxAttempts int - // waitGroup + signaling channel for handling shutdown - wg sync.WaitGroup - done chan struct{} + // WorkerSignal for handling events and a clean shutdown + ws common.WorkerSignal // channels for forwarding work items to workers. // The work channel is used by publisher to insert new events @@ -83,18 +82,16 @@ func NewLoadBalancerMode( work: make(chan eventsMessage), retries: make(chan eventsMessage, len(clients)*2), - done: make(chan struct{}), } + m.ws.Init() m.start(clients) return m, nil } -// Close stops all workers and closes all open connections. In flight events -// are signaled as failed. +// Close waits for the workers to end and connections to close. func (m *LoadBalancerMode) Close() error { - close(m.done) - m.wg.Wait() + m.ws.Stop() return nil } @@ -128,8 +125,9 @@ func (m *LoadBalancerMode) publishEventsMessage( } msg.attemptsLeft = maxAttempts + m.ws.AddEvent(1) if ok := m.forwardEvent(m.work, msg); !ok { - dropping(msg) + dropping(msg, &m.ws) } return nil } @@ -141,7 +139,7 @@ func (m *LoadBalancerMode) start(clients []ProtocolClient) { if client.IsConnected() { _ = client.Close() } - m.wg.Done() + m.ws.WorkerFinished() }() waitStart.Done() @@ -149,7 +147,7 @@ func (m *LoadBalancerMode) start(clients []ProtocolClient) { } for _, client := range clients { - m.wg.Add(1) + m.ws.WorkerStart() waitStart.Add(1) go worker(client) } @@ -160,7 +158,7 @@ func (m *LoadBalancerMode) clientLoop(client ProtocolClient) { debug("load balancer: start client loop") defer debug("load balancer: stop client loop") - backoff := newBackoff(m.done, m.waitRetry, m.maxWaitRetry) + backoff := newBackoff(m.ws.Done, m.waitRetry, m.maxWaitRetry) done := false for !done { @@ -190,7 +188,7 @@ func (m *LoadBalancerMode) sendLoop(client ProtocolClient, backoff *backoff) boo for { var msg eventsMessage select { - case <-m.done: + case <-m.ws.Done: return true case msg = <-m.retries: // receive message from other failed worker case msg = <-m.work: // receive message from publisher @@ -208,7 +206,6 @@ func (m *LoadBalancerMode) onMessage( client ProtocolClient, msg eventsMessage, ) (bool, error) { - done := false if msg.event != nil { err := client.PublishEvent(msg.event) @@ -254,7 +251,7 @@ func (m *LoadBalancerMode) onMessage( if m.maxAttempts > 0 && msg.attemptsLeft == 0 { // no more attempts left => drop - dropping(msg) + dropping(msg, &m.ws) return done, err } @@ -265,15 +262,15 @@ func (m *LoadBalancerMode) onMessage( } outputs.SignalCompleted(msg.signaler) + m.ws.DoneEvent() return done, nil } func (m *LoadBalancerMode) onFail(msg eventsMessage, err error) { - logp.Info("Error publishing events (retrying): %s", err) if !m.forwardEvent(m.retries, msg) { - dropping(msg) + dropping(msg, &m.ws) } } @@ -285,7 +282,7 @@ func (m *LoadBalancerMode) forwardEvent( select { case ch <- msg: return true - case <-m.done: // shutdown + case <-m.ws.Done: // shutdown return false } } else { @@ -293,7 +290,7 @@ func (m *LoadBalancerMode) forwardEvent( select { case ch <- msg: return true - case <-m.done: // shutdown + case <-m.ws.Done: // shutdown return false case <-time.After(m.timeout): } @@ -304,8 +301,9 @@ func (m *LoadBalancerMode) forwardEvent( // dropping is called when a message is dropped. It updates the // relevant counters and sends a failed signal. -func dropping(msg eventsMessage) { +func dropping(msg eventsMessage, ws *common.WorkerSignal) { debug("messages dropped") messagesDropped.Add(1) outputs.SignalFailed(msg.signaler, nil) + ws.DoneEvent() } diff --git a/libbeat/outputs/mode/balance_async.go b/libbeat/outputs/mode/balance_async.go index a1994524596..d9c26978098 100644 --- a/libbeat/outputs/mode/balance_async.go +++ b/libbeat/outputs/mode/balance_async.go @@ -42,9 +42,8 @@ type AsyncLoadBalancerMode struct { // block until event has been successfully published. maxAttempts int - // waitGroup + signaling channel for handling shutdown - wg sync.WaitGroup - done chan struct{} + // WorkerSignal for handling events and a clean shutdown + ws common.WorkerSignal // channels for forwarding work items to workers. // The work channel is used by publisher to insert new events @@ -80,8 +79,8 @@ func NewAsyncLoadBalancerMode( work: make(chan eventsMessage), retries: make(chan eventsMessage, len(clients)*2), - done: make(chan struct{}), } + m.ws.Init() m.start(clients) return m, nil @@ -90,8 +89,7 @@ func NewAsyncLoadBalancerMode( // Close stops all workers and closes all open connections. In flight events // are signaled as failed. func (m *AsyncLoadBalancerMode) Close() error { - close(m.done) - m.wg.Wait() + m.ws.Stop() return nil } @@ -129,8 +127,9 @@ func (m *AsyncLoadBalancerMode) publishEventsMessage( msg.attemptsLeft = maxAttempts debug("publish events with attempts=%v", msg.attemptsLeft) + m.ws.AddEvent(1) if ok := m.forwardEvent(m.work, msg); !ok { - dropping(msg) + dropping(msg, &m.ws) } return nil } @@ -142,12 +141,12 @@ func (m *AsyncLoadBalancerMode) start(clients []AsyncProtocolClient) { if client.IsConnected() { _ = client.Close() } - m.wg.Done() + m.ws.WorkerFinished() }() waitStart.Done() - backoff := newBackoff(m.done, m.waitRetry, m.maxWaitRetry) + backoff := newBackoff(m.ws.Done, m.waitRetry, m.maxWaitRetry) for { // reconnect loop for !client.IsConnected() { @@ -163,7 +162,7 @@ func (m *AsyncLoadBalancerMode) start(clients []AsyncProtocolClient) { // receive and process messages var msg eventsMessage select { - case <-m.done: + case <-m.ws.Done: return case msg = <-m.retries: // receive message from other failed worker debug("events from retries queue") @@ -179,7 +178,7 @@ func (m *AsyncLoadBalancerMode) start(clients []AsyncProtocolClient) { } for _, client := range clients { - m.wg.Add(1) + m.ws.WorkerStart() waitStart.Add(1) go worker(client) } @@ -220,6 +219,7 @@ func handlePublishEventResult(m *AsyncLoadBalancerMode, msg eventsMessage) func( m.onFail(false, msg, err) } else { outputs.SignalCompleted(msg.signaler) + m.ws.DoneEvent() } } } @@ -253,7 +253,7 @@ func handlePublishEventsResult( if m.maxAttempts > 0 && msg.attemptsLeft == 0 { // no more attempts left => drop - dropping(msg) + dropping(msg, &m.ws) return } @@ -268,7 +268,7 @@ func handlePublishEventsResult( debug("add non-published events back into pipeline: %v", len(events)) msg.events = events if ok := m.forwardEvent(m.retries, msg); !ok { - dropping(msg) + dropping(msg, &m.ws) } return } @@ -276,6 +276,7 @@ func handlePublishEventsResult( // all events published -> signal success debug("async bulk publish success") outputs.SignalCompleted(msg.signaler) + m.ws.DoneEvent() } } @@ -284,7 +285,7 @@ func (m *AsyncLoadBalancerMode) onFail(async bool, msg eventsMessage, err error) logp.Info("Error publishing events (retrying): %s", err) if ok := m.forwardEvent(m.retries, msg); !ok { - dropping(msg) + dropping(msg, &m.ws) } } @@ -306,7 +307,7 @@ func (m *AsyncLoadBalancerMode) forwardEvent( case ch <- msg: debug("message forwarded") return true - case <-m.done: // shutdown + case <-m.ws.Done: // shutdown debug("shutting down") return false } @@ -316,7 +317,7 @@ func (m *AsyncLoadBalancerMode) forwardEvent( case ch <- msg: debug("message forwarded") return true - case <-m.done: // shutdown + case <-m.ws.Done: // shutdown debug("shutting down") return false case <-time.After(m.timeout): diff --git a/libbeat/outputs/mode/balance_async_test.go b/libbeat/outputs/mode/balance_async_test.go index 515b23c24da..7ca3b70ce2d 100644 --- a/libbeat/outputs/mode/balance_async_test.go +++ b/libbeat/outputs/mode/balance_async_test.go @@ -22,9 +22,8 @@ func TestAsyncLBStartStop(t *testing.T) { } func testAsyncLBFailSendWithoutActiveConnection(t *testing.T, events []eventInfo) { - if testing.Verbose() { - logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) - } + enableLogging([]string{"*"}) + errFail := errors.New("fail connect") mode, _ := NewAsyncConnectionMode( []AsyncProtocolClient{ @@ -57,6 +56,8 @@ func TestAsyncLBFailSendMultWithoutActiveConnections(t *testing.T) { } func testAsyncLBOKSend(t *testing.T, events []eventInfo) { + enableLogging([]string{"*"}) + var collected [][]common.MapStr mode, _ := NewAsyncConnectionMode( []AsyncProtocolClient{ diff --git a/libbeat/outputs/mode/mode_test.go b/libbeat/outputs/mode/mode_test.go index 21da4d39b88..b111a62b5b0 100644 --- a/libbeat/outputs/mode/mode_test.go +++ b/libbeat/outputs/mode/mode_test.go @@ -217,7 +217,12 @@ func testMode( expectedSignals []bool, collectedEvents *[][]common.MapStr, ) { - defer mode.Close() + defer func() { + err := mode.Close() + if err != nil { + t.Fatal(err) + } + }() if events == nil { return diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index 6822582c1ac..3a168c0dced 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -13,8 +13,9 @@ type Options struct { type Outputer interface { // Publish event - PublishEvent(trans Signaler, opts Options, event common.MapStr) error + + Close() error } type TopologyOutputer interface { diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index c8d342b1484..7ad9c5d6910 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -160,8 +160,8 @@ func (out *redisOutput) Connect() error { return nil } -func (out *redisOutput) Close() { - _ = out.Conn.Close() +func (out *redisOutput) Close() error { + return out.Conn.Close() } func (out *redisOutput) Reconnect() { diff --git a/libbeat/publisher/async.go b/libbeat/publisher/async.go index f6851dfce37..3e0bf5fc179 100644 --- a/libbeat/publisher/async.go +++ b/libbeat/publisher/async.go @@ -11,29 +11,24 @@ import ( type asyncPublisher struct { outputs []worker pub *PublisherType - ws workerSignal } const ( defaultBulkSize = 2048 ) -func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int) *asyncPublisher { +func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int, ws *common.WorkerSignal) *asyncPublisher { p := &asyncPublisher{pub: pub} - p.ws.Init() var outputs []worker for _, out := range pub.Output { - outputs = append(outputs, asyncOutputer(&p.ws, hwm, bulkHWM, out)) + outputs = append(outputs, asyncOutputer(ws, hwm, bulkHWM, out)) } p.outputs = outputs return p } -// onStop will send stop signal to message batching workers -func (p *asyncPublisher) onStop() { p.ws.stop() } - func (p *asyncPublisher) client() eventPublisher { return p } @@ -67,7 +62,7 @@ func (p *asyncPublisher) send(m message) { } } -func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker { +func asyncOutputer(ws *common.WorkerSignal, hwm, bulkHWM int, worker *outputWorker) worker { config := worker.config flushInterval := time.Duration(config.FlushInterval) * time.Second diff --git a/libbeat/publisher/async_test.go b/libbeat/publisher/async_test.go index 0aec55404ea..8a129c08997 100644 --- a/libbeat/publisher/async_test.go +++ b/libbeat/publisher/async_test.go @@ -9,10 +9,13 @@ import ( ) func TestAsyncPublishEvent(t *testing.T) { + enableLogging([]string{"*"}) // Init testPub := newTestPublisherNoBulk(CompletedResponse) event := testEvent() + defer testPub.pub.Stop() + // Execute. Async PublishEvent always immediately returns true. assert.True(t, testPub.asyncPublishEvent(event)) @@ -29,6 +32,8 @@ func TestAsyncPublishEvents(t *testing.T) { testPub := newTestPublisherNoBulk(CompletedResponse) events := []common.MapStr{testEvent(), testEvent()} + defer testPub.pub.Stop() + // Execute. Async PublishEvent always immediately returns true. assert.True(t, testPub.asyncPublishEvents(events)) @@ -41,6 +46,25 @@ func TestAsyncPublishEvents(t *testing.T) { assert.Equal(t, events[1], msgs[0].events[1]) } +func TestAsyncShutdownPublishEvents(t *testing.T) { + // Init + testPub := newTestPublisherNoBulk(CompletedResponse) + events := []common.MapStr{testEvent(), testEvent()} + + // Execute. Async PublishEvent always immediately returns true. + assert.True(t, testPub.asyncPublishEvents(events)) + + testPub.pub.Stop() + + // Validate + msgs := testPub.outputMsgHandler.msgs + close(msgs) + assert.Equal(t, 1, len(msgs)) + msg := <-msgs + assert.Equal(t, events[0], msg.events[0]) + assert.Equal(t, events[1], msg.events[1]) +} + func TestBulkAsyncPublishEvent(t *testing.T) { if testing.Verbose() { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) @@ -50,6 +74,8 @@ func TestBulkAsyncPublishEvent(t *testing.T) { testPub := newTestPublisherWithBulk(CompletedResponse) event := testEvent() + defer testPub.pub.Stop() + // Execute. Async PublishEvent always immediately returns true. assert.True(t, testPub.asyncPublishEvent(event)) @@ -65,10 +91,12 @@ func TestBulkAsyncPublishEvent(t *testing.T) { } func TestBulkAsyncPublishEvents(t *testing.T) { - // init + // Init testPub := newTestPublisherWithBulk(CompletedResponse) events := []common.MapStr{testEvent(), testEvent()} + defer testPub.pub.Stop() + // Async PublishEvent always immediately returns true. assert.True(t, testPub.asyncPublishEvents(events)) @@ -79,3 +107,22 @@ func TestBulkAsyncPublishEvents(t *testing.T) { assert.Equal(t, events[0], msgs[0].events[0]) assert.Equal(t, events[1], msgs[0].events[1]) } + +func TestBulkAsyncShutdownPublishEvents(t *testing.T) { + // Init + testPub := newTestPublisherWithBulk(CompletedResponse) + events := []common.MapStr{testEvent(), testEvent()} + + // Async PublishEvent always immediately returns true. + assert.True(t, testPub.asyncPublishEvents(events)) + + testPub.pub.Stop() + + // Validate + msgs := testPub.outputMsgHandler.msgs + close(msgs) + assert.Equal(t, 1, len(msgs)) + msg := <-msgs + assert.Equal(t, events[0], msg.events[0]) + assert.Equal(t, events[1], msg.events[1]) +} diff --git a/libbeat/publisher/bulk.go b/libbeat/publisher/bulk.go index a10dff6f6a1..b8c8455cdb9 100644 --- a/libbeat/publisher/bulk.go +++ b/libbeat/publisher/bulk.go @@ -9,7 +9,7 @@ import ( type bulkWorker struct { output worker - ws *workerSignal + ws *common.WorkerSignal queue chan message bulkQueue chan message @@ -22,7 +22,7 @@ type bulkWorker struct { } func newBulkWorker( - ws *workerSignal, hwm int, bulkHWM int, + ws *common.WorkerSignal, hwm int, bulkHWM int, output worker, flushInterval time.Duration, maxBatchSize int, @@ -38,15 +38,17 @@ func newBulkWorker( pending: nil, } - ws.wg.Add(1) + b.ws.WorkerStart() go b.run() return b } func (b *bulkWorker) send(m message) { if m.events == nil { + b.ws.AddEvent(1) b.queue <- m } else { + b.ws.AddEvent(len(m.events)) b.bulkQueue <- m } } @@ -56,20 +58,24 @@ func (b *bulkWorker) run() { for { select { - case <-b.ws.done: + case <-b.ws.Done: return case m := <-b.queue: b.onEvent(&m.context, m.event) case m := <-b.bulkQueue: b.onEvents(&m.context, m.events) case <-b.flushTicker.C: - if len(b.events) > 0 { - b.publish() - } + b.flush() } } } +func (b *bulkWorker) flush() { + if len(b.events) > 0 { + b.publish() + } +} + func (b *bulkWorker) onEvent(ctx *Context, event common.MapStr) { b.events = append(b.events, event) b.guaranteed = b.guaranteed || ctx.Guaranteed @@ -127,6 +133,7 @@ func (b *bulkWorker) publish() { events: b.events, }) + b.ws.AddEvent(-len(b.events)) b.pending = nil b.guaranteed = false b.events = make([]common.MapStr, 0, b.maxBatchSize) @@ -134,7 +141,7 @@ func (b *bulkWorker) publish() { func (b *bulkWorker) shutdown() { b.flushTicker.Stop() - stopQueue(b.queue) - stopQueue(b.bulkQueue) - b.ws.wg.Done() + close(b.queue) + close(b.bulkQueue) + b.ws.WorkerFinished() } diff --git a/libbeat/publisher/bulk_test.go b/libbeat/publisher/bulk_test.go index a28ce425f3a..8177a96e2bb 100644 --- a/libbeat/publisher/bulk_test.go +++ b/libbeat/publisher/bulk_test.go @@ -18,12 +18,14 @@ const ( // Send a single event to the bulkWorker and verify that the event // is sent after the flush timeout occurs. func TestBulkWorkerSendSingle(t *testing.T) { + enableLogging([]string{"*"}) + ws := common.NewWorkerSignal() + defer ws.Stop() + mh := &testMessageHandler{ response: CompletedResponse, msgs: make(chan message, queueSize), } - ws := newWorkerSignal() - defer ws.stop() bw := newBulkWorker(ws, queueSize, bulkQueueSize, mh, flushInterval, maxBatchSize) s := newTestSignaler() @@ -37,16 +39,39 @@ func TestBulkWorkerSendSingle(t *testing.T) { assert.Equal(t, m.event, msgs[0].events[0]) } +// Send a single event to a bulkWorker and verify that the event +// is sent (flushed) after shutdown. +func TestBulkWorkerShutdownSendSingle(t *testing.T) { + ws := common.NewWorkerSignal() + mh := &testMessageHandler{ + response: CompletedResponse, + msgs: make(chan message, queueSize), + } + bw := newBulkWorker(ws, queueSize, bulkQueueSize, mh, flushInterval, maxBatchSize) + + s := newTestSignaler() + m := testMessage(s, testEvent()) + bw.send(m) + + ws.Stop() + + close(mh.msgs) + assert.Equal(t, 1, len(mh.msgs)) + assert.True(t, s.wait()) + assert.Equal(t, m.event, (<-mh.msgs).events[0]) +} + // Send a batch of events to the bulkWorker and verify that a single // message is distributed (not triggered by flush timeout). func TestBulkWorkerSendBatch(t *testing.T) { // Setup + ws := common.NewWorkerSignal() + defer ws.Stop() + mh := &testMessageHandler{ response: CompletedResponse, msgs: make(chan message, queueSize), } - ws := newWorkerSignal() - defer ws.stop() bw := newBulkWorker(ws, queueSize, 0, mh, time.Duration(time.Hour), maxBatchSize) events := make([]common.MapStr, maxBatchSize) @@ -71,12 +96,13 @@ func TestBulkWorkerSendBatch(t *testing.T) { // that the events are split across two messages. func TestBulkWorkerSendBatchGreaterThanMaxBatchSize(t *testing.T) { // Setup + ws := common.NewWorkerSignal() + defer ws.Stop() + mh := &testMessageHandler{ response: CompletedResponse, msgs: make(chan message), } - ws := newWorkerSignal() - defer ws.stop() bw := newBulkWorker(ws, queueSize, 0, mh, flushInterval, maxBatchSize) // Send diff --git a/libbeat/publisher/common_test.go b/libbeat/publisher/common_test.go index 68f625f505e..283c3efcc3b 100644 --- a/libbeat/publisher/common_test.go +++ b/libbeat/publisher/common_test.go @@ -3,12 +3,20 @@ package publisher import ( "fmt" "sync/atomic" + "testing" "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" ) +func enableLogging(selectors []string) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, selectors) + } +} + // testMessageHandler receives messages and acknowledges them through // their Signaler. type testMessageHandler struct { @@ -132,6 +140,7 @@ const ( ) func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher { + pub := &PublisherType{} mh := &testMessageHandler{ msgs: make(chan message, 10), response: response, @@ -140,17 +149,14 @@ func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher { ow := &outputWorker{} ow.config.BulkMaxSize = bulkSize ow.handler = mh - ws := workerSignal{} - ow.messageWorker.init(&ws, defaultChanSize, defaultBulkChanSize, mh) + ow.messageWorker.init(&pub.wsOutput, defaultChanSize, defaultBulkChanSize, mh) + + pub.Output = []*outputWorker{ow} - pub := &PublisherType{ - Output: []*outputWorker{ow}, - wsOutput: ws, - } pub.wsOutput.Init() pub.wsPublisher.Init() pub.syncPublisher = newSyncPublisher(pub, defaultChanSize, defaultBulkChanSize) - pub.asyncPublisher = newAsyncPublisher(pub, defaultChanSize, defaultBulkChanSize) + pub.asyncPublisher = newAsyncPublisher(pub, defaultChanSize, defaultBulkChanSize, &pub.wsPublisher) return &testPublisher{ pub: pub, outputMsgHandler: mh, diff --git a/libbeat/publisher/output.go b/libbeat/publisher/output.go index acf94b687e2..a65d64425c3 100644 --- a/libbeat/publisher/output.go +++ b/libbeat/publisher/output.go @@ -36,7 +36,7 @@ var ( func newOutputWorker( cfg *ucfg.Config, out outputs.Outputer, - ws *workerSignal, + ws *common.WorkerSignal, hwm int, bulkHWM int, ) *outputWorker { @@ -56,7 +56,12 @@ func newOutputWorker( return o } -func (o *outputWorker) onStop() {} +func (o *outputWorker) onStop() { + err := o.out.Close() + if err != nil { + logp.Info("Failed to close outputer: %s", err) + } +} func (o *outputWorker) onMessage(m message) { if m.event != nil { diff --git a/libbeat/publisher/output_test.go b/libbeat/publisher/output_test.go index 2c71b5c9246..41c0cf06f25 100644 --- a/libbeat/publisher/output_test.go +++ b/libbeat/publisher/output_test.go @@ -17,6 +17,10 @@ type testOutputer struct { var _ outputs.Outputer = &testOutputer{} +func (t *testOutputer) Close() error { + return nil +} + // PublishEvent writes events to a channel then calls Completed on trans. // It always returns nil. func (t *testOutputer) PublishEvent(trans outputs.Signaler, opts outputs.Options, @@ -32,7 +36,7 @@ func TestOutputWorker(t *testing.T) { ow := newOutputWorker( ucfg.New(), outputer, - newWorkerSignal(), + common.NewWorkerSignal(), 1, 0) ow.onStop() // Noop diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index 92d80d17e56..e5ed61fb08e 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -64,13 +64,11 @@ type PublisherType struct { RefreshTopologyTimer <-chan time.Time - // wsOutput and wsPublisher should be used for proper shutdown of publisher - // (not implemented yet). On shutdown the publisher should be finished first - // and the outputers next, so no publisher will attempt to send messages on - // closed channels. + // On shutdown the publisher is finished first and the outputers next, + // so no publisher will attempt to send messages on closed channels. // Note: beat data producers must be shutdown before the publisher plugin - wsOutput workerSignal - wsPublisher workerSignal + wsPublisher common.WorkerSignal + wsOutput common.WorkerSignal syncPublisher *syncPublisher asyncPublisher *asyncPublisher @@ -219,8 +217,8 @@ func (publisher *PublisherType) init( publisher.GeoLite = common.LoadGeoIPData(shipper.Geoip) - publisher.wsOutput.Init() publisher.wsPublisher.Init() + publisher.wsOutput.Init() if !publisher.disabled { plugins, err := outputs.InitOutputs(beatName, configs, shipper.Topology_expire) @@ -320,9 +318,14 @@ func (publisher *PublisherType) init( go publisher.UpdateTopologyPeriodically() } - publisher.asyncPublisher = newAsyncPublisher(publisher, hwm, bulkHWM) + publisher.asyncPublisher = newAsyncPublisher(publisher, hwm, bulkHWM, &publisher.wsPublisher) publisher.syncPublisher = newSyncPublisher(publisher, hwm, bulkHWM) publisher.client = newClient(publisher) return nil } + +func (publisher *PublisherType) Stop() { + publisher.wsPublisher.Stop() + publisher.wsOutput.Stop() +} diff --git a/libbeat/publisher/sync_test.go b/libbeat/publisher/sync_test.go index 6511cd3ae0d..6bbff8a2ee6 100644 --- a/libbeat/publisher/sync_test.go +++ b/libbeat/publisher/sync_test.go @@ -4,11 +4,11 @@ import ( "testing" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" "github.com/stretchr/testify/assert" ) func TestSyncPublishEventSuccess(t *testing.T) { + enableLogging([]string{"*"}) testPub := newTestPublisherNoBulk(CompletedResponse) event := testEvent() @@ -64,10 +64,6 @@ func TestSyncPublishEventsFailed(t *testing.T) { // Test that PublishEvent returns true when publishing is disabled. func TestSyncPublisherDisabled(t *testing.T) { - if testing.Verbose() { - logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) - } - testPub := newTestPublisherNoBulk(FailedResponse) testPub.pub.disabled = true event := testEvent() diff --git a/libbeat/publisher/worker.go b/libbeat/publisher/worker.go index 1e1832334a6..c57298187cc 100644 --- a/libbeat/publisher/worker.go +++ b/libbeat/publisher/worker.go @@ -2,10 +2,8 @@ package publisher import ( "expvar" - "sync" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/outputs" ) // Metrics that can retrieved through the expvar web interface. @@ -20,7 +18,7 @@ type worker interface { type messageWorker struct { queue chan message bulkQueue chan message - ws *workerSignal + ws *common.WorkerSignal handler messageHandler } @@ -30,28 +28,23 @@ type message struct { events []common.MapStr } -type workerSignal struct { - done chan struct{} - wg sync.WaitGroup -} - type messageHandler interface { onMessage(m message) onStop() } -func newMessageWorker(ws *workerSignal, hwm, bulkHWM int, h messageHandler) *messageWorker { +func newMessageWorker(ws *common.WorkerSignal, hwm, bulkHWM int, h messageHandler) *messageWorker { p := &messageWorker{} p.init(ws, hwm, bulkHWM, h) return p } -func (p *messageWorker) init(ws *workerSignal, hwm, bulkHWM int, h messageHandler) { +func (p *messageWorker) init(ws *common.WorkerSignal, hwm, bulkHWM int, h messageHandler) { p.queue = make(chan message, hwm) p.bulkQueue = make(chan message, bulkHWM) p.ws = ws p.handler = h - ws.wg.Add(1) + defer p.ws.WorkerStart() go p.run() } @@ -59,26 +52,31 @@ func (p *messageWorker) run() { defer p.shutdown() for { select { - case <-p.ws.done: + case <-p.ws.Done: return case m := <-p.queue: - messagesInWorkerQueues.Add(-1) - p.handler.onMessage(m) + p.onEvent(m) case m := <-p.bulkQueue: - messagesInWorkerQueues.Add(-1) - p.handler.onMessage(m) + p.onEvent(m) } } } func (p *messageWorker) shutdown() { p.handler.onStop() - stopQueue(p.queue) - stopQueue(p.bulkQueue) - p.ws.wg.Done() + close(p.queue) + close(p.bulkQueue) + p.ws.WorkerFinished() +} + +func (p *messageWorker) onEvent(m message) { + messagesInWorkerQueues.Add(-1) + p.handler.onMessage(m) + p.ws.DoneEvent() } func (p *messageWorker) send(m message) { + p.ws.AddEvent(1) if m.event != nil { p.queue <- m } else { @@ -86,25 +84,3 @@ func (p *messageWorker) send(m message) { } messagesInWorkerQueues.Add(1) } - -func (ws *workerSignal) stop() { - close(ws.done) - ws.wg.Wait() -} - -func newWorkerSignal() *workerSignal { - w := &workerSignal{} - w.Init() - return w -} - -func (ws *workerSignal) Init() { - ws.done = make(chan struct{}) -} - -func stopQueue(qu chan message) { - close(qu) - for msg := range qu { // clear queue and send fail signal - outputs.SignalFailed(msg.context.Signal, nil) - } -} diff --git a/libbeat/publisher/worker_test.go b/libbeat/publisher/worker_test.go index 030532e01e1..3ecab3fae4a 100644 --- a/libbeat/publisher/worker_test.go +++ b/libbeat/publisher/worker_test.go @@ -4,14 +4,16 @@ import ( "sync/atomic" "testing" + "github.com/elastic/beats/libbeat/common" "github.com/stretchr/testify/assert" ) // Test sending events through the messageWorker. func TestMessageWorkerSend(t *testing.T) { + enableLogging([]string{"*"}) + // Setup - ws := &workerSignal{} - ws.Init() + ws := common.NewWorkerSignal() mh := &testMessageHandler{msgs: make(chan message, 10), response: true} mw := newMessageWorker(ws, 10, 0, mh) @@ -25,7 +27,7 @@ func TestMessageWorkerSend(t *testing.T) { m2 := message{context: Context{Signal: s2}} mw.send(m2) - // Verify that the messageWorker pushed to two messages to the + // Verify that the messageWorker pushed the two messages to the // messageHandler. msgs, err := mh.waitForMessages(2) if err != nil { @@ -38,25 +40,40 @@ func TestMessageWorkerSend(t *testing.T) { assert.Contains(t, msgs, m2) assert.True(t, s2.wait()) - // Verify that stopping workerSignal causes a onStop notification - // in the messageHandler. - ws.stop() + ws.Stop() assert.True(t, atomic.LoadUint32(&mh.stopped) == 1) } -// Test that stopQueue invokes the Failed callback on all events in the queue. -func TestMessageWorkerStopQueue(t *testing.T) { +// Test that events sent before shutdown are pushed to the messageHandler. +func TestMessageWorkerShutdownSend(t *testing.T) { + enableLogging([]string{"*"}) + + // Setup + ws := common.NewWorkerSignal() + mh := &testMessageHandler{msgs: make(chan message, 10), response: true} + mw := newMessageWorker(ws, 10, 0, mh) + + // Send an event. s1 := newTestSignaler() m1 := message{context: Context{Signal: s1}} + mw.send(m1) + // Send another event. s2 := newTestSignaler() m2 := message{context: Context{Signal: s2}} + mw.send(m2) + + ws.Stop() + assert.True(t, atomic.LoadUint32(&mh.stopped) == 1) - qu := make(chan message, 2) - qu <- m1 - qu <- m2 + // Verify that the messageWorker pushed the two messages to the + // messageHandler. + close(mh.msgs) + assert.Equal(t, 2, len(mh.msgs)) - stopQueue(qu) - assert.False(t, s1.wait()) - assert.False(t, s2.wait()) + // Verify the messages and the signals. + assert.Equal(t, <-mh.msgs, m1) + assert.True(t, s1.wait()) + assert.Equal(t, <-mh.msgs, m2) + assert.True(t, s2.wait()) }