diff --git a/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md b/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md new file mode 100644 index 00000000000000..5149bebb686aa4 --- /dev/null +++ b/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md @@ -0,0 +1,3 @@ +Adds a `force_path_style` option to the `aws_s3` sink that allows users to configure virtual host style addressing. The value defaults to `true` to maintain existing behavior. + +authors: sam6258 diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 775bb4c08a85fb..cda83175acc71e 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -124,6 +124,9 @@ pub trait ClientBuilder { /// Build the client using the given config settings. fn build(config: &SdkConfig) -> Self::Client; + + /// Build the client using the given config settings and path style addressing. + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client; } fn region_provider( @@ -168,8 +171,9 @@ pub async fn create_client( proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, + force_path_style: impl Into, ) -> crate::Result { - create_client_and_region::(auth, region, endpoint, proxy, tls_options, timeout) + create_client_and_region::(auth, region, endpoint, proxy, tls_options, timeout, force_path_style) .await .map(|(client, _)| client) } @@ -182,6 +186,7 @@ pub async fn create_client_and_region( proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, + force_path_style: impl Into, ) -> crate::Result<(T::Client, Region)> { let retry_config = RetryConfig::disabled(); @@ -239,7 +244,11 @@ pub async fn create_client_and_region( let config = config_builder.build(); - Ok((T::build(&config), region)) + if force_path_style.into() { + Ok((T::build_and_force_path_style(&config), region)) + } else { + Ok((T::build(&config), region)) + } } #[derive(Snafu, Debug)] diff --git a/src/common/s3.rs b/src/common/s3.rs index cdb69725b4c662..29c4cbca11cdc3 100644 --- a/src/common/s3.rs +++ b/src/common/s3.rs @@ -8,6 +8,11 @@ impl ClientBuilder for S3ClientBuilder { type Client = aws_sdk_s3::client::Client; fn build(config: &aws_types::SdkConfig) -> Self::Client { + let config = config::Builder::from(config).build(); + aws_sdk_s3::client::Client::from_conf(config) + } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { let config = config::Builder::from(config).force_path_style(true).build(); aws_sdk_s3::client::Client::from_conf(config) } diff --git a/src/common/sqs.rs b/src/common/sqs.rs index f02aa2f7c4021f..c02fd4ceafef5d 100644 --- a/src/common/sqs.rs +++ b/src/common/sqs.rs @@ -8,4 +8,8 @@ impl ClientBuilder for SqsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sqs::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + SqsClientBuilder::build(config) + } } diff --git a/src/secrets/aws_secrets_manager.rs b/src/secrets/aws_secrets_manager.rs index f3c221b42eee0d..0a86456a8506d8 100644 --- a/src/secrets/aws_secrets_manager.rs +++ b/src/secrets/aws_secrets_manager.rs @@ -17,6 +17,10 @@ impl ClientBuilder for SecretsManagerClientBuilder { let config = config::Builder::from(config).build(); Client::from_conf(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + SecretsManagerClientBuilder::build(config) + } } /// Configuration for the `aws_secrets_manager` secrets backend. @@ -63,6 +67,7 @@ impl SecretBackend for AwsSecretsManagerBackend { &ProxyConfig::default(), &self.tls, &None, + false, ) .await?; diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 0dc7f909176201..e83c88cc47dabb 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -36,6 +36,10 @@ impl ClientBuilder for CloudwatchLogsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatchlogs::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + CloudwatchLogsClientBuilder::build(config) + } } #[configurable_component] @@ -175,6 +179,7 @@ impl CloudwatchLogsSinkConfig { proxy, &self.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 4e80493adaf88a..296a6af4ab99bb 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -467,7 +467,7 @@ async fn create_client_test() -> CloudwatchLogsClient { let endpoint = Some(cloudwatch_address()); let proxy = ProxyConfig::default(); - create_client::(&auth, region, endpoint, &proxy, &None, &None) + create_client::(&auth, region, endpoint, &proxy, &None, &None, false) .await .unwrap() } diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index 371b1e29e79a49..2e369d1728d8bd 100644 --- a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -122,6 +122,10 @@ impl ClientBuilder for CloudwatchMetricsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatch::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + CloudwatchMetricsClientBuilder::build(config) + } } #[async_trait::async_trait] @@ -178,6 +182,7 @@ impl CloudWatchMetricsSinkConfig { proxy, &self.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 96b032f4b5738c..e796d91dd68ee8 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -41,6 +41,10 @@ impl ClientBuilder for KinesisFirehoseClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { Self::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + KinesisFirehoseClientBuilder::build(config) + } } // AWS Kinesis Firehose API accepts payloads up to 4MB or 500 events @@ -108,6 +112,7 @@ impl KinesisFirehoseSinkConfig { proxy, &self.base.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 00f06301eb1648..26ba4f3f5d6c27 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -261,6 +261,7 @@ async fn firehose_client() -> aws_sdk_firehose::Client { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index aa1896d0c7333a..a95ca18a495d16 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -40,6 +40,10 @@ impl ClientBuilder for KinesisClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { KinesisClient::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + KinesisClientBuilder::build(config) + } } pub const MAX_PAYLOAD_SIZE: usize = 5_000_000; @@ -105,6 +109,7 @@ impl KinesisStreamsSinkConfig { proxy, &self.base.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index 57958858f01220..fe967f3c29e065 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -183,6 +183,7 @@ async fn client() -> aws_sdk_kinesis::Client { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index c1c8067fb8909d..242ae8d0049c1a 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -139,6 +139,12 @@ pub struct S3SinkConfig { #[configurable(derived)] #[serde(default)] pub timezone: Option, + + /// Specifies which addressing style to use. + /// + /// This controls if the bucket name is in the hostname or part of the URL. + #[serde(default = "crate::serde::default_true")] + pub force_path_style: bool, } pub(super) fn default_key_prefix() -> String { @@ -167,6 +173,7 @@ impl GenerateConfig for S3SinkConfig { auth: AwsAuthentication::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: Default::default(), }) .unwrap() } @@ -251,7 +258,7 @@ impl S3SinkConfig { } pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result { - s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await + s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls, self.force_path_style).await } } diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index e86c8956895c61..c099b694681b2a 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -438,6 +438,7 @@ async fn s3_flush_on_exhaustion() { auth: Default::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: true, } }; let prefix = config.key_prefix.clone(); @@ -496,6 +497,7 @@ async fn client() -> S3Client { &proxy, &tls_options, &None, + true, ) .await .unwrap() @@ -522,6 +524,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig { auth: Default::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: true, } } diff --git a/src/sinks/aws_s_s/sns/config.rs b/src/sinks/aws_s_s/sns/config.rs index 3463443d3639cd..447c11b5c0fc49 100644 --- a/src/sinks/aws_s_s/sns/config.rs +++ b/src/sinks/aws_s_s/sns/config.rs @@ -54,6 +54,7 @@ impl SnsSinkConfig { proxy, &self.base_config.tls, &None, + false, ) .await } @@ -111,6 +112,10 @@ impl ClientBuilder for SnsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sns::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + SnsClientBuilder::build(config) + } } pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> { diff --git a/src/sinks/aws_s_s/sns/integration_tests.rs b/src/sinks/aws_s_s/sns/integration_tests.rs index 30b43d8b9b1163..3e46615c9366cf 100644 --- a/src/sinks/aws_s_s/sns/integration_tests.rs +++ b/src/sinks/aws_s_s/sns/integration_tests.rs @@ -38,6 +38,7 @@ async fn create_sns_test_client() -> SnsClient { &proxy, &None, &None, + false, ) .await .unwrap() @@ -59,6 +60,7 @@ async fn create_sqs_test_client() -> SqsClient { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/aws_s_s/sqs/config.rs b/src/sinks/aws_s_s/sqs/config.rs index a936a6badc03f4..8b8afa84e2f8a8 100644 --- a/src/sinks/aws_s_s/sqs/config.rs +++ b/src/sinks/aws_s_s/sqs/config.rs @@ -55,6 +55,7 @@ impl SqsSinkConfig { proxy, &self.base_config.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_s_s/sqs/integration_tests.rs b/src/sinks/aws_s_s/sqs/integration_tests.rs index 3428caa4374fe6..8046b2a7cd4f43 100644 --- a/src/sinks/aws_s_s/sqs/integration_tests.rs +++ b/src/sinks/aws_s_s/sqs/integration_tests.rs @@ -36,6 +36,7 @@ async fn create_test_client() -> SqsClient { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index 2f87f3d6754a22..4d2e72f1e689f1 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -363,11 +363,12 @@ pub async fn create_service( auth: &AwsAuthentication, proxy: &ProxyConfig, tls_options: &Option, + force_path_style: impl Into, ) -> crate::Result { let endpoint = region.endpoint(); let region = region.region(); let client = - create_client::(auth, region.clone(), endpoint, proxy, tls_options, &None) + create_client::(auth, region.clone(), endpoint, proxy, tls_options, &None, force_path_style) .await?; Ok(S3Service::new(client)) } diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 988eb55bfcce4a..8a889142efdd9c 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -238,6 +238,7 @@ impl AwsS3Config { proxy, &self.tls_options, &None, + true, ) .await?; @@ -254,6 +255,7 @@ impl AwsS3Config { proxy, &sqs.tls_options, &sqs.timeout, + false, ) .await?; @@ -1023,6 +1025,7 @@ mod integration_tests { &proxy_config, &None, &None, + true, ) .await .unwrap() @@ -1042,6 +1045,7 @@ mod integration_tests { &proxy_config, &None, &None, + false, ) .await .unwrap() diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index 3d0583a0076882..a97260e3b2ed53 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -167,6 +167,7 @@ impl AwsSqsConfig { &cx.proxy, &self.tls, &None, + false, ) .await } diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index 48bf7eeba61f1c..6af7188cd42129 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -615,6 +615,15 @@ base: components: sinks: aws_s3: configuration: { required: false type: string: default: "%s" } + force_path_style: { + description: """ + Specifies which addressing style to use. + + This controls if the bucket name is in the hostname or is part of the URL. + """ + required: false + type: bool: default: true + } framing: { description: "Framing configuration." required: false