From 7ffb35a135d0f996505ef67199df25aedc95bd02 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 23 Feb 2024 08:26:34 +1030 Subject: [PATCH] x-pack/filebeat/input/http_endpoint: generalise event path using CEL Allow users to handle events where multiple events are provided via single JSON objects in an array field. This is enabled by providing an optional CEL program with only minimal support for CEL evaluations; no extensions from the mito lib extensions are made available. --- CHANGELOG.next.asciidoc | 3 +- .../docs/inputs/input-http-endpoint.asciidoc | 5 + x-pack/filebeat/input/http_endpoint/config.go | 1 + .../filebeat/input/http_endpoint/handler.go | 117 +++++++++++++++++- .../input/http_endpoint/handler_test.go | 38 +++++- x-pack/filebeat/input/http_endpoint/input.go | 15 ++- 6 files changed, 167 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 14a24c00aabd..20f456224da7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -187,7 +187,8 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d - Add ETW input. {pull}36915[36915] - Update CEL mito extensions to v1.9.0 to add keys/values helper. {pull}37971[37971] - Add logging for cache processor file reads and writes. {pull}38052[38052] -- Add parseDateInTZ value template for the HTTPJSON input {pull}37738[37738] +- Add parseDateInTZ value template for the HTTPJSON input. {pull}37738[37738] +- Add support for complex event objects in the HTTP Endpoint input. {issue}37910[37910] {pull}38193[38193] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index b7a7ee06f70d..5ffd1924fcdb 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -227,6 +227,11 @@ The prefix for the signature. Certain webhooks prefix the HMAC signature with a By default the input expects the incoming POST to include a Content-Type of `application/json` to try to enforce the incoming data to be valid JSON. In certain scenarios when the source of the request is not able to do that, it can be overwritten with another value or set to null. +[float] +==== `program` + +The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be treated handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a CEL program can be provided through this configuration field. No CEL extensions are provided beyond the function in the CEL standard library. CEL optional types are supported. + [float] ==== `response_code` diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 3b0c97741dee..1618dc907583 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -37,6 +37,7 @@ type config struct { URL string `config:"url" validate:"required"` Prefix string `config:"prefix"` ContentType string `config:"content_type"` + Program string `config:"program"` SecretHeader string `config:"secret.header"` SecretValue string `config:"secret.value"` HMACHeader string `config:"hmac.header"` diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index 0e2620b5b658..86dccfb57301 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -12,10 +12,16 @@ import ( "io" "net" "net/http" + "reflect" "time" + "github.com/google/cel-go/cel" + "github.com/google/cel-go/checker/decls" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "google.golang.org/protobuf/types/known/structpb" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" @@ -24,6 +30,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/mito/lib" ) const headerContentEncoding = "Content-Encoding" @@ -43,6 +50,7 @@ type handler struct { reqLogger *zap.Logger host, scheme string + program *program messageField string responseCode int responseBody string @@ -80,7 +88,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(&buf) } - objs, _, status, err := httpReadJSON(body) + objs, _, status, err := httpReadJSON(body, h.program) if err != nil { h.sendAPIErrorResponse(w, r, h.log, status, err) h.metrics.apiErrors.Add(1) @@ -218,22 +226,22 @@ func (h *handler) publishEvent(obj, headers mapstr.M) error { return nil } -func httpReadJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) { +func httpReadJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) { if body == http.NoBody { return nil, nil, http.StatusNotAcceptable, errBodyEmpty } - obj, rawMessage, err := decodeJSON(body) + obj, rawMessage, err := decodeJSON(body, prg) if err != nil { return nil, nil, http.StatusBadRequest, err } return obj, rawMessage, http.StatusOK, err } -func decodeJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, err error) { +func decodeJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []json.RawMessage, err error) { decoder := json.NewDecoder(body) for decoder.More() { var raw json.RawMessage - if err := decoder.Decode(&raw); err != nil { + if err = decoder.Decode(&raw); err != nil { if err == io.EOF { //nolint:errorlint // This will never be a wrapped error. break } @@ -241,9 +249,22 @@ func decodeJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, } var obj interface{} - if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil { + if err = newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil { return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err) } + + if prg != nil { + obj, err = prg.eval(obj) + if err != nil { + return nil, nil, err + } + // Re-marshal to ensure the raw bytes agree with the constructed object. + raw, err = json.Marshal(obj) + if err != nil { + return nil, nil, fmt.Errorf("failed to remarshal object: %w", err) + } + } + switch v := obj.(type) { case map[string]interface{}: objs = append(objs, v) @@ -265,6 +286,90 @@ func decodeJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, return objs, rawMessages, nil } +type program struct { + prg cel.Program + ast *cel.Ast +} + +func newProgram(src string) (*program, error) { + if src == "" { + return nil, nil + } + + registry, err := types.NewRegistry() + if err != nil { + return nil, fmt.Errorf("failed to create env: %v", err) + } + env, err := cel.NewEnv( + cel.Declarations(decls.NewVar("obj", decls.Dyn)), + cel.OptionalTypes(cel.OptionalTypesVersion(lib.OptionalTypesVersion)), + cel.CustomTypeAdapter(&numberAdapter{registry}), + cel.CustomTypeProvider(registry), + ) + if err != nil { + return nil, fmt.Errorf("failed to create env: %v", err) + } + + ast, iss := env.Compile(src) + if iss.Err() != nil { + return nil, fmt.Errorf("failed compilation: %v", iss.Err()) + } + + prg, err := env.Program(ast) + if err != nil { + return nil, fmt.Errorf("failed program instantiation: %v", err) + } + return &program{prg: prg, ast: ast}, nil +} + +var _ types.Adapter = (*numberAdapter)(nil) + +type numberAdapter struct { + fallback types.Adapter +} + +func (a *numberAdapter) NativeToValue(value any) ref.Val { + if n, ok := value.(json.Number); ok { + var errs []error + i, err := n.Int64() + if err == nil { + return types.Int(i) + } + errs = append(errs, err) + f, err := n.Float64() + if err == nil { + return types.Double(f) + } + errs = append(errs, err) + return types.NewErr("%v", errors.Join(errs...)) + } + return a.fallback.NativeToValue(value) +} + +func (p *program) eval(obj interface{}) (interface{}, error) { + out, _, err := p.prg.Eval(map[string]interface{}{"obj": obj}) + if err != nil { + err = lib.DecoratedError{AST: p.ast, Err: err} + return nil, fmt.Errorf("failed eval: %w", err) + } + + v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Value)(nil))) + if err != nil { + return nil, fmt.Errorf("failed proto conversion: %w", err) + } + switch v := v.(type) { + case *structpb.Value: + return v.AsInterface(), nil + default: + // This should never happen. + return nil, fmt.Errorf("unexpected native conversion type: %T", v) + } +} + +func errorMessage(msg string) map[string]interface{} { + return map[string]interface{}{"error": map[string]interface{}{"message": msg}} +} + func decodeJSONArray(raw *bytes.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, err error) { dec := newJSONDecoder(raw) token, err := dec.Token() diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index 6660508b15b4..cb911f8ab188 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -38,6 +38,7 @@ func Test_httpReadJSON(t *testing.T) { tests := []struct { name string body string + program string wantObjs []mapstr.M wantStatus int wantErr bool @@ -135,10 +136,43 @@ func Test_httpReadJSON(t *testing.T) { }, wantStatus: http.StatusOK, }, + { + name: "kinesis", + body: `{ + "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", + "timestamp": 1578090901599, + "records": [ + { + "data": "aGVsbG8=" + }, + { + "data": "aGVsbG8gd29ybGQ=" + } + ] +}`, + program: `obj.records.map(r, { + "requestId": obj.requestId, + "timestamp": string(obj.timestamp), // leave timestamp in unix milli for ingest to handle. + "event": r, + })`, + wantRawMessage: []json.RawMessage{ + []byte(`{"event":{"data":"aGVsbG8="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`), + []byte(`{"event":{"data":"aGVsbG8gd29ybGQ="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`), + }, + wantObjs: []mapstr.M{ + {"event": map[string]any{"data": "aGVsbG8="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"}, + {"event": map[string]any{"data": "aGVsbG8gd29ybGQ="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"}, + }, + wantStatus: http.StatusOK, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body)) + prg, err := newProgram(tt.program) + if err != nil { + t.Fatalf("failed to compile program: %v", err) + } + gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body), prg) if (err != nil) != tt.wantErr { t.Errorf("httpReadJSON() error = %v, wantErr %v", err, tt.wantErr) return @@ -344,7 +378,7 @@ func Test_apiResponse(t *testing.T) { pub := new(publisher) metrics := newInputMetrics("") defer metrics.Close() - apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), pub, logp.NewLogger("http_endpoint.test"), metrics) + apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub, logp.NewLogger("http_endpoint.test"), metrics) // Execute handler. respRec := httptest.NewRecorder() diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index ca648b697470..7d5055ebe653 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -131,6 +131,14 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m metrics.route.Set(u.Path) metrics.isTLS.Set(e.tlsConfig != nil) + var prg *program + if e.config.Program != "" { + prg, err = newProgram(e.config.Program) + if err != nil { + return err + } + } + p.mu.Lock() s, ok := p.servers[e.addr] if ok { @@ -149,7 +157,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m return err } log.Infof("Adding %s end point to server on %s", pattern, e.addr) - s.mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics)) + s.mux.Handle(pattern, newHandler(s.ctx, e.config, prg, pub, log, metrics)) s.idOf[pattern] = ctx.ID p.mu.Unlock() <-s.ctx.Done() @@ -165,7 +173,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m srv: srv, } s.ctx, s.cancel = ctxtool.WithFunc(ctx.Cancelation, func() { srv.Close() }) - mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics)) + mux.Handle(pattern, newHandler(s.ctx, e.config, prg, pub, log, metrics)) p.servers[e.addr] = s p.mu.Unlock() @@ -287,7 +295,7 @@ func (s *server) getErr() error { return s.err } -func newHandler(ctx context.Context, c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler { +func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler { h := &handler{ log: log, publisher: pub, @@ -305,6 +313,7 @@ func newHandler(ctx context.Context, c config, pub stateless.Publisher, log *log hmacType: c.HMACType, hmacPrefix: c.HMACPrefix, }, + program: prg, messageField: c.Prefix, responseCode: c.ResponseCode, responseBody: c.ResponseBody,