Monitor funds and re-enable disabled orgs #481

Merged: 13 commits, Apr 21, 2023
8 changes: 6 additions & 2 deletions iot_packet_verifier/pkg/settings-template.toml
@@ -23,8 +23,12 @@ burn_period = 1
enable_solana_integration = "false"

# Minimum number of DC left in a balance before we disable the organization.
# Defaults to 35_000_000 DC, which equates to $35
minimum_allowed_balance = 35_000_000
# Defaults to 3_500_000 DC, which equates to $35
minimum_allowed_balance = 3_500_000

# How often, in minutes, we should check whether organizations have
# replenished their funds. Defaults to 30 minutes.
monitor_funds_period = 30

[solana]
# Solana RPC. This may contain a secret
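For reference, a quick sanity check of the dollar figure quoted in the comments above, assuming the standard Data Credit peg of $0.00001 USD per DC (the constant names below are hypothetical):

```rust
// Data Credits are pegged at $0.00001 each, i.e. 100_000 DC per USD, so the
// new default works out to $35 (and the old default of 35_000_000 DC to $350,
// which is why the original comment's "$35" was off by a factor of ten).
const DC_PER_USD: u64 = 100_000;
const MINIMUM_ALLOWED_BALANCE_DC: u64 = 3_500_000;

fn main() {
    let usd = MINIMUM_ALLOWED_BALANCE_DC / DC_PER_USD;
    println!("{MINIMUM_ALLOWED_BALANCE_DC} DC = ${usd}"); // 3500000 DC = $35
}
```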
2 changes: 1 addition & 1 deletion iot_packet_verifier/src/burner.rs
@@ -64,7 +64,7 @@ where
return Ok(());
};

tracing::info!("Burning {amount} DC from {payer}");
tracing::info!(%amount, %payer, "Burning DC");

let amount = amount as u64;

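A side note on the logging change above: tracing's `%` sigil records a value as a structured field via its `Display` implementation (where `?` would use `Debug`), rather than interpolating it into the message string. A minimal standalone sketch, with hypothetical argument types standing in for the real ones:

```rust
use tracing::info;

fn log_burn(amount: u64, payer: &str) {
    // `%field` captures the value with Display; subscribers can filter or
    // index on the `amount` and `payer` fields instead of parsing the message.
    info!(%amount, %payer, "Burning DC");
}
```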
26 changes: 20 additions & 6 deletions iot_packet_verifier/src/daemon.rs
@@ -15,12 +15,12 @@ use file_store::{
use futures_util::TryFutureExt;
use solana::SolanaRpc;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc::Receiver, Mutex};

struct Daemon {
pool: Pool<Postgres>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, CachedOrgClient>,
verifier: Verifier<BalanceCache<Option<Arc<SolanaRpc>>>, Arc<Mutex<CachedOrgClient>>>,
report_files: Receiver<FileInfoStream<PacketRouterPacketReport>>,
valid_packets: FileSinkClient,
invalid_packets: FileSinkClient,
@@ -50,7 +50,7 @@ impl Daemon {
&mut self,
report_file: FileInfoStream<PacketRouterPacketReport>,
) -> Result<()> {
tracing::info!("Verifying file: {}", report_file.file_info);
tracing::info!(file = %report_file.file_info, "Verifying file");

let mut transaction = self.pool.begin().await?;
let reports = report_file.into_stream(&mut transaction).await?;
@@ -106,7 +106,12 @@ impl Cmd {
let balances = BalanceCache::new(&mut pool, solana.clone()).await?;

// Set up the balance burner:
let burner = Burner::new(pool.clone(), &balances, settings.burn_period, solana);
let burner = Burner::new(
pool.clone(),
&balances,
settings.burn_period,
solana.clone(),
);

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
@@ -150,14 +155,15 @@ impl Cmd {
.await?;

let config_keypair = settings.config_keypair()?;
let config_server = CachedOrgClient::new(org_client, config_keypair);
let verifier_daemon = Daemon {
pool,
report_files,
valid_packets,
invalid_packets,
verifier: Verifier {
debiter: balances,
config_server: CachedOrgClient::new(org_client, config_keypair),
config_server: config_server.clone(),
},
minimum_allowed_balance: settings.minimum_allowed_balance,
};
@@ -174,6 +180,14 @@ impl Cmd {
invalid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
CachedOrgClient::monitor_funds(
config_server,
solana,
settings.minimum_allowed_balance,
Duration::from_secs(60 * settings.monitor_funds_period),
shutdown_listener.clone(),
)
.map_err(Error::from),
source_join_handle.map_err(Error::from),
)?;

10 changes: 9 additions & 1 deletion iot_packet_verifier/src/settings.rs
@@ -31,6 +31,10 @@ pub struct Settings {
pub solana: Option<solana::Settings>,
#[serde(default = "default_start_after")]
pub start_after: u64,
/// Number of minutes we should sleep before checking to re-enable
/// any disabled orgs.
#[serde(default = "default_monitor_funds_period")]
pub monitor_funds_period: u64,
}

pub fn default_start_after() -> u64 {
@@ -46,7 +50,11 @@ pub fn default_log() -> String {
}

pub fn default_minimum_allowed_balance() -> u64 {
35_000_000
3_500_000
}

pub fn default_monitor_funds_period() -> u64 {
30
}

impl Settings {
186 changes: 132 additions & 54 deletions iot_packet_verifier/src/verifier.rs
@@ -12,19 +12,23 @@ use helium_proto::services::{
};
use helium_proto::{
services::{
iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1},
iot_config::{config_org_client::OrgClient, OrgDisableReqV1, OrgEnableReqV1, OrgListReqV1},
Channel,
},
Message,
};
use solana::SolanaNetwork;
use std::{
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
convert::Infallible,
fmt::Debug,
mem,
sync::Arc,
};
use tokio::sync::Mutex;
use tokio::{
sync::Mutex,
task::JoinError,
time::{sleep_until, Duration, Instant},
};

pub struct Verifier<D, C> {
pub debiter: D,
@@ -83,7 +87,7 @@
.await
.map_err(VerificationError::DebitError)?;

if remaining_balance.is_some() {
if let Some(remaining_balance) = remaining_balance {
pending_burns
.add_burned_amount(&payer, debit_amount)
.await
@@ -98,6 +102,13 @@
})
.await
.map_err(VerificationError::ValidPacketWriterError)?;

if remaining_balance < minimum_allowed_balance {
self.config_server
.disable_org(report.oui)
.await
.map_err(VerificationError::ConfigError)?;
}
} else {
invalid_packets
.write(InvalidPacket {
@@ -109,13 +120,6 @@
.await
.map_err(VerificationError::InvalidPacketWriterError)?;
}
match remaining_balance {
Some(remaining_balance) if remaining_balance >= minimum_allowed_balance => {
self.config_server.enable_org(report.oui).await
}
_ => self.config_server.disable_org(report.oui).await,
}
.map_err(VerificationError::ConfigError)?
}

Ok(())
@@ -164,14 +168,12 @@ pub trait ConfigServer {
type Error;

async fn fetch_org(
&mut self,
&self,
oui: u64,
cache: &mut HashMap<u64, PublicKeyBinary>,
) -> Result<PublicKeyBinary, Self::Error>;

async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error>;

async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error>;
async fn disable_org(&self, oui: u64) -> Result<(), Self::Error>;
}

// TODO: Move this somewhere else
@@ -180,74 +182,150 @@ pub trait ConfigServer {
// consistent with BalanceCache
pub struct CachedOrgClient {
pub keypair: Keypair,
pub enabled_clients: HashMap<u64, bool>,
pub client: OrgClient<Channel>,
}

impl CachedOrgClient {
pub fn new(client: OrgClient<Channel>, keypair: Keypair) -> Self {
CachedOrgClient {
keypair,
enabled_clients: HashMap::new(),
client,
pub fn new(client: OrgClient<Channel>, keypair: Keypair) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(CachedOrgClient { keypair, client }))
}

async fn enable_org(&mut self, oui: u64) -> Result<(), OrgClientError> {
tracing::info!(%oui, "enabling org");

let mut req = OrgEnableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.enable(req).await?;
Ok(())
}

async fn disable_org(&mut self, oui: u64) -> Result<(), OrgClientError> {
tracing::info!(%oui, "disabling org");

let mut req = OrgDisableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.disable(req).await?;
Ok(())
}

pub fn monitor_funds<S>(
client: Arc<Mutex<Self>>,
solana: S,
minimum_allowed_balance: u64,
monitor_period: Duration,
shutdown: triggered::Listener,
) -> impl std::future::Future<Output = Result<(), MonitorError<S::Error>>>
where
S: SolanaNetwork,
{
let join_handle = tokio::spawn(async move {
// TODO: Move this somewhere else, use retainer
let mut org_cache = HashMap::new();
loop {
tracing::info!("Checking if any orgs need to be re-enabled");

// Fetch all disabled orgs:
let mut disabled_clients = HashSet::new();
let orgs = client
.lock()
.await
.client
.list(OrgListReqV1 {})
.await
.map_err(OrgClientError::RpcError)?
.into_inner();
for org in orgs.orgs {
if org.locked {
disabled_clients.insert(org.oui);
let payer = client.fetch_org(org.oui, &mut org_cache).await?;
Contributor review comment: I don't see disabled_clients being used anywhere; you're just inserting into it and then dropping it at the end of each loop. Also, I don't think you need the outer org_cache any longer: you call the org list on each run, and the response already returns the org record, including the payer field you're using to query Solana. Unlike the other cache, this one does persist from one run to the next, but since you're always calling the org list command and getting back the same information, the cache isn't doing anything for you. (A sketch of the suggested simplification follows the function below.)

if solana
.payer_balance(&payer)
.await
.map_err(MonitorError::SolanaError)?
>= minimum_allowed_balance
{
client.lock().await.enable_org(org.oui).await?;
}
}
}
// Sleep until we should re-check the monitor
sleep_until(Instant::now() + monitor_period).await;
}
});
async move {
tokio::select! {
result = join_handle => match result {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(MonitorError::from(err)),
},
_ = shutdown => Ok(())
}
}
}
}
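Following up on the inline review comment above, here is a minimal sketch of the simplified loop body it suggests, assuming (as the comment implies) that each org record in the `OrgListReqV1` response exposes the raw `payer` key bytes:

```rust
// Hypothetical simplification per the review comment: the org list response
// already carries each org's payer, so neither the `disabled_clients` set nor
// the persistent `org_cache` is needed.
loop {
    tracing::info!("Checking if any orgs need to be re-enabled");

    let orgs = client
        .lock()
        .await
        .client
        .list(OrgListReqV1 {})
        .await
        .map_err(OrgClientError::RpcError)?
        .into_inner();

    for org in orgs.orgs {
        if !org.locked {
            continue;
        }
        // Assumption: `org.payer` holds the payer's public key bytes.
        let payer = PublicKeyBinary::from(org.payer);
        let balance = solana
            .payer_balance(&payer)
            .await
            .map_err(MonitorError::SolanaError)?;
        if balance >= minimum_allowed_balance {
            client.lock().await.enable_org(org.oui).await?;
        }
    }

    sleep_until(Instant::now() + monitor_period).await;
}
```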

#[derive(thiserror::Error, Debug)]
pub enum MonitorError<S> {
#[error("Join error: {0}")]
JoinError(#[from] JoinError),
#[error("Org client error: {0}")]
OrgClientError(#[from] OrgClientError),
#[error("Solana error: {0}")]
SolanaError(S),
}

#[derive(thiserror::Error, Debug)]
pub enum OrgClientError {
#[error("Rpc error: {0}")]
RpcError(#[from] tonic::Status),
#[error("Crypto error: {0}")]
CryptoError(#[from] helium_crypto::Error),
#[error("No org")]
NoOrg,
}

#[async_trait]
impl ConfigServer for CachedOrgClient {
impl ConfigServer for Arc<Mutex<CachedOrgClient>> {
type Error = OrgClientError;

async fn fetch_org(
&mut self,
&self,
oui: u64,
cache: &mut HashMap<u64, PublicKeyBinary>,
) -> Result<PublicKeyBinary, Self::Error> {
if let Entry::Vacant(e) = cache.entry(oui) {
let req = OrgGetReqV1 { oui };
let pubkey =
PublicKeyBinary::from(self.client.get(req).await?.into_inner().org.unwrap().payer);
let pubkey = PublicKeyBinary::from(
self.lock()
.await
.client
.get(req)
.await?
.into_inner()
.org
.ok_or(OrgClientError::NoOrg)?
.payer,
);
e.insert(pubkey);
}
Ok(cache.get(&oui).unwrap().clone())
}

async fn enable_org(&mut self, oui: u64) -> Result<(), Self::Error> {
if !mem::replace(self.enabled_clients.entry(oui).or_insert(false), true) {
let mut req = OrgEnableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.enable(req).await?;
}
Ok(())
}

async fn disable_org(&mut self, oui: u64) -> Result<(), Self::Error> {
if mem::replace(self.enabled_clients.entry(oui).or_insert(true), false) {
let mut req = OrgDisableReqV1 {
oui,
timestamp: Utc::now().timestamp_millis() as u64,
signer: self.keypair.public_key().into(),
signature: vec![],
};
let signature = self.keypair.sign(&req.encode_to_vec())?;
req.signature = signature;
let _ = self.client.disable(req).await?;
}
Ok(())
async fn disable_org(&self, oui: u64) -> Result<(), Self::Error> {
self.lock().await.disable_org(oui).await
}
}
