Skip to content

Commit

Permalink
internal rename of fields
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Sep 28, 2024
1 parent 59d9661 commit 3af988b
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 65 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Change the EncodedChunk UUID generation
- Change the EncodedChunk UUID generation (aka RaptorqHeader)
- Change `raptorq` dependency from `1.6` to `2.0`

## [0.6.1] - 2024-04-10
Expand Down
2 changes: 1 addition & 1 deletion src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ mod tests {
BroadcastPayload {
height: 10,
gossip_frame: vec![3, 5, 6, 7],
ray: vec![],
ray_id: vec![],
},
);
test_kadkast_marshal(a)
Expand Down
5 changes: 3 additions & 2 deletions src/encoding/payload/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::encoding::Marshallable;
pub(crate) struct BroadcastPayload {
pub(crate) height: u8,
pub(crate) gossip_frame: Vec<u8>,
pub(crate) ray: Vec<u8>,
pub(crate) ray_id: Vec<u8>,
}

impl Marshallable for BroadcastPayload {
Expand All @@ -34,7 +34,8 @@ impl Marshallable for BroadcastPayload {
Ok(BroadcastPayload {
height: height_buf[0],
gossip_frame,
ray: vec![],
ray_id: vec![], /* We don't serialize the ray_id because is up to
* the decoder. */
})
}
}
20 changes: 12 additions & 8 deletions src/handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{RwLock, K_K};
pub struct MessageInfo {
pub(crate) src: SocketAddr,
pub(crate) height: u8,
pub(crate) ray: Vec<u8>,
pub(crate) ray_id: Vec<u8>,
}

