Skip to content

Commit

Permalink
[AWS] Support VPC endpoint for aws-s3 input SQS queue url (#38189)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiyan-sheng authored Mar 19, 2024
1 parent 15d6fd7 commit 4a3da80
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Update CEL mito extensions to v1.9.0 to add keys/values helper. {pull}37971[37971]
- Add logging for cache processor file reads and writes. {pull}38052[38052]
- Add parseDateInTZ value template for the HTTPJSON input {pull}37738[37738]
- Support VPC endpoint for aws-s3 input SQS queue url. {pull}38189[38189]
- Improve rate limit handling by HTTPJSON {issue}36207[36207] {pull}38161[38161] {pull}38237[38237]
- Add parseDateInTZ value template for the HTTPJSON input. {pull}37738[37738]
- Add support for complex event objects in the HTTP Endpoint input. {issue}37910[37910] {pull}38193[38193]
Expand Down
24 changes: 20 additions & 4 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,26 +317,42 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
return s3Poller, nil
}

var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")
var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")

func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) {
// get region from queueURL
// Example: https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
u, err := url.Parse(queueURL)
if err != nil {
return "", fmt.Errorf(queueURL + " is not a valid URL")
}
if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" {
queueHostSplit := strings.SplitN(u.Host, ".", 3)
if len(queueHostSplit) == 3 {
// check for sqs queue url
if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" {
if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
}
return region, nil
}
} else if defaultRegion != "" {
}

// check for vpce url
queueHostSplitVPC := strings.SplitN(u.Host, ".", 5)
if len(queueHostSplitVPC) == 5 && queueHostSplitVPC[1] == "sqs" {
if queueHostSplitVPC[4] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplitVPC[4], "amazonaws.")) {
region = queueHostSplitVPC[2]
if defaultRegion != "" && region != defaultRegion {
return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
}
return region, nil
}
}

if defaultRegion != "" {
return defaultRegion, nil
}
}
Expand Down
39 changes: 7 additions & 32 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package awss3

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -76,40 +75,16 @@ func TestGetRegionFromQueueURL(t *testing.T) {
wantErr: errBadQueueURL,
},
{
name: "abc.xyz_and_domain_with_different_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "googlecloud.com",
wantErr: errBadQueueURL,
},
{
name: "mismatch_regions_no_default",
queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
name: "vpce_endpoint",
queueURL: "https://vpce-test.sqs.us-east-2.vpce.amazonaws.com/12345678912/sqs-queue",
deflt: "",
want: "us-east-1",
},
{
name: "mismatch_regions",
queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
deflt: "ap-west-1",
want: "ap-west-1",
wantErr: regionMismatchError{queueURLRegion: "us-east-1", defaultRegion: "ap-west-1"},
},
{
name: "localstack",
queueURL: "http://localhost:4566/000000000000/filebeat-s3-integtest-d9clk9",
deflt: "localstack",
want: "localstack",
want: "us-east-2",
},
{
name: "localstack_sns",
queueURL: "http://localhost:4566/000000000000/filebeat-s3-integtest-sns-d9clk9",
deflt: "localstack_sns",
want: "localstack_sns",
},
{
name: "invalid_queue_url",
queueURL: ":foo",
wantErr: errors.New(":foo is not a valid URL"),
name: "vpce_endpoint_with_endpoint",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue",
endpoint: "amazonaws.com",
want: "us-east-1",
},
}

Expand Down

0 comments on commit 4a3da80

Please sign in to comment.