From ed83801bb90298b0adcd2b52c808fbca3745aeea Mon Sep 17 00:00:00 2001 From: Kincaid O'Neil Date: Fri, 7 Feb 2020 07:24:11 +0000 Subject: [PATCH] feat: async stream sender with rate enforcement Co-Authored-By: Georgios Konstantopoulos --- Cargo.lock | 52 +- crates/ilp-node/src/node.rs | 2 +- crates/ilp-node/tests/redis/test_helpers.rs | 1 + crates/ilp-node/tests/redis/three_nodes.rs | 85 +- crates/interledger-api/src/routes/accounts.rs | 14 +- .../src/routes/node_settings.rs | 1 - crates/interledger-rates/Cargo.toml | 9 +- crates/interledger-rates/src/lib.rs | 2 +- crates/interledger-spsp/Cargo.toml | 1 + crates/interledger-spsp/src/client.rs | 33 +- crates/interledger-stream/Cargo.toml | 10 +- crates/interledger-stream/src/client.rs | 1218 +++++++++++------ crates/interledger-stream/src/congestion.rs | 4 +- crates/interledger-stream/src/crypto.rs | 2 +- crates/interledger-stream/src/lib.rs | 131 +- crates/interledger-stream/src/server.rs | 6 + docs/api.yml | 248 ++-- examples/eth-xrp-three-nodes/README.md | 50 +- 18 files changed, 1258 insertions(+), 611 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 502595d54..dec19fef9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1289,6 +1289,7 @@ dependencies = [ "futures 0.3.4", "hyper 0.13.2", "interledger-packet", + "interledger-rates", "interledger-service", "interledger-stream", "log 0.4.8", @@ -1352,11 +1353,13 @@ dependencies = [ "futures 0.3.4", "hex", "interledger-errors", - "interledger-ildcp", "interledger-packet", + "interledger-rates", "interledger-router", "interledger-service", + "interledger-service-util", "log 0.4.8", + "num", "once_cell", "parking_lot 0.10.0", "pin-project", @@ -1691,6 +1694,20 @@ dependencies = [ "version_check 0.1.5", ] +[[package]] +name = "num" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.2.6" @@ -1702,6 +1719,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-complex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95" +dependencies = [ + "autocfg 1.0.0", + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.42" @@ -1712,6 +1739,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfb0800a0291891dd9f4fe7bd9c19384f98f7fbe0cd0f39a2c6b88b9868bbc00" +dependencies = [ + "autocfg 1.0.0", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da4dc79f9e6c81bef96148c8f6b8e72ad4541caa4a24373e900a36da07de03a3" +dependencies = [ + "autocfg 1.0.0", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.11" diff --git a/crates/ilp-node/src/node.rs b/crates/ilp-node/src/node.rs index 2def19303..5ec3c6990 100644 --- a/crates/ilp-node/src/node.rs +++ b/crates/ilp-node/src/node.rs @@ -474,7 +474,7 @@ impl InterledgerNode { bytes05::Bytes::copy_from_slice(secret_seed.as_ref()), admin_auth_token, store.clone(), - incoming_service_api.clone(), + incoming_service_api, outgoing_service.clone(), btp.clone(), // btp client service! ); diff --git a/crates/ilp-node/tests/redis/test_helpers.rs b/crates/ilp-node/tests/redis/test_helpers.rs index aea1407c9..d2d594cb4 100644 --- a/crates/ilp-node/tests/redis/test_helpers.rs +++ b/crates/ilp-node/tests/redis/test_helpers.rs @@ -85,6 +85,7 @@ pub async fn send_money_to_username( .json(&json!({ "receiver": format!("http://localhost:{}/accounts/{}/spsp", to_port, to_username), "source_amount": amount, + "slippage": 0.025 // allow up to 2.5% slippage })) .send() .map_err(|_| ()) diff --git a/crates/ilp-node/tests/redis/three_nodes.rs b/crates/ilp-node/tests/redis/three_nodes.rs index 95f3f3d92..176a0e789 100644 --- a/crates/ilp-node/tests/redis/three_nodes.rs +++ b/crates/ilp-node/tests/redis/three_nodes.rs @@ -106,6 +106,7 @@ async fn three_nodes() { "route_broadcast_interval": Some(200), "exchange_rate": { "poll_interval": 60000, + "spread": 0.02, // take a 2% spread }, })) .expect("Error creating node2."); @@ -119,7 +120,7 @@ async fn three_nodes() { "secret_seed": random_secret(), "route_broadcast_interval": Some(200), "exchange_rate": { - "poll_interval": 60000, + "poll_interval": 60000 }, })) .expect("Error creating node3."); @@ -131,6 +132,14 @@ async fn three_nodes() { create_account_on_node(node1_http, bob_on_alice, "admin") .await .unwrap(); + let client = reqwest::Client::new(); + client + .put(&format!("http://localhost:{}/rates", node1_http)) + .header("Authorization", "Bearer admin") + .json(&json!({"ABC": 1, "XYZ": 2.01})) + .send() + .await + .unwrap(); node2.serve().await.unwrap(); create_account_on_node(node2_http, alice_on_bob, "admin") @@ -139,8 +148,6 @@ async fn three_nodes() { create_account_on_node(node2_http, charlie_on_bob, "admin") .await .unwrap(); - // Also set exchange rates - let client = reqwest::Client::new(); client .put(&format!("http://localhost:{}/rates", node2_http)) .header("Authorization", "Bearer admin") @@ -156,6 +163,13 @@ async fn three_nodes() { create_account_on_node(node3_http, bob_on_charlie, "admin") .await .unwrap(); + client + .put(&format!("http://localhost:{}/rates", node3_http)) + .header("Authorization", "Bearer admin") + .json(&json!({"ABC": 1, "XYZ": 2})) + .send() + .await + .unwrap(); delay(1000).await; @@ -167,14 +181,16 @@ async fn three_nodes() { ]) }; - // Node 1 sends 1000 to Node 3. However, Node1's scale is 9, + // Node 1 sends 1,000,000 to Node 3. However, Node1's scale is 9, // while Node 3's scale is 6. This means that Node 3 will - // see 1000x less. In addition, the conversion rate is 2:1 - // for 3's asset, so he will receive 2 total. + // see 1000x less. Node 2's rate is 2:1, minus a 2% spread. + // Node 1 has however set the rate to 2.01:1. The payment suceeds because + // the end delivered result to Node 3 is 1960 which is slightly + // over 0.975 * 2010, which is within the 2.5% slippage set by the sender. let receipt = send_money_to_username( node1_http, node3_http, - 1000, + 1_000_000, "charlie_on_c", "alice_on_a", "default account holder", @@ -191,40 +207,45 @@ async fn three_nodes() { .to .to_string() .starts_with("example.bob.charlie_on_b.charlie_on_c.")); - assert_eq!(receipt.sent_asset_code, "XYZ"); - assert_eq!(receipt.sent_asset_scale, 9); - assert_eq!(receipt.sent_amount, 1000); - assert_eq!(receipt.delivered_asset_code.unwrap(), "ABC"); - assert_eq!(receipt.delivered_amount, 2); - assert_eq!(receipt.delivered_asset_scale.unwrap(), 6); + assert_eq!(receipt.source_asset_code, "XYZ"); + assert_eq!(receipt.source_asset_scale, 9); + assert_eq!(receipt.source_amount, 1_000_000); + assert_eq!(receipt.sent_amount, 1_000_000); + assert_eq!(receipt.in_flight_amount, 0); + assert_eq!(receipt.delivered_amount, 1960); + assert_eq!(receipt.destination_asset_code.unwrap(), "ABC"); + assert_eq!(receipt.destination_asset_scale.unwrap(), 6); let ret = get_balances().await; let ret: Vec<_> = ret.into_iter().map(|r| r.unwrap()).collect(); - // -1000 divided by asset scale 9 + // -1000000 divided by asset scale 9 assert_eq!( ret[0], BalanceData { asset_code: "XYZ".to_owned(), - balance: -1e-6 + balance: -1_000_000.0 / 1e9 } ); - // 2 divided by asset scale 6 + // 1960 divided by asset scale 6 assert_eq!( ret[1], BalanceData { asset_code: "ABC".to_owned(), - balance: 2e-6 + balance: 1960.0 / 1e6 } ); - // 2 divided by asset scale 6 + // 1960 divided by asset scale 6 assert_eq!( ret[2], BalanceData { asset_code: "ABC".to_owned(), - balance: 2e-6 + balance: 1960.0 / 1e6 } ); - // Charlie sends to Alice + // Node 3 sends 1,000 to Node 1. However, Node 1's scale is 9, + // while Node 3's scale is 6. This means that Node 1 will + // see 1000x more. Node 2's rate is 1:2, minus a 2% spread. + // Node 3 should receive 495,500 units total. let receipt = send_money_to_username( node3_http, node1_http, @@ -242,36 +263,38 @@ async fn three_nodes() { "Payment receipt incorrect (2)" ); assert!(receipt.to.to_string().starts_with("example.alice")); - assert_eq!(receipt.sent_asset_code, "ABC"); - assert_eq!(receipt.sent_asset_scale, 6); + assert_eq!(receipt.source_asset_code, "ABC"); + assert_eq!(receipt.source_asset_scale, 6); + assert_eq!(receipt.source_amount, 1000); assert_eq!(receipt.sent_amount, 1000); - assert_eq!(receipt.delivered_asset_code.unwrap(), "XYZ"); - assert_eq!(receipt.delivered_amount, 500_000); - assert_eq!(receipt.delivered_asset_scale.unwrap(), 9); + assert_eq!(receipt.in_flight_amount, 0); + assert_eq!(receipt.delivered_amount, 490_000); + assert_eq!(receipt.destination_asset_code.unwrap(), "XYZ"); + assert_eq!(receipt.destination_asset_scale.unwrap(), 9); let ret = get_balances().await; let ret: Vec<_> = ret.into_iter().map(|r| r.unwrap()).collect(); - // 499,000 divided by asset scale 9 + // (490,000 - 1,000,000) divided by asset scale 9 assert_eq!( ret[0], BalanceData { asset_code: "XYZ".to_owned(), - balance: 499e-6 + balance: (490_000.0 - 1_000_000.0) / 1e9 } ); - // -998 divided by asset scale 6 + // (1,960 - 1,000) divided by asset scale 6 assert_eq!( ret[1], BalanceData { asset_code: "ABC".to_owned(), - balance: -998e-6 + balance: (1960.0 - 1000.0) / 1e6 } ); - // -998 divided by asset scale 6 + // (1,960 - 1,000) divided by asset scale 6 assert_eq!( ret[2], BalanceData { asset_code: "ABC".to_owned(), - balance: -998e-6 + balance: (1960.0 - 1000.0) / 1e6 } ); } diff --git a/crates/interledger-api/src/routes/accounts.rs b/crates/interledger-api/src/routes/accounts.rs index 56a99c90a..54991ce18 100644 --- a/crates/interledger-api/src/routes/accounts.rs +++ b/crates/interledger-api/src/routes/accounts.rs @@ -28,11 +28,20 @@ use warp::{self, reply::Json, Filter, Rejection}; pub const BEARER_TOKEN_START: usize = 7; +const fn get_default_max_slippage() -> f64 { + 0.01 +} + #[derive(Deserialize, Debug)] struct SpspPayRequest { receiver: String, #[serde(deserialize_with = "number_or_string")] source_amount: u64, + #[serde( + deserialize_with = "number_or_string", + default = "get_default_max_slippage" + )] + slippage: f64, } pub fn accounts_api( @@ -339,14 +348,17 @@ where .and(warp::path::end()) .and(deserialize_json()) .and(with_incoming_handler) + .and(with_store.clone()) .and_then( - move |account: A, pay_request: SpspPayRequest, incoming_handler: I| { + move |account: A, pay_request: SpspPayRequest, incoming_handler: I, store: S| { async move { let receipt = pay( incoming_handler, account.clone(), + store, &pay_request.receiver, pay_request.source_amount, + pay_request.slippage, ) .map_err(|err| { let msg = format!("Error sending SPSP payment: {}", err); diff --git a/crates/interledger-api/src/routes/node_settings.rs b/crates/interledger-api/src/routes/node_settings.rs index ab8c346fd..2ff79682f 100644 --- a/crates/interledger-api/src/routes/node_settings.rs +++ b/crates/interledger-api/src/routes/node_settings.rs @@ -7,7 +7,6 @@ use interledger_packet::Address; use interledger_rates::ExchangeRateStore; use interledger_router::RouterStore; use interledger_service::{Account, AccountStore, AddressStore, Username}; -use interledger_service_util::ExchangeRateStore; use interledger_settlement::core::{types::SettlementAccount, SettlementClient}; use log::{error, trace}; use secrecy::{ExposeSecret, SecretString}; diff --git a/crates/interledger-rates/Cargo.toml b/crates/interledger-rates/Cargo.toml index bd3e5e852..438846d07 100644 --- a/crates/interledger-rates/Cargo.toml +++ b/crates/interledger-rates/Cargo.toml @@ -1,12 +1,11 @@ [package] name = "interledger-rates" version = "0.4.0" -authors = ["Kincaid O'Neil "] +authors = ["Evan Schwartz "] +description = "Exchange rate utilities" +license = "Apache-2.0" edition = "2018" - -# TODO Add other metadata here! - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +repository = "https://github.com/interledger-rs/interledger-rs" [dependencies] async-trait = "0.1.22" diff --git a/crates/interledger-rates/src/lib.rs b/crates/interledger-rates/src/lib.rs index 5140f507a..5d4645040 100644 --- a/crates/interledger-rates/src/lib.rs +++ b/crates/interledger-rates/src/lib.rs @@ -61,7 +61,7 @@ pub struct ExchangeRateFetcher { consecutive_failed_polls: Arc, failed_polls_before_invalidation: u32, store: S, - client: Client, // TODO What is a Client? + client: Client, } impl ExchangeRateFetcher diff --git a/crates/interledger-spsp/Cargo.toml b/crates/interledger-spsp/Cargo.toml index 50804a38d..b9b224779 100644 --- a/crates/interledger-spsp/Cargo.toml +++ b/crates/interledger-spsp/Cargo.toml @@ -9,6 +9,7 @@ repository = "https://github.com/interledger-rs/interledger-rs" [dependencies] interledger-packet = { path = "../interledger-packet", version = "^0.4.0", features = ["serde"], default-features = false } +interledger-rates = { path = "../interledger-rates", version = "^0.4.0", default-features = false } interledger-service = { path = "../interledger-service", version = "^0.4.0", default-features = false } interledger-stream = { path = "../interledger-stream", version = "^0.4.0", default-features = false } diff --git a/crates/interledger-spsp/src/client.rs b/crates/interledger-spsp/src/client.rs index ddb0e2cfe..adcfa74cb 100644 --- a/crates/interledger-spsp/src/client.rs +++ b/crates/interledger-spsp/src/client.rs @@ -1,6 +1,7 @@ use super::{Error, SpspResponse}; use futures::TryFutureExt; use interledger_packet::Address; +use interledger_rates::ExchangeRateStore; use interledger_service::{Account, IncomingService}; use interledger_stream::{send_money, StreamDelivery}; use log::{debug, error, trace}; @@ -32,15 +33,18 @@ pub async fn query(server: &str) -> Result { /// Query the details of the given Payment Pointer and send a payment using the STREAM protocol. /// /// This returns the amount delivered, as reported by the receiver and in the receiver's asset's units. -pub async fn pay( - service: S, +pub async fn pay( + service: I, from_account: A, + store: S, receiver: &str, source_amount: u64, + slippage: f64, ) -> Result where - S: IncomingService + Send + Sync + Clone + 'static, - A: Account + Send + Sync + Clone + 'static, + I: IncomingService + Clone + Send + Sync + 'static, + A: Account + Send + Sync + 'static, + S: ExchangeRateStore + Send + Sync + 'static, { let spsp = query(receiver).await?; let shared_secret = spsp.shared_secret; @@ -51,13 +55,20 @@ where })?; debug!("Sending SPSP payment to address: {}", addr); - let (receipt, _plugin) = - send_money(service, &from_account, addr, &shared_secret, source_amount) - .map_err(move |err| { - error!("Error sending payment: {:?}", err); - Error::SendMoneyError(source_amount) - }) - .await?; + let receipt = send_money( + service, + &from_account, + store, + addr, + &shared_secret, + source_amount, + slippage, + ) + .map_err(move |err| { + error!("Error sending payment: {:?}", err); + Error::SendMoneyError(source_amount) + }) + .await?; debug!("Sent SPSP payment. StreamDelivery: {:?}", receipt); Ok(receipt) diff --git a/crates/interledger-stream/Cargo.toml b/crates/interledger-stream/Cargo.toml index 3dc2e221d..de89afa72 100644 --- a/crates/interledger-stream/Cargo.toml +++ b/crates/interledger-stream/Cargo.toml @@ -12,8 +12,8 @@ repository = "https://github.com/interledger-rs/interledger-rs" metrics_csv = ["csv"] [dependencies] -interledger-ildcp = { path = "../interledger-ildcp", version = "^0.4.0", default-features = false } interledger-packet = { path = "../interledger-packet", version = "^0.4.0", default-features = false, features = ["serde"] } +interledger-rates = { path = "../interledger-rates", version = "^0.4.0", default-features = false } interledger-service = { path = "../interledger-service", version = "^0.4.0", default-features = false } base64 = { version = "0.11.0", default-features = false } @@ -23,10 +23,11 @@ chrono = { version = "0.4.9", default-features = false, features = ["clock"] } futures = { version = "0.3.1", default-features = false, features = ["std"] } hex = { version = "0.4.0", default-features = false } log = { version = "0.4.8", default-features = false } +num = { version = "0.2.1" } parking_lot = { version = "0.10.0", default-features = false } ring = { version = "0.16.9", default-features = false } serde = { version = "1.0.101", default-features = false } -tokio = { version = "^0.2.6", default-features = false, features = ["rt-core", "macros"] } +tokio = { version = "^0.2.6", default-features = false, features = ["rt-core", "time", "macros"] } uuid = { version = "0.8.1", default-features = false, features = ["v4"] } async-trait = { version = "0.1.22", default-features = false } pin-project = { version = "0.4.7", default-features = false } @@ -36,7 +37,8 @@ thiserror = { version = "1.0.10", default-features = false } csv = { version = "1.1.1", default-features = false, optional = true } [dev-dependencies] -interledger-router = { path = "../interledger-router", version = "^0.4.0", default-features = false } interledger-errors = { path = "../interledger-errors", version = "^0.1.0", default-features = false } +interledger-router = { path = "../interledger-router", version = "^0.4.0", default-features = false } +interledger-service-util = { path = "../interledger-service-util", version = "^0.4.0", default-features = false } -once_cell = { version = "1.3.1", default-features = false } \ No newline at end of file +once_cell = { version = "1.3.1", default-features = false } diff --git a/crates/interledger-stream/src/client.rs b/crates/interledger-stream/src/client.rs index 2e4e68281..1ca9f92af 100644 --- a/crates/interledger-stream/src/client.rs +++ b/crates/interledger-stream/src/client.rs @@ -4,502 +4,659 @@ use super::error::Error; use super::packet::*; use bytes::Bytes; use bytes::BytesMut; -use futures::{ready, TryFutureExt}; -use interledger_ildcp::get_ildcp_info; +use futures::stream::{FuturesUnordered, StreamExt}; use interledger_packet::{ - Address, ErrorClass, ErrorCode as IlpErrorCode, Fulfill, PacketType as IlpPacketType, - PrepareBuilder, Reject, + Address, ErrorClass, ErrorCode as IlpErrorCode, PacketType as IlpPacketType, PrepareBuilder, + Reject, }; +use interledger_rates::ExchangeRateStore; use interledger_service::*; use log::{debug, error, warn}; -use pin_project::{pin_project, project}; +use num::rational::BigRational; +use num::traits::cast::{FromPrimitive, ToPrimitive}; +use num::traits::identities::One; +use num::traits::pow::pow; use serde::{Deserialize, Serialize}; -use std::{ - cell::Cell, - cmp::min, - str, - time::{Duration, Instant, SystemTime}, -}; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; +use tokio::sync::Mutex; +use tokio::time::timeout_at; +use tokio::time::{Duration, Instant}; + +use std::cmp::{max, min}; +use std::marker::{Send, Sync}; +use std::str; +use std::sync::Arc; +use std::time::SystemTime; /// Maximum time we should wait since last fulfill before we error out to avoid /// getting into an infinite loop of sending packets and effectively DoSing ourselves const MAX_TIME_SINCE_LAST_FULFILL: Duration = Duration::from_secs(30); -/// Metadata about a completed STREAM payment +/// Minimum number of packet attempts before defaulting to failure rate +const FAIL_FAST_MINIMUM_PACKET_ATTEMPTS: u64 = 200; + +/// Minimum rate of rejected packets in order to terminate the payment +const FAIL_FAST_MINIMUM_FAILURE_RATE: f64 = 0.99; + +/// Receipt for STREAM payment to account for how much and what assets were sent & delivered #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct StreamDelivery { - /// The sender's ILP Address + /// Sender's ILP Address pub from: Address, - /// The receiver's ILP Address + /// Receiver's ILP Address pub to: Address, - // StreamDelivery variables which we know ahead of time - /// The amount sent by the sender + /// Asset scale of sender + pub source_asset_scale: u8, + /// Asset code of sender + pub source_asset_code: String, + /// Total amount *intended* to be sent, in source units + pub source_amount: u64, + /// Amount fulfilled or currently in-flight, in source units pub sent_amount: u64, - /// The sender's asset scale - pub sent_asset_scale: u8, - /// The sender's asset code - pub sent_asset_code: String, - /// The amount delivered to the receiver + /// Amount in-flight (yet to be fulfilled or rejected), in source units + pub in_flight_amount: u64, + /// Amount fulfilled and received by the recipient, in destination units pub delivered_amount: u64, - // StreamDelivery variables which may get updated if the receiver sends us a - // ConnectionAssetDetails frame. - /// The asset scale delivered to the receiver - /// (this may change depending on the granularity of accounts across nodes) - pub delivered_asset_scale: Option, - /// The asset code delivered to the receiver (this may happen in cross-currency payments) - pub delivered_asset_code: Option, + /// Receiver's asset scale (this may change depending on the granularity of accounts across nodes) + /// Updated after we received a `ConnectionAssetDetails` frame. + pub destination_asset_scale: Option, + /// Receiver's asset code + /// Updated after we received a `ConnectionAssetDetails` frame. + pub destination_asset_code: Option, } impl StreamDelivery { - /// Increases the `StreamDelivery`'s [`delivered_amount`](./struct.StreamDelivery.html#structfield.delivered_amount) by `amount` - fn increment_delivered_amount(&mut self, amount: u64) { - self.delivered_amount += amount; + pub fn new(from_account: &A, destination: Address, source_amount: u64) -> Self { + StreamDelivery { + from: from_account.ilp_address().clone(), + to: destination, + source_asset_scale: from_account.asset_scale(), + source_asset_code: from_account.asset_code().to_string(), + source_amount, + sent_amount: 0, + in_flight_amount: 0, + destination_asset_scale: None, + destination_asset_code: None, + delivered_amount: 0, + } } } -/// Send a given amount of money using the STREAM transport protocol. -/// -/// This returns the amount delivered, as reported by the receiver and in the receiver's asset's units. -pub async fn send_money( - service: S, +/// Stream payment mutable state: amounts & assets sent and received, sequence, packet counts, and flow control parameters +struct StreamPayment { + /// The [congestion controller](./../congestion/struct.CongestionController.html) to adjust flow control and the in-flight amount + congestion_controller: CongestionController, + /// The [StreamDelivery](./struct.StreamDelivery.html) receipt to account for the delivered amounts + receipt: StreamDelivery, + /// Do we need to send our source account information to the recipient? + should_send_source_account: bool, + /// Monotonically increaing sequence number for this STREAM payment + sequence: u64, + /// Number of fulfilled packets throughout the STREAM payment + fulfilled_packets: u64, + /// Number of rejected packets throughout the STREAM payment + rejected_packets: u64, + /// Timestamp when a packet was last fulfilled for this payment + last_fulfill_time: Instant, +} + +impl StreamPayment { + /// Account for and return amount to send in the next Prepare + #[inline] + fn apply_prepare(&mut self) -> u64 { + let amount = min( + self.get_amount_available_to_send(), + self.congestion_controller.get_max_amount(), + ); + + self.congestion_controller.prepare(amount); + + self.receipt.sent_amount = self.receipt.sent_amount.saturating_add(amount); + self.receipt.in_flight_amount = self.receipt.in_flight_amount.saturating_add(amount); + amount + } + + /// Account for a fulfilled packet and update flow control + #[inline] + fn apply_fulfill(&mut self, source_amount: u64, destination_amount: u64) { + self.congestion_controller.fulfill(source_amount); + + self.receipt.in_flight_amount = self.receipt.in_flight_amount.saturating_sub(source_amount); + self.receipt.delivered_amount = self + .receipt + .delivered_amount + .saturating_add(destination_amount); + + self.last_fulfill_time = Instant::now(); + self.fulfilled_packets += 1; + } + + /// Account for a rejected packet and update flow control + #[inline] + fn apply_reject(&mut self, amount: u64, reject: &Reject) { + self.congestion_controller.reject(amount, reject); + + self.receipt.sent_amount = self.receipt.sent_amount.saturating_sub(amount); + self.receipt.in_flight_amount = self.receipt.in_flight_amount.saturating_sub(amount); + + self.rejected_packets += 1; + } + + /// Save the recipient's destination asset details for calculating minimum exchange rates + #[inline] + fn set_destination_asset_details(&mut self, asset_code: String, asset_scale: u8) { + self.receipt.destination_asset_code = Some(asset_code); + self.receipt.destination_asset_scale = Some(asset_scale); + } + + /// Return the current sequence number and increment the value for subsequent packets + #[inline] + fn next_sequence(&mut self) -> u64 { + let seq = self.sequence; + self.sequence += 1; + seq + } + + /// Amount of money fulfilled in source units + #[inline] + fn get_fulfilled_amount(&self) -> u64 { + self.receipt + .sent_amount + .saturating_sub(self.receipt.in_flight_amount) + } + + // Get remaining amount that must be fulfilled for the payment to complete + #[inline] + fn get_remaining_amount(&self) -> u64 { + self.receipt + .source_amount + .saturating_sub(self.get_fulfilled_amount()) + } + + /// Has the entire intended source amount been fulfilled by the recipient? + #[inline] + fn is_complete(&self) -> bool { + self.get_remaining_amount() == 0 + } + + /// Return the amount of money available to be sent in the payment (amount remaining minus in-flight) + #[inline] + fn get_amount_available_to_send(&self) -> u64 { + // Sent amount also includes the amount in-flight, which should be subtracted from the amount available + self.receipt + .source_amount + .saturating_sub(self.receipt.sent_amount) + } + + /// Is as much money as possible in-flight? + /// (If so, the intended source amount may be fulfilled or in-flight, or the congestion controller + /// has temporarily limited sending more money) + #[inline] + fn is_max_in_flight(&self) -> bool { + self.congestion_controller.get_max_amount() == 0 || self.get_amount_available_to_send() == 0 + } + + /// Given we've attempted sending enough packets, does our rejected packet rate indicate the payment is failing? + #[inline] + fn is_failing(&self) -> bool { + let num_packets = self.fulfilled_packets + self.rejected_packets; + num_packets >= FAIL_FAST_MINIMUM_PACKET_ATTEMPTS + && (self.rejected_packets as f64 / num_packets as f64) > FAIL_FAST_MINIMUM_FAILURE_RATE + } +} + +/// Send the given source amount with packetized Interledger payments using the STREAM transport protocol +/// Returns the receipt with sent & delivered amounts, asset & account details +pub async fn send_money( + service: I, from_account: &A, + store: S, destination_account: Address, shared_secret: &[u8], source_amount: u64, -) -> Result<(StreamDelivery, S), Error> + slippage: f64, +) -> Result where - S: IncomingService + Send + Sync + Clone + 'static, + I: IncomingService + Clone + Send + Sync + 'static, A: Account + Send + Sync + 'static, + S: ExchangeRateStore + Send + Sync + 'static, { + // TODO Can we avoid copying here? let shared_secret = Bytes::from(shared_secret); - let from_account = from_account.clone(); - // TODO can/should we avoid cloning the account? - let account_details = get_ildcp_info(&mut service.clone(), from_account.clone()) - .map_err(|_err| Error::ConnectionError("Unable to get ILDCP info: {:?}".to_string())) - .await?; - - let source_account = account_details.ilp_address(); - if source_account.scheme() != destination_account.scheme() { - warn!("Destination ILP address starts with a different scheme prefix (\"{}\') than ours (\"{}\'), this probably isn't going to work", - destination_account.scheme(), - source_account.scheme()); + + let from = from_account.ilp_address(); + if from.scheme() != destination_account.scheme() { + warn!( + "Destination ILP address starts with a different scheme prefix (\"{}\') than ours (\"{}\'), this probably won't work", + destination_account.scheme(), + from.scheme() + ); } - SendMoneyFuture { - state: SendMoneyFutureState::SendMoney, - next: Some(service), + let mut sender = StreamSender { + next: service, from_account: from_account.clone(), - source_account, - destination_account: destination_account.clone(), shared_secret, - source_amount, - // Try sending the full amount first - // TODO make this configurable -- in different scenarios you might prioritize - // sending as much as possible per packet vs getting money flowing ASAP differently - congestion_controller: CongestionController::new(source_amount, source_amount / 10, 2.0), - pending_requests: Cell::new(Vec::new()), - receipt: StreamDelivery { - from: from_account.ilp_address().clone(), - to: destination_account, - sent_amount: source_amount, - sent_asset_scale: from_account.asset_scale(), - sent_asset_code: from_account.asset_code().to_string(), - delivered_asset_scale: None, - delivered_asset_code: None, - delivered_amount: 0, - }, - should_send_source_account: true, - sequence: 1, - rejected_packets: 0, - error: None, - last_fulfill_time: Instant::now(), + store, + slippage, + payment: Arc::new(Mutex::new(StreamPayment { + // TODO Make configurable to get money flowing ASAP vs as much as possible per-packet + congestion_controller: CongestionController::new( + source_amount, + source_amount / 10, + 2.0, + ), + receipt: StreamDelivery::new(from_account, destination_account, source_amount), + should_send_source_account: true, + sequence: 1, + fulfilled_packets: 0, + rejected_packets: 0, + last_fulfill_time: Instant::now(), + })), + }; + + let mut pending_requests = FuturesUnordered::new(); + + /// Actions corresponding to the state of the payment + enum PaymentEvent { + /// Send more money: send a packet with the given amount + SendMoney(u64), + /// Congestion controller limited in-flight amount: wait for pending requests until given deadline + MaxInFlight(Instant), + /// Send full source amount: close the connection and return success + CloseConnection, + /// Maximum timeout since last fulfill has elapsed: terminate the payment + Timeout, + /// Too many packets are rejected, such as if the exchange rate is too low: terminate the payment + FailFast, + } + + loop { + let event = { + let mut payment = sender.payment.lock().await; + + if payment.last_fulfill_time.elapsed() >= MAX_TIME_SINCE_LAST_FULFILL { + PaymentEvent::Timeout + } else if payment.is_failing() { + PaymentEvent::FailFast + } else if payment.is_complete() { + PaymentEvent::CloseConnection + } else if payment.is_max_in_flight() { + let deadline = payment + .last_fulfill_time + .checked_add(MAX_TIME_SINCE_LAST_FULFILL) + .unwrap(); + PaymentEvent::MaxInFlight(deadline) + } else { + PaymentEvent::SendMoney(payment.apply_prepare()) + } + }; + + match event { + PaymentEvent::SendMoney(packet_amount) => { + let mut sender = sender.clone(); + pending_requests.push(tokio::spawn(async move { + sender.send_money_packet(packet_amount).await + })); + } + PaymentEvent::MaxInFlight(deadline) => { + // Wait for any request to complete, or if after reach deadline since last fulfill, + // run loop again, which should timeout the payment + let result = timeout_at(deadline, pending_requests.select_next_some()).await; + + if let Ok(Ok(Err(error))) = result { + error!("Send money stopped because of error: {:?}", error); + return Err(error); + } + } + PaymentEvent::CloseConnection => { + // Wait for all pending requests to complete before closing the connection + pending_requests.map(|_| ()).collect::<()>().await; + + // Try to the tell the recipient the connection is closed + sender.try_send_connection_close().await; + + // Return final receipt + let payment = sender.payment.lock().await; + debug!( + "Send money future finished. Delivered: {} ({} packets fulfilled, {} packets rejected)", + payment.receipt.delivered_amount, + payment.fulfilled_packets, + payment.rejected_packets, + ); + return Ok(payment.receipt.clone()); + } + PaymentEvent::Timeout => { + // Error if we haven't received a fulfill over a timeout period + return Err(Error::TimeoutError( + "Time since last fulfill exceeded the maximum time limit".to_string(), + )); + } + PaymentEvent::FailFast => { + let payment = sender.payment.lock().await; + return Err(Error::SendMoneyError( + format!("Terminating payment since too many packets are rejected ({} packets fulfilled, {} packets rejected)", + payment.fulfilled_packets, + payment.rejected_packets, + ))); + } + } } - .await } -#[pin_project] -/// Helper data type used to track a streaming payment -struct SendMoneyFuture, A: Account> { - /// The future's [state](./enum.SendMoneyFutureState.html) - state: SendMoneyFutureState, - /// The next service which will receive the Stream packet - next: Option, +/// Sends and handles all ILP & STREAM packets, encapsulating all payment state +#[derive(Clone)] +struct StreamSender { + /// Next service to send and forward Interledger packets to the network + next: I, /// The account sending the STREAM payment from_account: A, - /// The ILP Address of the account sending the payment - source_account: Address, - /// The ILP Address of the account receiving the payment - destination_account: Address, - /// The shared secret generated by the sender and the receiver + /// Symmetric secret generated by receiver to encrypt and authenticate this connections' packets shared_secret: Bytes, - /// The amount sent by the sender - source_amount: u64, - /// The [congestion controller](./../congestion/struct.CongestionController.html) for this stream - congestion_controller: CongestionController, - /// STREAM packets we have sent and have not received responses yet for - pending_requests: Cell>, - /// The [StreamDelivery](./struct.StreamDelivery.html) receipt of this stream - receipt: StreamDelivery, - /// Boolean indicating if the source account should also be sent to the receiver - should_send_source_account: bool, - /// The sequence number of this stream - sequence: u64, - /// The amount of rejected packets by the stream - rejected_packets: u64, - /// The STREAM error for this stream - error: Option, - /// The last time a packet was fulfilled for this stream. - last_fulfill_time: Instant, + /// Store for fetching and enforcing minimum exchange rates + store: S, + /// Maximum acceptable slippage percentage below calculated minimum exchange rate + slippage: f64, + /// Mutable payment state + payment: Arc>, } -struct PendingRequest { - sequence: u64, - amount: u64, - future: Pin + Send>>, -} - -/// The state of the send money future -#[derive(PartialEq)] -enum SendMoneyFutureState { - /// Initial state of the future - SendMoney = 0, - /// Once the stream has been finished, it transitions to this state and tries to send a - /// ConnectionCloseFrame - Closing, - /// The connection is now closed and the send_money function can return - Closed, -} - -#[project] -impl SendMoneyFuture +impl StreamSender where - S: IncomingService + Send + Sync + Clone + 'static, - A: Account + Send + Sync + 'static, + I: IncomingService, + A: Account, + S: ExchangeRateStore, { - /// Fire off requests until the congestion controller tells us to stop or we've sent the total amount or maximum time since last fulfill has elapsed - fn try_send_money(&mut self) -> Result { - let mut sent_packets = false; - loop { - let amount = min( - *self.source_amount, - self.congestion_controller.get_max_amount(), - ); - if amount == 0 { - break; - } - *self.source_amount -= amount; + /// Send a Prepare for the given source amount and apply the resulting Fulfill or Reject + #[inline] + pub async fn send_money_packet(&mut self, source_amount: u64) -> Result<(), Error> { + let (prepare, sequence, min_destination_amount) = { + let mut payment = self.payment.lock().await; + + // Build the STREAM packet + + let sequence = payment.next_sequence(); - // Load up the STREAM packet - let sequence = self.next_sequence(); let mut frames = vec![Frame::StreamMoney(StreamMoneyFrame { stream_id: 1, shares: 1, })]; - if *self.should_send_source_account { + + if payment.should_send_source_account { frames.push(Frame::ConnectionNewAddress(ConnectionNewAddressFrame { - source_account: self.source_account.clone(), + source_account: payment.receipt.from.clone(), })); } - let stream_packet = StreamPacketBuilder { + + let min_destination_amount = get_min_destination_amount( + &self.store, + source_amount, + payment.receipt.source_asset_scale, + &payment.receipt.source_asset_code, + payment.receipt.destination_asset_scale, + payment + .receipt + .destination_asset_code + .as_ref() + .map(String::as_str), + self.slippage, + ) + .unwrap_or(0); // Default to 0 if unable to calculate rate + + let stream_request_packet = StreamPacketBuilder { ilp_packet_type: IlpPacketType::Prepare, - // TODO enforce min exchange rate - prepare_amount: 0, + prepare_amount: min_destination_amount, sequence, frames: &frames, } .build(); - // Create the ILP Prepare packet debug!( "Sending packet {} with amount: {} and encrypted STREAM packet: {:?}", - sequence, amount, stream_packet + sequence, source_amount, stream_request_packet ); - let data = stream_packet.into_encrypted(&self.shared_secret); - let execution_condition = generate_condition(&self.shared_secret, &data); + + let prepare_data = stream_request_packet.into_encrypted(&self.shared_secret); + + // If we couldn't calculate a minimum destination amount (e.g. don't know asset details yet), + // packet MUST be unfulfillable so no money is at risk + let execution_condition = if min_destination_amount > 0 { + generate_condition(&self.shared_secret, &prepare_data) + } else { + random_condition() + }; + + // Build the Prepare let prepare = PrepareBuilder { - destination: self.destination_account.clone(), - amount, + destination: payment.receipt.to.clone(), + amount: source_amount, execution_condition: &execution_condition, expires_at: SystemTime::now() + Duration::from_secs(30), - // TODO don't copy the data - data: &data[..], + // TODO Don't copy the data + data: &prepare_data[..], } .build(); - // Send it! - self.congestion_controller.prepare(amount); - if let Some(ref next) = self.next { - let mut next = next.clone(); - let from = self.from_account.clone(); - let request = Box::pin(async move { - next.handle_request(IncomingRequest { from, prepare }).await - }); - self.pending_requests.get_mut().push(PendingRequest { - sequence, - amount, - future: request, - }); - sent_packets = true; - } else { - panic!("Polled after finish"); - } - } + (prepare, sequence, min_destination_amount) + }; - Ok(sent_packets) - } + // Send it! + let reply = self + .next + .handle_request(IncomingRequest { + from: self.from_account.clone(), + prepare, + }) + .await; + + let (packet_type, reply_data) = match &reply { + Ok(fulfill) => (IlpPacketType::Fulfill, fulfill.data()), + Err(reject) => (IlpPacketType::Reject, reject.data()), + }; - /// Sends a STREAM inside a Prepare packet with a ConnectionClose frame to the peer - fn try_send_connection_close(&mut self) -> Result<(), Error> { - let sequence = self.next_sequence(); - let stream_packet = StreamPacketBuilder { - ilp_packet_type: IlpPacketType::Prepare, - prepare_amount: 0, - sequence, - frames: &[Frame::ConnectionClose(ConnectionCloseFrame { - code: ErrorCode::NoError, - message: "", - })], - } - .build(); - // Create the ILP Prepare packet - let data = stream_packet.into_encrypted(&self.shared_secret); - let prepare = PrepareBuilder { - destination: self.destination_account.clone(), - amount: 0, - execution_condition: &random_condition(), - expires_at: SystemTime::now() + Duration::from_secs(30), - data: &data[..], - } - .build(); + let stream_reply_packet = + StreamPacket::from_encrypted(&self.shared_secret, BytesMut::from(reply_data)); - // Send it! - debug!("Closing connection"); - if let Some(ref next) = self.next { - let mut next = next.clone(); - let from = self.from_account.clone(); - let request = - Box::pin( - async move { next.handle_request(IncomingRequest { from, prepare }).await }, - ); - self.pending_requests.get_mut().push(PendingRequest { - sequence, - amount: 0, - future: request, - }); - } else { - panic!("Polled after finish"); - } - Ok(()) - } + let mut payment = self.payment.lock().await; - fn poll_pending_requests(&mut self, cx: &mut Context<'_>) -> Poll> { - let pending_requests = self.pending_requests.take(); - let pending_requests = pending_requests - .into_iter() - .filter_map( - |mut pending_request| match pending_request.future.as_mut().poll(cx) { - Poll::Pending => Some(pending_request), - Poll::Ready(result) => { - match result { - Ok(fulfill) => { - self.handle_fulfill( - pending_request.sequence, - pending_request.amount, - fulfill, - ); - } - Err(reject) => { - self.handle_reject( - pending_request.sequence, - pending_request.amount, - reject, + // Parse the stream packet and determine the amount the recipient claims they received + let claimed_amount: u64 = match stream_reply_packet { + Ok(stream_reply_packet) => { + if stream_reply_packet.sequence() != sequence { + warn!( + "Discarding replayed STREAM packet (expected sequence {}, but received {})", + sequence, + stream_reply_packet.sequence() + ); + 0 + } else if stream_reply_packet.ilp_packet_type() == IlpPacketType::Reject + && packet_type == IlpPacketType::Fulfill + { + // If receiver claimed they sent a Reject but we got a Fulfill, they lied! + // If receiver said they sent a Fulfill but we got a Reject, that's possible + warn!("Discarding STREAM packet (received Fulfill, but recipient said they sent a Reject)"); + 0 + } else { + // Since we decrypted the response, the recipient read the request packet and knows our account + payment.should_send_source_account = false; + + // Update the destination asset scale & code + // https://github.com/interledger/rfcs/pull/551 ensures that this won't change + if payment.receipt.destination_asset_scale.is_none() { + for frame in stream_reply_packet.frames() { + if let Frame::ConnectionAssetDetails(frame) = frame { + let asset_code = frame.source_asset_code.to_string(); + let asset_scale = frame.source_asset_scale; + debug!( + "Setting remote asset details ({} with scale {})", + asset_code, asset_scale ); + payment.set_destination_asset_details(asset_code, asset_scale); } - }; - None - } - }, - ) - .collect(); - self.pending_requests.set(pending_requests); - - if let Some(error) = self.error.take() { - error!("Send money stopped because of error: {:?}", error); - Poll::Ready(Err(error)) - } else if self.pending_requests.get_mut().is_empty() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - /// Parses the provided Fulfill packet. - /// 1. Logs the fulfill in the congestion controller - /// 1. Updates the last fulfill time of the send money future - /// 1. It tries to aprse a Stream Packet inside the fulfill packet's data field - /// If successful, it increments the delivered amount by the Stream packet's prepare amount - fn handle_fulfill(&mut self, sequence: u64, amount: u64, fulfill: Fulfill) { - // TODO should we check the fulfillment and expiry or can we assume the plugin does that? - self.congestion_controller.fulfill(amount); - *self.should_send_source_account = false; - *self.last_fulfill_time = Instant::now(); - - if let Ok(packet) = StreamPacket::from_encrypted(&self.shared_secret, fulfill.into_data()) { - if packet.ilp_packet_type() == IlpPacketType::Fulfill { - // TODO check that the sequence matches our outgoing packet - - // Update the asset scale & asset code via the received - // frame. https://github.com/interledger/rfcs/pull/551 - // ensures that this won't change, so we only need to - // perform this loop once. - if self.receipt.delivered_asset_scale.is_none() { - for frame in packet.frames() { - if let Frame::ConnectionAssetDetails(frame) = frame { - self.receipt.delivered_asset_scale = Some(frame.source_asset_scale); - self.receipt.delivered_asset_code = - Some(frame.source_asset_code.to_string()); } } - } - self.receipt - .increment_delivered_amount(packet.prepare_amount()); - } - } else { - warn!( - "Unable to parse STREAM packet from fulfill data for sequence {}", - sequence - ); - } - debug!( - "Prepare {} with amount {} was fulfilled ({} left to send)", - sequence, amount, self.source_amount - ); - } - - /// Parses the provided Reject packet. - /// 1. Increases the source-amount which was deducted at the start of the [send_money](./fn.send_money.html) loop - /// 1. Logs the reject in the congestion controller - /// 1. Increments the rejected packets counter - /// 1. If the receipt's `delivered_asset` fields are not populated, it tries to parse - /// a Stream Packet inside the reject packet's data field to check if - /// there is a [`ConnectionAssetDetailsFrame`](./../packet/struct.ConnectionAssetDetailsFrame.html) frame. - /// If one is found, then it updates the receipt's `delivered_asset_scale` and `delivered_asset_code` - /// to them. - fn handle_reject(&mut self, sequence: u64, amount: u64, reject: Reject) { - *self.source_amount += amount; - self.congestion_controller.reject(amount, &reject); - *self.rejected_packets += 1; - debug!( - "Prepare {} with amount {} was rejected with code: {} ({} left to send)", - sequence, - amount, - reject.code(), - self.source_amount - ); - - // if we receive a reject, try to update our asset code/scale - // if it was not populated before - if self.receipt.delivered_asset_scale.is_none() - || self.receipt.delivered_asset_code.is_none() - { - if let Ok(packet) = - StreamPacket::from_encrypted(&self.shared_secret, BytesMut::from(reject.data())) - { - for frame in packet.frames() { - if let Frame::ConnectionAssetDetails(frame) = frame { - self.receipt.delivered_asset_scale = Some(frame.source_asset_scale); - self.receipt.delivered_asset_code = - Some(frame.source_asset_code.to_string()); - } + stream_reply_packet.prepare_amount() } - } else { + } + Err(_) => { warn!( - "Unable to parse STREAM packet from reject data for sequence {}", + "Unable to parse STREAM packet from response data for sequence {}", sequence ); + 0 } - } + }; - match (reject.code().class(), reject.code()) { - (ErrorClass::Temporary, _) => {} - (_, IlpErrorCode::F08_AMOUNT_TOO_LARGE) => { - // Handled by the congestion controller - } - (_, IlpErrorCode::F99_APPLICATION_ERROR) => { - // TODO handle STREAM errors + match reply { + // Handle ILP Fulfill + Ok(_) => { + // Delivered amount must be *at least* the minimum acceptable amount we told the receiver + // Even if the data was invalid, since it was fulfilled, we must assume they got at least the minimum + let delivered_amount = max(min_destination_amount, claimed_amount); + + payment.apply_fulfill(source_amount, delivered_amount); + + debug!( + "Prepare {} with amount {} was fulfilled ({} left to send)", + sequence, + source_amount, + payment.get_remaining_amount() + ); + + Ok(()) } - _ => { - *self.error = Some(Error::SendMoneyError(format!( - "Packet was rejected with error: {} {}", + // Handle ILP Reject + Err(reject) => { + payment.apply_reject(source_amount, &reject); + + debug!( + "Prepare {} with amount {} was rejected with code: {} ({} left to send)", + sequence, + source_amount, reject.code(), - str::from_utf8(reject.message()).unwrap_or_default(), - ))); + payment.get_remaining_amount() + ); + + match (reject.code().class(), reject.code()) { + (ErrorClass::Temporary, _) => Ok(()), + (_, IlpErrorCode::F08_AMOUNT_TOO_LARGE) => Ok(()), // Handled by the congestion controller + (_, IlpErrorCode::F99_APPLICATION_ERROR) => Ok(()), + // Any other error will stop the rest of the payment + _ => Err(Error::SendMoneyError(format!( + "Packet was rejected with error: {} {}", + reject.code(), + str::from_utf8(reject.message()).unwrap_or_default(), + ))), + } } } } - /// Increments the stream's sequence number and returns the updated value - fn next_sequence(&mut self) -> u64 { - let seq = *self.sequence; - *self.sequence += 1; - seq - } -} + /// Send an unfulfillable Prepare with a ConnectionClose frame to the peer + /// There's no ACK from the recipient, so we can't confirm it closed + #[inline] + async fn try_send_connection_close(&mut self) { + let prepare = { + let mut payment = self.payment.lock().await; + let sequence = payment.next_sequence(); -impl Future for SendMoneyFuture -where - S: IncomingService + Send + Sync + Clone + 'static, - A: Account + Send + Sync + 'static, -{ - type Output = Result<(StreamDelivery, S), Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // TODO maybe don't have loops here and in try_send_money - let mut this = self.project(); - - loop { - ready!(this.poll_pending_requests(cx)?); - if this.last_fulfill_time.elapsed() >= MAX_TIME_SINCE_LAST_FULFILL { - return Poll::Ready(Err(Error::TimeoutError(format!( - "Time since last fulfill exceeded the maximum time limit of {:?} secs", - this.last_fulfill_time.elapsed().as_secs() - )))); + let stream_packet = StreamPacketBuilder { + ilp_packet_type: IlpPacketType::Prepare, + prepare_amount: 0, + sequence, + frames: &[Frame::ConnectionClose(ConnectionCloseFrame { + code: ErrorCode::NoError, + message: "", + })], } + .build(); - if *this.source_amount == 0 && this.pending_requests.get_mut().is_empty() { - if *this.state == SendMoneyFutureState::SendMoney { - *this.state = SendMoneyFutureState::Closing; - this.try_send_connection_close()?; - } else { - *this.state = SendMoneyFutureState::Closed; - debug!( - "Send money future finished. Delivered: {} ({} packets fulfilled, {} packets rejected)", this.receipt.delivered_amount, *this.sequence - 1, this.rejected_packets, - ); - return Poll::Ready(Ok((this.receipt.clone(), this.next.take().unwrap()))); - } - } else if !this.try_send_money()? { - return Poll::Pending; + // Create the ILP Prepare packet + let data = stream_packet.into_encrypted(&self.shared_secret); + PrepareBuilder { + destination: payment.receipt.to.clone(), + amount: 0, + execution_condition: &random_condition(), + expires_at: SystemTime::now() + Duration::from_secs(30), + data: &data[..], } - } + .build() + }; + + // Send it! + // Packet will always be rejected since the condition is random + debug!("Closing connection"); + self.next + .handle_request(IncomingRequest { + from: self.from_account.clone(), + prepare, + }) + .await + .ok(); } } +// TODO Abstract duplicated conversion logic from interledger-settlement & +// exchange rate service into interledger-rates + +/// Convert the given source amount into a destination amount, pulling from a provider's exchange rates +/// and subtracting slippage to determine a minimum destination amount. +/// Returns None if destination asset details are unknown or rate cannot be calculated. +#[inline] +fn get_min_destination_amount( + store: &S, + source_amount: u64, + source_scale: u8, + source_code: &str, + dest_scale: Option, + dest_code: Option<&str>, + slippage: f64, +) -> Option { + let dest_code = dest_code?; + let dest_scale = dest_scale?; + + // Fetch the exchange rate + let rate: BigRational = if source_code == dest_code { + BigRational::one() + } else if let Ok(prices) = store.get_exchange_rates(&[&source_code, &dest_code]) { + BigRational::from_f64(prices[0])? / BigRational::from_f64(prices[1])? + } else { + return None; + }; + + // Subtract slippage from rate + let slippage = BigRational::from_f64(slippage)?; + let rate = rate * (BigRational::one() - slippage); + + // First, convert scaled source amount to base unit + let mut source_amount = BigRational::from_u64(source_amount)?; + source_amount /= pow(BigRational::from_u64(10)?, source_scale as usize); + + // Apply exchange rate + let mut dest_amount = source_amount * rate; + + // Convert destination amount in base units to scaled units + dest_amount *= pow(BigRational::from_u64(10)?, dest_scale as usize); + + // For safety, always round up + dest_amount = dest_amount.ceil(); + + Some(dest_amount.to_integer().to_u64()?) +} + #[cfg(test)] mod send_money_tests { use super::*; - use crate::test_helpers::{TestAccount, EXAMPLE_CONNECTOR}; - use interledger_ildcp::IldcpService; + use crate::test_helpers::{TestAccount, TestStore, EXAMPLE_CONNECTOR}; + use async_trait::async_trait; use interledger_packet::{ErrorCode as IlpErrorCode, RejectBuilder}; use interledger_service::incoming_service_fn; + use interledger_service_util::MaxPacketAmountService; use parking_lot::Mutex; use std::str::FromStr; + use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; + use tokio::time::timeout; use uuid::Uuid; #[tokio::test] @@ -509,11 +666,12 @@ mod send_money_tests { asset_code: "XYZ".to_string(), asset_scale: 9, ilp_address: Address::from_str("example.destination").unwrap(), + max_packet_amount: None, }; let requests = Arc::new(Mutex::new(Vec::new())); let requests_clone = requests.clone(); let result = send_money( - IldcpService::new(incoming_service_fn(move |request| { + incoming_service_fn(move |request| { requests_clone.lock().push(request); Err(RejectBuilder { code: IlpErrorCode::F00_BAD_REQUEST, @@ -522,14 +680,284 @@ mod send_money_tests { data: &[], } .build()) - })), + }), &account, + TestStore { + route: None, + price_1: None, + price_2: None, + }, Address::from_str("example.destination").unwrap(), &[0; 32][..], 100, + 0.0, ) .await; assert!(result.is_err()); assert_eq!(requests.lock().len(), 1); } + + #[tokio::test] + async fn sends_concurrent_packets() { + let destination_address = Address::from_str("example.receiver").unwrap(); + let account = TestAccount { + id: Uuid::new_v4(), + asset_code: "XYZ".to_string(), + asset_scale: 9, + ilp_address: destination_address.clone(), + max_packet_amount: Some(10), + }; + let store = TestStore { + route: Some((destination_address.to_string(), account)), + price_1: None, + price_2: None, + }; + + #[derive(Clone)] + struct CounterService { + pub num_requests_in_flight: Arc, + } + + impl CounterService { + pub fn new(num_requests_in_flight: Arc) -> Self { + CounterService { + num_requests_in_flight, + } + } + } + + #[async_trait] + impl IncomingService for CounterService + where + A: Account + 'static, + { + async fn handle_request(&mut self, _: IncomingRequest) -> IlpResult { + self.num_requests_in_flight.fetch_add(1, Ordering::Relaxed); + + // Wait for 100ms while all requests are received, then reject with final error to terminate stream + timeout( + Duration::from_millis(100), + futures::future::pending::(), + ) + .await + .unwrap_or_else(|_| { + Err(RejectBuilder { + code: IlpErrorCode::F00_BAD_REQUEST, + message: b"some final error", + triggered_by: Some(&EXAMPLE_CONNECTOR), + data: &[], + } + .build()) + }) + } + } + + let num_requests_in_flight = Arc::new(AtomicUsize::new(0)); + let counter_service = CounterService::new(num_requests_in_flight.clone()); + + let result = send_money( + MaxPacketAmountService::new(store, counter_service), + &TestAccount { + id: Uuid::new_v4(), + asset_code: "XYZ".to_string(), + asset_scale: 9, + ilp_address: destination_address.clone(), + max_packet_amount: Some(10), // Requires at least 5 packets + }, + TestStore { + route: None, + price_1: None, + price_2: None, + }, + destination_address.clone(), + &[0; 32][..], + 50, + 0.0, + ) + .await; + + assert!(result.is_err()); + assert_eq!(num_requests_in_flight.load(Ordering::Relaxed), 5); + } + + #[tokio::test] + async fn computes_min_destination_amount() { + struct TestData<'a> { + name: &'a str, + price_1: Option, + price_2: Option, + source_amount: u64, + source_scale: u8, + source_code: &'a str, + dest_scale: Option, + dest_code: Option<&'a str>, + slippage: f64, + expected_result: Option, + } + + let test_data = vec![ + TestData { + name: "Fails if rate is unavailable", + price_1: None, + price_2: Some(3.0), + source_amount: 100, + source_scale: 2, + source_code: "ABC", + dest_scale: Some(6), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: None, + }, + TestData { + name: "Fails if destination asset code is unavailable", + price_1: Some(1.9), + price_2: Some(3.0), + source_amount: 100, + source_scale: 2, + source_code: "ABC", + dest_scale: Some(6), + dest_code: None, + slippage: 0.0, + expected_result: None, + }, + TestData { + name: "Fails if destination asset code is unavailable", + price_1: Some(1.9), + price_2: Some(3.0), + source_amount: 100, + source_scale: 2, + source_code: "ABC", + dest_scale: None, + dest_code: Some("ABC"), + slippage: 0.0, + expected_result: None, + }, + TestData { + name: "Computes result when amount gets larger", + price_1: Some(6.0), + price_2: Some(1.5), + source_amount: 100, + source_scale: 2, + source_code: "ABC", + dest_scale: Some(2), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: Some(400), + }, + TestData { + name: "Computes result when amount gets smaller", + price_1: Some(1.5), + price_2: Some(6.0), + source_amount: 100, + source_scale: 2, + source_code: "ABC", + dest_scale: Some(2), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: Some(25), + }, + TestData { + name: "Converts from small to large scale", + price_1: Some(1.0), + price_2: Some(1.0), + source_amount: 33, + source_scale: 2, + source_code: "ABC", + dest_scale: Some(6), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: Some(330_000), + }, + TestData { + name: "Converts from large to small scale", + price_1: Some(1.0), + price_2: Some(1.0), + source_amount: 123_456_000_000, + source_scale: 9, + source_code: "ABC", + dest_scale: Some(4), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: Some(1_234_560), + }, + TestData { + name: "Subtracts slippage in simple case", + price_1: Some(1.0), + price_2: Some(1.0), + source_amount: 100, + source_scale: 2, + source_code: "ABC", + dest_scale: Some(2), + dest_code: Some("XYZ"), + slippage: 0.01, + expected_result: Some(99), + }, + TestData { + name: "Rounds up after subtracting slippage", + price_1: Some(1.0), + price_2: Some(1.0), + source_amount: 100, + source_scale: 2, + source_code: "ABC", + dest_scale: Some(2), + dest_code: Some("XYZ"), + slippage: 0.035, + expected_result: Some(97), + }, + TestData { + name: "Rounds up even when destination amount is very close to 0", + price_1: Some(0.000_000_5), + price_2: Some(1.0), + source_amount: 100, + source_scale: 0, + source_code: "ABC", + dest_scale: Some(0), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: Some(1), + }, + TestData { + // f64 multiplication errors would cause this to be 101 after rounding up, big rationals fix this + name: "No floating point errors", + price_1: Some(1.0), + price_2: Some(1.0), + source_amount: 100, + source_scale: 9, + source_code: "ABC", + dest_scale: Some(9), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: Some(100), + }, + TestData { + name: "Converts when using the largest possible scale", + price_1: Some(1.0), + price_2: Some(1.0), + source_amount: 421, + source_scale: 255, + source_code: "ABC", + dest_scale: Some(255), + dest_code: Some("XYZ"), + slippage: 0.0, + expected_result: Some(421), + }, + ]; + + for t in &test_data { + let dest_amount = get_min_destination_amount( + &TestStore { + route: None, + price_1: t.price_1, + price_2: t.price_2, + }, + t.source_amount, + t.source_scale, + t.source_code, + t.dest_scale, + t.dest_code, + t.slippage, + ); + assert_eq!(dest_amount, t.expected_result, "{}", t.name); + } + } } diff --git a/crates/interledger-stream/src/congestion.rs b/crates/interledger-stream/src/congestion.rs index 792103811..30483de0e 100644 --- a/crates/interledger-stream/src/congestion.rs +++ b/crates/interledger-stream/src/congestion.rs @@ -62,8 +62,8 @@ impl CongestionController { } } - /// The maximum amount which can be sent is the maximum amount in flight minus the current amount in flight - pub fn get_max_amount(&mut self) -> u64 { + /// The maximum amount availble to be sent is the maximum amount in flight minus the current amount in flight + pub fn get_max_amount(&self) -> u64 { if self.amount_in_flight > self.max_in_flight { return 0; } diff --git a/crates/interledger-stream/src/crypto.rs b/crates/interledger-stream/src/crypto.rs index 366250095..668e01dca 100644 --- a/crates/interledger-stream/src/crypto.rs +++ b/crates/interledger-stream/src/crypto.rs @@ -133,7 +133,7 @@ fn encrypt_with_nonce( /// The nonce and auth tag are extracted from the first 12 and 16 bytes /// of the ciphertext. pub fn decrypt(shared_secret: &[u8], mut ciphertext: BytesMut) -> Result { - // ciphertext must include at least a nonce and tag, + // ciphertext must include at least a nonce and tag if ciphertext.len() < AUTH_TAG_LENGTH { return Err(()); } diff --git a/crates/interledger-stream/src/lib.rs b/crates/interledger-stream/src/lib.rs index 554933ae7..6719a1b65 100644 --- a/crates/interledger-stream/src/lib.rs +++ b/crates/interledger-stream/src/lib.rs @@ -28,10 +28,12 @@ pub mod test_helpers { use super::*; use async_trait::async_trait; use futures::channel::mpsc::UnboundedSender; - use interledger_errors::{AccountStoreError, AddressStoreError}; + use interledger_errors::{AccountStoreError, AddressStoreError, ExchangeRateStoreError}; use interledger_packet::Address; + use interledger_rates::ExchangeRateStore; use interledger_router::RouterStore; use interledger_service::{Account, AccountStore, AddressStore, Username}; + use interledger_service_util::MaxPacketAmountAccount; use once_cell::sync::Lazy; use std::collections::HashMap; use std::iter::FromIterator; @@ -51,6 +53,7 @@ pub mod test_helpers { pub ilp_address: Address, pub asset_scale: u8, pub asset_code: String, + pub max_packet_amount: Option, } impl Account for TestAccount { @@ -75,6 +78,12 @@ pub mod test_helpers { } } + impl MaxPacketAmountAccount for TestAccount { + fn max_packet_amount(&self) -> u64 { + self.max_packet_amount.unwrap_or(std::u64::MAX) + } + } + #[derive(Clone)] pub struct DummyStore; @@ -93,7 +102,9 @@ pub mod test_helpers { #[derive(Clone)] pub struct TestStore { - pub route: (String, TestAccount), + pub route: Option<(String, TestAccount)>, + pub price_1: Option, + pub price_2: Option, } #[async_trait] @@ -104,7 +115,7 @@ pub mod test_helpers { &self, _account_ids: Vec, ) -> Result, AccountStoreError> { - Ok(vec![self.route.1.clone()]) + Ok(vec![self.route.clone().unwrap().1]) } // stub implementation (not used in these tests) @@ -119,7 +130,11 @@ pub mod test_helpers { impl RouterStore for TestStore { fn routing_table(&self) -> Arc> { Arc::new(HashMap::from_iter( - vec![(self.route.0.clone(), self.route.1.id())].into_iter(), + vec![( + self.route.clone().unwrap().0, + self.route.clone().unwrap().1.id(), + )] + .into_iter(), )) } } @@ -140,6 +155,30 @@ pub mod test_helpers { Address::from_str("example.connector").unwrap() } } + + #[async_trait] + impl ExchangeRateStore for TestStore { + fn get_exchange_rates(&self, codes: &[&str]) -> Result, ExchangeRateStoreError> { + match (self.price_1, self.price_2) { + (Some(price_1), Some(price_2)) => Ok(vec![price_1, price_2]), + _ => Err(ExchangeRateStoreError::PairNotFound { + from: codes[0].to_string(), + to: codes[1].to_string(), + }), + } + } + + fn set_exchange_rates( + &self, + _rates: HashMap, + ) -> Result<(), ExchangeRateStoreError> { + unimplemented!("Cannot set exchange rates") + } + + fn get_all_exchange_rates(&self) -> Result, ExchangeRateStoreError> { + unimplemented!("Cannot get all exchange rates") + } + } } #[cfg(test)] @@ -147,11 +186,11 @@ mod send_money_to_receiver { use super::test_helpers::*; use super::*; use bytes::Bytes; - use interledger_ildcp::IldcpService; use interledger_packet::Address; use interledger_packet::{ErrorCode, RejectBuilder}; use interledger_router::Router; use interledger_service::outgoing_service_fn; + use interledger_service_util::ExchangeRateService; use std::str::FromStr; use uuid::Uuid; @@ -164,9 +203,12 @@ mod send_money_to_receiver { ilp_address: destination_address.clone(), asset_code: "XYZ".to_string(), asset_scale: 9, + max_packet_amount: None, }; let store = TestStore { - route: (destination_address.to_string(), account), + route: Some((destination_address.to_string(), account)), + price_1: None, + price_2: None, }; let connection_generator = ConnectionGenerator::new(server_secret.clone()); let server = StreamReceiverService::new( @@ -183,27 +225,100 @@ mod send_money_to_receiver { }), ); let server = Router::new(store, server); - let server = IldcpService::new(server); let (destination_account, shared_secret) = connection_generator.generate_address_and_secret(&destination_address); let destination_address = Address::from_str("example.receiver").unwrap(); - let (receipt, _service) = send_money( + let receipt = send_money( server, &test_helpers::TestAccount { id: Uuid::new_v4(), asset_code: "XYZ".to_string(), asset_scale: 9, ilp_address: destination_address, + max_packet_amount: None, + }, + TestStore { + route: None, + price_1: None, + price_2: None, }, destination_account, &shared_secret[..], 100, + 0.0, ) .await .unwrap(); assert_eq!(receipt.delivered_amount, 100); } + + #[tokio::test] + async fn payment_fails_if_large_spread() { + let server_secret = Bytes::from(&[0; 32][..]); + let source_address = Address::from_str("example.sender").unwrap(); + let destination_address = Address::from_str("example.receiver").unwrap(); + + let sender_account = TestAccount { + id: Uuid::new_v4(), + ilp_address: source_address.clone(), + asset_code: "XYZ".to_string(), + asset_scale: 6, + max_packet_amount: None, + }; + + let recipient_account = TestAccount { + id: Uuid::new_v4(), + ilp_address: destination_address.clone(), + asset_code: "ABC".to_string(), + asset_scale: 9, + max_packet_amount: None, + }; + + let store = TestStore { + route: Some((destination_address.to_string(), recipient_account)), + price_1: Some(1.0), + price_2: Some(1.0), + }; + + let connection_generator = ConnectionGenerator::new(server_secret.clone()); + let server = StreamReceiverService::new( + server_secret, + DummyStore, + outgoing_service_fn(|_| { + Err(RejectBuilder { + code: ErrorCode::F02_UNREACHABLE, + message: b"No other outgoing handler", + triggered_by: Some(&EXAMPLE_RECEIVER), + data: &[], + } + .build()) + }), + ); + + let server = ExchangeRateService::new(0.02, store.clone(), server); + let server = Router::new(store.clone(), server); + + let (destination_account, shared_secret) = + connection_generator.generate_address_and_secret(&destination_address); + + let result = send_money( + server, + &sender_account, + store, + destination_account, + &shared_secret[..], + 1000, + 0.014, + ) + .await; + + // Connector takes 2% spread, but we're only willing to tolerate 1.4% + match result { + Err(Error::SendMoneyError(_)) => {} + _ => panic!("Payment should fail fast due to poor exchange rates"), + } + } } diff --git a/crates/interledger-stream/src/server.rs b/crates/interledger-stream/src/server.rs index 0ba842bee..cf56a337c 100644 --- a/crates/interledger-stream/src/server.rs +++ b/crates/interledger-stream/src/server.rs @@ -571,12 +571,14 @@ mod stream_receiver_service { ilp_address: Address::from_str("example.sender").unwrap(), asset_code: "XYZ".to_string(), asset_scale: 9, + max_packet_amount: None, }, to: TestAccount { id: Uuid::new_v4(), ilp_address: ilp_address.clone(), asset_code: "XYZ".to_string(), asset_scale: 9, + max_packet_amount: None, }, original_amount: prepare.amount(), prepare, @@ -631,12 +633,14 @@ mod stream_receiver_service { ilp_address: Address::from_str("example.sender").unwrap(), asset_code: "XYZ".to_string(), asset_scale: 9, + max_packet_amount: None, }, to: TestAccount { id: Uuid::new_v4(), ilp_address: ilp_address.clone(), asset_code: "XYZ".to_string(), asset_scale: 9, + max_packet_amount: None, }, original_amount: prepare.amount(), prepare, @@ -689,6 +693,7 @@ mod stream_receiver_service { ilp_address: Address::from_str("example.sender").unwrap(), asset_code: "XYZ".to_string(), asset_scale: 9, + max_packet_amount: None, }, original_amount: prepare.amount(), to: TestAccount { @@ -696,6 +701,7 @@ mod stream_receiver_service { ilp_address: ilp_address.clone(), asset_code: "XYZ".to_string(), asset_scale: 9, + max_packet_amount: None, }, prepare, }) diff --git a/docs/api.yml b/docs/api.yml index 00d74e62b..a479c7a6a 100644 --- a/docs/api.yml +++ b/docs/api.yml @@ -1,38 +1,38 @@ -openapi: '3.0.2' +openapi: "3.0.2" info: title: Interledger-rs API Specification - version: '1.0' + version: "1.0" servers: - url: https://rs3.xpring.dev tags: -- name: admins - description: Secured Admin-only calls -- name: users - description: Operations available only to authenticated users + - name: admins + description: Secured Admin-only calls + - name: users + description: Operations available only to authenticated users paths: # Health Check /: get: summary: Node health check responses: - '200': + "200": description: The node's information content: application/json: - schema: - $ref: '#/components/schemas/NodeInformation' + schema: + $ref: "#/components/schemas/NodeInformation" # Default SPSP Account /.well_known/pay: get: summary: The default SPSP account used on the node. This endpoint is only enabled if the node is run with the configuration option ILP_DEFAULT_SPSP_ACCOUNT. The SPSP spec can be found at https://interledger.org/rfcs/0009-simple-payment-setup-protocol/ responses: - '200': + "200": description: The node's SPSP information content: application/json: - schema: - $ref: '#/components/schemas/SpSpInformation' + schema: + $ref: "#/components/schemas/SpSpInformation" # Adjust tracing level /tracing-level: put: @@ -55,7 +55,7 @@ paths: type: string example: "interledger=trace" responses: - '200': + "200": description: The new log level applied on the node content: text/plain: @@ -74,14 +74,14 @@ paths: required: true description: Bearer token with the administrator's authorization responses: - '200': + "200": description: Accounts on the node content: application/json: - schema: - type: array - items: - $ref: '#/components/schemas/Account' + schema: + type: array + items: + $ref: "#/components/schemas/Account" post: summary: Adds a new user on the node tags: @@ -98,14 +98,14 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/AccountDetails' + $ref: "#/components/schemas/AccountDetails" responses: - '200': + "200": description: The inserted account (an ID is auto-generated by the node) content: application/json: - schema: - $ref: '#/components/schemas/Account' + schema: + $ref: "#/components/schemas/Account" /accounts/{username}: parameters: @@ -128,12 +128,12 @@ paths: required: true description: Bearer token with the account's or administrator's authorization responses: - '200': + "200": description: The requested account's information content: application/json: - schema: - $ref: '#/components/schemas/Account' + schema: + $ref: "#/components/schemas/Account" put: summary: Edit an account's information. This is an administrator-only call which allows modification of any field of the account, not to be confused with the /accounts/:username/settings endpoint which allows a user to edit some of their authorization related data. parameters: @@ -146,18 +146,18 @@ paths: tags: - admins requestBody: - description: The new details for the specified account - content: - application/json: - schema: - $ref: '#/components/schemas/AccountDetails' + description: The new details for the specified account + content: + application/json: + schema: + $ref: "#/components/schemas/AccountDetails" responses: - '200': + "200": description: The updated account's information content: application/json: - schema: - $ref: '#/components/schemas/Account' + schema: + $ref: "#/components/schemas/Account" delete: summary: Delete an account tags: @@ -170,12 +170,12 @@ paths: required: true description: Bearer token with administrator's authorization responses: - '200': + "200": description: The deleted account's information content: application/json: - schema: - $ref: '#/components/schemas/Account' + schema: + $ref: "#/components/schemas/Account" /accounts/{username}/settings: parameters: - in: path @@ -197,18 +197,18 @@ paths: required: true description: Bearer token with the account's or administrator's authorization requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/AccountDetails' - description: The new account details for the specified username + content: + application/json: + schema: + $ref: "#/components/schemas/AccountDetails" + description: The new account details for the specified username responses: - '200': + "200": description: The updated account's information content: application/json: - schema: - $ref: '#/components/schemas/AccountSettings' + schema: + $ref: "#/components/schemas/AccountSettings" /accounts/{username}/balance: parameters: @@ -231,12 +231,12 @@ paths: required: true description: Bearer token with the account's or administrator's authorization responses: - '200': + "200": description: The account's balance content: application/json: - schema: - $ref: '#/components/schemas/Balance' + schema: + $ref: "#/components/schemas/Balance" /accounts/{username}/spsp: parameters: @@ -249,12 +249,12 @@ paths: get: summary: Get an account's SPSP information responses: - '200': + "200": description: The account's Spsp information content: application/json: - schema: - $ref: '#/components/schemas/SpSpInformation' + schema: + $ref: "#/components/schemas/SpSpInformation" /accounts/{username}/payments: parameters: @@ -276,18 +276,18 @@ paths: required: true description: Bearer token with the account's authorization requestBody: - description: The receiver's address and amount to be sent - content: - application/json: - schema: - $ref: '#/components/schemas/PaymentRequest' + description: The receiver's address and amount to be sent + content: + application/json: + schema: + $ref: "#/components/schemas/PaymentRequest" responses: - '200': + "200": description: The receipt of delivery to the receiver content: application/json: - schema: - $ref: '#/components/schemas/PaymentResponse' + schema: + $ref: "#/components/schemas/PaymentResponse" /accounts/{username}/ilp: parameters: @@ -309,12 +309,12 @@ paths: required: true description: Bearer token with the account's authorization requestBody: - description: The serialized packet to be sent to the peer - content: - text/plain: - example: "" + description: The serialized packet to be sent to the peer + content: + text/plain: + example: "" responses: - '200': + "200": description: The serialized fulfill/reject packet received by the peer content: application/octet-stream: @@ -324,12 +324,12 @@ paths: get: summary: Gets the routes on the node responses: - '200': + "200": description: The node's prefix-usernames pairs for the routes content: application/json: - schema: - $ref: '#/components/schemas/Routes' + schema: + $ref: "#/components/schemas/Routes" /routes/static: put: @@ -344,18 +344,18 @@ paths: required: true description: Bearer token with the administrator's authorization requestBody: - description: New static routes. The key is a route prefix, and the value is a username of an account. - content: - application/json: - schema: - $ref: '#/components/schemas/Routes' + description: New static routes. The key is a route prefix, and the value is a username of an account. + content: + application/json: + schema: + $ref: "#/components/schemas/Routes" responses: - '200': + "200": description: Returns the created static routes content: application/json: - schema: - $ref: '#/components/schemas/Routes' + schema: + $ref: "#/components/schemas/Routes" /routes/static/{prefix}: put: @@ -382,7 +382,7 @@ paths: type: string example: "alice" responses: - '200': + "200": description: The created static route content: text/plain: @@ -395,12 +395,12 @@ paths: get: summary: Get all of the node's exchange rates. responses: - '200': + "200": description: The stored exchange rates content: application/json: - schema: - $ref: '#/components/schemas/Pairs' + schema: + $ref: "#/components/schemas/Pairs" put: summary: Sets new currency rates. Will override any previous values. tags: @@ -417,14 +417,14 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Pairs' + $ref: "#/components/schemas/Pairs" responses: - '200': + "200": description: Updated rates content: application/json: - schema: - $ref: '#/components/schemas/Pairs' + schema: + $ref: "#/components/schemas/Pairs" # Engines endpoints /settlement/engines: @@ -440,19 +440,18 @@ paths: required: true description: Bearer token with the administrator's authorization requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/SettlementEngines' - description: Asset Code to engine URL map + content: + application/json: + schema: + $ref: "#/components/schemas/SettlementEngines" + description: Asset Code to engine URL map responses: - '200': + "200": description: Returns the created engines content: application/json: - schema: - $ref: '#/components/schemas/Routes' - + schema: + $ref: "#/components/schemas/Routes" # Various data types returned / sent to the API components: @@ -463,42 +462,54 @@ components: - receiver - source_amount properties: - receiver: + receiver: type: string example: "$payment-pointer.example.com" - source_amount: + source_amount: type: integer example: 100000 + slippage: + oneOf: + - type: number + - type: string + default: 0.01 + description: Maximum acceptable slippage percentage below calculated minimum exchange rate PaymentResponse: type: object - required: - - receiver - - source_amount properties: - delivered_asset_scale: + source_asset_scale: type: integer - example: 9, - delivered_asset_code: + example: 9 + source_asset_code: type: string - example: "ABC" + example: "XYZ" sent_amount: type: integer - example: 1000000, - sent_asset_code: - type: string - example: "XYZ" - from: - type: string - example: "example.node_a.alice" - to: - type: string - example: "example.node_b.bob.-p3zU4tXsDRCBLg8vt_U6iiyQ5pgZk4MfoCaG1wZDW8" + example: 1000000 + description: Amount fulfilled or currently in-flight, in source units + source_amount: + type: integer + description: Total amount *intended* to be sent, in source units + in_flight_amount: + type: integer + description: Amount in-flight (yet to be fulfilled or rejected), in source units delivered_amount: type: number example: 1000000 - sent_asset_scale: + description: Amount fulfilled and received by the recipient, in destination units + destination_asset_scale: type: integer example: 9 + destination_asset_code: + type: string + example: "ABC" + from: + type: string + example: "example.node_a.alice" + to: + type: string + example: "example.node_b.bob.-p3zU4tXsDRCBLg8vt_U6iiyQ5pgZk4MfoCaG1wZDW8" + NodeInformation: type: object required: @@ -579,7 +590,7 @@ components: ilp_over_btp_outgoing_token: type: string example: "our_password_on_peer" - settlement_engine_url: + settlement_engine_url: type: string example: "http://engine.example.com" settle_threshold: @@ -615,7 +626,7 @@ components: - ilp_over_btp_url - ilp_over_btp_incoming_token - ilp_over_btp_outgoing_token - - settlement_engine_url + - settlement_engine_url - settle_threshold - settle_to - routing_relation @@ -660,7 +671,7 @@ components: ilp_over_btp_outgoing_token: type: string example: "our_password_on_peer" - settlement_engine_url: + settlement_engine_url: type: string example: "http://engine.example.com" settle_threshold: @@ -709,20 +720,21 @@ components: type: integer example: 1000000000 Pairs: - example: {"ABC": 1.23, "XYZ": 3.25} + example: { "ABC": 1.23, "XYZ": 3.25 } type: object additionalProperties: type: number example: 1.23 Routes: - example: {"example.op1.alice":"alice","example.op1":"op1"} + example: { "example.op1.alice": "alice", "example.op1": "op1" } type: object additionalProperties: type: string example: "alice" SettlementEngines: - example: {"ABC":"http://localhost:3001","XYZ":"http://localhost:3002"} + example: + { "ABC": "http://localhost:3001", "XYZ": "http://localhost:3002" } type: object additionalProperties: type: string - example: "http://localhost:3001" \ No newline at end of file + example: "http://localhost:3001" diff --git a/examples/eth-xrp-three-nodes/README.md b/examples/eth-xrp-three-nodes/README.md index 3cabd22e0..c8e54ad81 100755 --- a/examples/eth-xrp-three-nodes/README.md +++ b/examples/eth-xrp-three-nodes/README.md @@ -5,9 +5,6 @@ function post_test_hook() { test_equals_or_exit '{"asset_code":"ETH","balance":-0.0005}' test_http_response_body -H "Authorization: Bearer hi_alice" http://localhost:7770/accounts/alice/balance test_equals_or_exit '{"asset_code":"ETH","balance":0.0}' test_http_response_body -H "Authorization: Bearer hi_alice" http://localhost:7770/accounts/bob/balance test_equals_or_exit '{"asset_code":"ETH","balance":0.0}' test_http_response_body -H "Authorization: Bearer hi_bob" http://localhost:8770/accounts/alice/balance - test_equals_or_exit '{"asset_code":"XRP","balance":0.0}' test_http_response_body -H "Authorization: Bearer hi_bob" http://localhost:8770/accounts/charlie/balance - test_equals_or_exit '{"asset_code":"XRP","balance":0.0}' test_http_response_body -H "Authorization: Bearer hi_charlie" http://localhost:9770/accounts/bob/balance - test_equals_or_exit '{"asset_code":"XRP","balance":0.0005}' test_http_response_body -H "Authorization: Bearer hi_charlie" http://localhost:9770/accounts/charlie/balance fi } --> @@ -437,6 +434,7 @@ ilp-node \ --redis_url redis://127.0.0.1:6379/ \ --http_bind_address 127.0.0.1:7770 \ --settlement_api_bind_address 127.0.0.1:7771 \ +--exchange_rate.provider CoinCap \ &> logs/node-alice.log & # Start Bob's node @@ -447,9 +445,10 @@ ilp-node \ --redis_url redis://127.0.0.1:6381/ \ --http_bind_address 127.0.0.1:8770 \ --settlement_api_bind_address 127.0.0.1:8771 \ +--exchange_rate.provider CoinCap \ &> logs/node-bob.log & -# Start Charlie's node. The --ilp_address field is omitted, so the node's address will +# Start Charlie's node. The --ilp_address field is omitted, so the node's address will # be `local.host`. When a parent account is added, Charlie's node will send an ILDCP request # to them, which they will respond to with an address that they have assigned to Charlie. # Charlie will then proceed to set that as their node's address. @@ -460,6 +459,7 @@ ilp-node \ --redis_url redis://127.0.0.1:6384/ \ --http_bind_address 127.0.0.1:9770 \ --settlement_api_bind_address 127.0.0.1:9771 \ +--exchange_rate.provider CoinCap \ &> logs/node-charlie.log & ``` @@ -528,20 +528,20 @@ ilp-cli --node http://localhost:8770 accounts create alice \ --routing-relation Peer &> logs/account-bob-alice.log printf "Adding Charlie's account on Bob's node (XRP Child relation)...\n" -# you can optionally provide --ilp-address example.bob.charlie as an argument, +# you can optionally provide --ilp-address example.bob.charlie as an argument, # but the node is smart enough to figure it by itself +# Prefunds up to 1 XRP from Bob to Charlie, topped up after every packet is fulfilled ilp-cli --node http://localhost:8770 accounts create charlie \ --auth hi_bob \ --asset-code XRP \ --asset-scale 6 \ - --max-packet-amount 100 \ --settlement-engine-url http://localhost:3002 \ --ilp-over-http-incoming-token charlie_password \ --ilp-over-http-outgoing-token bob_other_password \ --ilp-over-http-url http://localhost:9770/accounts/bob/ilp \ - --settle-threshold 500 \ - --min-balance -1000 \ - --settle-to 0 \ + --settle-threshold 0 \ + --settle-to -1000000 \ + --min-balance -10000000 \ --routing-relation Child &> logs/account-bob-charlie.log & printf "Adding Charlie's Account...\n" @@ -550,13 +550,12 @@ ilp-cli --node http://localhost:9770 accounts create charlie \ --auth hi_charlie \ --asset-code XRP \ --asset-scale 6 \ - --max-packet-amount 100 \ --ilp-over-http-incoming-token charlie_password \ --settle-to 0 &> logs/account-charlie-charlie.log printf "Adding Bob's account on Charlie's node (XRP Parent relation)...\n" -# Once a parent is added, Charlie's node address is updated to `example.bob.charlie, -# and then subsequently the addresses of all NonRoutingAccount and Child accounts get +# Once a parent is added, Charlie's node address is updated to `example.bob.charlie, +# and then subsequently the addresses of all NonRoutingAccount and Child accounts get # updated to ${NODE_ADDRESS}.${CHILD_USERNAME}, with the exception of the account whose # username matches the suffix of the node's address. # So in this case, Charlie's account address gets updated from local.host.charlie to example.bob.charlie @@ -567,14 +566,13 @@ ilp-cli --node http://localhost:9770 accounts create bob \ --ilp-address example.bob \ --asset-code XRP \ --asset-scale 6 \ - --max-packet-amount 100 \ --settlement-engine-url http://localhost:3003 \ --ilp-over-http-incoming-token bob_other_password \ --ilp-over-http-outgoing-token charlie_password \ --ilp-over-http-url http://localhost:8770/accounts/charlie/ilp \ - --settle-threshold 500 \ - --min-balance -1000 \ - --settle-to 0 \ + --settle-threshold 200000 \ + --settle-to -1000000 \ + --min-balance -10000000 \ --routing-relation Parent &> logs/account-charlie-bob.log sleep 2 @@ -586,18 +584,7 @@ Notice how we use Alice's settlement engine endpoint while registering Bob. This The `settle_threshold` and `settle_to` parameters control when settlements are triggered. The node will send a settlement when an account's balance reaches the `settle_threshold`, and it will settle for `balance - settle_to`. -### 7. Set the exchange rate between ETH and XRP on Bob's connector - -```bash -printf "\nSetting the exchange rate...\n" -ilp-cli --node http://localhost:8770 rates set-all \ - --auth hi_bob \ - --pair ETH 1 \ - --pair XRP 1 \ - >/dev/null -``` - -### 8. Sending a Payment +### 7. Sending a Payment ```bash @@ -644,14 +631,15 @@ wait_to_get_http_response_body '{"asset_code":"ETH","balance":0.0}' 10 -H "Autho printf "done\n" printf "Waiting for XRP ledger to be validated" -wait_to_get_http_response_body '{"asset_code":"XRP","balance":0.0}' 20 -H "Authorization: Bearer hi_charlie" "http://localhost:9770/accounts/bob/balance" || error_and_exit "Could not confirm settlement." -printf "done\n" +sleep 10 --> ### 8. Check Balances You may see unsettled balances before the settlement engines exactly work. Wait a few seconds and try later. +If Bob's balance on Charlie's node is greater than 0, then Bob sent an XRP settlement to Charlie! +