diff --git a/file_store/src/mobile_session.rs b/file_store/src/mobile_session.rs index ee8db39b6..b3769dcde 100644 --- a/file_store/src/mobile_session.rs +++ b/file_store/src/mobile_session.rs @@ -10,6 +10,7 @@ use helium_proto::services::poc_mobile::{ DataTransferEvent as DataTransferEventProto, DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionSettlementStatus, InvalidDataTransferIngestReportV1, + PendingDataTransferSessionV1, }; use serde::Serialize; @@ -221,3 +222,23 @@ impl From for DataTransferSessionReqV1 { } } } + +impl DataTransferSessionReq { + pub fn to_pending_proto( + self, + received_timestamp: DateTime, + ) -> PendingDataTransferSessionV1 { + let event_timestamp = self.data_transfer_usage.timestamp(); + let received_timestamp = received_timestamp.encode_timestamp_millis(); + + PendingDataTransferSessionV1 { + pub_key: self.pub_key.into(), + payer: self.data_transfer_usage.payer.into(), + upload_bytes: self.data_transfer_usage.upload_bytes, + download_bytes: self.data_transfer_usage.download_bytes, + rewardable_bytes: self.rewardable_bytes, + event_timestamp, + received_timestamp, + } + } +} diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 1b6854a54..db944e4a0 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -9,7 +9,7 @@ use helium_proto::services::mobile_config::NetworkKeyRole; use helium_proto::services::poc_mobile::DataTransferSessionSettlementStatus; use helium_proto::services::poc_mobile::{ invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus, - InvalidDataTransferIngestReportV1, + InvalidDataTransferIngestReportV1, PendingDataTransferSessionV1, }; use mobile_config::client::{ authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, @@ -23,6 +23,7 @@ pub async fn accumulate_sessions( authorization_verifier: &impl AuthorizationVerifier, conn: &mut Transaction<'_, Postgres>, invalid_data_session_report_sink: &FileSinkClient, + pending_data_session_report_sink: &FileSinkClient, curr_file_ts: DateTime, reports: impl Stream, ) -> anyhow::Result<()> { @@ -62,11 +63,15 @@ pub async fn accumulate_sessions( DataTransferSessionSettlementStatus::Pending => { save_pending_data_transfer_session( conn, - report.report.data_transfer_usage, + &report.report.data_transfer_usage, report.report.rewardable_bytes, curr_file_ts, + Utc::now(), ) .await?; + pending_data_session_report_sink + .write(report.report.to_pending_proto(curr_file_ts), []) + .await?; } } } @@ -292,12 +297,16 @@ mod tests { let (invalid_report_tx, mut invalid_report_rx) = tokio::sync::mpsc::channel(1); let invalid_data_session_report_sink = FileSinkClient::new(invalid_report_tx, "testing"); + let (pending_report_tx, mut pending_report_rx) = tokio::sync::mpsc::channel(1); + let pending_data_session_report_sink = FileSinkClient::new(pending_report_tx, "testing"); + let mut conn = pool.begin().await?; accumulate_sessions( &MockGatewayInfoResolver, &AllVerified, &mut conn, &invalid_data_session_report_sink, + &pending_data_session_report_sink, Utc::now(), reports, ) @@ -316,9 +325,14 @@ mod tests { .unwrap(); assert_eq!(settled_rows.len(), 0); - if invalid_report_rx.try_recv().is_ok() { - panic!("expected invalid report sink to be empty") - } + assert!( + invalid_report_rx.try_recv().is_err(), + "expected invalid report sink to be empty" + ); + assert!( + pending_report_rx.try_recv().is_ok(), + "expected pending report sink to have a record" + ); Ok(()) } diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index b07fa36c3..eb45aa92e 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -1,4 +1,6 @@ -use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings}; +use crate::{ + accumulate::accumulate_sessions, burner::Burner, event_ids::EventIdPurger, settings::Settings, +}; use anyhow::{bail, Result}; use chrono::{TimeZone, Utc}; use file_store::{ @@ -11,7 +13,8 @@ use file_store::{ }; use helium_proto::services::{ - packet_verifier::ValidDataTransferSession, poc_mobile::InvalidDataTransferIngestReportV1, + packet_verifier::ValidDataTransferSession, + poc_mobile::{InvalidDataTransferIngestReportV1, PendingDataTransferSessionV1}, }; use mobile_config::client::{ authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, @@ -34,6 +37,7 @@ pub struct Daemon { gateway_info_resolver: GIR, authorization_verifier: AV, invalid_data_session_report_sink: FileSinkClient, + pending_data_session_report_sink: FileSinkClient, } impl Daemon { @@ -45,6 +49,7 @@ impl Daemon { gateway_info_resolver: GIR, authorization_verifier: AV, invalid_data_session_report_sink: FileSinkClient, + pending_data_session_report_sink: FileSinkClient, ) -> Self { Self { pool, @@ -55,6 +60,7 @@ impl Daemon { gateway_info_resolver, authorization_verifier, invalid_data_session_report_sink, + pending_data_session_report_sink, } } } @@ -123,6 +129,7 @@ where &self.authorization_verifier, &mut transaction, &self.invalid_data_session_report_sink, + &self.pending_data_session_report_sink, ts, reports, ) @@ -180,6 +187,14 @@ impl Cmd { ) .await?; + let (pending_sessions, pending_sessions_server) = PendingDataTransferSessionV1::file_sink( + store_base_path, + file_upload.clone(), + None, + env!("CARGO_PKG_NAME"), + ) + .await?; + let burner = Burner::new(valid_sessions, solana); let file_store = FileStore::from_settings(&settings.ingest).await?; @@ -207,6 +222,7 @@ impl Cmd { gateway_client, auth_client, invalid_sessions, + pending_sessions, ); let event_id_purger = EventIdPurger::from_settings(pool, settings); @@ -215,6 +231,7 @@ impl Cmd { .add_task(file_upload_server) .add_task(valid_sessions_server) .add_task(invalid_sessions_server) + .add_task(pending_sessions_server) .add_task(reports_server) .add_task(event_id_purger) .add_task(daemon)