Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mplex] Small enhancements. #1785

Merged
merged 6 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion muxers/mplex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ categories = ["network-programming", "asynchronous"]

[dependencies]
bytes = "0.5"
fnv = "1.0"
futures = "0.3.1"
futures_codec = "0.4"
libp2p-core = { version = "0.22.0", path = "../../core" }
log = "0.4"
nohash-hasher = "0.2"
parking_lot = "0.11"
rand = "0.7"
smallvec = "1.4"
unsigned-varint = { version = "0.5", features = ["futures-codec"] }

Expand Down
29 changes: 18 additions & 11 deletions muxers/mplex/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_core::Endpoint;
use futures_codec::{Decoder, Encoder};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::{fmt, mem};
use bytes::{BufMut, Bytes, BytesMut};
use futures_codec::{Decoder, Encoder};
use libp2p_core::Endpoint;
use std::{fmt, hash::{Hash, Hasher}, io, mem};
use unsigned_varint::{codec, encode};

// Maximum size for a packet: 1MB as per the spec.
Expand All @@ -46,7 +45,7 @@ pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024;
/// > we initiated the stream, so the local ID has the role `Endpoint::Dialer`.
/// > Conversely, when receiving a frame with a flag identifying the remote as a "sender",
/// > the corresponding local ID has the role `Endpoint::Listener`.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub struct LocalStreamId {
num: u32,
role: Endpoint,
Expand All @@ -61,6 +60,14 @@ impl fmt::Display for LocalStreamId {
}
}

impl Hash for LocalStreamId {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u32(self.num);
}
}

impl nohash_hasher::IsEnabled for LocalStreamId {}

/// A unique identifier used by the remote node for a substream.
///
/// `RemoteStreamId`s are received with frames from the remote
Expand Down Expand Up @@ -161,7 +168,7 @@ impl Codec {

impl Decoder for Codec {
type Item = Frame<RemoteStreamId>;
type Error = IoError;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
Expand All @@ -182,7 +189,7 @@ impl Decoder for Codec {
Some(len) => {
if len as usize > MAX_FRAME_SIZE {
let msg = format!("Mplex frame length {} exceeds maximum", len);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}

self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize);
Expand Down Expand Up @@ -213,7 +220,7 @@ impl Decoder for Codec {
6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
_ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
},
};

Expand All @@ -222,7 +229,7 @@ impl Decoder for Codec {
},

CodecDecodeState::Poisoned => {
return Err(IoError::new(IoErrorKind::InvalidData, "Mplex codec poisoned"));
return Err(io::Error::new(io::ErrorKind::InvalidData, "Mplex codec poisoned"));
}
}
}
Expand All @@ -231,7 +238,7 @@ impl Decoder for Codec {

impl Encoder for Codec {
type Item = Frame<LocalStreamId>;
type Error = IoError;
type Error = io::Error;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Expand Down Expand Up @@ -266,7 +273,7 @@ impl Encoder for Codec {
let data_len_bytes = encode::usize(data_len, &mut data_buf);

if data_len > MAX_FRAME_SIZE {
return Err(IoError::new(IoErrorKind::InvalidData, "data size exceed maximum"));
return Err(io::Error::new(io::ErrorKind::InvalidData, "data size exceed maximum"));
}

dst.reserve(header_bytes.len() + data_len_bytes.len() + data_len);
Expand Down
Loading