From d91ef05434561a21bf08cd8643849c3b9e93a653 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 8 Jan 2025 15:21:07 +0200 Subject: [PATCH 1/9] Implement tracked radios cache --- mobile_config/src/gateway_info.rs | 46 +-------------- mobile_config/src/gateway_service.rs | 59 +++++++++---------- mobile_config/src/main.rs | 37 ++++++++---- mobile_config/src/mobile_radio_tracker.rs | 70 ++++++++++++++++++----- 4 files changed, 111 insertions(+), 101 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index cec72941b..85d9ea034 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -363,13 +363,10 @@ pub(crate) mod db { use super::{DeviceType, GatewayInfo, GatewayMetadata}; use crate::gateway_info::DeploymentInfo; use chrono::{DateTime, Utc}; - use futures::{ - stream::{Stream, StreamExt}, - TryStreamExt, - }; + use futures::stream::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; - use std::{collections::HashMap, str::FromStr}; + use std::str::FromStr; const GET_METADATA_SQL: &str = r#" select kta.entity_key, infos.location::bigint, infos.device_type, @@ -380,50 +377,11 @@ pub(crate) mod db { const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; - const GET_UPDATED_RADIOS: &str = - "SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1"; - - const GET_UPDATED_AT: &str = - "SELECT last_changed_at FROM mobile_radio_tracker WHERE entity_key = $1"; - lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}"); } - pub async fn get_updated_radios( - db: impl PgExecutor<'_>, - min_updated_at: DateTime, - ) -> anyhow::Result>> { - sqlx::query(GET_UPDATED_RADIOS) - .bind(min_updated_at) - .fetch(db) - .map_err(anyhow::Error::from) - .try_fold( - HashMap::new(), - |mut map: HashMap>, row| async move { - let entity_key_b = row.get::<&[u8], &str>("entity_key"); - let entity_key = bs58::encode(entity_key_b).into_string(); - let updated_at = row.get::, &str>("last_changed_at"); - map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at); - Ok(map) - }, - ) - .await - } - - pub async fn get_updated_at( - db: impl PgExecutor<'_>, - address: &PublicKeyBinary, - ) -> anyhow::Result>> { - let entity_key = bs58::decode(address.to_string()).into_vec()?; - sqlx::query_scalar(GET_UPDATED_AT) - .bind(entity_key) - .fetch_optional(db) - .await - .map_err(anyhow::Error::from) - } - pub async fn get_info( db: impl PgExecutor<'_>, address: &PublicKeyBinary, diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 73323e2f4..260f27805 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -1,12 +1,12 @@ use crate::{ - gateway_info::{self, db::get_updated_radios, DeviceType, GatewayInfo}, + gateway_info::{self, DeviceType, GatewayInfo}, key_cache::KeyCache, + mobile_radio_tracker::TrackedRadiosMap, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; use chrono::{DateTime, TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ - future, stream::{Stream, StreamExt, TryStreamExt}, TryFutureExt, }; @@ -20,14 +20,15 @@ use helium_proto::{ Message, }; use sqlx::{Pool, Postgres}; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; +use tokio::sync::RwLock; use tonic::{Request, Response, Status}; pub struct GatewayService { key_cache: KeyCache, - mobile_config_db_pool: Pool, metadata_pool: Pool, signing_key: Arc, + tracked_radios_cache: Arc>, } impl GatewayService { @@ -35,13 +36,13 @@ impl GatewayService { key_cache: KeyCache, metadata_pool: Pool, signing_key: Keypair, - mobile_config_db_pool: Pool, + tracked_radios_cache: Arc>, ) -> Self { Self { key_cache, metadata_pool, signing_key: Arc::new(signing_key), - mobile_config_db_pool, + tracked_radios_cache, } } @@ -129,11 +130,10 @@ impl mobile_config::Gateway for GatewayService { let pubkey: PublicKeyBinary = request.address.into(); tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)"); - let updated_at = gateway_info::db::get_updated_at(&self.mobile_config_db_pool, &pubkey) - .await - .map_err(|_| { - Status::internal("error fetching updated_at field for gateway info (v2)") - })?; + let updated_at = { + let tracked_radios = self.tracked_radios_cache.read().await; + tracked_radios.get(&pubkey).cloned() + }; gateway_info::db::get_info(&self.metadata_pool, &pubkey) .await @@ -230,7 +230,6 @@ impl mobile_config::Gateway for GatewayService { ); let metadata_db_pool = self.metadata_pool.clone(); - let mobile_config_db_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; let addresses = request @@ -241,18 +240,14 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); + let radios_cache = Arc::clone(&self.tracked_radios_cache); tokio::spawn(async move { let min_updated_at = DateTime::UNIX_EPOCH; - let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; let stream = gateway_info::db::batch_info_stream(&metadata_db_pool, &addresses)?; let stream = stream .filter_map(|gateway_info| { - future::ready(handle_updated_at( - gateway_info, - &updated_radios, - min_updated_at, - )) + handle_updated_at(gateway_info, Arc::clone(&radios_cache), min_updated_at) }) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await @@ -307,7 +302,6 @@ impl mobile_config::Gateway for GatewayService { self.verify_request_signature(&signer, &request)?; let metadata_db_pool = self.metadata_pool.clone(); - let mobile_config_db_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; @@ -320,6 +314,7 @@ impl mobile_config::Gateway for GatewayService { device_types ); + let radios_cache = Arc::clone(&self.tracked_radios_cache); tokio::spawn(async move { let min_updated_at = Utc .timestamp_opt(request.min_updated_at as i64, 0) @@ -328,15 +323,10 @@ impl mobile_config::Gateway for GatewayService { "Invalid min_refreshed_at argument", ))?; - let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types); let stream = stream .filter_map(|gateway_info| { - future::ready(handle_updated_at( - gateway_info, - &updated_radios, - min_updated_at, - )) + handle_updated_at(gateway_info, Arc::clone(&radios_cache), min_updated_at) }) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await @@ -346,20 +336,23 @@ impl mobile_config::Gateway for GatewayService { } } -fn handle_updated_at( +async fn handle_updated_at( mut gateway_info: GatewayInfo, - updated_radios: &HashMap>, + updated_radios: Arc>, min_updated_at: chrono::DateTime, ) -> Option { // Check mobile_radio_tracker HashMap - if let Some(updated_at) = updated_radios.get(&gateway_info.address) { - // It could be already filtered by min_updated_at but recheck won't hurt - if updated_at >= &min_updated_at { - gateway_info.updated_at = Some(*updated_at); - return Some(gateway_info); + { + let updated_radios = updated_radios.read().await; + if let Some(updated_at) = updated_radios.get(&gateway_info.address) { + if updated_at >= &min_updated_at { + gateway_info.updated_at = Some(*updated_at); + return Some(gateway_info); + } + return None; } - return None; } + // Fallback solution #1. Try to use refreshed_at as updated_at field and check // min_updated_at if let Some(refreshed_at) = gateway_info.refreshed_at { diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index b869bef2c..d90db6ef7 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -7,13 +7,19 @@ use helium_proto::services::mobile_config::{ HexBoostingServer, }; use mobile_config::{ - admin_service::AdminService, authorization_service::AuthorizationService, - carrier_service::CarrierService, entity_service::EntityService, - gateway_service::GatewayService, hex_boosting_service::HexBoostingService, key_cache::KeyCache, - mobile_radio_tracker::MobileRadioTracker, settings::Settings, + admin_service::AdminService, + authorization_service::AuthorizationService, + carrier_service::CarrierService, + entity_service::EntityService, + gateway_service::GatewayService, + hex_boosting_service::HexBoostingService, + key_cache::KeyCache, + mobile_radio_tracker::{MobileRadioTracker, TrackedRadiosMap}, + settings::Settings, }; -use std::{net::SocketAddr, path::PathBuf, time::Duration}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use task_manager::{ManagedTask, TaskManager}; +use tokio::sync::RwLock; use tonic::transport; #[derive(Debug, clap::Parser)] @@ -71,11 +77,15 @@ impl Daemon { let admin_svc = AdminService::new(settings, key_cache.clone(), key_cache_updater, pool.clone())?; + + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + let gateway_svc = GatewayService::new( key_cache.clone(), metadata_pool.clone(), settings.signing_keypair()?, - pool.clone(), + Arc::clone(&tracked_radios_cache), ); let auth_svc = AuthorizationService::new(key_cache.clone(), settings.signing_keypair()?); let entity_svc = EntityService::new( @@ -107,13 +117,18 @@ impl Daemon { hex_boosting_svc, }; + let mobile_tracker = MobileRadioTracker::new( + pool.clone(), + metadata_pool.clone(), + settings.mobile_radio_tracker_interval, + Arc::clone(&tracked_radios_cache), + ); + // Preinitialize tracked_radios_cache to avoid race condition in GatewayService + mobile_tracker.track_changes().await?; + TaskManager::builder() .add_task(grpc_server) - .add_task(MobileRadioTracker::new( - pool.clone(), - metadata_pool.clone(), - settings.mobile_radio_tracker_interval, - )) + .add_task(mobile_tracker) .build() .start() .await diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 0dc03a8c3..a67135e88 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -1,10 +1,15 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use chrono::{DateTime, Utc}; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use helium_crypto::PublicKeyBinary; +use sqlx::Row; use sqlx::{Pool, Postgres, QueryBuilder}; +use std::str::FromStr; use task_manager::ManagedTask; +use tokio::sync::RwLock; +pub type TrackedRadiosMap = HashMap>; type EntityKey = Vec; #[derive(Debug, Clone, sqlx::FromRow)] @@ -105,6 +110,7 @@ pub struct MobileRadioTracker { pool: Pool, metadata: Pool, interval: Duration, + tracked_radios_cache: Arc>, } impl ManagedTask for MobileRadioTracker { @@ -122,11 +128,17 @@ impl ManagedTask for MobileRadioTracker { } impl MobileRadioTracker { - pub fn new(pool: Pool, metadata: Pool, interval: Duration) -> Self { + pub fn new( + pool: Pool, + metadata: Pool, + interval: Duration, + tracked_radios_cache: Arc>, + ) -> Self { Self { pool, metadata, interval, + tracked_radios_cache, } } @@ -139,7 +151,7 @@ impl MobileRadioTracker { biased; _ = &mut shutdown => break, _ = interval.tick() => { - if let Err(err) = track_changes(&self.pool, &self.metadata).await { + if let Err(err) = self.track_changes().await { tracing::error!(?err, "error in tracking changes to mobile radios"); } } @@ -150,20 +162,28 @@ impl MobileRadioTracker { Ok(()) } -} -async fn track_changes(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { - tracing::info!("looking for changes to radios"); - let tracked_radios = get_tracked_radios(pool).await?; - let all_mobile_radios = get_all_mobile_radios(metadata); + pub async fn track_changes(&self) -> anyhow::Result<()> { + tracing::info!("looking for changes to radios"); + let tracked_radios = get_tracked_radios(&self.pool).await?; + let all_mobile_radios = get_all_mobile_radios(&self.metadata); - let updates = identify_changes(all_mobile_radios, tracked_radios).await; - tracing::info!("updating in db: {}", updates.len()); + let updates = identify_changes(all_mobile_radios, tracked_radios).await; - update_tracked_radios(pool, updates).await?; - tracing::info!("done"); + tracing::info!("updating in db: {}", updates.len()); + update_tracked_radios(&self.pool, updates).await?; - Ok(()) + tracing::info!("updating tracked radios cache"); + let tracked_radios_map: TrackedRadiosMap = + get_updated_radios(&self.pool, DateTime::UNIX_EPOCH).await?; + { + let mut map = self.tracked_radios_cache.write().await; + *map = tracked_radios_map; + } + + tracing::info!("done"); + Ok(()) + } } async fn identify_changes( @@ -183,6 +203,30 @@ async fn identify_changes( .await } +const GET_UPDATED_RADIOS: &str = + "SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1"; + +pub async fn get_updated_radios( + pool: &Pool, + min_updated_at: DateTime, +) -> anyhow::Result>> { + sqlx::query(GET_UPDATED_RADIOS) + .bind(min_updated_at) + .fetch(pool) + .map_err(anyhow::Error::from) + .try_fold( + HashMap::new(), + |mut map: HashMap>, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + let updated_at = row.get::, &str>("last_changed_at"); + map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at); + Ok(map) + }, + ) + .await +} + async fn get_tracked_radios( pool: &Pool, ) -> anyhow::Result> { From 9404c82699d2c76255e74020898e2107ba69fac1 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 8 Jan 2025 16:11:13 +0200 Subject: [PATCH 2/9] Fix tests --- mobile_config/src/main.rs | 2 +- mobile_config/tests/gateway_service.rs | 27 +++++++++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index d90db6ef7..ad077f001 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -123,7 +123,7 @@ impl Daemon { settings.mobile_radio_tracker_interval, Arc::clone(&tracked_radios_cache), ); - // Preinitialize tracked_radios_cache to avoid race condition in GatewayService + // (Pre)initialize tracked_radios_cache to avoid race condition in GatewayService mobile_tracker.track_changes().await?; TaskManager::builder() diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index fdbbf588d..13cc741f0 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,4 +1,4 @@ -use std::vec; +use std::{sync::Arc, vec}; use chrono::{DateTime, Duration, Utc}; use futures::stream::StreamExt; @@ -11,11 +11,12 @@ use helium_proto::services::mobile_config::{ use mobile_config::{ gateway_service::GatewayService, key_cache::{CacheKeys, KeyCache}, + mobile_radio_tracker::{MobileRadioTracker, TrackedRadiosMap}, KeyRole, }; use prost::Message; use sqlx::PgPool; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, sync::RwLock}; use tonic::{transport, Code}; #[sqlx::test] @@ -35,7 +36,9 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { // Start the gateway server let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + let gws = GatewayService::new(key_cache, pool.clone(), server_key, tracked_radios_cache); let _handle = tokio::spawn( transport::Server::builder() .add_service(proto::GatewayServer::new(gws)) @@ -51,7 +54,7 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { assert_ne!( err.code(), Code::PermissionDenied, - "gateway can request infomation about itself" + "gateway can request information about itself" ); // Request gateway info as administrator @@ -99,8 +102,22 @@ async fn spawn_gateway_service( // Start the gateway server let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); + + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + + let mobile_tracker = MobileRadioTracker::new( + pool.clone(), + pool.clone(), + // settings.mobile_radio_tracker_interval, + humantime::parse_duration("1 hour").unwrap(), + Arc::clone(&tracked_radios_cache), + ); + mobile_tracker.track_changes().await.unwrap(); + let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); + + let gws = GatewayService::new(key_cache, pool.clone(), server_key, tracked_radios_cache); let handle = tokio::spawn( transport::Server::builder() .add_service(proto::GatewayServer::new(gws)) From 3aca0cf4e608e91a7293cd09f10bd9ce050b5aac Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 8 Jan 2025 17:10:40 +0200 Subject: [PATCH 3/9] Add starting grpc server log --- mobile_config/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index ad077f001..43023b374 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -126,6 +126,7 @@ impl Daemon { // (Pre)initialize tracked_radios_cache to avoid race condition in GatewayService mobile_tracker.track_changes().await?; + tracing::info!("Starting grpc server"); TaskManager::builder() .add_task(grpc_server) .add_task(mobile_tracker) From 29e8499875ba0b761f9a46ebe6a42a072c2a3856 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 9 Jan 2025 13:56:07 -0700 Subject: [PATCH 4/9] remove unneccessary lifetimes --- boost_manager/src/updater.rs | 2 +- iot_config/src/route_service.rs | 12 ++++++------ mobile_verifier/src/coverage.rs | 4 ++-- mobile_verifier/src/data_session.rs | 2 +- mobile_verifier/src/speedtests.rs | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/boost_manager/src/updater.rs b/boost_manager/src/updater.rs index 22eafda50..1b5be1310 100644 --- a/boost_manager/src/updater.rs +++ b/boost_manager/src/updater.rs @@ -173,7 +173,7 @@ where Ok(()) } - async fn confirm_txn<'a>(&self, txn_row: &TxnRow) -> Result<()> { + async fn confirm_txn(&self, txn_row: &TxnRow) -> Result<()> { if self.solana.confirm_transaction(&txn_row.txn_id).await? { tracing::info!("txn_id {} confirmed on chain, updated db", txn_row.txn_id); db::update_verified_txns_onchain(&self.pool, &txn_row.txn_id).await? diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs index 349933271..4f76f1e93 100644 --- a/iot_config/src/route_service.rs +++ b/iot_config/src/route_service.rs @@ -63,11 +63,11 @@ impl RouteService { self.update_channel.clone() } - async fn verify_request_signature<'a, R>( + async fn verify_request_signature( &self, signer: &PublicKey, request: &R, - id: OrgId<'a>, + id: OrgId<'_>, ) -> Result<(), Status> where R: MsgVerify, @@ -117,11 +117,11 @@ impl RouteService { } } - async fn verify_request_signature_or_stream<'a, R>( + async fn verify_request_signature_or_stream( &self, signer: &PublicKey, request: &R, - id: OrgId<'a>, + id: OrgId<'_>, ) -> Result<(), Status> where R: MsgVerify, @@ -151,9 +151,9 @@ impl RouteService { DevAddrEuiValidator::new(route_id, admin_keys, &self.pool, check_constraints).await } - async fn validate_skf_devaddrs<'a>( + async fn validate_skf_devaddrs( &self, - route_id: &'a str, + route_id: &str, updates: &[route_skf_update_req_v1::RouteSkfUpdateV1], ) -> Result<(), Status> { let ranges: Vec = route::list_devaddr_ranges_for_route(route_id, &self.pool) diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 5ef717084..b23c0fe68 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -547,11 +547,11 @@ impl CoverageClaimTimeCache { Self { cache } } - pub async fn fetch_coverage_claim_time<'a, 'b>( + pub async fn fetch_coverage_claim_time<'a>( &self, radio_key: KeyType<'a>, coverage_object: &'a Option, - exec: &mut Transaction<'b, Postgres>, + exec: &mut Transaction<'_, Postgres>, ) -> Result>, sqlx::Error> { let key = (radio_key.to_id(), *coverage_object); if let Some(coverage_claim_time) = self.cache.get(&key).await { diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 7f1940777..f8bc9a529 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -215,7 +215,7 @@ pub async fn sum_data_sessions_to_dc_by_payer<'a>( .collect::>()) } -pub async fn data_sessions_to_dc<'a>( +pub async fn data_sessions_to_dc( stream: impl Stream>, ) -> Result { tokio::pin!(stream); diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 5ed549c47..746e98a8a 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -275,7 +275,7 @@ pub async fn get_latest_speedtests_for_pubkey( Ok(speedtests) } -pub async fn aggregate_epoch_speedtests<'a>( +pub async fn aggregate_epoch_speedtests( epoch_end: DateTime, exec: &sqlx::Pool, ) -> Result { From 530c315c989430dd8fa5fd0f6ffde19448b1e4de Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 9 Jan 2025 14:06:33 -0700 Subject: [PATCH 5/9] Update for new clippy lint about map_or https://rust-lang.github.io/rust-clippy/master/index.html#unnecessary_map_or --- file_store/src/file_sink.rs | 2 +- iot_verifier/src/runner.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 771754e90..d2ee9d9ef 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -676,7 +676,7 @@ mod tests { .file_name() .to_str() .and_then(|file_name| FileInfo::from_str(file_name).ok()) - .map_or(false, |file_info| { + .is_some_and(|file_info| { FileType::from_str(&file_info.prefix).expect("entropy report prefix") == FileType::EntropyReport }) diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 5b1092c8d..98d0f06f6 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -514,7 +514,7 @@ where .witness_updater .get_last_witness(&beacon_report.report.pub_key) .await?; - Ok(last_witness.map_or(false, |lw| { + Ok(last_witness.is_some_and(|lw| { beacon_report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW })) } @@ -544,9 +544,8 @@ where ) -> anyhow::Result { let last_beacon_recip = LastBeaconReciprocity::get(&self.pool, &report.report.pub_key).await?; - Ok(last_beacon_recip.map_or(false, |lw| { - report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW - })) + Ok(last_beacon_recip + .is_some_and(|lw| report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW)) } } From e2c220fe4a08122214a7e3d5dcc4067691d664e9 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Fri, 10 Jan 2025 17:01:47 +0200 Subject: [PATCH 6/9] Fix possible race condition. Cached radios must not be subsituted in the middle of streaming --- mobile_config/src/gateway_service.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 260f27805..f276164ed 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -244,10 +244,13 @@ impl mobile_config::Gateway for GatewayService { tokio::spawn(async move { let min_updated_at = DateTime::UNIX_EPOCH; + let binding = Arc::clone(&radios_cache); + let radios_cache = binding.read().await; + let stream = gateway_info::db::batch_info_stream(&metadata_db_pool, &addresses)?; let stream = stream .filter_map(|gateway_info| { - handle_updated_at(gateway_info, Arc::clone(&radios_cache), min_updated_at) + handle_updated_at(gateway_info, &radios_cache, min_updated_at) }) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await @@ -323,10 +326,13 @@ impl mobile_config::Gateway for GatewayService { "Invalid min_refreshed_at argument", ))?; + let binding = Arc::clone(&radios_cache); + let radios_cache = binding.read().await; + let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types); let stream = stream .filter_map(|gateway_info| { - handle_updated_at(gateway_info, Arc::clone(&radios_cache), min_updated_at) + handle_updated_at(gateway_info, &radios_cache, min_updated_at) }) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await @@ -338,19 +344,16 @@ impl mobile_config::Gateway for GatewayService { async fn handle_updated_at( mut gateway_info: GatewayInfo, - updated_radios: Arc>, + updated_radios: &TrackedRadiosMap, min_updated_at: chrono::DateTime, ) -> Option { // Check mobile_radio_tracker HashMap - { - let updated_radios = updated_radios.read().await; - if let Some(updated_at) = updated_radios.get(&gateway_info.address) { - if updated_at >= &min_updated_at { - gateway_info.updated_at = Some(*updated_at); - return Some(gateway_info); - } - return None; + if let Some(updated_at) = updated_radios.get(&gateway_info.address) { + if updated_at >= &min_updated_at { + gateway_info.updated_at = Some(*updated_at); + return Some(gateway_info); } + return None; } // Fallback solution #1. Try to use refreshed_at as updated_at field and check From f65cdf0b96eded2629f4281fb364e1b5d2c67e8e Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 13 Jan 2025 20:04:36 +0200 Subject: [PATCH 7/9] Add mobile_tracker_integration_test --- mobile_config/tests/common/mod.rs | 63 ++++++++++++++++- mobile_config/tests/gateway_service.rs | 77 +++++---------------- mobile_config/tests/mobile_radio_tracker.rs | 61 ++++++++++++++-- 3 files changed, 138 insertions(+), 63 deletions(-) diff --git a/mobile_config/tests/common/mod.rs b/mobile_config/tests/common/mod.rs index 489c68fa5..9227accd6 100644 --- a/mobile_config/tests/common/mod.rs +++ b/mobile_config/tests/common/mod.rs @@ -1,8 +1,59 @@ use bs58; use chrono::{DateTime, Duration, Utc}; use helium_crypto::PublicKeyBinary; -use helium_crypto::{KeyTag, Keypair}; +use helium_crypto::Sign; +use helium_crypto::{KeyTag, Keypair, PublicKey}; +use helium_proto::Message; +use mobile_config::{ + gateway_service::GatewayService, + key_cache::{CacheKeys, KeyCache}, + mobile_radio_tracker::{MobileRadioTracker, TrackedRadiosMap}, + KeyRole, +}; use sqlx::PgPool; +use std::sync::Arc; +use tokio::{net::TcpListener, sync::RwLock}; +use tonic::transport; + +use helium_proto::services::mobile_config::{self as proto}; + +pub async fn spawn_gateway_service( + pool: PgPool, + admin_pub_key: PublicKey, +) -> ( + String, + tokio::task::JoinHandle>, + MobileRadioTracker, +) { + let server_key = make_keypair(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Start the gateway server + let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); + + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + + let mobile_tracker = MobileRadioTracker::new( + pool.clone(), + pool.clone(), + humantime::parse_duration("1 hour").unwrap(), + Arc::clone(&tracked_radios_cache), + ); + mobile_tracker.track_changes().await.unwrap(); + + let (_key_cache_tx, key_cache) = KeyCache::new(keys); + + let gws = GatewayService::new(key_cache, pool.clone(), server_key, tracked_radios_cache); + let handle = tokio::spawn( + transport::Server::builder() + .add_service(proto::GatewayServer::new(gws)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + + (format!("http://{addr}"), handle, mobile_tracker) +} pub async fn add_mobile_tracker_record( pool: &PgPool, @@ -131,3 +182,13 @@ pub async fn create_db_tables(pool: &PgPool) { pub fn make_keypair() -> Keypair { Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) } + +pub fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::GatewayInfoReqV1 { + let mut req = proto::GatewayInfoReqV1 { + address: address.to_vec(), + signer: signer.public_key().to_vec(), + signature: vec![], + }; + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +} diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 929afa464..712e46f4f 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -11,7 +11,7 @@ use helium_proto::services::mobile_config::{ use mobile_config::{ gateway_service::GatewayService, key_cache::{CacheKeys, KeyCache}, - mobile_radio_tracker::{MobileRadioTracker, TrackedRadiosMap}, + mobile_radio_tracker::TrackedRadiosMap, KeyRole, }; use prost::Message; @@ -92,44 +92,6 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -async fn spawn_gateway_service( - pool: PgPool, - admin_pub_key: PublicKey, -) -> ( - String, - tokio::task::JoinHandle>, -) { - let server_key = make_keypair(); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - // Start the gateway server - let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); - - let tracked_radios_cache: Arc> = - Arc::new(RwLock::new(TrackedRadiosMap::new())); - - let mobile_tracker = MobileRadioTracker::new( - pool.clone(), - pool.clone(), - // settings.mobile_radio_tracker_interval, - humantime::parse_duration("1 hour").unwrap(), - Arc::clone(&tracked_radios_cache), - ); - mobile_tracker.track_changes().await.unwrap(); - - let (_key_cache_tx, key_cache) = KeyCache::new(keys); - - let gws = GatewayService::new(key_cache, pool.clone(), server_key, tracked_radios_cache); - let handle = tokio::spawn( - transport::Server::builder() - .add_service(proto::GatewayServer::new(gws)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), - ); - - (format!("http://{addr}"), handle) -} - #[sqlx::test] async fn gateway_stream_info_v1(pool: PgPool) { let admin_key = make_keypair(); @@ -164,7 +126,8 @@ async fn gateway_stream_info_v1(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); // Select all devices @@ -218,7 +181,8 @@ async fn gateway_stream_info_v2(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); // Select all devices @@ -276,7 +240,8 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { .await; add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_gateway_stream_signed_req_v2(&admin_key, &[], updated_at.timestamp() as u64); @@ -334,7 +299,8 @@ async fn gateway_info_batch_v2(pool: PgPool) { .await; add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_signed_info_batch_request( @@ -438,7 +404,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_signed_info_batch_request( @@ -520,7 +487,8 @@ async fn gateway_info_v2_no_mobile_tracker_record(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); // asset 1 @@ -563,7 +531,8 @@ async fn gateway_info_v2(pool: PgPool) { .await; add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_signed_info_request(&asset1_pubkey, &admin_key); @@ -655,7 +624,8 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_gateway_stream_signed_req_v2(&admin_key, &[], 0); @@ -741,7 +711,8 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -848,16 +819,6 @@ fn make_gateway_stream_signed_req_v1( req } -fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::GatewayInfoReqV1 { - let mut req = proto::GatewayInfoReqV1 { - address: address.to_vec(), - signer: signer.public_key().to_vec(), - signature: vec![], - }; - req.signature = signer.sign(&req.encode_to_vec()).unwrap(); - req -} - fn make_signed_info_batch_request( addresses: &[PublicKey], signer: &Keypair, diff --git a/mobile_config/tests/mobile_radio_tracker.rs b/mobile_config/tests/mobile_radio_tracker.rs index 33b249f13..70a2183ad 100644 --- a/mobile_config/tests/mobile_radio_tracker.rs +++ b/mobile_config/tests/mobile_radio_tracker.rs @@ -1,15 +1,70 @@ use std::sync::Arc; -use chrono::Utc; +use chrono::{Days, Duration, Utc}; use helium_crypto::PublicKeyBinary; +use helium_proto::services::mobile_config::GatewayClient; use mobile_config::mobile_radio_tracker::{ get_tracked_radios, MobileRadioTracker, TrackedRadiosMap, }; use sqlx::PgPool; +use tokio::sync::RwLock; pub mod common; use common::*; -use tokio::sync::RwLock; + +#[sqlx::test] +async fn mobile_tracker_integration_test(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let created_at = Utc::now() - Duration::hours(5); + let refreshed_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(refreshed_at), + None, + ) + .await; + let (addr, _handle, mobile_tracker) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + + let gw_info = resp.info.unwrap(); + assert_eq!(gw_info.updated_at, refreshed_at.timestamp() as u64); + + sqlx::query("UPDATE mobile_hotspot_infos SET refreshed_at = $1") + .bind(refreshed_at.checked_add_days(Days::new(1)).unwrap()) + .execute(&pool) + .await + .unwrap(); + mobile_tracker.track_changes().await.unwrap(); + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + let gw_info = resp.info.unwrap(); + assert_eq!(gw_info.updated_at, refreshed_at.timestamp() as u64); + + let new_updated_at = refreshed_at.checked_add_days(Days::new(2)).unwrap(); + sqlx::query("UPDATE mobile_hotspot_infos SET refreshed_at = $1, location = $2") + .bind(new_updated_at) + .bind(0x8c446ca9aae35ff_i64) + .execute(&pool) + .await + .unwrap(); + mobile_tracker.track_changes().await.unwrap(); + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + let gw_info = resp.info.unwrap(); + assert_eq!(gw_info.updated_at, new_updated_at.timestamp() as u64); +} #[sqlx::test] async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { @@ -65,13 +120,11 @@ async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { let mobile_tracker = MobileRadioTracker::new( pool.clone(), pool.clone(), - // settings.mobile_radio_tracker_interval, humantime::parse_duration("1 hour").unwrap(), Arc::clone(&tracked_radios_cache), ); mobile_tracker.track_changes().await.unwrap(); - // track_changes(&pool, &pool).await.unwrap(); let tracked_radios = get_tracked_radios(&pool).await.unwrap(); assert_eq!(tracked_radios.len(), 1); let tracked_radio = tracked_radios.get::>(&b58).unwrap(); From d1ea6836cef1c36c1585dd7f2e5cf9deffe1a8c3 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 14 Jan 2025 19:18:51 +0200 Subject: [PATCH 8/9] Replace Arc::clone to .clone(), remove an redundant arc clones --- mobile_config/src/gateway_service.rs | 10 ++++------ mobile_config/src/main.rs | 4 ++-- mobile_config/tests/common/mod.rs | 2 +- mobile_config/tests/mobile_radio_tracker.rs | 2 +- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index f276164ed..b7892b620 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -240,12 +240,11 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); - let radios_cache = Arc::clone(&self.tracked_radios_cache); + let radios_cache = self.tracked_radios_cache.clone(); tokio::spawn(async move { let min_updated_at = DateTime::UNIX_EPOCH; - let binding = Arc::clone(&radios_cache); - let radios_cache = binding.read().await; + let radios_cache = radios_cache.read().await; let stream = gateway_info::db::batch_info_stream(&metadata_db_pool, &addresses)?; let stream = stream @@ -317,7 +316,7 @@ impl mobile_config::Gateway for GatewayService { device_types ); - let radios_cache = Arc::clone(&self.tracked_radios_cache); + let radios_cache = self.tracked_radios_cache.clone(); tokio::spawn(async move { let min_updated_at = Utc .timestamp_opt(request.min_updated_at as i64, 0) @@ -326,8 +325,7 @@ impl mobile_config::Gateway for GatewayService { "Invalid min_refreshed_at argument", ))?; - let binding = Arc::clone(&radios_cache); - let radios_cache = binding.read().await; + let radios_cache = radios_cache.read().await; let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types); let stream = stream diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index 43023b374..a3bdd118a 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -85,7 +85,7 @@ impl Daemon { key_cache.clone(), metadata_pool.clone(), settings.signing_keypair()?, - Arc::clone(&tracked_radios_cache), + tracked_radios_cache.clone(), ); let auth_svc = AuthorizationService::new(key_cache.clone(), settings.signing_keypair()?); let entity_svc = EntityService::new( @@ -121,7 +121,7 @@ impl Daemon { pool.clone(), metadata_pool.clone(), settings.mobile_radio_tracker_interval, - Arc::clone(&tracked_radios_cache), + tracked_radios_cache.clone(), ); // (Pre)initialize tracked_radios_cache to avoid race condition in GatewayService mobile_tracker.track_changes().await?; diff --git a/mobile_config/tests/common/mod.rs b/mobile_config/tests/common/mod.rs index 9227accd6..7c490e6ee 100644 --- a/mobile_config/tests/common/mod.rs +++ b/mobile_config/tests/common/mod.rs @@ -39,7 +39,7 @@ pub async fn spawn_gateway_service( pool.clone(), pool.clone(), humantime::parse_duration("1 hour").unwrap(), - Arc::clone(&tracked_radios_cache), + tracked_radios_cache.clone(), ); mobile_tracker.track_changes().await.unwrap(); diff --git a/mobile_config/tests/mobile_radio_tracker.rs b/mobile_config/tests/mobile_radio_tracker.rs index 70a2183ad..610094021 100644 --- a/mobile_config/tests/mobile_radio_tracker.rs +++ b/mobile_config/tests/mobile_radio_tracker.rs @@ -121,7 +121,7 @@ async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { pool.clone(), pool.clone(), humantime::parse_duration("1 hour").unwrap(), - Arc::clone(&tracked_radios_cache), + tracked_radios_cache.clone(), ); mobile_tracker.track_changes().await.unwrap(); From 3a0491b67c134d72d2a07f69012d94e23a36addb Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 21 Jan 2025 10:17:58 +0200 Subject: [PATCH 9/9] Fix get_updated_radios return type --- mobile_config/src/mobile_radio_tracker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index c67fc0ca3..f19be4195 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -209,7 +209,7 @@ const GET_UPDATED_RADIOS: &str = pub async fn get_updated_radios( pool: &Pool, min_updated_at: DateTime, -) -> anyhow::Result>> { +) -> anyhow::Result { sqlx::query(GET_UPDATED_RADIOS) .bind(min_updated_at) .fetch(pool)