diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 88947194a..9de1c6669 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -9,11 +9,11 @@ use helium_proto::services::poc_mobile::{ }; use sqlx::{Postgres, Transaction}; -use crate::{event_ids, MobileConfigResolverExt}; +use crate::{event_ids, pending_burns, MobileConfigResolverExt}; pub async fn accumulate_sessions( mobile_config: &impl MobileConfigResolverExt, - conn: &mut Transaction<'_, Postgres>, + txn: &mut Transaction<'_, Postgres>, verified_data_session_report_sink: &FileSinkClient, curr_file_ts: DateTime, reports: impl Stream, @@ -21,7 +21,7 @@ pub async fn accumulate_sessions( tokio::pin!(reports); while let Some(report) = reports.next().await { - let report_validity = verify_report(conn, mobile_config, &report).await?; + let report_validity = verify_report(txn, mobile_config, &report).await?; write_verified_report( verified_data_session_report_sink, report_validity, @@ -37,26 +37,7 @@ pub async fn accumulate_sessions( continue; } - let event = report.report.data_transfer_usage; - sqlx::query( - r#" - INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp) - VALUES ($1, $2, $3, $4, $5, $6, $6) - ON CONFLICT (pub_key, payer) DO UPDATE SET - uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes, - downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes, - rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes, - last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) - "# - ) - .bind(event.pub_key) - .bind(event.payer) - .bind(event.upload_bytes as i64) - .bind(event.download_bytes as i64) - .bind(report.report.rewardable_bytes as i64) - .bind(curr_file_ts) - .execute(&mut *conn) - .await?; + pending_burns::save(&mut *txn, &report.report, curr_file_ts).await?; } Ok(()) @@ -125,7 +106,7 @@ mod tests { use helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology; use sqlx::PgPool; - use crate::burner::DataTransferSession; + use crate::pending_burns::DataTransferSession; use super::*; diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index f3cb54617..73d792563 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -1,34 +1,10 @@ -use chrono::{DateTime, Utc}; -use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; +use file_store::file_sink::FileSinkClient; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; use solana::burn::SolanaNetwork; -use sqlx::{FromRow, Pool, Postgres}; -use std::collections::HashMap; +use sqlx::{Pool, Postgres}; -#[derive(FromRow)] -pub struct DataTransferSession { - pub_key: PublicKeyBinary, - payer: PublicKeyBinary, - uploaded_bytes: i64, - downloaded_bytes: i64, - rewardable_bytes: i64, - first_timestamp: DateTime, - last_timestamp: DateTime, -} - -#[derive(Default)] -pub struct PayerTotals { - total_dcs: u64, - sessions: Vec, -} - -impl PayerTotals { - fn push_sess(&mut self, sess: DataTransferSession) { - self.total_dcs += bytes_to_dc(sess.rewardable_bytes as u64); - self.sessions.push(sess); - } -} +use crate::pending_burns; pub struct Burner { valid_sessions: FileSinkClient, @@ -44,49 +20,17 @@ impl Burner { } } -#[derive(thiserror::Error, Debug)] -pub enum BurnError { - #[error("file store error: {0}")] - FileStoreError(#[from] file_store::Error), - #[error("sql error: {0}")] - SqlError(#[from] sqlx::Error), - #[error("solana error: {0}")] - SolanaError(E), -} - impl Burner where S: SolanaNetwork, { - pub async fn burn(&self, pool: &Pool) -> Result<(), BurnError> { - // Fetch all of the sessions - let sessions: Vec = - sqlx::query_as("SELECT * FROM data_transfer_sessions") - .fetch_all(pool) - .await?; - - // Fetch all of the sessions and group by the payer - let mut payer_totals = HashMap::::new(); - for session in sessions.into_iter() { - payer_totals - .entry(session.payer.clone()) - .or_default() - .push_sess(session); - } + pub async fn burn(&self, pool: &Pool) -> anyhow::Result<()> { + for payer_pending_burn in pending_burns::get_all_payer_burns(pool).await? { + let payer = payer_pending_burn.payer; + let total_dcs = payer_pending_burn.total_dcs; + let sessions = payer_pending_burn.sessions; - for ( - payer, - PayerTotals { - total_dcs, - sessions, - }, - ) in payer_totals.into_iter() - { - let payer_balance = self - .solana - .payer_balance(&payer) - .await - .map_err(BurnError::SolanaError)?; + let payer_balance = self.solana.payer_balance(&payer).await?; if payer_balance < total_dcs { tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs"); @@ -107,28 +51,11 @@ where .increment(total_dcs); // Delete from the data transfer session and write out to S3 - - sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") - .bind(&payer) - .execute(pool) - .await?; + pending_burns::delete_for_payer(pool, &payer, total_dcs).await?; for session in sessions { - let num_dcs = bytes_to_dc(session.rewardable_bytes as u64); self.valid_sessions - .write( - ValidDataTransferSession { - pub_key: session.pub_key.into(), - payer: session.payer.into(), - upload_bytes: session.uploaded_bytes as u64, - download_bytes: session.downloaded_bytes as u64, - rewardable_bytes: session.rewardable_bytes as u64, - num_dcs, - first_timestamp: session.first_timestamp.encode_timestamp_millis(), - last_timestamp: session.last_timestamp.encode_timestamp_millis(), - }, - &[], - ) + .write(ValidDataTransferSession::from(session), &[]) .await?; } } @@ -146,11 +73,3 @@ where Ok(()) } } - -const BYTES_PER_DC: u64 = 20_000; - -fn bytes_to_dc(bytes: u64) -> u64 { - let bytes = bytes.max(BYTES_PER_DC); - // Integer div/ceil from: https://stackoverflow.com/a/2745086 - (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC -} diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 6a7a8853f..685272b91 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -1,6 +1,6 @@ use crate::{ - burner::Burner, event_ids::EventIdPurger, settings::Settings, MobileConfigClients, - MobileConfigResolverExt, + burner::Burner, event_ids::EventIdPurger, pending_burns, settings::Settings, + MobileConfigClients, MobileConfigResolverExt, }; use anyhow::{bail, Result}; use chrono::{TimeZone, Utc}; @@ -73,23 +73,13 @@ where S: SolanaNetwork, MCR: MobileConfigResolverExt, { - pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> { + pub async fn run(mut self, mut shutdown: triggered::Listener) -> Result<()> { // Set the initial burn period to one minute let mut burn_time = Instant::now() + Duration::from_secs(60); loop { tokio::select! { - file = self.reports.recv() => { - let Some(file) = file else { - anyhow::bail!("FileInfoPoller sender was dropped unexpectedly"); - }; - tracing::info!("Verifying file: {}", file.file_info); - let ts = file.file_info.timestamp; - let mut transaction = self.pool.begin().await?; - let reports = file.into_stream(&mut transaction).await?; - crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?; - transaction.commit().await?; - self.verified_data_session_report_sink.commit().await?; - }, + biased; + _ = &mut shutdown => return Ok(()), _ = sleep_until(burn_time) => { // It's time to burn match self.burner.burn(&self.pool).await { @@ -102,7 +92,18 @@ where } } } - _ = shutdown.clone() => return Ok(()), + file = self.reports.recv() => { + let Some(file) = file else { + anyhow::bail!("FileInfoPoller sender was dropped unexpectedly"); + }; + tracing::info!("Verifying file: {}", file.file_info); + let ts = file.file_info.timestamp; + let mut transaction = self.pool.begin().await?; + let reports = file.into_stream(&mut transaction).await?; + crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?; + transaction.commit().await?; + self.verified_data_session_report_sink.commit().await?; + } } } } @@ -119,6 +120,8 @@ impl Cmd { let pool = settings.database.connect("mobile-packet-verifier").await?; sqlx::migrate!().run(&pool).await?; + pending_burns::initialize(&pool).await?; + // Set up the solana network: let solana = if settings.enable_solana_integration { let Some(ref solana_settings) = settings.solana else { diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs index 9d80a855e..9ddb71634 100644 --- a/mobile_packet_verifier/src/lib.rs +++ b/mobile_packet_verifier/src/lib.rs @@ -8,6 +8,7 @@ pub mod accumulate; pub mod burner; pub mod daemon; pub mod event_ids; +pub mod pending_burns; pub mod settings; pub struct MobileConfigClients { diff --git a/mobile_packet_verifier/src/pending_burns.rs b/mobile_packet_verifier/src/pending_burns.rs new file mode 100644 index 000000000..b6dca6384 --- /dev/null +++ b/mobile_packet_verifier/src/pending_burns.rs @@ -0,0 +1,179 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use file_store::{mobile_session::DataTransferSessionReq, traits::TimestampEncode}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::packet_verifier::ValidDataTransferSession; +use sqlx::{prelude::FromRow, Pool, Postgres, Row, Transaction}; + +const METRIC_NAME: &str = "pending_dc_burn"; + +#[derive(FromRow)] +pub struct DataTransferSession { + pub_key: PublicKeyBinary, + payer: PublicKeyBinary, + uploaded_bytes: i64, + downloaded_bytes: i64, + rewardable_bytes: i64, + first_timestamp: DateTime, + last_timestamp: DateTime, +} + +impl DataTransferSession { + pub fn dc_to_burn(&self) -> u64 { + bytes_to_dc(self.rewardable_bytes as u64) + } +} + +impl From for ValidDataTransferSession { + fn from(session: DataTransferSession) -> Self { + let num_dcs = session.dc_to_burn(); + + ValidDataTransferSession { + pub_key: session.pub_key.into(), + payer: session.payer.into(), + upload_bytes: session.uploaded_bytes as u64, + download_bytes: session.downloaded_bytes as u64, + rewardable_bytes: session.rewardable_bytes as u64, + num_dcs, + first_timestamp: session.first_timestamp.encode_timestamp_millis(), + last_timestamp: session.last_timestamp.encode_timestamp_millis(), + } + } +} + +pub struct PendingPayerBurn { + pub payer: PublicKeyBinary, + pub total_dcs: u64, + pub sessions: Vec, +} + +pub async fn initialize(conn: &Pool) -> anyhow::Result<()> { + let results = sqlx::query( + r#" + SELECT payer, sum(rewardable_bytes)::bigint as total_rewardable_bytes + FROM data_transfer_sessions + GROUP BY payer + "#, + ) + .fetch_all(conn) + .await?; + + for row in results { + let payer: PublicKeyBinary = row.get("payer"); + let total_rewardable_bytes: u64 = row.get::("total_rewardable_bytes") as u64; + + set_metric(&payer, bytes_to_dc(total_rewardable_bytes)); + } + + Ok(()) +} + +pub async fn get_all(conn: &Pool) -> anyhow::Result> { + sqlx::query_as("SELECT * FROM data_transfer_sessions") + .fetch_all(conn) + .await + .map_err(anyhow::Error::from) +} + +pub async fn get_all_payer_burns(conn: &Pool) -> anyhow::Result> { + let pending_payer_burns = get_all(conn) + .await? + .into_iter() + .fold( + HashMap::::new(), + |mut map, session| { + let dc_to_burn = session.dc_to_burn(); + + match map.get_mut(&session.payer) { + Some(pending_payer_burn) => { + pending_payer_burn.total_dcs += dc_to_burn; + pending_payer_burn.sessions.push(session); + } + None => { + map.insert( + session.payer.clone(), + PendingPayerBurn { + payer: session.payer.clone(), + total_dcs: dc_to_burn, + sessions: vec![session], + }, + ); + } + } + + map + }, + ) + .into_values() + .collect(); + + Ok(pending_payer_burns) +} + +pub async fn save( + txn: &mut Transaction<'_, Postgres>, + req: &DataTransferSessionReq, + last_timestamp: DateTime, +) -> anyhow::Result<()> { + let dc_to_burn = bytes_to_dc(req.rewardable_bytes); + + sqlx::query( + r#" + INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp) + VALUES ($1, $2, $3, $4, $5, $6, $6) + ON CONFLICT (pub_key, payer) DO UPDATE SET + uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes, + downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes, + rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes, + last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) + "# + ) + .bind(&req.data_transfer_usage.pub_key) + .bind(&req.data_transfer_usage.payer) + .bind(req.data_transfer_usage.upload_bytes as i64) + .bind(req.data_transfer_usage.download_bytes as i64) + .bind(req.rewardable_bytes as i64) + .bind(last_timestamp) + .execute(txn) + .await?; + + increment_metric(&req.data_transfer_usage.payer, dc_to_burn); + + Ok(()) +} + +pub async fn delete_for_payer( + conn: &Pool, + payer: &PublicKeyBinary, + burnt_dc: u64, +) -> anyhow::Result<()> { + sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") + .bind(payer) + .execute(conn) + .await?; + + decrement_metric(payer, burnt_dc); + + Ok(()) +} + +fn set_metric(payer: &PublicKeyBinary, value: u64) { + metrics::gauge!(METRIC_NAME, "payer" => payer.to_string()).set(value as f64); +} + +fn increment_metric(payer: &PublicKeyBinary, value: u64) { + metrics::gauge!(METRIC_NAME, "payer" => payer.to_string()).increment(value as f64); +} + +fn decrement_metric(payer: &PublicKeyBinary, value: u64) { + metrics::gauge!(METRIC_NAME, "payer" => payer.to_string()).decrement(value as f64); +} + +const BYTES_PER_DC: u64 = 20_000; + +fn bytes_to_dc(bytes: u64) -> u64 { + let bytes = bytes.max(BYTES_PER_DC); + // Integer div/ceil from: https://stackoverflow.com/a/2745086 + (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC +}