Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingress-egress-tracker): remove unnecessary fields #4425

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 7 additions & 19 deletions api/bin/chainflip-ingress-egress-tracker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{store::RedisStore, witnessing::state_chain::handle_call};
use crate::store::RedisStore;
use chainflip_engine::settings::CfSettings;
use clap::Parser;
use futures::FutureExt;
use settings::{DepositTrackerSettings, TrackerOptions};
use std::ops::Deref;
use store::{Storable, Store};
use utilities::task_scope;

Expand All @@ -22,24 +21,13 @@ async fn start(
.expect("setting default subscriber failed");

let client = redis::Client::open(settings.redis_url.clone()).unwrap();
let mut store = RedisStore::new(client.get_multiplexed_tokio_connection().await?);
let store = RedisStore::new(client.get_multiplexed_tokio_connection().await?);

// Broadcast channel will drop old messages when the buffer is full to
// avoid "memory leaks" due to slow receivers.
const EVENT_BUFFER_SIZE: usize = 1024;
let (witness_sender, _) =
tokio::sync::broadcast::channel::<state_chain_runtime::RuntimeCall>(EVENT_BUFFER_SIZE);
Comment on lines -29 to -31
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer need to multicast the witnessing calls to different websockets so i removed the channel logic


let (state_chain_client, env_params) =
witnessing::start(scope, settings.clone(), witness_sender.clone()).await?;
let btc_network = env_params.chainflip_network.into();
witnessing::btc_mempool::start(scope, settings.btc, store.clone(), btc_network);
let mut witness_receiver = witness_sender.subscribe();

while let Ok(call) = witness_receiver.recv().await {
handle_call(call, &mut store, env_params.chainflip_network, state_chain_client.deref())
.await?
}
let btc_network = witnessing::start(scope, settings.clone(), store.clone())
.await?
.chainflip_network
.into();
witnessing::btc_mempool::start(scope, settings.btc, store, btc_network);

Ok(())
}
Expand Down
30 changes: 16 additions & 14 deletions api/bin/chainflip-ingress-egress-tracker/src/witnessing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ mod dot;
mod eth;
pub mod state_chain;

use std::{collections::HashMap, sync::Arc};

