From ccefba0bf0108e4f7817f9ea5110a000021fbb76 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 31 Mar 2020 12:55:44 -0700 Subject: [PATCH 01/34] Refactoring: extracting common fields into worker struct --- libbeat/publisher/pipeline/output.go | 30 +++++++++++++++++----------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 02ec2975db6..e07b9c4330d 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -23,20 +23,22 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs" ) -// clientWorker manages output client of type outputs.Client, not supporting reconnect. -type clientWorker struct { +type worker struct { observer outputObserver qu workQueue - client outputs.Client closed atomic.Bool } +// clientWorker manages output client of type outputs.Client, not supporting reconnect. +type clientWorker struct { + worker + client outputs.Client +} + // netClientWorker manages reconnectable output clients of type outputs.NetworkClient. type netClientWorker struct { - observer outputObserver - qu workQueue - client outputs.NetworkClient - closed atomic.Bool + worker + client outputs.NetworkClient batchSize int batchSizer func() int @@ -44,17 +46,21 @@ type netClientWorker struct { } func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker { + w := worker{ + observer: observer, + qu: qu, + } + if nc, ok := client.(outputs.NetworkClient); ok { c := &netClientWorker{ - observer: observer, - qu: qu, - client: nc, - logger: logp.NewLogger("publisher_pipeline_output"), + worker: w, + client: nc, + logger: logp.NewLogger("publisher_pipeline_output"), } go c.run() return c } - c := &clientWorker{observer: observer, qu: qu, client: client} + c := &clientWorker{worker: w, client: client} go c.run() return c } From 7e5e57cb6b0c8e16ebfafb51e03fc1b6cd8e3847 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 31 Mar 2020 16:26:38 -0700 Subject: [PATCH 02/34] More refactoring --- libbeat/publisher/pipeline/output.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index e07b9c4330d..4e706b1b0f4 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -51,16 +51,21 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie qu: qu, } + var c interface { + outputWorker + run() + } + if nc, ok := client.(outputs.NetworkClient); ok { - c := &netClientWorker{ + c = &netClientWorker{ worker: w, client: nc, logger: logp.NewLogger("publisher_pipeline_output"), } - go c.run() - return c + } else { + c = &clientWorker{worker: w, client: client} } - c := &clientWorker{worker: w, client: client} + go c.run() return c } From 89aa8cbe7881b8c35c5d3a0dddda9a5a6969cdc5 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 31 Mar 2020 17:41:11 -0700 Subject: [PATCH 03/34] Address goroutine leak in publisher --- libbeat/publisher/pipeline/output.go | 101 +++++++++++++-------------- 1 file changed, 49 insertions(+), 52 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 4e706b1b0f4..a0d63af5125 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -18,7 +18,6 @@ package pipeline import ( - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" ) @@ -26,7 +25,7 @@ import ( type worker struct { observer outputObserver qu workQueue - closed atomic.Bool + done chan struct{} } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -49,6 +48,7 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie w := worker{ observer: observer, qu: qu, + done: make(chan struct{}), } var c interface { @@ -71,80 +71,77 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie } func (w *clientWorker) Close() error { - w.closed.Store(true) + close(w.worker.done) return w.client.Close() } func (w *clientWorker) run() { - for !w.closed.Load() { - for batch := range w.qu { - if w.closed.Load() { - if batch != nil { - batch.Cancelled() - } - return - } + for { + // We wait for either the worker to be closed or for there to be a batch of + // events to publish. + select { - w.observer.outBatchSend(len(batch.events)) + case <-w.done: + return + case batch := <-w.qu: + w.observer.outBatchSend(len(batch.events)) if err := w.client.Publish(batch); err != nil { - break + return } + } } } func (w *netClientWorker) Close() error { - w.closed.Store(true) + close(w.worker.done) return w.client.Close() } func (w *netClientWorker) run() { - for !w.closed.Load() { - reconnectAttempts := 0 - - // start initial connect loop from first batch, but return - // batch to pipeline for other outputs to catch up while we're trying to connect - for batch := range w.qu { - batch.Cancelled() - - if w.closed.Load() { - w.logger.Infof("Closed connection to %v", w.client) - return - } + var ( + connected = false + reconnectAttempts = 0 + ) + + for { + // We wait for either the worker to be closed or for there to be a batch of + // events to publish. + select { + + case <-w.done: + return + + case batch := <-w.qu: + // Try to (re)connect so we can publish batch + if !connected { + // Return batch to other output workers while we try to (re)connect + batch.Cancelled() + + if reconnectAttempts == 0 { + w.logger.Infof("Connecting to %v", w.client) + } else { + w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) + } - if reconnectAttempts > 0 { - w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) - } else { - w.logger.Infof("Connecting to %v", w.client) - } + err := w.client.Connect() + connected = err == nil + if connected { + w.logger.Infof("Connection to %v established", w.client) + reconnectAttempts = 0 + } else { + w.logger.Errorf("Failed to connect to %v: %v", w.client, err) + reconnectAttempts++ + } - err := w.client.Connect() - if err != nil { - w.logger.Errorf("Failed to connect to %v: %v", w.client, err) - reconnectAttempts++ continue } - w.logger.Infof("Connection to %v established", w.client) - reconnectAttempts = 0 - break - } - - // send loop - for batch := range w.qu { - if w.closed.Load() { - if batch != nil { - batch.Cancelled() - } - return - } - - err := w.client.Publish(batch) - if err != nil { + if err := w.client.Publish(batch); err != nil { w.logger.Errorf("Failed to publish events: %v", err) // on error return to connect loop - break + connected = false } } } From 03db18931d1db8bf08485e4bfb4d21f97a5b8f6e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 31 Mar 2020 19:05:35 -0700 Subject: [PATCH 04/34] Workaround: add Connection: close header to prevent FD leaks --- libbeat/esleg/eslegclient/connection.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index b591307c444..facaa8104fa 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -407,6 +407,9 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) req.Host = host } + // TODO: workaround for output reloading leaking FDs until context.WithCancel is used on transport dialer instead + req.Header.Set("Connection", "close") + resp, err := conn.HTTP.Do(req) if err != nil { return 0, nil, err From 6d160f42147715ed42028814fdd0c58118359af9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 31 Mar 2020 19:39:31 -0700 Subject: [PATCH 05/34] Adding CHANGELOG entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8e6611da1dd..d1a162565fe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix bug with `monitoring.cluster_uuid` setting not always being exposed via GET /state Beats API. {issue}16732[16732] {pull}17420[17420] - Fix building on FreeBSD by removing build flags from `add_cloudfoundry_metadata` processor. {pull}17486[17486] - Do not rotate log files on startup when interval is configured and rotateonstartup is disabled. {pull}17613[17613] +- Fix goroutine leak and Elasticsearch output file descriptor leak when output reloading is in use. {issue}10491[10491] {pull}17381[17381] *Auditbeat* From c8a16c637811e5ce8a286c27be67e04e909464c8 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 1 Apr 2020 14:06:59 -0700 Subject: [PATCH 06/34] Adding IdleConnTimeout setting --- libbeat/esleg/eslegclient/connection.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index facaa8104fa..09817cecbe6 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -63,11 +63,15 @@ type ConnectionSettings struct { Parameters map[string]string CompressionLevel int EscapeHTML bool - Timeout time.Duration + + Timeout time.Duration + IdleConnTimeout time.Duration } // NewConnection returns a new Elasticsearch client func NewConnection(s ConnectionSettings) (*Connection, error) { + s = settingsWithDefaults(s) + u, err := url.Parse(s.URL) if err != nil { return nil, fmt.Errorf("failed to parse elasticsearch URL: %v", err) @@ -124,6 +128,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { DialTLS: tlsDialer.Dial, TLSClientConfig: s.TLS.ToConfig(), Proxy: proxy, + IdleConnTimeout: s.IdleConnTimeout, }, Timeout: s.Timeout, }, @@ -132,6 +137,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { }, nil } +func settingsWithDefaults(s ConnectionSettings) ConnectionSettings { + settings := s + if settings.IdleConnTimeout == 0 { + settings.IdleConnTimeout = 1 * time.Minute + } + + return settings +} + // NewClients returns a list of Elasticsearch clients based on the given // configuration. It accepts the same configuration parameters as the Elasticsearch // output, except for the output specific configuration options. If multiple hosts @@ -407,9 +421,6 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) req.Host = host } - // TODO: workaround for output reloading leaking FDs until context.WithCancel is used on transport dialer instead - req.Header.Set("Connection", "close") - resp, err := conn.HTTP.Do(req) if err != nil { return 0, nil, err From f28d0b7f9064b49196bab8fdecc15080e77358f7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 1 Apr 2020 14:12:29 -0700 Subject: [PATCH 07/34] Close idle connections when ES client is closed --- libbeat/esleg/eslegclient/connection.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 09817cecbe6..e1c20f795bd 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -280,6 +280,7 @@ func (conn *Connection) Ping() (string, error) { // Close closes a connection. func (conn *Connection) Close() error { + conn.HTTP.CloseIdleConnections() return nil } From d9bc4e5fce3ccd0ae4d9aa5e3e7fa98506cabc0d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 9 Apr 2020 11:15:06 -0700 Subject: [PATCH 08/34] When closing worker, make sure to cancel in-flight batches --- libbeat/publisher/pipeline/output.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index a0d63af5125..7eec5242269 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -70,8 +70,17 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie return c } +func (w *worker) close() { + close(w.done) + + // Cancel in-flight batches so they may be retried + for batch := range w.qu { + batch.Cancelled() + } +} + func (w *clientWorker) Close() error { - close(w.worker.done) + w.worker.close() return w.client.Close() } @@ -95,7 +104,7 @@ func (w *clientWorker) run() { } func (w *netClientWorker) Close() error { - close(w.worker.done) + w.worker.close() return w.client.Close() } From 5d3b10ecfef652b840ea4e77dcdbb9a29d4f34d4 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 9 Apr 2020 17:18:02 -0700 Subject: [PATCH 09/34] Cancel batch + guard --- libbeat/publisher/pipeline/output.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 7eec5242269..6da0d2402a4 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -74,8 +74,13 @@ func (w *worker) close() { close(w.done) // Cancel in-flight batches so they may be retried - for batch := range w.qu { - batch.Cancelled() + for { + select { + case batch := <-w.qu: + batch.Cancelled() + default: + return + } } } @@ -94,6 +99,9 @@ func (w *clientWorker) run() { return case batch := <-w.qu: + if batch == nil { + continue + } w.observer.outBatchSend(len(batch.events)) if err := w.client.Publish(batch); err != nil { return @@ -123,6 +131,10 @@ func (w *netClientWorker) run() { return case batch := <-w.qu: + if batch == nil { + continue + } + // Try to (re)connect so we can publish batch if !connected { // Return batch to other output workers while we try to (re)connect From 5c4daff5578b206051c09cb566b355e451e731ff Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 9 Apr 2020 17:18:36 -0700 Subject: [PATCH 10/34] [WIP] Adding output reload test --- libbeat/publisher/pipeline/controller_test.go | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 libbeat/publisher/pipeline/controller_test.go diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go new file mode 100644 index 00000000000..a88c8d920d0 --- /dev/null +++ b/libbeat/publisher/pipeline/controller_test.go @@ -0,0 +1,115 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "testing" + "testing/quick" + "time" + + "github.com/elastic/beats/v7/libbeat/tests/resources" + + "github.com/elastic/beats/v7/libbeat/outputs" + + "github.com/elastic/beats/v7/libbeat/publisher" + + "github.com/elastic/beats/v7/libbeat/common/atomic" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + + "github.com/elastic/beats/v7/libbeat/publisher/queue" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" +) + +func TestOutputReload(t *testing.T) { + tests := map[string]func(mockPublishFn) outputs.Client{ + "client": newMockClient, + "network_client": newMockNetworkClient, + } + + for name, ctor := range tests { + t.Run(name, func(t *testing.T) { + seedPRNG(t) + + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + err := quick.Check(func(q uint) bool { + numEventsToPublish := 500 + (q % 1000) // 500 to 1499 + numOutputReloads := 5 + (q % 10) // 5 to 14 + + queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { + return memqueue.NewQueue( + logp.L(), + memqueue.Settings{ + ACKListener: ackListener, + Events: int(numEventsToPublish), + }), nil + } + + var publishedCount atomic.Uint + countingPublishFn := func(batch publisher.Batch) error { + publishedCount.Add(uint(len(batch.Events()))) + return nil + } + + pipeline, err := New( + beat.Info{}, + Monitors{}, + queueFactory, + outputs.Group{}, + Settings{}, + ) + require.NoError(t, err) + defer pipeline.Close() + + pipelineClient, err := pipeline.Connect() + require.NoError(t, err) + defer pipelineClient.Close() + + go func() { + for i := uint(0); i < numEventsToPublish; i++ { + pipelineClient.Publish(beat.Event{}) + } + }() + + for i := uint(0); i < numOutputReloads; i++ { + outputClient := ctor(countingPublishFn) + out := outputs.Group{ + Clients: []outputs.Client{outputClient}, + } + pipeline.output.Set(out) + } + + timeout := 20 * time.Second + return waitUntilTrue(timeout, func() bool { + return uint(numEventsToPublish) == publishedCount.Load() + }) + }, &quick.Config{MaxCount: 25}) + + if err != nil { + t.Error(err) + } + }) + } +} From 71b1d55dbb8597cf5add3230f56245a1261f9de5 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 9 Apr 2020 18:17:58 -0700 Subject: [PATCH 11/34] More WIP --- libbeat/publisher/pipeline/controller_test.go | 41 +++++++++++-------- libbeat/publisher/pipeline/output.go | 19 +++++++++ 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index a88c8d920d0..77e9e9ebb8f 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -18,32 +18,26 @@ package pipeline import ( + "fmt" "testing" "testing/quick" "time" - "github.com/elastic/beats/v7/libbeat/tests/resources" - - "github.com/elastic/beats/v7/libbeat/outputs" - - "github.com/elastic/beats/v7/libbeat/publisher" - + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/logp" - - "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" - + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/beats/v7/libbeat/tests/resources" "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/beat" ) func TestOutputReload(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ - "client": newMockClient, + //"client": newMockClient, "network_client": newMockNetworkClient, } @@ -55,8 +49,10 @@ func TestOutputReload(t *testing.T) { defer goroutines.Check(t) err := quick.Check(func(q uint) bool { - numEventsToPublish := 500 + (q % 1000) // 500 to 1499 - numOutputReloads := 5 + (q % 10) // 5 to 14 + //numEventsToPublish := 500 + (q % 1000) // 500 to 1499 + //numOutputReloads := 5 + (q % 10) // 5 to 14 + numEventsToPublish := uint(20000) + numOutputReloads := uint(15) queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { return memqueue.NewQueue( @@ -70,6 +66,7 @@ func TestOutputReload(t *testing.T) { var publishedCount atomic.Uint countingPublishFn := func(batch publisher.Batch) error { publishedCount.Add(uint(len(batch.Events()))) + fmt.Printf("published so far: %v\n", publishedCount.Load()) return nil } @@ -98,14 +95,22 @@ func TestOutputReload(t *testing.T) { out := outputs.Group{ Clients: []outputs.Client{outputClient}, } + fmt.Println("reloading output...") pipeline.output.Set(out) } - timeout := 20 * time.Second - return waitUntilTrue(timeout, func() bool { + timeout := 5 * time.Second + success := waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) - }, &quick.Config{MaxCount: 25}) + if !success { + fmt.Printf( + "numOutputReloads = %v, numEventsToPublish = %v, publishedCounted = %v\n", + numOutputReloads, numEventsToPublish, publishedCount.Load(), + ) + } + return success + }, &quick.Config{MaxCount: 1}) if err != nil { t.Error(err) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 6da0d2402a4..7a451891306 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -18,6 +18,8 @@ package pipeline import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" ) @@ -26,6 +28,7 @@ type worker struct { observer outputObserver qu workQueue done chan struct{} + inFlight *Batch } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -71,14 +74,24 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie } func (w *worker) close() { + //fmt.Println("worker close called") close(w.done) + //if w.inFlight != nil { + // fmt.Println("Canceling in-flight batch before closing...") + // w.inFlight.Cancelled() + // w.inFlight = nil + //} + //return + //fmt.Println("done signal sent") // Cancel in-flight batches so they may be retried for { select { case batch := <-w.qu: + //fmt.Println("Canceling in-flight batch before closing...") batch.Cancelled() default: + fmt.Println("no inflight batches") return } } @@ -103,9 +116,12 @@ func (w *clientWorker) run() { continue } w.observer.outBatchSend(len(batch.events)) + + w.worker.inFlight = batch if err := w.client.Publish(batch); err != nil { return } + w.worker.inFlight = nil } } @@ -159,11 +175,14 @@ func (w *netClientWorker) run() { continue } + w.worker.inFlight = batch + fmt.Printf("about to publish %v events\n", len(batch.Events())) if err := w.client.Publish(batch); err != nil { w.logger.Errorf("Failed to publish events: %v", err) // on error return to connect loop connected = false } + w.worker.inFlight = nil } } } From 9ce4b54240a8a2228997103419caa19410232a2e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 08:43:14 -0700 Subject: [PATCH 12/34] Update test --- libbeat/publisher/pipeline/controller_test.go | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 77e9e9ebb8f..8576fcc0026 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -19,6 +19,7 @@ package pipeline import ( "fmt" + "sync" "testing" "testing/quick" "time" @@ -37,7 +38,7 @@ import ( func TestOutputReload(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ - //"client": newMockClient, + "client": newMockClient, "network_client": newMockNetworkClient, } @@ -49,10 +50,8 @@ func TestOutputReload(t *testing.T) { defer goroutines.Check(t) err := quick.Check(func(q uint) bool { - //numEventsToPublish := 500 + (q % 1000) // 500 to 1499 - //numOutputReloads := 5 + (q % 10) // 5 to 14 - numEventsToPublish := uint(20000) - numOutputReloads := uint(15) + numEventsToPublish := 15000 + (q % 500) // 15000 to 19999 + numOutputReloads := 350 + (q % 150) // 350 to 499 queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { return memqueue.NewQueue( @@ -66,7 +65,7 @@ func TestOutputReload(t *testing.T) { var publishedCount atomic.Uint countingPublishFn := func(batch publisher.Batch) error { publishedCount.Add(uint(len(batch.Events()))) - fmt.Printf("published so far: %v\n", publishedCount.Load()) + lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) return nil } @@ -84,10 +83,13 @@ func TestOutputReload(t *testing.T) { require.NoError(t, err) defer pipelineClient.Close() + var wg sync.WaitGroup + wg.Add(1) go func() { for i := uint(0); i < numEventsToPublish; i++ { pipelineClient.Publish(beat.Event{}) } + wg.Done() }() for i := uint(0); i < numOutputReloads; i++ { @@ -95,11 +97,13 @@ func TestOutputReload(t *testing.T) { out := outputs.Group{ Clients: []outputs.Client{outputClient}, } - fmt.Println("reloading output...") + lf("in test: reloading output...") pipeline.output.Set(out) } - timeout := 5 * time.Second + wg.Wait() + + timeout := 20 * time.Second success := waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) @@ -110,7 +114,7 @@ func TestOutputReload(t *testing.T) { ) } return success - }, &quick.Config{MaxCount: 1}) + }, &quick.Config{MaxCount: 25}) if err != nil { t.Error(err) @@ -118,3 +122,8 @@ func TestOutputReload(t *testing.T) { }) } } + +func lf(msg string, v ...interface{}) { + now := time.Now().Format("15:04:05.00000") + fmt.Printf(now+" "+msg+"\n", v...) +} From 014fd25a73add4d7a0a4f916c58b2c5df3ec2d78 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 08:47:37 -0700 Subject: [PATCH 13/34] Try to get test passing for client first --- libbeat/publisher/pipeline/controller_test.go | 13 ++++--------- libbeat/publisher/pipeline/output.go | 10 ++++++++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 8576fcc0026..2df83f40b9a 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -38,8 +38,8 @@ import ( func TestOutputReload(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ - "client": newMockClient, - "network_client": newMockNetworkClient, + "client": newMockClient, + //"network_client": newMockNetworkClient, } for name, ctor := range tests { @@ -65,7 +65,7 @@ func TestOutputReload(t *testing.T) { var publishedCount atomic.Uint countingPublishFn := func(batch publisher.Batch) error { publishedCount.Add(uint(len(batch.Events()))) - lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) + //lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) return nil } @@ -97,7 +97,7 @@ func TestOutputReload(t *testing.T) { out := outputs.Group{ Clients: []outputs.Client{outputClient}, } - lf("in test: reloading output...") + //lf("in test: reloading output...") pipeline.output.Set(out) } @@ -122,8 +122,3 @@ func TestOutputReload(t *testing.T) { }) } } - -func lf(msg string, v ...interface{}) { - now := time.Now().Format("15:04:05.00000") - fmt.Printf(now+" "+msg+"\n", v...) -} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 7a451891306..916ba895a74 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -19,11 +19,17 @@ package pipeline import ( "fmt" + "time" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" ) +func lf(msg string, v ...interface{}) { + now := time.Now().Format("15:04:05.00000") + fmt.Printf(now+" "+msg+"\n", v...) +} + type worker struct { observer outputObserver qu workQueue @@ -88,10 +94,10 @@ func (w *worker) close() { for { select { case batch := <-w.qu: - //fmt.Println("Canceling in-flight batch before closing...") + //lf("Canceling in-flight batch before closing...") batch.Cancelled() default: - fmt.Println("no inflight batches") + //lf("no inflight batches") return } } From 5c5e48a8bf31e2e63d6c541d90268a8dc44ce5cf Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 15:05:31 -0700 Subject: [PATCH 14/34] Make workqueue shared --- libbeat/publisher/pipeline/consumer.go | 23 ++++ libbeat/publisher/pipeline/controller.go | 54 ++++---- libbeat/publisher/pipeline/controller_test.go | 22 ++-- libbeat/publisher/pipeline/output.go | 59 +++++---- libbeat/publisher/pipeline/output_test.go | 91 -------------- libbeat/publisher/pipeline/retry.go | 1 + libbeat/publisher/pipeline/testing.go | 117 ++++++++++++++++++ libbeat/publisher/queue/memqueue/consume.go | 8 ++ 8 files changed, 223 insertions(+), 152 deletions(-) create mode 100644 libbeat/publisher/pipeline/testing.go diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index 4dd211052c2..d8778ca32a5 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -147,6 +147,9 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { case sigConsumerCheck: case sigConsumerUpdateOutput: + //if out == nil && batch != nil { + // lf("handling sigConsumerUpdateOutput") + //} c.out = sig.out case sigConsumerUpdateInput: @@ -164,20 +167,27 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { for { if !paused && c.out != nil && consumer != nil && batch == nil { out = c.out.workQueue + //lf("consuming from queue...") queueBatch, err := consumer.Get(c.out.batchSize) if err != nil { + lf("error consuming from queue") out = nil consumer = nil continue } if queueBatch != nil { + lf("consumed batch of %v events from queue", len(queueBatch.Events())) batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) } paused = c.paused() if paused || batch == nil { + lf("paused: %v, batch == nil? = %v; setting out to nil", paused, batch == nil) out = nil } + //} else { + // lf("paused = %v, c.out == nil = %v, consumer == nil = %v, batch == nil? = %v", + // paused, c.out == nil, consumer == nil, batch == nil) } select { @@ -187,13 +197,26 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { default: } + if out == nil && batch != nil { + lf("out == nil and batch != nil") + } + select { case <-c.done: + lf("consumer done") log.Debug("stop pipeline event consumer") return case sig := <-c.sig: + if out == nil && batch != nil { + lf("in second select; handled sig %v", sig.tag) + } handleSignal(sig) case out <- batch: + numEvents := 0 + if batch != nil { + numEvents = len(batch.Events()) + } + lf("in consumer: sent batch of %v events to workqueue", numEvents) batch = nil } } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 05bd65338a9..3ce0cabdec7 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -34,7 +34,8 @@ type outputController struct { monitors Monitors observer outputObserver - queue queue.Queue + queue queue.Queue + workQueue workQueue retryer *retryer consumer *eventConsumer @@ -65,15 +66,16 @@ func newOutputController( b queue.Queue, ) *outputController { c := &outputController{ - beat: beat, - monitors: monitors, - observer: observer, - queue: b, + beat: beat, + monitors: monitors, + observer: observer, + queue: b, + workQueue: makeWorkQueue(), } ctx := &batchContext{} c.consumer = newEventConsumer(monitors.Logger, b, ctx) - c.retryer = newRetryer(monitors.Logger, observer, nil, c.consumer) + c.retryer = newRetryer(monitors.Logger, observer, c.workQueue, c.consumer) ctx.observer = observer ctx.retryer = c.retryer @@ -86,56 +88,60 @@ func (c *outputController) Close() error { c.consumer.sigPause() c.consumer.close() c.retryer.close() + close(c.workQueue) if c.out != nil { for _, out := range c.out.outputs { out.Close() } - close(c.out.workQueue) } return nil } func (c *outputController) Set(outGrp outputs.Group) { + //lf("Set() called") + c.consumer.sigPause() + + // close old group, so events are send to new workQueue via retryer + if c.out != nil { + for _, w := range c.out.outputs { + w.Close() + c.retryer.sigOutputRemoved() + } + } + // create new outputGroup with shared work queue clients := outGrp.Clients - queue := makeWorkQueue() worker := make([]outputWorker, len(clients)) for i, client := range clients { - worker[i] = makeClientWorker(c.observer, queue, client) + worker[i] = makeClientWorker(c.observer, c.workQueue, client) } grp := &outputGroup{ - workQueue: queue, + workQueue: c.workQueue, outputs: worker, timeToLive: outGrp.Retry + 1, batchSize: outGrp.BatchSize, } // update consumer and retryer - c.consumer.sigPause() - if c.out != nil { - for range c.out.outputs { - c.retryer.sigOutputRemoved() - } - } - c.retryer.updOutput(queue) + //c.consumer.sigPause() + //if c.out != nil { + // for range c.out.outputs { + // c.retryer.sigOutputRemoved() + // } + //} + //c.retryer.updOutput(queue) for range clients { c.retryer.sigOutputAdded() } c.consumer.updOutput(grp) - // close old group, so events are send to new workQueue via retryer - if c.out != nil { - for _, w := range c.out.outputs { - w.Close() - } - } - c.out = grp // restart consumer (potentially blocked by retryer) c.consumer.sigContinue() + c.consumer.sigUnWait() c.observer.updateOutputGroup() } diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 2df83f40b9a..d10a26ba912 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -18,7 +18,6 @@ package pipeline import ( - "fmt" "sync" "testing" "testing/quick" @@ -38,8 +37,8 @@ import ( func TestOutputReload(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ - "client": newMockClient, - //"network_client": newMockNetworkClient, + //"client": newMockClient, + "network_client": newMockNetworkClient, } for name, ctor := range tests { @@ -50,8 +49,11 @@ func TestOutputReload(t *testing.T) { defer goroutines.Check(t) err := quick.Check(func(q uint) bool { - numEventsToPublish := 15000 + (q % 500) // 15000 to 19999 - numOutputReloads := 350 + (q % 150) // 350 to 499 + lf("*** Starting new test ***") + //numEventsToPublish := 15000 + (q % 500) // 15000 to 19999 + //numOutputReloads := 350 + (q % 150) // 350 to 499 + numEventsToPublish := uint(19999) + numOutputReloads := uint(499) queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { return memqueue.NewQueue( @@ -65,7 +67,7 @@ func TestOutputReload(t *testing.T) { var publishedCount atomic.Uint countingPublishFn := func(batch publisher.Batch) error { publishedCount.Add(uint(len(batch.Events()))) - //lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) + lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) return nil } @@ -103,18 +105,18 @@ func TestOutputReload(t *testing.T) { wg.Wait() - timeout := 20 * time.Second + timeout := 5 * time.Second success := waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) if !success { - fmt.Printf( - "numOutputReloads = %v, numEventsToPublish = %v, publishedCounted = %v\n", + lf( + "in test: result: numOutputReloads = %v, numEventsToPublish = %v, publishedCounted = %v", numOutputReloads, numEventsToPublish, publishedCount.Load(), ) } return success - }, &quick.Config{MaxCount: 25}) + }, &quick.Config{MaxCount: 250}) if err != nil { t.Error(err) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 916ba895a74..8f2b5baa371 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -19,22 +19,32 @@ package pipeline import ( "fmt" + "strconv" "time" + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" ) +var _workerID atomic.Uint + func lf(msg string, v ...interface{}) { now := time.Now().Format("15:04:05.00000") fmt.Printf(now+" "+msg+"\n", v...) } +func (w *worker) lf(msg string, v ...interface{}) { + lf("[worker "+strconv.Itoa(int(w.id))+"] "+msg, v...) +} + type worker struct { + id uint observer outputObserver qu workQueue done chan struct{} - inFlight *Batch + inFlight chan struct{} } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -58,6 +68,7 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie observer: observer, qu: qu, done: make(chan struct{}), + id: _workerID.Inc(), } var c interface { @@ -75,32 +86,20 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie c = &clientWorker{worker: w, client: client} } + //w.lf("starting...") go c.run() return c } func (w *worker) close() { - //fmt.Println("worker close called") close(w.done) - //if w.inFlight != nil { - // fmt.Println("Canceling in-flight batch before closing...") - // w.inFlight.Cancelled() - // w.inFlight = nil - //} - //return - //fmt.Println("done signal sent") - - // Cancel in-flight batches so they may be retried - for { - select { - case batch := <-w.qu: - //lf("Canceling in-flight batch before closing...") - batch.Cancelled() - default: - //lf("no inflight batches") - return - } + //lf("w.inFlight == nil: %#v", w.inFlight == nil) + if w.inFlight != nil { + //lf("waiting for inflight events to publish") + <-w.inFlight + //lf("inflight events published") } + //w.lf("closed") } func (w *clientWorker) Close() error { @@ -123,12 +122,12 @@ func (w *clientWorker) run() { } w.observer.outBatchSend(len(batch.events)) - w.worker.inFlight = batch + w.inFlight = make(chan struct{}) if err := w.client.Publish(batch); err != nil { + close(w.inFlight) return } - w.worker.inFlight = nil - + close(w.inFlight) } } } @@ -150,9 +149,13 @@ func (w *netClientWorker) run() { select { case <-w.done: + //lf("got done signal") return - case batch := <-w.qu: + case batch, ok := <-w.qu: + if !ok { + w.lf("workqueue closed") + } if batch == nil { continue } @@ -160,6 +163,7 @@ func (w *netClientWorker) run() { // Try to (re)connect so we can publish batch if !connected { // Return batch to other output workers while we try to (re)connect + //w.lf("canceling batch of %v events", len(batch.Events())) batch.Cancelled() if reconnectAttempts == 0 { @@ -181,14 +185,15 @@ func (w *netClientWorker) run() { continue } - w.worker.inFlight = batch - fmt.Printf("about to publish %v events\n", len(batch.Events())) + w.lf("about to publish %v events", len(batch.Events())) + w.inFlight = make(chan struct{}) if err := w.client.Publish(batch); err != nil { + close(w.inFlight) w.logger.Errorf("Failed to publish events: %v", err) // on error return to connect loop connected = false } - w.worker.inFlight = nil + close(w.inFlight) } } } diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index d89c166ee15..b04411ae926 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -20,7 +20,6 @@ package pipeline import ( "flag" "math" - "math/rand" "sync" "testing" "testing/quick" @@ -29,10 +28,8 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/queue" ) var ( @@ -171,91 +168,3 @@ func TestMakeClientWorkerAndClose(t *testing.T) { }) } } - -type mockPublishFn func(publisher.Batch) error - -func newMockClient(publishFn mockPublishFn) outputs.Client { - return &mockClient{publishFn: publishFn} -} - -type mockClient struct { - publishFn mockPublishFn -} - -func (c *mockClient) String() string { return "mock_client" } -func (c *mockClient) Close() error { return nil } -func (c *mockClient) Publish(batch publisher.Batch) error { - return c.publishFn(batch) -} - -func newMockNetworkClient(publishFn mockPublishFn) outputs.Client { - return &mockNetworkClient{newMockClient(publishFn)} -} - -type mockNetworkClient struct { - outputs.Client -} - -func (c *mockNetworkClient) Connect() error { return nil } - -type mockQueue struct{} - -func (q mockQueue) Close() error { return nil } -func (q mockQueue) BufferConfig() queue.BufferConfig { return queue.BufferConfig{} } -func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} } -func (q mockQueue) Consumer() queue.Consumer { return mockConsumer{} } - -type mockProducer struct{} - -func (p mockProducer) Publish(event publisher.Event) bool { return true } -func (p mockProducer) TryPublish(event publisher.Event) bool { return true } -func (p mockProducer) Cancel() int { return 0 } - -type mockConsumer struct{} - -func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil } -func (c mockConsumer) Close() error { return nil } - -func randomBatch(min, max int, wqu workQueue) *Batch { - numEvents := randIntBetween(min, max) - events := make([]publisher.Event, numEvents) - - consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{}) - retryer := newRetryer(logp.L(), nilObserver, wqu, consumer) - - batch := Batch{ - events: events, - ctx: &batchContext{ - observer: nilObserver, - retryer: retryer, - }, - } - - return &batch -} - -// randIntBetween returns a random integer in [min, max) -func randIntBetween(min, max int) int { - return rand.Intn(max-min) + min -} - -func seedPRNG(t *testing.T) { - seed := *SeedFlag - if seed == 0 { - seed = time.Now().UnixNano() - } - - t.Logf("reproduce test with `go test ... -seed %v`", seed) - rand.Seed(seed) -} - -func waitUntilTrue(duration time.Duration, fn func() bool) bool { - end := time.Now().Add(duration) - for time.Now().Before(end) { - if fn() { - return true - } - time.Sleep(1 * time.Millisecond) - } - return false -} diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index a65a7d227c8..93dd8bbf5d0 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -49,6 +49,7 @@ type retryQueue chan batchEvent type retryerSignal struct { tag retryerEventTag channel workQueue + done chan struct{} } type batchEvent struct { diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go new file mode 100644 index 00000000000..72c0bd21111 --- /dev/null +++ b/libbeat/publisher/pipeline/testing.go @@ -0,0 +1,117 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "math/rand" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +type mockPublishFn func(publisher.Batch) error + +func newMockClient(publishFn mockPublishFn) outputs.Client { + return &mockClient{publishFn: publishFn} +} + +type mockClient struct { + publishFn mockPublishFn +} + +func (c *mockClient) String() string { return "mock_client" } +func (c *mockClient) Close() error { return nil } +func (c *mockClient) Publish(batch publisher.Batch) error { + return c.publishFn(batch) +} + +func newMockNetworkClient(publishFn mockPublishFn) outputs.Client { + return &mockNetworkClient{newMockClient(publishFn)} +} + +type mockNetworkClient struct { + outputs.Client +} + +func (c *mockNetworkClient) Connect() error { return nil } + +type mockQueue struct{} + +func (q mockQueue) Close() error { return nil } +func (q mockQueue) BufferConfig() queue.BufferConfig { return queue.BufferConfig{} } +func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} } +func (q mockQueue) Consumer() queue.Consumer { return mockConsumer{} } + +type mockProducer struct{} + +func (p mockProducer) Publish(event publisher.Event) bool { return true } +func (p mockProducer) TryPublish(event publisher.Event) bool { return true } +func (p mockProducer) Cancel() int { return 0 } + +type mockConsumer struct{} + +func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil } +func (c mockConsumer) Close() error { return nil } + +func randomBatch(min, max int, wqu workQueue) *Batch { + numEvents := randIntBetween(min, max) + events := make([]publisher.Event, numEvents) + + consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{}) + retryer := newRetryer(logp.L(), nilObserver, wqu, consumer) + + batch := Batch{ + events: events, + ctx: &batchContext{ + observer: nilObserver, + retryer: retryer, + }, + } + + return &batch +} + +// randIntBetween returns a random integer in [min, max) +func randIntBetween(min, max int) int { + return rand.Intn(max-min) + min +} + +func seedPRNG(t *testing.T) { + seed := *SeedFlag + if seed == 0 { + seed = time.Now().UnixNano() + } + + t.Logf("reproduce test with `go test ... -seed %v`", seed) + rand.Seed(seed) +} + +func waitUntilTrue(duration time.Duration, fn func() bool) bool { + end := time.Now().Add(duration) + for time.Now().Before(end) { + if fn() { + return true + } + time.Sleep(10 * time.Millisecond) + } + return false +} diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go index a995fbdc0ca..dfe46026aa0 100644 --- a/libbeat/publisher/queue/memqueue/consume.go +++ b/libbeat/publisher/queue/memqueue/consume.go @@ -19,13 +19,20 @@ package memqueue import ( "errors" + "fmt" "io" + "time" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) +func lf(msg string, v ...interface{}) { + now := time.Now().Format("15:04:05.00000") + fmt.Printf(now+" "+msg+"\n", v...) +} + type consumer struct { broker *broker resp chan getResponse @@ -70,6 +77,7 @@ func (c *consumer) Get(sz int) (queue.Batch, error) { // if request has been send, we do have to wait for a response resp := <-c.resp + lf("in memqueue Get(): about to return batch of %v events", len(resp.buf)) return &batch{ consumer: c, events: resp.buf, From 6227a1fc4356bce5a30db1686d334a5360917f26 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 15:46:38 -0700 Subject: [PATCH 15/34] Making tests pass --- libbeat/publisher/pipeline/consumer.go | 30 ++++++++++--------- libbeat/publisher/pipeline/controller.go | 5 ++++ libbeat/publisher/pipeline/controller_test.go | 20 ++++++------- libbeat/publisher/pipeline/output.go | 4 +-- libbeat/publisher/queue/memqueue/consume.go | 2 +- 5 files changed, 34 insertions(+), 27 deletions(-) diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index d8778ca32a5..ae9f58515cc 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -129,6 +129,8 @@ func (c *eventConsumer) updOutput(grp *outputGroup) { tag: sigConsumerUpdateInput, consumer: c.consumer, } + + //lf("consumer: updated output group to id = %v", grp.id) } func (c *eventConsumer) loop(consumer queue.Consumer) { @@ -157,7 +159,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { } paused = c.paused() - if !paused && c.out != nil && batch != nil { + if c.out != nil && batch != nil { out = c.out.workQueue } else { out = nil @@ -170,19 +172,19 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { //lf("consuming from queue...") queueBatch, err := consumer.Get(c.out.batchSize) if err != nil { - lf("error consuming from queue") + //lf("error consuming from queue") out = nil consumer = nil continue } if queueBatch != nil { - lf("consumed batch of %v events from queue", len(queueBatch.Events())) + //lf("consumed batch of %v events from queue", len(queueBatch.Events())) batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) } paused = c.paused() if paused || batch == nil { - lf("paused: %v, batch == nil? = %v; setting out to nil", paused, batch == nil) + //lf("paused: %v, batch == nil? = %v; setting out to nil", paused, batch == nil) out = nil } //} else { @@ -197,26 +199,26 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { default: } - if out == nil && batch != nil { - lf("out == nil and batch != nil") - } + //if out == nil && batch != nil { + // lf("out == nil but have batch with %v events", len(batch.Events())) + //} select { case <-c.done: - lf("consumer done") + //lf("consumer done") log.Debug("stop pipeline event consumer") return case sig := <-c.sig: if out == nil && batch != nil { - lf("in second select; handled sig %v", sig.tag) + //lf("in second select; handled sig %v", sig.tag) } handleSignal(sig) case out <- batch: - numEvents := 0 - if batch != nil { - numEvents = len(batch.Events()) - } - lf("in consumer: sent batch of %v events to workqueue", numEvents) + //numEvents := 0 + //if batch != nil { + // numEvents = len(batch.Events()) + //} + //lf("in consumer: sent batch of %v events to workqueue", numEvents) batch = nil } } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 3ce0cabdec7..e2a5837dc75 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -20,11 +20,14 @@ package pipeline import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) +var _grpID atomic.Uint + // outputController manages the pipelines output capabilities, like: // - start // - stop @@ -44,6 +47,7 @@ type outputController struct { // outputGroup configures a group of load balanced outputs with shared work queue. type outputGroup struct { + id uint workQueue workQueue outputs []outputWorker @@ -118,6 +122,7 @@ func (c *outputController) Set(outGrp outputs.Group) { worker[i] = makeClientWorker(c.observer, c.workQueue, client) } grp := &outputGroup{ + id: _workerID.Inc(), workQueue: c.workQueue, outputs: worker, timeToLive: outGrp.Retry + 1, diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index d10a26ba912..3ae7d2f3ca5 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -37,7 +37,7 @@ import ( func TestOutputReload(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ - //"client": newMockClient, + "client": newMockClient, "network_client": newMockNetworkClient, } @@ -49,11 +49,11 @@ func TestOutputReload(t *testing.T) { defer goroutines.Check(t) err := quick.Check(func(q uint) bool { - lf("*** Starting new test ***") - //numEventsToPublish := 15000 + (q % 500) // 15000 to 19999 - //numOutputReloads := 350 + (q % 150) // 350 to 499 - numEventsToPublish := uint(19999) - numOutputReloads := uint(499) + //lf("*** Starting new test ***") + numEventsToPublish := 15000 + (q % 500) // 15000 to 19999 + numOutputReloads := 350 + (q % 150) // 350 to 499 + //numEventsToPublish := uint(19999) + //numOutputReloads := uint(499) queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { return memqueue.NewQueue( @@ -67,7 +67,7 @@ func TestOutputReload(t *testing.T) { var publishedCount atomic.Uint countingPublishFn := func(batch publisher.Batch) error { publishedCount.Add(uint(len(batch.Events()))) - lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) + //lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) return nil } @@ -105,18 +105,18 @@ func TestOutputReload(t *testing.T) { wg.Wait() - timeout := 5 * time.Second + timeout := 20 * time.Second success := waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) if !success { lf( - "in test: result: numOutputReloads = %v, numEventsToPublish = %v, publishedCounted = %v", + "*** test result: numOutputReloads = %v, numEventsToPublish = %v, publishedCounted = %v ***", numOutputReloads, numEventsToPublish, publishedCount.Load(), ) } return success - }, &quick.Config{MaxCount: 250}) + }, &quick.Config{MaxCount: 25}) if err != nil { t.Error(err) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 8f2b5baa371..3249f09ce0c 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -154,7 +154,7 @@ func (w *netClientWorker) run() { case batch, ok := <-w.qu: if !ok { - w.lf("workqueue closed") + //w.lf("workqueue closed") } if batch == nil { continue @@ -185,7 +185,7 @@ func (w *netClientWorker) run() { continue } - w.lf("about to publish %v events", len(batch.Events())) + //w.lf("about to publish %v events", len(batch.Events())) w.inFlight = make(chan struct{}) if err := w.client.Publish(batch); err != nil { close(w.inFlight) diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go index dfe46026aa0..8987db0e12b 100644 --- a/libbeat/publisher/queue/memqueue/consume.go +++ b/libbeat/publisher/queue/memqueue/consume.go @@ -77,7 +77,7 @@ func (c *consumer) Get(sz int) (queue.Batch, error) { // if request has been send, we do have to wait for a response resp := <-c.resp - lf("in memqueue Get(): about to return batch of %v events", len(resp.buf)) + //lf("in memqueue Get(): about to return batch of %v events", len(resp.buf)) return &batch{ consumer: c, events: resp.buf, From ffd0822dc40fdbfe643b15b599aa3bce07c452b9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 15:54:59 -0700 Subject: [PATCH 16/34] Clean up --- libbeat/publisher/pipeline/consumer.go | 25 --------------- libbeat/publisher/pipeline/controller.go | 16 ++-------- libbeat/publisher/pipeline/controller_test.go | 14 +-------- libbeat/publisher/pipeline/output.go | 31 +------------------ libbeat/publisher/queue/memqueue/consume.go | 8 ----- 5 files changed, 4 insertions(+), 90 deletions(-) diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index ae9f58515cc..bf26be2fbf3 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -129,8 +129,6 @@ func (c *eventConsumer) updOutput(grp *outputGroup) { tag: sigConsumerUpdateInput, consumer: c.consumer, } - - //lf("consumer: updated output group to id = %v", grp.id) } func (c *eventConsumer) loop(consumer queue.Consumer) { @@ -149,9 +147,6 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { case sigConsumerCheck: case sigConsumerUpdateOutput: - //if out == nil && batch != nil { - // lf("handling sigConsumerUpdateOutput") - //} c.out = sig.out case sigConsumerUpdateInput: @@ -169,27 +164,20 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { for { if !paused && c.out != nil && consumer != nil && batch == nil { out = c.out.workQueue - //lf("consuming from queue...") queueBatch, err := consumer.Get(c.out.batchSize) if err != nil { - //lf("error consuming from queue") out = nil consumer = nil continue } if queueBatch != nil { - //lf("consumed batch of %v events from queue", len(queueBatch.Events())) batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) } paused = c.paused() if paused || batch == nil { - //lf("paused: %v, batch == nil? = %v; setting out to nil", paused, batch == nil) out = nil } - //} else { - // lf("paused = %v, c.out == nil = %v, consumer == nil = %v, batch == nil? = %v", - // paused, c.out == nil, consumer == nil, batch == nil) } select { @@ -199,26 +187,13 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { default: } - //if out == nil && batch != nil { - // lf("out == nil but have batch with %v events", len(batch.Events())) - //} - select { case <-c.done: - //lf("consumer done") log.Debug("stop pipeline event consumer") return case sig := <-c.sig: - if out == nil && batch != nil { - //lf("in second select; handled sig %v", sig.tag) - } handleSignal(sig) case out <- batch: - //numEvents := 0 - //if batch != nil { - // numEvents = len(batch.Events()) - //} - //lf("in consumer: sent batch of %v events to workqueue", numEvents) batch = nil } } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index e2a5837dc75..072f7a48d5a 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -20,14 +20,11 @@ package pipeline import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) -var _grpID atomic.Uint - // outputController manages the pipelines output capabilities, like: // - start // - stop @@ -47,7 +44,6 @@ type outputController struct { // outputGroup configures a group of load balanced outputs with shared work queue. type outputGroup struct { - id uint workQueue workQueue outputs []outputWorker @@ -104,7 +100,7 @@ func (c *outputController) Close() error { } func (c *outputController) Set(outGrp outputs.Group) { - //lf("Set() called") + // Pause consumer c.consumer.sigPause() // close old group, so events are send to new workQueue via retryer @@ -122,7 +118,6 @@ func (c *outputController) Set(outGrp outputs.Group) { worker[i] = makeClientWorker(c.observer, c.workQueue, client) } grp := &outputGroup{ - id: _workerID.Inc(), workQueue: c.workQueue, outputs: worker, timeToLive: outGrp.Retry + 1, @@ -130,13 +125,6 @@ func (c *outputController) Set(outGrp outputs.Group) { } // update consumer and retryer - //c.consumer.sigPause() - //if c.out != nil { - // for range c.out.outputs { - // c.retryer.sigOutputRemoved() - // } - //} - //c.retryer.updOutput(queue) for range clients { c.retryer.sigOutputAdded() } @@ -144,7 +132,7 @@ func (c *outputController) Set(outGrp outputs.Group) { c.out = grp - // restart consumer (potentially blocked by retryer) + // restart consumer (potentially blocked by retryer as well) c.consumer.sigContinue() c.consumer.sigUnWait() diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 3ae7d2f3ca5..32bdc54109a 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -49,11 +49,8 @@ func TestOutputReload(t *testing.T) { defer goroutines.Check(t) err := quick.Check(func(q uint) bool { - //lf("*** Starting new test ***") numEventsToPublish := 15000 + (q % 500) // 15000 to 19999 numOutputReloads := 350 + (q % 150) // 350 to 499 - //numEventsToPublish := uint(19999) - //numOutputReloads := uint(499) queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { return memqueue.NewQueue( @@ -67,7 +64,6 @@ func TestOutputReload(t *testing.T) { var publishedCount atomic.Uint countingPublishFn := func(batch publisher.Batch) error { publishedCount.Add(uint(len(batch.Events()))) - //lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load()) return nil } @@ -99,23 +95,15 @@ func TestOutputReload(t *testing.T) { out := outputs.Group{ Clients: []outputs.Client{outputClient}, } - //lf("in test: reloading output...") pipeline.output.Set(out) } wg.Wait() timeout := 20 * time.Second - success := waitUntilTrue(timeout, func() bool { + return waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) - if !success { - lf( - "*** test result: numOutputReloads = %v, numEventsToPublish = %v, publishedCounted = %v ***", - numOutputReloads, numEventsToPublish, publishedCount.Load(), - ) - } - return success }, &quick.Config{MaxCount: 25}) if err != nil { diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 3249f09ce0c..59ef83456ae 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -18,27 +18,10 @@ package pipeline import ( - "fmt" - "strconv" - "time" - - "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" ) -var _workerID atomic.Uint - -func lf(msg string, v ...interface{}) { - now := time.Now().Format("15:04:05.00000") - fmt.Printf(now+" "+msg+"\n", v...) -} - -func (w *worker) lf(msg string, v ...interface{}) { - lf("[worker "+strconv.Itoa(int(w.id))+"] "+msg, v...) -} - type worker struct { id uint observer outputObserver @@ -68,7 +51,6 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie observer: observer, qu: qu, done: make(chan struct{}), - id: _workerID.Inc(), } var c interface { @@ -86,20 +68,15 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie c = &clientWorker{worker: w, client: client} } - //w.lf("starting...") go c.run() return c } func (w *worker) close() { close(w.done) - //lf("w.inFlight == nil: %#v", w.inFlight == nil) if w.inFlight != nil { - //lf("waiting for inflight events to publish") <-w.inFlight - //lf("inflight events published") } - //w.lf("closed") } func (w *clientWorker) Close() error { @@ -149,13 +126,9 @@ func (w *netClientWorker) run() { select { case <-w.done: - //lf("got done signal") return - case batch, ok := <-w.qu: - if !ok { - //w.lf("workqueue closed") - } + case batch := <-w.qu: if batch == nil { continue } @@ -163,7 +136,6 @@ func (w *netClientWorker) run() { // Try to (re)connect so we can publish batch if !connected { // Return batch to other output workers while we try to (re)connect - //w.lf("canceling batch of %v events", len(batch.Events())) batch.Cancelled() if reconnectAttempts == 0 { @@ -185,7 +157,6 @@ func (w *netClientWorker) run() { continue } - //w.lf("about to publish %v events", len(batch.Events())) w.inFlight = make(chan struct{}) if err := w.client.Publish(batch); err != nil { close(w.inFlight) diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go index 8987db0e12b..a995fbdc0ca 100644 --- a/libbeat/publisher/queue/memqueue/consume.go +++ b/libbeat/publisher/queue/memqueue/consume.go @@ -19,20 +19,13 @@ package memqueue import ( "errors" - "fmt" "io" - "time" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) -func lf(msg string, v ...interface{}) { - now := time.Now().Format("15:04:05.00000") - fmt.Printf(now+" "+msg+"\n", v...) -} - type consumer struct { broker *broker resp chan getResponse @@ -77,7 +70,6 @@ func (c *consumer) Get(sz int) (queue.Batch, error) { // if request has been send, we do have to wait for a response resp := <-c.resp - //lf("in memqueue Get(): about to return batch of %v events", len(resp.buf)) return &batch{ consumer: c, events: resp.buf, From b97becc2c6fb23549094be1e33a28051ce49a439 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 16:02:36 -0700 Subject: [PATCH 17/34] Moving SeedFlag var to correct place --- libbeat/publisher/pipeline/output_test.go | 5 ----- libbeat/publisher/pipeline/testing.go | 5 +++++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index b04411ae926..16f33b09fd4 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -18,7 +18,6 @@ package pipeline import ( - "flag" "math" "sync" "testing" @@ -32,10 +31,6 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher" ) -var ( - SeedFlag = flag.Int64("seed", 0, "Randomization seed") -) - func TestMakeClientWorker(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ "client": newMockClient, diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go index 72c0bd21111..13a82816fb3 100644 --- a/libbeat/publisher/pipeline/testing.go +++ b/libbeat/publisher/pipeline/testing.go @@ -18,6 +18,7 @@ package pipeline import ( + "flag" "math/rand" "testing" "time" @@ -28,6 +29,10 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" ) +var ( + SeedFlag = flag.Int64("seed", 0, "Randomization seed") +) + type mockPublishFn func(publisher.Batch) error func newMockClient(publishFn mockPublishFn) outputs.Client { From 3aeebdaaef1496891be173dedfda92b3b36ed614 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 16:14:36 -0700 Subject: [PATCH 18/34] Clarifying comments --- libbeat/publisher/pipeline/controller.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 072f7a48d5a..4dbc3a72969 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -103,7 +103,8 @@ func (c *outputController) Set(outGrp outputs.Group) { // Pause consumer c.consumer.sigPause() - // close old group, so events are send to new workQueue via retryer + // close old output group's workers and "remove" them from the retryer + // so it temporarily stops processing the retry queue if c.out != nil { for _, w := range c.out.outputs { w.Close() @@ -111,7 +112,7 @@ func (c *outputController) Set(outGrp outputs.Group) { } } - // create new outputGroup with shared work queue + // create new output group with the shared work queue clients := outGrp.Clients worker := make([]outputWorker, len(clients)) for i, client := range clients { From 12083323945b36c8b19e35fff9edbbc2814877e9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 16:57:56 -0700 Subject: [PATCH 19/34] Reducing the number of quick iterations --- libbeat/publisher/pipeline/controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 32bdc54109a..ee4221182c4 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -104,7 +104,7 @@ func TestOutputReload(t *testing.T) { return waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) - }, &quick.Config{MaxCount: 25}) + }, &quick.Config{MaxCount: 15}) if err != nil { t.Error(err) From 20393442e007cde0643b5bfcfc21d90bd741997f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 17:37:52 -0700 Subject: [PATCH 20/34] Reducing quick iterations even more --- libbeat/publisher/pipeline/controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index ee4221182c4..9dd9bd28445 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -104,7 +104,7 @@ func TestOutputReload(t *testing.T) { return waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) - }, &quick.Config{MaxCount: 15}) + }, &quick.Config{MaxCount: 10}) if err != nil { t.Error(err) From 8a03547d2a369e94956cfb84c862ca671f171129 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 18:39:54 -0700 Subject: [PATCH 21/34] Trying just 1 iteration --- libbeat/publisher/pipeline/controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 9dd9bd28445..09dd7b67038 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -104,7 +104,7 @@ func TestOutputReload(t *testing.T) { return waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) - }, &quick.Config{MaxCount: 10}) + }, &quick.Config{MaxCount: 1}) if err != nil { t.Error(err) From e31e53acdb34a3145eb0ac1a364b26ced0cc9d9a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 19:24:19 -0700 Subject: [PATCH 22/34] Setting out to nil after sending batch if paused --- libbeat/publisher/pipeline/consumer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index bf26be2fbf3..405f1b3d8d5 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -195,6 +195,9 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { handleSignal(sig) case out <- batch: batch = nil + if paused { + out = nil + } } } } From dbf275aa9bc0067f5ba3674b9614fc2e2c00bf1d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 16 Apr 2020 22:10:33 -0700 Subject: [PATCH 23/34] Restoring old order of operations in Set() --- libbeat/publisher/pipeline/controller.go | 28 +++++++++++++----------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 4dbc3a72969..2c6857c858a 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -100,18 +100,6 @@ func (c *outputController) Close() error { } func (c *outputController) Set(outGrp outputs.Group) { - // Pause consumer - c.consumer.sigPause() - - // close old output group's workers and "remove" them from the retryer - // so it temporarily stops processing the retry queue - if c.out != nil { - for _, w := range c.out.outputs { - w.Close() - c.retryer.sigOutputRemoved() - } - } - // create new output group with the shared work queue clients := outGrp.Clients worker := make([]outputWorker, len(clients)) @@ -126,14 +114,28 @@ func (c *outputController) Set(outGrp outputs.Group) { } // update consumer and retryer + c.consumer.sigPause() + if c.out != nil { + for range c.out.outputs { + c.retryer.sigOutputRemoved() + } + } + c.retryer.updOutput(c.workQueue) for range clients { c.retryer.sigOutputAdded() } c.consumer.updOutput(grp) + // close old group, so events are send to new workQueue via retryer + if c.out != nil { + for _, w := range c.out.outputs { + w.Close() + } + } + c.out = grp - // restart consumer (potentially blocked by retryer as well) + // restart consumer (potentially blocked by retryer) c.consumer.sigContinue() c.consumer.sigUnWait() From a36cbb1f0647ac1f7b09a71e28f1afae9b04d328 Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 17 Apr 2020 17:02:25 +0200 Subject: [PATCH 24/34] proposal --- libbeat/publisher/pipeline/batch.go | 70 ++++++++++++++----- libbeat/publisher/pipeline/consumer.go | 2 +- libbeat/publisher/pipeline/controller.go | 5 +- libbeat/publisher/pipeline/output.go | 12 +--- libbeat/publisher/pipeline/output_test.go | 83 +++++++++++++++++++---- libbeat/publisher/pipeline/retry.go | 62 +++++++---------- libbeat/publisher/pipeline/testing.go | 75 +++++++++++++++++++- 7 files changed, 228 insertions(+), 81 deletions(-) diff --git a/libbeat/publisher/pipeline/batch.go b/libbeat/publisher/pipeline/batch.go index 5a8903c5814..54ba2058d74 100644 --- a/libbeat/publisher/pipeline/batch.go +++ b/libbeat/publisher/pipeline/batch.go @@ -24,7 +24,13 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" ) -type Batch struct { +type Batch interface { + publisher.Batch + + reduceTTL() bool +} + +type batch struct { original queue.Batch ctx *batchContext ttl int @@ -38,17 +44,17 @@ type batchContext struct { var batchPool = sync.Pool{ New: func() interface{} { - return &Batch{} + return &batch{} }, } -func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch { +func newBatch(ctx *batchContext, original queue.Batch, ttl int) *batch { if original == nil { panic("empty batch") } - b := batchPool.Get().(*Batch) - *b = Batch{ + b := batchPool.Get().(*batch) + *b = batch{ original: original, ctx: ctx, ttl: ttl, @@ -57,45 +63,47 @@ func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch { return b } -func releaseBatch(b *Batch) { - *b = Batch{} // clear batch +func releaseBatch(b *batch) { + *b = batch{} // clear batch batchPool.Put(b) } -func (b *Batch) Events() []publisher.Event { +func (b *batch) Events() []publisher.Event { return b.events } -func (b *Batch) ACK() { - b.ctx.observer.outBatchACKed(len(b.events)) +func (b *batch) ACK() { + if b.ctx != nil { + b.ctx.observer.outBatchACKed(len(b.events)) + } b.original.ACK() releaseBatch(b) } -func (b *Batch) Drop() { +func (b *batch) Drop() { b.original.ACK() releaseBatch(b) } -func (b *Batch) Retry() { +func (b *batch) Retry() { b.ctx.retryer.retry(b) } -func (b *Batch) Cancelled() { +func (b *batch) Cancelled() { b.ctx.retryer.cancelled(b) } -func (b *Batch) RetryEvents(events []publisher.Event) { +func (b *batch) RetryEvents(events []publisher.Event) { b.updEvents(events) b.Retry() } -func (b *Batch) CancelledEvents(events []publisher.Event) { +func (b *batch) CancelledEvents(events []publisher.Event) { b.updEvents(events) b.Cancelled() } -func (b *Batch) updEvents(events []publisher.Event) { +func (b *batch) updEvents(events []publisher.Event) { l1 := len(b.events) l2 := len(events) if l1 > l2 { @@ -105,3 +113,33 @@ func (b *Batch) updEvents(events []publisher.Event) { b.events = events } + +// reduceTTL reduces the time to live for all events that have no 'guaranteed' +// sending requirements. reduceTTL returns true if the batch is still alive. +func (b *batch) reduceTTL() bool { + if b.ttl <= 0 { + return true + } + + b.ttl-- + if b.ttl > 0 { + return true + } + + // filter for evens with guaranteed send flags + events := b.events[:0] + for _, event := range b.events { + if event.Guaranteed() { + events = append(events, event) + } + } + b.events = events + + if len(b.events) > 0 { + b.ttl = -1 // we need infinite retry for all events left in this batch + return true + } + + // all events have been dropped: + return false +} diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index 405f1b3d8d5..a5c4a97e25a 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -138,7 +138,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { var ( out workQueue - batch *Batch + batch Batch paused = true ) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 2c6857c858a..1991e243802 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -51,7 +52,7 @@ type outputGroup struct { timeToLive int // event lifetime } -type workQueue chan *Batch +type workQueue chan publisher.Batch // outputWorker instances pass events from the shared workQueue to the outputs.Client // instances. @@ -143,7 +144,7 @@ func (c *outputController) Set(outGrp outputs.Group) { } func makeWorkQueue() workQueue { - return workQueue(make(chan *Batch, 0)) + return workQueue(make(chan publisher.Batch, 0)) } // Reload the output diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 59ef83456ae..fa2ce73a28c 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -27,7 +27,6 @@ type worker struct { observer outputObserver qu workQueue done chan struct{} - inFlight chan struct{} } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -74,9 +73,6 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie func (w *worker) close() { close(w.done) - if w.inFlight != nil { - <-w.inFlight - } } func (w *clientWorker) Close() error { @@ -97,14 +93,11 @@ func (w *clientWorker) run() { if batch == nil { continue } - w.observer.outBatchSend(len(batch.events)) + w.observer.outBatchSend(len(batch.Events())) - w.inFlight = make(chan struct{}) if err := w.client.Publish(batch); err != nil { - close(w.inFlight) return } - close(w.inFlight) } } } @@ -157,14 +150,11 @@ func (w *netClientWorker) run() { continue } - w.inFlight = make(chan struct{}) if err := w.client.Publish(batch); err != nil { - close(w.inFlight) w.logger.Errorf("Failed to publish events: %v", err) // on error return to connect loop connected = false } - close(w.inFlight) } } } diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 16f33b09fd4..4f332630dee 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -18,6 +18,7 @@ package pipeline import ( + "fmt" "math" "sync" "testing" @@ -27,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" ) @@ -43,22 +45,38 @@ func TestMakeClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 300 + (i % 100) // between 300 and 399 + numEvents := atomic.MakeUint(0) + + wqu := makeWorkQueue() + retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) + defer retryer.close() var published atomic.Uint + var batchCount int publishFn := func(batch publisher.Batch) error { + batchCount++ published.Add(uint(len(batch.Events()))) + fmt.Printf("received batch %p of size: %d -> events: %d/%d, batches: %d/%d\n", + batch, len(batch.Events()), + published.Load(), numEvents.Load(), + batchCount, numBatches, + ) return nil } - wqu := makeWorkQueue() client := ctor(publishFn) - makeClientWorker(nilObserver, wqu, client) - numEvents := atomic.MakeUint(0) - for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { - batch := randomBatch(50, 150, wqu) + worker := makeClientWorker(nilObserver, wqu, client) + defer func() { + fmt.Println("shut down client worker") + worker.Close() + }() + + for i := uint(0); i < numBatches; i++ { + batch := randomBatch(50, 150).withRetryer(retryer) numEvents.Add(uint(len(batch.Events()))) wqu <- batch + fmt.Printf("send batch %p of size: %d\n", batch, len(batch.Events())) } // Give some time for events to be published @@ -77,48 +95,78 @@ func TestMakeClientWorker(t *testing.T) { } } -func TestMakeClientWorkerAndClose(t *testing.T) { +func TestReplaceClientWorker(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ "client": newMockClient, "network_client": newMockNetworkClient, } const minEventsInBatch = 50 + const maxEventsInBatch = 150 for name, ctor := range tests { t.Run(name, func(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 1000 + (i % 100) // between 1000 and 1099 + numBatches := 10000 + (i % 100) // between 1000 and 1099 + + fmt.Printf("Starting test with numBatch: %d\n", numBatches) + defer fmt.Printf("Finished test with numBatch: %d\n", numBatches) wqu := makeWorkQueue() - numEvents := atomic.MakeUint(0) + retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) + defer retryer.close() + + var batches []publisher.Batch + var numEvents int + for i := uint(0); i < numBatches; i++ { + batch := randomBatch(minEventsInBatch, maxEventsInBatch).withRetryer(retryer) + numEvents += batch.Len() + batches = append(batches, batch) + } var wg sync.WaitGroup wg.Add(1) go func() { + fmt.Println("start forwarding batches") + defer fmt.Println("stopped forwarding batches") + defer wg.Done() - for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { - batch := randomBatch(minEventsInBatch, 150, wqu) - numEvents.Add(uint(len(batch.Events()))) + for _, batch := range batches { wqu <- batch + // fmt.Printf("send batch %p of size: %d\n", batch, len(batch.Events())) } }() // Publish at least 1 batch worth of events but no more than 20% events - publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents.Load())*0.2)) + publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents)*0.2)) var publishedFirst atomic.Uint blockCtrl := make(chan struct{}) + var batchCount int blockingPublishFn := func(batch publisher.Batch) error { + batchCount++ + // fmt.Printf("(blocking) received batch %p of size: %d -> events: %d/%d, batches: %d/%d\n", + // batch, len(batch.Events()), + // publishedFirst.Load(), numEvents, + // batchCount, numBatches, + // ) + // Emulate blocking. Upon unblocking the in-flight batch that was // blocked is published. if publishedFirst.Load() >= publishLimit { + // fmt.Printf("(blocking) blocking processing waiting for signal. count=%v, limit=%v\n", + // publishedFirst.Load(), + // publishLimit, + // ) <-blockCtrl } publishedFirst.Add(uint(len(batch.Events()))) + // if publishedFirst.Load() >= publishLimit { + // time.Sleep(500 * time.Millisecond) + // } return nil } @@ -128,15 +176,24 @@ func TestMakeClientWorkerAndClose(t *testing.T) { // Allow the worker to make *some* progress before we close it timeout := 10 * time.Second progress := waitUntilTrue(timeout, func() bool { + // fmt.Printf("waiting progress: count=%d, limit=%d\n", + // publishedFirst.Load(), + // publishLimit, + // ) return publishedFirst.Load() >= publishLimit }) if !progress { return false } + fmt.Println("progress detected") // Close worker before all batches have had time to be published + fmt.Println("closing worker") err := worker.Close() require.NoError(t, err) + fmt.Println("worker closed") + + fmt.Println("unblock output") close(blockCtrl) // Start new worker to drain work queue @@ -153,7 +210,7 @@ func TestMakeClientWorkerAndClose(t *testing.T) { // Make sure that all events have eventually been published timeout = 20 * time.Second return waitUntilTrue(timeout, func() bool { - return numEvents.Load() == publishedFirst.Load()+publishedLater.Load() + return numEvents == int(publishedFirst.Load()+publishedLater.Load()) }) }, &quick.Config{MaxCount: 25}) diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index 93dd8bbf5d0..f05c9ed2edd 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -36,7 +36,7 @@ type retryer struct { done chan struct{} - consumer *eventConsumer + consumer interruptor sig chan retryerSignal out workQueue @@ -44,6 +44,11 @@ type retryer struct { doneWaiter sync.WaitGroup } +type interruptor interface { + sigWait() + sigUnWait() +} + type retryQueue chan batchEvent type retryerSignal struct { @@ -54,7 +59,7 @@ type retryerSignal struct { type batchEvent struct { tag retryerBatchTag - batch *Batch + batch Batch } type retryerEventTag uint8 @@ -76,7 +81,7 @@ func newRetryer( log *logp.Logger, observer outputObserver, out workQueue, - c *eventConsumer, + c interruptor, ) *retryer { r := &retryer{ logger: log, @@ -114,11 +119,11 @@ func (r *retryer) updOutput(ch workQueue) { } } -func (r *retryer) retry(b *Batch) { +func (r *retryer) retry(b Batch) { r.in <- batchEvent{tag: retryBatch, batch: b} } -func (r *retryer) cancelled(b *Batch) { +func (r *retryer) cancelled(b Batch) { r.in <- batchEvent{tag: cancelledBatch, batch: b} } @@ -128,9 +133,9 @@ func (r *retryer) loop() { out workQueue consumerBlocked bool - active *Batch + active Batch activeSize int - buffer []*Batch + buffer []Batch numOutputs int log = r.logger @@ -145,21 +150,22 @@ func (r *retryer) loop() { countFailed int countDropped int batch = evt.batch - countRetry = len(batch.events) + countRetry = len(batch.Events()) + alive = true ) if evt.tag == retryBatch { - countFailed = len(batch.events) + countFailed = len(batch.Events()) r.observer.eventsFailed(countFailed) - decBatch(batch) + alive = batch.reduceTTL() - countRetry = len(batch.events) + countRetry = len(batch.Events()) countDropped = countFailed - countRetry r.observer.eventsDropped(countDropped) } - if len(batch.events) == 0 { + if !alive { log.Info("Drop batch") batch.Drop() } else { @@ -167,12 +173,14 @@ func (r *retryer) loop() { buffer = append(buffer, batch) out = r.out active = buffer[0] - activeSize = len(active.events) + activeSize = len(active.Events()) if !consumerBlocked { consumerBlocked = blockConsumer(numOutputs, len(buffer)) if consumerBlocked { log.Info("retryer: send wait signal to consumer") - r.consumer.sigWait() + if r.consumer != nil { + r.consumer.sigWait() + } log.Info(" done") } } @@ -188,14 +196,16 @@ func (r *retryer) loop() { out = nil } else { active = buffer[0] - activeSize = len(active.events) + activeSize = len(active.Events()) } if consumerBlocked { consumerBlocked = blockConsumer(numOutputs, len(buffer)) if !consumerBlocked { log.Info("retryer: send unwait-signal to consumer") - r.consumer.sigUnWait() + if r.consumer != nil { + r.consumer.sigUnWait() + } log.Info(" done") } } @@ -216,23 +226,3 @@ func (r *retryer) loop() { func blockConsumer(numOutputs, numBatches int) bool { return numBatches/3 >= numOutputs } - -func decBatch(batch *Batch) { - if batch.ttl <= 0 { - return - } - - batch.ttl-- - if batch.ttl > 0 { - return - } - - // filter for evens with guaranteed send flags - events := batch.events[:0] - for _, event := range batch.events { - if event.Guaranteed() { - events = append(events, event) - } - } - batch.events = events -} diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go index 13a82816fb3..1a036e4cbc5 100644 --- a/libbeat/publisher/pipeline/testing.go +++ b/libbeat/publisher/pipeline/testing.go @@ -20,10 +20,10 @@ package pipeline import ( "flag" "math/rand" + "sync" "testing" "time" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -74,9 +74,79 @@ func (p mockProducer) Cancel() int { return 0 } type mockConsumer struct{} -func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil } +func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &batch{}, nil } func (c mockConsumer) Close() error { return nil } +type mockBatch struct { + mu sync.Mutex + events []publisher.Event + + onEvents func() + onACK func() + onDrop func() + onRetry func() + onCancelled func() + onReduceTTL func() bool +} + +func (b *mockBatch) Events() []publisher.Event { + b.mu.Lock() + defer b.mu.Unlock() + signalFn(b.onEvents) + return b.events +} + +func (b *mockBatch) ACK() { signalFn(b.onACK) } +func (b *mockBatch) Drop() { signalFn(b.onDrop) } +func (b *mockBatch) Retry() { signalFn(b.onRetry) } +func (b *mockBatch) Cancelled() { signalFn(b.onCancelled) } +func (b *mockBatch) RetryEvents(events []publisher.Event) { b.updateEvents(events); signalFn(b.onRetry) } + +func (b *mockBatch) reduceTTL() bool { + if b.onReduceTTL != nil { + return b.onReduceTTL() + } + return true +} + +func (b *mockBatch) CancelledEvents(events []publisher.Event) { + b.updateEvents(events) + signalFn(b.onCancelled) +} + +func (b *mockBatch) updateEvents(events []publisher.Event) { + b.mu.Lock() + defer b.mu.Unlock() + b.events = events +} + +func (b *mockBatch) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.events) +} + +func (b *mockBatch) withRetryer(r *retryer) *mockBatch { + tmp := &mockBatch{} + *tmp = *b + tmp.onRetry = func() { r.retry(b) } + tmp.onCancelled = func() { r.cancelled(b) } + return tmp +} + +func signalFn(fn func()) { + if fn != nil { + fn() + } +} + +func randomBatch(min, max int) *mockBatch { + return &mockBatch{ + events: make([]publisher.Event, randIntBetween(min, max)), + } +} + +/* func randomBatch(min, max int, wqu workQueue) *Batch { numEvents := randIntBetween(min, max) events := make([]publisher.Event, numEvents) @@ -94,6 +164,7 @@ func randomBatch(min, max int, wqu workQueue) *Batch { return &batch } +*/ // randIntBetween returns a random integer in [min, max) func randIntBetween(min, max int) int { From 229623f2dcf1b9b68de017a60f56a0dd0c7a98ef Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 17 Apr 2020 12:00:58 -0700 Subject: [PATCH 25/34] Do not copy mutex --- libbeat/publisher/pipeline/testing.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go index 1a036e4cbc5..24bd7975ac6 100644 --- a/libbeat/publisher/pipeline/testing.go +++ b/libbeat/publisher/pipeline/testing.go @@ -127,11 +127,14 @@ func (b *mockBatch) Len() int { } func (b *mockBatch) withRetryer(r *retryer) *mockBatch { - tmp := &mockBatch{} - *tmp = *b - tmp.onRetry = func() { r.retry(b) } - tmp.onCancelled = func() { r.cancelled(b) } - return tmp + return &mockBatch{ + events: b.events, + onACK: b.onACK, + onDrop: b.onDrop, + onRetry: func() { r.retry(b) }, + onCancelled: func() { r.cancelled(b) }, + onReduceTTL: b.onReduceTTL, + } } func signalFn(fn func()) { From f09c5e017afca36b730d8ac03f6137295fea975e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 17 Apr 2020 13:57:24 -0700 Subject: [PATCH 26/34] Remove debugging statements --- libbeat/publisher/pipeline/output_test.go | 40 +---------------------- 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 4f332630dee..5f3b02d8097 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -18,7 +18,6 @@ package pipeline import ( - "fmt" "math" "sync" "testing" @@ -56,27 +55,18 @@ func TestMakeClientWorker(t *testing.T) { publishFn := func(batch publisher.Batch) error { batchCount++ published.Add(uint(len(batch.Events()))) - fmt.Printf("received batch %p of size: %d -> events: %d/%d, batches: %d/%d\n", - batch, len(batch.Events()), - published.Load(), numEvents.Load(), - batchCount, numBatches, - ) return nil } client := ctor(publishFn) worker := makeClientWorker(nilObserver, wqu, client) - defer func() { - fmt.Println("shut down client worker") - worker.Close() - }() + defer worker.Close() for i := uint(0); i < numBatches; i++ { batch := randomBatch(50, 150).withRetryer(retryer) numEvents.Add(uint(len(batch.Events()))) wqu <- batch - fmt.Printf("send batch %p of size: %d\n", batch, len(batch.Events())) } // Give some time for events to be published @@ -111,9 +101,6 @@ func TestReplaceClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 10000 + (i % 100) // between 1000 and 1099 - fmt.Printf("Starting test with numBatch: %d\n", numBatches) - defer fmt.Printf("Finished test with numBatch: %d\n", numBatches) - wqu := makeWorkQueue() retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) defer retryer.close() @@ -129,13 +116,9 @@ func TestReplaceClientWorker(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - fmt.Println("start forwarding batches") - defer fmt.Println("stopped forwarding batches") - defer wg.Done() for _, batch := range batches { wqu <- batch - // fmt.Printf("send batch %p of size: %d\n", batch, len(batch.Events())) } }() @@ -147,26 +130,13 @@ func TestReplaceClientWorker(t *testing.T) { var batchCount int blockingPublishFn := func(batch publisher.Batch) error { batchCount++ - // fmt.Printf("(blocking) received batch %p of size: %d -> events: %d/%d, batches: %d/%d\n", - // batch, len(batch.Events()), - // publishedFirst.Load(), numEvents, - // batchCount, numBatches, - // ) - // Emulate blocking. Upon unblocking the in-flight batch that was // blocked is published. if publishedFirst.Load() >= publishLimit { - // fmt.Printf("(blocking) blocking processing waiting for signal. count=%v, limit=%v\n", - // publishedFirst.Load(), - // publishLimit, - // ) <-blockCtrl } publishedFirst.Add(uint(len(batch.Events()))) - // if publishedFirst.Load() >= publishLimit { - // time.Sleep(500 * time.Millisecond) - // } return nil } @@ -176,24 +146,16 @@ func TestReplaceClientWorker(t *testing.T) { // Allow the worker to make *some* progress before we close it timeout := 10 * time.Second progress := waitUntilTrue(timeout, func() bool { - // fmt.Printf("waiting progress: count=%d, limit=%d\n", - // publishedFirst.Load(), - // publishLimit, - // ) return publishedFirst.Load() >= publishLimit }) if !progress { return false } - fmt.Println("progress detected") // Close worker before all batches have had time to be published - fmt.Println("closing worker") err := worker.Close() require.NoError(t, err) - fmt.Println("worker closed") - fmt.Println("unblock output") close(blockCtrl) // Start new worker to drain work queue From 896505c42348644f1afc5372af3b47d54a908343 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 17 Apr 2020 14:35:51 -0700 Subject: [PATCH 27/34] Bumping up testing/quick max count on TestOutputReload --- libbeat/publisher/pipeline/controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 09dd7b67038..32bdc54109a 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -104,7 +104,7 @@ func TestOutputReload(t *testing.T) { return waitUntilTrue(timeout, func() bool { return uint(numEventsToPublish) == publishedCount.Load() }) - }, &quick.Config{MaxCount: 1}) + }, &quick.Config{MaxCount: 25}) if err != nil { t.Error(err) From a59a56e232cf3a95d7493cc1b752a45ddeb7eddc Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 17 Apr 2020 14:38:42 -0700 Subject: [PATCH 28/34] Removing commented out helper function --- libbeat/publisher/pipeline/testing.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go index 24bd7975ac6..1d5c2b908ff 100644 --- a/libbeat/publisher/pipeline/testing.go +++ b/libbeat/publisher/pipeline/testing.go @@ -149,26 +149,6 @@ func randomBatch(min, max int) *mockBatch { } } -/* -func randomBatch(min, max int, wqu workQueue) *Batch { - numEvents := randIntBetween(min, max) - events := make([]publisher.Event, numEvents) - - consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{}) - retryer := newRetryer(logp.L(), nilObserver, wqu, consumer) - - batch := Batch{ - events: events, - ctx: &batchContext{ - observer: nilObserver, - retryer: retryer, - }, - } - - return &batch -} -*/ - // randIntBetween returns a random integer in [min, max) func randIntBetween(min, max int) int { return rand.Intn(max-min) + min From 041922b16800e2caca696013956c88a3a148a223 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 17 Apr 2020 15:19:41 -0700 Subject: [PATCH 29/34] Simplifying retryer now that workqueue is used across output loads --- libbeat/publisher/pipeline/controller.go | 1 - libbeat/publisher/pipeline/retry.go | 9 --------- 2 files changed, 10 deletions(-) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 1991e243802..0770cf559eb 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -121,7 +121,6 @@ func (c *outputController) Set(outGrp outputs.Group) { c.retryer.sigOutputRemoved() } } - c.retryer.updOutput(c.workQueue) for range clients { c.retryer.sigOutputAdded() } diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index f05c9ed2edd..5521f3444af 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -112,13 +112,6 @@ func (r *retryer) sigOutputRemoved() { r.sig <- retryerSignal{tag: sigRetryerOutputRemoved} } -func (r *retryer) updOutput(ch workQueue) { - r.sig <- retryerSignal{ - tag: sigRetryerUpdateOutput, - channel: ch, - } -} - func (r *retryer) retry(b Batch) { r.in <- batchEvent{tag: retryBatch, batch: b} } @@ -212,8 +205,6 @@ func (r *retryer) loop() { case sig := <-r.sig: switch sig.tag { - case sigRetryerUpdateOutput: - r.out = sig.channel case sigRetryerOutputAdded: numOutputs++ case sigRetryerOutputRemoved: From 5ff72195c6f297c741db9743ebb5bc2681df7ca7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 20 Apr 2020 07:09:49 -0700 Subject: [PATCH 30/34] Renaming parameter --- libbeat/publisher/pipeline/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 0770cf559eb..ed6d7ab5f29 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -64,18 +64,18 @@ func newOutputController( beat beat.Info, monitors Monitors, observer outputObserver, - b queue.Queue, + queue queue.Queue, ) *outputController { c := &outputController{ beat: beat, monitors: monitors, observer: observer, - queue: b, + queue: queue, workQueue: makeWorkQueue(), } ctx := &batchContext{} - c.consumer = newEventConsumer(monitors.Logger, b, ctx) + c.consumer = newEventConsumer(monitors.Logger, queue, ctx) c.retryer = newRetryer(monitors.Logger, observer, c.workQueue, c.consumer) ctx.observer = observer ctx.retryer = c.retryer From 15a138330bc9768d171f674c9259b3d5ebfadb82 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 20 Apr 2020 18:18:05 -0700 Subject: [PATCH 31/34] Removing debugging variable --- libbeat/publisher/pipeline/output_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index 5f3b02d8097..a1549bccb15 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -51,9 +51,7 @@ func TestMakeClientWorker(t *testing.T) { defer retryer.close() var published atomic.Uint - var batchCount int publishFn := func(batch publisher.Batch) error { - batchCount++ published.Add(uint(len(batch.Events()))) return nil } @@ -127,9 +125,7 @@ func TestReplaceClientWorker(t *testing.T) { var publishedFirst atomic.Uint blockCtrl := make(chan struct{}) - var batchCount int blockingPublishFn := func(batch publisher.Batch) error { - batchCount++ // Emulate blocking. Upon unblocking the in-flight batch that was // blocked is published. if publishedFirst.Load() >= publishLimit { From 6e10331974ef73f88300ddbdf4ec27a9dbe932b0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 20 Apr 2020 18:18:37 -0700 Subject: [PATCH 32/34] Reducing lower bound of random # of batches --- libbeat/publisher/pipeline/output_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index a1549bccb15..5f471ddf396 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -97,7 +97,7 @@ func TestReplaceClientWorker(t *testing.T) { seedPRNG(t) err := quick.Check(func(i uint) bool { - numBatches := 10000 + (i % 100) // between 1000 and 1099 + numBatches := 1000 + (i % 100) // between 1000 and 1099 wqu := makeWorkQueue() retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) From 6a6680cef6399c623a330fa90fe0d829db002a73 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 21 Apr 2020 04:02:58 -0700 Subject: [PATCH 33/34] Removing sigUnWait from controller --- libbeat/publisher/pipeline/controller.go | 1 - libbeat/publisher/pipeline/retry.go | 47 ++++++++++++++++-------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index ed6d7ab5f29..837a70eab77 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -137,7 +137,6 @@ func (c *outputController) Set(outGrp outputs.Group) { // restart consumer (potentially blocked by retryer) c.consumer.sigContinue() - c.consumer.sigUnWait() c.observer.updateOutputGroup() } diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index 5521f3444af..7c1b955b48e 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -168,14 +168,7 @@ func (r *retryer) loop() { active = buffer[0] activeSize = len(active.Events()) if !consumerBlocked { - consumerBlocked = blockConsumer(numOutputs, len(buffer)) - if consumerBlocked { - log.Info("retryer: send wait signal to consumer") - if r.consumer != nil { - r.consumer.sigWait() - } - log.Info(" done") - } + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) } } @@ -193,27 +186,49 @@ func (r *retryer) loop() { } if consumerBlocked { - consumerBlocked = blockConsumer(numOutputs, len(buffer)) - if !consumerBlocked { - log.Info("retryer: send unwait-signal to consumer") - if r.consumer != nil { - r.consumer.sigUnWait() - } - log.Info(" done") - } + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) } case sig := <-r.sig: switch sig.tag { case sigRetryerOutputAdded: numOutputs++ + if consumerBlocked { + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) + } case sigRetryerOutputRemoved: numOutputs-- + if !consumerBlocked { + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) + } } } } } +func (r *retryer) checkConsumerBlock(numOutputs, numBatches int) bool { + consumerBlocked := blockConsumer(numOutputs, numBatches) + if r.consumer == nil { + return consumerBlocked + } + + if consumerBlocked { + r.logger.Info("retryer: send wait signal to consumer") + if r.consumer != nil { + r.consumer.sigWait() + } + r.logger.Info(" done") + } else { + r.logger.Info("retryer: send unwait signal to consumer") + if r.consumer != nil { + r.consumer.sigUnWait() + } + r.logger.Info(" done") + } + + return consumerBlocked +} + func blockConsumer(numOutputs, numBatches int) bool { return numBatches/3 >= numOutputs } From 09ccd8d0812f3af1e79cf811a024ee9a8fc44319 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 21 Apr 2020 04:42:54 -0700 Subject: [PATCH 34/34] Removing unused field --- libbeat/publisher/pipeline/retry.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index 7c1b955b48e..0d724e80278 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -54,7 +54,6 @@ type retryQueue chan batchEvent type retryerSignal struct { tag retryerEventTag channel workQueue - done chan struct{} } type batchEvent struct {