Skip to content

Commit

Permalink
feat: subscription also returns simulated implicit swaps
Browse files Browse the repository at this point in the history
  • Loading branch information
msgmaxim committed Feb 19, 2024
1 parent aa1fbb8 commit f142ef4
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 42 deletions.
50 changes: 29 additions & 21 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use pallet_cf_governance::GovCallHash;
use pallet_cf_pools::{
AskBidMap, AssetsMap, PoolInfo, PoolLiquidity, PoolPriceV1, UnidirectionalPoolDepth,
};
use pallet_cf_swapping::Swap;
use pallet_cf_swapping::SwapLegInfo;
use sc_client_api::{BlockchainEvents, HeaderBackend};
use serde::{Deserialize, Serialize};
use sp_api::{ApiError, BlockT, HeaderT};
Expand All @@ -47,7 +47,7 @@ use std::{
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ScheduledSwap {
#[serde(flatten)]
swap: Swap,
swap: SwapLegInfo,
execute_at: BlockNumber,
}

Expand Down Expand Up @@ -526,8 +526,11 @@ pub trait CustomApi {
side: Order,
);

#[subscription(name = "subscribe_scheduled_swaps", item = Vec<ScheduledSwap>)]
fn cf_subscribe_scheduled_swaps(&self, asset1: RpcAsset, asset2: RpcAsset);
// Subscribe to a stream that on every block produces a list of all scheduled/pending
// swaps in the base_asset/quote_asset pool, including any "implicit" half-swaps (as a
// part of a swap involving two pools)
#[subscription(name = "subscribe_scheduled_swaps", item = SwapResponse)]
fn cf_subscribe_scheduled_swaps(&self, base_asset: RpcAsset, quote_asset: RpcAsset);

#[method(name = "prewitness_swaps")]
fn cf_prewitness_swaps(
Expand Down Expand Up @@ -1226,25 +1229,30 @@ where
fn cf_subscribe_scheduled_swaps(
&self,
sink: SubscriptionSink,
asset1: RpcAsset,
asset2: RpcAsset,
base_asset: RpcAsset,
quote_asset: RpcAsset,
) -> Result<(), SubscriptionEmptyError> {
let asset1 = asset1.try_into().map_err(|_| SubscriptionEmptyError)?;
let asset2 = asset2.try_into().map_err(|_| SubscriptionEmptyError)?;
self.new_subscription(false, false, sink, move |api, hash| {
let swaps = api
.cf_scheduled_swaps(hash, asset1, asset2)?
.into_iter()
.map(|(swap, execute_at)| ScheduledSwap { swap, execute_at })
.collect::<Vec<_>>();

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
struct SwapResponse {
swaps: Vec<ScheduledSwap>,
}
let base_asset = base_asset.try_into().map_err(|_| SubscriptionEmptyError)?;
let quote_asset = quote_asset.try_into().map_err(|_| SubscriptionEmptyError)?;
self.new_subscription(
false, /* only_on_changes */
true, /* end_on_error */
sink,
move |api, hash| {
let swaps = api
.cf_scheduled_swaps(hash, base_asset, quote_asset)?
.into_iter()
.map(|(swap, execute_at)| ScheduledSwap { swap, execute_at })
.collect::<Vec<_>>();

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
struct SwapResponse {
swaps: Vec<ScheduledSwap>,
}

Ok::<_, ApiError>(SwapResponse { swaps })
})
Ok::<_, ApiError>(SwapResponse { swaps })
},
)
}

fn cf_subscribe_prewitness_swaps(
Expand Down
74 changes: 64 additions & 10 deletions state-chain/pallets/cf-swapping/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ pub struct Swap {
pub fee_taken: bool,
}

#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub struct SwapLegInfo {
pub swap_id: SwapId,
pub from: Asset,
pub to: Asset,
pub amount: AssetAmount,
pub swap_type: SwapType,
}

impl Swap {
fn new(
swap_id: SwapId,
Expand Down Expand Up @@ -651,17 +661,46 @@ pub mod pallet {
}

impl<T: Config> Pallet<T> {
// Transactional ensures that any failed swap will rollback all storage changes.
#[transactional]
fn process_swaps_for_block(block: BlockNumberFor<T>) -> Result<(), BatchExecutionError> {
let mut swaps = SwapQueue::<T>::take(block);

if swaps.is_empty() {
return Ok(())
}
pub fn get_scheduled_swap_legs(
mut swaps: Vec<Swap>,
base_asset: Asset,
) -> Result<Vec<SwapLegInfo>, ()> {
Self::swap_into_stable_taking_network_fee(&mut swaps)
.map_err(|_| log::error!("Failed to simulate swaps"))?;

Ok(swaps
.into_iter()
.filter_map(|swap| {
if swap.from == base_asset {
Some(SwapLegInfo {
swap_id: swap.swap_id,
from: swap.from,
// All swaps from `base_asset` have to go through Usdc:
to: Asset::Usdc,
amount: swap.amount,
swap_type: swap.swap_type,
})
} else if swap.to == base_asset {
Some(SwapLegInfo {
swap_id: swap.swap_id,
// All swaps to `base_asset` have to go through Usdc:
from: Asset::Usdc,
to: swap.to,
// Safe to unwrap as we have swapped everything into USDC at this point
amount: swap.stable_amount.unwrap(),
swap_type: swap.swap_type,
})
} else {
None
}
})
.collect())
}

// Swap into Stable asset first.
Self::do_group_and_swap(&mut swaps, SwapLeg::ToStable)?;
fn swap_into_stable_taking_network_fee(
swaps: &mut Vec<Swap>,
) -> Result<(), BatchExecutionError> {
Self::do_group_and_swap(swaps, SwapLeg::ToStable)?;

// Take NetworkFee for all swaps
for swap in swaps.iter_mut() {
Expand All @@ -673,6 +712,21 @@ pub mod pallet {
*stable_amount = T::SwappingApi::take_network_fee(*stable_amount);
}

Ok(())
}

// Transactional ensures that any failed swap will rollback all storage changes.
#[transactional]
fn process_swaps_for_block(block: BlockNumberFor<T>) -> Result<(), BatchExecutionError> {
let mut swaps = SwapQueue::<T>::take(block);

if swaps.is_empty() {
return Ok(())
}

// Swap into Stable asset first, then take network fees:
Self::swap_into_stable_taking_network_fee(&mut swaps)?;

// Swap from Stable asset, and complete the swap logic.
Self::do_group_and_swap(&mut swaps, SwapLeg::FromStable)?;

Expand Down
2 changes: 1 addition & 1 deletion state-chain/pallets/cf-swapping/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl_mock_runtime_safe_mode! { swapping: PalletSafeMode }
parameter_types! {
pub static NetworkFee: Percent = Percent::from_percent(0);
pub static Swaps: Vec<(Asset, Asset, AssetAmount)> = vec![];
pub static SwapRate: f64 = 1f64;
pub static SwapRate: f64 = 2f64;
}

thread_local! {
Expand Down
60 changes: 60 additions & 0 deletions state-chain/pallets/cf-swapping/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use cf_traits::{
use frame_support::{assert_err, assert_noop, assert_ok, traits::Hooks};
use itertools::Itertools;
use sp_arithmetic::Permill;
use sp_core::H160;
use sp_std::iter;

const GAS_BUDGET: AssetAmount = 1_000u128;
Expand Down Expand Up @@ -2128,3 +2129,62 @@ fn deposit_address_ready_event_contain_correct_boost_fee_value() {
);
});
}

#[test]
fn test_get_scheduled_swap_legs() {
new_test_ext().execute_with(|| {
const SWAP_TYPE: SwapType = SwapType::Swap(ForeignChainAddress::Eth(H160::zero()));
const INIT_AMOUNT: AssetAmount = 1000;

let swaps: Vec<_> = [
(1, Asset::Flip, Asset::Usdc),
(2, Asset::Usdc, Asset::Flip),
(3, Asset::Btc, Asset::Eth),
(4, Asset::Flip, Asset::Btc),
(5, Asset::Eth, Asset::Flip),
]
.into_iter()
.map(|(id, from, to)| Swap::new(id, from, to, INIT_AMOUNT, SWAP_TYPE.clone()))
.collect();

// The amount of USDC in the middle of swap (5):
const INTERMEDIATE_AMOUNT: AssetAmount = 2000;

// The test is more useful when these aren't equal:
assert_ne!(INIT_AMOUNT, INTERMEDIATE_AMOUNT);

assert_eq!(
Swapping::get_scheduled_swap_legs(swaps, Asset::Flip).unwrap(),
vec![
SwapLegInfo {
swap_id: 1,
from: Asset::Flip,
to: Asset::Usdc,
amount: INIT_AMOUNT,
swap_type: SWAP_TYPE.clone()
},
SwapLegInfo {
swap_id: 2,
from: Asset::Usdc,
to: Asset::Flip,
amount: INIT_AMOUNT,
swap_type: SWAP_TYPE.clone(),
},
SwapLegInfo {
swap_id: 4,
from: Asset::Flip,
to: Asset::Usdc,
amount: INIT_AMOUNT,
swap_type: SWAP_TYPE.clone(),
},
SwapLegInfo {
swap_id: 5,
from: Asset::Usdc,
to: Asset::Flip,
amount: INTERMEDIATE_AMOUNT,
swap_type: SWAP_TYPE.clone(),
},
]
);
});
}
14 changes: 8 additions & 6 deletions state-chain/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use pallet_cf_pools::{
UnidirectionalPoolDepth,
};
use pallet_cf_reputation::ExclusionList;
use pallet_cf_swapping::{CcmSwapAmounts, Swap};
use pallet_cf_swapping::{CcmSwapAmounts, SwapLegInfo};
use pallet_cf_validator::SetSizeMaximisingAuctionResolver;
use pallet_transaction_payment::{ConstFeeMultiplier, Multiplier};
use scale_info::prelude::string::String;
Expand Down Expand Up @@ -1408,17 +1408,19 @@ impl_runtime_apis! {
all_prewitnessed_swaps
}

fn cf_scheduled_swaps(from: Asset, to: Asset) -> Vec<(Swap, BlockNumber)> {
fn cf_scheduled_swaps(base_asset: Asset, _quote_asset: Asset) -> Vec<(SwapLegInfo, BlockNumber)> {

let first_block = Swapping::first_unprocessed_block();
let last_block = System::block_number() + pallet_cf_swapping::SWAP_DELAY_BLOCKS;

debug_assert!(first_block < last_block);

(first_block..=last_block).into_iter().map(|block| {
Swapping::swap_queue(block).into_iter()
.filter(|swap| {(swap.from == from && swap.to == to) || (swap.from == to && swap.to == from)})
.map(move |swap| (swap, block))
(first_block..=last_block).map(|block| {
let swaps_for_block = Swapping::swap_queue(block);

let swaps: Vec<_> = swaps_for_block.iter().filter(|swap| swap.from == base_asset || swap.to == base_asset).cloned().collect();

Swapping::get_scheduled_swap_legs(swaps, base_asset).unwrap().into_iter().map(move |swap| (swap, block))
}).flatten().collect()

}
Expand Down
8 changes: 4 additions & 4 deletions state-chain/runtime/src/runtime_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use pallet_cf_pools::{
AskBidMap, AssetsMap, PoolInfo, PoolLiquidity, PoolOrderbook, PoolOrders, PoolPriceV1,
PoolPriceV2, UnidirectionalPoolDepth,
};
use pallet_cf_swapping::Swap;
use pallet_cf_swapping::SwapLegInfo;
use pallet_cf_witnesser::CallHash;
use scale_info::{prelude::string::String, TypeInfo};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -182,9 +182,9 @@ decl_runtime_apis!(
side: Order,
) -> Vec<AssetAmount>;
fn cf_scheduled_swaps(
asset1: Asset,
asset2: Asset,
) -> Vec<(Swap, cf_primitives::BlockNumber)>;
base_asset: Asset,
quote_asset: Asset,
) -> Vec<(SwapLegInfo, cf_primitives::BlockNumber)>;
fn cf_liquidity_provider_info(account_id: AccountId32) -> Option<LiquidityProviderInfo>;
fn cf_account_role(account_id: AccountId32) -> Option<AccountRole>;
fn cf_asset_balances(account_id: AccountId32) -> Vec<(Asset, AssetAmount)>;
Expand Down

0 comments on commit f142ef4

Please sign in to comment.