From c55226e041e7cbf23e6d4a282d389008a78a3351 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 12 Jan 2019 20:58:52 -0800 Subject: [PATCH] Emit error if fileset with multiple pipelines is being used with ES < 6.5 (#10001) Follow up to https://github.com/elastic/beats/pull/8914. In #8914, we introduced the ability for Filebeat filesets to have multiple Ingest pipelines, the first one being the entry point. This feature relies on the Elasticsearch Ingest node having a `pipeline` processor and `if` conditions for processors, both of which were introduced in Elasticsearch 6.5.0. This PR implements a check for whether a fileset has multiple Ingest pipelines AND is talking to an Elasticsearch cluster < 6.5.0. If that's the case, we emit an error. --- CHANGELOG.next.asciidoc | 1 + filebeat/fileset/pipelines.go | 26 +++++ filebeat/fileset/pipelines_test.go | 105 ++++++++++++++++++ .../fileset/test/mod/fls/pipeline-json.json | 1 + .../fileset/test/mod/fls/pipeline-plain.json | 1 + filebeat/fileset/test/pipeline-plain.json | 0 6 files changed, 134 insertions(+) create mode 100644 filebeat/fileset/pipelines_test.go create mode 100644 filebeat/fileset/test/mod/fls/pipeline-json.json create mode 100644 filebeat/fileset/test/mod/fls/pipeline-plain.json create mode 100644 filebeat/fileset/test/pipeline-plain.json diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8600f545e6f..77355006d6e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Rename a few `logstash.*` fields to map to ECS, remove logstash.slowlog.message. {pull}9935[9935] - Rename a few `mysql.*` fields to map to ECS. {pull}10008[10008] - Rename a few `nginx.error.*` fields to map to ECS. {pull}10007[10007] +- Filesets with multiple ingest pipelines added in {pull}8914[8914] only work with Elasticsearch >= 6.5.0 {pull}10001[10001] *Heartbeat* diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 0b6e853ce22..07e520d64f8 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -39,6 +39,25 @@ type PipelineLoader interface { GetVersion() common.Version } +// MultiplePipelineUnsupportedError is an error returned when a fileset uses multiple pipelines but is +// running against a version of Elasticsearch that doesn't support this feature. +type MultiplePipelineUnsupportedError struct { + module string + fileset string + esVersion common.Version + minESVersionRequired common.Version +} + +func (m MultiplePipelineUnsupportedError) Error() string { + return fmt.Sprintf( + "the %s/%s fileset has multiple pipelines, which are only supported with Elasticsearch >= %s. Currently running with Elasticsearch version %s", + m.module, + m.fileset, + m.minESVersionRequired.String(), + m.esVersion.String(), + ) +} + // LoadPipelines loads the pipelines for each configured fileset. func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool) error { for module, filesets := range reg.registry { @@ -58,6 +77,13 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) } + // Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0 + esVersion := esClient.GetVersion() + minESVersionRequired := common.MustNewVersion("6.5.0") + if len(pipelines) > 1 && esVersion.LessThan(minESVersionRequired) { + return MultiplePipelineUnsupportedError{module, name, esVersion, *minESVersionRequired} + } + var pipelineIDsLoaded []string for _, pipeline := range pipelines { err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go new file mode 100644 index 00000000000..194df5e9f14 --- /dev/null +++ b/filebeat/fileset/pipelines_test.go @@ -0,0 +1,105 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package fileset + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/outputs/elasticsearch" +) + +func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { + cases := []struct { + name string + esVersion string + isErrExpected bool + }{ + { + name: "ES < 6.5.0", + esVersion: "6.4.1", + isErrExpected: true, + }, + { + name: "ES == 6.5.0", + esVersion: "6.5.0", + isErrExpected: false, + }, + { + name: "ES > 6.5.0", + esVersion: "6.6.0", + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + testFilesetManifest := &manifest{ + Requires: struct { + Processors []ProcessorRequirement `config:"processors"` + }{ + Processors: []ProcessorRequirement{}, + }, + IngestPipeline: []string{"pipeline-plain.json", "pipeline-json.json"}, + } + testFileset := &Fileset{ + name: "fls", + modulePath: "./test/mod", + manifest: testFilesetManifest, + vars: map[string]interface{}{ + "builtin": map[string]interface{}{}, + }, + pipelineIDs: []string{"filebeat-7.0.0-mod-fls-pipeline-plain", "filebeat-7.0.0-mod-fls-pipeline-json"}, + } + testRegistry := ModuleRegistry{ + registry: map[string]map[string]*Fileset{ + "mod": map[string]*Fileset{ + "fls": testFileset, + }, + }, + } + + testESServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("{\"version\":{\"number\":\"" + test.esVersion + "\"}}")) + })) + defer testESServer.Close() + + testESClient, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ + URL: testESServer.URL, + }, nil) + assert.NoError(t, err) + + err = testESClient.Connect() + assert.NoError(t, err) + + err = testRegistry.LoadPipelines(testESClient, false) + if test.isErrExpected { + assert.IsType(t, MultiplePipelineUnsupportedError{}, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/filebeat/fileset/test/mod/fls/pipeline-json.json b/filebeat/fileset/test/mod/fls/pipeline-json.json new file mode 100644 index 00000000000..0967ef424bc --- /dev/null +++ b/filebeat/fileset/test/mod/fls/pipeline-json.json @@ -0,0 +1 @@ +{} diff --git a/filebeat/fileset/test/mod/fls/pipeline-plain.json b/filebeat/fileset/test/mod/fls/pipeline-plain.json new file mode 100644 index 00000000000..0967ef424bc --- /dev/null +++ b/filebeat/fileset/test/mod/fls/pipeline-plain.json @@ -0,0 +1 @@ +{} diff --git a/filebeat/fileset/test/pipeline-plain.json b/filebeat/fileset/test/pipeline-plain.json new file mode 100644 index 00000000000..e69de29bb2d