Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of custom endpoints in AWS input #41504

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,16 @@
// options struct.
// Should be provided as a parameter to s3.NewFromConfig.
func (c config) s3ConfigModifier(o *s3.Options) {
if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
// Apply slightly different endpoint resolvers depending on whether we're in S3 or SQS mode.
if c.NonAWSBucketName != "" {
//nolint:staticcheck // haven't migrated to the new interface yet
o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint}
}

if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
} else if c.QueueURL != "" && c.AWSConfig.Endpoint != "" {
//nolint:staticcheck // haven't migrated to the new interface yet
o.EndpointResolver = s3.EndpointResolverFromURL(c.AWSConfig.Endpoint)
}
o.UsePathStyle = c.PathStyle

Expand All @@ -269,6 +272,9 @@
if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if c.AWSConfig.Endpoint != "" {
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)

Check failure on line 276 in x-pack/filebeat/input/awss3/config.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: o.EndpointResolver is deprecated: Deprecated: EndpointResolver and WithEndpointResolver. Providing a value for this field will likely prevent you from using any endpoint-related service features released after the introduction of EndpointResolverV2 and BaseEndpoint. (staticcheck)
}
}

func (c config) getFileSelectors() []fileSelectorConfig {
Expand Down
35 changes: 27 additions & 8 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import (
"fmt"
"net/url"
"strings"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

Expand Down Expand Up @@ -48,15 +50,32 @@
return nil, fmt.Errorf("initializing AWS config: %w", err)
}

// The awsConfig now contains the region from the credential profile or default region
// if the region is explicitly set in the config, then it wins
if config.RegionName != "" {
awsConfig.Region = config.RegionName
}

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
// Parse a URL for the host regardless of it missing the scheme
endpointUri, err := url.Parse(config.AWSConfig.Endpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse endpoint: %w", err)
}

// For backwards compat:
// If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint
// If the endpoint does start with S3, we will use the default resolver uses the endpoint field but can replace s3 with the desired service name like sqs
if !strings.HasPrefix(endpointUri.Hostname(), "s3") {
Copy link
Contributor

@strawgate strawgate Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should check to see if s3 is in the URL at all -- as we probably want the same behavior (avoid the EndpointResolver for AWS URLs) for vpce endpoints like vpce.s3....

Copy link
Contributor Author

@faec faec Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that check was removed then it seems like the only fundamental change in this function is adding Source: awssdk.EndpointSourceCustom, to the existing resolver. The big question for me is still whether this whole block can be removed in lieu of the existing endpoint handling in the config modifiers (or vice versa).

Copy link
Contributor

@strawgate strawgate Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is what results in a mutable endpoint when using an endpoint with s3 in the leaf of the endpoint url.

I dont think we should remove the check, I do think it should check for s3 in other parts of the domain name thugh.

I think that both svc.EndpointResolverFromURL and o.endpoint = ... have the same end effect and the awssdk.EndpointResolverWithOptionsFunc force returns the exact endpoint value.

So my guess is as long as those two use-cases remain distinct (when to allow a mutable endpoint hostname), the logic can be combined?

awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {

Check failure on line 70 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awsConfig.EndpointResolverWithOptions is deprecated: with the release of endpoint resolution v2 in API clients, EndpointResolver and EndpointResolverWithOptions are deprecated. Providing a value for this field will likely prevent you from using newer endpoint-related service features. See API client options EndpointResolverV2 and BaseEndpoint. (staticcheck)
return awssdk.Endpoint{

Check failure on line 71 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awssdk.Endpoint is deprecated: This structure was used with the global [EndpointResolver] interface, which has been deprecated in favor of service-specific endpoint resolution. See the deprecation docs on that interface for more information. (staticcheck)
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
}
}

if config.QueueURL != "" {
Expand Down
168 changes: 168 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
assert.EqualValues(t, 0.0, s3Input.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end
}

func TestInputRunSQSWithConfig(t *testing.T) {
tests := []struct {
name string
queue_url string
endpoint string
region string
default_region string
want string
wantErr error
}{
{
name: "no region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
want: "us-east-1",
},
{
name: "no region but with long endpoint",
queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "no region but with short endpoint",
queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "no region custom queue domain",
queue_url: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs",
wantErr: errBadQueueURL,
},
{
name: "region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
want: "us-west-2",
},
{
name: "default_region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
default_region: "us-west-2",
want: "us-west-2",
},
{
name: "region and default_region",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-east-2",
default_region: "us-east-3",
want: "us-east-2",
},
{
name: "short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
endpoint: "https://amazonaws.com",
want: "us-east-1",
},
{
name: "long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.amazonaws.com",
want: "us-east-1",
},
{
name: "region and custom short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://.elastic.co",
want: "us-west-2",
},
{
name: "region and custom long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://s3.us-east-1.elastic.co",
want: "us-west-2",
},
{
name: "region and short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://amazonaws.com",
want: "us-west-2",
},
{
name: "region and long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
endpoint: "https://s3.us-east-1.amazonaws.com",
want: "us-west-2",
},
{
name: "region and default region and short_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
default_region: "us-east-1",
endpoint: "https://amazonaws.com",
want: "us-west-2",
},
{
name: "region and default region and long_endpoint",
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
region: "us-west-2",
default_region: "us-east-1",
endpoint: "https://s3.us-east-1.amazonaws.com",
want: "us-west-2",
},
}

for _, test := range tests {
logp.TestingSetup()

// Create a filebeat config using the provided test parameters
config := ""
if test.queue_url != "" {
config += fmt.Sprintf("queue_url: %s \n", test.queue_url)
}
if test.region != "" {
config += fmt.Sprintf("region: %s \n", test.region)
}
if test.default_region != "" {
config += fmt.Sprintf("default_region: %s \n", test.default_region)
}
if test.endpoint != "" {
config += fmt.Sprintf("endpoint: %s \n", test.endpoint)
}

s3Input := createInput(t, conf.MustNewConfigFrom(config))

inputCtx, cancel := newV2Context()
t.Cleanup(cancel)
time.AfterFunc(5*time.Second, func() {
cancel()
})

var errGroup errgroup.Group
errGroup.Go(func() error {
return s3Input.Run(inputCtx, &fakePipeline{})
})

if err := errGroup.Wait(); err != nil {
// assert that err == test.wantErr
if test.wantErr != nil {
continue
}
// Print the test name to help identify the failing test
t.Fatal(test.name, err)
}

// If the endpoint starts with s3, the endpoint resolver should be null at this point
// If the endpoint does not start with s3, the endpointresolverwithoptions should be set
// If the endpoint is not set, the endpoint resolver should be null
if test.endpoint == "" {
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
} else if strings.HasPrefix(test.endpoint, "https://s3") {
// S3 resolvers are added later in the code than this integration test covers
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
} else { // If the endpoint is specified but is not s3
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
}

assert.EqualValues(t, test.want, s3Input.awsConfig.Region, test.name)
}
}

func TestInputRunSQS(t *testing.T) {
logp.TestingSetup()

Expand Down
16 changes: 14 additions & 2 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,20 @@ func TestRegionSelection(t *testing.T) {
want: "us-west-3",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_url_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_no_region_term",
queueURL: "https://sqs.abc.xyz/627959692251/test-s3-logs",
wantErr: errBadQueueURL,
},
{
Expand Down Expand Up @@ -118,7 +130,7 @@ func TestRegionSelection(t *testing.T) {
{
name: "non_aws_vpce_without_endpoint",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue",
wantErr: errBadQueueURL,
want: "us-east-1",
},
{
name: "non_aws_vpce_with_region_override",
Expand Down
27 changes: 14 additions & 13 deletions x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,30 @@ const (

var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")

func getRegionFromQueueURL(queueURL, endpoint string) string {
func getRegionFromQueueURL(queueURL string) string {
// get region from queueURL
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
// We use a simple heuristic that works for all essential cases:
// - If queue hostname is sqs.X.*, return region X
// - If queue hostname is X.sqs.Y.*, return region Y
// Hosts that don't follow this convention need the input config to
// specify a custom endpoint and an explicit region.
u, err := url.Parse(queueURL)
if err != nil {
return ""
}
hostSplit := strings.SplitN(u.Hostname(), ".", 5)

// check for sqs queue url
host := strings.SplitN(u.Host, ".", 3)
if len(host) == 3 && host[0] == "sqs" {
if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) {
return host[1]
}
// check for sqs-style queue url
if len(hostSplit) >= 4 && hostSplit[0] == "sqs" {
return hostSplit[1]
}

// check for vpce url
host = strings.SplitN(u.Host, ".", 5)
if len(host) == 5 && host[1] == "sqs" {
if host[4] == endpoint || (endpoint == "" && strings.HasPrefix(host[4], "amazonaws.")) {
return host[2]
}
// check for vpce-style url
if len(hostSplit) == 5 && hostSplit[1] == "sqs" {
return hostSplit[2]
}

return ""
Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/awss3/sqs_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ func (in *sqsReaderInput) setup(
in.log = inputContext.Logger.With("queue_url", in.config.QueueURL)
in.pipeline = pipeline

in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL)
if in.config.RegionName != "" {
// Configured region always takes precedence
in.awsConfig.Region = in.config.RegionName
} else if in.detectedRegion != "" {
// Only use detected region if there is no explicit region configured.
in.awsConfig.Region = in.detectedRegion
} else if in.config.AWSConfig.DefaultRegion != "" {
// If we can't find anything else, fall back on the default.
in.awsConfig.Region = in.config.AWSConfig.DefaultRegion
} else {
// If we can't get a region from the config or the URL, return an error.
return fmt.Errorf("failed to get AWS region from queue_url: %w", errBadQueueURL)
// If we can't find a usable region, return an error
return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", errBadQueueURL)
}

in.sqs = &awsSQSAPI{
Expand Down
Loading