Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Mobile Config] Add updated_at to GatewayInfoV2 (response) #921

Merged
merged 8 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 51 additions & 12 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ pub struct GatewayInfo {
pub address: PublicKeyBinary,
pub metadata: Option<GatewayMetadata>,
pub device_type: DeviceType,
// None for V1
// Optional fields are None for GatewayInfoProto (V1)
pub created_at: Option<DateTime<Utc>>,
// updated_at refers to the last time the data was actually changed.
pub updated_at: Option<DateTime<Utc>>,
// refreshed_at indicates the last time the chain was consulted, regardless of data changes.
pub refreshed_at: Option<DateTime<Utc>>,
}

impl GatewayInfo {
Expand All @@ -119,8 +123,8 @@ pub enum GatewayInfoProtoParseError {
InvalidLocation(#[from] std::num::ParseIntError),
#[error("Invalid created_at: {0}")]
InvalidCreatedAt(u64),
#[error("Invalid refreshed_at: {0}")]
InvalidRefreshedAt(u64),
#[error("Invalid updated_at: {0}")]
InvalidUpdatedAt(u64),
}

impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
Expand All @@ -134,6 +138,7 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
metadata,
device_type: _,
created_at,
updated_at,
} = info;

let metadata = if let Some(metadata) = metadata {
Expand All @@ -152,11 +157,18 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
.single()
.ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(created_at))?;

let updated_at = Utc
.timestamp_opt(updated_at as i64, 0)
.single()
.ok_or(GatewayInfoProtoParseError::InvalidUpdatedAt(updated_at))?;

Ok(Self {
address: address.into(),
metadata,
device_type: device_type_,
created_at: Some(created_at),
updated_at: Some(updated_at),
refreshed_at: None,
})
}
}
Expand Down Expand Up @@ -189,6 +201,8 @@ impl TryFrom<GatewayInfoProto> for GatewayInfo {
metadata,
device_type: device_type_,
created_at: None,
updated_at: None,
refreshed_at: None,
})
}
}
Expand Down Expand Up @@ -264,8 +278,8 @@ pub enum GatewayInfoToProtoError {
InvalidLocation(#[from] hextree::Error),
#[error("created_at is None")]
CreatedAtIsNone,
#[error("refreshed_at is None")]
RefreshedAtIsNone,
#[error("updated_at is None")]
UpdatedAtIsNone,
}

impl TryFrom<GatewayInfo> for GatewayInfoProtoV2 {
Expand All @@ -289,6 +303,10 @@ impl TryFrom<GatewayInfo> for GatewayInfoProtoV2 {
.created_at
.ok_or(GatewayInfoToProtoError::CreatedAtIsNone)?
.timestamp() as u64,
updated_at: info
.updated_at
.ok_or(GatewayInfoToProtoError::UpdatedAtIsNone)?
.timestamp() as u64,
})
}
}
Expand Down Expand Up @@ -351,7 +369,7 @@ pub(crate) mod db {
};
use helium_crypto::PublicKeyBinary;
use sqlx::{types::Json, PgExecutor, Row};
use std::{collections::HashSet, str::FromStr};
use std::{collections::HashMap, str::FromStr};

