diff --git a/Cargo.lock b/Cargo.lock index 229324863..faafe20f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3316,6 +3316,7 @@ dependencies = [ "helium-proto", "http", "http-serde", + "metrics", "poc-metrics", "prost", "serde", @@ -3901,6 +3902,7 @@ dependencies = [ "helium-proto", "http", "http-serde", + "metrics", "mobile-config", "poc-metrics", "prost", diff --git a/iot_packet_verifier/Cargo.toml b/iot_packet_verifier/Cargo.toml index 3cfe06477..3a6197c07 100644 --- a/iot_packet_verifier/Cargo.toml +++ b/iot_packet_verifier/Cargo.toml @@ -18,6 +18,7 @@ futures-util = {workspace = true} file-store = {path = "../file_store"} helium-proto = {workspace = true} helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]} +metrics = {workspace = true} poc-metrics = {path = "../metrics"} prost = {workspace = true} serde = {workspace = true} diff --git a/iot_packet_verifier/pkg/settings-template.toml b/iot_packet_verifier/pkg/settings-template.toml index e69cf8583..e41c3c0b0 100644 --- a/iot_packet_verifier/pkg/settings-template.toml +++ b/iot_packet_verifier/pkg/settings-template.toml @@ -23,8 +23,12 @@ burn_period = 1 enable_solana_integration = "false" # Minimum number of DC left in a balance before we disable the organization. -# Defaults to 35_000_000 DC, which equates to $35 -minimum_allowed_balance = 35_000_000 +# Defaults to 3_500_000 DC, which equates to $35 +minimum_allowed_balance = 3_500_000 + +# How often, in minutes, we should check whether organizations have replenished +# their funds. Defaults to 30 minutes. +monitor_funds_period = 30 [solana] # Solana RPC. 
This may contain a secret diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index 777b59f0f..b304bd87f 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -64,7 +64,7 @@ where return Ok(()); }; - tracing::info!("Burning {amount} DC from {payer}"); + tracing::info!(%amount, %payer, "Burning DC"); let amount = amount as u64; @@ -86,6 +86,8 @@ where // Zero the balance in order to force a reset: balances.balance = 0; + metrics::counter!("burned", amount, "payer" => payer.to_string()); + Ok(()) } } diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 26ef0af01..c8bac3ae4 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -15,12 +15,12 @@ use file_store::{ use futures_util::TryFutureExt; use solana::SolanaRpc; use sqlx::{Pool, Postgres}; -use std::sync::Arc; -use tokio::sync::mpsc::Receiver; +use std::{sync::Arc, time::Duration}; +use tokio::sync::{mpsc::Receiver, Mutex}; struct Daemon { pool: Pool, - verifier: Verifier>>, CachedOrgClient>, + verifier: Verifier>>, Arc>>, report_files: Receiver>, valid_packets: FileSinkClient, invalid_packets: FileSinkClient, @@ -50,7 +50,7 @@ impl Daemon { &mut self, report_file: FileInfoStream, ) -> Result<()> { - tracing::info!("Verifying file: {}", report_file.file_info); + tracing::info!(file = %report_file.file_info, "Verifying file"); let mut transaction = self.pool.begin().await?; let reports = report_file.into_stream(&mut transaction).await?; @@ -106,7 +106,12 @@ impl Cmd { let balances = BalanceCache::new(&mut pool, solana.clone()).await?; // Set up the balance burner: - let burner = Burner::new(pool.clone(), &balances, settings.burn_period, solana); + let burner = Burner::new( + pool.clone(), + &balances, + settings.burn_period, + solana.clone(), + ); let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); let file_upload = @@ -150,6 +155,7 @@ impl Cmd { .await?; let 
config_keypair = settings.config_keypair()?; + let config_server = CachedOrgClient::new(org_client, config_keypair); let verifier_daemon = Daemon { pool, report_files, @@ -157,7 +163,7 @@ impl Cmd { invalid_packets, verifier: Verifier { debiter: balances, - config_server: CachedOrgClient::new(org_client, config_keypair), + config_server: config_server.clone(), }, minimum_allowed_balance: settings.minimum_allowed_balance, }; @@ -174,6 +180,14 @@ impl Cmd { invalid_packets_server .run(&shutdown_listener) .map_err(Error::from), + CachedOrgClient::monitor_funds( + config_server, + solana, + settings.minimum_allowed_balance, + Duration::from_secs(60 * settings.monitor_funds_period), + shutdown_listener.clone(), + ) + .map_err(Error::from), source_join_handle.map_err(Error::from), )?; diff --git a/iot_packet_verifier/src/settings.rs b/iot_packet_verifier/src/settings.rs index fada39635..4a4b3c676 100644 --- a/iot_packet_verifier/src/settings.rs +++ b/iot_packet_verifier/src/settings.rs @@ -31,6 +31,10 @@ pub struct Settings { pub solana: Option, #[serde(default = "default_start_after")] pub start_after: u64, + /// Number of minutes we should sleep before checking to re-enable + /// any disabled orgs. 
+ #[serde(default = "default_monitor_funds_period")] + pub monitor_funds_period: u64, } pub fn default_start_after() -> u64 { @@ -46,7 +50,11 @@ pub fn default_log() -> String { } pub fn default_minimum_allowed_balance() -> u64 { - 35_000_000 + 3_500_000 +} + +pub fn default_monitor_funds_period() -> u64 { + 30 } impl Settings { diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index f5d1fe7fa..558417af0 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -12,19 +12,23 @@ use helium_proto::services::{ }; use helium_proto::{ services::{ - iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1}, + iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1, OrgListReqV1}, Channel, }, Message, }; +use solana::SolanaNetwork; use std::{ collections::{hash_map::Entry, HashMap}, convert::Infallible, fmt::Debug, - mem, sync::Arc, }; -use tokio::sync::Mutex; +use tokio::{ + sync::Mutex, + task::JoinError, + time::{sleep_until, Duration, Instant}, +}; pub struct Verifier { pub debiter: D, @@ -83,7 +87,7 @@ where .await .map_err(VerificationError::DebitError)?; - if remaining_balance.is_some() { + if let Some(remaining_balance) = remaining_balance { pending_burns .add_burned_amount(&payer, debit_amount) .await @@ -98,6 +102,13 @@ where }) .await .map_err(VerificationError::ValidPacketWriterError)?; + + if remaining_balance < minimum_allowed_balance { + self.config_server + .disable_org(report.oui) + .await + .map_err(VerificationError::ConfigError)?; + } } else { invalid_packets .write(InvalidPacket { @@ -109,13 +120,6 @@ where .await .map_err(VerificationError::InvalidPacketWriterError)?; } - match remaining_balance { - Some(remaining_balance) if remaining_balance >= minimum_allowed_balance => { - self.config_server.enable_org(report.oui).await - } - _ => self.config_server.disable_org(report.oui).await, - } - .map_err(VerificationError::ConfigError)? 
} Ok(()) @@ -164,14 +168,12 @@ pub trait ConfigServer { type Error; async fn fetch_org( - &mut self, + &self, oui: u64, cache: &mut HashMap, ) -> Result; - async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error>; - - async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error>; + async fn disable_org(&self, oui: u64) -> Result<(), Self::Error>; } // TODO: Move this somewhere else @@ -180,74 +182,146 @@ pub trait ConfigServer { // consistent with BalanceCache pub struct CachedOrgClient { pub keypair: Keypair, - pub enabled_clients: HashMap, pub client: OrgClient, } impl CachedOrgClient { - pub fn new(client: OrgClient, keypair: Keypair) -> Self { - CachedOrgClient { - keypair, - enabled_clients: HashMap::new(), - client, + pub fn new(client: OrgClient, keypair: Keypair) -> Arc> { + Arc::new(Mutex::new(CachedOrgClient { keypair, client })) + } + + async fn enable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { + tracing::info!(%oui, "enabling org"); + + let mut req = OrgEnableReqV1 { + oui, + timestamp: Utc::now().timestamp_millis() as u64, + signer: self.keypair.public_key().into(), + signature: vec![], + }; + let signature = self.keypair.sign(&req.encode_to_vec())?; + req.signature = signature; + let _ = self.client.enable(req).await?; + Ok(()) + } + + async fn disable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { + tracing::info!(%oui, "disabling org"); + + let mut req = OrgDisableReqV1 { + oui, + timestamp: Utc::now().timestamp_millis() as u64, + signer: self.keypair.public_key().into(), + signature: vec![], + }; + let signature = self.keypair.sign(&req.encode_to_vec())?; + req.signature = signature; + let _ = self.client.disable(req).await?; + Ok(()) + } + + pub fn monitor_funds( + client: Arc>, + solana: S, + minimum_allowed_balance: u64, + monitor_period: Duration, + shutdown: triggered::Listener, + ) -> impl std::future::Future>> + where + S: SolanaNetwork, + { + let join_handle = tokio::spawn(async move { + loop { + 
tracing::info!("Checking if any orgs need to be re-enabled"); + + // Fetch all orgs, then check the locked ones: + let orgs = client + .lock() + .await + .client + .list(OrgListReqV1 {}) + .await + .map_err(OrgClientError::RpcError)? + .into_inner(); + for org in orgs.orgs { + if org.locked { + let payer = PublicKeyBinary::from(org.payer); + if solana + .payer_balance(&payer) + .await + .map_err(MonitorError::SolanaError)? + >= minimum_allowed_balance + { + client.lock().await.enable_org(org.oui).await?; + } + } + } + // Sleep until we should re-check the monitor + sleep_until(Instant::now() + monitor_period).await; + } + }); + async move { + tokio::select! { + result = join_handle => match result { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(err) => Err(MonitorError::from(err)), + }, + _ = shutdown => Ok(()) + } } } } +#[derive(thiserror::Error, Debug)] +pub enum MonitorError { + #[error("Join error: {0}")] + JoinError(#[from] JoinError), + #[error("Org client error: {0}")] + OrcClientError(#[from] OrgClientError), + #[error("Solana error: {0}")] + SolanaError(S), +} + #[derive(thiserror::Error, Debug)] pub enum OrgClientError { #[error("Rpc error: {0}")] RpcError(#[from] tonic::Status), #[error("Crypto error: {0}")] CryptoError(#[from] helium_crypto::Error), + #[error("No org")] + NoOrg, } #[async_trait] -impl ConfigServer for CachedOrgClient { +impl ConfigServer for Arc> { type Error = OrgClientError; async fn fetch_org( - &mut self, + &self, oui: u64, cache: &mut HashMap, ) -> Result { if let Entry::Vacant(e) = cache.entry(oui) { let req = OrgGetReqV1 { oui }; - let pubkey = - PublicKeyBinary::from(self.client.get(req).await?.into_inner().org.unwrap().payer); + let pubkey = PublicKeyBinary::from( + self.lock() + .await + .client + .get(req) + .await? + .into_inner() + .org + .ok_or(OrgClientError::NoOrg)? 
+ .payer, + ); e.insert(pubkey); } Ok(cache.get(&oui).unwrap().clone()) } - async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error> { - if !mem::replace(self.enabled_clients.entry(oui).or_insert(false), true) { - let mut req = OrgEnableReqV1 { - oui, - timestamp: Utc::now().timestamp_millis() as u64, - signer: self.keypair.public_key().into(), - signature: vec![], - }; - let signature = self.keypair.sign(&req.encode_to_vec())?; - req.signature = signature; - let _ = self.client.enable(req).await?; - } - Ok(()) - } - - async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error> { - if mem::replace(self.enabled_clients.entry(oui).or_insert(true), false) { - let mut req = OrgDisableReqV1 { - oui, - timestamp: Utc::now().timestamp_millis() as u64, - signer: self.keypair.public_key().into(), - signature: vec![], - }; - let signature = self.keypair.sign(&req.encode_to_vec())?; - req.signature = signature; - let _ = self.client.disable(req).await?; - } - Ok(()) + async fn disable_org(&self, oui: u64) -> Result<(), Self::Error> { + self.lock().await.disable_org(oui).await } } diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index d8f8156b5..d1eb1b22b 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -24,16 +24,16 @@ struct MockConfig { #[derive(Default)] struct MockConfigServer { - payers: HashMap, + payers: Arc>>, } impl MockConfigServer { - fn insert(&mut self, oui: u64, payer: PublicKeyBinary) { - self.payers.insert( + async fn insert(&self, oui: u64, payer: PublicKeyBinary) { + self.payers.lock().await.insert( oui, MockConfig { payer, - enabled: false, + enabled: true, }, ); } @@ -44,20 +44,15 @@ impl ConfigServer for MockConfigServer { type Error = (); async fn fetch_org( - &mut self, + &self, oui: u64, _cache: &mut HashMap, ) -> Result { - Ok(self.payers.get(&oui).unwrap().payer.clone()) + 
Ok(self.payers.lock().await.get(&oui).unwrap().payer.clone()) } - async fn enable_org(&mut self, oui: u64) -> Result<(), ()> { - self.payers.get_mut(&oui).unwrap().enabled = true; - Ok(()) - } - - async fn disable_org(&mut self, oui: u64) -> Result<(), ()> { - self.payers.get_mut(&oui).unwrap().enabled = false; + async fn disable_org(&self, oui: u64) -> Result<(), ()> { + self.payers.lock().await.get_mut(&oui).unwrap().enabled = false; Ok(()) } } @@ -170,15 +165,15 @@ async fn test_verifier() { packet_report(2, 0, 24, vec![7]), ]; // Set up orgs: - let mut orgs = MockConfigServer::default(); - orgs.insert(0_u64, PublicKeyBinary::from(vec![0])); - orgs.insert(1_u64, PublicKeyBinary::from(vec![1])); - orgs.insert(2_u64, PublicKeyBinary::from(vec![2])); + let orgs = MockConfigServer::default(); + orgs.insert(0_u64, PublicKeyBinary::from(vec![0])).await; + orgs.insert(1_u64, PublicKeyBinary::from(vec![1])).await; + orgs.insert(2_u64, PublicKeyBinary::from(vec![2])).await; // Set up balances: let mut balances = HashMap::new(); balances.insert(PublicKeyBinary::from(vec![0]), 3); - balances.insert(PublicKeyBinary::from(vec![1]), 4); - balances.insert(PublicKeyBinary::from(vec![2]), 1); + balances.insert(PublicKeyBinary::from(vec![1]), 5); + balances.insert(PublicKeyBinary::from(vec![2]), 2); let balances = InstantBurnedBalance(Arc::new(Mutex::new(balances))); // Set up output: let mut valid_packets = Vec::new(); @@ -192,7 +187,7 @@ async fn test_verifier() { // Run the verifier: verifier .verify( - 0, + 1, balances.clone(), stream::iter(packets), &mut valid_packets, @@ -220,9 +215,10 @@ async fn test_verifier() { assert_eq!(invalid_packets, vec![invalid_packet(1, vec![3]),]); // Verify that only org #0 is disabled: - assert!(!verifier.config_server.payers.get(&0).unwrap().enabled); - assert!(verifier.config_server.payers.get(&1).unwrap().enabled); - assert!(verifier.config_server.payers.get(&2).unwrap().enabled); + let payers = verifier.config_server.payers.lock().await; 
+ assert!(!payers.get(&0).unwrap().enabled); + assert!(payers.get(&1).unwrap().enabled); + assert!(payers.get(&2).unwrap().enabled); } #[tokio::test] @@ -252,8 +248,8 @@ async fn test_end_to_end() { ); // Orgs: - let mut orgs = MockConfigServer::default(); - orgs.insert(0_u64, payer.clone()); + let orgs = MockConfigServer::default(); + orgs.insert(0_u64, payer.clone()).await; // Packet output: let mut valid_packets = Vec::new(); @@ -268,7 +264,7 @@ async fn test_end_to_end() { // Verify four packets, each costing one DC. The last one should be invalid verifier .verify( - 0, + 1, pending_burns.clone(), stream::iter(vec![ packet_report(0, 0, BYTES_PER_DC as u32, vec![1]), @@ -283,7 +279,16 @@ async fn test_end_to_end() { .unwrap(); // Org 0 should be disabled now: - assert!(!verifier.config_server.payers.get(&0).unwrap().enabled,); + assert!( + !verifier + .config_server + .payers + .lock() + .await + .get(&0) + .unwrap() + .enabled + ); assert_eq!( valid_packets, @@ -339,7 +344,7 @@ async fn test_end_to_end() { verifier .verify( - 0, + 1, pending_burns.clone(), stream::iter(vec![packet_report(0, 4, BYTES_PER_DC as u32, vec![5])]), &mut valid_packets, @@ -365,7 +370,7 @@ async fn test_end_to_end() { // should clear verifier .verify( - 0, + 1, pending_burns.clone(), stream::iter(vec![ packet_report(0, 5, 2 * BYTES_PER_DC as u32, vec![6]), @@ -393,7 +398,4 @@ async fn test_end_to_end() { }; assert_eq!(balance.balance, 1); assert_eq!(balance.burned, 1); - - // The last packet was valid, so the org should be enabled now - assert!(verifier.config_server.payers.get(&0).unwrap().enabled); } diff --git a/mobile_packet_verifier/Cargo.toml b/mobile_packet_verifier/Cargo.toml index 6dc4edcdd..60f44e47d 100644 --- a/mobile_packet_verifier/Cargo.toml +++ b/mobile_packet_verifier/Cargo.toml @@ -18,6 +18,7 @@ futures-util = {workspace = true} file-store = {path = "../file_store"} helium-proto = {workspace = true} helium-crypto = {workspace = true, features = ["sqlx-postgres", 
"multisig", "solana"]} +metrics = {workspace = true} poc-metrics = {path = "../metrics"} prost = {workspace = true} serde = {workspace = true} diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 26dc00e45..c9adc4672 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -88,6 +88,8 @@ where .await .map_err(BurnError::SolanaError)?; + metrics::counter!("burned", total_dcs, "payer" => payer.to_string()); + // Delete from the data transfer session and write out to S3 sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") diff --git a/solana/src/lib.rs b/solana/src/lib.rs index 8e1bda8bf..f100c25c1 100644 --- a/solana/src/lib.rs +++ b/solana/src/lib.rs @@ -107,7 +107,7 @@ impl SolanaNetwork for SolanaRpc { ); let Ok(account_data) = self.provider.get_account_data(&escrow_account).await else { // If the account is empty, it has no DC - tracing::info!("{payer} has no account, therefore no balance"); + tracing::info!(%payer, "Account not found, therefore no balance"); return Ok(0); }; let account_layout = spl_token::state::Account::unpack(account_data.as_slice())?; @@ -201,8 +201,8 @@ impl SolanaNetwork for SolanaRpc { .await?; tracing::info!( - "Successfully burned data credits. Transaction: {}", - signature + transaction = %signature, + "Successfully burned data credits", ); Ok(())