Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New decode_csv_fields processor #11753

Merged
merged 17 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (

// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/libbeat/processors/decode_csv_fields"
)

const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" +
Expand Down
55 changes: 53 additions & 2 deletions filebeat/tests/system/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,57 @@ def test_truncate_characters(self):
u"This is OK",
])

def test_decode_csv_fields_defaults(self):
"""
Check CSV decoding using defaults
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
processors=[{
"decode_csv_fields": {
"fields": {
"message": "csv"
}
},
}]
)

self._init_and_read_test_input([
u"42,\"string with \"\"quotes\"\"\"\n",
u",\n"
])

self._assert_expected_lines([
["42", "string with \"quotes\""],
["", ""]
], field="csv")

def test_decode_csv_fields_all_options(self):
"""
Check CSV decoding with options
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
processors=[{
"decode_csv_fields": {
"fields": {
"message": "message"
},
"overwrite_keys": True,
"separator": "\"\t\"",
"trim_leading_space": True,
},
}]
)

self._init_and_read_test_input([
u" 42\t hello world\t \"string\twith tabs and \"broken\" quotes\"\n",
])

self._assert_expected_lines([
["42", "hello world", "string\twith tabs and \"broken\" quotes"],
])

def _init_and_read_test_input(self, input_lines):
with io.open(self.working_dir + "/test.log", "w", encoding="utf-8") as f:
for line in input_lines:
Expand All @@ -258,10 +309,10 @@ def _init_and_read_test_input(self, input_lines):
self.wait_until(lambda: self.output_has(lines=len(input_lines)))
filebeat.check_kill_and_wait()

def _assert_expected_lines(self, expected_lines):
def _assert_expected_lines(self, expected_lines, field="message"):
output = self.read_output()

assert len(output) == len(expected_lines)

for i in range(len(expected_lines)):
assert output[i]["message"] == expected_lines[i]
assert output[i][field] == expected_lines[i]
3 changes: 3 additions & 0 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (

"github.com/elastic/beats/journalbeat/config"
_ "github.com/elastic/beats/journalbeat/include"

// Add dedicated processors
_ "github.com/elastic/beats/libbeat/processors/decode_csv_fields"
)

// Journalbeat instance
Expand Down
48 changes: 48 additions & 0 deletions libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ The supported processors are:
* <<add-process-metadata,`add_process_metadata`>>
* <<add-tags, `add_tags`>>
* <<community-id,`community_id`>>
ifeval::[("{beatname_lc}"=="filebeat") or ("{beatname_lc}"=="journalbeat")]
* <<decode-csv-fields,`decode_csv_fields`>>
endif::[]
* <<decode-json-fields,`decode_json_fields`>>
* <<dissect, `dissect`>>
* <<processor-dns, `dns`>>
Expand Down Expand Up @@ -775,6 +778,51 @@ Adds the environment field to every event:
}
-------------------------------------------------------------------------------

ifeval::[("{beatname_lc}"=="filebeat") or ("{beatname_lc}"=="journalbeat")]
* <<decode-csv-fields,`decode_csv_fields`>>
[[decode-csv-fields]]
=== Decode CSV fields

experimental[]

The `decode_csv_fields` processor decodes fields containing records in
comma-separated format (CSV). It will output the values as an array of strings.
This processor is only available for Filebeat and Journalbeat.
adriansr marked this conversation as resolved.
Show resolved Hide resolved

[source,yaml]
-----------------------------------------------------
processors:
- decode_csv_fields:
fields:
message: decoded.csv
separator: ,
ignore_missing: false
overwrite_keys: true
trim_leading_whitespace: false
fail_on_error: true
-----------------------------------------------------

The `decode_csv_fields` has the following settings:

`fields`:: This is a mapping from the source field containing the CSV data to
the destination field where the decoded array will be written.
`separator`:: (Optional) Character to be used as a column separator.
The default is the comma character. For using a TAB character you
must set it to "\t".
`ignore_missing`:: (Optional) Whether to ignore events which lack the source
field. The default is false, which will fail processing of an
event if `target` field is missing.
`overwrite_keys`:: Whether the target field is overwritten if it
already exists. The default is false, which will fail
processing of an event when `target` already exists.
`trim_leading_space`:: Whether extra space after the separator is trimmed from
values. This works even if the separator is also a space.
The default is false.
`fail_on_error`:: (Optional) If set to true, in case of an error the changes to
the event are reverted and the original event is returned. If set to false,
processing continues also if an error happened during renaming. Default is `true`.

endif::[]

