Skip to content

Commit

Permalink
Refactor mobile-verifier start sequence to separate daemons
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Apr 23, 2024
1 parent 992d80c commit 6d5fc54
Show file tree
Hide file tree
Showing 10 changed files with 531 additions and 321 deletions.
348 changes: 71 additions & 277 deletions mobile_verifier/src/cli/server.rs

Large diffs are not rendered by default.

79 changes: 71 additions & 8 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::{
boosting_oracles::{
assignment::footfall_and_urbanization_multiplier, Assignment, HexAssignment, HexBoostData,
},
boosting_oracles::{self, assignment::HexAssignments, BoostedHexAssignments, HexBoostData},
geofence::Geofence,
heartbeats::{HbType, KeyType, OwnedKeyType},
IsAuthorized,
IsAuthorized, Settings,
};
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use file_store::{
coverage::{self, CoverageObjectIngestReport},
file_info_poller::FileInfoStream,
file_sink::FileSinkClient,
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::{self, FileSinkClient},
file_source,
file_upload::FileUpload,
traits::TimestampEncode,
FileStore, FileType,
};
use futures::{
stream::{BoxStream, Stream, StreamExt},
Expand Down Expand Up @@ -41,7 +43,7 @@ use std::{
sync::Arc,
time::Instant,
};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;
use uuid::Uuid;

Expand Down Expand Up @@ -85,6 +87,67 @@ where
Urban: HexAssignment,
Foot: HexAssignment,
{
pub async fn setup(
task_manager: &mut TaskManager,
pool: Pool<Postgres>,
settings: &Settings,
file_upload: FileUpload,
file_store: FileStore,
auth_client: AuthorizationClient,
geofence: Geofence,
) -> anyhow::Result<()> {
let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new(
FileType::CoverageObject,
settings.store_base_path(),
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_coverage_object"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
.await?;

// Oracle boosting reports
let (oracle_boosting_reports, oracle_boosting_reports_server) =
file_sink::FileSinkBuilder::new(
FileType::OracleBoostingReport,
settings.store_base_path(),
file_upload,
concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
.await?;

let (coverage_objs, coverage_objs_server) =
file_source::continuous_source::<CoverageObjectIngestReport, _>()
.state(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CoverageObjectIngestReport.to_string())
.create()
.await?;

let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, geofence)?;
let coverage_daemon = CoverageDaemon::new(
pool,
auth_client,
hex_boost_data,
coverage_objs,
valid_coverage_objs,
oracle_boosting_reports,
)
.await?;

task_manager.add(valid_coverage_objs_server);
task_manager.add(oracle_boosting_reports_server);
task_manager.add(coverage_objs_server);
task_manager.add(coverage_daemon);

Ok(())
}

pub async fn new(
pool: PgPool,
auth_client: AuthorizationClient,
Expand Down
37 changes: 34 additions & 3 deletions mobile_verifier/src/data_session.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use chrono::{DateTime, Utc};
use file_store::{file_info_poller::FileInfoStream, mobile_transfer::ValidDataTransferSession};
use file_store::{
file_info_poller::{FileInfoStream, LookbackBehavior},
file_source,
mobile_transfer::ValidDataTransferSession,
FileStore, FileType,
};
use futures::{
stream::{Stream, StreamExt, TryStreamExt},
TryFutureExt,
};
use helium_crypto::PublicKeyBinary;
use helium_proto::ServiceProvider;
use rust_decimal::Decimal;
use sqlx::{PgPool, Postgres, Row, Transaction};
use sqlx::{PgPool, Pool, Postgres, Row, Transaction};
use std::{collections::HashMap, ops::Range, time::Instant};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;

use crate::Settings;

pub struct DataSessionIngestor {
pub receiver: Receiver<FileInfoStream<ValidDataTransferSession>>,
pub pool: PgPool,
Expand All @@ -32,6 +39,30 @@ pub struct ServiceProviderDataSession {
pub type HotspotMap = HashMap<PublicKeyBinary, HotspotReward>;

impl DataSessionIngestor {
pub async fn setup(
task_manager: &mut TaskManager,
pool: Pool<Postgres>,
settings: &Settings,
) -> anyhow::Result<()> {
let data_transfer_ingest = FileStore::from_settings(&settings.data_transfer_ingest).await?;
// data transfers
let (data_session_ingest, data_session_ingest_server) =
file_source::continuous_source::<ValidDataTransferSession, _>()
.state(pool.clone())
.store(data_transfer_ingest)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::ValidDataTransferSession.to_string())
.create()
.await?;

let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest);

task_manager.add(data_session_ingest_server);
task_manager.add(data_session_ingestor);

Ok(())
}

pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
receiver: Receiver<FileInfoStream<ValidDataTransferSession>>,
Expand Down
56 changes: 50 additions & 6 deletions mobile_verifier/src/heartbeats/cbrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,28 @@ use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
heartbeats::LocationCache,
GatewayResolver,
GatewayResolver, Settings,
};

use chrono::{DateTime, Duration, Utc};
use file_store::{
file_info_poller::FileInfoStream, file_sink::FileSinkClient,
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::FileSinkClient,
file_source,
heartbeat::CbrsHeartbeatIngestReport,
FileStore, FileType,
};
use futures::{stream::StreamExt, TryFutureExt};
use retainer::Cache;
use sqlx::{Pool, Postgres};
use std::{
sync::Arc,
time::{self, Instant},
};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;

pub struct HeartbeatDaemon<GIR, GFV> {
pub struct CbrsHeartbeatDaemon<GIR, GFV> {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_info_resolver: GIR,
heartbeats: Receiver<FileInfoStream<CbrsHeartbeatIngestReport>>,
Expand All @@ -32,11 +36,51 @@ pub struct HeartbeatDaemon<GIR, GFV> {
geofence: GFV,
}

impl<GIR, GFV> HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> CbrsHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
{
#[allow(clippy::too_many_arguments)]
pub async fn setup(
task_manager: &mut TaskManager,
pool: Pool<Postgres>,
settings: &Settings,
file_store: FileStore,
gateway_resolver: GIR,
valid_heartbeats: FileSinkClient,
seniority_updates: FileSinkClient,
geofence: GFV,
) -> anyhow::Result<()> {
// CBRS Heartbeats
let (cbrs_heartbeats, cbrs_heartbeats_server) =
file_source::continuous_source::<CbrsHeartbeatIngestReport, _>()
.state(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CbrsHeartbeatIngestReport.to_string())
.queue_size(1)
.create()
.await?;

let cbrs_heartbeat_daemon = CbrsHeartbeatDaemon::new(
pool,
gateway_resolver,
cbrs_heartbeats,
settings.modeled_coverage_start(),
settings.max_asserted_distance_deviation,
settings.max_distance_from_coverage,
valid_heartbeats,
seniority_updates,
geofence,
);

task_manager.add(cbrs_heartbeats_server);
task_manager.add(cbrs_heartbeat_daemon);

Ok(())
}

#[allow(clippy::too_many_arguments)]
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
Expand Down Expand Up @@ -151,7 +195,7 @@ where
}
}

impl<GIR, GFV> ManagedTask for HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> ManagedTask for CbrsHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
Expand Down
55 changes: 49 additions & 6 deletions mobile_verifier/src/heartbeats/wifi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,27 @@ use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
heartbeats::LocationCache,
GatewayResolver,
GatewayResolver, Settings,
};
use chrono::{DateTime, Duration, Utc};
use file_store::{
file_info_poller::FileInfoStream, file_sink::FileSinkClient,
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::FileSinkClient,
file_source,
wifi_heartbeat::WifiHeartbeatIngestReport,
FileStore, FileType,
};
use futures::{stream::StreamExt, TryFutureExt};
use retainer::Cache;
use sqlx::{Pool, Postgres};
use std::{
sync::Arc,
time::{self, Instant},
};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;

pub struct HeartbeatDaemon<GIR, GFV> {
pub struct WifiHeartbeatDaemon<GIR, GFV> {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_info_resolver: GIR,
heartbeats: Receiver<FileInfoStream<WifiHeartbeatIngestReport>>,
Expand All @@ -31,11 +35,50 @@ pub struct HeartbeatDaemon<GIR, GFV> {
geofence: GFV,
}

impl<GIR, GFV> HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> WifiHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
{
#[allow(clippy::too_many_arguments)]
pub async fn setup(
task_manager: &mut TaskManager,
pool: Pool<Postgres>,
settings: &Settings,
file_store: FileStore,
gateway_resolver: GIR,
valid_heartbeats: FileSinkClient,
seniority_updates: FileSinkClient,
geofence: GFV,
) -> anyhow::Result<()> {
// Wifi Heartbeats
let (wifi_heartbeats, wifi_heartbeats_server) =
file_source::continuous_source::<WifiHeartbeatIngestReport, _>()
.state(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::WifiHeartbeatIngestReport.to_string())
.create()
.await?;

let wifi_heartbeat_daemon = WifiHeartbeatDaemon::new(
pool,
gateway_resolver,
wifi_heartbeats,
settings.modeled_coverage_start(),
settings.max_asserted_distance_deviation,
settings.max_distance_from_coverage,
valid_heartbeats,
seniority_updates,
geofence,
);

task_manager.add(wifi_heartbeats_server);
task_manager.add(wifi_heartbeat_daemon);

Ok(())
}

#[allow(clippy::too_many_arguments)]
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
Expand Down Expand Up @@ -143,7 +186,7 @@ where
}
}

impl<GIR, GFV> ManagedTask for HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> ManagedTask for WifiHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
Expand Down
Loading

0 comments on commit 6d5fc54

Please sign in to comment.