From 008f2a8850b8e803b1cae8f681c454ec3579a03e Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 14 Jan 2020 14:45:52 -0700 Subject: [PATCH 1/2] [Filebeat] Handle error message in handleS3Objects function (#15545) * Handle error message in handleS3Objects function * remove s3Context.Fail and use setError and done instead * Add changelog (cherry picked from commit 2228af4835068db988c9571562f575313db70c80) --- CHANGELOG.next.asciidoc | 4 ++++ x-pack/filebeat/input/s3/input.go | 28 ++++++++++++++++------------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9c5d720558a..a4ead1451b3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -71,6 +71,10 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767] - Check content-type when creating new reader in s3 input. {pull}15252[15252] {issue}15225[15225] - Fix session reset detection and a crash in Netflow input. {pull}14904[14904] +- Handle errors in handleS3Objects function and add more debug messages for s3 input. {pull}15545[15545] +- netflow: Allow for options templates without scope fields. {pull}15449[15449] +- netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449] +- netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 3ef20ee1041..eef16d918fa 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -232,6 +232,7 @@ func (p *s3Input) Wait() { func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) { var wg sync.WaitGroup numMessages := len(messages) + p.logger.Debugf("Processing %v messages", numMessages) wg.Add(numMessages * 2) // process messages received from sqs @@ -251,14 +252,16 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w p.logger.Error(errors.Wrap(err, "handleSQSMessage failed")) return } + p.logger.Debugf("handleSQSMessage succeed and returned %v sets of S3 log info", len(s3Infos)) // read from s3 object and create event for each log line err = p.handleS3Objects(svcS3, s3Infos, errC) if err != nil { err = errors.Wrap(err, "handleS3Objects failed") p.logger.Error(err) - errC <- err + return } + p.logger.Debugf("handleS3Objects succeed") } func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, wg *sync.WaitGroup, errC chan error) { @@ -288,13 +291,14 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess } return case <-time.After(time.Duration(visibilityTimeout/2) * time.Second): + p.logger.Warn("Half of the set visibilityTimeout passed, visibility timeout needs to be updated") // If half of the set visibilityTimeout passed and this is // still ongoing, then change visibility timeout. err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) } - p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout) + p.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout) } } } @@ -370,8 +374,11 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC // read from s3 object reader, err := p.newS3BucketReader(svc, s3Info) if err != nil { - return errors.Wrap(err, "newS3BucketReader failed") + err = errors.Wrap(err, "newS3BucketReader failed") + s3Context.setError(err) + return err } + if reader == nil { continue } @@ -382,7 +389,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context) if err != nil { err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } return nil @@ -403,12 +410,14 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } return nil } else if err != nil { - return errors.Wrapf(err, "ReadString failed for %v", s3Info.key) + err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key) + s3Context.setError(err) + return err } // create event per log line @@ -417,7 +426,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } } @@ -610,11 +619,6 @@ func s3ObjectHash(s3Info s3Info) string { return prefix[:10] } -func (c *s3Context) Fail(err error) { - c.setError(err) - c.done() -} - func (c *s3Context) setError(err error) { // only care about the last error for now // TODO: add "Typed" error to error for context From 8b612adc45774d9b8281d5cc024a19cb8c472ea6 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 14 Jan 2020 14:48:10 -0700 Subject: [PATCH 2/2] update changelog --- CHANGELOG.next.asciidoc | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a4ead1451b3..4c7b1c89f09 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -72,9 +72,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Check content-type when creating new reader in s3 input. {pull}15252[15252] {issue}15225[15225] - Fix session reset detection and a crash in Netflow input. {pull}14904[14904] - Handle errors in handleS3Objects function and add more debug messages for s3 input. {pull}15545[15545] -- netflow: Allow for options templates without scope fields. {pull}15449[15449] -- netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449] -- netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449] *Heartbeat*