diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index a14f5b7d1e2..199c2aea927 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -118,3 +118,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { func (c *publishClient) Test(d testing.Driver) { c.es.Test(d) } + +func (c *publishClient) String() string { + return "publish(" + c.es.String() + ")" +} diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 085b1b6c842..1484bc42c00 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -77,3 +77,7 @@ func (b *backoffClient) Test(d testing.Driver) { c.Test(d) } + +func (b *backoffClient) String() string { + return "backoff(" + b.client.String() + ")" +} diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index e5e7ba632ed..67aab4bf569 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -160,3 +160,7 @@ func (c *console) writeBuffer(buf []byte) error { } return nil } + +func (c *console) String() string { + return "console" +} diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 37091683f58..dae9d279ec3 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -663,6 +663,10 @@ func (client *Client) Test(d testing.Driver) { }) } +func (client *Client) String() string { + return "elasticsearch(" + client.Connection.URL + ")" +} + // Connect connects the client. func (conn *Connection) Connect() error { var err error diff --git a/libbeat/outputs/failover.go b/libbeat/outputs/failover.go index 6f24e3a2eb3..99d379a3943 100644 --- a/libbeat/outputs/failover.go +++ b/libbeat/outputs/failover.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/rand" + "strings" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/testing" @@ -109,3 +110,13 @@ func (f *failoverClient) Test(d testing.Driver) { }) } } + +func (f *failoverClient) String() string { + names := make([]string, len(f.clients)) + + for i, client := range f.clients { + names[i] = client.String() + } + + return "failover(" + strings.Join(names, ",") + ")" +} diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index b30a28733d7..9cdb21452a0 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -35,6 +35,7 @@ func init() { } type fileOutput struct { + filePath string beat beat.Info observer outputs.Observer rotator *file.Rotator @@ -74,6 +75,8 @@ func (out *fileOutput) init(beat beat.Info, c config) error { path = filepath.Join(c.Path, out.beat.Beat) } + out.filePath = path + var err error out.rotator, err = file.NewFileRotator( path, @@ -148,3 +151,7 @@ func (out *fileOutput) Publish( return nil } + +func (out *fileOutput) String() string { + return "file(" + out.filePath + ")" +} diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index c20b4bd3f86..cae9c262eb5 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -20,6 +20,7 @@ package kafka import ( "errors" "fmt" + "strings" "sync" "sync/atomic" @@ -141,6 +142,10 @@ func (c *client) Publish(batch publisher.Batch) error { return nil } +func (c *client) String() string { + return "kafka(" + strings.Join(c.hosts, ",") + ")" +} + func (c *client) getEventMessage(data *publisher.Event) (*message, error) { event := &data.Content msg := &message{partition: -1, data: *data} diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 1e2ad5ba1a1..fa9a6e94825 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -169,6 +169,10 @@ func (c *asyncClient) Publish(batch publisher.Batch) error { return nil } +func (c *asyncClient) String() string { + return "async(" + c.Client.String() + ")" +} + func (c *asyncClient) publishWindowed( ref *msgRef, events []publisher.Event, diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index 2f771049537..aff2bf14574 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -35,6 +35,8 @@ type Client interface { // the publisher pipeline. The publisher pipeline (if configured by the output // factory) will take care of retrying/dropping events. Publish(publisher.Batch) error + + String() string } // NetworkClient defines the required client capabilities for network based diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 9b84b6e0fb5..72c6279a4f8 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -152,6 +152,10 @@ func (c *client) Publish(batch publisher.Batch) error { return err } +func (c *client) String() string { + return "redis(" + c.Client.String() + ")" +} + func (c *client) makePublish( conn redis.Conn, ) (publishFn, error) { diff --git a/libbeat/outputs/transport/client.go b/libbeat/outputs/transport/client.go index d337d5f4d82..8313b592e10 100644 --- a/libbeat/outputs/transport/client.go +++ b/libbeat/outputs/transport/client.go @@ -247,3 +247,7 @@ func (c *Client) Test(d testing.Driver) { d.Fatal("talk to server", err) }) } + +func (c *Client) String() string { + return c.network + "://" + c.host +} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 497f04d4af0..60ed3519ae0 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -85,21 +85,24 @@ func (w *netClientWorker) run() { batch.Cancelled() if w.closed.Load() { - logp.Info("Closed connection") + logp.Info("Closed connection to %v", w.client) return } if reconnectAttempts > 0 { - logp.Info("Attempting to reconnect with %d reconnect attempt(s)", reconnectAttempts) + logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) + } else { + logp.Info("Connecting to %v", w.client) } err := w.client.Connect() if err != nil { - logp.Err("Failed to connect: %v", err) + logp.Err("Failed to connect to %v: %v", w.client, err) reconnectAttempts++ continue } + logp.Info("Connection to %v established", w.client) reconnectAttempts = 0 break } diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 6ca2b09fdd6..5bc72ed33cd 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -106,3 +106,7 @@ func (t *testOutput) Publish(batch publisher.Batch) error { return nil } + +func (t *testOutput) String() string { + return "test" +}