Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: listen multiple UDP addresses #175

Merged
merged 2 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ clap = { version = "4.4", features = ["derive", "env"] }
mockall = "0.12.1"
num_enum = "0.7.2"
convert-enum = "0.1.0"
sans-io-runtime = { version = "0.1.0", default-features = false }
sans-io-runtime = { version = "0.2", default-features = false }
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ cargo build --release
Running first seed node as a network structure collector:

```bash
cargo run -- --collector --local-tags demo --connect-tags demo --node-id 1 --bind-addr 127.0.0.1:10001 --web-addr 0.0.0.0:3000
cargo run -- --collector --local-tags demo --connect-tags demo --node-id 1 --udp-port 10001 --web-addr 0.0.0.0:3000
```

Running second nodes and join to network with seed node (you need to replace with seed node IP if it running on another device):

```bash
cargo run -- --local-tags demo --connect-tags mode --seeds 1@/ip4/127.0.0.1/udp/10001 --node-id 2 --bind-addr 127.0.0.1:10002
cargo run -- --local-tags demo --connect-tags mode --seeds 1@/ip4/127.0.0.1/udp/10001 --node-id 2 --udp-port 10002
```

Same with this, we can run more nodes and connect to the network. Remember change node-id and port for not conflict with other nodes.
Expand Down
1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rust-embed = { version = "8.2", optional = true }
futures-util = "0.3"
tracing-subscriber = "0.3"
serde_json = "1.0"
local-ip-address = "0.6"

[features]
default = ["embed"]
Expand Down
18 changes: 15 additions & 3 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
node_id: NodeId,

/// Listen address
#[arg(env, short, long, default_value = "127.0.0.1:10000")]
bind_addr: SocketAddr,
#[arg(env, short, long, default_value_t = 10000)]
udp_port: u16,

Check warning on line 63 in bin/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/main.rs#L62-L63

Added lines #L62 - L63 were not covered by tests

/// Address of node we should connect to
#[arg(env, short, long)]
Expand Down Expand Up @@ -248,7 +248,19 @@
let mut shutdown_wait = 0;
let args = Args::parse();
tracing_subscriber::fmt::init();
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, VisualNodeInfo>::new(args.node_id, &[args.bind_addr], args.custom_addrs);
let addrs = local_ip_address::list_afinet_netifas()
.expect("Should have list interfaces")
.into_iter()
.filter(|(_, ip)| {
if ip.is_unspecified() || ip.is_multicast() {
false
} else {
std::net::UdpSocket::bind(SocketAddr::new(*ip, 0)).is_ok()
}
})
.map(|(_name, ip)| SocketAddr::new(ip, args.udp_port))
.collect::<Vec<_>>();
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, VisualNodeInfo>::new(args.node_id, &addrs, args.custom_addrs);

Check warning on line 263 in bin/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/main.rs#L251-L263

Added lines #L251 - L263 were not covered by tests

builder.set_authorization(StaticKeyAuthorization::new(&args.password));
builder.set_manual_discovery(args.local_tags, args.connect_tags);
Expand Down
2 changes: 1 addition & 1 deletion bin/start_agent.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cargo run -- --local-tags vpn --connect-tags vpn --seeds $1 --node-id $2 --bind-addr $3
cargo run -- --local-tags vpn --connect-tags vpn --seeds $1 --node-id $2 --udp-port $3
4 changes: 2 additions & 2 deletions bin/start_collector.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# If provided $3, it will be seeds
if [ -n "$4" ]; then
# $4 is defined
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --bind-addr $2 --web-addr $3 --seeds $4
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --udp-port $2 --web-addr $3 --seeds $4
else
# $4 is not defined
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --bind-addr $2 --web-addr $3
cargo run -- --collector --local-tags vpn --connect-tags vpn --node-id $1 --udp-port $2 --web-addr $3
fi
12 changes: 6 additions & 6 deletions packages/network/src/base/feature.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::net::SocketAddr;

use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId};
use atm0s_sdn_router::{shadow::ShadowRouter, RouteRule};
use sans_io_runtime::TaskSwitcherChild;

use crate::data_plane::NetPair;

use super::{Buffer, ConnectionCtx, ConnectionEvent, ServiceId, TransportMsgHeader, Ttl};

#[derive(Debug, Default, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -172,8 +172,8 @@ pub enum FeatureWorkerOutput<UserData, Control, Event, ToController> {
SendRoute(RouteRule, NetOutgoingMeta, Buffer),
RawDirect(ConnId, Buffer),
RawBroadcast(Vec<ConnId>, Buffer),
RawDirect2(SocketAddr, Buffer),
RawBroadcast2(Vec<SocketAddr>, Buffer),
RawDirect2(NetPair, Buffer),
RawBroadcast2(Vec<NetPair>, Buffer),
#[cfg(feature = "vpn")]
TunPkt(Buffer),
}
Expand Down Expand Up @@ -206,12 +206,12 @@ impl<UserData, Control, Event, ToController> FeatureWorkerOutput<UserData, Contr

