Skip to content

Commit

Permalink
[Ingest Manager] Guard against empty stream.datasource and namespace (#…
Browse files Browse the repository at this point in the history
…18769)

[Ingest Manager] Guard against empty stream.datasource and namespace (#18769)
  • Loading branch information
michalpristas authored May 27, 2020
1 parent 76dd79f commit d37b598
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 7 deletions.
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
- Remove fleet admin from setup script {pull}18611[18611]
- Correctly report platform and family. {issue}18665[18665]
- Clean action store after enrolling to new configuration {pull}18656[18656]
[Ingest Manager] Avoid watching monitor logs {pull}18723[18723]
- Avoid watching monitor logs {pull}18723[18723]
- Guard against empty stream.datasource and namespace {pull}18769[18769]

==== New features

Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ filebeat:
paths:
- /var/log/hello1.log
- /var/log/hello2.log
dataset: generic
index: logs-generic-default
processors:
- add_fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
dataset: generic
vars:
var: value
processors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ metricbeat:
type: metrics
dataset: docker.status
namespace: default
- module: docker
metricsets: [info]
index: metrics-generic-default
hosts: ["http://127.0.0.1:8080"]
processors:
- add_fields:
target: "stream"
fields:
type: metrics
dataset: generic
namespace: default
- module: apache
metricsets: [info]
index: metrics-generic-testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ datasources:
streams:
- metricset: status
dataset: docker.status
- metricset: info
dataset: ""
hosts: ["http://127.0.0.1:8080"]
- type: logs
streams:
Expand Down
115 changes: 110 additions & 5 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
name = "make_array"
case *RemoveKeyRule:
name = "remove_key"

case *FixStreamRule:
name = "fix_stream"
default:
return nil, fmt.Errorf("unknown rule of type %T", rule)
}
Expand Down Expand Up @@ -153,6 +154,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
r = &MakeArrayRule{}
case "remove_key":
r = &RemoveKeyRule{}
case "fix_stream":
r = &FixStreamRule{}
default:
return fmt.Errorf("unknown rule of type %s", name)
}
Expand Down Expand Up @@ -345,6 +348,100 @@ func CopyAllToList(to, onMerge string, except ...string) *CopyAllToListRule {
}
}

// FixStreamRule fixes streams to contain default values
// in case no value or invalid value are provided
type FixStreamRule struct {
}

// Apply stream fixes.
func (r *FixStreamRule) Apply(ast *AST) error {
const defaultNamespace = "default"
const defaultDataset = "generic"

datasourcesNode, found := Lookup(ast, "datasources")
if !found {
return nil
}

datasourcesList, ok := datasourcesNode.Value().(*List)
if !ok {
return nil
}

for _, datasourceNode := range datasourcesList.value {
nsNode, found := datasourceNode.Find("namespace")
if found {
nsKey, ok := nsNode.(*Key)
if ok {
if newNamespace := nsKey.value.String(); newNamespace == "" {
nsKey.value = &StrVal{value: defaultNamespace}
}
}
} else {
datasourceMap, ok := datasourceNode.(*Dict)
if !ok {
continue
}
datasourceMap.value = append(datasourceMap.value, &Key{
name: "namespace",
value: &StrVal{value: defaultNamespace},
})
}

// get input
inputNode, found := datasourceNode.Find("inputs")
if !found {
continue
}

inputsList, ok := inputNode.Value().(*List)
if !ok {
continue
}

for _, inputNode := range inputsList.value {
streamsNode, ok := inputNode.Find("streams")
if !ok {
continue
}

streamsList, ok := streamsNode.Value().(*List)
if !ok {
continue
}

for _, streamNode := range streamsList.value {
streamMap, ok := streamNode.(*Dict)
if !ok {
continue
}

dsNode, found := streamNode.Find("dataset")
if found {
dsKey, ok := dsNode.(*Key)
if ok {
if newDataset := dsKey.value.String(); newDataset == "" {
dsKey.value = &StrVal{value: defaultDataset}
}
}
} else {
streamMap.value = append(streamMap.value, &Key{
name: "dataset",
value: &StrVal{value: defaultDataset},
})
}
}
}
}

return nil
}

