From f80771fcbf927c2a78473845758223f5156907aa Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 14 Dec 2020 20:46:52 +0800 Subject: [PATCH] Add 'expand_keys' option to JSON input/processor (#22849) Co-authored-by: Brandon Morelli (cherry picked from commit 4f4a5536b72f4a25962d56262f31e3b8533b252e) --- CHANGELOG.next.asciidoc | 1 + .../config/filebeat.inputs.reference.yml.tmpl | 5 + .../input-common-harvester-options.asciidoc | 7 +- filebeat/filebeat.reference.yml | 5 + libbeat/common/jsontransform/expand.go | 114 +++++++++++++++ libbeat/common/jsontransform/expand_test.go | 133 ++++++++++++++++++ libbeat/common/jsontransform/jsonhelper.go | 9 +- .../common/jsontransform/jsonhelper_test.go | 42 +++++- .../processors/actions/decode_json_fields.go | 5 +- .../actions/decode_json_fields_test.go | 48 +++++++ .../actions/docs/decode_json_fields.asciidoc | 5 +- libbeat/reader/readjson/json.go | 4 +- libbeat/reader/readjson/json_config.go | 1 + libbeat/reader/readjson/json_test.go | 7 +- x-pack/filebeat/filebeat.reference.yml | 5 + 15 files changed, 383 insertions(+), 8 deletions(-) create mode 100644 libbeat/common/jsontransform/expand.go create mode 100644 libbeat/common/jsontransform/expand_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 72142f53395..95980269e2f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -362,6 +362,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for ephemeral containers in kubernetes autodiscover and `add_kubernetes_metadata`. {pull}22389[22389] {pull}22439[22439] - Added support for wildcard fields and keyword fallback in beats setup commands. {pull}22521[22521] - Fix polling node when it is not ready and monitor by hostname {pull}22666[22666] +- Add `expand_keys` option to `decode_json_fields` processor and `json` input, to recusively de-dot and expand json keys into hierarchical object structures {pull}22849[22849] - Update k8s client and release k8s leader lock gracefully {pull}22919[22919] - Add tini as init system in docker images {pull}22137[22137] - Added "detect_mime_type" processor for detecting mime types {pull}22940[22940] diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index 7eceb559f16..dd459a2cfac 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -119,6 +119,11 @@ filebeat.inputs: # in case of conflicts. #json.overwrite_keys: false + # If this setting is enabled, then keys in the decoded JSON object will be recursively + # de-dotted, and expanded into a hierarchical object structure. + # For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`. + #json.expand_keys: false + # If this setting is enabled, Filebeat adds a "error.message" and "error.key: json" key in case of JSON # unmarshaling errors or when a text key is defined in the configuration but cannot # be used. diff --git a/filebeat/docs/inputs/input-common-harvester-options.asciidoc b/filebeat/docs/inputs/input-common-harvester-options.asciidoc index e75906f7b64..014958050a2 100644 --- a/filebeat/docs/inputs/input-common-harvester-options.asciidoc +++ b/filebeat/docs/inputs/input-common-harvester-options.asciidoc @@ -181,6 +181,12 @@ level in the output document. The default is false. values from the decoded JSON object overwrite the fields that {beatname_uc} normally adds (type, source, offset, etc.) in case of conflicts. +*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively +de-dot keys in the decoded JSON, and expand them into a hierarchical object +structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`. +This setting should be enabled when the input is produced by an +https://github.com/elastic/ecs-logging[ECS logger]. + *`add_error_key`*:: If this setting is enabled, {beatname_uc} adds a "error.message" and "error.type: json" key in case of JSON unmarshalling errors or when a `message_key` is defined in the configuration but cannot be used. @@ -206,4 +212,3 @@ Options that control how {beatname_uc} deals with log messages that span multiple lines. See <> for more information about configuring multiline options. - diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index c03fd89186c..fa3edc4391e 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -506,6 +506,11 @@ filebeat.inputs: # in case of conflicts. #json.overwrite_keys: false + # If this setting is enabled, then keys in the decoded JSON object will be recursively + # de-dotted, and expanded into a hierarchical object structure. + # For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`. + #json.expand_keys: false + # If this setting is enabled, Filebeat adds a "error.message" and "error.key: json" key in case of JSON # unmarshaling errors or when a text key is defined in the configuration but cannot # be used. diff --git a/libbeat/common/jsontransform/expand.go b/libbeat/common/jsontransform/expand.go new file mode 100644 index 00000000000..534429d96dc --- /dev/null +++ b/libbeat/common/jsontransform/expand.go @@ -0,0 +1,114 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package jsontransform + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// expandFields de-dots the keys in m by expanding them in-place into a +// nested object structure, merging objects as necessary. If there are any +// conflicts (i.e. a common prefix where one field is an object and another +// is a non-object), an error will be returned. +// +// Note that expandFields is destructive, and in the case of an error the +// map may be left in a semi-expanded state. +func expandFields(m common.MapStr) error { + for k, v := range m { + newMap, newIsMap := getMap(v) + if newIsMap { + if err := expandFields(newMap); err != nil { + return errors.Wrapf(err, "error expanding %q", k) + } + } + if dot := strings.IndexRune(k, '.'); dot < 0 { + continue + } + + // Delete the dotted key. + delete(m, k) + + // Put expands k, returning the original value if any. + // + // If v is a map then we will merge with an existing map if any, + // otherwise there must not be an existing value. + old, err := m.Put(k, v) + if err != nil { + // Put will return an error if we attempt to insert into a non-object value. + return fmt.Errorf("cannot expand %q: found conflicting key", k) + } + if old == nil { + continue + } + if !newIsMap { + return fmt.Errorf("cannot expand %q: found existing (%T) value", k, old) + } else { + oldMap, oldIsMap := getMap(old) + if !oldIsMap { + return fmt.Errorf("cannot expand %q: found conflicting key", k) + } + if err := mergeObjects(newMap, oldMap); err != nil { + return errors.Wrapf(err, "cannot expand %q", k) + } + } + } + return nil +} + +// mergeObjects deep merges the elements of rhs into lhs. +// +// mergeObjects will recursively combine the entries of +// objects with the same key in each object. If there exist +// two entries with the same key in each object which +// are not both objects, then an error will result. +func mergeObjects(lhs, rhs common.MapStr) error { + for k, rhsValue := range rhs { + lhsValue, ok := lhs[k] + if !ok { + lhs[k] = rhsValue + continue + } + lhsMap, ok := getMap(lhsValue) + if !ok { + return fmt.Errorf("cannot merge %q: found (%T) value", k, lhsValue) + } + rhsMap, ok := getMap(rhsValue) + if !ok { + return fmt.Errorf("cannot merge %q: found (%T) value", k, rhsValue) + } + if err := mergeObjects(lhsMap, rhsMap); err != nil { + return errors.Wrapf(err, "cannot merge %q", k) + } + } + return nil +} + +func getMap(v interface{}) (map[string]interface{}, bool) { + switch v := v.(type) { + case map[string]interface{}: + return v, true + case common.MapStr: + return v, true + } + return nil, false +} diff --git a/libbeat/common/jsontransform/expand_test.go b/libbeat/common/jsontransform/expand_test.go new file mode 100644 index 00000000000..3cdcb94f37d --- /dev/null +++ b/libbeat/common/jsontransform/expand_test.go @@ -0,0 +1,133 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package jsontransform + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestExpand(t *testing.T) { + type data struct { + Event common.MapStr + Expected common.MapStr + Err string + } + tests := []data{ + { + Event: common.MapStr{ + "hello.world": 15, + }, + Expected: common.MapStr{ + "hello": common.MapStr{ + "world": 15, + }, + }, + }, + { + Event: common.MapStr{ + "test": 15, + }, + Expected: common.MapStr{ + "test": 15, + }, + }, + { + Event: common.MapStr{ + "test": 15, + "hello.there": 1, + "hello.world.ok": "test", + "elastic.for": "search", + }, + Expected: common.MapStr{ + "test": 15, + "hello": common.MapStr{ + "there": 1, + "world": common.MapStr{ + "ok": "test", + }, + }, + "elastic": common.MapStr{ + "for": "search", + }, + }, + }, + { + Event: common.MapStr{ + "root": common.MapStr{ + "ok": 1, + }, + "root.shared": "yes", + "root.one.two.three": 4, + }, + Expected: common.MapStr{ + "root": common.MapStr{ + "ok": 1, + "shared": "yes", + "one": common.MapStr{"two": common.MapStr{"three": 4}}, + }, + }, + }, + { + Event: common.MapStr{ + "root": common.MapStr{ + "seven": 1, + }, + "root.seven.eight": 2, + }, + Err: `cannot expand .*`, + }, + { + Event: common.MapStr{ + "a.b": 1, + "a": common.MapStr{ + "b": 2, + }, + }, + Err: `cannot expand .*`, + }, + { + Event: common.MapStr{ + "a.b": common.MapStr{ + "c": common.MapStr{ + "d": 1, + }, + }, + "a.b.c": common.MapStr{ + "d": 2, + }, + }, + Err: `cannot expand .*`, + }, + } + + for _, test := range tests { + err := expandFields(test.Event) + if test.Err != "" { + require.Error(t, err) + assert.Regexp(t, test.Err, err.Error()) + continue + } + require.NoError(t, err) + assert.Equal(t, test.Expected, test.Event) + } +} diff --git a/libbeat/common/jsontransform/jsonhelper.go b/libbeat/common/jsontransform/jsonhelper.go index 6f044ba7f8f..74742cddef2 100644 --- a/libbeat/common/jsontransform/jsonhelper.go +++ b/libbeat/common/jsontransform/jsonhelper.go @@ -27,8 +27,15 @@ import ( ) // WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey -func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) { +func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) { logger := logp.NewLogger("jsonhelper") + if expandKeys { + if err := expandFields(keys); err != nil { + logger.Errorf("JSON: failed to expand fields: %s", err) + event.SetErrorWithOption(createJSONError(err.Error()), addErrKey) + return + } + } if !overwriteKeys { // @timestamp and @metadata fields are root-level fields. We remove them so they // don't become part of event.Fields. diff --git a/libbeat/common/jsontransform/jsonhelper_test.go b/libbeat/common/jsontransform/jsonhelper_test.go index d7679579be1..ae4e4874f8d 100644 --- a/libbeat/common/jsontransform/jsonhelper_test.go +++ b/libbeat/common/jsontransform/jsonhelper_test.go @@ -48,6 +48,7 @@ func TestWriteJSONKeys(t *testing.T) { tests := map[string]struct { keys map[string]interface{} + expandKeys bool overwriteKeys bool expectedMetadata common.MapStr expectedTimestamp time.Time @@ -117,6 +118,45 @@ func TestWriteJSONKeys(t *testing.T) { "top_c": "COMPLETELY_NEW_c", }, }, + "expand_true": { + expandKeys: true, + overwriteKeys: true, + keys: map[string]interface{}{ + "top_b": map[string]interface{}{ + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + expectedMetadata: eventMetadata.Clone(), + expectedTimestamp: eventTimestamp, + expectedFields: common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": common.MapStr{ + "inner_e": "COMPLETELY_NEW_e", + }, + }, + }, + }, + "expand_false": { + expandKeys: false, + overwriteKeys: true, + keys: map[string]interface{}{ + "top_b": map[string]interface{}{ + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + expectedMetadata: eventMetadata.Clone(), + expectedTimestamp: eventTimestamp, + expectedFields: common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": "dee", + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + }, } for name, test := range tests { @@ -127,7 +167,7 @@ func TestWriteJSONKeys(t *testing.T) { Fields: eventFields.Clone(), } - WriteJSONKeys(event, test.keys, test.overwriteKeys, false) + WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false) require.Equal(t, test.expectedMetadata, event.Meta) require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano()) require.Equal(t, test.expectedFields, event.Fields) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index f0e3db61b3e..f546f4eaf54 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -38,6 +38,7 @@ import ( type decodeJSONFields struct { fields []string maxDepth int + expandKeys bool overwriteKeys bool addErrorKey bool processArray bool @@ -49,6 +50,7 @@ type decodeJSONFields struct { type config struct { Fields []string `config:"fields"` MaxDepth int `config:"max_depth" validate:"min=1"` + ExpandKeys bool `config:"expand_keys"` OverwriteKeys bool `config:"overwrite_keys"` AddErrorKey bool `config:"add_error_key"` ProcessArray bool `config:"process_array"` @@ -87,6 +89,7 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) { f := &decodeJSONFields{ fields: config.Fields, maxDepth: config.MaxDepth, + expandKeys: config.ExpandKeys, overwriteKeys: config.OverwriteKeys, addErrorKey: config.AddErrorKey, processArray: config.ProcessArray, @@ -144,7 +147,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { } else { switch t := output.(type) { case map[string]interface{}: - jsontransform.WriteJSONKeys(event, t, f.overwriteKeys, f.addErrorKey) + jsontransform.WriteJSONKeys(event, t, f.expandKeys, f.overwriteKeys, f.addErrorKey) default: errs = append(errs, "failed to add target to root") } diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 91246c7fed4..ca9c01b08d5 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -400,6 +400,54 @@ func TestAddErrKeyOption(t *testing.T) { } } +func TestExpandKeys(t *testing.T) { + testConfig := common.MustNewConfigFrom(map[string]interface{}{ + "fields": fields, + "expand_keys": true, + "target": "", + }) + input := common.MapStr{"msg": `{"a.b": {"c": "c"}, "a.b.d": "d"}`} + expected := common.MapStr{ + "msg": `{"a.b": {"c": "c"}, "a.b.d": "d"}`, + "a": common.MapStr{ + "b": map[string]interface{}{ + "c": "c", + "d": "d", + }, + }, + } + actual := getActualValue(t, testConfig, input) + assert.Equal(t, expected, actual) +} + +func TestExpandKeysError(t *testing.T) { + testConfig := common.MustNewConfigFrom(map[string]interface{}{ + "fields": fields, + "expand_keys": true, + "add_error_key": true, + "target": "", + }) + input := common.MapStr{"msg": `{"a.b": "c", "a.b.c": "d"}`} + expected := common.MapStr{ + "msg": `{"a.b": "c", "a.b.c": "d"}`, + "error": common.MapStr{ + "message": "cannot expand ...", + "type": "json", + }, + } + + actual := getActualValue(t, testConfig, input) + assert.Contains(t, actual, "error") + errorField := actual["error"].(common.MapStr) + assert.Contains(t, errorField, "message") + + // The order in which keys are processed is not defined, so the error + // message is not defined. Apart from that, the outcome is the same. + assert.Regexp(t, `cannot expand ".*": .*`, errorField["message"]) + errorField["message"] = "cannot expand ..." + assert.Equal(t, expected, actual) +} + func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { log := logp.NewLogger("decode_json_fields_test") diff --git a/libbeat/processors/actions/docs/decode_json_fields.asciidoc b/libbeat/processors/actions/docs/decode_json_fields.asciidoc index c5aa15c2a3c..fba3342032c 100644 --- a/libbeat/processors/actions/docs/decode_json_fields.asciidoc +++ b/libbeat/processors/actions/docs/decode_json_fields.asciidoc @@ -36,10 +36,13 @@ is treated as if the field was not set at all. `overwrite_keys`:: (Optional) A boolean that specifies whether keys that already exist in the event are overwritten by keys from the decoded JSON object. The default value is false. +`expand_keys`:: (Optional) A boolean that specifies whether keys in the decoded JSON +should be recursively de-dotted, and expanded into a hierarchical object structure. +For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`. `add_error_key`:: (Optional) If it set to true, in case of error while decoding json keys `error` field is going to be part of event with error message. If it set to false, there will not be any error in event's field. Even error occurs while decoding json keys. The default value is false. `document_id`:: (Optional) JSON key to use as the document id. If configured, the field will be removed from the original json document and stored in -`@metadata._id` \ No newline at end of file +`@metadata._id` diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go index b2c0e5e028f..bbbbdeb3ade 100644 --- a/libbeat/reader/readjson/json.go +++ b/libbeat/reader/readjson/json.go @@ -120,7 +120,7 @@ func createJSONError(message string) common.MapStr { } // MergeJSONFields writes the JSON fields in the event map, -// respecting the KeysUnderRoot and OverwriteKeys configuration options. +// respecting the KeysUnderRoot, ExpandKeys, and OverwriteKeys configuration options. // If MessageKey is defined, the Text value from the event always // takes precedence. func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) (string, time.Time) { @@ -164,7 +164,7 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, Timestamp: ts, Fields: data, } - jsontransform.WriteJSONKeys(event, jsonFields, config.OverwriteKeys, config.AddErrorKey) + jsontransform.WriteJSONKeys(event, jsonFields, config.ExpandKeys, config.OverwriteKeys, config.AddErrorKey) return id, event.Timestamp } diff --git a/libbeat/reader/readjson/json_config.go b/libbeat/reader/readjson/json_config.go index 5469f00a3c6..ff11c57d95e 100644 --- a/libbeat/reader/readjson/json_config.go +++ b/libbeat/reader/readjson/json_config.go @@ -25,6 +25,7 @@ type Config struct { OverwriteKeys bool `config:"overwrite_keys"` AddErrorKey bool `config:"add_error_key"` IgnoreDecodingError bool `config:"ignore_decoding_error"` + ExpandKeys bool `config:"expand_keys"` } // Validate validates the Config option for JSON reader. diff --git a/libbeat/reader/readjson/json_test.go b/libbeat/reader/readjson/json_test.go index b0470c4512b..68c88f6297c 100644 --- a/libbeat/reader/readjson/json_test.go +++ b/libbeat/reader/readjson/json_test.go @@ -194,7 +194,7 @@ func TestDecodeJSON(t *testing.T) { } } -func TestAddJSONFields(t *testing.T) { +func TestMergeJSONFields(t *testing.T) { type io struct { } @@ -344,6 +344,11 @@ func TestAddJSONFields(t *testing.T) { JSONConfig: Config{DocumentID: "id"}, ExpectedID: "", }, + "expand dotted fields": { + Data: common.MapStr{"json": common.MapStr{"a.b": common.MapStr{"c": "c"}, "a.b.d": "d"}}, + JSONConfig: Config{ExpandKeys: true, KeysUnderRoot: true}, + ExpectedItems: common.MapStr{"a": common.MapStr{"b": common.MapStr{"c": "c", "d": "d"}}}, + }, } for name, test := range tests { diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index e5ef21c6455..4243a1e3913 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2109,6 +2109,11 @@ filebeat.inputs: # in case of conflicts. #json.overwrite_keys: false + # If this setting is enabled, then keys in the decoded JSON object will be recursively + # de-dotted, and expanded into a hierarchical object structure. + # For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`. + #json.expand_keys: false + # If this setting is enabled, Filebeat adds a "error.message" and "error.key: json" key in case of JSON # unmarshaling errors or when a text key is defined in the configuration but cannot # be used.