Skip to content

Commit

Permalink
Log reconnect attempts (elastic#6404)
Browse files Browse the repository at this point in the history
* Log reconnect attempts (elastic#5715)

* Add identifiers to connections

(cherry picked from commit 06dea8b)
  • Loading branch information
recrsn authored and urso committed Aug 29, 2018
1 parent 8de038e commit a5b7bf8
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 1 deletion.
4 changes: 4 additions & 0 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
func (c *publishClient) Test(d testing.Driver) {
c.es.Test(d)
}

func (c *publishClient) String() string {
return "publish(" + c.es.String() + ")"
}
4 changes: 4 additions & 0 deletions libbeat/outputs/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ func (b *backoffClient) Test(d testing.Driver) {

c.Test(d)
}

func (b *backoffClient) String() string {
return "backoff(" + b.client.String() + ")"
}
4 changes: 4 additions & 0 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,7 @@ func (c *console) writeBuffer(buf []byte) error {
}
return nil
}

func (c *console) String() string {
return "console"
}
4 changes: 4 additions & 0 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,10 @@ func (client *Client) Test(d testing.Driver) {
})
}

func (client *Client) String() string {
return "elasticsearch(" + client.Connection.URL + ")"
}

// Connect connects the client.
func (conn *Connection) Connect() error {
var err error
Expand Down
11 changes: 11 additions & 0 deletions libbeat/outputs/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math/rand"
"strings"

"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/testing"
Expand Down Expand Up @@ -109,3 +110,13 @@ func (f *failoverClient) Test(d testing.Driver) {
})
}
}

func (f *failoverClient) String() string {
names := make([]string, len(f.clients))

for i, client := range f.clients {
names[i] = client.String()
}

return "failover(" + strings.Join(names, ",") + ")"
}
7 changes: 7 additions & 0 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
}

type fileOutput struct {
filePath string
beat beat.Info
observer outputs.Observer
rotator *file.Rotator
Expand Down Expand Up @@ -74,6 +75,8 @@ func (out *fileOutput) init(beat beat.Info, c config) error {
path = filepath.Join(c.Path, out.beat.Beat)
}

out.filePath = path

var err error
out.rotator, err = file.NewFileRotator(
path,
Expand Down Expand Up @@ -149,3 +152,7 @@ func (out *fileOutput) Publish(

return nil
}

func (out *fileOutput) String() string {
return "file(" + out.filePath + ")"
}
5 changes: 5 additions & 0 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka
import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -141,6 +142,10 @@ func (c *client) Publish(batch publisher.Batch) error {
return nil
}

func (c *client) String() string {
return "kafka(" + strings.Join(c.hosts, ",") + ")"
}

func (c *client) getEventMessage(data *publisher.Event) (*message, error) {
event := &data.Content
msg := &message{partition: -1, data: *data}
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func (c *asyncClient) Publish(batch publisher.Batch) error {
return nil
}

func (c *asyncClient) String() string {
return "async(" + c.Client.String() + ")"
}

func (c *asyncClient) publishWindowed(
ref *msgRef,
events []publisher.Event,
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Client interface {
// the publisher pipeline. The publisher pipeline (if configured by the output
// factory) will take care of retrying/dropping events.
Publish(publisher.Batch) error

String() string
}

// NetworkClient defines the required client capabilities for network based
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (c *client) Publish(batch publisher.Batch) error {
return err
}

func (c *client) String() string {
return "redis(" + c.Client.String() + ")"
}

func (c *client) makePublish(
conn redis.Conn,
) (publishFn, error) {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/transport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,7 @@ func (c *Client) Test(d testing.Driver) {
d.Fatal("talk to server", err)
})
}

func (c *Client) String() string {
return c.network + "://" + c.host
}
14 changes: 13 additions & 1 deletion libbeat/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,33 @@ func (w *netClientWorker) Close() error {

func (w *netClientWorker) run() {
for !w.closed.Load() {
reconnectAttempts := 0

// start initial connect loop from first batch, but return
// batch to pipeline for other outputs to catch up while we're trying to connect
for batch := range w.qu {
batch.Cancelled()

if w.closed.Load() {
logp.Info("Closed connection to %v", w.client)
return
}

if reconnectAttempts > 0 {
logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
} else {
logp.Info("Connecting to %v", w.client)
}

err := w.client.Connect()
if err != nil {
logp.Err("Failed to connect: %v", err)
logp.Err("Failed to connect to %v: %v", w.client, err)
reconnectAttempts++
continue
}

logp.Info("Connection to %v established", w.client)
reconnectAttempts = 0
break
}

Expand Down
4 changes: 4 additions & 0 deletions libbeat/publisher/pipeline/stress/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,7 @@ func (t *testOutput) Publish(batch publisher.Batch) error {

return nil
}

func (t *testOutput) String() string {
return "test"
}

0 comments on commit a5b7bf8

Please sign in to comment.