Skip to content

Commit

Permalink
chore(deps): Update aws-sdks to 0.23 and aws supporting crates to 0.53 (
Browse files Browse the repository at this point in the history
#16365)

* chore(deps): Update aws-sdks to 0.23 and aws supporting crates to 0.53

* +lockfil

* +debugging

* fix s3 tests, revert debugging changes

* +newline

* +dont adjust nextest timeout

* +check events

* addressing pr feedback

* move poll_inner into poll_data

* +native-tls over rustls
  • Loading branch information
spencergilbert authored Feb 13, 2023
1 parent ab4aca8 commit f4e90e1
Show file tree
Hide file tree
Showing 21 changed files with 325 additions and 373 deletions.
356 changes: 154 additions & 202 deletions Cargo.lock

Large diffs are not rendered by default.

30 changes: 16 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,22 @@ metrics = "0.20.1"
metrics-tracing-context = { version = "0.12.0", default-features = false }

# AWS - Official SDK
aws-sdk-s3 = { version = "0.21.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-sqs = { version = "0.21.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatch = { version = "0.21.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatchlogs = { version = "0.21.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-elasticsearch = {version = "0.21.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-firehose = { version = "0.21.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-kinesis = { version = "0.21.0", default-features = false, features = ["native-tls"], optional = true }
aws-types = { version = "0.51.0", default-features = false, features = ["hardcoded-credentials"], optional = true }
aws-sdk-s3 = { version = "0.23.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-sqs = { version = "0.23.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatch = { version = "0.23.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatchlogs = { version = "0.23.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-elasticsearch = {version = "0.23.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-firehose = { version = "0.23.0", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-kinesis = { version = "0.23.0", default-features = false, features = ["native-tls"], optional = true }
aws-types = { version = "0.53.0", default-features = false, optional = true }
aws-sigv4 = { version = "0.53.0", default-features = false, features = ["sign-http"], optional = true }
aws-config = { version = "0.51.0", default-features = false, features = ["native-tls"], optional = true }
aws-smithy-async = { version = "0.51.0", default-features = false, optional = true }
aws-smithy-client = { version = "0.51.0", default-features = false, features = ["client-hyper"], optional = true}
aws-smithy-http = { version = "0.51.0", default-features = false, features = ["event-stream"], optional = true }
aws-smithy-http-tower = { version = "0.54.1", default-features = false, optional = true }
aws-smithy-types = { version = "0.51.0", default-features = false, optional = true }
aws-config = { version = "0.53.0", default-features = false, features = ["native-tls"], optional = true }
aws-credential-types = { version = "0.53.0", default-features = false, features = ["hardcoded-credentials"], optional = true }
aws-smithy-async = { version = "0.53.1", default-features = false, optional = true }
aws-smithy-client = { version = "0.53.1", default-features = false, features = ["client-hyper"], optional = true}
aws-smithy-http = { version = "0.53.1", default-features = false, features = ["event-stream"], optional = true }
aws-smithy-http-tower = { version = "0.53.1", default-features = false, optional = true }
aws-smithy-types = { version = "0.53.1", default-features = false, optional = true }

# Azure
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest"], optional = true }
Expand Down Expand Up @@ -439,6 +440,7 @@ api-client = [

aws-core = [
"aws-config",
"dep:aws-credential-types",
"dep:aws-sigv4",
"dep:aws-types",
"dep:aws-smithy-async",
Expand Down
1 change: 1 addition & 0 deletions scripts/integration/aws/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
image: docker.io/localstack/localstack-full:0.11.6
environment:
- SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs
- DEBUG=1
mock-watchlogs:
image: docker.io/luciofranco/mockwatchlogs:latest
mock-ecs:
Expand Down
4 changes: 3 additions & 1 deletion src/aws/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time::Duration;
use aws_config::{
default_provider::credentials::DefaultCredentialsChain, imds, sts::AssumeRoleProviderBuilder,
};
use aws_types::{credentials::SharedCredentialsProvider, region::Region, Credentials};
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_credential_types::Credentials;
use aws_types::region::Region;
use serde_with::serde_as;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
Expand Down
196 changes: 100 additions & 96 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod auth;
pub mod region;

use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand All @@ -10,22 +11,23 @@ use std::time::{Duration, SystemTime};

pub use auth::{AwsAuthentication, ImdsAuthentication};
use aws_config::meta::region::ProvideRegion;
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use aws_sigv4::http_request::{SignableRequest, SigningSettings};
use aws_sigv4::SigningParams;
use aws_smithy_async::rt::sleep::{AsyncSleep, Sleep};
use aws_smithy_client::bounds::SmithyMiddleware;
use aws_smithy_client::erase::{DynConnector, DynMiddleware};
use aws_smithy_client::{Builder, SdkError};
use aws_smithy_http::callback::BodyCallback;
use aws_smithy_http::endpoint::Endpoint;
use aws_smithy_http::event_stream::BoxError;
use aws_smithy_http::body::{BoxBody, SdkBody};
use aws_smithy_http::operation::{Request, Response};
use aws_smithy_types::retry::RetryConfig;
use aws_types::credentials::{ProvideCredentials, SharedCredentialsProvider};
use aws_types::region::Region;
use aws_types::SdkConfig;
use bytes::Bytes;
use http::HeaderMap;
use http_body::Body;
use once_cell::sync::OnceCell;
use pin_project::pin_project;
use regex::RegexSet;
pub use region::RegionOrEndpoint;
use tower::{Layer, Service, ServiceBuilder};
Expand All @@ -41,41 +43,48 @@ pub fn is_retriable_error<T>(error: &SdkError<T>) -> bool {
match error {
SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) => true,
SdkError::ConstructionFailure(_) => false,
SdkError::ResponseError { err: _, raw } | SdkError::ServiceError { err: _, raw } => {
// This header is a direct indication that we should retry the request. Eventually it'd
// be nice to actually schedule the retry after the given delay, but for now we just
// check that it contains a positive value.
let retry_header = raw.http().headers().get("x-amz-retry-after").is_some();

// Certain 400-level responses will contain an error code indicating that the request
// should be retried. Since we don't retry 400-level responses by default, we'll look
// for these specifically before falling back to more general heuristics. Because AWS
// services use a mix of XML and JSON response bodies and the AWS SDK doesn't give us
// a parsed representation, we resort to a simple string match.
//
// S3: RequestTimeout
// SQS: RequestExpired, ThrottlingException
// ECS: RequestExpired, ThrottlingException
// Kinesis: RequestExpired, ThrottlingException
// Cloudwatch: RequestExpired, ThrottlingException
//
// Now just look for those when it's a client_error
let re = RETRIABLE_CODES.get_or_init(|| {
RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException"])
.expect("invalid regex")
});

let status = raw.http().status();
let response_body = String::from_utf8_lossy(raw.http().body().bytes().unwrap_or(&[]));

retry_header
|| status.is_server_error()
|| status == http::StatusCode::TOO_MANY_REQUESTS
|| (status.is_client_error() && re.is_match(response_body.as_ref()))
SdkError::ResponseError(err) => check_response(err.raw()),
SdkError::ServiceError(err) => check_response(err.raw()),
_ => {
warn!("AWS returned an unhandled error, retrying request.");
true
}
}
}

fn check_response(res: &Response) -> bool {
// This header is a direct indication that we should retry the request. Eventually it'd
// be nice to actually schedule the retry after the given delay, but for now we just
// check that it contains a positive value.
let retry_header = res.http().headers().get("x-amz-retry-after").is_some();

// Certain 400-level responses will contain an error code indicating that the request
// should be retried. Since we don't retry 400-level responses by default, we'll look
// for these specifically before falling back to more general heuristics. Because AWS
// services use a mix of XML and JSON response bodies and the AWS SDK doesn't give us
// a parsed representation, we resort to a simple string match.
//
// S3: RequestTimeout
// SQS: RequestExpired, ThrottlingException
// ECS: RequestExpired, ThrottlingException
// Kinesis: RequestExpired, ThrottlingException
// Cloudwatch: RequestExpired, ThrottlingException
//
// Now just look for those when it's a client_error
let re = RETRIABLE_CODES.get_or_init(|| {
RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException"])
.expect("invalid regex")
});

let status = res.http().status();
let response_body = String::from_utf8_lossy(res.http().body().bytes().unwrap_or(&[]));

retry_header
|| status.is_server_error()
|| status == http::StatusCode::TOO_MANY_REQUESTS
|| (status.is_client_error() && re.is_match(response_body.as_ref()))
}

pub trait ClientBuilder {
type Config;
type Client;
Expand Down Expand Up @@ -134,7 +143,7 @@ pub async fn resolve_region(region: Option<Region>) -> crate::Result<Region> {
pub async fn create_client<T: ClientBuilder>(
auth: &AwsAuthentication,
region: Option<Region>,
endpoint: Option<Endpoint>,
endpoint: Option<String>,
proxy: &ProxyConfig,
tls_options: &Option<TlsConfig>,
is_sink: bool,
Expand All @@ -152,7 +161,7 @@ pub async fn create_client<T: ClientBuilder>(
.retry_config(retry_config.clone());

if let Some(endpoint_override) = endpoint {
config_builder = config_builder.endpoint_resolver(endpoint_override);
config_builder = config_builder.endpoint_url(endpoint_override);
}

let config = config_builder.build();
Expand Down Expand Up @@ -245,16 +254,31 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: Request) -> Self::Future {
fn call(&mut self, req: Request) -> Self::Future {
// Attach a body callback that will capture the bytes sent by interrogating the body chunks that get read as it
// sends the request out over the wire. We'll read the shared atomic counter, which will contain the number of
// bytes "read", aka the bytes it actually sent, if and only if we get back a successful response.
let maybe_bytes_sent = self.enabled.then(|| {
let (callback, shared_bytes_sent) = BodyCaptureCallback::new();
req.http_mut().body_mut().with_callback(Box::new(callback));

shared_bytes_sent
});
let (req, maybe_bytes_sent) = if self.enabled {
let shared_bytes_sent = Arc::new(AtomicUsize::new(0));
let (request, properties) = req.into_parts();
let (parts, body) = request.into_parts();

let body = {
let shared_bytes_sent = Arc::clone(&shared_bytes_sent);

body.map(move |body| {
let body = MeasuredBody::new(body, Arc::clone(&shared_bytes_sent));
SdkBody::from_dyn(BoxBody::new(body))
})
};

let req = Request::from_parts(http::Request::from_parts(parts, body), properties);

(req, Some(shared_bytes_sent))
} else {
(req, None)
};

let region = self.region.clone();
let fut = self.inner.call(req);
Expand Down Expand Up @@ -283,69 +307,49 @@ where
}
}

struct BodyCaptureCallback {
bytes_sent: usize,
#[pin_project]
struct MeasuredBody {
#[pin]
inner: SdkBody,
shared_bytes_sent: Arc<AtomicUsize>,
}

impl BodyCaptureCallback {
fn new() -> (Self, Arc<AtomicUsize>) {
let shared_bytes_sent = Arc::new(AtomicUsize::new(0));

(
Self {
bytes_sent: 0,
shared_bytes_sent: Arc::clone(&shared_bytes_sent),
},
impl MeasuredBody {
fn new(body: SdkBody, shared_bytes_sent: Arc<AtomicUsize>) -> Self {
Self {
inner: body,
shared_bytes_sent,
)
}
}
}

impl BodyCallback for BodyCaptureCallback {
fn update(&mut self, bytes: &[u8]) -> Result<(), BoxError> {
// This gets called every time a chunk is read from the request body, which includes both static chunks and
// streaming bodies. Just add the chunk's length to our running tally.
self.bytes_sent += bytes.len();
Ok(())
}
impl Body for MeasuredBody {
type Data = Bytes;
type Error = Box<dyn Error + Send + Sync>;

fn trailers(&self) -> Result<Option<headers::HeaderMap<headers::HeaderValue>>, BoxError> {
Ok(None)
}
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.project();

fn make_new(&self) -> Box<dyn BodyCallback> {
// We technically don't use retries within the AWS side of the API clients, but we have to satisfy this trait
// method, because `aws_smithy_http` uses the retry layer from `tower`, which clones the request regardless
// before it even executes the first attempt... so there's no reason not to make it technically correct.
Box::new(Self {
bytes_sent: 0,
shared_bytes_sent: Arc::clone(&self.shared_bytes_sent),
})
match this.inner.poll_data(cx) {
Poll::Ready(Some(Ok(data))) => {
this.shared_bytes_sent
.fetch_add(data.len(), Ordering::Release);

Poll::Ready(Some(Ok(data)))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
}
}
}

impl Drop for BodyCaptureCallback {
fn drop(&mut self) {
// This is where we actually emit. We specifically emit here, and not in `trailers`, because despite the
// documentation that `trailers` is called after all chunks of the body are successfully read, `hyper` won't
// continue polling a body if it knows it's gotten all the available bytes i.e. it doesn't necessarily drive it
// until `poll_data` returns `None`. This means the only consistent place to know that the body is "done" is
// when it's dropped.
//
// We update our shared atomic counter with the total bytes sent that we accumulated, and it will read the
// atomic if the response indicates that the request was successful. Since we know the body will go out-of-scope
// before a response can possibly be generated, we know the atomic will in turn be updated before it is read.
//
// This design also copes with the fact that, technically, `aws_smithy_client` supports retries and could clone
// this callback for each copy of the request... which it already does at least once per request since the retry
// middleware has to clone the request before trying it. As requests are retried sequentially, only after the
// previous attempt failed, we know that we'll end up in a "last write wins" scenario, so this is still sound.
//
// In the future, we may track every single byte sent in order to generate "raw bytes over the wire, regardless
// of status" metrics, but right now, this is purely "how many bytes have we sent as part of _successful_
// sends?"
self.shared_bytes_sent
.store(self.bytes_sent, Ordering::Release);
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
10 changes: 3 additions & 7 deletions src/aws/region.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::str::FromStr;

use aws_smithy_http::endpoint::Endpoint;
use aws_types::region::Region;
use http::Uri;
use vector_config::configurable_component;

/// Configuration of the region/endpoint to use when interacting with an AWS service.
Expand Down Expand Up @@ -36,9 +32,9 @@ impl RegionOrEndpoint {
}
}

pub fn endpoint(&self) -> crate::Result<Option<Endpoint>> {
let uri = self.endpoint.as_deref().map(Uri::from_str).transpose()?;
Ok(uri.map(Endpoint::immutable))
pub fn endpoint(&self) -> crate::Result<Option<String>> {
let endpoint = self.endpoint.clone();
Ok(endpoint)
}

pub fn region(&self) -> Option<Region> {
Expand Down
6 changes: 5 additions & 1 deletion src/common/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ impl ClientBuilder for S3ClientBuilder {
}

fn build(client: aws_smithy_client::Client, config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_s3::client::Client::with_config(client, config.into())
let config = aws_sdk_s3::config::Builder::from(config)
.force_path_style(true)
.build();

aws_sdk_s3::client::Client::with_config(client, config)
}
}
Loading

0 comments on commit f4e90e1

Please sign in to comment.