Skip to content

Commit

Permalink
upgrade hyper and http-body to 1.x (Nugine#134)
Browse files Browse the repository at this point in the history
* upgrade hyper and http-body to 1.x

* add BoxBody variant

* discard non-data frames in Body Stream impl

---------

Co-authored-by: Liam Perlaki <liam@perlaki.org>
  • Loading branch information
lperlaki and lperlaki authored Mar 17, 2024
1 parent 3640ecf commit 83e34cd
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 100 deletions.
14 changes: 5 additions & 9 deletions crates/s3s-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ categories = ["web-programming", "web-programming::http-server"]

[dependencies]
async-trait = "0.1.73"
aws-sdk-s3 = "1.12.0"
aws-smithy-http = { version = "0.60.2", features = ["event-stream"] }
aws-smithy-runtime-api = { version = "1.1.2", features = ["client", "http-02x"] }
aws-smithy-types = { version = "1.1.2", features = ["http-body-0-4-x"] }
aws-smithy-types-convert = { version = "0.60.2", features = ["convert-time"] }
bytes = "1.4.0"
futures = { version = "0.3.28", default-features = false, features = ["std"] }
http-body = "0.4.5"
hyper = "0.14.27"
aws-sdk-s3 = "1.17.0"
aws-smithy-runtime-api = { version = "1.2.0", features = ["client", "http-1x"] }
aws-smithy-types = { version = "1.1.8", features = ["http-body-1-x"] }
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-time"] }
hyper = "1.1.0"
s3s = { version = "0.10.0-dev", path = "../s3s" }
sync_wrapper = "0.1.2"
tracing = "0.1.37"
Expand Down
27 changes: 2 additions & 25 deletions crates/s3s-aws/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,9 @@
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use aws_smithy_types::body::SdkBody;

use futures::Stream;
use hyper::body::HttpBody;

pub fn s3s_body_into_sdk_body(body: s3s::Body) -> SdkBody {
SdkBody::from_body_0_4(body.boxed())
SdkBody::from_body_1_x(body)
}

pub fn sdk_body_into_s3s_body(body: SdkBody) -> s3s::Body {
s3s::Body::from(Box::pin(Wrapper(body)) as s3s::stream::DynByteStream)
}

struct Wrapper(SdkBody);

impl Stream for Wrapper {
type Item = Result<bytes::Bytes, s3s::StdError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
hyper::body::HttpBody::poll_data(Pin::new(&mut self.0), cx)
}
}

impl s3s::stream::ByteStream for Wrapper {
fn remaining_length(&self) -> s3s::stream::RemainingLength {
hyper::body::HttpBody::size_hint(&self.0).into()
}
s3s::Body::http_body(body)
}
2 changes: 1 addition & 1 deletion crates/s3s-aws/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl HttpConnector for Connector {
}

