Skip to content

Commit

Permalink
Merge pull request #151 from dusk-network/multiple_decoder
Browse files Browse the repository at this point in the history
Prevent duplicate message processing
  • Loading branch information
herr-seppia authored Oct 3, 2024
2 parents 99bb7b7 + bf25ab1 commit bf941da
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Fix raptorQ cache default config
- Fix ObjectTransmissionInformation deserialization
- Fix duplicate processing for messages with different RaptorQ configurations

### Changed

Expand Down
33 changes: 9 additions & 24 deletions src/transport/encoding/raptorq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ impl BroadcastPayload {
}
}
impl<'a> ChunkedPayload<'a> {
fn ray_id(&self) -> &[u8] {
&self.0.gossip_frame[0..RAY_ID_SIZE]
fn ray_id(&self) -> [u8; RAY_ID_SIZE] {
self.0.gossip_frame[0..RAY_ID_SIZE]
.try_into()
.expect("slice to be length 32")
}

fn transmission_info(
Expand All @@ -73,39 +75,22 @@ impl<'a> ChunkedPayload<'a> {
) -> Result<SafeObjectTransmissionInformation, TransmissionInformationError>
{
let slice = self.transmission_info_bytes();
let info = SafeObjectTransmissionInformation::try_from(slice)?;
let info = SafeObjectTransmissionInformation::try_from(&slice)?;
match info.inner.transfer_length() < max_udp_len {
true => Ok(info),
false => Err(TransmissionInformationError::TransferLengthExceeded),
}
}

fn transmission_info_bytes(&self) -> &[u8] {
&self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
fn transmission_info_bytes(&self) -> [u8; TRANSMISSION_INFO_SIZE] {
self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
.try_into()
.expect("slice to be length 12")
}

fn encoded_chunk(&self) -> &[u8] {
&self.0.gossip_frame[(CHUNKED_HEADER_SIZE)..]
}

fn header(&self) -> [u8; CHUNKED_HEADER_SIZE] {
let header = &self.0.gossip_frame[0..CHUNKED_HEADER_SIZE];

// Why do we need transmission info included into the header?
//
// Transmission info should be sent over a reliable channel, because
// it is critical to decode packets.
// Since it is sent over UDP alongside the encoded chunked bytes,
// corrupted transmission info can be received.
// If the corrupted info is part of the first received chunk, no
// message can ever be decoded.
//
// ** UPDATE:
// Since the correctness of an UDP packet is already guaranteed by OS
// checksum checks, Hashing has been removed in order to increase the
// decoding performance.
header.try_into().expect("slice to be length 44")
}
}

#[cfg(test)]
Expand Down
114 changes: 77 additions & 37 deletions src/transport/encoding/raptorq/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use raptorq::{Decoder as ExtDecoder, EncodingPacket};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

use super::{ChunkedPayload, CHUNKED_HEADER_SIZE};
use super::{ChunkedPayload, RAY_ID_SIZE, TRANSMISSION_INFO_SIZE};
use crate::encoding::message::Message;
use crate::encoding::payload::BroadcastPayload;
use crate::transport::encoding::Configurable;
Expand All @@ -25,7 +25,7 @@ const DEFAULT_CACHE_PRUNE_EVERY: Duration = Duration::from_secs(30);
const DEFAULT_MAX_UDP_LEN: u64 = 10 * 1_024 * 1_024;

pub struct RaptorQDecoder {
cache: BTreeMap<[u8; CHUNKED_HEADER_SIZE], CacheStatus>,
cache: BTreeMap<[u8; RAY_ID_SIZE], CacheStatus>,
last_pruned: Instant,
conf: RaptorQDecoderConf,
}
Expand Down Expand Up @@ -63,15 +63,27 @@ impl Configurable for RaptorQDecoder {
}
}

struct ReceivingInfo {
expire_on: Instant,
max_kad_height: u8,
}

struct DecoderInfo {
decoder: ExtDecoder,
max_kad_blocks: usize,
}

type DecoderLists = BTreeMap<[u8; TRANSMISSION_INFO_SIZE], DecoderInfo>;

enum CacheStatus {
Receiving(ExtDecoder, Instant, u8, usize),
Receiving(ReceivingInfo, DecoderLists),
Processed(Instant),
}

impl CacheStatus {
fn expired(&self) -> bool {
let expire_on = match self {
CacheStatus::Receiving(_, expire_on, _, _) => expire_on,
CacheStatus::Receiving(info, _) => &info.expire_on,
CacheStatus::Processed(expire_on) => expire_on,
};
expire_on < &Instant::now()
Expand All @@ -85,70 +97,98 @@ impl Decoder for RaptorQDecoder {
let chunked = ChunkedPayload::try_from(&payload)?;
let ray_id = chunked.ray_id();
let encode_info = chunked.transmission_info_bytes();
let chunked_header = chunked.header();

// Perform a `match` on the cache entry against the chunked header.
let status = match self.cache.entry(chunked_header) {
// Perform a `match` on the cache entry against the ray id.
let status = match self.cache.entry(ray_id) {
// Cache status exists: return it
std::collections::btree_map::Entry::Occupied(o) => o.into_mut(),

// Cache status not found: creates a new entry with
// CacheStatus::Receiving status and binds a new Decoder with
// the received transmission information
std::collections::btree_map::Entry::Vacant(v) => {
let info = chunked.transmission_info(self.conf.max_udp_len);
match info {
Ok(safe_info) => {
debug!(
event = "Start decoding payload",
ray = hex::encode(ray_id),
encode_info = hex::encode(encode_info)
);

v.insert(CacheStatus::Receiving(
ExtDecoder::new(safe_info.inner),
Instant::now() + self.conf.cache_ttl,
payload.height,
safe_info.max_blocks,
))
}
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Invalid transmission info {e:?}",),
));
}
}
let receiving_info = ReceivingInfo {
expire_on: Instant::now() + self.conf.cache_ttl,
max_kad_height: payload.height,
};
v.insert(CacheStatus::Receiving(
receiving_info,
BTreeMap::new(),
))
}
};

let decoded = match status {
// Avoid to repropagate already processed messages
CacheStatus::Processed(_) => None,
CacheStatus::Receiving(decoder, _, max_height, max_blocks) => {
CacheStatus::Receiving(recv, list) => {
// check right decoder according to the encoding info
let decoder_info = match list.entry(encode_info) {
// Cache status exists: return it
std::collections::btree_map::Entry::Occupied(o) => {
o.into_mut()
}

// Cache status not found: creates a new entry with
// CacheStatus::Receiving status and binds a new Decoder
// with the received
// transmission information
std::collections::btree_map::Entry::Vacant(v) => {
let info = chunked
.transmission_info(self.conf.max_udp_len);

match info {
Ok(safe_info) => {
debug!(
event = "Start decoding payload",
ray = hex::encode(ray_id),
encode_info = hex::encode(encode_info)
);

v.insert(DecoderInfo {
decoder: ExtDecoder::new(
safe_info.inner,
),
max_kad_blocks: safe_info.max_blocks,
})
}
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid transmission info {e:?}",
),
));
}
}
}
};

// Depending on Beta replication, we can receive chunks of
// the same message from multiple peers.
// Those peers can send with different broadcast height.
// If those heights differs, we should check the highest one
// in order to preserve the propagation
if payload.height > *max_height {
*max_height = payload.height;
if payload.height > recv.max_kad_height {
recv.max_kad_height = payload.height;
}

let packet =
EncodingPacket::deserialize(chunked.encoded_chunk());
if packet.payload_id().source_block_number() as usize
>= *max_blocks
>= decoder_info.max_kad_blocks
{
return Ok(None);
};

decoder
decoder_info
.decoder
.decode(packet)
// If decoded successfully, create the new
// BroadcastMessage
.and_then(|decoded| {
let payload = BroadcastPayload {
height: *max_height,
height: recv.max_kad_height,
gossip_frame: decoded,
};
// Perform integrity check
Expand All @@ -171,7 +211,7 @@ impl Decoder for RaptorQDecoder {
// to propagate already processed messages
.map(|decoded| {
self.cache.insert(
chunked_header,
ray_id,
CacheStatus::Processed(
Instant::now() + self.conf.cache_ttl,
),
Expand Down
14 changes: 7 additions & 7 deletions src/transport/encoding/raptorq/safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* limitations under the License.
*/

use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;

use raptorq::ObjectTransmissionInformation;

Expand Down Expand Up @@ -60,7 +60,6 @@ pub(crate) struct SafeObjectTransmissionInformation {

#[derive(Debug, Clone)]
pub enum TransmissionInformationError {
InvalidSize,
SourceBlocksZero,
SymbolSizeZero,
SymbolSizeGreaterThanMTU,
Expand All @@ -70,12 +69,13 @@ pub enum TransmissionInformationError {
TooManySourceSymbols,
}

impl TryFrom<&[u8]> for SafeObjectTransmissionInformation {
impl TryFrom<&[u8; TRANSMISSION_INFO_SIZE]>
for SafeObjectTransmissionInformation
{
type Error = TransmissionInformationError;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let value: &[u8; TRANSMISSION_INFO_SIZE] = value
.try_into()
.map_err(|_| TransmissionInformationError::InvalidSize)?;
fn try_from(
value: &[u8; TRANSMISSION_INFO_SIZE],
) -> Result<Self, Self::Error> {
let config = ObjectTransmissionInformation::deserialize(value);

if config.source_blocks() == 0 {
Expand Down

0 comments on commit bf941da

Please sign in to comment.