From 41d1b2ce111836965cc282b3e1b2fde14c6e1e90 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 13:00:27 -0400 Subject: [PATCH 01/13] Monitor funds and re-enable disabled orgs --- iot_packet_verifier/src/daemon.rs | 23 ++- iot_packet_verifier/src/settings.rs | 8 +- iot_packet_verifier/src/verifier.rs | 193 +++++++++++++----- .../tests/integration_tests.rs | 66 +++--- 4 files changed, 201 insertions(+), 89 deletions(-) diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 26ef0af01..0d4694778 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -15,12 +15,12 @@ use file_store::{ use futures_util::TryFutureExt; use solana::SolanaRpc; use sqlx::{Pool, Postgres}; -use std::sync::Arc; -use tokio::sync::mpsc::Receiver; +use std::{sync::Arc, time::Duration}; +use tokio::sync::{mpsc::Receiver, Mutex}; struct Daemon { pool: Pool, - verifier: Verifier>>, CachedOrgClient>, + verifier: Verifier>>, Arc>>, report_files: Receiver>, valid_packets: FileSinkClient, invalid_packets: FileSinkClient, @@ -106,7 +106,12 @@ impl Cmd { let balances = BalanceCache::new(&mut pool, solana.clone()).await?; // Set up the balance burner: - let burner = Burner::new(pool.clone(), &balances, settings.burn_period, solana); + let burner = Burner::new( + pool.clone(), + &balances, + settings.burn_period, + solana.clone(), + ); let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); let file_upload = @@ -150,6 +155,7 @@ impl Cmd { .await?; let config_keypair = settings.config_keypair()?; + let config_server = CachedOrgClient::new(org_client, config_keypair).await?; let verifier_daemon = Daemon { pool, report_files, @@ -157,7 +163,7 @@ impl Cmd { invalid_packets, verifier: Verifier { debiter: balances, - config_server: CachedOrgClient::new(org_client, config_keypair), + config_server: config_server.clone(), }, minimum_allowed_balance: settings.minimum_allowed_balance, }; @@ -174,6 +180,13 @@ impl Cmd { invalid_packets_server .run(&shutdown_listener) .map_err(Error::from), + CachedOrgClient::monitor_funds( + config_server, + solana, + settings.minimum_allowed_balance, + Duration::from_secs(60 * settings.monitor_funds_period) + ) + .map_err(Error::from), source_join_handle.map_err(Error::from), )?; diff --git a/iot_packet_verifier/src/settings.rs b/iot_packet_verifier/src/settings.rs index fada39635..7b5cac810 100644 --- a/iot_packet_verifier/src/settings.rs +++ b/iot_packet_verifier/src/settings.rs @@ -31,6 +31,8 @@ pub struct Settings { pub solana: Option, #[serde(default = "default_start_after")] pub start_after: u64, + #[serde(default = "default_monitor_funds_period")] + pub monitor_funds_period: u64, } pub fn default_start_after() -> u64 { @@ -46,7 +48,11 @@ pub fn default_log() -> String { } pub fn default_minimum_allowed_balance() -> u64 { - 35_000_000 + 3_500_000 +} + +pub fn default_monitor_funds_period() -> u64 { + 30 } impl Settings { diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index f5d1fe7fa..6a10bda0a 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -12,19 +12,23 @@ use helium_proto::services::{ }; use helium_proto::{ services::{ - iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1}, + iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1, OrgListReqV1}, Channel, }, Message, }; +use solana::SolanaNetwork; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, HashSet}, convert::Infallible, fmt::Debug, - mem, sync::Arc, }; -use tokio::sync::Mutex; +use tokio::{ + sync::Mutex, + task::JoinError, + time::{sleep_until, Duration, Instant}, +}; pub struct Verifier { pub debiter: D, @@ -83,7 +87,7 @@ where .await .map_err(VerificationError::DebitError)?; - if remaining_balance.is_some() { + if let Some(remaining_balance) = remaining_balance { pending_burns .add_burned_amount(&payer, debit_amount) .await @@ -98,6 +102,13 @@ where }) .await .map_err(VerificationError::ValidPacketWriterError)?; + + if remaining_balance < minimum_allowed_balance { + self.config_server + .disable_org(report.oui) + .await + .map_err(VerificationError::ConfigError)?; + } } else { invalid_packets .write(InvalidPacket { @@ -109,13 +120,6 @@ where .await .map_err(VerificationError::InvalidPacketWriterError)?; } - match remaining_balance { - Some(remaining_balance) if remaining_balance >= minimum_allowed_balance => { - self.config_server.enable_org(report.oui).await - } - _ => self.config_server.disable_org(report.oui).await, - } - .map_err(VerificationError::ConfigError)? } Ok(()) @@ -164,14 +168,14 @@ pub trait ConfigServer { type Error; async fn fetch_org( - &mut self, + &self, oui: u64, cache: &mut HashMap, ) -> Result; - async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error>; + // async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error>; - async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error>; + async fn disable_org(&self, oui: u64) -> Result<(), Self::Error>; } // TODO: Move this somewhere else @@ -180,20 +184,124 @@ pub trait ConfigServer { // consistent with BalanceCache pub struct CachedOrgClient { pub keypair: Keypair, - pub enabled_clients: HashMap, + pub disabled_clients: HashSet, pub client: OrgClient, } impl CachedOrgClient { - pub fn new(client: OrgClient, keypair: Keypair) -> Self { - CachedOrgClient { + pub async fn new( + mut client: OrgClient, + keypair: Keypair, + ) -> Result>, tonic::Status> { + // Fetch all clients and set them as disabled; + let mut disabled_clients = HashSet::new(); + let orgs = client.list(OrgListReqV1 {}).await?.into_inner(); + for org in orgs.orgs { + if org.locked { + disabled_clients.insert(org.oui); + } + } + Ok(Arc::new(Mutex::new(CachedOrgClient { keypair, - enabled_clients: HashMap::new(), + disabled_clients, client, + }))) + } + + async fn enable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { + tracing::info!("Enabling org: {oui}"); + + self.disabled_clients.remove(&oui); + let mut req = OrgEnableReqV1 { + oui, + timestamp: Utc::now().timestamp_millis() as u64, + signer: self.keypair.public_key().into(), + signature: vec![], + }; + let signature = self.keypair.sign(&req.encode_to_vec())?; + req.signature = signature; + let _ = self.client.enable(req).await?; + Ok(()) + } + + async fn disable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { + tracing::info!("Disabling org: {oui}"); + + self.disabled_clients.insert(oui); + let mut req = OrgDisableReqV1 { + oui, + timestamp: Utc::now().timestamp_millis() as u64, + signer: self.keypair.public_key().into(), + signature: vec![], + }; + let signature = self.keypair.sign(&req.encode_to_vec())?; + req.signature = signature; + let _ = self.client.disable(req).await?; + Ok(()) + } + + pub fn monitor_funds( + client: Arc>, + solana: S, + minimum_allowed_balance: u64, + monitor_period: Duration, + ) -> impl std::future::Future>> + where + S: SolanaNetwork, + { + let join_handle = tokio::spawn(async move { + // TODO: Move this somewhere else, use retainer + let mut org_cache = HashMap::new(); + loop { + tracing::info!("Checking if any orgs need to be re-enabled"); + + // Go through and check balances of all orgs and re-enable if they + // have a greater than minimum balance. + // This could almost certainly become a Stream. + let mut to_enable = Vec::new(); + let disabled_clients = client.lock().await.disabled_clients.clone(); + for disabled_org in disabled_clients { + // Check balance + let payer = client.fetch_org(disabled_org, &mut org_cache).await?; + if solana + .payer_balance(&payer) + .await + .map_err(MonitorError::SolanaError)? + >= minimum_allowed_balance + { + to_enable.push(disabled_org); + } + } + + // Enable all the orgs that need to be: + for org in to_enable { + client.lock().await.enable_org(org).await?; + } + + // Sleep until we should re-check the monitor + sleep_until(Instant::now() + monitor_period).await; + } + }); + async move { + match join_handle.await { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(err) => Err(MonitorError::from(err)), + } } } } +#[derive(thiserror::Error, Debug)] +pub enum MonitorError { + #[error("Join error: {0}")] + JoinError(#[from] JoinError), + #[error("Org client error: {0}")] + OrcClientError(#[from] OrgClientError), + #[error("Solana error: {0}")] + SolanaError(S), +} + #[derive(thiserror::Error, Debug)] pub enum OrgClientError { #[error("Rpc error: {0}")] @@ -203,51 +311,34 @@ pub enum OrgClientError { } #[async_trait] -impl ConfigServer for CachedOrgClient { +impl ConfigServer for Arc> { type Error = OrgClientError; async fn fetch_org( - &mut self, + &self, oui: u64, cache: &mut HashMap, ) -> Result { if let Entry::Vacant(e) = cache.entry(oui) { let req = OrgGetReqV1 { oui }; - let pubkey = - PublicKeyBinary::from(self.client.get(req).await?.into_inner().org.unwrap().payer); + let pubkey = PublicKeyBinary::from( + self.lock() + .await + .client + .get(req) + .await? + .into_inner() + .org + .unwrap() + .payer, + ); e.insert(pubkey); } Ok(cache.get(&oui).unwrap().clone()) } - async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error> { - if !mem::replace(self.enabled_clients.entry(oui).or_insert(false), true) { - let mut req = OrgEnableReqV1 { - oui, - timestamp: Utc::now().timestamp_millis() as u64, - signer: self.keypair.public_key().into(), - signature: vec![], - }; - let signature = self.keypair.sign(&req.encode_to_vec())?; - req.signature = signature; - let _ = self.client.enable(req).await?; - } - Ok(()) - } - - async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error> { - if mem::replace(self.enabled_clients.entry(oui).or_insert(true), false) { - let mut req = OrgDisableReqV1 { - oui, - timestamp: Utc::now().timestamp_millis() as u64, - signer: self.keypair.public_key().into(), - signature: vec![], - }; - let signature = self.keypair.sign(&req.encode_to_vec())?; - req.signature = signature; - let _ = self.client.disable(req).await?; - } - Ok(()) + async fn disable_org(&self, oui: u64) -> Result<(), Self::Error> { + self.lock().await.disable_org(oui).await } } diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index d8f8156b5..d1eb1b22b 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -24,16 +24,16 @@ struct MockConfig { #[derive(Default)] struct MockConfigServer { - payers: HashMap, + payers: Arc>>, } impl MockConfigServer { - fn insert(&mut self, oui: u64, payer: PublicKeyBinary) { - self.payers.insert( + async fn insert(&self, oui: u64, payer: PublicKeyBinary) { + self.payers.lock().await.insert( oui, MockConfig { payer, - enabled: false, + enabled: true, }, ); } @@ -44,20 +44,15 @@ impl ConfigServer for MockConfigServer { type Error = (); async fn fetch_org( - &mut self, + &self, oui: u64, _cache: &mut HashMap, ) -> Result { - Ok(self.payers.get(&oui).unwrap().payer.clone()) + Ok(self.payers.lock().await.get(&oui).unwrap().payer.clone()) } - async fn enable_org(&mut self, oui: u64) -> Result<(), ()> { - self.payers.get_mut(&oui).unwrap().enabled = true; - Ok(()) - } - - async fn disable_org(&mut self, oui: u64) -> Result<(), ()> { - self.payers.get_mut(&oui).unwrap().enabled = false; + async fn disable_org(&self, oui: u64) -> Result<(), ()> { + self.payers.lock().await.get_mut(&oui).unwrap().enabled = false; Ok(()) } } @@ -170,15 +165,15 @@ async fn test_verifier() { packet_report(2, 0, 24, vec![7]), ]; // Set up orgs: - let mut orgs = MockConfigServer::default(); - orgs.insert(0_u64, PublicKeyBinary::from(vec![0])); - orgs.insert(1_u64, PublicKeyBinary::from(vec![1])); - orgs.insert(2_u64, PublicKeyBinary::from(vec![2])); + let orgs = MockConfigServer::default(); + orgs.insert(0_u64, PublicKeyBinary::from(vec![0])).await; + orgs.insert(1_u64, PublicKeyBinary::from(vec![1])).await; + orgs.insert(2_u64, PublicKeyBinary::from(vec![2])).await; // Set up balances: let mut balances = HashMap::new(); balances.insert(PublicKeyBinary::from(vec![0]), 3); - balances.insert(PublicKeyBinary::from(vec![1]), 4); - balances.insert(PublicKeyBinary::from(vec![2]), 1); + balances.insert(PublicKeyBinary::from(vec![1]), 5); + balances.insert(PublicKeyBinary::from(vec![2]), 2); let balances = InstantBurnedBalance(Arc::new(Mutex::new(balances))); // Set up output: let mut valid_packets = Vec::new(); @@ -192,7 +187,7 @@ async fn test_verifier() { // Run the verifier: verifier .verify( - 0, + 1, balances.clone(), stream::iter(packets), &mut valid_packets, @@ -220,9 +215,10 @@ async fn test_verifier() { assert_eq!(invalid_packets, vec![invalid_packet(1, vec![3]),]); // Verify that only org #0 is disabled: - assert!(!verifier.config_server.payers.get(&0).unwrap().enabled); - assert!(verifier.config_server.payers.get(&1).unwrap().enabled); - assert!(verifier.config_server.payers.get(&2).unwrap().enabled); + let payers = verifier.config_server.payers.lock().await; + assert!(!payers.get(&0).unwrap().enabled); + assert!(payers.get(&1).unwrap().enabled); + assert!(payers.get(&2).unwrap().enabled); } #[tokio::test] @@ -252,8 +248,8 @@ async fn test_end_to_end() { ); // Orgs: - let mut orgs = MockConfigServer::default(); - orgs.insert(0_u64, payer.clone()); + let orgs = MockConfigServer::default(); + orgs.insert(0_u64, payer.clone()).await; // Packet output: let mut valid_packets = Vec::new(); @@ -268,7 +264,7 @@ async fn test_end_to_end() { // Verify four packets, each costing one DC. The last one should be invalid verifier .verify( - 0, + 1, pending_burns.clone(), stream::iter(vec![ packet_report(0, 0, BYTES_PER_DC as u32, vec![1]), @@ -283,7 +279,16 @@ async fn test_end_to_end() { .unwrap(); // Org 0 should be disabled now: - assert!(!verifier.config_server.payers.get(&0).unwrap().enabled,); + assert!( + !verifier + .config_server + .payers + .lock() + .await + .get(&0) + .unwrap() + .enabled + ); assert_eq!( valid_packets, @@ -339,7 +344,7 @@ async fn test_end_to_end() { verifier .verify( - 0, + 1, pending_burns.clone(), stream::iter(vec![packet_report(0, 4, BYTES_PER_DC as u32, vec![5])]), &mut valid_packets, @@ -365,7 +370,7 @@ async fn test_end_to_end() { // should clear verifier .verify( - 0, + 1, pending_burns.clone(), stream::iter(vec![ packet_report(0, 5, 2 * BYTES_PER_DC as u32, vec![6]), @@ -393,7 +398,4 @@ async fn test_end_to_end() { }; assert_eq!(balance.balance, 1); assert_eq!(balance.burned, 1); - - // The last packet was valid, so the org should be enabled now - assert!(verifier.config_server.payers.get(&0).unwrap().enabled); } From b915972633441b6ead49096a07f7922a6da6c485 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 13:04:20 -0400 Subject: [PATCH 02/13] Slight change --- iot_packet_verifier/src/verifier.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 6a10bda0a..b55134cfe 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -258,7 +258,6 @@ impl CachedOrgClient { // Go through and check balances of all orgs and re-enable if they // have a greater than minimum balance. // This could almost certainly become a Stream. - let mut to_enable = Vec::new(); let disabled_clients = client.lock().await.disabled_clients.clone(); for disabled_org in disabled_clients { // Check balance @@ -269,15 +268,9 @@ impl CachedOrgClient { .map_err(MonitorError::SolanaError)? >= minimum_allowed_balance { - to_enable.push(disabled_org); + client.lock().await.enable_org(disabled_org).await?; } } - - // Enable all the orgs that need to be: - for org in to_enable { - client.lock().await.enable_org(org).await?; - } - // Sleep until we should re-check the monitor sleep_until(Instant::now() + monitor_period).await; } From c67afc645eea046c59d3cb482e11c332f534a575 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 13:21:41 -0400 Subject: [PATCH 03/13] Remove comment --- iot_packet_verifier/src/verifier.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index b55134cfe..542e6bcdd 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -173,8 +173,6 @@ pub trait ConfigServer { cache: &mut HashMap, ) -> Result; - // async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error>; - async fn disable_org(&self, oui: u64) -> Result<(), Self::Error>; } From 261a1b97cf5fd7326bf65761fa93370fec82ccb2 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 13:29:06 -0400 Subject: [PATCH 04/13] Add shutdown listener to funds monitor --- iot_packet_verifier/src/daemon.rs | 3 ++- iot_packet_verifier/src/verifier.rs | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 0d4694778..f20e4a347 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -184,7 +184,8 @@ impl Cmd { config_server, solana, settings.minimum_allowed_balance, - Duration::from_secs(60 * settings.monitor_funds_period) + Duration::from_secs(60 * settings.monitor_funds_period), + shutdown_listener.clone(), ) .map_err(Error::from), source_join_handle.map_err(Error::from), diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 542e6bcdd..177b242c9 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -243,6 +243,7 @@ impl CachedOrgClient { solana: S, minimum_allowed_balance: u64, monitor_period: Duration, + shutdown: triggered::Listener, ) -> impl std::future::Future>> where S: SolanaNetwork, @@ -274,10 +275,13 @@ impl CachedOrgClient { } }); async move { - match join_handle.await { - Ok(Ok(())) => Ok(()), - Ok(Err(err)) => Err(err), - Err(err) => Err(MonitorError::from(err)), + tokio::select! { + result = join_handle => match result { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(err) => Err(MonitorError::from(err)), + }, + _ = shutdown => Ok(()) } } } From fa99c410aca6cc55512ecbdb3e074616e241ac99 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 13:36:02 -0400 Subject: [PATCH 05/13] Add updates to settings-template --- iot_packet_verifier/pkg/settings-template.toml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/iot_packet_verifier/pkg/settings-template.toml b/iot_packet_verifier/pkg/settings-template.toml index e69cf8583..f6d090a1a 100644 --- a/iot_packet_verifier/pkg/settings-template.toml +++ b/iot_packet_verifier/pkg/settings-template.toml @@ -23,8 +23,12 @@ burn_period = 1 enable_solana_integration = "false" # Minimum number of DC left in a balance before we disable the organization. -# Defaults to 35_000_000 DC, which equates to $35 -minimum_allowed_balance = 35_000_000 +# Defaults to 3_500_000 DC, which equates to $35 +minimum_allowed_balance = 3_500_000 + +# How often we should check the organizations to see if they have repleneshed +# their funds. Defaults to 30 minutes. +monitor_funds_period = 30 [solana] # Solana RPC. This may contain a secret From cd5fe8f1e4942362c67fbacdad5fa26e884ec543 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 13:36:38 -0400 Subject: [PATCH 06/13] Fix incorrect comment --- iot_packet_verifier/src/verifier.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 177b242c9..737cd718a 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -191,7 +191,7 @@ impl CachedOrgClient { mut client: OrgClient, keypair: Keypair, ) -> Result>, tonic::Status> { - // Fetch all clients and set them as disabled; + // Fetch all disables orgs: let mut disabled_clients = HashSet::new(); let orgs = client.list(OrgListReqV1 {}).await?.into_inner(); for org in orgs.orgs { From 4df361e8367f56157b9826b3e0e3e9d47492c3ed Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 13:52:41 -0400 Subject: [PATCH 07/13] StRuCtUrEd LoGgInG --- iot_packet_verifier/src/burner.rs | 2 +- iot_packet_verifier/src/daemon.rs | 2 +- iot_packet_verifier/src/verifier.rs | 8 +++++--- solana/src/lib.rs | 6 +++--- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index 777b59f0f..1954fcdc0 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -64,7 +64,7 @@ where return Ok(()); }; - tracing::info!("Burning {amount} DC from {payer}"); + tracing::info!(%amount, %payer, "Burning DC"); let amount = amount as u64; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index f20e4a347..b49548ce5 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -50,7 +50,7 @@ impl Daemon { &mut self, report_file: FileInfoStream, ) -> Result<()> { - tracing::info!("Verifying file: {}", report_file.file_info); + tracing::info!(file = %report_file.file_info, "Verifying file"); let mut transaction = self.pool.begin().await?; let reports = report_file.into_stream(&mut transaction).await?; diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 737cd718a..3a8f3def3 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -207,7 +207,7 @@ impl CachedOrgClient { } async fn enable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { - tracing::info!("Enabling org: {oui}"); + tracing::info!(%oui, "enabling org"); self.disabled_clients.remove(&oui); let mut req = OrgEnableReqV1 { @@ -223,7 +223,7 @@ impl CachedOrgClient { } async fn disable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { - tracing::info!("Disabling org: {oui}"); + tracing::info!(%oui, "disabling org"); self.disabled_clients.insert(oui); let mut req = OrgDisableReqV1 { @@ -303,6 +303,8 @@ pub enum OrgClientError { RpcError(#[from] tonic::Status), #[error("Crypto error: {0}")] CryptoError(#[from] helium_crypto::Error), + #[error("No org")] + NoOrg, } #[async_trait] @@ -324,7 +326,7 @@ impl ConfigServer for Arc> { .await? .into_inner() .org - .unwrap() + .ok_or(OrgClientError::NoOrg)? .payer, ); e.insert(pubkey); diff --git a/solana/src/lib.rs b/solana/src/lib.rs index 8e1bda8bf..f100c25c1 100644 --- a/solana/src/lib.rs +++ b/solana/src/lib.rs @@ -107,7 +107,7 @@ impl SolanaNetwork for SolanaRpc { ); let Ok(account_data) = self.provider.get_account_data(&escrow_account).await else { // If the account is empty, it has no DC - tracing::info!("{payer} has no account, therefore no balance"); + tracing::info!(%payer, "Account not found, therefore no balance"); return Ok(0); }; let account_layout = spl_token::state::Account::unpack(account_data.as_slice())?; @@ -201,8 +201,8 @@ impl SolanaNetwork for SolanaRpc { .await?; tracing::info!( - "Successfully burned data credits. Transaction: {}", - signature + transaction = %signature, + "Successfully burned data credits", ); Ok(()) From eace807ef660a70140b818f9d8ca7235608325c3 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 20:30:36 -0400 Subject: [PATCH 08/13] More specific comments --- iot_packet_verifier/pkg/settings-template.toml | 2 +- iot_packet_verifier/src/settings.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/iot_packet_verifier/pkg/settings-template.toml b/iot_packet_verifier/pkg/settings-template.toml index f6d090a1a..e41c3c0b0 100644 --- a/iot_packet_verifier/pkg/settings-template.toml +++ b/iot_packet_verifier/pkg/settings-template.toml @@ -27,7 +27,7 @@ enable_solana_integration = "false" minimum_allowed_balance = 3_500_000 # How often we should check the organizations to see if they have repleneshed -# their funds. Defaults to 30 minutes. +# their funds in minutes. Defaults to 30 minutes. monitor_funds_period = 30 [solana] diff --git a/iot_packet_verifier/src/settings.rs b/iot_packet_verifier/src/settings.rs index 7b5cac810..4a4b3c676 100644 --- a/iot_packet_verifier/src/settings.rs +++ b/iot_packet_verifier/src/settings.rs @@ -31,6 +31,8 @@ pub struct Settings { pub solana: Option, #[serde(default = "default_start_after")] pub start_after: u64, + /// Number of minutes we should sleep before checking to re-enable + /// any disabled orgs. #[serde(default = "default_monitor_funds_period")] pub monitor_funds_period: u64, } From b2b1f72d0761cb59dc13a0890c9b47ac36a116c3 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 20 Apr 2023 21:56:16 -0400 Subject: [PATCH 09/13] Just check every time... like a RUBE --- iot_packet_verifier/src/daemon.rs | 2 +- iot_packet_verifier/src/verifier.rs | 58 ++++++++++++----------------- 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index b49548ce5..c8bac3ae4 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -155,7 +155,7 @@ impl Cmd { .await?; let config_keypair = settings.config_keypair()?; - let config_server = CachedOrgClient::new(org_client, config_keypair).await?; + let config_server = CachedOrgClient::new(org_client, config_keypair); let verifier_daemon = Daemon { pool, report_files, diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 3a8f3def3..ea2f94a76 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -182,34 +182,17 @@ pub trait ConfigServer { // consistent with BalanceCache pub struct CachedOrgClient { pub keypair: Keypair, - pub disabled_clients: HashSet, pub client: OrgClient, } impl CachedOrgClient { - pub async fn new( - mut client: OrgClient, - keypair: Keypair, - ) -> Result>, tonic::Status> { - // Fetch all disables orgs: - let mut disabled_clients = HashSet::new(); - let orgs = client.list(OrgListReqV1 {}).await?.into_inner(); - for org in orgs.orgs { - if org.locked { - disabled_clients.insert(org.oui); - } - } - Ok(Arc::new(Mutex::new(CachedOrgClient { - keypair, - disabled_clients, - client, - }))) + pub fn new(client: OrgClient, keypair: Keypair) -> Arc> { + Arc::new(Mutex::new(CachedOrgClient { keypair, client })) } async fn enable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { tracing::info!(%oui, "enabling org"); - self.disabled_clients.remove(&oui); let mut req = OrgEnableReqV1 { oui, timestamp: Utc::now().timestamp_millis() as u64, @@ -225,7 +208,6 @@ impl CachedOrgClient { async fn disable_org(&mut self, oui: u64) -> Result<(), OrgClientError> { tracing::info!(%oui, "disabling org"); - self.disabled_clients.insert(oui); let mut req = OrgDisableReqV1 { oui, timestamp: Utc::now().timestamp_millis() as u64, @@ -254,20 +236,28 @@ impl CachedOrgClient { loop { tracing::info!("Checking if any orgs need to be re-enabled"); - // Go through and check balances of all orgs and re-enable if they - // have a greater than minimum balance. - // This could almost certainly become a Stream. - let disabled_clients = client.lock().await.disabled_clients.clone(); - for disabled_org in disabled_clients { - // Check balance - let payer = client.fetch_org(disabled_org, &mut org_cache).await?; - if solana - .payer_balance(&payer) - .await - .map_err(MonitorError::SolanaError)? - >= minimum_allowed_balance - { - client.lock().await.enable_org(disabled_org).await?; + // Fetch all disables orgs: + let mut disabled_clients = HashSet::new(); + let orgs = client + .lock() + .await + .client + .list(OrgListReqV1 {}) + .await + .map_err(OrgClientError::RpcError)? + .into_inner(); + for org in orgs.orgs { + if org.locked { + disabled_clients.insert(org.oui); + let payer = client.fetch_org(org.oui, &mut org_cache).await?; + if solana + .payer_balance(&payer) + .await + .map_err(MonitorError::SolanaError)? + >= minimum_allowed_balance + { + client.lock().await.enable_org(org.oui).await?; + } } } // Sleep until we should re-check the monitor From 6c00e7fe1cfd74fb82428a17a06722638c7a11ee Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Fri, 21 Apr 2023 10:53:23 -0400 Subject: [PATCH 10/13] Remove disabled_clients HashSet --- iot_packet_verifier/src/verifier.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index ea2f94a76..0c5404bf0 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -237,7 +237,6 @@ impl CachedOrgClient { tracing::info!("Checking if any orgs need to be re-enabled"); // Fetch all disables orgs: - let mut disabled_clients = HashSet::new(); let orgs = client .lock() .await @@ -248,7 +247,6 @@ impl CachedOrgClient { .into_inner(); for org in orgs.orgs { if org.locked { - disabled_clients.insert(org.oui); let payer = client.fetch_org(org.oui, &mut org_cache).await?; if solana .payer_balance(&payer) From ce07a2637d36dd2a9bcabb43f359d79fdeeaab91 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Fri, 21 Apr 2023 10:56:45 -0400 Subject: [PATCH 11/13] Add metric for amount burned --- Cargo.lock | 1 + iot_packet_verifier/Cargo.toml | 1 + iot_packet_verifier/src/burner.rs | 2 ++ iot_packet_verifier/src/verifier.rs | 2 +- 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 229324863..758778830 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3316,6 +3316,7 @@ dependencies = [ "helium-proto", "http", "http-serde", + "metrics", "poc-metrics", "prost", "serde", diff --git a/iot_packet_verifier/Cargo.toml b/iot_packet_verifier/Cargo.toml index 3cfe06477..3a6197c07 100644 --- a/iot_packet_verifier/Cargo.toml +++ b/iot_packet_verifier/Cargo.toml @@ -18,6 +18,7 @@ futures-util = {workspace = true} file-store = {path = "../file_store"} helium-proto = {workspace = true} helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]} +metrics = {workspace = true} poc-metrics = {path = "../metrics"} prost = {workspace = true} serde = {workspace = true} diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index 1954fcdc0..b304bd87f 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -86,6 +86,8 @@ where // Zero the balance in order to force a reset: balances.balance = 0; + metrics::counter!("burned", amount, "payer" => payer.to_string()); + Ok(()) } } diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 0c5404bf0..6976ae1ff 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -19,7 +19,7 @@ use helium_proto::{ }; use solana::SolanaNetwork; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{hash_map::Entry, HashMap}, convert::Infallible, fmt::Debug, sync::Arc, From ba38f8309e16da68c6a870fb4fabeb937fed7dac Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Fri, 21 Apr 2023 11:03:42 -0400 Subject: [PATCH 12/13] Add metric to mobile packet verifier --- Cargo.lock | 1 + mobile_packet_verifier/Cargo.toml | 1 + mobile_packet_verifier/src/burner.rs | 2 ++ 3 files changed, 4 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 758778830..faafe20f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3902,6 +3902,7 @@ dependencies = [ "helium-proto", "http", "http-serde", + "metrics", "mobile-config", "poc-metrics", "prost", diff --git a/mobile_packet_verifier/Cargo.toml b/mobile_packet_verifier/Cargo.toml index 6dc4edcdd..60f44e47d 100644 --- a/mobile_packet_verifier/Cargo.toml +++ b/mobile_packet_verifier/Cargo.toml @@ -18,6 +18,7 @@ futures-util = {workspace = true} file-store = {path = "../file_store"} helium-proto = {workspace = true} helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]} +metrics = {workspace = true} poc-metrics = {path = "../metrics"} prost = {workspace = true} serde = {workspace = true} diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 26dc00e45..c9adc4672 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -88,6 +88,8 @@ where .await .map_err(BurnError::SolanaError)?; + metrics::counter!("burned", total_dcs, "payer" => payer.to_string()); + // Delete from the data transfer session and write out to S3 sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") From 07e1fc3b487521a440b72958ae99211d0e14333a Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Fri, 21 Apr 2023 11:21:21 -0400 Subject: [PATCH 13/13] Remove org_cache --- iot_packet_verifier/src/verifier.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 6976ae1ff..558417af0 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -231,8 +231,6 @@ impl CachedOrgClient { S: SolanaNetwork, { let join_handle = tokio::spawn(async move { - // TODO: Move this somewhere else, use retainer - let mut org_cache = HashMap::new(); loop { tracing::info!("Checking if any orgs need to be re-enabled"); @@ -247,7 +245,7 @@ impl CachedOrgClient { .into_inner(); for org in orgs.orgs { if org.locked { - let payer = client.fetch_org(org.oui, &mut org_cache).await?; + let payer = PublicKeyBinary::from(org.payer); if solana .payer_balance(&payer) .await