impl MessageInfo {
Expand All @@ -37,8 +37,8 @@ impl MessageInfo {
self.height
}
/// Returns the ray-id for this message (if any)
pub fn ray(&self) -> &[u8] {
&self.ray
pub fn ray_id(&self) -> &[u8] {
&self.ray_id
}
}

Expand Down Expand Up @@ -314,17 +314,21 @@ impl MessageHandler {
) {
let height = payload.height;
let gossip_frame = payload.gossip_frame;
let ray = payload.ray;
let ray_id = payload.ray_id;
debug!(
event = "handle broadcast",
height,
size = gossip_frame.len(),
ray = hex::encode(&ray)
ray = hex::encode(&ray_id)
);

// Aggregate message + metadata for lib client
let msg = gossip_frame.clone();
let md = MessageInfo { src, height, ray };
let md = MessageInfo {
src,
height,
ray_id,
};

// Notify lib client
self.listener_sender
Expand All @@ -347,8 +351,8 @@ impl MessageHandler {
let payload = BroadcastPayload {
height,
gossip_frame,
ray: vec![], /* ray will be set while sending
* according to the encoder */
ray_id: vec![], /* ray will be set while sending
* according to the encoder */
};
let msg = Message::Broadcast(self.my_header, payload);
let targets =
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ impl Peer {
BroadcastPayload {
height,
gossip_frame: message.to_vec(),
ray: vec![], /* ray will be set while sending
* according to the encoder */
ray_id: vec![], /* ray will be set while sending
* according to the encoder */
},
);
let targets =
Expand Down Expand Up @@ -251,7 +251,7 @@ impl Peer {
BroadcastPayload {
height: 0,
gossip_frame: message.to_vec(), //FIX_ME: avoid clone
ray: vec![],
ray_id: vec![],
},
);
let targets = vec![target];
Expand Down
18 changes: 9 additions & 9 deletions src/transport/encoding/raptorq.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@

- The encoded `ChunkedPayload` is constructed from a `BroadcastPayload`.
- The encoding of a `ChunkedPayload` consists of the following components:
- `UID` (Unique Identifier): A 32-byte hash of the broadcast payload, excluding the height field.
- `RAY_ID` (Unique Identifier): A 32-byte hash of the broadcast payload, excluding the height field.
- `ObjectTransmissionInformation` (RaptorQ header): A 12-byte header specific to the RaptorQ encoding scheme.
- `Encoded Chunk`: The chunked and encoded data using RaptorQ.

| Field | Length (bytes) | Description |
|-------------------|----------------|-----------------------------------|
| UID (Blake2s256) | 32 | Unique identifier for the chunked payload. |
| Transmission Info | 12 | Object Transmission Information (RaptorQ header). |
| Encoded Chunk | Variable | The RaptorQ encoded chunk of the payload. |
| Field | Length (bytes) | Description |
|---------------------|----------------|-----------------------------------|
| RAY_ID (Blake2s256) | 32 | Unique identifier for the chunked payload. |
| Transmission Info | 12 | Object Transmission Information (RaptorQ header). |
| Encoded Chunk | Variable | The RaptorQ encoded chunk of the payload. |

**Decoding**:

- When a `BroadcastPayload` is transformed into a `ChunkedPayload`, it checks if the payload length is at least `MIN_CHUNKED_SIZE`, which is the minimum required length to consider it a valid `ChunkedPayload`. If not, an error is raised.

- The `ChunkedPayload` holds the following components:
- `UID`: The unique identifier for the chunk, extracted from the first 32 bytes of the `gossip_frame`.
- `RAY_ID`: The unique identifier for the chunk, extracted from the first 32 bytes of the `gossip_frame`.
- `ObjectTransmissionInformation`: The 12-byte RaptorQ header, parsed from the `gossip_frame` bytes.
- `Encoded Chunk`: The remaining bytes after UID and RaptorQ header, containing the encoded chunk.
- `Encoded Chunk`: The remaining bytes after RAY_ID and RaptorQ header, containing the encoded chunk.

- `ChunkedPayload` is used in a cache to manage the decoding process for broadcast messages. It tracks the state of a broadcast message's chunk as either receiving or processed.

- The cache stores the `UID` of the broadcast message as the key and the `CacheStatus` as the value, which tracks the state.
- The cache stores the `RAY_ID`+`ObjectTransmissionInformation` (aka `ChunkedHeader`) of the broadcast message as the key and the `CacheStatus` as the value, which tracks the state.

- The `CacheStatus` can be in two states:
1. **Receiving**: In this state, a RaptorQ decoder is initialized with the `ObjectTransmissionInformation`. The decoder processes incoming encoded chunks and attempts to decode them. If a chunk is successfully decoded, the message is checked for integrity, and if it's valid, it's stored as a fully processed message. If not, it's discarded.
Expand Down
34 changes: 18 additions & 16 deletions src/transport/encoding/raptorq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ struct ChunkedPayload<'a>(&'a BroadcastPayload);
// ObjectTransmissionInformation Size (Raptorq header)
const TRANSMISSION_INFO_SIZE: usize = 12;

// UID Size (Blake2s256)
const UID_SIZE: usize = 32;
// RAY_ID Size (Blake2s256)
const RAY_ID_SIZE: usize = 32;

// CHUNKED_HEADER_SIZE Size
const CHUNKED_HEADER_SIZE: usize = RAY_ID_SIZE + TRANSMISSION_INFO_SIZE;

// EncodingPacket min size (RaptorQ packet)
const MIN_ENCODING_PACKET_SIZE: usize = 5;

const MIN_CHUNKED_SIZE: usize =
UID_SIZE + TRANSMISSION_INFO_SIZE + MIN_ENCODING_PACKET_SIZE;
const MIN_CHUNKED_SIZE: usize = CHUNKED_HEADER_SIZE + MIN_ENCODING_PACKET_SIZE;

impl<'a> TryFrom<&'a BroadcastPayload> for ChunkedPayload<'a> {
type Error = io::Error;
Expand All @@ -53,16 +55,16 @@ impl BroadcastPayload {
self.marshal_binary(&mut bytes)?;
Ok(bytes)
}
fn generate_uid(&self) -> io::Result<[u8; UID_SIZE]> {
fn generate_ray_id(&self) -> io::Result<[u8; RAY_ID_SIZE]> {
let mut hasher = Blake2s256::new();
// Remove the kadcast `height` field from the hash
hasher.update(&self.bytes()?[1..]);
Ok(hasher.finalize().into())
}
}
impl<'a> ChunkedPayload<'a> {
fn uid(&self) -> &[u8] {
&self.0.gossip_frame[0..UID_SIZE]
fn ray_id(&self) -> &[u8] {
&self.0.gossip_frame[0..RAY_ID_SIZE]
}

fn transmission_info(
Expand All @@ -79,17 +81,17 @@ impl<'a> ChunkedPayload<'a> {
}

fn transmission_info_bytes(&self) -> &[u8] {
&self.0.gossip_frame[UID_SIZE..(UID_SIZE + TRANSMISSION_INFO_SIZE)]
&self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
}

fn encoded_chunk(&self) -> &[u8] {
&self.0.gossip_frame[(UID_SIZE + TRANSMISSION_INFO_SIZE)..]
&self.0.gossip_frame[(CHUNKED_HEADER_SIZE)..]
}

fn uid_with_info(&self) -> [u8; UID_SIZE + TRANSMISSION_INFO_SIZE] {
let uid = &self.0.gossip_frame[0..UID_SIZE + TRANSMISSION_INFO_SIZE];
fn header(&self) -> [u8; CHUNKED_HEADER_SIZE] {
let header = &self.0.gossip_frame[0..CHUNKED_HEADER_SIZE];

// Why do we need transmission info?
// Why do we need transmission info included into the header?
//
// Transmission info should be sent over a reliable channel, because
// it is critical to decode packets.
Expand All @@ -102,7 +104,7 @@ impl<'a> ChunkedPayload<'a> {
// Since the correctness of an UDP packet is already guaranteed by OS
// checksum checks, Hashing has been removed in order to increase the
// decoding performance.
uid.try_into().expect("slice to be length 44")
header.try_into().expect("slice to be length 44")
}
}

Expand Down Expand Up @@ -136,7 +138,7 @@ mod tests {
let payload = BroadcastPayload {
height: 255,
gossip_frame: data,
ray: vec![],
ray_id: vec![],
};
println!("orig payload len {}", payload.bytes()?.len());
let message = Message::Broadcast(header, payload);
Expand Down Expand Up @@ -194,7 +196,7 @@ mod tests {
let payload = BroadcastPayload {
height: 255,
gossip_frame: data,
ray: vec![],
ray_id: vec![],
};
println!("orig payload len {}", payload.bytes()?.len());
let message = Message::Broadcast(header, payload);
Expand Down Expand Up @@ -223,7 +225,7 @@ mod tests {
BroadcastPayload {
height: 255,
gossip_frame,
ray: vec![],
ray_id: vec![],
},
);
if let Ok(Some(_)) = decoder.decode(msg) {
Expand Down
32 changes: 16 additions & 16 deletions src/transport/encoding/raptorq/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::collections::BTreeMap;
use std::convert::TryInto;
use std::convert::TryFrom;
use std::io;
use std::time::{Duration, Instant};

use raptorq::{Decoder as ExtDecoder, EncodingPacket};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

use super::{ChunkedPayload, TRANSMISSION_INFO_SIZE, UID_SIZE};
use super::{ChunkedPayload, CHUNKED_HEADER_SIZE};
use crate::encoding::message::Message;
use crate::encoding::payload::BroadcastPayload;
use crate::transport::encoding::Configurable;
Expand All @@ -25,7 +25,7 @@ const DEFAULT_CACHE_PRUNE_EVERY: Duration = Duration::from_secs(30);
const DEFAULT_MAX_UDP_LEN: u64 = 10 * 1_024 * 1_024;

pub struct RaptorQDecoder {
cache: BTreeMap<[u8; UID_SIZE + TRANSMISSION_INFO_SIZE], CacheStatus>,
cache: BTreeMap<[u8; CHUNKED_HEADER_SIZE], CacheStatus>,
last_pruned: Instant,
conf: RaptorQDecoderConf,
}
Expand Down Expand Up @@ -82,13 +82,13 @@ impl Decoder for RaptorQDecoder {
fn decode(&mut self, message: Message) -> io::Result<Option<Message>> {
if let Message::Broadcast(header, payload) = message {
trace!("> Decoding broadcast chunk");
let chunked: ChunkedPayload = (&payload).try_into()?;
let uid = chunked.uid();
let chunked = ChunkedPayload::try_from(&payload)?;
let ray_id = chunked.ray_id();
let encode_info = chunked.transmission_info_bytes();
let uid_with_info = chunked.uid_with_info();
let chunked_header = chunked.header();

// Perform a `match` on the cache entry against the uid.
let status = match self.cache.entry(uid_with_info) {
// Perform a `match` on the cache entry against the chunked header.
let status = match self.cache.entry(chunked_header) {
// Cache status exists: return it
std::collections::btree_map::Entry::Occupied(o) => o.into_mut(),

Expand All @@ -101,7 +101,7 @@ impl Decoder for RaptorQDecoder {
Ok(safe_info) => {
debug!(
event = "Start decoding payload",
ray = hex::encode(uid),
ray = hex::encode(ray_id),
encode_info = hex::encode(encode_info)
);

Expand Down Expand Up @@ -150,12 +150,12 @@ impl Decoder for RaptorQDecoder {
let payload = BroadcastPayload {
height: *max_height,
gossip_frame: decoded,
ray: uid.to_vec(),
ray_id: ray_id.to_vec(),
};
// Perform integrity check
match payload.generate_uid() {
match payload.generate_ray_id() {
// Compare received ID with the one generated
Ok(uid) if chunked.uid().eq(&uid) => {
Ok(ray_id) if chunked.ray_id().eq(&ray_id) => {
Some(Message::Broadcast(header, payload))
}
_ => {
Expand All @@ -170,7 +170,7 @@ impl Decoder for RaptorQDecoder {
// to propagate already processed messages
.map(|decoded| {
self.cache.insert(
uid_with_info,
chunked_header,
CacheStatus::Processed(
Instant::now() + self.conf.cache_ttl,
),
Expand Down Expand Up @@ -225,7 +225,7 @@ mod tests {
BroadcastPayload {
height: 0,
gossip_frame: vec![0],
ray: vec![],
ray_id: vec![],
},
))? {
dec.decode(n)?;
Expand All @@ -246,7 +246,7 @@ mod tests {
BroadcastPayload {
height: 0,
gossip_frame: vec![i],
ray: vec![],
ray_id: vec![],
},
))? {
dec.decode(n)?;
Expand All @@ -263,7 +263,7 @@ mod tests {
BroadcastPayload {
height: 0,
gossip_frame: vec![0],
ray: vec![],
ray_id: vec![],
},
))? {
dec.decode(n)?;
Expand Down
18 changes: 9 additions & 9 deletions src/transport/encoding/raptorq/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ impl Encoder for RaptorQEncoder {
ExtEncoder::with_defaults(&payload.gossip_frame, self.conf.mtu);
let transmission_info = encoder.get_config().serialize();

let uid = payload.generate_uid()?.to_vec();
let base_packet = [&uid[..], &transmission_info].concat();
let ray_id = payload.generate_ray_id()?;
let raptorq_header = [&ray_id[..], &transmission_info].concat();

debug!(
event = "Start encoding payload",
ray = hex::encode(&uid),
encode_info = hex::encode(&transmission_info)
ray = hex::encode(ray_id),
encode_info = hex::encode(transmission_info)
);

let mut repair_packets =
Expand All @@ -92,15 +92,15 @@ impl Encoder for RaptorQEncoder {
.get_encoded_packets(repair_packets)
.iter()
.map(|encoded_packet| {
let mut packet_with_uid = base_packet.clone();
let ray = uid.clone();
packet_with_uid.append(&mut encoded_packet.serialize());
let mut gossip_frame = raptorq_header.clone();
let ray_id = ray_id.to_vec();
gossip_frame.append(&mut encoded_packet.serialize());
Message::Broadcast(
header,
BroadcastPayload {
height: payload.height,
gossip_frame: packet_with_uid,
ray,
gossip_frame,
ray_id,
},
)
})
Expand Down

0 comments on commit 3af988b

Please sign in to comment.