diff --git a/Cargo.toml b/Cargo.toml index e03e2c7a2..7e183832b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,6 @@ lazy_static = "1.4.0" # for testing we'll use the git version. bytes = { version = "0.5.0", git = "https://github.com/tokio-rs/bytes", rev = "79e4b2847f27137faaf149d116a352cbeae47fd1" } env_logger = { version = "0.6.2", default-features = false } -slab = "0.4.2" tempdir = "0.3.7" net2 = "0.2.33" diff --git a/tests/close_on_drop.rs b/tests/close_on_drop.rs index 0f1dc4dc3..faa85c291 100644 --- a/tests/close_on_drop.rs +++ b/tests/close_on_drop.rs @@ -1,4 +1,5 @@ -use bytes::BytesMut; +use std::io::Read; + use log::debug; use mio::net::{TcpListener, TcpStream}; @@ -6,7 +7,7 @@ use mio::{Events, Interests, Poll, Token}; mod util; -use util::{any_local_address, init, TryRead}; +use util::{any_local_address, init}; use self::TestState::{AfterRead, Initial}; @@ -50,16 +51,16 @@ impl TestHandler { match self.state { Initial => { let mut buf = [0; 4096]; - debug!("GOT={:?}", self.cli.try_read(&mut buf[..])); + debug!("GOT={:?}", self.cli.read(&mut buf[..])); self.state = AfterRead; } AfterRead => {} } - let mut buf = BytesMut::with_capacity(1024); + let mut buf = Vec::with_capacity(1024); - match self.cli.try_read_buf(&mut buf) { - Ok(Some(0)) => self.shutdown = true, + match self.cli.read(&mut buf) { + Ok(0) => self.shutdown = true, Ok(_) => panic!("the client socket should not be readable"), Err(e) => panic!("Unexpected error {:?}", e), } diff --git a/tests/tcp.rs b/tests/tcp.rs index 763bafa59..5e82e9079 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -1,26 +1,23 @@ use std::io::{self, Read, Write}; -use std::net::Shutdown; +use std::net::{self, Shutdown}; #[cfg(unix)] use std::os::unix::io::{FromRawFd, IntoRawFd}; use std::sync::mpsc::channel; -use std::thread::sleep; +use std::thread::{self, sleep}; use std::time::Duration; -use std::{net, thread}; -use bytes::{Buf, Bytes, BytesMut}; -use log::{debug, info}; #[cfg(unix)] use net2::TcpStreamExt; -use slab::Slab; use mio::net::{TcpListener, TcpStream}; -use mio::{Events, Interests, Poll, Registry, Token}; +use mio::{Events, Interests, Poll, Token}; +#[macro_use] mod util; use util::{ any_local_address, assert_send, assert_sync, expect_events, expect_no_events, init, - init_with_poll, ExpectEvent, TryRead, TryWrite, + init_with_poll, ExpectEvent, }; const LISTEN: Token = Token(0); @@ -196,7 +193,7 @@ fn read() { assert_eq!(event.token(), Token(1)); let mut b = [0; 1024]; loop { - if let Some(amt) = h.socket.try_read(&mut b).unwrap() { + if let Ok(amt) = h.socket.read(&mut b) { h.amt += amt; } else { break; @@ -261,7 +258,7 @@ fn peek() { } loop { - if let Some(amt) = h.socket.try_read(&mut b).unwrap() { + if let Ok(amt) = h.socket.read(&mut b) { h.amt += amt; } else { break; @@ -320,7 +317,7 @@ fn write() { assert_eq!(event.token(), Token(1)); let b = [0; 1024]; loop { - if let Some(amt) = h.socket.try_write(&b).unwrap() { + if let Ok(amt) = h.socket.write(&b) { h.amt += amt; } else { break; @@ -515,7 +512,7 @@ fn connection_reset_by_peer() { drop(client); // Wait a moment - thread::sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(100)); // Register interest in the server socket poll.registry() @@ -745,12 +742,13 @@ fn local_addr_ready() { SERVER => { handler.accepted.as_ref().unwrap().peer_addr().unwrap(); handler.accepted.as_ref().unwrap().local_addr().unwrap(); - handler + let n = handler .accepted .as_mut() .unwrap() - .try_write(&[1, 2, 3]) + .write(&[1, 2, 3]) .unwrap(); + assert_eq!(n, 3); handler.accepted = None; } CLIENT => { @@ -764,322 +762,6 @@ fn local_addr_ready() { } } -struct EchoConn { - sock: TcpStream, - buf: Option, - mut_buf: Option, - token: Option, - interests: Option, -} - -impl EchoConn { - fn new(sock: TcpStream) -> EchoConn { - EchoConn { - sock, - buf: None, - mut_buf: Some(BytesMut::with_capacity(2048)), - token: None, - interests: None, - } - } - - fn writable(&mut self, registry: &Registry) -> io::Result<()> { - let mut buf = self.buf.take().unwrap(); - - match self.sock.try_write_buf(&mut buf) { - Ok(None) => { - debug!("client flushing buf; WOULDBLOCK"); - - self.buf = Some(buf); - self.interests = match self.interests { - None => Some(Interests::WRITABLE), - Some(i) => Some(i | Interests::WRITABLE), - }; - } - Ok(Some(r)) => { - debug!("CONN : we wrote {} bytes!", r); - - self.mut_buf = Some(buf.try_mut().unwrap()); - - self.interests = Some(Interests::READABLE); - } - Err(e) => debug!("not implemented; client err={:?}", e), - } - - assert!( - self.interests.unwrap().is_readable() || self.interests.unwrap().is_writable(), - "actual={:?}", - self.interests - ); - registry.reregister(&self.sock, self.token.unwrap(), self.interests.unwrap()) - } - - fn readable(&mut self, registry: &Registry) -> io::Result<()> { - let mut buf = self.mut_buf.take().unwrap(); - buf.clear(); - - match self.sock.try_read_buf(&mut buf) { - Ok(None) => { - debug!("CONN : spurious read wakeup"); - self.mut_buf = Some(buf); - } - Ok(Some(r)) => { - debug!("CONN : we read {} bytes!", r); - - // prepare to provide this to writable - self.buf = Some(buf.freeze()); - - self.interests = Some(Interests::WRITABLE); - } - Err(e) => { - debug!("not implemented; client err={:?}", e); - if self.interests == Some(Interests::READABLE) { - self.interests = None; - } else if let Some(x) = self.interests.as_mut() { - *x = Interests::WRITABLE; - } - } - }; - - assert!( - self.interests.unwrap().is_readable() || self.interests.unwrap().is_writable(), - "actual={:?}", - self.interests - ); - registry.reregister(&self.sock, self.token.unwrap(), self.interests.unwrap()) - } -} - -struct EchoServer { - sock: TcpListener, - conns: Slab, -} - -impl EchoServer { - fn accept(&mut self, registry: &Registry) -> io::Result<()> { - debug!("server accepting socket"); - - let sock = self.sock.accept().unwrap().0; - let conn = EchoConn::new(sock); - let tok = self.conns.insert(conn); - - // Register the connection - self.conns[tok].token = Some(Token(tok)); - registry - .register(&self.conns[tok].sock, Token(tok), Interests::READABLE) - .expect("could not register socket with event loop"); - - Ok(()) - } - - fn conn_readable(&mut self, registry: &Registry, tok: Token) -> io::Result<()> { - debug!("server conn readable; tok={:?}", tok); - self.conn(tok).readable(registry) - } - - fn conn_writable(&mut self, registry: &Registry, tok: Token) -> io::Result<()> { - debug!("server conn writable; tok={:?}", tok); - self.conn(tok).writable(registry) - } - - fn conn(&mut self, tok: Token) -> &mut EchoConn { - &mut self.conns[tok.into()] - } -} - -struct EchoClient { - sock: TcpStream, - msgs: Vec<&'static str>, - tx: Bytes, - rx: Bytes, - mut_buf: Option, - token: Token, - interests: Option, - shutdown: bool, -} - -// Sends a message and expects to receive the same exact message, one at a time -impl EchoClient { - fn new(sock: TcpStream, token: Token, mut msgs: Vec<&'static str>) -> EchoClient { - let curr = msgs.remove(0); - - EchoClient { - sock, - msgs, - tx: Bytes::from_static(curr.as_bytes()), - rx: Bytes::from_static(curr.as_bytes()), - mut_buf: Some(BytesMut::with_capacity(2048)), - token, - interests: None, - shutdown: false, - } - } - - fn readable(&mut self, registry: &Registry) -> io::Result<()> { - debug!("client socket readable"); - - let mut buf = self.mut_buf.take().unwrap(); - buf.clear(); - - match self.sock.try_read_buf(&mut buf) { - Ok(None) => { - debug!("CLIENT : spurious read wakeup"); - self.mut_buf = Some(buf); - } - Ok(Some(r)) => { - debug!("CLIENT : We read {} bytes!", r); - - // prepare for reading - let mut buf = buf.freeze(); - - while buf.has_remaining() { - let actual = buf.get_u8(); - let expect = self.rx.get_u8(); - - assert!(actual == expect, "actual={}; expect={}", actual, expect); - } - - self.mut_buf = Some(buf.try_mut().unwrap()); - - if self.interests == Some(Interests::READABLE) { - self.interests = None; - } else if let Some(x) = self.interests.as_mut() { - *x = Interests::WRITABLE; - } - - if !self.rx.has_remaining() { - self.next_msg(registry).unwrap(); - } - } - Err(e) => { - panic!("not implemented; client err={:?}", e); - } - }; - - if let Some(x) = self.interests { - registry.reregister(&self.sock, self.token, x)?; - } - - Ok(()) - } - - fn writable(&mut self, registry: &Registry) -> io::Result<()> { - debug!("client socket writable"); - - match self.sock.try_write_buf(&mut self.tx) { - Ok(None) => { - debug!("client flushing buf; WOULDBLOCK"); - self.interests = match self.interests { - None => Some(Interests::WRITABLE), - Some(i) => Some(i | Interests::WRITABLE), - }; - } - Ok(Some(r)) => { - debug!("CLIENT : we wrote {} bytes!", r); - self.interests = match self.interests { - None => Some(Interests::READABLE), - Some(_) => Some(Interests::READABLE), - }; - } - Err(e) => debug!("not implemented; client err={:?}", e), - } - - if self.interests.unwrap().is_readable() || self.interests.unwrap().is_writable() { - registry.reregister(&self.sock, self.token, self.interests.unwrap())?; - } - - Ok(()) - } - - fn next_msg(&mut self, registry: &Registry) -> io::Result<()> { - if self.msgs.is_empty() { - self.shutdown = true; - return Ok(()); - } - - let curr = self.msgs.remove(0); - - debug!("client prepping next message"); - self.tx = Bytes::from_static(curr.as_bytes()); - self.rx = Bytes::from_static(curr.as_bytes()); - - self.interests = match self.interests { - None => Some(Interests::WRITABLE), - Some(i) => Some(i | Interests::WRITABLE), - }; - registry.reregister(&self.sock, self.token, self.interests.unwrap()) - } -} - -struct Echo { - server: EchoServer, - client: EchoClient, -} - -impl Echo { - fn new(srv: TcpListener, client: TcpStream, msgs: Vec<&'static str>) -> Echo { - Echo { - server: EchoServer { - sock: srv, - conns: Slab::with_capacity(128), - }, - client: EchoClient::new(client, CLIENT, msgs), - } - } -} - -#[test] -pub fn echo_server() { - init(); - - debug!("Starting TEST_ECHO_SERVER"); - let mut poll = Poll::new().unwrap(); - - let srv = TcpListener::bind(any_local_address()).unwrap(); - let addr = srv.local_addr().unwrap(); - - info!("listen for connections"); - poll.registry() - .register(&srv, SERVER, Interests::READABLE) - .unwrap(); - - let sock = TcpStream::connect(addr).unwrap(); - - // Connect to the server - poll.registry() - .register(&sock, CLIENT, Interests::WRITABLE) - .unwrap(); - - // == Create storage for events - let mut events = Events::with_capacity(1024); - - let mut handler = Echo::new(srv, sock, vec!["foo", "bar"]); - - // Start the event loop - while !handler.client.shutdown { - poll.poll(&mut events, None).unwrap(); - - for event in &events { - debug!("ready {:?} {:?}", event.token(), event); - if event.is_readable() { - match event.token() { - SERVER => handler.server.accept(poll.registry()).unwrap(), - CLIENT => handler.client.readable(poll.registry()).unwrap(), - i => handler.server.conn_readable(poll.registry(), i).unwrap(), - } - } - - if event.is_writable() { - match event.token() { - SERVER => panic!("received writable for token 0"), - CLIENT => handler.client.writable(poll.registry()).unwrap(), - i => handler.server.conn_writable(poll.registry(), i).unwrap(), - }; - } - } - } -} - #[test] fn write_then_drop() { init(); @@ -1133,8 +815,7 @@ fn write_then_drop() { assert_eq!(events.iter().next().unwrap().token(), Token(3)); let mut buf = [0; 10]; - assert_eq!(s.read(&mut buf).unwrap(), 4); - assert_eq!(&buf[0..4], &[1, 2, 3, 4]); + expect_read!(s.read(&mut buf), &[1, 2, 3, 4]); } #[test] @@ -1189,8 +870,7 @@ fn write_then_deregister() { assert_eq!(events.iter().next().unwrap().token(), Token(3)); let mut buf = [0; 10]; - assert_eq!(s.read(&mut buf).unwrap(), 4); - assert_eq!(&buf[0..4], &[1, 2, 3, 4]); + expect_read!(s.read(&mut buf), &[1, 2, 3, 4]); } const ID1: Token = Token(1); @@ -1247,15 +927,13 @@ fn tcp_no_events_after_deregister() { expect_no_events(&mut poll, &mut events); let mut buf = [0; 10]; - assert_eq!(stream.read(&mut buf).unwrap(), 4); - assert_eq!(&buf[0..4], &[1, 2, 3, 4]); + expect_read!(stream.read(&mut buf), &[1, 2, 3, 4]); - stream2.write_all(&[1, 2, 3, 4]).unwrap(); + checked_write!(stream2.write(&[1, 2, 3, 4])); expect_no_events(&mut poll, &mut events); sleep(Duration::from_millis(200)); - assert_eq!(stream.read(&mut buf).unwrap(), 4); - assert_eq!(&buf[0..4], &[1, 2, 3, 4]); + expect_read!(stream.read(&mut buf), &[1, 2, 3, 4]); expect_no_events(&mut poll, &mut events); } diff --git a/tests/tcp_listener.rs b/tests/tcp_listener.rs index 31888b548..cfe3ac0b7 100644 --- a/tests/tcp_listener.rs +++ b/tests/tcp_listener.rs @@ -40,7 +40,7 @@ fn tcp_listener_std() { let listener = net::TcpListener::bind(addr).unwrap(); // `std::net::TcpListener`s are blocking by default, so make sure it is in // non-blocking mode before wrapping in a Mio equivalent. - assert_ok!(listener.set_nonblocking(true)); + listener.set_nonblocking(true).unwrap(); Ok(TcpListener::from_std(listener)) }); } diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 6877349c5..5947ee6ef 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -1,11 +1,9 @@ -use log::warn; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::{self, Shutdown, SocketAddr}; #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; use std::sync::{mpsc::channel, Arc, Barrier}; use std::thread; -use std::time::Duration; use mio::net::TcpStream; use mio::{Interests, Token}; @@ -15,7 +13,7 @@ mod util; use util::{ any_local_address, any_local_ipv6_address, assert_send, assert_sync, assert_would_block, - expect_events, expect_no_events, init, init_with_poll, ExpectEvent, + expect_events, expect_no_events, init, init_with_poll, ExpectEvent, Readiness, }; const DATA1: &[u8] = b"Hello world!"; @@ -52,7 +50,7 @@ fn tcp_stream_std() { let stream = net::TcpStream::connect(addr).unwrap(); // `std::net::TcpStream`s are blocking by default, so make sure it is // in non-blocking mode before wrapping in a Mio equivalent. - assert_ok!(stream.set_nonblocking(true)); + stream.set_nonblocking(true).unwrap(); Ok(TcpStream::from_std(stream)) }); } @@ -85,8 +83,7 @@ where assert_eq!(stream.peer_addr().unwrap(), addr); assert!(stream.local_addr().unwrap().ip().is_loopback()); - let n = stream.write(&DATA1).expect("unable to write to stream"); - assert_eq!(n, DATA1.len()); + checked_write!(stream.write(&DATA1)); stream.flush().unwrap(); @@ -96,13 +93,8 @@ where vec![ExpectEvent::new(ID1, Interests::READABLE)], ); - let n = stream.peek(&mut buf).expect("unable to peek from stream"); - assert_eq!(n, DATA1.len()); - assert_eq!(&buf[..n], DATA1); - - let n = stream.read(&mut buf).expect("unable to read from stream"); - assert_eq!(n, DATA1.len()); - assert_eq!(&buf[..n], DATA1); + expect_read!(stream.peek(&mut buf), DATA1); + expect_read!(stream.read(&mut buf), DATA1); assert!(stream.take_error().unwrap().is_none()); @@ -152,8 +144,7 @@ fn try_clone() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - let n = stream1.write(DATA1).unwrap(); - assert_eq!(n, DATA1.len()); + checked_write!(stream1.write(&DATA1)); let mut stream2 = stream1.try_clone().unwrap(); @@ -172,9 +163,7 @@ fn try_clone() { ); let mut buf = [0; 20]; - let n = stream2.read(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(&buf[..n], DATA1); + expect_read!(stream2.read(&mut buf), DATA1); drop(stream2); thread_handle.join().expect("unable to join thread"); @@ -324,8 +313,7 @@ fn shutdown_read() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - let n = stream.write(DATA2).unwrap(); - assert_eq!(n, DATA2.len()); + checked_write!(stream.write(&DATA2)); expect_events( &mut poll, @@ -347,8 +335,7 @@ fn shutdown_read() { ))] { let mut buf = [0; 20]; - let n = stream.read(&mut buf).unwrap(); - assert_eq!(n, 0); + expect_read!(stream.read(&mut buf), &[]); } drop(stream); @@ -374,8 +361,7 @@ fn shutdown_write() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - let n = stream.write(DATA1).unwrap(); - assert_eq!(n, DATA1.len()); + checked_write!(stream.write(&DATA1)); stream.shutdown(Shutdown::Write).unwrap(); @@ -391,9 +377,7 @@ fn shutdown_write() { // Read should be ok. let mut buf = [0; 20]; - let n = stream.read(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(&buf[..n], DATA1); + expect_read!(stream.read(&mut buf), DATA1); drop(stream); thread_handle.join().expect("unable to join thread"); @@ -417,8 +401,7 @@ fn shutdown_both() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - let n = stream.write(DATA1).unwrap(); - assert_eq!(n, DATA1.len()); + checked_write!(stream.write(&DATA1)); expect_events( &mut poll, @@ -440,8 +423,7 @@ fn shutdown_both() { ))] { let mut buf = [0; 20]; - let n = stream.read(&mut buf).unwrap(); - assert_eq!(n, 0); + expect_read!(stream.read(&mut buf), &[]); } let err = stream.write(DATA2).unwrap_err(); @@ -549,8 +531,7 @@ fn no_events_after_deregister() { assert_would_block(stream.peek(&mut buf)); assert_would_block(stream.read(&mut buf)); - let n = stream.write(&DATA1).expect("unable to write to stream"); - assert_eq!(n, DATA1.len()); + checked_write!(stream.write(&DATA1)); stream.flush().unwrap(); expect_no_events(&mut poll, &mut events); @@ -569,11 +550,11 @@ fn tcp_shutdown_client_read_close_event() { let barrier = Arc::new(Barrier::new(2)); let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), false); - let stream = assert_ok!(TcpStream::connect(sockaddr)); + let stream = TcpStream::connect(sockaddr).unwrap(); let interests = Interests::READABLE | Interests::WRITABLE; - assert_ok!(poll.registry().register(&stream, ID1, interests)); + poll.registry().register(&stream, ID1, interests).unwrap(); expect_events( &mut poll, @@ -581,8 +562,12 @@ fn tcp_shutdown_client_read_close_event() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - assert_ok!(stream.shutdown(Shutdown::Read)); - expect_readiness!(poll, events, is_read_closed); + stream.shutdown(Shutdown::Read).unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)], + ); barrier.wait(); handle.join().expect("failed to join thread"); @@ -599,11 +584,11 @@ fn tcp_shutdown_client_write_close_event() { let barrier = Arc::new(Barrier::new(2)); let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), false); - let stream = assert_ok!(TcpStream::connect(sockaddr)); + let stream = TcpStream::connect(sockaddr).unwrap(); let interests = Interests::READABLE | Interests::WRITABLE; - assert_ok!(poll.registry().register(&stream, ID1, interests)); + poll.registry().register(&stream, ID1, interests).unwrap(); expect_events( &mut poll, @@ -611,8 +596,12 @@ fn tcp_shutdown_client_write_close_event() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - assert_ok!(stream.shutdown(Shutdown::Write)); - expect_readiness!(poll, events, is_write_closed); + stream.shutdown(Shutdown::Write).unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::WRITE_CLOSED)], + ); barrier.wait(); handle.join().expect("failed to join thread"); @@ -624,13 +613,11 @@ fn tcp_shutdown_server_write_close_event() { let barrier = Arc::new(Barrier::new(2)); let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), true); - let stream = assert_ok!(TcpStream::connect(sockaddr)); + let stream = TcpStream::connect(sockaddr).unwrap(); - assert_ok!(poll.registry().register( - &stream, - ID1, - Interests::READABLE.add(Interests::WRITABLE) - )); + poll.registry() + .register(&stream, ID1, Interests::READABLE.add(Interests::WRITABLE)) + .unwrap(); expect_events( &mut poll, @@ -640,7 +627,11 @@ fn tcp_shutdown_server_write_close_event() { barrier.wait(); - expect_readiness!(poll, events, is_read_closed); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)], + ); barrier.wait(); handle.join().expect("failed to join thread"); @@ -656,13 +647,11 @@ fn tcp_shutdown_client_both_close_event() { let barrier = Arc::new(Barrier::new(2)); let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), false); - let stream = assert_ok!(TcpStream::connect(sockaddr)); + let stream = TcpStream::connect(sockaddr).unwrap(); - assert_ok!(poll.registry().register( - &stream, - ID1, - Interests::READABLE.add(Interests::WRITABLE) - )); + poll.registry() + .register(&stream, ID1, Interests::READABLE.add(Interests::WRITABLE)) + .unwrap(); expect_events( &mut poll, @@ -670,8 +659,12 @@ fn tcp_shutdown_client_both_close_event() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - assert_ok!(stream.shutdown(Shutdown::Both)); - expect_readiness!(poll, events, is_write_closed); + stream.shutdown(Shutdown::Both).unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::WRITE_CLOSED)], + ); barrier.wait(); handle.join().expect("failed to join thread"); @@ -705,8 +698,7 @@ fn echo_listener(addr: SocketAddr, n_connections: usize) -> (thread::JoinHandle< if n == 0 { break; } - let written = stream.write(&buf[..n]).expect("error writing"); - assert_eq!(written, n, "short write"); + checked_write!(stream.write(&buf[..n])); } } }); @@ -733,7 +725,7 @@ fn start_listener( barrier.wait(); if shutdown_write { - assert_ok!(stream.shutdown(Shutdown::Write)); + stream.shutdown(Shutdown::Write).unwrap(); barrier.wait(); } } diff --git a/tests/udp_socket.rs b/tests/udp_socket.rs index c79503fd3..936f5c7a5 100644 --- a/tests/udp_socket.rs +++ b/tests/udp_socket.rs @@ -13,6 +13,7 @@ use log::{debug, info}; use mio::net::UdpSocket; use mio::{Events, Interests, Poll, Registry, Token}; +#[macro_use] mod util; use util::{ @@ -56,8 +57,8 @@ fn unconnected_udp_socket_std() { // `std::net::UdpSocket`s are blocking by default, so make sure they are // in non-blocking mode before wrapping in a Mio equivalent. - assert_ok!(socket1.set_nonblocking(true)); - assert_ok!(socket2.set_nonblocking(true)); + socket1.set_nonblocking(true).unwrap(); + socket2.set_nonblocking(true).unwrap(); let socket1 = UdpSocket::from_std(socket1); let socket2 = UdpSocket::from_std(socket2); @@ -90,8 +91,8 @@ fn smoke_test_unconnected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { assert_would_block(socket1.peek_from(&mut buf)); assert_would_block(socket1.recv_from(&mut buf)); - socket1.send_to(DATA1, address2).unwrap(); - socket2.send_to(DATA2, address1).unwrap(); + checked_write!(socket1.send_to(DATA1, address2)); + checked_write!(socket2.send_to(DATA2, address1)); expect_events( &mut poll, @@ -102,25 +103,11 @@ fn smoke_test_unconnected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { ], ); - let (n, got_address1) = socket1.peek_from(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); - assert_eq!(got_address1, address2); + expect_read!(socket1.peek_from(&mut buf), DATA2, address2); + expect_read!(socket2.peek_from(&mut buf), DATA1, address1); - let (n, got_address2) = socket2.peek_from(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); - assert_eq!(got_address2, address1); - - let (n, got_address1) = socket1.recv_from(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); - assert_eq!(got_address1, address2); - - let (n, got_address2) = socket2.recv_from(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); - assert_eq!(got_address2, address1); + expect_read!(socket1.recv_from(&mut buf), DATA2, address2); + expect_read!(socket2.recv_from(&mut buf), DATA1, address1); assert!(socket1.take_error().unwrap().is_none()); assert!(socket2.take_error().unwrap().is_none()); @@ -186,8 +173,8 @@ fn connected_udp_socket_std() { // `std::net::UdpSocket`s are blocking by default, so make sure they are // in non-blocking mode before wrapping in a Mio equivalent. - assert_ok!(socket1.set_nonblocking(true)); - assert_ok!(socket2.set_nonblocking(true)); + socket1.set_nonblocking(true).unwrap(); + socket2.set_nonblocking(true).unwrap(); let socket1 = UdpSocket::from_std(socket1); let socket2 = UdpSocket::from_std(socket2); @@ -218,8 +205,8 @@ fn smoke_test_connected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { assert_would_block(socket1.peek(&mut buf)); assert_would_block(socket1.recv(&mut buf)); - socket1.send(DATA1).unwrap(); - socket2.send(DATA2).unwrap(); + checked_write!(socket1.send(DATA1)); + checked_write!(socket2.send(DATA2)); expect_events( &mut poll, @@ -231,21 +218,11 @@ fn smoke_test_connected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { ); let mut buf = [0; 20]; - let n = socket1.peek(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); - - let n = socket2.peek(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); + expect_read!(socket1.peek(&mut buf), DATA2); + expect_read!(socket2.peek(&mut buf), DATA1); - let n = socket1.recv(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); - - let n = socket2.recv(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); + expect_read!(socket1.recv(&mut buf), DATA2); + expect_read!(socket2.recv(&mut buf), DATA1); assert!(socket1.take_error().unwrap().is_none()); assert!(socket2.take_error().unwrap().is_none()); @@ -283,7 +260,7 @@ fn reconnect_udp_socket_sending() { vec![ExpectEvent::new(ID1, Interests::WRITABLE)], ); - socket1.send(DATA1).unwrap(); + checked_write!(socket1.send(DATA1)); expect_events( &mut poll, @@ -292,12 +269,10 @@ fn reconnect_udp_socket_sending() { ); let mut buf = [0; 20]; - let n = socket2.recv(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); + expect_read!(socket2.recv(&mut buf), DATA1); socket1.connect(address3).unwrap(); - socket1.send(DATA2).unwrap(); + checked_write!(socket1.send(DATA2)); expect_events( &mut poll, @@ -305,9 +280,7 @@ fn reconnect_udp_socket_sending() { vec![ExpectEvent::new(ID3, Interests::READABLE)], ); - let n = socket3.recv(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); + expect_read!(socket3.recv(&mut buf), DATA2); assert!(socket1.take_error().unwrap().is_none()); assert!(socket2.take_error().unwrap().is_none()); @@ -350,7 +323,7 @@ fn reconnect_udp_socket_receiving() { ], ); - socket2.send(DATA1).unwrap(); + checked_write!(socket2.send(DATA1)); expect_events( &mut poll, @@ -359,12 +332,10 @@ fn reconnect_udp_socket_receiving() { ); let mut buf = [0; 20]; - let n = socket1.recv(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); + expect_read!(socket1.recv(&mut buf), DATA1); socket1.connect(address3).unwrap(); - socket3.send(DATA2).unwrap(); + checked_write!(socket3.send(DATA2)); expect_events( &mut poll, @@ -374,13 +345,11 @@ fn reconnect_udp_socket_receiving() { // Read only a part of the data. let max = 4; - let n = socket1.recv(&mut buf[..max]).unwrap(); - assert_eq!(n, max); - assert_eq!(buf[..max], DATA2[..max]); + expect_read!(socket1.recv(&mut buf[..max]), &DATA2[..max]); // Now connect back to socket 2, dropping the unread data. socket1.connect(address2).unwrap(); - socket2.send(DATA2).unwrap(); + checked_write!(socket2.send(DATA2)); expect_events( &mut poll, @@ -388,9 +357,7 @@ fn reconnect_udp_socket_receiving() { vec![ExpectEvent::new(ID1, Interests::READABLE)], ); - let n = socket1.recv(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); + expect_read!(socket1.recv(&mut buf), DATA2); assert!(socket1.take_error().unwrap().is_none()); assert!(socket2.take_error().unwrap().is_none()); @@ -423,7 +390,7 @@ fn unconnected_udp_socket_connected_methods() { assert_error(socket1.send(DATA1), "address required"); // Now send some actual data. - socket1.send_to(DATA1, address2).unwrap(); + checked_write!(socket1.send_to(DATA1, address2)); expect_events( &mut poll, @@ -434,13 +401,8 @@ fn unconnected_udp_socket_connected_methods() { // Receive methods don't require the socket to be connected, you just won't // know the sender. let mut buf = [0; 20]; - let n = socket2.peek(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); - - let n = socket2.recv(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); + expect_read!(socket2.peek(&mut buf), DATA1); + expect_read!(socket2.recv(&mut buf), DATA1); assert!(socket1.take_error().unwrap().is_none()); assert!(socket2.take_error().unwrap().is_none()); @@ -488,7 +450,7 @@ fn connected_udp_socket_unconnected_methods() { #[cfg(not(any(target_os = "android", target_os = "linux", target_os = "windows")))] assert_error(socket1.send_to(DATA1, address3), "already connected"); - socket2.send_to(DATA2, address3).unwrap(); + checked_write!(socket2.send_to(DATA2, address3)); expect_events( &mut poll, @@ -497,15 +459,8 @@ fn connected_udp_socket_unconnected_methods() { ); let mut buf = [0; 20]; - let (n, got_address1) = socket3.peek_from(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); - assert_eq!(got_address1, address2); - - let (n, got_address2) = socket3.recv_from(&mut buf).unwrap(); - assert_eq!(n, DATA2.len()); - assert_eq!(buf[..n], DATA2[..]); - assert_eq!(got_address2, address2); + expect_read!(socket3.peek_from(&mut buf), DATA2, address2); + expect_read!(socket3.recv_from(&mut buf), DATA2, address2); assert!(socket1.take_error().unwrap().is_none()); assert!(socket2.take_error().unwrap().is_none()); @@ -574,9 +529,7 @@ fn udp_socket_reregister() { ); let mut buf = [0; 20]; - let (n, _) = socket.recv_from(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); + expect_read!(socket.recv_from(&mut buf), DATA1, __anywhere); thread_handle.join().expect("unable to join thread"); } @@ -604,9 +557,7 @@ fn udp_socket_no_events_after_deregister() { // But we do expect a packet to be send. let mut buf = [0; 20]; - let (n, _) = socket.recv_from(&mut buf).unwrap(); - assert_eq!(n, DATA1.len()); - assert_eq!(buf[..n], DATA1[..]); + expect_read!(socket.recv_from(&mut buf), DATA1, __anywhere); thread_handle.join().expect("unable to join thread"); } @@ -622,7 +573,7 @@ fn send_packets( let socket = net::UdpSocket::bind(any_local_address()).unwrap(); for _ in 0..n_packets { barrier.wait(); - assert_eq!(socket.send_to(DATA1, address).unwrap(), DATA1.len()); + checked_write!(socket.send_to(DATA1, address)); } }) } @@ -769,8 +720,7 @@ pub fn udp_socket_discard() { let mut poll = Poll::new().unwrap(); - let r = udp_outside.send(b"hello world"); - assert!(r.is_ok() || r.unwrap_err().kind() == ErrorKind::WouldBlock); + checked_write!(udp_outside.send(b"hello world")); poll.registry() .register(&rx, LISTENER, Interests::READABLE) diff --git a/tests/unix_datagram.rs b/tests/unix_datagram.rs index 57419340f..a2c242bd5 100644 --- a/tests/unix_datagram.rs +++ b/tests/unix_datagram.rs @@ -2,24 +2,20 @@ #[macro_use] mod util; -use log::warn; use mio::net::UnixDatagram; use mio::{Interests, Token}; use std::io; use std::net::Shutdown; use std::os::unix::io::AsRawFd; use std::os::unix::net; -use std::time::Duration; use tempdir::TempDir; use util::{ assert_send, assert_sync, assert_would_block, expect_events, expect_no_events, init_with_poll, - ExpectEvent, + ExpectEvent, Readiness, }; const DATA1: &[u8] = b"Hello same host!"; const DATA2: &[u8] = b"Why hello mio!"; -const DATA1_LEN: usize = DATA1.len(); -const DATA2_LEN: usize = DATA2.len(); const DEFAULT_BUF_SIZE: usize = 64; const TEST_DIR: &str = "mio_unix_datagram_tests"; const TOKEN_1: Token = Token(0); @@ -34,40 +30,40 @@ fn is_send_and_sync() { #[test] fn unix_datagram_smoke_unconnected() { - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(UnixDatagram::bind(&path1)); - let datagram2 = assert_ok!(UnixDatagram::bind(&path2)); + let datagram1 = UnixDatagram::bind(&path1).unwrap(); + let datagram2 = UnixDatagram::bind(&path2).unwrap(); smoke_test_unconnected(datagram1, datagram2); } #[test] fn unix_datagram_smoke_connected() { - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(UnixDatagram::bind(&path1)); - let datagram2 = assert_ok!(UnixDatagram::bind(&path2)); + let datagram1 = UnixDatagram::bind(&path1).unwrap(); + let datagram2 = UnixDatagram::bind(&path2).unwrap(); - assert_ok!(datagram1.connect(&path2)); - assert_ok!(datagram2.connect(&path1)); + datagram1.connect(&path2).unwrap(); + datagram2.connect(&path1).unwrap(); smoke_test_connected(datagram1, datagram2); } #[test] fn unix_datagram_smoke_unconnected_from_std() { - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(net::UnixDatagram::bind(&path1)); - let datagram2 = assert_ok!(net::UnixDatagram::bind(&path2)); + let datagram1 = net::UnixDatagram::bind(&path1).unwrap(); + let datagram2 = net::UnixDatagram::bind(&path2).unwrap(); - assert_ok!(datagram1.set_nonblocking(true)); - assert_ok!(datagram2.set_nonblocking(true)); + datagram1.set_nonblocking(true).unwrap(); + datagram2.set_nonblocking(true).unwrap(); let datagram1 = UnixDatagram::from_std(datagram1); let datagram2 = UnixDatagram::from_std(datagram2); @@ -76,18 +72,18 @@ fn unix_datagram_smoke_unconnected_from_std() { #[test] fn unix_datagram_smoke_connected_from_std() { - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(net::UnixDatagram::bind(&path1)); - let datagram2 = assert_ok!(net::UnixDatagram::bind(&path2)); + let datagram1 = net::UnixDatagram::bind(&path1).unwrap(); + let datagram2 = net::UnixDatagram::bind(&path2).unwrap(); - assert_ok!(datagram1.connect(&path2)); - assert_ok!(datagram2.connect(&path1)); + datagram1.connect(&path2).unwrap(); + datagram2.connect(&path1).unwrap(); - assert_ok!(datagram1.set_nonblocking(true)); - assert_ok!(datagram2.set_nonblocking(true)); + datagram1.set_nonblocking(true).unwrap(); + datagram2.set_nonblocking(true).unwrap(); let datagram1 = UnixDatagram::from_std(datagram1); let datagram2 = UnixDatagram::from_std(datagram2); @@ -96,42 +92,50 @@ fn unix_datagram_smoke_connected_from_std() { #[test] fn unix_datagram_connect() { - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(UnixDatagram::bind(&path1)); - let datagram1_local = assert_ok!(datagram1.local_addr()); - let datagram2 = assert_ok!(UnixDatagram::bind(&path2)); - let datagram2_local = assert_ok!(datagram2.local_addr()); - - assert_ok!(datagram1.connect( - datagram1_local - .as_pathname() - .expect("failed to get pathname") - )); - assert_ok!(datagram2.connect( - datagram2_local - .as_pathname() - .expect("failed to get pathname") - )); + let datagram1 = UnixDatagram::bind(&path1).unwrap(); + let datagram1_local = datagram1.local_addr().unwrap(); + let datagram2 = UnixDatagram::bind(&path2).unwrap(); + let datagram2_local = datagram2.local_addr().unwrap(); + + datagram1 + .connect( + datagram1_local + .as_pathname() + .expect("failed to get pathname"), + ) + .unwrap(); + datagram2 + .connect( + datagram2_local + .as_pathname() + .expect("failed to get pathname"), + ) + .unwrap(); } #[test] fn unix_datagram_pair() { let (mut poll, mut events) = init_with_poll(); - let (datagram1, datagram2) = assert_ok!(UnixDatagram::pair()); - assert_ok!(poll.registry().register( - &datagram1, - TOKEN_1, - Interests::READABLE | Interests::WRITABLE - )); - assert_ok!(poll.registry().register( - &datagram2, - TOKEN_2, - Interests::READABLE | Interests::WRITABLE - )); + let (datagram1, datagram2) = UnixDatagram::pair().unwrap(); + poll.registry() + .register( + &datagram1, + TOKEN_1, + Interests::READABLE | Interests::WRITABLE, + ) + .unwrap(); + poll.registry() + .register( + &datagram2, + TOKEN_2, + Interests::READABLE | Interests::WRITABLE, + ) + .unwrap(); expect_events( &mut poll, &mut events, @@ -145,10 +149,8 @@ fn unix_datagram_pair() { assert_would_block(datagram1.recv(&mut buf)); assert_would_block(datagram2.recv(&mut buf)); - let wrote1 = assert_ok!(datagram1.send(&DATA1)); - assert_eq!(wrote1, DATA1_LEN); - let wrote2 = assert_ok!(datagram2.send(&DATA2)); - assert_eq!(wrote2, DATA2_LEN); + checked_write!(datagram1.send(DATA1)); + checked_write!(datagram2.send(DATA2)); expect_events( &mut poll, &mut events, @@ -158,46 +160,40 @@ fn unix_datagram_pair() { ], ); - let read = assert_ok!(datagram2.recv(&mut buf)); - assert_would_block(datagram2.recv(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(&buf[..read], DATA1); - assert_eq!(read, wrote1, "unequal reads and writes"); - - let read = assert_ok!(datagram1.recv(&mut buf)); - assert_eq!(read, DATA2_LEN); - assert_eq!(&buf[..read], DATA2); - assert_eq!(read, wrote2, "unequal reads and writes"); + expect_read!(datagram1.recv(&mut buf), DATA2); + expect_read!(datagram2.recv(&mut buf), DATA1); - assert!(assert_ok!(datagram1.take_error()).is_none()); - assert!(assert_ok!(datagram2.take_error()).is_none()); + assert!(datagram1.take_error().unwrap().is_none()); + assert!(datagram2.take_error().unwrap().is_none()); } #[test] fn unix_datagram_try_clone() { let (mut poll, mut events) = init_with_poll(); - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(UnixDatagram::bind(&path1)); - let datagram2 = assert_ok!(datagram1.try_clone()); + let datagram1 = UnixDatagram::bind(&path1).unwrap(); + let datagram2 = datagram1.try_clone().unwrap(); assert_ne!(datagram1.as_raw_fd(), datagram2.as_raw_fd()); - let datagram3 = assert_ok!(UnixDatagram::bind(&path2)); - assert_ok!(datagram3.connect(&path1)); - - assert_ok!(poll - .registry() - .register(&datagram1, TOKEN_1, Interests::READABLE)); - assert_ok!(poll - .registry() - .register(&datagram2, TOKEN_2, Interests::READABLE)); - assert_ok!(poll.registry().register( - &datagram3, - TOKEN_3, - Interests::READABLE.add(Interests::WRITABLE) - )); + let datagram3 = UnixDatagram::bind(&path2).unwrap(); + datagram3.connect(&path1).unwrap(); + + poll.registry() + .register(&datagram1, TOKEN_1, Interests::READABLE) + .unwrap(); + poll.registry() + .register(&datagram2, TOKEN_2, Interests::READABLE) + .unwrap(); + poll.registry() + .register( + &datagram3, + TOKEN_3, + Interests::READABLE.add(Interests::WRITABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, @@ -209,7 +205,7 @@ fn unix_datagram_try_clone() { assert_would_block(datagram2.recv_from(&mut buf)); assert_would_block(datagram3.recv_from(&mut buf)); - assert_ok!(datagram3.send(DATA1)); + checked_write!(datagram3.send(DATA1)); expect_events( &mut poll, &mut events, @@ -219,98 +215,103 @@ fn unix_datagram_try_clone() { ], ); - let (read, from_addr1) = assert_ok!(datagram1.recv_from(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(buf[..read], DATA1[..]); - assert_eq!( - from_addr1.as_pathname().expect("failed to get pathname"), - path2 - ); + expect_read!(datagram1.recv_from(&mut buf), DATA1, path: path2); assert_would_block(datagram2.recv_from(&mut buf)); - assert!(assert_ok!(datagram1.take_error()).is_none()); - assert!(assert_ok!(datagram2.take_error()).is_none()); - assert!(assert_ok!(datagram3.take_error()).is_none()); + assert!(datagram1.take_error().unwrap().is_none()); + assert!(datagram2.take_error().unwrap().is_none()); + assert!(datagram3.take_error().unwrap().is_none()); } #[test] fn unix_datagram_shutdown() { let (mut poll, mut events) = init_with_poll(); - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(UnixDatagram::bind(&path1)); - let datagram2 = assert_ok!(UnixDatagram::bind(&path2)); - - assert_ok!(poll.registry().register( - &datagram1, - TOKEN_1, - Interests::WRITABLE.add(Interests::READABLE) - )); - assert_ok!(poll.registry().register( - &datagram2, - TOKEN_2, - Interests::WRITABLE.add(Interests::READABLE) - )); - - assert_ok!(datagram1.connect(&path2)); + let datagram1 = UnixDatagram::bind(&path1).unwrap(); + let datagram2 = UnixDatagram::bind(&path2).unwrap(); + + poll.registry() + .register( + &datagram1, + TOKEN_1, + Interests::WRITABLE.add(Interests::READABLE), + ) + .unwrap(); + poll.registry() + .register( + &datagram2, + TOKEN_2, + Interests::WRITABLE.add(Interests::READABLE), + ) + .unwrap(); + + datagram1.connect(&path2).unwrap(); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::WRITABLE)], ); - let wrote = assert_ok!(datagram1.send(DATA1)); - assert_eq!(wrote, DATA1_LEN); + checked_write!(datagram1.send(DATA1)); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_2, Interests::READABLE)], ); - assert_ok!(datagram1.shutdown(Shutdown::Read)); - expect_readiness!(poll, events, is_read_closed); + datagram1.shutdown(Shutdown::Read).unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(TOKEN_1, Readiness::READ_CLOSED)], + ); - assert_ok!(datagram1.shutdown(Shutdown::Write)); - expect_readiness!(poll, events, is_write_closed); + datagram1.shutdown(Shutdown::Write).unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(TOKEN_1, Readiness::WRITE_CLOSED)], + ); - let err = assert_err!(datagram1.send(DATA2)); + let err = datagram1.send(DATA2).unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); - assert!(assert_ok!(datagram1.take_error()).is_none()); + assert!(datagram1.take_error().unwrap().is_none()); } #[test] fn unix_datagram_register() { let (mut poll, mut events) = init_with_poll(); - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path = dir.path().join("any"); - let datagram = assert_ok!(UnixDatagram::bind(path)); - assert_ok!(poll - .registry() - .register(&datagram, TOKEN_1, Interests::READABLE)); + let datagram = UnixDatagram::bind(path).unwrap(); + poll.registry() + .register(&datagram, TOKEN_1, Interests::READABLE) + .unwrap(); expect_no_events(&mut poll, &mut events); } #[test] fn unix_datagram_reregister() { let (mut poll, mut events) = init_with_poll(); - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(UnixDatagram::bind(&path1)); - assert_ok!(poll - .registry() - .register(&datagram1, TOKEN_1, Interests::READABLE)); + let datagram1 = UnixDatagram::bind(&path1).unwrap(); + poll.registry() + .register(&datagram1, TOKEN_1, Interests::READABLE) + .unwrap(); - let datagram2 = assert_ok!(UnixDatagram::bind(&path2)); - assert_ok!(datagram2.connect(&path1)); - assert_ok!(poll - .registry() - .reregister(&datagram1, TOKEN_1, Interests::WRITABLE)); + let datagram2 = UnixDatagram::bind(&path2).unwrap(); + datagram2.connect(&path1).unwrap(); + poll.registry() + .reregister(&datagram1, TOKEN_1, Interests::WRITABLE) + .unwrap(); expect_events( &mut poll, &mut events, @@ -321,39 +322,43 @@ fn unix_datagram_reregister() { #[test] fn unix_datagram_deregister() { let (mut poll, mut events) = init_with_poll(); - let dir = assert_ok!(TempDir::new(TEST_DIR)); + let dir = TempDir::new(TEST_DIR).unwrap(); let path1 = dir.path().join("one"); let path2 = dir.path().join("two"); - let datagram1 = assert_ok!(UnixDatagram::bind(&path1)); - assert_ok!(poll - .registry() - .register(&datagram1, TOKEN_1, Interests::WRITABLE)); + let datagram1 = UnixDatagram::bind(&path1).unwrap(); + poll.registry() + .register(&datagram1, TOKEN_1, Interests::WRITABLE) + .unwrap(); - let datagram2 = assert_ok!(UnixDatagram::bind(&path2)); - assert_ok!(datagram2.connect(&path1)); - assert_ok!(poll.registry().deregister(&datagram1)); + let datagram2 = UnixDatagram::bind(&path2).unwrap(); + datagram2.connect(&path1).unwrap(); + poll.registry().deregister(&datagram1).unwrap(); expect_no_events(&mut poll, &mut events); } fn smoke_test_unconnected(datagram1: UnixDatagram, datagram2: UnixDatagram) { let (mut poll, mut events) = init_with_poll(); - let addr1 = assert_ok!(datagram1.local_addr()); - let addr2 = assert_ok!(datagram2.local_addr()); + let addr1 = datagram1.local_addr().unwrap(); + let addr2 = datagram2.local_addr().unwrap(); let path1 = addr1.as_pathname().expect("failed to get pathname"); let path2 = addr2.as_pathname().expect("failed to get pathname"); - assert_ok!(poll.registry().register( - &datagram1, - TOKEN_1, - Interests::READABLE.add(Interests::WRITABLE) - )); - assert_ok!(poll.registry().register( - &datagram2, - TOKEN_2, - Interests::READABLE.add(Interests::WRITABLE) - )); + poll.registry() + .register( + &datagram1, + TOKEN_1, + Interests::READABLE.add(Interests::WRITABLE), + ) + .unwrap(); + poll.registry() + .register( + &datagram2, + TOKEN_2, + Interests::READABLE.add(Interests::WRITABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, @@ -367,8 +372,8 @@ fn smoke_test_unconnected(datagram1: UnixDatagram, datagram2: UnixDatagram) { assert_would_block(datagram1.recv_from(&mut buf)); assert_would_block(datagram2.recv_from(&mut buf)); - assert_ok!(datagram1.send_to(DATA1, path2)); - assert_ok!(datagram2.send_to(DATA2, path1)); + checked_write!(datagram1.send_to(DATA1, path2)); + checked_write!(datagram2.send_to(DATA2, path1)); expect_events( &mut poll, &mut events, @@ -378,33 +383,20 @@ fn smoke_test_unconnected(datagram1: UnixDatagram, datagram2: UnixDatagram) { ], ); - let (read, from_addr1) = assert_ok!(datagram1.recv_from(&mut buf)); - assert_eq!(read, DATA2_LEN); - assert_eq!(buf[..read], DATA2[..]); - assert_eq!( - from_addr1.as_pathname().expect("failed to get pathname"), - path2 - ); + expect_read!(datagram1.recv_from(&mut buf), DATA2, path: path2); + expect_read!(datagram2.recv_from(&mut buf), DATA1, path: path1); - let (read, from_addr2) = assert_ok!(datagram2.recv_from(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(buf[..read], DATA1[..]); - assert_eq!( - from_addr2.as_pathname().expect("failed to get pathname"), - path1 - ); - - assert!(assert_ok!(datagram1.take_error()).is_none()); - assert!(assert_ok!(datagram2.take_error()).is_none()); + assert!(datagram1.take_error().unwrap().is_none()); + assert!(datagram2.take_error().unwrap().is_none()); } fn smoke_test_connected(datagram1: UnixDatagram, datagram2: UnixDatagram) { let (mut poll, mut events) = init_with_poll(); - let local_addr1 = assert_ok!(datagram1.local_addr()); - let peer_addr1 = assert_ok!(datagram1.peer_addr()); - let local_addr2 = assert_ok!(datagram2.local_addr()); - let peer_addr2 = assert_ok!(datagram2.peer_addr()); + let local_addr1 = datagram1.local_addr().unwrap(); + let peer_addr1 = datagram1.peer_addr().unwrap(); + let local_addr2 = datagram2.local_addr().unwrap(); + let peer_addr2 = datagram2.peer_addr().unwrap(); assert_eq!( local_addr1.as_pathname().expect("failed to get pathname"), peer_addr2.as_pathname().expect("failed to get pathname") @@ -414,16 +406,20 @@ fn smoke_test_connected(datagram1: UnixDatagram, datagram2: UnixDatagram) { peer_addr1.as_pathname().expect("failed to get pathname") ); - assert_ok!(poll.registry().register( - &datagram1, - TOKEN_1, - Interests::READABLE.add(Interests::WRITABLE) - )); - assert_ok!(poll.registry().register( - &datagram2, - TOKEN_2, - Interests::READABLE.add(Interests::WRITABLE) - )); + poll.registry() + .register( + &datagram1, + TOKEN_1, + Interests::READABLE.add(Interests::WRITABLE), + ) + .unwrap(); + poll.registry() + .register( + &datagram2, + TOKEN_2, + Interests::READABLE.add(Interests::WRITABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, @@ -437,8 +433,8 @@ fn smoke_test_connected(datagram1: UnixDatagram, datagram2: UnixDatagram) { assert_would_block(datagram1.recv(&mut buf)); assert_would_block(datagram2.recv(&mut buf)); - assert_ok!(datagram1.send(DATA1)); - assert_ok!(datagram2.send(DATA2)); + checked_write!(datagram1.send(DATA1)); + checked_write!(datagram2.send(DATA2)); expect_events( &mut poll, &mut events, @@ -448,14 +444,9 @@ fn smoke_test_connected(datagram1: UnixDatagram, datagram2: UnixDatagram) { ], ); - let read = assert_ok!(datagram1.recv(&mut buf)); - assert_eq!(read, DATA2_LEN); - assert_eq!(buf[..read], DATA2[..]); - - let read = assert_ok!(datagram2.recv(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(buf[..read], DATA1[..]); + expect_read!(datagram1.recv(&mut buf), DATA2); + expect_read!(datagram2.recv(&mut buf), DATA1); - assert!(assert_ok!(datagram1.take_error()).is_none()); - assert!(assert_ok!(datagram2.take_error()).is_none()); + assert!(datagram1.take_error().unwrap().is_none()); + assert!(datagram2.take_error().unwrap().is_none()); } diff --git a/tests/unix_listener.rs b/tests/unix_listener.rs index 3f3a2c1b6..5da4ff251 100644 --- a/tests/unix_listener.rs +++ b/tests/unix_listener.rs @@ -35,10 +35,10 @@ fn unix_listener_smoke() { #[test] fn unix_listener_from_std() { smoke_test(|path| { - let listener = assert_ok!(net::UnixListener::bind(path)); + let listener = net::UnixListener::bind(path).unwrap(); // `std::os::unix::net::UnixStream`s are blocking by default, so make sure // it is in non-blocking mode before wrapping in a Mio equivalent. - assert_ok!(listener.set_nonblocking(true)); + listener.set_nonblocking(true).unwrap(); Ok(UnixListener::from_std(listener)) }) } @@ -47,20 +47,20 @@ fn unix_listener_from_std() { fn unix_listener_try_clone_same_poll() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(3)); - let dir = assert_ok!(TempDir::new("unix_listener")); + let dir = TempDir::new("unix_listener").unwrap(); let path = dir.path().join("any"); - let listener1 = assert_ok!(UnixListener::bind(&path)); - let listener2 = assert_ok!(listener1.try_clone()); + let listener1 = UnixListener::bind(&path).unwrap(); + let listener2 = listener1.try_clone().unwrap(); assert_ne!(listener1.as_raw_fd(), listener2.as_raw_fd()); let handle_1 = open_connections(path.clone(), 1, barrier.clone()); - assert_ok!(poll - .registry() - .register(&listener1, TOKEN_1, Interests::READABLE)); - assert_ok!(poll - .registry() - .register(&listener2, TOKEN_2, Interests::READABLE)); + poll.registry() + .register(&listener1, TOKEN_1, Interests::READABLE) + .unwrap(); + poll.registry() + .register(&listener2, TOKEN_2, Interests::READABLE) + .unwrap(); expect_events( &mut poll, &mut events, @@ -70,7 +70,7 @@ fn unix_listener_try_clone_same_poll() { ], ); - assert_ok!(listener1.accept()); + listener1.accept().unwrap(); let handle_2 = open_connections(path.clone(), 1, barrier.clone()); expect_events( @@ -82,38 +82,40 @@ fn unix_listener_try_clone_same_poll() { ], ); - assert_ok!(listener2.accept()); + listener2.accept().unwrap(); assert_would_block(listener1.accept()); assert_would_block(listener2.accept()); - assert!(assert_ok!(listener1.take_error()).is_none()); - assert!(assert_ok!(listener2.take_error()).is_none()); + assert!(listener1.take_error().unwrap().is_none()); + assert!(listener2.take_error().unwrap().is_none()); barrier.wait(); - assert_ok!(handle_1.join()); - assert_ok!(handle_2.join()); + handle_1.join().unwrap(); + handle_2.join().unwrap(); } #[test] fn unix_listener_try_clone_different_poll() { let (mut poll1, mut events) = init_with_poll(); - let mut poll2 = assert_ok!(Poll::new()); + let mut poll2 = Poll::new().unwrap(); let barrier = Arc::new(Barrier::new(3)); - let dir = assert_ok!(TempDir::new("unix_listener")); + let dir = TempDir::new("unix_listener").unwrap(); let path = dir.path().join("any"); - let listener1 = assert_ok!(UnixListener::bind(&path)); - let listener2 = assert_ok!(listener1.try_clone()); + let listener1 = UnixListener::bind(&path).unwrap(); + let listener2 = listener1.try_clone().unwrap(); assert_ne!(listener1.as_raw_fd(), listener2.as_raw_fd()); let handle_1 = open_connections(path.clone(), 1, barrier.clone()); - assert_ok!(poll1 + poll1 .registry() - .register(&listener1, TOKEN_1, Interests::READABLE)); - assert_ok!(poll2 + .register(&listener1, TOKEN_1, Interests::READABLE) + .unwrap(); + poll2 .registry() - .register(&listener2, TOKEN_2, Interests::READABLE)); + .register(&listener2, TOKEN_2, Interests::READABLE) + .unwrap(); expect_events( &mut poll1, &mut events, @@ -125,7 +127,7 @@ fn unix_listener_try_clone_different_poll() { vec![ExpectEvent::new(TOKEN_2, Interests::READABLE)], ); - assert_ok!(listener1.accept()); + listener1.accept().unwrap(); let handle_2 = open_connections(path.clone(), 1, barrier.clone()); expect_events( @@ -139,32 +141,34 @@ fn unix_listener_try_clone_different_poll() { vec![ExpectEvent::new(TOKEN_2, Interests::READABLE)], ); - assert_ok!(listener2.accept()); + listener2.accept().unwrap(); assert_would_block(listener1.accept()); assert_would_block(listener2.accept()); - assert!(assert_ok!(listener1.take_error()).is_none()); - assert!(assert_ok!(listener2.take_error()).is_none()); + assert!(listener1.take_error().unwrap().is_none()); + assert!(listener2.take_error().unwrap().is_none()); barrier.wait(); - assert_ok!(handle_1.join()); - assert_ok!(handle_2.join()); + handle_1.join().unwrap(); + handle_2.join().unwrap(); } #[test] fn unix_listener_local_addr() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); - let dir = assert_ok!(TempDir::new("unix_listener")); + let dir = TempDir::new("unix_listener").unwrap(); let path = dir.path().join("any"); - let listener = assert_ok!(UnixListener::bind(&path)); - assert_ok!(poll.registry().register( - &listener, - TOKEN_1, - Interests::WRITABLE.add(Interests::READABLE) - )); + let listener = UnixListener::bind(&path).unwrap(); + poll.registry() + .register( + &listener, + TOKEN_1, + Interests::WRITABLE.add(Interests::READABLE), + ) + .unwrap(); let handle = open_connections(path.clone(), 1, barrier.clone()); expect_events( @@ -173,26 +177,23 @@ fn unix_listener_local_addr() { vec![ExpectEvent::new(TOKEN_1, Interests::READABLE)], ); - let (stream, expected_addr) = assert_ok!(listener.accept()); - assert_eq!( - assert_ok!(stream.local_addr()).as_pathname().unwrap(), - &path - ); + let (stream, expected_addr) = listener.accept().unwrap(); + assert_eq!(stream.local_addr().unwrap().as_pathname().unwrap(), &path); assert!(expected_addr.as_pathname().is_none()); barrier.wait(); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] fn unix_listener_register() { let (mut poll, mut events) = init_with_poll(); - let dir = assert_ok!(TempDir::new("unix_listener")); + let dir = TempDir::new("unix_listener").unwrap(); - let listener = assert_ok!(UnixListener::bind(dir.path().join("any"))); - assert_ok!(poll - .registry() - .register(&listener, TOKEN_1, Interests::READABLE)); + let listener = UnixListener::bind(dir.path().join("any")).unwrap(); + poll.registry() + .register(&listener, TOKEN_1, Interests::READABLE) + .unwrap(); expect_no_events(&mut poll, &mut events) } @@ -200,20 +201,20 @@ fn unix_listener_register() { fn unix_listener_reregister() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); - let dir = assert_ok!(TempDir::new("unix_listener")); + let dir = TempDir::new("unix_listener").unwrap(); let path = dir.path().join("any"); - let listener = assert_ok!(UnixListener::bind(&path)); - assert_ok!(poll - .registry() - .register(&listener, TOKEN_1, Interests::WRITABLE)); + let listener = UnixListener::bind(&path).unwrap(); + poll.registry() + .register(&listener, TOKEN_1, Interests::WRITABLE) + .unwrap(); let handle = open_connections(path.clone(), 1, barrier.clone()); expect_no_events(&mut poll, &mut events); - assert_ok!(poll - .registry() - .reregister(&listener, TOKEN_1, Interests::READABLE)); + poll.registry() + .reregister(&listener, TOKEN_1, Interests::READABLE) + .unwrap(); expect_events( &mut poll, &mut events, @@ -221,28 +222,28 @@ fn unix_listener_reregister() { ); barrier.wait(); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] fn unix_listener_deregister() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); - let dir = assert_ok!(TempDir::new("unix_listener")); + let dir = TempDir::new("unix_listener").unwrap(); let path = dir.path().join("any"); - let listener = assert_ok!(UnixListener::bind(&path)); - assert_ok!(poll - .registry() - .register(&listener, TOKEN_1, Interests::READABLE)); + let listener = UnixListener::bind(&path).unwrap(); + poll.registry() + .register(&listener, TOKEN_1, Interests::READABLE) + .unwrap(); let handle = open_connections(path.clone(), 1, barrier.clone()); - assert_ok!(poll.registry().deregister(&listener)); + poll.registry().deregister(&listener).unwrap(); expect_no_events(&mut poll, &mut events); barrier.wait(); - assert_ok!(handle.join()); + handle.join().unwrap(); } fn smoke_test(new_listener: F) @@ -251,15 +252,17 @@ where { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); - let dir = assert_ok!(TempDir::new("unix_listener")); + let dir = TempDir::new("unix_listener").unwrap(); let path = dir.path().join("any"); - let listener = assert_ok!(new_listener(&path)); - assert_ok!(poll.registry().register( - &listener, - TOKEN_1, - Interests::WRITABLE.add(Interests::READABLE) - )); + let listener = new_listener(&path).unwrap(); + poll.registry() + .register( + &listener, + TOKEN_1, + Interests::WRITABLE.add(Interests::READABLE), + ) + .unwrap(); expect_no_events(&mut poll, &mut events); let handle = open_connections(path.clone(), 1, barrier.clone()); @@ -269,16 +272,16 @@ where vec![ExpectEvent::new(TOKEN_1, Interests::READABLE)], ); - let (mut stream, _) = assert_ok!(listener.accept()); + let (mut stream, _) = listener.accept().unwrap(); let mut buf = [0; DEFAULT_BUF_SIZE]; assert_would_block(stream.read(&mut buf)); assert_would_block(listener.accept()); - assert!(assert_ok!(listener.take_error()).is_none()); + assert!(listener.take_error().unwrap().is_none()); barrier.wait(); - assert_ok!(handle.join()); + handle.join().unwrap(); } fn open_connections( @@ -288,7 +291,7 @@ fn open_connections( ) -> thread::JoinHandle<()> { thread::spawn(move || { for _ in 0..n_connections { - let conn = assert_ok!(net::UnixStream::connect(path.clone())); + let conn = net::UnixStream::connect(path.clone()).unwrap(); barrier.wait(); drop(conn); } diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 74227eda1..a114dcce2 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -1,8 +1,8 @@ #![cfg(unix)] + #[macro_use] mod util; -use log::warn; use mio::net::UnixStream; use mio::{Interests, Token}; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; @@ -12,11 +12,10 @@ use std::path::Path; use std::sync::mpsc::channel; use std::sync::{Arc, Barrier}; use std::thread; -use std::time::Duration; use tempdir::TempDir; use util::{ assert_send, assert_sync, assert_would_block, expect_events, expect_no_events, init_with_poll, - ExpectEvent, TryRead, TryWrite, + ExpectEvent, Readiness, }; const DATA1: &[u8] = b"Hello same host!"; @@ -43,24 +42,22 @@ fn unix_stream_smoke() { fn unix_stream_connect() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); - let dir = assert_ok!(TempDir::new("unix")); + let dir = TempDir::new("unix").unwrap(); let path = dir.path().join("any"); - let listener = assert_ok!(net::UnixListener::bind(path.clone())); - let stream = assert_ok!(UnixStream::connect(path)); + let listener = net::UnixListener::bind(path.clone()).unwrap(); + let stream = UnixStream::connect(path).unwrap(); let barrier_clone = barrier.clone(); let handle = thread::spawn(move || { - let (stream, _) = assert_ok!(listener.accept()); + let (stream, _) = listener.accept().unwrap(); barrier_clone.wait(); drop(stream); }); - assert_ok!(poll.registry().register( - &stream, - TOKEN_1, - Interests::READABLE | Interests::WRITABLE - )); + poll.registry() + .register(&stream, TOKEN_1, Interests::READABLE | Interests::WRITABLE) + .unwrap(); expect_events( &mut poll, &mut events, @@ -74,16 +71,16 @@ fn unix_stream_connect() { vec![ExpectEvent::new(TOKEN_1, Interests::READABLE)], ); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] fn unix_stream_from_std() { smoke_test(|path| { - let local = assert_ok!(net::UnixStream::connect(path)); + let local = net::UnixStream::connect(path).unwrap(); // `std::os::unix::net::UnixStream`s are blocking by default, so make sure // it is in non-blocking mode before wrapping in a Mio equivalent. - assert_ok!(local.set_nonblocking(true)); + local.set_nonblocking(true).unwrap(); Ok(UnixStream::from_std(local)) }) } @@ -92,13 +89,13 @@ fn unix_stream_from_std() { fn unix_stream_pair() { let (mut poll, mut events) = init_with_poll(); - let (mut s1, mut s2) = assert_ok!(UnixStream::pair()); - assert_ok!(poll - .registry() - .register(&s1, TOKEN_1, Interests::READABLE | Interests::WRITABLE)); - assert_ok!(poll - .registry() - .register(&s2, TOKEN_2, Interests::READABLE | Interests::WRITABLE)); + let (mut s1, mut s2) = UnixStream::pair().unwrap(); + poll.registry() + .register(&s1, TOKEN_1, Interests::READABLE | Interests::WRITABLE) + .unwrap(); + poll.registry() + .register(&s2, TOKEN_2, Interests::READABLE | Interests::WRITABLE) + .unwrap(); expect_events( &mut poll, &mut events, @@ -108,24 +105,17 @@ fn unix_stream_pair() { let mut buf = [0; DEFAULT_BUF_SIZE]; assert_would_block(s1.read(&mut buf)); - let wrote = assert_ok!(s1.write(&DATA1)); - assert_eq!(wrote, DATA1_LEN); - assert_ok!(s1.flush()); + checked_write!(s1.write(&DATA1)); + s1.flush().unwrap(); + + expect_read!(s2.read(&mut buf), DATA1); + assert_would_block(s2.read(&mut buf)); + + checked_write!(s2.write(&DATA2)); + s2.flush().unwrap(); - let read = assert_ok!(s2.read(&mut buf)); + expect_read!(s1.read(&mut buf), DATA2); assert_would_block(s2.read(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(&buf[..read], DATA1); - assert_eq!(read, wrote, "unequal reads and writes"); - - let wrote = assert_ok!(s2.write(&DATA2)); - assert_eq!(wrote, DATA2_LEN); - assert_ok!(s2.flush()); - - let read = assert_ok!(s1.read(&mut buf)); - assert_eq!(read, DATA2_LEN); - assert_eq!(&buf[..read], DATA2); - assert_eq!(read, wrote, "unequal reads and writes"); } #[test] @@ -134,10 +124,10 @@ fn unix_stream_try_clone() { let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let mut stream_1 = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll - .registry() - .register(&stream_1, TOKEN_1, Interests::WRITABLE)); + let mut stream_1 = UnixStream::connect(path).unwrap(); + poll.registry() + .register(&stream_1, TOKEN_1, Interests::WRITABLE) + .unwrap(); expect_events( &mut poll, &mut events, @@ -145,31 +135,28 @@ fn unix_stream_try_clone() { ); let mut buf = [0; DEFAULT_BUF_SIZE]; - let wrote = assert_ok!(stream_1.write(&DATA1)); - assert_eq!(wrote, DATA1_LEN); + checked_write!(stream_1.write(&DATA1)); - let mut stream_2 = assert_ok!(stream_1.try_clone()); + let mut stream_2 = stream_1.try_clone().unwrap(); // When using `try_clone` the `TcpStream` needs to be deregistered! - assert_ok!(poll.registry().deregister(&stream_1)); + poll.registry().deregister(&stream_1).unwrap(); drop(stream_1); - assert_ok!(poll - .registry() - .register(&stream_2, TOKEN_2, Interests::READABLE)); + poll.registry() + .register(&stream_2, TOKEN_2, Interests::READABLE) + .unwrap(); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_2, Interests::READABLE)], ); - let read = assert_ok!(stream_2.read(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(&buf[..read], DATA1); + expect_read!(stream_2.read(&mut buf), DATA1); // Close the connection to allow the remote to shutdown drop(stream_2); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -177,17 +164,17 @@ fn unix_stream_peer_addr() { let (handle, expected_addr) = new_echo_listener(1); let expected_path = expected_addr.as_pathname().expect("failed to get pathname"); - let stream = assert_ok!(UnixStream::connect(expected_path)); + let stream = UnixStream::connect(expected_path).unwrap(); assert_eq!( - assert_ok!(stream.peer_addr()).as_pathname().unwrap(), + stream.peer_addr().unwrap().as_pathname().unwrap(), expected_path ); - assert!(assert_ok!(stream.local_addr()).as_pathname().is_none()); + assert!(stream.local_addr().unwrap().as_pathname().is_none()); // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -196,28 +183,33 @@ fn unix_stream_shutdown_read() { let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let mut stream = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll.registry().register( - &stream, - TOKEN_1, - Interests::READABLE.add(Interests::WRITABLE) - )); + let mut stream = UnixStream::connect(path).unwrap(); + poll.registry() + .register( + &stream, + TOKEN_1, + Interests::READABLE.add(Interests::WRITABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::WRITABLE)], ); - let wrote = assert_ok!(stream.write(DATA1)); - assert_eq!(wrote, DATA1_LEN); + checked_write!(stream.write(&DATA1)); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::READABLE)], ); - assert_ok!(stream.shutdown(Shutdown::Read)); - expect_readiness!(poll, events, is_read_closed); + stream.shutdown(Shutdown::Read).unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(TOKEN_1, Readiness::READ_CLOSED)], + ); // Shutting down the reading side is different on each platform. For example // on Linux based systems we can still read. @@ -231,13 +223,12 @@ fn unix_stream_shutdown_read() { ))] { let mut buf = [0; DEFAULT_BUF_SIZE]; - let read = assert_ok!(stream.read(&mut buf)); - assert_eq!(read, 0); + expect_read!(stream.read(&mut buf), &[]); } // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -246,27 +237,28 @@ fn unix_stream_shutdown_write() { let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let mut stream = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll.registry().register( - &stream, - TOKEN_1, - Interests::WRITABLE.add(Interests::READABLE) - )); + let mut stream = UnixStream::connect(path).unwrap(); + poll.registry() + .register( + &stream, + TOKEN_1, + Interests::WRITABLE.add(Interests::READABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::WRITABLE)], ); - let wrote = assert_ok!(stream.write(DATA1)); - assert_eq!(wrote, DATA1_LEN); + checked_write!(stream.write(&DATA1)); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::READABLE)], ); - assert_ok!(stream.shutdown(Shutdown::Write)); + stream.shutdown(Shutdown::Write).unwrap(); #[cfg(any( target_os = "dragonfly", @@ -276,20 +268,22 @@ fn unix_stream_shutdown_write() { target_os = "netbsd", target_os = "openbsd" ))] - expect_readiness!(poll, events, is_write_closed); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(TOKEN_1, Readiness::WRITE_CLOSED)], + ); - let err = assert_err!(stream.write(DATA2)); + let err = stream.write(DATA2).unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); // Read should be ok let mut buf = [0; DEFAULT_BUF_SIZE]; - let read = assert_ok!(stream.read(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(&buf[..read], DATA1); + expect_read!(stream.read(&mut buf), DATA1); // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -298,28 +292,33 @@ fn unix_stream_shutdown_both() { let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let mut stream = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll.registry().register( - &stream, - TOKEN_1, - Interests::WRITABLE.add(Interests::READABLE) - )); + let mut stream = UnixStream::connect(path).unwrap(); + poll.registry() + .register( + &stream, + TOKEN_1, + Interests::WRITABLE.add(Interests::READABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::WRITABLE)], ); - let wrote = assert_ok!(stream.write(DATA1)); - assert_eq!(wrote, DATA1_LEN); + checked_write!(stream.write(&DATA1)); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::READABLE)], ); - assert_ok!(stream.shutdown(Shutdown::Both)); - expect_readiness!(poll, events, is_write_closed); + stream.shutdown(Shutdown::Both).unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(TOKEN_1, Readiness::WRITE_CLOSED)], + ); // Shutting down the reading side is different on each platform. For example // on Linux based systems we can still read. @@ -333,11 +332,10 @@ fn unix_stream_shutdown_both() { ))] { let mut buf = [0; DEFAULT_BUF_SIZE]; - let read = assert_ok!(stream.read(&mut buf)); - assert_eq!(read, 0); + expect_read!(stream.read(&mut buf), &[]); } - let err = assert_err!(stream.write(DATA2)); + let err = stream.write(DATA2).unwrap_err(); #[cfg(unix)] assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); #[cfg(window)] @@ -345,7 +343,7 @@ fn unix_stream_shutdown_both() { // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -355,12 +353,14 @@ fn unix_stream_shutdown_listener_write() { let (handle, remote_addr) = new_noop_listener(1, barrier.clone()); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let stream = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll.registry().register( - &stream, - TOKEN_1, - Interests::READABLE.add(Interests::WRITABLE) - )); + let stream = UnixStream::connect(path).unwrap(); + poll.registry() + .register( + &stream, + TOKEN_1, + Interests::READABLE.add(Interests::WRITABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, @@ -368,10 +368,14 @@ fn unix_stream_shutdown_listener_write() { ); barrier.wait(); - expect_readiness!(poll, events, is_read_closed); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(TOKEN_1, Readiness::READ_CLOSED)], + ); barrier.wait(); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -380,15 +384,15 @@ fn unix_stream_register() { let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let stream = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll - .registry() - .register(&stream, TOKEN_1, Interests::READABLE)); + let stream = UnixStream::connect(path).unwrap(); + poll.registry() + .register(&stream, TOKEN_1, Interests::READABLE) + .unwrap(); expect_no_events(&mut poll, &mut events); // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -397,13 +401,13 @@ fn unix_stream_reregister() { let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let stream = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll - .registry() - .register(&stream, TOKEN_1, Interests::READABLE)); - assert_ok!(poll - .registry() - .reregister(&stream, TOKEN_1, Interests::WRITABLE)); + let stream = UnixStream::connect(path).unwrap(); + poll.registry() + .register(&stream, TOKEN_1, Interests::READABLE) + .unwrap(); + poll.registry() + .reregister(&stream, TOKEN_1, Interests::WRITABLE) + .unwrap(); expect_events( &mut poll, &mut events, @@ -412,7 +416,7 @@ fn unix_stream_reregister() { // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } #[test] @@ -421,16 +425,16 @@ fn unix_stream_deregister() { let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let stream = assert_ok!(UnixStream::connect(path)); - assert_ok!(poll - .registry() - .register(&stream, TOKEN_1, Interests::WRITABLE)); - assert_ok!(poll.registry().deregister(&stream)); + let stream = UnixStream::connect(path).unwrap(); + poll.registry() + .register(&stream, TOKEN_1, Interests::WRITABLE) + .unwrap(); + poll.registry().deregister(&stream).unwrap(); expect_no_events(&mut poll, &mut events); // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } fn smoke_test(connect_stream: F) @@ -441,12 +445,14 @@ where let (handle, remote_addr) = new_echo_listener(1); let path = remote_addr.as_pathname().expect("failed to get pathname"); - let mut stream = assert_ok!(connect_stream(path)); - assert_ok!(poll.registry().register( - &stream, - TOKEN_1, - Interests::WRITABLE.add(Interests::READABLE) - )); + let mut stream = connect_stream(path).unwrap(); + poll.registry() + .register( + &stream, + TOKEN_1, + Interests::WRITABLE.add(Interests::READABLE), + ) + .unwrap(); expect_events( &mut poll, &mut events, @@ -456,24 +462,20 @@ where let mut buf = [0; DEFAULT_BUF_SIZE]; assert_would_block(stream.read(&mut buf)); - let wrote = assert_ok!(stream.write(&DATA1)); - assert_eq!(wrote, DATA1_LEN); - assert_ok!(stream.flush()); + checked_write!(stream.write(&DATA1)); + stream.flush().unwrap(); expect_events( &mut poll, &mut events, vec![ExpectEvent::new(TOKEN_1, Interests::READABLE)], ); - let read = assert_ok!(stream.read(&mut buf)); - assert_eq!(read, DATA1_LEN); - assert_eq!(&buf[..read], DATA1); - assert_eq!(read, wrote, "unequal reads and writes"); + expect_read!(stream.read(&mut buf), DATA1); - assert!(assert_ok!(stream.take_error()).is_none()); + assert!(stream.take_error().unwrap().is_none()); let bufs = [IoSlice::new(&DATA1), IoSlice::new(&DATA2)]; - let wrote = assert_ok!(stream.write_vectored(&bufs)); + let wrote = stream.write_vectored(&bufs).unwrap(); assert_eq!(wrote, DATA1_LEN + DATA2_LEN); expect_events( &mut poll, @@ -484,7 +486,7 @@ where let mut buf1 = [1; DATA1_LEN]; let mut buf2 = [2; DATA2_LEN + 1]; let mut bufs = [IoSliceMut::new(&mut buf1), IoSliceMut::new(&mut buf2)]; - let read = assert_ok!(stream.read_vectored(&mut bufs)); + let read = stream.read_vectored(&mut bufs).unwrap(); assert_eq!(read, DATA1_LEN + DATA2_LEN); assert_eq!(&buf1, DATA1); assert_eq!(&buf2[..DATA2.len()], DATA2); @@ -494,20 +496,20 @@ where // Close the connection to allow the remote to shutdown drop(stream); - assert_ok!(handle.join()); + handle.join().unwrap(); } fn new_echo_listener(connections: usize) -> (thread::JoinHandle<()>, net::SocketAddr) { let (addr_sender, addr_receiver) = channel(); let handle = thread::spawn(move || { - let dir = assert_ok!(TempDir::new("unix")); + let dir = TempDir::new("unix").unwrap(); let path = dir.path().join("any"); - let listener = assert_ok!(net::UnixListener::bind(path)); - let local_addr = assert_ok!(listener.local_addr()); - assert_ok!(addr_sender.send(local_addr)); + let listener = net::UnixListener::bind(path).unwrap(); + let local_addr = listener.local_addr().unwrap(); + addr_sender.send(local_addr).unwrap(); for _ in 0..connections { - let (mut stream, _) = assert_ok!(listener.accept()); + let (mut stream, _) = listener.accept().unwrap(); // On Linux based system it will cause a connection reset // error when the reading side of the peer connection is @@ -515,29 +517,29 @@ fn new_echo_listener(connections: usize) -> (thread::JoinHandle<()>, net::Socket let (mut read, mut written) = (0, 0); let mut buf = [0; DEFAULT_BUF_SIZE]; loop { - let n = match stream.try_read(&mut buf) { - Ok(Some(amount)) => { + let n = match stream.read(&mut buf) { + Ok(amount) => { read += amount; amount } - Ok(None) => continue, + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue, Err(ref err) if err.kind() == io::ErrorKind::ConnectionReset => break, Err(err) => panic!("{}", err), }; if n == 0 { break; } - match stream.try_write(&buf[..n]) { - Ok(Some(amount)) => written += amount, - Ok(None) => continue, + match stream.write(&buf[..n]) { + Ok(amount) => written += amount, + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue, Err(ref err) if err.kind() == io::ErrorKind::BrokenPipe => break, - Err(err) => panic!("{:?}", err), + Err(err) => panic!("{}", err), }; } assert_eq!(read, written, "unequal reads and writes"); } }); - (handle, assert_ok!(addr_receiver.recv())) + (handle, addr_receiver.recv().unwrap()) } fn new_noop_listener( @@ -546,19 +548,19 @@ fn new_noop_listener( ) -> (thread::JoinHandle<()>, net::SocketAddr) { let (sender, receiver) = channel(); let handle = thread::spawn(move || { - let dir = assert_ok!(TempDir::new("unix")); + let dir = TempDir::new("unix").unwrap(); let path = dir.path().join("any"); - let listener = assert_ok!(net::UnixListener::bind(path)); - let local_addr = assert_ok!(listener.local_addr()); - assert_ok!(sender.send(local_addr)); + let listener = net::UnixListener::bind(path).unwrap(); + let local_addr = listener.local_addr().unwrap(); + sender.send(local_addr).unwrap(); for _ in 0..connections { - let (stream, _) = assert_ok!(listener.accept()); + let (stream, _) = listener.accept().unwrap(); barrier.wait(); - assert_ok!(stream.shutdown(Shutdown::Write)); + stream.shutdown(Shutdown::Write).unwrap(); barrier.wait(); drop(stream); } }); - (handle, assert_ok!(receiver.recv())) + (handle, receiver.recv().unwrap()) } diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 6d3cafe9c..4feee05a3 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -1,95 +1,16 @@ // Not all functions are used by all tests. -#![allow(dead_code)] +#![allow(dead_code, unused_macros)] -use std::fmt; -use std::io::{self, Read, Write}; use std::net::SocketAddr; +use std::ops::BitOr; use std::sync::Once; use std::time::Duration; +use std::{fmt, io}; -use bytes::{Buf, BufMut}; use log::{error, warn}; use mio::event::Event; use mio::{Events, Interests, Poll, Token}; -// TODO: replace w/ assertive -// https://github.com/carllerche/assertive -#[macro_export] -macro_rules! assert_ok { - ($e:expr) => { - assert_ok!($e,) - }; - ($e:expr,) => {{ - use std::result::Result::*; - match $e { - Ok(v) => v, - Err(e) => panic!("assertion failed: error = {:?}", e), - } - }}; - ($e:expr, $($arg:tt)+) => {{ - use std::result::Result::*; - match $e { - Ok(v) => v, - Err(e) => panic!("assertion failed: error = {:?}: {}", e, format_args!($($arg)+)), - } - }}; -} - -#[macro_export] -macro_rules! assert_err { - ($e:expr) => { - assert_err!($e,); - }; - ($e:expr,) => {{ - use std::result::Result::*; - match $e { - Ok(v) => panic!("assertion failed: Ok({:?})", v), - Err(e) => e, - } - }}; - ($e:expr, $($arg:tt)+) => {{ - use std::result::Result::*; - match $e { - Ok(v) => panic!("assertion failed: Ok({:?}): {}", v, format_args!($($arg)+)), - Err(e) => e, - } - }}; -} - -/// Expect specific readiness on an event. -#[macro_export] -macro_rules! expect_readiness { - ($poll:ident, $events:ident, $readiness:ident) => {{ - let mut found = false; - // Poll a couple of times in case the event does not immediately - // happen - for _ in 0..3 { - assert_ok!($poll.poll(&mut $events, Some(Duration::from_millis(500)))); - for event in $events.iter() { - if event.$readiness() { - found = true; - break; - } else { - // Accept sporadic events. - warn!("got unexpected event: {:?}", event); - } - } - - // Explicitly break here instead of labeling the outer for loop - // because this introduces label shadowing if `expect_readiness!` - // is used more than once within one function. - // - // https://github.com/rust-lang/rust/issues/24278 - if found { - break; - } - } - if !found { - panic!("failed to find event readiness") - } - }}; -} - pub fn init() { static INIT: Once = Once::new(); @@ -109,111 +30,95 @@ pub fn init_with_poll() -> (Poll, Events) { pub fn assert_sync() {} pub fn assert_send() {} -pub trait TryRead { - fn try_read_buf(&mut self, buf: &mut B) -> io::Result> - where - Self: Sized, - { - // Reads the length of the slice supplied by buf.mut_bytes into the buffer - // This is not guaranteed to consume an entire datagram or segment. - // If your protocol is msg based (instead of continuous stream) you should - // ensure that your buffer is large enough to hold an entire segment (1532 bytes if not jumbo - // frames) - let res = self.try_read(unsafe { buf.bytes_mut() }); - - if let Ok(Some(cnt)) = res { - unsafe { - buf.advance_mut(cnt); - } - } - - res - } - - fn try_read(&mut self, buf: &mut [u8]) -> io::Result>; +/// An event that is expected to show up when `Poll` is polled, see +/// `expect_events`. +#[derive(Debug)] +pub struct ExpectEvent { + token: Token, + readiness: Readiness, } -pub trait TryWrite { - fn try_write_buf(&mut self, buf: &mut B) -> io::Result> +impl ExpectEvent { + pub fn new(token: Token, readiness: R) -> ExpectEvent where - Self: Sized, + R: Into, { - let res = self.try_write(buf.bytes()); - - if let Ok(Some(cnt)) = res { - buf.advance(cnt); + ExpectEvent { + token, + readiness: readiness.into(), } - - res } - fn try_write(&mut self, buf: &[u8]) -> io::Result>; -} - -impl TryRead for T { - fn try_read(&mut self, dst: &mut [u8]) -> io::Result> { - self.read(dst).map_non_block() - } -} - -impl TryWrite for T { - fn try_write(&mut self, src: &[u8]) -> io::Result> { - self.write(src).map_non_block() + fn matches(&self, event: &Event) -> bool { + event.token() == self.token && self.readiness.matches(event) } } -/* - * - * ===== Helpers ===== - * - */ - -/// A helper trait to provide the map_non_block function on Results. -trait MapNonBlock { - /// Maps a `Result` to a `Result>` by converting - /// operation-would-block errors into `Ok(None)`. - fn map_non_block(self) -> io::Result>; -} +#[derive(Debug)] +pub struct Readiness(usize); + +const READABLE: usize = 0b00_000_001; +const WRITABLE: usize = 0b00_000_010; +const AIO: usize = 0b00_000_100; +const LIO: usize = 0b00_001_000; +const ERROR: usize = 0b00_010_000; +const READ_CLOSED: usize = 0b00_100_000; +const WRITE_CLOSED: usize = 0b01_000_000; +const PRIORITY: usize = 0b10_000_000; + +impl Readiness { + pub const READABLE: Readiness = Readiness(READABLE); + pub const WRITABLE: Readiness = Readiness(WRITABLE); + pub const AIO: Readiness = Readiness(AIO); + pub const LIO: Readiness = Readiness(LIO); + pub const ERROR: Readiness = Readiness(ERROR); + pub const READ_CLOSED: Readiness = Readiness(READ_CLOSED); + pub const WRITE_CLOSED: Readiness = Readiness(WRITE_CLOSED); + pub const PRIORITY: Readiness = Readiness(PRIORITY); -impl MapNonBlock for io::Result { - fn map_non_block(self) -> io::Result> { - use std::io::ErrorKind::WouldBlock; + fn matches(&self, event: &Event) -> bool { + // If we expect a readiness then also match on the event. + // In maths terms that is p -> q, which is the same as !p || q. + (!self.is(READABLE) || event.is_readable()) + && (!self.is(WRITABLE) || event.is_writable()) + && (!self.is(AIO) || event.is_aio()) + && (!self.is(LIO) || event.is_lio()) + && (!self.is(ERROR) || event.is_error()) + && (!self.is(READ_CLOSED) || event.is_read_closed()) + && (!self.is(WRITE_CLOSED) || event.is_write_closed()) + && (!self.is(PRIORITY) || event.is_priority()) + } - match self { - Ok(value) => Ok(Some(value)), - Err(err) => { - if let WouldBlock = err.kind() { - Ok(None) - } else { - Err(err) - } - } - } + /// Usage: `self.is(READABLE)`. + fn is(&self, value: usize) -> bool { + self.0 & value != 0 } } -/// An event that is expected to show up when `Poll` is polled, see -/// `expect_events`. -#[derive(Debug)] -pub struct ExpectEvent { - token: Token, - // We're (ab)using `Interests` as readiness in `matches`. - interests: Interests, -} +impl BitOr for Readiness { + type Output = Self; -impl ExpectEvent { - pub const fn new(token: Token, interests: Interests) -> ExpectEvent { - ExpectEvent { token, interests } + fn bitor(self, other: Self) -> Self { + Readiness(self.0 | other.0) } +} - fn matches(&self, event: &Event) -> bool { - event.token() == self.token && - // If we expect a readiness then also match on the event. - // In maths terms that is p -> q, which is the same as !p || q. - (!self.interests.is_readable() || event.is_readable()) && - (!self.interests.is_writable() || event.is_writable()) && - (!self.interests.is_aio() || event.is_aio()) && - (!self.interests.is_lio() || event.is_lio()) +impl From for Readiness { + fn from(interests: Interests) -> Readiness { + let mut readiness = Readiness(0); + if interests.is_readable() { + readiness.0 |= READABLE; + } + if interests.is_writable() { + readiness.0 |= WRITABLE; + } + if interests.is_aio() { + readiness.0 |= AIO; + } + if interests.is_lio() { + readiness.0 |= LIO; + } + readiness } } @@ -291,3 +196,65 @@ pub fn any_local_address() -> SocketAddr { pub fn any_local_ipv6_address() -> SocketAddr { "[::1]:0".parse().unwrap() } + +/// A checked {write, send, send_to} macro that ensures the entire buffer is +/// written. +/// +/// Usage: `checked_write!(stream.write(&DATA));` +/// Also works for send(_to): `checked_write!(socket.send_to(DATA, address))`. +macro_rules! checked_write { + ($socket: ident . $method: ident ( $data: expr $(, $arg: expr)* ) ) => {{ + let data = $data; + let n = $socket.$method($data $(, $arg)*) + .expect("unable to write to socket"); + assert_eq!(n, data.len(), "short write"); + }}; +} + +/// A checked {read, recv, recv_from, peek, peek_from} macro that ensures the +/// current buffer is read. +/// +/// Usage: `expect_read!(stream.read(&mut buf), DATA);` reads into `buf` and +/// compares it to `DATA`. +/// Also works for recv(_from): `expect_read!(socket.recv_from(&mut buf), DATA, address)`. +macro_rules! expect_read { + ($socket: ident . $method: ident ( $buf: expr $(, $arg: expr)* ), $expected: expr) => {{ + let n = $socket.$method($buf $(, $arg)*) + .expect("unable to read from socket"); + let expected = $expected; + assert_eq!(n, expected.len()); + assert_eq!(&$buf[..n], expected); + }}; + // TODO: change the call sites to check the source address. + // Support for recv_from and peek_from, without checking the address. + ($socket: ident . $method: ident ( $buf: expr $(, $arg: expr)* ), $expected: expr, __anywhere) => {{ + let (n, _address) = $socket.$method($buf $(, $arg)*) + .expect("unable to read from socket"); + let expected = $expected; + assert_eq!(n, expected.len()); + assert_eq!(&$buf[..n], expected); + }}; + // Support for recv_from and peek_from for `UnixDatagram`s. + ($socket: ident . $method: ident ( $buf: expr $(, $arg: expr)* ), $expected: expr, path: $source: expr) => {{ + let (n, path) = $socket.$method($buf $(, $arg)*) + .expect("unable to read from socket"); + let expected = $expected; + let source = $source; + assert_eq!(n, expected.len()); + assert_eq!(&$buf[..n], expected); + assert_eq!( + path.as_pathname().expect("failed to get path name"), + source + ); + }}; + // Support for recv_from and peek_from for `UdpSocket`s. + ($socket: ident . $method: ident ( $buf: expr $(, $arg: expr)* ), $expected: expr, $source: expr) => {{ + let (n, address) = $socket.$method($buf $(, $arg)*) + .expect("unable to read from socket"); + let expected = $expected; + let source = $source; + assert_eq!(n, expected.len()); + assert_eq!(&$buf[..n], expected); + assert_eq!(address, source); + }}; +}