diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1bd83b1b8b5a..709fb64b0d61 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d ==== Bugfixes *Affecting all Beats* +- Fix Elasticsearch structured error response parsing error. {issue}2229[2229] *Metricbeat* diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 594c68dc2acd..97134b7eeb9f 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -480,7 +480,10 @@ func itemStatusInner(reader *jsonReader) (int, []byte, error) { } default: // ignore unknown fields - reader.ignoreNext() + _, err = reader.ignoreNext() + if err != nil { + return 0, nil, err + } } } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index a88af0cd8eea..3d27e6932729 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/outil" "github.com/stretchr/testify/assert" ) @@ -120,6 +121,49 @@ func TestCollectPublishFailAll(t *testing.T) { assert.Equal(t, events, res) } +func TestCollectPipelinePublishFail(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"}) + } + + response := []byte(`{ + "took": 0, "ingest_took": 0, "errors": true, + "items": [ + { + "index": { + "_index": "filebeat-2016.08.10", + "_type": "log", + "_id": null, + "status": 500, + "error": { + "type": "exception", + "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [fail_on_purpose] not present as part of path [fail_on_purpose]", + "caused_by": { + "type": "illegal_argument_exception", + "reason": "java.lang.IllegalArgumentException: field [fail_on_purpose] not present as part of path [fail_on_purpose]", + "caused_by": { + "type": "illegal_argument_exception", + "reason": "field [fail_on_purpose] not present as part of path [fail_on_purpose]" + } + }, + "header": { + "processor_type": "lowercase" + } + } + } + } + ] + }`) + + event := common.MapStr{"field": 2} + events := []common.MapStr{event} + + reader := newJSONReader(response) + res := bulkCollectPublishFails(reader, events) + assert.Equal(t, 1, len(res)) + assert.Equal(t, events, res) +} + func TestGetIndexStandard(t *testing.T) { time := time.Now().UTC() diff --git a/libbeat/outputs/elasticsearch/json_read.go b/libbeat/outputs/elasticsearch/json_read.go index 97ed18a80b25..c914f28d7942 100644 --- a/libbeat/outputs/elasticsearch/json_read.go +++ b/libbeat/outputs/elasticsearch/json_read.go @@ -77,6 +77,21 @@ const ( dictFieldStateEnd ) +var entityNames = map[entity]string{ + failEntity: "failEntity", + trueValue: "trueValue", + falseValue: "falseValue", + nullValue: "nullValue", + dictStart: "dictStart", + dictEnd: "dictEnd", + arrStart: "arrStart", + arrEnd: "arrEnd", + stringEntity: "stringEntity", + mapKeyEntity: "mapKeyEntity", + intEntity: "intEntity", + doubleEntity: "doubleEntity", +} + var stateNames = map[state]string{ failedState: "failed", startState: "start", @@ -87,6 +102,13 @@ var stateNames = map[state]string{ dictFieldStateEnd: "dictNext", } +func (e entity) String() string { + if name, ok := entityNames[e]; ok { + return name + } + return "unknown" +} + func (s state) String() string { if name, ok := stateNames[s]; ok { return name @@ -131,6 +153,7 @@ func (r *jsonReader) popState() { func (r *jsonReader) expectDict() error { entity, _, err := r.step() + if err != nil { return err } @@ -190,36 +213,17 @@ func (r *jsonReader) ignoreNext() (raw []byte, err error) { snapshot := r.Snapshot() before := r.Len() - var ignoreKind func(*jsonReader, entity) error - ignoreKind = func(r *jsonReader, kind entity) error { - - for { - entity, _, err := r.step() - if err != nil { - return err - } - - switch entity { - case kind: - return nil - case arrStart: - return ignoreKind(r, arrEnd) - case dictStart: - return ignoreKind(r, dictEnd) - } - } - } - entity, _, err := r.step() if err != nil { return nil, err } switch entity { - case dictStart: - err = ignoreKind(r, dictEnd) case arrStart: err = ignoreKind(r, arrEnd) + case dictStart: + err = ignoreKind(r, dictEnd) + default: } if err != nil { return nil, err @@ -232,6 +236,28 @@ func (r *jsonReader) ignoreNext() (raw []byte, err error) { return bytes, nil } +func ignoreKind(r *jsonReader, kind entity) error { + for { + entity, _, err := r.step() + if err != nil { + return err + } + + switch entity { + case kind: + return nil + case arrStart: + if err := ignoreKind(r, arrEnd); err != nil { + return err + } + case dictStart: + if err := ignoreKind(r, dictEnd); err != nil { + return err + } + } + } +} + // step continues the JSON parser state machine until next entity has been parsed. func (r *jsonReader) step() (entity, []byte, error) { r.skipWS()