From e2464d26555f280f316676cf590db3ce76eed2ea Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 23 Apr 2024 16:53:00 -0600 Subject: [PATCH] progress WIP Removed some additional bounds and cleaned up some structs and impls. There's still a bit of a battle with deserialization here, but we can get through it. --- kit/src/behaviors/allocate/mod.rs | 14 ++-- kit/src/behaviors/creator.rs | 32 +++----- kit/src/behaviors/mod.rs | 72 ++++++++++++----- kit/src/behaviors/swap/mod.rs | 130 +++++++++--------------------- kit/src/behaviors/update/mod.rs | 94 ++++----------------- kit/src/pool/mod.rs | 92 ++++++++++++++++----- kit/tests/common.rs | 12 +-- kit/tests/creator_integration.rs | 29 ++++--- kit/tests/update_integration.rs | 2 +- 9 files changed, 219 insertions(+), 258 deletions(-) diff --git a/kit/src/behaviors/allocate/mod.rs b/kit/src/behaviors/allocate/mod.rs index c8911bc5..c14b4c2c 100644 --- a/kit/src/behaviors/allocate/mod.rs +++ b/kit/src/behaviors/allocate/mod.rs @@ -1,6 +1,6 @@ use super::*; -pub trait AllocateType: Debug + Serialize + Clone +pub trait AllocateType where E: Send + 'static, { @@ -44,9 +44,9 @@ where #[async_trait::async_trait] impl Behavior for Allocate> where - A: AllocateType + Debug + Send + Sync + 'static + for<'a> Deserialize<'a>, - P: PoolType + Debug + Send + Sync + 'static, - E: Debug + Send + Sync + 'static, + A: AllocateType + Send, + P: PoolType + Send, + E: Send + 'static, { type Processor = Allocate>; async fn startup( @@ -61,9 +61,9 @@ where #[async_trait::async_trait] impl Processor for Allocate> where - A: AllocateType + Debug + Send + Sync + 'static, - P: PoolType + Debug + Send + Sync + 'static, - E: Debug + Send + Sync + 'static, + A: AllocateType + Send, + P: PoolType + Send, + E: Send + 'static, { async fn get_stream(&mut self) -> Result>> { todo!("We have not implemented the 'get_stream' method yet for the 'Allocate' behavior."); diff --git a/kit/src/behaviors/creator.rs b/kit/src/behaviors/creator.rs index ddd075df..20c907f0 100644 --- a/kit/src/behaviors/creator.rs +++ b/kit/src/behaviors/creator.rs @@ -19,8 +19,6 @@ pub struct Config { impl

Behavior<()> for Create> where P: PoolType + Send + Sync + 'static, - P::StrategyContract: Send, - P::SolverContract: Send, { type Processor = (); async fn startup( @@ -100,23 +98,19 @@ where debug!("Pool created!\n {:#?}", pool); - let pool_creation = ( - pool.id, - pool.tokens.iter().map(|t| t.address()).collect::>(), - pool.liquidity_token.address(), - params, - self.data.allocation_data.clone(), - ); - messager.send(To::All, pool_creation).await.unwrap(); + messager + .send( + To::All, + PoolCreation::

{ + id: pool.id, + tokens: pool.tokens.iter().map(|t| t.address()).collect::>(), + liquidity_token: pool.liquidity_token.address(), + params, + allocation_data: self.data.allocation_data.clone(), + }, + ) + .await + .unwrap(); Ok(()) } } - -// TODO: We should be able to use this but it is currently hard to work with due -// to `serde::Deserialize` -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct PoolCreation { - pub id: eU256, - pub params: P::Parameters, - pub allocation_data: P::AllocationData, -} diff --git a/kit/src/behaviors/mod.rs b/kit/src/behaviors/mod.rs index 25d89828..9a761994 100644 --- a/kit/src/behaviors/mod.rs +++ b/kit/src/behaviors/mod.rs @@ -12,17 +12,13 @@ pub use token::{MintRequest, TokenAdminQuery}; use self::{ creator::Create, deploy::{Deploy, DeploymentData}, - pool::PoolType, + pool::{PoolCreation, PoolType}, token::TokenAdmin, }; use super::*; pub const MAX: eU256 = eU256::MAX; -type PoolId = eU256; -type TokenList = Vec; -type LiquidityToken = eAddress; - pub mod allocate; pub mod creator; pub mod deploy; @@ -46,22 +42,60 @@ where #[serde(untagged)] Deploy(DeploymentData), #[serde(untagged)] - // TODO: This is super weird. The following commented out version with `PoolCreation

