Skip to content

Commit

Permalink
Add the auto prune of expired items in RaptorQ cache
Browse files Browse the repository at this point in the history
Resolves #68
  • Loading branch information
herr-seppia committed Dec 15, 2021
1 parent 6921124 commit ad9cf36
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `send` method to public API [#58]
- Add metadata to `on_message` callback [#59]
- Add `listen_address` parameter [#69]
- Add the auto prune of expired items in RaptorQ cache [#68]

### Changed

Expand All @@ -38,4 +39,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#59]: https://github.com/dusk-network/kadcast/issues/59
[#60]: https://github.com/dusk-network/kadcast/issues/60
[#63]: https://github.com/dusk-network/kadcast/issues/63
[#68]: https://github.com/dusk-network/kadcast/issues/68
[#69]: https://github.com/dusk-network/kadcast/issues/69
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kadcast"
version = "0.1.1"
version = "0.2.0"
authors = [
"herr-seppia <seppia@dusk.network>"
]
Expand Down
2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ const K_ALPHA: usize = 3;
// Redundacy factor for broadcast
const K_BETA: usize = 3;

const K_CHUNK_SIZE: u16 = 1024;

/// Default value while a node is considered alive (no eviction will be
/// requested)
pub const BUCKET_DEFAULT_NODE_TTL_MILLIS: u64 = 30000;
Expand Down
69 changes: 52 additions & 17 deletions src/transport/encoding/raptorq_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,59 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::{collections::HashMap, convert::TryInto};
use std::{
collections::HashMap,
convert::TryInto,
time::{Duration, Instant},
};

use blake2::{Blake2s, Digest};
use raptorq::{
Decoder, Encoder, EncodingPacket, ObjectTransmissionInformation,
};
use tracing::warn;

use crate::{
encoding::{message::Message, payload::BroadcastPayload, Marshallable},
K_CHUNK_SIZE,
use crate::encoding::{
message::Message, payload::BroadcastPayload, Marshallable,
};

const DEFAULT_REPAIR_PACKETS_PER_BLOCK: u32 = 15;
const MAX_CHUNK_SIZE: u16 = 1024;
const CACHE_DEFAULT_TTL_SECS: u64 = 60;
const CACHE_PRUNED_EVERY_SECS: u64 = 60 * 5;

const CACHE_DEFAULT_TTL_DURATION: Duration =
Duration::from_secs(CACHE_DEFAULT_TTL_SECS);
const CACHE_PRUNED_EVERY_DURATION: Duration =
Duration::from_secs(CACHE_PRUNED_EVERY_SECS);

pub(crate) struct RaptorQEncoder {
cache: HashMap<[u8; 32], CacheStatus>,
last_pruned: Instant,
}

impl RaptorQEncoder {
pub(crate) fn new() -> Self {
RaptorQEncoder {
cache: HashMap::new(),
last_pruned: Instant::now(),
}
}
}
enum CacheStatus {
Receiving(Decoder),
Processed,
Receiving(Decoder, Instant),
Processed(Instant),
}

impl CacheStatus {
fn expired(&self) -> bool {
match self {
CacheStatus::Receiving(_, date) => date,
CacheStatus::Processed(date) => date,
}
.elapsed()
> CACHE_DEFAULT_TTL_DURATION
}
}

struct ChunkedPayload<'a>(&'a BroadcastPayload);
Expand Down Expand Up @@ -96,9 +122,9 @@ impl super::Encoder for RaptorQEncoder {
if let Message::Broadcast(header, payload) = msg {
let uid = payload.generate_uid();
let encoder =
Encoder::with_defaults(&payload.gossip_frame, K_CHUNK_SIZE);
Encoder::with_defaults(&payload.gossip_frame, MAX_CHUNK_SIZE);
encoder
.get_encoded_packets(15)
.get_encoded_packets(DEFAULT_REPAIR_PACKETS_PER_BLOCK)
.iter()
.map(|encoded_packet| {
let mut packet_with_uid = uid.to_vec();
Expand Down Expand Up @@ -134,16 +160,17 @@ impl super::Encoder for RaptorQEncoder {
// CacheStatus::Receiving status and binds a new Decoder with
// the received transmission information
std::collections::hash_map::Entry::Vacant(v) => {
v.insert(CacheStatus::Receiving(Decoder::new(
chunked.transmission_info(),
)))
v.insert(CacheStatus::Receiving(
Decoder::new(chunked.transmission_info()),
Instant::now(),
))
}
};

match status {
let decoded = match status {
// Avoid to repropagate already processed messages
CacheStatus::Processed => None,
CacheStatus::Receiving(decoder) => decoder
CacheStatus::Processed(_) => None,
CacheStatus::Receiving(decoder, _) => decoder
.decode(EncodingPacket::deserialize(
chunked.encoded_chunk(),
))
Expand All @@ -165,11 +192,19 @@ impl super::Encoder for RaptorQEncoder {
// If the message is succesfully decoded, update the cache
// with new status. This will drop useless Decoder and avoid
// to propagate already processed messages
.map(|a| {
self.cache.insert(uid, CacheStatus::Processed);
a
.map(|decoded| {
self.cache.insert(
uid,
CacheStatus::Processed(Instant::now()),
);
decoded
}),
};
// Every X time, prune dupemap cache
if self.last_pruned.elapsed() > CACHE_PRUNED_EVERY_DURATION {
self.cache.retain(|_, status| !status.expired())
}
decoded
} else {
Some(message)
}
Expand Down

0 comments on commit ad9cf36

Please sign in to comment.