fn convert_input(req: AwsHttpRequest) -> Result<Request<s3s::Body>, ConnectorError> {
let mut req = req.try_into_http02x().map_err(on_err)?;
let mut req = req.try_into_http1x().map_err(on_err)?;

if req.headers().contains_key(HOST).not() {
let host = auto_host_header(req.uri());
Expand Down
20 changes: 16 additions & 4 deletions crates/s3s-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,29 @@ name = "s3s-fs"
required-features = ["binary"]

[features]
binary = ["tokio/full", "dep:clap", "dep:tracing-subscriber", "dep:hyper"]
binary = ["tokio/full", "dep:clap", "dep:tracing-subscriber", "dep:hyper-util"]

[dependencies]
async-trait = "0.1.73"
base64-simd = "0.8.0"
bytes = "1.4.0"
chrono = { version = "0.4.26", default-features = false, features = ["std", "clock"] }
chrono = { version = "0.4.26", default-features = false, features = [
"std",
"clock",
] }
clap = { version = "4.3.21", optional = true, features = ["derive"] }
crc32c = "0.6.4"
crc32fast = "1.3.2"
digest = "0.10.7"
futures = "0.3.28"
hex-simd = "0.8.0"
hyper = { version = "0.14.27", optional = true, features = ["http1", "http2", "server", "stream", "runtime"] }

hyper-util = { version = "0.1.3", optional = true, features = [
"server",
"http1",
"http2",
"tokio",
] }
md-5 = "0.10.5"
mime = "0.3.17"
nugine-rust-utils = "0.3.1"
Expand All @@ -43,7 +52,10 @@ tokio = { version = "1.31.0", features = ["fs", "io-util"] }
tokio-util = { version = "0.7.8", features = ["io"] }
tracing = "0.1.37"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.17", optional = true, features = ["env-filter", "time"] }
tracing-subscriber = { version = "0.3.17", optional = true, features = [
"env-filter",
"time",
] }
transform-stream = "0.3.0"
uuid = { version = "1.4.1", features = ["v4"] }

Expand Down
39 changes: 30 additions & 9 deletions crates/s3s-fs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use s3s::auth::SimpleAuth;
use s3s::service::S3ServiceBuilder;

use std::io::IsTerminal;
use std::net::TcpListener;
use std::path::PathBuf;

use tokio::net::TcpListener;

use clap::{CommandFactory, Parser};
use hyper::server::Server;
use tracing::info;

use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as ConnBuilder;

#[derive(Debug, Parser)]
#[command(version)]
struct Opt {
Expand Down Expand Up @@ -108,18 +111,36 @@ async fn run(opt: Opt) -> Result {
};

// Run server
let listener = TcpListener::bind((opt.host.as_str(), opt.port))?;
let listener = TcpListener::bind((opt.host.as_str(), opt.port)).await?;
let local_addr = listener.local_addr()?;

let server = Server::from_tcp(listener)?.serve(service.into_shared().into_make_service());
let hyper_service = service.into_shared();

let connection = ConnBuilder::new(TokioExecutor::new());

let server = async move {
loop {
let (socket, _) = match listener.accept().await {
Ok(ok) => ok,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
};
let service = hyper_service.clone();
let conn = connection.clone();
tokio::spawn(async move {
let _ = conn.serve_connection(TokioIo::new(socket), service).await;
});
}
};

let task = tokio::spawn(server);
info!("server is running at http://{local_addr}");
server.with_graceful_shutdown(shutdown_signal()).await?;

tokio::signal::ctrl_c().await?;
task.abort();

info!("server is stopped");
Ok(())
}

async fn shutdown_signal() {
let _ = tokio::signal::ctrl_c().await;
}
15 changes: 11 additions & 4 deletions crates/s3s-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@ edition = "2021"
publish = false

[dependencies]
aws-config = { version = "1.1.2", default-features = false, features = ["behavior-version-latest"] }
aws-credential-types = "1.1.2"
aws-sdk-s3 = "1.12.0"
aws-config = { version = "1.1.7", default-features = false, features = [
"behavior-version-latest",
] }
aws-credential-types = "1.1.7"
aws-sdk-s3 = "1.17.0"
clap = { version = "4.3.21", features = ["derive"] }
hyper = { version = "0.14.27", features = ["full"] }
hyper-util = { version = "0.1.3", features = [
"server",
"http1",
"http2",
"tokio",
] }
s3s = { version = "0.10.0-dev", path = "../s3s" }
s3s-aws = { version = "0.10.0-dev", path = "../s3s-aws" }
tokio = { version = "1.31.0", features = ["full"] }
Expand Down
30 changes: 26 additions & 4 deletions crates/s3s-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@

use s3s::auth::SimpleAuth;
use s3s::service::S3ServiceBuilder;
use tokio::net::TcpListener;

use std::error::Error;
use std::io::IsTerminal;
use std::net::TcpListener;

use aws_credential_types::provider::ProvideCredentials;

use clap::Parser;
use hyper::server::Server;
use tracing::info;

use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as ConnBuilder;

#[derive(Debug, Parser)]
struct Opt {
#[clap(long, default_value = "localhost")]
Expand Down Expand Up @@ -71,8 +73,28 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
};

// Run server
let listener = TcpListener::bind((opt.host.as_str(), opt.port))?;
let server = Server::from_tcp(listener)?.serve(service.into_shared().into_make_service());
let listener = TcpListener::bind((opt.host.as_str(), opt.port)).await?;

let hyper_service = service.into_shared();

let connection = ConnBuilder::new(TokioExecutor::new());

let server = async move {
loop {
let (socket, _) = match listener.accept().await {
Ok(ok) => ok,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
};
let service = hyper_service.clone();
let conn = connection.clone();
tokio::spawn(async move {
let _ = conn.serve_connection(TokioIo::new(socket), service).await;
});
}
};

info!("server is running at http://{}:{}/", opt.host, opt.port);
info!("server is forwarding requests to {}", opt.endpoint_url);
Expand Down
5 changes: 3 additions & 2 deletions crates/s3s/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ crc32fast = "1.3.2"
futures = { version = "0.3.28", default-features = false, features = ["std"] }
hex-simd = "0.8.0"
hmac = "0.12.1"
http-body = "0.4.5"
http-body = "1.0.0"
http-body-util = "0.1.1"
httparse = "1.8.0"
hyper = { version = "0.14.27", features = ["stream"] }
hyper = { version = "1.1.0", features = ["http1", "server"] }
itoa = "1.0.9"
memchr = "2.6.2"
mime = "0.3.17"
Expand Down
Loading

0 comments on commit 83e34cd

Please sign in to comment.