From 6d5fc54f213c60e7f97c8065db571019479eff2e Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Tue, 23 Apr 2024 08:44:53 -0400 Subject: [PATCH] Refactor mobile-verifier start sequence to separate daemons --- mobile_verifier/src/cli/server.rs | 348 +++++---------------- mobile_verifier/src/coverage.rs | 79 ++++- mobile_verifier/src/data_session.rs | 37 ++- mobile_verifier/src/heartbeats/cbrs.rs | 56 +++- mobile_verifier/src/heartbeats/wifi.rs | 55 +++- mobile_verifier/src/radio_threshold.rs | 85 ++++- mobile_verifier/src/rewarder.rs | 67 +++- mobile_verifier/src/settings.rs | 4 + mobile_verifier/src/speedtests.rs | 60 +++- mobile_verifier/src/subscriber_location.rs | 61 +++- 10 files changed, 531 insertions(+), 321 deletions(-) diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 8c9a9c00a..3fa315506 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,11 +1,8 @@ use crate::{ - boosting_oracles, coverage::CoverageDaemon, data_session::DataSessionIngestor, geofence::Geofence, - heartbeats::{ - cbrs::HeartbeatDaemon as CellHeartbeatDaemon, wifi::HeartbeatDaemon as WifiHeartbeatDaemon, - }, + heartbeats::{cbrs::CbrsHeartbeatDaemon, wifi::WifiHeartbeatDaemon}, radio_threshold::RadioThresholdIngestor, rewarder::Rewarder, speedtests::SpeedtestDaemon, @@ -15,20 +12,15 @@ use crate::{ use anyhow::Result; use chrono::Duration; use file_store::{ - coverage::CoverageObjectIngestReport, file_info_poller::LookbackBehavior, file_sink, - file_source, file_upload, heartbeat::CbrsHeartbeatIngestReport, - mobile_radio_invalidated_threshold::InvalidatedRadioThresholdIngestReport, - mobile_radio_threshold::RadioThresholdIngestReport, - mobile_subscriber::SubscriberLocationIngestReport, mobile_transfer::ValidDataTransferSession, - speedtest::CellSpeedtestIngestReport, wifi_heartbeat::WifiHeartbeatIngestReport, FileStore, - FileType, + file_sink, + file_upload::{self}, + FileStore, FileType, }; use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, CarrierServiceClient, GatewayClient, }; -use price::PriceTracker; use task_manager::TaskManager; #[derive(Debug, clap::Args)] @@ -49,7 +41,6 @@ impl Cmd { let store_base_path = std::path::Path::new(&settings.cache); let report_ingest = FileStore::from_settings(&settings.ingest).await?; - let data_transfer_ingest = FileStore::from_settings(&settings.data_transfer_ingest).await?; // mobile config clients let gateway_client = GatewayClient::from_settings(&settings.config_client)?; @@ -58,30 +49,6 @@ impl Cmd { let carrier_client = CarrierServiceClient::from_settings(&settings.config_client)?; let hex_boosting_client = HexBoostingClient::from_settings(&settings.config_client)?; - // price tracker - let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; - - // CBRS Heartbeats - let (cbrs_heartbeats, cbrs_heartbeats_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(report_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::CbrsHeartbeatIngestReport.to_string()) - .queue_size(1) - .create() - .await?; - - // Wifi Heartbeats - let (wifi_heartbeats, wifi_heartbeats_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(report_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::WifiHeartbeatIngestReport.to_string()) - .create() - .await?; - let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new( FileType::ValidatedHeartbeat, store_base_path, @@ -105,23 +72,22 @@ impl Cmd { .create() .await?; + let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new( + FileType::SpeedtestAvg, + store_base_path, + file_upload.clone(), + concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"), + ) + .auto_commit(false) + .roll_time(Duration::minutes(15)) + .create() + .await?; + let usa_region_paths = settings.usa_region_paths()?; tracing::info!(?usa_region_paths, "usa_geofence_regions"); let usa_geofence = Geofence::new(usa_region_paths, settings.usa_fencing_resolution()?)?; - let cbrs_heartbeat_daemon = CellHeartbeatDaemon::new( - pool.clone(), - gateway_client.clone(), - cbrs_heartbeats, - settings.modeled_coverage_start(), - settings.max_asserted_distance_deviation, - settings.max_distance_from_coverage, - valid_heartbeats.clone(), - seniority_updates.clone(), - usa_geofence.clone(), - ); - let usa_and_mexico_region_paths = settings.usa_and_mexico_region_paths()?; tracing::info!( ?usa_and_mexico_region_paths, @@ -133,264 +99,92 @@ impl Cmd { settings.usa_and_mexico_fencing_resolution()?, )?; - let wifi_heartbeat_daemon = WifiHeartbeatDaemon::new( + let mut task_manager = TaskManager::new(); + task_manager.add(file_upload_server); + task_manager.add(valid_heartbeats_server); + task_manager.add(seniority_updates_server); + task_manager.add(speedtests_avg_server); + + CbrsHeartbeatDaemon::setup( + &mut task_manager, + pool.clone(), + settings, + report_ingest.clone(), + gateway_client.clone(), + valid_heartbeats.clone(), + seniority_updates.clone(), + usa_geofence.clone(), + ) + .await?; + + WifiHeartbeatDaemon::setup( + &mut task_manager, pool.clone(), + settings, + report_ingest.clone(), gateway_client.clone(), - wifi_heartbeats, - settings.modeled_coverage_start(), - settings.max_asserted_distance_deviation, - settings.max_distance_from_coverage, valid_heartbeats, seniority_updates, usa_and_mexico_geofence, - ); - - // Speedtests - let (speedtests, speedtests_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(report_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::CellSpeedtestIngestReport.to_string()) - .create() - .await?; - - let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new( - FileType::SpeedtestAvg, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"), ) - .auto_commit(false) - .roll_time(Duration::minutes(15)) - .create() .await?; - let (speedtests_validity, speedtests_validity_server) = file_sink::FileSinkBuilder::new( - FileType::VerifiedSpeedtest, - store_base_path, + SpeedtestDaemon::setup( + &mut task_manager, + pool.clone(), + settings, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_verified_speedtest"), + report_ingest.clone(), + speedtests_avg.clone(), + gateway_client, ) - .auto_commit(false) - .roll_time(Duration::minutes(15)) - .create() .await?; - let speedtest_daemon = SpeedtestDaemon::new( + CoverageDaemon::setup( + &mut task_manager, pool.clone(), - gateway_client, - speedtests, - speedtests_avg.clone(), - speedtests_validity, - ); - - // Coverage objects - let (coverage_objs, coverage_objs_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(report_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::CoverageObjectIngestReport.to_string()) - .create() - .await?; - - let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new( - FileType::CoverageObject, - store_base_path, + settings, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_coverage_object"), + report_ingest.clone(), + auth_client.clone(), + usa_geofence, ) - .auto_commit(false) - .roll_time(Duration::minutes(15)) - .create() .await?; - // Oracle boosting reports - let (oracle_boosting_reports, oracle_boosting_reports_server) = - file_sink::FileSinkBuilder::new( - FileType::OracleBoostingReport, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"), - ) - .auto_commit(false) - .roll_time(Duration::minutes(15)) - .create() - .await?; - - let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, usa_geofence)?; - let coverage_daemon = CoverageDaemon::new( + SubscriberLocationIngestor::setup( + &mut task_manager, pool.clone(), + settings, + file_upload.clone(), + report_ingest.clone(), auth_client.clone(), - hex_boost_data, - coverage_objs, - valid_coverage_objs, - oracle_boosting_reports, + entity_client, ) .await?; - // Mobile rewards - let reward_period_hours = settings.rewards; - let (mobile_rewards, mobile_rewards_server) = file_sink::FileSinkBuilder::new( - FileType::MobileRewardShare, - store_base_path, + RadioThresholdIngestor::setup( + &mut task_manager, + pool.clone(), + settings, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_radio_reward_shares"), + report_ingest, + auth_client, ) - .auto_commit(false) - .create() .await?; - let (reward_manifests, reward_manifests_server) = file_sink::FileSinkBuilder::new( - FileType::RewardManifest, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_reward_manifest"), - ) - .auto_commit(false) - .create() - .await?; + DataSessionIngestor::setup(&mut task_manager, pool.clone(), settings).await?; - let rewarder = Rewarder::new( - pool.clone(), + Rewarder::setup( + &mut task_manager, + pool, + settings, + file_upload, carrier_client, hex_boosting_client, - Duration::hours(reward_period_hours), - Duration::minutes(settings.reward_offset_minutes), - mobile_rewards, - reward_manifests, - price_tracker, speedtests_avg, - ); - - // subscriber location - let (subscriber_location_ingest, subscriber_location_ingest_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(report_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::SubscriberLocationIngestReport.to_string()) - .create() - .await?; - - let (verified_subscriber_location, verified_subscriber_location_server) = - file_sink::FileSinkBuilder::new( - FileType::VerifiedSubscriberLocationIngestReport, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_verified_subscriber_location"), - ) - .auto_commit(false) - .create() - .await?; - - let subscriber_location_ingestor = SubscriberLocationIngestor::new( - pool.clone(), - auth_client.clone(), - entity_client, - subscriber_location_ingest, - verified_subscriber_location, - ); - - // radio threshold reports - let (radio_threshold_ingest, radio_threshold_ingest_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(report_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::RadioThresholdIngestReport.to_string()) - .create() - .await?; - - // invalidated radio threshold reports - let (invalidated_radio_threshold_ingest, invalidated_radio_threshold_ingest_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(report_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::InvalidatedRadioThresholdIngestReport.to_string()) - .create() - .await?; - - let (verified_radio_threshold, verified_radio_threshold_server) = - file_sink::FileSinkBuilder::new( - FileType::VerifiedRadioThresholdIngestReport, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_verified_radio_threshold"), - ) - .auto_commit(false) - .create() - .await?; - - let (verified_invalidated_radio_threshold, verified_invalidated_radio_threshold_server) = - file_sink::FileSinkBuilder::new( - FileType::VerifiedInvalidatedRadioThresholdIngestReport, - store_base_path, - file_upload.clone(), - concat!( - env!("CARGO_PKG_NAME"), - "_verified_invalidated_radio_threshold" - ), - ) - .auto_commit(false) - .create() - .await?; - - let radio_threshold_ingestor = RadioThresholdIngestor::new( - pool.clone(), - radio_threshold_ingest, - invalidated_radio_threshold_ingest, - verified_radio_threshold, - verified_invalidated_radio_threshold, - auth_client.clone(), - ); - - // data transfers - let (data_session_ingest, data_session_ingest_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(data_transfer_ingest.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .prefix(FileType::ValidDataTransferSession.to_string()) - .create() - .await?; - - let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest); + ) + .await?; - TaskManager::builder() - .add_task(file_upload_server) - .add_task(cbrs_heartbeats_server) - .add_task(wifi_heartbeats_server) - .add_task(valid_heartbeats_server) - .add_task(speedtests_avg_server) - .add_task(speedtests_validity_server) - .add_task(valid_coverage_objs_server) - .add_task(seniority_updates_server) - .add_task(mobile_rewards_server) - .add_task(reward_manifests_server) - .add_task(verified_subscriber_location_server) - .add_task(subscriber_location_ingestor) - .add_task(radio_threshold_ingestor) - .add_task(verified_radio_threshold_server) - .add_task(verified_invalidated_radio_threshold_server) - .add_task(data_session_ingest_server) - .add_task(price_daemon) - .add_task(cbrs_heartbeat_daemon) - .add_task(wifi_heartbeat_daemon) - .add_task(speedtests_server) - .add_task(coverage_objs_server) - .add_task(oracle_boosting_reports_server) - .add_task(speedtest_daemon) - .add_task(coverage_daemon) - .add_task(rewarder) - .add_task(subscriber_location_ingest_server) - .add_task(radio_threshold_ingest_server) - .add_task(invalidated_radio_threshold_ingest_server) - .add_task(data_session_ingestor) - .build() - .start() - .await + task_manager.start().await } } diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 8a47caa12..32fdb83e3 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -1,16 +1,18 @@ use crate::{ - boosting_oracles::{ - assignment::footfall_and_urbanization_multiplier, Assignment, HexAssignment, HexBoostData, - }, + boosting_oracles::{self, assignment::HexAssignments, BoostedHexAssignments, HexBoostData}, + geofence::Geofence, heartbeats::{HbType, KeyType, OwnedKeyType}, - IsAuthorized, + IsAuthorized, Settings, }; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use file_store::{ coverage::{self, CoverageObjectIngestReport}, - file_info_poller::FileInfoStream, - file_sink::FileSinkClient, + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::{self, FileSinkClient}, + file_source, + file_upload::FileUpload, traits::TimestampEncode, + FileStore, FileType, }; use futures::{ stream::{BoxStream, Stream, StreamExt}, @@ -41,7 +43,7 @@ use std::{ sync::Arc, time::Instant, }; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; use uuid::Uuid; @@ -85,6 +87,67 @@ where Urban: HexAssignment, Foot: HexAssignment, { + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + file_upload: FileUpload, + file_store: FileStore, + auth_client: AuthorizationClient, + geofence: Geofence, + ) -> anyhow::Result<()> { + let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new( + FileType::CoverageObject, + settings.store_base_path(), + file_upload.clone(), + concat!(env!("CARGO_PKG_NAME"), "_coverage_object"), + ) + .auto_commit(false) + .roll_time(Duration::minutes(15)) + .create() + .await?; + + // Oracle boosting reports + let (oracle_boosting_reports, oracle_boosting_reports_server) = + file_sink::FileSinkBuilder::new( + FileType::OracleBoostingReport, + settings.store_base_path(), + file_upload, + concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"), + ) + .auto_commit(false) + .roll_time(Duration::minutes(15)) + .create() + .await?; + + let (coverage_objs, coverage_objs_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::CoverageObjectIngestReport.to_string()) + .create() + .await?; + + let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, geofence)?; + let coverage_daemon = CoverageDaemon::new( + pool, + auth_client, + hex_boost_data, + coverage_objs, + valid_coverage_objs, + oracle_boosting_reports, + ) + .await?; + + task_manager.add(valid_coverage_objs_server); + task_manager.add(oracle_boosting_reports_server); + task_manager.add(coverage_objs_server); + task_manager.add(coverage_daemon); + + Ok(()) + } + pub async fn new( pool: PgPool, auth_client: AuthorizationClient, diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 0c2f3f552..10e88e9fd 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -1,5 +1,10 @@ use chrono::{DateTime, Utc}; -use file_store::{file_info_poller::FileInfoStream, mobile_transfer::ValidDataTransferSession}; +use file_store::{ + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_source, + mobile_transfer::ValidDataTransferSession, + FileStore, FileType, +}; use futures::{ stream::{Stream, StreamExt, TryStreamExt}, TryFutureExt, @@ -7,11 +12,13 @@ use futures::{ use helium_crypto::PublicKeyBinary; use helium_proto::ServiceProvider; use rust_decimal::Decimal; -use sqlx::{PgPool, Postgres, Row, Transaction}; +use sqlx::{PgPool, Pool, Postgres, Row, Transaction}; use std::{collections::HashMap, ops::Range, time::Instant}; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; +use crate::Settings; + pub struct DataSessionIngestor { pub receiver: Receiver>, pub pool: PgPool, @@ -32,6 +39,30 @@ pub struct ServiceProviderDataSession { pub type HotspotMap = HashMap; impl DataSessionIngestor { + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + ) -> anyhow::Result<()> { + let data_transfer_ingest = FileStore::from_settings(&settings.data_transfer_ingest).await?; + // data transfers + let (data_session_ingest, data_session_ingest_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(data_transfer_ingest) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::ValidDataTransferSession.to_string()) + .create() + .await?; + + let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest); + + task_manager.add(data_session_ingest_server); + task_manager.add(data_session_ingestor); + + Ok(()) + } + pub fn new( pool: sqlx::Pool, receiver: Receiver>, diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index 5ca03455d..e7629ffbc 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -3,24 +3,28 @@ use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, heartbeats::LocationCache, - GatewayResolver, + GatewayResolver, Settings, }; use chrono::{DateTime, Duration, Utc}; use file_store::{ - file_info_poller::FileInfoStream, file_sink::FileSinkClient, + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkClient, + file_source, heartbeat::CbrsHeartbeatIngestReport, + FileStore, FileType, }; use futures::{stream::StreamExt, TryFutureExt}; use retainer::Cache; +use sqlx::{Pool, Postgres}; use std::{ sync::Arc, time::{self, Instant}, }; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; -pub struct HeartbeatDaemon { +pub struct CbrsHeartbeatDaemon { pool: sqlx::Pool, gateway_info_resolver: GIR, heartbeats: Receiver>, @@ -32,11 +36,51 @@ pub struct HeartbeatDaemon { geofence: GFV, } -impl HeartbeatDaemon +impl CbrsHeartbeatDaemon where GIR: GatewayResolver, GFV: GeofenceValidator, { + #[allow(clippy::too_many_arguments)] + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + file_store: FileStore, + gateway_resolver: GIR, + valid_heartbeats: FileSinkClient, + seniority_updates: FileSinkClient, + geofence: GFV, + ) -> anyhow::Result<()> { + // CBRS Heartbeats + let (cbrs_heartbeats, cbrs_heartbeats_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::CbrsHeartbeatIngestReport.to_string()) + .queue_size(1) + .create() + .await?; + + let cbrs_heartbeat_daemon = CbrsHeartbeatDaemon::new( + pool, + gateway_resolver, + cbrs_heartbeats, + settings.modeled_coverage_start(), + settings.max_asserted_distance_deviation, + settings.max_distance_from_coverage, + valid_heartbeats, + seniority_updates, + geofence, + ); + + task_manager.add(cbrs_heartbeats_server); + task_manager.add(cbrs_heartbeat_daemon); + + Ok(()) + } + #[allow(clippy::too_many_arguments)] pub fn new( pool: sqlx::Pool, @@ -151,7 +195,7 @@ where } } -impl ManagedTask for HeartbeatDaemon +impl ManagedTask for CbrsHeartbeatDaemon where GIR: GatewayResolver, GFV: GeofenceValidator, diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 65749755c..f621f7bf9 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -3,23 +3,27 @@ use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, heartbeats::LocationCache, - GatewayResolver, + GatewayResolver, Settings, }; use chrono::{DateTime, Duration, Utc}; use file_store::{ - file_info_poller::FileInfoStream, file_sink::FileSinkClient, + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkClient, + file_source, wifi_heartbeat::WifiHeartbeatIngestReport, + FileStore, FileType, }; use futures::{stream::StreamExt, TryFutureExt}; use retainer::Cache; +use sqlx::{Pool, Postgres}; use std::{ sync::Arc, time::{self, Instant}, }; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; -pub struct HeartbeatDaemon { +pub struct WifiHeartbeatDaemon { pool: sqlx::Pool, gateway_info_resolver: GIR, heartbeats: Receiver>, @@ -31,11 +35,50 @@ pub struct HeartbeatDaemon { geofence: GFV, } -impl HeartbeatDaemon +impl WifiHeartbeatDaemon where GIR: GatewayResolver, GFV: GeofenceValidator, { + #[allow(clippy::too_many_arguments)] + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + file_store: FileStore, + gateway_resolver: GIR, + valid_heartbeats: FileSinkClient, + seniority_updates: FileSinkClient, + geofence: GFV, + ) -> anyhow::Result<()> { + // Wifi Heartbeats + let (wifi_heartbeats, wifi_heartbeats_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::WifiHeartbeatIngestReport.to_string()) + .create() + .await?; + + let wifi_heartbeat_daemon = WifiHeartbeatDaemon::new( + pool, + gateway_resolver, + wifi_heartbeats, + settings.modeled_coverage_start(), + settings.max_asserted_distance_deviation, + settings.max_distance_from_coverage, + valid_heartbeats, + seniority_updates, + geofence, + ); + + task_manager.add(wifi_heartbeats_server); + task_manager.add(wifi_heartbeat_daemon); + + Ok(()) + } + #[allow(clippy::too_many_arguments)] pub fn new( pool: sqlx::Pool, @@ -143,7 +186,7 @@ where } } -impl ManagedTask for HeartbeatDaemon +impl ManagedTask for WifiHeartbeatDaemon where GIR: GatewayResolver, GFV: GeofenceValidator, diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 732bf92c7..7cf6bee5a 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -1,7 +1,9 @@ use chrono::{DateTime, Utc}; use file_store::{ - file_info_poller::FileInfoStream, - file_sink::FileSinkClient, + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::{self, FileSinkClient}, + file_source, + file_upload::FileUpload, mobile_radio_invalidated_threshold::{ InvalidatedRadioThresholdIngestReport, InvalidatedRadioThresholdReportReq, VerifiedInvalidatedRadioThresholdIngestReport, @@ -9,6 +11,7 @@ use file_store::{ mobile_radio_threshold::{ RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport, }, + FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; use futures_util::TryFutureExt; @@ -21,11 +24,13 @@ use helium_proto::services::{ }, }; use mobile_config::client::authorization_client::AuthorizationVerifier; -use sqlx::{FromRow, PgPool, Postgres, Row, Transaction}; +use sqlx::{FromRow, PgPool, Pool, Postgres, Row, Transaction}; use std::{collections::HashSet, ops::Range}; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; +use crate::Settings; + pub struct RadioThresholdIngestor { pool: PgPool, reports_receiver: Receiver>, @@ -54,8 +59,78 @@ where impl RadioThresholdIngestor where - AV: AuthorizationVerifier, + AV: AuthorizationVerifier + Send + Sync + 'static, { + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + file_upload: FileUpload, + file_store: FileStore, + authorization_verifier: AV, + ) -> anyhow::Result<()> { + let (verified_radio_threshold, verified_radio_threshold_server) = + file_sink::FileSinkBuilder::new( + FileType::VerifiedRadioThresholdIngestReport, + settings.store_base_path(), + file_upload.clone(), + concat!(env!("CARGO_PKG_NAME"), "_verified_radio_threshold"), + ) + .auto_commit(false) + .create() + .await?; + + let (verified_invalidated_radio_threshold, verified_invalidated_radio_threshold_server) = + file_sink::FileSinkBuilder::new( + FileType::VerifiedInvalidatedRadioThresholdIngestReport, + settings.store_base_path(), + file_upload.clone(), + concat!( + env!("CARGO_PKG_NAME"), + "_verified_invalidated_radio_threshold" + ), + ) + .auto_commit(false) + .create() + .await?; + + let (radio_threshold_ingest, radio_threshold_ingest_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store.clone()) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::RadioThresholdIngestReport.to_string()) + .create() + .await?; + + // invalidated radio threshold reports + let (invalidated_radio_threshold_ingest, invalidated_radio_threshold_ingest_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store.clone()) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::InvalidatedRadioThresholdIngestReport.to_string()) + .create() + .await?; + + let radio_threshold_ingestor = RadioThresholdIngestor::new( + pool.clone(), + radio_threshold_ingest, + invalidated_radio_threshold_ingest, + verified_radio_threshold, + verified_invalidated_radio_threshold, + authorization_verifier, + ); + + task_manager.add(verified_radio_threshold_server); + task_manager.add(verified_invalidated_radio_threshold_server); + task_manager.add(radio_threshold_ingest_server); + task_manager.add(invalidated_radio_threshold_ingest_server); + task_manager.add(radio_threshold_ingestor); + + Ok(()) + } + pub fn new( pool: sqlx::Pool, reports_receiver: Receiver>, diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 7fe98512b..41b9f794b 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -5,12 +5,17 @@ use crate::{ reward_shares::{self, CoveragePoints, MapperShares, ServiceProviderShares, TransferRewards}, speedtests, speedtests_average::SpeedtestAverages, - subscriber_location, telemetry, + subscriber_location, telemetry, Settings, }; use anyhow::bail; use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; -use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; +use file_store::{ + file_sink::{self, FileSinkClient}, + file_upload::FileUpload, + traits::TimestampEncode, + FileType, +}; use futures_util::TryFutureExt; use helium_proto::services::{ poc_mobile as proto, poc_mobile::mobile_reward_share::Reward as ProtoReward, @@ -30,7 +35,7 @@ use rust_decimal::{prelude::*, Decimal}; use rust_decimal_macros::dec; use sqlx::{PgExecutor, Pool, Postgres}; use std::ops::Range; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::time::sleep; const REWARDS_NOT_CURRENT_DELAY_PERIOD: i64 = 5; @@ -49,9 +54,61 @@ pub struct Rewarder { impl Rewarder where - A: CarrierServiceVerifier, - B: HexBoostingInfoResolver, + A: CarrierServiceVerifier + Send + Sync + 'static, + B: HexBoostingInfoResolver + Send + Sync + 'static, { + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + file_upload: FileUpload, + carrier_service_verifier: A, + hex_boosting_info_resolver: B, + speedtests_avg: FileSinkClient, + ) -> anyhow::Result<()> { + let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; + + let reward_period_hours = settings.rewards; + let (mobile_rewards, mobile_rewards_server) = file_sink::FileSinkBuilder::new( + FileType::MobileRewardShare, + settings.store_base_path(), + file_upload.clone(), + concat!(env!("CARGO_PKG_NAME"), "_radio_reward_shares"), + ) + .auto_commit(false) + .create() + .await?; + + let (reward_manifests, reward_manifests_server) = file_sink::FileSinkBuilder::new( + FileType::RewardManifest, + settings.store_base_path(), + file_upload, + concat!(env!("CARGO_PKG_NAME"), "_reward_manifest"), + ) + .auto_commit(false) + .create() + .await?; + + let rewarder = Rewarder::new( + pool.clone(), + carrier_service_verifier, + hex_boosting_info_resolver, + Duration::hours(reward_period_hours), + Duration::minutes(settings.reward_offset_minutes), + mobile_rewards, + reward_manifests, + price_tracker, + speedtests_avg, + ); + + task_manager.add(price_daemon); + task_manager.add(mobile_rewards_server); + task_manager.add(reward_manifests_server); + task_manager.add(rewarder); + + Ok(()) + } + #[allow(clippy::too_many_arguments)] pub fn new( pool: Pool, diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index 7f0698fef..fa98a8f48 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -139,4 +139,8 @@ impl Settings { self.usa_and_mexico_fencing_resolution, )?) } + + pub fn store_base_path(&self) -> &std::path::Path { + std::path::Path::new(&self.cache) + } } diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index f4bdb1355..517bbb511 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -1,9 +1,15 @@ -use crate::speedtests_average::{SpeedtestAverage, SPEEDTEST_LAPSE}; +use crate::{ + speedtests_average::{SpeedtestAverage, SPEEDTEST_LAPSE}, + Settings, +}; use chrono::{DateTime, Duration, Utc}; use file_store::{ - file_info_poller::FileInfoStream, - file_sink::FileSinkClient, + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::{self, FileSinkClient}, + file_source, + file_upload::FileUpload, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, + FileStore, FileType, }; use futures::{ stream::{StreamExt, TryStreamExt}, @@ -15,9 +21,9 @@ use helium_proto::services::poc_mobile::{ VerifiedSpeedtest as VerifiedSpeedtestProto, }; use mobile_config::client::gateway_client::GatewayInfoResolver; -use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction}; +use sqlx::{postgres::PgRow, FromRow, Pool, Postgres, Row, Transaction}; use std::{collections::HashMap, time::Instant}; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6; @@ -56,6 +62,50 @@ impl SpeedtestDaemon where GIR: GatewayInfoResolver, { + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + file_upload: FileUpload, + file_store: FileStore, + speedtests_avg: FileSinkClient, + gateway_resolver: GIR, + ) -> anyhow::Result<()> { + let (speedtests_validity, speedtests_validity_server) = file_sink::FileSinkBuilder::new( + FileType::VerifiedSpeedtest, + settings.store_base_path(), + file_upload, + concat!(env!("CARGO_PKG_NAME"), "_verified_speedtest"), + ) + .auto_commit(false) + .roll_time(Duration::minutes(15)) + .create() + .await?; + + let (speedtests, speedtests_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::CellSpeedtestIngestReport.to_string()) + .create() + .await?; + + let speedtest_daemon = SpeedtestDaemon::new( + pool.clone(), + gateway_resolver, + speedtests, + speedtests_avg, + speedtests_validity, + ); + + task_manager.add(speedtests_validity_server); + task_manager.add(speedtests_server); + task_manager.add(speedtest_daemon); + + Ok(()) + } + pub fn new( pool: sqlx::Pool, gateway_info_resolver: GIR, diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 722af3424..0c222d27d 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -1,11 +1,14 @@ use chrono::{DateTime, Duration, Utc}; use file_store::{ - file_info_poller::FileInfoStream, - file_sink::FileSinkClient, + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::{self, FileSinkClient}, + file_source, + file_upload::FileUpload, mobile_subscriber::{ SubscriberLocationIngestReport, SubscriberLocationReq, VerifiedSubscriberLocationIngestReport, }, + FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; use futures_util::TryFutureExt; @@ -17,11 +20,13 @@ use helium_proto::services::poc_mobile::{ use mobile_config::client::{ authorization_client::AuthorizationVerifier, entity_client::EntityVerifier, }; -use sqlx::{PgPool, Postgres, Transaction}; +use sqlx::{PgPool, Pool, Postgres, Transaction}; use std::{ops::Range, time::Instant}; -use task_manager::ManagedTask; +use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; +use crate::Settings; + const SUBSCRIBER_REWARD_PERIOD_IN_DAYS: i64 = 1; pub type SubscriberValidatedLocations = Vec>; @@ -36,9 +41,53 @@ pub struct SubscriberLocationIngestor { impl SubscriberLocationIngestor where - AV: AuthorizationVerifier, - EV: EntityVerifier, + AV: AuthorizationVerifier + Send + Sync + 'static, + EV: EntityVerifier + Send + Sync + 'static, { + pub async fn setup( + task_manager: &mut TaskManager, + pool: Pool, + settings: &Settings, + file_upload: FileUpload, + file_store: FileStore, + authorization_verifier: AV, + entity_verifier: EV, + ) -> anyhow::Result<()> { + let (verified_subscriber_location, verified_subscriber_location_server) = + file_sink::FileSinkBuilder::new( + FileType::VerifiedSubscriberLocationIngestReport, + settings.store_base_path(), + file_upload.clone(), + concat!(env!("CARGO_PKG_NAME"), "_verified_subscriber_location"), + ) + .auto_commit(false) + .create() + .await?; + + let (subscriber_location_ingest, subscriber_location_ingest_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store.clone()) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .prefix(FileType::SubscriberLocationIngestReport.to_string()) + .create() + .await?; + + let subscriber_location_ingestor = SubscriberLocationIngestor::new( + pool, + authorization_verifier, + entity_verifier, + subscriber_location_ingest, + verified_subscriber_location, + ); + + task_manager.add(verified_subscriber_location_server); + task_manager.add(subscriber_location_ingest_server); + task_manager.add(subscriber_location_ingestor); + + Ok(()) + } + pub fn new( pool: sqlx::Pool, authorization_verifier: AV,