Skip to content

Commit

Permalink
Template for stake split
Browse files Browse the repository at this point in the history
  • Loading branch information
akshay111meher committed Nov 14, 2024
1 parent b52821b commit 57dadb1
Show file tree
Hide file tree
Showing 7 changed files with 571 additions and 155 deletions.
35 changes: 35 additions & 0 deletions matching_engine/src/jobs/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type SymbioticStakingInstance = bindings::symbiotic_staking::SymbioticStaking<
SignerMiddleware<Provider<Http>, Wallet<SigningKey>>,
>;

type NativeStakingInstance =
bindings::native_staking::NativeStaking<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>;

pub struct LogParser {
should_stop: Arc<AtomicBool>,
start_block: Arc<RwLock<U64>>,
Expand All @@ -56,6 +59,7 @@ pub struct LogParser {
generator_registry: GeneratorRegistryInstance,
entity_registry: EntityRegistryInstance,
symbiotic_staking: SymbioticStakingInstance,
native_staking: NativeStakingInstance,
provider_http: Arc<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>,
matching_engine_key: Vec<u8>,
matching_engine_slave_keys: Vec<Vec<u8>>,
Expand Down Expand Up @@ -83,6 +87,7 @@ impl LogParser {
generator_registry: GeneratorRegistryInstance,
entity_registry: EntityRegistryInstance,
symbiotic_staking: SymbioticStakingInstance,
native_staking: NativeStakingInstance,
matching_engine_key: String,
matching_engine_slave_keys: Vec<String>,
shared_local_ask_store: Arc<RwLock<LocalAskStore>>,
Expand All @@ -107,6 +112,7 @@ impl LogParser {
generator_registry,
entity_registry,
symbiotic_staking,
native_staking,
provider_http,
matching_engine_key: hex::decode(matching_engine_key).unwrap(),
matching_engine_slave_keys: matching_engine_slave_keys
Expand Down Expand Up @@ -161,6 +167,8 @@ impl LogParser {
let proof_marketplace_address = self.proof_marketplace.address();
let generator_registry_address = self.generator_registry.address();
let entity_key_registry_address = self.entity_registry.address();
let native_staking_address = self.native_staking.address();
let symbiotic_staking_address = self.symbiotic_staking.address();

let filter = Filter::default()
.from_block(start_block)
Expand All @@ -169,6 +177,8 @@ impl LogParser {
proof_marketplace_address,
generator_registry_address,
entity_key_registry_address,
native_staking_address,
symbiotic_staking_address,
]);

let logs = match self.provider_http.get_logs(&filter).await {
Expand Down Expand Up @@ -244,6 +254,31 @@ impl LogParser {
continue;
}

if log.address.eq(&native_staking_address) {
log_processor::ns::process_native_staking_logs(
log,
&self.native_staking,
&self.shared_generator_store,
&self.rpc_url,
)
.await
.unwrap();
continue;
}

if log.address.eq(&symbiotic_staking_address) {
log_processor::ss::process_symbiotic_staking_logs(
log,
&self.symbiotic_staking,
&self.shared_generator_store,
&self.shared_symbiotic_stake_store,
&self.rpc_url,
)
.await
.unwrap();
continue;
}

log::error!("Log of unknown contract found {:?}", log.address);
return Err(anyhow::anyhow!("Unknown log"));
}
Expand Down
14 changes: 14 additions & 0 deletions matching_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub struct MatchingEngineConfig {
pub entity_registry: String,
#[serde(default = "default_symbiotic_staking")]
pub symbiotic_staking: String,
#[serde(default = "default_native_staking")]
pub native_staking: String,
pub start_block: String,
}

Expand All @@ -119,6 +121,10 @@ fn default_symbiotic_staking() -> String {
"0xE7136641cB2c94d318779c3B6BEb997dC5B2E574".to_string()
}

fn default_native_staking() -> String {
"0xe9d2Bcc597f943ddA9EDf356DAC7C6A713dDE113".to_string()
}

pub struct MatchingEngine {
config: MatchingEngineConfig,
matching_engine_port: u16,
Expand All @@ -142,6 +148,7 @@ impl MatchingEngine {
generator_registry: String,
entity_registry: String,
symbiotic_staking: String,
native_staking: String,
start_block: String,
matching_engine_port: Option<u16>,
) -> Self {
Expand All @@ -154,6 +161,7 @@ impl MatchingEngine {
generator_registry,
entity_registry,
symbiotic_staking,
native_staking,
start_block,
};

Expand Down Expand Up @@ -250,6 +258,11 @@ impl MatchingEngine {
client.clone(),
);

let native_staking_var = self.config.clone().native_staking;
let native_staking_address = Address::from_str(&native_staking_var).unwrap();
let shared_native_staking =
bindings::native_staking::NativeStaking::new(native_staking_address, client.clone());

let shared_parsed_block_number_store = Arc::new(RwLock::new(
U64::from_dec_str(&start_block_string).expect("Unable to rad start_block"),
));
Expand Down Expand Up @@ -311,6 +324,7 @@ impl MatchingEngine {
generator_registry,
entity_key_registry,
shared_symbiotic_staking,
shared_native_staking,
matching_engine_key,
vec![], //TODO! fetch these slave keys using Oyster KMS
shared_local_ask_store.clone(),
Expand Down
2 changes: 2 additions & 0 deletions matching_engine/src/log_processor/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ lazy_static! {
pub static ref PROOF_MARKET_TOPICS_SKIP: HashSet<H256> = TOPICS_TO_SKIP.clone();
pub static ref GENERATOR_REGISTRY_TOPICS_SKIP: HashSet<H256> = TOPICS_TO_SKIP.clone();
pub static ref ENTITY_KEY_REGISTRY_TOPICS_SKIP: HashSet<H256> = TOPICS_TO_SKIP.clone();
pub static ref NATIVE_STAKING_TOPICS_SKIP: HashSet<H256> = TOPICS_TO_SKIP.clone();
pub static ref SYMBIOTIC_STAKING_TOPICS_SKIP: HashSet<H256> = TOPICS_TO_SKIP.clone();
}
172 changes: 17 additions & 155 deletions matching_engine/src/log_processor/gr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,19 @@ use tokio::sync::RwLock;

use crate::generator_lib::*;
use crate::log_processor::constants;
use crate::utility::get_l1_block_from_l2_block;
use crate::utility::{
tx_to_string, TokenTracker, TEST_TOKEN_ADDRESS_ONE, TEST_TOKEN_ADDRESS_THREE,
TEST_TOKEN_ADDRESS_TWO,
};
use crate::utility::TokenTracker;

pub async fn process_generator_registry_logs(
log: &Log,
genertor_registry: &bindings::generator_registry::GeneratorRegistry<
SignerMiddleware<Provider<Http>, Wallet<SigningKey>>,
>,
symbiotic_staking: &bindings::symbiotic_staking::SymbioticStaking<
_: &bindings::symbiotic_staking::SymbioticStaking<
SignerMiddleware<Provider<Http>, Wallet<SigningKey>>,
>,
generator_store: &Arc<RwLock<generator_store::GeneratorStore>>,
symbiotic_stake_store: &Arc<RwLock<symbiotic_stake_store::SymbioticStakeStore>>,
rpc_url: &str,
_: &Arc<RwLock<symbiotic_stake_store::SymbioticStakeStore>>,
_: &str,
) -> Result<(), Box<dyn std::error::Error>> {
if constants::GENERATOR_REGISTRY_TOPICS_SKIP
.get(&log.topics[0])
Expand Down Expand Up @@ -222,26 +218,8 @@ pub async fn process_generator_registry_logs(
"Added stake to Generator: {:?}",
added_stake_log.generator_address
);
let address = added_stake_log.generator_address;
let amount = added_stake_log.amount;
let token_address = added_stake_log.token;

let block_l2: U256 = log.block_number.unwrap().as_u64().into();
let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2)
.await
.unwrap_or_default();

generator_store.add_extra_stake(
&address,
&token_address,
&amount,
U64::from(block_l1.as_u64()),
log.transaction_index.unwrap(),
log.log_index.unwrap(),
tx_to_string(&log.transaction_hash.unwrap()),
delegation::Source::Native,
);

log::warn!("Add Stake is now handled in native staking");
return Ok(());
}

Expand All @@ -256,16 +234,9 @@ pub async fn process_generator_registry_logs(
request_stake_decrease_log.generator_address
);

let address = request_stake_decrease_log.generator_address;

log::warn!("pausing all assignments across all markets");
log::warn!("will be unpaused once the request if fully withdrawn");

generator_store.pause_assignments_across_all_markets(&address);

log::warn!("Setting new utilization to same value");
let new_utilization = 1000000000000000000_i64.into();
generator_store.update_intended_stake_util(&address, new_utilization);
log::warn!(
"RequestStakeDecrease is not processed using native_staking::StakeWithdrawalRequested"
);
return Ok(());
}

Expand All @@ -281,28 +252,7 @@ pub async fn process_generator_registry_logs(
remove_stake_log.generator_address
);

let address = remove_stake_log.generator_address;
let amount = remove_stake_log.amount;
let token_address = remove_stake_log.token;

let block_l2: U256 = log.block_number.unwrap().as_u64().into();
let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2)
.await
.unwrap_or_default();

generator_store.remove_stake(
&address,
&token_address,
&amount,
U64::from(block_l1.as_u64()),
log.transaction_index.unwrap(),
log.log_index.unwrap(),
tx_to_string(&log.transaction_hash.unwrap()),
delegation::Operation::UnDelegate,
delegation::Source::Native,
);
generator_store.resume_assignments_accross_all_markets(&address);
generator_store.update_intended_stake_util(&address, 1000000000000000000_i64.into());
log::warn!("Request stake decrese in no processed in native_stake::StakeWithdrawn");

return Ok(());
}
Expand Down Expand Up @@ -373,11 +323,7 @@ pub async fn process_generator_registry_logs(
log.data.clone(),
) {
log::debug!("Stake Lock Imposed: {:?}", stake_lock_logs);
let address = stake_lock_logs.generator_address;
let stake_locked = stake_lock_logs.stake;
let token_address = stake_lock_logs.token;

generator_store.update_on_stake_locked(&address, &token_address, stake_locked);
log::warn!("Stake Lock Imposed is now Handled in native_staking::StakeLocked and symbiotic_staking::StakeLocked separately");
return Ok(());
}

Expand All @@ -401,10 +347,7 @@ pub async fn process_generator_registry_logs(
log.data.clone(),
) {
log::debug!("Stake Lock Released: {:?}", stake_lock_logs);
let address = stake_lock_logs.generator_address;
let stake_released = stake_lock_logs.stake;
let token_address = stake_lock_logs.token;
generator_store.update_on_stake_released(&address, &token_address, stake_released);
log::warn!("Stake Lock Released in native_staking::StakeUnlocked and symbiotic_stake::StakeUnlocked separately");
return Ok(());
}

Expand All @@ -429,26 +372,7 @@ pub async fn process_generator_registry_logs(
)
{
log::warn!("Stake Slashed: {:?}", stake_slash_logs);
let address = stake_slash_logs.generator_address;
let stake_slashed = stake_slash_logs.stake;
let token_address = stake_slash_logs.token;

let block_l2: U256 = log.block_number.unwrap().as_u64().into();
let block_l1: U256 = get_l1_block_from_l2_block(rpc_url, block_l2)
.await
.unwrap_or_default();

generator_store.remove_stake(
&address,
&token_address,
&stake_slashed,
U64::from(block_l1.as_u64()),
log.transaction_index.unwrap(),
log.log_index.unwrap(),
tx_to_string(&log.transaction_hash.unwrap()),
delegation::Operation::Slash,
delegation::Source::Native,
);
log::warn!("Stake slash is now handled in ns::JobSlashed and ss::JobSlashed separately");
return Ok(());
}

Expand All @@ -457,73 +381,11 @@ pub async fn process_generator_registry_logs(
log.topics.clone(),
log.data.clone(),
) {
log::debug!("Processing SymbioticCompleteSnapshot");
let mut symbiotic_stake_store = { symbiotic_stake_store.write().await };

let capture_timestamp = {
let capture_timestamp_token = symbiotic_complete_snapshot_log.first().unwrap();
let capture_timestamp = capture_timestamp_token.clone().into_uint().unwrap();
capture_timestamp
};

let known_tokens: Vec<Address> = vec![
TEST_TOKEN_ADDRESS_ONE.clone(),
TEST_TOKEN_ADDRESS_TWO.clone(),
TEST_TOKEN_ADDRESS_THREE.clone(),
];
let all_generators = generator_store.all_generators_address();

for stake_token in known_tokens {
for operator in all_generators.clone().into_iter() {
// if this fails, system breaks. TODO
let vault_snapshot_amount = symbiotic_staking
.get_operator_stake_amount_at(capture_timestamp, stake_token, operator)
.call()
.await
.unwrap();

log::debug!(
"operator:{}, snapshot token:{}, amount: {}",
&operator,
&stake_token,
vault_snapshot_amount.to_string()
);

// before updating in symbiotic, do these steps
let last_stored_staking_info =
symbiotic_stake_store.get_latest_stake_info(&operator, &stake_token);

if vault_snapshot_amount.gt(&last_stored_staking_info) {
generator_store.add_extra_stake(
&operator,
&stake_token,
&(vault_snapshot_amount - last_stored_staking_info),
log.block_number.unwrap(),
log.transaction_index.unwrap(),
log.log_index.unwrap(),
tx_to_string(&log.transaction_hash.unwrap()),
delegation::Source::Symbiotic,
);
} else if vault_snapshot_amount.lt(&last_stored_staking_info) {
generator_store.remove_stake(
&operator,
&stake_token,
&(last_stored_staking_info - vault_snapshot_amount),
log.block_number.unwrap(),
log.transaction_index.unwrap(),
log.log_index.unwrap(),
tx_to_string(&log.transaction_hash.unwrap()),
delegation::Operation::UnDelegate,
delegation::Source::Symbiotic,
);
} else {
log::debug!("No change in symbiotic stake noticed");
}

symbiotic_stake_store.upsert_stake(&operator, &stake_token, &vault_snapshot_amount);
}
}

log::debug!(
"Processing SymbioticCompleteSnapshot: {:?}",
symbiotic_complete_snapshot_log
);
log::warn!("SymbioticCompleteSnapshot is now processed using symbiotic::SnapshotConfirmed");
return Ok(());
}

Expand Down
2 changes: 2 additions & 0 deletions matching_engine/src/log_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod constants;
pub mod er;
pub mod gr;
pub mod ns;
pub mod pm;
pub mod ss;
Loading

0 comments on commit 57dadb1

Please sign in to comment.