Skip to content

Commit

Permalink
rename existing mobile config client refs
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Jun 16, 2023
1 parent 60e5c77 commit 6a45485
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 20 deletions.
6 changes: 3 additions & 3 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,22 +116,22 @@ impl Cmd {
.await?;

let reward_period_hours = settings.rewards;
let gateway_client = GatewayClient::from_settings(&settings.config_client)?;
let gateway_client = GatewayClient::from_settings(&settings.config_client_settings)?;
let data_transfer_ingest = FileStore::from_settings(&settings.data_transfer_ingest).await?;

let (price_tracker, tracker_process) =
PriceTracker::start(&settings.price_tracker, shutdown_listener.clone()).await?;

let heartbeat_daemon = HeartbeatDaemon::new(
pool.clone(),
config_client.clone(),
gateway_client.clone(),
heartbeats,
valid_heartbeats,
);

let speedtest_daemon = SpeedtestDaemon::new(
pool.clone(),
config_client.clone(),
gateway_client.clone(),
speedtests,
valid_speedtests,
);
Expand Down
18 changes: 9 additions & 9 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ impl From<HeartbeatKey> for HeartbeatReward {

pub struct HeartbeatDaemon {
pool: sqlx::Pool<sqlx::Postgres>,
config_client: GatewayClient,
gateway_client: GatewayClient,
heartbeats: Receiver<FileInfoStream<CellHeartbeatIngestReport>>,
file_sink: FileSinkClient,
}

impl HeartbeatDaemon {
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
config_client: GatewayClient,
gateway_client: GatewayClient,
heartbeats: Receiver<FileInfoStream<CellHeartbeatIngestReport>>,
file_sink: FileSinkClient,
) -> Self {
Self {
pool,
config_client,
gateway_client,
heartbeats,
file_sink,
}
Expand Down Expand Up @@ -105,7 +105,7 @@ impl HeartbeatDaemon {
let reports = file.into_stream(&mut transaction).await?;

let mut validated_heartbeats =
pin!(Heartbeat::validate_heartbeats(&self.config_client, reports, &epoch).await);
pin!(Heartbeat::validate_heartbeats(&self.gateway_client, reports, &epoch).await);

while let Some(heartbeat) = validated_heartbeats.next().await.transpose()? {
heartbeat.write(&self.file_sink).await?;
Expand Down Expand Up @@ -180,15 +180,15 @@ impl Heartbeat {
}

pub async fn validate_heartbeats<'a>(
config_client: &'a GatewayClient,
gateway_client: &'a GatewayClient,
heartbeats: impl Stream<Item = CellHeartbeatIngestReport> + 'a,
epoch: &'a Range<DateTime<Utc>>,
) -> impl Stream<Item = Result<Self, ClientError>> + 'a {
heartbeats.then(move |heartbeat_report| {
let mut config_client = config_client.clone();
let mut gateway_client = gateway_client.clone();
async move {
let (cell_type, validity) =
validate_heartbeat(&heartbeat_report, &mut config_client, epoch).await?;
validate_heartbeat(&heartbeat_report, &mut gateway_client, epoch).await?;
Ok(Heartbeat {
hotspot_key: heartbeat_report.report.pubkey,
cbsd_id: heartbeat_report.report.cbsd_id,
Expand Down Expand Up @@ -253,7 +253,7 @@ impl Heartbeat {
/// Validate a heartbeat in the given epoch.
async fn validate_heartbeat(
heartbeat: &CellHeartbeatIngestReport,
config_client: &mut GatewayClient,
gateway_client: &mut GatewayClient,
epoch: &Range<DateTime<Utc>>,
) -> Result<(Option<CellType>, proto::HeartbeatValidity), ClientError> {
let cell_type = match CellType::from_cbsd_id(&heartbeat.report.cbsd_id) {
Expand All @@ -269,7 +269,7 @@ async fn validate_heartbeat(
return Ok((cell_type, proto::HeartbeatValidity::HeartbeatOutsideRange));
}

if config_client
if gateway_client
.resolve_gateway_info(&heartbeat.report.pubkey)
.await?
.is_none()
Expand Down
2 changes: 1 addition & 1 deletion mobile_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Settings {
pub output: file_store::Settings,
pub metrics: poc_metrics::Settings,
pub price_tracker: price::price_tracker::Settings,
pub config_client: mobile_config::ClientSettings,
pub config_client_settings: mobile_config::ClientSettings,
#[serde(default = "default_start_after")]
pub start_after: u64,
}
Expand Down
14 changes: 7 additions & 7 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ impl Speedtest {

pub struct SpeedtestDaemon {
pool: sqlx::Pool<sqlx::Postgres>,
config_client: GatewayClient,
gateway_client: GatewayClient,
speedtests: Receiver<FileInfoStream<CellSpeedtestIngestReport>>,
file_sink: FileSinkClient,
}

impl SpeedtestDaemon {
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
config_client: GatewayClient,
gateway_client: GatewayClient,
speedtests: Receiver<FileInfoStream<CellSpeedtestIngestReport>>,
file_sink: FileSinkClient,
) -> Self {
Self {
pool,
config_client,
gateway_client,
speedtests,
file_sink,
}
Expand Down Expand Up @@ -105,7 +105,7 @@ impl SpeedtestDaemon {

let mut validated_speedtests = pin!(
SpeedtestRollingAverage::validate_speedtests(
&self.config_client,
&self.gateway_client,
reports.map(|s| s.report),
&mut transaction,
)
Expand Down Expand Up @@ -157,7 +157,7 @@ impl SpeedtestRollingAverage {
}

pub async fn validate_speedtests<'a>(
config_client: &'a GatewayClient,
gateway_client: &'a GatewayClient,
speedtests: impl Stream<Item = CellSpeedtest> + 'a,
exec: &mut Transaction<'_, Postgres>,
) -> Result<impl Stream<Item = Result<Self, ClientError>> + 'a, sqlx::Error> {
Expand Down Expand Up @@ -188,10 +188,10 @@ impl SpeedtestRollingAverage {

Ok(futures::stream::iter(speedtests.into_iter())
.then(move |(rolling_average, cell_speedtests)| {
let mut config_client = config_client.clone();
let mut gateway_client = gateway_client.clone();
async move {
// If we get back some gateway info for the given address, it's a valid address
if config_client
if gateway_client
.resolve_gateway_info(&rolling_average.id)
.await?
.is_none()
Expand Down

0 comments on commit 6a45485

Please sign in to comment.