Add convert processor #11686

Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}NNNN[NNNN]
- Add `add_observer_metadata` processor. {pull}11394[11394]
- Add `decode_csv_fields` processor. {pull}11753[11753]
- Add `convert` processor for converting data types of fields. {issue}8124[8124] {pull}11686[11686]

*Auditbeat*

1 change: 1 addition & 0 deletions libbeat/cmd/instance/imports.go
@@ -32,6 +32,7 @@ import (
_ "github.com/elastic/beats/libbeat/processors/add_observer_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_process_metadata"
_ "github.com/elastic/beats/libbeat/processors/communityid"
_ "github.com/elastic/beats/libbeat/processors/convert"
_ "github.com/elastic/beats/libbeat/processors/dissect"
_ "github.com/elastic/beats/libbeat/processors/dns"
_ "github.com/elastic/beats/libbeat/publisher/includes" // Register publisher pipeline modules
47 changes: 47 additions & 0 deletions libbeat/docs/processors-using.asciidoc
@@ -209,6 +209,7 @@ The supported processors are:
* <<add-process-metadata,`add_process_metadata`>>
* <<add-tags, `add_tags`>>
* <<community-id,`community_id`>>
* <<convert,`convert`>>
ifdef::has_decode_csv_fields_processor[]
* <<decode-csv-fields,`decode_csv_fields`>>
endif::[]
@@ -907,6 +908,52 @@ silently continue without adding the target field.
The processor also accepts an optional `seed` parameter that must be a 16-bit
unsigned integer. This value gets incorporated into all generated hashes.

[[convert]]
=== Convert

The `convert` processor converts a field in the event to a different type, such
as converting a string to an integer.

The supported types include: `integer`, `long`, `float`, `double`, `string`,
`boolean`, and `ip`.

The `ip` type is effectively an alias for `string`, but with an added validation
that the value is an IPv4 or IPv6 address.
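
The `ip` validation described above can be illustrated with a minimal Go sketch. The helper name `toIP` is hypothetical and is not taken from the processor's source; it only assumes that validation amounts to parsing the string with the standard library's `net.ParseIP` and returning the value unchanged as a string.

```go
package main

import (
	"fmt"
	"net"
)

// toIP is a hypothetical helper: it keeps the value as a string but
// rejects anything that does not parse as an IPv4 or IPv6 address.
func toIP(v string) (string, error) {
	if net.ParseIP(v) == nil {
		return "", fmt.Errorf("value %q is not a valid IPv4 or IPv6 address", v)
	}
	return v, nil
}

func main() {
	for _, v := range []string{"192.168.0.1", "::1", "not-an-ip"} {
		out, err := toIP(v)
		fmt.Printf("in=%q out=%q err=%v\n", v, out, err)
	}
}
```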

[source,yaml]
----
processors:
  - convert:
      fields:
        - {from: "src_ip", to: "source.ip", type: "ip"}
        - {from: "src_port", to: "source.port", type: "integer"}
      ignore_missing: true
      fail_on_error: false

The `convert` processor has the following configuration settings:

`fields`:: (Required) The list of fields to convert. The list must contain at
least one item. Each item must have a `from` key that specifies the source
field. The `to` key is optional and specifies where to assign the converted
value. If `to` is omitted, the `from` field is updated in place. The `type` key
specifies the data type to convert the value to. If `type` is omitted, the
processor copies or renames the field without any type conversion. Each item
must specify at least one of `to` or `type`.

`ignore_missing`:: (Optional) If `true`, the processor continues to the next
field when the `from` key is not found in the event. If `false`, the processor
returns an error and does not process the remaining fields. Default is `false`.

`fail_on_error`:: (Optional) If `false`, type conversion failures are ignored
and the processor continues to the next field. Default is `true`.

`tag`:: (Optional) An identifier for this processor. Useful for debugging.

`mode`:: (Optional) When both `from` and `to` are defined for a field then
`mode` controls whether to `copy` or `rename` the field when the type conversion
is successful. Default is `copy`.
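
The settings above can be sketched as a small per-field loop in Go. This is an illustration of the documented behavior only, not the processor's implementation: `fieldSpec`, `settings`, `convertValue`, and `apply` are hypothetical names, the event is simplified to a flat map with dotted keys, only the `string` and `integer` types are covered, and treating `integer` as a 32-bit value is an assumption. How `ignore_missing` and `fail_on_error` interact for a missing field is not stated in the text, so the sketch follows each description literally.

```go
package main

import (
	"fmt"
	"strconv"
)

// fieldSpec mirrors one item of the `fields` list (hypothetical type).
type fieldSpec struct {
	From, To, Type string
}

// settings mirrors the processor-level options (hypothetical type).
type settings struct {
	IgnoreMissing bool
	FailOnError   bool
	Rename        bool // true for mode "rename", false for mode "copy"
}

// convertValue covers only two types; an empty type means "no conversion".
func convertValue(v interface{}, typ string) (interface{}, error) {
	switch typ {
	case "":
		return v, nil
	case "string":
		return fmt.Sprint(v), nil
	case "integer":
		// Assumption: integer is parsed as a base-10, 32-bit value.
		n, err := strconv.ParseInt(fmt.Sprint(v), 10, 32)
		if err != nil {
			return nil, err
		}
		return int32(n), nil
	}
	return nil, fmt.Errorf("type %q is not covered by this sketch", typ)
}

// apply processes the fields in order against a flat event map.
func apply(event map[string]interface{}, fields []fieldSpec, s settings) error {
	for _, f := range fields {
		v, found := event[f.From]
		if !found {
			if s.IgnoreMissing {
				continue // skip to the next field
			}
			return fmt.Errorf("field %q not found", f.From)
		}
		out, err := convertValue(v, f.Type)
		if err != nil {
			if s.FailOnError {
				return fmt.Errorf("converting %q: %v", f.From, err)
			}
			continue // ignore the failure and move on
		}
		target := f.To
		if target == "" {
			target = f.From // update in place
		}
		event[target] = out
		if s.Rename && target != f.From {
			delete(event, f.From) // rename removes the source field
		}
	}
	return nil
}

func main() {
	event := map[string]interface{}{"src_port": "8080", "src_ip": "10.0.0.1"}
	fields := []fieldSpec{
		{From: "src_port", To: "source.port", Type: "integer"},
		{From: "dst_port", To: "destination.port", Type: "integer"},
	}
	err := apply(event, fields, settings{IgnoreMissing: true, FailOnError: false})
	fmt.Println(event, err)
}
```

With `Rename` left `false`, the source field `src_port` stays in the event next to `source.port`; setting it to `true` removes the source after a successful conversion.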

[[drop-event]]
=== Drop events

134 changes: 134 additions & 0 deletions libbeat/processors/convert/config.go
@@ -0,0 +1,134 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package convert

import (
	"fmt"
	"strings"

	"github.com/pkg/errors"
)

func defaultConfig() config {
	return config{
		IgnoreMissing: false,
		FailOnError:   true,
		Mode:          copyMode,
	}
}

type config struct {
	Fields        []field `config:"fields" validate:"required"` // List of fields to convert.
	Tag           string  `config:"tag"`                        // Processor ID for debug and metrics.
	IgnoreMissing bool    `config:"ignore_missing"`             // Skip field when From field is missing.
	FailOnError   bool    `config:"fail_on_error"`              // Fail on errors (missing fields / conversion failures); if false, they are ignored.
	Mode          mode    `config:"mode"`                       // Mode (copy vs rename).
}

type field struct {
	From string   `config:"from" validate:"required"`
	To   string   `config:"to"`
	Type dataType `config:"type"`
}

// Validate rejects a field entry that sets neither a target nor a type.
func (f field) Validate() error {
	if f.To == "" && f.Type == unset {
		return errors.New("each field must have a 'to' or a 'type'")
	}
	return nil
}

func (f field) String() string {
	return fmt.Sprintf("{from=%v, to=%v, type=%v}", f.From, f.To, f.Type)
}

type dataType uint8

// List of dataTypes.
const (
	unset dataType = iota
	Integer
	Long
	Float
	Double
	String
	Boolean
	IP
)

var dataTypeNames = map[dataType]string{
	unset:   "[unset]",
	Integer: "integer",
	Long:    "long",
	Float:   "float",
	Double:  "double",
	String:  "string",
	Boolean: "boolean",
	IP:      "ip",
}

func (dt dataType) String() string {
	return dataTypeNames[dt]
}

func (dt dataType) MarshalText() ([]byte, error) {
	return []byte(dt.String()), nil
}

// Unpack sets dt from its configuration name, ignoring case.
func (dt *dataType) Unpack(s string) error {
	s = strings.ToLower(s)
	for typ, name := range dataTypeNames {
		if s == name {
			*dt = typ
			return nil
		}
	}
	return errors.Errorf("invalid data type: %v", s)
}

type mode uint8

// List of modes.
const (
	copyMode mode = iota
	renameMode
)

var modeNames = map[mode]string{
	copyMode:   "copy",
	renameMode: "rename",
}

func (m mode) String() string {
	return modeNames[m]
}

func (m mode) MarshalText() ([]byte, error) {
	return []byte(m.String()), nil
}

// Unpack sets m from its configuration name, ignoring case.
func (m *mode) Unpack(s string) error {
	s = strings.ToLower(s)
	for md, name := range modeNames {
		if s == name {
			*m = md
			return nil
		}
	}
	return errors.Errorf("invalid mode: %v", s)
}
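
The `Unpack` methods above both resolve a configuration string to an enum value by scanning a name table case-insensitively. The following is a minimal, standard-library-only sketch of that pattern. The `kind` type and its constants are hypothetical stand-ins for `dataType`, and `fmt.Errorf` is substituted for `github.com/pkg/errors` so the example has no external dependency.

```go
package main

import (
	"fmt"
	"strings"
)

// kind is a hypothetical stand-in for the dataType enum.
type kind uint8

const (
	kindUnset kind = iota
	kindInteger
	kindIP
)

var kindNames = map[kind]string{
	kindUnset:   "[unset]",
	kindInteger: "integer",
	kindIP:      "ip",
}

// Unpack lowercases the input and looks it up in the name table.
// On failure the receiver is left unchanged.
func (k *kind) Unpack(s string) error {
	s = strings.ToLower(s)
	for value, name := range kindNames {
		if s == name {
			*k = value
			return nil
		}
	}
	return fmt.Errorf("invalid data type: %v", s)
}

func main() {
	for _, in := range []string{"Integer", "IP", "uuid"} {
		var k kind
		err := k.Unpack(in)
		fmt.Printf("input=%q kind=%d err=%v\n", in, k, err)
	}
}
```

Because the loop ranges over every entry in the table, the placeholder name `[unset]` is also accepted as input in this sketch, as it would be by the same lookup over `dataTypeNames`.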