diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index cec72941b..85d9ea034 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -363,13 +363,10 @@ pub(crate) mod db { use super::{DeviceType, GatewayInfo, GatewayMetadata}; use crate::gateway_info::DeploymentInfo; use chrono::{DateTime, Utc}; - use futures::{ - stream::{Stream, StreamExt}, - TryStreamExt, - }; + use futures::stream::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; - use std::{collections::HashMap, str::FromStr}; + use std::str::FromStr; const GET_METADATA_SQL: &str = r#" select kta.entity_key, infos.location::bigint, infos.device_type, @@ -380,50 +377,11 @@ pub(crate) mod db { const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; - const GET_UPDATED_RADIOS: &str = - "SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1"; - - const GET_UPDATED_AT: &str = - "SELECT last_changed_at FROM mobile_radio_tracker WHERE entity_key = $1"; - lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}"); } - pub async fn get_updated_radios( - db: impl PgExecutor<'_>, - min_updated_at: DateTime, - ) -> anyhow::Result>> { - sqlx::query(GET_UPDATED_RADIOS) - .bind(min_updated_at) - .fetch(db) - .map_err(anyhow::Error::from) - .try_fold( - HashMap::new(), - |mut map: HashMap>, row| async move { - let entity_key_b = row.get::<&[u8], &str>("entity_key"); - let entity_key = bs58::encode(entity_key_b).into_string(); - let updated_at = row.get::, &str>("last_changed_at"); - map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at); - Ok(map) - }, - ) - .await - } - - pub async fn get_updated_at( - db: impl PgExecutor<'_>, - address: &PublicKeyBinary, - ) -> anyhow::Result>> { - let entity_key = bs58::decode(address.to_string()).into_vec()?; - sqlx::query_scalar(GET_UPDATED_AT) - .bind(entity_key) - .fetch_optional(db) - .await - .map_err(anyhow::Error::from) - } - pub async fn get_info( db: impl PgExecutor<'_>, address: &PublicKeyBinary, diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 73323e2f4..b7892b620 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -1,12 +1,12 @@ use crate::{ - gateway_info::{self, db::get_updated_radios, DeviceType, GatewayInfo}, + gateway_info::{self, DeviceType, GatewayInfo}, key_cache::KeyCache, + mobile_radio_tracker::TrackedRadiosMap, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; use chrono::{DateTime, TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ - future, stream::{Stream, StreamExt, TryStreamExt}, TryFutureExt, }; @@ -20,14 +20,15 @@ use helium_proto::{ Message, }; use sqlx::{Pool, Postgres}; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; +use tokio::sync::RwLock; use tonic::{Request, Response, Status}; pub struct GatewayService { key_cache: KeyCache, - mobile_config_db_pool: Pool, metadata_pool: Pool, signing_key: Arc, + tracked_radios_cache: Arc>, } impl GatewayService { @@ -35,13 +36,13 @@ impl GatewayService { key_cache: KeyCache, metadata_pool: Pool, signing_key: Keypair, - mobile_config_db_pool: Pool, + tracked_radios_cache: Arc>, ) -> Self { Self { key_cache, metadata_pool, signing_key: Arc::new(signing_key), - mobile_config_db_pool, + tracked_radios_cache, } } @@ -129,11 +130,10 @@ impl mobile_config::Gateway for GatewayService { let pubkey: PublicKeyBinary = request.address.into(); tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)"); - let updated_at = gateway_info::db::get_updated_at(&self.mobile_config_db_pool, &pubkey) - .await - .map_err(|_| { - Status::internal("error fetching updated_at field for gateway info (v2)") - })?; + let updated_at = { + let tracked_radios = self.tracked_radios_cache.read().await; + tracked_radios.get(&pubkey).cloned() + }; gateway_info::db::get_info(&self.metadata_pool, &pubkey) .await @@ -230,7 +230,6 @@ impl mobile_config::Gateway for GatewayService { ); let metadata_db_pool = self.metadata_pool.clone(); - let mobile_config_db_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; let addresses = request @@ -241,18 +240,16 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); + let radios_cache = self.tracked_radios_cache.clone(); tokio::spawn(async move { let min_updated_at = DateTime::UNIX_EPOCH; - let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; + + let radios_cache = radios_cache.read().await; let stream = gateway_info::db::batch_info_stream(&metadata_db_pool, &addresses)?; let stream = stream .filter_map(|gateway_info| { - future::ready(handle_updated_at( - gateway_info, - &updated_radios, - min_updated_at, - )) + handle_updated_at(gateway_info, &radios_cache, min_updated_at) }) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await @@ -307,7 +304,6 @@ impl mobile_config::Gateway for GatewayService { self.verify_request_signature(&signer, &request)?; let metadata_db_pool = self.metadata_pool.clone(); - let mobile_config_db_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; @@ -320,6 +316,7 @@ impl mobile_config::Gateway for GatewayService { device_types ); + let radios_cache = self.tracked_radios_cache.clone(); tokio::spawn(async move { let min_updated_at = Utc .timestamp_opt(request.min_updated_at as i64, 0) @@ -328,15 +325,12 @@ impl mobile_config::Gateway for GatewayService { "Invalid min_refreshed_at argument", ))?; - let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; + let radios_cache = radios_cache.read().await; + let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types); let stream = stream .filter_map(|gateway_info| { - future::ready(handle_updated_at( - gateway_info, - &updated_radios, - min_updated_at, - )) + handle_updated_at(gateway_info, &radios_cache, min_updated_at) }) .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await @@ -346,20 +340,20 @@ impl mobile_config::Gateway for GatewayService { } } -fn handle_updated_at( +async fn handle_updated_at( mut gateway_info: GatewayInfo, - updated_radios: &HashMap>, + updated_radios: &TrackedRadiosMap, min_updated_at: chrono::DateTime, ) -> Option { // Check mobile_radio_tracker HashMap if let Some(updated_at) = updated_radios.get(&gateway_info.address) { - // It could be already filtered by min_updated_at but recheck won't hurt if updated_at >= &min_updated_at { gateway_info.updated_at = Some(*updated_at); return Some(gateway_info); } return None; } + // Fallback solution #1. Try to use refreshed_at as updated_at field and check // min_updated_at if let Some(refreshed_at) = gateway_info.refreshed_at { diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index b869bef2c..a3bdd118a 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -7,13 +7,19 @@ use helium_proto::services::mobile_config::{ HexBoostingServer, }; use mobile_config::{ - admin_service::AdminService, authorization_service::AuthorizationService, - carrier_service::CarrierService, entity_service::EntityService, - gateway_service::GatewayService, hex_boosting_service::HexBoostingService, key_cache::KeyCache, - mobile_radio_tracker::MobileRadioTracker, settings::Settings, + admin_service::AdminService, + authorization_service::AuthorizationService, + carrier_service::CarrierService, + entity_service::EntityService, + gateway_service::GatewayService, + hex_boosting_service::HexBoostingService, + key_cache::KeyCache, + mobile_radio_tracker::{MobileRadioTracker, TrackedRadiosMap}, + settings::Settings, }; -use std::{net::SocketAddr, path::PathBuf, time::Duration}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use task_manager::{ManagedTask, TaskManager}; +use tokio::sync::RwLock; use tonic::transport; #[derive(Debug, clap::Parser)] @@ -71,11 +77,15 @@ impl Daemon { let admin_svc = AdminService::new(settings, key_cache.clone(), key_cache_updater, pool.clone())?; + + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + let gateway_svc = GatewayService::new( key_cache.clone(), metadata_pool.clone(), settings.signing_keypair()?, - pool.clone(), + tracked_radios_cache.clone(), ); let auth_svc = AuthorizationService::new(key_cache.clone(), settings.signing_keypair()?); let entity_svc = EntityService::new( @@ -107,13 +117,19 @@ impl Daemon { hex_boosting_svc, }; + let mobile_tracker = MobileRadioTracker::new( + pool.clone(), + metadata_pool.clone(), + settings.mobile_radio_tracker_interval, + tracked_radios_cache.clone(), + ); + // (Pre)initialize tracked_radios_cache to avoid race condition in GatewayService + mobile_tracker.track_changes().await?; + + tracing::info!("Starting grpc server"); TaskManager::builder() .add_task(grpc_server) - .add_task(MobileRadioTracker::new( - pool.clone(), - metadata_pool.clone(), - settings.mobile_radio_tracker_interval, - )) + .add_task(mobile_tracker) .build() .start() .await diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 881503bb0..f19be4195 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -1,10 +1,15 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use chrono::{DateTime, Utc}; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use helium_crypto::PublicKeyBinary; +use sqlx::Row; use sqlx::{Pool, Postgres, QueryBuilder}; +use std::str::FromStr; use task_manager::ManagedTask; +use tokio::sync::RwLock; +pub type TrackedRadiosMap = HashMap>; type EntityKey = Vec; #[derive(Debug, Clone, sqlx::FromRow)] @@ -105,6 +110,7 @@ pub struct MobileRadioTracker { pool: Pool, metadata: Pool, interval: Duration, + tracked_radios_cache: Arc>, } impl ManagedTask for MobileRadioTracker { @@ -122,11 +128,17 @@ impl ManagedTask for MobileRadioTracker { } impl MobileRadioTracker { - pub fn new(pool: Pool, metadata: Pool, interval: Duration) -> Self { + pub fn new( + pool: Pool, + metadata: Pool, + interval: Duration, + tracked_radios_cache: Arc>, + ) -> Self { Self { pool, metadata, interval, + tracked_radios_cache, } } @@ -139,7 +151,7 @@ impl MobileRadioTracker { biased; _ = &mut shutdown => break, _ = interval.tick() => { - if let Err(err) = track_changes(&self.pool, &self.metadata).await { + if let Err(err) = self.track_changes().await { tracing::error!(?err, "error in tracking changes to mobile radios"); } } @@ -150,20 +162,28 @@ impl MobileRadioTracker { Ok(()) } -} -pub async fn track_changes(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { - tracing::info!("looking for changes to radios"); - let tracked_radios = get_tracked_radios(pool).await?; - let all_mobile_radios = get_all_mobile_radios(metadata); + pub async fn track_changes(&self) -> anyhow::Result<()> { + tracing::info!("looking for changes to radios"); + let tracked_radios = get_tracked_radios(&self.pool).await?; + let all_mobile_radios = get_all_mobile_radios(&self.metadata); - let updates = identify_changes(all_mobile_radios, tracked_radios).await; - tracing::info!("updating in db: {}", updates.len()); + let updates = identify_changes(all_mobile_radios, tracked_radios).await; - update_tracked_radios(pool, updates).await?; - tracing::info!("done"); + tracing::info!("updating in db: {}", updates.len()); + update_tracked_radios(&self.pool, updates).await?; - Ok(()) + tracing::info!("updating tracked radios cache"); + let tracked_radios_map: TrackedRadiosMap = + get_updated_radios(&self.pool, DateTime::UNIX_EPOCH).await?; + { + let mut map = self.tracked_radios_cache.write().await; + *map = tracked_radios_map; + } + + tracing::info!("done"); + Ok(()) + } } async fn identify_changes( @@ -183,6 +203,30 @@ async fn identify_changes( .await } +const GET_UPDATED_RADIOS: &str = + "SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1"; + +pub async fn get_updated_radios( + pool: &Pool, + min_updated_at: DateTime, +) -> anyhow::Result { + sqlx::query(GET_UPDATED_RADIOS) + .bind(min_updated_at) + .fetch(pool) + .map_err(anyhow::Error::from) + .try_fold( + HashMap::new(), + |mut map: HashMap>, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + let updated_at = row.get::, &str>("last_changed_at"); + map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at); + Ok(map) + }, + ) + .await +} + pub async fn get_tracked_radios( pool: &Pool, ) -> anyhow::Result> { diff --git a/mobile_config/tests/common/mod.rs b/mobile_config/tests/common/mod.rs index 489c68fa5..7c490e6ee 100644 --- a/mobile_config/tests/common/mod.rs +++ b/mobile_config/tests/common/mod.rs @@ -1,8 +1,59 @@ use bs58; use chrono::{DateTime, Duration, Utc}; use helium_crypto::PublicKeyBinary; -use helium_crypto::{KeyTag, Keypair}; +use helium_crypto::Sign; +use helium_crypto::{KeyTag, Keypair, PublicKey}; +use helium_proto::Message; +use mobile_config::{ + gateway_service::GatewayService, + key_cache::{CacheKeys, KeyCache}, + mobile_radio_tracker::{MobileRadioTracker, TrackedRadiosMap}, + KeyRole, +}; use sqlx::PgPool; +use std::sync::Arc; +use tokio::{net::TcpListener, sync::RwLock}; +use tonic::transport; + +use helium_proto::services::mobile_config::{self as proto}; + +pub async fn spawn_gateway_service( + pool: PgPool, + admin_pub_key: PublicKey, +) -> ( + String, + tokio::task::JoinHandle>, + MobileRadioTracker, +) { + let server_key = make_keypair(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Start the gateway server + let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); + + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + + let mobile_tracker = MobileRadioTracker::new( + pool.clone(), + pool.clone(), + humantime::parse_duration("1 hour").unwrap(), + tracked_radios_cache.clone(), + ); + mobile_tracker.track_changes().await.unwrap(); + + let (_key_cache_tx, key_cache) = KeyCache::new(keys); + + let gws = GatewayService::new(key_cache, pool.clone(), server_key, tracked_radios_cache); + let handle = tokio::spawn( + transport::Server::builder() + .add_service(proto::GatewayServer::new(gws)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + + (format!("http://{addr}"), handle, mobile_tracker) +} pub async fn add_mobile_tracker_record( pool: &PgPool, @@ -131,3 +182,13 @@ pub async fn create_db_tables(pool: &PgPool) { pub fn make_keypair() -> Keypair { Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) } + +pub fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::GatewayInfoReqV1 { + let mut req = proto::GatewayInfoReqV1 { + address: address.to_vec(), + signer: signer.public_key().to_vec(), + signature: vec![], + }; + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +} diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index c34026abe..712e46f4f 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,4 +1,4 @@ -use std::vec; +use std::{sync::Arc, vec}; use chrono::{Duration, Utc}; use futures::stream::StreamExt; @@ -11,11 +11,12 @@ use helium_proto::services::mobile_config::{ use mobile_config::{ gateway_service::GatewayService, key_cache::{CacheKeys, KeyCache}, + mobile_radio_tracker::TrackedRadiosMap, KeyRole, }; use prost::Message; use sqlx::PgPool; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, sync::RwLock}; use tonic::{transport, Code}; pub mod common; @@ -38,7 +39,9 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { // Start the gateway server let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + let gws = GatewayService::new(key_cache, pool.clone(), server_key, tracked_radios_cache); let _handle = tokio::spawn( transport::Server::builder() .add_service(proto::GatewayServer::new(gws)) @@ -54,7 +57,7 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { assert_ne!( err.code(), Code::PermissionDenied, - "gateway can request infomation about itself" + "gateway can request information about itself" ); // Request gateway info as administrator @@ -89,30 +92,6 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -async fn spawn_gateway_service( - pool: PgPool, - admin_pub_key: PublicKey, -) -> ( - String, - tokio::task::JoinHandle>, -) { - let server_key = make_keypair(); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - // Start the gateway server - let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); - let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); - let handle = tokio::spawn( - transport::Server::builder() - .add_service(proto::GatewayServer::new(gws)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), - ); - - (format!("http://{addr}"), handle) -} - #[sqlx::test] async fn gateway_stream_info_v1(pool: PgPool) { let admin_key = make_keypair(); @@ -147,7 +126,8 @@ async fn gateway_stream_info_v1(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); // Select all devices @@ -201,7 +181,8 @@ async fn gateway_stream_info_v2(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); // Select all devices @@ -259,7 +240,8 @@ async fn gateway_stream_info_v2_updated_at(pool: PgPool) { .await; add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_gateway_stream_signed_req_v2(&admin_key, &[], updated_at.timestamp() as u64); @@ -317,7 +299,8 @@ async fn gateway_info_batch_v2(pool: PgPool) { .await; add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_signed_info_batch_request( @@ -370,7 +353,6 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { let created_at = Utc::now() - Duration::hours(5); let refreshed_at = Utc::now() - Duration::hours(3); - let updated_at = Utc::now() - Duration::hours(4); create_db_tables(&pool).await; add_db_record( @@ -408,7 +390,6 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; // Must be ignored since not included in req add_db_record( @@ -423,7 +404,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_signed_info_batch_request( @@ -465,7 +447,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { .find(|v| v.address == asset3_pubkey.to_vec()) .unwrap() .updated_at, - updated_at.timestamp() as u64 + refreshed_at.timestamp() as u64 ); } @@ -505,7 +487,8 @@ async fn gateway_info_v2_no_mobile_tracker_record(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); // asset 1 @@ -548,7 +531,8 @@ async fn gateway_info_v2(pool: PgPool) { .await; add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_signed_info_request(&asset1_pubkey, &admin_key); @@ -602,7 +586,6 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { let created_at = Utc::now() - Duration::hours(5); let refreshed_at = Utc::now() - Duration::hours(3); - let updated_at = Utc::now() - Duration::hours(4); create_db_tables(&pool).await; add_db_record( @@ -640,9 +623,9 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { None, ) .await; - add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); let req = make_gateway_stream_signed_req_v2(&admin_key, &[], 0); @@ -677,7 +660,7 @@ async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { .find(|v| v.address == asset3_pubkey.to_vec()) .unwrap() .updated_at, - updated_at.timestamp() as u64 + refreshed_at.timestamp() as u64 ); } @@ -728,7 +711,8 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { ) .await; - let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let (addr, _handle, _) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -835,16 +819,6 @@ fn make_gateway_stream_signed_req_v1( req } -fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::GatewayInfoReqV1 { - let mut req = proto::GatewayInfoReqV1 { - address: address.to_vec(), - signer: signer.public_key().to_vec(), - signature: vec![], - }; - req.signature = signer.sign(&req.encode_to_vec()).unwrap(); - req -} - fn make_signed_info_batch_request( addresses: &[PublicKey], signer: &Keypair, diff --git a/mobile_config/tests/mobile_radio_tracker.rs b/mobile_config/tests/mobile_radio_tracker.rs index d0a4783d4..610094021 100644 --- a/mobile_config/tests/mobile_radio_tracker.rs +++ b/mobile_config/tests/mobile_radio_tracker.rs @@ -1,11 +1,71 @@ -use chrono::Utc; +use std::sync::Arc; + +use chrono::{Days, Duration, Utc}; use helium_crypto::PublicKeyBinary; -use mobile_config::mobile_radio_tracker::{get_tracked_radios, track_changes}; +use helium_proto::services::mobile_config::GatewayClient; +use mobile_config::mobile_radio_tracker::{ + get_tracked_radios, MobileRadioTracker, TrackedRadiosMap, +}; use sqlx::PgPool; +use tokio::sync::RwLock; pub mod common; use common::*; +#[sqlx::test] +async fn mobile_tracker_integration_test(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let created_at = Utc::now() - Duration::hours(5); + let refreshed_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(refreshed_at), + None, + ) + .await; + let (addr, _handle, mobile_tracker) = + spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + + let gw_info = resp.info.unwrap(); + assert_eq!(gw_info.updated_at, refreshed_at.timestamp() as u64); + + sqlx::query("UPDATE mobile_hotspot_infos SET refreshed_at = $1") + .bind(refreshed_at.checked_add_days(Days::new(1)).unwrap()) + .execute(&pool) + .await + .unwrap(); + mobile_tracker.track_changes().await.unwrap(); + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + let gw_info = resp.info.unwrap(); + assert_eq!(gw_info.updated_at, refreshed_at.timestamp() as u64); + + let new_updated_at = refreshed_at.checked_add_days(Days::new(2)).unwrap(); + sqlx::query("UPDATE mobile_hotspot_infos SET refreshed_at = $1, location = $2") + .bind(new_updated_at) + .bind(0x8c446ca9aae35ff_i64) + .execute(&pool) + .await + .unwrap(); + mobile_tracker.track_changes().await.unwrap(); + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + let gw_info = resp.info.unwrap(); + assert_eq!(gw_info.updated_at, new_updated_at.timestamp() as u64); +} + #[sqlx::test] async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { // In case of duplications mobile tracker must use newer (refreshed_at) @@ -53,7 +113,18 @@ async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { .await; let b58 = bs58::decode(pubkey_binary.to_string()).into_vec().unwrap(); - track_changes(&pool, &pool).await.unwrap(); + + let tracked_radios_cache: Arc> = + Arc::new(RwLock::new(TrackedRadiosMap::new())); + + let mobile_tracker = MobileRadioTracker::new( + pool.clone(), + pool.clone(), + humantime::parse_duration("1 hour").unwrap(), + tracked_radios_cache.clone(), + ); + mobile_tracker.track_changes().await.unwrap(); + let tracked_radios = get_tracked_radios(&pool).await.unwrap(); assert_eq!(tracked_radios.len(), 1); let tracked_radio = tracked_radios.get::>(&b58).unwrap();