Skip to content

Commit

Permalink
Mix: Integrate cover traffic to mix service (#920)
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee authored Nov 28, 2024
1 parent dc286a9 commit 63c472e
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 19 deletions.
7 changes: 7 additions & 0 deletions nodes/nomos-node/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ mix:
num_mix_layers: 1
temporal_processor:
max_delay_seconds: 5
cover_traffic:
epoch_duration:
secs: 432000
nanos: 0
slot_duration:
secs: 20
nanos: 0
membership:
- address: /ip4/127.0.0.1/udp/3001/quic-v1
public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
Expand Down
2 changes: 1 addition & 1 deletion nomos-mix/core/src/cover_traffic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ mod tests {
#[test]
fn test_ticket() {
generate_ticket(10u32.to_be_bytes(), 1123, 0);
for i in (0..1u32) {
for i in 0..1u32 {
let slots = select_slot(i.to_be_bytes(), 1234, 100, 21600, winning_probability(1));
println!("slots = {slots:?}");
}
Expand Down
25 changes: 22 additions & 3 deletions nomos-mix/core/src/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ where
M: MixMessage,
{
remote_nodes: Vec<Node<M::PublicKey>>,
local_node: Node<M::PublicKey>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -22,10 +23,20 @@ where
M: MixMessage,
M::PublicKey: PartialEq,
{
pub fn new(mut nodes: Vec<Node<M::PublicKey>>, local_public_key: M::PublicKey) -> Self {
nodes.retain(|node| node.public_key != local_public_key);
pub fn new(nodes: Vec<Node<M::PublicKey>>, local_public_key: M::PublicKey) -> Self {
let mut remote_nodes = Vec::with_capacity(nodes.len() - 1);
let mut local_node = None;
nodes.into_iter().for_each(|node| {
if node.public_key == local_public_key {
local_node = Some(node);
} else {
remote_nodes.push(node);
}
});

Self {
remote_nodes: nodes,
remote_nodes,
local_node: local_node.expect("Local node not found"),
}
}

Expand All @@ -36,4 +47,12 @@ where
) -> Vec<&Node<M::PublicKey>> {
self.remote_nodes.choose_multiple(rng, amount).collect()
}

pub fn local_node(&self) -> &Node<M::PublicKey> {
&self.local_node
}

pub fn size(&self) -> usize {
self.remote_nodes.len() + 1
}
}
4 changes: 4 additions & 0 deletions nomos-services/data-availability/tests/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ pub fn new_node(
max_delay_seconds: 2,
},
},
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
epoch_duration: Duration::from_secs(432000),
slot_duration: Duration::from_secs(20),
},
membership: mix_config.membership.clone(),
},
da_network: DaNetworkConfig {
Expand Down
107 changes: 92 additions & 15 deletions nomos-services/mix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ use backends::MixBackend;
use futures::StreamExt;
use network::NetworkAdapter;
use nomos_core::wire;
use nomos_mix::membership::{Membership, Node};
use nomos_mix::message_blend::crypto::CryptographicProcessor;
use nomos_mix::message_blend::temporal::TemporalScheduler;
use nomos_mix::message_blend::{crypto::CryptographicProcessor, CryptographicProcessorSettings};
use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
use nomos_mix::persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
};
use nomos_mix::MixOutgoingMessage;
use nomos_mix_message::mock::MockMixMessage;
use nomos_mix::{
cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::{Membership, Node},
};
use nomos_mix_message::{mock::MockMixMessage, MixMessage};
use nomos_network::NetworkService;
use overwatch_rs::services::{
handle::ServiceStateHandle,
Expand Down Expand Up @@ -127,12 +130,22 @@ where
ChaCha12Rng::from_entropy(),
);
let mut blend_messages = backend.listen_to_incoming_messages().blend(
mix_config.message_blend,
mix_config.message_blend.clone(),
membership.clone(),
temporal_scheduler,
ChaCha12Rng::from_entropy(),
);

// tier 3 cover traffic
let mut cover_traffic: CoverTraffic<_, _, MockMixMessage> = CoverTraffic::new(
mix_config.cover_traffic.cover_traffic_settings(
&membership,
&mix_config.message_blend.cryptographic_processor,
),
mix_config.cover_traffic.epoch_stream(),
mix_config.cover_traffic.slot_stream(),
);

// local messages, are bypassed and send immediately
let mut local_messages = service_state
.inbound_relay
Expand Down Expand Up @@ -162,23 +175,17 @@ where
network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
},
_ => {
tracing::error!("unrecognized message from mix backend");
tracing::debug!("unrecognized message from mix backend");
}
}
}
}
}
Some(msg) = cover_traffic.next() => {
Self::wrap_and_send_to_persistent_transmission(msg, &mut cryptographic_processor, &persistent_sender);
}
Some(msg) = local_messages.next() => {
match cryptographic_processor.wrap_message(&msg) {
Ok(wrapped_message) => {
if let Err(e) = persistent_sender.send(wrapped_message) {
tracing::error!("Error sending message to persistent stream: {e}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {:?}", e);
}
}
Self::wrap_and_send_to_persistent_transmission(msg, &mut cryptographic_processor, &persistent_sender);
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
Expand Down Expand Up @@ -214,18 +221,88 @@ where
}
}
}

