Skip to content

Commit

Permalink
Refactored for daemons to return TaskManager
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Apr 23, 2024
1 parent 6d5fc54 commit 623bfdf
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 158 deletions.
175 changes: 88 additions & 87 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
boosting_oracles,
coverage::CoverageDaemon,
data_session::DataSessionIngestor,
geofence::Geofence,
Expand Down Expand Up @@ -99,92 +100,92 @@ impl Cmd {
settings.usa_and_mexico_fencing_resolution()?,
)?;

let mut task_manager = TaskManager::new();
task_manager.add(file_upload_server);
task_manager.add(valid_heartbeats_server);
task_manager.add(seniority_updates_server);
task_manager.add(speedtests_avg_server);

CbrsHeartbeatDaemon::setup(
&mut task_manager,
pool.clone(),
settings,
report_ingest.clone(),
gateway_client.clone(),
valid_heartbeats.clone(),
seniority_updates.clone(),
usa_geofence.clone(),
)
.await?;

WifiHeartbeatDaemon::setup(
&mut task_manager,
pool.clone(),
settings,
report_ingest.clone(),
gateway_client.clone(),
valid_heartbeats,
seniority_updates,
usa_and_mexico_geofence,
)
.await?;

SpeedtestDaemon::setup(
&mut task_manager,
pool.clone(),
settings,
file_upload.clone(),
report_ingest.clone(),
speedtests_avg.clone(),
gateway_client,
)
.await?;

CoverageDaemon::setup(
&mut task_manager,
pool.clone(),
settings,
file_upload.clone(),
report_ingest.clone(),
auth_client.clone(),
usa_geofence,
)
.await?;

SubscriberLocationIngestor::setup(
&mut task_manager,
pool.clone(),
settings,
file_upload.clone(),
report_ingest.clone(),
auth_client.clone(),
entity_client,
)
.await?;

RadioThresholdIngestor::setup(
&mut task_manager,
pool.clone(),
settings,
file_upload.clone(),
report_ingest,
auth_client,
)
.await?;

DataSessionIngestor::setup(&mut task_manager, pool.clone(), settings).await?;

Rewarder::setup(
&mut task_manager,
pool,
settings,
file_upload,
carrier_client,
hex_boosting_client,
speedtests_avg,
)
.await?;

task_manager.start().await
TaskManager::builder()
.add_task(file_upload_server)
.add_task(valid_heartbeats_server)
.add_task(seniority_updates_server)
.add_task(speedtests_avg_server)
.add_task(
CbrsHeartbeatDaemon::create_managed_task(
pool.clone(),
settings,
report_ingest.clone(),
gateway_client.clone(),
valid_heartbeats.clone(),
seniority_updates.clone(),
usa_geofence.clone(),
)
.await?,
)
.add_task(
WifiHeartbeatDaemon::create_managed_task(
pool.clone(),
settings,
report_ingest.clone(),
gateway_client.clone(),
valid_heartbeats,
seniority_updates,
usa_and_mexico_geofence,
)
.await?,
)
.add_task(
SpeedtestDaemon::create_managed_task(
pool.clone(),
settings,
file_upload.clone(),
report_ingest.clone(),
speedtests_avg.clone(),
gateway_client,
)
.await?,
)
.add_task(
CoverageDaemon::create_managed_task(
pool.clone(),
settings,
file_upload.clone(),
report_ingest.clone(),
auth_client.clone(),
boosting_oracles::make_hex_boost_data(settings, usa_geofence)?,
)
.await?,
)
.add_task(
SubscriberLocationIngestor::create_managed_task(
pool.clone(),
settings,
file_upload.clone(),
report_ingest.clone(),
auth_client.clone(),
entity_client,
)
.await?,
)
.add_task(
RadioThresholdIngestor::create_managed_task(
pool.clone(),
settings,
file_upload.clone(),
report_ingest,
auth_client,
)
.await?,
)
.add_task(DataSessionIngestor::create_managed_task(pool.clone(), settings).await?)
.add_task(
Rewarder::create_managed_task(
pool,
settings,
file_upload,
carrier_client,
hex_boosting_client,
speedtests_avg,
)
.await?,
)
.build()
.start()
.await
}
}
30 changes: 15 additions & 15 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
boosting_oracles::{self, assignment::HexAssignments, BoostedHexAssignments, HexBoostData},
geofence::Geofence,
boosting_oracles::{
assignment::footfall_and_urbanization_multiplier, Assignment, HexAssignment, HexBoostData,
},
heartbeats::{HbType, KeyType, OwnedKeyType},
IsAuthorized, Settings,
};
Expand Down Expand Up @@ -84,18 +85,17 @@ where

