From 892241536fa5cf5f5d0d7f2d66f681e4cb748e21 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Tue, 26 Oct 2021 12:55:10 +0200 Subject: [PATCH] [filebeat] Add CSV decoder to httpjson input (#28564) * Add CSV decoder to httpjson input * Fix error check and test * Make allocated map to the size of the header --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 4 +- x-pack/filebeat/input/httpjson/encoding.go | 43 +++++++++++++++++ .../filebeat/input/httpjson/encoding_test.go | 46 +++++++++++++++++++ 4 files changed, 93 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c2ed5ee5aec..fe00a7dc4bd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -781,6 +781,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro - Add `base64Decode` and `base64DecodeNoPad` functions to `httpsjon` templates. {pull}28385[28385] - Add latency config option for aws-cloudwatch input. {pull}28509[28509] - Added proxy support to threatintel/malwarebazaar. {pull}28533[28533] +- Add `text/csv` decoder to `httpjson` input {pull}28564[28564] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index e2f8b7eba28..4548b40413e 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -473,7 +473,9 @@ filebeat.inputs: [float] ==== `response.decode_as` -ContentType used for decoding the response body. If set it will force the decoding in the specified format regardless of the `Content-Type` header value, otherwise it will honor it if possible or fallback to `application/json`. Supported values: `application/json, application/x-ndjson`. It is not set by default. +ContentType used for decoding the response body. If set it will force the decoding in the specified format regardless of the `Content-Type` header value, otherwise it will honor it if possible or fallback to `application/json`. Supported values: `application/json, application/x-ndjson`, `text/csv`. It is not set by default. + +NOTE: For `text/csv`, one event for each line will be created, using the header values as the object keys. For this reason is always assumed that a header exists. [[response-transforms]] [float] diff --git a/x-pack/filebeat/input/httpjson/encoding.go b/x-pack/filebeat/input/httpjson/encoding.go index 178b5918aba..38bc6c2ad3a 100644 --- a/x-pack/filebeat/input/httpjson/encoding.go +++ b/x-pack/filebeat/input/httpjson/encoding.go @@ -6,8 +6,10 @@ package httpjson import ( "bytes" + "encoding/csv" "encoding/json" "errors" + "io" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -85,6 +87,7 @@ func registerDecoders() { log := logp.L().Named(logName) log.Debug(registerDecoder("application/json", decodeAsJSON)) log.Debug(registerDecoder("application/x-ndjson", decodeAsNdjson)) + log.Debug(registerDecoder("text/csv", decodeAsCSV)) } func encodeAsJSON(trReq transformable) ([]byte, error) { @@ -125,3 +128,43 @@ func decodeAsNdjson(p []byte, dst *response) error { dst.body = results return nil } + +func decodeAsCSV(p []byte, dst *response) error { + var results []interface{} + + r := csv.NewReader(bytes.NewReader(p)) + + // a header is always expected, otherwise we can't map + // values to keys in the event + header, err := r.Read() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + event, err := r.Read() + for ; err == nil; event, err = r.Read() { + o := make(map[string]interface{}, len(header)) + if len(header) != len(event) { + // sanity check, csv.Reader should fail on this scenario + // and this code path should be unreachable + return errors.New("malformed CSV, record does not match header length") + } + for i, h := range header { + o[h] = event[i] + } + results = append(results, o) + } + + if err != nil { + if err != io.EOF { + return err + } + } + + dst.body = results + + return nil +} diff --git a/x-pack/filebeat/input/httpjson/encoding_test.go b/x-pack/filebeat/input/httpjson/encoding_test.go index 6c967f7cbb5..8045953ba9a 100644 --- a/x-pack/filebeat/input/httpjson/encoding_test.go +++ b/x-pack/filebeat/input/httpjson/encoding_test.go @@ -38,6 +38,51 @@ func TestDecodeNdjson(t *testing.T) { } } +func TestDecodeCSV(t *testing.T) { + tests := []struct { + body string + result string + err string + }{ + {"", "", ""}, + { + "EVENT_TYPE,TIMESTAMP,REQUEST_ID,ORGANIZATION_ID,USER_ID\n" + + "Login,20211018071353.465,id1,id2,user1\n" + + "Login,20211018071505.579,id4,id5,user2\n", + `[{"EVENT_TYPE":"Login","TIMESTAMP":"20211018071353.465","REQUEST_ID":"id1","ORGANIZATION_ID":"id2","USER_ID":"user1"}, + {"EVENT_TYPE":"Login","TIMESTAMP":"20211018071505.579","REQUEST_ID":"id4","ORGANIZATION_ID":"id5","USER_ID":"user2"}]`, + "", + }, + { + "EVENT_TYPE,TIMESTAMP,REQUEST_ID,ORGANIZATION_ID,USER_ID\n" + + "Login,20211018071505.579,id4,user2\n", + "", + "record on line 2: wrong number of fields", + }, + } + for _, test := range tests { + resp := &response{} + err := decodeAsCSV([]byte(test.body), resp) + if test.err != "" { + assert.Error(t, err) + assert.EqualError(t, err, test.err) + } else { + assert.NoError(t, err) + + var j []byte + if test.body != "" { + j, err = json.Marshal(resp.body) + if err != nil { + t.Fatalf("Marshal failed: %v", err) + } + assert.JSONEq(t, test.result, string(j)) + } else { + assert.Equal(t, test.result, string(j)) + } + } + } +} + func TestEncodeAsForm(t *testing.T) { tests := []struct { params map[string]string @@ -60,6 +105,7 @@ func TestEncodeAsForm(t *testing.T) { trReq := transformable{} trReq.setURL(*u) res, err := encodeAsForm(trReq) + assert.NoError(t, err) assert.Equal(t, test.body, string(res)) assert.Equal(t, "application/x-www-form-urlencoded", trReq.header().Get("Content-Type")) }