diff --git a/design/.gitignore b/design/.gitignore new file mode 100644 index 0000000000..7585238efe --- /dev/null +++ b/design/.gitignore @@ -0,0 +1 @@ +book diff --git a/design/book.toml b/design/book.toml new file mode 100644 index 0000000000..c122b84f0b --- /dev/null +++ b/design/book.toml @@ -0,0 +1,6 @@ +[book] +authors = ["Russell Cohen"] +language = "en" +multilingual = false +src = "src" +title = "AWS Rust SDK Design" diff --git a/design/src/SUMMARY.md b/design/src/SUMMARY.md new file mode 100644 index 0000000000..b0ebab3db0 --- /dev/null +++ b/design/src/SUMMARY.md @@ -0,0 +1,3 @@ +# Summary + +- [Http Operations](./operation.md) diff --git a/design/src/operation.md b/design/src/operation.md new file mode 100644 index 0000000000..fe9bb3b790 --- /dev/null +++ b/design/src/operation.md @@ -0,0 +1,74 @@ +# HTTP-based Operations +The Smithy code generator for Rust (and, by extension, the AWS SDK) uses an `Operation` abstraction to provide a unified +interface for dispatching requests. `Operation`s contain: +* A base HTTP request (with a potentially streaming body) +* A typed property bag of configuration options +* A fully generic response handler + +In the typical case, these configuration options include things like a `CredentialsProvider`; however, they can also be +full middleware layers that will get added by the dispatch stack. + +## Operation Phases +This section details the flow of a request through the SDK until a response is returned to the user. + +### Input Construction + +A customer interacts with the SDK builders to construct an input. The `build()` method on an input returns +an `Operation`. This codifies the base HTTP request & all the configuration and middleware layers required to modify and dispatch the request.
+ +```rust,ignore +pub struct Operation { + request: Request, + response_handler: Box, +} + +pub struct Request { + base: http::Request, + configuration: PropertyBag, +} +``` + +For most requests, `.build()` will NOT consume the input. A user can call `.build()` multiple times to produce multiple operations from the same input. + +By using a property bag, we can define the `Operation` in Smithy core. AWS specific configuration can be added later in the stack. + +### Operation Construction +In order to construct an operation, the generated code injects appropriate middleware & configuration via the configuration property bag. It does this by reading the configuration properties out of the service +config, copying them as necessary, and loading them into the `Request`: + +```rust,ignore +// This is approximately the generated code, I've cleaned a few things up for readability. +pub fn build(self, config: &dynamodb::config::Config) -> Operation { + let op = BatchExecuteStatement::new(BatchExecuteStatementInput { + statements: self.statements, + }); + let mut request = operation::Request::new( + op.build_http_request() + .map(|body| operation::SdkBody::from(body)), + ); + + use operation::signing_middleware::SigningConfigExt; + request + .config + .insert_signingconfig(SigningConfig::default_config( + auth::ServiceConfig { + service: config.signing_service().into(), + region: config.region.clone().into(), + }, + auth::RequestConfig { + request_ts: || std::time::SystemTime::now(), + }, + )); + use operation::signing_middleware::CredentialProviderExt; + request + .config + .insert_credentials_provider(config.credentials_provider.clone()); + + use operation::endpoint::EndpointProviderExt; + request + .config + .insert_endpoint_provider(config.endpoint_provider.clone()); + + Operation::new(request, op) +} +``` diff --git a/rust-runtime/smithy-http/Cargo.toml b/rust-runtime/smithy-http/Cargo.toml index 7dd147d099..77e2cc76d2 100644 --- a/rust-runtime/smithy-http/Cargo.toml 
+++ b/rust-runtime/smithy-http/Cargo.toml @@ -6,6 +6,9 @@ edition = "2018" [dependencies] smithy-types = { path = "../smithy-types" } +bytes = "1" +http-body = "0.4.0" +http = "0.2.3" [dev-dependencies] proptest = "0.10.1" diff --git a/rust-runtime/smithy-http/src/body.rs b/rust-runtime/smithy-http/src/body.rs new file mode 100644 index 0000000000..dea1386d92 --- /dev/null +++ b/rust-runtime/smithy-http/src/body.rs @@ -0,0 +1,94 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use bytes::Bytes; +use http::{HeaderMap, HeaderValue}; +use std::error::Error; +use std::pin::Pin; +use std::task::{Context, Poll}; + +type BodyError = Box; + +/// SdkBody type +/// +/// This is the Body used for dispatching all HTTP Requests. +/// For handling responses, the type of the body will be controlled +/// by the HTTP stack. +/// +/// TODO: Consider renaming to simply `Body`, although I'm concerned about naming headaches +/// between hyper::Body and our Body +pub enum SdkBody { + Once(Option), + // TODO: tokio::sync::mpsc based streaming body +} + +impl SdkBody { + fn poll_inner(&mut self) -> Poll>> { + match self { + SdkBody::Once(ref mut opt) => { + let data = opt.take(); + match data { + Some(bytes) => Poll::Ready(Some(Ok(bytes))), + None => Poll::Ready(None), + } + } + } + } + + /// If possible, return a reference to this body as `&[u8]` + /// + /// If this SdkBody is NOT streaming, this will return the byte slab + /// If this SdkBody is streaming, this will return `None` + pub fn bytes(&self) -> Option<&[u8]> { + match self { + SdkBody::Once(Some(b)) => Some(&b), + SdkBody::Once(None) => Some(&[]), + // In the future, streaming variants will return `None` + } + } + + pub fn try_clone(&self) -> Option { + match self { + SdkBody::Once(bytes) => Some(SdkBody::Once(bytes.clone())), + } + } +} + +impl From<&str> for SdkBody { + fn from(s: &str) -> Self { + 
SdkBody::Once(Some(Bytes::copy_from_slice(s.as_bytes()))) + } +} + +impl From for SdkBody { + fn from(bytes: Bytes) -> Self { + SdkBody::Once(Some(bytes)) + } +} + +impl From> for SdkBody { + fn from(data: Vec) -> Self { + Self::from(Bytes::from(data)) + } +} + +impl http_body::Body for SdkBody { + type Data = Bytes; + type Error = BodyError; + + fn poll_data( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_inner() + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} diff --git a/rust-runtime/smithy-http/src/lib.rs b/rust-runtime/smithy-http/src/lib.rs index a708878c89..58474adf20 100644 --- a/rust-runtime/smithy-http/src/lib.rs +++ b/rust-runtime/smithy-http/src/lib.rs @@ -3,8 +3,10 @@ * SPDX-License-Identifier: Apache-2.0. */ -// TODO: there is no compelling reason to have this be a shared crate—we should vendor this -// module into the individual crates pub mod base64; +pub mod body; pub mod label; +pub mod operation; +pub mod property_bag; pub mod query; +pub mod response; diff --git a/rust-runtime/smithy-http/src/operation.rs b/rust-runtime/smithy-http/src/operation.rs new file mode 100644 index 0000000000..2df4e5cb4f --- /dev/null +++ b/rust-runtime/smithy-http/src/operation.rs @@ -0,0 +1,127 @@ +use crate::body::SdkBody; +use crate::property_bag::PropertyBag; +use std::cell::{Ref, RefCell, RefMut}; +use std::rc::Rc; + +pub struct Operation { + request: Request, + response_handler: H, + _retry_policy: R, +} + +impl Operation { + pub fn into_request_response(self) -> (Request, H) { + (self.request, self.response_handler) + } + + pub fn new(request: Request, response_handler: H) -> Self { + Operation { + request, + response_handler, + _retry_policy: (), + } + } +} + +pub struct Request { + /// The underlying HTTP Request + inner: http::Request, + + /// Property bag of configuration options + /// + /// Middleware can read and write from the 
property bag and use its + /// contents to augment the request (see `Request::augment`) + /// + /// configuration is stored in an `Rc>` to facilitate cloning requests during retries + /// We should consider if this should instead be an `Arc`. I'm not aware of times where + /// we'd need to modify the request concurrently, but perhaps such a thing may some day exist. + configuration: Rc>, +} + +impl Request { + pub fn new(base: http::Request) -> Self { + Request { + inner: base, + configuration: Rc::new(RefCell::new(PropertyBag::new())), + } + } + + pub fn augment( + self, + f: impl FnOnce(http::Request, &mut PropertyBag) -> Result, T>, + ) -> Result { + let inner = { + let configuration: &mut PropertyBag = &mut self.configuration.as_ref().borrow_mut(); + f(self.inner, configuration)? + }; + Ok(Request { + inner, + configuration: self.configuration, + }) + } + + pub fn config_mut(&mut self) -> RefMut<'_, PropertyBag> { + self.configuration.as_ref().borrow_mut() + } + + pub fn config(&self) -> Ref<'_, PropertyBag> { + self.configuration.as_ref().borrow() + } + + pub fn try_clone(&self) -> Option { + let cloned_body = self.inner.body().try_clone()?; + let mut cloned_request = http::Request::builder() + .uri(self.inner.uri().clone()) + .method(self.inner.method()); + *cloned_request + .headers_mut() + .expect("builder has not been modified, headers must be valid") = + self.inner.headers().clone(); + let inner = cloned_request + .body(cloned_body) + .expect("a clone of a valid request should be a valid request"); + Some(Request { + inner, + configuration: self.configuration.clone(), + }) + } + + pub fn into_parts(self) -> (http::Request, Rc>) { + (self.inner, self.configuration) + } +} + +#[cfg(test)] +mod test { + use crate::body::SdkBody; + use crate::operation::Request; + use http::header::{AUTHORIZATION, CONTENT_LENGTH}; + use http::Uri; + + #[test] + fn try_clone_clones_all_data() { + let mut request = Request::new( + http::Request::builder() + 
.uri(Uri::from_static("http://www.amazon.com")) + .method("POST") + .header(CONTENT_LENGTH, 456) + .header(AUTHORIZATION, "Token: hello") + .body(SdkBody::from("hello world!")) + .expect("valid request"), + ); + request.config_mut().insert("hello"); + let cloned = request.try_clone().expect("request is cloneable"); + + let (request, config) = cloned.into_parts(); + assert_eq!(request.uri(), &Uri::from_static("http://www.amazon.com")); + assert_eq!(request.method(), "POST"); + assert_eq!(request.headers().len(), 2); + assert_eq!( + request.headers().get(AUTHORIZATION).unwrap(), + "Token: hello" + ); + assert_eq!(request.headers().get(CONTENT_LENGTH).unwrap(), "456"); + assert_eq!(request.body().bytes().unwrap(), "hello world!".as_bytes()); + assert_eq!(config.as_ref().borrow().get::<&str>(), Some(&"hello")); + } +} diff --git a/rust-runtime/smithy-http/src/property_bag.rs b/rust-runtime/smithy-http/src/property_bag.rs new file mode 100644 index 0000000000..92b768a726 --- /dev/null +++ b/rust-runtime/smithy-http/src/property_bag.rs @@ -0,0 +1,196 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use std::fmt; +use std::hash::{BuildHasherDefault, Hasher}; + +type AnyMap = HashMap, BuildHasherDefault>; + +// With TypeIds as keys, there's no need to hash them. They are already hashes +// themselves, coming from the compiler. The IdHasher just holds the u64 of +// the TypeId, and then returns it, instead of doing any bit fiddling. +#[derive(Default)] +struct IdHasher(u64); + +impl Hasher for IdHasher { + #[inline] + fn finish(&self) -> u64 { + self.0 + } + + fn write(&mut self, _: &[u8]) { + unreachable!("TypeId calls write_u64"); + } + + #[inline] + fn write_u64(&mut self, id: u64) { + self.0 = id; + } +} + +/// A type map of protocol extensions. 
+/// +/// `PropertyBag` can be used by `Request` and `Response` to store +/// extra data derived from the underlying protocol. +/// +/// TODO: We should consider if we want to require members of the property bag to be "resettable" in some +/// way to reset any state prior to a retry. I think this is worth delaying until we need it, but +/// is worth keeping in mind. +#[derive(Default)] +pub struct PropertyBag { + // In http where this property bag is usually empty, this makes sense. We will almost always put + // something in the bag, so we could consider removing the layer of indirection. + // If extensions are never used, no need to carry around an empty HashMap. + // That's 3 words. Instead, this is only 1 word. + map: Option>, +} + +impl PropertyBag { + /// Create an empty `PropertyBag`. + #[inline] + pub fn new() -> PropertyBag { + PropertyBag { map: None } + } + + /// Insert a type into this `PropertyBag`. + /// + /// If an extension of this type already existed, it will + /// be returned. + /// + /// Generally, this method should not be called directly. The best practice is + /// calling this method via an extension trait on `PropertyBag`. + /// + /// # Example + /// + /// ``` + /// # use smithy_http::property_bag::PropertyBag; + /// let mut ext = PropertyBag::new(); + /// #[derive(Debug, Eq, PartialEq)] + /// struct Endpoint(&'static str); + /// assert!(ext.insert(Endpoint("dynamo.amazon.com")).is_none()); + /// assert_eq!(ext.insert(Endpoint("kinesis.amazon.com")), Some(Endpoint("dynamo.amazon.com"))); + /// ``` + pub fn insert(&mut self, val: T) -> Option { + self.map + .get_or_insert_with(|| Box::new(HashMap::default())) + .insert(TypeId::of::(), Box::new(val)) + .and_then(|boxed| { + (boxed as Box) + .downcast() + .ok() + .map(|boxed| *boxed) + }) + } + + /// Get a reference to a type previously inserted on this `PropertyBag`.
+ /// + /// # Example + /// + /// ``` + /// # use smithy_http::property_bag::PropertyBag; + /// let mut ext = PropertyBag::new(); + /// assert!(ext.get::().is_none()); + /// ext.insert(5i32); + /// + /// assert_eq!(ext.get::(), Some(&5i32)); + /// ``` + pub fn get(&self) -> Option<&T> { + self.map + .as_ref() + .and_then(|map| map.get(&TypeId::of::())) + .and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref()) + } + + /// Get a mutable reference to a type previously inserted on this `PropertyBag`. + /// + /// # Example + /// + /// ``` + /// # use smithy_http::property_bag::PropertyBag; + /// let mut ext = PropertyBag::new(); + /// ext.insert(String::from("Hello")); + /// ext.get_mut::().unwrap().push_str(" World"); + /// + /// assert_eq!(ext.get::().unwrap(), "Hello World"); + /// ``` + pub fn get_mut(&mut self) -> Option<&mut T> { + self.map + .as_mut() + .and_then(|map| map.get_mut(&TypeId::of::())) + .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut()) + } + + /// Remove a type from this `PropertyBag`. + /// + /// If an extension of this type existed, it will be returned. + /// + /// # Example + /// + /// ``` + /// # use smithy_http::property_bag::PropertyBag; + /// let mut ext = PropertyBag::new(); + /// ext.insert(5i32); + /// assert_eq!(ext.remove::(), Some(5i32)); + /// assert!(ext.get::().is_none()); + /// ``` + pub fn remove(&mut self) -> Option { + self.map + .as_mut() + .and_then(|map| map.remove(&TypeId::of::())) + .and_then(|boxed| { + (boxed as Box) + .downcast() + .ok() + .map(|boxed| *boxed) + }) + } + + /// Clear the `PropertyBag` of all inserted extensions.
+ /// + /// # Example + /// + /// ``` + /// # use smithy_http::property_bag::PropertyBag; + /// let mut ext = PropertyBag::new(); + /// ext.insert(5i32); + /// ext.clear(); + /// + /// assert!(ext.get::().is_none()); + /// ``` + #[inline] + pub fn clear(&mut self) { + if let Some(ref mut map) = self.map { + map.clear(); + } + } +} + +impl fmt::Debug for PropertyBag { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PropertyBag").finish() + } +} + +#[test] +fn test_extensions() { + #[derive(Debug, PartialEq)] + struct MyType(i32); + + let mut extensions = PropertyBag::new(); + + extensions.insert(5i32); + extensions.insert(MyType(10)); + + assert_eq!(extensions.get(), Some(&5i32)); + assert_eq!(extensions.get_mut(), Some(&mut 5i32)); + + assert_eq!(extensions.remove::(), Some(5i32)); + assert!(extensions.get::().is_none()); + + assert_eq!(extensions.get::(), None); + assert_eq!(extensions.get(), Some(&MyType(10))); +} diff --git a/rust-runtime/smithy-http/src/response.rs b/rust-runtime/smithy-http/src/response.rs new file mode 100644 index 0000000000..79d72c1d5c --- /dev/null +++ b/rust-runtime/smithy-http/src/response.rs @@ -0,0 +1,79 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use bytes::Bytes; +use http::Response; + +/// `ParseHttpResponse` is a generic trait for parsing structured data from HTTP responses. +/// +/// It is designed to be nearly infinitely flexible: because `Output` is unconstrained, it can be used to support +/// event streams, S3 streaming responses, regular request-response style operations, as well +/// as any other HTTP-based protocol that we manage to come up with.
+/// +/// The split between `parse_unloaded` and `parse_loaded` enables keeping the parsing code pure and sync +/// whenever possible and delegating the process of actually reading the HTTP response to the caller when +/// the required behavior is simply "read to the end." +/// +/// It also enables this critical and core trait to avoid being async, and it makes code that uses +/// the trait easier to test. +pub trait ParseHttpResponse { + /// Output type of the HttpResponse. + /// + /// For request/response style operations, this is typically something like: + /// `Result` + /// + /// For streaming operations, this is something like: + /// `Result, TranscribeStreamingError>` + type Output; + + /// Parse an HTTP response without reading the body. If the body must be provided to proceed, + /// return `None`. + /// + /// This exists to serve APIs like S3::GetObject where the body is passed directly into the + /// response and consumed by the client. However, even in the case of S3::GetObject, errors + /// require reading the entire body. + /// + /// This also facilitates `EventStream` and other streaming HTTP protocols by enabling the + /// handler to take ownership of the HTTP response directly. + /// + /// Currently `parse_unloaded` operates on a borrowed HTTP response to enable + /// the caller to provide a raw HTTP response to the caller for inspection after the response is + /// returned. For EventStream-like use cases, the caller can use `mem::swap` to replace + /// the streaming body with an empty body as long as the body implements `Default`. + /// + /// We should consider if this is too limiting & if this should take an owned response instead. + fn parse_unloaded(&self, response: &mut http::Response) -> Option; + + /// Parse an HTTP response from a fully loaded body.
This is for standard request/response style + /// APIs like AwsJSON as well as for the error path of most streaming APIs. + /// + /// Using an explicit body type of Bytes here is a conscious decision—if you _really_ need + /// to precisely control how the data is loaded into memory, use `parse_unloaded`. + fn parse_loaded(&self, response: &http::Response) -> Self::Output; +} + +/// Convenience Trait for non-streaming APIs +/// +/// `ParseStrictResponse` enables operations that _never_ need to stream the body incrementally to +/// have cleaner implementations. There is a blanket implementation of `ParseHttpResponse` for every type that implements `ParseStrictResponse`. +pub trait ParseStrictResponse { + type Output; + fn parse(&self, response: &Response) -> Self::Output; +} + +impl ParseHttpResponse for T +where + T: ParseStrictResponse, +{ + type Output = T::Output; + + fn parse_unloaded(&self, _response: &mut Response) -> Option { + None + } + + fn parse_loaded(&self, response: &Response) -> Self::Output { + self.parse(response) + } +}