Skip to content

Commit

Permalink
Don't process invalid and valid threshold reports at the same time (#786
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Matthew Plant authored Apr 10, 2024
1 parent fc13b23 commit 202ee2a
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 182 deletions.
33 changes: 13 additions & 20 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::{
heartbeats::{
cbrs::HeartbeatDaemon as CellHeartbeatDaemon, wifi::HeartbeatDaemon as WifiHeartbeatDaemon,
},
invalidated_radio_threshold::InvalidatedRadioThresholdIngestor,
radio_threshold::RadioThresholdIngestor,
rewarder::Rewarder,
speedtests::SpeedtestDaemon,
Expand Down Expand Up @@ -304,6 +303,16 @@ impl Cmd {
.create()
.await?;

// invalidated radio threshold reports
let (invalidated_radio_threshold_ingest, invalidated_radio_threshold_ingest_server) =
file_source::continuous_source::<InvalidatedRadioThresholdIngestReport, _>()
.state(pool.clone())
.store(report_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::InvalidatedRadioThresholdIngestReport.to_string())
.create()
.await?;

let (verified_radio_threshold, verified_radio_threshold_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedRadioThresholdIngestReport,
Expand All @@ -315,23 +324,6 @@ impl Cmd {
.create()
.await?;

let radio_threshold_ingestor = RadioThresholdIngestor::new(
pool.clone(),
radio_threshold_ingest,
verified_radio_threshold,
auth_client.clone(),
);

// invalidated radio threshold reports
let (invalidated_radio_threshold_ingest, invalidated_radio_threshold_ingest_server) =
file_source::continuous_source::<InvalidatedRadioThresholdIngestReport, _>()
.state(pool.clone())
.store(report_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::InvalidatedRadioThresholdIngestReport.to_string())
.create()
.await?;

let (verified_invalidated_radio_threshold, verified_invalidated_radio_threshold_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedInvalidatedRadioThresholdIngestReport,
Expand All @@ -346,9 +338,11 @@ impl Cmd {
.create()
.await?;

let invalidated_radio_threshold_ingestor = InvalidatedRadioThresholdIngestor::new(
let radio_threshold_ingestor = RadioThresholdIngestor::new(
pool.clone(),
radio_threshold_ingest,
invalidated_radio_threshold_ingest,
verified_radio_threshold,
verified_invalidated_radio_threshold,
auth_client.clone(),
);
Expand Down Expand Up @@ -380,7 +374,6 @@ impl Cmd {
.add_task(subscriber_location_ingestor)
.add_task(radio_threshold_ingestor)
.add_task(verified_radio_threshold_server)
.add_task(invalidated_radio_threshold_ingestor)
.add_task(verified_invalidated_radio_threshold_server)
.add_task(data_session_ingest_server)
.add_task(price_daemon)
Expand Down
160 changes: 0 additions & 160 deletions mobile_verifier/src/invalidated_radio_threshold.rs

This file was deleted.

1 change: 0 additions & 1 deletion mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pub mod coverage;
pub mod data_session;
pub mod geofence;
pub mod heartbeats;
pub mod invalidated_radio_threshold;
pub mod radio_threshold;
pub mod reward_shares;
pub mod rewarder;
Expand Down
86 changes: 85 additions & 1 deletion mobile_verifier/src/radio_threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ use chrono::{DateTime, Utc};
use file_store::{
file_info_poller::FileInfoStream,
file_sink::FileSinkClient,
mobile_radio_invalidated_threshold::{
InvalidatedRadioThresholdIngestReport, InvalidatedRadioThresholdReportReq,
VerifiedInvalidatedRadioThresholdIngestReport,
},
mobile_radio_threshold::{
RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport,
},
Expand All @@ -11,7 +15,10 @@ use futures_util::TryFutureExt;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::{
mobile_config::NetworkKeyRole,
poc_mobile::{RadioThresholdReportVerificationStatus, VerifiedRadioThresholdIngestReportV1},
poc_mobile::{
InvalidatedRadioThresholdReportVerificationStatus, RadioThresholdReportVerificationStatus,
VerifiedInvalidatedRadioThresholdIngestReportV1, VerifiedRadioThresholdIngestReportV1,
},
};
use mobile_config::client::authorization_client::AuthorizationVerifier;
use sqlx::{FromRow, PgPool, Postgres, Row, Transaction};
Expand All @@ -22,7 +29,9 @@ use tokio::sync::mpsc::Receiver;
pub struct RadioThresholdIngestor<AV> {
pool: PgPool,
reports_receiver: Receiver<FileInfoStream<RadioThresholdIngestReport>>,
invalid_reports_receiver: Receiver<FileInfoStream<InvalidatedRadioThresholdIngestReport>>,
verified_report_sink: FileSinkClient,
verified_invalid_report_sink: FileSinkClient,
authorization_verifier: AV,
}

Expand Down Expand Up @@ -50,13 +59,17 @@ where
pub fn new(
pool: sqlx::Pool<Postgres>,
reports_receiver: Receiver<FileInfoStream<RadioThresholdIngestReport>>,
invalid_reports_receiver: Receiver<FileInfoStream<InvalidatedRadioThresholdIngestReport>>,
verified_report_sink: FileSinkClient,
verified_invalid_report_sink: FileSinkClient,
authorization_verifier: AV,
) -> Self {
Self {
pool,
reports_receiver,
invalid_reports_receiver,
verified_report_sink,
verified_invalid_report_sink,
authorization_verifier,
}
}
Expand All @@ -70,6 +83,9 @@ where
Some(file) = self.reports_receiver.recv() => {
self.process_file(file).await?;
}
Some(file) = self.invalid_reports_receiver.recv() => {
self.process_invalid_file(file).await?;
}
}
}
tracing::info!("stopping radio threshold ingestor");
Expand Down Expand Up @@ -122,6 +138,47 @@ where
Ok(())
}

async fn process_invalid_file(
&self,
file_info_stream: FileInfoStream<InvalidatedRadioThresholdIngestReport>,
) -> anyhow::Result<()> {
let mut transaction = self.pool.begin().await?;
file_info_stream
.into_stream(&mut transaction)
.await?
.map(anyhow::Ok)
.try_fold(transaction, |mut transaction, ingest_report| async move {
// verify the report
let verified_report_status = self.verify_invalid_report(&ingest_report.report).await;

// if the report is valid then delete the thresholds from the DB
if verified_report_status == InvalidatedRadioThresholdReportVerificationStatus::InvalidatedThresholdReportStatusValid {
delete(&ingest_report, &mut transaction).await?;
}

// write out paper trail of verified report, valid or invalid
let verified_report_proto: VerifiedInvalidatedRadioThresholdIngestReportV1 =
VerifiedInvalidatedRadioThresholdIngestReport {
report: ingest_report,
status: verified_report_status,
timestamp: Utc::now(),
}
.into();
self.verified_invalid_report_sink
.write(
verified_report_proto,
&[("report_status", verified_report_status.as_str_name())],
)
.await?;
Ok(transaction)
})
.await?
.commit()
.await?;
self.verified_report_sink.commit().await?;
Ok(())
}

async fn verify_report(
&self,
report: &RadioThresholdReportReq,
Expand All @@ -140,6 +197,16 @@ where
Ok(final_validity)
}

async fn verify_invalid_report(
&self,
report: &InvalidatedRadioThresholdReportReq,
) -> InvalidatedRadioThresholdReportVerificationStatus {
if !self.verify_known_carrier_key(&report.carrier_pub_key).await {
return InvalidatedRadioThresholdReportVerificationStatus::InvalidatedThresholdReportStatusInvalidCarrierKey;
};
InvalidatedRadioThresholdReportVerificationStatus::InvalidatedThresholdReportStatusValid
}

async fn do_report_verifications(
&self,
report: &RadioThresholdReportReq,
Expand Down Expand Up @@ -251,3 +318,20 @@ pub async fn verified_radio_thresholds(
}
Ok(map)
}

pub async fn delete(
ingest_report: &InvalidatedRadioThresholdIngestReport,
db: &mut Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
DELETE FROM radio_threshold
WHERE hotspot_pubkey = $1 AND (cbsd_id is null or cbsd_id = $2)
"#,
)
.bind(ingest_report.report.hotspot_pubkey.to_string())
.bind(ingest_report.report.cbsd_id.clone())
.execute(&mut *db)
.await?;
Ok(())
}

0 comments on commit 202ee2a

Please sign in to comment.