Skip to content

Commit

Permalink
fix(ccp): Update ILP Address before checking routes
Browse files Browse the repository at this point in the history
This fixes the three_nodes test. Previously, the global_prefix would not get adjusted after our store had its ilp address updated. This resulted in _filter_routes_ and _update_best_routes_ to misbehave. By updating the ccp service's ilp address and global prefix before making any route broadcasts/receivals, we ensure that we always use the most up to date store address.

Can this result in performance penalties, since we're performing an extra write? Probably not, since everything is done in memory
  • Loading branch information
gakonst committed Sep 24, 2019
1 parent 83cb030 commit af7311a
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 31 deletions.
3 changes: 2 additions & 1 deletion crates/interledger-ccp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod server;
#[cfg(test)]
mod test_helpers;

pub use packet::{Mode, RouteControlRequest};
pub use server::{CcpRouteManager, CcpRouteManagerBuilder};

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -69,7 +70,7 @@ impl ToString for RoutingRelation {
}
}

/// DefineCcpAccountethods Account types need to be used by the CCP Service
/// Define CcpAccount methods and Account types that need to be used by the CCP Service
pub trait CcpRoutingAccount: Account {
/// The type of relationship we have with this account
fn routing_relation(&self) -> RoutingRelation;
Expand Down
8 changes: 4 additions & 4 deletions crates/interledger-ccp/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ impl TryFrom<u8> for Mode {

#[derive(Clone, PartialEq)]
pub struct RouteControlRequest {
pub(crate) mode: Mode,
pub mode: Mode,
// TODO change debug to format this as hex
pub(crate) last_known_routing_table_id: [u8; 16],
pub(crate) last_known_epoch: u32,
pub(crate) features: Vec<String>,
pub last_known_routing_table_id: [u8; 16],
pub last_known_epoch: u32,
pub features: Vec<String>,
}

impl Debug for RouteControlRequest {
Expand Down
62 changes: 38 additions & 24 deletions crates/interledger-ccp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use futures::{
use interledger_packet::PrepareBuilder;
use interledger_packet::{Address, ErrorCode, Fulfill, Reject, RejectBuilder};
use interledger_service::{
Account, BoxedIlpFuture, IncomingRequest, IncomingService, OutgoingRequest, OutgoingService,
Account, AddressStore, BoxedIlpFuture, IncomingRequest, IncomingService, OutgoingRequest,
OutgoingService,
};
#[cfg(test)]
use lazy_static::lazy_static;
Expand Down Expand Up @@ -68,7 +69,7 @@ impl<I, O, S, A> CcpRouteManagerBuilder<I, O, S>
where
I: IncomingService<A> + Clone + Send + Sync + 'static,
O: OutgoingService<A> + Clone + Send + Sync + 'static,
S: RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
S: AddressStore + RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
A: CcpRoutingAccount + Send + Sync + 'static,
{
pub fn new(ilp_address: Address, store: S, outgoing: O, next_incoming: I) -> Self {
Expand Down Expand Up @@ -102,8 +103,8 @@ where
pub fn to_service(&self) -> CcpRouteManager<I, O, S, A> {
#[allow(clippy::let_and_return)]
let service = CcpRouteManager {
ilp_address: self.ilp_address.clone(),
global_prefix: self.global_prefix.clone(),
ilp_address: Arc::new(RwLock::new(self.ilp_address.clone())),
global_prefix: Arc::new(RwLock::new(self.global_prefix.clone())),
next_incoming: self.next_incoming.clone(),
outgoing: self.outgoing.clone(),
store: self.store.clone(),
Expand Down Expand Up @@ -132,8 +133,8 @@ where
/// received from peers.
#[derive(Clone)]
pub struct CcpRouteManager<I, O, S, A: Account> {
ilp_address: Address,
global_prefix: Bytes,
ilp_address: Arc<RwLock<Address>>,
global_prefix: Arc<RwLock<Bytes>>,
/// The next request handler that will be used both to pass on requests that are not CCP messages.
next_incoming: I,
/// The outgoing request handler that will be used to send outgoing CCP messages.
Expand Down Expand Up @@ -162,7 +163,7 @@ impl<I, O, S, A> CcpRouteManager<I, O, S, A>
where
I: IncomingService<A> + Clone + Send + Sync + 'static,
O: OutgoingService<A> + Clone + Send + Sync + 'static,
S: RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
S: AddressStore + RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
A: CcpRoutingAccount + Send + Sync + 'static,
{
/// Returns a future that will trigger this service to update its routes and broadcast
Expand All @@ -173,6 +174,8 @@ where
Interval::new(Instant::now(), Duration::from_millis(interval))
.map_err(|err| error!("Interval error, no longer sending route updates: {:?}", err))
.for_each(move |_| {
// ensure we have the latest ILP Address from the store
clone.update_ilp_address();
clone.broadcast_routes().then(|_| {
// Returning an error would end the broadcast loop
// so we want to return Ok even if there was an error
Expand All @@ -182,6 +185,17 @@ where
})
}

fn update_ilp_address(&self) {
let ilp_address = self.store.get_ilp_address();
*self.global_prefix.write() = ilp_address
.to_bytes()
.iter()
.position(|c| c == &b'.')
.map(|index| ilp_address.to_bytes().slice_to(index + 1))
.unwrap_or_else(|| ilp_address.to_bytes().clone());
*self.ilp_address.write() = ilp_address;
}

pub fn broadcast_routes(&self) -> impl Future<Item = (), Error = ()> {
let clone = self.clone();
self.update_best_routes(None)
Expand Down Expand Up @@ -213,7 +227,7 @@ where
return Either::A(err(RejectBuilder {
code: ErrorCode::F00_BAD_REQUEST,
message: b"We are not configured to send routes to you, sorry",
triggered_by: Some(&self.ilp_address),
triggered_by: Some(&self.ilp_address.read()),
data: &[],
}
.build()));
Expand All @@ -224,7 +238,7 @@ where
return Either::A(err(RejectBuilder {
code: ErrorCode::F00_BAD_REQUEST,
message: b"Invalid route control request",
triggered_by: Some(&self.ilp_address),
triggered_by: Some(&self.ilp_address.read()),
data: &[],
}
.build()));
Expand Down Expand Up @@ -252,7 +266,7 @@ where

#[cfg(test)]
{
let ilp_address = self.ilp_address.clone();
let ilp_address = self.ilp_address.read().clone();
return Either::B(Either::A(
self.send_route_update(request.from.clone(), from_epoch_index, to_epoch_index)
.map_err(move |_| {
Expand Down Expand Up @@ -295,16 +309,16 @@ where
.new_routes
.into_iter()
.filter(|route| {
if !route.prefix.starts_with(&self.global_prefix) {
if !route.prefix.starts_with(&self.global_prefix.read()) {
warn!("Got route for a different global prefix: {:?}", route);
false
} else if route.prefix.len() <= self.global_prefix.len() {
} else if route.prefix.len() <= self.global_prefix.read().len() {
warn!("Got route broadcast for the global prefix: {:?}", route);
false
} else if route.prefix.starts_with(self.ilp_address.as_ref()) {
} else if route.prefix.starts_with(self.ilp_address.read().as_ref()) {
trace!("Ignoring route broadcast for a prefix that starts with our own address: {:?}", route);
false
} else if route.path.contains(self.ilp_address.as_ref()) {
} else if route.path.contains(self.ilp_address.read().as_ref()) {
trace!(
"Ignoring route broadcast for a route that includes us: {:?}",
route
Expand All @@ -328,7 +342,7 @@ where
return Box::new(err(RejectBuilder {
code: ErrorCode::F00_BAD_REQUEST,
message: b"Your route broadcasts are not accepted here",
triggered_by: Some(&self.ilp_address),
triggered_by: Some(&self.ilp_address.read()),
data: &[],
}
.build()));
Expand All @@ -339,7 +353,7 @@ where
return Box::new(err(RejectBuilder {
code: ErrorCode::F00_BAD_REQUEST,
message: b"Invalid route update request",
triggered_by: Some(&self.ilp_address),
triggered_by: Some(&self.ilp_address.read()),
data: &[],
}
.build()));
Expand Down Expand Up @@ -401,7 +415,7 @@ where
code: ErrorCode::T00_INTERNAL_ERROR,
message: b"Error processing route update",
data: &[],
triggered_by: Some(&ilp_address),
triggered_by: Some(&ilp_address.read()),
}
.build()
})
Expand All @@ -415,7 +429,7 @@ where
code: ErrorCode::F00_BAD_REQUEST,
message: &message.as_bytes(),
data: &[],
triggered_by: Some(&self.ilp_address),
triggered_by: Some(&self.ilp_address.read()),
}
.build();
let table = &incoming_tables[&request.from.id()];
Expand Down Expand Up @@ -489,8 +503,8 @@ where
let forwarding_table = self.forwarding_table.clone();
let forwarding_table_updates = self.forwarding_table_updates.clone();
let incoming_tables = self.incoming_tables.clone();
let ilp_address = self.ilp_address.clone();
let global_prefix = self.global_prefix.clone();
let ilp_address = self.ilp_address.read().clone();
let global_prefix = self.global_prefix.read().clone();
let mut store = self.store.clone();

self.store.get_local_and_configured_routes().and_then(
Expand Down Expand Up @@ -557,7 +571,7 @@ where
// Don't advertise routes that don't start with the global prefix
if route.prefix.starts_with(&global_prefix[..])
// Don't advertise the global prefix
&& route.prefix != global_prefix
&& route.prefix != *global_prefix
// Don't advertise completely local routes because advertising our own
// prefix will make sure we get packets sent to them
&& !(route.prefix.starts_with(ilp_address.as_ref()) && route.path.is_empty())
Expand Down Expand Up @@ -715,7 +729,7 @@ where
current_epoch_index,
new_routes: new_routes.clone(),
withdrawn_routes: withdrawn_routes.clone(),
speaker: self.ilp_address.clone(),
speaker: self.ilp_address.read().clone(),
hold_down_time: DEFAULT_ROUTE_EXPIRY_TIME,
}
}
Expand Down Expand Up @@ -835,7 +849,7 @@ impl<I, O, S, A> IncomingService<A> for CcpRouteManager<I, O, S, A>
where
I: IncomingService<A> + Clone + Send + Sync + 'static,
O: OutgoingService<A> + Clone + Send + Sync + 'static,
S: RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
S: AddressStore + RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
A: CcpRoutingAccount + Send + Sync + 'static,
{
type Future = BoxedIlpFuture;
Expand Down Expand Up @@ -1240,7 +1254,7 @@ mod handle_route_update_request {
prefix: Bytes::from("example.valid"),
path: vec![
Bytes::from("example.a"),
service.ilp_address.to_bytes(),
service.ilp_address.read().to_bytes(),
Bytes::from("example.b"),
],
auth: [0; 32],
Expand Down
23 changes: 21 additions & 2 deletions crates/interledger-ccp/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use futures::{
};
use interledger_packet::{Address, ErrorCode, RejectBuilder};
use interledger_service::{
incoming_service_fn, outgoing_service_fn, BoxedIlpFuture, IncomingService, OutgoingRequest,
OutgoingService, Username,
incoming_service_fn, outgoing_service_fn, AddressStore, BoxedIlpFuture, IncomingService,
OutgoingRequest, OutgoingService, Username,
};
#[cfg(test)]
use lazy_static::lazy_static;
Expand Down Expand Up @@ -115,6 +115,25 @@ impl TestStore {

type RoutingTable<A> = HashMap<Bytes, A>;

impl AddressStore for TestStore {
/// Saves the ILP Address in the store's memory and database
fn set_ilp_address(
&self,
_ilp_address: Address,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
unimplemented!()
}

fn clear_ilp_address(&self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
unimplemented!()
}

/// Get's the store's ilp address from memory
fn get_ilp_address(&self) -> Address {
Address::from_str("example.connector").unwrap()
}
}

impl RouteManagerStore for TestStore {
type Account = TestAccount;

Expand Down

0 comments on commit af7311a

Please sign in to comment.