Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Mobile config] Add v2 endpoints to GatewayInfoV2. Change min_refreshed_at to min_updated_at #910

Merged
merged 11 commits into from
Dec 17, 2024
52 changes: 34 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch =
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "min_update_at", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "min_update_at" }
solana-client = "1.18"
solana-sdk = "1.18"
solana-program = "1.18"
Expand Down
34 changes: 25 additions & 9 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ pub(crate) mod db {
use futures::stream::{Stream, StreamExt};
use helium_crypto::PublicKeyBinary;
use sqlx::{types::Json, PgExecutor, Row};
use std::str::FromStr;
use std::{collections::HashSet, str::FromStr};

const GET_METADATA_SQL: &str = r#"
select kta.entity_key, infos.location::bigint, infos.device_type,
Expand All @@ -369,15 +369,34 @@ pub(crate) mod db {
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";
const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) ";
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";

const GET_UPDATED_RADIOS: &str =
"SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL}
where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#);
static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}");
}

static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET);
pub async fn get_updated_radios(
db: impl PgExecutor<'_>,
min_updated_at: DateTime<Utc>,
) -> anyhow::Result<HashSet<PublicKeyBinary>> {
let rows: Vec<Vec<u8>> = sqlx::query_scalar(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch_all(db)
.await?;
let mut radios = HashSet::new();

for row in rows {
let entity_key_b: &[u8] = &row;
let entity_key = bs58::encode(entity_key_b).into_string();
let pk = PublicKeyBinary::from_str(&entity_key)?;
radios.insert(pk);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let rows: Vec<Vec<u8>> = sqlx::query_scalar(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch_all(db)
.await?;
let mut radios = HashSet::new();
for row in rows {
let entity_key_b: &[u8] = &row;
let entity_key = bs58::encode(entity_key_b).into_string();
let pk = PublicKeyBinary::from_str(&entity_key)?;
radios.insert(pk);
}
let radios: HashSet<PublicKeyBinary> = sqlx::query_scalar(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(db)
.await
.map(|row| {
let entity_key_b: &[u8] = &row;
let entity_key = bs58::encode(entity_key_b).into_string();
PublicKeyBinary::from_str(&entity_key)
})
.collect::<Result<HashSet<PublicKeyBinary>, helium_crypto::Error>>()?;

this probably necessitates importing StreamExt or similar but avoids the extra allocations

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

helium-crypto-rs provides a sqlx mapping for PublicKeyBinary https://github.com/helium/helium-crypto-rs/blob/main/src/public_key_binary.rs#L115

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @michaeldjeffrey thanks for information. The helium-crypto-rs provides a sqlx mapping for PublicKeyBinary which stored in database as string. In our case pubkey is stored as a bytea type. So when I try to use helium-crypto-rs
mapping like below

Ok(sqlx::query_as::<_, (PublicKeyBinary,)>(GET_UPDATED_RADIOS)
           .bind(min_updated_at)
           .fetch(db)
           .map_ok(|(entity_key,)| entity_key)
           .try_collect::<HashSet<_>>()
           .await?)

I've got an error: mismatched types; Rust type `helium_crypto::public_key_binary::PublicKeyBinary` (as SQL type `text`) is not compatible with SQL type BYTEA

To make it work we need to extend decode function in helium-crypto-rs https://github.com/helium/helium-crypto-rs/blob/main/src/public_key_binary.rs#L150 to make it support bytea. I think this is not in the scope of this PR and I'm not sure that it is worth since I have no information how often we store PublicKeyBinary in bytea type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No doubt. That's a good @madninja question. Anyways, carry on 👯


Ok(radios)
}

pub async fn get_info(
Expand Down Expand Up @@ -413,16 +432,13 @@ pub(crate) mod db {
pub fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
device_types: &'a [DeviceType],
min_refreshed_at: DateTime<Utc>,
) -> impl Stream<Item = GatewayInfo> + 'a {
match device_types.is_empty() {
true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT)
.bind(min_refreshed_at)
true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL)
.bind(min_refreshed_at)
.bind(
device_types
.iter()
Expand Down
45 changes: 32 additions & 13 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{
gateway_info::{self, DeviceType, GatewayInfo},
gateway_info::{self, db::get_updated_radios, DeviceType, GatewayInfo},
key_cache::KeyCache,
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
};
use chrono::{DateTime, TimeZone, Utc};
use chrono::{TimeZone, Utc};
use file_store::traits::{MsgVerify, TimestampEncode};
use futures::{
future,
stream::{Stream, StreamExt, TryStreamExt},
TryFutureExt,
};
Expand All @@ -23,16 +24,23 @@ use tonic::{Request, Response, Status};

pub struct GatewayService {
key_cache: KeyCache,
mobile_config_db_pool: Pool<Postgres>,
metadata_pool: Pool<Postgres>,
signing_key: Arc<Keypair>,
}

impl GatewayService {
pub fn new(key_cache: KeyCache, metadata_pool: Pool<Postgres>, signing_key: Keypair) -> Self {
pub fn new(
key_cache: KeyCache,
metadata_pool: Pool<Postgres>,
signing_key: Keypair,
mobile_config_db_pool: Pool<Postgres>,
) -> Self {
Self {
key_cache,
metadata_pool,
signing_key: Arc::new(signing_key),
mobile_config_db_pool,
}
}

Expand Down Expand Up @@ -170,8 +178,7 @@ impl mobile_config::Gateway for GatewayService {
);

tokio::spawn(async move {
let stream =
gateway_info::db::all_info_stream(&pool, &device_types, DateTime::UNIX_EPOCH);
let stream = gateway_info::db::all_info_stream(&pool, &device_types);
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand All @@ -191,27 +198,39 @@ impl mobile_config::Gateway for GatewayService {
self.verify_request_signature(&signer, &request)?;

let pool = self.metadata_pool.clone();
let mc_pool = self.mobile_config_db_pool.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the original pool variable name made sense here as-is when it was the only database connection this function needed but in general i'd lean more towards the app's "owned" database connection more generically and the one "owned" by an outside service (the metadata db in this case) with more specificity the way we do in the config service's settings or else just give them both more specific names to avoid any confusion

let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;

let (tx, rx) = tokio::sync::mpsc::channel(100);

let device_types: Vec<DeviceType> = request.device_types().map(|v| v.into()).collect();
let min_refreshed_at = Utc
.timestamp_opt(request.min_refreshed_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

tracing::debug!(
"fetching all gateways' info (v2). Device types: {:?} ",
device_types
);

tokio::spawn(async move {
let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at);
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
let stream = gateway_info::db::all_info_stream(&pool, &device_types);
if request.min_updated_at > 0 {
let min_updated_at = Utc
.timestamp_opt(request.min_updated_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?;
let stream = stream
.filter(|v| future::ready(updated_redios.contains(&v.address)))
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
} else {
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let stream = gateway_info::db::all_info_stream(&pool, &device_types);
if request.min_updated_at > 0 {
let min_updated_at = Utc
.timestamp_opt(request.min_updated_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;
let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?;
let stream = stream
.filter(|v| future::ready(updated_redios.contains(&v.address)))
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
} else {
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
}
let stream =
if request.min_updated_at > 0 {
let min_updated_at = Utc
.timestamp_opt(request.min_updated_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;
let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?;
gateway_info::db::all_info_stream(&pool, &device_types)
.filter(|v| future::ready(updated_redios.contains(&v.address)))
.boxed();
} else {
gateway_info::db::all_info_stream(&pool, &device_types)
};
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await

i don't know that this really makes a difference; i wanted to get it down to a single invocation of the stream but i don't think we can fully clean that up unless we added a no-op filter on the branch that doesn't check updated at so feel free to ignore here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried to do it in a similar way, but I'm facing incompatible types or borrow checker errors.

});

Ok(Response::new(GrpcStreamResult::new(rx)))
Expand Down
1 change: 1 addition & 0 deletions mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl Daemon {
key_cache.clone(),
metadata_pool.clone(),
settings.signing_keypair()?,
pool.clone(),
);
let auth_svc = AuthorizationService::new(key_cache.clone(), settings.signing_keypair()?);
let entity_svc = EntityService::new(
Expand Down
Loading