Skip to content

Commit

Permalink
feat: add router dump
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Nov 20, 2024
1 parent 0114e62 commit ee505f3
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 27 deletions.
47 changes: 44 additions & 3 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(clippy::bool_assert_comparison)]

use atm0s_sdn::features::{router_sync, FeaturesEvent};
use atm0s_sdn::secure::StaticKeyAuthorization;
use atm0s_sdn::services::visualization;
use atm0s_sdn::{
Expand All @@ -14,6 +15,7 @@ use futures_util::{SinkExt, StreamExt};
use poem::endpoint::StaticFilesEndpoint;
#[cfg(feature = "embed")]
use poem::endpoint::{EmbeddedFileEndpoint, EmbeddedFilesEndpoint};
use poem::web::Json;
use poem::{
get, handler,
listener::TcpListener,
Expand All @@ -37,7 +39,8 @@ use std::{
},
time::Duration,
};
use tokio::sync::Mutex;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::{oneshot, Mutex};

#[cfg(feature = "embed")]
#[derive(RustEmbed)]
Expand Down Expand Up @@ -238,6 +241,26 @@ fn ws(ws: WebSocket, ctx: Data<&Arc<Mutex<WebsocketCtx>>>) -> impl IntoResponse
})
}

#[handler]
async fn dump_router(ctx: Data<&UnboundedSender<oneshot::Sender<serde_json::Value>>>) -> impl IntoResponse {
let (tx, rx) = oneshot::channel();
ctx.0.send(tx).expect("should send");
match tokio::time::timeout(Duration::from_millis(1000), rx).await {
Ok(Ok(v)) => Json(serde_json::json!({
"status": true,
"data": v
})),
Ok(Err(e)) => Json(serde_json::json!({
"status": false,
"error": e.to_string()
})),
Err(_e) => Json(serde_json::json!({
"status": false,
"error": "timeout"
})),
}
}

