From 5ba1f11dc3ea582649800de1a9255c731ee61fac Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 27 Dec 2018 11:19:31 -0800 Subject: [PATCH] Accept multiple ingest pipelines in Filebeat (#8914) Motivated by https://github.com/elastic/beats/pull/8852#issuecomment-434973388. Starting with 6.5.0, Elasticsearch Ingest Pipelines have gained the ability to: - run sub-pipelines via the [`pipeline` processor](https://www.elastic.co/guide/en/elasticsearch/reference/6.5/pipeline-processor.html), and - conditionally run processors via an [`if` field](https://www.elastic.co/guide/en/elasticsearch/reference/6.5/ingest-processors.html). Combined, these abilities make it possible for a fileset to ingest the same _logical_ information presented in different formats, e.g. plaintext vs. json versions of the same log files. Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally delegates further processing of that log entry, depending on the format, to another pipeline. This PR allows filesets to specify one or more ingest pipelines via the `ingest_pipeline` property in their `manifest.yml`. If more than one ingest pipeline is specified, the first one is taken to be the entry point ingest pipeline. #### Example with multiple pipelines ```yaml ingest_pipeline: - pipeline-ze-boss.json - pipeline-plain.json - pipeline-json.json ``` #### Example with a single pipeline _This is just to show that the existing functionality will continue to work as-is._ ```yaml ingest_pipeline: pipeline.json ``` Now, if the root pipeline wants to delegate processing to another pipeline, it must use a `pipeline` processor to do so. This processor's `name` field must reference the other pipeline by name. To ensure correct referencing, the `name` field must be specified as follows: ```json { "pipeline" : { "name": "{< IngestPipeline "pipeline-plain" >}" } } ``` This ensures that the specified name gets correctly converted to the corresponding pipeline ID in Elasticsearch, since Filebeat prefixes its "raw" ingest pipeline names with `filebeat-<version>-<module>-<fileset>-` when loading them into Elasticsearch.
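For illustration only, here is a minimal, self-contained sketch of how such a template reference resolves to the fully qualified pipeline ID. This is not code from this PR: `formatPipelineIDSketch` is a hypothetical stand-in for Filebeat's internal `formatPipelineID`, while the `{<`/`>}` delimiters and the `IngestPipeline` function name match what `applyTemplate` in `filebeat/fileset/fileset.go` uses.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"text/template"
)

// formatPipelineIDSketch is a hypothetical stand-in that mirrors the observed
// ID format: filebeat-<version>-<module>-<fileset>-<pipeline name>.
func formatPipelineIDSketch(module, fileset, path, beatVersion string) string {
	base := filepath.Base(path)
	name := strings.TrimSuffix(base, filepath.Ext(base))
	return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, name)
}

func main() {
	// Pipeline templates use "{<" and ">}" as delimiters, so template actions
	// don't clash with the JSON pipeline contents.
	tpl := template.New("pipeline").Delims("{<", ">}").Funcs(template.FuncMap{
		"IngestPipeline": func(shortID string) string {
			return formatPipelineIDSketch("foo", "multi", shortID, "6.6.0")
		},
	})
	tpl = template.Must(tpl.Parse(`{ "pipeline": { "name": "{< IngestPipeline "pipeline-plain" >}" } }`))

	// Prints: { "pipeline": { "name": "filebeat-6.6.0-foo-multi-pipeline-plain" } }
	if err := tpl.Execute(os.Stdout, nil); err != nil {
		panic(err)
	}
}
```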
--- CHANGELOG-developer.asciidoc | 1 + docs/devguide/modules-dev-guide.asciidoc | 75 +++++++++ .../_meta/test/module/foo/_meta/config.yml | 8 + .../test/module/foo/multi/config/multi.yml | 8 + .../module/foo/multi/ingest/json_logs.json | 10 ++ .../module/foo/multi/ingest/pipeline.json | 27 ++++ .../module/foo/multi/ingest/plain_logs.json | 12 ++ .../_meta/test/module/foo/multi/manifest.yml | 8 + .../test/module/foo/multibad/config/multi.yml | 8 + .../module/foo/multibad/ingest/json_logs.json | 10 ++ .../module/foo/multibad/ingest/pipeline.json | 27 ++++ .../foo/multibad/ingest/plain_logs_bad.json | 12 ++ .../test/module/foo/multibad/manifest.yml | 8 + filebeat/fileset/fileset.go | 142 ++++++++++++------ filebeat/fileset/fileset_test.go | 50 ++++-- filebeat/fileset/modules_integration_test.go | 94 ++++++++++++ filebeat/fileset/pipelines.go | 40 ++++- 17 files changed, 479 insertions(+), 61 deletions(-) create mode 100644 filebeat/_meta/test/module/foo/_meta/config.yml create mode 100644 filebeat/_meta/test/module/foo/multi/config/multi.yml create mode 100644 filebeat/_meta/test/module/foo/multi/ingest/json_logs.json create mode 100644 filebeat/_meta/test/module/foo/multi/ingest/pipeline.json create mode 100644 filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json create mode 100644 filebeat/_meta/test/module/foo/multi/manifest.yml create mode 100644 filebeat/_meta/test/module/foo/multibad/config/multi.yml create mode 100644 filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json create mode 100644 filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json create mode 100644 filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json create mode 100644 filebeat/_meta/test/module/foo/multibad/manifest.yml diff --git a/CHANGELOG-developer.asciidoc b/CHANGELOG-developer.asciidoc index ffd921aa6cc..8bcaa0cdceb 100644 --- a/CHANGELOG-developer.asciidoc +++ b/CHANGELOG-developer.asciidoc @@ -72,3 +72,4 @@ The list below covers the major changes between 6.3.0 and 7.0.0-alpha2 only. - Simplified exporting of dashboards. {pull}7730[7730] - Update Beats to use go 1.11.2 {pull}8746[8746] - Allow/Merge fields.yml overrides {pull}9188[9188] +- Filesets can now define multiple ingest pipelines, with the first one considered as the entry point pipeline. {pull}8914[8914] diff --git a/docs/devguide/modules-dev-guide.asciidoc b/docs/devguide/modules-dev-guide.asciidoc index 7fc1e5e10b1..0d8c9d7883d 100644 --- a/docs/devguide/modules-dev-guide.asciidoc +++ b/docs/devguide/modules-dev-guide.asciidoc @@ -229,6 +229,24 @@ This example selects the ingest pipeline file based on the value of the resolve to `ingest/with_plugins.json` (assuming the variable value isn't overridden at runtime.) +In 6.6 and later, you can specify multiple ingest pipelines. + +[source,yaml] +---- +ingest_pipeline: + - ingest/main.json + - ingest/plain_logs.json + - ingest/json_logs.json +---- + +When multiple ingest pipelines are specified, the first one in the list is +considered to be the entry point pipeline. + +One reason for using multiple pipelines might be to send all logs harvested +by this fileset to the entry point pipeline and have it delegate different parts of +the processing to other pipelines. You can read details about setting +this up in <<ingest-json-entry-point-pipeline>>. + [float] ==== config/*.yml @@ -336,6 +354,63 @@ Note that you should follow the convention of naming of fields prefixed with the module and fileset name: `{module}.{fileset}.field`, e.g. `nginx.access.remote_ip`. Also, please review our <>. 
+[[ingest-json-entry-point-pipeline]] +In 6.6 and later, ingest pipelines can use the +{ref}/conditionals-with-multiple-pipelines.html[`pipeline` processor] to delegate +parts of the processing to other pipelines. + +This can be useful if you want a fileset to ingest the same _logical_ information +presented in different formats, e.g. csv vs. json versions of the same log files. +Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally +delegates further processing of that log entry, depending on the format, to another +pipeline. + +["source","json",subs="callouts"] +---- +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{CHAR:first_char}" + ], + "pattern_definitions": { + "CHAR": "." + } + } + }, + { + "pipeline": { + "if": "ctx.first_char == '{'", + "name": "{< IngestPipeline "json-log-processing-pipeline" >}" <1> + } + }, + { + "pipeline": { + "if": "ctx.first_char != '{'", + "name": "{< IngestPipeline "plain-log-processing-pipeline" >}" + } + } + ] +} +---- +<1> Use the `IngestPipeline` template function to resolve the name. This function converts the +specified name into the fully qualified pipeline ID that is stored in Elasticsearch. + +In order for the above pipeline to work, Filebeat must load the entry point pipeline +as well as any sub-pipelines into Elasticsearch. You can tell Filebeat to do +so by specifying all the necessary pipelines for the fileset in its `manifest.yml` +file. The first pipeline in the list is considered to be the entry point pipeline. + +[source,yaml] +---- +ingest_pipeline: + - ingest/main.json + - ingest/plain_logs.json + - ingest/json_logs.json +---- + While developing the pipeline definition, we recommend making use of the {elasticsearch}/simulate-pipeline-api.html[Simulate Pipeline API] for testing and quick iteration. diff --git a/filebeat/_meta/test/module/foo/_meta/config.yml b/filebeat/_meta/test/module/foo/_meta/config.yml new file mode 100644 index 00000000000..309b856eb83 --- /dev/null +++ b/filebeat/_meta/test/module/foo/_meta/config.yml @@ -0,0 +1,8 @@ +- module: foo + # Fileset with multiple pipelines + multi: + enabled: true + + # Fileset with multiple pipelines with the last one being bad + multibad: + enabled: true diff --git a/filebeat/_meta/test/module/foo/multi/config/multi.yml b/filebeat/_meta/test/module/foo/multi/config/multi.yml new file mode 100644 index 00000000000..00ffdbc7204 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/config/multi.yml @@ -0,0 +1,8 @@ +type: log +paths: + - /tmp +exclude_files: [".gz$"] + +fields: + service.name: "foo" +fields_under_root: true diff --git a/filebeat/_meta/test/module/foo/multi/ingest/json_logs.json b/filebeat/_meta/test/module/foo/multi/ingest/json_logs.json new file mode 100644 index 00000000000..183f11b881c --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/ingest/json_logs.json @@ -0,0 +1,10 @@ +{ + "processors": [ + { + "rename": { + "field": "json", + "target_field": "log.meta" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multi/ingest/pipeline.json b/filebeat/_meta/test/module/foo/multi/ingest/pipeline.json new file mode 100644 index 00000000000..3197d1d1731 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/ingest/pipeline.json @@ -0,0 +1,27 @@ +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{CHAR:first_char}" + ], + "pattern_definitions": { + "CHAR": "." 
+ } + } + }, + { + "pipeline": { + "if": "ctx.first_char == '{'", + "name": "{< IngestPipeline "json_logs" >}" + } + }, + { + "pipeline": { + "if": "ctx.first_char != '{'", + "name": "{< IngestPipeline "plain_logs" >}" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json b/filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json new file mode 100644 index 00000000000..c6547f342d9 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json @@ -0,0 +1,12 @@ +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{DATA:some_data}" + ] + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multi/manifest.yml b/filebeat/_meta/test/module/foo/multi/manifest.yml new file mode 100644 index 00000000000..aa961b73579 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/manifest.yml @@ -0,0 +1,8 @@ +module_version: 1.0 + +ingest_pipeline: + - ingest/pipeline.json + - ingest/json_logs.json + - ingest/plain_logs.json + +input: config/multi.yml diff --git a/filebeat/_meta/test/module/foo/multibad/config/multi.yml b/filebeat/_meta/test/module/foo/multibad/config/multi.yml new file mode 100644 index 00000000000..00ffdbc7204 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/config/multi.yml @@ -0,0 +1,8 @@ +type: log +paths: + - /tmp +exclude_files: [".gz$"] + +fields: + service.name: "foo" +fields_under_root: true diff --git a/filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json b/filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json new file mode 100644 index 00000000000..183f11b881c --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json @@ -0,0 +1,10 @@ +{ + "processors": [ + { + "rename": { + "field": "json", + "target_field": "log.meta" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json b/filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json new file mode 100644 index 00000000000..3197d1d1731 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json @@ -0,0 +1,27 @@ +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{CHAR:first_char}" + ], + "pattern_definitions": { + "CHAR": "." 
+ } + } + }, + { + "pipeline": { + "if": "ctx.first_char == '{'", + "name": "{< IngestPipeline "json_logs" >}" + } + }, + { + "pipeline": { + "if": "ctx.first_char != '{'", + "name": "{< IngestPipeline "plain_logs" >}" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json b/filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json new file mode 100644 index 00000000000..bc3ea8d3fa5 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json @@ -0,0 +1,12 @@ +{ + "processors": [ + { + "invalid_processor": { + "field": "message", + "patterns": [ + "^%{DATA:some_data}" + ] + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multibad/manifest.yml b/filebeat/_meta/test/module/foo/multibad/manifest.yml new file mode 100644 index 00000000000..c3ae496cba4 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/manifest.yml @@ -0,0 +1,8 @@ +module_version: 1.0 + +ingest_pipeline: + - ingest/pipeline.json + - ingest/json_logs.json + - ingest/plain_logs_bad.json + +input: config/multi.yml diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index 59a30b7e17c..6b0dc51fed9 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -34,6 +34,8 @@ import ( "strings" "text/template" + errw "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" @@ -42,13 +44,18 @@ import ( // Fileset struct is the representation of a fileset. type Fileset struct { - name string - mcfg *ModuleConfig - fcfg *FilesetConfig - modulePath string - manifest *manifest - vars map[string]interface{} - pipelineID string + name string + mcfg *ModuleConfig + fcfg *FilesetConfig + modulePath string + manifest *manifest + vars map[string]interface{} + pipelineIDs []string +} + +type pipeline struct { + id string + contents map[string]interface{} } // New allocates a new Fileset object with the given configuration. @@ -84,12 +91,12 @@ func (fs *Fileset) Read(beatVersion string) error { return err } - fs.vars, err = fs.evaluateVars() + fs.vars, err = fs.evaluateVars(beatVersion) if err != nil { return err } - fs.pipelineID, err = fs.getPipelineID(beatVersion) + fs.pipelineIDs, err = fs.getPipelineIDs(beatVersion) if err != nil { return err } @@ -102,7 +109,7 @@ func (fs *Fileset) Read(beatVersion string) error { type manifest struct { ModuleVersion string `config:"module_version"` Vars []map[string]interface{} `config:"var"` - IngestPipeline string `config:"ingest_pipeline"` + IngestPipeline []string `config:"ingest_pipeline"` Input string `config:"input"` MachineLearning []struct { Name string `config:"name"` @@ -150,10 +157,10 @@ func (fs *Fileset) readManifest() (*manifest, error) { } // evaluateVars resolves the fileset variables. 
-func (fs *Fileset) evaluateVars() (map[string]interface{}, error) { +func (fs *Fileset) evaluateVars(beatVersion string) (map[string]interface{}, error) { var err error vars := map[string]interface{}{} - vars["builtin"], err = fs.getBuiltinVars() + vars["builtin"], err = fs.getBuiltinVars(beatVersion) if err != nil { return nil, err } @@ -264,7 +271,14 @@ func applyTemplate(vars map[string]interface{}, templateString string, specialDe if specialDelims { tpl = tpl.Delims("{<", ">}") } - tpl, err := tpl.Parse(templateString) + + tplFunctions, err := getTemplateFunctions(vars) + if err != nil { + return "", errw.Wrap(err, "error fetching template functions") + } + tpl = tpl.Funcs(tplFunctions) + + tpl, err = tpl.Parse(templateString) if err != nil { return "", fmt.Errorf("Error parsing template %s: %v", templateString, err) } @@ -276,9 +290,27 @@ func applyTemplate(vars map[string]interface{}, templateString string, specialDe return buf.String(), nil } +func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error) { + builtinVars, ok := vars["builtin"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("error fetching built-in vars as a dictionary") + } + + return template.FuncMap{ + "IngestPipeline": func(shortID string) string { + return formatPipelineID( + builtinVars["module"].(string), + builtinVars["fileset"].(string), + shortID, + builtinVars["beatVersion"].(string), + ) + }, + }, nil +} + // getBuiltinVars computes the supported built in variables and groups them // in a dictionary -func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) { +func (fs *Fileset) getBuiltinVars(beatVersion string) (map[string]interface{}, error) { host, err := os.Hostname() if err != nil || len(host) == 0 { return nil, fmt.Errorf("Error getting the hostname: %v", err) @@ -291,8 +323,11 @@ func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) { } return map[string]interface{}{ - "hostname": hostname, - "domain": domain, + "hostname": hostname, + "domain": domain, + "module": fs.mcfg.Module, + "fileset": fs.name, + "beatVersion": beatVersion, }, nil } @@ -329,7 +364,11 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { } // force our pipeline ID - err = cfg.SetString("pipeline", -1, fs.pipelineID) + rootPipelineID := "" + if len(fs.pipelineIDs) > 0 { + rootPipelineID = fs.pipelineIDs[0] + } + err = cfg.SetString("pipeline", -1, rootPipelineID) if err != nil { return nil, fmt.Errorf("Error setting the pipeline ID in the input config: %v", err) } @@ -349,43 +388,60 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { return cfg, nil } -// getPipelineID returns the Ingest Node pipeline ID -func (fs *Fileset) getPipelineID(beatVersion string) (string, error) { - path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false) - if err != nil { - return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) +// getPipelineIDs returns the Ingest Node pipeline IDs +func (fs *Fileset) getPipelineIDs(beatVersion string) ([]string, error) { + var pipelineIDs []string + for _, ingestPipeline := range fs.manifest.IngestPipeline { + path, err := applyTemplate(fs.vars, ingestPipeline, false) + if err != nil { + return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) + } + + pipelineIDs = append(pipelineIDs, formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion)) } - return formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion), nil + return pipelineIDs, nil } -// GetPipeline 
returns the JSON content of the Ingest Node pipeline that parses the logs. -func (fs *Fileset) GetPipeline(esVersion common.Version) (pipelineID string, content map[string]interface{}, err error) { - path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false) +// GetPipelines returns the JSON content of the Ingest Node pipeline that parses the logs. +func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline, err error) { + vars, err := fs.turnOffElasticsearchVars(fs.vars, esVersion) if err != nil { - return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) + return nil, err } - strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path)) - if err != nil { - return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err) - } + for idx, ingestPipeline := range fs.manifest.IngestPipeline { + path, err := applyTemplate(fs.vars, ingestPipeline, false) + if err != nil { + return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) + } - vars, err := fs.turnOffElasticsearchVars(fs.vars, esVersion) - if err != nil { - return "", nil, err - } + strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path)) + if err != nil { + return nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err) + } - jsonString, err := applyTemplate(vars, string(strContents), true) - if err != nil { - return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err) - } + jsonString, err := applyTemplate(vars, string(strContents), true) + if err != nil { + return nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err) + } - err = json.Unmarshal([]byte(jsonString), &content) - if err != nil { - return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) + var content map[string]interface{} + err = json.Unmarshal([]byte(jsonString), &content) + if err != nil { + return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) + } + + pipelineID := fs.pipelineIDs[idx] + + p := pipeline{ + pipelineID, + content, + } + pipelines = append(pipelines, p) } - return fs.pipelineID, content, nil + + return pipelines, nil } // formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 8435532efa7..f3b398c8612 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -25,6 +25,7 @@ import ( "path/filepath" "runtime" "testing" + "text/template" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -48,7 +49,7 @@ func TestLoadManifestNginx(t *testing.T) { manifest, err := fs.readManifest() assert.NoError(t, err) assert.Equal(t, manifest.ModuleVersion, "1.0") - assert.Equal(t, manifest.IngestPipeline, "ingest/default.json") + assert.Equal(t, manifest.IngestPipeline, []string{"ingest/default.json"}) assert.Equal(t, manifest.Input, "config/nginx-access.yml") vars := manifest.Vars @@ -60,11 +61,14 @@ func TestLoadManifestNginx(t *testing.T) { func TestGetBuiltinVars(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - vars, err := fs.getBuiltinVars() + vars, err := fs.getBuiltinVars("6.6.0") assert.NoError(t, err) assert.IsType(t, vars["hostname"], "a-mac-with-esc-key") assert.IsType(t, vars["domain"], "local") + assert.Equal(t, "nginx", vars["module"]) + assert.Equal(t, "access", vars["fileset"]) + assert.Equal(t, "6.6.0", 
vars["beatVersion"]) } func TestEvaluateVarsNginx(t *testing.T) { @@ -74,7 +78,7 @@ func TestEvaluateVarsNginx(t *testing.T) { fs.manifest, err = fs.readManifest() assert.NoError(t, err) - vars, err := fs.evaluateVars() + vars, err := fs.evaluateVars("6.6.0") assert.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) @@ -97,7 +101,7 @@ func TestEvaluateVarsNginxOverride(t *testing.T) { fs.manifest, err = fs.readManifest() assert.NoError(t, err) - vars, err := fs.evaluateVars() + vars, err := fs.evaluateVars("6.6.0") assert.NoError(t, err) assert.Equal(t, "no_plugins", vars["pipeline"]) @@ -110,7 +114,7 @@ func TestEvaluateVarsMySQL(t *testing.T) { fs.manifest, err = fs.readManifest() assert.NoError(t, err) - vars, err := fs.evaluateVars() + vars, err := fs.evaluateVars("6.6.0") assert.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) @@ -144,14 +148,16 @@ func TestResolveVariable(t *testing.T) { { Value: "test-{{.value}}", Vars: map[string]interface{}{ - "value": 2, + "value": 2, + "builtin": map[string]interface{}{}, }, Expected: "test-2", }, { Value: []interface{}{"test-{{.value}}", "test1-{{.value}}"}, Vars: map[string]interface{}{ - "value": 2, + "value": 2, + "builtin": map[string]interface{}{}, }, Expected: []interface{}{"test-2", "test1-2"}, }, @@ -216,11 +222,14 @@ func TestGetPipelineNginx(t *testing.T) { assert.NoError(t, fs.Read("5.2.0")) version := common.MustNewVersion("5.2.0") - pipelineID, content, err := fs.GetPipeline(*version) + pipelines, err := fs.GetPipelines(*version) assert.NoError(t, err) - assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID) - assert.Contains(t, content, "description") - assert.Contains(t, content, "processors") + assert.Len(t, pipelines, 1) + + pipeline := pipelines[0] + assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipeline.id) + assert.Contains(t, pipeline.contents, "description") + assert.Contains(t, pipeline.contents, "processors") } func TestGetPipelineConvertTS(t *testing.T) { @@ -251,11 +260,13 @@ func TestGetPipelineConvertTS(t *testing.T) { t.Run(fmt.Sprintf("es=%v", esVersion), func(t *testing.T) { ver := common.MustNewVersion(esVersion) - pipelineID, content, err := fs.GetPipeline(*ver) + pipelines, err := fs.GetPipelines(*ver) require.NoError(t, err) - assert.Equal(t, pipelineName, pipelineID) - marshaled, err := json.Marshal(content) + pipeline := pipelines[0] + assert.Equal(t, pipelineName, pipeline.id) + + marshaled, err := json.Marshal(pipeline.contents) require.NoError(t, err) if cfg.Timezone { assert.Contains(t, string(marshaled), "beat.timezone") @@ -265,3 +276,14 @@ func TestGetPipelineConvertTS(t *testing.T) { }) } } + +func TestGetTemplateFunctions(t *testing.T) { + vars := map[string]interface{}{ + "builtin": map[string]interface{}{}, + } + templateFunctions, err := getTemplateFunctions(vars) + assert.NoError(t, err) + assert.IsType(t, template.FuncMap{}, templateFunctions) + assert.Len(t, templateFunctions, 1) + assert.Contains(t, templateFunctions, "IngestPipeline") +} diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 45baabdc354..c806ec2c7b7 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -143,3 +143,97 @@ func hasIngest(client *elasticsearch.Client) bool { v := client.GetVersion() return v.Major >= 5 } + +func hasIngestPipelineProcessor(client *elasticsearch.Client) bool { + v := client.GetVersion() + return v.Major > 6 || (v.Major == 6 && 
v.Minor >= 5) +} + +func TestLoadMultiplePipelines(t *testing.T) { + client := estest.GetTestingElasticsearch(t) + if !hasIngest(client) { + t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion()) + } + + if !hasIngestPipelineProcessor(client) { + t.Skip("Skip tests because ingest is missing the pipeline processor: ", client.GetVersion()) + } + + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-pipeline", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-json_logs", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil) + + modulesPath, err := filepath.Abs("../_meta/test/module") + assert.NoError(t, err) + + enabled := true + disabled := false + filesetConfigs := map[string]*FilesetConfig{ + "multi": &FilesetConfig{Enabled: &enabled}, + "multibad": &FilesetConfig{Enabled: &disabled}, + } + configs := []*ModuleConfig{ + &ModuleConfig{"foo", &enabled, filesetConfigs}, + } + + reg, err := newModuleRegistry(modulesPath, configs, nil, "6.6.0") + if err != nil { + t.Fatal(err) + } + + err = reg.LoadPipelines(client, false) + if err != nil { + t.Fatal(err) + } + + status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-pipeline", "", nil, nil) + assert.Equal(t, 200, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-json_logs", "", nil, nil) + assert.Equal(t, 200, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil) + assert.Equal(t, 200, status) +} + +func TestLoadMultiplePipelinesWithRollback(t *testing.T) { + client := estest.GetTestingElasticsearch(t) + if !hasIngest(client) { + t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion()) + } + + if !hasIngestPipelineProcessor(client) { + t.Skip("Skip tests because ingest is missing the pipeline processor: ", client.GetVersion()) + } + + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-pipeline", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-json_logs", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil) + + modulesPath, err := filepath.Abs("../_meta/test/module") + assert.NoError(t, err) + + enabled := true + disabled := false + filesetConfigs := map[string]*FilesetConfig{ + "multi": &FilesetConfig{Enabled: &disabled}, + "multibad": &FilesetConfig{Enabled: &enabled}, + } + configs := []*ModuleConfig{ + &ModuleConfig{"foo", &enabled, filesetConfigs}, + } + + reg, err := newModuleRegistry(modulesPath, configs, nil, "6.6.0") + if err != nil { + t.Fatal(err) + } + + err = reg.LoadPipelines(client, false) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "invalid_processor") + + status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-pipeline", "", nil, nil) + assert.Equal(t, 404, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-json_logs", "", nil, nil) + assert.Equal(t, 404, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil) + assert.Equal(t, 404, status) +} diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index d7f63bdabad..0b6e853ce22 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ 
-22,6 +22,8 @@ import ( "fmt" "strings" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -51,13 +53,33 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool } } - pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion()) + pipelines, err := fileset.GetPipelines(esClient.GetVersion()) if err != nil { return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) } - err = loadPipeline(esClient, pipelineID, content, overwrite) + + var pipelineIDsLoaded []string + for _, pipeline := range pipelines { + err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite) + if err != nil { + err = fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + break + } + pipelineIDsLoaded = append(pipelineIDsLoaded, pipeline.id) + } + if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + // Rollback pipelines and return errors + // TODO: Instead of attempting to load all pipelines and then rolling back loaded ones when there's an + // error, validate all pipelines before loading any of them. This requires https://github.com/elastic/elasticsearch/issues/35495. + errs := multierror.Errors{err} + for _, pipelineID := range pipelineIDsLoaded { + err = deletePipeline(esClient, pipelineID) + if err != nil { + errs = append(errs, err) + } + } + return errs.Err() } } } @@ -65,7 +87,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool } func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool) error { - path := "/_ingest/pipeline/" + pipelineID + path := makeIngestPipelinePath(pipelineID) if !overwrite { status, _, _ := esClient.Request("GET", path, "", nil, nil) if status == 200 { @@ -81,6 +103,16 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } +func deletePipeline(esClient PipelineLoader, pipelineID string) error { + path := makeIngestPipelinePath(pipelineID) + _, _, err := esClient.Request("DELETE", path, "", nil, nil) + return err +} + +func makeIngestPipelinePath(pipelineID string) string { + return "/_ingest/pipeline/" + pipelineID +} + func interpretError(initialErr error, body []byte) error { var response struct { Error struct {