Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent duplicate message processing #151

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Fix raptorQ cache default config
- Fix ObjectTransmissionInformation deserialization
- Fix duplicate processing for messages with different RaptorQ configurations

### Changed

Expand Down
33 changes: 9 additions & 24 deletions src/transport/encoding/raptorq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ impl BroadcastPayload {
}
}
impl<'a> ChunkedPayload<'a> {
fn ray_id(&self) -> &[u8] {
&self.0.gossip_frame[0..RAY_ID_SIZE]
fn ray_id(&self) -> [u8; RAY_ID_SIZE] {
self.0.gossip_frame[0..RAY_ID_SIZE]
.try_into()
.expect("slice to be length 32")
}

fn transmission_info(
Expand All @@ -73,39 +75,22 @@ impl<'a> ChunkedPayload<'a> {
) -> Result<SafeObjectTransmissionInformation, TransmissionInformationError>
{
let slice = self.transmission_info_bytes();
let info = SafeObjectTransmissionInformation::try_from(slice)?;
let info = SafeObjectTransmissionInformation::try_from(&slice)?;
match info.inner.transfer_length() < max_udp_len {
true => Ok(info),
false => Err(TransmissionInformationError::TransferLengthExceeded),
}
}

fn transmission_info_bytes(&self) -> &[u8] {
&self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
fn transmission_info_bytes(&self) -> [u8; TRANSMISSION_INFO_SIZE] {
self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
.try_into()
.expect("slice to be length 12")
}

fn encoded_chunk(&self) -> &[u8] {
&self.0.gossip_frame[(CHUNKED_HEADER_SIZE)..]
}

fn header(&self) -> [u8; CHUNKED_HEADER_SIZE] {
let header = &self.0.gossip_frame[0..CHUNKED_HEADER_SIZE];

// Why do we need transmission info included into the header?
//
// Transmission info should be sent over a reliable channel, because
// it is critical to decode packets.
// Since it is sent over UDP alongside the encoded chunked bytes,
// corrupted transmission info can be received.
// If the corrupted info is part of the first received chunk, no
// message can ever be decoded.
//
// ** UPDATE:
// Since the correctness of an UDP packet is already guaranteed by OS
// checksum checks, Hashing has been removed in order to increase the
// decoding performance.
header.try_into().expect("slice to be length 44")
}
}

#[cfg(test)]
Expand Down
114 changes: 77 additions & 37 deletions src/transport/encoding/raptorq/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use raptorq::{Decoder as ExtDecoder, EncodingPacket};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

use super::{ChunkedPayload, CHUNKED_HEADER_SIZE};
use super::{ChunkedPayload, RAY_ID_SIZE, TRANSMISSION_INFO_SIZE};
use crate::encoding::message::Message;
use crate::encoding::payload::BroadcastPayload;
use crate::transport::encoding::Configurable;
Expand All @@ -25,7 +25,7 @@ const DEFAULT_CACHE_PRUNE_EVERY: Duration = Duration::from_secs(30);
const DEFAULT_MAX_UDP_LEN: u64 = 10 * 1_024 * 1_024;

pub struct RaptorQDecoder {
cache: BTreeMap<[u8; CHUNKED_HEADER_SIZE], CacheStatus>,
cache: BTreeMap<[u8; RAY_ID_SIZE], CacheStatus>,
last_pruned: Instant,
conf: RaptorQDecoderConf,
}
Expand Down Expand Up @@ -63,15 +63,27 @@ impl Configurable for RaptorQDecoder {
}
}

struct ReceivingInfo {
expire_on: Instant,
max_kad_height: u8,
}

struct DecoderInfo {
decoder: ExtDecoder,
max_kad_blocks: usize,
}

type DecoderLists = BTreeMap<[u8; TRANSMISSION_INFO_SIZE], DecoderInfo>;

enum CacheStatus {
Receiving(ExtDecoder, Instant, u8, usize),
Receiving(ReceivingInfo, DecoderLists),
Processed(Instant),
}

impl CacheStatus {
fn expired(&self) -> bool {
let expire_on = match self {
CacheStatus::Receiving(_, expire_on, _, _) => expire_on,
CacheStatus::Receiving(info, _) => &info.expire_on,
CacheStatus::Processed(expire_on) => expire_on,
};
expire_on < &Instant::now()
Expand All @@ -85,70 +97,98 @@ impl Decoder for RaptorQDecoder {
let chunked = ChunkedPayload::try_from(&payload)?;
let ray_id = chunked.ray_id();
let encode_info = chunked.transmission_info_bytes();
let chunked_header = chunked.header();

// Perform a `match` on the cache entry against the chunked header.
let status = match self.cache.entry(chunked_header) {
// Perform a `match` on the cache entry against the ray id.
let status = match self.cache.entry(ray_id) {
// Cache status exists: return it
std::collections::btree_map::Entry::Occupied(o) => o.into_mut(),

// Cache status not found: creates a new entry with
// CacheStatus::Receiving status and binds a new Decoder with
// the received transmission information
std::collections::btree_map::Entry::Vacant(v) => {
let info = chunked.transmission_info(self.conf.max_udp_len);
match info {
Ok(safe_info) => {
debug!(
event = "Start decoding payload",
ray = hex::encode(ray_id),
encode_info = hex::encode(encode_info)
);

v.insert(CacheStatus::Receiving(
ExtDecoder::new(safe_info.inner),
Instant::now() + self.conf.cache_ttl,
payload.height,
safe_info.max_blocks,
))
}
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Invalid transmission info {e:?}",),
));
}
}
let receiving_info = ReceivingInfo {
expire_on: Instant::now() + self.conf.cache_ttl,
max_kad_height: payload.height,
};
v.insert(CacheStatus::Receiving(
receiving_info,
BTreeMap::new(),
))
}
};

let decoded = match status {
// Avoid to repropagate already processed messages
CacheStatus::Processed(_) => None,
CacheStatus::Receiving(decoder, _, max_height, max_blocks) => {
CacheStatus::Receiving(recv, list) => {
// check right decoder according to the encoding info
let decoder_info = match list.entry(encode_info) {
// Cache status exists: return it
std::collections::btree_map::Entry::Occupied(o) => {
o.into_mut()
}

// Cache status not found: creates a new entry with
// CacheStatus::Receiving status and binds a new Decoder
// with the received
// transmission information
std::collections::btree_map::Entry::Vacant(v) => {
let info = chunked
.transmission_info(self.conf.max_udp_len);

match info {
Ok(safe_info) => {
debug!(
event = "Start decoding payload",
ray = hex::encode(ray_id),
encode_info = hex::encode(encode_info)
);

v.insert(DecoderInfo {
decoder: ExtDecoder::new(
safe_info.inner,
),
max_kad_blocks: safe_info.max_blocks,
})
}
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid transmission info {e:?}",
),
));
}
}
}
};

// Depending on Beta replication, we can receive chunks of
// the same message from multiple peers.
// Those peers can send with different broadcast height.
// If those heights differs, we should check the highest one
// in order to preserve the propagation
if payload.height > *max_height {
*max_height = payload.height;
if payload.height > recv.max_kad_height {
recv.max_kad_height = payload.height;
}

let packet =
EncodingPacket::deserialize(chunked.encoded_chunk());
if packet.payload_id().source_block_number() as usize
>= *max_blocks
>= decoder_info.max_kad_blocks
{
return Ok(None);
};

decoder
decoder_info
.decoder
.decode(packet)
// If decoded successfully, create the new
// BroadcastMessage
.and_then(|decoded| {
let payload = BroadcastPayload {
height: *max_height,
height: recv.max_kad_height,
gossip_frame: decoded,
};
// Perform integrity check
Expand All @@ -171,7 +211,7 @@ impl Decoder for RaptorQDecoder {
// to propagate already processed messages
.map(|decoded| {
self.cache.insert(
chunked_header,
ray_id,
CacheStatus::Processed(
Instant::now() + self.conf.cache_ttl,
),
Expand Down
14 changes: 7 additions & 7 deletions src/transport/encoding/raptorq/safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* limitations under the License.
*/

use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;

use raptorq::ObjectTransmissionInformation;

Expand Down Expand Up @@ -60,7 +60,6 @@ pub(crate) struct SafeObjectTransmissionInformation {

#[derive(Debug, Clone)]
pub enum TransmissionInformationError {
InvalidSize,
SourceBlocksZero,
SymbolSizeZero,
SymbolSizeGreaterThanMTU,
Expand All @@ -70,12 +69,13 @@ pub enum TransmissionInformationError {
TooManySourceSymbols,
}

impl TryFrom<&[u8]> for SafeObjectTransmissionInformation {
impl TryFrom<&[u8; TRANSMISSION_INFO_SIZE]>
for SafeObjectTransmissionInformation
{
type Error = TransmissionInformationError;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let value: &[u8; TRANSMISSION_INFO_SIZE] = value
.try_into()
.map_err(|_| TransmissionInformationError::InvalidSize)?;
fn try_from(
value: &[u8; TRANSMISSION_INFO_SIZE],
) -> Result<Self, Self::Error> {
let config = ObjectTransmissionInformation::deserialize(value);

if config.source_blocks() == 0 {
Expand Down