Skip to content

Commit

Permalink
[filebeat][streaming] - Added retry functionality to websocket connec…
Browse files Browse the repository at this point in the history
…tions (#40601)

* added websocket retry logic, added input & config tests and updated docs

* updated changelog

* fixed function name spelling error

* added a retryable error check

* addressed PR comments

* passed metrics to handleConnectionResponse to track errors

* addressed PR suggestions

* updated retry dial signature
  • Loading branch information
ShourieG authored Aug 28, 2024
1 parent e1bbe05 commit 0c3c9c6
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update CEL mito extensions to v1.15.0. {pull}40294[40294]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]
- Improve logging in Okta Entity Analytics provider. {issue}40106[40106] {pull}40347[40347]
- Added retry logic to websocket connections in the streaming input. {issue}40271[40271] {pull}40601[40601]

*Auditbeat*

Expand Down
37 changes: 37 additions & 0 deletions x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,43 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[[retry-streaming]]
[float]
==== `retry`

The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: streaming
url: ws://localhost:443/_stream
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
}
})
retry:
max_attempts: 5
wait_min: 1s
wait_max: 10s
----
[float]
==== `retry.max_attempts`

The maximum number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted.

[float]
==== `retry.wait_min`

The minimum time to wait between retries. This ensures that retries are spaced out enough to give the system time to recover or resolve transient issues, rather than bombarding the system with rapid retries. For example, `wait_min` might be set to 1 second, meaning that even if the calculated backoff is less than this, the client will wait at least 1 second before retrying.

[float]
==== `retry.wait_max`

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.

[float]
=== Metrics

Expand Down
19 changes: 19 additions & 0 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package streaming

import (
"context"
"errors"
"fmt"
"net/url"
"regexp"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -32,6 +34,8 @@ type config struct {
URL *urlConfig `config:"url" validate:"required"`
// Redact is the debug log state redaction configuration.
Redact *redact `config:"redact"`
// Retry is the configuration for retrying failed connections.
Retry *retry `config:"retry"`
}

type redact struct {
Expand All @@ -43,6 +47,12 @@ type redact struct {
Delete bool `config:"delete"`
}

type retry struct {
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
}

type authConfig struct {
// Custom auth config to use for authentication.
CustomAuth *customAuthConfig `config:"custom"`
Expand Down Expand Up @@ -94,6 +104,15 @@ func (c config) Validate() error {
if err != nil {
return err
}

if c.Retry != nil {
switch {
case c.Retry.MaxAttempts <= 0:
return errors.New("max_attempts must be greater than zero")
case c.Retry.WaitMin > c.Retry.WaitMax:
return errors.New("wait_min must be less than or equal to wait_max")
}
}
return nil
}

Expand Down
35 changes: 35 additions & 0 deletions x-pack/filebeat/input/streaming/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,41 @@ var configTests = []struct {
},
},
},
{
name: "invalid_retry_wait_min_greater_than_wait_max",
config: map[string]interface{}{
"retry": map[string]interface{}{
"max_attempts": 3,
"wait_min": "3s",
"wait_max": "2s",
},
"url": "wss://localhost:443/v1/stream",
},
wantErr: fmt.Errorf("wait_min must be less than or equal to wait_max accessing config"),
},
{
name: "invalid_retry_max_attempts_eq_zero",
config: map[string]interface{}{
"retry": map[string]interface{}{
"max_attempts": 0,
"wait_min": "1s",
"wait_max": "2s",
},
"url": "wss://localhost:443/v1/stream",
},
wantErr: fmt.Errorf("max_attempts must be greater than zero accessing config"),
},
{
name: "valid_retry",
config: map[string]interface{}{
"retry": map[string]interface{}{
"max_attempts": 3,
"wait_min": "2s",
"wait_max": "5s",
},
"url": "wss://localhost:443/v1/stream",
},
},
}

func TestConfig(t *testing.T) {
Expand Down
146 changes: 115 additions & 31 deletions x-pack/filebeat/input/streaming/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,30 +265,30 @@ var inputTests = []struct {
handler: defaultHandler,
config: map[string]interface{}{
"program": `
bytes(state.response).decode_json().as(inner_body,{
"events": has(state.cursor) && inner_body.ts > state.cursor.last_updated ? [inner_body] : [],
})`,
bytes(state.response).decode_json().as(inner_body,{
"events": has(state.cursor) && inner_body.ts > state.cursor.last_updated ? [inner_body] : [],
})`,
"state": map[string]interface{}{
"cursor": map[string]int{
"last_updated": 1502908200,
},
},
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": 1502908200
}`,
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": 1502908200
}`,
`{
"pps": {
"agent": "example.proofpoint-1.com",
"cid": "mmeng_vxciml"
},
"ts": 1503081000
}`,
"pps": {
"agent": "example.proofpoint-1.com",
"cid": "mmeng_vxciml"
},
"ts": 1503081000
}`,
},
want: []map[string]interface{}{
{
Expand All @@ -314,13 +314,13 @@ var inputTests = []struct {
},
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": 1502908200
}`,
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": 1502908200
}`,
},
want: []map[string]interface{}{
{
Expand All @@ -346,13 +346,13 @@ var inputTests = []struct {
},
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": 1502908200
}`,
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": 1502908200
}`,
},
want: []map[string]interface{}{
{
Expand Down Expand Up @@ -381,6 +381,40 @@ var inputTests = []struct {
},
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": 1502908200
}`,
},
want: []map[string]interface{}{
{
"pps": map[string]interface{}{
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071",
},
"ts": float64(1502908200),
},
},
},
{
name: "test_retry_success",
server: webSocketServerWithRetry(httptest.NewServer),
handler: defaultHandler,
config: map[string]interface{}{
"program": `
bytes(state.response).decode_json().as(inner_body,{
"events": [inner_body],
})`,
"retry": map[string]interface{}{
"max_attempts": 3,
"wait_min": "1s",
"wait_max": "2s",
},
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
Expand All @@ -399,6 +433,23 @@ var inputTests = []struct {
},
},
},
{
name: "test_retry_failure",
server: webSocketServerWithRetry(httptest.NewServer),
handler: defaultHandler,
config: map[string]interface{}{
"program": `
bytes(state.response).decode_json().as(inner_body,{
"events": [inner_body],
})`,
"retry": map[string]interface{}{
"max_attempts": 2,
"wait_min": "1s",
"wait_max": "2s",
},
},
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"),
},
}

var urlEvalTests = []struct {
Expand Down Expand Up @@ -533,7 +584,7 @@ func TestInput(t *testing.T) {
t.Fatalf("unexpected error running test: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

v2Ctx := v2.Context{
Expand Down Expand Up @@ -687,6 +738,39 @@ func webSocketTestServerWithAuth(serve func(http.Handler) *httptest.Server) func
}
}

// webSocketServerWithRetry returns a function that creates a WebSocket server that rejects the first two connection attempts and accepts the third.
func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) {
var attempt int
return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) {
server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempt++
if attempt <= 2 {
w.WriteHeader(http.StatusForbidden)
fmt.Fprintf(w, "connection attempt %d rejected", attempt)
return
}
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.Fatalf("error upgrading connection to WebSocket: %v", err)
return
}

handler(t, conn, response)
}))
// only set the resource URL if it is not already set
if config["url"] == nil {
config["url"] = "ws" + server.URL[4:]
}
t.Cleanup(server.Close)
}
}

// defaultHandler is a default handler for WebSocket connections.
func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) {
for _, r := range response {
Expand Down
Loading

0 comments on commit 0c3c9c6

Please sign in to comment.