pub struct FeatureWorkerContext {
pub node_id: NodeId,
pub router: ShadowRouter<SocketAddr>,
pub router: ShadowRouter<NetPair>,
}

pub trait FeatureWorker<UserData, SdkControl, SdkEvent, ToController, ToWorker>: TaskSwitcherChild<FeatureWorkerOutput<UserData, SdkControl, SdkEvent, ToController>> {
fn on_tick(&mut self, _ctx: &mut FeatureWorkerContext, _now: u64, _tick_count: u64) {}
fn on_network_raw(&mut self, ctx: &mut FeatureWorkerContext, now: u64, conn: ConnId, _remote: SocketAddr, header: TransportMsgHeader, mut buf: Buffer) {
fn on_network_raw(&mut self, ctx: &mut FeatureWorkerContext, now: u64, conn: ConnId, _pair: NetPair, header: TransportMsgHeader, mut buf: Buffer) {
let header_len = header.serialize_size();
buf.move_front_right(header_len).expect("Buffer should bigger or equal header");
self.on_input(ctx, now, FeatureWorkerInput::Network(conn, (&header).into(), buf));
Expand Down
6 changes: 3 additions & 3 deletions packages/network/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ mod msg;
mod secure;
mod service;

use std::net::SocketAddr;

use atm0s_sdn_identity::{ConnId, NodeId};
pub use control::*;
pub use feature::*;
Expand All @@ -14,11 +12,13 @@ pub use sans_io_runtime::Buffer;
pub use secure::*;
pub use service::*;

use crate::data_plane::NetPair;

#[derive(Debug, Clone)]
pub struct ConnectionCtx {
pub conn: ConnId,
pub node: NodeId,
pub remote: SocketAddr,
pub pair: NetPair,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
16 changes: 10 additions & 6 deletions packages/network/src/controller_plane.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, fmt::Debug, hash::Hash, sync::Arc};
use std::{collections::VecDeque, fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc};

use atm0s_sdn_identity::NodeId;
use atm0s_sdn_router::shadow::ShadowRouterHistory;
Expand Down Expand Up @@ -46,6 +46,7 @@ enum TaskType {

pub struct ControllerPlaneCfg<UserData, SC, SE, TC, TW> {
pub session: u64,
pub bind_addrs: Vec<SocketAddr>,
#[allow(clippy::type_complexity)]
pub services: Vec<Arc<dyn ServiceBuilder<UserData, FeaturesControl, FeaturesEvent, SC, SE, TC, TW>>>,
pub authorization: Arc<dyn Authorization>,
Expand Down Expand Up @@ -89,7 +90,10 @@ where
tick_count: 0,
feature_ctx: FeatureContext { node_id, session: cfg.session },
service_ctx: ServiceCtx { node_id, session: cfg.session },
neighbours: TaskSwitcherBranch::new(NeighboursManager::new(node_id, cfg.authorization, cfg.handshake_builder, cfg.random), TaskType::Neighbours),
neighbours: TaskSwitcherBranch::new(
NeighboursManager::new(node_id, cfg.bind_addrs, cfg.authorization, cfg.handshake_builder, cfg.random),
TaskType::Neighbours,
),
features: TaskSwitcherBranch::new(FeatureManager::new(node_id, cfg.session, service_ids), TaskType::Feature),
services: TaskSwitcherBranch::new(ServiceManager::new(cfg.services), TaskType::Service),
switcher: TaskSwitcher::new(3), //3 types: Neighbours, Feature, Service
Expand Down Expand Up @@ -132,8 +136,8 @@ where
.input(&mut self.switcher)
.on_input(&self.service_ctx, now_ms, service, ServiceInput::Control(ServiceControlActor::Controller(userdata), control));
}
Input::Control(LogicControl::NetNeighbour(remote, control)) => {
self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::Control(remote, control));
Input::Control(LogicControl::NetNeighbour(pair, control)) => {
self.neighbours.input(&mut self.switcher).on_input(now_ms, neighbours::Input::Control(pair, control));
}
Input::Control(LogicControl::Feature(to)) => {
self.features
Expand Down Expand Up @@ -188,7 +192,7 @@ where
.input(&mut self.switcher)
.on_shared_input(&self.service_ctx, now_ms, ServiceSharedInput::Connection(event.clone()));
match event {
ConnectionEvent::Connected(ctx, secure) => self.queue.push_back(Output::Event(LogicEvent::Pin(ctx.conn, ctx.node, ctx.remote, secure))),
ConnectionEvent::Connected(ctx, secure) => self.queue.push_back(Output::Event(LogicEvent::Pin(ctx.conn, ctx.node, ctx.pair, secure))),
ConnectionEvent::Stats(_ctx, _stats) => {}
ConnectionEvent::Disconnected(ctx) => self.queue.push_back(Output::Event(LogicEvent::UnPin(ctx.conn))),
}
Expand All @@ -214,7 +218,7 @@ where
FeatureOutput::SendDirect(conn, meta, buf) => {
log::debug!("[ControllerPlane] SendDirect to conn: {:?}, len: {}", conn, buf.len());
let conn_ctx = return_if_none!(self.neighbours.conn(conn));
self.queue.push_back(Output::Event(LogicEvent::NetDirect(feature, conn_ctx.remote, conn, meta, buf)))
self.queue.push_back(Output::Event(LogicEvent::NetDirect(feature, conn_ctx.pair, conn, meta, buf)))
}
FeatureOutput::SendRoute(rule, ttl, buf) => {
log::debug!("[ControllerPlane] SendRoute to rule: {:?}, len: {}", rule, buf.len());
Expand Down
36 changes: 24 additions & 12 deletions packages/network/src/controller_plane/neighbours.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
use atm0s_sdn_identity::{ConnId, NodeAddr, NodeId, Protocol};
use sans_io_runtime::TaskSwitcherChild;

use crate::base::{self, Authorization, ConnectionCtx, HandshakeBuilder, NeighboursControl, NeighboursControlCmds, SecureContext};
use crate::{
base::{self, Authorization, ConnectionCtx, HandshakeBuilder, NeighboursControl, NeighboursControlCmds, SecureContext},
data_plane::NetPair,
};

use self::connection::{ConnectionEvent, NeighbourConnection};

Expand All @@ -16,19 +19,20 @@
pub enum Input {
ConnectTo(NodeAddr),
DisconnectFrom(NodeId),
Control(SocketAddr, NeighboursControl),
Control(NetPair, NeighboursControl),
ShutdownRequest,
}

pub enum Output {
Control(SocketAddr, NeighboursControl),
Control(NetPair, NeighboursControl),
Event(base::ConnectionEvent),
ShutdownResponse,
}

pub struct NeighboursManager {
node_id: NodeId,
connections: HashMap<SocketAddr, NeighbourConnection>,
bind_addrs: Vec<SocketAddr>,
connections: HashMap<NetPair, NeighbourConnection>,
neighbours: HashMap<ConnId, ConnectionCtx>,
queue: VecDeque<Output>,
shutdown: bool,
Expand All @@ -38,9 +42,10 @@
}

impl NeighboursManager {
pub fn new(node_id: NodeId, authorization: Arc<dyn Authorization>, handshake_builder: Arc<dyn HandshakeBuilder>, random: Box<dyn rand::RngCore>) -> Self {
pub fn new(node_id: NodeId, bind_addrs: Vec<SocketAddr>, authorization: Arc<dyn Authorization>, handshake_builder: Arc<dyn HandshakeBuilder>, random: Box<dyn rand::RngCore>) -> Self {
Self {
node_id,
bind_addrs,
connections: HashMap::new(),
neighbours: HashMap::new(),
queue: VecDeque::new(),
Expand All @@ -66,14 +71,21 @@
Input::ConnectTo(addr) => {
let dest_node = addr.node_id();
let dests = get_node_addr_dests(addr);
for remote in dests {
if self.connections.contains_key(&remote) {
continue;
for local in &self.bind_addrs {
for remote in &dests {
if local.is_ipv4() != remote.is_ipv4() {
continue;

Check warning on line 77 in packages/network/src/controller_plane/neighbours.rs

View check run for this annotation

Codecov / codecov/patch

packages/network/src/controller_plane/neighbours.rs#L77

Added line #L77 was not covered by tests
}

let pair = NetPair::new(*local, *remote);
if self.connections.contains_key(&pair) {
continue;

Check warning on line 82 in packages/network/src/controller_plane/neighbours.rs

View check run for this annotation

Codecov / codecov/patch

packages/network/src/controller_plane/neighbours.rs#L82

Added line #L82 was not covered by tests
}
log::info!("[Neighbours] Sending connect request from {local} to {remote}, dest_node {dest_node}");
let session_id = self.random.next_u64();
let conn = NeighbourConnection::new_outgoing(self.handshake_builder.clone(), self.node_id, dest_node, session_id, pair, now_ms);
self.connections.insert(pair, conn);
}
log::info!("[Neighbours] Sending connect request to {}, dest_node {}", remote, dest_node);
let session_id = self.random.next_u64();
let conn = NeighbourConnection::new_outgoing(self.handshake_builder.clone(), self.node_id, dest_node, session_id, remote, now_ms);
self.connections.insert(remote, conn);
}
}
Input::DisconnectFrom(node) => {
Expand Down
Loading
Loading