diff --git a/matching_engine/src/jobs/parser.rs b/matching_engine/src/jobs/parser.rs index ab365d3..5e63452 100644 --- a/matching_engine/src/jobs/parser.rs +++ b/matching_engine/src/jobs/parser.rs @@ -47,6 +47,9 @@ type SymbioticStakingInstance = bindings::symbiotic_staking::SymbioticStaking< SignerMiddleware, Wallet>, >; +type NativeStakingInstance = + bindings::native_staking::NativeStaking, Wallet>>; + pub struct LogParser { should_stop: Arc, start_block: Arc>, @@ -56,6 +59,7 @@ pub struct LogParser { generator_registry: GeneratorRegistryInstance, entity_registry: EntityRegistryInstance, symbiotic_staking: SymbioticStakingInstance, + native_staking: NativeStakingInstance, provider_http: Arc, Wallet>>, matching_engine_key: Vec, matching_engine_slave_keys: Vec>, @@ -83,6 +87,7 @@ impl LogParser { generator_registry: GeneratorRegistryInstance, entity_registry: EntityRegistryInstance, symbiotic_staking: SymbioticStakingInstance, + native_staking: NativeStakingInstance, matching_engine_key: String, matching_engine_slave_keys: Vec, shared_local_ask_store: Arc>, @@ -107,6 +112,7 @@ impl LogParser { generator_registry, entity_registry, symbiotic_staking, + native_staking, provider_http, matching_engine_key: hex::decode(matching_engine_key).unwrap(), matching_engine_slave_keys: matching_engine_slave_keys @@ -161,6 +167,8 @@ impl LogParser { let proof_marketplace_address = self.proof_marketplace.address(); let generator_registry_address = self.generator_registry.address(); let entity_key_registry_address = self.entity_registry.address(); + let native_staking_address = self.native_staking.address(); + let symbiotic_staking_address = self.symbiotic_staking.address(); let filter = Filter::default() .from_block(start_block) @@ -169,6 +177,8 @@ impl LogParser { proof_marketplace_address, generator_registry_address, entity_key_registry_address, + native_staking_address, + symbiotic_staking_address, ]); let logs = match self.provider_http.get_logs(&filter).await { @@ -244,6 +254,31 @@ impl LogParser { continue; } + if log.address.eq(&native_staking_address) { + log_processor::ns::process_native_staking_logs( + log, + &self.native_staking, + &self.shared_generator_store, + &self.rpc_url, + ) + .await + .unwrap(); + continue; + } + + if log.address.eq(&symbiotic_staking_address) { + log_processor::ss::process_symbiotic_staking_logs( + log, + &self.symbiotic_staking, + &self.shared_generator_store, + &self.shared_symbiotic_stake_store, + &self.rpc_url, + ) + .await + .unwrap(); + continue; + } + log::error!("Log of unknown contract found {:?}", log.address); return Err(anyhow::anyhow!("Unknown log")); } diff --git a/matching_engine/src/lib.rs b/matching_engine/src/lib.rs index abdf379..a65006b 100644 --- a/matching_engine/src/lib.rs +++ b/matching_engine/src/lib.rs @@ -110,6 +110,8 @@ pub struct MatchingEngineConfig { pub entity_registry: String, #[serde(default = "default_symbiotic_staking")] pub symbiotic_staking: String, + #[serde(default = "default_native_staking")] + pub native_staking: String, pub start_block: String, } @@ -119,6 +121,10 @@ fn default_symbiotic_staking() -> String { "0xE7136641cB2c94d318779c3B6BEb997dC5B2E574".to_string() } +fn default_native_staking() -> String { + "0xe9d2Bcc597f943ddA9EDf356DAC7C6A713dDE113".to_string() +} + pub struct MatchingEngine { config: MatchingEngineConfig, matching_engine_port: u16, @@ -142,6 +148,7 @@ impl MatchingEngine { generator_registry: String, entity_registry: String, symbiotic_staking: String, + native_staking: String, start_block: String, matching_engine_port: Option, ) -> Self { @@ -154,6 +161,7 @@ impl MatchingEngine { generator_registry, entity_registry, symbiotic_staking, + native_staking, start_block, }; @@ -250,6 +258,11 @@ impl MatchingEngine { client.clone(), ); + let native_staking_var = self.config.clone().native_staking; + let native_staking_address = Address::from_str(&native_staking_var).unwrap(); + let shared_native_staking = + bindings::native_staking::NativeStaking::new(native_staking_address, client.clone()); + let shared_parsed_block_number_store = Arc::new(RwLock::new( U64::from_dec_str(&start_block_string).expect("Unable to rad start_block"), )); @@ -311,6 +324,7 @@ impl MatchingEngine { generator_registry, entity_key_registry, shared_symbiotic_staking, + shared_native_staking, matching_engine_key, vec![], //TODO! fetch these slave keys using Oyster KMS shared_local_ask_store.clone(), diff --git a/matching_engine/src/log_processor/constants.rs b/matching_engine/src/log_processor/constants.rs index ee83458..aa0b61f 100644 --- a/matching_engine/src/log_processor/constants.rs +++ b/matching_engine/src/log_processor/constants.rs @@ -19,4 +19,6 @@ lazy_static! { pub static ref PROOF_MARKET_TOPICS_SKIP: HashSet = TOPICS_TO_SKIP.clone(); pub static ref GENERATOR_REGISTRY_TOPICS_SKIP: HashSet = TOPICS_TO_SKIP.clone(); pub static ref ENTITY_KEY_REGISTRY_TOPICS_SKIP: HashSet = TOPICS_TO_SKIP.clone(); + pub static ref NATIVE_STAKING_TOPICS_SKIP: HashSet = TOPICS_TO_SKIP.clone(); + pub static ref SYMBIOTIC_STAKING_TOPICS_SKIP: HashSet = TOPICS_TO_SKIP.clone(); } diff --git a/matching_engine/src/log_processor/gr.rs b/matching_engine/src/log_processor/gr.rs index 2ca7c85..4518bae 100644 --- a/matching_engine/src/log_processor/gr.rs +++ b/matching_engine/src/log_processor/gr.rs @@ -4,23 +4,19 @@ use tokio::sync::RwLock; use crate::generator_lib::*; use crate::log_processor::constants; -use crate::utility::get_l1_block_from_l2_block; -use crate::utility::{ - tx_to_string, TokenTracker, TEST_TOKEN_ADDRESS_ONE, TEST_TOKEN_ADDRESS_THREE, - TEST_TOKEN_ADDRESS_TWO, -}; +use crate::utility::TokenTracker; pub async fn process_generator_registry_logs( log: &Log, genertor_registry: &bindings::generator_registry::GeneratorRegistry< SignerMiddleware, Wallet>, >, - symbiotic_staking: &bindings::symbiotic_staking::SymbioticStaking< + _: &bindings::symbiotic_staking::SymbioticStaking< SignerMiddleware, Wallet>, >, generator_store: &Arc>, - symbiotic_stake_store: &Arc>, - rpc_url: &str, + _: &Arc>, + _: &str, ) -> Result<(), Box> { if constants::GENERATOR_REGISTRY_TOPICS_SKIP .get(&log.topics[0]) @@ -222,26 +218,8 @@ pub async fn process_generator_registry_logs( "Added stake to Generator: {:?}", added_stake_log.generator_address ); - let address = added_stake_log.generator_address; - let amount = added_stake_log.amount; - let token_address = added_stake_log.token; - - let block_l2: U256 = log.block_number.unwrap().as_u64().into(); - let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2) - .await - .unwrap_or_default(); - - generator_store.add_extra_stake( - &address, - &token_address, - &amount, - U64::from(block_l1.as_u64()), - log.transaction_index.unwrap(), - log.log_index.unwrap(), - tx_to_string(&log.transaction_hash.unwrap()), - delegation::Source::Native, - ); + log::warn!("Add Stake is now handled in native staking"); return Ok(()); } @@ -256,16 +234,9 @@ pub async fn process_generator_registry_logs( request_stake_decrease_log.generator_address ); - let address = request_stake_decrease_log.generator_address; - - log::warn!("pausing all assignments across all markets"); - log::warn!("will be unpaused once the request if fully withdrawn"); - - generator_store.pause_assignments_across_all_markets(&address); - - log::warn!("Setting new utilization to same value"); - let new_utilization = 1000000000000000000_i64.into(); - generator_store.update_intended_stake_util(&address, new_utilization); + log::warn!( + "RequestStakeDecrease is not processed using native_staking::StakeWithdrawalRequested" + ); return Ok(()); } @@ -281,28 +252,7 @@ pub async fn process_generator_registry_logs( remove_stake_log.generator_address ); - let address = remove_stake_log.generator_address; - let amount = remove_stake_log.amount; - let token_address = remove_stake_log.token; - - let block_l2: U256 = log.block_number.unwrap().as_u64().into(); - let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2) - .await - .unwrap_or_default(); - - generator_store.remove_stake( - &address, - &token_address, - &amount, - U64::from(block_l1.as_u64()), - log.transaction_index.unwrap(), - log.log_index.unwrap(), - tx_to_string(&log.transaction_hash.unwrap()), - delegation::Operation::UnDelegate, - delegation::Source::Native, - ); - generator_store.resume_assignments_accross_all_markets(&address); - generator_store.update_intended_stake_util(&address, 1000000000000000000_i64.into()); + log::warn!("Request stake decrese in no processed in native_stake::StakeWithdrawn"); return Ok(()); } @@ -373,11 +323,7 @@ pub async fn process_generator_registry_logs( log.data.clone(), ) { log::debug!("Stake Lock Imposed: {:?}", stake_lock_logs); - let address = stake_lock_logs.generator_address; - let stake_locked = stake_lock_logs.stake; - let token_address = stake_lock_logs.token; - - generator_store.update_on_stake_locked(&address, &token_address, stake_locked); + log::warn!("Stake Lock Imposed is now Handled in native_staking::StakeLocked and symbiotic_staking::StakeLocked separately"); return Ok(()); } @@ -401,10 +347,7 @@ pub async fn process_generator_registry_logs( log.data.clone(), ) { log::debug!("Stake Lock Released: {:?}", stake_lock_logs); - let address = stake_lock_logs.generator_address; - let stake_released = stake_lock_logs.stake; - let token_address = stake_lock_logs.token; - generator_store.update_on_stake_released(&address, &token_address, stake_released); + log::warn!("Stake Lock Released in native_staking::StakeUnlocked and symbiotic_stake::StakeUnlocked separately"); return Ok(()); } @@ -429,26 +372,7 @@ pub async fn process_generator_registry_logs( ) { log::warn!("Stake Slashed: {:?}", stake_slash_logs); - let address = stake_slash_logs.generator_address; - let stake_slashed = stake_slash_logs.stake; - let token_address = stake_slash_logs.token; - - let block_l2: U256 = log.block_number.unwrap().as_u64().into(); - let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2) - .await - .unwrap_or_default(); - - generator_store.remove_stake( - &address, - &token_address, - &stake_slashed, - U64::from(block_l1.as_u64()), - log.transaction_index.unwrap(), - log.log_index.unwrap(), - tx_to_string(&log.transaction_hash.unwrap()), - delegation::Operation::Slash, - delegation::Source::Native, - ); + log::warn!("Stake slash is now handled in ns::JobSlashed and ss::JobSlashed separately"); return Ok(()); } @@ -457,73 +381,11 @@ pub async fn process_generator_registry_logs( log.topics.clone(), log.data.clone(), ) { - log::debug!("Processing SymbioticCompleteSnapshot"); - let mut symbiotic_stake_store = { symbiotic_stake_store.write().await }; - - let capture_timestamp = { - let capture_timestamp_token = symbiotic_complete_snapshot_log.first().unwrap(); - let capture_timestamp = capture_timestamp_token.clone().into_uint().unwrap(); - capture_timestamp - }; - - let known_tokens: Vec
= vec![ - TEST_TOKEN_ADDRESS_ONE.clone(), - TEST_TOKEN_ADDRESS_TWO.clone(), - TEST_TOKEN_ADDRESS_THREE.clone(), - ]; - let all_generators = generator_store.all_generators_address(); - - for stake_token in known_tokens { - for operator in all_generators.clone().into_iter() { - // if this fails, system breaks. TODO - let vault_snapshot_amount = symbiotic_staking - .get_operator_stake_amount_at(capture_timestamp, stake_token, operator) - .call() - .await - .unwrap(); - - log::debug!( - "operator:{}, snapshot token:{}, amount: {}", - &operator, - &stake_token, - vault_snapshot_amount.to_string() - ); - - // before updating in symbiotic, do these steps - let last_stored_staking_info = - symbiotic_stake_store.get_latest_stake_info(&operator, &stake_token); - - if vault_snapshot_amount.gt(&last_stored_staking_info) { - generator_store.add_extra_stake( - &operator, - &stake_token, - &(vault_snapshot_amount - last_stored_staking_info), - log.block_number.unwrap(), - log.transaction_index.unwrap(), - log.log_index.unwrap(), - tx_to_string(&log.transaction_hash.unwrap()), - delegation::Source::Symbiotic, - ); - } else if vault_snapshot_amount.lt(&last_stored_staking_info) { - generator_store.remove_stake( - &operator, - &stake_token, - &(last_stored_staking_info - vault_snapshot_amount), - log.block_number.unwrap(), - log.transaction_index.unwrap(), - log.log_index.unwrap(), - tx_to_string(&log.transaction_hash.unwrap()), - delegation::Operation::UnDelegate, - delegation::Source::Symbiotic, - ); - } else { - log::debug!("No change in symbiotic stake noticed"); - } - - symbiotic_stake_store.upsert_stake(&operator, &stake_token, &vault_snapshot_amount); - } - } - + log::debug!( + "Processing SymbioticCompleteSnapshot: {:?}", + symbiotic_complete_snapshot_log + ); + log::warn!("SymbioticCompleteSnapshot is now processed using symbiotic::SnapshotConfirmed"); return Ok(()); } diff --git a/matching_engine/src/log_processor/mod.rs b/matching_engine/src/log_processor/mod.rs index 85b904e..c42c440 100644 --- a/matching_engine/src/log_processor/mod.rs +++ b/matching_engine/src/log_processor/mod.rs @@ -1,4 +1,6 @@ pub mod constants; pub mod er; pub mod gr; +pub mod ns; pub mod pm; +pub mod ss; diff --git a/matching_engine/src/log_processor/ns.rs b/matching_engine/src/log_processor/ns.rs new file mode 100644 index 0000000..f68fd26 --- /dev/null +++ b/matching_engine/src/log_processor/ns.rs @@ -0,0 +1,211 @@ +use std::sync::Arc; + +use ethers::prelude::{k256::ecdsa::SigningKey, *}; +use tokio::sync::RwLock; + +use crate::{ + generator_lib::{delegation, generator_store}, + log_processor::constants, + utility::{get_l1_block_from_l2_block, tx_to_string}, +}; + +pub async fn process_native_staking_logs( + log: &Log, + native_staking: &bindings::native_staking::NativeStaking< + SignerMiddleware, Wallet>, + >, + generator_store: &Arc>, + rpc_url: &str, +) -> Result<(), Box> { + if constants::NATIVE_STAKING_TOPICS_SKIP + .get(&log.topics[0]) + .is_some() + { + log::debug!("standard topic to skip found, ignoring it"); + return Ok(()); + } + if let Ok(stake_manager_set_log) = + native_staking.decode_event_raw("StakingManagerSet", log.topics.clone(), log.data.clone()) + { + log::debug!("Staking Manager Set Logs: {:?}", stake_manager_set_log); + return Ok(()); + } + + if let Ok(event_log) = + native_staking.decode_event_raw("StakeTokenAdded", log.topics.clone(), log.data.clone()) + { + log::debug!("StakeTokenAdded Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = + native_staking.decode_event_raw("AmountToLockSet", log.topics.clone(), log.data.clone()) + { + log::debug!("AmountToLockSet Logs: {:?}", event_log); + return Ok(()); + } + + let mut generator_store = { generator_store.write().await }; + + if let Ok(added_stake_log) = native_staking + .decode_event::( + "Staked", + log.topics.clone(), + log.data.clone(), + ) + { + log::debug!( + "Native stake Added. Generator: {}", + added_stake_log.operator + ); + + let address = added_stake_log.operator; + let amount = added_stake_log.amount; + let token_address = added_stake_log.token; + + let block_l2: U256 = log.block_number.unwrap().as_u64().into(); + let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2) + .await + .unwrap_or_default(); + + generator_store.add_extra_stake( + &address, + &token_address, + &amount, + U64::from(block_l1.as_u64()), + log.transaction_index.unwrap(), + log.log_index.unwrap(), + tx_to_string(&log.transaction_hash.unwrap()), + delegation::Source::Native, + ); + + return Ok(()); + } + + if let Ok(request_stake_decrease_log) = native_staking + .decode_event::( + "StakeWithdrawalRequested", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!( + "Request stake decrease for Generator: {:?}", + request_stake_decrease_log.operator + ); + + let address = request_stake_decrease_log.operator; + + log::warn!("pausing all assignments across all markets"); + log::warn!("will be unpaused once the request if fully withdrawn"); + + generator_store.pause_assignments_across_all_markets(&address); + + log::warn!("Setting new utilization to same value"); + let new_utilization = 1000000000000000000_i64.into(); + generator_store.update_intended_stake_util(&address, new_utilization); + return Ok(()); + } + + if let Ok(remove_stake_log) = native_staking + .decode_event::( + "StakeWithdrawn", + log.topics.clone(), + log.data.clone(), + ) + { + log::debug!( + "Remove stake for Generator: {:?}", + remove_stake_log.operator + ); + + let address = remove_stake_log.operator; + let amount = remove_stake_log.amount; + let token_address = remove_stake_log.token; + + let block_l2: U256 = log.block_number.unwrap().as_u64().into(); + let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2) + .await + .unwrap_or_default(); + + generator_store.remove_stake( + &address, + &token_address, + &amount, + U64::from(block_l1.as_u64()), + log.transaction_index.unwrap(), + log.log_index.unwrap(), + tx_to_string(&log.transaction_hash.unwrap()), + delegation::Operation::UnDelegate, + delegation::Source::Native, + ); + generator_store.resume_assignments_accross_all_markets(&address); + generator_store.update_intended_stake_util(&address, 1000000000000000000_i64.into()); + + return Ok(()); + } + + if let Ok(stake_lock_logs) = native_staking + .decode_event::( + "StakeLocked", + log.topics.clone(), + log.data.clone(), + ) + { + log::debug!("Stake Locked: {:?}", stake_lock_logs); + let address = stake_lock_logs.operator; + let stake_locked = stake_lock_logs.amount; + let token_address = stake_lock_logs.token; + + generator_store.update_on_stake_locked(&address, &token_address, stake_locked); + return Ok(()); + } + + if let Ok(stake_lock_logs) = native_staking + .decode_event::( + "StakeUnlocked", + log.topics.clone(), + log.data.clone(), + ) + { + log::debug!("Stake Lock Released: {:?}", stake_lock_logs); + let address = stake_lock_logs.operator; + let stake_released = stake_lock_logs.amount; + let token_address = stake_lock_logs.token; + generator_store.update_on_stake_released(&address, &token_address, stake_released); + return Ok(()); + } + + if let Ok(stake_slash_logs) = native_staking + .decode_event::( + "JobSlashed", + log.topics.clone(), + log.data.clone(), + ) + { + log::warn!("Job/Stake Slashed: {:?}", stake_slash_logs); + let address = stake_slash_logs.operator; + let stake_slashed = stake_slash_logs.amount; + let token_address = stake_slash_logs.token; + + let block_l2: U256 = log.block_number.unwrap().as_u64().into(); + let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2) + .await + .unwrap_or_default(); + + generator_store.remove_stake( + &address, + &token_address, + &stake_slashed, + U64::from(block_l1.as_u64()), + log.transaction_index.unwrap(), + log.log_index.unwrap(), + tx_to_string(&log.transaction_hash.unwrap()), + delegation::Operation::Slash, + delegation::Source::Native, + ); + return Ok(()); + } + + log::error!("unhandled log in native staking {:?}", log); + return Err("Unhandled log in native staking".into()); +} diff --git a/matching_engine/src/log_processor/ss.rs b/matching_engine/src/log_processor/ss.rs new file mode 100644 index 0000000..03d1e9d --- /dev/null +++ b/matching_engine/src/log_processor/ss.rs @@ -0,0 +1,290 @@ +use std::sync::Arc; + +use ethers::prelude::{k256::ecdsa::SigningKey, *}; +use tokio::sync::RwLock; + +use crate::{ + generator_lib::{delegation, generator_store, symbiotic_stake_store}, + log_processor::constants, + utility::{ + get_l1_block_from_l2_block, tx_to_string, TEST_TOKEN_ADDRESS_ONE, TEST_TOKEN_ADDRESS_THREE, + TEST_TOKEN_ADDRESS_TWO, + }, +}; + +pub async fn process_symbiotic_staking_logs( + log: &Log, + symbiotic_staking: &bindings::symbiotic_staking::SymbioticStaking< + SignerMiddleware, Wallet>, + >, + generator_store: &Arc>, + symbiotic_stake_store: &Arc>, + rpc_url: &str, +) -> Result<(), Box> { + if constants::SYMBIOTIC_STAKING_TOPICS_SKIP + .get(&log.topics[0]) + .is_some() + { + log::debug!("standard topic to skip found, ignoring it"); + return Ok(()); + } + + if let Ok(stake_manager_set_log) = symbiotic_staking.decode_event_raw( + "StakingManagerSet", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("Staking Manager Set Logs: {:?}", stake_manager_set_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "ProofMarketplaceSet", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("ProofMarketplaceSet Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "RewardDistributorSet", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("RewardDistributorSet Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "FeeRewardTokenSet", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("FeeRewardTokenSet Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = + symbiotic_staking.decode_event_raw("StakeTokenAdded", log.topics.clone(), log.data.clone()) + { + log::debug!("StakeTokenAdded Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = + symbiotic_staking.decode_event_raw("AmountToLockSet", log.topics.clone(), log.data.clone()) + { + log::debug!("AmountToLockSet Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "BaseTransmitterComissionRateSet", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("BaseTransmitterComissionRateSet Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "SubmissionCooldownSet", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("SubmissionCooldownSet Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "EnclaveImageAdded", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("EnclaveImageAdded Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "AttestationVerifierUpdated", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("AttestationVerifierUpdated Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "EnclaveImageRemoved", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("EnclaveImageRemoved Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "VaultSnapshotSubmitted", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("VaultSnapshotSubmitted Logs: {:?}", event_log); + return Ok(()); + } + + if let Ok(event_log) = symbiotic_staking.decode_event_raw( + "SlashResultSubmitted", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("SlashResultSubmitted Logs: {:?}", event_log); + return Ok(()); + } + + let mut generator_store = { generator_store.write().await }; + + if let Ok(symbiotic_complete_snapshot_log) = symbiotic_staking.decode_event_raw( + "SnapshotConfirmed", + log.topics.clone(), + log.data.clone(), + ) { + log::debug!("Processing SnapshotConfirmed"); + let mut symbiotic_stake_store = { symbiotic_stake_store.write().await }; + + let capture_timestamp = { + let transmitter_token = symbiotic_complete_snapshot_log.get(0).unwrap(); + let transmitter = transmitter_token.clone().into_address().unwrap(); + log::debug!("Transmitter: {}", transmitter); + + let capture_timestamp_token = symbiotic_complete_snapshot_log.get(1).unwrap(); + let capture_timestamp = capture_timestamp_token.clone().into_uint().unwrap(); + capture_timestamp + }; + + let known_tokens: Vec
= vec![ + TEST_TOKEN_ADDRESS_ONE.clone(), + TEST_TOKEN_ADDRESS_TWO.clone(), + TEST_TOKEN_ADDRESS_THREE.clone(), + ]; + let all_generators = generator_store.all_generators_address(); + + for stake_token in known_tokens { + for operator in all_generators.clone().into_iter() { + // if this fails, system breaks. TODO + let vault_snapshot_amount = symbiotic_staking + .get_operator_stake_amount_at(capture_timestamp, stake_token, operator) + .call() + .await + .unwrap(); + + log::debug!( + "operator:{}, snapshot token:{}, amount: {}", + &operator, + &stake_token, + vault_snapshot_amount.to_string() + ); + + // before updating in symbiotic, do these steps + let last_stored_staking_info = + symbiotic_stake_store.get_latest_stake_info(&operator, &stake_token); + + if vault_snapshot_amount.gt(&last_stored_staking_info) { + generator_store.add_extra_stake( + &operator, + &stake_token, + &(vault_snapshot_amount - last_stored_staking_info), + log.block_number.unwrap(), + log.transaction_index.unwrap(), + log.log_index.unwrap(), + tx_to_string(&log.transaction_hash.unwrap()), + delegation::Source::Symbiotic, + ); + } else if vault_snapshot_amount.lt(&last_stored_staking_info) { + generator_store.remove_stake( + &operator, + &stake_token, + &(last_stored_staking_info - vault_snapshot_amount), + log.block_number.unwrap(), + log.transaction_index.unwrap(), + log.log_index.unwrap(), + tx_to_string(&log.transaction_hash.unwrap()), + delegation::Operation::UnDelegate, + delegation::Source::Symbiotic, + ); + } else { + log::debug!("No change in symbiotic stake noticed"); + } + + symbiotic_stake_store.upsert_stake(&operator, &stake_token, &vault_snapshot_amount); + } + } + + return Ok(()); + } + + if let Ok(stake_lock_logs) = symbiotic_staking + .decode_event::( + "StakeLocked", + log.topics.clone(), + log.data.clone(), + ) + { + log::debug!("Stake Locked: {:?}", stake_lock_logs); + let address = stake_lock_logs.operator; + let stake_locked = stake_lock_logs.amount; + let token_address = stake_lock_logs.token; + + generator_store.update_on_stake_locked(&address, &token_address, stake_locked); + return Ok(()); + } + + if let Ok(stake_lock_logs) = symbiotic_staking + .decode_event::( + "StakeUnlocked", + log.topics.clone(), + log.data.clone(), + ) + { + log::debug!("Stake Lock Released: {:?}", stake_lock_logs); + let address = stake_lock_logs.operator; + let stake_released = stake_lock_logs.amount; + let token_address = stake_lock_logs.token; + generator_store.update_on_stake_released(&address, &token_address, stake_released); + return Ok(()); + } + + if let Ok(stake_slash_logs) = symbiotic_staking + .decode_event::( + "JobSlashed", + log.topics.clone(), + log.data.clone(), + ) + { + log::warn!("Job/Stake Slashed: {:?}", stake_slash_logs); + let address = stake_slash_logs.operator; + let stake_slashed = stake_slash_logs.amount; + let token_address = stake_slash_logs.token; + + let block_l2: U256 = log.block_number.unwrap().as_u64().into(); + let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2) + .await + .unwrap_or_default(); + + generator_store.remove_stake( + &address, + &token_address, + &stake_slashed, + U64::from(block_l1.as_u64()), + log.transaction_index.unwrap(), + log.log_index.unwrap(), + tx_to_string(&log.transaction_hash.unwrap()), + delegation::Operation::Slash, + delegation::Source::Native, + ); + return Ok(()); + } + + log::error!("unhandled log in symbiotic staking {:?}", log); + return Err("Unhandled log in symbiotic staking".into()); +}