diff --git a/Cargo.toml b/Cargo.toml index dcb52a6..80a7ca9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,8 @@ name = "stomp" readme = "README.md" repository = "https://github.com/zslayton/stomp-rs" version = "0.12.0" - +rust-version = "1.62" +edition = "2021" [dependencies] bytes = "0.4" futures = "0.1" diff --git a/src/codec.rs b/src/codec.rs index 143eed8..6052cdd 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -1,9 +1,8 @@ -use header::{Header, HeaderList}; -use frame::{Frame, Transmission}; +use crate::frame::{Command, Frame, Transmission}; +use crate::header::{Header, HeaderList}; use bytes::BytesMut; -use frame::Command; -use tokio_io::codec::{Encoder, Decoder}; -use nom::{line_ending, anychar}; +use nom::{anychar, line_ending}; +use tokio_io::codec::{Decoder, Encoder}; named!(parse_server_command(&[u8]) -> Command, alt!( @@ -43,26 +42,22 @@ fn get_body<'a, 'b>(bytes: &'a [u8], headers: &'b [Header]) -> ::nom::IResult<&' trace!("found content-length header"); match header.1.parse::() { Ok(value) => content_length = Some(value), - Err(error) => warn!("failed to parse content-length header: {}", error) + Err(error) => warn!("failed to parse content-length header: {}", error), } } } if let Some(content_length) = content_length { trace!("using content-length header: {}", content_length); take!(bytes, content_length) - } - else { + } else { trace!("using many0 method to parse body"); - map!(bytes, - many0!(is_not!("\0")), - |body| { - if body.len() == 0 { - &[] - } else { - body.into_iter().nth(0).unwrap() - } + map!(bytes, many0!(is_not!("\0")), |body| { + if body.len() == 0 { + &[] + } else { + body.into_iter().nth(0).unwrap() } - ) + }) } } named!(parse_frame(&[u8]) -> Frame, @@ -96,7 +91,11 @@ pub struct Codec; impl Encoder for Codec { type Item = Transmission; type Error = ::std::io::Error; - fn encode(&mut self, item: Transmission, buffer: &mut BytesMut) -> Result<(), ::std::io::Error> { + fn encode( + &mut self, + item: Transmission, + buffer: &mut BytesMut, + ) -> Result<(), ::std::io::Error> { item.write(buffer); Ok(()) } @@ -111,14 +110,12 @@ impl Decoder for Codec { trace!("decoding data: {:?}", src); let (point, data) = match parse_transmission(src) { - IResult::Done(rest, data) => { - (rest.len(), data) - }, + IResult::Done(rest, data) => (rest.len(), data), IResult::Error(e) => { warn!("parse error: {:?}", e); return Err(Error::new(ErrorKind::Other, format!("parse error: {}", e))); - }, - IResult::Incomplete(_) => return Ok(None) + } + IResult::Incomplete(_) => return Ok(None), }; let len = src.len().saturating_sub(point); src.split_to(len); diff --git a/src/connection.rs b/src/connection.rs index 46e3435..bbb9e73 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -25,11 +25,12 @@ impl OwnedCredentials { } impl Connection { - pub fn select_heartbeat(client_tx_ms: u32, - client_rx_ms: u32, - server_tx_ms: u32, - server_rx_ms: u32) - -> (u32, u32) { + pub fn select_heartbeat( + client_tx_ms: u32, + client_rx_ms: u32, + server_tx_ms: u32, + server_rx_ms: u32, + ) -> (u32, u32) { let heartbeat_tx_ms: u32; let heartbeat_rx_ms: u32; if client_tx_ms == 0 || server_rx_ms == 0 { diff --git a/src/frame.rs b/src/frame.rs index b211b3c..16b20fc 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -1,10 +1,7 @@ -use header::HeaderList; -use header::Header; -use subscription::AckMode; -use std::str::from_utf8; -use std::fmt; -use std::fmt::Formatter; +use crate::{header::{Header, HeaderList}, header_list, subscription::AckMode}; use bytes::BytesMut; +use std::fmt::{self, Formatter}; +use std::str::from_utf8; #[derive(Copy, Clone, Debug)] pub enum Command { @@ -22,7 +19,7 @@ pub enum Command { Connected, Message, Receipt, - Error + Error, } impl Command { pub fn as_str(&self) -> &'static str { @@ -108,7 +105,8 @@ impl Frame { let mut space_required: usize = 0; // Add one to space calculations to make room for '\n' space_required += self.command.as_str().len() + 1; - space_required += self.headers + space_required += self + .headers .iter() .fold(0, |length, header| length + header.get_raw().len() + 1); space_required += 1; // Newline at end of headers @@ -176,7 +174,6 @@ impl Frame { disconnect_frame } - pub fn subscribe(subscription_id: &str, destination: &str, ack_mode: AckMode) -> Frame { let subscribe_frame = Frame { command: Command::Subscribe, diff --git a/src/header.rs b/src/header.rs index 3577cc8..e3d954d 100644 --- a/src/header.rs +++ b/src/header.rs @@ -17,7 +17,9 @@ impl HeaderList { HeaderList::with_capacity(0) } pub fn with_capacity(capacity: usize) -> HeaderList { - HeaderList { headers: Vec::with_capacity(capacity) } + HeaderList { + headers: Vec::with_capacity(capacity), + } } pub fn push(&mut self, header: Header) { @@ -33,7 +35,8 @@ impl HeaderList { } pub fn drain(&mut self, mut sink: F) - where F: FnMut(Header) + where + F: FnMut(Header), { while let Some(header) = self.headers.pop() { sink(header); @@ -48,7 +51,8 @@ impl HeaderList { } pub fn retain(&mut self, test: F) - where F: Fn(&Header) -> bool + where + F: Fn(&Header) -> bool, { self.headers.retain(test) } @@ -73,10 +77,10 @@ impl Header { } pub fn encode_value(value: &str) -> String { - let mut encoded = String::new();//self.strings.detached(); + let mut encoded = String::new(); //self.strings.detached(); for grapheme in UnicodeSegmentation::graphemes(value, true) { match grapheme { - "\\" => encoded.push_str(r"\\"),// Order is significant + "\\" => encoded.push_str(r"\\"), // Order is significant "\r" => encoded.push_str(r"\r"), "\n" => encoded.push_str(r"\n"), ":" => encoded.push_str(r"\c"), @@ -128,11 +132,9 @@ pub enum StompVersion { impl HeaderList { pub fn get_header<'a>(&'a self, key: &str) -> Option<&'a Header> { - self.headers.iter().find(|header| { - match **header { - ref h if h.get_key() == key => true, - _ => false, - } + self.headers.iter().find(|header| match **header { + ref h if h.get_key() == key => true, + _ => false, }) } @@ -141,16 +143,15 @@ impl HeaderList { Some(h) => h.get_value(), None => return None, }; - let versions: Vec = versions.split(',') - .filter_map(|v| { - match v.trim() { - "1.0" => Some(StompVersion::Stomp_v1_0), - "1.1" => Some(StompVersion::Stomp_v1_1), - "1.2" => Some(StompVersion::Stomp_v1_2), - _ => None, - } - }) - .collect(); + let versions: Vec = versions + .split(',') + .filter_map(|v| match v.trim() { + "1.0" => Some(StompVersion::Stomp_v1_0), + "1.1" => Some(StompVersion::Stomp_v1_1), + "1.2" => Some(StompVersion::Stomp_v1_2), + _ => None, + }) + .collect(); Some(versions) } @@ -173,9 +174,10 @@ impl HeaderList { Some(h) => h.get_value(), None => return None, }; - let spec_list: Vec = spec.split(',') - .filter_map(|str_val| str_val.parse::().ok()) - .collect(); + let spec_list: Vec = spec + .split(',') + .filter_map(|str_val| str_val.parse::().ok()) + .collect(); if spec_list.len() != 2 { return None; diff --git a/src/lib.rs b/src/lib.rs index 2263cd3..d5e38fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,22 +3,22 @@ #[macro_use] extern crate log; +extern crate bytes; extern crate futures; -extern crate tokio_io; extern crate tokio_core; +extern crate tokio_io; extern crate unicode_segmentation; -extern crate bytes; #[macro_use] extern crate nom; -pub mod connection; -pub mod header; pub mod codec; +pub mod connection; pub mod frame; -pub mod session; -pub mod subscription; -pub mod transaction; +pub mod header; pub mod message_builder; +pub mod option_setter; +pub mod session; pub mod session_builder; +pub mod subscription; pub mod subscription_builder; -pub mod option_setter; +pub mod transaction; diff --git a/src/message_builder.rs b/src/message_builder.rs index 3ef202c..e0559b4 100644 --- a/src/message_builder.rs +++ b/src/message_builder.rs @@ -1,11 +1,11 @@ -use session::{Session, ReceiptRequest, OutstandingReceipt}; -use frame::Frame; -use option_setter::OptionSetter; +use crate::frame::Frame; +use crate::option_setter::OptionSetter; +use crate::session::{OutstandingReceipt, ReceiptRequest, Session}; pub struct MessageBuilder<'a> { pub session: &'a mut Session, pub frame: Frame, - pub receipt_request: Option + pub receipt_request: Option, } impl<'a> MessageBuilder<'a> { @@ -13,7 +13,7 @@ impl<'a> MessageBuilder<'a> { MessageBuilder { session: session, frame: frame, - receipt_request: None + receipt_request: None, } } @@ -21,19 +21,18 @@ impl<'a> MessageBuilder<'a> { pub fn send(self) { if self.receipt_request.is_some() { let request = self.receipt_request.unwrap(); - self.session.state.outstanding_receipts.insert( - request.id, - OutstandingReceipt::new( - self.frame.clone() - ) - ); + self.session + .state + .outstanding_receipts + .insert(request.id, OutstandingReceipt::new(self.frame.clone())); } self.session.send_frame(self.frame) } #[allow(dead_code)] pub fn with(self, option_setter: T) -> MessageBuilder<'a> - where T: OptionSetter> + where + T: OptionSetter>, { option_setter.set_option(self) } diff --git a/src/option_setter.rs b/src/option_setter.rs index 078952a..5781336 100644 --- a/src/option_setter.rs +++ b/src/option_setter.rs @@ -1,34 +1,40 @@ -use message_builder::MessageBuilder; -use session_builder::SessionBuilder; -use subscription_builder::SubscriptionBuilder; -use header::{Header, SuppressedHeader, ContentType}; -use connection::{HeartBeat, Credentials, OwnedCredentials}; -use subscription::AckMode; -use session::{ReceiptRequest, GenerateReceipt}; +use crate::connection::{Credentials, HeartBeat, OwnedCredentials}; +use crate::header::{ContentType, Header, SuppressedHeader}; +use crate::message_builder::MessageBuilder; +use crate::session::{GenerateReceipt, ReceiptRequest}; +use crate::session_builder::SessionBuilder; +use crate::subscription::AckMode; +use crate::subscription_builder::SubscriptionBuilder; pub trait OptionSetter { - fn set_option(self, T) -> T; + fn set_option(self, _: T) -> T; } -impl <'a> OptionSetter> for Header { +impl<'a> OptionSetter> for Header { fn set_option(self, mut builder: MessageBuilder<'a>) -> MessageBuilder<'a> { builder.frame.headers.push(self); builder } } -impl <'a, 'b> OptionSetter> for SuppressedHeader<'a> { +impl<'a, 'b> OptionSetter> for SuppressedHeader<'a> { fn set_option(self, mut builder: MessageBuilder<'b>) -> MessageBuilder<'b> { let SuppressedHeader(key) = self; - builder.frame.headers.retain(|header| (*header).get_key() != key); + builder + .frame + .headers + .retain(|header| (*header).get_key() != key); builder } } -impl <'a, 'b> OptionSetter> for ContentType<'a> { +impl<'a, 'b> OptionSetter> for ContentType<'a> { fn set_option(self, mut builder: MessageBuilder<'b>) -> MessageBuilder<'b> { let ContentType(content_type) = self; - builder.frame.headers.push(Header::new("content-type", content_type)); + builder + .frame + .headers + .push(Header::new("content-type", content_type)); builder } } @@ -57,19 +63,22 @@ impl<'b> OptionSetter for Credentials<'b> { impl<'b> OptionSetter for SuppressedHeader<'b> { fn set_option(self, mut builder: SessionBuilder) -> SessionBuilder { let SuppressedHeader(key) = self; - builder.config.headers.retain(|header| (*header).get_key() != key); + builder + .config + .headers + .retain(|header| (*header).get_key() != key); builder } } -impl <'a> OptionSetter> for Header { +impl<'a> OptionSetter> for Header { fn set_option(self, mut builder: SubscriptionBuilder<'a>) -> SubscriptionBuilder<'a> { builder.headers.push(self); builder } } -impl <'a, 'b> OptionSetter> for SuppressedHeader<'a> { +impl<'a, 'b> OptionSetter> for SuppressedHeader<'a> { fn set_option(self, mut builder: SubscriptionBuilder<'b>) -> SubscriptionBuilder<'b> { let SuppressedHeader(key) = self; builder.headers.retain(|header| (*header).get_key() != key); @@ -77,29 +86,34 @@ impl <'a, 'b> OptionSetter> for SuppressedHeader<'a> { } } -impl <'a> OptionSetter> for AckMode { +impl<'a> OptionSetter> for AckMode { fn set_option(self, mut builder: SubscriptionBuilder<'a>) -> SubscriptionBuilder<'a> { builder.ack_mode = self; builder } } -impl <'a> OptionSetter> for GenerateReceipt { +impl<'a> OptionSetter> for GenerateReceipt { fn set_option(self, mut builder: MessageBuilder<'a>) -> MessageBuilder<'a> { let next_id = builder.session.generate_receipt_id(); let receipt_id = format!("message/{}", next_id); builder.receipt_request = Some(ReceiptRequest::new(receipt_id.clone())); - builder.frame.headers.push(Header::new("receipt", receipt_id.as_ref())); + builder + .frame + .headers + .push(Header::new("receipt", receipt_id.as_ref())); builder } } -impl <'a> OptionSetter> for GenerateReceipt { +impl<'a> OptionSetter> for GenerateReceipt { fn set_option(self, mut builder: SubscriptionBuilder<'a>) -> SubscriptionBuilder<'a> { let next_id = builder.session.generate_receipt_id(); let receipt_id = format!("message/{}", next_id); builder.receipt_request = Some(ReceiptRequest::new(receipt_id.clone())); - builder.headers.push(Header::new("receipt", receipt_id.as_ref())); + builder + .headers + .push(Header::new("receipt", receipt_id.as_ref())); builder } } diff --git a/src/session.rs b/src/session.rs index 220387e..74e23d2 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,20 +1,23 @@ +use crate::codec::Codec; +use crate::connection::{self, Connection}; +use crate::frame; +use crate::frame::Transmission::{self, CompleteFrame, HeartBeat}; +use crate::header::{self, Header}; +use crate::message_builder::MessageBuilder; +use crate::session_builder::SessionConfig; +use crate::subscription_builder::SubscriptionBuilder; +use crate::transaction::Transaction; +use crate::{ + frame::{Command, Frame, ToFrameBody}, + subscription::{AckMode, AckOrNack, Subscription}, +}; +use futures::*; use std::collections::hash_map::HashMap; use std::io::Result; -use connection::{self, Connection}; -use subscription::{AckMode, AckOrNack, Subscription}; -use frame::{Frame, Command, ToFrameBody}; -use frame::Transmission::{self, HeartBeat, CompleteFrame}; -use header::{self, Header}; -use transaction::Transaction; -use session_builder::SessionConfig; -use message_builder::MessageBuilder; -use subscription_builder::SubscriptionBuilder; -use tokio_core::net::{TcpStreamNew, TcpStream}; -use tokio_core::reactor::{Timeout, Handle}; +use tokio_core::net::{TcpStream, TcpStreamNew}; +use tokio_core::reactor::{Handle, Timeout}; use tokio_io::codec::Framed; -use codec::Codec; use tokio_io::AsyncRead; -use futures::*; const GRACE_PERIOD_MULTIPLIER: f32 = 2.0; @@ -25,7 +28,7 @@ pub struct OutstandingReceipt { impl OutstandingReceipt { pub fn new(original_frame: Frame) -> Self { OutstandingReceipt { - original_frame: original_frame + original_frame: original_frame, } } } @@ -36,9 +39,7 @@ pub struct ReceiptRequest { impl ReceiptRequest { pub fn new(id: String) -> Self { - ReceiptRequest { - id: id, - } + ReceiptRequest { id: id } } } @@ -51,7 +52,7 @@ pub struct SessionState { pub rx_heartbeat_timeout: Option, pub tx_heartbeat_timeout: Option, pub subscriptions: HashMap, - pub outstanding_receipts: HashMap + pub outstanding_receipts: HashMap, } impl SessionState { @@ -75,18 +76,19 @@ impl Session { pub fn send_frame(&mut self, fr: Frame) { self.send(Transmission::CompleteFrame(fr)) } - pub fn message<'builder, T: ToFrameBody>(&'builder mut self, - destination: &str, - body_convertible: T) - -> MessageBuilder<'builder> { + pub fn message<'builder, T: ToFrameBody>( + &'builder mut self, + destination: &str, + body_convertible: T, + ) -> MessageBuilder<'builder> { let send_frame = Frame::send(destination, body_convertible.to_frame_body()); MessageBuilder::new(self, send_frame) } - pub fn subscription<'builder>(&'builder mut self, - destination: &str) - -> SubscriptionBuilder<'builder> - { + pub fn subscription<'builder>( + &'builder mut self, + destination: &str, + ) -> SubscriptionBuilder<'builder> { SubscriptionBuilder::new(self, destination.to_owned()) } @@ -106,14 +108,18 @@ impl Session { self.send_frame(Frame::disconnect()); } pub fn reconnect(&mut self) -> ::std::io::Result<()> { - use std::net::ToSocketAddrs; use std::io; + use std::net::ToSocketAddrs; info!("Reconnecting..."); let address = (&self.config.host as &str, self.config.port) - .to_socket_addrs()?.nth(0) - .ok_or(io::Error::new(io::ErrorKind::Other, "address provided resolved to nothing"))?; + .to_socket_addrs()? + .nth(0) + .ok_or(io::Error::new( + io::ErrorKind::Other, + "address provided resolved to nothing", + ))?; self.stream = StreamState::Connecting(TcpStream::connect(&address, &self.hdl)); task::current().notify(); Ok(()) @@ -122,8 +128,7 @@ impl Session { if let Some(header::Ack(ack_id)) = frame.headers.get_ack() { let ack_frame = if let AckOrNack::Ack = which { Frame::ack(ack_id) - } - else { + } else { Frame::nack(ack_id) }; self.send_frame(ack_frame); @@ -134,10 +139,11 @@ impl Session { impl Session { pub(crate) fn new(config: SessionConfig, stream: TcpStreamNew, hdl: Handle) -> Self { Self { - config, hdl, + config, + hdl, state: SessionState::new(), events: vec![], - stream: StreamState::Connecting(stream) + stream: StreamState::Connecting(stream), } } pub(crate) fn generate_transaction_id(&mut self) -> u32 { @@ -164,8 +170,7 @@ impl Session { if let StreamState::Connected(ref mut st) = self.stream { st.start_send(tx)?; st.poll_complete()?; - } - else { + } else { warn!("sending {:?} whilst disconnected", tx); } Ok(()) @@ -183,8 +188,10 @@ impl Session { } let tx_heartbeat_ms = self.state.tx_heartbeat_ms.unwrap(); if tx_heartbeat_ms <= 0 { - debug!("Heartbeat transmission ms is {}, no need to register a callback.", - tx_heartbeat_ms); + debug!( + "Heartbeat transmission ms is {}, no need to register a callback.", + tx_heartbeat_ms + ); return Ok(()); } let timeout = Timeout::new(Duration::from_millis(tx_heartbeat_ms as _), &self.hdl)?; @@ -195,16 +202,19 @@ impl Session { fn register_rx_heartbeat_timeout(&mut self) -> Result<()> { use std::time::Duration; - let rx_heartbeat_ms = self.state.rx_heartbeat_ms - .unwrap_or_else(|| { - debug!("Trying to register RX heartbeat timeout but no \ + let rx_heartbeat_ms = self.state.rx_heartbeat_ms.unwrap_or_else(|| { + debug!( + "Trying to register RX heartbeat timeout but no \ rx_heartbeat_ms was set. This is expected for receipt \ - of CONNECTED."); - 0 - }); + of CONNECTED." + ); + 0 + }); if rx_heartbeat_ms <= 0 { - debug!("Heartbeat receipt ms is {}, no need to register a callback.", - rx_heartbeat_ms); + debug!( + "Heartbeat receipt ms is {}, no need to register a callback.", + rx_heartbeat_ms + ); return Ok(()); } let timeout = Timeout::new(Duration::from_millis(rx_heartbeat_ms as _), &self.hdl)?; @@ -238,11 +248,13 @@ impl Session { fn on_stream_ready(&mut self) { debug!("Stream ready!"); // Add credentials to the header list if specified - match self.config.credentials.clone() { // TODO: Refactor to avoid clone + match self.config.credentials.clone() { + // TODO: Refactor to avoid clone Some(credentials) => { - debug!("Using provided credentials: login '{}', passcode '{}'", - credentials.login, - credentials.passcode); + debug!( + "Using provided credentials: login '{}', passcode '{}'", + credentials.login, credentials.passcode + ); let mut headers = &mut self.config.headers; headers.push(Header::new("login", &credentials.login)); headers.push(Header::new("passcode", &credentials.passcode)); @@ -253,7 +265,9 @@ impl Session { let connection::HeartBeat(client_tx_ms, client_rx_ms) = self.config.heartbeat; let heart_beat_string = format!("{},{}", client_tx_ms, client_rx_ms); debug!("Using heartbeat: {},{}", client_tx_ms, client_rx_ms); - self.config.headers.push(Header::new("heart-beat", heart_beat_string.as_ref())); + self.config + .headers + .push(Header::new("heart-beat", heart_beat_string.as_ref())); let connect_frame = Frame { command: Command::Connect, @@ -274,10 +288,9 @@ impl Session { self.events.push(SessionEvent::Message { destination, ack_mode, - frame + frame, }); - } - else { + } else { self.events.push(SessionEvent::SubscriptionlessFrame(frame)); } } @@ -292,11 +305,10 @@ impl Session { None => (0, 0), }; - let (agreed_upon_tx_ms, agreed_upon_rx_ms) = Connection::select_heartbeat(client_tx_ms, - client_rx_ms, - server_tx_ms, - server_rx_ms); - self.state.rx_heartbeat_ms = Some((agreed_upon_rx_ms as f32 * GRACE_PERIOD_MULTIPLIER) as u32); + let (agreed_upon_tx_ms, agreed_upon_rx_ms) = + Connection::select_heartbeat(client_tx_ms, client_rx_ms, server_tx_ms, server_rx_ms); + self.state.rx_heartbeat_ms = + Some((agreed_upon_rx_ms as f32 * GRACE_PERIOD_MULTIPLIER) as u32); self.state.tx_heartbeat_ms = Some(agreed_upon_tx_ms); self.register_tx_heartbeat_timeout()?; @@ -310,8 +322,7 @@ impl Session { let receipt_id = { if let Some(header::ReceiptId(receipt_id)) = frame.headers.get_receipt_id() { Some(receipt_id.to_owned()) - } - else { + } else { None } }; @@ -324,7 +335,7 @@ impl Session { self.events.push(SessionEvent::Receipt { id: receipt_id, original: original_frame, - receipt: frame + receipt: frame, }); } } @@ -334,8 +345,7 @@ impl Session { let res = { if let StreamState::Connected(ref mut fr) = self.stream { fr.poll_complete() - } - else { + } else { Ok(Async::NotReady) } }; @@ -347,46 +357,42 @@ impl Session { use self::StreamState::*; loop { match ::std::mem::replace(&mut self.stream, Failed) { - Connected(mut fr) => { - match fr.poll() { - Ok(Async::Ready(Some(r))) => { - self.stream = Connected(fr); - return Async::Ready(Some(r)); - }, - Ok(Async::Ready(None)) => { - self.on_disconnect(DisconnectionReason::ClosedByOtherSide); - return Async::NotReady; - }, - Ok(Async::NotReady) => { - self.stream = Connected(fr); - return Async::NotReady; - }, - Err(e) => { - self.on_disconnect(DisconnectionReason::RecvFailed(e)); - return Async::NotReady; - }, + Connected(mut fr) => match fr.poll() { + Ok(Async::Ready(Some(r))) => { + self.stream = Connected(fr); + return Async::Ready(Some(r)); + } + Ok(Async::Ready(None)) => { + self.on_disconnect(DisconnectionReason::ClosedByOtherSide); + return Async::NotReady; + } + Ok(Async::NotReady) => { + self.stream = Connected(fr); + return Async::NotReady; + } + Err(e) => { + self.on_disconnect(DisconnectionReason::RecvFailed(e)); + return Async::NotReady; } }, - Connecting(mut tsn) => { - match tsn.poll() { - Ok(Async::Ready(s)) => { - let fr = s.framed(Codec); - self.stream = Connected(fr); - self.on_stream_ready(); - }, - Ok(Async::NotReady) => { - self.stream = Connecting(tsn); - return Async::NotReady; - }, - Err(e) => { - self.on_disconnect(DisconnectionReason::ConnectFailed(e)); - return Async::NotReady; - }, + Connecting(mut tsn) => match tsn.poll() { + Ok(Async::Ready(s)) => { + let fr = s.framed(Codec); + self.stream = Connected(fr); + self.on_stream_ready(); + } + Ok(Async::NotReady) => { + self.stream = Connecting(tsn); + return Async::NotReady; + } + Err(e) => { + self.on_disconnect(DisconnectionReason::ConnectFailed(e)); + return Async::NotReady; } }, Failed => { return Async::NotReady; - }, + } } } } @@ -398,7 +404,7 @@ pub enum DisconnectionReason { SendFailed(::std::io::Error), ClosedByOtherSide, HeartbeatTimeout, - Requested + Requested, } pub enum SessionEvent { Connected, @@ -406,28 +412,28 @@ pub enum SessionEvent { Receipt { id: String, original: Frame, - receipt: Frame + receipt: Frame, }, Message { destination: String, ack_mode: AckMode, - frame: Frame + frame: Frame, }, SubscriptionlessFrame(Frame), UnknownFrame(Frame), - Disconnected(DisconnectionReason) + Disconnected(DisconnectionReason), } pub(crate) enum StreamState { Connected(Framed), Connecting(TcpStreamNew), - Failed + Failed, } pub struct Session { config: SessionConfig, pub(crate) state: SessionState, stream: StreamState, hdl: Handle, - events: Vec + events: Vec, } impl Stream for Session { type Item = SessionEvent; @@ -441,7 +447,7 @@ impl Stream for Session { HeartBeat => { debug!("Received heartbeat."); self.on_recv_data()?; - }, + } CompleteFrame(frame) => { debug!("Received frame: {:?}", frame); self.on_recv_data()?; @@ -450,13 +456,15 @@ impl Stream for Session { Command::Receipt => self.handle_receipt(frame), Command::Connected => self.on_connected_frame_received(frame)?, Command::Message => self.on_message(frame), - _ => self.events.push(SessionEvent::UnknownFrame(frame)) + _ => self.events.push(SessionEvent::UnknownFrame(frame)), }; } } } - let rxh = self.state.rx_heartbeat_timeout + let rxh = self + .state + .rx_heartbeat_timeout .as_mut() .map(|t| t.poll()) .unwrap_or(Ok(Async::NotReady))?; @@ -465,7 +473,9 @@ impl Stream for Session { self.on_disconnect(DisconnectionReason::HeartbeatTimeout); } - let txh = self.state.tx_heartbeat_timeout + let txh = self + .state + .tx_heartbeat_timeout .as_mut() .map(|t| t.poll()) .unwrap_or(Ok(Async::NotReady))?; @@ -482,8 +492,7 @@ impl Stream for Session { task::current().notify(); } Ok(Async::Ready(Some(self.events.remove(0)))) - } - else { + } else { Ok(Async::NotReady) } } diff --git a/src/session_builder.rs b/src/session_builder.rs index 35cbebe..e388a2d 100644 --- a/src/session_builder.rs +++ b/src/session_builder.rs @@ -1,12 +1,12 @@ -use option_setter::OptionSetter; -use connection::{HeartBeat, OwnedCredentials}; -use header::{HeaderList, Header}; +use crate::connection::{HeartBeat, OwnedCredentials}; +use crate::header::{Header, HeaderList}; +use crate::option_setter::OptionSetter; -use std::net::ToSocketAddrs; -use session::{Session}; +use crate::session::Session; use std::io; -use tokio_core::reactor::Handle; +use std::net::ToSocketAddrs; use tokio_core::net::TcpStream; +use tokio_core::reactor::Handle; #[derive(Clone)] pub struct SessionConfig { @@ -18,40 +18,45 @@ pub struct SessionConfig { } pub struct SessionBuilder { - pub config: SessionConfig + pub config: SessionConfig, } impl SessionBuilder { - pub fn new(host: &str, - port: u16) - -> SessionBuilder { + pub fn new(host: &str, port: u16) -> SessionBuilder { let config = SessionConfig { host: host.to_owned(), port: port, credentials: None, heartbeat: HeartBeat(0, 0), headers: header_list![ - "host" => host, - "accept-version" => "1.2", - "content-length" => "0" - ], + "host" => host, + "accept-version" => "1.2", + "content-length" => "0" + ], }; - SessionBuilder { - config: config, - } + SessionBuilder { config: config } } #[allow(dead_code)] pub fn start<'b, 'c>(self, hdl: Handle) -> ::std::io::Result { let address = (&self.config.host as &str, self.config.port) - .to_socket_addrs()?.nth(0) - .ok_or(io::Error::new(io::ErrorKind::Other, "address provided resolved to nothing"))?; - Ok(Session::new(self.config, TcpStream::connect(&address, &hdl), hdl)) + .to_socket_addrs()? + .nth(0) + .ok_or(io::Error::new( + io::ErrorKind::Other, + "address provided resolved to nothing", + ))?; + Ok(Session::new( + self.config, + TcpStream::connect(&address, &hdl), + hdl, + )) } #[allow(dead_code)] pub fn with<'b, T>(self, option_setter: T) -> SessionBuilder - where T: OptionSetter + where + T: OptionSetter, { option_setter.set_option(self) } diff --git a/src/subscription.rs b/src/subscription.rs index 9dfb553..f4d6feb 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -1,4 +1,4 @@ -use header::HeaderList; +use crate::header::HeaderList; #[derive(Copy, Clone)] pub enum AckMode { @@ -30,13 +30,8 @@ pub struct Subscription { pub headers: HeaderList, } - impl Subscription { - pub fn new(id: u32, - destination: &str, - ack_mode: AckMode, - headers: HeaderList) - -> Subscription { + pub fn new(id: u32, destination: &str, ack_mode: AckMode, headers: HeaderList) -> Subscription { Subscription { id: format!("stomp-rs/{}", id), destination: destination.to_string(), diff --git a/src/subscription_builder.rs b/src/subscription_builder.rs index cc49e16..e006030 100644 --- a/src/subscription_builder.rs +++ b/src/subscription_builder.rs @@ -1,63 +1,67 @@ -use session::{Session, ReceiptRequest, OutstandingReceipt}; -use subscription::{Subscription, AckMode}; -use frame::Frame; -use header::HeaderList; -use option_setter::OptionSetter; +use crate::frame::Frame; +use crate::header::HeaderList; +use crate::option_setter::OptionSetter; +use crate::session::{OutstandingReceipt, ReceiptRequest, Session}; +use crate::subscription::{AckMode, Subscription}; pub struct SubscriptionBuilder<'a> { pub session: &'a mut Session, pub destination: String, pub ack_mode: AckMode, pub headers: HeaderList, - pub receipt_request: Option + pub receipt_request: Option, } impl<'a> SubscriptionBuilder<'a> { - pub fn new(session: &'a mut Session, - destination: String) -> Self { - SubscriptionBuilder { - session: session, - destination: destination, - ack_mode: AckMode::Auto, - headers: HeaderList::new(), - receipt_request: None - } + pub fn new(session: &'a mut Session, destination: String) -> Self { + SubscriptionBuilder { + session: session, + destination: destination, + ack_mode: AckMode::Auto, + headers: HeaderList::new(), + receipt_request: None, + } } #[allow(dead_code)] pub fn start(mut self) -> String { let next_id = self.session.generate_subscription_id(); - let subscription = Subscription::new(next_id, - &self.destination, - self.ack_mode, - self.headers.clone()); - let mut subscribe_frame = Frame::subscribe(&subscription.id, - &self.destination, - self.ack_mode); + let subscription = Subscription::new( + next_id, + &self.destination, + self.ack_mode, + self.headers.clone(), + ); + let mut subscribe_frame = + Frame::subscribe(&subscription.id, &self.destination, self.ack_mode); subscribe_frame.headers.concat(&mut self.headers); self.session.send_frame(subscribe_frame.clone()); - debug!("Registering callback for subscription id '{}' from builder", - subscription.id); + debug!( + "Registering callback for subscription id '{}' from builder", + subscription.id + ); let id_to_return = subscription.id.to_string(); - self.session.state.subscriptions.insert(subscription.id.to_string(), subscription); + self.session + .state + .subscriptions + .insert(subscription.id.to_string(), subscription); if self.receipt_request.is_some() { let request = self.receipt_request.unwrap(); - self.session.state.outstanding_receipts.insert( - request.id, - OutstandingReceipt::new( - subscribe_frame.clone(), - ) - ); + self.session + .state + .outstanding_receipts + .insert(request.id, OutstandingReceipt::new(subscribe_frame.clone())); } id_to_return } #[allow(dead_code)] pub fn with(self, option_setter: T) -> SubscriptionBuilder<'a> - where T: OptionSetter> + where + T: OptionSetter>, { option_setter.set_option(self) } diff --git a/src/transaction.rs b/src/transaction.rs index 1389ec3..0747f06 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,8 +1,8 @@ -use frame::Frame; -use frame::ToFrameBody; -use message_builder::MessageBuilder; -use header::Header; -use session::{Session}; +use crate::frame::Frame; +use crate::frame::ToFrameBody; +use crate::header::Header; +use crate::message_builder::MessageBuilder; +use crate::session::Session; pub struct Transaction<'tx> { pub id: String, @@ -10,20 +10,22 @@ pub struct Transaction<'tx> { } impl<'tx> Transaction<'tx> { - pub fn new(session: &'tx mut Session) - -> Transaction<'tx> { + pub fn new(session: &'tx mut Session) -> Transaction<'tx> { Transaction { id: format!("tx/{}", session.generate_transaction_id()), session: session, } } - pub fn message<'builder, T: ToFrameBody>(&'builder mut self, - destination: &str, - body_convertible: T) - -> MessageBuilder<'builder> { + pub fn message<'builder, T: ToFrameBody>( + &'builder mut self, + destination: &str, + body_convertible: T, + ) -> MessageBuilder<'builder> { let mut send_frame = Frame::send(destination, body_convertible.to_frame_body()); - send_frame.headers.push(Header::new("transaction", self.id.as_ref())); + send_frame + .headers + .push(Header::new("transaction", self.id.as_ref())); MessageBuilder::new(self.session, send_frame) }