aws-s3 input default content-type (#25772) (#25786)
* aws-s3 input default content-type

- new option `content_type`
- can be set at input or file selector level
- overrides Content-Type that was given to the S3 object when it was
  uploaded.

Closes #25697

Co-authored-by: kaiyan-sheng <kaiyan.sheng@elastic.co>
(cherry picked from commit d62b1be)

Co-authored-by: Lee Hinman <57081003+leehinman@users.noreply.github.com>
mergify[bot] and leehinman authored May 19, 2021
1 parent e1e8a83 commit c1acdf5
Showing 5 changed files with 40 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -590,6 +590,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add HMAC signature validation support for http_endpoint input. {pull}24918[24918]
- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]
- Add monitoring metrics to the `aws-s3` input. {pull}25711[25711]
- Add Content-Type override to aws-s3 input. {issue}25697[25697] {pull}25772[25772]

*Heartbeat*

33 changes: 21 additions & 12 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -49,6 +49,14 @@ seconds. The maximum is half of the visibility timeout value.
The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs. The default is `16 KiB`.

[id="input-{type}-content_type"]
[float]
==== `content_type`

A standard MIME type describing the format of the object data. This
can be set to override the MIME type that was given to the object when
it was uploaded. For example: `application/json`.

[id="input-{type}-encoding"]
[float]
==== `encoding`
@@ -94,18 +102,19 @@ Content type will not be checked. If a file has "application/json" content-type,
[float]
==== `file_selectors`

-If the SQS queue will have events that correspond to files that {beatname_uc}
-shouldn't process `file_selectors` can be used to limit the files that are
-downloaded. This is a list of selectors which are made up of `regex` and
-`expand_event_list_from_field` options. The `regex` should match the S3 object
-key in the SQS message, and the optional `expand_event_list_from_field` is the
-same as the global setting. If `file_selectors` is given, then any global
-`expand_event_list_from_field` value is ignored in favor of the ones specified
-in the `file_selectors`. Regex syntax is the same as the Go language. Files
-that don't match one of the regexes won't be processed.
-<<input-aws-s3-multiline>>, <<input-aws-s3-max_bytes>>,
-<<input-aws-s3-buffer_size>>, and <<input-aws-s3-encoding>> may also be set for
-each file selector.
+If the SQS queue will have events that correspond to files that
+{beatname_uc} shouldn't process `file_selectors` can be used to limit
+the files that are downloaded. This is a list of selectors which are
+made up of `regex` and `expand_event_list_from_field` options. The
+`regex` should match the S3 object key in the SQS message, and the
+optional `expand_event_list_from_field` is the same as the global
+setting. If `file_selectors` is given, then any global
+`expand_event_list_from_field` value is ignored in favor of the ones
+specified in the `file_selectors`. Regex syntax is the same as the Go
+language. Files that don't match one of the regexes won't be
+processed. <<input-aws-s3-content_type>>, <<input-aws-s3-multiline>>,
+<<input-aws-s3-max_bytes>>, <<input-aws-s3-buffer_size>>, and
+<<input-aws-s3-encoding>> may also be set for each file selector.

["source", "yml"]
----
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/collector.go
@@ -401,6 +401,10 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
	bodyReader = bufio.NewReader(gzipReader)
}

if info.readerConfig.ContentType != "" {
	*resp.ContentType = info.readerConfig.ContentType
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.ExpandEventListFromField != "" {
	decoder := json.NewDecoder(bodyReader)
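
One point worth checking in this hunk: the override assigns through `resp.ContentType`, which is a pointer, while the very next statement guards against that pointer being nil. If a response ever arrived without a Content-Type, the assignment would dereference nil. The following is only a sketch of a nil-safe variant; the helper names `effectiveContentType` and `shouldDecodeJSON` are hypothetical and do not appear in the commit.

```go
package main

import "fmt"

// effectiveContentType is a hypothetical helper (not part of the commit).
// It returns the configured override when one is set, otherwise the value
// reported by S3, and it never writes through a possibly nil pointer.
func effectiveContentType(fromS3 *string, override string) *string {
	if override != "" {
		return &override
	}
	return fromS3
}

// shouldDecodeJSON mirrors the condition in the hunk: decode as JSON when the
// content type is "application/json" or expand_event_list_from_field is set.
func shouldDecodeJSON(contentType *string, expandField string) bool {
	return (contentType != nil && *contentType == "application/json") || expandField != ""
}

func main() {
	plain := "text/plain"

	// Object uploaded as text/plain, overridden to JSON by configuration.
	fmt.Println(shouldDecodeJSON(effectiveContentType(&plain, "application/json"), "")) // true

	// No Content-Type on the response and no override: no panic, not JSON.
	fmt.Println(shouldDecodeJSON(effectiveContentType(nil, ""), "")) // false
}
```

Returning a new pointer instead of mutating the response keeps the override logic testable without an S3 client; whether that fits the collector's structure is a judgment call for the maintainers.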
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
@@ -72,6 +72,7 @@ type readerConfig struct {
	Multiline      *multiline.Config       `config:"multiline"`
	LineTerminator readfile.LineTerminator `config:"line_terminator"`
	Encoding       string                  `config:"encoding"`
	ContentType    string                  `config:"content_type"`
}

func (f *readerConfig) Validate() error {
@@ -82,6 +83,9 @@ func (f *readerConfig) Validate() error {
	if f.MaxBytes <= 0 {
		return fmt.Errorf("max_bytes <%v> must be greater than 0", f.MaxBytes)
	}
	if f.ExpandEventListFromField != "" && f.ContentType != "" && f.ContentType != "application/json" {
		return fmt.Errorf("content_type must be `application/json` when expand_event_list_from_field is used")
	}

	return nil
}
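
The rule added to `Validate` can be tried in isolation. The sketch below uses a trimmed-down stand-in struct, `readerOptions`, which is hypothetical; the real `readerConfig` has more fields and more checks. It reproduces only the `content_type` condition from the hunk above.

```go
package main

import (
	"errors"
	"fmt"
)

// readerOptions is a hypothetical, trimmed-down stand-in for readerConfig.
type readerOptions struct {
	ExpandEventListFromField string
	ContentType              string
}

// validate reproduces only the content_type rule from the hunk above:
// an explicit content_type other than application/json is rejected when
// expand_event_list_from_field is set.
func (o readerOptions) validate() error {
	if o.ExpandEventListFromField != "" && o.ContentType != "" && o.ContentType != "application/json" {
		return errors.New("content_type must be `application/json` when expand_event_list_from_field is used")
	}
	return nil
}

func main() {
	cases := []readerOptions{
		{ExpandEventListFromField: "Records", ContentType: "text/plain"},       // rejected
		{ExpandEventListFromField: "Records", ContentType: "application/json"}, // accepted
		{ExpandEventListFromField: "Records"},                                  // accepted: no override
		{ContentType: "text/plain"},                                            // accepted: no expansion
	}
	for _, c := range cases {
		fmt.Printf("%+v -> %v\n", c, c.validate())
	}
}
```

The sketch uses `errors.New` because the message has no format verbs; the commit itself uses `fmt.Errorf`, which behaves the same for this string.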
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/awss3/config_test.go
@@ -144,6 +144,16 @@ func TestConfig(t *testing.T) {
		"max_bytes <0> must be greater than 0",
		nil,
	},
	{
		"error on expand_event_list_from_field and content_type != application/json ",
		common.MapStr{
			"queue_url":                    queueURL,
			"expand_event_list_from_field": "Records",
			"content_type":                 "text/plain",
		},
		"content_type must be `application/json` when expand_event_list_from_field is used",
		nil,
	},
}

for _, tc := range testCases {