
Accept multiple ingest pipelines in Filebeat #8914

Merged
1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
@@ -72,3 +72,4 @@ The list below covers the major changes between 6.3.0 and 7.0.0-alpha2 only.
- Simplified exporting of dashboards. {pull}7730[7730]
- Update Beats to use go 1.11.2 {pull}8746[8746]
- Allow/Merge fields.yml overrides {pull}9188[9188]
- Filesets can now define multiple ingest pipelines, with the first one acting as the entry point pipeline. {pull}8914[8914]
75 changes: 75 additions & 0 deletions docs/devguide/modules-dev-guide.asciidoc
@@ -229,6 +229,24 @@ This example selects the ingest pipeline file based on the value of the
resolve to `ingest/with_plugins.json` (assuming the variable value isn't
overridden at runtime).

In 6.6 and later, you can specify multiple ingest pipelines.

[source,yaml]
----
ingest_pipeline:
- ingest/main.json
- ingest/plain_logs.json
- ingest/json_logs.json
----

When multiple ingest pipelines are specified, the first one in the list is
considered to be the entry point pipeline.

One reason for using multiple pipelines might be to send all logs harvested
by this fileset to the entry point pipeline and have it delegate different parts of
the processing to other pipelines. You can read details about setting
this up in <<ingest-json-entry-point-pipeline, the `ingest/*.json` section>>.

[float]
==== config/*.yml

@@ -336,6 +354,63 @@ Note that you should follow the convention of naming fields prefixed with the
module and fileset name: `{module}.{fileset}.field`, e.g.
`nginx.access.remote_ip`. Also, please review our <<event-conventions>>.

[[ingest-json-entry-point-pipeline]]
In 6.6 and later, ingest pipelines can use the
{ref}/conditionals-with-multiple-pipelines.html[`pipeline` processor] to delegate
parts of the processing to other pipelines.

This can be useful if you want a fileset to ingest the same _logical_ information
presented in different formats, e.g. CSV vs. JSON versions of the same log files.
Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally
delegates further processing of that log entry, depending on the format, to another
pipeline.

["source","json",subs="callouts"]
----
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "^%{CHAR:first_char}"
        ],
        "pattern_definitions": {
          "CHAR": "."
        }
      }
    },
    {
      "pipeline": {
        "if": "ctx.first_char == '{'",
        "name": "{< IngestPipeline "json-log-processing-pipeline" >}" <1>
      }
    },
    {
      "pipeline": {
        "if": "ctx.first_char != '{'",
        "name": "{< IngestPipeline "plain-log-processing-pipeline" >}"
      }
    }
  ]
}
----
<1> Use the `IngestPipeline` template function to resolve the name. This function converts the
specified name into the fully qualified pipeline ID that is stored in Elasticsearch.
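
For illustration only (the exact ID format is an implementation detail and may
vary between Beat versions), a pipeline defined in `json_logs.json` for the
`multi` fileset of a `foo` module might be registered in Elasticsearch under an
ID along the lines of:

----
filebeat-6.6.0-foo-multi-json_logs
----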

For the above pipeline to work, Filebeat must load the entry point pipeline
as well as any sub-pipelines into Elasticsearch. You can tell Filebeat to do
so by specifying all the necessary pipelines for the fileset in its `manifest.yml`
file. The first pipeline in the list is considered to be the entry point pipeline.

[source,yaml]
----
ingest_pipeline:
- ingest/main.json
- ingest/plain_logs.json
- ingest/json_logs.json
----

While developing the pipeline definition, we recommend using the
{elasticsearch}/simulate-pipeline-api.html[Simulate Pipeline API] for testing
and quick iteration.
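
For example, a minimal simulation request for the grok step of the entry point
pipeline, with a hypothetical JSON log line as input, might look like this (the
pipeline body is inlined, so nothing needs to be installed first):

[source,json]
----
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["^%{CHAR:first_char}"],
          "pattern_definitions": { "CHAR": "." }
        }
      }
    ]
  },
  "docs": [
    { "_source": { "message": "{\"level\":\"info\",\"msg\":\"hello\"}" } }
  ]
}
----

The response contains the transformed documents, so you can confirm that
`first_char` is extracted as expected before adding the conditional `pipeline`
processors.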
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/_meta/config.yml
@@ -0,0 +1,8 @@
- module: foo
  # Fileset with multiple pipelines
  multi:
    enabled: true

  # Fileset with multiple pipelines with the last one being bad
  multibad:
    enabled: true
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multi/config/multi.yml
@@ -0,0 +1,8 @@
type: log
paths:
  - /tmp
exclude_files: [".gz$"]

fields:
  service.name: "foo"
fields_under_root: true
10 changes: 10 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/json_logs.json
@@ -0,0 +1,10 @@
{
  "processors": [
    {
      "rename": {
        "field": "json",
        "target_field": "log.meta"
      }
    }
  ]
}
27 changes: 27 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/pipeline.json
@@ -0,0 +1,27 @@
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "^%{CHAR:first_char}"
        ],
        "pattern_definitions": {
          "CHAR": "."
        }
      }
    },
    {
      "pipeline": {
        "if": "ctx.first_char == '{'",
        "name": "{< IngestPipeline "json_logs" >}"
      }
    },
    {
      "pipeline": {
        "if": "ctx.first_char != '{'",
        "name": "{< IngestPipeline "plain_logs" >}"
      }
    }
  ]
}
12 changes: 12 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json
@@ -0,0 +1,12 @@
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "^%{DATA:some_data}"
        ]
      }
    }
  ]
}
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multi/manifest.yml
@@ -0,0 +1,8 @@
module_version: 1.0

ingest_pipeline:
- ingest/pipeline.json
- ingest/json_logs.json
- ingest/plain_logs.json

input: config/multi.yml
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/config/multi.yml
@@ -0,0 +1,8 @@
type: log
paths:
  - /tmp
exclude_files: [".gz$"]

fields:
  service.name: "foo"
fields_under_root: true
10 changes: 10 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json
@@ -0,0 +1,10 @@
{
  "processors": [
    {
      "rename": {
        "field": "json",
        "target_field": "log.meta"
      }
    }
  ]
}
27 changes: 27 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json
@@ -0,0 +1,27 @@
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "^%{CHAR:first_char}"
        ],
        "pattern_definitions": {
          "CHAR": "."
        }
      }
    },
    {
      "pipeline": {
        "if": "ctx.first_char == '{'",
        "name": "{< IngestPipeline "json_logs" >}"
      }
    },
    {
      "pipeline": {
        "if": "ctx.first_char != '{'",
        "name": "{< IngestPipeline "plain_logs" >}"
      }
    }
  ]
}
12 changes: 12 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json
@@ -0,0 +1,12 @@
{
  "processors": [
    {
      "invalid_processor": {
        "field": "message",
        "patterns": [
          "^%{DATA:some_data}"
        ]
      }
    }
  ]
}
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/manifest.yml
@@ -0,0 +1,8 @@
module_version: 1.0

ingest_pipeline:
- ingest/pipeline.json
- ingest/json_logs.json
- ingest/plain_logs_bad.json

input: config/multi.yml