Skip to content

Commit

Permalink
Make network_direction, registered_domain and convert processors comp…
Browse files Browse the repository at this point in the history
…atible with ES older than 7.13.0 (#26676)

Adds three new Filebeat fileset compatibility tweaks to support Elasticsearch versions before 7.13.0:

- Replaces usages of convert processor using type: ip with an equivalent grok expression.
  Convert to ip type is used to make a conditional field copy if the source field is a valid IP address.
- Removes the network_direction processor.
- Removes the registered_domain processor.
  • Loading branch information
adriansr authored Jul 2, 2021
1 parent f8c68b5 commit 65d2193
Show file tree
Hide file tree
Showing 3 changed files with 533 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `kibana.log` pipeline when `event.duration` calculation becomes a Long. {issue}24556[24556] {pull}25675[25675]
- Removed incorrect `http.request.referrer` field from `aws.elb` module. {issue}26435[26435] {pull}26441[26441]
- Fix `threatintel.indicator.url.full` not being populated. {issue}26351[26351] {pull}26508[26508]
- Fix Elasticsearch compatibility for modules that use `type: ip` with `convert` processors. {issue}26629[26629] {pull}26676[26676]
- Fix Elasticsearch compatibility for modules that use the `network_direction` processor. {issue}26629[26629] {pull}26676[26676]
- Fix Elasticsearch compatibility for modules that use the `registered_domain` processor. {issue}26629[26629] {pull}26676[26676]

*Heartbeat*

Expand Down
132 changes: 103 additions & 29 deletions filebeat/fileset/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,31 @@ import (
// processorCompatibility defines a processor's minimum version requirements or
// a transformation to make it compatible.
type processorCompatibility struct {
checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies.
procType string // Elasticsearch Ingest Node processor type.
adaptConfig func(processor map[string]interface{}, log *logp.Logger) (drop bool, err error) // Adapt the configuration to make it compatible.
checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies.
procType string // Elasticsearch Ingest Node processor type.
adaptConfig func(processor map[string]interface{}, log *logp.Logger) compatAction // Adapt the configuration to make it compatible.
}

type compatAction func(interface{}) (interface{}, error)

func keepProcessor(original interface{}) (interface{}, error) {
return original, nil
}

func dropProcessor(interface{}) (interface{}, error) {
return nil, nil
}

func replaceProcessor(newProc interface{}) compatAction {
return func(interface{}) (interface{}, error) {
return newProc, nil
}
}

func fail(err error) compatAction {
return func(interface{}) (interface{}, error) {
return nil, err
}
}

var processorCompatibilityChecks = []processorCompatibility{
Expand Down Expand Up @@ -70,26 +92,47 @@ var processorCompatibilityChecks = []processorCompatibility{
return esVersion.LessThan(common.MustNewVersion("7.0.0")) &&
!esVersion.LessThan(common.MustNewVersion("6.7.0"))
},
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) {
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction {
config["ecs"] = true
return false, nil
return keepProcessor
},
},
{
procType: "user_agent",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("6.7.0"))
},
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) {
return false, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required")
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction {
return fail(errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required"))
},
},
{
procType: "convert",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: replaceConvertIP,
},
{
procType: "network_direction",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: deleteProcessor,
},
{
procType: "registered_domain",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: deleteProcessor,
},
}

// adaptPipelineForCompatibility iterates over all processors in the pipeline
// and adapts them for version of Elasticsearch used. Adapt can mean modifying
// processor options or removing the processor.
func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) error {
func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) {
p, ok := content["processors"]
if !ok {
return errors.New("'processors' is missing from the pipeline definition")
Expand All @@ -104,12 +147,12 @@ func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string,

nextProcessor:
for i, obj := range processors {
processor, ok := obj.(map[string]interface{})
if !ok {
return fmt.Errorf("processor at index %d is not an object, got %T", i, obj)
}

for _, proc := range processorCompatibilityChecks {
processor, ok := obj.(map[string]interface{})
if !ok {
return fmt.Errorf("processor at index %d is not an object, got %T", i, obj)
}

configIfc, found := processor[proc.procType]
if !found {
continue
Expand All @@ -123,16 +166,17 @@ nextProcessor:
continue
}

drop, err := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i))
act := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i))
obj, err = act(obj)
if err != nil {
return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err)
}
if drop {
if obj == nil {
continue nextProcessor
}
}

filteredProcs = append(filteredProcs, processors[i])
filteredProcs = append(filteredProcs, obj)
}

content["processors"] = filteredProcs
Expand All @@ -141,14 +185,16 @@ nextProcessor:

// deleteProcessor returns true to indicate that the processor should be deleted
// in order to adapt the pipeline for backwards compatibility to Elasticsearch.
func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) (bool, error) { return true, nil }
func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) compatAction {
return dropProcessor
}

// replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if
// statement so ES less than 7.9 will work.
func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) (bool, error) {
func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) compatAction {
_, ok := config["ignore_empty_value"].(bool)
if !ok {
return false, nil
return keepProcessor
}

log.Debug("Removing unsupported 'ignore_empty_value' from set processor.")
Expand All @@ -157,11 +203,11 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)
_, ok = config["if"].(string)
if ok {
// assume if check is sufficient
return false, nil
return keepProcessor
}
val, ok := config["value"].(string)
if !ok {
return false, nil
return keepProcessor
}

newIf := strings.TrimLeft(val, "{ ")
Expand All @@ -171,37 +217,37 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)

log.Debug("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf)
config["if"] = newIf
return false, nil
return keepProcessor
}

// replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement
// so ES less than 7.10 will work.
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) {
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) compatAction {
allow, ok := config["allow_duplicates"].(bool)
if !ok {
return false, nil
return keepProcessor
}

log.Debug("Removing unsupported 'allow_duplicates' from append processor.")
delete(config, "allow_duplicates")

if allow {
// It was set to true, nothing else to do after removing the option.
return false, nil
return keepProcessor
}

currIf, _ := config["if"].(string)
if strings.Contains(strings.ToLower(currIf), "contains") {
// If it has a contains statement, we assume it is checking for duplicates already.
return false, nil
return keepProcessor
}
field, ok := config["field"].(string)
if !ok {
return false, nil
return keepProcessor
}
val, ok := config["value"].(string)
if !ok {
return false, nil
return keepProcessor
}

field = strings.ReplaceAll(field, ".", "?.")
Expand All @@ -220,5 +266,33 @@ func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logge
log.Debug("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf)
config["if"] = newIf

return false, nil
return keepProcessor
}

// replaceConvertIP replaces convert processors with type: ip with a grok expression that uses
// the IP pattern.
func replaceConvertIP(config map[string]interface{}, log *logp.Logger) compatAction {
wantedType, found := config["type"]
if !found || wantedType != "ip" {
return keepProcessor
}
log.Debug("processor input=", config)
delete(config, "type")
var srcIf, dstIf interface{}
if srcIf, found = config["field"]; !found {
return fail(errors.New("field option is required for convert processor"))
}
if dstIf, found = config["target_field"]; found {
delete(config, "target_field")
} else {
dstIf = srcIf
}
config["patterns"] = []string{
fmt.Sprintf("^%%{IP:%s}$", dstIf),
}
grok := map[string]interface{}{
"grok": config,
}
log.Debug("processor output=", grok)
return replaceProcessor(grok)
}
Loading

0 comments on commit 65d2193

Please sign in to comment.