diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 3fa315506..9bd053313 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,4 +1,5 @@ use crate::{ + boosting_oracles, coverage::CoverageDaemon, data_session::DataSessionIngestor, geofence::Geofence, @@ -99,92 +100,92 @@ impl Cmd { settings.usa_and_mexico_fencing_resolution()?, )?; - let mut task_manager = TaskManager::new(); - task_manager.add(file_upload_server); - task_manager.add(valid_heartbeats_server); - task_manager.add(seniority_updates_server); - task_manager.add(speedtests_avg_server); - - CbrsHeartbeatDaemon::setup( - &mut task_manager, - pool.clone(), - settings, - report_ingest.clone(), - gateway_client.clone(), - valid_heartbeats.clone(), - seniority_updates.clone(), - usa_geofence.clone(), - ) - .await?; - - WifiHeartbeatDaemon::setup( - &mut task_manager, - pool.clone(), - settings, - report_ingest.clone(), - gateway_client.clone(), - valid_heartbeats, - seniority_updates, - usa_and_mexico_geofence, - ) - .await?; - - SpeedtestDaemon::setup( - &mut task_manager, - pool.clone(), - settings, - file_upload.clone(), - report_ingest.clone(), - speedtests_avg.clone(), - gateway_client, - ) - .await?; - - CoverageDaemon::setup( - &mut task_manager, - pool.clone(), - settings, - file_upload.clone(), - report_ingest.clone(), - auth_client.clone(), - usa_geofence, - ) - .await?; - - SubscriberLocationIngestor::setup( - &mut task_manager, - pool.clone(), - settings, - file_upload.clone(), - report_ingest.clone(), - auth_client.clone(), - entity_client, - ) - .await?; - - RadioThresholdIngestor::setup( - &mut task_manager, - pool.clone(), - settings, - file_upload.clone(), - report_ingest, - auth_client, - ) - .await?; - - DataSessionIngestor::setup(&mut task_manager, pool.clone(), settings).await?; - - Rewarder::setup( - &mut task_manager, - pool, - settings, - file_upload, - carrier_client, - hex_boosting_client, - speedtests_avg, - ) - .await?; - - task_manager.start().await + TaskManager::builder() + .add_task(file_upload_server) + .add_task(valid_heartbeats_server) + .add_task(seniority_updates_server) + .add_task(speedtests_avg_server) + .add_task( + CbrsHeartbeatDaemon::create_managed_task( + pool.clone(), + settings, + report_ingest.clone(), + gateway_client.clone(), + valid_heartbeats.clone(), + seniority_updates.clone(), + usa_geofence.clone(), + ) + .await?, + ) + .add_task( + WifiHeartbeatDaemon::create_managed_task( + pool.clone(), + settings, + report_ingest.clone(), + gateway_client.clone(), + valid_heartbeats, + seniority_updates, + usa_and_mexico_geofence, + ) + .await?, + ) + .add_task( + SpeedtestDaemon::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + report_ingest.clone(), + speedtests_avg.clone(), + gateway_client, + ) + .await?, + ) + .add_task( + CoverageDaemon::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + report_ingest.clone(), + auth_client.clone(), + boosting_oracles::make_hex_boost_data(settings, usa_geofence)?, + ) + .await?, + ) + .add_task( + SubscriberLocationIngestor::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + report_ingest.clone(), + auth_client.clone(), + entity_client, + ) + .await?, + ) + .add_task( + RadioThresholdIngestor::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + report_ingest, + auth_client, + ) + .await?, + ) + .add_task(DataSessionIngestor::create_managed_task(pool.clone(), settings).await?) + .add_task( + Rewarder::create_managed_task( + pool, + settings, + file_upload, + carrier_client, + hex_boosting_client, + speedtests_avg, + ) + .await?, + ) + .build() + .start() + .await } } diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 32fdb83e3..b3880f06a 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -1,6 +1,7 @@ use crate::{ - boosting_oracles::{self, assignment::HexAssignments, BoostedHexAssignments, HexBoostData}, - geofence::Geofence, + boosting_oracles::{ + assignment::footfall_and_urbanization_multiplier, Assignment, HexAssignment, HexBoostData, + }, heartbeats::{HbType, KeyType, OwnedKeyType}, IsAuthorized, Settings, }; @@ -84,18 +85,17 @@ where impl CoverageDaemon where - Urban: HexAssignment, - Foot: HexAssignment, + Urban: HexAssignment + 'static, + Foot: HexAssignment + 'static, { - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, file_upload: FileUpload, file_store: FileStore, auth_client: AuthorizationClient, - geofence: Geofence, - ) -> anyhow::Result<()> { + hex_boost_data: HexBoostData, + ) -> anyhow::Result { let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new( FileType::CoverageObject, settings.store_base_path(), @@ -129,7 +129,7 @@ where .create() .await?; - let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, geofence)?; + // let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, geofence)?; let coverage_daemon = CoverageDaemon::new( pool, auth_client, @@ -140,12 +140,12 @@ where ) .await?; - task_manager.add(valid_coverage_objs_server); - task_manager.add(oracle_boosting_reports_server); - task_manager.add(coverage_objs_server); - task_manager.add(coverage_daemon); - - Ok(()) + Ok(TaskManager::builder() + .add_task(valid_coverage_objs_server) + .add_task(oracle_boosting_reports_server) + .add_task(coverage_objs_server) + .add_task(coverage_daemon) + .build()) } pub async fn new( diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 10e88e9fd..9873febc3 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -39,11 +39,10 @@ pub struct ServiceProviderDataSession { pub type HotspotMap = HashMap; impl DataSessionIngestor { - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let data_transfer_ingest = FileStore::from_settings(&settings.data_transfer_ingest).await?; // data transfers let (data_session_ingest, data_session_ingest_server) = @@ -57,10 +56,10 @@ impl DataSessionIngestor { let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest); - task_manager.add(data_session_ingest_server); - task_manager.add(data_session_ingestor); - - Ok(()) + Ok(TaskManager::builder() + .add_task(data_session_ingest_server) + .add_task(data_session_ingestor) + .build()) } pub fn new( diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index e7629ffbc..e8b0a097e 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -42,8 +42,7 @@ where GFV: GeofenceValidator, { #[allow(clippy::too_many_arguments)] - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, file_store: FileStore, @@ -51,7 +50,7 @@ where valid_heartbeats: FileSinkClient, seniority_updates: FileSinkClient, geofence: GFV, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { // CBRS Heartbeats let (cbrs_heartbeats, cbrs_heartbeats_server) = file_source::continuous_source::() @@ -75,10 +74,10 @@ where geofence, ); - task_manager.add(cbrs_heartbeats_server); - task_manager.add(cbrs_heartbeat_daemon); - - Ok(()) + Ok(TaskManager::builder() + .add_task(cbrs_heartbeats_server) + .add_task(cbrs_heartbeat_daemon) + .build()) } #[allow(clippy::too_many_arguments)] diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index f621f7bf9..bd5337a6d 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -41,8 +41,7 @@ where GFV: GeofenceValidator, { #[allow(clippy::too_many_arguments)] - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, file_store: FileStore, @@ -50,7 +49,7 @@ where valid_heartbeats: FileSinkClient, seniority_updates: FileSinkClient, geofence: GFV, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { // Wifi Heartbeats let (wifi_heartbeats, wifi_heartbeats_server) = file_source::continuous_source::() @@ -73,10 +72,10 @@ where geofence, ); - task_manager.add(wifi_heartbeats_server); - task_manager.add(wifi_heartbeat_daemon); - - Ok(()) + Ok(TaskManager::builder() + .add_task(wifi_heartbeats_server) + .add_task(wifi_heartbeat_daemon) + .build()) } #[allow(clippy::too_many_arguments)] diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 7cf6bee5a..2821cf78c 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -61,14 +61,13 @@ impl RadioThresholdIngestor where AV: AuthorizationVerifier + Send + Sync + 'static, { - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, file_upload: FileUpload, file_store: FileStore, authorization_verifier: AV, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let (verified_radio_threshold, verified_radio_threshold_server) = file_sink::FileSinkBuilder::new( FileType::VerifiedRadioThresholdIngestReport, @@ -122,13 +121,13 @@ where authorization_verifier, ); - task_manager.add(verified_radio_threshold_server); - task_manager.add(verified_invalidated_radio_threshold_server); - task_manager.add(radio_threshold_ingest_server); - task_manager.add(invalidated_radio_threshold_ingest_server); - task_manager.add(radio_threshold_ingestor); - - Ok(()) + Ok(TaskManager::builder() + .add_task(verified_radio_threshold_server) + .add_task(verified_invalidated_radio_threshold_server) + .add_task(radio_threshold_ingest_server) + .add_task(invalidated_radio_threshold_ingest_server) + .add_task(radio_threshold_ingestor) + .build()) } pub fn new( diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 41b9f794b..70989003a 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -57,15 +57,14 @@ where A: CarrierServiceVerifier + Send + Sync + 'static, B: HexBoostingInfoResolver + Send + Sync + 'static, { - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, file_upload: FileUpload, carrier_service_verifier: A, hex_boosting_info_resolver: B, speedtests_avg: FileSinkClient, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; let reward_period_hours = settings.rewards; @@ -101,12 +100,12 @@ where speedtests_avg, ); - task_manager.add(price_daemon); - task_manager.add(mobile_rewards_server); - task_manager.add(reward_manifests_server); - task_manager.add(rewarder); - - Ok(()) + Ok(TaskManager::builder() + .add_task(price_daemon) + .add_task(mobile_rewards_server) + .add_task(reward_manifests_server) + .add_task(rewarder) + .build()) } #[allow(clippy::too_many_arguments)] diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 517bbb511..9f6bc3d1d 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -62,15 +62,14 @@ impl SpeedtestDaemon where GIR: GatewayInfoResolver, { - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, file_upload: FileUpload, file_store: FileStore, speedtests_avg: FileSinkClient, gateway_resolver: GIR, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let (speedtests_validity, speedtests_validity_server) = file_sink::FileSinkBuilder::new( FileType::VerifiedSpeedtest, settings.store_base_path(), @@ -99,11 +98,11 @@ where speedtests_validity, ); - task_manager.add(speedtests_validity_server); - task_manager.add(speedtests_server); - task_manager.add(speedtest_daemon); - - Ok(()) + Ok(TaskManager::builder() + .add_task(speedtests_validity_server) + .add_task(speedtests_server) + .add_task(speedtest_daemon) + .build()) } pub fn new( diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 0c222d27d..13ca7de01 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -44,15 +44,14 @@ where AV: AuthorizationVerifier + Send + Sync + 'static, EV: EntityVerifier + Send + Sync + 'static, { - pub async fn setup( - task_manager: &mut TaskManager, + pub async fn create_managed_task( pool: Pool, settings: &Settings, file_upload: FileUpload, file_store: FileStore, authorization_verifier: AV, entity_verifier: EV, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let (verified_subscriber_location, verified_subscriber_location_server) = file_sink::FileSinkBuilder::new( FileType::VerifiedSubscriberLocationIngestReport, @@ -81,11 +80,11 @@ where verified_subscriber_location, ); - task_manager.add(verified_subscriber_location_server); - task_manager.add(subscriber_location_ingest_server); - task_manager.add(subscriber_location_ingestor); - - Ok(()) + Ok(TaskManager::builder() + .add_task(verified_subscriber_location_server) + .add_task(subscriber_location_ingest_server) + .add_task(subscriber_location_ingestor) + .build()) } pub fn new(