Skip to content

Commit

Permalink
Merge branch 'master' into my-codecov
Browse files Browse the repository at this point in the history
  • Loading branch information
romanzac committed Aug 30, 2024
2 parents ba4d918 + 0cb039d commit 9a56cc4
Show file tree
Hide file tree
Showing 27 changed files with 550 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"nomos-services/system-sig",
"nomos-services/data-availability/indexer",
"nomos-services/data-availability/network",
"nomos-services/data-availability/sampling",
"nomos-services/data-availability/verifier",
"nomos-services/data-availability/tests",
"nomos-da/full-replication",
Expand Down
2 changes: 2 additions & 0 deletions nodes/nomos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ futures = "0.3"
http = "0.2.9"
hex = "0.4.3"
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-api = { path = "../../nomos-services/api" }
nomos-log = { path = "../../nomos-services/log" }
Expand Down
13 changes: 10 additions & 3 deletions nodes/nomos-node/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
// std
use std::{
net::{IpAddr, SocketAddr, ToSocketAddrs},
path::PathBuf,
};

use crate::NomosApiService;
// crates
use clap::{Parser, ValueEnum};
use color_eyre::eyre::{eyre, Result};
use cryptarchia_ledger::Coin;
use hex::FromHex;
use nomos_libp2p::{secp256k1::SecretKey, Multiaddr};
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_libp2p::{ed25519::SecretKey, Multiaddr};
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tracing::Level;
// internal
use crate::NomosApiService;

#[derive(ValueEnum, Clone, Debug, Default)]
pub enum LoggerBackendType {
Expand Down Expand Up @@ -114,6 +119,8 @@ pub struct MetricsArgs {
pub struct Config {
pub log: <Logger as ServiceData>::Settings,
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
pub da_network:
<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>> as ServiceData>::Settings,
pub http: <NomosApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
}
Expand Down
26 changes: 16 additions & 10 deletions nodes/nomos-node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use bytes::Bytes;
use color_eyre::eyre::Result;
use kzgrs_backend::dispersal::BlobInfo;
use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
use serde::{de::DeserializeOwned, Serialize};
pub mod api;
mod config;
mod tx;

// std
// crates
use api::AxumBackend;
use bytes::Bytes;
use color_eyre::eyre::Result;
pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs};
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::dispersal::BlobInfo;
use nomos_api::ApiService;
use nomos_core::da::blob::info::DispersedBlobInfo;
pub use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
};
use nomos_core::{header::HeaderId, tx::Transaction, wire};
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
#[cfg(feature = "tracing")]
use nomos_log::Logger;
Expand All @@ -29,12 +33,13 @@ use nomos_storage::{
StorageService,
};
use nomos_system_sig::SystemSig;
use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
use serde::{de::DeserializeOwned, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
// internal
pub use tx::Tx;

pub mod api;
mod config;
mod tx;

pub type NomosApiService =
ApiService<AxumBackend<(), DaBlob, BlobInfo, BlobInfo, KzgrsDaVerifier, Tx, Wire, MB16>>;

Expand Down Expand Up @@ -68,6 +73,7 @@ pub struct Nomos {
#[cfg(feature = "tracing")]
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<NetworkBackend>>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
cryptarchia: ServiceHandle<Cryptarchia>,
Expand Down
1 change: 1 addition & 0 deletions nodes/nomos-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ fn main() -> Result<()> {
},
registry: registry.clone(),
},
da_network: config.da_network,
cryptarchia: config.cryptarchia,
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
Expand Down
8 changes: 4 additions & 4 deletions nomos-cli/src/da/network/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use libp2p::{Multiaddr, PeerId};
use log::error;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::backends::NetworkBackend;
use nomos_libp2p::{secp256k1, secret_key_serde};
use nomos_libp2p::{ed25519, secret_key_serde};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -54,8 +54,8 @@ pub struct ExecutorBackend<Membership> {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutorBackendSettings<Membership> {
// Identification Secp256k1 private key in Hex format (`0x123...abc`). Default random.
#[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")]
pub node_key: secp256k1::SecretKey,
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey,
/// Membership of DA network PoV set
membership: Membership,
node_addrs: HashMap<PeerId, Multiaddr>,
Expand Down Expand Up @@ -94,7 +94,7 @@ where
let (dispersal_events_sender, dispersal_events_receiver) = unbounded_channel();

let keypair =
libp2p::identity::Keypair::from(secp256k1::Keypair::from(config.node_key.clone()));
libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut executor_swarm =
ExecutorSwarm::new(keypair, config.membership, dispersal_events_sender);
let dispersal_request_sender = executor_swarm.blobs_sender();
Expand Down
4 changes: 2 additions & 2 deletions nomos-da/network/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
libp2p = { version = "0.53", features = ["macros"] }
libp2p = { version = "0.53", features = ["macros", "tokio", "quic"] }
libp2p-stream = "0.1.0-alpha"
futures = "0.3"
tracing = "0.1"
Expand All @@ -25,6 +25,6 @@ thiserror = "1.0"

[dev-dependencies]
tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "time"] }
libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "tokio", "quic", "tcp", "yamux", "noise"] }
libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "quic", "tcp", "yamux", "noise"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

