From c656da47ad6674def925c79d3f6121213380507f Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 16 Mar 2020 11:00:14 -0400 Subject: [PATCH] Fix _id field in s3 and googlepubsub inputs In #15859 the Elasticsearch output was changed to read from the @metadata._id field when it had been using @metadata.id. The s3 and googlepubsub inputs had both been setting @metadata.id, but were not updated with that change. This updates the s3 and googlepubsub inputs to use `beat.Event#SetID()` rather than creating the metadata object themselves. --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/googlepubsub/input.go | 27 ++++++------ x-pack/filebeat/input/s3/input.go | 47 +++++++++++---------- 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 525d234afd6..21e6e83e863 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -90,6 +90,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix merging of fileset inputs to replace paths and append processors. {pull}16450{16450} - Add queue_url definition in manifest file for aws module. {pull}16640{16640} - Fix issue where autodiscover hints default configuration was not being copied. {pull}16987[16987] +- Fix Elasticsearch `_id` field set by S3 and Google Pub/Sub inputs. {pull}17026[17026] *Heartbeat* diff --git a/x-pack/filebeat/input/googlepubsub/input.go b/x-pack/filebeat/input/googlepubsub/input.go index f2b5811e194..e9f48073d74 100644 --- a/x-pack/filebeat/input/googlepubsub/input.go +++ b/x-pack/filebeat/input/googlepubsub/input.go @@ -193,25 +193,24 @@ func makeTopicID(project, topic string) string { func makeEvent(topicID string, msg *pubsub.Message) beat.Event { id := topicID + "-" + msg.ID - fields := common.MapStr{ - "event": common.MapStr{ - "id": id, - "created": time.Now().UTC(), + event := beat.Event{ + Timestamp: msg.PublishTime.UTC(), + Fields: common.MapStr{ + "event": common.MapStr{ + "id": id, + "created": time.Now().UTC(), + }, + "message": string(msg.Data), }, - "message": string(msg.Data), + Private: msg, } + event.SetID(id) + if len(msg.Attributes) > 0 { - fields.Put("labels", msg.Attributes) + event.PutValue("labels", msg.Attributes) } - return beat.Event{ - Timestamp: msg.PublishTime.UTC(), - Meta: common.MapStr{ - "id": id, - }, - Fields: fields, - Private: msg, - } + return event } func (in *pubsubInput) getOrCreateSubscription(ctx context.Context, client *pubsub.Client) (*pubsub.Subscription, error) { diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index d75499b4d11..d10ea10bd63 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -587,33 +587,34 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s } func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { - f := common.MapStr{ - "message": log, - "log": common.MapStr{ - "offset": int64(offset), - "file.path": constructObjectURL(info), - }, - "aws": common.MapStr{ - "s3": common.MapStr{ - "bucket": common.MapStr{ - "name": info.name, - "arn": info.arn}, - "object.key": info.key, + s3Ctx.Inc() + + event := beat.Event{ + Timestamp: time.Now().UTC(), + Fields: common.MapStr{ + "message": log, + "log": common.MapStr{ + "offset": int64(offset), + "file.path": constructObjectURL(info), + }, + "aws": common.MapStr{ + "s3": common.MapStr{ + "bucket": common.MapStr{ + "name": info.name, + "arn": info.arn}, + "object.key": info.key, + }, + }, + "cloud": common.MapStr{ + "provider": "aws", + "region": info.region, }, }, - "cloud": common.MapStr{ - "provider": "aws", - "region": info.region, - }, + Private: s3Ctx, } + event.SetID(objectHash + "-" + fmt.Sprintf("%012d", offset)) - s3Ctx.Inc() - return beat.Event{ - Timestamp: time.Now(), - Fields: f, - Meta: common.MapStr{"id": objectHash + "-" + fmt.Sprintf("%012d", offset)}, - Private: s3Ctx, - } + return event } func constructObjectURL(info s3Info) string {