` - // doesn't compile. Create(creator::PoolCreation

), - // TODO: BUT, this line where the tuple struct has the exact same data as `PoolCreation

` - // DOES compile. I'm not sure how to go about making this work nicely, but at least this works - // for now. - Create( - ( - eU256, // Pool ID - Vec, // Token List - eAddress, // Liquidity Token - P::Parameters, - P::AllocationData, - ), - ), + Create(PoolCreation

), #[serde(untagged)] TokenAdmin(token::Response), #[serde(untagged)] Update(P::Parameters), } + +#[derive(Debug)] +struct GetPoolTodo { + deployment_data: Option, + pool_creation: Option>, +} + +impl GetPoolTodo

{ + async fn complete(messager: &mut Messager) -> Self { + // Make an undone "TODO" list. + let mut todo: GetPoolTodo

= GetPoolTodo { + deployment_data: None, + pool_creation: None, + }; + let id = messager.id.clone(); + // Loop through the messager until we check off the boxes for this TODO list. + debug!("{:#?} is looping through their TODO list.", id.clone()); + loop { + if let Ok(msg) = messager.get_next_raw().await { + // TODO: Okay annoyingly if we try to deserialize into this immediately it + // works. But we can't use the message types enum. + let data: PoolCreation

= serde_json::from_str(&msg.data).unwrap(); + match msg.data { + MessageTypes::Deploy(deploy_data) => { + debug!("Updater: Got deployment data: {:?}", deploy_data); + todo.deployment_data = Some(deploy_data); + if todo.pool_creation.is_some() { + debug!("{:#?}: Got all the data.\n{:#?}", id.clone(), todo); + break todo; + } + } + MessageTypes::Create(pool_creation) => { + debug!("Updater: Got pool creation data: {:?}", pool_creation); + todo.pool_creation = Some(pool_creation); + if todo.deployment_data.is_some() { + debug!("{:#?}: Got all the data.\n{:#?}", id.clone(), todo); + break todo; + } + } + _ => continue, + } + } else { + debug!( + "{:#?} got some other message variant it could ignore.", + id.clone() + ); + continue; + } + } + } +} diff --git a/kit/src/behaviors/swap/mod.rs b/kit/src/behaviors/swap/mod.rs index bdae7ebf..3df32eb8 100644 --- a/kit/src/behaviors/swap/mod.rs +++ b/kit/src/behaviors/swap/mod.rs @@ -2,12 +2,16 @@ use self::{bindings::erc20::ERC20, pool::InputToken}; use super::*; use crate::behaviors::token::Response; -pub trait SwapType: Debug + Serialize + Clone { +pub trait SwapType { fn compute_swap_amount(event: E) -> (eU256, InputToken); } #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Swap, E> { +pub struct Swap +where + S: State, + T: SwapType, +{ // to get tokens on start up pub token_admin: String, pub update: String, @@ -16,12 +20,20 @@ pub struct Swap, E> { pub _phantom: PhantomData, } +// TODO: This needs to be configurable in some way to make the `SwapType` become +// transparent and useful. +// Should also get some data necessary for mint amounts and what not. #[derive(Clone, Debug, Serialize, Deserialize, State)] pub struct Config { - pub base_config: BaseConfig, - pub params: P::Parameters, - pub allocation_data: P::AllocationData, - pub token_list: Vec, + phantom: PhantomData

, +} + +impl Default for Config

{ + fn default() -> Self { + Self { + phantom: PhantomData, + } + } } #[derive(Debug, Clone, State)] @@ -31,27 +43,12 @@ pub struct Processing { pub pool: Pool

, } -#[derive(Debug)] -struct SwapTodo { - deployment_data: Option, - #[allow(clippy::type_complexity)] - pool_creation: Option<( - PoolId, // Pool ID - TokenList, // Token List - LiquidityToken, // Liquidity Token -

::Parameters, -

::AllocationData, - )>, -} - #[async_trait::async_trait] impl Behavior<()> for Swap, T, E> where - P: PoolType + Send + Sync + 'static, - P::StrategyContract: Send, - P::SolverContract: Send, - T: SwapType + Send + Sync + 'static + for<'a> Deserialize<'a>, - E: Debug + Send + Sync + 'static, + P: PoolType + Send, + T: SwapType + Send, + E: Send, { // type Processor = Swap, T, E>; type Processor = (); @@ -60,67 +57,28 @@ where client: Arc, mut messager: Messager, ) -> Result { - // Make a "TODO" list. - // This is the data I need to recieve to do my job - let mut todo: SwapTodo

= SwapTodo { - deployment_data: None, - pool_creation: None, - }; - - // Loop through the messager until we check off the boxes for this TODO list. - debug!("Updater is looping through their TODO list."); - loop { - if let Ok(msg) = messager.get_next::>().await { - match msg.data { - MessageTypes::Deploy(deploy_data) => { - debug!("Updater: Got deployment data: {:?}", deploy_data); - todo.deployment_data = Some(deploy_data); - if todo.pool_creation.is_some() { - debug!("Updater: Got all the data.\n{:#?}", todo); - break; - } - } - MessageTypes::Create(pool_creation) => { - debug!("Updater: Got pool creation data: {:?}", pool_creation); - todo.pool_creation = Some(pool_creation); - if todo.deployment_data.is_some() { - debug!("Updater: Got all the data.\n{:#?}", todo); - break; - } - } - _ => continue, - } - } else { - debug!("Updater got some other message variant it could ignore."); - continue; - } - } - debug!("Updater has checked off their TODO list."); + // TODO: Here we probably need to filter on the `PoolCreation` so that we get + // the correct pool. + let completed_todo = GetPoolTodo::

::complete(&mut messager).await; + let (deployment_data, pool_creation) = ( + completed_todo.deployment_data.unwrap(), + completed_todo.pool_creation.unwrap(), + ); let (strategy_contract, solver_contract) = - P::get_contracts(todo.deployment_data.as_ref().unwrap(), client.clone()); - let dfmm = DFMM::new(todo.deployment_data.unwrap().dfmm, client.clone()); - debug!("Got DFMM and the strategy contracts."); + P::get_contracts(&deployment_data, client.clone()); + let dfmm = DFMM::new(deployment_data.dfmm, client.clone()); // Get the intended tokens for the pool and do approvals. let mut tokens: Vec> = Vec::new(); - for tkn in self.data.token_list.drain(..) { - messager - .send( - To::Agent(self.token_admin.clone()), - TokenAdminQuery::AddressOf(tkn.clone()), - ) - .await - .unwrap(); - let token = ArbiterToken::new( - messager.get_next::().await.unwrap().data, - client.clone(), - ); + for token_address in pool_creation.tokens.into_iter() { + let token = ArbiterToken::new(token_address, client.clone()); + let name = token.name().call().await?; messager .send( To::Agent(self.token_admin.clone()), TokenAdminQuery::MintRequest(MintRequest { - token: tkn, + token: name, mint_to: client.address(), mint_amount: 100_000_000_000, }), @@ -142,21 +100,13 @@ where tokens.push(token); } - let lp_address = todo.pool_creation.clone().unwrap().2; - let lp_token = ERC20::new(lp_address, client.clone()); - let instance = P::create_instance( - strategy_contract, - solver_contract, - todo.pool_creation.clone().unwrap().3, - ); - // build pool for processor and stream - let pool = Pool::

{ - id: todo.pool_creation.clone().unwrap().0, + let _pool = Pool::

{ + id: pool_creation.id, dfmm, - instance, + instance: P::create_instance(strategy_contract, solver_contract, pool_creation.params), tokens, - liquidity_token: lp_token, + liquidity_token: ERC20::new(pool_creation.liquidity_token, client.clone()), }; // TODO: We need to come back around and adjust this. // match self.swap_type.get_stream(messager.clone()) { @@ -184,8 +134,8 @@ where impl Processor for Swap, T, E> where P: PoolType + Send + Sync, - T: SwapType + Send + Sync + 'static, - E: Send + Sync + 'static, + T: SwapType + Send, + E: Send + 'static, { async fn get_stream(&mut self) -> Result>> { todo!("We have not implemented the 'get_stream' method yet for the 'Swap' behavior.") diff --git a/kit/src/behaviors/update/mod.rs b/kit/src/behaviors/update/mod.rs index 0c114920..d5b0b4a0 100644 --- a/kit/src/behaviors/update/mod.rs +++ b/kit/src/behaviors/update/mod.rs @@ -27,30 +27,10 @@ pub struct Processing { pub pool_params: VecDeque, } -type PoolId = eU256; -type TokenList = Vec; -type LiquidityToken = eAddress; - -#[derive(Debug)] -struct UpdateTodo { - deployment_data: Option, - #[allow(clippy::type_complexity)] - pool_creation: Option<( - PoolId, // Pool ID - TokenList, // Token List - LiquidityToken, // Liquidity Token -

::Parameters, -

::AllocationData, - )>, -} - #[async_trait::async_trait] impl

Behavior for Update> where - P: PoolType + Send + Sync + 'static, - P::Parameters: Send + Sync + 'static, - P::StrategyContract: Send + Sync + 'static, - P::SolverContract: Send + Sync + 'static, + P: PoolType + Send + Sync + for<'a> Deserialize<'a> + Serialize, { type Processor = Update>; async fn startup( @@ -58,68 +38,28 @@ where client: Arc, mut messager: Messager, ) -> Result { - // Make a "TODO" list. - // This is the data I need to recieve to do my job - let mut todo: UpdateTodo

= UpdateTodo { - deployment_data: None, - pool_creation: None, - }; - - // Loop through the messager until we check off the boxes for this TODO list. - debug!("Updater is looping through their TODO list."); - loop { - if let Ok(msg) = messager.get_next::>().await { - match msg.data { - MessageTypes::Deploy(deploy_data) => { - debug!("Updater: Got deployment data: {:?}", deploy_data); - todo.deployment_data = Some(deploy_data); - if todo.pool_creation.is_some() { - debug!("Updater: Got all the data.\n{:#?}", todo); - break; - } - } - MessageTypes::Create(pool_creation) => { - debug!("Updater: Got pool creation data: {:?}", pool_creation); - todo.pool_creation = Some(pool_creation); - if todo.deployment_data.is_some() { - debug!("Updater: Got all the data.\n{:#?}", todo); - break; - } - } - _ => continue, - } - } else { - debug!("Updater got some other message variant it could ignore."); - continue; - } - } - debug!("Updater has checked off their TODO list."); + let completed_todo = GetPoolTodo::

::complete(&mut messager).await; + let (deployment_data, pool_creation) = ( + completed_todo.deployment_data.unwrap(), + completed_todo.pool_creation.unwrap(), + ); let (strategy_contract, solver_contract) = - P::get_contracts(todo.deployment_data.as_ref().unwrap(), client.clone()); - let dfmm = DFMM::new(todo.deployment_data.unwrap().dfmm, client.clone()); - debug!("Got DFMM and the strategy contracts."); + P::get_contracts(&deployment_data, client.clone()); + let dfmm = DFMM::new(deployment_data.dfmm, client.clone()); + let pool = Pool::

{ - id: todo.pool_creation.clone().unwrap().0, + id: pool_creation.id, dfmm, - instance: P::create_instance( - strategy_contract, - solver_contract, - todo.pool_creation.clone().unwrap().3.clone(), - ), - tokens: todo - .pool_creation - .clone() - .unwrap() - .1 + instance: P::create_instance(strategy_contract, solver_contract, pool_creation.params), + tokens: pool_creation + .tokens .into_iter() .map(|t| ArbiterToken::new(t, client.clone())) .collect(), - liquidity_token: ERC20::new(todo.pool_creation.as_ref().unwrap().2, client.clone()), + liquidity_token: ERC20::new(pool_creation.liquidity_token, client.clone()), }; - debug!("Updater has built the pool."); - let processor = Self::Processor { token_admin: self.token_admin.clone(), data: Processing { @@ -136,14 +76,14 @@ where #[async_trait::async_trait] impl

Processor for Update> where - P: PoolType + Send + Sync, + P: PoolType + Serialize + Send + Sync, { async fn get_stream(&mut self) -> Result>> { Ok(Some(self.data.messager.stream()?)) } async fn process(&mut self, event: Message) -> Result { match serde_json::from_str(&event.data) { - Ok(UpdaterQuery::ApplyUpdate) => { + Ok(UpdateRequest::ApplyUpdate) => { let params = self.data.pool_params.pop_front().unwrap(); self.data.pool.update(params.clone()).await?; let _ = self @@ -162,6 +102,6 @@ where } #[derive(Serialize, Deserialize, Clone, Debug)] -pub enum UpdaterQuery { +pub enum UpdateRequest { ApplyUpdate, } diff --git a/kit/src/pool/mod.rs b/kit/src/pool/mod.rs index 4fc43e10..b7b5a303 100644 --- a/kit/src/pool/mod.rs +++ b/kit/src/pool/mod.rs @@ -36,30 +36,16 @@ pub struct BaseConfig { // All the other types will be specific to each pool/strategy type since those // will be specific contracts #[async_trait::async_trait] -pub trait PoolType: Clone + Debug + 'static { - // This trait provides the interface for people to construct pools from a - // `Configuration` state since all of this should be `Serialize` and - // `Deserialize`. This stuff ultimately will be what's used to deploy a - // `Pool` which will hold onto actual instances of contracts - // (whereas this just holds config data). +pub trait PoolType: Debug { type Parameters: Clone + Debug - + Serialize - + for<'de> Deserialize<'de> + Send - + Sync - + 'static - + ethers::abi::AbiDecode; - // ~~ These are the contracts that are used to interact with the pool. ~~ - type StrategyContract; - type SolverContract; - type AllocationData: Clone - + Debug - + Serialize + for<'de> Deserialize<'de> - + Send - + Sync - + 'static; + + Serialize + + ethers::abi::AbiDecode; + type StrategyContract: Send; + type SolverContract: Send; + type AllocationData: Clone + Debug + Send + for<'de> Deserialize<'de> + Serialize; async fn swap_data(&self, pool_id: eU256, swap: InputToken, amount_in: eU256) -> Result; /// Change Parameters @@ -278,3 +264,69 @@ impl Pool

{ Ok(()) } } + +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] +pub struct PoolCreation { + pub id: eU256, + pub tokens: Vec, + pub liquidity_token: eAddress, + #[serde(bound(deserialize = "P::Parameters: Deserialize<'de>"))] + pub params: P::Parameters, + #[serde(bound(deserialize = "P::Parameters: Deserialize<'de>"))] + pub allocation_data: P::AllocationData, +} + +// impl<'de, P: PoolType> Deserialize<'de> for PoolCreation

{ +// fn deserialize(deserializer: D) -> Result +// where +// D: serde::de::Deserializer<'de>, +// { +// let mut map = deserializer.deserialize_map(None)?; +// let mut id = None; +// let mut tokens = None; +// let mut liquidity_token = None; +// let mut params = None; +// let mut allocation_data = None; + +// while let Some(key) = map.next_key()? { +// match key { +// "id" => { +// id = Some(map.next_value()?); +// } +// "tokens" => { +// tokens = Some(map.next_value()?); +// } +// "liquidity_token" => { +// liquidity_token = Some(map.next_value()?); +// } +// "params" => { +// params = Some(map.next_value()?); +// } +// "allocation_data" => { +// allocation_data = Some(map.next_value()?); +// } +// _ => { +// // Ignore unknown fields +// let _ = map.next_value::(); +// } +// } +// } + +// let id = id.ok_or_else(|| serde::de::Error::missing_field("id"))?; +// let tokens = tokens.ok_or_else(|| +// serde::de::Error::missing_field("tokens"))?; let liquidity_token = +// liquidity_token.ok_or_else(|| +// serde::de::Error::missing_field("liquidity_token"))?; let params = +// params.ok_or_else(|| serde::de::Error::missing_field("params"))?; let +// allocation_data = allocation_data.ok_or_else(|| +// serde::de::Error::missing_field("allocation_data"))?; + +// Ok(PoolCreation { +// id, +// tokens, +// liquidity_token, +// params, +// allocation_data, +// }) +// } +// } diff --git a/kit/tests/common.rs b/kit/tests/common.rs index 751d3604..c87a30f1 100644 --- a/kit/tests/common.rs +++ b/kit/tests/common.rs @@ -12,7 +12,7 @@ use dfmm_kit::{ bindings::constant_sum_solver::ConstantSumParams, pool::{ constant_sum::{ConstantSumAllocationData, ConstantSumPool}, - BaseConfig, InputToken, + BaseConfig, InputToken, PoolCreation, }, TokenData, }; @@ -84,15 +84,7 @@ pub fn spawn_constant_sum_creator(world: &mut World) { } fn mock_swap_behavior() -> Swap, VanillaSwap, Message> { - let data: swap::Config = swap::Config { - base_config: mock_base_config(), - params: constant_sum_parameters(), - allocation_data: ConstantSumAllocationData { - reserve_x: RESERVE_X, - reserve_y: RESERVE_Y, - }, - token_list: vec![TOKEN_X_NAME.to_owned(), TOKEN_Y_NAME.to_owned()], - }; + let data = swap::Config::::default(); Swap::, VanillaSwap, Message> { token_admin: TOKEN_ADMIN.to_owned(), diff --git a/kit/tests/creator_integration.rs b/kit/tests/creator_integration.rs index 33b12472..bbe0b904 100644 --- a/kit/tests/creator_integration.rs +++ b/kit/tests/creator_integration.rs @@ -17,27 +17,26 @@ async fn run_creator_constant_sum() { let task = tokio::spawn(async move { loop { if let Ok(message) = messager - .get_next::>() + .get_next::>() .await { let data = message.data; info!("Saw message data: {:#?}", data); - let mock_creation = creator::PoolCreation:: { - id: data.id, - params: ConstantSumParams { - price: WAD, - swap_fee: ethers::utils::parse_ether(0.003).unwrap(), - controller: eAddress::zero(), - }, - allocation_data: ConstantSumAllocationData { - reserve_x: RESERVE_X, - reserve_y: RESERVE_Y, - }, + let id = data.id; + let params = ConstantSumParams { + price: WAD, + swap_fee: ethers::utils::parse_ether(0.003).unwrap(), + controller: eAddress::zero(), }; - assert_eq!(data.id, mock_creation.id); - assert_eq!(data.params, mock_creation.params); - assert_eq!(data.allocation_data, mock_creation.allocation_data); + let allocation_data = ConstantSumAllocationData { + reserve_x: RESERVE_X, + reserve_y: RESERVE_Y, + }; + + assert_eq!(data.id, id); + assert_eq!(data.params, params); + assert_eq!(data.allocation_data, allocation_data); info!("Asserts passed!"); break; } else { diff --git a/kit/tests/update_integration.rs b/kit/tests/update_integration.rs index cb5f40f1..3b753ef4 100644 --- a/kit/tests/update_integration.rs +++ b/kit/tests/update_integration.rs @@ -26,7 +26,7 @@ async fn run_updater_constant_sum() { messager .send( To::Agent(UPDATER.to_owned()), - update::UpdaterQuery::ApplyUpdate, + update::UpdateRequest::ApplyUpdate, ) .await .unwrap();