Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.14] Fix handling of custom Endpoint when using S3 + SQS #39709

Merged
merged 14 commits into from
May 28, 2024
Merged
92 changes: 76 additions & 16 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,38 @@ type s3Input struct {

func newInput(config config, store beater.StateStore) (*s3Input, error) {
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
}

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
// The awsConfig now contains the region from the credential profile or default region
// if the region is explicitly set in the config, then it wins
if config.RegionName != "" {
awsConfig.Region = config.RegionName
}

if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
// A custom endpoint has been specified!
if config.AWSConfig.Endpoint != "" {

// Parse a URL for the host regardless of it missing the scheme
endpointUri, err := url.Parse(config.AWSConfig.Endpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse endpoint: %w", err)
}

// For backwards compat:
// If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint
// If the endpoint does start with S3, we will use the default resolver, which uses the endpoint field but can replace s3 with the desired service name, like sqs
if !strings.HasPrefix(endpointUri.Hostname(), "s3") {
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
}
}

return &s3Input{
Expand Down Expand Up @@ -112,16 +130,23 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
defer cancelInputCtx()

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)
if err != nil && in.config.RegionName == "" {
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion)
strawgate marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's uncommon to return an error and a value that the caller should use. Typically these are mutually exclusive: you either get an error OR you get values that you should use. I suggest doing a small bit of refactoring to keep with those conventions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@faec has refactored basically all of this plugin on main including undoing this but it's too different to backport.

I made a series of integration tests which cover all the various combinations of settings but I'm worried refactoring this might not be worth it given it's all going away soon


// If we can't get a region from anywhere, error out
if err != nil && regionName == "" && in.config.RegionName == "" {
return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", err)
}
var warn regionMismatchError
if errors.As(err, &warn) {
// Warn of mismatch, but go ahead with configured region name.
inputContext.Logger.Warnf("%v: using %q", err, regionName)
}
in.awsConfig.Region = regionName

// Ensure we don't overwrite region when getRegionFromURL fails
// Ensure we don't overwrite a user-specified region with a parsed region.
if regionName != "" && in.config.RegionName == "" {
strawgate marked this conversation as resolved.
Show resolved Hide resolved
in.awsConfig.Region = regionName
}

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, pipeline)
Expand Down Expand Up @@ -186,7 +211,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if in.config.AWSConfig.Endpoint != "" {
o.EndpointResolver = sqs.EndpointResolverFromURL(in.config.AWSConfig.Endpoint)
}
}),

queueURL: in.config.QueueURL,
apiTimeout: in.config.APITimeout,
visibilityTimeout: in.config.VisibilityTimeout,
Expand All @@ -198,6 +227,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if in.config.AWSConfig.Endpoint != "" {
o.EndpointResolver = s3.EndpointResolverFromURL(in.config.AWSConfig.Endpoint)
}
o.UsePathStyle = in.config.PathStyle
}),
}
Expand Down Expand Up @@ -322,17 +354,45 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_

func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) {
// get region from queueURL
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
u, err := url.Parse(queueURL)
if err != nil {
return "", fmt.Errorf(queueURL + " is not a valid URL")
}

e, err := url.Parse(endpoint)
if err != nil {
return "", fmt.Errorf(endpoint + " is not a valid URL")
}

if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" {
queueHostSplit := strings.SplitN(u.Host, ".", 3)
endpointSplit := strings.SplitN(e.Host, ".", 3)
// check for sqs queue url

// Parse a user-provided custom endpoint
if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) == 3 && len(endpointSplit) == 3 {
// Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint
endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2]
strawgate marked this conversation as resolved.
Show resolved Hide resolved
if !endpointMatchesQueueUrl {
// We couldn't resolve the URL
// We cannot infer the region by matching the endpoint and queue url, return the default region with a region mismatch warning
return defaultRegion, regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]}
}

region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return region, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
}
return region, nil
}

// Parse a standard SQS url
if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" {
if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
// handle endpoint with no scheme, handle endpoint with scheme
if queueHostSplit[2] == endpoint || queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
Expand Down
168 changes: 168 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}

