Skip to content

Commit

Permalink
Monitor funds and re-enable disabled orgs (#481)
Browse files Browse the repository at this point in the history
* Monitor funds and re-enable disabled orgs

* Slight change

* Remove comment

* Add shutdown listener to funds monitor

* Add updates to settings-template

* Fix incorrect comment

* StRuCtUrEd LoGgInG

* More specific comments

* Just check every time... like a RUBE

* Remove disabled_clients HashSet

* Add metric for amount burned

* Add metric to mobile packet verifier

* Remove org_cache
  • Loading branch information
Matthew Plant authored Apr 21, 2023
1 parent e6606fc commit 9913e1f
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 98 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iot_packet_verifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ futures-util = {workspace = true}
file-store = {path = "../file_store"}
helium-proto = {workspace = true}
helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]}
metrics = {workspace = true}
poc-metrics = {path = "../metrics"}
prost = {workspace = true}
serde = {workspace = true}
Expand Down
8 changes: 6 additions & 2 deletions iot_packet_verifier/pkg/settings-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ burn_period = 1
enable_solana_integration = "false"

# Minimum number of DC left in a balance before we disable the organization.
# Defaults to 35_000_000 DC, which equates to $35
minimum_allowed_balance = 35_000_000
# Defaults to 3_500_000 DC, which equates to $35
minimum_allowed_balance = 3_500_000

# How often we should check the organizations to see if they have repleneshed
# their funds in minutes. Defaults to 30 minutes.
monitor_funds_period = 30

[solana]
# Solana RPC. This may contain a secret
Expand Down
4 changes: 3 additions & 1 deletion iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
return Ok(());
};

tracing::info!("Burning {amount} DC from {payer}");
tracing::info!(%amount, %payer, "Burning DC");

let amount = amount as u64;

Expand All @@ -86,6 +86,8 @@ where
// Zero the balance in order to force a reset:
balances.balance = 0;

metrics::counter!("burned", amount, "payer" => payer.to_string());

Ok(())
}
}
26 changes: 20 additions & 6 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use file_store::{
use futures_util::TryFutureExt;
use solana::SolanaRpc;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc::Receiver, Mutex};

struct Daemon {
pool: Pool<Postgres>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, CachedOrgClient>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, Arc<Mutex<CachedOrgClient>>>,
report_files: Receiver<FileInfoStream<PacketRouterPacketReport>>,
valid_packets: FileSinkClient,
invalid_packets: FileSinkClient,
Expand Down Expand Up @@ -50,7 +50,7 @@ impl Daemon {
&mut self,
report_file: FileInfoStream<PacketRouterPacketReport>,
) -> Result<()> {
tracing::info!("Verifying file: {}", report_file.file_info);
tracing::info!(file = %report_file.file_info, "Verifying file");

let mut transaction = self.pool.begin().await?;
let reports = report_file.into_stream(&mut transaction).await?;
Expand Down Expand Up @@ -106,7 +106,12 @@ impl Cmd {
let balances = BalanceCache::new(&mut pool, solana.clone()).await?;

// Set up the balance burner:
let burner = Burner::new(pool.clone(), &balances, settings.burn_period, solana);
let burner = Burner::new(
pool.clone(),
&balances,
settings.burn_period,
solana.clone(),
);

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
Expand Down Expand Up @@ -150,14 +155,15 @@ impl Cmd {
.await?;

let config_keypair = settings.config_keypair()?;
let config_server = CachedOrgClient::new(org_client, config_keypair);
let verifier_daemon = Daemon {
pool,
report_files,
valid_packets,
invalid_packets,
verifier: Verifier {
debiter: balances,
config_server: CachedOrgClient::new(org_client, config_keypair),
config_server: config_server.clone(),
},
minimum_allowed_balance: settings.minimum_allowed_balance,
};
Expand All @@ -174,6 +180,14 @@ impl Cmd {
invalid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
CachedOrgClient::monitor_funds(
config_server,
solana,
settings.minimum_allowed_balance,
Duration::from_secs(60 * settings.monitor_funds_period),
shutdown_listener.clone(),
)
.map_err(Error::from),
source_join_handle.map_err(Error::from),
)?;

Expand Down
10 changes: 9 additions & 1 deletion iot_packet_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub struct Settings {
pub solana: Option<solana::Settings>,
#[serde(default = "default_start_after")]
pub start_after: u64,
/// Number of minutes we should sleep before checking to re-enable
/// any disabled orgs.
#[serde(default = "default_monitor_funds_period")]
pub monitor_funds_period: u64,
}

pub fn default_start_after() -> u64 {
Expand All @@ -46,7 +50,11 @@ pub fn default_log() -> String {
}

pub fn default_minimum_allowed_balance() -> u64 {
35_000_000
3_500_000
}

pub fn default_monitor_funds_period() -> u64 {
30
}

impl Settings {
Expand Down
Loading

0 comments on commit 9913e1f

Please sign in to comment.