Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Pass at an IO layer for the Rust SDK #107

Closed
wants to merge 61 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
4e1db70
Move AWS specific generation into its own folder
rcoh Jan 5, 2021
7d25fba
Initial Pass at an IO layer for the Rust SDK
rcoh Jan 5, 2021
edbcba9
Missing files
rcoh Jan 12, 2021
7f0020d
Cleanup Rust code
rcoh Jan 12, 2021
47d0646
Merge branch 'main' into cred-providers
rcoh Jan 12, 2021
832886a
Refactor operation to split input and reponse handler
rcoh Jan 14, 2021
5304515
Improve parse response traits
rcoh Jan 14, 2021
c5856ec
creds experimentation
rcoh Jan 15, 2021
3c69e44
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Jan 15, 2021
bb50baf
wip
rcoh Jan 15, 2021
39d8e1a
Migrate request to use extensions instead of specific fields
rcoh Jan 15, 2021
a0fedb9
Working Dynamo IT again!
rcoh Jan 17, 2021
39e61d7
more clippy cleanup
rcoh Jan 17, 2021
2a4c2e4
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Jan 17, 2021
ba0adbc
Update IT
rcoh Jan 17, 2021
82cf564
Fixup imports
rcoh Jan 17, 2021
bded4d5
Include the Dynamo IT in tests
rcoh Jan 17, 2021
1523fa5
Generate examples in build artifact
rcoh Jan 19, 2021
c794581
Add DynamoDB Readme
rcoh Jan 19, 2021
ee6bf85
Fix signing
rcoh Jan 20, 2021
b2445f2
Add tracing middleware
rcoh Jan 20, 2021
0b099d9
raise logging level
rcoh Jan 20, 2021
f1f25fd
environment variable provider
rcoh Jan 20, 2021
2e58049
update comment
rcoh Jan 20, 2021
42c2061
go back to defaulting to DDB local
rcoh Jan 20, 2021
56ea4b1
Make requests clonable
rcoh Jan 20, 2021
f2429f0
Use fn once, remove unecessary mut
rcoh Jan 20, 2021
eaad212
Update middleware to operate on owned requests
rcoh Jan 20, 2021
d13a6ed
Add retry policy
rcoh Jan 20, 2021
1fb4cb3
Add region provider from environment
rcoh Jan 21, 2021
07b052b
Lots more improvements
rcoh Jan 22, 2021
83a0e87
continued iteration on the service
rcoh Jan 22, 2021
1f5e85b
retries!
rcoh Jan 22, 2021
a0f7f6c
Working retry support!
rcoh Jan 22, 2021
4d52a52
More cleanups
rcoh Jan 22, 2021
a0081cf
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Jan 25, 2021
9decb96
aws-hyper building again
rcoh Jan 25, 2021
2c6d2c6
Tests passing again
rcoh Jan 25, 2021
cdb0143
get all tests passing again
rcoh Jan 25, 2021
694c16f
Fix tests
rcoh Jan 25, 2021
13699e3
Update lock, fix sigv4 usage
rcoh Jan 25, 2021
28015dc
add ready_and
rcoh Jan 26, 2021
f42f06c
wip
rcoh Jan 27, 2021
382cb86
Split call from call_rwa
rcoh Jan 27, 2021
b3ee27b
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Jan 28, 2021
6925cd4
Split out sig auth crate
rcoh Jan 28, 2021
9f0689d
Add missing aws-sig-auth files
rcoh Jan 28, 2021
4adcaa7
Demo integration with the crucible ITs
rcoh Feb 2, 2021
1a05bc1
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Feb 8, 2021
c07720f
Cleanup code in AWS hyper
rcoh Feb 8, 2021
de2be35
More cleanups
rcoh Feb 8, 2021
c1d466f
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Feb 10, 2021
20e5466
Get working KMS client
rcoh Feb 10, 2021
8fddd4b
Add KMS example
rcoh Feb 10, 2021
0ecae9d
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Feb 11, 2021
bbd5bb1
get the dynamo test compiling again
rcoh Feb 12, 2021
bc9b73d
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Feb 12, 2021
712afbf
Get everything working with new endpoints
rcoh Feb 12, 2021
8737b8e
Cleanup unused imports
rcoh Feb 12, 2021
02529e8
Merge branch 'main' of github.com:awslabs/smithy-rs into cred-providers
rcoh Feb 13, 2021
c6c45a6
working again
rcoh Feb 13, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Working retry support!
  • Loading branch information
