Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Solana Errors concrete types #924

Merged
merged 16 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions boost_manager/tests/integrations/updater_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use async_trait::async_trait;
use boost_manager::{db, updater::Updater, OnChainStatus};
use chrono::{DateTime, Utc};
use file_store::hex_boost::BoostedHexActivation;
use solana::{start_boost::SolanaNetwork, GetSignature};
use solana::{start_boost::SolanaNetwork, GetSignature, SolanaRpcError};
use solana_sdk::signature::Signature;
use sqlx::{PgPool, Postgres, Transaction};
use std::{string::ToString, sync::Mutex, time::Duration};
Expand All @@ -23,12 +23,6 @@ pub struct MockSolanaConnection {
error: Option<String>,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("not found")]
SubmitError(String),
}

impl MockSolanaConnection {
fn ok() -> Self {
Self {
Expand All @@ -47,28 +41,28 @@ impl MockSolanaConnection {

#[async_trait]
impl SolanaNetwork for MockSolanaConnection {
type Error = Error;
type Transaction = MockTransaction;

async fn make_start_boost_transaction(
&self,
batch: &[BoostedHexActivation],
) -> Result<Self::Transaction, Self::Error> {
) -> Result<Self::Transaction, SolanaRpcError> {
Ok(MockTransaction {
signature: Signature::new_unique(),
_activations: batch.to_owned(),
})
}

async fn submit_transaction(&self, txn: &Self::Transaction) -> Result<(), Self::Error> {
async fn submit_transaction(&self, txn: &Self::Transaction) -> Result<(), SolanaRpcError> {
self.submitted.lock().unwrap().push(txn.clone());

self.error
.as_ref()
.map(|str| Err(Error::SubmitError(str.clone())))
.map(|err| Err(SolanaRpcError::Test(err.to_owned())))
.unwrap_or(Ok(()))
}

async fn confirm_transaction(&self, _id: &str) -> Result<bool, Self::Error> {
async fn confirm_transaction(&self, _id: &str) -> Result<bool, SolanaRpcError> {
Ok(true)
}
}
Expand Down
12 changes: 4 additions & 8 deletions iot_config/src/client/org_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ use helium_proto::services::iot_config::{

#[async_trait]
pub trait Orgs: Send + Sync + 'static {
type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;

async fn get(&mut self, oui: u64) -> Result<OrgResV1, Self::Error>;
async fn list(&mut self) -> Result<Vec<OrgV1>, Self::Error>;
async fn enable(&mut self, oui: u64) -> Result<(), Self::Error>;
async fn disable(&mut self, oui: u64) -> Result<(), Self::Error>;
async fn get(&mut self, oui: u64) -> Result<OrgResV1, ClientError>;
async fn list(&mut self) -> Result<Vec<OrgV1>, ClientError>;
async fn enable(&mut self, oui: u64) -> Result<(), ClientError>;
async fn disable(&mut self, oui: u64) -> Result<(), ClientError>;
}

#[derive(Clone)]
Expand All @@ -42,8 +40,6 @@ impl OrgClient {

#[async_trait]
impl Orgs for OrgClient {
type Error = ClientError;

async fn get(&mut self, oui: u64) -> Result<OrgResV1, ClientError> {
tracing::debug!(%oui, "retrieving org");

Expand Down
6 changes: 2 additions & 4 deletions iot_packet_verifier/src/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
verifier::Debiter,
};
use helium_crypto::PublicKeyBinary;
use solana::burn::SolanaNetwork;
use solana::{burn::SolanaNetwork, SolanaRpcError};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
Expand Down Expand Up @@ -62,16 +62,14 @@ impl<S> Debiter for BalanceCache<S>
where
S: SolanaNetwork,
{
type Error = S::Error;

/// Debits the balance from the cache, returning the remaining balance as an
/// option if there was enough and none otherwise.
async fn debit_if_sufficient(
&self,
payer: &PublicKeyBinary,
amount: u64,
trigger_balance_check_threshold: u64,
) -> Result<Option<u64>, S::Error> {
) -> Result<Option<u64>, SolanaRpcError> {
let mut payer_accounts = self.payer_accounts.lock().await;

// Fetch the balance if we haven't seen the payer before
Expand Down
35 changes: 17 additions & 18 deletions iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
},
};
use futures::{future::LocalBoxFuture, TryFutureExt};
use solana::{burn::SolanaNetwork, GetSignature};
use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError};
use std::time::Duration;
use task_manager::ManagedTask;
use tokio::time::{self, MissedTickBehavior};
Expand Down Expand Up @@ -37,15 +37,15 @@ where
}

#[derive(thiserror::Error, Debug)]
pub enum BurnError<S> {
pub enum BurnError {
#[error("Join error: {0}")]
JoinError(#[from] tokio::task::JoinError),
#[error("Sql error: {0}")]
SqlError(#[from] sqlx::Error),
#[error("Solana error: {0}")]
SolanaError(S),
SolanaError(#[from] SolanaRpcError),
#[error("Confirm pending transaction error: {0}")]
ConfirmPendingError(#[from] ConfirmPendingError<S>),
ConfirmPendingError(#[from] ConfirmPendingError),
}

impl<P, S> Burner<P, S> {
Expand All @@ -69,32 +69,31 @@ where
P: PendingTables + Send + Sync + 'static,
S: SolanaNetwork,
{
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<(), BurnError<S::Error>> {
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<(), BurnError> {
tracing::info!("Starting burner");
let mut burn_timer = time::interval(self.burn_period);
burn_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
#[rustfmt::skip]
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
tracing::error!("Error while burning data credits: {err}");
confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?;
}
}
}
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
tracing::error!("Error while burning data credits: {err}");
confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?;
}
}
}
}
}
tracing::info!("Stopping burner");
Ok(())
}

pub async fn burn(&mut self) -> Result<(), BurnError<S::Error>> {
pub async fn burn(&mut self) -> Result<(), BurnError> {
// Fetch the next payer and amount that should be burn. If no such burn
// exists, perform no action.
let Some(Burn { payer, amount }) = self.pending_tables.fetch_next_burn().await? else {
Expand Down
4 changes: 2 additions & 2 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ where
self.minimum_allowed_balance,
&mut transaction,
reports,
&self.valid_packets,
&self.invalid_packets,
&mut self.valid_packets,
&mut self.invalid_packets,
)
.await?;
transaction.commit().await?;
Expand Down
20 changes: 10 additions & 10 deletions iot_packet_verifier/src/pending.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use helium_crypto::PublicKeyBinary;
use solana::burn::SolanaNetwork;
use solana::{burn::SolanaNetwork, SolanaRpcError};
use solana_sdk::signature::Signature;
use sqlx::{postgres::PgRow, FromRow, PgPool, Postgres, Row, Transaction};
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -45,20 +45,20 @@ pub trait PendingTables {
}

#[derive(thiserror::Error, Debug)]
pub enum ConfirmPendingError<S> {
pub enum ConfirmPendingError {
#[error("Sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("Chrono error: {0}")]
ChronoError(#[from] chrono::OutOfRangeError),
#[error("Solana error: {0}")]
SolanaError(S),
SolanaError(#[from] SolanaRpcError),
}

pub async fn confirm_pending_txns<S>(
pending_tables: &impl PendingTables,
solana: &S,
balances: &BalanceStore,
) -> Result<(), ConfirmPendingError<S::Error>>
) -> Result<(), ConfirmPendingError>
where
S: SolanaNetwork,
{
Expand Down Expand Up @@ -165,7 +165,7 @@ impl PendingTables for PgPool {
}

#[async_trait]
impl AddPendingBurn for &'_ mut Transaction<'_, Postgres> {
impl AddPendingBurn for Transaction<'_, Postgres> {
async fn add_burned_amount(
&mut self,
payer: &PublicKeyBinary,
Expand Down Expand Up @@ -387,6 +387,7 @@ impl<'a> PendingTablesTransaction<'a> for &'a MockPendingTables {

#[cfg(test)]
mod test {

use crate::balances::PayerAccount;

use super::*;
Expand All @@ -397,11 +398,10 @@ mod test {

#[async_trait]
impl SolanaNetwork for MockConfirmed {
type Error = std::convert::Infallible;
type Transaction = Signature;

#[allow(clippy::diverging_sub_expression)]
async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result<u64, Self::Error> {
async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result<u64, SolanaRpcError> {
unreachable!()
}

Expand All @@ -410,19 +410,19 @@ mod test {
&self,
_payer: &PublicKeyBinary,
_amount: u64,
) -> Result<Self::Transaction, Self::Error> {
) -> Result<Self::Transaction, SolanaRpcError> {
unreachable!()
}

#[allow(clippy::diverging_sub_expression)]
async fn submit_transaction(
&self,
_transaction: &Self::Transaction,
) -> Result<(), Self::Error> {
) -> Result<(), SolanaRpcError> {
unreachable!()
}

async fn confirm_transaction(&self, txn: &Signature) -> Result<bool, Self::Error> {
async fn confirm_transaction(&self, txn: &Signature) -> Result<bool, SolanaRpcError> {
Ok(self.0.contains(txn))
}
}
Expand Down
Loading