Skip to content

Commit

Permalink
Change Peer::new to return Result
Browse files Browse the repository at this point in the history
This is a larger refactor that aims to make the library more stable by removing unwrap and expect

Resolves #115
  • Loading branch information
herr-seppia committed Oct 20, 2022
1 parent 423cf51 commit aede873
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 98 deletions.
6 changes: 3 additions & 3 deletions src/encoding/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ impl Message {
}
}

pub(crate) fn bytes(&self) -> Vec<u8> {
pub(crate) fn bytes(&self) -> io::Result<Vec<u8>> {
let mut bytes = vec![];
self.marshal_binary(&mut bytes).unwrap();
bytes
self.marshal_binary(&mut bytes)?;
Ok(bytes)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/encoding/payload/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Marshallable for PeerEncodedInfo {
let ipv6_bytes: [u8; 16] = concat_u8(&ipv4[1..], &ipv6[..])
.as_slice()
.try_into()
.expect("Wrong length");
.expect("ipv6_bytes to be length 16");

IpInfo::IPv6(ipv6_bytes)
}
Expand Down
6 changes: 5 additions & 1 deletion src/handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,15 @@ impl MessageHandler {

let messages: Vec<(Message, Vec<SocketAddr>)> = table_read
.extract(Some((payload.height - 1).into()))
.filter_map(|(height, nodes)|{
// skip heights greater than u8 (this should never happen)
height.try_into().ok().map(|a: u8| (a,nodes))
})
.map(|(height, nodes)| {
let msg = Message::Broadcast(
my_header,
BroadcastPayload {
height: height.try_into().unwrap(),
height,
gossip_frame: payload
.gossip_frame
.clone(), //FIX_ME: avoid clone
Expand Down
8 changes: 6 additions & 2 deletions src/kbucket/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ impl<V> Bucket<V> {
self.try_perform_eviction();
return Ok(NodeInsertOk::Updated {
pending_eviction: self.pending_eviction_node(),
updated: self.nodes.last().unwrap(),
updated: self.nodes.last().expect(
"last node to exist because it's been just updated",
),
});
}
self.try_perform_eviction();
match self.nodes.try_push(node) {
Ok(_) => Ok(NodeInsertOk::Inserted {
inserted: self.nodes.last().unwrap(),
inserted: self.nodes.last().expect(
"last node to exist because it's been just inserted",
),
}),
Err(err) => {
if self
Expand Down
10 changes: 4 additions & 6 deletions src/kbucket/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::K_NONCE_LEN;
pub type BinaryKey = [u8; K_ID_LEN_BYTES];
pub type BinaryNonce = [u8; K_NONCE_LEN];

use blake2::{Blake2s, Digest};
use blake2::{Blake2s256, Digest};

use crate::{K_DIFF_MIN_BIT, K_DIFF_PRODUCED_BIT};

Expand Down Expand Up @@ -60,9 +60,7 @@ impl BinaryID {
.enumerate()
.rev()
.find(|(_, b)| b != &0b0)
.map(|(i, b)| {
BinaryID::msb(b).expect("Can't be None") + (i << 3) - 1
})
.map(|(i, b)| BinaryID::msb(b).expect("to be Some") + (i << 3) - 1)
}

// Returns the position of the most-significant bit set in a byte,
Expand All @@ -83,7 +81,7 @@ impl BinaryID {
panic!("PoW is less than minimum required, review your build config...")
}
let mut nonce: u32 = 0;
let mut hasher = Blake2s::new();
let mut hasher = Blake2s256::new();
loop {
hasher.update(id);
let nonce_bytes = nonce.to_le_bytes();
Expand All @@ -102,7 +100,7 @@ impl BinaryID {
}

pub fn verify_nonce(&self) -> bool {
let mut hasher = Blake2s::new();
let mut hasher = Blake2s256::new();
hasher.update(self.bytes);
hasher.update(self.nonce);
BinaryID::verify_difficulty(
Expand Down
16 changes: 11 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::net::AddrParseError;
use std::{convert::TryInto, net::SocketAddr, time::Duration};

use config::Config;
Expand Down Expand Up @@ -76,9 +77,9 @@ impl Peer {
pub fn new<L: NetworkListen + 'static>(
config: Config,
listener: L,
) -> Self {
) -> Result<Self, AddrParseError> {
let tree = Tree::new(
PeerNode::generate(&config.public_address[..]),
PeerNode::generate(&config.public_address[..])?,
config.bucket,
);

Expand Down Expand Up @@ -107,7 +108,7 @@ impl Peer {
WireNetwork::start(inbound_channel_tx, outbound_channel_rx, config);
TableMantainer::start(bootstrapping_nodes, table, outbound_channel_tx);
task::spawn(Peer::notifier(listener_channel_rx, listener));
peer
Ok(peer)
}

async fn notifier(
Expand Down Expand Up @@ -179,11 +180,16 @@ impl Peer {
.read()
.await
.extract(height)
.map(|(h, nodes)| {
.flat_map(|(h, nodes)| {
h.try_into()
.map_err(|_| error!("Invalid extracted height: {}", h))
.map(|h| (h, nodes))
})
.map(|(height, nodes)| {
let msg = Message::Broadcast(
self.header,
BroadcastPayload {
height: h.try_into().unwrap(),
height,
gossip_frame: message.to_vec(), //FIX_ME: avoid clone
},
);
Expand Down
26 changes: 11 additions & 15 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::kbucket::{BinaryID, BinaryKey};
use blake2::{Blake2s, Digest};
use blake2::{Blake2s256, Digest};
use std::convert::TryInto;
use std::net::{IpAddr, SocketAddr};
use std::net::{AddrParseError, IpAddr, SocketAddr};
pub type PeerNode = Node<PeerInfo>;
use crate::encoding::message::Header;
use crate::encoding::payload::{IpInfo, PeerEncodedInfo};

use crate::kbucket::Node;
use crate::K_ID_LEN_BYTES;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct PeerInfo {
address: SocketAddr,
Expand All @@ -25,14 +26,13 @@ impl PeerInfo {
}

impl PeerNode {
pub fn generate(address: &str) -> Self {
let server: SocketAddr =
address.parse().expect("Unable to parse address");
pub fn generate(address: &str) -> Result<Self, AddrParseError> {
let server: SocketAddr = address.parse()?;
let info = PeerInfo { address: server };
let binary =
PeerNode::compute_id(&info.address.ip(), info.address.port());
let id = BinaryID::generate(binary);
Node::new(id, info)
Ok(Node::new(id, info))
}

pub fn from_socket(address: SocketAddr, id: BinaryID) -> Self {
Expand All @@ -46,20 +46,16 @@ impl PeerNode {
}

pub(crate) fn compute_id(ip: &IpAddr, port: u16) -> BinaryKey {
let mut hasher = Blake2s::new();
let mut hasher = Blake2s256::new();
hasher.update(port.to_le_bytes());
match ip {
IpAddr::V4(ip) => hasher.update(ip.octets()),
IpAddr::V6(ip) => hasher.update(ip.octets()),
};
let a: [u8; 32] = hasher
.finalize()
.as_slice()
.try_into()
.expect("Wrong length");
let mut x = vec![0u8; crate::K_ID_LEN_BYTES];
x.clone_from_slice(&a[..crate::K_ID_LEN_BYTES]);
x.try_into().expect("Wrong length")
let a: [u8; 32] = hasher.finalize().into();
let mut x = vec![0u8; K_ID_LEN_BYTES];
x.clone_from_slice(&a[..K_ID_LEN_BYTES]);
x.try_into().expect("compute_id length = K_ID_LEN_BYTES")
}

pub(crate) fn as_header(&self) -> Header {
Expand Down
35 changes: 24 additions & 11 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ impl WireNetwork {
match Message::unmarshal_binary(&mut &message[..]) {
Ok(deser) => {
debug!("> Received raw message {}", deser.type_byte());
let to_process = decoder.decode(deser);

let to_process = decoder.decode(deser).map_err(|e|{
error!("Unable to process the message through the decoder: {}",e);
})
.ok().flatten();

if let Some(message) = to_process {
let valid_header = PeerNode::verify_header(
message.header(),
Expand Down Expand Up @@ -165,17 +170,25 @@ impl WireNetwork {
to,
message.type_byte()
);
let chunks: Vec<Vec<u8>> =
encoder.encode(message).iter().map(|m| m.bytes()).collect();
for remote_addr in to.iter() {
for chunk in &chunks {
output_sockets
.send(chunk, remote_addr)
.await
.unwrap_or_else(|e| {
error!("Unable to send msg {}", e)
});

match encoder.encode(message) {
Ok(chunks) => {
let chunks: Vec<Vec<u8>> = chunks
.iter()
.filter_map(|m| m.bytes().ok())
.collect();
for remote_addr in to.iter() {
for chunk in &chunks {
output_sockets
.send(chunk, remote_addr)
.await
.unwrap_or_else(|e| {
error!("Unable to send msg {}", e)
});
}
}
}
Err(e) => error!("Unable to encode msg {}", e),
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/transport/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub type TransportEncoderConfig =
pub type TransportDecoderConfig =
<self::TransportDecoder as Configurable>::TConf;
use crate::encoding::message::Message;
use std::io;

pub trait Configurable {
type TConf;
Expand All @@ -23,9 +24,9 @@ pub trait Configurable {
}

pub(crate) trait Encoder: Configurable {
fn encode(&self, msg: Message) -> Vec<Message>;
fn encode(&self, msg: Message) -> io::Result<Vec<Message>>;
}

pub(crate) trait Decoder: Configurable {
fn decode(&mut self, chunk: Message) -> Option<Message>;
fn decode(&mut self, chunk: Message) -> io::Result<Option<Message>>;
}
11 changes: 6 additions & 5 deletions src/transport/encoding/plain_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::HashMap;
use crate::encoding::message::Message;

use super::{Configurable, Decoder, Encoder};
use std::io;

pub(crate) struct PlainEncoder {}

Expand All @@ -23,17 +24,17 @@ impl Configurable for PlainEncoder {
}

impl Encoder for PlainEncoder {
fn encode<'msg>(&self, msg: Message) -> Vec<Message> {
vec![msg]
fn encode<'msg>(&self, msg: Message) -> io::Result<Vec<Message>> {
Ok(vec![msg])
}
}

impl Decoder for PlainEncoder {
fn decode(&mut self, chunk: Message) -> Option<Message> {
fn decode(&mut self, chunk: Message) -> io::Result<Option<Message>> {
if let Message::Broadcast(header, payload) = chunk {
Some(Message::Broadcast(header, payload))
Ok(Some(Message::Broadcast(header, payload)))
} else {
Some(chunk)
Ok(Some(chunk))
}
}
}
Loading

0 comments on commit aede873

Please sign in to comment.