Skip to content

Commit

Permalink
Parametrize RaptorQ configuration
Browse files Browse the repository at this point in the history
Start to implement the scaffold:
- Split `Encoder` in two different traits (`Encoder`/`Decoder`)
- Add `Configurable` trait

Resolves #72
  • Loading branch information
herr-seppia committed Dec 17, 2021
1 parent a2ec7a3 commit 8d69d82
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kadcast"
version = "0.2.0"
version = "0.2.1"
authors = [
"herr-seppia <seppia@dusk.network>"
]
Expand Down
11 changes: 7 additions & 4 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::{error::Error, net::SocketAddr};
use std::{collections::HashMap, error::Error, net::SocketAddr};

use tokio::{
io,
Expand All @@ -16,7 +16,9 @@ use tracing::*;
use crate::{
encoding::{message::Message, Marshallable},
peer::PeerNode,
transport::encoding::{Encoder, RaptorQEncoder},
transport::encoding::{
Configurable, Decoder, Encoder, RaptorQDecoder, RaptorQEncoder,
},
MAX_DATAGRAM_SIZE,
};

Expand Down Expand Up @@ -47,7 +49,7 @@ impl WireNetwork {
inbound_channel_tx: Sender<MessageBeanIn>,
) -> io::Result<()> {
debug!("WireNetwork::listen_in started");
let mut decoder = RaptorQEncoder::new();
let mut decoder = RaptorQDecoder::configure(HashMap::new());
let socket = UdpSocket::bind(listen_address)
.await
.expect("Unable to bind address");
Expand Down Expand Up @@ -90,10 +92,11 @@ impl WireNetwork {
mut outbound_channel_rx: Receiver<MessageBeanOut>,
) -> io::Result<()> {
debug!("WireNetwork::listen_out started");
let encoder = RaptorQEncoder::configure(HashMap::new());
loop {
if let Some((message, to)) = outbound_channel_rx.recv().await {
trace!("< Message to send to ({:?}) - {:?} ", to, message);
for chunk in RaptorQEncoder::encode(message).iter() {
for chunk in encoder.encode(message).iter() {
let bytes = chunk.bytes();
for remote_addr in to.iter() {
let _ = WireNetwork::send(&bytes, remote_addr)
Expand Down
14 changes: 12 additions & 2 deletions src/transport/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,21 @@
mod plain_encoder;
mod raptorq_encoder;

use std::collections::HashMap;

pub(crate) use raptorq_encoder::RaptorQDecoder;
pub(crate) use raptorq_encoder::RaptorQEncoder;

use crate::encoding::message::Message;
pub(crate) trait Encoder {
fn encode(msg: Message) -> Vec<Message>;

pub(crate) trait Configurable {
fn configure(conf: HashMap<String, String>) -> Self;
}

pub(crate) trait Encoder: Configurable {
fn encode(&self, msg: Message) -> Vec<Message>;
}

pub(crate) trait Decoder: Configurable {
fn decode(&mut self, chunk: Message) -> Option<Message>;
}
14 changes: 12 additions & 2 deletions src/transport/encoding/plain_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,27 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::{collections::HashMap};

use crate::encoding::message::Message;

use super::Encoder;
use super::{Configurable, Decoder, Encoder};

pub(crate) struct PlainEncoder {}

impl Configurable for PlainEncoder {
fn configure(_: HashMap<String, String>) -> Self {
PlainEncoder {}
}
}

impl Encoder for PlainEncoder {
fn encode<'msg>(msg: Message) -> Vec<Message> {
fn encode<'msg>(&self, msg: Message) -> Vec<Message> {
vec![msg]
}
}

impl Decoder for PlainEncoder {
fn decode(&mut self, chunk: Message) -> Option<Message> {
if let Message::Broadcast(header, payload) = chunk {
Some(Message::Broadcast(header, payload))
Expand Down
24 changes: 17 additions & 7 deletions src/transport/encoding/raptorq_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::encoding::{
message::Message, payload::BroadcastPayload, Marshallable,
};

use super::Configurable;

const DEFAULT_REPAIR_PACKETS_PER_BLOCK: u32 = 15;
const MAX_CHUNK_SIZE: u16 = 1024;
const CACHE_DEFAULT_TTL_SECS: u64 = 60;
Expand All @@ -30,19 +32,26 @@ const CACHE_DEFAULT_TTL_DURATION: Duration =
const CACHE_PRUNED_EVERY_DURATION: Duration =
Duration::from_secs(CACHE_PRUNED_EVERY_SECS);

pub(crate) struct RaptorQEncoder {
pub(crate) struct RaptorQDecoder {
cache: HashMap<[u8; 32], CacheStatus>,
last_pruned: Instant,
}

impl RaptorQEncoder {
pub(crate) fn new() -> Self {
RaptorQEncoder {
impl Configurable for RaptorQDecoder {
fn configure(_: HashMap<String, String>) -> Self {
Self {
cache: HashMap::new(),
last_pruned: Instant::now(),
}
}
}

pub(crate) struct RaptorQEncoder {}

impl Configurable for RaptorQEncoder {
fn configure(_: HashMap<String, String>) -> Self {
Self {}
}
}
enum CacheStatus {
Receiving(Decoder, Instant),
Processed(Instant),
Expand Down Expand Up @@ -118,7 +127,7 @@ impl<'a> ChunkedPayload<'a> {
}

impl super::Encoder for RaptorQEncoder {
fn encode<'msg>(msg: Message) -> Vec<Message> {
fn encode<'msg>(&self, msg: Message) -> Vec<Message> {
if let Message::Broadcast(header, payload) = msg {
let uid = payload.generate_uid();
let encoder =
Expand All @@ -145,7 +154,8 @@ impl super::Encoder for RaptorQEncoder {
vec![msg]
}
}

}
impl super::Decoder for RaptorQDecoder {
fn decode(&mut self, message: Message) -> Option<Message> {
if let Message::Broadcast(header, payload) = message {
let chunked = ChunkedPayload(&payload);
Expand Down

0 comments on commit 8d69d82

Please sign in to comment.