diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 24f8c38455b..5d2db76c716 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -39,4 +39,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Update urllib3 version to 1.24.2 {pull}11930[11930] - Add libbeat/common/cleanup package. {pull}12134[12134] - Only Load minimal template if no fields are provided. {pull}12103[12103] +- Add new option `IgnoreAllErrors` to `libbeat.common.schema` for skipping fields that failed while converting. {pull}12089[12089] - Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132] \ No newline at end of file diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e4874c185ec..f4e68455c88 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -119,6 +119,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Avoid generating hints-based configuration with empty hosts when no exposed port is suitable for the hosts hint. {issue}8264[8264] {pull}12086[12086] - Fixed a socket leak in the postgresql module under Windows when SSL is disabled on the server. {pull}11393[11393] - Change some field type from scaled_float to long in aws module. {pull}11982[11982] +- Fixed RabbitMQ `queue` metricset gathering when `consumer_utilisation` is set empty at the metrics source {pull}12089[12089] *Packetbeat* diff --git a/libbeat/common/schema/schema.go b/libbeat/common/schema/schema.go index 491da93d3c3..fe9cd794588 100644 --- a/libbeat/common/schema/schema.go +++ b/libbeat/common/schema/schema.go @@ -20,6 +20,8 @@ package schema import ( "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/common" ) @@ -39,10 +41,11 @@ type Mapper interface { // A Conv object represents a conversion mechanism from the data map to the event map. type Conv struct { - Func Converter // Convertor function - Key string // The key in the data map - Optional bool // Whether to ignore errors if the key is not found - Required bool // Whether to provoke errors if the key is not found + Func Converter // Convertor function + Key string // The key in the data map + Optional bool // Whether to ignore errors if the key is not found + Required bool // Whether to provoke errors if the key is not found + IgnoreAllErrors bool // Ignore any value conversion error } // Converter function type @@ -57,6 +60,10 @@ func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{ err.Optional = conv.Optional err.Required = conv.Required } + if conv.IgnoreAllErrors { + logp.Debug("schema", "ignoring error for key %q: %s", key, err) + return nil + } return multierror.Errors{err} } event[key] = value @@ -142,6 +149,12 @@ func Required(c Conv) Conv { return c } +// IgnoreAllErrors set the enable all errors flag +func IgnoreAllErrors(c Conv) Conv { + c.IgnoreAllErrors = true + return c +} + // setOptions adds the optional flags to the Conv object func SetOptions(c Conv, opts []SchemaOption) Conv { for _, opt := range opts { diff --git a/libbeat/common/schema/schema_test.go b/libbeat/common/schema/schema_test.go index 188e416af29..58299c3d5d2 100644 --- a/libbeat/common/schema/schema_test.go +++ b/libbeat/common/schema/schema_test.go @@ -20,6 +20,7 @@ package schema import ( "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" @@ -83,3 +84,90 @@ func TestOptions(t *testing.T) { assert.Equal(t, conv.Key, "test") assert.Equal(t, conv.Optional, true) } + +func TestSchemaCases(t *testing.T) { + + var errFunc = func(key string, data map[string]interface{}) (interface{}, error) { + return nil, errors.New("test error") + } + var noopFunc = func(key string, data map[string]interface{}) (interface{}, error) { return data[key], nil } + + var testCases = []struct { + name string + schema Schema + source map[string]interface{} + + expectedErrorMessage string + expectedOutput common.MapStr + }{ + { + name: "standard schema conversion case", + schema: Schema{ + "outField": Conv{ + Key: "inField", + Func: noopFunc, + IgnoreAllErrors: true, + }, + }, + source: map[string]interface{}{ + "inField": "10", + }, + + expectedOutput: common.MapStr{ + "outField": "10", + }, + }, + { + name: "error at conversion case", + schema: Schema{ + "outField": Conv{ + Key: "inField", + Func: errFunc, + Optional: true, + }, + }, + source: map[string]interface{}{ + "doesntMatter": "", + }, + + expectedErrorMessage: "test error", + expectedOutput: common.MapStr{}, + }, + { + name: "ignore error at conversion case", + schema: Schema{ + "outField": Conv{ + Key: "inField", + Func: errFunc, + Optional: true, + IgnoreAllErrors: true, + }, + }, + source: map[string]interface{}{ + "doesntMatter": "", + }, + + expectedOutput: common.MapStr{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + event, errs := tc.schema.Apply(tc.source) + + if errs != nil { + errorMessage := errs.Error() + if tc.expectedErrorMessage == "" { + t.Errorf("unexpected error ocurred: %s", errorMessage) + } + assert.Contains(t, errorMessage, tc.expectedErrorMessage) + } else if tc.expectedErrorMessage != "" { + t.Errorf("exepected error message %q was not returned", tc.expectedErrorMessage) + } + + assert.Equal(t, tc.expectedOutput, event) + + }) + } +} diff --git a/metricbeat/module/rabbitmq/queue/data.go b/metricbeat/module/rabbitmq/queue/data.go index be042509fdd..2ebae73a138 100644 --- a/metricbeat/module/rabbitmq/queue/data.go +++ b/metricbeat/module/rabbitmq/queue/data.go @@ -45,7 +45,7 @@ var ( "consumers": s.Object{ "count": c.Int("consumers"), "utilisation": s.Object{ - "pct": c.Int("consumer_utilisation", s.Optional), + "pct": c.Int("consumer_utilisation", s.IgnoreAllErrors), }, }, "messages": s.Object{