Skip to content

Commit

Permalink
Add cancellation handling in slasher
Browse files Browse the repository at this point in the history
  • Loading branch information
akshay111meher committed Dec 2, 2024
1 parent 4fb16ce commit 5e3514e
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 115 deletions.
14 changes: 8 additions & 6 deletions matching_engine/src/jobs/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,24 @@ impl MatchingEngineServer {

pub async fn start_server(self, port: u16, enable_ssc: bool) -> anyhow::Result<()> {
let server = HttpServer::new(move || {
let ui_request_concurrency = ConcurrencyLimiter::new(2);
let max_requests = 25 as usize;

let ui_request_concurrency = ConcurrencyLimiter::new(max_requests);
let ui_rate_limiter = kalypso_helper::middlewares::ratelimiter::get_rate_limiter(
Duration::from_secs(1),
10 as u64,
max_requests as u64,
);

let stats_request_concurrency = ConcurrencyLimiter::new(2);
let stats_request_concurrency = ConcurrencyLimiter::new(max_requests);
let stats_rate_limiter = kalypso_helper::middlewares::ratelimiter::get_rate_limiter(
Duration::from_secs(1),
10 as u64,
max_requests as u64,
);

let core_request_concurrency = ConcurrencyLimiter::new(10);
let core_request_concurrency = ConcurrencyLimiter::new(max_requests);
let core_rate_limiter = kalypso_helper::middlewares::ratelimiter::get_rate_limiter(
Duration::from_secs(1),
10 as u64,
max_requests as u64,
);

#[cfg(not(feature = "matching_engine_enable_cors"))]
Expand Down
292 changes: 183 additions & 109 deletions slasher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,34 @@ struct Operator {
address: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct Market {
market_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct MarketQueryResult {
result: Vec<Market>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct OperatorQueryResult {
result: Vec<Operator>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct ActiveRequest {
struct Request {
ask_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct SingleOperatorQueryResult {
active_jobs_list: Vec<ActiveRequest>,
active_jobs_list: Vec<Request>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct SingleMarketQueryResult {
unmatched_jobs: Vec<Request>,
}

impl SlashingInstance {
Expand Down Expand Up @@ -115,120 +130,149 @@ impl SlashingInstance {
};

for operator in operators.iter() {
sleep(Duration::from_secs(3)).await;
let active_requests = self.active_requests(operator.address.clone()).await?;
for active_request in active_requests.iter() {
log::warn!(
"Found Active Request with ask_id: {}",
active_request.ask_id
);
let ask_id = match U256::from_dec_str(&active_request.ask_id) {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Invalid Ask ID received");
continue;
}
};
let ask_state = match self.proof_marketplace.get_ask_state(ask_id).await {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Failed Fetching Ask State");
continue;
}
};

let ask_state =
matching_engine_helpers::ask_lib::ask_status::get_ask_state(ask_state);

if ask_state == AskState::DeadlineCrossed || ask_state == AskState::Assigned {
let mut slashing_transaction =
self.proof_marketplace.slash_generator(ask_id);

if cfg!(feature = "force_transactions") {
slashing_transaction = slashing_transaction.gas(10_000_000);
}

let slashing_transaction = match slashing_transaction.send().await {
Ok(data) => data.confirmations(10),
Err(err) => {
log::error!("{}", err);
log::error!("failed sending the transaction");
continue;
}
};

let slashing_transaction = match slashing_transaction.await {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Failed broadcasting transaction");
continue;
}
};

let slashing_transaction = match slashing_transaction {
Some(data) => data,
_ => {
log::warn!("Broadcasted transaction, but failed getting receipt");
continue;
}
};

log::info!(
"Slashed Ask: {}, tx: {:?}",
ask_id,
slashing_transaction.transaction_hash
);
}

if ask_state == AskState::UnAssigned {
let mut cancellation_transaction =
self.proof_marketplace.cancel_ask(ask_id);

if cfg!(feature = "force_transactions") {
cancellation_transaction = cancellation_transaction.gas(10_000_000);
}

let cancellation_transaction = match cancellation_transaction.send().await {
Ok(data) => data.confirmations(10),
Err(err) => {
log::error!("{}", err);
log::error!("failed sending the transaction");
continue;
}
};

let cancellation_transaction = match cancellation_transaction.await {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Failed broadcasting transaction");
continue;
}
};

let cancellation_transaction = match cancellation_transaction {
Some(data) => data,
_ => {
log::warn!("Broadcasted transaction, but failed getting receipt");
continue;
}
};

log::info!(
"Cancelled Ask: {}, tx: {:?}",
ask_id,
cancellation_transaction.transaction_hash
);
}
self._handle_request(active_request).await;
}
}
sleep(Duration::from_secs(3)).await;

let markets = match self.get_markets().await {
Ok(data) => data,
Err(err) => {
log::error!("{}", err);
vec![]
}
};

for market in markets.iter() {
sleep(Duration::from_secs(3)).await;
let expired_requests = self.expired_requests(market.market_id.clone()).await?;
for expired_request in expired_requests.iter() {
self._handle_request(expired_request).await;
}
}
sleep(Duration::from_secs(10)).await;

sleep(Duration::from_secs(3)).await;
}
}

async fn active_requests(&self, operator_address: String) -> Result<Vec<ActiveRequest>> {
async fn _handle_request(&self, request: &Request) {
log::debug!("Found Active Request with ask_id: {}", request.ask_id);
let ask_id = match U256::from_dec_str(&request.ask_id) {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Invalid Ask ID received");
return;
}
};
let ask_state = match self.proof_marketplace.get_ask_state(ask_id).await {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Failed Fetching Ask State");
return;
}
};

let ask_state = matching_engine_helpers::ask_lib::ask_status::get_ask_state(ask_state);

if ask_state == AskState::DeadlineCrossed || ask_state == AskState::Assigned {
log::warn!(
"Request with ask_id: {}, State: {:?}",
request.ask_id,
ask_state
);

let mut slashing_transaction = self.proof_marketplace.slash_generator(ask_id);

if cfg!(feature = "force_transactions") {
slashing_transaction = slashing_transaction.gas(10_000_000);
}

let slashing_transaction = match slashing_transaction.send().await {
Ok(data) => data.confirmations(10),
Err(err) => {
log::error!("{}", err);
log::error!("failed sending the transaction");
return;
}
};

let slashing_transaction = match slashing_transaction.await {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Failed broadcasting transaction");
return;
}
};

let slashing_transaction = match slashing_transaction {
Some(data) => data,
_ => {
log::warn!("Broadcasted transaction, but failed getting receipt");
return;
}
};

log::info!(
"Slashed Ask: {}, tx: {:?}",
ask_id,
slashing_transaction.transaction_hash
);
}

if ask_state == AskState::UnAssigned {
log::warn!(
"Request with ask_id: {}, State: {:?}",
request.ask_id,
ask_state
);

let mut cancellation_transaction = self.proof_marketplace.cancel_ask(ask_id);

if cfg!(feature = "force_transactions") {
cancellation_transaction = cancellation_transaction.gas(10_000_000);
}

let cancellation_transaction = match cancellation_transaction.send().await {
Ok(data) => data.confirmations(10),
Err(err) => {
log::error!("{}", err);
log::error!("failed sending the transaction");
return;
}
};

let cancellation_transaction = match cancellation_transaction.await {
Ok(data) => data,
Err(err) => {
log::error!("{}", err.to_string());
log::error!("Failed broadcasting transaction");
return;
}
};

let cancellation_transaction = match cancellation_transaction {
Some(data) => data,
_ => {
log::warn!("Broadcasted transaction, but failed getting receipt");
return;
}
};

log::info!(
"Cancelled Ask: {}, tx: {:?}",
ask_id,
cancellation_transaction.transaction_hash
);
}
}

async fn active_requests(&self, operator_address: String) -> Result<Vec<Request>> {
let url = format!("{}/ui/generator/{}", self.indexer_url, operator_address);
let response = self.client.get(&url).send().await?;

Expand Down Expand Up @@ -257,4 +301,34 @@ impl SlashingInstance {
return Ok(vec![]);
}
}

async fn get_markets(&self) -> Result<Vec<Market>> {
let url = format!("{}/ui/markets", self.indexer_url);
let response = self.client.get(&url).send().await?;

if response.status().is_success() {
let markets: MarketQueryResult = response.json().await?;
return Ok(markets.result);
} else {
log::error!("Failed to fetch generators: HTTP {}", response.status());
return Ok(vec![]);
}
}

async fn expired_requests(&self, market_id: String) -> Result<Vec<Request>> {
let url = format!("{}/ui/market/{}", self.indexer_url, market_id);
let response = self.client.get(&url).send().await?;

if response.status().is_success() {
let market_query_result: SingleMarketQueryResult = response.json().await?;
return Ok(market_query_result.unmatched_jobs);
} else {
log::error!(
"Failed to fetch market/{}: HTTP {}",
market_id,
response.status()
);
return Ok(vec![]);
}
}
}

0 comments on commit 5e3514e

Please sign in to comment.