Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Fix s3 input parsing json file without expand_event_list_from_field #19962

Merged
merged 3 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix S3 input to trim delimiter \n from each log line. {pull}19972[19972]
- Ignore missing in Zeek module when dropping unnecessary fields. {pull}19984[19984]
- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552]
- Fix s3 input parsing json file without expand_event_list_from_field. {issue}19902[19902] {pull}19962[19962]

*Heartbeat*

Expand Down
79 changes: 40 additions & 39 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,17 +455,10 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
gzipReader.Close()
}

// Check if expand_event_list_from_field is given with document content-type = "application/json"
if resp.ContentType != nil && *resp.ContentType == "application/json" && p.config.ExpandEventListFromField == "" {
err := errors.New("expand_event_list_from_field parameter is missing in config for application/json content-type file")
p.logger.Error(err)
return err
}

// Decode JSON documents when expand_event_list_from_field is given in config
if p.config.ExpandEventListFromField != "" {
// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || p.config.ExpandEventListFromField != "" {
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
decoder := json.NewDecoder(reader)
err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx)
err := p.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "decodeJSONWithKey failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
Expand Down Expand Up @@ -512,59 +505,67 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
return nil
}

func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
func (p *s3Input) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
offset := 0
for {
var jsonFields map[string][]interface{}
var jsonFields interface{}
err := decoder.Decode(&jsonFields)
if jsonFields == nil {
return nil
}

if err == io.EOF {
// create event for last line
// get logs from expand_event_list_from_field
textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, "key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
if err != nil {
return err
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return err
}
}
} else if err != nil {
// decode json failed, skip this log file
err = errors.Wrapf(err, "decode json failed for '%s' from S3 bucket '%s', skipping this file", s3Info.key, s3Info.name)
p.logger.Warn(err)
return nil
}

textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
if err != nil {
return err
}
}
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
// jsonFieldsType inspects the dynamic type of one decoded JSON value and hands
// the resulting record(s) to convertJSONToEvent.
//
// Behavior by input shape:
//   - JSON object, expand_event_list_from_field set: the configured key must
//     exist and hold a JSON array; every array element becomes one event.
//   - JSON object, expand_event_list_from_field empty: the whole object
//     becomes a single event.
//   - anything else (array, scalar, ...): no event is produced.
//
// encoding/json decodes a JSON object into map[string]interface{} when the
// destination is an interface{}, so the expansion logic has to live in that
// case; the map[string][]interface{} case is kept only for values that were
// decoded into that concrete type by a caller.
//
// It returns the offset handed back by the last convertJSONToEvent call (or
// the incoming offset when no event was produced) together with the first
// error encountered. Errors are logged here before being returned.
func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
	key := p.config.ExpandEventListFromField

	// records collects the values that must each be turned into one event.
	var records []interface{}

	switch f := jsonFields.(type) {
	case map[string][]interface{}:
		if key == "" {
			// Same as before: nothing to expand, nothing to emit.
			return offset, nil
		}
		values, ok := f[key]
		if !ok {
			err := errors.Errorf("key '%s' not found", key)
			p.logger.Error(err)
			return offset, err
		}
		records = values

	case map[string]interface{}:
		if key == "" {
			records = []interface{}{f}
			break
		}
		value, ok := f[key]
		if !ok {
			err := errors.Errorf("key '%s' not found", key)
			p.logger.Error(err)
			return offset, err
		}
		values, ok := value.([]interface{})
		if !ok {
			err := errors.Errorf("key '%s' does not hold a list of events", key)
			p.logger.Error(err)
			return offset, err
		}
		records = values

	default:
		return offset, nil
	}

	// err is declared outside the loop so that offset is assigned with "="
	// and the value returned by convertJSONToEvent is carried from one record
	// to the next instead of being shadowed and dropped.
	var err error
	for _, v := range records {
		offset, err = p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
		if err != nil {
			err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
			p.logger.Error(err)
			return offset, err
		}
	}
	return offset, nil
}

func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
vJSON, err := json.Marshal(jsonFields)
logOriginal := string(vJSON)
log := trimLogDelimiter(logOriginal)
Expand All @@ -575,9 +576,9 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH
if err != nil {
err = errors.Wrap(err, "forwardEvent failed")
p.logger.Error(err)
return err
return offset, err
}
return nil
return offset, nil
}

func (p *s3Input) forwardEvent(event beat.Event) error {
Expand Down