diff --git a/crates/interledger-ccp/src/lib.rs b/crates/interledger-ccp/src/lib.rs index 731cb89c2..47d8a9bea 100644 --- a/crates/interledger-ccp/src/lib.rs +++ b/crates/interledger-ccp/src/lib.rs @@ -23,6 +23,7 @@ mod server; #[cfg(test)] mod test_helpers; +pub use packet::{Mode, RouteControlRequest}; pub use server::{CcpRouteManager, CcpRouteManagerBuilder}; use serde::{Deserialize, Serialize}; @@ -69,7 +70,7 @@ impl ToString for RoutingRelation { } } -/// DefineCcpAccountethods Account types need to be used by the CCP Service +/// Define CcpAccount methods and Account types that need to be used by the CCP Service pub trait CcpRoutingAccount: Account { /// The type of relationship we have with this account fn routing_relation(&self) -> RoutingRelation; diff --git a/crates/interledger-ccp/src/packet.rs b/crates/interledger-ccp/src/packet.rs index 0f575681c..95bf6a8a8 100644 --- a/crates/interledger-ccp/src/packet.rs +++ b/crates/interledger-ccp/src/packet.rs @@ -62,11 +62,11 @@ impl TryFrom for Mode { #[derive(Clone, PartialEq)] pub struct RouteControlRequest { - pub(crate) mode: Mode, + pub mode: Mode, // TODO change debug to format this as hex - pub(crate) last_known_routing_table_id: [u8; 16], - pub(crate) last_known_epoch: u32, - pub(crate) features: Vec, + pub last_known_routing_table_id: [u8; 16], + pub last_known_epoch: u32, + pub features: Vec, } impl Debug for RouteControlRequest { diff --git a/crates/interledger-ccp/src/server.rs b/crates/interledger-ccp/src/server.rs index 9398c1ebe..6a19c1336 100644 --- a/crates/interledger-ccp/src/server.rs +++ b/crates/interledger-ccp/src/server.rs @@ -17,7 +17,8 @@ use futures::{ use interledger_packet::PrepareBuilder; use interledger_packet::{Address, ErrorCode, Fulfill, Reject, RejectBuilder}; use interledger_service::{ - Account, BoxedIlpFuture, IncomingRequest, IncomingService, OutgoingRequest, OutgoingService, + Account, AddressStore, BoxedIlpFuture, IncomingRequest, IncomingService, OutgoingRequest, + OutgoingService, }; #[cfg(test)] use lazy_static::lazy_static; @@ -68,7 +69,7 @@ impl CcpRouteManagerBuilder where I: IncomingService + Clone + Send + Sync + 'static, O: OutgoingService + Clone + Send + Sync + 'static, - S: RouteManagerStore + Clone + Send + Sync + 'static, + S: AddressStore + RouteManagerStore + Clone + Send + Sync + 'static, A: CcpRoutingAccount + Send + Sync + 'static, { pub fn new(ilp_address: Address, store: S, outgoing: O, next_incoming: I) -> Self { @@ -102,8 +103,8 @@ where pub fn to_service(&self) -> CcpRouteManager { #[allow(clippy::let_and_return)] let service = CcpRouteManager { - ilp_address: self.ilp_address.clone(), - global_prefix: self.global_prefix.clone(), + ilp_address: Arc::new(RwLock::new(self.ilp_address.clone())), + global_prefix: Arc::new(RwLock::new(self.global_prefix.clone())), next_incoming: self.next_incoming.clone(), outgoing: self.outgoing.clone(), store: self.store.clone(), @@ -132,8 +133,8 @@ where /// received from peers. #[derive(Clone)] pub struct CcpRouteManager { - ilp_address: Address, - global_prefix: Bytes, + ilp_address: Arc>, + global_prefix: Arc>, /// The next request handler that will be used both to pass on requests that are not CCP messages. next_incoming: I, /// The outgoing request handler that will be used to send outgoing CCP messages. @@ -162,7 +163,7 @@ impl CcpRouteManager where I: IncomingService + Clone + Send + Sync + 'static, O: OutgoingService + Clone + Send + Sync + 'static, - S: RouteManagerStore + Clone + Send + Sync + 'static, + S: AddressStore + RouteManagerStore + Clone + Send + Sync + 'static, A: CcpRoutingAccount + Send + Sync + 'static, { /// Returns a future that will trigger this service to update its routes and broadcast @@ -173,6 +174,8 @@ where Interval::new(Instant::now(), Duration::from_millis(interval)) .map_err(|err| error!("Interval error, no longer sending route updates: {:?}", err)) .for_each(move |_| { + // ensure we have the latest ILP Address from the store + clone.update_ilp_address(); clone.broadcast_routes().then(|_| { // Returning an error would end the broadcast loop // so we want to return Ok even if there was an error @@ -182,6 +185,17 @@ where }) } + fn update_ilp_address(&self) { + let ilp_address = self.store.get_ilp_address(); + *self.global_prefix.write() = ilp_address + .to_bytes() + .iter() + .position(|c| c == &b'.') + .map(|index| ilp_address.to_bytes().slice_to(index + 1)) + .unwrap_or_else(|| ilp_address.to_bytes().clone()); + *self.ilp_address.write() = ilp_address; + } + pub fn broadcast_routes(&self) -> impl Future { let clone = self.clone(); self.update_best_routes(None) @@ -213,7 +227,7 @@ where return Either::A(err(RejectBuilder { code: ErrorCode::F00_BAD_REQUEST, message: b"We are not configured to send routes to you, sorry", - triggered_by: Some(&self.ilp_address), + triggered_by: Some(&self.ilp_address.read()), data: &[], } .build())); @@ -224,7 +238,7 @@ where return Either::A(err(RejectBuilder { code: ErrorCode::F00_BAD_REQUEST, message: b"Invalid route control request", - triggered_by: Some(&self.ilp_address), + triggered_by: Some(&self.ilp_address.read()), data: &[], } .build())); @@ -252,7 +266,7 @@ where #[cfg(test)] { - let ilp_address = self.ilp_address.clone(); + let ilp_address = self.ilp_address.read().clone(); return Either::B(Either::A( self.send_route_update(request.from.clone(), from_epoch_index, to_epoch_index) .map_err(move |_| { @@ -295,16 +309,16 @@ where .new_routes .into_iter() .filter(|route| { - if !route.prefix.starts_with(&self.global_prefix) { + if !route.prefix.starts_with(&self.global_prefix.read()) { warn!("Got route for a different global prefix: {:?}", route); false - } else if route.prefix.len() <= self.global_prefix.len() { + } else if route.prefix.len() <= self.global_prefix.read().len() { warn!("Got route broadcast for the global prefix: {:?}", route); false - } else if route.prefix.starts_with(self.ilp_address.as_ref()) { + } else if route.prefix.starts_with(self.ilp_address.read().as_ref()) { trace!("Ignoring route broadcast for a prefix that starts with our own address: {:?}", route); false - } else if route.path.contains(self.ilp_address.as_ref()) { + } else if route.path.contains(self.ilp_address.read().as_ref()) { trace!( "Ignoring route broadcast for a route that includes us: {:?}", route @@ -328,7 +342,7 @@ where return Box::new(err(RejectBuilder { code: ErrorCode::F00_BAD_REQUEST, message: b"Your route broadcasts are not accepted here", - triggered_by: Some(&self.ilp_address), + triggered_by: Some(&self.ilp_address.read()), data: &[], } .build())); @@ -339,7 +353,7 @@ where return Box::new(err(RejectBuilder { code: ErrorCode::F00_BAD_REQUEST, message: b"Invalid route update request", - triggered_by: Some(&self.ilp_address), + triggered_by: Some(&self.ilp_address.read()), data: &[], } .build())); @@ -401,7 +415,7 @@ where code: ErrorCode::T00_INTERNAL_ERROR, message: b"Error processing route update", data: &[], - triggered_by: Some(&ilp_address), + triggered_by: Some(&ilp_address.read()), } .build() }) @@ -415,7 +429,7 @@ where code: ErrorCode::F00_BAD_REQUEST, message: &message.as_bytes(), data: &[], - triggered_by: Some(&self.ilp_address), + triggered_by: Some(&self.ilp_address.read()), } .build(); let table = &incoming_tables[&request.from.id()]; @@ -489,8 +503,8 @@ where let forwarding_table = self.forwarding_table.clone(); let forwarding_table_updates = self.forwarding_table_updates.clone(); let incoming_tables = self.incoming_tables.clone(); - let ilp_address = self.ilp_address.clone(); - let global_prefix = self.global_prefix.clone(); + let ilp_address = self.ilp_address.read().clone(); + let global_prefix = self.global_prefix.read().clone(); let mut store = self.store.clone(); self.store.get_local_and_configured_routes().and_then( @@ -557,7 +571,7 @@ where // Don't advertise routes that don't start with the global prefix if route.prefix.starts_with(&global_prefix[..]) // Don't advertise the global prefix - && route.prefix != global_prefix + && route.prefix != *global_prefix // Don't advertise completely local routes because advertising our own // prefix will make sure we get packets sent to them && !(route.prefix.starts_with(ilp_address.as_ref()) && route.path.is_empty()) @@ -715,7 +729,7 @@ where current_epoch_index, new_routes: new_routes.clone(), withdrawn_routes: withdrawn_routes.clone(), - speaker: self.ilp_address.clone(), + speaker: self.ilp_address.read().clone(), hold_down_time: DEFAULT_ROUTE_EXPIRY_TIME, } } @@ -835,7 +849,7 @@ impl IncomingService for CcpRouteManager where I: IncomingService + Clone + Send + Sync + 'static, O: OutgoingService + Clone + Send + Sync + 'static, - S: RouteManagerStore + Clone + Send + Sync + 'static, + S: AddressStore + RouteManagerStore + Clone + Send + Sync + 'static, A: CcpRoutingAccount + Send + Sync + 'static, { type Future = BoxedIlpFuture; @@ -1240,7 +1254,7 @@ mod handle_route_update_request { prefix: Bytes::from("example.valid"), path: vec![ Bytes::from("example.a"), - service.ilp_address.to_bytes(), + service.ilp_address.read().to_bytes(), Bytes::from("example.b"), ], auth: [0; 32], diff --git a/crates/interledger-ccp/src/test_helpers.rs b/crates/interledger-ccp/src/test_helpers.rs index 7af39044d..c248fa5a2 100644 --- a/crates/interledger-ccp/src/test_helpers.rs +++ b/crates/interledger-ccp/src/test_helpers.rs @@ -8,8 +8,8 @@ use futures::{ }; use interledger_packet::{Address, ErrorCode, RejectBuilder}; use interledger_service::{ - incoming_service_fn, outgoing_service_fn, BoxedIlpFuture, IncomingService, OutgoingRequest, - OutgoingService, Username, + incoming_service_fn, outgoing_service_fn, AddressStore, BoxedIlpFuture, IncomingService, + OutgoingRequest, OutgoingService, Username, }; #[cfg(test)] use lazy_static::lazy_static; @@ -115,6 +115,25 @@ impl TestStore { type RoutingTable = HashMap; +impl AddressStore for TestStore { + /// Saves the ILP Address in the store's memory and database + fn set_ilp_address( + &self, + _ilp_address: Address, + ) -> Box + Send> { + unimplemented!() + } + + fn clear_ilp_address(&self) -> Box + Send> { + unimplemented!() + } + + /// Get's the store's ilp address from memory + fn get_ilp_address(&self) -> Address { + Address::from_str("example.connector").unwrap() + } +} + impl RouteManagerStore for TestStore { type Account = TestAccount;