Skip to content

Commit

Permalink
*: Introduce rustfmt (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Jun 3, 2022
1 parent dcf8752 commit 233199d
Show file tree
Hide file tree
Showing 13 changed files with 538 additions and 335 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ jobs:

steps:
- uses: actions/checkout@v3
- name: Check formatting
run: cargo fmt -- --check
- name: Build
run: cargo build --verbose
- name: Run tests
Expand Down
53 changes: 35 additions & 18 deletions benches/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use constrained_connection::{Endpoint, new_unconstrained_connection, samples};
use criterion::{BenchmarkId, criterion_group, criterion_main, Criterion, Throughput};
use futures::{channel::mpsc, future, prelude::*, io::AsyncReadExt};
use constrained_connection::{new_unconstrained_connection, samples, Endpoint};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures::{channel::mpsc, future, io::AsyncReadExt, prelude::*};
use std::sync::Arc;
use tokio::{runtime::Runtime, task};
use yamux::{Config, Connection, Mode};
Expand All @@ -31,9 +31,15 @@ fn concurrent(c: &mut Criterion) {
let data = Bytes(Arc::new(vec![0x42; 4096]));
let networks = vec![
("mobile", (|| samples::mobile_hsdpa().2) as fn() -> (_, _)),
("adsl2+", (|| samples::residential_adsl2().2) as fn() -> (_, _)),
(
"adsl2+",
(|| samples::residential_adsl2().2) as fn() -> (_, _),
),
("gbit-lan", (|| samples::gbit_lan().2) as fn() -> (_, _)),
("unconstrained", new_unconstrained_connection as fn() -> (_, _)),
(
"unconstrained",
new_unconstrained_connection as fn() -> (_, _),
),
];

let mut group = c.benchmark_group("concurrent");
Expand All @@ -45,15 +51,20 @@ fn concurrent(c: &mut Criterion) {
let data = data.clone();
let rt = Runtime::new().unwrap();

group.throughput(Throughput::Bytes((nstreams * nmessages * data.0.len()) as u64));
group.throughput(Throughput::Bytes(
(nstreams * nmessages * data.0.len()) as u64,
));
group.bench_function(
BenchmarkId::from_parameter(
format!("{}/#streams{}/#messages{}", network_name, nstreams, nmessages),
),
|b| b.iter(|| {
let (server, client) = new_connection();
rt.block_on(oneway(*nstreams, *nmessages, data.clone(), server, client))
}),
BenchmarkId::from_parameter(format!(
"{}/#streams{}/#messages{}",
network_name, nstreams, nmessages
)),
|b| {
b.iter(|| {
let (server, client) = new_connection();
rt.block_on(oneway(*nstreams, *nmessages, data.clone(), server, client))
})
},
);
}
}
Expand Down Expand Up @@ -89,7 +100,7 @@ async fn oneway(
let mut b = vec![0; msg_len];

// Receive `nmessages` messages.
for _ in 0 .. nmessages {
for _ in 0..nmessages {
stream.read_exact(&mut b[..]).await.unwrap();
n += b.len();
}
Expand All @@ -103,24 +114,30 @@ async fn oneway(

let conn = Connection::new(client, config(), Mode::Client);
let mut ctrl = conn.control();
task::spawn(yamux::into_stream(conn).for_each(|r| {r.unwrap(); future::ready(())} ));
task::spawn(yamux::into_stream(conn).for_each(|r| {
r.unwrap();
future::ready(())
}));

for _ in 0 .. nstreams {
for _ in 0..nstreams {
let data = data.clone();
let mut ctrl = ctrl.clone();
task::spawn(async move {
let mut stream = ctrl.open_stream().await.unwrap();

// Send `nmessages` messages.
for _ in 0 .. nmessages {
for _ in 0..nmessages {
stream.write_all(data.as_ref()).await.unwrap();
}

stream.close().await.unwrap();
});
}

let n = rx.take(nstreams).fold(0, |acc, n| future::ready(acc + n)).await;
let n = rx
.take(nstreams)
.fold(0, |acc, n| future::ready(acc + n))
.await;
assert_eq!(n, nstreams * nmessages * msg_len);
ctrl.close().await.expect("close");
}
22 changes: 14 additions & 8 deletions src/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ use std::{collections::VecDeque, io};
#[derive(Debug)]
pub(crate) struct Chunks {
seq: VecDeque<Chunk>,
len: usize
len: usize,
}

impl Chunks {
/// A new empty chunk list.
pub(crate) fn new() -> Self {
Chunks { seq: VecDeque::new(), len: 0 }
Chunks {
seq: VecDeque::new(),
len: 0,
}
}

/// The total length of bytes yet-to-be-read in all `Chunk`s.
Expand All @@ -36,7 +39,9 @@ impl Chunks {
pub(crate) fn push(&mut self, x: Vec<u8>) {
self.len += x.len();
if !x.is_empty() {
self.seq.push_back(Chunk { cursor: io::Cursor::new(x) })
self.seq.push_back(Chunk {
cursor: io::Cursor::new(x),
})
}
}

Expand All @@ -59,7 +64,7 @@ impl Chunks {
/// vector can be consumed in steps.
#[derive(Debug)]
pub(crate) struct Chunk {
cursor: io::Cursor<Vec<u8>>
cursor: io::Cursor<Vec<u8>>,
}

impl Chunk {
Expand All @@ -83,13 +88,15 @@ impl Chunk {
/// The `AsRef<[u8]>` impl of `Chunk` provides a byte-slice view
/// from the current position to the end.
pub(crate) fn advance(&mut self, amount: usize) {
assert!({ // the new position must not exceed the vector's length
assert!({
// the new position must not exceed the vector's length
let pos = self.offset().checked_add(amount);
let max = self.cursor.get_ref().len();
pos.is_some() && pos <= Some(max)
});

self.cursor.set_position(self.cursor.position() + amount as u64);
self.cursor
.set_position(self.cursor.position() + amount as u64);
}

/// Consume `self` and return the inner vector.
Expand All @@ -100,7 +107,6 @@ impl Chunk {

impl AsRef<[u8]> for Chunk {
fn as_ref(&self) -> &[u8] {
&self.cursor.get_ref()[self.offset() ..]
&self.cursor.get_ref()[self.offset()..]
}
}

53 changes: 30 additions & 23 deletions src/connection/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use crate::{Stream, error::ConnectionError};
use futures::{ready, channel::{mpsc, oneshot}, prelude::*};
use std::{pin::Pin, task::{Context, Poll}};
use super::ControlCommand;
use crate::{error::ConnectionError, Stream};
use futures::{
channel::{mpsc, oneshot},
prelude::*,
ready,
};
use std::{
pin::Pin,
task::{Context, Poll},
};

type Result<T> = std::result::Result<T, ConnectionError>;

Expand All @@ -31,15 +38,15 @@ pub struct Control {
/// Pending state of `poll_open_stream`.
pending_open: Option<oneshot::Receiver<Result<Stream>>>,
/// Pending state of `poll_close`.
pending_close: Option<oneshot::Receiver<()>>
pending_close: Option<oneshot::Receiver<()>>,
}

impl Clone for Control {
fn clone(&self) -> Self {
Control {
sender: self.sender.clone(),
pending_open: None,
pending_close: None
pending_close: None,
}
}
}
Expand All @@ -49,7 +56,7 @@ impl Control {
Control {
sender,
pending_open: None,
pending_close: None
pending_close: None,
}
}

Expand All @@ -63,9 +70,14 @@ impl Control {
/// Close the connection.
pub async fn close(&mut self) -> Result<()> {
let (tx, rx) = oneshot::channel();
if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
if self
.sender
.send(ControlCommand::CloseConnection(tx))
.await
.is_err()
{
// The receiver is closed which means the connection is already closed.
return Ok(())
return Ok(());
}
// A dropped `oneshot::Sender` means the `Connection` is gone,
// so we do not treat receive errors differently here.
Expand All @@ -84,14 +96,12 @@ impl Control {
self.pending_open = Some(rx)
}
Some(mut rx) => match rx.poll_unpin(cx)? {
Poll::Ready(result) => {
return Poll::Ready(result)
}
Poll::Ready(result) => return Poll::Ready(result),
Poll::Pending => {
self.pending_open = Some(rx);
return Poll::Pending
return Poll::Pending;
}
}
},
}
}
}
Expand All @@ -108,35 +118,32 @@ impl Control {
None => {
if ready!(self.sender.poll_ready(cx)).is_err() {
// The receiver is closed which means the connection is already closed.
return Poll::Ready(Ok(()))
return Poll::Ready(Ok(()));
}
let (tx, rx) = oneshot::channel();
if let Err(e) = self.sender.start_send(ControlCommand::CloseConnection(tx)) {
if e.is_full() {
continue
continue;
}
debug_assert!(e.is_disconnected());
// The receiver is closed which means the connection is already closed.
return Poll::Ready(Ok(()))
return Poll::Ready(Ok(()));
}
self.pending_close = Some(rx)
}
Some(mut rx) => match rx.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
return Poll::Ready(Ok(()))
}
Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
Poll::Ready(Err(oneshot::Canceled)) => {
// A dropped `oneshot::Sender` means the `Connection` is gone,
// which is `Ok`ay for us here.
return Poll::Ready(Ok(()))
return Poll::Ready(Ok(()));
}
Poll::Pending => {
self.pending_close = Some(rx);
return Poll::Pending
return Poll::Pending;
}
}
},
}
}
}
}

Loading

0 comments on commit 233199d

Please sign in to comment.