Skip to content

Commit

Permalink
Remove check when testing locally, standardize cli for packet verifie…
Browse files Browse the repository at this point in the history
…rs (#430)

* Only check for invalid keypair if we're burning

* Standardize!
  • Loading branch information
Matthew Plant authored Apr 10, 2023
1 parent 8b80d33 commit b8111eb
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 180 deletions.
2 changes: 1 addition & 1 deletion iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Burner {
keypair: Keypair,
) -> Result<Self, BurnError> {
let program_cache = BurnProgramCache::new(settings, provider.as_ref()).await?;
if program_cache.dc_burn_authority != keypair.pubkey() {
if settings.enable_dc_burn && program_cache.dc_burn_authority != keypair.pubkey() {
return Err(BurnError::InvalidKeypair);
}
Ok(Self {
Expand Down
211 changes: 108 additions & 103 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,109 +70,114 @@ impl Daemon {
}
}

pub async fn run_daemon(settings: &Settings) -> Result<()> {
poc_metrics::install_metrics();

let (shutdown_trigger, shutdown_listener) = triggered::trigger();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
shutdown_trigger.trigger()
});

// Set up the postgres pool:
let (pool, db_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// Set up the solana RpcClient:
let rpc_client = Arc::new(RpcClient::new(settings.solana_rpc.clone()));

let (sub_dao, _) = Pubkey::find_program_address(
&["sub_dao".as_bytes(), settings.dnt_mint()?.as_ref()],
&helium_sub_daos::ID,
);

// Set up the balance cache:
let balances = BalanceCache::new(&pool, sub_dao, rpc_client.clone()).await?;

// Set up the balance burner:
let burn_keypair = match read_keypair_file(&settings.burn_keypair) {
Ok(kp) => kp,
Err(e) => bail!("Failed to read keypair file ({})", e),
};
let burner = Burner::new(settings, &pool, &balances, rpc_client, burn_keypair).await?;

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?;

let store_base_path = std::path::Path::new(&settings.cache);

// Verified packets:
let (valid_packets, mut valid_packets_server) = FileSinkBuilder::new(
FileType::IotValidPacket,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_valid_packets"),
)
.deposits(Some(file_upload_tx.clone()))
.auto_commit(false)
.create()
.await?;

let (invalid_packets, mut invalid_packets_server) = FileSinkBuilder::new(
FileType::InvalidPacket,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_invalid_packets"),
)
.deposits(Some(file_upload_tx.clone()))
.auto_commit(false)
.create()
.await?;

let org_client = settings.connect_org();

let file_store = FileStore::from_settings(&settings.ingest).await?;

let (report_files, source_join_handle) =
file_source::continuous_source::<PacketRouterPacketReport>()
.db(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(
Utc.timestamp_millis_opt(0).unwrap(),
))
.file_type(FileType::IotPacketReport)
.build()?
.start(shutdown_listener.clone())
/// CLI arguments for the iot packet verifier `server` subcommand.
/// Currently takes no flags of its own; all configuration comes from
/// the `Settings` passed to `Cmd::run`.
#[derive(Debug, clap::Args)]
pub struct Cmd {}

impl Cmd {
pub async fn run(self, settings: &Settings) -> Result<()> {
poc_metrics::install_metrics();

let (shutdown_trigger, shutdown_listener) = triggered::trigger();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
shutdown_trigger.trigger()
});

// Set up the postgres pool:
let (pool, db_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// Set up the solana RpcClient:
let rpc_client = Arc::new(RpcClient::new(settings.solana_rpc.clone()));

let (sub_dao, _) = Pubkey::find_program_address(
&["sub_dao".as_bytes(), settings.dnt_mint()?.as_ref()],
&helium_sub_daos::ID,
);

// Set up the balance cache:
let balances = BalanceCache::new(&pool, sub_dao, rpc_client.clone()).await?;

// Set up the balance burner:
let burn_keypair = match read_keypair_file(&settings.burn_keypair) {
Ok(kp) => kp,
Err(e) => bail!("Failed to read keypair file ({})", e),
};
let burner = Burner::new(settings, &pool, &balances, rpc_client, burn_keypair).await?;

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?;

let store_base_path = std::path::Path::new(&settings.cache);

// Verified packets:
let (valid_packets, mut valid_packets_server) = FileSinkBuilder::new(
FileType::IotValidPacket,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_valid_packets"),
)
.deposits(Some(file_upload_tx.clone()))
.auto_commit(false)
.create()
.await?;

let (invalid_packets, mut invalid_packets_server) = FileSinkBuilder::new(
FileType::InvalidPacket,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_invalid_packets"),
)
.deposits(Some(file_upload_tx.clone()))
.auto_commit(false)
.create()
.await?;

let config_keypair = settings.config_keypair()?;
let verifier_daemon = Daemon {
pool,
report_files,
valid_packets,
invalid_packets,
verifier: Verifier {
debiter: balances,
config_server: CachedOrgClient::new(org_client, config_keypair),
},
};

// Run the services:
tokio::try_join!(
db_handle.map_err(Error::from),
burner.run(&shutdown_listener).map_err(Error::from),
file_upload.run(&shutdown_listener).map_err(Error::from),
verifier_daemon.run(&shutdown_listener).map_err(Error::from),
valid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
invalid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
source_join_handle.map_err(Error::from),
)?;

Ok(())
let org_client = settings.connect_org();

let file_store = FileStore::from_settings(&settings.ingest).await?;

let (report_files, source_join_handle) =
file_source::continuous_source::<PacketRouterPacketReport>()
.db(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(
Utc.timestamp_millis_opt(0).unwrap(),
))
.file_type(FileType::IotPacketReport)
.build()?
.start(shutdown_listener.clone())
.await?;

let config_keypair = settings.config_keypair()?;
let verifier_daemon = Daemon {
pool,
report_files,
valid_packets,
invalid_packets,
verifier: Verifier {
debiter: balances,
config_server: CachedOrgClient::new(org_client, config_keypair),
},
};

// Run the services:
tokio::try_join!(
db_handle.map_err(Error::from),
burner.run(&shutdown_listener).map_err(Error::from),
file_upload.run(&shutdown_listener).map_err(Error::from),
verifier_daemon.run(&shutdown_listener).map_err(Error::from),
valid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
invalid_packets_server
.run(&shutdown_listener)
.map_err(Error::from),
source_join_handle.map_err(Error::from),
)?;

Ok(())
}
}
18 changes: 17 additions & 1 deletion iot_packet_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub struct Cli {
/// settings in the given file.
#[clap(short = 'c')]
config: Option<PathBuf>,

#[clap(subcommand)]
cmd: Cmd,
}

impl Cli {
Expand All @@ -22,7 +25,20 @@ impl Cli {
.with(tracing_subscriber::EnvFilter::new(&settings.log))
.with(tracing_subscriber::fmt::layer())
.init();
daemon::run_daemon(&settings).await
self.cmd.run(settings).await
}
}

/// Top-level subcommands for the iot packet verifier CLI.
#[derive(clap::Subcommand)]
pub enum Cmd {
    // Run the verifier daemon (all services: burner, verifier, file sinks).
    Server(daemon::Cmd),
}

impl Cmd {
    /// Dispatch the selected subcommand, handing it the loaded settings.
    async fn run(self, settings: Settings) -> Result<()> {
        // `Cmd` has a single variant, so the pattern is irrefutable.
        let Self::Server(server_cmd) = self;
        server_cmd.run(&settings).await
    }
}

Expand Down
83 changes: 81 additions & 2 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use crate::{burner::Burner, settings::Settings};
use anyhow::{bail, Error, Result};
use file_store::{
file_info_poller::FileInfoStream, mobile_session::DataTransferSessionIngestReport,
file_info_poller::FileInfoStream, file_source, file_upload,
mobile_session::DataTransferSessionIngestReport, FileSinkBuilder, FileStore, FileType,
};
use futures_util::TryFutureExt;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::read_keypair_file;
use sqlx::{Pool, Postgres};
use tokio::{
sync::mpsc::Receiver,
Expand Down Expand Up @@ -30,7 +35,7 @@ impl Daemon {
}
}

pub async fn run(mut self, shutdown: &triggered::Listener) -> anyhow::Result<()> {
pub async fn run(mut self, shutdown: &triggered::Listener) -> Result<()> {
let mut burn_time = Instant::now() + self.burn_period;
loop {
tokio::select! {
Expand All @@ -55,3 +60,77 @@ impl Daemon {
}
}
}

/// CLI arguments for the mobile packet verifier `server` subcommand.
/// Currently takes no flags of its own; all configuration comes from
/// the `Settings` passed to `Cmd::run`.
#[derive(Debug, clap::Args)]
pub struct Cmd {}

impl Cmd {
    /// Run the mobile packet verifier daemon: stream data-transfer session
    /// ingest reports, accumulate them, write valid-session files, and
    /// periodically burn DC via the `Burner`.
    ///
    /// Returns an error if any joined service fails; `try_join!` tears the
    /// remaining services down when one errors out.
    pub async fn run(self, settings: &Settings) -> Result<()> {
        poc_metrics::install_metrics();

        // Trigger shutdown on ctrl-c so every service can wind down cleanly.
        let (shutdown_trigger, shutdown_listener) = triggered::trigger();
        tokio::spawn(async move {
            let _ = tokio::signal::ctrl_c().await;
            shutdown_trigger.trigger()
        });

        // Set up the postgres pool:
        let (pool, conn_handler) = settings
            .database
            .connect("mobile-packet-verifier", shutdown_listener.clone())
            .await?;
        sqlx::migrate!().run(&pool).await?;

        // Set up the solana RpcClient:
        let rpc_client = RpcClient::new(settings.solana_rpc.clone());

        // Set up the balance burner:
        let burn_keypair = match read_keypair_file(&settings.burn_keypair) {
            Ok(kp) => kp,
            Err(e) => bail!("Failed to read keypair file ({})", e),
        };

        let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
        let file_upload =
            file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?;

        let store_base_path = std::path::Path::new(&settings.cache);

        // FIX: the sink metric/file suffix previously read "_invalid_packets",
        // a copy-paste from the iot verifier. This sink holds VALID data
        // transfer sessions, so name it accordingly.
        let (valid_sessions, mut valid_sessions_server) = FileSinkBuilder::new(
            FileType::ValidDataTransferSession,
            store_base_path,
            concat!(env!("CARGO_PKG_NAME"), "_valid_sessions"),
        )
        .deposits(Some(file_upload_tx.clone()))
        .auto_commit(true)
        .create()
        .await?;

        let burner = Burner::new(settings, valid_sessions, rpc_client, burn_keypair).await?;

        let file_store = FileStore::from_settings(&settings.ingest).await?;

        // Stream session ingest reports continuously from the file store.
        let (reports, source_join_handle) =
            file_source::continuous_source::<DataTransferSessionIngestReport>()
                .db(pool.clone())
                .store(file_store)
                .file_type(FileType::DataTransferSessionIngestReport)
                .build()?
                .start(shutdown_listener.clone())
                .await?;

        let daemon = Daemon::new(settings, pool, reports, burner);

        // Run the services:
        tokio::try_join!(
            source_join_handle.map_err(Error::from),
            valid_sessions_server
                .run(&shutdown_listener)
                .map_err(Error::from),
            file_upload.run(&shutdown_listener).map_err(Error::from),
            daemon.run(&shutdown_listener).map_err(Error::from),
            conn_handler.map_err(Error::from),
        )?;

        Ok(())
    }
}
Loading

0 comments on commit b8111eb

Please sign in to comment.