fn wrap_and_send_to_persistent_transmission(
message: Vec<u8>,
cryptographic_processor: &mut CryptographicProcessor<ChaCha12Rng, MockMixMessage>,
persistent_sender: &mpsc::UnboundedSender<Vec<u8>>,
) {
match cryptographic_processor.wrap_message(&message) {
Ok(wrapped_message) => {
if let Err(e) = persistent_sender.send(wrapped_message) {
tracing::error!("Error sending message to persistent stream: {e}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {:?}", e);
}
}
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixConfig<BackendSettings> {
pub backend: BackendSettings,
pub message_blend: MessageBlendSettings<MockMixMessage>,
pub persistent_transmission: PersistentTransmissionSettings,
pub cover_traffic: CoverTrafficExtSettings,
pub membership: Vec<
Node<<nomos_mix_message::mock::MockMixMessage as nomos_mix_message::MixMessage>::PublicKey>,
>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CoverTrafficExtSettings {
pub epoch_duration: Duration,
pub slot_duration: Duration,
}

impl CoverTrafficExtSettings {
fn cover_traffic_settings(
&self,
membership: &Membership<MockMixMessage>,
cryptographic_processor_settings: &CryptographicProcessorSettings<
<MockMixMessage as MixMessage>::PrivateKey,
>,
) -> CoverTrafficSettings {
CoverTrafficSettings {
node_id: membership.local_node().public_key,
number_of_hops: cryptographic_processor_settings.num_mix_layers,
slots_per_epoch: self.slots_per_epoch(),
network_size: membership.size(),
}
}

fn slots_per_epoch(&self) -> usize {
(self.epoch_duration.as_secs() as usize)
.checked_div(self.slot_duration.as_secs() as usize)
.expect("Invalid epoch & slot duration")
}

fn epoch_stream(
&self,
) -> futures::stream::Map<
futures::stream::Enumerate<IntervalStream>,
impl FnMut((usize, time::Instant)) -> usize,
> {
IntervalStream::new(time::interval(self.epoch_duration))
.enumerate()
.map(|(i, _)| i)
}

fn slot_stream(
&self,
) -> futures::stream::Map<
futures::stream::Enumerate<IntervalStream>,
impl FnMut((usize, time::Instant)) -> usize,
> {
let slots_per_epoch = self.slots_per_epoch();
IntervalStream::new(time::interval(self.slot_duration))
.enumerate()
.map(move |(i, _)| i % slots_per_epoch)
}
}

impl<BackendSettings> MixConfig<BackendSettings> {
fn membership(&self) -> Membership<MockMixMessage> {
// We use private key as a public key because the `MockMixMessage` doesn't differentiate between them.
Expand Down
4 changes: 4 additions & 0 deletions tests/src/nodes/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
max_delay_seconds: 2,
},
},
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
epoch_duration: Duration::from_secs(432000),
slot_duration: Duration::from_secs(20),
},
membership: config.mix_config.membership,
},
cryptarchia: CryptarchiaSettings {
Expand Down
4 changes: 4 additions & 0 deletions tests/src/nodes/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
max_delay_seconds: 2,
},
},
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
epoch_duration: Duration::from_secs(432000),
slot_duration: Duration::from_secs(20),
},
membership: config.mix_config.membership,
},
cryptarchia: CryptarchiaSettings {
Expand Down

0 comments on commit 63c472e

Please sign in to comment.