Skip to content

Commit

Permalink
feat(aws_s3): add option to use virtual addressing
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Miller <smiller1@coreweave.com>
  • Loading branch information
sam6258 committed Dec 11, 2024
1 parent 1fe4152 commit 3e95f13
Show file tree
Hide file tree
Showing 22 changed files with 87 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds a `force_path_style` option to the `aws_s3` sink that allows users to configure virtual host style addressing. The value defaults to `true` to maintain existing behavior.

authors: sam6258
13 changes: 11 additions & 2 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ pub trait ClientBuilder {

/// Build the client using the given config settings.
fn build(config: &SdkConfig) -> Self::Client;

/// Build the client using the given config settings and path style addressing.
fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client;
}

fn region_provider(
Expand Down Expand Up @@ -168,8 +171,9 @@ pub async fn create_client<T: ClientBuilder>(
proxy: &ProxyConfig,
tls_options: &Option<TlsConfig>,
timeout: &Option<AwsTimeout>,
force_path_style: impl Into<bool>,
) -> crate::Result<T::Client> {
create_client_and_region::<T>(auth, region, endpoint, proxy, tls_options, timeout)
create_client_and_region::<T>(auth, region, endpoint, proxy, tls_options, timeout, force_path_style)
.await
.map(|(client, _)| client)
}
Expand All @@ -182,6 +186,7 @@ pub async fn create_client_and_region<T: ClientBuilder>(
proxy: &ProxyConfig,
tls_options: &Option<TlsConfig>,
timeout: &Option<AwsTimeout>,
force_path_style: impl Into<bool>,
) -> crate::Result<(T::Client, Region)> {
let retry_config = RetryConfig::disabled();

Expand Down Expand Up @@ -239,7 +244,11 @@ pub async fn create_client_and_region<T: ClientBuilder>(

let config = config_builder.build();

Ok((T::build(&config), region))
if force_path_style.into() {
Ok((T::build_and_force_path_style(&config), region))
} else {
Ok((T::build(&config), region))
}
}

#[derive(Snafu, Debug)]
Expand Down
5 changes: 5 additions & 0 deletions src/common/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ impl ClientBuilder for S3ClientBuilder {
type Client = aws_sdk_s3::client::Client;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
let config = config::Builder::from(config).build();
aws_sdk_s3::client::Client::from_conf(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
let config = config::Builder::from(config).force_path_style(true).build();
aws_sdk_s3::client::Client::from_conf(config)
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ impl ClientBuilder for SqsClientBuilder {
fn build(config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_sqs::client::Client::new(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
SqsClientBuilder::build(config)
}
}
5 changes: 5 additions & 0 deletions src/secrets/aws_secrets_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ impl ClientBuilder for SecretsManagerClientBuilder {
let config = config::Builder::from(config).build();
Client::from_conf(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
SecretsManagerClientBuilder::build(config)
}
}

/// Configuration for the `aws_secrets_manager` secrets backend.
Expand Down Expand Up @@ -63,6 +67,7 @@ impl SecretBackend for AwsSecretsManagerBackend {
&ProxyConfig::default(),
&self.tls,
&None,
false,
)
.await?;

Expand Down
5 changes: 5 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ impl ClientBuilder for CloudwatchLogsClientBuilder {
fn build(config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_cloudwatchlogs::client::Client::new(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
CloudwatchLogsClientBuilder::build(config)
}
}

#[configurable_component]
Expand Down Expand Up @@ -175,6 +179,7 @@ impl CloudwatchLogsSinkConfig {
proxy,
&self.tls,
&None,
false,
)
.await
}
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async fn create_client_test() -> CloudwatchLogsClient {
let endpoint = Some(cloudwatch_address());
let proxy = ProxyConfig::default();

create_client::<CloudwatchLogsClientBuilder>(&auth, region, endpoint, &proxy, &None, &None)
create_client::<CloudwatchLogsClientBuilder>(&auth, region, endpoint, &proxy, &None, &None, false)
.await
.unwrap()
}
Expand Down
5 changes: 5 additions & 0 deletions src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl ClientBuilder for CloudwatchMetricsClientBuilder {
fn build(config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_cloudwatch::client::Client::new(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
CloudwatchMetricsClientBuilder::build(config)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -178,6 +182,7 @@ impl CloudWatchMetricsSinkConfig {
proxy,
&self.tls,
&None,
false,
)
.await
}
Expand Down
5 changes: 5 additions & 0 deletions src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl ClientBuilder for KinesisFirehoseClientBuilder {
fn build(config: &aws_types::SdkConfig) -> Self::Client {
Self::Client::new(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
KinesisFirehoseClientBuilder::build(config)
}
}

// AWS Kinesis Firehose API accepts payloads up to 4MB or 500 events
Expand Down Expand Up @@ -108,6 +112,7 @@ impl KinesisFirehoseSinkConfig {
proxy,
&self.base.tls,
&None,
false,
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ async fn firehose_client() -> aws_sdk_firehose::Client {
&proxy,
&None,
&None,
false,
)
.await
.unwrap()
Expand Down
5 changes: 5 additions & 0 deletions src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl ClientBuilder for KinesisClientBuilder {
fn build(config: &aws_types::SdkConfig) -> Self::Client {
KinesisClient::new(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
KinesisClientBuilder::build(config)
}
}

pub const MAX_PAYLOAD_SIZE: usize = 5_000_000;
Expand Down Expand Up @@ -105,6 +109,7 @@ impl KinesisStreamsSinkConfig {
proxy,
&self.base.tls,
&None,
false,
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/streams/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ async fn client() -> aws_sdk_kinesis::Client {
&proxy,
&None,
&None,
false,
)
.await
.unwrap()
Expand Down
9 changes: 8 additions & 1 deletion src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ pub struct S3SinkConfig {
#[configurable(derived)]
#[serde(default)]
pub timezone: Option<TimeZone>,

/// Specifies which addressing style to use.
///
/// This controls if the bucket name is in the hostname or part of the URL.
#[serde(default = "crate::serde::default_true")]
pub force_path_style: bool,
}

pub(super) fn default_key_prefix() -> String {
Expand Down Expand Up @@ -167,6 +173,7 @@ impl GenerateConfig for S3SinkConfig {
auth: AwsAuthentication::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
force_path_style: Default::default(),
})
.unwrap()
}
Expand Down Expand Up @@ -251,7 +258,7 @@ impl S3SinkConfig {
}

pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await
s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls, self.force_path_style).await
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ async fn s3_flush_on_exhaustion() {
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
force_path_style: true,
}
};
let prefix = config.key_prefix.clone();
Expand Down Expand Up @@ -496,6 +497,7 @@ async fn client() -> S3Client {
&proxy,
&tls_options,
&None,
true,
)
.await
.unwrap()
Expand All @@ -522,6 +524,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig {
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
force_path_style: true,
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/sinks/aws_s_s/sns/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl SnsSinkConfig {
proxy,
&self.base_config.tls,
&None,
false,
)
.await
}
Expand Down Expand Up @@ -111,6 +112,10 @@ impl ClientBuilder for SnsClientBuilder {
fn build(config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_sns::client::Client::new(config)
}

fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client {
SnsClientBuilder::build(config)
}
}

pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/aws_s_s/sns/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn create_sns_test_client() -> SnsClient {
&proxy,
&None,
&None,
false,
)
.await
.unwrap()
Expand All @@ -59,6 +60,7 @@ async fn create_sqs_test_client() -> SqsClient {
&proxy,
&None,
&None,
false,
)
.await
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_s_s/sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl SqsSinkConfig {
proxy,
&self.base_config.tls,
&None,
false,
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_s_s/sqs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async fn create_test_client() -> SqsClient {
&proxy,
&None,
&None,
false,
)
.await
.unwrap()
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/s3_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,12 @@ pub async fn create_service(
auth: &AwsAuthentication,
proxy: &ProxyConfig,
tls_options: &Option<TlsConfig>,
force_path_style: impl Into<bool>,
) -> crate::Result<S3Service> {
let endpoint = region.endpoint();
let region = region.region();
let client =
create_client::<S3ClientBuilder>(auth, region.clone(), endpoint, proxy, tls_options, &None)
create_client::<S3ClientBuilder>(auth, region.clone(), endpoint, proxy, tls_options, &None, force_path_style)
.await?;
Ok(S3Service::new(client))
}
Expand Down
4 changes: 4 additions & 0 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ impl AwsS3Config {
proxy,
&self.tls_options,
&None,
true,
)
.await?;

Expand All @@ -254,6 +255,7 @@ impl AwsS3Config {
proxy,
&sqs.tls_options,
&sqs.timeout,
false,
)
.await?;

Expand Down Expand Up @@ -1023,6 +1025,7 @@ mod integration_tests {
&proxy_config,
&None,
&None,
true,
)
.await
.unwrap()
Expand All @@ -1042,6 +1045,7 @@ mod integration_tests {
&proxy_config,
&None,
&None,
false,
)
.await
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions src/sources/aws_sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl AwsSqsConfig {
&cx.proxy,
&self.tls,
&None,
false,
)
.await
}
Expand Down
9 changes: 9 additions & 0 deletions website/cue/reference/components/sinks/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,15 @@ base: components: sinks: aws_s3: configuration: {
required: false
type: string: default: "%s"
}
force_path_style: {
description: """
Specifies which addressing style to use.
This controls if the bucket name is in the hostname or is part of the URL.
"""
required: false
type: bool: default: true
}
framing: {
description: "Framing configuration."
required: false
Expand Down

0 comments on commit 3e95f13

Please sign in to comment.