Skip to content

Commit

Permalink
Write pending data transfer sessions to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeldjeffrey committed Aug 27, 2024
1 parent 504c6bc commit 47d19d4
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
21 changes: 21 additions & 0 deletions file_store/src/mobile_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use helium_proto::services::poc_mobile::{
DataTransferEvent as DataTransferEventProto, DataTransferRadioAccessTechnology,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1,
DataTransferSessionSettlementStatus, InvalidDataTransferIngestReportV1,
PendingDataTransferSessionV1,
};

use serde::Serialize;
Expand Down Expand Up @@ -221,3 +222,23 @@ impl From<DataTransferSessionReq> for DataTransferSessionReqV1 {
}
}
}

impl DataTransferSessionReq {
pub fn to_pending_proto(
self,
received_timestamp: DateTime<Utc>,
) -> PendingDataTransferSessionV1 {
let event_timestamp = self.data_transfer_usage.timestamp();
let received_timestamp = received_timestamp.encode_timestamp_millis();

PendingDataTransferSessionV1 {
pub_key: self.pub_key.into(),
payer: self.data_transfer_usage.payer.into(),
upload_bytes: self.data_transfer_usage.upload_bytes,
download_bytes: self.data_transfer_usage.download_bytes,
rewardable_bytes: self.rewardable_bytes,
event_timestamp,
received_timestamp,
}
}
}
24 changes: 19 additions & 5 deletions mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use helium_proto::services::mobile_config::NetworkKeyRole;
use helium_proto::services::poc_mobile::DataTransferSessionSettlementStatus;
use helium_proto::services::poc_mobile::{
invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus,
InvalidDataTransferIngestReportV1,
InvalidDataTransferIngestReportV1, PendingDataTransferSessionV1,
};
use mobile_config::client::{
authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver,
Expand All @@ -23,6 +23,7 @@ pub async fn accumulate_sessions(
authorization_verifier: &impl AuthorizationVerifier,
conn: &mut Transaction<'_, Postgres>,
invalid_data_session_report_sink: &FileSinkClient<InvalidDataTransferIngestReportV1>,
pending_data_session_report_sink: &FileSinkClient<PendingDataTransferSessionV1>,
curr_file_ts: DateTime<Utc>,
reports: impl Stream<Item = DataTransferSessionIngestReport>,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -62,11 +63,15 @@ pub async fn accumulate_sessions(
DataTransferSessionSettlementStatus::Pending => {
save_pending_data_transfer_session(
conn,
report.report.data_transfer_usage,
&report.report.data_transfer_usage,
report.report.rewardable_bytes,
curr_file_ts,
Utc::now(),
)
.await?;
pending_data_session_report_sink
.write(report.report.to_pending_proto(curr_file_ts), [])
.await?;
}
}
}
Expand Down Expand Up @@ -292,12 +297,16 @@ mod tests {
let (invalid_report_tx, mut invalid_report_rx) = tokio::sync::mpsc::channel(1);
let invalid_data_session_report_sink = FileSinkClient::new(invalid_report_tx, "testing");

let (pending_report_tx, mut pending_report_rx) = tokio::sync::mpsc::channel(1);
let pending_data_session_report_sink = FileSinkClient::new(pending_report_tx, "testing");

let mut conn = pool.begin().await?;
accumulate_sessions(
&MockGatewayInfoResolver,
&AllVerified,
&mut conn,
&invalid_data_session_report_sink,
&pending_data_session_report_sink,
Utc::now(),
reports,
)
Expand All @@ -316,9 +325,14 @@ mod tests {
.unwrap();
assert_eq!(settled_rows.len(), 0);

if invalid_report_rx.try_recv().is_ok() {
panic!("expected invalid report sink to be empty")
}
assert!(
invalid_report_rx.try_recv().is_err(),
"expected invalid report sink to be empty"
);
assert!(
pending_report_rx.try_recv().is_ok(),
"expected pending report sink to have a record"
);

Ok(())
}
Expand Down
21 changes: 19 additions & 2 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings};
use crate::{
accumulate::accumulate_sessions, burner::Burner, event_ids::EventIdPurger, settings::Settings,
};
use anyhow::{bail, Result};
use chrono::{TimeZone, Utc};
use file_store::{
Expand All @@ -11,7 +13,8 @@ use file_store::{
};

use helium_proto::services::{
packet_verifier::ValidDataTransferSession, poc_mobile::InvalidDataTransferIngestReportV1,
packet_verifier::ValidDataTransferSession,
poc_mobile::{InvalidDataTransferIngestReportV1, PendingDataTransferSessionV1},
};
use mobile_config::client::{
authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver,
Expand All @@ -34,6 +37,7 @@ pub struct Daemon<S, GIR, AV> {
gateway_info_resolver: GIR,
authorization_verifier: AV,
invalid_data_session_report_sink: FileSinkClient<InvalidDataTransferIngestReportV1>,
pending_data_session_report_sink: FileSinkClient<PendingDataTransferSessionV1>,
}

impl<S, GIR, AV> Daemon<S, GIR, AV> {
Expand All @@ -45,6 +49,7 @@ impl<S, GIR, AV> Daemon<S, GIR, AV> {
gateway_info_resolver: GIR,
authorization_verifier: AV,
invalid_data_session_report_sink: FileSinkClient<InvalidDataTransferIngestReportV1>,
pending_data_session_report_sink: FileSinkClient<PendingDataTransferSessionV1>,
) -> Self {
Self {
pool,
Expand All @@ -55,6 +60,7 @@ impl<S, GIR, AV> Daemon<S, GIR, AV> {
gateway_info_resolver,
authorization_verifier,
invalid_data_session_report_sink,
pending_data_session_report_sink,
}
}
}
Expand Down Expand Up @@ -123,6 +129,7 @@ where
&self.authorization_verifier,
&mut transaction,
&self.invalid_data_session_report_sink,
&self.pending_data_session_report_sink,
ts,
reports,
)
Expand Down Expand Up @@ -180,6 +187,14 @@ impl Cmd {
)
.await?;

let (pending_sessions, pending_sessions_server) = PendingDataTransferSessionV1::file_sink(
store_base_path,
file_upload.clone(),
None,
env!("CARGO_PKG_NAME"),
)
.await?;

let burner = Burner::new(valid_sessions, solana);

let file_store = FileStore::from_settings(&settings.ingest).await?;
Expand Down Expand Up @@ -207,6 +222,7 @@ impl Cmd {
gateway_client,
auth_client,
invalid_sessions,
pending_sessions,
);

let event_id_purger = EventIdPurger::from_settings(pool, settings);
Expand All @@ -215,6 +231,7 @@ impl Cmd {
.add_task(file_upload_server)
.add_task(valid_sessions_server)
.add_task(invalid_sessions_server)
.add_task(pending_sessions_server)
.add_task(reports_server)
.add_task(event_id_purger)
.add_task(daemon)
Expand Down

0 comments on commit 47d19d4

Please sign in to comment.