Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws-s3 input default content-type #25772

Merged
merged 3 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add new grok pattern for iptables module for Ubiquiti UDM {issue}25615[25615] {pull}25616[25616]
- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]
- Add monitoring metrics to the `aws-s3` input. {pull}25711[25711]
- Add Content-Type override to aws-s3 input. {issue}25697[25697] {pull}25772[25772]

*Heartbeat*

Expand Down
33 changes: 21 additions & 12 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ seconds. The maximum is half of the visibility timeout value.
The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs. The default is `16 KiB`.

[id="input-{type}-content_type"]
[float]
==== `content_type`

A standard MIME type describing the format of the object data. This
can be set to override the MIME type that was given to the object when
it was uploaded. For example: `application/json`.

[id="input-{type}-encoding"]
[float]
==== `encoding`
Expand Down Expand Up @@ -94,18 +102,19 @@ Content type will not be checked. If a file has "application/json" content-type,
[float]
==== `file_selectors`

If the SQS queue will have events that correspond to files that {beatname_uc}
shouldn't process `file_selectors` can be used to limit the files that are
downloaded. This is a list of selectors which are made up of `regex` and
`expand_event_list_from_field` options. The `regex` should match the S3 object
key in the SQS message, and the optional `expand_event_list_from_field` is the
same as the global setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones specified
in the `file_selectors`. Regex syntax is the same as the Go language. Files
that don't match one of the regexes won't be processed.
<<input-aws-s3-multiline>>, <<input-aws-s3-max_bytes>>,
<<input-aws-s3-buffer_size>>, and <<input-aws-s3-encoding>> may also be set for
each file selector.
If the SQS queue will have events that correspond to files that
{beatname_uc} shouldn't process `file_selectors` can be used to limit
the files that are downloaded. This is a list of selectors which are
made up of `regex` and `expand_event_list_from_field` options. The
`regex` should match the S3 object key in the SQS message, and the
optional `expand_event_list_from_field` is the same as the global
setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones
specified in the `file_selectors`. Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed. <<input-aws-s3-content_type>>, <<input-aws-s3-multiline>>,
<<input-aws-s3-max_bytes>>, <<input-aws-s3-buffer_size>>, and
<<input-aws-s3-encoding>> may also be set for each file selector.

["source", "yml"]
----
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
bodyReader = bufio.NewReader(gzipReader)
}

if info.readerConfig.ContentType != "" {
*resp.ContentType = info.readerConfig.ContentType
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.ExpandEventListFromField != "" {
decoder := json.NewDecoder(bodyReader)
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type readerConfig struct {
Multiline *multiline.Config `config:"multiline"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Encoding string `config:"encoding"`
ContentType string `config:"content_type"`
}

func (f *readerConfig) Validate() error {
Expand All @@ -82,6 +83,9 @@ func (f *readerConfig) Validate() error {
if f.MaxBytes <= 0 {
return fmt.Errorf("max_bytes <%v> must be greater than 0", f.MaxBytes)
}
if f.ExpandEventListFromField != "" && f.ContentType != "" && f.ContentType != "application/json" {
return fmt.Errorf("content_type must be `application/json` when expand_event_list_from_field is used")
}

return nil
}
Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ func TestConfig(t *testing.T) {
"max_bytes <0> must be greater than 0",
nil,
},
{
"error on expand_event_list_from_field and content_type != application/json ",
common.MapStr{
"queue_url": queueURL,
"expand_event_list_from_field": "Records",
"content_type": "text/plain",
},
"content_type must be `application/json` when expand_event_list_from_field is used",
nil,
},
}

for _, tc := range testCases {
Expand Down