25 changes: 25 additions & 0 deletions nomos-da/network/core/src/address_book.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use libp2p::{Multiaddr, PeerId};
use std::collections::HashMap;
use std::sync::Arc;

/// Store for known peer addresses
/// It is a simple wrapper around a `HashMap` at the moment.
/// But it should be abstracted here to keep addresses in sync among different libp2p protocols
#[derive(Clone, Debug)]
pub struct AddressBook(Arc<HashMap<PeerId, Multiaddr>>);

impl AddressBook {
pub fn empty() -> Self {
Self(Arc::new(HashMap::new()))
}

pub fn get_address(&self, peer_id: &PeerId) -> Option<&Multiaddr> {
self.0.get(peer_id)
}
}

impl FromIterator<(PeerId, Multiaddr)> for AddressBook {
fn from_iter<T: IntoIterator<Item = (PeerId, Multiaddr)>>(iter: T) -> Self {
Self(Arc::new(iter.into_iter().collect()))
}
}
5 changes: 3 additions & 2 deletions nomos-da/network/core/src/behaviour/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use libp2p::PeerId;
// crates
use libp2p::swarm::NetworkBehaviour;
// internal
use crate::address_book::AddressBook;
use crate::{
protocols::dispersal::validator::behaviour::DispersalValidatorBehaviour,
protocols::replication::behaviour::ReplicationBehaviour,
Expand Down Expand Up @@ -33,10 +34,10 @@ where
Membership: MembershipHandler + Clone + Send + 'static,
<Membership as MembershipHandler>::NetworkId: Send,
{
pub fn new(key: &Keypair, membership: Membership) -> Self {
pub fn new(key: &Keypair, membership: Membership, addresses: AddressBook) -> Self {
let peer_id = PeerId::from_public_key(&key.public());
Self {
sampling: SamplingBehaviour::new(peer_id, membership.clone()),
sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses),
dispersal: DispersalValidatorBehaviour::new(membership.clone()),
replication: ReplicationBehaviour::new(peer_id, membership),
}
Expand Down
1 change: 1 addition & 0 deletions nomos-da/network/core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod address_book;
pub mod behaviour;
pub mod protocol;
pub mod protocols;
Expand Down
21 changes: 19 additions & 2 deletions nomos-da/network/core/src/protocols/sampling/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::stream::{BoxStream, FuturesUnordered};
use futures::{AsyncWriteExt, FutureExt, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::core::Endpoint;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
Expand All @@ -25,6 +26,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::error;
// internal
use crate::address_book::AddressBook;
use crate::protocol::SAMPLING_PROTOCOL;
use crate::protocols::clone_deserialize_error;
use crate::SubnetworkId;
Expand Down Expand Up @@ -167,7 +169,10 @@ type IncomingStreamHandlerFuture = BoxFuture<'static, Result<SampleStream, Sampl
/// Executor sampling protocol
/// Takes care of sending and replying sampling requests
pub struct SamplingBehaviour<Membership: MembershipHandler> {
/// Self peer id
peer_id: PeerId,
/// Addresses of known peers in the DA network
addresses: AddressBook,
/// Underlying stream behaviour
stream_behaviour: libp2p_stream::Behaviour,
/// Incoming sample request streams
Expand Down Expand Up @@ -195,7 +200,7 @@ where
Membership: MembershipHandler + 'static,
Membership::NetworkId: Send,
{
pub fn new(peer_id: PeerId, membership: Membership) -> Self {
pub fn new(peer_id: PeerId, membership: Membership, addresses: AddressBook) -> Self {
let stream_behaviour = libp2p_stream::Behaviour::new();
let mut control = stream_behaviour.new_control();

Expand All @@ -213,6 +218,7 @@ where
let connected_peers = HashSet::new();
Self {
peer_id,
addresses,
stream_behaviour,
incoming_streams,
control,
Expand Down Expand Up @@ -639,7 +645,18 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
}
// Deal with connection as the underlying behaviour would do
match self.stream_behaviour.poll(cx) {
Poll::Ready(ToSwarm::Dial { opts }) => Poll::Ready(ToSwarm::Dial { opts }),
Poll::Ready(ToSwarm::Dial { mut opts }) => {
// attach known peer address if possible
if let Some(address) = opts
.get_peer_id()
.and_then(|peer_id: PeerId| self.addresses.get_address(&peer_id))
{
opts = DialOpts::peer_id(opts.get_peer_id().unwrap())
.addresses(vec![address.clone()])
.build();
}
Poll::Ready(ToSwarm::Dial { opts })
}
Poll::Pending => {
// TODO: probably must be smarter when to wake this
cx.waker().wake_by_ref();
Expand Down
20 changes: 15 additions & 5 deletions nomos-da/network/core/src/protocols/sampling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod behaviour;

#[cfg(test)]
mod test {
use crate::address_book::AddressBook;
use crate::protocols::sampling::behaviour::{SamplingBehaviour, SamplingEvent};
use crate::test_utils::AllNeighbours;
use crate::SubnetworkId;
Expand All @@ -22,6 +23,7 @@ mod test {
pub fn sampling_swarm(
key: Keypair,
membership: impl MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static,
addresses: AddressBook,
) -> Swarm<
SamplingBehaviour<impl MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static>,
> {
Expand All @@ -30,7 +32,11 @@ mod test {
.with_other_transport(|key| quic::tokio::Transport::new(quic::Config::new(key)))
.unwrap()
.with_behaviour(|key| {
SamplingBehaviour::new(PeerId::from_public_key(&key.public()), membership)
SamplingBehaviour::new(
PeerId::from_public_key(&key.public()),
membership,
addresses,
)
})
.unwrap()
.with_swarm_config(|cfg| {
Expand Down Expand Up @@ -66,8 +72,14 @@ mod test {
.unwrap()
.with_p2p(PeerId::from_public_key(&k2.public()))
.unwrap();
let mut p1 = sampling_swarm(k1.clone(), neighbours.clone());
let mut p2 = sampling_swarm(k2.clone(), neighbours);
let p1_addresses = vec![(PeerId::from_public_key(&k2.public()), p2_address.clone())];
let p2_addresses = vec![(PeerId::from_public_key(&k1.public()), p1_address.clone())];
let mut p1 = sampling_swarm(
k1.clone(),
neighbours.clone(),
p1_addresses.into_iter().collect(),
);
let mut p2 = sampling_swarm(k2.clone(), neighbours, p2_addresses.into_iter().collect());

let request_sender_1 = p1.behaviour().sample_request_channel();
let request_sender_2 = p2.behaviour().sample_request_channel();
Expand Down Expand Up @@ -136,13 +148,11 @@ mod test {
let t1 = tokio::spawn(async move {
p1.listen_on(p1_address).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
p1.dial(_p2_address).unwrap();
test_sampling_swarm(p1).await
});
let t2 = tokio::spawn(async move {
p2.listen_on(p2_address).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
p2.dial(_p1_address).unwrap();
test_sampling_swarm(p2).await
});
tokio::time::sleep(Duration::from_secs(2)).await;
Expand Down
17 changes: 13 additions & 4 deletions nomos-da/network/core/src/swarm/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use nomos_da_messages::replication::ReplicationReq;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
// internal
use crate::address_book::AddressBook;
use crate::behaviour::validator::{ValidatorBehaviour, ValidatorBehaviourEvent};
use crate::protocols::{
dispersal::validator::behaviour::DispersalEvent, replication::behaviour::ReplicationEvent,
Expand All @@ -35,15 +36,19 @@ impl<Membership> ValidatorSwarm<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> + Clone + Send,
{
pub fn new(key: Keypair, membership: Membership) -> (Self, ValidatorEventsStream) {
pub fn new(
key: Keypair,
membership: Membership,
addresses: AddressBook,
) -> (Self, ValidatorEventsStream) {
let (sampling_events_sender, sampling_events_receiver) = unbounded_channel();
let (validation_events_sender, validation_events_receiver) = unbounded_channel();

let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver);
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
(
Self {
swarm: Self::build_swarm(key, membership),
swarm: Self::build_swarm(key, membership, addresses),
sampling_events_sender,
validation_events_sender,
},
Expand All @@ -53,11 +58,15 @@ where
},
)
}
fn build_swarm(key: Keypair, membership: Membership) -> Swarm<ValidatorBehaviour<Membership>> {
fn build_swarm(
key: Keypair,
membership: Membership,
addresses: AddressBook,
) -> Swarm<ValidatorBehaviour<Membership>> {
SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_quic()
.with_behaviour(|key| ValidatorBehaviour::new(key, membership))
.with_behaviour(|key| ValidatorBehaviour::new(key, membership, addresses))
.expect("Validator behaviour should build")
.build()
}
Expand Down
Loading

0 comments on commit 9a56cc4

Please sign in to comment.