From 18d67611f257aa31c7ab3a53e09bb1f3a93c2ab3 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Wed, 8 Nov 2023 20:00:31 +0800 Subject: [PATCH 01/12] feat: introduce volo-http crate Signed-off-by: Gwo Tzu-Hsing Signed-off-by: Zheng Li <875543533@qq.com> Signed-off-by: bobozhengsir Signed-off-by: Yu Li --- Cargo.lock | 83 +++++++++++-- Cargo.toml | 2 + examples/Cargo.toml | 12 +- examples/src/http/http.rs | 96 +++++++++++++++ volo-grpc/Cargo.toml | 2 +- volo-http/Cargo.toml | 31 ++++- volo-http/src/dispatch.rs | 67 +++++++++++ volo-http/src/extract.rs | 42 +++++++ volo-http/src/handler.rs | 116 +++++++++++++++++++ volo-http/src/layer.rs | 93 +++++++++++++++ volo-http/src/lib.rs | 72 ++++++++++++ volo-http/src/param.rs | 51 ++++++++ volo-http/src/request.rs | 111 ++++++++++++++++++ volo-http/src/response.rs | 137 ++++++++++++++++++++++ volo-http/src/route.rs | 238 ++++++++++++++++++++++++++++++++++++++ volo-http/src/server.rs | 143 +++++++++++++++++++++++ 16 files changed, 1286 insertions(+), 10 deletions(-) create mode 100644 examples/src/http/http.rs create mode 100644 volo-http/src/dispatch.rs create mode 100644 volo-http/src/extract.rs create mode 100644 volo-http/src/handler.rs create mode 100644 volo-http/src/layer.rs create mode 100644 volo-http/src/param.rs create mode 100644 volo-http/src/request.rs create mode 100644 volo-http/src/response.rs create mode 100644 volo-http/src/route.rs create mode 100644 volo-http/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index e5a54bc7..fb09360b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -528,11 +528,16 @@ version = "0.0.0" dependencies = [ "anyhow", "async-stream", + "bytes", + "http", + "hyper 1.0.0-rc.3", "lazy_static", "metainfo", + "motore", "pilota", "rustls", "rustls-pemfile", + "serde", "tokio", "tokio-rustls", "tokio-stream", @@ -541,6 +546,7 @@ dependencies = [ "volo", "volo-gen", "volo-grpc", + "volo-http", "volo-thrift", ] @@ -833,6 +839,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92445bc9cc14bfa0a3ce56817dc3b5bcc227a168781a356b702410789cec0d10" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body 1.0.0-rc.2", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" @@ -863,7 +892,7 @@ dependencies = [ "futures-util", "h2", "http", - "http-body", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -875,6 +904,28 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.0.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body 1.0.0-rc.2", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "tracing", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -883,7 +934,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http", - "hyper", + "hyper 0.14.27", "rustls", "tokio", "tokio-rustls", @@ -895,7 +946,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.27", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -1882,8 +1933,8 @@ dependencies = [ "futures-util", "h2", "http", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-rustls", "ipnet", "js-sys", @@ -2816,8 +2867,8 @@ dependencies = [ "h2", "hex", "http", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-timeout", "matchit", "metainfo", @@ -2839,6 +2890,24 @@ dependencies = [ [[package]] name = "volo-http" version = "0.0.0" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body-util", + "hyper 1.0.0-rc.3", + "matchit", + "mime", + "motore", + "parking_lot 0.12.1", + "pin-project", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", + "volo", +] [[package]] name = "volo-macros" diff --git a/Cargo.toml b/Cargo.toml index bd1ed275..50975deb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ linkedbytes = "0.1" linked-hash-map = "0.5" log = "0.4" matchit = "0.7" +mime = "0.3" mur3 = "0.1" nix = "0.27" nom = "7" @@ -86,6 +87,7 @@ regex = "1" run_script = "0.10" same-file = "1" serde = "1" +serde_json = "1" serde_yaml = "0.9" socket2 = "0.5" syn = "1" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 6195c2fd..06877f88 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -72,12 +72,21 @@ path = "src/unknown/thrift_server.rs" name = "unknown-thrift-client" path = "src/unknown/thrift_client.rs" +[[bin]] +name = "http" +path = "src/http/http.rs" + [dependencies] anyhow.workspace = true async-stream.workspace = true +bytes.workspace = true +http.workspace = true +hyper = { version = "1.0.0-rc.3", features = ["server", "http1", "http2"] } lazy_static.workspace = true metainfo.workspace = true +motore.workspace = true +serde.workspace = true tokio = { workspace = true, features = ["full"] } tokio-stream.workspace = true tracing.workspace = true @@ -87,6 +96,7 @@ pilota.workspace = true volo = { path = "../volo" } volo-grpc = { path = "../volo-grpc" } volo-thrift = { path = "../volo-thrift" } +volo-http = { path = "../volo-http" } volo-gen = { path = "./volo-gen" } @@ -96,4 +106,4 @@ rustls-pemfile = { workspace = true, optional = true } tokio-rustls = { workspace = true, optional = true } [features] -tls = ["librustls", "rustls-pemfile", "tokio-rustls", "volo/rustls", "volo-grpc/rustls"] \ No newline at end of file +tls = ["librustls", "rustls-pemfile", "tokio-rustls", "volo/rustls", "volo-grpc/rustls"] diff --git a/examples/src/http/http.rs b/examples/src/http/http.rs new file mode 100644 index 00000000..8b7f7b6a --- /dev/null +++ b/examples/src/http/http.rs @@ -0,0 +1,96 @@ +use std::{convert::Infallible, net::SocketAddr}; + +use bytes::Bytes; +use http::{Method, Response, StatusCode, Uri}; +use hyper::body::Incoming; +use motore::{service::service_fn, timeout::TimeoutLayer}; +use serde::{Deserialize, Serialize}; +use volo_http::{ + handler::HandlerService, + request::Json, + route::{Route, Router, ServiceLayerExt}, + server::Server, + HttpContext, +}; + +async fn hello( + _cx: &mut HttpContext, + _request: Incoming, +) -> Result, Infallible> { + Ok(Response::new("hello, world\n")) +} + +async fn echo(cx: &mut HttpContext, _request: Incoming) -> Result, Infallible> { + if let Some(echo) = cx.params.get("echo") { + return Ok(Response::new(echo.clone())); + } + Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Bytes::new()) + .unwrap()) +} + +#[derive(Serialize, Deserialize, Debug)] +struct Person { + name: String, + age: u8, + phones: Vec, +} + +async fn json( + _cx: &mut HttpContext, + Json(request): Json, +) -> Result, Infallible> { + let first_phone = request + .phones + .get(0) + .map(|p| p.as_str()) + .unwrap_or("no number"); + println!( + "{} is {} years old, {}'s first phone number is {}", + request.name, request.age, request.name, first_phone + ); + Ok(Response::new(())) +} + +async fn test( + u: Uri, + m: Method, + Json(request): Json, +) -> Result<&'static str, (StatusCode, &'static str)> { + println!("{u:?}"); + println!("{m:?}"); + println!("{request:?}"); + if u.to_string().ends_with("a") { + Ok("a") // http://localhost:3000/test?a=a + } else { + Err((StatusCode::BAD_REQUEST, "b")) // http://localhost:3000/test?a=bb + } +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() { + let app = Router::new() + .route( + "/", + Route::builder() + .get(service_fn(hello)) + .build() + .layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1)))), + ) + .route("/:echo", Route::builder().get(service_fn(echo)).build()) + .route("/user", Route::builder().post(service_fn(json)).build()) + .route( + "/test", + Route::builder() + .get(HandlerService::new(test)) + .post(HandlerService::new(test)) + .build(), + ) + .layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1)))); + + let addr: SocketAddr = "[::]:9091".parse().unwrap(); + let addr = volo::net::Address::from(addr); + + Server::new(app).run(addr).await.unwrap(); +} diff --git a/volo-grpc/Cargo.toml b/volo-grpc/Cargo.toml index e2b0a970..16cc0db9 100644 --- a/volo-grpc/Cargo.toml +++ b/volo-grpc/Cargo.toml @@ -66,4 +66,4 @@ tracing-subscriber.workspace = true default = [] rustls = ["tokio-rustls"] -native-tls = ["tokio-native-tls"] \ No newline at end of file +native-tls = ["tokio-native-tls"] diff --git a/volo-http/Cargo.toml b/volo-http/Cargo.toml index 1f384e27..0987660d 100644 --- a/volo-http/Cargo.toml +++ b/volo-http/Cargo.toml @@ -10,8 +10,37 @@ description = "HTTP framework implementation of volo." documentation = "https://docs.rs/volo-http" readme = "README.md" categories = ["asynchronous", "network-programming", "web-programming"] -keywords = ["async", "http"] +keywords = ["async", "rpc", "http"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[badges] +maintenance = { status = "actively-developed" } + [dependencies] +volo = { version = "0.8", path = "../volo" } +motore.workspace = true + +hyper = { version = "=1.0.0-rc.3", features = ["server", "http1", "http2"] } +tokio = { workspace = true, features = [ + "time", + "macros", + "rt", + "signal", + "parking_lot", +] } +http.workspace = true +http-body-util = "=0.1.0-rc.2" +matchit.workspace = true +tracing.workspace = true +futures-util.workspace = true +pin-project.workspace = true +bytes.workspace = true +serde_json.workspace = true +thiserror.workspace = true +mime.workspace = true +serde.workspace = true +parking_lot.workspace = true + +[dev-dependencies] +serde = { version = "1", features = ["derive"] } diff --git a/volo-http/src/dispatch.rs b/volo-http/src/dispatch.rs new file mode 100644 index 00000000..89be3936 --- /dev/null +++ b/volo-http/src/dispatch.rs @@ -0,0 +1,67 @@ +use std::marker::PhantomData; + +use http::Response; +use hyper::body::Incoming; + +use crate::{request::FromRequest, response::RespBody, DynError, HttpContext}; + +pub(crate) struct DispatchService { + inner: S, + _marker: PhantomData<(IB, OB)>, +} + +impl DispatchService { + pub(crate) fn new(service: S) -> Self { + Self { + inner: service, + _marker: PhantomData, + } + } +} + +impl Clone for DispatchService +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + _marker: PhantomData, + } + } +} + +unsafe impl Send for DispatchService where S: Send {} + +unsafe impl Sync for DispatchService where S: Sync {} + +impl motore::Service for DispatchService +where + S: motore::Service> + Send + Sync + 'static, + S::Error: std::error::Error + Send + Sync + 'static, + OB: Into, + IB: FromRequest + Send, +{ + type Response = Response; + + type Error = DynError; + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + ) -> Result { + match IB::from(&*cx, req).await { + Ok(body) => self + .inner + .call(cx, body) + .await + .map(|resp| { + let (parts, body) = resp.into_parts(); + Response::from_parts(parts, body.into()) + }) + .map_err(|e| Box::new(e) as DynError), + Err(response) => Ok(response), + } + } +} diff --git a/volo-http/src/extract.rs b/volo-http/src/extract.rs new file mode 100644 index 00000000..c2c58c4a --- /dev/null +++ b/volo-http/src/extract.rs @@ -0,0 +1,42 @@ +use futures_util::Future; +use http::{Method, Response, Uri}; + +use crate::{response::IntoResponse, HttpContext}; + +pub trait FromContext: Sized { + type Rejection: IntoResponse; + fn from_context( + context: &HttpContext, + ) -> impl Future> + Send; +} + +impl FromContext for Option +where + T: FromContext, +{ + type Rejection = Response<()>; // Infallible + + fn from_context( + context: &HttpContext, + ) -> impl Future> + Send { + async move { Ok(T::from_context(context).await.ok()) } + } +} + +impl FromContext for Uri { + type Rejection = Response<()>; // Infallible + + fn from_context( + context: &HttpContext, + ) -> impl Future> + Send { + async move { Ok(context.uri.clone()) } + } +} + +impl FromContext for Method { + type Rejection = Response<()>; + + async fn from_context(context: &HttpContext) -> Result { + Ok(context.method.clone()) + } +} diff --git a/volo-http/src/handler.rs b/volo-http/src/handler.rs new file mode 100644 index 00000000..070ee282 --- /dev/null +++ b/volo-http/src/handler.rs @@ -0,0 +1,116 @@ +use std::{future::Future, marker::PhantomData}; + +use http::Response; +use hyper::body::Incoming; + +use crate::{ + extract::FromContext, + request::FromRequest, + response::{IntoResponse, RespBody}, + HttpContext, +}; + +impl Clone for HandlerService +where + H: Clone, +{ + fn clone(&self) -> Self { + Self { + h: self.h.clone(), + _mark: PhantomData, + } + } +} +pub trait Handler { + fn call( + self, + context: &mut HttpContext, + req: Incoming, + ) -> impl Future> + Send; +} + +macro_rules! impl_handler { + ( + [$($ty:ident),*], $last:ident + ) => { + #[allow(non_snake_case, unused_mut, unused_variables)] + impl Handler<($($ty,)* $last,)> for F + where + F: FnOnce($($ty,)* $last) -> Fut + Clone + Send, + Fut: Future + Send, + $( for<'r> $ty: FromContext + Send + 'r, )* + for<'r> $last: FromRequest + Send + 'r, + Res: IntoResponse, + { + async fn call(self, context: &mut HttpContext, req: Incoming) -> Response { + $( + let $ty = match $ty::from_context(context).await { + Ok(value) => value, + Err(rejection) => return rejection.into_response(), + }; + )* + let $last = match $last::from(context, req).await { + Ok(value) => value, + Err(rejection) => return rejection, + }; + self($($ty,)* $last).await.into_response() + } + } + }; +} + +impl_handler!([], T1); +impl_handler!([T1], T2); +impl_handler!([T1, T2], T3); +impl_handler!([T1, T2, T3], T4); +impl_handler!([T1, T2, T3, T4], T5); +impl_handler!([T1, T2, T3, T4, T5], T6); +impl_handler!([T1, T2, T3, T4, T5, T6], T7); +impl_handler!([T1, T2, T3, T4, T5, T6, T7], T8); +impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8], T9); +impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9], T10); +impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10], T11); +impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11], T12); +impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12], T13); +impl_handler!( + [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13], + T14 +); +impl_handler!( + [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14], + T15 +); +impl_handler!( + [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15], + T16 +); + +pub struct HandlerService { + h: H, + _mark: PhantomData, +} + +impl HandlerService { + pub fn new(h: H) -> Self { + Self { + h, + _mark: PhantomData, + } + } +} + +impl motore::Service for HandlerService +where + for<'r> H: Handler + Clone + Send + Sync + 'r, +{ + type Response = Response; + type Error = http::Error; + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + ) -> Result { + Ok(self.h.clone().call(cx, req).await) + } +} diff --git a/volo-http/src/layer.rs b/volo-http/src/layer.rs new file mode 100644 index 00000000..5b04772f --- /dev/null +++ b/volo-http/src/layer.rs @@ -0,0 +1,93 @@ +use http::{Method, Request, Response, StatusCode}; +use http_body_util::Full; +use hyper::body::{Bytes, Incoming}; + +use crate::HttpContext; + +pub trait Layer { + type Service: motore::Service>; + + fn layer(self, inner: S) -> Self::Service; +} + +pub trait LayerExt { + fn method( + self, + method: Method, + ) -> FilterLayer) -> Result<(), StatusCode>>> + where + Self: Sized, + { + self.filter(Box::new( + move |cx: &mut HttpContext, _: &Request| { + if cx.method == method { + Ok(()) + } else { + Err(StatusCode::METHOD_NOT_ALLOWED) + } + }, + )) + } + + fn filter(self, f: F) -> FilterLayer + where + Self: Sized, + F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode>, + { + FilterLayer { f } + } +} + +pub struct FilterLayer { + f: F, +} + +impl Layer for FilterLayer +where + S: motore::Service, Response = Response>> + + Send + + Sync + + 'static, + F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync, +{ + type Service = Filter; + + fn layer(self, inner: S) -> Self::Service { + Filter { + service: inner, + f: self.f, + } + } +} + +pub struct Filter { + service: S, + f: F, +} + +impl motore::Service> for Filter +where + S: motore::Service, Response = Response>> + + Send + + Sync + + 'static, + F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync, +{ + type Response = S::Response; + + type Error = S::Error; + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Request, + ) -> Result { + if let Err(status) = (self.f)(cx, &req) { + return Ok(Response::builder() + .status(status) + .body(Full::new(Bytes::new())) + .unwrap()); + } + self.service.call(cx, req).await + } +} diff --git a/volo-http/src/lib.rs b/volo-http/src/lib.rs index 8b137891..e80f2c97 100644 --- a/volo-http/src/lib.rs +++ b/volo-http/src/lib.rs @@ -1 +1,73 @@ +#![feature(impl_trait_in_assoc_type)] +pub(crate) mod dispatch; +pub mod extract; +pub mod handler; +pub mod layer; +pub mod param; +pub mod request; +pub mod response; +pub mod route; +pub mod server; + +use std::future::Future; + +use http::{Extensions, HeaderMap, HeaderValue, Method, Uri, Version}; +use hyper::{ + body::{Body, Incoming}, + Request, Response, +}; +use param::Params; +use volo::net::Address; + +pub type DynError = Box; + +pub struct HttpContext { + pub peer: Address, + pub method: Method, + pub uri: Uri, + pub version: Version, + pub headers: HeaderMap, + pub extensions: Extensions, + + pub params: Params, +} + +#[derive(Clone)] +pub struct MotoreService { + pub peer: Address, + pub inner: S, +} + +impl hyper::service::Service> for MotoreService +where + OB: Body, + S: motore::Service> + Clone, + S::Error: Into, +{ + type Response = S::Response; + + type Error = S::Error; + + type Future = impl Future>; + + fn call(&mut self, req: Request) -> Self::Future { + let s = self.inner.clone(); + let peer = self.peer.clone(); + async move { + let (parts, req) = req.into_parts(); + let mut cx = HttpContext { + peer, + method: parts.method, + uri: parts.uri, + version: parts.version, + headers: parts.headers, + extensions: parts.extensions, + params: Params { + inner: Vec::with_capacity(0), + }, + }; + s.call(&mut cx, req).await + } + } +} diff --git a/volo-http/src/param.rs b/volo-http/src/param.rs new file mode 100644 index 00000000..f86cf760 --- /dev/null +++ b/volo-http/src/param.rs @@ -0,0 +1,51 @@ +use std::slice::Iter; + +use bytes::{BufMut, Bytes, BytesMut}; + +pub struct Params { + pub(crate) inner: Vec<(Bytes, Bytes)>, +} + +impl From> for Params { + fn from(params: matchit::Params) -> Self { + let mut inner = Vec::with_capacity(params.len()); + let mut capacity = 0; + for (k, v) in params.iter() { + capacity += k.len(); + capacity += v.len(); + } + + let mut buf = BytesMut::with_capacity(capacity); + + for (k, v) in params.iter() { + buf.put(k.as_bytes()); + let k = buf.split().freeze(); + buf.put(v.as_bytes()); + let v = buf.split().freeze(); + inner.push((k, v)); + } + + Self { inner } + } +} + +impl Params { + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn iter(&self) -> Iter<'_, (Bytes, Bytes)> { + self.inner.iter() + } + + pub fn get>(&self, k: K) -> Option<&Bytes> { + self.iter() + .filter(|(ik, _)| ik.as_ref() == k.as_ref()) + .map(|(_, v)| v) + .next() + } +} diff --git a/volo-http/src/request.rs b/volo-http/src/request.rs new file mode 100644 index 00000000..8b30aecf --- /dev/null +++ b/volo-http/src/request.rs @@ -0,0 +1,111 @@ +use bytes::Bytes; +use futures_util::Future; +use http::{header, HeaderMap, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::body::Incoming; +use serde::de::DeserializeOwned; + +use crate::{ + extract::FromContext, + response::{IntoResponse, RespBody}, + HttpContext, +}; + +pub trait FromRequest: Sized { + fn from( + cx: &HttpContext, + body: Incoming, + ) -> impl Future>> + Send; +} + +impl FromRequest for T +where + T: FromContext, +{ + fn from( + cx: &HttpContext, + _body: Incoming, + ) -> impl Future>> + Send { + async move { + match T::from_context(cx).await { + Ok(value) => Ok(value), + Err(rejection) => Err(rejection.into_response()), + } + } + } +} + +impl FromRequest for Incoming { + fn from( + _cx: &HttpContext, + body: Incoming, + ) -> impl Future>> + Send { + async { Ok(body) } + } +} + +pub struct Json(pub T); + +impl FromRequest for Json { + fn from( + cx: &HttpContext, + body: Incoming, + ) -> impl Future>> + Send { + async move { + if !json_content_type(&cx.headers) { + return Err(Response::builder() + .status(StatusCode::UNSUPPORTED_MEDIA_TYPE) + .body(Bytes::new().into()) + .unwrap()); + } + + match body.collect().await { + Ok(body) => { + let body = body.to_bytes(); + match serde_json::from_slice::(body.as_ref()) { + Ok(t) => Ok(Self(t)), + Err(e) => { + tracing::warn!("json serialization error {e}"); + Err(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Bytes::new().into()) + .unwrap()) + } + } + } + Err(e) => { + tracing::warn!("collect body error: {e}"); + Err(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Bytes::new().into()) + .unwrap()) + } + } + } + } +} + +fn json_content_type(headers: &HeaderMap) -> bool { + let content_type = if let Some(content_type) = headers.get(header::CONTENT_TYPE) { + content_type + } else { + return false; + }; + + let content_type = if let Ok(content_type) = content_type.to_str() { + content_type + } else { + return false; + }; + + let mime = if let Ok(mime) = content_type.parse::() { + mime + } else { + return false; + }; + + let is_json_content_type = mime.type_() == "application" + && (mime.subtype() == "json" || mime.suffix().map_or(false, |name| name == "json")); + + is_json_content_type +} diff --git a/volo-http/src/response.rs b/volo-http/src/response.rs new file mode 100644 index 00000000..4cd756c3 --- /dev/null +++ b/volo-http/src/response.rs @@ -0,0 +1,137 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::{ready, stream}; +use http::{Response, StatusCode}; +use http_body_util::{Full, StreamBody}; +use hyper::body::{Body, Bytes, Frame}; +use pin_project::pin_project; + +use crate::DynError; + +#[pin_project(project = RespBodyProj)] +pub enum RespBody { + Stream { + #[pin] inner: StreamBody, DynError>> + Send + Sync>>>, + }, + Full { + #[pin] inner: Full, + }, +} + +impl Body for RespBody { + type Data = Bytes; + + type Error = DynError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.project() { + RespBodyProj::Stream { inner } => inner.poll_frame(cx), + RespBodyProj::Full { inner } => { + Poll::Ready(ready!(inner.poll_frame(cx)).map(|result| Ok(result.unwrap()))) + } + } + } +} + +impl From> for RespBody { + fn from(value: Full) -> Self { + Self::Full { inner: value } + } +} + +impl From for RespBody { + fn from(value: Bytes) -> Self { + Self::Full { + inner: Full::new(value), + } + } +} + +impl From for RespBody { + fn from(value: String) -> Self { + Self::Full { + inner: Full::new(value.into()), + } + } +} + +impl From<&'static str> for RespBody { + fn from(value: &'static str) -> Self { + Self::Full { + inner: Full::new(value.into()), + } + } +} + +impl From<()> for RespBody { + fn from(_: ()) -> Self { + Self::Full { + inner: Full::new(Bytes::new()), + } + } +} + +pub trait IntoResponse { + fn into_response(self) -> Response; +} + +impl IntoResponse for Response +where + T: Into, +{ + fn into_response(self) -> Response { + let (parts, body) = self.into_parts(); + Response::from_parts(parts, body.into()) + } +} + +impl IntoResponse for T +where + T: Into, +{ + fn into_response(self) -> Response { + Response::builder() + .status(StatusCode::OK) + .body(self.into()) + .unwrap() + } +} + +impl IntoResponse for Result +where + R: IntoResponse, + E: IntoResponse, +{ + fn into_response(self) -> Response { + match self { + Ok(value) => value.into_response(), + Err(err) => err.into_response(), + } + } +} + +impl IntoResponse for (StatusCode, T) +where + T: IntoResponse, +{ + fn into_response(self) -> Response { + let mut resp = self.1.into_response(); + *resp.status_mut() = self.0; + resp + } +} + +impl IntoResponse for StatusCode { + fn into_response(self) -> Response { + Response::builder() + .status(self) + .body(String::new().into()) + .unwrap() + } +} diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs new file mode 100644 index 00000000..cc9246ae --- /dev/null +++ b/volo-http/src/route.rs @@ -0,0 +1,238 @@ +use http::{Method, Response, StatusCode}; +use http_body_util::Full; +use hyper::body::{Bytes, Incoming}; +use motore::{layer::Layer, Service}; + +use crate::{ + dispatch::DispatchService, request::FromRequest, response::RespBody, DynError, HttpContext, +}; + +pub type DynService = motore::BoxCloneService, DynError>; + +#[derive(Clone, Default)] +pub struct Router { + inner: matchit::Router, +} + +impl Service for Router { + type Response = Response; + + type Error = DynError; + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + ) -> Result { + if let Ok(matched) = self.inner.at(cx.uri.path()) { + cx.params = matched.params.into(); + matched.value.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::new()).into()) + .unwrap()) + } + } +} + +impl Router { + pub fn new() -> Self { + Default::default() + } + + pub fn route(mut self, uri: R, route: S) -> Self + where + R: Into, + S: Service, Error = DynError> + + Send + + Sync + + Clone + + 'static, + { + if let Err(e) = self.inner.insert(uri, motore::BoxCloneService::new(route)) { + panic!("routing error: {e}"); + } + self + } +} + +pub trait ServiceLayerExt: Sized { + fn layer(self, l: L) -> L::Service + where + L: Layer; +} + +impl ServiceLayerExt for S { + fn layer(self, l: L) -> L::Service + where + L: Layer, + { + Layer::layer(l, self) + } +} + +#[derive(Default, Clone)] +pub struct Route { + options: Option, + get: Option, + post: Option, + put: Option, + delete: Option, + head: Option, + trace: Option, + connect: Option, + patch: Option, +} + +impl Route { + pub fn new() -> Self { + Default::default() + } + + pub fn builder() -> RouteBuilder { + RouteBuilder { route: Self::new() } + } +} + +impl Service for Route { + type Response = Response; + + type Error = DynError; + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + ) -> Result { + match cx.method { + Method::GET => { + if let Some(service) = &self.get { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::POST => { + if let Some(service) = &self.post { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::PUT => { + if let Some(service) = &self.put { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::DELETE => { + if let Some(service) = &self.delete { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::HEAD => { + if let Some(service) = &self.head { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::OPTIONS => { + if let Some(service) = &self.options { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::CONNECT => { + if let Some(service) = &self.connect { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::PATCH => { + if let Some(service) = &self.patch { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + Method::TRACE => { + if let Some(service) = &self.trace { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()) + .unwrap()) + } + } + _ => Ok(Response::builder() + .status(StatusCode::METHOD_NOT_ALLOWED) + .body("".into()) + .unwrap()), + } + } +} + +macro_rules! impl_method_register { + ($( $method:ident ),*) => { + $( + pub fn $method(mut self, handler: S) -> Self + where + S: Service> + + Send + + Sync + + Clone + + 'static, + S::Error: std::error::Error + Send + Sync, + OB: Into + 'static, + IB: FromRequest + Send + 'static, + { + self.route.$method = Some(motore::BoxCloneService::new(DispatchService::new(handler))); + self + } + )+ + }; +} + +pub struct RouteBuilder { + route: Route, +} + +impl RouteBuilder { + impl_method_register!(options, get, post, put, delete, head, trace, connect, patch); + + pub fn build(self) -> Route { + self.route + } +} diff --git a/volo-http/src/server.rs b/volo-http/src/server.rs new file mode 100644 index 00000000..95d33673 --- /dev/null +++ b/volo-http/src/server.rs @@ -0,0 +1,143 @@ +use std::{ + sync::{atomic::Ordering, Arc}, + time::Duration, +}; + +use http::Response; +use hyper::{ + body::{Body, Incoming as BodyIncoming}, + server::conn::http1, +}; +use motore::BoxError; +use tracing::{info, trace}; +use volo::net::{incoming::Incoming, MakeIncoming}; + +use crate::{DynError, HttpContext, MotoreService}; + +#[derive(Clone)] +pub struct Server { + app: App, +} + +impl Server +where + OB: Body + Send + 'static, + OB::Data: Send, + App: motore::Service> + + Clone + + Send + + Sync + + 'static, + App::Error: Into, +{ + pub fn new(app: App) -> Self { + Self { app: app } + } + + pub async fn run(self, mk_incoming: MI) -> Result<(), BoxError> { + let mut incoming = mk_incoming.make_incoming().await?; + info!("[VOLO-HTTP] server start at: {:?}", incoming); + + let (tx, rx) = tokio::sync::watch::channel(()); + let exit_mark = Arc::new(std::sync::atomic::AtomicBool::default()); + + let exit_mark_inner = exit_mark.clone(); + let rx_inner = rx.clone(); + + let service = self; + let handler = tokio::spawn(async move { + let exit_mark = exit_mark_inner.clone(); + loop { + if exit_mark.load(Ordering::Relaxed) { + break Ok(()); + } + match incoming.accept().await { + Ok(Some(conn)) => { + let peer = conn.info.peer_addr.clone().unwrap(); + trace!("[VOLO] accept connection from: {:?}", peer); + + let s = service.clone(); + let mut watch = rx_inner.clone(); + tokio::task::spawn(async move { + let mut http_conn = http1::Builder::new() + .serve_connection(conn, MotoreService { peer, inner: s.app }); + tokio::select! { + _ = watch.changed() => { + tracing::trace!("[VOLO] closing a pending connection"); + // Graceful shutdown. + hyper::server::conn::http1::Connection::graceful_shutdown(Pin::new(&mut http_conn)); + // Continue to poll this connection until shutdown can finish. + let result = http_conn.await; + if let Err(err) = result { + tracing::debug!("[VOLO] connection error: {:?}", err); + } + } + result = &mut http_conn => { + if let Err(err) = result { + tracing::debug!("[VOLO] connection error: {:?}", err); + } + }, + } + }); + } + Ok(None) => break Ok(()), + Err(e) => break Err(Box::new(e)), + } + } + }); + + #[cfg(target_family = "unix")] + { + // graceful shutdown + let mut sigint = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?; + let mut sighup = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())?; + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; + + // graceful shutdown handler + tokio::select! { + _ = sigint.recv() => {} + _ = sighup.recv() => {} + _ = sigterm.recv() => {} + res = handler => { + match res { + Ok(res) => { + match res { + Ok(()) => {} + Err(e) => return Err(Box::new(e)) + }; + } + Err(e) => return Err(Box::new(e)), + } + } + } + } + + // graceful shutdown handler for windows + #[cfg(target_family = "windows")] + tokio::select! { + _ = tokio::signal::ctrl_c() => {} + res = handler => { + match res { + Ok(res) => { + match res { + Ok(()) => {} + Err(e) => return Err(Box::new(e)) + }; + } + Err(e) => return Err(Box::new(e)), + } + } + } + + // received signal, graceful shutdown now + info!("[VOLO] received signal, gracefully exiting now"); + exit_mark.store(true, Ordering::Relaxed); + drop(rx); + let _ = tx.send(()); + let _ = tokio::time::timeout(Duration::from_secs(5), tx.closed()).await; + Ok(()) + } +} From 53bd6a3fee0398f0c5fe80e9e136cd9c7aa91bae Mon Sep 17 00:00:00 2001 From: Yu Li Date: Mon, 27 Nov 2023 17:16:31 +0800 Subject: [PATCH 02/12] feat: introduce volo-http crate Signed-off-by: Gwo Tzu-Hsing Signed-off-by: Zheng Li <875543533@qq.com> Signed-off-by: bobozhengsir Signed-off-by: Yu Li --- volo-http/src/lib.rs | 47 --------------------------------------- volo-http/src/response.rs | 8 +++++-- volo-http/src/server.rs | 33 +++++++++++++++++++++------ 3 files changed, 32 insertions(+), 56 deletions(-) diff --git a/volo-http/src/lib.rs b/volo-http/src/lib.rs index e80f2c97..c0ca341f 100644 --- a/volo-http/src/lib.rs +++ b/volo-http/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - pub(crate) mod dispatch; pub mod extract; pub mod handler; @@ -10,13 +8,7 @@ pub mod response; pub mod route; pub mod server; -use std::future::Future; - use http::{Extensions, HeaderMap, HeaderValue, Method, Uri, Version}; -use hyper::{ - body::{Body, Incoming}, - Request, Response, -}; use param::Params; use volo::net::Address; @@ -32,42 +24,3 @@ pub struct HttpContext { pub params: Params, } - -#[derive(Clone)] -pub struct MotoreService { - pub peer: Address, - pub inner: S, -} - -impl hyper::service::Service> for MotoreService -where - OB: Body, - S: motore::Service> + Clone, - S::Error: Into, -{ - type Response = S::Response; - - type Error = S::Error; - - type Future = impl Future>; - - fn call(&mut self, req: Request) -> Self::Future { - let s = self.inner.clone(); - let peer = self.peer.clone(); - async move { - let (parts, req) = req.into_parts(); - let mut cx = HttpContext { - peer, - method: parts.method, - uri: parts.uri, - version: parts.version, - headers: parts.headers, - extensions: parts.extensions, - params: Params { - inner: Vec::with_capacity(0), - }, - }; - s.call(&mut cx, req).await - } - } -} diff --git a/volo-http/src/response.rs b/volo-http/src/response.rs index 4cd756c3..897d2fdb 100644 --- a/volo-http/src/response.rs +++ b/volo-http/src/response.rs @@ -14,10 +14,14 @@ use crate::DynError; #[pin_project(project = RespBodyProj)] pub enum RespBody { Stream { - #[pin] inner: StreamBody, DynError>> + Send + Sync>>>, + #[pin] + inner: StreamBody< + stream::Iter, DynError>> + Send + Sync>>, + >, }, Full { - #[pin] inner: Full, + #[pin] + inner: Full, }, } diff --git a/volo-http/src/server.rs b/volo-http/src/server.rs index 95d33673..6f6558d3 100644 --- a/volo-http/src/server.rs +++ b/volo-http/src/server.rs @@ -3,7 +3,7 @@ use std::{ time::Duration, }; -use http::Response; +use http::{Request, Response}; use hyper::{ body::{Body, Incoming as BodyIncoming}, server::conn::http1, @@ -12,7 +12,7 @@ use motore::BoxError; use tracing::{info, trace}; use volo::net::{incoming::Incoming, MakeIncoming}; -use crate::{DynError, HttpContext, MotoreService}; +use crate::{param::Params, DynError, HttpContext}; #[derive(Clone)] pub struct Server { @@ -31,7 +31,7 @@ where App::Error: Into, { pub fn new(app: App) -> Self { - Self { app: app } + Self { app } } pub async fn run(self, mk_incoming: MI) -> Result<(), BoxError> { @@ -44,7 +44,6 @@ where let exit_mark_inner = exit_mark.clone(); let rx_inner = rx.clone(); - let service = self; let handler = tokio::spawn(async move { let exit_mark = exit_mark_inner.clone(); loop { @@ -56,11 +55,31 @@ where let peer = conn.info.peer_addr.clone().unwrap(); trace!("[VOLO] accept connection from: {:?}", peer); - let s = service.clone(); + let service = self.clone(); let mut watch = rx_inner.clone(); tokio::task::spawn(async move { - let mut http_conn = http1::Builder::new() - .serve_connection(conn, MotoreService { peer, inner: s.app }); + let mut http_conn = http1::Builder::new().serve_connection( + conn, + hyper::service::service_fn(move |req: Request| { + let s = service.clone(); + let peer = peer.clone(); + async move { + let (parts, req) = req.into_parts(); + let mut cx = HttpContext { + peer, + method: parts.method, + uri: parts.uri, + version: parts.version, + headers: parts.headers, + extensions: parts.extensions, + params: Params { + inner: Vec::with_capacity(0), + }, + }; + s.app.call(&mut cx, req).await + } + }), + ); tokio::select! { _ = watch.changed() => { tracing::trace!("[VOLO] closing a pending connection"); From e536be604e12d9dc30667f899e45e41b9fa128a8 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Fri, 10 Nov 2023 18:41:30 +0800 Subject: [PATCH 03/12] feat(volo-http): introduce route id The `matchit::Router` cannot be converted to `Iterator`, so using `matchit::Router` is not convenient enough. To solve the problem, we refer to the implementation of `axum` and introduce a `RouteId` as a bridge, the `matchit::Router` only handles some IDs and each ID corresponds to a `DynService`. Signed-off-by: Yu Li --- volo-http/src/route.rs | 52 +++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs index cc9246ae..367e70f7 100644 --- a/volo-http/src/route.rs +++ b/volo-http/src/route.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use http::{Method, Response, StatusCode}; use http_body_util::Full; use hyper::body::{Bytes, Incoming}; @@ -7,11 +9,33 @@ use crate::{ dispatch::DispatchService, request::FromRequest, response::RespBody, DynError, HttpContext, }; +// The `matchit::Router` cannot be converted to `Iterator`, so using `matchit::Router` +// is not convenient enough. +// +// To solve the problem, we refer to the implementation of `axum` and introduce a `RouteId` as a +// bridge, the `matchit::Router` only handles some IDs and each ID corresponds to a `DynService`. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct RouteId(u32); + +impl RouteId { + fn next() -> Self { + use std::sync::atomic::{AtomicU32, Ordering}; + // `AtomicU64` isn't supported on all platforms + static ID: AtomicU32 = AtomicU32::new(0); + let id = ID.fetch_add(1, Ordering::Relaxed); + if id == u32::MAX { + panic!("Over `u32::MAX` routes created. If you need this, please file an issue."); + } + Self(id) + } +} + pub type DynService = motore::BoxCloneService, DynError>; #[derive(Clone, Default)] pub struct Router { - inner: matchit::Router, + matcher: matchit::Router, + routes: HashMap, } impl Service for Router { @@ -24,15 +48,17 @@ impl Service for Router { cx: &'cx mut HttpContext, req: Incoming, ) -> Result { - if let Ok(matched) = self.inner.at(cx.uri.path()) { - cx.params = matched.params.into(); - matched.value.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Full::new(Bytes::new()).into()) - .unwrap()) + if let Ok(matched) = self.matcher.at(cx.uri.path()) { + if let Some(srv) = self.routes.get(matched.value) { + cx.params = matched.params.into(); + return srv.call(cx, req).await; + } } + + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::new()).into()) + .unwrap()) } } @@ -50,9 +76,13 @@ impl Router { + Clone + 'static, { - if let Err(e) = self.inner.insert(uri, motore::BoxCloneService::new(route)) { - panic!("routing error: {e}"); + let route_id = RouteId::next(); + if let Err(e) = self.matcher.insert(uri, route_id) { + panic!("Insert routing rule failed, error: {e}"); } + self.routes + .insert(route_id, motore::BoxCloneService::new(route)); + self } } From 203d9ff252834343bb07c4f337ffcae2a8995482 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Wed, 15 Nov 2023 16:18:32 +0800 Subject: [PATCH 04/12] feat(volo-http): wrap `DynService` with `Route` With tie type `Route`, it can support `Service`, `Handler` and so on. Signed-off-by: Yu Li --- examples/src/http/http.rs | 16 ++- volo-http/src/route.rs | 296 ++++++++++++++++++++------------------ 2 files changed, 168 insertions(+), 144 deletions(-) diff --git a/examples/src/http/http.rs b/examples/src/http/http.rs index 8b7f7b6a..bcea86d3 100644 --- a/examples/src/http/http.rs +++ b/examples/src/http/http.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use volo_http::{ handler::HandlerService, request::Json, - route::{Route, Router, ServiceLayerExt}, + route::{MethodRouter, Router}, server::Server, HttpContext, }; @@ -73,16 +73,22 @@ async fn main() { let app = Router::new() .route( "/", - Route::builder() + MethodRouter::builder() .get(service_fn(hello)) .build() .layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1)))), ) - .route("/:echo", Route::builder().get(service_fn(echo)).build()) - .route("/user", Route::builder().post(service_fn(json)).build()) + .route( + "/:echo", + MethodRouter::builder().get(service_fn(echo)).build(), + ) + .route( + "/user", + MethodRouter::builder().post(service_fn(json)).build(), + ) .route( "/test", - Route::builder() + MethodRouter::builder() .get(HandlerService::new(test)) .post(HandlerService::new(test)) .build(), diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs index 367e70f7..244dc969 100644 --- a/volo-http/src/route.rs +++ b/volo-http/src/route.rs @@ -9,11 +9,11 @@ use crate::{ dispatch::DispatchService, request::FromRequest, response::RespBody, DynError, HttpContext, }; -// The `matchit::Router` cannot be converted to `Iterator`, so using `matchit::Router` -// is not convenient enough. +// The `matchit::Router` cannot be converted to `Iterator`, so using +// `matchit::Router` is not convenient enough. // // To solve the problem, we refer to the implementation of `axum` and introduce a `RouteId` as a -// bridge, the `matchit::Router` only handles some IDs and each ID corresponds to a `DynService`. +// bridge, the `matchit::Router` only handles some IDs and each ID corresponds to a `MethodRouter`. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct RouteId(u32); @@ -30,12 +30,10 @@ impl RouteId { } } -pub type DynService = motore::BoxCloneService, DynError>; - #[derive(Clone, Default)] pub struct Router { matcher: matchit::Router, - routes: HashMap, + routes: HashMap, } impl Service for Router { @@ -67,65 +65,112 @@ impl Router { Default::default() } - pub fn route(mut self, uri: R, route: S) -> Self + pub fn route(mut self, uri: R, route: MethodRouter) -> Self where R: Into, - S: Service, Error = DynError> - + Send - + Sync - + Clone - + 'static, { let route_id = RouteId::next(); if let Err(e) = self.matcher.insert(uri, route_id) { panic!("Insert routing rule failed, error: {e}"); } - self.routes - .insert(route_id, motore::BoxCloneService::new(route)); + self.routes.insert(route_id, route); self } -} - -pub trait ServiceLayerExt: Sized { - fn layer(self, l: L) -> L::Service - where - L: Layer; -} -impl ServiceLayerExt for S { - fn layer(self, l: L) -> L::Service + pub fn layer(self, l: L) -> Self where - L: Layer, + L: Layer + Clone + Send + Sync + 'static, + L::Service: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, { - Layer::layer(l, self) + let routes = self + .routes + .into_iter() + .map(|(id, route)| { + let route = route.layer(l.clone()); + (id, route) + }) + .collect(); + + Router { + matcher: self.matcher, + routes, + } } } #[derive(Default, Clone)] -pub struct Route { - options: Option, - get: Option, - post: Option, - put: Option, - delete: Option, - head: Option, - trace: Option, - connect: Option, - patch: Option, +pub struct MethodRouter { + options: Option, + get: Option, + post: Option, + put: Option, + delete: Option, + head: Option, + trace: Option, + connect: Option, + patch: Option, } -impl Route { +impl MethodRouter { pub fn new() -> Self { Default::default() } - pub fn builder() -> RouteBuilder { - RouteBuilder { route: Self::new() } + pub fn builder() -> MethodRouterBuilder { + MethodRouterBuilder { route: Self::new() } + } + + pub fn layer(self, l: L) -> Self + where + L: Layer + Clone + Send + Sync + 'static, + L::Service: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, + { + let Self { + options, + get, + post, + put, + delete, + head, + trace, + connect, + patch, + } = self; + + let options = options.map(|r| r.layer(l.clone())); + let get = get.map(|r| r.layer(l.clone())); + let post = post.map(|r| r.layer(l.clone())); + let put = put.map(|r| r.layer(l.clone())); + let delete = delete.map(|r| r.layer(l.clone())); + let head = head.map(|r| r.layer(l.clone())); + let trace = trace.map(|r| r.layer(l.clone())); + let connect = connect.map(|r| r.layer(l.clone())); + let patch = patch.map(|r| r.layer(l.clone())); + + Self { + options, + get, + post, + put, + delete, + head, + trace, + connect, + patch, + } } } -impl Service for Route { +impl Service for MethodRouter { type Response = Response; type Error = DynError; @@ -135,105 +180,39 @@ impl Service for Route { cx: &'cx mut HttpContext, req: Incoming, ) -> Result { - match cx.method { - Method::GET => { - if let Some(service) = &self.get { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } + let handler = match cx.method { + Method::OPTIONS => &self.options, + Method::GET => &self.get, + Method::POST => &self.post, + Method::PUT => &self.put, + Method::DELETE => &self.delete, + Method::HEAD => &self.head, + Method::TRACE => &self.trace, + Method::CONNECT => &self.connect, + Method::PATCH => &self.patch, + _ => { + return Ok(Response::builder() + .status(StatusCode::METHOD_NOT_ALLOWED) + .body("".into()) + .unwrap()); } - Method::POST => { - if let Some(service) = &self.post { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - Method::PUT => { - if let Some(service) = &self.put { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - Method::DELETE => { - if let Some(service) = &self.delete { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - Method::HEAD => { - if let Some(service) = &self.head { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - Method::OPTIONS => { - if let Some(service) = &self.options { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - Method::CONNECT => { - if let Some(service) = &self.connect { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - Method::PATCH => { - if let Some(service) = &self.patch { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - Method::TRACE => { - if let Some(service) = &self.trace { - service.call(cx, req).await - } else { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) - } - } - _ => Ok(Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) + }; + + if let Some(service) = handler { + service.call(cx, req).await + } else { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) .body("".into()) - .unwrap()), + .unwrap()) } } } +pub struct MethodRouterBuilder { + route: MethodRouter, +} + macro_rules! impl_method_register { ($( $method:ident ),*) => { $( @@ -248,21 +227,60 @@ macro_rules! impl_method_register { OB: Into + 'static, IB: FromRequest + Send + 'static, { - self.route.$method = Some(motore::BoxCloneService::new(DispatchService::new(handler))); + self.route.$method = Some(Route::new(DispatchService::new(handler))); self } )+ }; } -pub struct RouteBuilder { - route: Route, -} - -impl RouteBuilder { +impl MethodRouterBuilder { impl_method_register!(options, get, post, put, delete, head, trace, connect, patch); - pub fn build(self) -> Route { + pub fn build(self) -> MethodRouter { self.route } } + +#[derive(Clone)] +pub struct Route(motore::BoxCloneService, DynError>); + +impl Route { + pub fn new(inner: S) -> Self + where + S: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, + { + Route(motore::BoxCloneService::new(inner)) + } + + pub fn layer(self, l: L) -> Self + where + L: Layer, + L::Service: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, + { + Route::new(l.layer(self)) + } +} + +impl Service for Route { + type Response = Response; + + type Error = DynError; + + #[inline] + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + ) -> Result { + self.0.call(cx, req).await + } +} From f4ac3a32d620cd58c8bbfc42e5200854b952fee3 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Wed, 15 Nov 2023 19:23:51 +0800 Subject: [PATCH 05/12] feat(volo-http): make handler more powerful With this commit, there is no need to keep `HttpContext` in handler function. Like axum, anything with `FromContext` or `FromRequest` trait can be used as arguments of a handler. Signed-off-by: Yu Li --- examples/src/http/http.rs | 54 ++++++++++++--------------------------- volo-http/src/extract.rs | 14 ++++++++-- volo-http/src/handler.rs | 12 +++++++++ volo-http/src/param.rs | 1 + volo-http/src/route.rs | 32 ++++++++++++++++++++--- 5 files changed, 71 insertions(+), 42 deletions(-) diff --git a/examples/src/http/http.rs b/examples/src/http/http.rs index bcea86d3..74c8090c 100644 --- a/examples/src/http/http.rs +++ b/examples/src/http/http.rs @@ -1,46 +1,36 @@ -use std::{convert::Infallible, net::SocketAddr}; +use std::net::SocketAddr; use bytes::Bytes; -use http::{Method, Response, StatusCode, Uri}; -use hyper::body::Incoming; -use motore::{service::service_fn, timeout::TimeoutLayer}; -use serde::{Deserialize, Serialize}; +use http::{Method, StatusCode, Uri}; +use motore::timeout::TimeoutLayer; +use serde::Deserialize; use volo_http::{ handler::HandlerService, + param::Params, request::Json, - route::{MethodRouter, Router}, + route::{get, post, MethodRouter, Router}, server::Server, - HttpContext, }; -async fn hello( - _cx: &mut HttpContext, - _request: Incoming, -) -> Result, Infallible> { - Ok(Response::new("hello, world\n")) +async fn hello() -> &'static str { + "hello, world\n" } -async fn echo(cx: &mut HttpContext, _request: Incoming) -> Result, Infallible> { - if let Some(echo) = cx.params.get("echo") { - return Ok(Response::new(echo.clone())); +async fn echo(params: Params) -> Result { + if let Some(echo) = params.get("echo") { + return Ok(echo.clone()); } - Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Bytes::new()) - .unwrap()) + Err(StatusCode::BAD_REQUEST) } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Deserialize, Debug)] struct Person { name: String, age: u8, phones: Vec, } -async fn json( - _cx: &mut HttpContext, - Json(request): Json, -) -> Result, Infallible> { +async fn json(Json(request): Json) { let first_phone = request .phones .get(0) @@ -50,7 +40,6 @@ async fn json( "{} is {} years old, {}'s first phone number is {}", request.name, request.age, request.name, first_phone ); - Ok(Response::new(())) } async fn test( @@ -73,19 +62,10 @@ async fn main() { let app = Router::new() .route( "/", - MethodRouter::builder() - .get(service_fn(hello)) - .build() - .layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1)))), - ) - .route( - "/:echo", - MethodRouter::builder().get(service_fn(echo)).build(), - ) - .route( - "/user", - MethodRouter::builder().post(service_fn(json)).build(), + get(hello).layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1)))), ) + .route("/:echo", get(echo)) + .route("/user", post(json)) .route( "/test", MethodRouter::builder() diff --git a/volo-http/src/extract.rs b/volo-http/src/extract.rs index c2c58c4a..909c5aea 100644 --- a/volo-http/src/extract.rs +++ b/volo-http/src/extract.rs @@ -1,7 +1,7 @@ use futures_util::Future; use http::{Method, Response, Uri}; -use crate::{response::IntoResponse, HttpContext}; +use crate::{response::IntoResponse, HttpContext, Params}; pub trait FromContext: Sized { type Rejection: IntoResponse; @@ -34,9 +34,19 @@ impl FromContext for Uri { } impl FromContext for Method { - type Rejection = Response<()>; + type Rejection = Response<()>; // Infallible async fn from_context(context: &HttpContext) -> Result { Ok(context.method.clone()) } } + +impl FromContext for Params { + type Rejection = Response<()>; // Infallible + + fn from_context( + context: &HttpContext, + ) -> impl Future> + Send { + async move { Ok(context.params.clone()) } + } +} diff --git a/volo-http/src/handler.rs b/volo-http/src/handler.rs index 070ee282..7a448ce4 100644 --- a/volo-http/src/handler.rs +++ b/volo-http/src/handler.rs @@ -21,6 +21,7 @@ where } } } + pub trait Handler { fn call( self, @@ -29,6 +30,17 @@ pub trait Handler { ) -> impl Future> + Send; } +impl Handler<()> for F +where + F: FnOnce() -> Fut + Clone + Send, + Fut: Future + Send, + Res: IntoResponse, +{ + async fn call(self, _context: &mut HttpContext, _req: Incoming) -> Response { + self().await.into_response() + } +} + macro_rules! impl_handler { ( [$($ty:ident),*], $last:ident diff --git a/volo-http/src/param.rs b/volo-http/src/param.rs index f86cf760..1a3dd40b 100644 --- a/volo-http/src/param.rs +++ b/volo-http/src/param.rs @@ -2,6 +2,7 @@ use std::slice::Iter; use bytes::{BufMut, Bytes, BytesMut}; +#[derive(Clone)] pub struct Params { pub(crate) inner: Vec<(Bytes, Bytes)>, } diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs index 244dc969..c6236da2 100644 --- a/volo-http/src/route.rs +++ b/volo-http/src/route.rs @@ -6,7 +6,11 @@ use hyper::body::{Bytes, Incoming}; use motore::{layer::Layer, Service}; use crate::{ - dispatch::DispatchService, request::FromRequest, response::RespBody, DynError, HttpContext, + dispatch::DispatchService, + handler::{Handler, HandlerService}, + request::FromRequest, + response::RespBody, + DynError, HttpContext, }; // The `matchit::Router` cannot be converted to `Iterator`, so using @@ -209,11 +213,17 @@ impl Service for MethodRouter { } } +macro_rules! for_all_methods { + ($name:ident) => { + $name!(options, get, post, put, delete, head, trace, connect, patch); + }; +} + pub struct MethodRouterBuilder { route: MethodRouter, } -macro_rules! impl_method_register { +macro_rules! impl_method_register_for_builder { ($( $method:ident ),*) => { $( pub fn $method(mut self, handler: S) -> Self @@ -235,7 +245,7 @@ macro_rules! impl_method_register { } impl MethodRouterBuilder { - impl_method_register!(options, get, post, put, delete, head, trace, connect, patch); + for_all_methods!(impl_method_register_for_builder); pub fn build(self) -> MethodRouter { self.route @@ -284,3 +294,19 @@ impl Service for Route { self.0.call(cx, req).await } } + +macro_rules! impl_method_register { + ($( $method:ident ),*) => { + $( + pub fn $method(h: H) -> MethodRouter + where + for<'a> H: Handler + Clone + Send + Sync + 'a, + for<'a> T: 'a, + { + MethodRouter::builder().$method(HandlerService::new(h)).build() + } + )+ + }; +} + +for_all_methods!(impl_method_register); From 9b54a1fd3f9ad6483df32cc9210a88b0df275298 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Mon, 27 Nov 2023 20:39:54 +0800 Subject: [PATCH 06/12] chore(volo-http): bump to hyper 1.0.0 Note that `hyper` v1.0.0-rc4 replaces IO traits from `tokio` with its IO traits in `hyper::rt`. To solve the building problem, we can introduce `TokioIo` from `hyper-util` and wrap the `DefaultIncoming` by the `TokioIo`. Also, the `volo-grpc` uses `hyper` v0.14 with its auto-version (using both http1 and http2) and graceful shutdown, but the latest `hyper` and `hyper-util` cannot use the both features at the same time, so the version of `hyper` in `volo` has not been upgraded. Ref: - https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2 - https://github.com/hyperium/hyper/pull/3013 - https://github.com/hyperium/hyper/issues/2862 Signed-off-by: Yu Li --- Cargo.lock | 106 +++++++++++++++++++++++++++++----------- examples/Cargo.toml | 4 +- volo-http/Cargo.toml | 28 ++++++----- volo-http/src/server.rs | 7 ++- 4 files changed, 99 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb09360b..450816b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -529,8 +529,8 @@ dependencies = [ "anyhow", "async-stream", "bytes", - "http", - "hyper 1.0.0-rc.3", + "http 1.0.0", + "hyper 1.0.1", "lazy_static", "metainfo", "motore", @@ -761,7 +761,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", "indexmap 2.1.0", "slab", "tokio", @@ -828,6 +847,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -835,30 +865,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] [[package]] name = "http-body" -version = "1.0.0-rc.2" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http", + "http 1.0.0", ] [[package]] name = "http-body-util" -version = "0.1.0-rc.2" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92445bc9cc14bfa0a3ce56817dc3b5bcc227a168781a356b702410789cec0d10" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http", - "http-body 1.0.0-rc.2", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -890,8 +920,8 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "http-body 0.4.5", "httparse", "httpdate", @@ -906,24 +936,21 @@ dependencies = [ [[package]] name = "hyper" -version = "1.0.0-rc.3" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092" +checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", - "h2", - "http", - "http-body 1.0.0-rc.2", + "h2 0.4.0", + "http 1.0.0", + "http-body 1.0.0", "httparse", "httpdate", "itoa", "pin-project-lite", "tokio", - "tracing", - "want", ] [[package]] @@ -933,7 +960,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", + "http 0.2.11", "hyper 0.14.27", "rustls", "tokio", @@ -952,6 +979,26 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca339002caeb0d159cc6e023dff48e199f081e42fa039895c7c6f38b37f2e9d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.0.1", + "pin-project-lite", + "socket2 0.5.5", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -1931,8 +1978,8 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", + "h2 0.3.22", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-rustls", @@ -2823,7 +2870,7 @@ dependencies = [ "clap", "colored", "heck 0.4.1", - "itertools 0.11.0", + "itertools 0.12.0", "lazy_static", "log", "normpath", @@ -2864,9 +2911,9 @@ dependencies = [ "futures", "futures-util", "fxhash", - "h2", + "h2 0.3.22", "hex", - "http", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-timeout", @@ -2893,9 +2940,10 @@ version = "0.0.0" dependencies = [ "bytes", "futures-util", - "http", + "http 1.0.0", "http-body-util", - "hyper 1.0.0-rc.3", + "hyper 1.0.1", + "hyper-util", "matchit", "mime", "motore", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 06877f88..4c321f73 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -81,8 +81,8 @@ path = "src/http/http.rs" anyhow.workspace = true async-stream.workspace = true bytes.workspace = true -http.workspace = true -hyper = { version = "1.0.0-rc.3", features = ["server", "http1", "http2"] } +http = "1" +hyper = { version = "1", features = ["server", "http1", "http2"] } lazy_static.workspace = true metainfo.workspace = true motore.workspace = true diff --git a/volo-http/Cargo.toml b/volo-http/Cargo.toml index 0987660d..1a81ffef 100644 --- a/volo-http/Cargo.toml +++ b/volo-http/Cargo.toml @@ -19,9 +19,22 @@ maintenance = { status = "actively-developed" } [dependencies] volo = { version = "0.8", path = "../volo" } -motore.workspace = true -hyper = { version = "=1.0.0-rc.3", features = ["server", "http1", "http2"] } +http = "1" +http-body-util = "0.1" +hyper = { version = "1", features = ["server", "http1", "http2"] } +hyper-util = { version = "0.1", features = ["tokio"] } + +bytes.workspace = true +futures-util.workspace = true +matchit.workspace = true +mime.workspace = true +motore.workspace = true +parking_lot.workspace = true +pin-project.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true tokio = { workspace = true, features = [ "time", "macros", @@ -29,18 +42,7 @@ tokio = { workspace = true, features = [ "signal", "parking_lot", ] } -http.workspace = true -http-body-util = "=0.1.0-rc.2" -matchit.workspace = true tracing.workspace = true -futures-util.workspace = true -pin-project.workspace = true -bytes.workspace = true -serde_json.workspace = true -thiserror.workspace = true -mime.workspace = true -serde.workspace = true -parking_lot.workspace = true [dev-dependencies] serde = { version = "1", features = ["derive"] } diff --git a/volo-http/src/server.rs b/volo-http/src/server.rs index 6f6558d3..a27956f8 100644 --- a/volo-http/src/server.rs +++ b/volo-http/src/server.rs @@ -8,6 +8,7 @@ use hyper::{ body::{Body, Incoming as BodyIncoming}, server::conn::http1, }; +use hyper_util::rt::TokioIo; use motore::BoxError; use tracing::{info, trace}; use volo::net::{incoming::Incoming, MakeIncoming}; @@ -59,7 +60,7 @@ where let mut watch = rx_inner.clone(); tokio::task::spawn(async move { let mut http_conn = http1::Builder::new().serve_connection( - conn, + TokioIo::new(conn), hyper::service::service_fn(move |req: Request| { let s = service.clone(); let peer = peer.clone(); @@ -84,7 +85,9 @@ where _ = watch.changed() => { tracing::trace!("[VOLO] closing a pending connection"); // Graceful shutdown. - hyper::server::conn::http1::Connection::graceful_shutdown(Pin::new(&mut http_conn)); + hyper::server::conn::http1::Connection::graceful_shutdown( + Pin::new(&mut http_conn) + ); // Continue to poll this connection until shutdown can finish. let result = http_conn.await; if let Err(err) = result { From 2d5cc33f9042748b6999a3659152451a994c7d94 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Mon, 27 Nov 2023 17:54:13 +0800 Subject: [PATCH 07/12] feat(volo-http): support `with_state` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 质疑 axum,理解 axum,成为 axum Signed-off-by: Yu Li --- examples/Cargo.toml | 6 +- examples/src/http/{http.rs => simple.rs} | 19 +- examples/src/http/state.rs | 39 +++ volo-http/src/dispatch.rs | 67 ----- volo-http/src/extract.rs | 56 +++-- volo-http/src/handler.rs | 212 +++++++++++++--- volo-http/src/lib.rs | 15 +- volo-http/src/request.rs | 37 +-- volo-http/src/route.rs | 299 +++++++++++++---------- volo-http/src/server.rs | 26 +- 10 files changed, 491 insertions(+), 285 deletions(-) rename examples/src/http/{http.rs => simple.rs} (80%) create mode 100644 examples/src/http/state.rs delete mode 100644 volo-http/src/dispatch.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 4c321f73..30a8848d 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -72,10 +72,10 @@ path = "src/unknown/thrift_server.rs" name = "unknown-thrift-client" path = "src/unknown/thrift_client.rs" +# http [[bin]] -name = "http" -path = "src/http/http.rs" - +name = "http-simple" +path = "src/http/simple.rs" [dependencies] anyhow.workspace = true diff --git a/examples/src/http/http.rs b/examples/src/http/simple.rs similarity index 80% rename from examples/src/http/http.rs rename to examples/src/http/simple.rs index 74c8090c..cb4d196d 100644 --- a/examples/src/http/http.rs +++ b/examples/src/http/simple.rs @@ -1,11 +1,10 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, time::Duration}; use bytes::Bytes; use http::{Method, StatusCode, Uri}; use motore::timeout::TimeoutLayer; use serde::Deserialize; use volo_http::{ - handler::HandlerService, param::Params, request::Json, route::{get, post, MethodRouter, Router}, @@ -57,26 +56,30 @@ async fn test( } } +async fn timeout_test() { + tokio::time::sleep(Duration::from_secs(5)).await +} + #[tokio::main(flavor = "multi_thread")] async fn main() { let app = Router::new() .route( "/", - get(hello).layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1)))), + get(hello).layer(TimeoutLayer::new(Some(Duration::from_secs(1)))), ) .route("/:echo", get(echo)) .route("/user", post(json)) .route( "/test", - MethodRouter::builder() - .get(HandlerService::new(test)) - .post(HandlerService::new(test)) - .build(), + MethodRouter::builder().get(test).post(test).build(), ) - .layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1)))); + .route("/timeout", get(timeout_test)) + .layer(TimeoutLayer::new(Some(Duration::from_secs(1)))); let addr: SocketAddr = "[::]:9091".parse().unwrap(); let addr = volo::net::Address::from(addr); + println!("Listening on {addr}"); + Server::new(app).run(addr).await.unwrap(); } diff --git a/examples/src/http/state.rs b/examples/src/http/state.rs new file mode 100644 index 00000000..47fd16bb --- /dev/null +++ b/examples/src/http/state.rs @@ -0,0 +1,39 @@ +use std::{net::SocketAddr, sync::Arc, time::Duration}; + +use motore::timeout::TimeoutLayer; +use volo_http::{ + route::{get, Router}, + server::Server, + State, +}; + +async fn hello() -> &'static str { + "hello, world\n" +} + +struct AppState { + val: usize, +} + +async fn state_test(State(state): State>) -> Result { + Ok(format!("{}", state.val)) +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() { + let app = Router::new() + .route( + "/", + get(hello).layer(TimeoutLayer::new(Some(Duration::from_secs(1)))), + ) + .route("/state", get(state_test)) + .layer(TimeoutLayer::new(Some(Duration::from_secs(1)))) + .with_state(Arc::new(AppState { val: 114514 })); + + let addr: SocketAddr = "[::]:9091".parse().unwrap(); + let addr = volo::net::Address::from(addr); + + println!("Listening on {addr}"); + + Server::new(app).run(addr).await.unwrap(); +} diff --git a/volo-http/src/dispatch.rs b/volo-http/src/dispatch.rs deleted file mode 100644 index 89be3936..00000000 --- a/volo-http/src/dispatch.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::marker::PhantomData; - -use http::Response; -use hyper::body::Incoming; - -use crate::{request::FromRequest, response::RespBody, DynError, HttpContext}; - -pub(crate) struct DispatchService { - inner: S, - _marker: PhantomData<(IB, OB)>, -} - -impl DispatchService { - pub(crate) fn new(service: S) -> Self { - Self { - inner: service, - _marker: PhantomData, - } - } -} - -impl Clone for DispatchService -where - S: Clone, -{ - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - _marker: PhantomData, - } - } -} - -unsafe impl Send for DispatchService where S: Send {} - -unsafe impl Sync for DispatchService where S: Sync {} - -impl motore::Service for DispatchService -where - S: motore::Service> + Send + Sync + 'static, - S::Error: std::error::Error + Send + Sync + 'static, - OB: Into, - IB: FromRequest + Send, -{ - type Response = Response; - - type Error = DynError; - - async fn call<'s, 'cx>( - &'s self, - cx: &'cx mut HttpContext, - req: Incoming, - ) -> Result { - match IB::from(&*cx, req).await { - Ok(body) => self - .inner - .call(cx, body) - .await - .map(|resp| { - let (parts, body) = resp.into_parts(); - Response::from_parts(parts, body.into()) - }) - .map_err(|e| Box::new(e) as DynError), - Err(response) => Ok(response), - } - } -} diff --git a/volo-http/src/extract.rs b/volo-http/src/extract.rs index 909c5aea..3bc4c182 100644 --- a/volo-http/src/extract.rs +++ b/volo-http/src/extract.rs @@ -1,52 +1,68 @@ use futures_util::Future; use http::{Method, Response, Uri}; -use crate::{response::IntoResponse, HttpContext, Params}; +use crate::{response::IntoResponse, HttpContext, Params, State}; -pub trait FromContext: Sized { +pub trait FromContext: Sized { type Rejection: IntoResponse; fn from_context( context: &HttpContext, + state: &S, ) -> impl Future> + Send; } -impl FromContext for Option +impl FromContext for Option where - T: FromContext, + T: FromContext, + S: Send + Sync, { type Rejection = Response<()>; // Infallible - fn from_context( - context: &HttpContext, - ) -> impl Future> + Send { - async move { Ok(T::from_context(context).await.ok()) } + async fn from_context(context: &HttpContext, state: &S) -> Result { + Ok(T::from_context(context, state).await.ok()) } } -impl FromContext for Uri { +impl FromContext for Uri +where + S: Send + Sync, +{ type Rejection = Response<()>; // Infallible - fn from_context( - context: &HttpContext, - ) -> impl Future> + Send { - async move { Ok(context.uri.clone()) } + async fn from_context(context: &HttpContext, _state: &S) -> Result { + Ok(context.uri.clone()) } } -impl FromContext for Method { +impl FromContext for Method +where + S: Send + Sync, +{ type Rejection = Response<()>; // Infallible - async fn from_context(context: &HttpContext) -> Result { + async fn from_context(context: &HttpContext, _state: &S) -> Result { Ok(context.method.clone()) } } -impl FromContext for Params { +impl FromContext for Params +where + S: Send + Sync, +{ type Rejection = Response<()>; // Infallible - fn from_context( - context: &HttpContext, - ) -> impl Future> + Send { - async move { Ok(context.params.clone()) } + async fn from_context(context: &HttpContext, _state: &S) -> Result { + Ok(context.params.clone()) + } +} + +impl FromContext for State +where + S: Clone + Send + Sync, +{ + type Rejection = Response<()>; // Infallible + + async fn from_context(_context: &HttpContext, state: &S) -> Result { + Ok(State(state.clone())) } } diff --git a/volo-http/src/handler.rs b/volo-http/src/handler.rs index 7a448ce4..0f74deee 100644 --- a/volo-http/src/handler.rs +++ b/volo-http/src/handler.rs @@ -2,41 +2,48 @@ use std::{future::Future, marker::PhantomData}; use http::Response; use hyper::body::Incoming; +use motore::Service; use crate::{ extract::FromContext, request::FromRequest, response::{IntoResponse, RespBody}, - HttpContext, + DynError, DynService, HttpContext, }; -impl Clone for HandlerService -where - H: Clone, -{ - fn clone(&self) -> Self { - Self { - h: self.h.clone(), - _mark: PhantomData, - } - } -} - -pub trait Handler { +pub trait Handler: Sized { fn call( self, context: &mut HttpContext, req: Incoming, + state: &S, ) -> impl Future> + Send; + + fn with_state(self, state: S) -> HandlerService + where + S: Clone, + { + HandlerService { + handler: self, + state, + _marker: PhantomData, + } + } } -impl Handler<()> for F +impl Handler<((),), S> for F where F: FnOnce() -> Fut + Clone + Send, Fut: Future + Send, Res: IntoResponse, + S: Send + Sync, { - async fn call(self, _context: &mut HttpContext, _req: Incoming) -> Response { + async fn call( + self, + _context: &mut HttpContext, + _req: Incoming, + _state: &S, + ) -> Response { self().await.into_response() } } @@ -46,22 +53,23 @@ macro_rules! impl_handler { [$($ty:ident),*], $last:ident ) => { #[allow(non_snake_case, unused_mut, unused_variables)] - impl Handler<($($ty,)* $last,)> for F + impl Handler<(M, $($ty,)* $last,), S> for F where F: FnOnce($($ty,)* $last) -> Fut + Clone + Send, Fut: Future + Send, - $( for<'r> $ty: FromContext + Send + 'r, )* - for<'r> $last: FromRequest + Send + 'r, Res: IntoResponse, + S: Send + Sync, + $( for<'r> $ty: FromContext + Send + 'r, )* + for<'r> $last: FromRequest + Send + 'r, { - async fn call(self, context: &mut HttpContext, req: Incoming) -> Response { + async fn call(self, context: &mut HttpContext, req: Incoming, state: &S) -> Response { $( - let $ty = match $ty::from_context(context).await { + let $ty = match $ty::from_context(context, state).await { Ok(value) => value, Err(rejection) => return rejection.into_response(), }; )* - let $last = match $last::from(context, req).await { + let $last = match $last::from(context, req, state).await { Ok(value) => value, Err(rejection) => return rejection, }; @@ -97,32 +105,170 @@ impl_handler!( T16 ); -pub struct HandlerService { - h: H, - _mark: PhantomData, +// Use an extra trait with less generic types for hiding the type of handler +pub(crate) struct DynHandler(Box>); + +unsafe impl Send for DynHandler {} +unsafe impl Sync for DynHandler {} + +impl DynHandler +where + S: Clone + Send + Sync + 'static, +{ + pub(crate) fn new(handler: H) -> Self + where + H: Handler + Clone + Send + Sync + 'static, + T: 'static, + { + // The anonymous function should ensure that the `handler` must be an impl of `Handler`, + // but the `ErasedIntoRoute::into_route` does not need to care it. + Self(Box::new(MakeErasedHandler { + handler, + into_route: |handler, state| { + DynService::new(HandlerService { + handler, + state, + _marker: PhantomData, + }) + }, + })) + } + + // State can only be injected into handler because route does not have such a field, so before + // injected a state, a handler should keep being a handler. + pub(crate) fn map(self, f: F) -> DynHandler + where + F: FnOnce(DynService) -> DynService + Clone + 'static, + { + DynHandler(Box::new(LayerMap { + inner: self.0, + layer: Box::new(f), + })) + } + + pub(crate) fn into_route(self, state: S) -> DynService { + self.0.into_route(state) + } + + pub(crate) async fn call_with_state( + self, + cx: &mut HttpContext, + req: Incoming, + state: S, + ) -> Result, DynError> { + self.0.into_route(state).call(cx, req).await + } +} + +impl Clone for DynHandler { + fn clone(&self) -> Self { + Self(self.0.clone_box()) + } +} + +pub(crate) trait ErasedIntoRoute { + fn clone_box(&self) -> Box>; + fn into_route(self: Box, state: S) -> DynService; +} + +pub(crate) struct MakeErasedHandler { + handler: H, + into_route: fn(H, S) -> DynService, +} + +impl ErasedIntoRoute for MakeErasedHandler +where + H: Clone + 'static, + S: 'static, +{ + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } + + fn into_route(self: Box, state: S) -> DynService { + motore::BoxCloneService::new((self.into_route)(self.handler, state)) + } +} + +impl Clone for MakeErasedHandler +where + H: Clone, +{ + fn clone(&self) -> Self { + Self { + handler: self.handler.clone(), + into_route: self.into_route, + } + } +} + +struct LayerMap { + inner: Box>, + layer: Box, +} + +trait LayerFn: FnOnce(DynService) -> DynService { + fn clone_box(&self) -> Box; } -impl HandlerService { - pub fn new(h: H) -> Self { +impl LayerFn for F +where + F: FnOnce(DynService) -> DynService + Clone + 'static, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +impl ErasedIntoRoute for LayerMap +where + S: 'static, +{ + fn clone_box(&self) -> Box> { + Box::new(Self { + inner: self.inner.clone_box(), + layer: self.layer.clone_box(), + }) + } + + fn into_route(self: Box, state: S) -> DynService { + (self.layer)(self.inner.into_route(state)) + } +} + +pub struct HandlerService { + handler: H, + state: S, + _marker: PhantomData, +} + +impl Clone for HandlerService +where + H: Clone, + S: Clone, +{ + fn clone(&self) -> Self { Self { - h, - _mark: PhantomData, + handler: self.handler.clone(), + state: self.state.clone(), + _marker: PhantomData, } } } -impl motore::Service for HandlerService +impl motore::Service for HandlerService where - for<'r> H: Handler + Clone + Send + Sync + 'r, + for<'r> H: Handler + Clone + Send + Sync + 'r, + S: Sync, { type Response = Response; - type Error = http::Error; + type Error = DynError; async fn call<'s, 'cx>( &'s self, cx: &'cx mut HttpContext, req: Incoming, ) -> Result { - Ok(self.h.clone().call(cx, req).await) + Ok(self.handler.clone().call(cx, req, &self.state).await) } } diff --git a/volo-http/src/lib.rs b/volo-http/src/lib.rs index c0ca341f..fa62b57e 100644 --- a/volo-http/src/lib.rs +++ b/volo-http/src/lib.rs @@ -1,4 +1,3 @@ -pub(crate) mod dispatch; pub mod extract; pub mod handler; pub mod layer; @@ -9,11 +8,25 @@ pub mod route; pub mod server; use http::{Extensions, HeaderMap, HeaderValue, Method, Uri, Version}; +use hyper::{body::Incoming, Response}; use param::Params; use volo::net::Address; +mod private { + #[derive(Debug, Clone, Copy)] + pub enum ViaContext {} + + #[derive(Debug, Clone, Copy)] + pub enum ViaRequest {} +} + +pub type DynService = + motore::BoxCloneService, DynError>; pub type DynError = Box; +#[derive(Debug, Default, Clone, Copy)] +pub struct State(pub S); + pub struct HttpContext { pub peer: Address, pub method: Method, diff --git a/volo-http/src/request.rs b/volo-http/src/request.rs index 8b30aecf..193c3063 100644 --- a/volo-http/src/request.rs +++ b/volo-http/src/request.rs @@ -7,49 +7,56 @@ use serde::de::DeserializeOwned; use crate::{ extract::FromContext, + private, response::{IntoResponse, RespBody}, HttpContext, }; -pub trait FromRequest: Sized { +pub trait FromRequest: Sized { fn from( cx: &HttpContext, body: Incoming, + state: &S, ) -> impl Future>> + Send; } -impl FromRequest for T +impl FromRequest for T where - T: FromContext, + T: FromContext + Sync, + S: Sync, { - fn from( + async fn from( cx: &HttpContext, _body: Incoming, - ) -> impl Future>> + Send { - async move { - match T::from_context(cx).await { - Ok(value) => Ok(value), - Err(rejection) => Err(rejection.into_response()), - } + state: &S, + ) -> Result> { + match T::from_context(cx, state).await { + Ok(value) => Ok(value), + Err(rejection) => Err(rejection.into_response()), } } } -impl FromRequest for Incoming { - fn from( +impl FromRequest for Incoming +where + S: Sync, +{ + async fn from( _cx: &HttpContext, body: Incoming, - ) -> impl Future>> + Send { - async { Ok(body) } + _state: &S, + ) -> Result> { + Ok(body) } } pub struct Json(pub T); -impl FromRequest for Json { +impl FromRequest for Json { fn from( cx: &HttpContext, body: Incoming, + _state: &S, ) -> impl Future>> + Send { async move { if !json_content_type(&cx.headers) { diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs index c6236da2..aee9215c 100644 --- a/volo-http/src/route.rs +++ b/volo-http/src/route.rs @@ -6,11 +6,9 @@ use hyper::body::{Bytes, Incoming}; use motore::{layer::Layer, Service}; use crate::{ - dispatch::DispatchService, - handler::{Handler, HandlerService}, - request::FromRequest, + handler::{DynHandler, Handler}, response::RespBody, - DynError, HttpContext, + DynError, DynService, HttpContext, }; // The `matchit::Router` cannot be converted to `Iterator`, so using @@ -34,42 +32,23 @@ impl RouteId { } } -#[derive(Clone, Default)] -pub struct Router { +pub struct Router { matcher: matchit::Router, - routes: HashMap, -} - -impl Service for Router { - type Response = Response; - - type Error = DynError; - - async fn call<'s, 'cx>( - &'s self, - cx: &'cx mut HttpContext, - req: Incoming, - ) -> Result { - if let Ok(matched) = self.matcher.at(cx.uri.path()) { - if let Some(srv) = self.routes.get(matched.value) { - cx.params = matched.params.into(); - return srv.call(cx, req).await; - } - } - - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Full::new(Bytes::new()).into()) - .unwrap()) - } + routes: HashMap>, } -impl Router { +impl Router +where + S: Clone + Send + Sync + 'static, +{ pub fn new() -> Self { - Default::default() + Self { + matcher: Default::default(), + routes: Default::default(), + } } - pub fn route(mut self, uri: R, route: MethodRouter) -> Self + pub fn route(mut self, uri: R, route: MethodRouter) -> Self where R: Into, { @@ -84,7 +63,7 @@ impl Router { pub fn layer(self, l: L) -> Self where - L: Layer + Clone + Send + Sync + 'static, + L: Layer + Clone + Send + Sync + 'static, L::Service: Service, Error = DynError> + Clone + Send @@ -105,33 +84,88 @@ impl Router { routes, } } + + #[allow(dead_code)] + fn with_state(self, s: S) -> Router { + let routes = self + .routes + .into_iter() + .map(|(id, route)| { + let route = route.with_state(s.clone()); + (id, route) + }) + .collect(); + + Router { + matcher: self.matcher, + routes, + } + } } -#[derive(Default, Clone)] -pub struct MethodRouter { - options: Option, - get: Option, - post: Option, - put: Option, - delete: Option, - head: Option, - trace: Option, - connect: Option, - patch: Option, +impl Service for Router<()> { + type Response = Response; + + type Error = DynError; + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + ) -> Result { + if let Ok(matched) = self.matcher.at(cx.uri.path()) { + if let Some(srv) = self.routes.get(matched.value) { + cx.params = matched.params.into(); + return srv.call_with_state(cx, req, ()).await; + } + } + + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::new()).into()) + .unwrap()) + } } -impl MethodRouter { +pub struct MethodRouter { + options: MethodEndpoint, + get: MethodEndpoint, + post: MethodEndpoint, + put: MethodEndpoint, + delete: MethodEndpoint, + head: MethodEndpoint, + trace: MethodEndpoint, + connect: MethodEndpoint, + patch: MethodEndpoint, +} + +impl MethodRouter +where + S: Clone + Send + Sync + 'static, +{ pub fn new() -> Self { - Default::default() + Self { + options: MethodEndpoint::None, + get: MethodEndpoint::None, + post: MethodEndpoint::None, + put: MethodEndpoint::None, + delete: MethodEndpoint::None, + head: MethodEndpoint::None, + trace: MethodEndpoint::None, + connect: MethodEndpoint::None, + patch: MethodEndpoint::None, + } } - pub fn builder() -> MethodRouterBuilder { - MethodRouterBuilder { route: Self::new() } + pub fn builder() -> MethodRouterBuilder { + MethodRouterBuilder { + router: Self::new(), + } } pub fn layer(self, l: L) -> Self where - L: Layer + Clone + Send + Sync + 'static, + L: Layer + Clone + Send + Sync + 'static, L::Service: Service, Error = DynError> + Clone + Send @@ -150,15 +184,17 @@ impl MethodRouter { patch, } = self; - let options = options.map(|r| r.layer(l.clone())); - let get = get.map(|r| r.layer(l.clone())); - let post = post.map(|r| r.layer(l.clone())); - let put = put.map(|r| r.layer(l.clone())); - let delete = delete.map(|r| r.layer(l.clone())); - let head = head.map(|r| r.layer(l.clone())); - let trace = trace.map(|r| r.layer(l.clone())); - let connect = connect.map(|r| r.layer(l.clone())); - let patch = patch.map(|r| r.layer(l.clone())); + let layer_fn = move |route: DynService| DynService::new(l.clone().layer(route)); + + let options = options.map(layer_fn.clone()); + let get = get.map(layer_fn.clone()); + let post = post.map(layer_fn.clone()); + let put = put.map(layer_fn.clone()); + let delete = delete.map(layer_fn.clone()); + let head = head.map(layer_fn.clone()); + let trace = trace.map(layer_fn.clone()); + let connect = connect.map(layer_fn.clone()); + let patch = patch.map(layer_fn.clone()); Self { options, @@ -172,18 +208,30 @@ impl MethodRouter { patch, } } -} -impl Service for MethodRouter { - type Response = Response; - - type Error = DynError; + pub fn with_state(self, state: S) -> MethodRouter { + MethodRouter { + options: self.options.with_state(&state), + get: self.get.with_state(&state), + post: self.post.with_state(&state), + put: self.put.with_state(&state), + delete: self.delete.with_state(&state), + head: self.head.with_state(&state), + trace: self.trace.with_state(&state), + connect: self.connect.with_state(&state), + patch: self.patch.with_state(&state), + } + } - async fn call<'s, 'cx>( + async fn call_with_state<'s, 'cx>( &'s self, cx: &'cx mut HttpContext, req: Incoming, - ) -> Result { + state: S, + ) -> Result, DynError> + where + S: 'cx, + { let handler = match cx.method { Method::OPTIONS => &self.options, Method::GET => &self.get, @@ -197,18 +245,20 @@ impl Service for MethodRouter { _ => { return Ok(Response::builder() .status(StatusCode::METHOD_NOT_ALLOWED) - .body("".into()) + .body(Full::new(Bytes::new()).into()) .unwrap()); } }; - if let Some(service) = handler { - service.call(cx, req).await - } else { - Ok(Response::builder() + match handler { + MethodEndpoint::None => Ok(Response::builder() .status(StatusCode::NOT_FOUND) - .body("".into()) - .unwrap()) + .body(Full::new(Bytes::new()).into()) + .unwrap()), + MethodEndpoint::Route(route) => route.call(cx, req).await, + MethodEndpoint::Handler(handler) => { + handler.clone().call_with_state(cx, req, state).await + } } } } @@ -219,91 +269,86 @@ macro_rules! for_all_methods { }; } -pub struct MethodRouterBuilder { - route: MethodRouter, +pub struct MethodRouterBuilder { + router: MethodRouter, } macro_rules! impl_method_register_for_builder { ($( $method:ident ),*) => { $( - pub fn $method(mut self, handler: S) -> Self + pub fn $method(mut self, handler: H) -> Self where - S: Service> - + Send - + Sync - + Clone - + 'static, - S::Error: std::error::Error + Send + Sync, - OB: Into + 'static, - IB: FromRequest + Send + 'static, + for<'a> H: Handler + Clone + Send + Sync + 'a, + for<'a> T: 'a, { - self.route.$method = Some(Route::new(DispatchService::new(handler))); + self.router.$method = MethodEndpoint::Handler(DynHandler::new(handler)); self } )+ }; } -impl MethodRouterBuilder { +impl MethodRouterBuilder +where + S: Clone + Send + Sync + 'static, +{ + pub fn new() -> Self { + Self { + router: MethodRouter::new(), + } + } + for_all_methods!(impl_method_register_for_builder); - pub fn build(self) -> MethodRouter { - self.route + pub fn build(self) -> MethodRouter { + self.router } } -#[derive(Clone)] -pub struct Route(motore::BoxCloneService, DynError>); - -impl Route { - pub fn new(inner: S) -> Self - where - S: Service, Error = DynError> - + Clone - + Send - + Sync - + 'static, - { - Route(motore::BoxCloneService::new(inner)) - } +#[derive(Clone, Default)] +enum MethodEndpoint { + #[default] + None, + Route(DynService), + Handler(DynHandler), +} - pub fn layer(self, l: L) -> Self +impl MethodEndpoint +where + S: Clone + Send + Sync + 'static, +{ + fn map(self, f: F) -> MethodEndpoint where - L: Layer, - L::Service: Service, Error = DynError> - + Clone - + Send - + Sync - + 'static, + F: FnOnce(DynService) -> DynService + Clone + 'static, { - Route::new(l.layer(self)) + match self { + MethodEndpoint::None => MethodEndpoint::None, + MethodEndpoint::Route(route) => MethodEndpoint::Route(f(route)), + MethodEndpoint::Handler(handler) => MethodEndpoint::Handler(handler.map(f)), + } } -} - -impl Service for Route { - type Response = Response; - type Error = DynError; - - #[inline] - async fn call<'s, 'cx>( - &'s self, - cx: &'cx mut HttpContext, - req: Incoming, - ) -> Result { - self.0.call(cx, req).await + fn with_state(self, state: &S) -> MethodEndpoint { + match self { + MethodEndpoint::None => MethodEndpoint::None, + MethodEndpoint::Route(route) => MethodEndpoint::Route(route), + MethodEndpoint::Handler(handler) => { + MethodEndpoint::Route(handler.into_route(state.clone())) + } + } } } macro_rules! impl_method_register { ($( $method:ident ),*) => { $( - pub fn $method(h: H) -> MethodRouter + pub fn $method(h: H) -> MethodRouter where - for<'a> H: Handler + Clone + Send + Sync + 'a, + for<'a> H: Handler + Clone + Send + Sync + 'a, for<'a> T: 'a, + S: Clone + Send + Sync + 'static, { - MethodRouter::builder().$method(HandlerService::new(h)).build() + MethodRouterBuilder::new().$method(h).build() } )+ }; diff --git a/volo-http/src/server.rs b/volo-http/src/server.rs index a27956f8..88defcbe 100644 --- a/volo-http/src/server.rs +++ b/volo-http/src/server.rs @@ -15,24 +15,28 @@ use volo::net::{incoming::Incoming, MakeIncoming}; use crate::{param::Params, DynError, HttpContext}; -#[derive(Clone)] pub struct Server { - app: App, + app: Arc, +} + +impl Clone for Server { + fn clone(&self) -> Self { + Self { + app: self.app.clone(), + } + } } impl Server where OB: Body + Send + 'static, OB::Data: Send, - App: motore::Service> - + Clone - + Send - + Sync - + 'static, + App: + motore::Service> + Send + Sync + 'static, App::Error: Into, { pub fn new(app: App) -> Self { - Self { app } + Self { app: Arc::new(app) } } pub async fn run(self, mk_incoming: MI) -> Result<(), BoxError> { @@ -56,13 +60,13 @@ where let peer = conn.info.peer_addr.clone().unwrap(); trace!("[VOLO] accept connection from: {:?}", peer); - let service = self.clone(); + let app = self.app.clone(); let mut watch = rx_inner.clone(); tokio::task::spawn(async move { let mut http_conn = http1::Builder::new().serve_connection( TokioIo::new(conn), hyper::service::service_fn(move |req: Request| { - let s = service.clone(); + let app = app.clone(); let peer = peer.clone(); async move { let (parts, req) = req.into_parts(); @@ -77,7 +81,7 @@ where inner: Vec::with_capacity(0), }, }; - s.app.call(&mut cx, req).await + app.call(&mut cx, req).await } }), ); From b5274fbf2461186ff64e6bbb2473f6f8e36885ab Mon Sep 17 00:00:00 2001 From: Yu Li Date: Mon, 27 Nov 2023 20:12:23 +0800 Subject: [PATCH 08/12] fix(volo-http): fix timeout problem of graceful shutdown With the previous codes, server could not stop immediately after pressing Ctrl+C, but waited until timeout before exiting. This commit uses the same approach as "volo-thrift" and the problem has been resolved. Signed-off-by: Yu Li --- examples/src/http/simple.rs | 6 ++ volo-http/src/server.rs | 166 +++++++++++++++++++++++------------- 2 files changed, 115 insertions(+), 57 deletions(-) diff --git a/examples/src/http/simple.rs b/examples/src/http/simple.rs index cb4d196d..2d68d390 100644 --- a/examples/src/http/simple.rs +++ b/examples/src/http/simple.rs @@ -62,6 +62,12 @@ async fn timeout_test() { #[tokio::main(flavor = "multi_thread")] async fn main() { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + let app = Router::new() .route( "/", diff --git a/volo-http/src/server.rs b/volo-http/src/server.rs index 88defcbe..7b49df4d 100644 --- a/volo-http/src/server.rs +++ b/volo-http/src/server.rs @@ -10,8 +10,9 @@ use hyper::{ }; use hyper_util::rt::TokioIo; use motore::BoxError; +use tokio::sync::Notify; use tracing::{info, trace}; -use volo::net::{incoming::Incoming, MakeIncoming}; +use volo::net::{conn::Conn, incoming::Incoming, Address, MakeIncoming}; use crate::{param::Params, DynError, HttpContext}; @@ -41,70 +42,39 @@ where pub async fn run(self, mk_incoming: MI) -> Result<(), BoxError> { let mut incoming = mk_incoming.make_incoming().await?; - info!("[VOLO-HTTP] server start at: {:?}", incoming); + info!("[VOLO] server start at: {:?}", incoming); - let (tx, rx) = tokio::sync::watch::channel(()); - let exit_mark = Arc::new(std::sync::atomic::AtomicBool::default()); - - let exit_mark_inner = exit_mark.clone(); - let rx_inner = rx.clone(); + let conn_cnt = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let gconn_cnt = conn_cnt.clone(); + let (exit_notify, exit_flag, exit_mark) = ( + Arc::new(Notify::const_new()), + Arc::new(parking_lot::RwLock::new(false)), + Arc::new(std::sync::atomic::AtomicBool::default()), + ); + let (exit_notify_inner, exit_flag_inner, exit_mark_inner) = + (exit_notify.clone(), exit_flag.clone(), exit_mark.clone()); + // spawn accept loop let handler = tokio::spawn(async move { - let exit_mark = exit_mark_inner.clone(); + let exit_flag = exit_flag_inner.clone(); loop { - if exit_mark.load(Ordering::Relaxed) { + if *exit_flag.read() { break Ok(()); } match incoming.accept().await { Ok(Some(conn)) => { let peer = conn.info.peer_addr.clone().unwrap(); trace!("[VOLO] accept connection from: {:?}", peer); + conn_cnt.fetch_add(1, Ordering::Relaxed); - let app = self.app.clone(); - let mut watch = rx_inner.clone(); - tokio::task::spawn(async move { - let mut http_conn = http1::Builder::new().serve_connection( - TokioIo::new(conn), - hyper::service::service_fn(move |req: Request| { - let app = app.clone(); - let peer = peer.clone(); - async move { - let (parts, req) = req.into_parts(); - let mut cx = HttpContext { - peer, - method: parts.method, - uri: parts.uri, - version: parts.version, - headers: parts.headers, - extensions: parts.extensions, - params: Params { - inner: Vec::with_capacity(0), - }, - }; - app.call(&mut cx, req).await - } - }), - ); - tokio::select! { - _ = watch.changed() => { - tracing::trace!("[VOLO] closing a pending connection"); - // Graceful shutdown. - hyper::server::conn::http1::Connection::graceful_shutdown( - Pin::new(&mut http_conn) - ); - // Continue to poll this connection until shutdown can finish. - let result = http_conn.await; - if let Err(err) = result { - tracing::debug!("[VOLO] connection error: {:?}", err); - } - } - result = &mut http_conn => { - if let Err(err) = result { - tracing::debug!("[VOLO] connection error: {:?}", err); - } - }, - } - }); + tokio::task::spawn(handle_conn( + conn, + self.app.clone(), + exit_notify_inner.clone(), + exit_mark_inner.clone(), + conn_cnt.clone(), + peer, + )); } Ok(None) => break Ok(()), Err(e) => break Err(Box::new(e)), @@ -160,10 +130,92 @@ where // received signal, graceful shutdown now info!("[VOLO] received signal, gracefully exiting now"); + *exit_flag.write() = true; exit_mark.store(true, Ordering::Relaxed); - drop(rx); - let _ = tx.send(()); - let _ = tokio::time::timeout(Duration::from_secs(5), tx.closed()).await; + + // Now we won't accept new connections. + // And we want to send crrst reply to the peers in the short future. + if gconn_cnt.load(Ordering::Relaxed) != 0 { + tokio::time::sleep(Duration::from_secs(2)).await; + } + exit_notify.notify_waiters(); + + // wait for all connections to be closed + for _ in 0..28 { + if gconn_cnt.load(Ordering::Relaxed) == 0 { + break; + } + trace!( + "[VOLO] gracefully exiting, remaining connection count: {}", + gconn_cnt.load(Ordering::Relaxed) + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + Ok(()) } } + +async fn handle_conn( + conn: Conn, + service: S, + exit_notify: Arc, + _exit_mark: Arc, + conn_cnt: Arc, + peer: Address, +) where + S: motore::Service> + + Clone + + Send + + Sync + + 'static, + S::Error: Into, + B: Body + Send + 'static, + B::Data: Send, +{ + let notified = exit_notify.notified(); + tokio::pin!(notified); + + let mut http_conn = http1::Builder::new().serve_connection( + TokioIo::new(conn), + hyper::service::service_fn(move |req: Request| { + let service = service.clone(); + let peer = peer.clone(); + async move { + let (parts, req) = req.into_parts(); + let mut cx = HttpContext { + peer, + method: parts.method, + uri: parts.uri, + version: parts.version, + headers: parts.headers, + extensions: parts.extensions, + params: Params { + inner: Vec::with_capacity(0), + }, + }; + service.call(&mut cx, req).await + } + }), + ); + tokio::select! { + _ = &mut notified => { + tracing::trace!("[VOLO] closing a pending connection"); + // Graceful shutdown. + hyper::server::conn::http1::Connection::graceful_shutdown( + Pin::new(&mut http_conn) + ); + // Continue to poll this connection until shutdown can finish. + let result = http_conn.await; + if let Err(err) = result { + tracing::debug!("[VOLO] connection error: {:?}", err); + } + } + result = &mut http_conn => { + if let Err(err) = result { + tracing::debug!("[VOLO] http connection error: {:?}", err); + } + }, + } + conn_cnt.fetch_sub(1, Ordering::Relaxed); +} From 9d1e4a26bd0136aec1f0d4b2c93cbb6158dd5974 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Thu, 23 Nov 2023 20:25:41 +0800 Subject: [PATCH 09/12] feat(volo-http): support fallback This commit supports fallback in `Router` and `MethodRouter`. With `fallback`, when no route or method can handle the request, the `fallback` will be called and the specified response will be returned. Signed-off-by: Yu Li --- examples/src/http/simple.rs | 6 +- volo-http/src/handler.rs | 2 +- volo-http/src/route.rs | 234 +++++++++++++++++++++++++++++------- 3 files changed, 192 insertions(+), 50 deletions(-) diff --git a/examples/src/http/simple.rs b/examples/src/http/simple.rs index 2d68d390..7e17ee2a 100644 --- a/examples/src/http/simple.rs +++ b/examples/src/http/simple.rs @@ -44,11 +44,9 @@ async fn json(Json(request): Json) { async fn test( u: Uri, m: Method, - Json(request): Json, ) -> Result<&'static str, (StatusCode, &'static str)> { - println!("{u:?}"); - println!("{m:?}"); - println!("{request:?}"); + println!("uri: {u:?}"); + println!("method: {m:?}"); if u.to_string().ends_with("a") { Ok("a") // http://localhost:3000/test?a=a } else { diff --git a/volo-http/src/handler.rs b/volo-http/src/handler.rs index 0f74deee..8cf5a9be 100644 --- a/volo-http/src/handler.rs +++ b/volo-http/src/handler.rs @@ -106,7 +106,7 @@ impl_handler!( ); // Use an extra trait with less generic types for hiding the type of handler -pub(crate) struct DynHandler(Box>); +pub struct DynHandler(Box>); unsafe impl Send for DynHandler {} unsafe impl Sync for DynHandler {} diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs index aee9215c..92c0ea86 100644 --- a/volo-http/src/route.rs +++ b/volo-http/src/route.rs @@ -1,13 +1,12 @@ use std::collections::HashMap; use http::{Method, Response, StatusCode}; -use http_body_util::Full; -use hyper::body::{Bytes, Incoming}; +use hyper::body::Incoming; use motore::{layer::Layer, Service}; use crate::{ handler::{DynHandler, Handler}, - response::RespBody, + response::{IntoResponse, RespBody}, DynError, DynService, HttpContext, }; @@ -35,6 +34,7 @@ impl RouteId { pub struct Router { matcher: matchit::Router, routes: HashMap>, + fallback: Fallback, } impl Router @@ -45,6 +45,7 @@ where Self { matcher: Default::default(), routes: Default::default(), + fallback: Fallback::from_status_code(StatusCode::NOT_FOUND), } } @@ -61,6 +62,11 @@ where self } + pub fn fallback_for_all(mut self, fallback: Fallback) -> Self { + self.fallback = fallback; + self + } + pub fn layer(self, l: L) -> Self where L: Layer + Clone + Send + Sync + 'static, @@ -79,9 +85,12 @@ where }) .collect(); + let fallback = self.fallback.layer(l.clone()); + Router { matcher: self.matcher, routes, + fallback, } } @@ -96,9 +105,12 @@ where }) .collect(); + let fallback = self.fallback.with_state(&s); + Router { matcher: self.matcher, routes, + fallback, } } } @@ -120,10 +132,7 @@ impl Service for Router<()> { } } - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Full::new(Bytes::new()).into()) - .unwrap()) + self.fallback.call_with_state(cx, req, ()).await } } @@ -137,6 +146,7 @@ pub struct MethodRouter { trace: MethodEndpoint, connect: MethodEndpoint, patch: MethodEndpoint, + fallback: Fallback, } impl MethodRouter @@ -154,6 +164,7 @@ where trace: MethodEndpoint::None, connect: MethodEndpoint::None, patch: MethodEndpoint::None, + fallback: Fallback::from_status_code(StatusCode::METHOD_NOT_ALLOWED), } } @@ -182,6 +193,7 @@ where trace, connect, patch, + fallback, } = self; let layer_fn = move |route: DynService| DynService::new(l.clone().layer(route)); @@ -196,6 +208,8 @@ where let connect = connect.map(layer_fn.clone()); let patch = patch.map(layer_fn.clone()); + let fallback = fallback.map(layer_fn); + Self { options, get, @@ -206,6 +220,7 @@ where trace, connect, patch, + fallback, } } @@ -220,10 +235,11 @@ where trace: self.trace.with_state(&state), connect: self.connect.with_state(&state), patch: self.patch.with_state(&state), + fallback: self.fallback.with_state(&state), } } - async fn call_with_state<'s, 'cx>( + pub(crate) async fn call_with_state<'s, 'cx>( &'s self, cx: &'cx mut HttpContext, req: Incoming, @@ -233,32 +249,24 @@ where S: 'cx, { let handler = match cx.method { - Method::OPTIONS => &self.options, - Method::GET => &self.get, - Method::POST => &self.post, - Method::PUT => &self.put, - Method::DELETE => &self.delete, - Method::HEAD => &self.head, - Method::TRACE => &self.trace, - Method::CONNECT => &self.connect, - Method::PATCH => &self.patch, - _ => { - return Ok(Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) - .body(Full::new(Bytes::new()).into()) - .unwrap()); - } + Method::OPTIONS => Some(&self.options), + Method::GET => Some(&self.get), + Method::POST => Some(&self.post), + Method::PUT => Some(&self.put), + Method::DELETE => Some(&self.delete), + Method::HEAD => Some(&self.head), + Method::TRACE => Some(&self.trace), + Method::CONNECT => Some(&self.connect), + Method::PATCH => Some(&self.patch), + _ => None, }; match handler { - MethodEndpoint::None => Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Full::new(Bytes::new()).into()) - .unwrap()), - MethodEndpoint::Route(route) => route.call(cx, req).await, - MethodEndpoint::Handler(handler) => { + Some(MethodEndpoint::Route(route)) => route.call(cx, req).await, + Some(MethodEndpoint::Handler(handler)) => { handler.clone().call_with_state(cx, req, state).await } + _ => self.fallback.call_with_state(cx, req, state).await, } } } @@ -305,8 +313,25 @@ where } } +macro_rules! impl_method_register { + ($( $method:ident ),*) => { + $( + pub fn $method(h: H) -> MethodRouter + where + for<'a> H: Handler + Clone + Send + Sync + 'a, + for<'a> T: 'a, + S: Clone + Send + Sync + 'static, + { + MethodRouterBuilder::new().$method(h).build() + } + )+ + }; +} + +for_all_methods!(impl_method_register); + #[derive(Clone, Default)] -enum MethodEndpoint { +pub enum MethodEndpoint { #[default] None, Route(DynService), @@ -317,7 +342,27 @@ impl MethodEndpoint where S: Clone + Send + Sync + 'static, { - fn map(self, f: F) -> MethodEndpoint + pub fn from_handler(h: H) -> MethodEndpoint + where + for<'a> H: Handler + Clone + Send + Sync + 'a, + for<'a> T: 'a, + S: Clone + Send + Sync + 'static, + { + MethodEndpoint::Handler(DynHandler::new(h)) + } + + pub fn from_service(srv: Srv) -> MethodEndpoint + where + Srv: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, + { + MethodEndpoint::Route(DynService::new(srv)) + } + + pub(crate) fn map(self, f: F) -> MethodEndpoint where F: FnOnce(DynService) -> DynService + Clone + 'static, { @@ -328,7 +373,7 @@ where } } - fn with_state(self, state: &S) -> MethodEndpoint { + pub(crate) fn with_state(self, state: &S) -> MethodEndpoint { match self { MethodEndpoint::None => MethodEndpoint::None, MethodEndpoint::Route(route) => MethodEndpoint::Route(route), @@ -339,19 +384,118 @@ where } } -macro_rules! impl_method_register { - ($( $method:ident ),*) => { - $( - pub fn $method(h: H) -> MethodRouter - where - for<'a> H: Handler + Clone + Send + Sync + 'a, - for<'a> T: 'a, - S: Clone + Send + Sync + 'static, - { - MethodRouterBuilder::new().$method(h).build() +#[derive(Clone)] +pub enum Fallback { + Route(DynService), + Handler(DynHandler), +} + +impl Fallback +where + S: Clone + Send + Sync + 'static, +{ + pub(crate) fn from_status_code(status: StatusCode) -> Fallback { + Fallback::Route(DynService::new(RouteForStatusCode(status))) + } + + pub fn from_handler(h: H) -> Fallback + where + for<'a> H: Handler + Clone + Send + Sync + 'a, + for<'a> T: 'a, + S: Clone + Send + Sync + 'static, + { + Fallback::Handler(DynHandler::new(h)) + } + + pub fn from_service(srv: Srv) -> Fallback + where + Srv: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, + { + Fallback::Route(DynService::new(srv)) + } + + pub(crate) fn map(self, f: F) -> Fallback + where + F: FnOnce(DynService) -> DynService + Clone + 'static, + { + match self { + Fallback::Route(route) => Fallback::Route(f(route)), + Fallback::Handler(handler) => Fallback::Handler(handler.map(f)), } - )+ - }; + } + + pub(crate) fn layer(self, l: L) -> Self + where + L: Layer + Clone + Send + Sync + 'static, + L::Service: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, + { + self.map(move |route: DynService| DynService::new(l.clone().layer(route))) + } + + pub(crate) fn with_state(self, state: &S) -> Fallback { + match self { + Fallback::Route(route) => Fallback::Route(route), + Fallback::Handler(handler) => Fallback::Route(handler.into_route(state.clone())), + } + } + + pub(crate) async fn call_with_state<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + state: S, + ) -> Result, DynError> + where + S: 'cx, + { + match self { + Fallback::Route(route) => route.call(cx, req).await, + Fallback::Handler(handler) => handler.clone().call_with_state(cx, req, state).await, + } + } } -for_all_methods!(impl_method_register); +pub fn from_handler(h: H) -> MethodEndpoint +where + for<'a> H: Handler + Clone + Send + Sync + 'a, + for<'a> T: 'a, + S: Clone + Send + Sync + 'static, +{ + MethodEndpoint::from_handler(h) +} + +pub fn from_service(srv: Srv) -> MethodEndpoint +where + Srv: Service, Error = DynError> + + Clone + + Send + + Sync + + 'static, + S: Clone + Send + Sync + 'static, +{ + MethodEndpoint::from_service(srv) +} + +#[derive(Clone)] +struct RouteForStatusCode(StatusCode); + +impl Service for RouteForStatusCode { + type Response = Response; + type Error = DynError; + + async fn call<'s, 'cx>( + &'s self, + _cx: &'cx mut HttpContext, + _req: Incoming, + ) -> Result { + Ok(self.0.into_response()) + } +} From 21e3d4b368ee99a4b5023cc26470ae77c203f9c9 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Mon, 27 Nov 2023 20:43:08 +0800 Subject: [PATCH 10/12] feat(volo-http): add `TimeoutLayer` with response This commit adds a `TimeoutLayer` for `volo-http` which can use a function as a callback and when timeout occurs, it will return a response by the callback. Signed-off-by: Yu Li --- Cargo.lock | 1 + examples/Cargo.toml | 1 + examples/src/http/simple.rs | 27 ++++++++---- volo-http/src/layer.rs | 87 ++++++++++++++++++++++++++++++++----- volo-http/src/route.rs | 2 +- 5 files changed, 99 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 450816b6..5c168dfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -530,6 +530,7 @@ dependencies = [ "async-stream", "bytes", "http 1.0.0", + "http-body-util", "hyper 1.0.1", "lazy_static", "metainfo", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 30a8848d..4779866c 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -82,6 +82,7 @@ anyhow.workspace = true async-stream.workspace = true bytes.workspace = true http = "1" +http-body-util = "0.1" hyper = { version = "1", features = ["server", "http1", "http2"] } lazy_static.workspace = true metainfo.workspace = true diff --git a/examples/src/http/simple.rs b/examples/src/http/simple.rs index 7e17ee2a..577cc265 100644 --- a/examples/src/http/simple.rs +++ b/examples/src/http/simple.rs @@ -1,14 +1,16 @@ use std::{net::SocketAddr, time::Duration}; use bytes::Bytes; -use http::{Method, StatusCode, Uri}; -use motore::timeout::TimeoutLayer; +use http::{Method, Response, StatusCode, Uri}; +use http_body_util::Full; use serde::Deserialize; use volo_http::{ + layer::TimeoutLayer, param::Params, request::Json, route::{get, post, MethodRouter, Router}, server::Server, + HttpContext, }; async fn hello() -> &'static str { @@ -41,10 +43,7 @@ async fn json(Json(request): Json) { ); } -async fn test( - u: Uri, - m: Method, -) -> Result<&'static str, (StatusCode, &'static str)> { +async fn test(u: Uri, m: Method) -> Result<&'static str, (StatusCode, &'static str)> { println!("uri: {u:?}"); println!("method: {m:?}"); if u.to_string().ends_with("a") { @@ -58,6 +57,11 @@ async fn timeout_test() { tokio::time::sleep(Duration::from_secs(5)).await } +fn timeout_handler(ctx: &HttpContext) -> StatusCode { + tracing::info!("Timeout on `{}`, peer: {}", ctx.uri, ctx.peer); + StatusCode::INTERNAL_SERVER_ERROR +} + #[tokio::main(flavor = "multi_thread")] async fn main() { let subscriber = tracing_subscriber::FmtSubscriber::builder() @@ -66,10 +70,17 @@ async fn main() { tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + let timeout_response = Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Full::new(Bytes::new())) + .unwrap(); + let app = Router::new() .route( "/", - get(hello).layer(TimeoutLayer::new(Some(Duration::from_secs(1)))), + get(hello).layer(TimeoutLayer::new(Duration::from_secs(1), move |_| { + timeout_response + })), ) .route("/:echo", get(echo)) .route("/user", post(json)) @@ -78,7 +89,7 @@ async fn main() { MethodRouter::builder().get(test).post(test).build(), ) .route("/timeout", get(timeout_test)) - .layer(TimeoutLayer::new(Some(Duration::from_secs(1)))); + .layer(TimeoutLayer::new(Duration::from_secs(1), timeout_handler)); let addr: SocketAddr = "[::]:9091".parse().unwrap(); let addr = volo::net::Address::from(addr); diff --git a/volo-http/src/layer.rs b/volo-http/src/layer.rs index 5b04772f..43097f0f 100644 --- a/volo-http/src/layer.rs +++ b/volo-http/src/layer.rs @@ -1,14 +1,14 @@ +use std::time::Duration; + use http::{Method, Request, Response, StatusCode}; use http_body_util::Full; use hyper::body::{Bytes, Incoming}; +use motore::{layer::Layer, service::Service}; -use crate::HttpContext; - -pub trait Layer { - type Service: motore::Service>; - - fn layer(self, inner: S) -> Self::Service; -} +use crate::{ + response::{IntoResponse, RespBody}, + HttpContext, +}; pub trait LayerExt { fn method( @@ -44,7 +44,7 @@ pub struct FilterLayer { impl Layer for FilterLayer where - S: motore::Service, Response = Response>> + S: Service, Response = Response>> + Send + Sync + 'static, @@ -65,9 +65,9 @@ pub struct Filter { f: F, } -impl motore::Service> for Filter +impl Service> for Filter where - S: motore::Service, Response = Response>> + S: Service, Response = Response>> + Send + Sync + 'static, @@ -91,3 +91,70 @@ where self.service.call(cx, req).await } } + +#[derive(Clone)] +pub struct TimeoutLayer { + duration: Duration, + handler: F, +} + +impl TimeoutLayer { + pub fn new(duration: Duration, handler: F) -> Self + where + F: FnOnce(&HttpContext) -> T + Clone + Sync, + T: IntoResponse, + { + Self { duration, handler } + } +} + +impl Layer for TimeoutLayer +where + S: Service> + Send + Sync + 'static, + F: FnOnce(&HttpContext) -> T + Clone + Sync, + T: IntoResponse, +{ + type Service = Timeout; + + fn layer(self, inner: S) -> Self::Service { + Timeout { + service: inner, + duration: self.duration, + handler: self.handler, + } + } +} + +#[derive(Clone)] +pub struct Timeout { + service: S, + duration: Duration, + handler: F, +} + +impl Service for Timeout +where + S: Service> + Send + Sync + 'static, + F: FnOnce(&HttpContext) -> T + Clone + Sync, + T: IntoResponse, +{ + type Response = S::Response; + + type Error = S::Error; + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut HttpContext, + req: Incoming, + ) -> Result { + let fut_service = self.service.call(cx, req); + let fut_timeout = tokio::time::sleep(self.duration); + + tokio::select! { + resp = fut_service => resp, + _ = fut_timeout => { + Ok((self.handler.clone())(cx).into_response()) + }, + } + } +} diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs index 92c0ea86..c0b0fef8 100644 --- a/volo-http/src/route.rs +++ b/volo-http/src/route.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use http::{Method, Response, StatusCode}; use hyper::body::Incoming; -use motore::{layer::Layer, Service}; +use motore::{layer::Layer, service::Service}; use crate::{ handler::{DynHandler, Handler}, From 39dea33d6a029d469ed34980a539b18290cb6548 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Fri, 24 Nov 2023 20:30:35 +0800 Subject: [PATCH 11/12] chore(volo-http): bump version to `0.1.0` Signed-off-by: Yu Li --- Cargo.lock | 2 +- volo-http/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c168dfa..2d501d6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2937,7 +2937,7 @@ dependencies = [ [[package]] name = "volo-http" -version = "0.0.0" +version = "0.1.0" dependencies = [ "bytes", "futures-util", diff --git a/volo-http/Cargo.toml b/volo-http/Cargo.toml index 1a81ffef..35d15099 100644 --- a/volo-http/Cargo.toml +++ b/volo-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-http" -version = "0.0.0" +version = "0.1.0" edition.workspace = true homepage.workspace = true repository.workspace = true From 8ec89d8e09fcdb0b1f3dd34ba60d1930591f6467 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Mon, 27 Nov 2023 10:58:13 +0800 Subject: [PATCH 12/12] credits: update license of axum Signed-off-by: Yu Li --- CREDITS.md | 7 +++++++ licenses/axum/LICENSE | 25 +++++++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 licenses/axum/LICENSE diff --git a/CREDITS.md b/CREDITS.md index dac5c9d2..2c623c10 100644 --- a/CREDITS.md +++ b/CREDITS.md @@ -18,3 +18,10 @@ We've copied and modified some code of [`tonic`](https://github.com/hyperium/ton We really appreciate the work that the `tonic` team have done. `Tonic` is licensed under the MIT license, and a copy can be found under the `licenses/tonic` directory. + +# axum + +We've copied and modified some code of [`axum`](https://github.com/tokio-rs/axum). +We really appreciate the work that the `axum` team have done. + +`axum` is licensed under the MIT license, and a copy can be found under the `licenses/axum` directory. diff --git a/licenses/axum/LICENSE b/licenses/axum/LICENSE new file mode 100644 index 00000000..11598b4b --- /dev/null +++ b/licenses/axum/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2019 Axum Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE.