Skip to content

Commit

Permalink
Accept multiple ingest pipelines in Filebeat (#8914)
Browse files Browse the repository at this point in the history
Motivated by #8852 (comment).

Starting with 6.5.0, Elasticsearch Ingest Pipelines have gained the ability to:
- run sub-pipelines via the [`pipeline` processor](https://www.elastic.co/guide/en/elasticsearch/reference/6.5/pipeline-processor.html), and
- conditionally run processors via an [`if` field](https://www.elastic.co/guide/en/elasticsearch/reference/6.5/ingest-processors.html).

These abilities combined present the opportunity for a fileset to ingest the same _logical_ information presented in different formats, e.g. plaintext vs. json versions of the same log files. Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally delegates further processing of that log entry, depending on the format, to another pipeline.

This PR allows filesets to specify one or more ingest pipelines via the `ingest_pipeline` property in their `manifest.yml`. If more than one ingest pipeline is specified, the first one is taken to be the entry point ingest pipeline.

#### Example with multiple pipelines
```yaml
ingest_pipeline:
  - pipeline-ze-boss.json 
  - pipeline-plain.json
  - pipeline-json.json
```
#### Example with a single pipeline
_This is just to show that the existing functionality will continue to work as-is._
```yaml
ingest_pipeline: pipeline.json
```

Now, if the root pipeline wants to delegate processing to another pipeline, it must use a `pipeline` processor to do so. This processor's `name` field will need to reference the other pipeline by its name. To ensure correct referencing, the `name` field must be specified as follows:

```json
{
  "pipeline" : {
    "name": "{< IngestPipeline "pipeline-plain" >}"
  }
}
```

This will ensure that the specified name gets correctly converted to the corresponding name in Elasticsearch, since Filebeat prefixes it's "raw" Ingest pipeline names with `filebeat-<version>-<module>-<fileset>-` when loading them into Elasticsearch.
  • Loading branch information
ycombinator committed Dec 27, 2018
1 parent 11a1917 commit 5ba1f11
Show file tree
Hide file tree
Showing 17 changed files with 479 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ The list below covers the major changes between 6.3.0 and 7.0.0-alpha2 only.
- Simplified exporting of dashboards. {pull}7730[7730]
- Update Beats to use go 1.11.2 {pull}8746[8746]
- Allow/Merge fields.yml overrides {pull}9188[9188]
- Filesets can now define multiple ingest pipelines, with the first one considered as the entry point pipeline. {pull}8914[8914]
75 changes: 75 additions & 0 deletions docs/devguide/modules-dev-guide.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,24 @@ This example selects the ingest pipeline file based on the value of the
resolve to `ingest/with_plugins.json` (assuming the variable value isn't
overridden at runtime.)

In 6.6 and later, you can specify multiple ingest pipelines.

[source,yaml]
----
ingest_pipeline:
- ingest/main.json
- ingest/plain_logs.json
- ingest/json_logs.json
----

When multiple ingest pipelines are specified the first one in the list is
considered to be the entry point pipeline.

One reason for using multiple pipelines might be to send all logs harvested
by this fileset to the entry point pipeline and have it delegate different parts of
the processing to other pipelines. You can read details about setting
this up in <<ingest-json-entry-point-pipeline, the `ingest/*.json` section>>.

[float]
==== config/*.yml

Expand Down Expand Up @@ -336,6 +354,63 @@ Note that you should follow the convention of naming of fields prefixed with the
module and fileset name: `{module}.{fileset}.field`, e.g.
`nginx.access.remote_ip`. Also, please review our <<event-conventions>>.

[[ingest-json-entry-point-pipeline]]
In 6.6 and later, ingest pipelines can use the
{ref}/conditionals-with-multiple-pipelines.html[`pipeline` processor] to delegate
parts of the processings to other pipelines.

This can be useful if you want a fileset to ingest the same _logical_ information
presented in different formats, e.g. csv vs. json versions of the same log files.
Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally
delegates further processing of that log entry, depending on the format, to another
pipeline.

["source","json",subs="callouts"]
----
{
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^%{CHAR:first_char}"
],
"pattern_definitions": {
"CHAR": "."
}
}
},
{
"pipeline": {
"if": "ctx.first_char == '{'",
"name": "{< IngestPipeline "json-log-processing-pipeline" >}" <1>
}
},
{
"pipeline": {
"if": "ctx.first_char != '{'",
"name": "{< IngestPipeline "plain-log-processing-pipeline" >}"
}
}
]
}
----
<1> Use the `IngestPipeline` template function to resolve the name. This function converts the
specified name into the fully qualified pipeline ID that is stored in Elasticsearch.

In order for the above pipeline to work, Filebeat must load the entry point pipeline
as well as any sub-pipelines into Elasticsearch. You can tell Filebeat to do
so by specifying all the necessary pipelines for the fileset in its `manifest.yml`
file. The first pipeline in the list is considered to be the entry point pipeline.

[source,yaml]
----
ingest_pipeline:
- ingest/main.json
- ingest/plain_logs.json
- ingest/json_logs.json
----

While developing the pipeline definition, we recommend making use of the
{elasticsearch}/simulate-pipeline-api.html[Simulate Pipeline API] for testing
and quick iteration.
Expand Down
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/_meta/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
- module: foo
# Fileset with multiple pipelines
multi:
enabled: true

# Fileset with multiple pipelines with the last one being bad
multibad:
enabled: true
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multi/config/multi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: log
paths:
- /tmp
exclude_files: [".gz$"]

fields:
service.name: "foo"
fields_under_root: true
10 changes: 10 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/json_logs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"processors": [
{
"rename": {
"field": "json",
"target_field": "log.meta"
}
}
]
}
27 changes: 27 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^%{CHAR:first_char}"
],
"pattern_definitions": {
"CHAR": "."
}
}
},
{
"pipeline": {
"if": "ctx.first_char == '{'",
"name": "{< IngestPipeline "json_logs" >}"
}
},
{
"pipeline": {
"if": "ctx.first_char != '{'",
"name": "{< IngestPipeline "plain_logs" >}"
}
}
]
}
12 changes: 12 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^%{DATA:some_data}"
]
}
}
]
}
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multi/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module_version: 1.0

ingest_pipeline:
- ingest/pipeline.json
- ingest/json_logs.json
- ingest/plain_logs.json

input: config/multi.yml
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/config/multi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: log
paths:
- /tmp
exclude_files: [".gz$"]

fields:
service.name: "foo"
fields_under_root: true
10 changes: 10 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"processors": [
{
"rename": {
"field": "json",
"target_field": "log.meta"
}
}
]
}
27 changes: 27 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^%{CHAR:first_char}"
],
"pattern_definitions": {
"CHAR": "."
}
}
},
{
"pipeline": {
"if": "ctx.first_char == '{'",
"name": "{< IngestPipeline "json_logs" >}"
}
},
{
"pipeline": {
"if": "ctx.first_char != '{'",
"name": "{< IngestPipeline "plain_logs" >}"
}
}
]
}
12 changes: 12 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"processors": [
{
"invalid_processor": {
"field": "message",
"patterns": [
"^%{DATA:some_data}"
]
}
}
]
}
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module_version: 1.0

ingest_pipeline:
- ingest/pipeline.json
- ingest/json_logs.json
- ingest/plain_logs_bad.json

input: config/multi.yml
Loading

0 comments on commit 5ba1f11

Please sign in to comment.