From bf25ab11cf5871e787facb5293a8e526292bbebe Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Wed, 2 Oct 2024 16:49:43 +0200 Subject: [PATCH] Prevent duplicate message processing Handle cache to ensure that the same message is not processed twice if sent with different RaptorQ configurations --- CHANGELOG.md | 1 + src/transport/encoding/raptorq.rs | 33 ++----- src/transport/encoding/raptorq/decoder.rs | 114 +++++++++++++++------- src/transport/encoding/raptorq/safe.rs | 14 +-- 4 files changed, 94 insertions(+), 68 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8117c44..32851f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix raptorQ cache default config - Fix ObjectTransmissionInformation deserialization +- Fix duplicate processing for messages with different RaptorQ configurations ### Changed diff --git a/src/transport/encoding/raptorq.rs b/src/transport/encoding/raptorq.rs index 8685aa8..e030fc0 100644 --- a/src/transport/encoding/raptorq.rs +++ b/src/transport/encoding/raptorq.rs @@ -63,8 +63,10 @@ impl BroadcastPayload { } } impl<'a> ChunkedPayload<'a> { - fn ray_id(&self) -> &[u8] { - &self.0.gossip_frame[0..RAY_ID_SIZE] + fn ray_id(&self) -> [u8; RAY_ID_SIZE] { + self.0.gossip_frame[0..RAY_ID_SIZE] + .try_into() + .expect("slice to be length 32") } fn transmission_info( @@ -73,39 +75,22 @@ impl<'a> ChunkedPayload<'a> { ) -> Result { let slice = self.transmission_info_bytes(); - let info = SafeObjectTransmissionInformation::try_from(slice)?; + let info = SafeObjectTransmissionInformation::try_from(&slice)?; match info.inner.transfer_length() < max_udp_len { true => Ok(info), false => Err(TransmissionInformationError::TransferLengthExceeded), } } - fn transmission_info_bytes(&self) -> &[u8] { - &self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)] + fn transmission_info_bytes(&self) -> [u8; TRANSMISSION_INFO_SIZE] { + self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)] + .try_into() + .expect("slice to be length 12") } fn encoded_chunk(&self) -> &[u8] { &self.0.gossip_frame[(CHUNKED_HEADER_SIZE)..] } - - fn header(&self) -> [u8; CHUNKED_HEADER_SIZE] { - let header = &self.0.gossip_frame[0..CHUNKED_HEADER_SIZE]; - - // Why do we need transmission info included into the header? - // - // Transmission info should be sent over a reliable channel, because - // it is critical to decode packets. - // Since it is sent over UDP alongside the encoded chunked bytes, - // corrupted transmission info can be received. - // If the corrupted info is part of the first received chunk, no - // message can ever be decoded. - // - // ** UPDATE: - // Since the correctness of an UDP packet is already guaranteed by OS - // checksum checks, Hashing has been removed in order to increase the - // decoding performance. - header.try_into().expect("slice to be length 44") - } } #[cfg(test)] diff --git a/src/transport/encoding/raptorq/decoder.rs b/src/transport/encoding/raptorq/decoder.rs index 4250861..a394489 100644 --- a/src/transport/encoding/raptorq/decoder.rs +++ b/src/transport/encoding/raptorq/decoder.rs @@ -13,7 +13,7 @@ use raptorq::{Decoder as ExtDecoder, EncodingPacket}; use serde::{Deserialize, Serialize}; use tracing::{debug, trace, warn}; -use super::{ChunkedPayload, CHUNKED_HEADER_SIZE}; +use super::{ChunkedPayload, RAY_ID_SIZE, TRANSMISSION_INFO_SIZE}; use crate::encoding::message::Message; use crate::encoding::payload::BroadcastPayload; use crate::transport::encoding::Configurable; @@ -25,7 +25,7 @@ const DEFAULT_CACHE_PRUNE_EVERY: Duration = Duration::from_secs(30); const DEFAULT_MAX_UDP_LEN: u64 = 10 * 1_024 * 1_024; pub struct RaptorQDecoder { - cache: BTreeMap<[u8; CHUNKED_HEADER_SIZE], CacheStatus>, + cache: BTreeMap<[u8; RAY_ID_SIZE], CacheStatus>, last_pruned: Instant, conf: RaptorQDecoderConf, } @@ -63,15 +63,27 @@ impl Configurable for RaptorQDecoder { } } +struct ReceivingInfo { + expire_on: Instant, + max_kad_height: u8, +} + +struct DecoderInfo { + decoder: ExtDecoder, + max_kad_blocks: usize, +} + +type DecoderLists = BTreeMap<[u8; TRANSMISSION_INFO_SIZE], DecoderInfo>; + enum CacheStatus { - Receiving(ExtDecoder, Instant, u8, usize), + Receiving(ReceivingInfo, DecoderLists), Processed(Instant), } impl CacheStatus { fn expired(&self) -> bool { let expire_on = match self { - CacheStatus::Receiving(_, expire_on, _, _) => expire_on, + CacheStatus::Receiving(info, _) => &info.expire_on, CacheStatus::Processed(expire_on) => expire_on, }; expire_on < &Instant::now() @@ -85,10 +97,9 @@ impl Decoder for RaptorQDecoder { let chunked = ChunkedPayload::try_from(&payload)?; let ray_id = chunked.ray_id(); let encode_info = chunked.transmission_info_bytes(); - let chunked_header = chunked.header(); - // Perform a `match` on the cache entry against the chunked header. - let status = match self.cache.entry(chunked_header) { + // Perform a `match` on the cache entry against the ray id. + let status = match self.cache.entry(ray_id) { // Cache status exists: return it std::collections::btree_map::Entry::Occupied(o) => o.into_mut(), @@ -96,59 +107,88 @@ impl Decoder for RaptorQDecoder { // CacheStatus::Receiving status and binds a new Decoder with // the received transmission information std::collections::btree_map::Entry::Vacant(v) => { - let info = chunked.transmission_info(self.conf.max_udp_len); - match info { - Ok(safe_info) => { - debug!( - event = "Start decoding payload", - ray = hex::encode(ray_id), - encode_info = hex::encode(encode_info) - ); - - v.insert(CacheStatus::Receiving( - ExtDecoder::new(safe_info.inner), - Instant::now() + self.conf.cache_ttl, - payload.height, - safe_info.max_blocks, - )) - } - Err(e) => { - return Err(io::Error::new( - io::ErrorKind::Other, - format!("Invalid transmission info {e:?}",), - )); - } - } + let receiving_info = ReceivingInfo { + expire_on: Instant::now() + self.conf.cache_ttl, + max_kad_height: payload.height, + }; + v.insert(CacheStatus::Receiving( + receiving_info, + BTreeMap::new(), + )) } }; let decoded = match status { // Avoid to repropagate already processed messages CacheStatus::Processed(_) => None, - CacheStatus::Receiving(decoder, _, max_height, max_blocks) => { + CacheStatus::Receiving(recv, list) => { + // check right decoder according to the encoding info + let decoder_info = match list.entry(encode_info) { + // Cache status exists: return it + std::collections::btree_map::Entry::Occupied(o) => { + o.into_mut() + } + + // Cache status not found: creates a new entry with + // CacheStatus::Receiving status and binds a new Decoder + // with the received + // transmission information + std::collections::btree_map::Entry::Vacant(v) => { + let info = chunked + .transmission_info(self.conf.max_udp_len); + + match info { + Ok(safe_info) => { + debug!( + event = "Start decoding payload", + ray = hex::encode(ray_id), + encode_info = hex::encode(encode_info) + ); + + v.insert(DecoderInfo { + decoder: ExtDecoder::new( + safe_info.inner, + ), + max_kad_blocks: safe_info.max_blocks, + }) + } + Err(e) => { + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Invalid transmission info {e:?}", + ), + )); + } + } + } + }; + // Depending on Beta replication, we can receive chunks of // the same message from multiple peers. // Those peers can send with different broadcast height. // If those heights differs, we should check the highest one // in order to preserve the propagation - if payload.height > *max_height { - *max_height = payload.height; + if payload.height > recv.max_kad_height { + recv.max_kad_height = payload.height; } + let packet = EncodingPacket::deserialize(chunked.encoded_chunk()); if packet.payload_id().source_block_number() as usize - >= *max_blocks + >= decoder_info.max_kad_blocks { return Ok(None); }; - decoder + decoder_info + .decoder .decode(packet) // If decoded successfully, create the new // BroadcastMessage .and_then(|decoded| { let payload = BroadcastPayload { - height: *max_height, + height: recv.max_kad_height, gossip_frame: decoded, }; // Perform integrity check @@ -171,7 +211,7 @@ impl Decoder for RaptorQDecoder { // to propagate already processed messages .map(|decoded| { self.cache.insert( - chunked_header, + ray_id, CacheStatus::Processed( Instant::now() + self.conf.cache_ttl, ), diff --git a/src/transport/encoding/raptorq/safe.rs b/src/transport/encoding/raptorq/safe.rs index 1536d99..7b07851 100644 --- a/src/transport/encoding/raptorq/safe.rs +++ b/src/transport/encoding/raptorq/safe.rs @@ -30,7 +30,7 @@ * limitations under the License. */ -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use raptorq::ObjectTransmissionInformation; @@ -60,7 +60,6 @@ pub(crate) struct SafeObjectTransmissionInformation { #[derive(Debug, Clone)] pub enum TransmissionInformationError { - InvalidSize, SourceBlocksZero, SymbolSizeZero, SymbolSizeGreaterThanMTU, @@ -70,12 +69,13 @@ pub enum TransmissionInformationError { TooManySourceSymbols, } -impl TryFrom<&[u8]> for SafeObjectTransmissionInformation { +impl TryFrom<&[u8; TRANSMISSION_INFO_SIZE]> + for SafeObjectTransmissionInformation +{ type Error = TransmissionInformationError; - fn try_from(value: &[u8]) -> Result { - let value: &[u8; TRANSMISSION_INFO_SIZE] = value - .try_into() - .map_err(|_| TransmissionInformationError::InvalidSize)?; + fn try_from( + value: &[u8; TRANSMISSION_INFO_SIZE], + ) -> Result { let config = ObjectTransmissionInformation::deserialize(value); if config.source_blocks() == 0 {