diff --git a/matching_engine/src/jobs/server.rs b/matching_engine/src/jobs/server.rs index 8651158..e1a1b3d 100644 --- a/matching_engine/src/jobs/server.rs +++ b/matching_engine/src/jobs/server.rs @@ -81,22 +81,24 @@ impl MatchingEngineServer { pub async fn start_server(self, port: u16, enable_ssc: bool) -> anyhow::Result<()> { let server = HttpServer::new(move || { - let ui_request_concurrency = ConcurrencyLimiter::new(2); + let max_requests = 25 as usize; + + let ui_request_concurrency = ConcurrencyLimiter::new(max_requests); let ui_rate_limiter = kalypso_helper::middlewares::ratelimiter::get_rate_limiter( Duration::from_secs(1), - 10 as u64, + max_requests as u64, ); - let stats_request_concurrency = ConcurrencyLimiter::new(2); + let stats_request_concurrency = ConcurrencyLimiter::new(max_requests); let stats_rate_limiter = kalypso_helper::middlewares::ratelimiter::get_rate_limiter( Duration::from_secs(1), - 10 as u64, + max_requests as u64, ); - let core_request_concurrency = ConcurrencyLimiter::new(10); + let core_request_concurrency = ConcurrencyLimiter::new(max_requests); let core_rate_limiter = kalypso_helper::middlewares::ratelimiter::get_rate_limiter( Duration::from_secs(1), - 10 as u64, + max_requests as u64, ); #[cfg(not(feature = "matching_engine_enable_cors"))] diff --git a/slasher/src/main.rs b/slasher/src/main.rs index f64d5a9..526919e 100644 --- a/slasher/src/main.rs +++ b/slasher/src/main.rs @@ -57,19 +57,34 @@ struct Operator { address: String, } +#[derive(Serialize, Deserialize, Debug, Clone)] +struct Market { + market_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +struct MarketQueryResult { + result: Vec, +} + #[derive(Serialize, Deserialize, Debug, Clone)] struct OperatorQueryResult { result: Vec, } #[derive(Serialize, Deserialize, Debug, Clone)] -struct ActiveRequest { +struct Request { ask_id: String, } #[derive(Serialize, Deserialize, Debug, Clone)] struct SingleOperatorQueryResult { - active_jobs_list: Vec, + active_jobs_list: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +struct SingleMarketQueryResult { + unmatched_jobs: Vec, } impl SlashingInstance { @@ -115,120 +130,149 @@ impl SlashingInstance { }; for operator in operators.iter() { + sleep(Duration::from_secs(3)).await; let active_requests = self.active_requests(operator.address.clone()).await?; for active_request in active_requests.iter() { - log::warn!( - "Found Active Request with ask_id: {}", - active_request.ask_id - ); - let ask_id = match U256::from_dec_str(&active_request.ask_id) { - Ok(data) => data, - Err(err) => { - log::error!("{}", err.to_string()); - log::error!("Invalid Ask ID received"); - continue; - } - }; - let ask_state = match self.proof_marketplace.get_ask_state(ask_id).await { - Ok(data) => data, - Err(err) => { - log::error!("{}", err.to_string()); - log::error!("Failed Fetching Ask State"); - continue; - } - }; - - let ask_state = - matching_engine_helpers::ask_lib::ask_status::get_ask_state(ask_state); - - if ask_state == AskState::DeadlineCrossed || ask_state == AskState::Assigned { - let mut slashing_transaction = - self.proof_marketplace.slash_generator(ask_id); - - if cfg!(feature = "force_transactions") { - slashing_transaction = slashing_transaction.gas(10_000_000); - } - - let slashing_transaction = match slashing_transaction.send().await { - Ok(data) => data.confirmations(10), - Err(err) => { - log::error!("{}", err); - log::error!("failed sending the transaction"); - continue; - } - }; - - let slashing_transaction = match slashing_transaction.await { - Ok(data) => data, - Err(err) => { - log::error!("{}", err.to_string()); - log::error!("Failed broadcasting transaction"); - continue; - } - }; - - let slashing_transaction = match slashing_transaction { - Some(data) => data, - _ => { - log::warn!("Broadcasted transaction, but failed getting receipt"); - continue; - } - }; - - log::info!( - "Slashed Ask: {}, tx: {:?}", - ask_id, - slashing_transaction.transaction_hash - ); - } - - if ask_state == AskState::UnAssigned { - let mut cancellation_transaction = - self.proof_marketplace.cancel_ask(ask_id); - - if cfg!(feature = "force_transactions") { - cancellation_transaction = cancellation_transaction.gas(10_000_000); - } - - let cancellation_transaction = match cancellation_transaction.send().await { - Ok(data) => data.confirmations(10), - Err(err) => { - log::error!("{}", err); - log::error!("failed sending the transaction"); - continue; - } - }; - - let cancellation_transaction = match cancellation_transaction.await { - Ok(data) => data, - Err(err) => { - log::error!("{}", err.to_string()); - log::error!("Failed broadcasting transaction"); - continue; - } - }; - - let cancellation_transaction = match cancellation_transaction { - Some(data) => data, - _ => { - log::warn!("Broadcasted transaction, but failed getting receipt"); - continue; - } - }; - - log::info!( - "Cancelled Ask: {}, tx: {:?}", - ask_id, - cancellation_transaction.transaction_hash - ); - } + self._handle_request(active_request).await; + } + } + sleep(Duration::from_secs(3)).await; + + let markets = match self.get_markets().await { + Ok(data) => data, + Err(err) => { + log::error!("{}", err); + vec![] + } + }; + + for market in markets.iter() { + sleep(Duration::from_secs(3)).await; + let expired_requests = self.expired_requests(market.market_id.clone()).await?; + for expired_request in expired_requests.iter() { + self._handle_request(expired_request).await; } } - sleep(Duration::from_secs(10)).await; + + sleep(Duration::from_secs(3)).await; } } - async fn active_requests(&self, operator_address: String) -> Result> { + async fn _handle_request(&self, request: &Request) { + log::debug!("Found Active Request with ask_id: {}", request.ask_id); + let ask_id = match U256::from_dec_str(&request.ask_id) { + Ok(data) => data, + Err(err) => { + log::error!("{}", err.to_string()); + log::error!("Invalid Ask ID received"); + return; + } + }; + let ask_state = match self.proof_marketplace.get_ask_state(ask_id).await { + Ok(data) => data, + Err(err) => { + log::error!("{}", err.to_string()); + log::error!("Failed Fetching Ask State"); + return; + } + }; + + let ask_state = matching_engine_helpers::ask_lib::ask_status::get_ask_state(ask_state); + + if ask_state == AskState::DeadlineCrossed || ask_state == AskState::Assigned { + log::warn!( + "Request with ask_id: {}, State: {:?}", + request.ask_id, + ask_state + ); + + let mut slashing_transaction = self.proof_marketplace.slash_generator(ask_id); + + if cfg!(feature = "force_transactions") { + slashing_transaction = slashing_transaction.gas(10_000_000); + } + + let slashing_transaction = match slashing_transaction.send().await { + Ok(data) => data.confirmations(10), + Err(err) => { + log::error!("{}", err); + log::error!("failed sending the transaction"); + return; + } + }; + + let slashing_transaction = match slashing_transaction.await { + Ok(data) => data, + Err(err) => { + log::error!("{}", err.to_string()); + log::error!("Failed broadcasting transaction"); + return; + } + }; + + let slashing_transaction = match slashing_transaction { + Some(data) => data, + _ => { + log::warn!("Broadcasted transaction, but failed getting receipt"); + return; + } + }; + + log::info!( + "Slashed Ask: {}, tx: {:?}", + ask_id, + slashing_transaction.transaction_hash + ); + } + + if ask_state == AskState::UnAssigned { + log::warn!( + "Request with ask_id: {}, State: {:?}", + request.ask_id, + ask_state + ); + + let mut cancellation_transaction = self.proof_marketplace.cancel_ask(ask_id); + + if cfg!(feature = "force_transactions") { + cancellation_transaction = cancellation_transaction.gas(10_000_000); + } + + let cancellation_transaction = match cancellation_transaction.send().await { + Ok(data) => data.confirmations(10), + Err(err) => { + log::error!("{}", err); + log::error!("failed sending the transaction"); + return; + } + }; + + let cancellation_transaction = match cancellation_transaction.await { + Ok(data) => data, + Err(err) => { + log::error!("{}", err.to_string()); + log::error!("Failed broadcasting transaction"); + return; + } + }; + + let cancellation_transaction = match cancellation_transaction { + Some(data) => data, + _ => { + log::warn!("Broadcasted transaction, but failed getting receipt"); + return; + } + }; + + log::info!( + "Cancelled Ask: {}, tx: {:?}", + ask_id, + cancellation_transaction.transaction_hash + ); + } + } + + async fn active_requests(&self, operator_address: String) -> Result> { let url = format!("{}/ui/generator/{}", self.indexer_url, operator_address); let response = self.client.get(&url).send().await?; @@ -257,4 +301,34 @@ impl SlashingInstance { return Ok(vec![]); } } + + async fn get_markets(&self) -> Result> { + let url = format!("{}/ui/markets", self.indexer_url); + let response = self.client.get(&url).send().await?; + + if response.status().is_success() { + let markets: MarketQueryResult = response.json().await?; + return Ok(markets.result); + } else { + log::error!("Failed to fetch generators: HTTP {}", response.status()); + return Ok(vec![]); + } + } + + async fn expired_requests(&self, market_id: String) -> Result> { + let url = format!("{}/ui/market/{}", self.indexer_url, market_id); + let response = self.client.get(&url).send().await?; + + if response.status().is_success() { + let market_query_result: SingleMarketQueryResult = response.json().await?; + return Ok(market_query_result.unmatched_jobs); + } else { + log::error!( + "Failed to fetch market/{}: HTTP {}", + market_id, + response.status() + ); + return Ok(vec![]); + } + } }