[[decode-json-fields]]
=== Decode JSON fields
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/add_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type addFields struct {
Expand All @@ -36,9 +37,9 @@ const FieldsKey = "fields"

func init() {
processors.RegisterPlugin("add_fields",
configChecked(CreateAddFields,
requireFields(FieldsKey),
allowedFields(FieldsKey, "target", "when")))
checks.ConfigChecked(CreateAddFields,
checks.RequireFields(FieldsKey),
checks.AllowedFields(FieldsKey, "target", "when")))
}

// CreateAddFields constructs an add_fields processor from config.
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/add_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

// LabelsKey is the default target key for the add_labels processor.
const LabelsKey = "labels"

func init() {
processors.RegisterPlugin("add_labels",
configChecked(createAddLabels,
requireFields(LabelsKey),
allowedFields(LabelsKey, "when")))
checks.ConfigChecked(createAddLabels,
checks.RequireFields(LabelsKey),
checks.AllowedFields(LabelsKey, "when")))
}

func createAddLabels(c *common.Config) (processors.Processor, error) {
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/add_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type addTags struct {
Expand All @@ -33,9 +34,9 @@ type addTags struct {

func init() {
processors.RegisterPlugin("add_tags",
configChecked(createAddTags,
requireFields("tags"),
allowedFields("tags", "target", "when")))
checks.ConfigChecked(createAddTags,
checks.RequireFields("tags"),
checks.AllowedFields("tags", "target", "when")))
}

func createAddTags(c *common.Config) (processors.Processor, error) {
Expand Down
5 changes: 3 additions & 2 deletions libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type copyFields struct {
Expand All @@ -40,8 +41,8 @@ type copyFieldsConfig struct {

func init() {
processors.RegisterPlugin("copy_fields",
configChecked(NewCopyFields,
requireFields("fields"),
checks.ConfigChecked(NewCopyFields,
checks.RequireFields("fields"),
),
)
}
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type decodeJSONFields struct {
Expand Down Expand Up @@ -60,9 +61,9 @@ var debug = logp.MakeDebug("filters")

func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(NewDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when")))
checks.ConfigChecked(NewDecodeJSONFields,
checks.RequireFields("fields"),
checks.AllowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when")))
}

// NewDecodeJSONFields construct a new decode_json_fields processor.
Expand Down
3 changes: 2 additions & 1 deletion libbeat/processors/actions/drop_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type dropEvent struct{}

func init() {
processors.RegisterPlugin("drop_event",
configChecked(newDropEvent, allowedFields("when")))
checks.ConfigChecked(newDropEvent, checks.AllowedFields("when")))
}

var dropEventsSingleton = (*dropEvent)(nil)
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/drop_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type dropFields struct {
Expand All @@ -32,9 +33,9 @@ type dropFields struct {

func init() {
processors.RegisterPlugin("drop_fields",
configChecked(newDropFields,
requireFields("fields"),
allowedFields("fields", "when")))
checks.ConfigChecked(newDropFields,
checks.RequireFields("fields"),
checks.AllowedFields("fields", "when")))
}

func newDropFields(c *common.Config) (processors.Processor, error) {
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/include_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type includeFields struct {
Expand All @@ -34,9 +35,9 @@ type includeFields struct {

func init() {
processors.RegisterPlugin("include_fields",
configChecked(newIncludeFields,
requireFields("fields"),
allowedFields("fields", "when")))
checks.ConfigChecked(newIncludeFields,
checks.RequireFields("fields"),
checks.AllowedFields("fields", "when")))
}

func newIncludeFields(c *common.Config) (processors.Processor, error) {
Expand Down
5 changes: 3 additions & 2 deletions libbeat/processors/actions/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type renameFields struct {
Expand All @@ -45,8 +46,8 @@ type fromTo struct {

func init() {
processors.RegisterPlugin("rename",
configChecked(NewRenameFields,
requireFields("fields")))
checks.ConfigChecked(NewRenameFields,
checks.RequireFields("fields")))
}

// NewRenameFields returns a new rename processor.
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/truncate_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type truncateFieldsConfig struct {
Expand All @@ -48,9 +49,9 @@ type truncater func(*truncateFields, []byte) ([]byte, bool, error)

func init() {
processors.RegisterPlugin("truncate_fields",
configChecked(NewTruncateFields,
requireFields("fields"),
mutuallyExclusiveRequiredFields("max_bytes", "max_characters"),
checks.ConfigChecked(NewTruncateFields,
checks.RequireFields("fields"),
checks.MutuallyExclusiveRequiredFields("max_bytes", "max_characters"),
),
)
}
Expand Down
Loading