// FixStream creates a FixStreamRule
func FixStream() *FixStreamRule {
return &FixStreamRule{}
}

// InjectIndexRule injects index to each input.
// Index is in form {type}-{namespace}-{dataset-type}
// type: is provided to the rule.
Expand Down Expand Up @@ -375,7 +472,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
if found {
nsKey, ok := nsNode.(*Key)
if ok {
namespace = nsKey.value.String()
if newNamespace := nsKey.value.String(); newNamespace != "" {
namespace = newNamespace
}
}
}

Expand Down Expand Up @@ -413,7 +512,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
if found {
dsKey, ok := dsNode.(*Key)
if ok {
dataset = dsKey.value.String()
if newDataset := dsKey.value.String(); newDataset != "" {
dataset = newDataset
}
}

}
Expand Down Expand Up @@ -464,7 +565,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
if found {
nsKey, ok := nsNode.(*Key)
if ok {
namespace = nsKey.value.String()
if newNamespace := nsKey.value.String(); newNamespace != "" {
namespace = newNamespace
}
}
}

Expand Down Expand Up @@ -502,7 +605,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
if found {
dsKey, ok := dsNode.(*Key)
if ok {
dataset = dsKey.value.String()
if newDataset := dsKey.value.String(); newDataset != "" {
dataset = newDataset
}
}
}

Expand Down
97 changes: 97 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,86 @@ func TestRules(t *testing.T) {
expectedYAML string
rule Rule
}{
"fix streams": {
givenYAML: `
datasources:
- name: All default
inputs:
- type: file
streams:
- paths: /var/log/mysql/error.log
- name: Specified namespace
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
- name: Specified dataset
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
`,
expectedYAML: `
datasources:
- name: All default
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/error.log
dataset: generic
- name: Specified namespace
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: generic
- name: Specified dataset
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: default
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: generic
`,
rule: &RuleList{
Rules: []Rule{
FixStream(),
},
},
},

"inject index": {
givenYAML: `
datasources:
Expand Down Expand Up @@ -49,6 +129,13 @@ datasources:
streams:
- paths: /var/log/mysql/access.log
dataset: dsds
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
`,
expectedYAML: `
datasources:
Expand Down Expand Up @@ -80,6 +167,14 @@ datasources:
- paths: /var/log/mysql/access.log
dataset: dsds
index: mytype-dsds-nsns
- name: All specified with empty strings
namespace: ""
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
dataset: ""
index: mytype-generic-default
`,
rule: &RuleList{
Rules: []Rule{
Expand Down Expand Up @@ -564,6 +659,7 @@ func TestSerialization(t *testing.T) {
InjectStreamProcessor("insert_after", "index-type"),
CopyToList("t1", "t2", "insert_after"),
CopyAllToList("t2", "insert_before", "a", "b"),
FixStream(),
)

y := `- rename:
Expand Down Expand Up @@ -623,6 +719,7 @@ func TestSerialization(t *testing.T) {
- a
- b
on_conflict: insert_before
- fix_stream: {}
`

t.Run("serialize_rules", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ cmd: filebeat
args: ["-E", "setup.ilm.enabled=false", "-E", "setup.template.enabled=false", "-E", "management.mode=x-pack-fleet", "-E", "management.enabled=true", "-E", "logging.level=debug"]
configurable: grpc
rules:
- fix_stream: {}
- inject_index:
type: logs

Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ post_install:
path: "modules.d/system.yml"
target: "modules.d/system.yml.disabled"
rules:
- fix_stream: {}
- inject_index:
type: metrics

Expand Down

0 comments on commit d37b598

Please sign in to comment.