diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index da63b4565b6..19b746796f8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Introduce log message for not supported annotations for Hints based autodiscover {pull}38213[38213] - Add persistent volume claim name to volume if available {pull}38839[38839] - Raw events are now logged to a different file, this prevents potentially sensitive information from leaking into log files {pull}38767[38767] +- Websocket input: Added runtime URL modification support based on state and cursor values {issue}39858[39858] {pull}39997[39997] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-websocket.asciidoc b/x-pack/filebeat/docs/inputs/input-websocket.asciidoc index 9e08060a22b..eed0a25d4b5 100644 --- a/x-pack/filebeat/docs/inputs/input-websocket.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-websocket.asciidoc @@ -153,6 +153,30 @@ program: | }) ---- +[[input-url-program-websocket]] +[float] +==== `url_program` + +If present, this CEL program is executed before the websocket connection is established using the `state` object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the websocket connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime. + +["source","yaml",subs="attributes"] +---- +url: ws://testapi:443/v1/streamresults +state: + initial_start_time: "2022-01-01T00:00:00Z" +url_program: | + state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time) +program: | + bytes(state.response).decode_json().as(inner_body,{ + "events": { + "message": inner_body.encode_json(), + }, + "cursor": { + "since": inner_body.timestamp + } + }) +---- + [[state-websocket]] [float] ==== `state` diff --git a/x-pack/filebeat/input/websocket/config.go b/x-pack/filebeat/input/websocket/config.go index 1a961f3c162..490a407790d 100644 --- a/x-pack/filebeat/input/websocket/config.go +++ b/x-pack/filebeat/input/websocket/config.go @@ -14,6 +14,8 @@ import ( ) type config struct { + // URLProgram is the CEL program to be run once before to prep the url. + URLProgram string `config:"url_program"` // Program is the CEL program to be run for each polling. Program string `config:"program"` // Regexps is the set of regular expression to be made diff --git a/x-pack/filebeat/input/websocket/input.go b/x-pack/filebeat/input/websocket/input.go index c48ce177931..69810edd944 100644 --- a/x-pack/filebeat/input/websocket/input.go +++ b/x-pack/filebeat/input/websocket/input.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "net/url" "reflect" "time" @@ -97,9 +98,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p state["cursor"] = cursor } + // initialize the input url with the help of the url_program. + url, err := i.getURL(ctx, state, log) + if err != nil { + metrics.errorsTotal.Inc() + return err + } + // websocket client headers := formHeader(cfg) - url := cfg.URL.String() c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers) if resp != nil && resp.Body != nil { log.Debugw("websocket connection response", "body", resp.Body) @@ -150,6 +157,72 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p } } +// getURL initializes the input URL with the help of the url_program. +func (i input) getURL(ctx context.Context, state map[string]interface{}, log *logp.Logger) (string, error) { + var ( + url string + err error + ) + cfg := i.cfg + if cfg.URLProgram != "" { + state["url"] = cfg.URL.String() + // CEL program which is used to prime/initialize the input url + url_prg, ast, err := newProgram(ctx, cfg.URLProgram, root, nil, log) + if err != nil { + return url, err + } + + log.Debugw("cel engine state before url_eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: cfg.Redact}) + start := i.now().In(time.UTC) + url, err = evalURLWith(ctx, url_prg, ast, state, start) + log.Debugw("url_eval result", logp.Namespace("websocket"), "modified_url", url) + if err != nil { + log.Errorw("failed url evaluation", "error", err) + return url, err + } + } else { + url = cfg.URL.String() + } + return url, err +} + +func evalURLWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (string, error) { + out, _, err := prg.ContextEval(ctx, map[string]interface{}{ + // Replace global program "now" with current time. This is necessary + // as the lib.Time now global is static at program instantiation time + // which will persist over multiple evaluations. The lib.Time behaviour + // is correct for mito where CEL program instances live for only a + // single evaluation. Rather than incurring the cost of creating a new + // cel.Program for each evaluation, shadow lib.Time's now with a new + // value for each eval. We retain the lib.Time now global for + // compatibility between CEL programs developed in mito with programs + // run in the input. + "now": now, + root: state, + }) + if err != nil { + err = lib.DecoratedError{AST: ast, Err: err} + } + if e := ctx.Err(); e != nil { + err = e + } + if err != nil { + return "", fmt.Errorf("failed eval: %w", err) + } + v, err := out.ConvertToNative(reflect.TypeOf("")) + if err != nil { + return "", fmt.Errorf("failed type conversion: %w", err) + } + switch v := v.(type) { + case string: + _, err = url.Parse(v) + return v, err + default: + // This should never happen. + return "", fmt.Errorf("unexpected native conversion type: %T", v) + } +} + // processAndPublishData processes the data in state, updates the cursor and publishes it to the publisher. // the CEL program here only executes a single time, since the websocket connection is persistent and events are received and processed in real time. func (i *input) processAndPublishData(ctx context.Context, metrics *inputMetrics, prg cel.Program, ast *cel.Ast, diff --git a/x-pack/filebeat/input/websocket/input_test.go b/x-pack/filebeat/input/websocket/input_test.go index fc98a2f0b46..5c0148ba49d 100644 --- a/x-pack/filebeat/input/websocket/input_test.go +++ b/x-pack/filebeat/input/websocket/input_test.go @@ -6,6 +6,7 @@ package websocket import ( "context" + "errors" "fmt" "net/http" "net/http/httptest" @@ -26,6 +27,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) +//nolint:gosec // These are test tokens and are not used in production code. const ( basicToken = "dXNlcjpwYXNz" bearerToken = "BXNlcjpwYVVz" @@ -399,6 +401,101 @@ var inputTests = []struct { }, } +var urlEvalTests = []struct { + name string + config map[string]interface{} + time func() time.Time + want string +}{ + { + name: "cursor based url modification", + config: map[string]interface{}{ + "url": "ws://testapi/getresults", + "url_program": `has(state.cursor) && has(state.cursor.since) ? state.url+"?since="+ state.cursor.since : state.url`, + "state": map[string]interface{}{ + "cursor": map[string]interface{}{ + "since": "2017-08-17T14:54:12", + }, + }, + }, + want: "ws://testapi/getresults?since=2017-08-17T14:54:12", + }, + { + name: "cursor based url modification using simplified query", + config: map[string]interface{}{ + "url": "ws://testapi/getresults", + "url_program": `state.url + "?since=" + state.?cursor.since.orValue(state.url)`, + "state": map[string]interface{}{ + "cursor": map[string]interface{}{ + "since": "2017-08-17T14:54:12", + }, + }, + }, + want: "ws://testapi/getresults?since=2017-08-17T14:54:12", + }, + { + name: "url modification with no cursor", + config: map[string]interface{}{ + "url": "ws://testapi/getresults", + "url_program": `has(state.cursor) && has(state.cursor.since) ? state.url+"?since="+ state.cursor.since: state.url+"?since="+ state.initial_start_time`, + "state": map[string]interface{}{ + "initial_start_time": "2022-01-01T00:00:00Z", + }, + }, + want: "ws://testapi/getresults?since=2022-01-01T00:00:00Z", + }, + { + name: "url modification with no cursor, using simplified query", + config: map[string]interface{}{ + "url": "ws://testapi/getresults", + "url_program": `state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)`, + "state": map[string]interface{}{ + "initial_start_time": "2022-01-01T00:00:00Z", + }, + }, + want: "ws://testapi/getresults?since=2022-01-01T00:00:00Z", + }, +} + +func TestURLEval(t *testing.T) { + logp.TestingSetup() + for _, test := range urlEvalTests { + t.Run(test.name, func(t *testing.T) { + + cfg := conf.MustNewConfigFrom(test.config) + + conf := config{} + conf.Redact = &redact{} + err := cfg.Unpack(&conf) + if err != nil { + t.Fatalf("unexpected error unpacking config: %v", err) + } + + name := input{}.Name() + if name != "websocket" { + t.Errorf(`unexpected input name: got:%q want:"websocket"`, name) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var state map[string]interface{} + if conf.State == nil { + state = make(map[string]interface{}) + } else { + state = conf.State + } + + response, err := input{test.time, conf}.getURL(ctx, state, logp.NewLogger("websocket_url_eval_test")) + if err != nil && !errors.Is(err, context.Canceled) { + t.Errorf("unexpected error from running input: got:%v want:%v", err, nil) + } + + assert.Equal(t, test.want, response) + }) + } +} + func TestInput(t *testing.T) { // tests will ignore context cancelled errors, since they are expected ctxCancelledError := fmt.Errorf("context canceled") @@ -432,7 +529,7 @@ func TestInput(t *testing.T) { t.Fatalf("unexpected error running test: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() v2Ctx := v2.Context{