Skip to content

Commit

Permalink
Drop NewConnection in favor of simpler API
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Sep 24, 2022
1 parent 499f81e commit 4fc71b6
Show file tree
Hide file tree
Showing 14 changed files with 70 additions and 357 deletions.
15 changes: 5 additions & 10 deletions bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,17 @@ async fn server(mut incoming: quinn::Incoming, opt: Opt) -> Result<()> {
// Handle only the expected amount of clients
for _ in 0..opt.clients {
let handshake = incoming.next().await.unwrap();
let quinn::NewConnection {
mut bi_streams,
connection,
..
} = handshake.await.context("handshake failed")?;
let connection = handshake.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
let (mut send_stream, mut recv_stream) = match bi_streams.next().await {
None => break,
Some(Err(quinn::ConnectionError::ApplicationClosed(_))) => break,
Some(Err(e)) => {
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(e) => {
eprintln!("accepting stream failed: {:?}", e);
break;
}
Some(Ok(stream)) => stream,
Ok(stream) => stream,
};
trace!("stream established");

Expand Down
2 changes: 1 addition & 1 deletion bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn connect_client(
let mut client_config = quinn::ClientConfig::new(Arc::new(crypto));
client_config.transport_config(Arc::new(transport_config(&opt)));

let quinn::NewConnection { connection, .. } = endpoint
let connection = endpoint
.connect_with(client_config, server_addr, "localhost")
.unwrap()
.await
Expand Down
27 changes: 5 additions & 22 deletions perf/src/bin/perf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,17 @@ async fn run(opt: Opt) -> Result<()> {

let stream_stats = OpenStreamStats::default();

let quinn::NewConnection {
connection,
uni_streams,
..
} = endpoint
let connection = endpoint
.connect_with(cfg, addr, host_name)?
.await
.context("connecting")?;

info!("established");

let acceptor = UniAcceptor(Arc::new(tokio::sync::Mutex::new(uni_streams)));

let drive_fut = async {
tokio::try_join!(
drive_uni(
connection.clone(),
acceptor,
stream_stats.clone(),
opt.uni_requests,
opt.upload_size,
Expand Down Expand Up @@ -236,7 +229,6 @@ async fn drain_stream(

async fn drive_uni(
connection: quinn::Connection,
acceptor: UniAcceptor,
stream_stats: OpenStreamStats,
concurrency: u64,
upload: u64,
Expand All @@ -247,12 +239,12 @@ async fn drive_uni(
loop {
let permit = sem.clone().acquire_owned().await.unwrap();
let send = connection.open_uni().await?;
let acceptor = acceptor.clone();
let stream_stats = stream_stats.clone();

debug!("sending request on {}", send.id());
let connection = connection.clone();
tokio::spawn(async move {
if let Err(e) = request_uni(send, acceptor, upload, download, stream_stats).await {
if let Err(e) = request_uni(send, connection, upload, download, stream_stats).await {
error!("sending request failed: {:#}", e);
}

Expand All @@ -263,19 +255,13 @@ async fn drive_uni(

async fn request_uni(
send: quinn::SendStream,
acceptor: UniAcceptor,
conn: quinn::Connection,
upload: u64,
download: u64,
stream_stats: OpenStreamStats,
) -> Result<()> {
request(send, upload, download, stream_stats.clone()).await?;
let recv = {
let mut guard = acceptor.0.lock().await;
guard
.next()
.await
.ok_or_else(|| anyhow::anyhow!("End of stream"))
}??;
let recv = conn.accept_uni().await?;
drain_stream(recv, download, stream_stats).await?;
Ok(())
}
Expand Down Expand Up @@ -348,9 +334,6 @@ async fn request_bi(
Ok(())
}

#[derive(Clone)]
struct UniAcceptor(Arc<tokio::sync::Mutex<quinn::IncomingUniStreams>>);

struct SkipServerVerification;

impl SkipServerVerification {
Expand Down
24 changes: 7 additions & 17 deletions perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,11 @@ async fn run(opt: Opt) -> Result<()> {
}

async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
let quinn::NewConnection {
uni_streams,
bi_streams,
connection,
..
} = handshake.await.context("handshake failed")?;
let connection = handshake.await.context("handshake failed")?;
debug!("{} connected", connection.remote_address());
tokio::try_join!(
drive_uni(connection.clone(), uni_streams),
drive_bi(bi_streams),
drive_uni(connection.clone()),
drive_bi(connection.clone()),
conn_stats(connection, opt)
)?;
Ok(())
Expand All @@ -129,12 +124,8 @@ async fn conn_stats(connection: quinn::Connection, opt: Arc<Opt>) -> Result<()>
Ok(())
}

async fn drive_uni(
connection: quinn::Connection,
mut streams: quinn::IncomingUniStreams,
) -> Result<()> {
while let Some(stream) = streams.next().await {
let stream = stream?;
async fn drive_uni(connection: quinn::Connection) -> Result<()> {
while let Ok(stream) = connection.accept_uni().await {
let connection = connection.clone();
tokio::spawn(async move {
if let Err(e) = handle_uni(connection, stream).await {
Expand All @@ -152,9 +143,8 @@ async fn handle_uni(connection: quinn::Connection, stream: quinn::RecvStream) ->
Ok(())
}

async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> {
while let Some(stream) = streams.next().await {
let (send, recv) = stream?;
async fn drive_bi(connection: quinn::Connection) -> Result<()> {
while let Ok((send, recv)) = connection.accept_bi().await {
tokio::spawn(async move {
if let Err(e) = handle_bi(send, recv).await {
error!("request failed: {:#}", e);
Expand Down
8 changes: 3 additions & 5 deletions quinn/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,14 @@ impl Context {
};
let handle = runtime.spawn(
async move {
let quinn::NewConnection {
mut uni_streams, ..
} = incoming
let connection = incoming
.next()
.await
.expect("accept")
.await
.expect("connect");

while let Some(Ok(mut stream)) = uni_streams.next().await {
while let Ok(mut stream) = connection.accept_uni().await {
tokio::spawn(async move {
while stream
.read_chunk(usize::MAX, false)
Expand All @@ -142,7 +140,7 @@ impl Context {
let _guard = runtime.enter();
Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0)).unwrap()
};
let quinn::NewConnection { connection, .. } = runtime
let connection = runtime
.block_on(async {
endpoint
.connect_with(self.client_config.clone(), server_addr, "localhost")
Expand Down
5 changes: 1 addition & 4 deletions quinn/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,11 @@ async fn run(options: Opt) -> Result<()> {
.ok_or_else(|| anyhow!("no hostname specified"))?;

eprintln!("connecting to {} at {}", host, remote);
let new_conn = endpoint
let conn = endpoint
.connect(remote, host)?
.await
.map_err(|e| anyhow!("failed to connect: {}", e))?;
eprintln!("connected at {:?}", start.elapsed());
let quinn::NewConnection {
connection: conn, ..
} = new_conn;
let (mut send, recv) = conn
.open_bi()
.await
Expand Down
12 changes: 4 additions & 8 deletions quinn/examples/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// accept a single connection
tokio::spawn(async move {
let incoming_conn = incoming.next().await.unwrap();
let new_conn = incoming_conn.await.unwrap();
let conn = incoming_conn.await.unwrap();
println!(
"[server] connection accepted: addr={}",
new_conn.connection.remote_address()
conn.remote_address()
);
// Dropping all handles associated with a connection implicitly closes it
});

let endpoint = make_client_endpoint("0.0.0.0:0".parse().unwrap(), &[&server_cert])?;
// connect to server
let quinn::NewConnection {
connection,
mut uni_streams,
..
} = endpoint
let connection = endpoint
.connect(server_addr, "localhost")
.unwrap()
.await
.unwrap();
println!("[client] connected: addr={}", connection.remote_address());

// Waiting for a stream will complete with an error when the server closes the connection
let _ = uni_streams.next().await;
let _ = connection.accept_uni().await;

// Give the server has a chance to clean up
endpoint.wait_idle().await;
Expand Down
6 changes: 3 additions & 3 deletions quinn/examples/insecure_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ async fn run_server(addr: SocketAddr) {
let (mut incoming, _server_cert) = make_server_endpoint(addr).unwrap();
// accept a single connection
let incoming_conn = incoming.next().await.unwrap();
let new_conn = incoming_conn.await.unwrap();
let conn = incoming_conn.await.unwrap();
println!(
"[server] connection accepted: addr={}",
new_conn.connection.remote_address()
conn.remote_address()
);
}

Expand All @@ -36,7 +36,7 @@ async fn run_client(server_addr: SocketAddr) -> Result<(), Box<dyn Error>> {
endpoint.set_default_client_config(client_cfg);

// connect to server
let quinn::NewConnection { connection, .. } = endpoint
let connection = endpoint
.connect(server_addr, "localhost")
.unwrap()
.await
Expand Down
10 changes: 3 additions & 7 deletions quinn/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,7 @@ async fn run(options: Opt) -> Result<()> {
}

async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<()> {
let quinn::NewConnection {
connection,
mut bi_streams,
..
} = conn.await?;
let connection = conn.await?;
let span = info_span!(
"connection",
remote = %connection.remote_address(),
Expand All @@ -180,7 +176,8 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(
info!("established");

// Each stream initiated by the client constitutes a new request.
while let Some(stream) = bi_streams.next().await {
loop {
let stream = connection.accept_bi().await;
let stream = match stream {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
info!("connection closed");
Expand All @@ -201,7 +198,6 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(
.instrument(info_span!("request")),
);
}
Ok(())
}
.instrument(span)
.await?;
Expand Down
4 changes: 2 additions & 2 deletions quinn/examples/single_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn run_server(addr: SocketAddr) -> Result<Vec<u8>, Box<dyn Error>> {
let (mut incoming, server_cert) = make_server_endpoint(addr)?;
// accept a single connection
tokio::spawn(async move {
let quinn::NewConnection { connection, .. } = incoming.next().await.unwrap().await.unwrap();
let connection = incoming.next().await.unwrap().await.unwrap();
println!(
"[server] incoming connection: addr={}",
connection.remote_address()
Expand All @@ -54,6 +54,6 @@ fn run_server(addr: SocketAddr) -> Result<Vec<u8>, Box<dyn Error>> {
/// Attempt QUIC connection with the given server address.
async fn run_client(endpoint: &Endpoint, server_addr: SocketAddr) {
let connect = endpoint.connect(server_addr, "localhost").unwrap();
let quinn::NewConnection { connection, .. } = connect.await.unwrap();
let connection = connect.await.unwrap();
println!("[client] connected: addr={}", connection.remote_address());
}
Loading

0 comments on commit 4fc71b6

Please sign in to comment.