Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated to rust 1.62 #104

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ name = "stomp"
readme = "README.md"
repository = "https://github.com/zslayton/stomp-rs"
version = "0.12.0"

rust-version = "1.62"
edition = "2021"
[dependencies]
bytes = "0.4"
futures = "0.1"
Expand Down
43 changes: 20 additions & 23 deletions src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use header::{Header, HeaderList};
use frame::{Frame, Transmission};
use crate::frame::{Command, Frame, Transmission};
use crate::header::{Header, HeaderList};
use bytes::BytesMut;
use frame::Command;
use tokio_io::codec::{Encoder, Decoder};
use nom::{line_ending, anychar};
use nom::{anychar, line_ending};
use tokio_io::codec::{Decoder, Encoder};

named!(parse_server_command(&[u8]) -> Command,
alt!(
Expand Down Expand Up @@ -43,26 +42,22 @@ fn get_body<'a, 'b>(bytes: &'a [u8], headers: &'b [Header]) -> ::nom::IResult<&'
trace!("found content-length header");
match header.1.parse::<u32>() {
Ok(value) => content_length = Some(value),
Err(error) => warn!("failed to parse content-length header: {}", error)
Err(error) => warn!("failed to parse content-length header: {}", error),
}
}
}
if let Some(content_length) = content_length {
trace!("using content-length header: {}", content_length);
take!(bytes, content_length)
}
else {
} else {
trace!("using many0 method to parse body");
map!(bytes,
many0!(is_not!("\0")),
|body| {
if body.len() == 0 {
&[]
} else {
body.into_iter().nth(0).unwrap()
}
map!(bytes, many0!(is_not!("\0")), |body| {
if body.len() == 0 {
&[]
} else {
body.into_iter().nth(0).unwrap()
}
)
})
}
}
named!(parse_frame(&[u8]) -> Frame,
Expand Down Expand Up @@ -96,7 +91,11 @@ pub struct Codec;
impl Encoder for Codec {
type Item = Transmission;
type Error = ::std::io::Error;
fn encode(&mut self, item: Transmission, buffer: &mut BytesMut) -> Result<(), ::std::io::Error> {
fn encode(
&mut self,
item: Transmission,
buffer: &mut BytesMut,
) -> Result<(), ::std::io::Error> {
item.write(buffer);
Ok(())
}
Expand All @@ -111,14 +110,12 @@ impl Decoder for Codec {

trace!("decoding data: {:?}", src);
let (point, data) = match parse_transmission(src) {
IResult::Done(rest, data) => {
(rest.len(), data)
},
IResult::Done(rest, data) => (rest.len(), data),
IResult::Error(e) => {
warn!("parse error: {:?}", e);
return Err(Error::new(ErrorKind::Other, format!("parse error: {}", e)));
},
IResult::Incomplete(_) => return Ok(None)
}
IResult::Incomplete(_) => return Ok(None),
};
let len = src.len().saturating_sub(point);
src.split_to(len);
Expand Down
11 changes: 6 additions & 5 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ impl OwnedCredentials {
}

impl Connection {
pub fn select_heartbeat(client_tx_ms: u32,
client_rx_ms: u32,
server_tx_ms: u32,
server_rx_ms: u32)
-> (u32, u32) {
pub fn select_heartbeat(
client_tx_ms: u32,
client_rx_ms: u32,
server_tx_ms: u32,
server_rx_ms: u32,
) -> (u32, u32) {
let heartbeat_tx_ms: u32;
let heartbeat_rx_ms: u32;
if client_tx_ms == 0 || server_rx_ms == 0 {
Expand Down
15 changes: 6 additions & 9 deletions src/frame.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use header::HeaderList;
use header::Header;
use subscription::AckMode;
use std::str::from_utf8;
use std::fmt;
use std::fmt::Formatter;
use crate::{header::{Header, HeaderList}, header_list, subscription::AckMode};
use bytes::BytesMut;
use std::fmt::{self, Formatter};
use std::str::from_utf8;

#[derive(Copy, Clone, Debug)]
pub enum Command {
Expand All @@ -22,7 +19,7 @@ pub enum Command {
Connected,
Message,
Receipt,
Error
Error,
}
impl Command {
pub fn as_str(&self) -> &'static str {
Expand Down Expand Up @@ -108,7 +105,8 @@ impl Frame {
let mut space_required: usize = 0;
// Add one to space calculations to make room for '\n'
space_required += self.command.as_str().len() + 1;
space_required += self.headers
space_required += self
.headers
.iter()
.fold(0, |length, header| length + header.get_raw().len() + 1);
space_required += 1; // Newline at end of headers
Expand Down Expand Up @@ -176,7 +174,6 @@ impl Frame {
disconnect_frame
}


pub fn subscribe(subscription_id: &str, destination: &str, ack_mode: AckMode) -> Frame {
let subscribe_frame = Frame {
command: Command::Subscribe,
Expand Down
48 changes: 25 additions & 23 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ impl HeaderList {
HeaderList::with_capacity(0)
}
pub fn with_capacity(capacity: usize) -> HeaderList {
HeaderList { headers: Vec::with_capacity(capacity) }
HeaderList {
headers: Vec::with_capacity(capacity),
}
}

pub fn push(&mut self, header: Header) {
Expand All @@ -33,7 +35,8 @@ impl HeaderList {
}

pub fn drain<F>(&mut self, mut sink: F)
where F: FnMut(Header)
where
F: FnMut(Header),
{
while let Some(header) = self.headers.pop() {
sink(header);
Expand All @@ -48,7 +51,8 @@ impl HeaderList {
}

pub fn retain<F>(&mut self, test: F)
where F: Fn(&Header) -> bool
where
F: Fn(&Header) -> bool,
{
self.headers.retain(test)
}
Expand All @@ -73,10 +77,10 @@ impl Header {
}

pub fn encode_value(value: &str) -> String {
let mut encoded = String::new();//self.strings.detached();
let mut encoded = String::new(); //self.strings.detached();
for grapheme in UnicodeSegmentation::graphemes(value, true) {
match grapheme {
"\\" => encoded.push_str(r"\\"),// Order is significant
"\\" => encoded.push_str(r"\\"), // Order is significant
"\r" => encoded.push_str(r"\r"),
"\n" => encoded.push_str(r"\n"),
":" => encoded.push_str(r"\c"),
Expand Down Expand Up @@ -128,11 +132,9 @@ pub enum StompVersion {

impl HeaderList {
pub fn get_header<'a>(&'a self, key: &str) -> Option<&'a Header> {
self.headers.iter().find(|header| {
match **header {
ref h if h.get_key() == key => true,
_ => false,
}
self.headers.iter().find(|header| match **header {
ref h if h.get_key() == key => true,
_ => false,
})
}

Expand All @@ -141,16 +143,15 @@ impl HeaderList {
Some(h) => h.get_value(),
None => return None,
};
let versions: Vec<StompVersion> = versions.split(',')
.filter_map(|v| {
match v.trim() {
"1.0" => Some(StompVersion::Stomp_v1_0),
"1.1" => Some(StompVersion::Stomp_v1_1),
"1.2" => Some(StompVersion::Stomp_v1_2),
_ => None,
}
})
.collect();
let versions: Vec<StompVersion> = versions
.split(',')
.filter_map(|v| match v.trim() {
"1.0" => Some(StompVersion::Stomp_v1_0),
"1.1" => Some(StompVersion::Stomp_v1_1),
"1.2" => Some(StompVersion::Stomp_v1_2),
_ => None,
})
.collect();
Some(versions)
}

Expand All @@ -173,9 +174,10 @@ impl HeaderList {
Some(h) => h.get_value(),
None => return None,
};
let spec_list: Vec<u32> = spec.split(',')
.filter_map(|str_val| str_val.parse::<u32>().ok())
.collect();
let spec_list: Vec<u32> = spec
.split(',')
.filter_map(|str_val| str_val.parse::<u32>().ok())
.collect();

if spec_list.len() != 2 {
return None;
Expand Down
16 changes: 8 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@

#[macro_use]
extern crate log;
extern crate bytes;
extern crate futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate tokio_io;
extern crate unicode_segmentation;
extern crate bytes;
#[macro_use]
extern crate nom;

pub mod connection;
pub mod header;
pub mod codec;
pub mod connection;
pub mod frame;
pub mod session;
pub mod subscription;
pub mod transaction;
pub mod header;
pub mod message_builder;
pub mod option_setter;
pub mod session;
pub mod session_builder;
pub mod subscription;
pub mod subscription_builder;
pub mod option_setter;
pub mod transaction;
23 changes: 11 additions & 12 deletions src/message_builder.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
use session::{Session, ReceiptRequest, OutstandingReceipt};
use frame::Frame;
use option_setter::OptionSetter;
use crate::frame::Frame;
use crate::option_setter::OptionSetter;
use crate::session::{OutstandingReceipt, ReceiptRequest, Session};

pub struct MessageBuilder<'a> {
pub session: &'a mut Session,
pub frame: Frame,
pub receipt_request: Option<ReceiptRequest>
pub receipt_request: Option<ReceiptRequest>,
}

impl<'a> MessageBuilder<'a> {
pub fn new(session: &'a mut Session, frame: Frame) -> Self {
MessageBuilder {
session: session,
frame: frame,
receipt_request: None
receipt_request: None,
}
}

#[allow(dead_code)]
pub fn send(self) {
if self.receipt_request.is_some() {
let request = self.receipt_request.unwrap();
self.session.state.outstanding_receipts.insert(
request.id,
OutstandingReceipt::new(
self.frame.clone()
)
);
self.session
.state
.outstanding_receipts
.insert(request.id, OutstandingReceipt::new(self.frame.clone()));
}
self.session.send_frame(self.frame)
}

#[allow(dead_code)]
pub fn with<T>(self, option_setter: T) -> MessageBuilder<'a>
where T: OptionSetter<MessageBuilder<'a>>
where
T: OptionSetter<MessageBuilder<'a>>,
{
option_setter.set_option(self)
}
Expand Down
Loading