Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor funds and re-enable disabled orgs #481

Merged
merged 13 commits into from
Apr 21, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iot_packet_verifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ futures-util = {workspace = true}
file-store = {path = "../file_store"}
helium-proto = {workspace = true}
helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]}
metrics = {workspace = true}
poc-metrics = {path = "../metrics"}
prost = {workspace = true}
serde = {workspace = true}
Expand Down
8 changes: 6 additions & 2 deletions iot_packet_verifier/pkg/settings-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ burn_period = 1
enable_solana_integration = "false"

# Minimum number of DC left in a balance before we disable the organization.
# Defaults to 35_000_000 DC, which equates to $35
minimum_allowed_balance = 35_000_000
# Defaults to 3_500_000 DC, which equates to $35
minimum_allowed_balance = 3_500_000

# How often we should check the organizations to see if they have repleneshed
# their funds in minutes. Defaults to 30 minutes.
monitor_funds_period = 30

[solana]
# Solana RPC. This may contain a secret
Expand Down
4 changes: 3 additions & 1 deletion iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
return Ok(());
};

tracing::info!("Burning {amount} DC from {payer}");
tracing::info!(%amount, %payer, "Burning DC");

let amount = amount as u64;

Expand All @@ -86,6 +86,8 @@ where
// Zero the balance in order to force a reset:
balances.balance = 0;

metrics::counter!("burned", amount, "payer" => payer.to_string());

Ok(())
}
}
26 changes: 20 additions & 6 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use file_store::{
use futures_util::TryFutureExt;
use solana::SolanaRpc;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc::Receiver, Mutex};

struct Daemon {
pool: Pool<Postgres>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, CachedOrgClient>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, Arc<Mutex<CachedOrgClient>>>,
report_files: Receiver<FileInfoStream<PacketRouterPacketReport>>,
valid_packets: FileSinkClient,
invalid_packets: FileSinkClient,
Expand Down Expand Up @@ -50,7 +50,7 @@ impl Daemon {
&mut self,
report_file: FileInfoStream<PacketRouterPacketReport>,
) -> Result<()> {
tracing::info!("Verifying file: {}", report_file.file_info);
tracing::info!(file = %report_file.file_info, "Verifying file");

let mut transaction = self.pool.begin().await?;
let reports = report_file.into_stream(&mut transaction).await?;
Expand Down Expand Up @@ -106,7 +106,12 @@ impl Cmd {
let balances = BalanceCache::new(&mut pool, solana.clone()).await?;

// Set up the balance burner:
let burner = Burner::new(pool.clone(), &balances, settings.burn_period, solana);
let burner = Burner::new(
pool.clone(),
&balances,
settings.burn_period,
solana.clone(),
);

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
Expand Down Expand Up @@ -150,14 +155,15 @@ impl Cmd {
.await?;

let config_keypair = settings.config_keypair()?;
let config_server = CachedOrgClient::new(org_client, config_keypair);
let verifier_daemon = Daemon {
pool,
report_files,
valid_packets,
invalid_packets,
verifier: Verifier {
debiter: balances,
config_server: CachedOrgClient::new(org_client, config_keypair),
config_server: config_server.clone(),
},
minimum_allowed_balance: settings.minimum_allowed_balance,
};
Expand All @@ -174,6 +180,14 @@ impl Cmd {
invalid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
CachedOrgClient::monitor_funds(
config_server,
solana,
settings.minimum_allowed_balance,
Duration::from_secs(60 * settings.monitor_funds_period),
shutdown_listener.clone(),
)
.map_err(Error::from),
source_join_handle.map_err(Error::from),
)?;

Expand Down
10 changes: 9 additions & 1 deletion iot_packet_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub struct Settings {
pub solana: Option<solana::Settings>,
#[serde(default = "default_start_after")]
pub start_after: u64,
/// Number of minutes we should sleep before checking to re-enable
/// any disabled orgs.
#[serde(default = "default_monitor_funds_period")]
pub monitor_funds_period: u64,
}

pub fn default_start_after() -> u64 {
Expand All @@ -46,7 +50,11 @@ pub fn default_log() -> String {
}

pub fn default_minimum_allowed_balance() -> u64 {
35_000_000
3_500_000
}

pub fn default_monitor_funds_period() -> u64 {
maplant marked this conversation as resolved.
Show resolved Hide resolved
30
}

impl Settings {
Expand Down
Loading