Skip to content

Commit

Permalink
feat(api-btp): Connect to BTP when adding new account
Browse files Browse the repository at this point in the history
Closes #285
  • Loading branch information
gakonst committed Sep 24, 2019
1 parent d95be4c commit cc76f02
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/interledger-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ interledger-settlement = { path = "../interledger-settlement", version = "^0.1.1
interledger-spsp = { path = "../interledger-spsp", version = "^0.2.2-alpha.1"}
interledger-stream = { path = "../interledger-stream", version = "^0.2.2-alpha.1" }
interledger-ccp = { path = "../interledger-ccp", version = "^0.1.1-alpha.1"}
interledger-btp = { path = "../interledger-btp", version = "^0.2.2-alpha.1"}
log = "0.4.6"
serde = "1.0.99"
serde_json = "1.0.39"
Expand Down
15 changes: 11 additions & 4 deletions crates/interledger-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use std::{
};
use warp::{self, Filter};
mod routes;
use interledger_ccp::{CcpRoutingAccount, RoutingRelation};
use interledger_btp::{BtpAccount, BtpOutgoingService};
use interledger_ccp::CcpRoutingAccount;

pub(crate) mod http_retry;

Expand Down Expand Up @@ -125,16 +126,17 @@ impl Display for ApiError {

impl StdError for ApiError {}

pub struct NodeApi<S, I, O> {
pub struct NodeApi<S, I, O, B, A: Account> {
store: S,
admin_api_token: String,
default_spsp_account: Option<Username>,
incoming_handler: I,
outgoing_handler: O,
btp: BtpOutgoingService<B, A>,
server_secret: Bytes,
}

impl<S, I, O, A> NodeApi<S, I, O>
impl<S, I, O, B, A> NodeApi<S, I, O, B, A>
where
S: NodeStore<Account = A>
+ HttpStore<Account = A>
Expand All @@ -145,7 +147,9 @@ where
+ ExchangeRateStore,
I: IncomingService<A> + Clone + Send + Sync + 'static,
O: OutgoingService<A> + Clone + Send + Sync + 'static,
A: CcpRoutingAccount
B: OutgoingService<A> + Clone + Send + Sync + 'static,
A: BtpAccount
+ CcpRoutingAccount
+ Account
+ HttpAccount
+ SettlementAccount
Expand All @@ -160,13 +164,15 @@ where
store: S,
incoming_handler: I,
outgoing_handler: O,
btp: BtpOutgoingService<B, A>,
) -> Self {
NodeApi {
store,
admin_api_token,
default_spsp_account: None,
incoming_handler,
outgoing_handler,
btp,
server_secret,
}
}
Expand All @@ -189,6 +195,7 @@ where
self.default_spsp_account,
self.incoming_handler,
self.outgoing_handler,
self.btp,
self.store.clone(),
))
.or(routes::node_settings_api(self.admin_api_token, self.store))
Expand Down
30 changes: 25 additions & 5 deletions crates/interledger-api/src/routes/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::{
future::{err, ok, result, Either},
Future, Stream,
};
use interledger_btp::{connect_to_service_account, BtpAccount, BtpOutgoingService};
use interledger_ccp::{CcpRoutingAccount, Mode, RouteControlRequest, RoutingRelation};
use interledger_http::{HttpAccount, HttpStore};
use interledger_ildcp::IldcpRequest;
Expand Down Expand Up @@ -32,24 +33,26 @@ struct SpspPayRequest {
source_amount: u64,
}

