Skip to content

Commit

Permalink
[Libbeat][Metricbeat]Add IgnoreAllErrors to schema.Conv object (#12089)
Browse files Browse the repository at this point in the history
* schema conv option for ignore all errors

Co-Authored-By: Jaime Soriano Pastor <jaime.soriano@elastic.co>
  • Loading branch information
Pablo Mercado and jsoriano authored May 20, 2019
1 parent 0495f6c commit 9c848a9
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Update urllib3 version to 1.24.2 {pull}11930[11930]
- Add libbeat/common/cleanup package. {pull}12134[12134]
- Only Load minimal template if no fields are provided. {pull}12103[12103]
- Add new option `IgnoreAllErrors` to `libbeat.common.schema` for skipping fields that failed while converting. {pull}12089[12089]
- Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Avoid generating hints-based configuration with empty hosts when no exposed port is suitable for the hosts hint. {issue}8264[8264] {pull}12086[12086]
- Fixed a socket leak in the postgresql module under Windows when SSL is disabled on the server. {pull}11393[11393]
- Change some field type from scaled_float to long in aws module. {pull}11982[11982]
- Fixed RabbitMQ `queue` metricset gathering when `consumer_utilisation` is set empty at the metrics source {pull}12089[12089]

*Packetbeat*

Expand Down
21 changes: 17 additions & 4 deletions libbeat/common/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package schema
import (
"github.com/joeshaw/multierror"

"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/libbeat/common"
)

Expand All @@ -39,10 +41,11 @@ type Mapper interface {

// A Conv object represents a conversion mechanism from the data map to the event map.
type Conv struct {
Func Converter // Convertor function
Key string // The key in the data map
Optional bool // Whether to ignore errors if the key is not found
Required bool // Whether to provoke errors if the key is not found
Func Converter // Convertor function
Key string // The key in the data map
Optional bool // Whether to ignore errors if the key is not found
Required bool // Whether to provoke errors if the key is not found
IgnoreAllErrors bool // Ignore any value conversion error
}

// Converter function type
Expand All @@ -57,6 +60,10 @@ func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{
err.Optional = conv.Optional
err.Required = conv.Required
}
if conv.IgnoreAllErrors {
logp.Debug("schema", "ignoring error for key %q: %s", key, err)
return nil
}
return multierror.Errors{err}
}
event[key] = value
Expand Down Expand Up @@ -142,6 +149,12 @@ func Required(c Conv) Conv {
return c
}

// IgnoreAllErrors set the enable all errors flag
func IgnoreAllErrors(c Conv) Conv {
c.IgnoreAllErrors = true
return c
}

// setOptions adds the optional flags to the Conv object
func SetOptions(c Conv, opts []SchemaOption) Conv {
for _, opt := range opts {
Expand Down
88 changes: 88 additions & 0 deletions libbeat/common/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package schema
import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -83,3 +84,90 @@ func TestOptions(t *testing.T) {
assert.Equal(t, conv.Key, "test")
assert.Equal(t, conv.Optional, true)
}

func TestSchemaCases(t *testing.T) {

var errFunc = func(key string, data map[string]interface{}) (interface{}, error) {
return nil, errors.New("test error")
}
var noopFunc = func(key string, data map[string]interface{}) (interface{}, error) { return data[key], nil }

var testCases = []struct {
name string
schema Schema
source map[string]interface{}

expectedErrorMessage string
expectedOutput common.MapStr
}{
{
name: "standard schema conversion case",
schema: Schema{
"outField": Conv{
Key: "inField",
Func: noopFunc,
IgnoreAllErrors: true,
},
},
source: map[string]interface{}{
"inField": "10",
},

expectedOutput: common.MapStr{
"outField": "10",
},
},
{
name: "error at conversion case",
schema: Schema{
"outField": Conv{
Key: "inField",
Func: errFunc,
Optional: true,
},
},
source: map[string]interface{}{
"doesntMatter": "",
},

expectedErrorMessage: "test error",
expectedOutput: common.MapStr{},
},
{
name: "ignore error at conversion case",
schema: Schema{
"outField": Conv{
Key: "inField",
Func: errFunc,
Optional: true,
IgnoreAllErrors: true,
},
},
source: map[string]interface{}{
"doesntMatter": "",
},

expectedOutput: common.MapStr{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

event, errs := tc.schema.Apply(tc.source)

if errs != nil {
errorMessage := errs.Error()
if tc.expectedErrorMessage == "" {
t.Errorf("unexpected error ocurred: %s", errorMessage)
}
assert.Contains(t, errorMessage, tc.expectedErrorMessage)
} else if tc.expectedErrorMessage != "" {
t.Errorf("exepected error message %q was not returned", tc.expectedErrorMessage)
}

assert.Equal(t, tc.expectedOutput, event)

})
}
}
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/queue/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
"consumers": s.Object{
"count": c.Int("consumers"),
"utilisation": s.Object{
"pct": c.Int("consumer_utilisation", s.Optional),
"pct": c.Int("consumer_utilisation", s.IgnoreAllErrors),
},
},
"messages": s.Object{
Expand Down

0 comments on commit 9c848a9

Please sign in to comment.