#[tokio::main]
async fn main() {
if std::env::var_os("RUST_LOG").is_none() {
Expand Down Expand Up @@ -281,13 +304,14 @@ async fn main() {
BackendType::Polling => builder.build::<PollingBackend<SdnOwner, 128, 128>>(args.workers, node_info),
};

let (dump_tx, mut dump_rx) = unbounded_channel::<oneshot::Sender<serde_json::Value>>();
let ctx = Arc::new(Mutex::new(WebsocketCtx::new()));

if args.collector {
controller.service_control(visualization::SERVICE_ID.into(), (), visualization::Control::Subscribe);
let ctx_c = ctx.clone();
tokio::spawn(async move {
let route = Route::new().at("/ws", get(ws.data(ctx_c)));
let route = Route::new().at("/dump_router", get(dump_router).data(dump_tx)).at("/ws", get(ws.data(ctx_c)));

#[cfg(not(feature = "embed"))]
let route = route.nest("/", StaticFilesEndpoint::new("./public/").index_file("index.html"));
Expand All @@ -303,6 +327,7 @@ async fn main() {

let started_at = Instant::now();
let mut count = 0;
let mut wait_dump_router = vec![];
while controller.process().is_some() {
if term.load(Ordering::Relaxed) {
if shutdown_wait == 200 {
Expand All @@ -322,6 +347,11 @@ async fn main() {
}),
);
}

while let Ok(v) = dump_rx.try_recv() {
controller.feature_control((), router_sync::Control::DumpRouter.into());
wait_dump_router.push(v);
}
while let Some(event) = controller.pop_event() {
match event {
SdnExtOut::ServicesEvent(_service, (), event) => match event {
Expand All @@ -338,7 +368,18 @@ async fn main() {
ctx.lock().await.del_node(node);
}
},
SdnExtOut::FeaturesEvent(_, _) => {}
SdnExtOut::FeaturesEvent(_, event) => {
if let FeaturesEvent::RouterSync(event) = event {
match event {
router_sync::Event::DumpRouter(value) => {
let json = serde_json::to_value(value).expect("should convert json");
while let Some(v) = wait_dump_router.pop() {
let _ = v.send(json.clone());
}
}
}
}
}
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
Expand Down
6 changes: 3 additions & 3 deletions packages/core/router/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ mod registry;
mod router;
mod table;

pub use self::registry::{Registry, RegistryDelta, RegistryDestDelta, RegistrySync};
pub use self::router::{Router, RouterDelta, RouterSync};
pub use self::table::{DestDelta, Metric, Path, TableDelta, TableSync, BANDWIDTH_LIMIT};
pub use self::registry::{RegisterDestDump, RegisterDump, Registry, RegistryDelta, RegistryDestDelta, RegistrySync};
pub use self::router::{Router, RouterDelta, RouterDump, RouterSync};
pub use self::table::{DestDelta, DestDump, Metric, Path, TableDelta, TableDump, TableSync, BANDWIDTH_LIMIT};

#[derive(PartialEq, Debug)]
pub enum ServiceDestination {
Expand Down
26 changes: 25 additions & 1 deletion packages/core/router/src/core/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};

mod dest;

pub use self::dest::RegistryDestDelta;
pub use self::dest::{RegisterDestDump, RegistryDestDelta};

use super::{registry::dest::RegistryDest, Metric, Path, ServiceDestination};

Expand All @@ -21,6 +21,12 @@ pub enum RegistryDelta {
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct RegistrySync(pub Vec<(u8, Metric)>);

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RegisterDump {
local: Vec<u8>,
remotes: HashMap<u8, RegisterDestDump>,
}

pub struct Registry {
node_id: NodeId,
local_destinations: [bool; 256],
Expand All @@ -38,6 +44,24 @@ impl Registry {
}
}

pub fn dump(&self) -> RegisterDump {
let mut local = Vec::new();
let mut remotes = HashMap::new();

for i in 0..=255 {
if self.local_destinations[i as usize] {
local.push(i);
}

let dest: &RegistryDest = &self.remote_destinations[i as usize];
if !dest.is_empty() {
remotes.insert(i, dest.dump());
}
}

RegisterDump { local, remotes }
}

pub fn add_service(&mut self, service_id: u8) {
self.local_destinations[service_id as usize] = true;
self.deltas.push_back(RegistryDelta::SetServiceLocal(service_id));
Expand Down
16 changes: 15 additions & 1 deletion packages/core/router/src/core/registry/dest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};

use atm0s_sdn_identity::{ConnId, NodeId};
use serde::Serialize;

use super::{Metric, Path};

Expand All @@ -10,13 +11,26 @@ pub enum RegistryDestDelta {
DelServicePath(ConnId),
}

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RegisterDestDump {
next: Option<NodeId>,
paths: HashMap<NodeId, Metric>,
}

#[derive(Debug, Default)]
pub struct RegistryDest {
paths: Vec<Path>,
deltas: VecDeque<RegistryDestDelta>,
}

impl RegistryDest {
pub fn dump(&self) -> RegisterDestDump {
RegisterDestDump {
next: self.next(&[]).map(|p| p.1),
paths: self.paths.iter().map(|p| (p.1.over_node(), p.1.clone())).collect(),
}
}

pub fn set_path(&mut self, over: ConnId, metric: Metric) {
match self.index_of(over) {
Some(index) => {
Expand Down
19 changes: 17 additions & 2 deletions packages/core/router/src/core/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use serde::{Deserialize, Serialize};
use crate::core::{Metric, Path};
use crate::core::{Registry, RegistrySync};

use super::registry::RegistryDelta;
use super::table::{NodeIndex, Table, TableDelta, TableSync};
use super::registry::{RegisterDump, RegistryDelta};
use super::table::{NodeIndex, Table, TableDelta, TableDump, TableSync};
use super::ServiceDestination;

#[derive(Debug, PartialEq, Clone)]
Expand All @@ -20,6 +20,13 @@ pub type Layer = u8;
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct RouterSync(pub RegistrySync, pub [Option<TableSync>; 4]);

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RouterDump {
node_id: NodeId,
services: RegisterDump,
layers: [TableDump; 4],
}

pub struct Router {
node_id: NodeId,
tables: [Table; 4],
Expand All @@ -37,6 +44,14 @@ impl Router {
}
}

pub fn dump(&self) -> RouterDump {
RouterDump {
node_id: self.node_id,
services: self.service_registry.dump(),
layers: [self.tables[0].dump(), self.tables[1].dump(), self.tables[2].dump(), self.tables[3].dump()],
}
}

pub fn node_id(&self) -> NodeId {
self.node_id
}
Expand Down
15 changes: 14 additions & 1 deletion packages/core/router/src/core/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, VecDeque};
use atm0s_sdn_identity::{ConnId, NodeId, NodeIdType};
use serde::{Deserialize, Serialize};

pub use dest::{Dest, DestDelta};
pub use dest::{Dest, DestDelta, DestDump};
pub use metric::{Metric, BANDWIDTH_LIMIT};
pub use path::Path;

Expand All @@ -20,6 +20,12 @@ pub struct TableDelta(pub u8, pub DestDelta);
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct TableSync(pub Vec<(u8, Metric)>);

#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub struct TableDump {
layer: u8,
dests: HashMap<u8, DestDump>,
}

pub struct Table {
node_id: NodeId,
layer: u8,
Expand All @@ -39,6 +45,13 @@ impl Table {
}
}

pub fn dump(&self) -> TableDump {
TableDump {
layer: self.layer,
dests: HashMap::from_iter(self.dests.iter().enumerate().filter(|d| !d.1.is_empty()).map(|d| (d.0 as u8, d.1.dump()))),
}
}

#[allow(unused)]
pub fn slots(&self) -> Vec<u8> {
self.slots.clone()
Expand Down
10 changes: 9 additions & 1 deletion packages/core/router/src/core/table/dest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};

use atm0s_sdn_identity::{ConnId, NodeId};
use serde::Serialize;

use super::{Metric, Path};

Expand All @@ -10,13 +11,20 @@ pub enum DestDelta {
DelBestPath,
}

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct DestDump(HashMap<NodeId, Metric>);

#[derive(Debug, Default)]
pub struct Dest {
paths: Vec<Path>,
deltas: VecDeque<DestDelta>,
}

impl Dest {
pub fn dump(&self) -> DestDump {
DestDump(self.paths.iter().map(|p| (p.1.over_node(), p.1.clone())).collect())
}

pub fn set_path(&mut self, over: ConnId, metric: Metric) {
let pre_best_conn = self.paths.first().map(|p| p.0);
match self.index_of(over) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl NeighbourConnection {
handshake: request_buf,
},
));
log::info!("[NeighbourConnection] Resend connect request to {}, dest_node {}", self.pair, self.node);
log::debug!("[NeighbourConnection] Resend connect request to {}, dest_node {}", self.pair, self.node);
} else {
log::warn!("[NeighbourConnection] Cannot create handshake for resending connect request to {}, dest_node {}", self.pair, self.node);
}
Expand Down
44 changes: 30 additions & 14 deletions packages/network/src/features/router_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque};

use atm0s_sdn_identity::{ConnId, NodeId};
use atm0s_sdn_router::{
core::{DestDelta, Metric, RegistryDelta, RegistryDestDelta, Router, RouterDelta, RouterSync, TableDelta},
core::{DestDelta, Metric, RegistryDelta, RegistryDestDelta, Router, RouterDelta, RouterDump, RouterSync, TableDelta},
shadow::ShadowRouterDelta,
};
use derivative::Derivative;
Expand All @@ -19,8 +19,15 @@ pub const FEATURE_NAME: &str = "router_sync";
const INIT_RTT_MS: u16 = 1000;
const INIT_BW: u32 = 100_000_000;

pub type Control = ();
pub type Event = ();
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Control {
DumpRouter,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
DumpRouter(RouterDump),
}

pub type ToWorker = ShadowRouterDelta<NetPair>;
pub type ToController = ();
Expand Down Expand Up @@ -101,20 +108,29 @@ impl<UserData> Feature<UserData, Control, Event, ToController, ToWorker> for Rou
}

fn on_input(&mut self, _ctx: &FeatureContext, _now_ms: u64, input: FeatureInput<'_, UserData, Control, ToController>) {
if let FeatureInput::Net(ctx, meta, buf) = input {
if !meta.secure {
log::warn!("[RouterSync] reject unsecure message");
return;
}
if let Some((_node, _remote, metric)) = self.conns.get(&ctx.conn) {
if let Ok(sync) = bincode::deserialize::<RouterSync>(&buf) {
self.router.apply_sync(ctx.conn, metric.clone(), sync);
match input {
FeatureInput::FromWorker(_) => {}
FeatureInput::Control(actor, control) => match control {
Control::DumpRouter => {
self.queue.push_back(FeatureOutput::Event(actor, Event::DumpRouter(self.router.dump())));
}
},
FeatureInput::Net(ctx, meta, buf) => {
if !meta.secure {
log::warn!("[RouterSync] reject unsecure message");
return;
}
if let Some((_node, _remote, metric)) = self.conns.get(&ctx.conn) {
if let Ok(sync) = bincode::deserialize::<RouterSync>(&buf) {
self.router.apply_sync(ctx.conn, metric.clone(), sync);
} else {
log::warn!("[RouterSync] Receive invalid sync from {}", ctx.pair);
}
} else {
log::warn!("[RouterSync] Receive invalid sync from {}", ctx.pair);
log::warn!("[RouterSync] Receive sync from unknown connection {}", ctx.pair);
}
} else {
log::warn!("[RouterSync] Receive sync from unknown connection {}", ctx.pair);
}
FeatureInput::Local(..) => {}
}
}

Expand Down

0 comments on commit ee505f3

Please sign in to comment.