pub fn accounts_api<I, O, S, A>(
pub fn accounts_api<I, O, S, A, B>(
server_secret: Bytes,
admin_api_token: String,
default_spsp_account: Option<Username>,
incoming_handler: I,
outgoing_handler: O,
btp: BtpOutgoingService<B, A>,
store: S,
) -> warp::filters::BoxedFilter<(impl warp::Reply,)>
where
I: IncomingService<A> + Clone + Send + Sync + 'static,
O: OutgoingService<A> + Clone + Send + Sync + 'static,
B: OutgoingService<A> + Clone + Send + Sync + 'static,
S: NodeStore<Account = A>
+ HttpStore<Account = A>
+ BalanceStore<Account = A>
+ StreamNotificationsStore<Account = A>
+ ExchangeRateStore
+ RouterStore,
A: CcpRoutingAccount + Account + HttpAccount + Serialize + Send + Sync + 'static,
A: BtpAccount + CcpRoutingAccount + Account + HttpAccount + Serialize + Send + Sync + 'static,
{
// TODO can we make any of the Filters const or put them in lazy_static?

Expand Down Expand Up @@ -133,12 +136,29 @@ where
let handler = outgoing_handler.clone();
let settlement_engine_url = account_details.settlement_engine_url.clone();
let http_client = http_client.clone();
let btp = btp.clone();
store.insert_account(account_details)
.map_err(|_| {
warp::reject::custom(ApiError::InternalServerError)
})
.and_then(move |account: A| {
let fut = if account.routing_relation() == RoutingRelation::Parent {
// Try to connect to the account's BTP socket if they have
// one configured
let btp_connect_fut = if account.get_btp_uri().is_some() {
Either::A(
connect_to_service_account(account.clone(), btp)
.map_err(|_| {
warp::reject::custom(ApiError::InternalServerError)
})
)
} else {
Either::B(ok(()))
};

btp_connect_fut.and_then(move |_| {
// If we added a parent, get the address assigned to us by
// them and update all of our routes
let get_ilp_address_fut = if account.routing_relation() == RoutingRelation::Parent {
Either::A(
get_address_from_parent_and_update_routes(handler, account.clone(), store_clone)
.map_err(|_| {
Expand All @@ -151,7 +171,7 @@ where
let http_client = http_client.clone();
// Register the account with the settlement engine
// if a settlement_engine_url was configured on the account
fut.and_then(move |_|
get_ilp_address_fut.and_then(move |_|
if let Some(se_url) = settlement_engine_url {
let id = account.id();
Either::A(result(Url::parse(&se_url))
Expand Down Expand Up @@ -184,7 +204,7 @@ where
}))
} else {
Either::B(ok(account))
})
})})
})
.and_then(|account: A| {
Ok(warp::reply::json(&account))
Expand Down
65 changes: 65 additions & 0 deletions crates/interledger-btp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,68 @@ where
Ok(service)
})
}

pub fn connect_to_service_account<O, A>(
account: A,
service: BtpOutgoingService<O, A>,
) -> impl Future<Item = (), Error = ()>
where
O: OutgoingService<A> + Clone + 'static,
A: BtpAccount + 'static,
{
let account_id = account.id();
let mut url = account
.get_btp_uri()
.expect("Accounts must have BTP URLs")
.clone();
if url.scheme().starts_with("btp+") {
url.set_scheme(&url.scheme().replace("btp+", "")).unwrap();
}
let token = account
.get_btp_token()
.map(|s| s.to_vec())
.unwrap_or_default();
debug!("Connecting to {}", url);
connect_async(url.clone())
.map_err(move |err| {
error!(
"Error connecting to WebSocket server for account: {} {:?}",
account_id, err
)
})
.and_then(move |(connection, _)| {
trace!(
"Connected to account {} (URI: {}), sending auth packet",
account_id,
url
);
// Send BTP authentication
let auth_packet = Message::Binary(
BtpPacket::Message(BtpMessage {
request_id: random(),
protocol_data: vec![
ProtocolData {
protocol_name: String::from("auth"),
content_type: ContentType::ApplicationOctetStream,
data: vec![],
},
ProtocolData {
protocol_name: String::from("auth_token"),
content_type: ContentType::TextPlainUtf8,
data: token,
},
],
})
.to_bytes(),
);

connection
.send(auth_packet)
.map_err(move |_| error!("Error sending auth packet on connection: {}", url))
.and_then(move |connection| {
debug!("Connected to account {}'s server", account.id());
service.add_connection(account, connection);
Ok(())
})
})
}
2 changes: 1 addition & 1 deletion crates/interledger-btp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod packet;
mod server;
mod service;

pub use self::client::{connect_client, parse_btp_url};
pub use self::client::{connect_client, connect_to_service_account, parse_btp_url};
pub use self::server::{create_open_signup_server, create_server};
pub use self::service::{BtpOutgoingService, BtpService};
use interledger_packet::Address;
Expand Down

0 comments on commit cc76f02

Please sign in to comment.