// TestInputRunSQSWithConfig runs the S3 input in SQS mode against a table of
// queue_url / region / default_region / endpoint combinations and checks the
// region and endpoint resolvers that end up on the input's AWS config.
//
// Each table entry runs as its own subtest so that one failing combination
// does not hide the results of the others.
func TestInputRunSQSWithConfig(t *testing.T) {
	tests := []struct {
		name          string
		queueURL      string
		endpoint      string
		region        string
		defaultRegion string
		want          string // expected s3Input.awsConfig.Region once Run has returned
		wantErr       error  // error that Run is expected to return (nil means success)
	}{
		{
			name:     "no region",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			want:     "us-east-1",
		},
		{
			name:     "no region but with long endpoint",
			queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
			endpoint: "https://s3.us-east-1.abc.xyz",
			want:     "us-east-1",
		},
		{
			name:     "no region but with short endpoint",
			queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
			endpoint: "https://abc.xyz",
			want:     "us-east-1",
		},
		{
			name:     "no region custom queue domain",
			queueURL: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs",
			wantErr:  errBadQueueURL,
		},
		{
			name:     "region",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			want:     "us-west-2",
		},
		{
			name:          "default_region",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			defaultRegion: "us-west-2",
			want:          "us-west-2",
		},
		{
			name:          "region and default_region",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-east-2",
			defaultRegion: "us-east-3",
			want:          "us-east-2",
		},
		{
			name:     "short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			endpoint: "https://amazonaws.com",
			want:     "us-east-1",
		},
		{
			name:     "long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			endpoint: "https://s3.us-east-1.amazonaws.com",
			want:     "us-east-1",
		},
		{
			name:     "region and custom short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://.elastic.co",
			want:     "us-west-2",
		},
		{
			name:     "region and custom long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://s3.us-east-1.elastic.co",
			want:     "us-west-2",
		},
		{
			name:     "region and short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://amazonaws.com",
			want:     "us-west-2",
		},
		{
			name:     "region and long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://s3.us-east-1.amazonaws.com",
			want:     "us-west-2",
		},
		{
			name:          "region and default region and short_endpoint",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-west-2",
			defaultRegion: "us-east-1",
			endpoint:      "https://amazonaws.com",
			want:          "us-west-2",
		},
		{
			name:          "region and default region and long_endpoint",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-west-2",
			defaultRegion: "us-east-1",
			endpoint:      "https://s3.us-east-1.amazonaws.com",
			want:          "us-west-2",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			logp.TestingSetup()

			// Create a filebeat config using the provided test parameters.
			// Only the settings present in the table entry are emitted.
			config := ""
			if test.queueURL != "" {
				config += fmt.Sprintf("queue_url: %s \n", test.queueURL)
			}
			if test.region != "" {
				config += fmt.Sprintf("region: %s \n", test.region)
			}
			if test.defaultRegion != "" {
				config += fmt.Sprintf("default_region: %s \n", test.defaultRegion)
			}
			if test.endpoint != "" {
				config += fmt.Sprintf("endpoint: %s \n", test.endpoint)
			}

			s3Input := createInput(t, conf.MustNewConfigFrom(config))

			inputCtx, cancel := newV2Context()
			t.Cleanup(cancel)

			// Bound the run time of the input; stop the timer once the
			// subtest is over so it does not outlive the test.
			timer := time.AfterFunc(5*time.Second, cancel)
			t.Cleanup(func() { timer.Stop() })

			var errGroup errgroup.Group
			errGroup.Go(func() error {
				return s3Input.Run(inputCtx, &fakePipeline{})
			})
			err := errGroup.Wait()

			// When an error is expected, Run must return exactly that error
			// (possibly wrapped); a nil or unrelated error is a failure.
			if test.wantErr != nil {
				assert.ErrorIs(t, err, test.wantErr)
				return
			}
			if err != nil {
				t.Fatal(err)
			}

			// The legacy EndpointResolver is never expected to be set on the
			// shared AWS config.
			assert.Nil(t, s3Input.awsConfig.EndpointResolver)

			// If the endpoint is not set, or starts with s3, the
			// EndpointResolverWithOptions should be nil at this point (S3
			// resolvers are added later in the code than this integration
			// test covers). Otherwise it should be set.
			if test.endpoint == "" || strings.HasPrefix(test.endpoint, "https://s3") {
				assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions)
			} else {
				assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions)
			}

			assert.EqualValues(t, test.want, s3Input.awsConfig.Region)
		})
	}
}

func TestInputRunSQS(t *testing.T) {
logp.TestingSetup()

Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func TestGetRegionFromQueueURL(t *testing.T) {
endpoint: "abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_url_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
Expand Down
Loading