Skip to content

Commit

Permalink
Fix Elasticsearch structured error reponse parsing error
Browse files Browse the repository at this point in the history
- Fix json recursive structured raw content collection
- Add unit test based on ingest node error message
  • Loading branch information
urso committed Aug 11, 2016
1 parent 9c7442f commit 86b4589
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
==== Bugfixes

*Affecting all Beats*
- Fix Elasticsearch structured error response parsing error. {issue}2229[2229]

*Metricbeat*

Expand Down
5 changes: 4 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,10 @@ func itemStatusInner(reader *jsonReader) (int, []byte, error) {
}

default: // ignore unknown fields
reader.ignoreNext()
_, err = reader.ignoreNext()
if err != nil {
return 0, nil, err
}
}
}

Expand Down
44 changes: 44 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -120,6 +121,49 @@ func TestCollectPublishFailAll(t *testing.T) {
assert.Equal(t, events, res)
}

func TestCollectPipelinePublishFail(t *testing.T) {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"})
}

response := []byte(`{
"took": 0, "ingest_took": 0, "errors": true,
"items": [
{
"index": {
"_index": "filebeat-2016.08.10",
"_type": "log",
"_id": null,
"status": 500,
"error": {
"type": "exception",
"reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [fail_on_purpose] not present as part of path [fail_on_purpose]",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "java.lang.IllegalArgumentException: field [fail_on_purpose] not present as part of path [fail_on_purpose]",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "field [fail_on_purpose] not present as part of path [fail_on_purpose]"
}
},
"header": {
"processor_type": "lowercase"
}
}
}
}
]
}`)

event := common.MapStr{"field": 2}
events := []common.MapStr{event}

reader := newJSONReader(response)
res := bulkCollectPublishFails(reader, events)
assert.Equal(t, 1, len(res))
assert.Equal(t, events, res)
}

func TestGetIndexStandard(t *testing.T) {

time := time.Now().UTC()
Expand Down
70 changes: 48 additions & 22 deletions libbeat/outputs/elasticsearch/json_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ const (
dictFieldStateEnd
)

var entityNames = map[entity]string{
failEntity: "failEntity",
trueValue: "trueValue",
falseValue: "falseValue",
nullValue: "nullValue",
dictStart: "dictStart",
dictEnd: "dictEnd",
arrStart: "arrStart",
arrEnd: "arrEnd",
stringEntity: "stringEntity",
mapKeyEntity: "mapKeyEntity",
intEntity: "intEntity",
doubleEntity: "doubleEntity",
}

var stateNames = map[state]string{
failedState: "failed",
startState: "start",
Expand All @@ -87,6 +102,13 @@ var stateNames = map[state]string{
dictFieldStateEnd: "dictNext",
}

func (e entity) String() string {
if name, ok := entityNames[e]; ok {
return name
}
return "unknown"
}

func (s state) String() string {
if name, ok := stateNames[s]; ok {
return name
Expand Down Expand Up @@ -131,6 +153,7 @@ func (r *jsonReader) popState() {

func (r *jsonReader) expectDict() error {
entity, _, err := r.step()

if err != nil {
return err
}
Expand Down Expand Up @@ -190,36 +213,17 @@ func (r *jsonReader) ignoreNext() (raw []byte, err error) {
snapshot := r.Snapshot()
before := r.Len()

var ignoreKind func(*jsonReader, entity) error
ignoreKind = func(r *jsonReader, kind entity) error {

for {
entity, _, err := r.step()
if err != nil {
return err
}

switch entity {
case kind:
return nil
case arrStart:
return ignoreKind(r, arrEnd)
case dictStart:
return ignoreKind(r, dictEnd)
}
}
}

entity, _, err := r.step()
if err != nil {
return nil, err
}

switch entity {
case dictStart:
err = ignoreKind(r, dictEnd)
case arrStart:
err = ignoreKind(r, arrEnd)
case dictStart:
err = ignoreKind(r, dictEnd)
default:
}
if err != nil {
return nil, err
Expand All @@ -232,6 +236,28 @@ func (r *jsonReader) ignoreNext() (raw []byte, err error) {
return bytes, nil
}

func ignoreKind(r *jsonReader, kind entity) error {
for {
entity, _, err := r.step()
if err != nil {
return err
}

switch entity {
case kind:
return nil
case arrStart:
if err := ignoreKind(r, arrEnd); err != nil {
return err
}
case dictStart:
if err := ignoreKind(r, dictEnd); err != nil {
return err
}
}
}
}

// step continues the JSON parser state machine until next entity has been parsed.
func (r *jsonReader) step() (entity, []byte, error) {
r.skipWS()
Expand Down

0 comments on commit 86b4589

Please sign in to comment.