const GET_METADATA_SQL: &str = r#"
select kta.entity_key, infos.location::bigint, infos.device_type,
Expand All @@ -363,7 +381,10 @@ pub(crate) mod db {
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";

const GET_UPDATED_RADIOS: &str =
"SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1";
"SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1";

const GET_UPDATED_AT: &str =
"SELECT last_changed_at FROM mobile_radio_tracker WHERE entity_key = $1";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
Expand All @@ -373,23 +394,36 @@ pub(crate) mod db {
pub async fn get_updated_radios(
db: impl PgExecutor<'_>,
min_updated_at: DateTime<Utc>,
) -> anyhow::Result<HashSet<PublicKeyBinary>> {
) -> anyhow::Result<HashMap<PublicKeyBinary, DateTime<Utc>>> {
sqlx::query(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(db)
.map_err(anyhow::Error::from)
.try_fold(
HashSet::new(),
|mut set: HashSet<PublicKeyBinary>, row| async move {
HashMap::new(),
|mut map: HashMap<PublicKeyBinary, DateTime<Utc>>, row| async move {
let entity_key_b = row.get::<&[u8], &str>("entity_key");
let entity_key = bs58::encode(entity_key_b).into_string();
set.insert(PublicKeyBinary::from_str(&entity_key)?);
Ok(set)
let updated_at = row.get::<DateTime<Utc>, &str>("last_changed_at");
map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at);
Ok(map)
},
)
.await
}

pub async fn get_updated_at(
db: impl PgExecutor<'_>,
address: &PublicKeyBinary,
) -> anyhow::Result<Option<DateTime<Utc>>> {
let entity_key = bs58::decode(address.to_string()).into_vec()?;
sqlx::query_scalar(GET_UPDATED_AT)
.bind(entity_key)
.fetch_optional(db)
.await
.map_err(anyhow::Error::from)
}

pub async fn get_info(
db: impl PgExecutor<'_>,
address: &PublicKeyBinary,
Expand Down Expand Up @@ -471,6 +505,7 @@ pub(crate) mod db {
)
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
let created_at = row.get::<DateTime<Utc>, &str>("created_at");
let refreshed_at = row.get::<Option<DateTime<Utc>>, &str>("refreshed_at");

Ok(Self {
address: PublicKeyBinary::from_str(
Expand All @@ -480,6 +515,10 @@ pub(crate) mod db {
metadata,
device_type,
created_at: Some(created_at),
refreshed_at,
// The updated_at field should be determined by considering the last_changed_at
// value from the mobile_radio_tracker table.
updated_at: None,
})
}
}
Expand Down
116 changes: 90 additions & 26 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
key_cache::KeyCache,
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
};
use chrono::{TimeZone, Utc};
use chrono::{DateTime, TimeZone, Utc};
use file_store::traits::{MsgVerify, TimestampEncode};
use futures::{
future,
Expand All @@ -15,12 +15,12 @@ use helium_proto::{
services::mobile_config::{
self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2,
GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1,
GatewayInfoStreamResV2,
GatewayInfoStreamResV2, GatewayInfoV2,
},
Message,
};
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use tonic::{Request, Response, Status};

pub struct GatewayService {
Expand Down Expand Up @@ -129,6 +129,12 @@ impl mobile_config::Gateway for GatewayService {
let pubkey: PublicKeyBinary = request.address.into();
tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)");

let updated_at = gateway_info::db::get_updated_at(&self.mobile_config_db_pool, &pubkey)
.await
.map_err(|_| {
Status::internal("error fetching updated_at field for gateway info (v2)")
})?;

gateway_info::db::get_info(&self.metadata_pool, &pubkey)
.await
.map_err(|_| Status::internal("error fetching gateway info (v2)"))?
Expand All @@ -137,15 +143,26 @@ impl mobile_config::Gateway for GatewayService {
telemetry::count_gateway_chain_lookup("not-found");
Err(Status::not_found(pubkey.to_string()))
},
|info| {
|mut info| {
if info.metadata.is_some() {
telemetry::count_gateway_chain_lookup("asserted");
} else {
telemetry::count_gateway_chain_lookup("not-asserted");
};
let info = info

// determine updated_at
if let Some(v) = updated_at {
info.updated_at = Some(v)
} else if info.refreshed_at.is_some() {
info.updated_at = info.refreshed_at;
} else {
info.updated_at = info.created_at;
}

let info: GatewayInfoV2 = info
.try_into()
.map_err(|_| Status::internal("error serializing gateway info (v2)"))?;

let mut res = GatewayInfoResV2 {
info: Some(info),
timestamp: Utc::now().encode_timestamp(),
Expand Down Expand Up @@ -212,7 +229,8 @@ impl mobile_config::Gateway for GatewayService {
"fetching gateways' info batch"
);

let pool = self.metadata_pool.clone();
let metadata_db_pool = self.metadata_pool.clone();
let mobile_config_db_pool = self.mobile_config_db_pool.clone();
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let addresses = request
Expand All @@ -224,7 +242,19 @@ impl mobile_config::Gateway for GatewayService {
let (tx, rx) = tokio::sync::mpsc::channel(100);

tokio::spawn(async move {
let stream = gateway_info::db::batch_info_stream(&pool, &addresses)?;
let min_updated_at = DateTime::UNIX_EPOCH;
let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?;

let stream = gateway_info::db::batch_info_stream(&metadata_db_pool, &addresses)?;
let stream = stream
.filter_map(|gateway_info| {
future::ready(handle_updated_at(
gateway_info,
&updated_radios,
min_updated_at,
))
})
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down Expand Up @@ -291,32 +321,66 @@ impl mobile_config::Gateway for GatewayService {
);

tokio::spawn(async move {
let min_updated_at = Utc
.timestamp_opt(request.min_updated_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?;
let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types);
if request.min_updated_at > 0 {
let min_updated_at = Utc
.timestamp_opt(request.min_updated_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

let updated_radios =
get_updated_radios(&mobile_config_db_pool, min_updated_at).await?;
let stream = stream
.filter(|v| future::ready(updated_radios.contains(&v.address)))
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
} else {
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
}
let stream = stream
.filter_map(|gateway_info| {
future::ready(handle_updated_at(
gateway_info,
&updated_radios,
min_updated_at,
))
})
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Ok(Response::new(GrpcStreamResult::new(rx)))
}
}

fn handle_updated_at(
mut gateway_info: GatewayInfo,
updated_radios: &HashMap<PublicKeyBinary, chrono::DateTime<Utc>>,
min_updated_at: chrono::DateTime<Utc>,
) -> Option<GatewayInfo> {
// Check mobile_radio_tracker HashMap
if let Some(updated_at) = updated_radios.get(&gateway_info.address) {
// It could be already filtered by min_updated_at but recheck won't hurt
if updated_at >= &min_updated_at {
gateway_info.updated_at = Some(*updated_at);
return Some(gateway_info);
}
return None;
}
// Fallback solution #1. Try to use refreshed_at as updated_at field and check
// min_updated_at
if let Some(refreshed_at) = gateway_info.refreshed_at {
if refreshed_at >= min_updated_at {
gateway_info.updated_at = Some(refreshed_at);
return Some(gateway_info);
}
return None;
}
// Fallback solution #2. Try to use created_at as updated_at field and check
// min_updated_at
if let Some(created_at) = gateway_info.created_at {
if created_at >= min_updated_at {
gateway_info.updated_at = Some(created_at);
return Some(gateway_info);
}
return None;
}
None
}

trait GatewayInfoStreamRes {
type GatewayInfoType;
fn new(gateways: Vec<Self::GatewayInfoType>, timestamp: u64, signer: Vec<u8>) -> Self;
Expand Down
Loading