use self::state_chain::handle_call;
use crate::{settings::DepositTrackerSettings, store::RedisStore};
use anyhow::anyhow;
use cf_chains::dot::PolkadotHash;
use cf_primitives::{chains::assets::eth::Asset, NetworkEnvironment};
use chainflip_engine::{
Expand All @@ -18,10 +19,9 @@ use chainflip_engine::{
witness::common::epoch_source::EpochSource,
};
use sp_core::H160;
use std::{collections::HashMap, ops::Deref};
use utilities::task_scope;

use crate::settings::DepositTrackerSettings;

#[derive(Clone)]
pub(super) struct EnvironmentParameters {
eth_chain_id: u64,
Expand Down Expand Up @@ -105,8 +105,8 @@ async fn get_env_parameters(state_chain_client: &StateChainClient<()>) -> Enviro
pub(super) async fn start(
scope: &task_scope::Scope<'_, anyhow::Error>,
settings: DepositTrackerSettings,
witness_sender: tokio::sync::broadcast::Sender<state_chain_runtime::RuntimeCall>,
) -> anyhow::Result<(Arc<StateChainClient<()>>, EnvironmentParameters)> {
store: RedisStore,
) -> anyhow::Result<EnvironmentParameters> {
let (state_chain_stream, unfinalized_chain_stream, state_chain_client) = {
state_chain_observer::client::StateChainClient::connect_without_account(
scope,
Expand All @@ -117,20 +117,22 @@ pub(super) async fn start(
};

let env_params = get_env_parameters(&state_chain_client).await;
let chainflip_network = env_params.chainflip_network;

let epoch_source =
EpochSource::builder(scope, state_chain_stream.clone(), state_chain_client.clone()).await;

let witness_call = {
let witness_sender = witness_sender.clone();
let state_chain_client = state_chain_client.clone();
move |call: state_chain_runtime::RuntimeCall, _epoch_index| {
let witness_sender = witness_sender.clone();
let mut store = store.clone();
let state_chain_client = state_chain_client.clone();

async move {
// Send may fail if there aren't any subscribers,
// but it is safe to ignore the error.
if let Ok(n) = witness_sender.send(call.clone()) {
tracing::info!("Broadcasting witnesser call {:?} to {} subscribers", call, n);
}
handle_call(call, &mut store, chainflip_network, state_chain_client.deref())
.await
.map_err(|err| anyhow!("failed to handle call: {err:?}"))
.unwrap()
}
}
};
Expand Down Expand Up @@ -168,5 +170,5 @@ pub(super) async fn start(
)
.await?;

Ok((state_chain_client, env_params))
Ok(env_params)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{Storable, Store};
use bitcoin::{Amount, BlockHash, Network, ScriptBuf, Transaction, Txid};
use bitcoin::{BlockHash, Network, ScriptBuf, Transaction, Txid};
use cf_chains::btc::BitcoinNetwork;
use cf_primitives::ForeignChain;
use chainflip_engine::{
Expand All @@ -15,19 +15,30 @@ use std::{
use tracing::{error, info};
use utilities::task_scope;

#[derive(Clone, Serialize)]
#[derive(Clone)]
pub struct QueryResult {
confirmations: u32,
#[serde(skip_serializing)]
destination: String,
value: f64,
value: u64,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed this to return sats because that's our practice across the board

tx_hash: Txid,
}

impl Serialize for QueryResult {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
use serde::ser::SerializeMap;

let mut map = serializer.serialize_map(Some(3))?;
map.serialize_entry("confirmations", &self.confirmations)?;
map.serialize_entry("tx_hash", &self.tx_hash)?;
map.serialize_entry("value", &format!("0x{:x}", self.value))?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hex encode the u64

map.end()
}
}

impl Storable for QueryResult {
fn get_key(&self) -> String {
let chain = ForeignChain::Bitcoin.to_string();
format!("confirmations:{chain}:{}", self.destination)
format!("mempool:{chain}:{}", self.destination)
}

fn get_expiry_duration(&self) -> Duration {
Expand Down Expand Up @@ -122,7 +133,7 @@ async fn update_cache<T: BtcRpcApi>(
QueryResult {
destination: addr,
confirmations: 0,
value: Amount::from_sat(txout.value).to_btc(),
value: txout.value,
tx_hash: txid,
},
);
Expand Down Expand Up @@ -151,7 +162,7 @@ async fn update_cache<T: BtcRpcApi>(
QueryResult {
destination: addr,
confirmations,
value: txout.value.to_btc(),
value: txout.value.to_sat(),
tx_hash,
},
);
Expand Down Expand Up @@ -207,23 +218,21 @@ pub fn start<S: Store>(

#[cfg(test)]
mod tests {
use super::*;
use anyhow::anyhow;
use std::collections::BTreeMap;

use bitcoin::{
absolute::{Height, LockTime},
address::{self},
block::Version,
hash_types::TxMerkleNode,
hashes::Hash,
secp256k1::rand::{self, Rng},
TxOut,
Amount, TxOut,
};
use chainflip_engine::btc::rpc::{
BlockHeader, Difficulty, VerboseBlock, VerboseTransaction, VerboseTxOut,
};

use super::*;
use std::collections::BTreeMap;

#[derive(Clone)]
struct MockBtcRpc {
Expand Down Expand Up @@ -514,6 +523,6 @@ mod tests {
let cache: Cache = Default::default();
let cache = update_cache(&btc, cache.clone(), Network::Bitcoin).await.unwrap();
assert_eq!(cache.transactions.get(&address1).unwrap().confirmations, 3);
assert_eq!(cache.transactions.get(&address1).unwrap().value, tx_value.to_btc());
assert_eq!(cache.transactions.get(&address1).unwrap().value, tx_value.to_sat());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
source: api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs
expression: "store.storage.get(\"broadcast:Ethereum:1\").unwrap()"
---
{"tx_out_id":{"deposit_chain":"Ethereum","signature":{"k_times_g_address":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"s":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"type":"broadcast"}
{"tx_out_id":{"signature":{"k_times_g_address":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"s":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
source: api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs
expression: "store.storage.get(format!(\"deposit:Polkadot:{}\", format!\n (\"0x{}\", hex ::\n encode(polkadot_account_id.aliased_ref()))).as_str()).unwrap()"
---
[{"amount":"0x64","asset":{"asset":"DOT","chain":"Polkadot"},"deposit_chain_block_height":1,"type":"deposit"}]
[{"amount":"0x64","asset":"DOT","deposit_chain_block_height":1}]
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
source: api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs
expression: "store.storage.get(format!(\"deposit:Ethereum:{}\",\n eth_address_str2.to_lowercase()).as_str()).unwrap()"
---
[{"amount":"0x64","asset":{"asset":"ETH","chain":"Ethereum"},"deposit_chain_block_height":1,"type":"deposit"}]
[{"amount":"0x64","asset":"ETH","deposit_chain_block_height":1}]
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
source: api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs
expression: "store.storage.get(format!(\"deposit:Ethereum:{}\",\n eth_address_str1.to_lowercase()).as_str()).unwrap()"
---
[{"amount":"0x64","asset":{"asset":"ETH","chain":"Ethereum"},"deposit_chain_block_height":1,"type":"deposit"},{"amount":"0x1e8480","asset":{"asset":"ETH","chain":"Ethereum"},"deposit_chain_block_height":1,"type":"deposit"}]
[{"amount":"0x64","asset":"ETH","deposit_chain_block_height":1},{"amount":"0x1e8480","asset":"ETH","deposit_chain_block_height":1}]
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
source: api/bin/chainflip-ingress-egress-tracker/src/witnessing/state_chain.rs
expression: "store.storage.get(format!(\"deposit:Ethereum:{}\",\n eth_address_str1.to_lowercase()).as_str()).unwrap()"
---
[{"amount":"0x64","asset":{"asset":"ETH","chain":"Ethereum"},"deposit_chain_block_height":1,"type":"deposit"}]
[{"amount":"0x64","asset":"ETH","deposit_chain_block_height":1}]
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,22 @@ use serde::Serialize;
use utilities::rpc::NumberOrHex;

#[derive(Serialize)]
pub struct WitnessAsset {
pub chain: ForeignChain,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the chain is part of the key in redis so it doesn't need to be serialized

pub asset: Asset,
}

impl From<cf_chains::assets::eth::Asset> for WitnessAsset {
fn from(asset: cf_chains::assets::eth::Asset) -> Self {
Self { chain: ForeignChain::Ethereum, asset: asset.into() }
}
}

impl From<cf_chains::assets::dot::Asset> for WitnessAsset {
fn from(asset: cf_chains::assets::dot::Asset) -> Self {
Self { chain: ForeignChain::Polkadot, asset: asset.into() }
}
}

impl From<cf_chains::assets::btc::Asset> for WitnessAsset {
fn from(asset: cf_chains::assets::btc::Asset) -> Self {
Self { chain: ForeignChain::Bitcoin, asset: asset.into() }
}
}

#[derive(Serialize)]
#[serde(tag = "deposit_chain")]
#[serde(untagged)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this tag is part of the key in redis so it doesn't need to be serialized

enum TransactionId {
Bitcoin { hash: String },
Ethereum { signature: SchnorrVerificationComponents },
Polkadot { signature: String },
}

#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[serde(untagged)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this tag is part of the key in redis so it doesn't need to be serialized

enum WitnessInformation {
Deposit {
deposit_chain_block_height: <AnyChain as Chain>::ChainBlockNumber,
#[serde(skip_serializing)]
deposit_address: String,
amount: NumberOrHex,
asset: WitnessAsset,
asset: Asset,
},
Broadcast {
#[serde(skip_serializing)]
Expand All @@ -66,7 +42,7 @@ enum WitnessInformation {
impl WitnessInformation {
fn to_foreign_chain(&self) -> ForeignChain {
match self {
Self::Deposit { asset, .. } => asset.chain,
Self::Deposit { asset, .. } => (*asset).into(),
Self::Broadcast { tx_out_id, .. } => match tx_out_id {
TransactionId::Bitcoin { .. } => ForeignChain::Bitcoin,
TransactionId::Ethereum { .. } => ForeignChain::Ethereum,
Expand Down