From 84cc638b6d00216eeba311f30fdea95fd920b32a Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 1 Sep 2020 15:29:30 -0500 Subject: [PATCH 1/3] backwards compatibility for set processor - "ignore_empty_value" option for the set processor only works on elasticsearch >= 7.9.0. This change removes that option and replaces it with an if statement if pipeline is loaded on an earlier version of elasticsearch. --- CHANGELOG.next.asciidoc | 1 + filebeat/fileset/pipelines.go | 49 ++++++++++++++ filebeat/fileset/pipelines_test.go | 103 +++++++++++++++++++++++++++++ 3 files changed, 153 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ce36b86222c8..45e28dafc0cb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -256,6 +256,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix long registry migration times. {pull}20717[20717] {issue}20705[20705] - Fix event types and categories in auditd module to comply with ECS {pull}20652[20652] - Update documentation in the azure module filebeat. {pull}20815[20815] +- provide backwards compatibility for set processor and elasticsearch less than 7.9.0 {pull}20908[20908] *Heartbeat* diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index db1293054632..de558426d5df 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -127,6 +127,11 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err) } + err = modifySetProcessor(esClient.GetVersion(), pipelineID, content) + if err != nil { + return fmt.Errorf("failed to modify set processor in pipeline: %v", err) + } + body, err := esClient.LoadJSON(path, content) if err != nil { return interpretError(err, body) @@ -232,3 +237,47 @@ func interpretError(initialErr error, body []byte) error { return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body) } + +// modifySetProcessor replaces ignore_empty_value option with an if statement +// so ES less than 7.9 will still work +func modifySetProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { + flagVersion := common.MustNewVersion("7.9.0") + if !esVersion.LessThan(flagVersion) { + return nil + } + + p, ok := content["processors"] + if !ok { + return nil + } + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + for _, p := range processors { + processor, ok := p.(map[string]interface{}) + if !ok { + continue + } + if options, ok := processor["set"].(map[string]interface{}); ok { + iev, ok := options["ignore_empty_value"].(bool) + if !ok || !iev { + continue + } + val, ok := options["value"].(string) + if !ok { + continue + } + newIf := strings.ReplaceAll(val, "{", "") + newIf = strings.ReplaceAll(newIf, "}", "") + newIf = strings.TrimSpace(newIf) + newIf = strings.ReplaceAll(newIf, ".", "?.") + newIf = "ctx?." + newIf + " != null" + logp.Debug("modules", "in pipeline %s replacing unsupported 'ignore_empty_value' with if %s in set processor", pipelineID, newIf) + delete(options, "ignore_empty_value") + options["if"] = newIf + } + } + return nil +} diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 648e82a1c2ed..f5d0abe97ec9 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -215,3 +215,106 @@ func TestSetEcsProcessors(t *testing.T) { }) } } + +func TestModifySetProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.9.0", + esVersion: common.MustNewVersion("7.8.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.9.0", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES > 7.9.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := modifySetProcessor(*test.esVersion, "foo-pipeline", test.content) + if test.isErrExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, test.content) + } + }) + } +} From 0233ae7d34349ef78076b11d95891f1d2d961da3 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 2 Sep 2020 16:24:40 -0500 Subject: [PATCH 2/3] incorporate feedback --- filebeat/fileset/pipelines.go | 24 +++++++--- filebeat/fileset/pipelines_test.go | 76 +++++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 8 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index de558426d5df..3d89e607ec6e 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -261,21 +261,31 @@ func modifySetProcessor(esVersion common.Version, pipelineID string, content map continue } if options, ok := processor["set"].(map[string]interface{}); ok { - iev, ok := options["ignore_empty_value"].(bool) - if !ok || !iev { + _, ok := options["ignore_empty_value"].(bool) + if !ok { + // don't have ignore_empty_value nothing to do + continue + } + + logp.Debug("modules", "In pipeline %q removing unsupported 'ignore_empty_value' in set processor", pipelineID) + delete(options, "ignore_empty_value") + + _, ok = options["if"].(string) + if ok { + // assume if check is sufficient continue } val, ok := options["value"].(string) if !ok { continue } - newIf := strings.ReplaceAll(val, "{", "") - newIf = strings.ReplaceAll(newIf, "}", "") - newIf = strings.TrimSpace(newIf) + + newIf := strings.TrimLeft(val, "{ ") + newIf = strings.TrimRight(newIf, "} ") newIf = strings.ReplaceAll(newIf, ".", "?.") newIf = "ctx?." + newIf + " != null" - logp.Debug("modules", "in pipeline %s replacing unsupported 'ignore_empty_value' with if %s in set processor", pipelineID, newIf) - delete(options, "ignore_empty_value") + + logp.Debug("modules", "In pipeline %q adding if %s to replace 'ignore_empty_value' in set processor", pipelineID, newIf) options["if"] = newIf } } diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index f5d0abe97ec9..65a10212b6ba 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -302,6 +302,80 @@ func TestModifySetProcessor(t *testing.T) { }, isErrExpected: false, }, + { + name: "existing if", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ignore_empty_value is false", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": false, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "no value", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "ignore_empty_value": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + }, + }, + }}, + isErrExpected: false, + }, } for _, test := range cases { @@ -313,7 +387,7 @@ func TestModifySetProcessor(t *testing.T) { assert.Error(t, err) } else { assert.NoError(t, err) - assert.Equal(t, test.expected, test.content) + assert.Equal(t, test.expected, test.content, test.name) } }) } From 0799f0189507687146f586300f4210b286139132 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 2 Sep 2020 16:26:52 -0500 Subject: [PATCH 3/3] update changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 45e28dafc0cb..70439279c219 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -256,7 +256,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix long registry migration times. {pull}20717[20717] {issue}20705[20705] - Fix event types and categories in auditd module to comply with ECS {pull}20652[20652] - Update documentation in the azure module filebeat. {pull}20815[20815] -- provide backwards compatibility for set processor and elasticsearch less than 7.9.0 {pull}20908[20908] +- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908] *Heartbeat*