From 38aa4c4af788069dd38eae8b9eace850eb03717b Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 2 Jan 2025 17:57:58 +0200 Subject: [PATCH] [Mobile Config] Add updated_at to GatewayInfoV2 (response) (#921) * Add updated_at to GatewayInfoV2 responses --- Cargo.lock | 10 +- mobile_config/src/gateway_info.rs | 63 ++++- mobile_config/src/gateway_service.rs | 116 ++++++-- mobile_config/tests/gateway_service.rs | 262 ++++++++++++++++++ .../tests/integrations/speedtests.rs | 2 + 5 files changed, 410 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc459c706..b03eaafaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,7 +1615,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#b4c8c8f47dfff38a2ff1b7fe14e1b2a1beea651c" +source = "git+https://github.com/helium/proto?branch=master#4085e00c6f4d82c3da798ae1bc97324bc9cada2e" dependencies = [ "base64 0.21.7", "byteorder", @@ -1625,7 +1625,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", ] @@ -3821,7 +3821,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#b4c8c8f47dfff38a2ff1b7fe14e1b2a1beea651c" +source = "git+https://github.com/helium/proto?branch=master#4085e00c6f4d82c3da798ae1bc97324bc9cada2e" dependencies = [ "bytes", "prost", @@ -6063,7 +6063,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -9988,7 +9988,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", "twox-hash", "xorf", diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 044cfd6c9..cec72941b 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -103,8 +103,12 @@ pub struct GatewayInfo { pub address: PublicKeyBinary, pub metadata: Option, pub device_type: DeviceType, - // None for V1 + // Optional fields are None for GatewayInfoProto (V1) pub created_at: Option>, + // updated_at refers to the last time the data was actually changed. + pub updated_at: Option>, + // refreshed_at indicates the last time the chain was consulted, regardless of data changes. + pub refreshed_at: Option>, } impl GatewayInfo { @@ -119,8 +123,8 @@ pub enum GatewayInfoProtoParseError { InvalidLocation(#[from] std::num::ParseIntError), #[error("Invalid created_at: {0}")] InvalidCreatedAt(u64), - #[error("Invalid refreshed_at: {0}")] - InvalidRefreshedAt(u64), + #[error("Invalid updated_at: {0}")] + InvalidUpdatedAt(u64), } impl TryFrom for GatewayInfo { @@ -134,6 +138,7 @@ impl TryFrom for GatewayInfo { metadata, device_type: _, created_at, + updated_at, } = info; let metadata = if let Some(metadata) = metadata { @@ -152,11 +157,18 @@ impl TryFrom for GatewayInfo { .single() .ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(created_at))?; + let updated_at = Utc + .timestamp_opt(updated_at as i64, 0) + .single() + .ok_or(GatewayInfoProtoParseError::InvalidUpdatedAt(updated_at))?; + Ok(Self { address: address.into(), metadata, device_type: device_type_, created_at: Some(created_at), + updated_at: Some(updated_at), + refreshed_at: None, }) } } @@ -189,6 +201,8 @@ impl TryFrom for GatewayInfo { metadata, device_type: device_type_, created_at: None, + updated_at: None, + refreshed_at: None, }) } } @@ -264,8 +278,8 @@ pub enum GatewayInfoToProtoError { InvalidLocation(#[from] hextree::Error), #[error("created_at is None")] CreatedAtIsNone, - #[error("refreshed_at is None")] - RefreshedAtIsNone, + #[error("updated_at is None")] + UpdatedAtIsNone, } impl TryFrom for GatewayInfoProtoV2 { @@ -289,6 +303,10 @@ impl TryFrom for GatewayInfoProtoV2 { .created_at .ok_or(GatewayInfoToProtoError::CreatedAtIsNone)? .timestamp() as u64, + updated_at: info + .updated_at + .ok_or(GatewayInfoToProtoError::UpdatedAtIsNone)? + .timestamp() as u64, }) } } @@ -351,7 +369,7 @@ pub(crate) mod db { }; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; - use std::{collections::HashSet, str::FromStr}; + use std::{collections::HashMap, str::FromStr}; const GET_METADATA_SQL: &str = r#" select kta.entity_key, infos.location::bigint, infos.device_type, @@ -363,7 +381,10 @@ pub(crate) mod db { const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; const GET_UPDATED_RADIOS: &str = - "SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1"; + "SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1"; + + const GET_UPDATED_AT: &str = + "SELECT last_changed_at FROM mobile_radio_tracker WHERE entity_key = $1"; lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); @@ -373,23 +394,36 @@ pub(crate) mod db { pub async fn get_updated_radios( db: impl PgExecutor<'_>, min_updated_at: DateTime, - ) -> anyhow::Result> { + ) -> anyhow::Result>> { sqlx::query(GET_UPDATED_RADIOS) .bind(min_updated_at) .fetch(db) .map_err(anyhow::Error::from) .try_fold( - HashSet::new(), - |mut set: HashSet, row| async move { + HashMap::new(), + |mut map: HashMap>, row| async move { let entity_key_b = row.get::<&[u8], &str>("entity_key"); let entity_key = bs58::encode(entity_key_b).into_string(); - set.insert(PublicKeyBinary::from_str(&entity_key)?); - Ok(set) + let updated_at = row.get::, &str>("last_changed_at"); + map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at); + Ok(map) }, ) .await } + pub async fn get_updated_at( + db: impl PgExecutor<'_>, + address: &PublicKeyBinary, + ) -> anyhow::Result>> { + let entity_key = bs58::decode(address.to_string()).into_vec()?; + sqlx::query_scalar(GET_UPDATED_AT) + .bind(entity_key) + .fetch_optional(db) + .await + .map_err(anyhow::Error::from) + } + pub async fn get_info( db: impl PgExecutor<'_>, address: &PublicKeyBinary, @@ -471,6 +505,7 @@ pub(crate) mod db { ) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; let created_at = row.get::, &str>("created_at"); + let refreshed_at = row.get::>, &str>("refreshed_at"); Ok(Self { address: PublicKeyBinary::from_str( @@ -480,6 +515,10 @@ pub(crate) mod db { metadata, device_type, created_at: Some(created_at), + refreshed_at, + // The updated_at field should be determined by considering the last_changed_at + // value from the mobile_radio_tracker table. + updated_at: None, }) } } diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 7b66405c8..73323e2f4 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -3,7 +3,7 @@ use crate::{ key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; -use chrono::{TimeZone, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ future, @@ -15,12 +15,12 @@ use helium_proto::{ services::mobile_config::{ self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2, GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1, - GatewayInfoStreamResV2, + GatewayInfoStreamResV2, GatewayInfoV2, }, Message, }; use sqlx::{Pool, Postgres}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use tonic::{Request, Response, Status}; pub struct GatewayService { @@ -129,6 +129,12 @@ impl mobile_config::Gateway for GatewayService { let pubkey: PublicKeyBinary = request.address.into(); tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)"); + let updated_at = gateway_info::db::get_updated_at(&self.mobile_config_db_pool, &pubkey) + .await + .map_err(|_| { + Status::internal("error fetching updated_at field for gateway info (v2)") + })?; + gateway_info::db::get_info(&self.metadata_pool, &pubkey) .await .map_err(|_| Status::internal("error fetching gateway info (v2)"))? @@ -137,15 +143,26 @@ impl mobile_config::Gateway for GatewayService { telemetry::count_gateway_chain_lookup("not-found"); Err(Status::not_found(pubkey.to_string())) }, - |info| { + |mut info| { if info.metadata.is_some() { telemetry::count_gateway_chain_lookup("asserted"); } else { telemetry::count_gateway_chain_lookup("not-asserted"); }; - let info = info + + // determine updated_at + if let Some(v) = updated_at { + info.updated_at = Some(v) + } else if info.refreshed_at.is_some() { + info.updated_at = info.refreshed_at; + } else { + info.updated_at = info.created_at; + } + + let info: GatewayInfoV2 = info .try_into() .map_err(|_| Status::internal("error serializing gateway info (v2)"))?; + let mut res = GatewayInfoResV2 { info: Some(info), timestamp: Utc::now().encode_timestamp(), @@ -212,7 +229,8 @@ impl mobile_config::Gateway for GatewayService { "fetching gateways' info batch" ); - let pool = self.metadata_pool.clone(); + let metadata_db_pool = self.metadata_pool.clone(); + let mobile_config_db_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; let addresses = request @@ -224,7 +242,19 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); tokio::spawn(async move { - let stream = gateway_info::db::batch_info_stream(&pool, &addresses)?; + let min_updated_at = DateTime::UNIX_EPOCH; + let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; + + let stream = gateway_info::db::batch_info_stream(&metadata_db_pool, &addresses)?; + let stream = stream + .filter_map(|gateway_info| { + future::ready(handle_updated_at( + gateway_info, + &updated_radios, + min_updated_at, + )) + }) + .boxed(); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); @@ -291,32 +321,66 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { + let min_updated_at = Utc + .timestamp_opt(request.min_updated_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; + + let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types); - if request.min_updated_at > 0 { - let min_updated_at = Utc - .timestamp_opt(request.min_updated_at as i64, 0) - .single() - .ok_or(Status::invalid_argument( - "Invalid min_refreshed_at argument", - ))?; - - let updated_radios = - get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; - let stream = stream - .filter(|v| future::ready(updated_radios.contains(&v.address))) - .boxed(); - stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) - .await - } else { - stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) - .await - } + let stream = stream + .filter_map(|gateway_info| { + future::ready(handle_updated_at( + gateway_info, + &updated_radios, + min_updated_at, + )) + }) + .boxed(); + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); Ok(Response::new(GrpcStreamResult::new(rx))) } } +fn handle_updated_at( + mut gateway_info: GatewayInfo, + updated_radios: &HashMap>, + min_updated_at: chrono::DateTime, +) -> Option { + // Check mobile_radio_tracker HashMap + if let Some(updated_at) = updated_radios.get(&gateway_info.address) { + // It could be already filtered by min_updated_at but recheck won't hurt + if updated_at >= &min_updated_at { + gateway_info.updated_at = Some(*updated_at); + return Some(gateway_info); + } + return None; + } + // Fallback solution #1. Try to use refreshed_at as updated_at field and check + // min_updated_at + if let Some(refreshed_at) = gateway_info.refreshed_at { + if refreshed_at >= min_updated_at { + gateway_info.updated_at = Some(refreshed_at); + return Some(gateway_info); + } + return None; + } + // Fallback solution #2. Try to use created_at as updated_at field and check + // min_updated_at + if let Some(created_at) = gateway_info.created_at { + if created_at >= min_updated_at { + gateway_info.updated_at = Some(created_at); + return Some(gateway_info); + } + return None; + } + None +} + trait GatewayInfoStreamRes { type GatewayInfoType; fn new(gateways: Vec, timestamp: u64, signer: Vec) -> Self; diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 609cb43cb..fdbbf588d 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -353,6 +353,175 @@ async fn gateway_info_batch_v2(pool: PgPool) { assert!(gw2.metadata.clone().unwrap().deployment_info.is_none()); } +#[sqlx::test] +async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let asset2_hex_idx = 631711286145955327_i64; + let asset3_hex_idx = 631711286145006591_i64; + let asset3_pubkey = make_keypair().public_key().clone(); + let asset4_pubkey = make_keypair().public_key().clone(); + let asset4_hex_idx = 0x8c44a82aed527ff_i64; + + let created_at = Utc::now() - Duration::hours(5); + let refreshed_at = Utc::now() - Duration::hours(3); + let updated_at = Utc::now() - Duration::hours(4); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(refreshed_at), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiIndoor\"", + asset2_pubkey.clone().into(), + created_at, + None, + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + add_db_record( + &pool, + "asset3", + asset3_hex_idx, + "\"wifiDataOnly\"", + asset3_pubkey.clone().into(), + created_at, + Some(refreshed_at), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; + + // Must be ignored since not included in req + add_db_record( + &pool, + "asset4", + asset4_hex_idx, + "\"wifiIndoor\"", + asset4_pubkey.clone().into(), + created_at, + None, + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_signed_info_batch_request( + &vec![ + asset1_pubkey.clone(), + asset2_pubkey.clone(), + asset3_pubkey.clone(), + make_keypair().public_key().clone(), // it doesn't exist + ], + &admin_key, + ); + let stream = client.info_batch_v2(req).await.unwrap().into_inner(); + let resp = stream + .filter_map(|result| async { result.ok() }) + .collect::>() + .await; + let gateways = resp.first().unwrap().gateways.clone(); + assert_eq!(gateways.len(), 3); + assert_eq!( + gateways + .iter() + .find(|v| v.address == asset1_pubkey.to_vec()) + .unwrap() + .updated_at, + refreshed_at.timestamp() as u64 + ); + assert_eq!( + gateways + .iter() + .find(|v| v.address == asset2_pubkey.to_vec()) + .unwrap() + .updated_at, + created_at.timestamp() as u64 + ); + + assert_eq!( + gateways + .iter() + .find(|v| v.address == asset3_pubkey.to_vec()) + .unwrap() + .updated_at, + updated_at.timestamp() as u64 + ); +} + +#[sqlx::test] +async fn gateway_info_v2_no_mobile_tracker_record(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let asset2_hex_idx = 631711286145955327_i64; + + let created_at = Utc::now() - Duration::hours(5); + let refreshed_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(refreshed_at), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiIndoor\"", + asset2_pubkey.clone().into(), + created_at, + None, + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + // asset 1 + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + let gw_info = resp.info.unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!(gw_info.updated_at, refreshed_at.timestamp() as u64); + + // asset 2 + let req = make_signed_info_request(&asset2_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + let gw_info = resp.info.unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset2_pubkey.clone()); + assert_eq!(gw_info.updated_at, created_at.timestamp() as u64); +} + #[sqlx::test] async fn gateway_info_v2(pool: PgPool) { let admin_key = make_keypair(); @@ -374,6 +543,7 @@ async fn gateway_info_v2(pool: PgPool) { Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) ) .await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); @@ -388,6 +558,7 @@ async fn gateway_info_v2(pool: PgPool) { DeviceType::try_from(gw_info.device_type).unwrap(), DeviceType::WifiIndoor ); + assert_eq!(gw_info.updated_at, updated_at.timestamp() as u64); assert_eq!( i64::from_str_radix(&gw_info.metadata.clone().unwrap().location, 16).unwrap(), asset1_hex_idx @@ -416,6 +587,97 @@ async fn gateway_info_v2(pool: PgPool) { assert_eq!(resp_err.code(), Code::NotFound); } +#[sqlx::test] +async fn gateway_info_stream_v2_updated_at_check(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let asset2_hex_idx = 631711286145955327_i64; + let asset3_hex_idx = 631711286145006591_i64; + let asset3_pubkey = make_keypair().public_key().clone(); + + let created_at = Utc::now() - Duration::hours(5); + let refreshed_at = Utc::now() - Duration::hours(3); + let updated_at = Utc::now() - Duration::hours(4); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(refreshed_at), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiIndoor\"", + asset2_pubkey.clone().into(), + created_at, + None, + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + add_db_record( + &pool, + "asset3", + asset3_hex_idx, + "\"wifiDataOnly\"", + asset3_pubkey.clone().into(), + created_at, + Some(refreshed_at), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset3_pubkey.clone().into(), updated_at).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req_v2(&admin_key, &[], 0); + let stream = client.info_stream_v2(req).await.unwrap().into_inner(); + + let resp = stream + .filter_map(|result| async { result.ok() }) + .collect::>() + .await; + let gateways = resp.first().unwrap().gateways.clone(); + assert_eq!(gateways.len(), 3); + assert_eq!( + gateways + .iter() + .find(|v| v.address == asset1_pubkey.to_vec()) + .unwrap() + .updated_at, + refreshed_at.timestamp() as u64 + ); + assert_eq!( + gateways + .iter() + .find(|v| v.address == asset2_pubkey.to_vec()) + .unwrap() + .updated_at, + created_at.timestamp() as u64 + ); + + assert_eq!( + gateways + .iter() + .find(|v| v.address == asset3_pubkey.to_vec()) + .unwrap() + .updated_at, + updated_at.timestamp() as u64 + ); +} + #[sqlx::test] async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { let admin_key = make_keypair(); diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index 791480b45..5bdc961eb 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -35,6 +35,8 @@ impl GatewayInfoResolver for MockGatewayInfoResolver { metadata: None, device_type: DeviceType::Cbrs, created_at: None, + refreshed_at: None, + updated_at: None, })) }