diff --git a/CHANGELOG.md b/CHANGELOG.md index a6305ed..aaf0fb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add new facade function `new` to creating `RwLock` based on feature flag. [#94] +- Add `NetworkId` in configuration [#123] ### Changed - Change `RwLock` API to support diagnostics feature flag [#94] +- Change network wire encoding to support `NetworkId` [#123] ## [0.5.0] - 2023-05-17 @@ -121,6 +123,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#112]: https://github.com/dusk-network/kadcast/issues/112 [#115]: https://github.com/dusk-network/kadcast/issues/115 [#117]: https://github.com/dusk-network/kadcast/issues/117 +[#123]: https://github.com/dusk-network/kadcast/issues/123 diff --git a/src/config.rs b/src/config.rs index 060e371..7ee0454 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,6 +37,9 @@ pub const DEFAULT_BLOCKLIST_REFRESH_SECS: u64 = 10; #[derive(Clone, Serialize, Deserialize)] pub struct Config { + /// KadcastID + pub kadcast_id: Option, + /// Public `SocketAddress` of the [Peer]. No domain name allowed /// /// This is the address where other peers can contact you. @@ -86,6 +89,7 @@ impl Default for Config { fn default() -> Self { Self { public_address: "127.0.0.1:9000".to_string(), + kadcast_id: None, listen_address: None, bootstrapping_nodes: vec![], auto_propagate: ENABLE_BROADCAST_PROPAGATION, diff --git a/src/encoding.md b/src/encoding.md new file mode 100644 index 0000000..caae212 --- /dev/null +++ b/src/encoding.md @@ -0,0 +1,101 @@ +# Structs Encoding Explanation + +This document explains how various structs are encoded and decoded in the provided Rust code. The document covers the following structs: + +1. [Message Struct](#1-message-struct) +2. [Header Struct](#2-header-struct) +3. [PeerEncodedInfo Struct](#3-peerencodedinfo-struct) +4. [NodePayload Struct](#4-nodepayload-struct) +5. [BroadcastPayload Struct](#5-broadcastpayload-struct) +6. [Marshallable Trait](#6-marshallable-trait) + +--- + +## 1. Message Struct + +**Purpose**: The `Message` struct represents various types of network messages, such as Ping, Pong, FindNodes, Nodes, and Broadcast. Each message has its header and associated payload. + +**Encoding**: + +| Field | Length (bytes) | Description | +|------------------|-----------------|-------------------------------------------------| +| Message Type | 1 | Type identifier for the message. | +| Header | Variable | Header of the message. | +| Payload | Variable | Payload data specific to the message type. | + +- The length of the Header and Payload fields depends on the message type. + +--- + +## 2. Header Struct + +**Purpose**: The `Header` struct represents the header of a network message. It includes information such as the sender's binary ID, sender port, network ID, and reserved bytes. + +**Encoding**: + +| Field | Length (bytes) | Description | +|------------------|-----------------|---------------------------------------| +| Binary ID | 32 | The binary ID of the sender. | +| Nonce | 8 | Nonce of the sender. | +| Sender Port | 2 | Port of the sender (Little Endian). | +| Network ID | 1 | Network ID. | +| Reserved Bytes | 2 | Reserved bytes. | + +--- + +## 3. PeerEncodedInfo Struct + +**Purpose**: The `PeerEncodedInfo` struct contains information about a peer, including their IP address, port, and binary ID. + +**Encoding**: + +| Field | Length (bytes) | Description | +|---------------|----------------|-----------------------------------| +| IP Info | Variable | The IP address (IPv4 or IPv6). | +| Port | 2 | Port of the peer (LE encoding) | +| Binary ID | 32 | The binary ID of the peer. | + +- The length of the IP Info field depends on whether it's IPv4 or IPv6. + +- IPv4 peers are identified with a leading `0` in the IP Info field. IPv6 peers do not have a leading `0`. + +--- + +## 4. NodePayload Struct + +**Purpose**: The `NodePayload` struct represents a collection of peer information. + +**Encoding**: + +| Field | Length (bytes) | Description | +|------------------|-----------------|-------------------------------------------------| +| Number of Peers | 2 | Number of peers in the payload. | +| Peer Info | Variable | Information about each peer (PeerEncodedInfo). | + +- The length of the Peer Info field depends on the number of peers in the payload. + +- The number of peers is prepended as a 2-byte length before the Peer Info field. + +--- + +## 5. BroadcastPayload Struct + +**Purpose**: The `BroadcastPayload` struct represents the payload of a broadcast message. It includes the message's height and the message content. + +**Encoding**: + +| Field | Length (bytes) | Description | +|------------------------|----------------|----------------------------------------| +| Height | 1 | Height of the message. | +| Message Content Length | 4 | Length of the message (Little Endian). | +| Message Content | Variable | The content of the message. | + +- The length of the Message Content field depends on the size of the message. + +- The length of the Message Content is prepended as a 4-byte length before the content. + +--- + +## 6. Marshallable Trait + +The `Marshallable` trait defines methods for encoding and decoding the structs into/from binary data. diff --git a/src/encoding.rs b/src/encoding.rs index ce0f4ca..f07a5cf 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -35,32 +35,34 @@ mod tests { #[test] fn test_encode_ping() -> Result<()> { - let peer = PeerNode::generate("192.168.0.1:666")?; + let peer = PeerNode::generate("192.168.0.1:666", 0)?; let a = Message::Ping(peer.to_header()); test_kadkast_marshal(a) } #[test] fn test_encode_pong() -> Result<()> { - let peer = PeerNode::generate("192.168.0.1:666")?; + let peer = PeerNode::generate("192.168.0.1:666", 0)?; let a = Message::Pong(peer.to_header()); test_kadkast_marshal(a) } #[test] fn test_encode_find_nodes() -> Result<()> { - let peer = PeerNode::generate("192.168.0.1:666")?; - let target = *PeerNode::generate("192.168.1.1:666")?.id().as_binary(); + let peer = PeerNode::generate("192.168.0.1:666", 0)?; + let target = + *PeerNode::generate("192.168.1.1:666", 0)?.id().as_binary(); let a = Message::FindNodes(peer.to_header(), target); test_kadkast_marshal(a) } #[test] fn test_encode_nodes() -> Result<()> { - let peer = PeerNode::generate("192.168.0.1:666")?; + let peer = PeerNode::generate("192.168.0.1:666", 0)?; let nodes = vec![ - PeerNode::generate("192.168.1.1:666")?, + PeerNode::generate("192.168.1.1:666", 0)?, PeerNode::generate( "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:666", + 0, )?, ] .iter() @@ -72,13 +74,13 @@ mod tests { #[test] fn test_encode_empty_nodes() -> Result<()> { - let peer = PeerNode::generate("192.168.0.1:666")?; + let peer = PeerNode::generate("192.168.0.1:666", 0)?; let a = Message::Nodes(peer.to_header(), NodePayload { peers: vec![] }); test_kadkast_marshal(a) } #[test] fn test_encode_broadcast() -> Result<()> { - let peer = PeerNode::generate("192.168.0.1:666")?; + let peer = PeerNode::generate("192.168.0.1:666", 0)?; let a = Message::Broadcast( peer.to_header(), BroadcastPayload { diff --git a/src/encoding/header.rs b/src/encoding/header.rs index 678da5f..34d1013 100644 --- a/src/encoding/header.rs +++ b/src/encoding/header.rs @@ -13,6 +13,7 @@ use crate::{kbucket::BinaryID, K_ID_LEN_BYTES, K_NONCE_LEN}; pub struct Header { pub(crate) binary_id: BinaryID, pub(crate) sender_port: u16, + pub(crate) network_id: u8, pub(crate) reserved: [u8; 2], } @@ -30,6 +31,7 @@ impl Marshallable for Header { writer.write_all(self.binary_id.as_binary())?; writer.write_all(self.binary_id.nonce())?; writer.write_all(&self.sender_port.to_le_bytes())?; + writer.write_all(&[self.network_id])?; writer.write_all(&self.reserved)?; Ok(()) } @@ -50,12 +52,19 @@ impl Marshallable for Header { let mut port_buffer = [0; 2]; reader.read_exact(&mut port_buffer)?; let port = u16::from_le_bytes(port_buffer); + + let mut network_id = [0; 1]; + reader.read_exact(&mut network_id)?; + let network_id = network_id[0]; + let mut reserved = [0; 2]; reader.read_exact(&mut reserved)?; + Ok(Header { binary_id, sender_port: port, reserved, + network_id, }) } } diff --git a/src/handling.rs b/src/handling.rs index 8180d86..66a5162 100644 --- a/src/handling.rs +++ b/src/handling.rs @@ -97,6 +97,7 @@ impl MessageHandler { let remote_peer = PeerNode::from_socket( remote_peer_addr, *message.header().binary_id(), + message.header().network_id, ); match handler.handle_peer(remote_peer).await { @@ -114,6 +115,14 @@ impl MessageHandler { ); continue; } + Err(NodeInsertError::MismatchNetwork(n)) => { + error!( + "Unable to insert node - NETWORK MISMATCH {} - {}", + n.value().address(), + n.network_id, + ); + continue; + } }; handler.handle_message(message, remote_peer_addr).await; diff --git a/src/kbucket.rs b/src/kbucket.rs index f225b48..6f56820 100644 --- a/src/kbucket.rs +++ b/src/kbucket.rs @@ -36,6 +36,9 @@ impl Tree { &mut self, node: Node, ) -> Result, InsertError> { + if self.root().network_id != node.network_id { + return Err(NodeInsertError::MismatchNetwork(node)); + } match self.root.calculate_distance(&node) { None => Err(NodeInsertError::Invalid(node)), Some(height) => self.get_or_create_bucket(height).insert(node), @@ -99,7 +102,7 @@ impl Tree { ) -> impl Iterator { let max_buckets = (crate::K_ID_LEN_BYTES * 8) as BucketHeight; (0..max_buckets).filter(move |h| { - self.buckets.get(h).map_or_else(|| true, |b| b.is_idle()) + self.buckets.get(h).map_or_else(|| true, |b| b.has_idle()) }) } @@ -110,7 +113,7 @@ impl Tree { { self.buckets .iter() - .filter(|(_, bucket)| bucket.is_idle()) + .filter(|(_, bucket)| bucket.has_idle()) .map(|(&height, bucket)| (height, bucket.pick::())) } @@ -172,29 +175,24 @@ mod tests { use crate::tests::Result; #[test] fn test_buckets() -> Result<()> { - let root = PeerNode::generate("192.168.0.1:666")?; + let root = PeerNode::generate("192.168.0.1:666", 0)?; let mut config = BucketConfig::default(); config.node_evict_after = Duration::from_millis(5000); config.node_ttl = Duration::from_secs(60); let mut route_table = Tree::new(root, config); for i in 2..255 { - let res = route_table.insert(PeerNode::generate( - &format!("192.168.0.{}:666", i)[..], - )?); + let node = PeerNode::generate(format!("192.168.0.{}:666", i), 0)?; + let res = route_table.insert(node); match res { - Ok(_) => {} - Err(error) => match error { - NodeInsertError::Invalid(_) => { - assert!(false) - } - _ => {} - }, + Ok(_) | Err(NodeInsertError::Full(_)) => {} + _ => panic!("Node must be valid"), } } let res = route_table - .insert(PeerNode::generate("192.168.0.1:666")?) + .insert(PeerNode::generate("192.168.0.1:666", 0)?) .expect_err("this should be an error"); + assert!(if let NodeInsertError::Invalid(_) = res { true } else { diff --git a/src/kbucket/bucket.rs b/src/kbucket/bucket.rs index edd93a2..ff178a4 100644 --- a/src/kbucket/bucket.rs +++ b/src/kbucket/bucket.rs @@ -13,34 +13,45 @@ use super::BinaryKey; use crate::config::BucketConfig; use crate::K_K; +/// Represents a bucket for storing nodes in a Kademlia routing table. pub(super) struct Bucket { nodes: arrayvec::ArrayVec, K_K>, pending_node: Option>, bucket_config: BucketConfig, } +/// Enum representing the result of inserting a node into a bucket. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum NodeInsertOk<'a, TNode> { - Inserted { - inserted: &'a TNode, - }, + /// The node was successfully inserted into the bucket. + Inserted { inserted: &'a TNode }, + /// The node was updated within the bucket, and a potential eviction is + /// pending. Updated { updated: &'a TNode, pending_eviction: Option<&'a TNode>, }, + /// The insertion of the node is pending, and a potential eviction is + /// pending as well. Pending { pending_insert: &'a TNode, pending_eviction: Option<&'a TNode>, }, } +/// Enum representing possible errors when inserting a node into a bucket. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum NodeInsertError { + /// The node is considered invalid and cannot be inserted. Invalid(TNode), + /// The bucket is already full, and the node cannot be inserted. Full(TNode), + /// There is a mismatch with the network while inserting the node. + MismatchNetwork(TNode), } impl<'a, TNode> NodeInsertOk<'a, TNode> { + /// Returns an optional reference to the node pending eviction. pub fn pending_eviction(&self) -> Option<&TNode> { match self { NodeInsertOk::Inserted { inserted: _ } => None, @@ -55,9 +66,15 @@ impl<'a, TNode> NodeInsertOk<'a, TNode> { } } } + +/// Type alias for a successful node insertion result in a bucket. pub type InsertOk<'a, V> = NodeInsertOk<'a, Node>; + +/// Type alias for an error during node insertion into a bucket. pub type InsertError = NodeInsertError>; + impl Bucket { + /// Creates a new `Bucket` with the given configuration. pub(super) fn new(bucket_config: BucketConfig) -> Self { Bucket { nodes: ArrayVec::, K_K>::new(), @@ -66,8 +83,8 @@ impl Bucket { } } - //Refreshes the node's last usage time corresponding to the given key and - // return his ref + /// Refreshes the last usage time of a node based on the given key and + /// returns a reference to it. fn refresh_node(&mut self, key: &BinaryKey) -> Option<&Node> { let old_index = self.nodes.iter().position(|s| s.id().as_binary() == key)?; @@ -76,26 +93,30 @@ impl Bucket { self.nodes.last() } + /// Attempts to insert a previously marked pending node into the bucket. No + /// action is taken if the bucket is full. + /// + /// If the pending node is no longer alive, it is removed. fn insert_pending(&mut self) { if self.nodes.is_full() { return; }; if let Some(pending) = self.pending_node.take() { if pending.is_alive(self.bucket_config.node_ttl) { - //FIXME: use try_push instead of push - //FIXME2: we are breaking the LRU policy, maybe in - // the meanwhile other records have been updated. Btw - // it's mitigated with is_alive check + // FIXME: Consider using `try_push` instead of `push`. + // FIXME2: This may break the LRU policy, as other records may + // have been updated in the meantime. However, it is mitigated + // by the `is_alive` check. self.nodes.push(pending); } } } - /* - If the bucket is full, flag the least recent used for eviction. - If it's already flagged, check if timeout is expired and then replace with the pending node. - The method return the candidate for eviction (if any) - */ + /// If the bucket is full, flag the least recently used node for eviction. + /// If it's already flagged, check if the timeout has expired and then + /// replace it with the pending node. + /// + /// The method returns the candidate for eviction (if any). fn try_perform_eviction(&mut self) -> Option<&Node> { if !self.nodes.is_full() { return None; @@ -121,11 +142,12 @@ impl Bucket { } } + /// Tries to insert a node into the bucket and returns the result. pub fn insert( &mut self, node: Node, ) -> Result, InsertError> { - if !node.is_id_valid() { + if !node.id().verify_nonce() { return Err(NodeInsertError::Invalid(node)); } if self.refresh_node(node.id().as_binary()).is_some() { @@ -166,7 +188,7 @@ impl Bucket { } } - //pick at most `ITEM_COUNT` random nodes from this bucket + /// Picks at most `ITEM_COUNT` random nodes from this bucket. pub fn pick( &self, ) -> impl Iterator> { @@ -177,50 +199,62 @@ impl Bucket { .filter_map(move |idx| self.nodes.get(idx)) } - /* The method return the least recent used node to query if flagged for - * eviction */ + /// Returns the least recently used node to query if flagged for eviction. fn pending_eviction_node(&self) -> Option<&Node> { self.nodes .first() .filter(|n| n.eviction_status != NodeEvictionStatus::None) } + /// Returns an iterator over the peers in the bucket. pub(super) fn peers(&self) -> impl Iterator> { self.nodes.iter() } - pub(crate) fn is_idle(&self) -> bool { + /// Checks if the bucket has at least one idle node. + pub(crate) fn has_idle(&self) -> bool { self.nodes.first().map_or(false, |n| { n.seen_at.elapsed() > self.bucket_config.bucket_ttl }) } + /// Removes idle nodes from the bucket. pub(crate) fn remove_idle_nodes(&mut self) { let ttl = self.bucket_config.node_ttl; self.nodes.retain(|n| n.is_alive(ttl)); self.insert_pending(); } + /// Returns an iterator over the alive nodes in the bucket. pub(crate) fn alive_nodes(&self) -> impl Iterator> { let ttl = self.bucket_config.node_ttl; self.nodes.iter().filter(move |&n| n.is_alive(ttl)) } + /// Checks if the bucket contains a node with the given peer key. pub(crate) fn has_node(&self, peer: &BinaryKey) -> bool { self.nodes.iter().any(|n| n.id().as_binary() == peer) } + /// Checks if the bucket is full. pub(crate) fn is_full(&self) -> bool { self.nodes.is_full() } + /// Removes a node from the bucket by its ID. + /// + /// If there is an alive pending node, it's inserted. + /// + /// Returns the removed node. pub(crate) fn remove_id(&mut self, id: &[u8]) -> Option> { let node_idx = self.nodes.iter().position(|s| s.id().as_binary() == id)?; self.nodes.pop_at(node_idx).map(|removed| { if let Some(pending) = self.pending_node.take() { - self.nodes.push(pending); + if pending.is_alive(self.bucket_config.node_ttl) { + self.nodes.push(pending); + } } removed }) @@ -256,7 +290,7 @@ mod tests { #[test] fn test_lru_base_5secs() -> Result<()> { - let root = PeerNode::generate("127.0.0.1:666")?; + let root = PeerNode::generate("127.0.0.1:666", 0)?; let mut config = BucketConfig::default(); config.node_evict_after = Duration::from_millis(1000); config.node_ttl = Duration::from_secs(5); @@ -264,9 +298,9 @@ mod tests { let mut route_table = Tree::new(root, config); let bucket = route_table.bucket_for_test(); - let node1 = PeerNode::generate("192.168.1.1:8080")?; + let node1 = PeerNode::generate("192.168.1.1:8080", 0)?; let id_node1 = node1.id().as_binary().clone(); - let node1_copy = PeerNode::generate("192.168.1.1:8080")?; + let node1_copy = PeerNode::generate("192.168.1.1:8080", 0)?; match bucket.insert(node1).expect("This should return an ok()") { NodeInsertOk::Inserted { .. } => {} _ => assert!(false), @@ -282,7 +316,7 @@ mod tests { _ => assert!(false), } assert_eq!(Some(&id_node1), bucket.last_id()); - let node2 = PeerNode::generate("192.168.1.2:8080")?; + let node2 = PeerNode::generate("192.168.1.2:8080", 0)?; let id_node2 = node2.id().as_binary().clone(); match bucket.insert(node2).expect("This should return an ok()") { @@ -295,7 +329,7 @@ mod tests { assert_eq!(Some(&id_node1), bucket.least_used_id()); match bucket - .insert(PeerNode::generate("192.168.1.1:8080")?) + .insert(PeerNode::generate("192.168.1.1:8080", 0)?) .expect("This should return an ok()") { NodeInsertOk::Updated { .. } => {} @@ -313,6 +347,7 @@ mod tests { match bucket .insert(PeerNode::generate( &format!("192.168.1.{}:8080", i)[..], + 0, )?) .expect("This should return an ok()") { @@ -323,7 +358,7 @@ mod tests { } } assert_eq!(bucket.pick::().count(), K_BETA); - let pending = PeerNode::generate("192.168.1.21:8080")?; + let pending = PeerNode::generate("192.168.1.21:8080", 0)?; let pending_id = pending.id().as_binary().clone(); match bucket.insert(pending).expect_err("this should be error") { NodeInsertError::Full(pending) => { @@ -339,7 +374,8 @@ mod tests { &pending_id ); thread::sleep(Duration::from_secs(1)); - let pending = PeerNode::generate("192.168.1.21:8080")?; + let pending = + PeerNode::generate("192.168.1.21:8080", 0)?; match bucket.insert(pending).expect("this should be ok") { NodeInsertOk::Inserted { inserted: _ } => {} diff --git a/src/kbucket/key.rs b/src/kbucket/key.rs index 0f79bbf..f28f600 100644 --- a/src/kbucket/key.rs +++ b/src/kbucket/key.rs @@ -56,15 +56,19 @@ impl Marshallable for BinaryKey { } impl BinaryID { + /// Returns the binary key associated with this `BinaryID`. pub fn as_binary(&self) -> &BinaryKey { &self.bytes } + + /// Returns the binary nonce associated with this `BinaryID`. pub fn nonce(&self) -> &BinaryNonce { &self.nonce } - // Returns the 0-based kadcast distance between 2 ID - // `None` if they are identical + /// Calculates the 0-based Kadcast distance between two `BinaryID`s. + /// + /// Returns `None` if the two IDs are identical. pub fn calculate_distance( &self, other: &BinaryKey, @@ -80,8 +84,9 @@ impl BinaryID { .map(|(i, b)| BinaryID::msb(b).expect("to be Some") + (i << 3) - 1) } - // Returns the position of the most-significant bit set in a byte, - // `None` if no bit is set + /// Returns the position of the most significant bit set in a byte. + /// + /// Returns `None` if no bit is set. const fn msb(n: u8) -> Option { match u8::BITS - n.leading_zeros() { 0 => None, @@ -89,10 +94,16 @@ impl BinaryID { } } + /// Creates a new `BinaryID` by combining a `BinaryKey` and a `BinaryNonce`. pub(crate) fn from_nonce(id: BinaryKey, nonce: BinaryNonce) -> Self { - BinaryID { bytes: id, nonce } + Self { bytes: id, nonce } } + /// Generates a new `BinaryID` using the given `BinaryKey`. + /// + /// This function generates a unique identifier by repeatedly hashing the + /// provided `BinaryKey` with an incrementing nonce value until a + /// difficulty threshold is met. pub(crate) fn generate(id: BinaryKey) -> Self { let mut nonce: u32 = 0; let mut hasher = Blake2s256::new(); @@ -113,6 +124,8 @@ impl BinaryID { } } + /// Verifies that the nonce of the `BinaryID` meets the minimum difficulty + /// requirements. pub fn verify_nonce(&self) -> bool { let mut hasher = Blake2s256::new(); hasher.update(self.bytes); @@ -123,6 +136,8 @@ impl BinaryID { ) } + /// Verifies the difficulty of a binary value according to the specified + /// criteria. pub(crate) fn verify_difficulty<'a, I>( bytes: &mut I, difficulty: usize, @@ -169,12 +184,12 @@ mod tests { #[test] fn test_distance() -> Result<()> { - let n1 = PeerNode::generate("192.168.0.1:666")?; - let n2 = PeerNode::generate("192.168.0.1:666")?; + let n1 = PeerNode::generate("192.168.0.1:666", 0)?; + let n2 = PeerNode::generate("192.168.0.1:666", 0)?; assert_eq!(n1.calculate_distance(&n2), None); assert_eq!(n1.id().calculate_distance_native(n2.id()), None); for i in 2..255 { - let n_in = PeerNode::generate(&format!("192.168.0.{}:666", i)[..])?; + let n_in = PeerNode::generate(format!("192.168.0.{}:666", i), 0)?; assert_eq!( n1.calculate_distance(&n_in), n1.id().calculate_distance_native(n_in.id()) @@ -185,7 +200,7 @@ mod tests { #[test] fn test_id_nonce() -> Result<()> { - let root = PeerNode::generate("192.168.0.1:666")?; + let root = PeerNode::generate("192.168.0.1:666", 0)?; println!("Nonce is {:?}", root.id().nonce()); assert!(root.id().verify_nonce()); Ok(()) diff --git a/src/kbucket/node.rs b/src/kbucket/node.rs index b18437b..f2895f8 100644 --- a/src/kbucket/node.rs +++ b/src/kbucket/node.rs @@ -9,14 +9,18 @@ use std::time::{Duration, Instant}; use super::key::BinaryID; use super::BucketHeight; +/// A struct representing a node in the network with an associated ID, value, +/// and eviction status. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct Node { id: BinaryID, value: TValue, pub(super) eviction_status: NodeEvictionStatus, pub(super) seen_at: Instant, + pub(crate) network_id: u8, } +/// An enumeration representing the eviction status of a node. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum NodeEvictionStatus { None, @@ -24,15 +28,19 @@ pub enum NodeEvictionStatus { } impl Node { - pub fn new(id: BinaryID, value: TValue) -> Self { + /// Creates a new node with the provided ID, value, and network ID. + pub fn new(id: BinaryID, value: TValue, network_id: u8) -> Self { Node { id, value, seen_at: Instant::now(), eviction_status: NodeEvictionStatus::None, + network_id, } } + /// Calculates the distance between this node and another node in the + /// network. pub fn calculate_distance( &self, other: &Node, @@ -40,29 +48,33 @@ impl Node { self.id.calculate_distance(other.id.as_binary()) } - //maybe we can move this outside of node impl, nonce must be verified when - // a node is deserialized IMHO - pub fn is_id_valid(&self) -> bool { - self.id.verify_nonce() - } - + /// Returns the ID of the node. pub fn id(&self) -> &BinaryID { &self.id } + /// Returns the value associated with the node. pub fn value(&self) -> &TValue { &self.value } + /// Refreshes the last seen time and clears the eviction status for the + /// node. pub(super) fn refresh(&mut self) { self.eviction_status = NodeEvictionStatus::None; self.seen_at = Instant::now(); } + /// Flags the node for eviction check by updating its eviction status. pub(super) fn flag_for_check(&mut self) { self.eviction_status = NodeEvictionStatus::Requested(Instant::now()); } + /// Checks if the node has been seen within a specified time duration. + /// + /// Returns `true` if the node's last seen time is within the given + /// `duration`, indicating that it is still active and reachable in the + /// network. pub(super) fn is_alive(&self, duration: Duration) -> bool { self.seen_at.elapsed() < duration } diff --git a/src/lib.rs b/src/lib.rs index 14e9065..0da68da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,8 +65,8 @@ pub struct Peer { blocklist: RwLock>, } -/// [NetworkListen] is notified each time a broadcasted -/// message is received from the network +/// The [NetworkListen] trait receives notifications whenever a broadcasted +/// message is received from the network. pub trait NetworkListen: Send { fn on_message(&self, message: Vec, metadata: MessageInfo); } @@ -81,8 +81,9 @@ impl Peer { config: Config, listener: L, ) -> Result { + let network_id = config.kadcast_id.unwrap_or_default(); let tree = Tree::new( - PeerNode::generate(&config.public_address[..])?, + PeerNode::generate(&config.public_address[..], network_id)?, config.bucket, ); @@ -246,6 +247,16 @@ impl Peer { .unwrap_or_else(|e| error!("Unable to send from send method {e}")); } + /// Blocks a network source and removes it from the routing table. + /// + /// # Arguments + /// + /// * `source` - The address of the network source to be blocked. + /// + /// This method blocks a network source by adding its address to the + /// blocklist and subsequently removes the corresponding peer from the + /// routing table. This action prevents further communication with the + /// blocked source. pub async fn block_source(&self, source: SocketAddr) { self.blocklist.write().await.insert(source); let binary_key = PeerNode::compute_id(&source.ip(), source.port()); diff --git a/src/peer.rs b/src/peer.rs index 36c8b3d..21d3548 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -27,18 +27,25 @@ impl PeerInfo { } impl PeerNode { - pub fn generate(address: &str) -> Result { - let address: SocketAddr = address.parse()?; + pub fn generate( + address: impl AsRef, + network_id: u8, + ) -> Result { + let address: SocketAddr = address.as_ref().parse()?; let info = PeerInfo { address }; let binary = PeerNode::compute_id(&info.address.ip(), info.address.port()); let id = BinaryID::generate(binary); - Ok(Node::new(id, info)) + Ok(Node::new(id, info, network_id)) } - pub fn from_socket(address: SocketAddr, id: BinaryID) -> Self { + pub fn from_socket( + address: SocketAddr, + id: BinaryID, + network_id: u8, + ) -> Self { let info = PeerInfo { address }; - Node::new(id, info) + Node::new(id, info, network_id) } pub(crate) fn verify_header(header: &Header, ip: &IpAddr) -> bool { @@ -64,6 +71,7 @@ impl PeerNode { binary_id: *self.id(), sender_port: self.value().address.port(), reserved: [0; 2], + network_id: self.network_id, } } @@ -86,13 +94,14 @@ mod tests { use crate::tests::Result; #[test] fn test_verify_header() -> Result<()> { - let wrong_header = PeerNode::generate("10.0.0.1:333")?.to_header(); + let wrong_header = PeerNode::generate("10.0.0.1:333", 0)?.to_header(); let wrong_header_sameport = - PeerNode::generate("10.0.0.1:666")?.to_header(); + PeerNode::generate("10.0.0.1:666", 0)?.to_header(); vec![ - PeerNode::generate("192.168.1.1:666")?, + PeerNode::generate("192.168.1.1:666", 0)?, PeerNode::generate( "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:666", + 0, )?, ] .iter() diff --git a/src/transport/encoding/raptorq.md b/src/transport/encoding/raptorq.md new file mode 100644 index 0000000..46dcee9 --- /dev/null +++ b/src/transport/encoding/raptorq.md @@ -0,0 +1,38 @@ +## ChunkedPayload Struct + +**Purpose**: The `ChunkedPayload` struct is used for encoding and decoding broadcast payloads using RaptorQ encoding. It represents a single chunk of a larger broadcast message. + +**Encoding**: + +- The encoded `ChunkedPayload` is constructed from a `BroadcastPayload`. +- The encoding of a `ChunkedPayload` consists of the following components: + - `UID` (Unique Identifier): A 32-byte hash of the broadcast payload, excluding the height field. + - `ObjectTransmissionInformation` (RaptorQ header): A 12-byte header specific to the RaptorQ encoding scheme. + - `Encoded Chunk`: The chunked and encoded data using RaptorQ. + +| Field | Length (bytes) | Description | +|-------------------|----------------|-----------------------------------| +| UID (Blake2s256) | 32 | Unique identifier for the chunked payload. | +| Transmission Info | 12 | Object Transmission Information (RaptorQ header). | +| Encoded Chunk | Variable | The RaptorQ encoded chunk of the payload. | + +**Decoding**: + +- When a `BroadcastPayload` is transformed into a `ChunkedPayload`, it checks if the payload length is at least `MIN_CHUNKED_SIZE`, which is the minimum required length to consider it a valid `ChunkedPayload`. If not, an error is raised. + +- The `ChunkedPayload` holds the following components: + - `UID`: The unique identifier for the chunk, extracted from the first 32 bytes of the `gossip_frame`. + - `ObjectTransmissionInformation`: The 12-byte RaptorQ header, parsed from the `gossip_frame` bytes. + - `Encoded Chunk`: The remaining bytes after UID and RaptorQ header, containing the encoded chunk. + +- `ChunkedPayload` is used in a cache to manage the decoding process for broadcast messages. It tracks the state of a broadcast message's chunk as either receiving or processed. + +- The cache stores the `UID` of the broadcast message as the key and the `CacheStatus` as the value, which tracks the state. + +- The `CacheStatus` can be in two states: + 1. **Receiving**: In this state, a RaptorQ decoder is initialized with the `ObjectTransmissionInformation`. The decoder processes incoming encoded chunks and attempts to decode them. If a chunk is successfully decoded, the message is checked for integrity, and if it's valid, it's stored as a fully processed message. If not, it's discarded. + 2. **Processed**: In this state, the message has been successfully decoded and processed. The `CacheStatus` holds a timestamp for when it was last processed, and if the message is expired based on the cache's TTL, it can be removed from the cache. + +- The cache is pruned periodically to remove expired messages. + +Please note that the actual RaptorQ encoding and decoding processes are carried out using the RaptorQ library and are not detailed here. The focus is on how the Rust code manages the chunked broadcast payloads in the context of RaptorQ encoding. \ No newline at end of file diff --git a/src/transport/encoding/raptorq.rs b/src/transport/encoding/raptorq.rs index 706af8d..380d378 100644 --- a/src/transport/encoding/raptorq.rs +++ b/src/transport/encoding/raptorq.rs @@ -120,7 +120,7 @@ mod tests { for i in 0..data.len() { data[i] = rand::Rng::gen(&mut rand::thread_rng()); } - let peer = PeerNode::generate("192.168.0.1:666")?; + let peer = PeerNode::generate("192.168.0.1:666", 0)?; let header = peer.to_header(); let payload = BroadcastPayload { height: 255, diff --git a/src/transport/encoding/raptorq/decoder.rs b/src/transport/encoding/raptorq/decoder.rs index 7950be2..3eb7f51 100644 --- a/src/transport/encoding/raptorq/decoder.rs +++ b/src/transport/encoding/raptorq/decoder.rs @@ -176,7 +176,7 @@ mod tests { #[test] fn test_expiring_cache() -> Result<()> { - let root = PeerNode::generate("192.168.0.1:666")?; + let root = PeerNode::generate("192.168.0.1:666", 0)?; let enc = RaptorQEncoder::configure(&RaptorQEncoder::default_configuration()); let mut conf = RaptorQDecoder::default_configuration(); diff --git a/tests/lib.rs b/tests/lib.rs index bb0f22e..cb4a6df 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -7,8 +7,9 @@ #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use std::net::{AddrParseError, SocketAddr, ToSocketAddrs}; + use std::ops::Range; use std::time::Duration; use kadcast::config::Config; @@ -54,17 +55,26 @@ mod tests { v }; let mut peers = HashMap::new(); - for i in 0..NODES { + + // Use a wrong network_id for the first bootstrapper + peers.insert( + 0, + create_peer(0, bootstraps.clone(), tx.clone(), Some(2))?, + ); + for i in 1..NODES { tokio::time::sleep(Duration::from_millis(500)).await; - peers.insert(i, create_peer(i, bootstraps.clone(), tx.clone())?); + peers.insert( + i, + create_peer(i, bootstraps.clone(), tx.clone(), Some(1))?, + ); } + tokio::time::sleep(Duration::from_millis(2000)).await; let mut data: Vec = vec![0; MESSAGE_SIZE]; for i in 0..data.len() { data[i] = rand::Rng::gen(&mut rand::thread_rng()); } - for i in 1..NODES { - // for (i, p) in peers.iter() { + for i in 0..NODES { info!("ROUTING TABLE PEER #{}", i); peers.get(&i).unwrap().report().await; info!("----------------------"); @@ -80,9 +90,23 @@ mod tests { .unwrap() .broadcast(&data, None) .await; - let res = - timeout(Duration::from_secs(WAIT_SEC), receive(rx, NODES - 1)) - .await; + let expected_message_broadcasted = NODES; + // Remove the invalid network id + let expected_message_sent = expected_message_broadcasted - 1; + // Remove the sender + let expected_message_received = expected_message_sent - 1; + + let start_expected_range = BASE_PORT + 1; // remove the first invalid node + let end_expected_range = + start_expected_range + expected_message_received; + + let expected_received_range = start_expected_range..end_expected_range; + + let res = timeout( + Duration::from_secs(WAIT_SEC), + receive(rx, expected_received_range), + ) + .await; assert!( res.is_ok(), "Not all nodes received the broadcasted message" @@ -92,30 +116,34 @@ mod tests { async fn receive( mut rx: mpsc::Receiver<(usize, (Vec, SocketAddr, u8))>, - expected: i32, + expected_from: Range, ) { - let mut missing = HashMap::new(); - for i in (BASE_PORT + 1)..(BASE_PORT + expected) { - missing.insert(i, i); + let mut missing = HashSet::new(); + info!("{expected_from:?}"); + for i in expected_from { + missing.insert(i); } + info!("{missing:?}"); let mut i = 0; while !missing.is_empty() { if let Some((receiver_port, message)) = rx.recv().await { i = i + 1; - missing.remove(&(receiver_port as i32)); + let removed = missing.remove(&(receiver_port as i32)); info!( - "RECEIVER PORT: {} - Message N° {} got from {:?}", - receiver_port, i, message.1 + "RECEIVER PORT: {} - Message N° {} got from {:?} - Left {} - Removed {:?}", + receiver_port, i, message.1, missing.len(), removed ); } } info!("Received All {} messages", i); + info!("{missing:?}"); } fn create_peer( i: i32, bootstrap: Vec, grpc_sender: mpsc::Sender<(usize, (Vec, SocketAddr, u8))>, + network_id: Option, ) -> core::result::Result { let port = BASE_PORT + i; let public_addr = format!("127.0.0.1:{}", port).to_string(); @@ -124,6 +152,7 @@ mod tests { receiver_port: port as usize, }; let mut conf = Config::default(); + conf.kadcast_id = network_id; conf.bootstrapping_nodes = bootstrap; conf.public_address = public_addr; Peer::new(conf, listener)