Skip to content

Commit

Permalink
fix(stream): timeout at deadline instead of polling
Browse files Browse the repository at this point in the history
- also, inline methods for accounting
  • Loading branch information
kincaidoneil committed Feb 7, 2020
1 parent 2bfed61 commit 1c5ffed
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions crates/interledger-stream/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use num::traits::identities::One;
use num::traits::pow::pow;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::time::timeout;
use tokio::time::timeout_at;
use tokio::time::{Duration, Instant};

use std::cmp::{max, min};
Expand Down Expand Up @@ -101,6 +101,7 @@ struct StreamPayment {

impl StreamPayment {
/// Account for and return amount to send in the next Prepare
#[inline]
fn apply_prepare(&mut self) -> u64 {
let amount = min(
self.get_amount_available_to_send(),
Expand All @@ -115,6 +116,7 @@ impl StreamPayment {
}

/// Account for a fulfilled packet and update flow control
#[inline]
fn apply_fulfill(&mut self, source_amount: u64, destination_amount: u64) {
self.congestion_controller.fulfill(source_amount);

Expand All @@ -129,6 +131,7 @@ impl StreamPayment {
}

/// Account for a rejected packet and update flow control
#[inline]
fn apply_reject(&mut self, amount: u64, reject: &Reject) {
self.congestion_controller.reject(amount, reject);

Expand All @@ -139,38 +142,44 @@ impl StreamPayment {
}

/// Save the recipient's destination asset details for calculating minimum exchange rates
#[inline]
fn set_destination_asset_details(&mut self, asset_code: String, asset_scale: u8) {
self.receipt.destination_asset_code = Some(asset_code);
self.receipt.destination_asset_scale = Some(asset_scale);
}

/// Return the current sequence number and increment the value for subsequent packets
#[inline]
fn next_sequence(&mut self) -> u64 {
let seq = self.sequence;
self.sequence += 1;
seq
}

/// Amount of money fulfilled in source units
#[inline]
fn get_fulfilled_amount(&self) -> u64 {
self.receipt
.sent_amount
.saturating_sub(self.receipt.in_flight_amount)
}

// Get remaining amount that must be fulfilled for the payment to complete
#[inline]
fn get_remaining_amount(&self) -> u64 {
self.receipt
.source_amount
.saturating_sub(self.get_fulfilled_amount())
}

/// Has the entire intended source amount been fulfilled by the recipient?
#[inline]
fn is_complete(&self) -> bool {
self.get_remaining_amount() == 0
}

/// Return the amount of money available to be sent in the payment (amount remaining minus in-flight)
#[inline]
fn get_amount_available_to_send(&self) -> u64 {
// Sent amount also includes the amount in-flight, which should be subtracted from the amount available
self.receipt
Expand All @@ -181,11 +190,13 @@ impl StreamPayment {
/// Is as much money as possible in-flight?
/// (If so, the intended source amount may be fulfilled or in-flight, or the congestion controller
/// has temporarily limited sending more money)
#[inline]
fn is_max_in_flight(&self) -> bool {
self.congestion_controller.get_max_amount() == 0 || self.get_amount_available_to_send() == 0
}

/// Given we've attempted sending enough packets, does our rejected packet rate indicate the payment is failing?
#[inline]
fn is_failing(&self) -> bool {
let num_packets = self.fulfilled_packets + self.rejected_packets;
num_packets >= FAIL_FAST_MINIMUM_PACKET_ATTEMPTS
Expand Down Expand Up @@ -249,8 +260,8 @@ where
enum PaymentEvent {
/// Send more money: send a packet with the given amount
SendMoney(u64),
/// Congestion controller has limited the amount in flight: wait for pending request to complete
MaxInFlight,
/// Congestion controller limited in-flight amount: wait for pending requests until given deadline
MaxInFlight(Instant),
/// Send full source amount: close the connection and return success
CloseConnection,
/// Maximum timeout since last fulfill has elapsed: terminate the payment
Expand All @@ -270,7 +281,11 @@ where
} else if payment.is_complete() {
PaymentEvent::CloseConnection
} else if payment.is_max_in_flight() {
PaymentEvent::MaxInFlight
let deadline = payment
.last_fulfill_time
.checked_add(MAX_TIME_SINCE_LAST_FULFILL)
.unwrap();
PaymentEvent::MaxInFlight(deadline)
} else {
PaymentEvent::SendMoney(payment.apply_prepare())
}
Expand All @@ -283,16 +298,12 @@ where
sender.send_money_packet(packet_amount).await
}));
}
PaymentEvent::MaxInFlight => {
// Wait for 100ms for any request to complete, otherwise try running loop again
// to see if we reached the timeout since last fulfill
let fut = timeout(
Duration::from_millis(100),
pending_requests.select_next_some(),
)
.await;
PaymentEvent::MaxInFlight(deadline) => {
// Wait for any request to complete, or if after reach deadline since last fulfill,
// run loop again, which should timeout the payment
let result = timeout_at(deadline, pending_requests.select_next_some()).await;

if let Ok(Ok(Err(error))) = fut {
if let Ok(Ok(Err(error))) = result {
error!("Send money stopped because of error: {:?}", error);
return Err(error);
}
Expand Down

0 comments on commit 1c5ffed

Please sign in to comment.