rcoh committed Jan 22, 2021
commit a0f7f6c1a95909f855fad9cd5264b682aa03cc4f
1 change: 1 addition & 0 deletions aws/rust-runtime/aws-hyper/Cargo.toml
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ auth = { path = "../auth" }
http = "0.2.3"
bytes = "1"
http-body = "0.4.0"
tokio = { version = "1", features = ["time"]}
pin-utils = "0.1.0"

[dev-dependencies]
89 changes: 42 additions & 47 deletions aws/rust-runtime/aws-hyper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use bytes::{Buf, Bytes};
use hyper::Client as HyperClient;
use operation::{middleware::OperationError, Operation, ParseHttpResponse, SdkBody};
use std::error::Error;
use tower::{Layer, Service, ServiceBuilder, ServiceExt};
use tower::{Layer, Service, ServiceBuilder};

type SdkSuccess<O> = _SdkSuccess<O, hyper::Body>;
type SdkError<E> = _SdkError<E, hyper::Body>;
@@ -70,8 +70,8 @@ impl<S> Client<S> {
}

fn operation_error<OE, E, B>(o: OperationError<OE>) -> _SdkError<E, B>
where
OE: Into<BoxError>,
where
OE: Into<BoxError>,
{
match o {
OperationError::DispatchError(e) => _SdkError::DispatchFailure(e.into()),
@@ -83,11 +83,11 @@ async fn load_response<B, T, E, O>(
mut response: http::Response<B>,
handler: &O,
) -> Result<_SdkSuccess<T, B>, _SdkError<E, B>>
where
B: http_body::Body + Unpin,
B: From<Bytes>,
B::Error: Error + Send + Sync + 'static,
O: ParseHttpResponse<B, Output = Result<T, E>>,
where
B: http_body::Body + Unpin,
B: From<Bytes>,
B::Error: Error + Send + Sync + 'static,
O: ParseHttpResponse<B, Output=Result<T, E>>,
{
if let Some(parsed_response) = handler.parse_unloaded(&mut response) {
return sdk_result(parsed_response, response);
@@ -113,28 +113,27 @@ pub struct LoadResponseMiddleware<S> {
inner: S,
}

pub struct RetryService<S, R, E> {
inner: S,
retry_strategy: R,
_e: PhantomData<E>,
}

#[derive(Clone)]
struct RetryStrategy {}

impl<Handler: Clone, R: Clone, Response, Error>
tower::retry::Policy<operation::Operation<Handler, R>, Response, Error>
for RetryStrategy where R: RetryPolicy<Response, Error>
tower::retry::Policy<operation::Operation<Handler, R>, Response, Error>
for RetryStrategy where R: RetryPolicy<Response, Error>
{
type Future = std::future::Ready<Self>;
type Future = Pin<Box<dyn Future<Output=Self>>>;

fn retry(
&self,
req: &Operation<Handler, R>,
result: Result<&Response, &Error>,
) -> Option<Self::Future> {
let _ = req.retry_policy.should_retry(result);
None
let _resp = req.retry_policy.should_retry(result)?;
let next = self.clone();
let fut = async move {
tokio::time::sleep(Duration::new(5, 0)).await;
next
};
Some(Box::pin(fut))
}

fn clone_request(
@@ -176,8 +175,8 @@ impl <O, R, S, T, B> tower::Service<operation::Operation<O, R>> for RetryService
pub struct ParseResponseLayer;

impl<S> Layer<S> for ParseResponseLayer
where
S: Service<operation::Request>,
where
S: Service<operation::Request>,
{
type Service = LoadResponseMiddleware<S>;

@@ -187,18 +186,18 @@ where
}

impl<S, O, T, E, B, R, OE> tower::Service<operation::Operation<O, R>> for LoadResponseMiddleware<S>
where
S: Service<operation::Request, Response = http::Response<B>, Error = OperationError<OE>>,
OE: Into<BoxError>,
S::Future: 'static,
O: ParseHttpResponse<B, Output = Result<T, E>> + 'static,
B: http_body::Body + Unpin,
B: From<Bytes>,
B::Error: Error + Send + Sync + 'static,
where
S: Service<operation::Request, Response=http::Response<B>, Error=OperationError<OE>>,
OE: Into<BoxError>,
S::Future: 'static,
O: ParseHttpResponse<B, Output=Result<T, E>> + 'static,
B: http_body::Body + Unpin,
B: From<Bytes>,
B::Error: Error + Send + Sync + 'static,
{
type Response = _SdkSuccess<T, B>;
type Error = _SdkError<E, B>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
type Future = Pin<Box<dyn Future<Output=Result<Self::Response, Self::Error>>>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(operation_error)
@@ -229,25 +228,25 @@ where
*/

impl<S> Client<S>
where
S: Service<http::Request<SdkBody>, Response = http::Response<hyper::Body>>
where
S: Service<http::Request<SdkBody>, Response=http::Response<hyper::Body>>
+ Send
+ Clone
+ 'static,
S::Error: Into<BoxError> + Send + Sync + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send + Sync + 'static,
S::Future: Send + 'static,
{
pub async fn call<O, R, E, Retry>(
&self,
input: Operation<O, Retry>,
) -> Result<SdkSuccess<R>, SdkError<E>>
where
O: ParseHttpResponse<hyper::Body, Output = Result<R, E>> + Send + Clone + 'static,
Retry: RetryPolicy<SdkSuccess<R>, SdkError<E>> + Send + Clone + 'static,
where
O: ParseHttpResponse<hyper::Body, Output=Result<R, E>> + Send + Clone + 'static,
Retry: RetryPolicy<SdkSuccess<R>, SdkError<E>> + Send + Clone + 'static,
{
let signer = OperationRequestMiddlewareLayer::for_middleware(SigningMiddleware::new());
let endpoint_resolver = OperationRequestMiddlewareLayer::for_middleware(EndpointMiddleware);
let mut inner = self.inner.clone();
let inner = self.inner.clone();
// TODO: reorder to call ready_and on the entire stack
/*let inner = inner
.ready_and()
@@ -268,21 +267,18 @@ where
}
}

use http::Response;
use http_body::Body;
use hyper::client::HttpConnector;
use hyper_tls::HttpsConnector;
use middleware_tracing::RawRequestLogging;
use operation::endpoint::EndpointMiddleware;
use operation::middleware::{DispatchLayer, OperationRequestMiddlewareLayer};
use operation::retry_policy::{RetryPolicy, RetryType};
use operation::retry_policy::{RetryPolicy};
use operation::signing_middleware::SigningMiddleware;
use pin_utils::core_reexport::time::Duration;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::retry::{Policy, RetryLayer};
use pin_utils::core_reexport::time::Duration;

async fn read_body<B: http_body::Body>(body: B) -> Result<Vec<u8>, B::Error> {
let mut output = Vec::new();
@@ -335,7 +331,7 @@ mod test {
impl tower::Service<http::Request<SdkBody>> for TestService {
type Response = http::Response<hyper::Body>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
type Future = Pin<Box<dyn Future<Output=Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@@ -352,8 +348,8 @@ mod test {
struct TestOperationParser;

impl<B> ParseHttpResponse<B> for TestOperationParser
where
B: http_body::Body,
where
B: http_body::Body,
{
type Output = Result<String, String>;

@@ -424,7 +420,6 @@ mod test {

let (svc, rx) = TestService::new(|_req| http::Response::new(hyper::Body::from("hello!")));
let client = Client { inner: svc };
//handle.sen
let resp = client.call(operation).await;
println!("{:?}", resp);
let request = rx.try_recv().unwrap();
1 change: 0 additions & 1 deletion aws/rust-runtime/operation/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
*/

use core::convert::AsRef;
use std::error::Error;
use std::str::FromStr;

use http::uri::Uri;
12 changes: 10 additions & 2 deletions aws/rust-runtime/operation/src/lib.rs
Original file line number Diff line number Diff line change
@@ -105,6 +105,14 @@ impl<H> Operation<H, ()> {
retry_policy: (),
}
}

pub fn with_policy<R>(self, retry_policy: R) -> Operation<H, R> {
Operation {
request: self.request,
response_handler: self.response_handler,
retry_policy,
}
}
}

pub struct Request {
@@ -170,8 +178,8 @@ pub trait ParseStrictResponse {
}

impl<B, T> ParseHttpResponse<B> for T
where
T: ParseStrictResponse,
where
T: ParseStrictResponse,
{
type Output = T::Output;

1 change: 0 additions & 1 deletion aws/rust-runtime/operation/src/signing_middleware.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ use auth::{
SigningConfig,
};
use http::Request;
use std::error::Error;
use std::sync::Arc;
use std::time::SystemTime;
use tower::BoxError;
1 change: 1 addition & 0 deletions aws/sdk/examples/dynamo-helloworld/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 69 additions & 27 deletions aws/sdk/examples/dynamo-helloworld/src/main.rs
Original file line number Diff line number Diff line change
@@ -5,51 +5,90 @@

use std::error::Error;

use dynamodb::{model::{AttributeDefinition, KeySchemaElement, KeyType, ProvisionedThroughput, ScalarAttributeType}, operation::CreateTable};
use operation::endpoint::StaticEndpoint;
use dynamodb::error::ListTablesError;
use dynamodb::output::ListTablesOutput;
use dynamodb::{
model::{
AttributeDefinition, KeySchemaElement, KeyType, ProvisionedThroughput, ScalarAttributeType,
},
operation::CreateTable,
};
use env_logger::Env;
use operation::endpoint::StaticEndpoint;
use operation::retry_policy::{RetryPolicy, RetryType};
use tokio::time::Duration;
use aws_hyper::{_SdkSuccess, _SdkError};

#[derive(Clone)]
struct RetryIfNoTables;
impl<B> RetryPolicy<_SdkSuccess<ListTablesOutput, B>, _SdkError<ListTablesError, B>> for RetryIfNoTables {
fn should_retry(&self, input: Result<&_SdkSuccess<ListTablesOutput, B>, &_SdkError<ListTablesError, B>>) -> Option<RetryType> {
match input {
Ok(list_tables) => {
if list_tables.parsed
.table_names
.as_ref()
.map(|t| t.len())
.unwrap_or_default()
== 0
{
Some(RetryType::Explicit(Duration::new(5, 0)))
} else {
None
}
}
_ => None,
}
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init_from_env(Env::default().default_filter_or("info"));
let config = dynamodb::Config::builder()
.region("us-east-1")
// To load credentials from environment variables, delete this line
.credentials_provider(auth::Credentials::from_static("<fill me in>", "<fill me in>"))
.credentials_provider(auth::Credentials::from_static(
"<fill me in2>",
"<fill me in>",
))
// To use real DynamoDB, delete this line:
.endpoint_provider(StaticEndpoint::from_uri(http::Uri::from_static("http://localhost:8000")))
.endpoint_provider(StaticEndpoint::from_uri(http::Uri::from_static(
"http://localhost:8000",
)))
.build();
let client = aws_hyper::Client::default().with_tracing();
let list_tables = dynamodb::operation::ListTables::builder().build(&config);
let list_tables = dynamodb::operation::ListTables::builder()
.build(&config);
// For a custom retry policy:
// .with_policy(RetryIfNoTables);

let response = client.call(list_tables).await;
let tables = match response {
Ok(output) => {
output.parsed.table_names.unwrap()
},
Ok(output) => output.parsed.table_names.unwrap(),
Err(e) => panic!("err: {:?}", e),
};
if tables.is_empty() {
let create_table = CreateTable::builder()
.table_name("new_table")
.attribute_definitions(vec![AttributeDefinition::builder()
.attribute_name("ForumName")
.attribute_type(ScalarAttributeType::S)
.build()])
.key_schema(vec![KeySchemaElement::builder()
.attribute_name("ForumName")
.key_type(KeyType::Hash)
.build()])
.provisioned_throughput(
ProvisionedThroughput::builder()
.read_capacity_units(100)
.write_capacity_units(100)
.build(),
)
.build(&config);
.table_name("new_table")
.attribute_definitions(vec![AttributeDefinition::builder()
.attribute_name("ForumName")
.attribute_type(ScalarAttributeType::S)
.build()])
.key_schema(vec![KeySchemaElement::builder()
.attribute_name("ForumName")
.key_type(KeyType::Hash)
.build()])
.provisioned_throughput(
ProvisionedThroughput::builder()
.read_capacity_units(100)
.write_capacity_units(100)
.build(),
)
.build(&config);
match client.call(create_table).await {
Ok(created) => println!("table created! {:#?}", created.parsed),
Err(failed) => println!("failed to create table: {:?}", failed)
Err(failed) => println!("failed to create table: {:?}", failed),
}
}

@@ -58,8 +97,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
let response = client.call(list_tables).await;
match response {
Ok(output) => {
println!("tables: {:?}", output.parsed.table_names.unwrap_or_default());
},
println!(
"tables: {:?}",
output.parsed.table_names.unwrap_or_default()
);
}
Err(e) => panic!("err: {:?}", e.error()),
};