From cc76f0206785ee71ef3b019a24e29b1d6f124cf6 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Sun, 22 Sep 2019 13:42:04 +0900 Subject: [PATCH] feat(api-btp): Connect to BTP when adding new account Closes https://github.com/interledger-rs/interledger-rs/issues/285 --- Cargo.lock | 1 + crates/interledger-api/Cargo.toml | 1 + crates/interledger-api/src/lib.rs | 15 +++-- crates/interledger-api/src/routes/accounts.rs | 30 +++++++-- crates/interledger-btp/src/client.rs | 65 +++++++++++++++++++ crates/interledger-btp/src/lib.rs | 2 +- 6 files changed, 104 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37ee91a18..002d450d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -997,6 +997,7 @@ dependencies = [ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures-retry 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "interledger-btp 0.2.2-alpha.1", "interledger-ccp 0.1.1-alpha.1", "interledger-http 0.2.2-alpha.1", "interledger-ildcp 0.2.2-alpha.1", diff --git a/crates/interledger-api/Cargo.toml b/crates/interledger-api/Cargo.toml index 29260ba9d..3f7a53d69 100644 --- a/crates/interledger-api/Cargo.toml +++ b/crates/interledger-api/Cargo.toml @@ -22,6 +22,7 @@ interledger-settlement = { path = "../interledger-settlement", version = "^0.1.1 interledger-spsp = { path = "../interledger-spsp", version = "^0.2.2-alpha.1"} interledger-stream = { path = "../interledger-stream", version = "^0.2.2-alpha.1" } interledger-ccp = { path = "../interledger-ccp", version = "^0.1.1-alpha.1"} +interledger-btp = { path = "../interledger-btp", version = "^0.2.2-alpha.1"} log = "0.4.6" serde = "1.0.99" serde_json = "1.0.39" diff --git a/crates/interledger-api/src/lib.rs b/crates/interledger-api/src/lib.rs index efda3e881..d5b40239a 100644 --- a/crates/interledger-api/src/lib.rs +++ b/crates/interledger-api/src/lib.rs @@ -15,7 +15,8 @@ use std::{ }; use warp::{self, Filter}; mod routes; -use interledger_ccp::{CcpRoutingAccount, RoutingRelation}; +use interledger_btp::{BtpAccount, BtpOutgoingService}; +use interledger_ccp::CcpRoutingAccount; pub(crate) mod http_retry; @@ -125,16 +126,17 @@ impl Display for ApiError { impl StdError for ApiError {} -pub struct NodeApi { +pub struct NodeApi { store: S, admin_api_token: String, default_spsp_account: Option, incoming_handler: I, outgoing_handler: O, + btp: BtpOutgoingService, server_secret: Bytes, } -impl NodeApi +impl NodeApi where S: NodeStore + HttpStore @@ -145,7 +147,9 @@ where + ExchangeRateStore, I: IncomingService + Clone + Send + Sync + 'static, O: OutgoingService + Clone + Send + Sync + 'static, - A: CcpRoutingAccount + B: OutgoingService + Clone + Send + Sync + 'static, + A: BtpAccount + + CcpRoutingAccount + Account + HttpAccount + SettlementAccount @@ -160,6 +164,7 @@ where store: S, incoming_handler: I, outgoing_handler: O, + btp: BtpOutgoingService, ) -> Self { NodeApi { store, @@ -167,6 +172,7 @@ where default_spsp_account: None, incoming_handler, outgoing_handler, + btp, server_secret, } } @@ -189,6 +195,7 @@ where self.default_spsp_account, self.incoming_handler, self.outgoing_handler, + self.btp, self.store.clone(), )) .or(routes::node_settings_api(self.admin_api_token, self.store)) diff --git a/crates/interledger-api/src/routes/accounts.rs b/crates/interledger-api/src/routes/accounts.rs index a977a5f4f..3de7d7643 100644 --- a/crates/interledger-api/src/routes/accounts.rs +++ b/crates/interledger-api/src/routes/accounts.rs @@ -4,6 +4,7 @@ use futures::{ future::{err, ok, result, Either}, Future, Stream, }; +use interledger_btp::{connect_to_service_account, BtpAccount, BtpOutgoingService}; use interledger_ccp::{CcpRoutingAccount, Mode, RouteControlRequest, RoutingRelation}; use interledger_http::{HttpAccount, HttpStore}; use interledger_ildcp::IldcpRequest; @@ -32,24 +33,26 @@ struct SpspPayRequest { source_amount: u64, } -pub fn accounts_api( +pub fn accounts_api( server_secret: Bytes, admin_api_token: String, default_spsp_account: Option, incoming_handler: I, outgoing_handler: O, + btp: BtpOutgoingService, store: S, ) -> warp::filters::BoxedFilter<(impl warp::Reply,)> where I: IncomingService + Clone + Send + Sync + 'static, O: OutgoingService + Clone + Send + Sync + 'static, + B: OutgoingService + Clone + Send + Sync + 'static, S: NodeStore + HttpStore + BalanceStore + StreamNotificationsStore + ExchangeRateStore + RouterStore, - A: CcpRoutingAccount + Account + HttpAccount + Serialize + Send + Sync + 'static, + A: BtpAccount + CcpRoutingAccount + Account + HttpAccount + Serialize + Send + Sync + 'static, { // TODO can we make any of the Filters const or put them in lazy_static? @@ -133,12 +136,29 @@ where let handler = outgoing_handler.clone(); let settlement_engine_url = account_details.settlement_engine_url.clone(); let http_client = http_client.clone(); + let btp = btp.clone(); store.insert_account(account_details) .map_err(|_| { warp::reject::custom(ApiError::InternalServerError) }) .and_then(move |account: A| { - let fut = if account.routing_relation() == RoutingRelation::Parent { + // Try to connect to the account's BTP socket if they have + // one configured + let btp_connect_fut = if account.get_btp_uri().is_some() { + Either::A( + connect_to_service_account(account.clone(), btp) + .map_err(|_| { + warp::reject::custom(ApiError::InternalServerError) + }) + ) + } else { + Either::B(ok(())) + }; + + btp_connect_fut.and_then(move |_| { + // If we added a parent, get the address assigned to us by + // them and update all of our routes + let get_ilp_address_fut = if account.routing_relation() == RoutingRelation::Parent { Either::A( get_address_from_parent_and_update_routes(handler, account.clone(), store_clone) .map_err(|_| { @@ -151,7 +171,7 @@ where let http_client = http_client.clone(); // Register the account with the settlement engine // if a settlement_engine_url was configured on the account - fut.and_then(move |_| + get_ilp_address_fut.and_then(move |_| if let Some(se_url) = settlement_engine_url { let id = account.id(); Either::A(result(Url::parse(&se_url)) @@ -184,7 +204,7 @@ where })) } else { Either::B(ok(account)) - }) + })}) }) .and_then(|account: A| { Ok(warp::reply::json(&account)) diff --git a/crates/interledger-btp/src/client.rs b/crates/interledger-btp/src/client.rs index 598083d95..f5c82c362 100644 --- a/crates/interledger-btp/src/client.rs +++ b/crates/interledger-btp/src/client.rs @@ -107,3 +107,68 @@ where Ok(service) }) } + +pub fn connect_to_service_account( + account: A, + service: BtpOutgoingService, +) -> impl Future +where + O: OutgoingService + Clone + 'static, + A: BtpAccount + 'static, +{ + let account_id = account.id(); + let mut url = account + .get_btp_uri() + .expect("Accounts must have BTP URLs") + .clone(); + if url.scheme().starts_with("btp+") { + url.set_scheme(&url.scheme().replace("btp+", "")).unwrap(); + } + let token = account + .get_btp_token() + .map(|s| s.to_vec()) + .unwrap_or_default(); + debug!("Connecting to {}", url); + connect_async(url.clone()) + .map_err(move |err| { + error!( + "Error connecting to WebSocket server for account: {} {:?}", + account_id, err + ) + }) + .and_then(move |(connection, _)| { + trace!( + "Connected to account {} (URI: {}), sending auth packet", + account_id, + url + ); + // Send BTP authentication + let auth_packet = Message::Binary( + BtpPacket::Message(BtpMessage { + request_id: random(), + protocol_data: vec![ + ProtocolData { + protocol_name: String::from("auth"), + content_type: ContentType::ApplicationOctetStream, + data: vec![], + }, + ProtocolData { + protocol_name: String::from("auth_token"), + content_type: ContentType::TextPlainUtf8, + data: token, + }, + ], + }) + .to_bytes(), + ); + + connection + .send(auth_packet) + .map_err(move |_| error!("Error sending auth packet on connection: {}", url)) + .and_then(move |connection| { + debug!("Connected to account {}'s server", account.id()); + service.add_connection(account, connection); + Ok(()) + }) + }) +} diff --git a/crates/interledger-btp/src/lib.rs b/crates/interledger-btp/src/lib.rs index f8d050714..93171ed98 100644 --- a/crates/interledger-btp/src/lib.rs +++ b/crates/interledger-btp/src/lib.rs @@ -17,7 +17,7 @@ mod packet; mod server; mod service; -pub use self::client::{connect_client, parse_btp_url}; +pub use self::client::{connect_client, connect_to_service_account, parse_btp_url}; pub use self::server::{create_open_signup_server, create_server}; pub use self::service::{BtpOutgoingService, BtpService}; use interledger_packet::Address;