diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 198d8d70381..2beb702253d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Log bad handshake details when websocket connection fails {pull}41300[41300] - Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179] - Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244] +- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index d80108590ce..6f485431ddf 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -7,6 +7,7 @@ package awss3 import ( "errors" "fmt" + "net/url" "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -106,6 +107,13 @@ func (c *config) Validate() error { if c.ProviderOverride != "" && c.NonAWSBucketName == "" { return errors.New("provider can only be overridden when polling non-AWS S3 services") } + if c.AWSConfig.Endpoint != "" { + // Make sure the given endpoint can be parsed + _, err := url.Parse(c.AWSConfig.Endpoint) + if err != nil { + return fmt.Errorf("failed to parse endpoint: %w", err) + } + } if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" { return errors.New("backup to non-AWS bucket can only be used for non-AWS sources") } @@ -245,14 +253,18 @@ func (c config) getBucketARN() string { // options struct. // Should be provided as a parameter to s3.NewFromConfig. func (c config) s3ConfigModifier(o *s3.Options) { - if c.NonAWSBucketName != "" { - //nolint:staticcheck // haven't migrated to the new interface yet - o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint} - } - if c.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } + // Apply slightly different endpoint resolvers depending on whether we're in S3 or SQS mode. + if c.AWSConfig.Endpoint != "" { + //nolint:staticcheck // haven't migrated to the new interface yet + o.EndpointResolver = s3.EndpointResolverFromURL(c.AWSConfig.Endpoint, + func(e *awssdk.Endpoint) { + // The S3 hostname is immutable in bucket polling mode, mutable otherwise. + e.HostnameImmutable = (c.getBucketARN() != "") + }) + } o.UsePathStyle = c.PathStyle o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) { @@ -269,6 +281,9 @@ func (c config) sqsConfigModifier(o *sqs.Options) { if c.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } + if c.AWSConfig.Endpoint != "" { + o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint) + } } func (c config) getFileSelectors() []fileSelectorConfig { diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index f0fa3137974..6d62f454c42 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -7,8 +7,6 @@ package awss3 import ( "fmt" - awssdk "github.com/aws/aws-sdk-go-v2/aws" - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/feature" @@ -48,15 +46,10 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) { return nil, fmt.Errorf("initializing AWS config: %w", err) } - if config.AWSConfig.Endpoint != "" { - // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint - awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { - return awssdk.Endpoint{ - PartitionID: "aws", - URL: config.AWSConfig.Endpoint, - SigningRegion: awsConfig.Region, - }, nil - }) + // The awsConfig now contains the region from the credential profile or default region + // if the region is explicitly set in the config, then it wins + if config.RegionName != "" { + awsConfig.Region = config.RegionName } if config.QueueURL != "" { diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 0e998034e01..87c199dc246 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -267,6 +267,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } +func TestInputRunSQSWithConfig(t *testing.T) { + tests := []struct { + name string + queue_url string + endpoint string + region string + default_region string + want string + wantErr error + }{ + { + name: "no region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + want: "us-east-1", + }, + { + name: "no region but with long endpoint", + queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + want: "us-east-1", + }, + { + name: "no region but with short endpoint", + queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://abc.xyz", + want: "us-east-1", + }, + { + name: "no region custom queue domain", + queue_url: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs", + wantErr: errBadQueueURL, + }, + { + name: "region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + want: "us-west-2", + }, + { + name: "default_region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + default_region: "us-west-2", + want: "us-west-2", + }, + { + name: "region and default_region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-east-2", + default_region: "us-east-3", + want: "us-east-2", + }, + { + name: "short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + endpoint: "https://amazonaws.com", + want: "us-east-1", + }, + { + name: "long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-east-1", + }, + { + name: "region and custom short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://.elastic.co", + want: "us-west-2", + }, + { + name: "region and custom long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://s3.us-east-1.elastic.co", + want: "us-west-2", + }, + { + name: "region and short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://amazonaws.com", + want: "us-west-2", + }, + { + name: "region and long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-west-2", + }, + { + name: "region and default region and short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + default_region: "us-east-1", + endpoint: "https://amazonaws.com", + want: "us-west-2", + }, + { + name: "region and default region and long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + default_region: "us-east-1", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-west-2", + }, + } + + for _, test := range tests { + logp.TestingSetup() + + // Create a filebeat config using the provided test parameters + config := "" + if test.queue_url != "" { + config += fmt.Sprintf("queue_url: %s \n", test.queue_url) + } + if test.region != "" { + config += fmt.Sprintf("region: %s \n", test.region) + } + if test.default_region != "" { + config += fmt.Sprintf("default_region: %s \n", test.default_region) + } + if test.endpoint != "" { + config += fmt.Sprintf("endpoint: %s \n", test.endpoint) + } + + s3Input := createInput(t, conf.MustNewConfigFrom(config)) + + inputCtx, cancel := newV2Context() + t.Cleanup(cancel) + time.AfterFunc(5*time.Second, func() { + cancel() + }) + + var errGroup errgroup.Group + errGroup.Go(func() error { + return s3Input.Run(inputCtx, &fakePipeline{}) + }) + + if err := errGroup.Wait(); err != nil { + // assert that err == test.wantErr + if test.wantErr != nil { + continue + } + // Print the test name to help identify the failing test + t.Fatal(test.name, err) + } + + // If the endpoint starts with s3, the endpoint resolver should be null at this point + // If the endpoint does not start with s3, the endpointresolverwithoptions should be set + // If the endpoint is not set, the endpoint resolver should be null + if test.endpoint == "" { + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } else if strings.HasPrefix(test.endpoint, "https://s3") { + // S3 resolvers are added later in the code than this integration test covers + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } else { // If the endpoint is specified but is not s3 + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } + + assert.EqualValues(t, test.want, s3Input.awsConfig.Region, test.name) + } +} + func TestInputRunSQS(t *testing.T) { logp.TestingSetup() diff --git a/x-pack/filebeat/input/awss3/input_test.go b/x-pack/filebeat/input/awss3/input_test.go index 432bd360bfc..4a2160e5800 100644 --- a/x-pack/filebeat/input/awss3/input_test.go +++ b/x-pack/filebeat/input/awss3/input_test.go @@ -88,8 +88,20 @@ func TestRegionSelection(t *testing.T) { want: "us-west-3", }, { - name: "abc.xyz_and_domain_with_blank_endpoint", + name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme", queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://abc.xyz", + want: "us-east-1", + }, + { + name: "abc.xyz_and_domain_with_matching_url_endpoint", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + want: "us-east-1", + }, + { + name: "abc.xyz_and_no_region_term", + queueURL: "https://sqs.abc.xyz/627959692251/test-s3-logs", wantErr: errBadQueueURL, }, { @@ -118,7 +130,7 @@ func TestRegionSelection(t *testing.T) { { name: "non_aws_vpce_without_endpoint", queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue", - wantErr: errBadQueueURL, + want: "us-east-1", }, { name: "non_aws_vpce_with_region_override", diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 9901d5fe41d..a4865022850 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -110,12 +110,3 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { } return "unknown" } - -type nonAWSBucketResolver struct { - endpoint string -} - -func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) { - //nolint:staticcheck // haven't migrated to the new interface yet - return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil -} diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 36985f73720..b41468d2ac9 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -31,29 +31,30 @@ const ( var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") -func getRegionFromQueueURL(queueURL, endpoint string) string { +func getRegionFromQueueURL(queueURL string) string { // get region from queueURL + // Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs // Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs // Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue + // We use a simple heuristic that works for all essential cases: + // - If queue hostname is sqs.X.*, return region X + // - If queue hostname is X.sqs.Y.*, return region Y + // Hosts that don't follow this convention need the input config to + // specify a custom endpoint and an explicit region. u, err := url.Parse(queueURL) if err != nil { return "" } + hostSplit := strings.SplitN(u.Hostname(), ".", 5) - // check for sqs queue url - host := strings.SplitN(u.Host, ".", 3) - if len(host) == 3 && host[0] == "sqs" { - if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) { - return host[1] - } + // check for sqs-style queue url + if len(hostSplit) >= 4 && hostSplit[0] == "sqs" { + return hostSplit[1] } - // check for vpce url - host = strings.SplitN(u.Host, ".", 5) - if len(host) == 5 && host[1] == "sqs" { - if host[4] == endpoint || (endpoint == "" && strings.HasPrefix(host[4], "amazonaws.")) { - return host[2] - } + // check for vpce-style url + if len(hostSplit) == 5 && hostSplit[1] == "sqs" { + return hostSplit[2] } return "" diff --git a/x-pack/filebeat/input/awss3/sqs_input.go b/x-pack/filebeat/input/awss3/sqs_input.go index a4308af45a8..596586c7569 100644 --- a/x-pack/filebeat/input/awss3/sqs_input.go +++ b/x-pack/filebeat/input/awss3/sqs_input.go @@ -88,14 +88,19 @@ func (in *sqsReaderInput) setup( in.log = inputContext.Logger.With("queue_url", in.config.QueueURL) in.pipeline = pipeline - in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint) + in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL) if in.config.RegionName != "" { + // Configured region always takes precedence in.awsConfig.Region = in.config.RegionName } else if in.detectedRegion != "" { + // Only use detected region if there is no explicit region configured. in.awsConfig.Region = in.detectedRegion + } else if in.config.AWSConfig.DefaultRegion != "" { + // If we can't find anything else, fall back on the default. + in.awsConfig.Region = in.config.AWSConfig.DefaultRegion } else { - // If we can't get a region from the config or the URL, return an error. - return fmt.Errorf("failed to get AWS region from queue_url: %w", errBadQueueURL) + // If we can't find a usable region, return an error + return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", errBadQueueURL) } in.sqs = &awsSQSAPI{