From 1c5ffedff35cdf4408847c9aecbefb37a8c135db Mon Sep 17 00:00:00 2001 From: Kincaid O'Neil Date: Fri, 7 Feb 2020 18:08:59 +0000 Subject: [PATCH] fix(stream): timeout at deadline instead of polling - also, inline methods for accounting --- crates/interledger-stream/src/client.rs | 37 ++++++++++++++++--------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/crates/interledger-stream/src/client.rs b/crates/interledger-stream/src/client.rs index 1435fe354..1ca9f92af 100644 --- a/crates/interledger-stream/src/client.rs +++ b/crates/interledger-stream/src/client.rs @@ -18,7 +18,7 @@ use num::traits::identities::One; use num::traits::pow::pow; use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; -use tokio::time::timeout; +use tokio::time::timeout_at; use tokio::time::{Duration, Instant}; use std::cmp::{max, min}; @@ -101,6 +101,7 @@ struct StreamPayment { impl StreamPayment { /// Account for and return amount to send in the next Prepare + #[inline] fn apply_prepare(&mut self) -> u64 { let amount = min( self.get_amount_available_to_send(), @@ -115,6 +116,7 @@ impl StreamPayment { } /// Account for a fulfilled packet and update flow control + #[inline] fn apply_fulfill(&mut self, source_amount: u64, destination_amount: u64) { self.congestion_controller.fulfill(source_amount); @@ -129,6 +131,7 @@ impl StreamPayment { } /// Account for a rejected packet and update flow control + #[inline] fn apply_reject(&mut self, amount: u64, reject: &Reject) { self.congestion_controller.reject(amount, reject); @@ -139,12 +142,14 @@ impl StreamPayment { } /// Save the recipient's destination asset details for calculating minimum exchange rates + #[inline] fn set_destination_asset_details(&mut self, asset_code: String, asset_scale: u8) { self.receipt.destination_asset_code = Some(asset_code); self.receipt.destination_asset_scale = Some(asset_scale); } /// Return the current sequence number and increment the value for subsequent packets + #[inline] fn next_sequence(&mut self) -> u64 { let seq = self.sequence; self.sequence += 1; @@ -152,6 +157,7 @@ impl StreamPayment { } /// Amount of money fulfilled in source units + #[inline] fn get_fulfilled_amount(&self) -> u64 { self.receipt .sent_amount @@ -159,6 +165,7 @@ impl StreamPayment { } // Get remaining amount that must be fulfilled for the payment to complete + #[inline] fn get_remaining_amount(&self) -> u64 { self.receipt .source_amount @@ -166,11 +173,13 @@ impl StreamPayment { } /// Has the entire intended source amount been fulfilled by the recipient? + #[inline] fn is_complete(&self) -> bool { self.get_remaining_amount() == 0 } /// Return the amount of money available to be sent in the payment (amount remaining minus in-flight) + #[inline] fn get_amount_available_to_send(&self) -> u64 { // Sent amount also includes the amount in-flight, which should be subtracted from the amount available self.receipt @@ -181,11 +190,13 @@ impl StreamPayment { /// Is as much money as possible in-flight? /// (If so, the intended source amount may be fulfilled or in-flight, or the congestion controller /// has temporarily limited sending more money) + #[inline] fn is_max_in_flight(&self) -> bool { self.congestion_controller.get_max_amount() == 0 || self.get_amount_available_to_send() == 0 } /// Given we've attempted sending enough packets, does our rejected packet rate indicate the payment is failing? + #[inline] fn is_failing(&self) -> bool { let num_packets = self.fulfilled_packets + self.rejected_packets; num_packets >= FAIL_FAST_MINIMUM_PACKET_ATTEMPTS @@ -249,8 +260,8 @@ where enum PaymentEvent { /// Send more money: send a packet with the given amount SendMoney(u64), - /// Congestion controller has limited the amount in flight: wait for pending request to complete - MaxInFlight, + /// Congestion controller limited in-flight amount: wait for pending requests until given deadline + MaxInFlight(Instant), /// Send full source amount: close the connection and return success CloseConnection, /// Maximum timeout since last fulfill has elapsed: terminate the payment @@ -270,7 +281,11 @@ where } else if payment.is_complete() { PaymentEvent::CloseConnection } else if payment.is_max_in_flight() { - PaymentEvent::MaxInFlight + let deadline = payment + .last_fulfill_time + .checked_add(MAX_TIME_SINCE_LAST_FULFILL) + .unwrap(); + PaymentEvent::MaxInFlight(deadline) } else { PaymentEvent::SendMoney(payment.apply_prepare()) } @@ -283,16 +298,12 @@ where sender.send_money_packet(packet_amount).await })); } - PaymentEvent::MaxInFlight => { - // Wait for 100ms for any request to complete, otherwise try running loop again - // to see if we reached the timeout since last fulfill - let fut = timeout( - Duration::from_millis(100), - pending_requests.select_next_some(), - ) - .await; + PaymentEvent::MaxInFlight(deadline) => { + // Wait for any request to complete, or if after reach deadline since last fulfill, + // run loop again, which should timeout the payment + let result = timeout_at(deadline, pending_requests.select_next_some()).await; - if let Ok(Ok(Err(error))) = fut { + if let Ok(Ok(Err(error))) = result { error!("Send money stopped because of error: {:?}", error); return Err(error); }