impl<Urban, Foot> CoverageDaemon<Urban, Foot>
where
Urban: HexAssignment,
Foot: HexAssignment,
Urban: HexAssignment + 'static,
Foot: HexAssignment + 'static,
{
pub async fn setup(
task_manager: &mut TaskManager,
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
file_upload: FileUpload,
file_store: FileStore,
auth_client: AuthorizationClient,
geofence: Geofence,
) -> anyhow::Result<()> {
hex_boost_data: HexBoostData<Urban, Foot>,
) -> anyhow::Result<impl ManagedTask> {
let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new(
FileType::CoverageObject,
settings.store_base_path(),
Expand Down Expand Up @@ -129,7 +129,7 @@ where
.create()
.await?;

let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, geofence)?;
// let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, geofence)?;
let coverage_daemon = CoverageDaemon::new(
pool,
auth_client,
Expand All @@ -140,12 +140,12 @@ where
)
.await?;

task_manager.add(valid_coverage_objs_server);
task_manager.add(oracle_boosting_reports_server);
task_manager.add(coverage_objs_server);
task_manager.add(coverage_daemon);

Ok(())
Ok(TaskManager::builder()
.add_task(valid_coverage_objs_server)
.add_task(oracle_boosting_reports_server)
.add_task(coverage_objs_server)
.add_task(coverage_daemon)
.build())
}

pub async fn new(
Expand Down
13 changes: 6 additions & 7 deletions mobile_verifier/src/data_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ pub struct ServiceProviderDataSession {
pub type HotspotMap = HashMap<PublicKeyBinary, HotspotReward>;

impl DataSessionIngestor {
pub async fn setup(
task_manager: &mut TaskManager,
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
) -> anyhow::Result<()> {
) -> anyhow::Result<impl ManagedTask> {
let data_transfer_ingest = FileStore::from_settings(&settings.data_transfer_ingest).await?;
// data transfers
let (data_session_ingest, data_session_ingest_server) =
Expand All @@ -57,10 +56,10 @@ impl DataSessionIngestor {

let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest);

task_manager.add(data_session_ingest_server);
task_manager.add(data_session_ingestor);

Ok(())
Ok(TaskManager::builder()
.add_task(data_session_ingest_server)
.add_task(data_session_ingestor)
.build())
}

pub fn new(
Expand Down
13 changes: 6 additions & 7 deletions mobile_verifier/src/heartbeats/cbrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@ where
GFV: GeofenceValidator<Heartbeat>,
{
#[allow(clippy::too_many_arguments)]
pub async fn setup(
task_manager: &mut TaskManager,
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
file_store: FileStore,
gateway_resolver: GIR,
valid_heartbeats: FileSinkClient,
seniority_updates: FileSinkClient,
geofence: GFV,
) -> anyhow::Result<()> {
) -> anyhow::Result<impl ManagedTask> {
// CBRS Heartbeats
let (cbrs_heartbeats, cbrs_heartbeats_server) =
file_source::continuous_source::<CbrsHeartbeatIngestReport, _>()
Expand All @@ -75,10 +74,10 @@ where
geofence,
);

task_manager.add(cbrs_heartbeats_server);
task_manager.add(cbrs_heartbeat_daemon);

Ok(())
Ok(TaskManager::builder()
.add_task(cbrs_heartbeats_server)
.add_task(cbrs_heartbeat_daemon)
.build())
}

#[allow(clippy::too_many_arguments)]
Expand Down
13 changes: 6 additions & 7 deletions mobile_verifier/src/heartbeats/wifi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@ where
GFV: GeofenceValidator<Heartbeat>,
{
#[allow(clippy::too_many_arguments)]
pub async fn setup(
task_manager: &mut TaskManager,
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
file_store: FileStore,
gateway_resolver: GIR,
valid_heartbeats: FileSinkClient,
seniority_updates: FileSinkClient,
geofence: GFV,
) -> anyhow::Result<()> {
) -> anyhow::Result<impl ManagedTask> {
// Wifi Heartbeats
let (wifi_heartbeats, wifi_heartbeats_server) =
file_source::continuous_source::<WifiHeartbeatIngestReport, _>()
Expand All @@ -73,10 +72,10 @@ where
geofence,
);

task_manager.add(wifi_heartbeats_server);
task_manager.add(wifi_heartbeat_daemon);

Ok(())
Ok(TaskManager::builder()
.add_task(wifi_heartbeats_server)
.add_task(wifi_heartbeat_daemon)
.build())
}

#[allow(clippy::too_many_arguments)]
Expand Down
19 changes: 9 additions & 10 deletions mobile_verifier/src/radio_threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@ impl<AV> RadioThresholdIngestor<AV>
where
AV: AuthorizationVerifier + Send + Sync + 'static,
{
pub async fn setup(
task_manager: &mut TaskManager,
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
file_upload: FileUpload,
file_store: FileStore,
authorization_verifier: AV,
) -> anyhow::Result<()> {
) -> anyhow::Result<impl ManagedTask> {
let (verified_radio_threshold, verified_radio_threshold_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedRadioThresholdIngestReport,
Expand Down Expand Up @@ -122,13 +121,13 @@ where
authorization_verifier,
);

task_manager.add(verified_radio_threshold_server);
task_manager.add(verified_invalidated_radio_threshold_server);
task_manager.add(radio_threshold_ingest_server);
task_manager.add(invalidated_radio_threshold_ingest_server);
task_manager.add(radio_threshold_ingestor);

Ok(())
Ok(TaskManager::builder()
.add_task(verified_radio_threshold_server)
.add_task(verified_invalidated_radio_threshold_server)
.add_task(radio_threshold_ingest_server)
.add_task(invalidated_radio_threshold_ingest_server)
.add_task(radio_threshold_ingestor)
.build())
}

pub fn new(
Expand Down
Loading

0 comments on commit 623bfdf

Please sign in to comment.