Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor funds and re-enable disabled orgs #481

Merged
merged 13 commits into from
Apr 21, 2023
8 changes: 6 additions & 2 deletions iot_packet_verifier/pkg/settings-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ burn_period = 1
enable_solana_integration = "false"

# Minimum number of DC left in a balance before we disable the organization.
# Defaults to 35_000_000 DC, which equates to $35
minimum_allowed_balance = 35_000_000
# Defaults to 3_500_000 DC, which equates to $35
minimum_allowed_balance = 3_500_000

# How often we should check the organizations to see if they have repleneshed
# their funds. Defaults to 30 minutes.
monitor_funds_period = 30

[solana]
# Solana RPC. This may contain a secret
Expand Down
2 changes: 1 addition & 1 deletion iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
return Ok(());
};

tracing::info!("Burning {amount} DC from {payer}");
tracing::info!(%amount, %payer, "Burning DC");

let amount = amount as u64;

Expand Down
26 changes: 20 additions & 6 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use file_store::{
use futures_util::TryFutureExt;
use solana::SolanaRpc;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc::Receiver, Mutex};

struct Daemon {
pool: Pool<Postgres>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, CachedOrgClient>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, Arc<Mutex<CachedOrgClient>>>,
report_files: Receiver<FileInfoStream<PacketRouterPacketReport>>,
valid_packets: FileSinkClient,
invalid_packets: FileSinkClient,
Expand Down Expand Up @@ -50,7 +50,7 @@ impl Daemon {
&mut self,
report_file: FileInfoStream<PacketRouterPacketReport>,
) -> Result<()> {
tracing::info!("Verifying file: {}", report_file.file_info);
tracing::info!(file = %report_file.file_info, "Verifying file");

let mut transaction = self.pool.begin().await?;
let reports = report_file.into_stream(&mut transaction).await?;
Expand Down Expand Up @@ -106,7 +106,12 @@ impl Cmd {
let balances = BalanceCache::new(&mut pool, solana.clone()).await?;

// Set up the balance burner:
let burner = Burner::new(pool.clone(), &balances, settings.burn_period, solana);
let burner = Burner::new(
pool.clone(),
&balances,
settings.burn_period,
solana.clone(),
);

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
Expand Down Expand Up @@ -150,14 +155,15 @@ impl Cmd {
.await?;

let config_keypair = settings.config_keypair()?;
let config_server = CachedOrgClient::new(org_client, config_keypair).await?;
let verifier_daemon = Daemon {
pool,
report_files,
valid_packets,
invalid_packets,
verifier: Verifier {
debiter: balances,
config_server: CachedOrgClient::new(org_client, config_keypair),
config_server: config_server.clone(),
},
minimum_allowed_balance: settings.minimum_allowed_balance,
};
Expand All @@ -174,6 +180,14 @@ impl Cmd {
invalid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
CachedOrgClient::monitor_funds(
config_server,
solana,
settings.minimum_allowed_balance,
Duration::from_secs(60 * settings.monitor_funds_period),
shutdown_listener.clone(),
)
.map_err(Error::from),
source_join_handle.map_err(Error::from),
)?;

Expand Down
8 changes: 7 additions & 1 deletion iot_packet_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct Settings {
pub solana: Option<solana::Settings>,
#[serde(default = "default_start_after")]
pub start_after: u64,
#[serde(default = "default_monitor_funds_period")]
pub monitor_funds_period: u64,
}

pub fn default_start_after() -> u64 {
Expand All @@ -46,7 +48,11 @@ pub fn default_log() -> String {
}

pub fn default_minimum_allowed_balance() -> u64 {
35_000_000
3_500_000
}

pub fn default_monitor_funds_period() -> u64 {
maplant marked this conversation as resolved.
Show resolved Hide resolved
30
}

impl Settings {
Expand Down
192 changes: 140 additions & 52 deletions iot_packet_verifier/src/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ use helium_proto::services::{
};
use helium_proto::{
services::{
iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1},
iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1, OrgListReqV1},
Channel,
},
Message,
};
use solana::SolanaNetwork;
use std::{
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
convert::Infallible,
fmt::Debug,
mem,
sync::Arc,
};
use tokio::sync::Mutex;
use tokio::{
sync::Mutex,
task::JoinError,
time::{sleep_until, Duration, Instant},
};

pub struct Verifier<D, C> {
pub debiter: D,
Expand Down Expand Up @@ -83,7 +87,7 @@ where
.await
.map_err(VerificationError::DebitError)?;

if remaining_balance.is_some() {
if let Some(remaining_balance) = remaining_balance {
pending_burns
.add_burned_amount(&payer, debit_amount)
.await
Expand All @@ -98,6 +102,13 @@ where
})
.await
.map_err(VerificationError::ValidPacketWriterError)?;

if remaining_balance < minimum_allowed_balance {
self.config_server
.disable_org(report.oui)
.await
.map_err(VerificationError::ConfigError)?;
}
} else {
invalid_packets
.write(InvalidPacket {
Expand All @@ -109,13 +120,6 @@ where
.await
.map_err(VerificationError::InvalidPacketWriterError)?;
}
match remaining_balance {
Some(remaining_balance) if remaining_balance >= minimum_allowed_balance => {
self.config_server.enable_org(report.oui).await
}
_ => self.config_server.disable_org(report.oui).await,
}
.map_err(VerificationError::ConfigError)?
}

Ok(())
Expand Down Expand Up @@ -164,14 +168,12 @@ pub trait ConfigServer {
type Error;

async fn fetch_org(
&mut self,
&self,
oui: u64,
cache: &mut HashMap<u64, PublicKeyBinary>,
) -> Result<PublicKeyBinary, Self::Error>;

async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error>;

async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error>;
async fn disable_org(&self, oui: u64) -> Result<(), Self::Error>;
}

// TODO: Move this somewhere else
Expand All @@ -180,74 +182,160 @@ pub trait ConfigServer {
// consistent with BalanceCache
pub struct CachedOrgClient {
pub keypair: Keypair,
pub enabled_clients: HashMap<u64, bool>,
pub disabled_clients: HashSet<u64>,
pub client: OrgClient<Channel>,
}

impl CachedOrgClient {
pub fn new(client: OrgClient<Channel>, keypair: Keypair) -> Self {
CachedOrgClient {
pub async fn new(
mut client: OrgClient<Channel>,
keypair: Keypair,
) -> Result<Arc<Mutex<Self>>, tonic::Status> {
// Fetch all disables orgs:
let mut disabled_clients = HashSet::new();
let orgs = client.list(OrgListReqV1 {}).await?.into_inner();
for org in orgs.orgs {
if org.locked {
disabled_clients.insert(org.oui);
}
}
Ok(Arc::new(Mutex::new(CachedOrgClient {
keypair,
enabled_clients: HashMap::new(),
disabled_clients,
client,
})))
}

async fn enable_org(&mut self, oui: u64) -> Result<(), OrgClientError> {
tracing::info!(%oui, "enabling org");

self.disabled_clients.remove(&oui);
let mut req = OrgEnableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.enable(req).await?;
maplant marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

async fn disable_org(&mut self, oui: u64) -> Result<(), OrgClientError> {
tracing::info!(%oui, "disabling org");

self.disabled_clients.insert(oui);
let mut req = OrgDisableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.disable(req).await?;
Ok(())
}

pub fn monitor_funds<S>(
client: Arc<Mutex<Self>>,
solana: S,
minimum_allowed_balance: u64,
monitor_period: Duration,
shutdown: triggered::Listener,
) -> impl std::future::Future<Output = Result<(), MonitorError<S::Error>>>
where
S: SolanaNetwork,
{
let join_handle = tokio::spawn(async move {
// TODO: Move this somewhere else, use retainer
let mut org_cache = HashMap::new();
loop {
tracing::info!("Checking if any orgs need to be re-enabled");

// Go through and check balances of all orgs and re-enable if they
// have a greater than minimum balance.
// This could almost certainly become a Stream.
let disabled_clients = client.lock().await.disabled_clients.clone();
for disabled_org in disabled_clients {
// Check balance
let payer = client.fetch_org(disabled_org, &mut org_cache).await?;
if solana
.payer_balance(&payer)
.await
.map_err(MonitorError::SolanaError)?
>= minimum_allowed_balance
{
client.lock().await.enable_org(disabled_org).await?;
}
}
// Sleep until we should re-check the monitor
sleep_until(Instant::now() + monitor_period).await;
}
});
async move {
tokio::select! {
result = join_handle => match result {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(MonitorError::from(err)),
},
_ = shutdown => Ok(())
}
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum MonitorError<S> {
#[error("Join error: {0}")]
JoinError(#[from] JoinError),
#[error("Org client error: {0}")]
OrcClientError(#[from] OrgClientError),
#[error("Solana error: {0}")]
SolanaError(S),
}

#[derive(thiserror::Error, Debug)]
pub enum OrgClientError {
#[error("Rpc error: {0}")]
RpcError(#[from] tonic::Status),
#[error("Crypto error: {0}")]
CryptoError(#[from] helium_crypto::Error),
#[error("No org")]
NoOrg,
}

#[async_trait]
impl ConfigServer for CachedOrgClient {
impl ConfigServer for Arc<Mutex<CachedOrgClient>> {
type Error = OrgClientError;

async fn fetch_org(
&mut self,
&self,
oui: u64,
cache: &mut HashMap<u64, PublicKeyBinary>,
) -> Result<PublicKeyBinary, Self::Error> {
if let Entry::Vacant(e) = cache.entry(oui) {
let req = OrgGetReqV1 { oui };
let pubkey =
PublicKeyBinary::from(self.client.get(req).await?.into_inner().org.unwrap().payer);
let pubkey = PublicKeyBinary::from(
self.lock()
.await
.client
.get(req)
.await?
.into_inner()
.org
.ok_or(OrgClientError::NoOrg)?
.payer,
);
e.insert(pubkey);
}
Ok(cache.get(&oui).unwrap().clone())
}

async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error> {
if !mem::replace(self.enabled_clients.entry(oui).or_insert(false), true) {
let mut req = OrgEnableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.enable(req).await?;
}
Ok(())
}

async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error> {
if mem::replace(self.enabled_clients.entry(oui).or_insert(true), false) {
let mut req = OrgDisableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.disable(req).await?;
}
Ok(())
async fn disable_org(&self, oui: u64) -> Result<(), Self::Error> {
self.lock().await.disable_org(oui).await
}
}

Expand Down
Loading