Skip to content

Commit

Permalink
Cherry-pick #19962 to 7.9: [Filebeat] Fix s3 input parsing json file …
Browse files Browse the repository at this point in the history
…without expand_event_list_from_field (#20135)

* [Filebeat] Fix s3 input parsing json file without expand_event_list_from_field (#19962)

* Fix s3 input parsing json file without expand_event_list_from_field

(cherry picked from commit 2bf84dd)

* update changelog
  • Loading branch information
kaiyan-sheng authored Jul 22, 2020
1 parent b7c8acc commit a7c87c7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834]
- Fixed typo in log message. {pull}17897[17897]
- Fix S3 input to trim delimiter /n from each log line. {pull}19972[19972]
- Fix s3 input parsing json file without expand_event_list_from_field. {issue}19902[19902] {pull}19962[19962]

*Heartbeat*

Expand Down
79 changes: 40 additions & 39 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,17 +455,10 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
gzipReader.Close()
}

// Check if expand_event_list_from_field is given with document content-type = "application/json"
if resp.ContentType != nil && *resp.ContentType == "application/json" && p.config.ExpandEventListFromField == "" {
err := errors.New("expand_event_list_from_field parameter is missing in config for application/json content-type file")
p.logger.Error(err)
return err
}

// Decode JSON documents when expand_event_list_from_field is given in config
if p.config.ExpandEventListFromField != "" {
// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || p.config.ExpandEventListFromField != "" {
decoder := json.NewDecoder(reader)
err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx)
err := p.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "decodeJSONWithKey failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
Expand Down Expand Up @@ -512,59 +505,67 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
return nil
}

func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
func (p *s3Input) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
offset := 0
for {
var jsonFields map[string][]interface{}
var jsonFields interface{}
err := decoder.Decode(&jsonFields)
if jsonFields == nil {
return nil
}

if err == io.EOF {
// create event for last line
// get logs from expand_event_list_from_field
textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, "key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
if err != nil {
return err
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return err
}
}
} else if err != nil {
// decode json failed, skip this log file
err = errors.Wrapf(err, "decode json failed for '%s' from S3 bucket '%s', skipping this file", s3Info.key, s3Info.name)
p.logger.Warn(err)
return nil
}

textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
if err != nil {
return err
}
}
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
switch f := jsonFields.(type) {
case map[string][]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
return offset, err
}
for _, v := range textValues {
offset, err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return offset, err
}
}
return offset, nil
}
case map[string]interface{}:
offset, err := p.convertJSONToEvent(f, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return offset, err
}
return offset, nil
}
return offset, nil
}

func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
vJSON, err := json.Marshal(jsonFields)
logOriginal := string(vJSON)
log := trimLogDelimiter(logOriginal)
Expand All @@ -575,9 +576,9 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH
if err != nil {
err = errors.Wrap(err, "forwardEvent failed")
p.logger.Error(err)
return err
return offset, err
}
return nil
return offset, nil
}

func (p *s3Input) forwardEvent(event beat.Event) error {
Expand Down

0 comments on commit a7c87c7

Please sign in to comment.