diff --git a/boost_manager/tests/integrations/updater_tests.rs b/boost_manager/tests/integrations/updater_tests.rs index a0509e77a..cc47c7b73 100644 --- a/boost_manager/tests/integrations/updater_tests.rs +++ b/boost_manager/tests/integrations/updater_tests.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use boost_manager::{db, updater::Updater, OnChainStatus}; use chrono::{DateTime, Utc}; use file_store::hex_boost::BoostedHexActivation; -use solana::{start_boost::SolanaNetwork, GetSignature}; +use solana::{start_boost::SolanaNetwork, GetSignature, SolanaRpcError}; use solana_sdk::signature::Signature; use sqlx::{PgPool, Postgres, Transaction}; use std::{string::ToString, sync::Mutex, time::Duration}; @@ -23,12 +23,6 @@ pub struct MockSolanaConnection { error: Option, } -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("not found")] - SubmitError(String), -} - impl MockSolanaConnection { fn ok() -> Self { Self { @@ -47,28 +41,28 @@ impl MockSolanaConnection { #[async_trait] impl SolanaNetwork for MockSolanaConnection { - type Error = Error; type Transaction = MockTransaction; async fn make_start_boost_transaction( &self, batch: &[BoostedHexActivation], - ) -> Result { + ) -> Result { Ok(MockTransaction { signature: Signature::new_unique(), _activations: batch.to_owned(), }) } - async fn submit_transaction(&self, txn: &Self::Transaction) -> Result<(), Self::Error> { + async fn submit_transaction(&self, txn: &Self::Transaction) -> Result<(), SolanaRpcError> { self.submitted.lock().unwrap().push(txn.clone()); + self.error .as_ref() - .map(|str| Err(Error::SubmitError(str.clone()))) + .map(|err| Err(SolanaRpcError::Test(err.to_owned()))) .unwrap_or(Ok(())) } - async fn confirm_transaction(&self, _id: &str) -> Result { + async fn confirm_transaction(&self, _id: &str) -> Result { Ok(true) } } diff --git a/iot_config/src/client/org_client.rs b/iot_config/src/client/org_client.rs index b0e41087e..00d376d87 100644 --- a/iot_config/src/client/org_client.rs +++ b/iot_config/src/client/org_client.rs @@ -11,12 +11,10 @@ use helium_proto::services::iot_config::{ #[async_trait] pub trait Orgs: Send + Sync + 'static { - type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static; - - async fn get(&mut self, oui: u64) -> Result; - async fn list(&mut self) -> Result, Self::Error>; - async fn enable(&mut self, oui: u64) -> Result<(), Self::Error>; - async fn disable(&mut self, oui: u64) -> Result<(), Self::Error>; + async fn get(&mut self, oui: u64) -> Result; + async fn list(&mut self) -> Result, ClientError>; + async fn enable(&mut self, oui: u64) -> Result<(), ClientError>; + async fn disable(&mut self, oui: u64) -> Result<(), ClientError>; } #[derive(Clone)] @@ -42,8 +40,6 @@ impl OrgClient { #[async_trait] impl Orgs for OrgClient { - type Error = ClientError; - async fn get(&mut self, oui: u64) -> Result { tracing::debug!(%oui, "retrieving org"); diff --git a/iot_packet_verifier/src/balances.rs b/iot_packet_verifier/src/balances.rs index 57f559e71..a5ab6cea3 100644 --- a/iot_packet_verifier/src/balances.rs +++ b/iot_packet_verifier/src/balances.rs @@ -3,7 +3,7 @@ use crate::{ verifier::Debiter, }; use helium_crypto::PublicKeyBinary; -use solana::burn::SolanaNetwork; +use solana::{burn::SolanaNetwork, SolanaRpcError}; use std::{ collections::{hash_map::Entry, HashMap}, sync::Arc, @@ -62,8 +62,6 @@ impl Debiter for BalanceCache where S: SolanaNetwork, { - type Error = S::Error; - /// Debits the balance from the cache, returning the remaining balance as an /// option if there was enough and none otherwise. async fn debit_if_sufficient( @@ -71,7 +69,7 @@ where payer: &PublicKeyBinary, amount: u64, trigger_balance_check_threshold: u64, - ) -> Result, S::Error> { + ) -> Result, SolanaRpcError> { let mut payer_accounts = self.payer_accounts.lock().await; // Fetch the balance if we haven't seen the payer before diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index 750860511..3f19afd10 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -5,7 +5,7 @@ use crate::{ }, }; use futures::{future::LocalBoxFuture, TryFutureExt}; -use solana::{burn::SolanaNetwork, GetSignature}; +use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError}; use std::time::Duration; use task_manager::ManagedTask; use tokio::time::{self, MissedTickBehavior}; @@ -37,15 +37,15 @@ where } #[derive(thiserror::Error, Debug)] -pub enum BurnError { +pub enum BurnError { #[error("Join error: {0}")] JoinError(#[from] tokio::task::JoinError), #[error("Sql error: {0}")] SqlError(#[from] sqlx::Error), #[error("Solana error: {0}")] - SolanaError(S), + SolanaError(#[from] SolanaRpcError), #[error("Confirm pending transaction error: {0}")] - ConfirmPendingError(#[from] ConfirmPendingError), + ConfirmPendingError(#[from] ConfirmPendingError), } impl Burner { @@ -69,32 +69,31 @@ where P: PendingTables + Send + Sync + 'static, S: SolanaNetwork, { - pub async fn run(mut self, shutdown: triggered::Listener) -> Result<(), BurnError> { + pub async fn run(mut self, shutdown: triggered::Listener) -> Result<(), BurnError> { tracing::info!("Starting burner"); let mut burn_timer = time::interval(self.burn_period); burn_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { - #[rustfmt::skip] tokio::select! { - biased; - _ = shutdown.clone() => break, - _ = burn_timer.tick() => { - match self.burn().await { - Ok(()) => continue, - Err(err) => { - tracing::error!("Error while burning data credits: {err}"); - confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?; - } - } - } + biased; + _ = shutdown.clone() => break, + _ = burn_timer.tick() => { + match self.burn().await { + Ok(()) => continue, + Err(err) => { + tracing::error!("Error while burning data credits: {err}"); + confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?; + } + } + } } } tracing::info!("Stopping burner"); Ok(()) } - pub async fn burn(&mut self) -> Result<(), BurnError> { + pub async fn burn(&mut self) -> Result<(), BurnError> { // Fetch the next payer and amount that should be burn. If no such burn // exists, perform no action. let Some(Burn { payer, amount }) = self.pending_tables.fetch_next_burn().await? else { diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 5b4b21710..b442e77e2 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -84,8 +84,8 @@ where self.minimum_allowed_balance, &mut transaction, reports, - &self.valid_packets, - &self.invalid_packets, + &mut self.valid_packets, + &mut self.invalid_packets, ) .await?; transaction.commit().await?; diff --git a/iot_packet_verifier/src/pending.rs b/iot_packet_verifier/src/pending.rs index 0db62ce63..561059651 100644 --- a/iot_packet_verifier/src/pending.rs +++ b/iot_packet_verifier/src/pending.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use helium_crypto::PublicKeyBinary; -use solana::burn::SolanaNetwork; +use solana::{burn::SolanaNetwork, SolanaRpcError}; use solana_sdk::signature::Signature; use sqlx::{postgres::PgRow, FromRow, PgPool, Postgres, Row, Transaction}; use std::{collections::HashMap, sync::Arc}; @@ -45,20 +45,20 @@ pub trait PendingTables { } #[derive(thiserror::Error, Debug)] -pub enum ConfirmPendingError { +pub enum ConfirmPendingError { #[error("Sqlx error: {0}")] SqlxError(#[from] sqlx::Error), #[error("Chrono error: {0}")] ChronoError(#[from] chrono::OutOfRangeError), #[error("Solana error: {0}")] - SolanaError(S), + SolanaError(#[from] SolanaRpcError), } pub async fn confirm_pending_txns( pending_tables: &impl PendingTables, solana: &S, balances: &BalanceStore, -) -> Result<(), ConfirmPendingError> +) -> Result<(), ConfirmPendingError> where S: SolanaNetwork, { @@ -165,7 +165,7 @@ impl PendingTables for PgPool { } #[async_trait] -impl AddPendingBurn for &'_ mut Transaction<'_, Postgres> { +impl AddPendingBurn for Transaction<'_, Postgres> { async fn add_burned_amount( &mut self, payer: &PublicKeyBinary, @@ -387,6 +387,7 @@ impl<'a> PendingTablesTransaction<'a> for &'a MockPendingTables { #[cfg(test)] mod test { + use crate::balances::PayerAccount; use super::*; @@ -397,11 +398,10 @@ mod test { #[async_trait] impl SolanaNetwork for MockConfirmed { - type Error = std::convert::Infallible; type Transaction = Signature; #[allow(clippy::diverging_sub_expression)] - async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result { + async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result { unreachable!() } @@ -410,7 +410,7 @@ mod test { &self, _payer: &PublicKeyBinary, _amount: u64, - ) -> Result { + ) -> Result { unreachable!() } @@ -418,11 +418,11 @@ mod test { async fn submit_transaction( &self, _transaction: &Self::Transaction, - ) -> Result<(), Self::Error> { + ) -> Result<(), SolanaRpcError> { unreachable!() } - async fn confirm_transaction(&self, txn: &Signature) -> Result { + async fn confirm_transaction(&self, txn: &Signature) -> Result { Ok(self.0.contains(txn)) } } diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 9e96c5040..6c5b24dc8 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -11,11 +11,10 @@ use helium_proto::services::{ packet_verifier::{InvalidPacket, InvalidPacketReason, ValidPacket}, router::packet_router_packet_report_v1::PacketType, }; -use iot_config::client::org_client::Orgs; -use solana::burn::SolanaNetwork; +use iot_config::client::{org_client::Orgs, ClientError}; +use solana::{burn::SolanaNetwork, SolanaRpcError}; use std::{ collections::{hash_map::Entry, HashMap}, - convert::Infallible, fmt::Debug, sync::Arc, }; @@ -31,17 +30,17 @@ pub struct Verifier { } #[derive(thiserror::Error, Debug)] -pub enum VerificationError { +pub enum VerificationError { #[error("Debit error: {0}")] - DebitError(DE), + DebitError(#[from] SolanaRpcError), #[error("Config server error: {0}")] - ConfigError(CE), + ConfigError(#[from] ConfigServerError), #[error("Burn error: {0}")] BurnError(#[from] sqlx::Error), #[error("Valid packet writer error: {0}")] - ValidPacketWriterError(VPE), + ValidPacketWriterError(file_store::Error), #[error("Invalid packet writer error: {0}")] - InvalidPacketWriterError(IPE), + InvalidPacketWriterError(file_store::Error), } impl Verifier @@ -50,20 +49,14 @@ where C: ConfigServer, { /// Verify a stream of packet reports. Writes out `valid_packets` and `invalid_packets`. - pub async fn verify( + pub async fn verify( &mut self, minimum_allowed_balance: u64, - mut pending_burns: B, - reports: R, - mut valid_packets: VP, - mut invalid_packets: IP, - ) -> Result<(), VerificationError> - where - B: AddPendingBurn, - R: Stream, - VP: PacketWriter, - IP: PacketWriter, - { + pending_burns: &mut impl AddPendingBurn, + reports: impl Stream, + valid_packets: &mut impl PacketWriter, + invalid_packets: &mut impl PacketWriter, + ) -> Result<(), VerificationError> { let mut org_cache = HashMap::::new(); tokio::pin!(reports); @@ -82,19 +75,16 @@ where let payer = self .config_server .fetch_org(report.oui, &mut org_cache) - .await - .map_err(VerificationError::ConfigError)?; + .await?; if let Some(remaining_balance) = self .debiter .debit_if_sufficient(&payer, debit_amount, minimum_allowed_balance) - .await - .map_err(VerificationError::DebitError)? + .await? { pending_burns .add_burned_amount(&payer, debit_amount) - .await - .map_err(VerificationError::BurnError)?; + .await?; valid_packets .write(ValidPacket { @@ -108,10 +98,7 @@ where .map_err(VerificationError::ValidPacketWriterError)?; if remaining_balance < minimum_allowed_balance { - self.config_server - .disable_org(report.oui) - .await - .map_err(VerificationError::ConfigError)?; + self.config_server.disable_org(report.oui).await?; } } else { invalid_packets @@ -123,10 +110,8 @@ where }) .await .map_err(VerificationError::InvalidPacketWriterError)?; - self.config_server - .disable_org(report.oui) - .await - .map_err(VerificationError::ConfigError)?; + + self.config_server.disable_org(report.oui).await?; } } @@ -143,8 +128,6 @@ pub fn payload_size_to_dc(payload_size: u64) -> u64 { #[async_trait] pub trait Debiter { - type Error; - /// Debit the balance from the account. If the debit was successful, /// return the remaining amount. async fn debit_if_sufficient( @@ -152,19 +135,17 @@ pub trait Debiter { payer: &PublicKeyBinary, amount: u64, trigger_balance_check_threshold: u64, - ) -> Result, Self::Error>; + ) -> Result, SolanaRpcError>; } #[async_trait] impl Debiter for Arc>> { - type Error = Infallible; - async fn debit_if_sufficient( &self, payer: &PublicKeyBinary, amount: u64, _trigger_balance_check_threshold: u64, - ) -> Result, Infallible> { + ) -> Result, SolanaRpcError> { let map = self.lock().await; let balance = map.get(payer).unwrap(); // Don't debit the amount if we're mocking. That is a job for the burner. @@ -182,19 +163,17 @@ pub struct Org { #[async_trait] pub trait ConfigServer: Sized + Send + Sync + 'static { - type Error: Send + Sync + 'static; - async fn fetch_org( &self, oui: u64, cache: &mut HashMap, - ) -> Result; + ) -> Result; - async fn disable_org(&self, oui: u64) -> Result<(), Self::Error>; + async fn disable_org(&self, oui: u64) -> Result<(), ConfigServerError>; - async fn enable_org(&self, oui: u64) -> Result<(), Self::Error>; + async fn enable_org(&self, oui: u64) -> Result<(), ConfigServerError>; - async fn list_orgs(&self) -> Result, Self::Error>; + async fn list_orgs(&self) -> Result, ConfigServerError>; async fn monitor_funds( self, @@ -203,7 +182,7 @@ pub trait ConfigServer: Sized + Send + Sync + 'static { minimum_allowed_balance: u64, monitor_period: Duration, shutdown: triggered::Listener, - ) -> Result<(), MonitorError> + ) -> Result<(), MonitorError> where S: SolanaNetwork, B: BalanceStore, @@ -212,22 +191,12 @@ pub trait ConfigServer: Sized + Send + Sync + 'static { loop { tracing::info!("Checking if any orgs need to be re-enabled"); - for Org { locked, payer, oui } in self - .list_orgs() - .await - .map_err(MonitorError::ConfigClientError)? - .into_iter() - { + for Org { locked, payer, oui } in self.list_orgs().await?.into_iter() { if locked { - let balance = solana - .payer_balance(&payer) - .await - .map_err(MonitorError::SolanaError)?; + let balance = solana.payer_balance(&payer).await?; if balance >= minimum_allowed_balance { balances.set_balance(&payer, balance).await; - self.enable_org(oui) - .await - .map_err(MonitorError::ConfigClientError)?; + self.enable_org(oui).await?; } } } @@ -267,19 +236,19 @@ impl BalanceStore for Arc>> { } #[derive(thiserror::Error, Debug)] -pub enum MonitorError { +pub enum MonitorError { #[error("Join error: {0}")] JoinError(#[from] JoinError), #[error("Config client error: {0}")] - ConfigClientError(E), + ConfigClientError(#[from] ConfigServerError), #[error("Solana error: {0}")] - SolanaError(S), + SolanaError(#[from] SolanaRpcError), } #[derive(thiserror::Error, Debug)] -pub enum ConfigServerError { +pub enum ConfigServerError { #[error("orgs error: {0}")] - OrgError(#[from] OrgsError), + OrgError(#[from] ClientError), #[error("not found: {0}")] NotFound(u64), } @@ -303,13 +272,11 @@ impl ConfigServer for Arc>> where O: Orgs, { - type Error = ConfigServerError; - async fn fetch_org( &self, oui: u64, oui_cache: &mut HashMap, - ) -> Result { + ) -> Result { if let Entry::Vacant(e) = oui_cache.entry(oui) { let pubkey = PublicKeyBinary::from( self.lock() @@ -326,7 +293,7 @@ where Ok(oui_cache.get(&oui).unwrap().clone()) } - async fn disable_org(&self, oui: u64) -> Result<(), Self::Error> { + async fn disable_org(&self, oui: u64) -> Result<(), ConfigServerError> { let mut cached_client = self.lock().await; if *cached_client.locked_cache.entry(oui).or_insert(true) { cached_client.orgs.disable(oui).await?; @@ -335,7 +302,7 @@ where Ok(()) } - async fn enable_org(&self, oui: u64) -> Result<(), Self::Error> { + async fn enable_org(&self, oui: u64) -> Result<(), ConfigServerError> { let mut cached_client = self.lock().await; if !*cached_client.locked_cache.entry(oui).or_insert(false) { cached_client.orgs.enable(oui).await?; @@ -344,7 +311,7 @@ where Ok(()) } - async fn list_orgs(&self) -> Result, Self::Error> { + async fn list_orgs(&self) -> Result, ConfigServerError> { Ok(self .lock() .await @@ -363,29 +330,23 @@ where #[async_trait] pub trait PacketWriter { - type Error; - // The redundant &mut receivers we see for PacketWriter and Burner are so // that we are able to resolve to either a mutable or immutable ref without // having to take ownership of the mutable reference. - async fn write(&mut self, packet: T) -> Result<(), Self::Error>; + async fn write(&mut self, packet: T) -> Result<(), file_store::Error>; } #[async_trait] -impl PacketWriter for &'_ FileSinkClient { - type Error = file_store::Error; - - async fn write(&mut self, packet: T) -> Result<(), Self::Error> { +impl PacketWriter for FileSinkClient { + async fn write(&mut self, packet: T) -> Result<(), file_store::Error> { (*self).write(packet, []).await?; Ok(()) } } #[async_trait] -impl PacketWriter for &'_ mut Vec { - type Error = (); - - async fn write(&mut self, packet: T) -> Result<(), ()> { +impl PacketWriter for Vec { + async fn write(&mut self, packet: T) -> Result<(), file_store::Error> { (*self).push(packet); Ok(()) } diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index f4251dda0..b571bd106 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -14,11 +14,11 @@ use iot_packet_verifier::{ balances::{BalanceCache, PayerAccount}, burner::Burner, pending::{confirm_pending_txns, AddPendingBurn, Burn, MockPendingTables, PendingTables}, - verifier::{payload_size_to_dc, ConfigServer, Org, Verifier, BYTES_PER_DC}, + verifier::{payload_size_to_dc, ConfigServer, ConfigServerError, Org, Verifier, BYTES_PER_DC}, }; use solana::{ burn::{MockTransaction, SolanaNetwork}, - GetSignature, + GetSignature, SolanaRpcError, }; use solana_sdk::signature::Signature; use sqlx::PgPool; @@ -53,27 +53,25 @@ impl MockConfigServer { #[async_trait] impl ConfigServer for MockConfigServer { - type Error = (); - async fn fetch_org( &self, oui: u64, _cache: &mut HashMap, - ) -> Result { + ) -> Result { Ok(self.payers.lock().await.get(&oui).unwrap().payer.clone()) } - async fn disable_org(&self, oui: u64) -> Result<(), ()> { + async fn disable_org(&self, oui: u64) -> Result<(), ConfigServerError> { self.payers.lock().await.get_mut(&oui).unwrap().enabled = false; Ok(()) } - async fn enable_org(&self, oui: u64) -> Result<(), ()> { + async fn enable_org(&self, oui: u64) -> Result<(), ConfigServerError> { self.payers.lock().await.get_mut(&oui).unwrap().enabled = true; Ok(()) } - async fn list_orgs(&self) -> Result, ()> { + async fn list_orgs(&self) -> Result, ConfigServerError> { Ok(self .payers .lock() @@ -204,7 +202,7 @@ async fn test_config_unlocking() { verifier .verify( 1, - balances.clone(), + &mut balances.clone(), stream::iter(vec![ packet_report(0, 0, 24, vec![1], false), packet_report(0, 1, 48, vec![2], false), @@ -262,7 +260,7 @@ async fn test_config_unlocking() { verifier .verify( 1, - balances.clone(), + &mut balances.clone(), stream::iter(vec![ packet_report(0, 0, 24, vec![1], false), packet_report(0, 1, 48, vec![2], false), @@ -319,7 +317,7 @@ async fn test_verifier_free_packets() { verifier .verify( 1, - balances.clone(), + &mut balances.clone(), stream::iter(packets), &mut valid_packets, &mut invalid_packets, @@ -393,7 +391,7 @@ async fn test_verifier() { verifier .verify( 1, - balances.clone(), + &mut balances.clone(), stream::iter(packets), &mut valid_packets, &mut invalid_packets, @@ -474,7 +472,7 @@ async fn test_end_to_end() { verifier .verify( 1, - pending_burns.clone(), + &mut pending_burns.clone(), stream::iter(vec![ packet_report(0, 0, BYTES_PER_DC as u32, vec![1], false), packet_report(0, 1, BYTES_PER_DC as u32, vec![2], false), @@ -554,7 +552,7 @@ async fn test_end_to_end() { verifier .verify( 1, - pending_burns.clone(), + &mut pending_burns.clone(), stream::iter(vec![packet_report( 0, 4, @@ -593,10 +591,9 @@ impl MockSolanaNetwork { #[async_trait] impl SolanaNetwork for MockSolanaNetwork { - type Error = std::convert::Infallible; type Transaction = MockTransaction; - async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { + async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { self.ledger.payer_balance(payer).await } @@ -604,16 +601,16 @@ impl SolanaNetwork for MockSolanaNetwork { &self, payer: &PublicKeyBinary, amount: u64, - ) -> Result { + ) -> Result { self.ledger.make_burn_transaction(payer, amount).await } - async fn submit_transaction(&self, txn: &MockTransaction) -> Result<(), Self::Error> { + async fn submit_transaction(&self, txn: &MockTransaction) -> Result<(), SolanaRpcError> { self.confirmed.lock().await.insert(txn.signature); self.ledger.submit_transaction(txn).await } - async fn confirm_transaction(&self, txn: &Signature) -> Result { + async fn confirm_transaction(&self, txn: &Signature) -> Result { Ok(self.confirmed.lock().await.contains(txn)) } } @@ -644,7 +641,7 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> { // Add both the burn amounts to the pending burns table { let mut transaction = pool.begin().await.unwrap(); - (&mut transaction) + transaction .add_burned_amount(&payer, CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT) .await .unwrap(); diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 7ab9b362b..1c5296d5a 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -3,7 +3,7 @@ use std::time::Duration; use file_store::file_sink::FileSinkClient; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; -use solana::{burn::SolanaNetwork, GetSignature}; +use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError}; use sqlx::{Pool, Postgres}; use tracing::Instrument; @@ -90,7 +90,7 @@ where async fn transaction_confirmation_check( &self, pool: &Pool, - err: S::Error, + err: SolanaRpcError, txn: S::Transaction, payer: PublicKeyBinary, total_dcs: u64, diff --git a/solana/src/burn.rs b/solana/src/burn.rs index eb84f8127..bcdfcd9fc 100644 --- a/solana/src/burn.rs +++ b/solana/src/burn.rs @@ -23,30 +23,28 @@ use solana_sdk::{ signer::Signer, transaction::Transaction, }; -use std::convert::Infallible; use std::{collections::HashMap, str::FromStr}; -use std::{ - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::{sync::Arc, time::SystemTime}; use tokio::sync::Mutex; #[async_trait] pub trait SolanaNetwork: Clone + Send + Sync + 'static { - type Error: std::error::Error + Send + Sync + 'static; type Transaction: GetSignature + Send + Sync + 'static; - async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result; + async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result; async fn make_burn_transaction( &self, payer: &PublicKeyBinary, amount: u64, - ) -> Result; + ) -> Result; - async fn submit_transaction(&self, transaction: &Self::Transaction) -> Result<(), Self::Error>; + async fn submit_transaction( + &self, + transaction: &Self::Transaction, + ) -> Result<(), SolanaRpcError>; - async fn confirm_transaction(&self, txn: &Signature) -> Result; + async fn confirm_transaction(&self, txn: &Signature) -> Result; } #[derive(Debug, Deserialize)] @@ -92,7 +90,9 @@ impl SolanaRpc { let dc_mint = settings.dc_mint.parse()?; let dnt_mint = settings.dnt_mint.parse()?; let Ok(keypair) = read_keypair_file(&settings.burn_keypair) else { - return Err(SolanaRpcError::FailedToReadKeypairError); + return Err(SolanaRpcError::FailedToReadKeypairError( + settings.burn_keypair.to_owned(), + )); }; let provider = RpcClient::new_with_commitment(settings.rpc_url.clone(), CommitmentConfig::finalized()); @@ -114,10 +114,9 @@ impl SolanaRpc { #[async_trait] impl SolanaNetwork for SolanaRpc { - type Error = SolanaRpcError; type Transaction = Transaction; - async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { + async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { let ddc_key = delegated_data_credits(&self.program_cache.sub_dao, payer); let (escrow_account, _) = Pubkey::find_program_address( &["escrow_dc_account".as_bytes(), &ddc_key.to_bytes()], @@ -154,7 +153,7 @@ impl SolanaNetwork for SolanaRpc { &self, payer: &PublicKeyBinary, amount: u64, - ) -> Result { + ) -> Result { // Fetch the sub dao epoch info: const EPOCH_LENGTH: u64 = 60 * 60 * 24; let epoch = SystemTime::now() @@ -209,8 +208,7 @@ impl SolanaNetwork for SolanaRpc { &priority_fee_accounts, self.min_priority_fee, ) - .await - .map_err(|e| SolanaRpcError::RpcClientError(Box::new(e)))?; + .await?; tracing::info!(%priority_fee); @@ -253,7 +251,7 @@ impl SolanaNetwork for SolanaRpc { )) } - async fn submit_transaction(&self, tx: &Self::Transaction) -> Result<(), Self::Error> { + async fn submit_transaction(&self, tx: &Self::Transaction) -> Result<(), SolanaRpcError> { let config = solana_client::rpc_config::RpcSendTransactionConfig { skip_preflight: true, ..Default::default() @@ -278,12 +276,12 @@ impl SolanaNetwork for SolanaRpc { transaction = %signature, "Data credit burn failed: {err:?}" ); - Err(SolanaRpcError::RpcClientError(Box::new(err))) + Err(err.into()) } } } - async fn confirm_transaction(&self, txn: &Signature) -> Result { + async fn confirm_transaction(&self, txn: &Signature) -> Result { Ok(matches!( self.provider .get_signature_status_with_commitment_and_history( @@ -431,10 +429,9 @@ impl GetSignature for PossibleTransaction { #[async_trait] impl SolanaNetwork for Option> { - type Error = SolanaRpcError; type Transaction = PossibleTransaction; - async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { + async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { if let Some(ref rpc) = self { rpc.payer_balance(payer).await } else { @@ -446,7 +443,7 @@ impl SolanaNetwork for Option> { &self, payer: &PublicKeyBinary, amount: u64, - ) -> Result { + ) -> Result { if let Some(ref rpc) = self { Ok(PossibleTransaction::Transaction( rpc.make_burn_transaction(payer, amount).await?, @@ -456,7 +453,10 @@ impl SolanaNetwork for Option> { } } - async fn submit_transaction(&self, transaction: &Self::Transaction) -> Result<(), Self::Error> { + async fn submit_transaction( + &self, + transaction: &Self::Transaction, + ) -> Result<(), SolanaRpcError> { match (self, transaction) { (Some(ref rpc), PossibleTransaction::Transaction(ref txn)) => { rpc.submit_transaction(txn).await? @@ -467,7 +467,7 @@ impl SolanaNetwork for Option> { Ok(()) } - async fn confirm_transaction(&self, txn: &Signature) -> Result { + async fn confirm_transaction(&self, txn: &Signature) -> Result { if let Some(ref rpc) = self { rpc.confirm_transaction(txn).await } else { @@ -490,10 +490,9 @@ impl GetSignature for MockTransaction { #[async_trait] impl SolanaNetwork for Arc>> { - type Error = Infallible; type Transaction = MockTransaction; - async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { + async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { Ok(*self.lock().await.get(payer).unwrap()) } @@ -501,7 +500,7 @@ impl SolanaNetwork for Arc>> { &self, payer: &PublicKeyBinary, amount: u64, - ) -> Result { + ) -> Result { Ok(MockTransaction { signature: Signature::new_unique(), payer: payer.clone(), @@ -509,12 +508,12 @@ impl SolanaNetwork for Arc>> { }) } - async fn submit_transaction(&self, txn: &MockTransaction) -> Result<(), Self::Error> { + async fn submit_transaction(&self, txn: &MockTransaction) -> Result<(), SolanaRpcError> { *self.lock().await.get_mut(&txn.payer).unwrap() -= txn.amount; Ok(()) } - async fn confirm_transaction(&self, _txn: &Signature) -> Result { + async fn confirm_transaction(&self, _txn: &Signature) -> Result { Ok(true) } } diff --git a/solana/src/lib.rs b/solana/src/lib.rs index df1f81073..526dabf3c 100644 --- a/solana/src/lib.rs +++ b/solana/src/lib.rs @@ -17,7 +17,7 @@ macro_rules! send_with_retry { Err(err) => { if attempt < 5 { attempt += 1; - tokio::time::sleep(Duration::from_secs(attempt)).await; + tokio::time::sleep(std::time::Duration::from_secs(attempt)).await; continue; } else { break Err(err); @@ -45,10 +45,13 @@ pub enum SolanaRpcError { InvalidKeypair, #[error("System time error: {0}")] SystemTimeError(#[from] SystemTimeError), - #[error("Failed to read keypair file")] - FailedToReadKeypairError, + #[error("Failed to read keypair file: {0}")] + FailedToReadKeypairError(String), #[error("crypto error: {0}")] Crypto(#[from] helium_crypto::Error), + // TODO: Remove when fully integrated with helium-lib + #[error("Test Error")] + Test(String), } impl From for SolanaRpcError { diff --git a/solana/src/start_boost.rs b/solana/src/start_boost.rs index 2bc8eadaf..1245ac2d6 100644 --- a/solana/src/start_boost.rs +++ b/solana/src/start_boost.rs @@ -16,21 +16,23 @@ use solana_sdk::{ signer::Signer, transaction::Transaction, }; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; #[async_trait] pub trait SolanaNetwork: Send + Sync + 'static { - type Error: std::error::Error + Send + Sync + 'static; type Transaction: GetSignature + Send + Sync + 'static; async fn make_start_boost_transaction( &self, batch: &[BoostedHexActivation], - ) -> Result; + ) -> Result; - async fn submit_transaction(&self, transaction: &Self::Transaction) -> Result<(), Self::Error>; + async fn submit_transaction( + &self, + transaction: &Self::Transaction, + ) -> Result<(), SolanaRpcError>; - async fn confirm_transaction(&self, txn: &str) -> Result; + async fn confirm_transaction(&self, txn: &str) -> Result; } #[derive(Debug, Deserialize)] @@ -50,7 +52,9 @@ pub struct SolanaRpc { impl SolanaRpc { pub async fn new(settings: &Settings) -> Result, SolanaRpcError> { let Ok(keypair) = read_keypair_file(&settings.start_authority_keypair) else { - return Err(SolanaRpcError::FailedToReadKeypairError); + return Err(SolanaRpcError::FailedToReadKeypairError( + settings.start_authority_keypair.to_owned(), + )); }; let provider = RpcClient::new_with_commitment(settings.rpc_url.clone(), CommitmentConfig::finalized()); @@ -66,13 +70,12 @@ impl SolanaRpc { #[async_trait] impl SolanaNetwork for SolanaRpc { - type Error = SolanaRpcError; type Transaction = Transaction; async fn make_start_boost_transaction( &self, batch: &[BoostedHexActivation], - ) -> Result { + ) -> Result { let instructions = { let mut request = RequestBuilder::from( hexboosting::id(), @@ -112,7 +115,7 @@ impl SolanaNetwork for SolanaRpc { )) } - async fn submit_transaction(&self, tx: &Self::Transaction) -> Result<(), Self::Error> { + async fn submit_transaction(&self, tx: &Self::Transaction) -> Result<(), SolanaRpcError> { match send_with_retry!(self.provider.send_and_confirm_transaction(tx)) { Ok(signature) => { tracing::info!( @@ -127,12 +130,12 @@ impl SolanaNetwork for SolanaRpc { transaction = %signature, "hex start boost failed: {err:?}" ); - Err(SolanaRpcError::RpcClientError(Box::new(err))) + Err(err.into()) } } } - async fn confirm_transaction(&self, signature: &str) -> Result { + async fn confirm_transaction(&self, signature: &str) -> Result { let txn: Signature = signature.parse()?; Ok(matches!( self.provider @@ -162,13 +165,12 @@ impl GetSignature for PossibleTransaction { #[async_trait] impl SolanaNetwork for Option> { - type Error = SolanaRpcError; type Transaction = PossibleTransaction; async fn make_start_boost_transaction( &self, batch: &[BoostedHexActivation], - ) -> Result { + ) -> Result { if let Some(ref rpc) = self { Ok(PossibleTransaction::Transaction( rpc.make_start_boost_transaction(batch).await?, @@ -178,7 +180,10 @@ impl SolanaNetwork for Option> { } } - async fn submit_transaction(&self, transaction: &Self::Transaction) -> Result<(), Self::Error> { + async fn submit_transaction( + &self, + transaction: &Self::Transaction, + ) -> Result<(), SolanaRpcError> { match (self, transaction) { (Some(ref rpc), PossibleTransaction::Transaction(ref txn)) => { rpc.submit_transaction(txn).await? @@ -189,7 +194,7 @@ impl SolanaNetwork for Option> { Ok(()) } - async fn confirm_transaction(&self, txn: &str) -> Result { + async fn confirm_transaction(&self, txn: &str) -> Result { if let Some(ref rpc) = self { rpc.confirm_transaction(txn).await } else {