Skip to content

Commit

Permalink
create automatic backups inside enclave
Browse files Browse the repository at this point in the history
  • Loading branch information
akshay111meher committed Dec 12, 2024
1 parent 93d274d commit 2b0327e
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 35 deletions.
2 changes: 2 additions & 0 deletions matching_engine/src/generator_lib/generator_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ pub struct SlashingRecord {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Hash)]
pub struct WithdrawlRequest {
pub account: Address,
pub token: Address,
pub amount: U256,
pub index: U256,
}

Expand Down
72 changes: 71 additions & 1 deletion matching_engine/src/jobs/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
};
use tokio::sync::RwLock;

use crate::log_processor;
use crate::{log_processor, DumpSend};

#[cfg(not(feature = "disable_match_creation"))]
use crate::{
Expand Down Expand Up @@ -162,7 +162,10 @@ impl LogParser {

pub async fn parse(&self) -> anyhow::Result<()> {
let mut matches_upto: Option<U64> = None;
let mut loop_count: usize = 0;
loop {
loop_count += 1;

if self.should_stop.load(Ordering::Acquire) {
log::info!("Gracefully shutting down...");
break;
Expand All @@ -177,6 +180,73 @@ impl LogParser {
}
};

if loop_count % 100 == 0 {
// make backup here
let market_store = self.shared_market_store.read().await;
let ask_store = self.shared_local_ask_store.read().await;
let generator_store = self.shared_generator_store.read().await;
let native_store = self.shared_native_stake_store.read().await;
let symbiotic_store = self.shared_symbiotic_stake_store.read().await;
let cost_store = self.shared_cost_store.read().await;
let key_store = self.shared_key_store.read().await;
let stake_manager_store = self.shared_stake_manager_store.read().await;
let parsed_block = self.start_block.read().await;

let dump = DumpSend {
market_metadata_store: Some(&*market_store),
local_ask_store: Some(&*ask_store),
generator_store: Some(&*generator_store),
native_staking_store: Some(&*native_store),
symbiotic_stake_store: Some(&*symbiotic_store),
cost_store: Some(&*cost_store),
key_store: Some(&*key_store),
stake_manager_store: Some(&*stake_manager_store),
parsed_block: Some(&*parsed_block),
};

use std::path::Path;
use tokio::fs;
use tokio::io::AsyncWriteExt;

// Serialize DumpSend to JSON
match serde_json::to_string_pretty(&dump) {
Ok(json_string) => {
// Define the file path
let path = Path::new("./matching_engine_config/dump.json");

// Ensure the directory exists
if let Some(parent) = path.parent() {
if let Err(e) = fs::create_dir_all(parent).await {
log::error!("Failed to create directory {:?}: {}", parent, e);
// Handle the error as needed, e.g., continue or return
}
}

// Write the JSON string to the file asynchronously
match fs::File::create(&path).await {
Ok(mut file) => {
if let Err(e) = file.write_all(json_string.as_bytes()).await {
log::error!("Failed to write to file {:?}: {}", path, e);
// Handle the error as needed
} else {
log::info!("Successfully backed up dump to {:?}", path);
}
}
Err(e) => {
log::error!("Failed to create file {:?}: {}", path, e);
// Handle the error as needed
}
}
}
Err(e) => {
log::error!("Failed to serialize DumpSend: {}", e);
// Handle the serialization error as needed
}
}

continue;
}

if let Some(matches_upto) = matches_upto.filter(|&m| m == end_block) {
log::warn!(
"All matches made up to {}. Waiting for a few seconds",
Expand Down
14 changes: 14 additions & 0 deletions matching_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ pub struct Dump {
pub parsed_block: U64,
}

// Define the Dump struct
#[derive(Serialize)]
pub struct DumpSend<'a> {
market_metadata_store: Option<&'a MarketMetadataStore>,
local_ask_store: Option<&'a LocalAskStore>,
generator_store: Option<&'a GeneratorStore>,
native_staking_store: Option<&'a NativeStakingStore>,
symbiotic_stake_store: Option<&'a SymbioticStakeStore>,
cost_store: Option<&'a CostStore>,
key_store: Option<&'a KeyStore>,
stake_manager_store: Option<&'a StakeManagerStore>,
parsed_block: Option<&'a U64>,
}

impl MatchingEngine {
pub fn from_config(config: MatchingEngineConfig, matching_engine_port: Option<u16>) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion matching_engine/src/log_processor/er.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn process_entity_key_registry_logs(
)
.is_ok()
{
log::warn!("Skipped EnclaveImageRevoked event");
log::debug!("Skipped EnclaveImageRevoked event");
return Ok(());
}

Expand Down
24 changes: 13 additions & 11 deletions matching_engine/src/log_processor/gr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn process_generator_registry_logs(
let address = parsed_registered_generator_log.generator.into();
let compute = parsed_registered_generator_log.initial_compute.into();

log::warn!("During registration initial stake is assumed to be 0");
log::debug!("During registration initial stake is assumed to be 0");

let generator_data = genertor_registry
.generator_registry(address)
Expand Down Expand Up @@ -218,7 +218,7 @@ pub async fn process_generator_registry_logs(
added_stake_log.generator_address
);

log::warn!("Add Stake is now handled in native staking");
log::debug!("Add Stake is now handled in native staking");
return Ok(());
}

Expand All @@ -233,7 +233,7 @@ pub async fn process_generator_registry_logs(
request_stake_decrease_log.generator_address
);

log::warn!(
log::debug!(
"RequestStakeDecrease is not processed using native_staking::StakeWithdrawalRequested"
);
return Ok(());
Expand All @@ -251,7 +251,7 @@ pub async fn process_generator_registry_logs(
remove_stake_log.generator_address
);

log::warn!("Request stake decrese in no processed in native_stake::StakeWithdrawn");
log::debug!("Request stake decrese in no processed in native_stake::StakeWithdrawn");

return Ok(());
}
Expand Down Expand Up @@ -281,7 +281,7 @@ pub async fn process_generator_registry_logs(
log.data.clone(),
)
{
log::info!(
log::debug!(
"Request compute decrease for Generator: {:?}",
request_compute_decrease_log.generator
);
Expand All @@ -300,7 +300,7 @@ pub async fn process_generator_registry_logs(
log.topics.clone(),
log.data.clone(),
) {
log::info!(
log::debug!(
"Compute decrease for Generator: {:?} to : {:?}",
decrease_compute_log.generator,
decrease_compute_log.compute
Expand All @@ -322,7 +322,7 @@ pub async fn process_generator_registry_logs(
log.data.clone(),
) {
log::debug!("Stake Lock Imposed: {:?}", stake_lock_logs);
log::warn!("Stake Lock Imposed is now Handled in native_staking::StakeLocked and symbiotic_staking::StakeLocked separately");
log::debug!("Stake Lock Imposed is now Handled in native_staking::StakeLocked and symbiotic_staking::StakeLocked separately");
return Ok(());
}

Expand All @@ -346,7 +346,7 @@ pub async fn process_generator_registry_logs(
log.data.clone(),
) {
log::debug!("Stake Lock Released: {:?}", stake_lock_logs);
log::warn!("Stake Lock Released in native_staking::StakeUnlocked and symbiotic_stake::StakeUnlocked separately");
log::debug!("Stake Lock Released in native_staking::StakeUnlocked and symbiotic_stake::StakeUnlocked separately");
return Ok(());
}

Expand All @@ -370,8 +370,8 @@ pub async fn process_generator_registry_logs(
log.data.clone(),
)
{
log::warn!("Stake Slashed: {:?}", stake_slash_logs);
log::warn!("Stake slash is now handled in ns::JobSlashed and ss::JobSlashed separately");
log::debug!("Stake Slashed: {:?}", stake_slash_logs);
log::debug!("Stake slash is now handled in ns::JobSlashed and ss::JobSlashed separately");
return Ok(());
}

Expand All @@ -384,7 +384,9 @@ pub async fn process_generator_registry_logs(
"Processing SymbioticCompleteSnapshot: {:?}",
symbiotic_complete_snapshot_log
);
log::warn!("SymbioticCompleteSnapshot is now processed using symbiotic::SnapshotConfirmed");
log::debug!(
"SymbioticCompleteSnapshot is now processed using symbiotic::SnapshotConfirmed"
);
return Ok(());
}

Expand Down
22 changes: 20 additions & 2 deletions matching_engine/src/log_processor/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub async fn process_native_staking_logs(
let address = request_stake_decrease_log.operator;
let account = request_stake_decrease_log.account;
let index = request_stake_decrease_log.index;
let token = request_stake_decrease_log.token;
let amount = request_stake_decrease_log.amount;

log::warn!("pausing all assignments across all markets");
log::warn!("will be unpaused once the request if fully withdrawn");
Expand All @@ -135,7 +137,15 @@ pub async fn process_native_staking_logs(
log::warn!("Setting new utilization to same value");
let new_utilization = 1000000000000000000_i64.into();
generator_store.update_intended_stake_util(&address, new_utilization);
generator_store.insert_withdrawal_request(&address, WithdrawlRequest { account, index });
generator_store.insert_withdrawal_request(
&address,
WithdrawlRequest {
account,
index,
token,
amount,
},
);
return Ok(());
}

Expand Down Expand Up @@ -175,7 +185,15 @@ pub async fn process_native_staking_logs(
);
generator_store.resume_assignments_accross_all_markets(&address);
generator_store.update_intended_stake_util(&address, 1000000000000000000_i64.into());
generator_store.remove_withdrawal_request(&address, WithdrawlRequest { account, index });
generator_store.remove_withdrawal_request(
&address,
WithdrawlRequest {
account,
index,
token: token_address,
amount,
},
);

return Ok(());
}
Expand Down
17 changes: 13 additions & 4 deletions matching_engine/src/log_processor/ss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,21 @@ pub async fn process_symbiotic_staking_logs(

for stake_token in known_tokens {
for operator in all_generators.clone().into_iter() {
// if this fails, system breaks. TODO
let vault_snapshot_amount = symbiotic_staking
// if this fails, then we just use last amount
let vault_snapshot_amount_result = symbiotic_staking
.get_operator_stake_amount_at(capture_timestamp, stake_token, operator)
.call()
.await
.unwrap();
.await;

let vault_snapshot_amount = match vault_snapshot_amount_result {
Ok(data) => data,
Err(err) => {
log::error!("Unable to fetch latest symbiotic stake. contract call get_operator_stake_amount_at failing: {}", err);
symbiotic_stake_store
.get_latest_stake_info(&operator, &stake_token)
.clone()
}
};

log::debug!(
"operator:{:?}, snapshot token:{:?}, amount: {:?}",
Expand Down
18 changes: 2 additions & 16 deletions matching_engine/src/routes/ui_routes/welcome.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use ethers::types::U64;
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::RwLock;

Expand All @@ -12,6 +11,7 @@ use crate::generator_lib::stake_manager_store::StakeManagerStore;
use crate::generator_lib::symbiotic_stake_store::SymbioticStakeStore;
use crate::market_metadata::MarketMetadataStore;

use crate::DumpSend;
use crate::{models::WelcomeResponse, try_read_or_lock};
use actix_web::{web::Data, HttpResponse};

Expand All @@ -21,20 +21,6 @@ pub async fn welcome() -> actix_web::Result<HttpResponse> {
}))
}

// Define the Dump struct
#[derive(Serialize)]
struct Dump<'a> {
market_metadata_store: Option<&'a MarketMetadataStore>,
local_ask_store: Option<&'a LocalAskStore>,
generator_store: Option<&'a GeneratorStore>,
native_staking_store: Option<&'a NativeStakingStore>,
symbiotic_stake_store: Option<&'a SymbioticStakeStore>,
cost_store: Option<&'a CostStore>,
key_store: Option<&'a KeyStore>,
stake_manager_store: Option<&'a StakeManagerStore>,
parsed_block: Option<&'a U64>,
}

pub async fn get_dump(
local_market_store: Data<Arc<RwLock<MarketMetadataStore>>>,
local_ask_store: Data<Arc<RwLock<LocalAskStore>>>,
Expand All @@ -56,7 +42,7 @@ pub async fn get_dump(
try_read_or_lock!(local_stake_manager_store, stake_manager_store);
try_read_or_lock!(local_parsed_block, parsed_block);

let dump = Dump {
let dump = DumpSend {
market_metadata_store: Some(&*market_store),
local_ask_store: Some(&*ask_store),
generator_store: Some(&*generator_store),
Expand Down

0 comments on commit 2b0327e

Please sign in to comment.