Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Mobile config] Add v2 endpoints to GatewayInfoV2. Change min_refreshed_at to min_updated_at #910

Merged
merged 11 commits into from
Dec 17, 2024
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 30 additions & 29 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ pub struct GatewayInfo {
pub metadata: Option<GatewayMetadata>,
pub device_type: DeviceType,
// None for V1
pub refreshed_at: Option<DateTime<Utc>>,
pub created_at: Option<DateTime<Utc>>,
}

Expand Down Expand Up @@ -135,7 +134,6 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
metadata,
device_type: _,
created_at,
refreshed_at,
} = info;

let metadata = if let Some(metadata) = metadata {
Expand All @@ -154,16 +152,11 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
.single()
.ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(created_at))?;

let refreshed_at = Utc.timestamp_opt(refreshed_at as i64, 0).single().ok_or(
GatewayInfoProtoParseError::InvalidRefreshedAt(info.refreshed_at),
)?;

Ok(Self {
address: address.into(),
metadata,
device_type: device_type_,
created_at: Some(created_at),
refreshed_at: Some(refreshed_at),
})
}
}
Expand Down Expand Up @@ -196,7 +189,6 @@ impl TryFrom<GatewayInfoProto> for GatewayInfo {
metadata,
device_type: device_type_,
created_at: None,
refreshed_at: None,
})
}
}
Expand Down Expand Up @@ -297,10 +289,6 @@ impl TryFrom<GatewayInfo> for GatewayInfoProtoV2 {
.created_at
.ok_or(GatewayInfoToProtoError::CreatedAtIsNone)?
.timestamp() as u64,
refreshed_at: info
.refreshed_at
.ok_or(GatewayInfoToProtoError::RefreshedAtIsNone)?
.timestamp() as u64,
})
}
}
Expand Down Expand Up @@ -357,10 +345,13 @@ pub(crate) mod db {
use super::{DeviceType, GatewayInfo, GatewayMetadata};
use crate::gateway_info::DeploymentInfo;
use chrono::{DateTime, Utc};
use futures::stream::{Stream, StreamExt};
use futures::{
stream::{Stream, StreamExt},
TryStreamExt,
};
use helium_crypto::PublicKeyBinary;
use sqlx::{types::Json, PgExecutor, Row};
use std::str::FromStr;
use std::{collections::HashSet, str::FromStr};

const GET_METADATA_SQL: &str = r#"
select kta.entity_key, infos.location::bigint, infos.device_type,
Expand All @@ -369,15 +360,34 @@ pub(crate) mod db {
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";
const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) ";
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";

const GET_UPDATED_RADIOS: &str =
"SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL}
where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#);

static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET);
static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}");
}

pub async fn get_updated_radios(
db: impl PgExecutor<'_>,
min_updated_at: DateTime<Utc>,
) -> anyhow::Result<HashSet<PublicKeyBinary>> {
sqlx::query(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(db)
.map_err(anyhow::Error::from)
.try_fold(
HashSet::new(),
|mut set: HashSet<PublicKeyBinary>, row| async move {
let entity_key_b = row.get::<&[u8], &str>("entity_key");
let entity_key = bs58::encode(entity_key_b).into_string();
set.insert(PublicKeyBinary::from_str(&entity_key)?);
Ok(set)
},
)
.await
}

pub async fn get_info(
Expand Down Expand Up @@ -413,16 +423,13 @@ pub(crate) mod db {
pub fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
device_types: &'a [DeviceType],
min_refreshed_at: DateTime<Utc>,
) -> impl Stream<Item = GatewayInfo> + 'a {
match device_types.is_empty() {
true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT)
.bind(min_refreshed_at)
true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL)
.bind(min_refreshed_at)
.bind(
device_types
.iter()
Expand Down Expand Up @@ -464,11 +471,6 @@ pub(crate) mod db {
)
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
let created_at = row.get::<DateTime<Utc>, &str>("created_at");
// `refreshed_at` can be NULL in the database schema.
// If so, fallback to using `created_at` as the default value of `refreshed_at`.
let refreshed_at = row
.get::<Option<DateTime<Utc>>, &str>("refreshed_at")
.unwrap_or(created_at);

Ok(Self {
address: PublicKeyBinary::from_str(
Expand All @@ -477,7 +479,6 @@ pub(crate) mod db {
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?,
metadata,
device_type,
refreshed_at: Some(refreshed_at),
created_at: Some(created_at),
})
}
Expand Down
Loading