From d9cf8a4c983e9ed2b2d25e01fced0a6f226e38ce Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Tue, 23 Apr 2024 14:35:27 -0600 Subject: [PATCH] refactor current behaviors Some tests are now failing. Will work on these in the next commit. --- Cargo.lock | 8 ++-- kit/Cargo.toml | 8 ++-- kit/src/behaviors/allocate/mod.rs | 21 +++------- kit/src/behaviors/creator.rs | 10 ++--- kit/src/behaviors/deploy.rs | 30 +------------ kit/src/behaviors/mod.rs | 5 +-- kit/src/behaviors/swap/mod.rs | 70 +++++++++++++------------------ kit/src/behaviors/token.rs | 42 +++++++------------ kit/src/behaviors/update/mod.rs | 47 +++++++-------------- kit/tests/common.rs | 18 +------- kit/tests/swap_integration.rs | 3 +- kit/tests/token_integration.rs | 2 +- kit/tests/update_integration.rs | 4 +- 13 files changed, 86 insertions(+), 182 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0a90d64..a25c73d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,7 +185,7 @@ checksum = "f538837af36e6f6a9be0faa67f9a314f8119e4e4b5867c6ab40ed60360142519" [[package]] name = "arbiter-bindings" version = "0.1.6" -source = "git+https://github.com/primitivefinance/arbiter.git?rev=aff29d30#aff29d30daea200dcb4f889b1296e39064a3da1a" +source = "git+https://github.com/primitivefinance/arbiter.git?rev=360f4a7f#360f4a7f38667e761f8306ec140418eceab1bccd" dependencies = [ "ethers", "serde", @@ -194,7 +194,7 @@ dependencies = [ [[package]] name = "arbiter-core" version = "0.10.3" -source = "git+https://github.com/primitivefinance/arbiter.git?rev=aff29d30#aff29d30daea200dcb4f889b1296e39064a3da1a" +source = "git+https://github.com/primitivefinance/arbiter.git?rev=360f4a7f#360f4a7f38667e761f8306ec140418eceab1bccd" dependencies = [ "arbiter-bindings", "async-stream", @@ -222,7 +222,7 @@ dependencies = [ [[package]] name = "arbiter-engine" version = "0.3.2" -source = "git+https://github.com/primitivefinance/arbiter.git?rev=aff29d30#aff29d30daea200dcb4f889b1296e39064a3da1a" +source = "git+https://github.com/primitivefinance/arbiter.git?rev=360f4a7f#360f4a7f38667e761f8306ec140418eceab1bccd" dependencies = [ "anyhow", "arbiter-bindings", @@ -246,7 +246,7 @@ dependencies = [ [[package]] name = "arbiter-macros" version = "0.1.3" -source = "git+https://github.com/primitivefinance/arbiter.git?rev=aff29d30#aff29d30daea200dcb4f889b1296e39064a3da1a" +source = "git+https://github.com/primitivefinance/arbiter.git?rev=360f4a7f#360f4a7f38667e761f8306ec140418eceab1bccd" dependencies = [ "quote", "syn 2.0.58", diff --git a/kit/Cargo.toml b/kit/Cargo.toml index 441fae5b..6dc656ed 100644 --- a/kit/Cargo.toml +++ b/kit/Cargo.toml @@ -9,10 +9,10 @@ keywords = ["ethereum", "smart-contracts", "automated market makers"] readme = "../README.md" [dependencies] -arbiter-core = { git = "https://github.com/primitivefinance/arbiter.git", rev = "aff29d30" } -arbiter-engine = { git = "https://github.com/primitivefinance/arbiter.git", rev = "aff29d30" } -arbiter-macros = { git = "https://github.com/primitivefinance/arbiter.git", rev = "aff29d30" } -arbiter-bindings = { git = "https://github.com/primitivefinance/arbiter.git", rev = "aff29d30" } +arbiter-core = { git = "https://github.com/primitivefinance/arbiter.git", rev = "360f4a7f" } +arbiter-engine = { git = "https://github.com/primitivefinance/arbiter.git", rev = "360f4a7f" } +arbiter-macros = { git = "https://github.com/primitivefinance/arbiter.git", rev = "360f4a7f" } +arbiter-bindings = { git = "https://github.com/primitivefinance/arbiter.git", rev = "360f4a7f" } # Ethereum ethers = "2.0.13" diff --git a/kit/src/behaviors/allocate/mod.rs b/kit/src/behaviors/allocate/mod.rs index 63e97d0e..c8911bc5 100644 --- a/kit/src/behaviors/allocate/mod.rs +++ b/kit/src/behaviors/allocate/mod.rs @@ -8,7 +8,6 @@ where // annoying fn change_allocation_amount(&mut self, event: E) -> // Option; fn change_allocation_amount(&mut self, event: E) -> Option>; - fn get_stream(&self) -> Pin + Send + Sync>>; } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -24,11 +23,12 @@ where _phantom_e: PhantomData, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, State)] pub struct Config { pub allocation_data: P::AllocationData, } +#[derive(State)] pub struct Processing where P: PoolType, @@ -40,18 +40,6 @@ where _phantom: PhantomData, } -impl State for Config

{ - type Data = Self; -} - -impl State for Processing -where - P: PoolType, - E: Send + 'static, -{ - type Data = Self; -} - #[allow(unused_variables)] #[async_trait::async_trait] impl Behavior for Allocate> @@ -65,7 +53,7 @@ where &mut self, client: Arc, messager: Messager, - ) -> Result)>> { + ) -> Result { todo!(); } } @@ -77,6 +65,9 @@ where P: PoolType + Debug + Send + Sync + 'static, E: Debug + Send + Sync + 'static, { + async fn get_stream(&mut self) -> Result>> { + todo!("We have not implemented the 'get_stream' method yet for the 'Allocate' behavior."); + } async fn process(&mut self, _event: E) -> Result { Ok(ControlFlow::Halt) } diff --git a/kit/src/behaviors/creator.rs b/kit/src/behaviors/creator.rs index 3cbdef29..ddd075df 100644 --- a/kit/src/behaviors/creator.rs +++ b/kit/src/behaviors/creator.rs @@ -7,7 +7,7 @@ pub struct Create { pub data: S::Data, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, State)] pub struct Config { pub base_config: BaseConfig, pub params: P::Parameters, @@ -15,10 +15,6 @@ pub struct Config { pub token_list: Vec, } -impl State for Config

{ - type Data = Self; -} - #[async_trait::async_trait] impl

Behavior<()> for Create> where @@ -31,7 +27,7 @@ where &mut self, client: Arc, mut messager: Messager, - ) -> Result)>> { + ) -> Result { // Receive the `DeploymentData` from the `Deployer` agent and use it to get the // contracts. debug!("Starting the creator"); @@ -112,7 +108,7 @@ where self.data.allocation_data.clone(), ); messager.send(To::All, pool_creation).await.unwrap(); - Ok(None) + Ok(()) } } diff --git a/kit/src/behaviors/deploy.rs b/kit/src/behaviors/deploy.rs index a2106dc2..cb4b0cd3 100644 --- a/kit/src/behaviors/deploy.rs +++ b/kit/src/behaviors/deploy.rs @@ -35,58 +35,34 @@ impl Behavior<()> for Deploy { &mut self, client: Arc, messager: Messager, - ) -> Result)>> { + ) -> Result { let weth = WETH::deploy(client.clone(), ())?.send().await?; - trace!("WETH deployed at {:?}", weth.address()); - let dfmm = DFMM::deploy(client.clone(), weth.address())?.send().await?; - trace!("DFMM deployed at {:?}", dfmm.address()); - let geometric_mean = GeometricMean::deploy(client.clone(), dfmm.address())? .send() .await?; - trace!("GeometricMean deployed at {:?}", geometric_mean.address()); - let geometric_mean_solver = GeometricMeanSolver::deploy(client.clone(), dfmm.address())? .send() .await?; - trace!( - "GeometricMeanSolver deployed at {:?}", - geometric_mean.address() - ); - let log_normal = LogNormal::deploy(client.clone(), dfmm.address())? .send() .await?; - trace!("LogNormal deployed at {:?}", log_normal.address()); - let log_normal_solver = LogNormalSolver::deploy(client.clone(), dfmm.address())? .send() .await?; - trace!( - "LogNormalSolver deployed at {:?}", - log_normal_solver.address() - ); - let constant_sum = ConstantSum::deploy(client.clone(), dfmm.address())? .send() .await?; - trace!("ConstantSum deployed at {:?}", constant_sum.address()); - let constant_sum_solver = ConstantSumSolver::deploy(client.clone(), dfmm.address())? .send() .await?; - trace!("ConstantSumSolver deployed at {:?}", constant_sum.address()); - let n_token_geometric_mean = NTokenGeometricMean::deploy(client.clone(), dfmm.address())? .send() .await?; - let n_token_geometric_mean_solver = NTokenGeometricMeanSolver::deploy(client.clone(), dfmm.address())? .send() .await?; - let deployment_data = DeploymentData { weth: weth.address(), dfmm: dfmm.address(), @@ -99,10 +75,8 @@ impl Behavior<()> for Deploy { constant_sum: constant_sum.address(), constant_sum_solver: constant_sum_solver.address(), }; - debug!("Deployments completed: {:#?}", deployment_data); - messager.send(To::All, deployment_data).await?; - Ok(None) + Ok(()) } } diff --git a/kit/src/behaviors/mod.rs b/kit/src/behaviors/mod.rs index dc340a2c..25d89828 100644 --- a/kit/src/behaviors/mod.rs +++ b/kit/src/behaviors/mod.rs @@ -1,13 +1,12 @@ -use std::{boxed::Box, marker::PhantomData, pin::Pin, sync::Arc}; +use std::{boxed::Box, marker::PhantomData, sync::Arc}; use arbiter_engine::{ machine::{Behavior, ControlFlow, EventStream, Processor, State}, messager::{Message, Messager, To}, }; #[allow(unused)] -use arbiter_macros::Behaviors; +use arbiter_macros::{Behaviors, State}; use bindings::{arbiter_token::ArbiterToken, dfmm::DFMM}; -use futures_util::Stream; pub use token::{MintRequest, TokenAdminQuery}; use self::{ diff --git a/kit/src/behaviors/swap/mod.rs b/kit/src/behaviors/swap/mod.rs index 37a614f9..bdae7ebf 100644 --- a/kit/src/behaviors/swap/mod.rs +++ b/kit/src/behaviors/swap/mod.rs @@ -4,14 +4,6 @@ use crate::behaviors::token::Response; pub trait SwapType: Debug + Serialize + Clone { fn compute_swap_amount(event: E) -> (eU256, InputToken); - // TODO: Put this on the processor in arbiter engine so that startups just - // return a proccess - fn get_stream( - &self, - _messager: Messager, - ) -> Option + Send + Sync>>> { - None - } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -24,7 +16,7 @@ pub struct Swap, E> { pub _phantom: PhantomData, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, State)] pub struct Config { pub base_config: BaseConfig, pub params: P::Parameters, @@ -32,24 +24,13 @@ pub struct Config { pub token_list: Vec, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, State)] pub struct Processing { pub messager: Messager, pub client: Arc, pub pool: Pool

, } -impl State for Config

{ - type Data = Self; -} - -impl

State for Processing

-where - P: PoolType, -{ - type Data = Self; -} - #[derive(Debug)] struct SwapTodo { deployment_data: Option, @@ -64,7 +45,7 @@ struct SwapTodo { } #[async_trait::async_trait] -impl Behavior for Swap, T, E> +impl Behavior<()> for Swap, T, E> where P: PoolType + Send + Sync + 'static, P::StrategyContract: Send, @@ -72,12 +53,13 @@ where T: SwapType + Send + Sync + 'static + for<'a> Deserialize<'a>, E: Debug + Send + Sync + 'static, { - type Processor = Swap, T, E>; + // type Processor = Swap, T, E>; + type Processor = (); async fn startup( &mut self, client: Arc, mut messager: Messager, - ) -> Result)>> { + ) -> Result { // Make a "TODO" list. // This is the data I need to recieve to do my job let mut todo: SwapTodo

= SwapTodo { @@ -176,24 +158,25 @@ where tokens, liquidity_token: lp_token, }; - - match self.swap_type.get_stream(messager.clone()) { - Some(stream) => { - let process = Self::Processor { - token_admin: self.token_admin.clone(), - update: self.update.clone(), - data: Processing { - messager, - client, - pool, - }, - swap_type: self.swap_type.clone(), - _phantom: PhantomData::, - }; - Ok(Some((process, stream))) - } - None => Ok(None), - } + // TODO: We need to come back around and adjust this. + // match self.swap_type.get_stream(messager.clone()) { + // Some(stream) => { + // let process = Self::Processor { + // token_admin: self.token_admin.clone(), + // update: self.update.clone(), + // data: Processing { + // messager, + // client, + // pool, + // }, + // swap_type: self.swap_type.clone(), + // _phantom: PhantomData::, + // }; + // Ok(Some((process, stream))) + // } + // None => Ok(None), + // } + Ok(()) } } @@ -204,6 +187,9 @@ where T: SwapType + Send + Sync + 'static, E: Send + Sync + 'static, { + async fn get_stream(&mut self) -> Result>> { + todo!("We have not implemented the 'get_stream' method yet for the 'Swap' behavior.") + } async fn process(&mut self, event: E) -> Result { let (swap_amount, input) = T::compute_swap_amount(event); self.data.pool.swap(swap_amount, input).await?; diff --git a/kit/src/behaviors/token.rs b/kit/src/behaviors/token.rs index f181602e..390e9db9 100644 --- a/kit/src/behaviors/token.rs +++ b/kit/src/behaviors/token.rs @@ -9,26 +9,18 @@ pub struct TokenAdmin { pub data: S::Data, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, State)] pub struct Config { pub token_data: Vec, } -impl State for Config { - type Data = Self; -} - -#[derive(Debug, Clone)] +#[derive(Debug, Clone, State)] pub struct Processing { pub messager: Messager, pub client: Arc, pub tokens: HashMap)>, } -impl State for Processing { - type Data = Self; -} - #[async_trait::async_trait] impl Behavior for TokenAdmin { type Processor = TokenAdmin; @@ -36,7 +28,7 @@ impl Behavior for TokenAdmin { &mut self, client: Arc, messager: Messager, - ) -> Result)>> { + ) -> Result { let mut tokens = HashMap::new(); for token_data in self.data.token_data.drain(..) { let token = ArbiterToken::deploy( @@ -56,16 +48,13 @@ impl Behavior for TokenAdmin { debug!("Tokens deployed {:#?}", tokens); - let process = Self::Processor { + Ok(Self::Processor { data: Processing { messager, client, tokens, }, - }; - - let stream = process.data.messager.clone().stream()?; - Ok(Some((process, stream))) + }) } } @@ -73,24 +62,26 @@ impl Behavior for TokenAdmin { // easier. Would be nice to add this in arbiter_engine. #[async_trait::async_trait] impl Processor for TokenAdmin { + async fn get_stream(&mut self) -> Result>> { + Ok(Some(self.data.messager.stream()?)) + } + async fn process(&mut self, event: Message) -> Result { - let query: TokenAdminQuery = - serde_json::from_str(&event.data).unwrap_or(TokenAdminQuery::NoOp); - match query { - TokenAdminQuery::AddressOf(token_name) => { + match serde_json::from_str(&event.data) { + Ok(TokenAdminQuery::AddressOf(token_name)) => { self.reply_address_of(token_name, event.from).await?; } - TokenAdminQuery::MintRequest(mint_request) => { + Ok(TokenAdminQuery::MintRequest(mint_request)) => { self.reply_mint_request(mint_request, event.from).await?; } - TokenAdminQuery::GetAssetUniverse => { + Ok(TokenAdminQuery::GetAssetUniverse) => { self.reply_get_asset_universe(event.from).await?; } - TokenAdminQuery::GetTokenData(token_name) => { + Ok(TokenAdminQuery::GetTokenData(token_name)) => { self.reply_token_data(token_name, event.from).await?; } - TokenAdminQuery::NoOp => { - debug!("NoOp: {:?}", event); + _ => { + debug!("TokenAdmin got some other message variant it could ignore."); } } Ok(ControlFlow::Continue) @@ -160,7 +151,6 @@ pub enum TokenAdminQuery { MintRequest(MintRequest), GetAssetUniverse, GetTokenData(String), - NoOp, } /// Used as an action to mint tokens. diff --git a/kit/src/behaviors/update/mod.rs b/kit/src/behaviors/update/mod.rs index c2c7ff16..0c114920 100644 --- a/kit/src/behaviors/update/mod.rs +++ b/kit/src/behaviors/update/mod.rs @@ -11,7 +11,7 @@ pub struct Update { pub data: S::Data, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, State)] pub struct Config { pub base_config: BaseConfig, pub allocation_data: P::AllocationData, @@ -19,7 +19,7 @@ pub struct Config { pub params: VecDeque, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, State)] pub struct Processing { pub messager: Messager, pub client: Arc, @@ -27,17 +27,6 @@ pub struct Processing { pub pool_params: VecDeque, } -impl State for Config

{ - type Data = Self; -} - -impl

State for Processing

-where - P: PoolType, -{ - type Data = Self; -} - type PoolId = eU256; type TokenList = Vec; type LiquidityToken = eAddress; @@ -68,7 +57,7 @@ where &mut self, client: Arc, mut messager: Messager, - ) -> Result)>> { + ) -> Result { // Make a "TODO" list. // This is the data I need to recieve to do my job let mut todo: UpdateTodo

= UpdateTodo { @@ -131,17 +120,16 @@ where debug!("Updater has built the pool."); - let process = Self::Processor { + let processor = Self::Processor { token_admin: self.token_admin.clone(), data: Processing { - messager: messager.clone(), + messager, client, pool, pool_params: self.data.params.clone(), }, }; - let stream = messager.stream()?; - Ok(Some((process, stream))) + Ok(processor) } } @@ -150,14 +138,12 @@ impl

Processor for Update> where P: PoolType + Send + Sync, { + async fn get_stream(&mut self) -> Result>> { + Ok(Some(self.data.messager.stream()?)) + } async fn process(&mut self, event: Message) -> Result { - warn!("Process: Got event: {:?}", event); - let msg: UpdatoorQuerry = serde_json::from_str(&event.data).unwrap_or(UpdatoorQuerry::NoOp); - - warn!("Process: deserialized update querry: {:?}", msg); - - match msg { - UpdatoorQuerry::UpdateMeDaddy => { + match serde_json::from_str(&event.data) { + Ok(UpdaterQuery::ApplyUpdate) => { let params = self.data.pool_params.pop_front().unwrap(); self.data.pool.update(params.clone()).await?; let _ = self @@ -167,18 +153,15 @@ where .await?; info!("Successfully updated!"); } - - UpdatoorQuerry::NoOp => { - debug!("NoOp"); + Err(e) => { + warn!("Failed to parse message: {}", e); } } - Ok(ControlFlow::Continue) } } #[derive(Serialize, Deserialize, Clone, Debug)] -pub enum UpdatoorQuerry { - NoOp, - UpdateMeDaddy, +pub enum UpdaterQuery { + ApplyUpdate, } diff --git a/kit/tests/common.rs b/kit/tests/common.rs index f922e832..751d3604 100644 --- a/kit/tests/common.rs +++ b/kit/tests/common.rs @@ -1,10 +1,6 @@ use std::{collections::VecDeque, marker::PhantomData}; -use arbiter_engine::{ - agent::Agent, - messager::{Message, Messager}, - world::World, -}; +use arbiter_engine::{agent::Agent, messager::Message, world::World}; use dfmm_kit::{ behaviors::{ creator::{self, Create}, @@ -20,10 +16,7 @@ use dfmm_kit::{ }, TokenData, }; -use ethers::{ - abi::ethereum_types::BloomInput, - types::{Address as eAddress, U256 as eU256}, -}; +use ethers::types::{Address as eAddress, U256 as eU256}; use serde::{Deserialize, Serialize}; use tracing::Level; use tracing_subscriber::FmtSubscriber; @@ -117,13 +110,6 @@ impl SwapType for VanillaSwap { fn compute_swap_amount(_event: Message) -> (eU256, dfmm_kit::pool::InputToken) { (ethers::utils::parse_ether(0.5).unwrap(), InputToken::TokenY) } - - fn get_stream( - &self, - messager: Messager, - ) -> Option + Send + Sync>>> { - Some(messager.stream().unwrap()) - } } fn mock_token_admin_behavior() -> TokenAdmin { diff --git a/kit/tests/swap_integration.rs b/kit/tests/swap_integration.rs index 3524a775..217b564a 100644 --- a/kit/tests/swap_integration.rs +++ b/kit/tests/swap_integration.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use arbiter_engine::messager::To; use dfmm_kit::behaviors::MessageTypes; use futures_util::StreamExt; use tracing::{info, warn}; @@ -11,7 +10,7 @@ async fn run_updater_constant_sum() { log(Level::DEBUG); let mut world = World::new("test"); - let messager = world.messager.for_agent("test"); + let mut messager = world.messager.for_agent("test"); spawn_deployer(&mut world); spawn_token_admin(&mut world); diff --git a/kit/tests/token_integration.rs b/kit/tests/token_integration.rs index b099fd8f..f27cf504 100644 --- a/kit/tests/token_integration.rs +++ b/kit/tests/token_integration.rs @@ -12,7 +12,7 @@ async fn run_token_admin() { log(Level::DEBUG); let mut world = World::new("test"); - let messager = world.messager.for_agent("test"); + let mut messager = world.messager.for_agent("test"); spawn_deployer(&mut world); spawn_token_admin(&mut world); diff --git a/kit/tests/update_integration.rs b/kit/tests/update_integration.rs index 79cdb92e..cb5f40f1 100644 --- a/kit/tests/update_integration.rs +++ b/kit/tests/update_integration.rs @@ -11,7 +11,7 @@ async fn run_updater_constant_sum() { log(Level::DEBUG); let mut world = World::new("test"); - let messager = world.messager.for_agent("test"); + let mut messager = world.messager.for_agent("test"); spawn_deployer(&mut world); spawn_token_admin(&mut world); @@ -26,7 +26,7 @@ async fn run_updater_constant_sum() { messager .send( To::Agent(UPDATER.to_owned()), - update::UpdatoorQuerry::UpdateMeDaddy, + update::UpdaterQuery::ApplyUpdate, ) .await .unwrap();