Skip to content

Commit

Permalink
remove delete-all eui/devaddr methods, better streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Mar 2, 2023
1 parent 68eadc8 commit 3c82891
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 170 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ sqlx = {version = "0", features = [
]}

helium-crypto = {version = "0.6.3", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "jg/config-admin-svc", features = ["services"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
hextree = "*"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
beacon = {git = "https://github.com/helium/gateway-rs.git", branch = "jg/config-admin-svc"}
beacon = {git = "https://github.com/helium/gateway-rs.git", branch = "main"}
humantime = "2"
metrics = "0"
metrics-exporter-prometheus = "0"
Expand Down
9 changes: 3 additions & 6 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ use helium_proto::services::{
iot_config::{
AdminAddKeyReqV1, AdminLoadRegionReqV1, AdminRemoveKeyReqV1, GatewayLocationReqV1,
GatewayRegionParamsReqV1, OrgCreateHeliumReqV1, OrgCreateRoamerReqV1, OrgDisableReqV1,
OrgEnableReqV1, RouteCreateReqV1, RouteDeleteDevaddrRangesReqV1, RouteDeleteEuisReqV1,
RouteDeleteReqV1, RouteGetDevaddrRangesReqV1, RouteGetEuisReqV1, RouteGetReqV1,
RouteListReqV1, RouteStreamReqV1, RouteUpdateDevaddrRangesReqV1, RouteUpdateEuisReqV1,
RouteUpdateReqV1,
OrgEnableReqV1, RouteCreateReqV1, RouteDeleteReqV1, RouteGetDevaddrRangesReqV1,
RouteGetEuisReqV1, RouteGetReqV1, RouteListReqV1, RouteStreamReqV1,
RouteUpdateDevaddrRangesReqV1, RouteUpdateEuisReqV1, RouteUpdateReqV1,
},
poc_lora::{LoraBeaconReportReqV1, LoraWitnessReportReqV1},
};
Expand Down Expand Up @@ -50,11 +49,9 @@ impl_msg_verify!(RouteCreateReqV1, signature);
impl_msg_verify!(RouteUpdateReqV1, signature);
impl_msg_verify!(RouteDeleteReqV1, signature);
impl_msg_verify!(RouteGetEuisReqV1, signature);
impl_msg_verify!(RouteDeleteEuisReqV1, signature);
impl_msg_verify!(RouteUpdateEuisReqV1, signature);
impl_msg_verify!(RouteGetDevaddrRangesReqV1, signature);
impl_msg_verify!(RouteUpdateDevaddrRangesReqV1, signature);
impl_msg_verify!(RouteDeleteDevaddrRangesReqV1, signature);
impl_msg_verify!(GatewayLocationReqV1, signature);
impl_msg_verify!(GatewayRegionParamsReqV1, signature);
impl_msg_verify!(AdminAddKeyReqV1, signature);
Expand Down
2 changes: 1 addition & 1 deletion iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl iot_config::Org for OrgService {
if self
.route_update_tx
.send(RouteStreamResV1 {
action: ActionV1::Remove.into(),
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
})
.is_err()
Expand Down
104 changes: 15 additions & 89 deletions iot_config/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::{types::Uuid, Row};
use std::collections::BTreeMap;
use tokio::{
pin,
sync::broadcast::{self, Sender},
};
use tokio::sync::broadcast::{self, Sender};

pub mod proto {
pub use helium_proto::{
Expand Down Expand Up @@ -150,11 +147,6 @@ pub async fn update_route(

let mut transaction = db.begin().await?;

let was_active = sqlx::query_scalar::<_, bool>(r#"select active from routes where id = $1"#)
.bind(uuid)
.fetch_one(&mut transaction)
.await?;

sqlx::query(
r#"
update routes
Expand All @@ -175,21 +167,12 @@ pub async fn update_route(

transaction.commit().await?;

// if the route is not locked at the org level and it was or is now active, update downstream HPRs
if !updated_route.locked && (was_active || updated_route.active) {
let action = if updated_route.active {
proto::ActionV1::Add.into()
} else {
proto::ActionV1::Remove.into()
};

_ = update_tx.send(proto::RouteStreamResV1 {
action,
data: Some(proto::route_stream_res_v1::Data::Route(
updated_route.clone().into(),
)),
});
};
_ = update_tx.send(proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
updated_route.clone().into(),
)),
});

Ok(updated_route)
}
Expand Down Expand Up @@ -291,33 +274,6 @@ pub async fn update_euis(
Ok(())
}

pub async fn delete_euis<'a>(
id: &str,
db: impl sqlx::PgExecutor<'a> + 'a + Copy,
update_tx: Sender<proto::RouteStreamResV1>,
) -> Result<(), RouteStorageError> {
let euis = list_euis_for_route(id, db)?;
pin!(euis);

while let Some(removed) = euis.next().await {
let update = proto::RouteStreamResV1 {
action: proto::ActionV1::Remove.into(),
data: Some(proto::route_stream_res_v1::Data::EuiPair(removed.into())),
};
if update_tx.send(update).is_err() {
break;
}
}

let id = Uuid::try_parse(id)?;
sqlx::query(" delete from route_eui_pairs where route_id = $1 ")
.bind(id)
.execute(db)
.await?;

Ok(())
}

async fn insert_devaddr_ranges(
ranges: &[DevAddrRange],
db: impl sqlx::PgExecutor<'_>,
Expand Down Expand Up @@ -420,35 +376,6 @@ pub async fn update_devaddr_ranges(
Ok(())
}

pub async fn delete_devaddr_ranges<'a>(
id: &str,
db: impl sqlx::PgExecutor<'a> + 'a + Copy,
update_tx: Sender<proto::RouteStreamResV1>,
) -> Result<(), RouteStorageError> {
let devaddr_ranges = list_devaddr_ranges_for_route(id, db)?;
pin!(devaddr_ranges);

while let Some(removed) = devaddr_ranges.next().await {
let update = proto::RouteStreamResV1 {
action: proto::ActionV1::Remove.into(),
data: Some(proto::route_stream_res_v1::Data::DevaddrRange(
removed.into(),
)),
};
if update_tx.send(update).is_err() {
break;
}
}

let id = Uuid::try_parse(id)?;
sqlx::query(" delete from route_devaddr_ranges where route_id = $1 ")
.bind(id)
.execute(db)
.await?;

Ok(())
}

pub async fn list_routes(
oui: u64,
db: impl sqlx::PgExecutor<'_>,
Expand Down Expand Up @@ -482,7 +409,7 @@ pub async fn list_routes(
pub fn list_euis_for_route<'a>(
id: &str,
db: impl sqlx::PgExecutor<'a> + 'a + Copy,
) -> Result<impl Stream<Item = EuiPair> + 'a, RouteStorageError> {
) -> Result<impl Stream<Item = Result<EuiPair, sqlx::Error>> + 'a, RouteStorageError> {
let id = Uuid::try_parse(id)?;
const EUI_SELECT_SQL: &str = r#"
select eui.route_id, eui.app_eui, eui.dev_eui
Expand All @@ -493,15 +420,13 @@ pub fn list_euis_for_route<'a>(
Ok(sqlx::query_as::<_, EuiPair>(EUI_SELECT_SQL)
.bind(id)
.fetch(db)
.map_err(RouteStorageError::from)
.filter_map(|eui| async move { eui.ok() })
.boxed())
}

pub fn list_devaddr_ranges_for_route<'a>(
id: &str,
db: impl sqlx::PgExecutor<'a> + 'a,
) -> Result<impl Stream<Item = DevAddrRange> + 'a, RouteStorageError> {
) -> Result<impl Stream<Item = Result<DevAddrRange, sqlx::Error>> + 'a, RouteStorageError> {
let id = Uuid::try_parse(id)?;
const DEVADDR_RANGE_SELECT_SQL: &str = r#"
select devaddr.route_id, devaddr.start_addr, devaddr.end_addr
Expand All @@ -512,12 +437,10 @@ pub fn list_devaddr_ranges_for_route<'a>(
Ok(sqlx::query_as::<_, DevAddrRange>(DEVADDR_RANGE_SELECT_SQL)
.bind(id)
.fetch(db)
.map_err(RouteStorageError::from)
.filter_map(|devaddr| async move { devaddr.ok() })
.boxed())
}

pub async fn active_route_stream<'a>(
pub fn active_route_stream<'a>(
db: impl sqlx::PgExecutor<'a> + 'a,
) -> impl Stream<Item = Route> + 'a {
sqlx::query_as::<_, StorageRoute>(
Expand All @@ -541,9 +464,10 @@ pub async fn active_route_stream<'a>(
locked: route.locked,
})})
.filter_map(|route| async move { route.ok() })
.boxed()
}

pub async fn eui_stream<'a>(
pub fn eui_stream<'a>(
db: impl sqlx::PgExecutor<'a> + 'a + Copy,
) -> impl Stream<Item = EuiPair> + 'a {
sqlx::query_as::<_, EuiPair>(
Expand All @@ -555,9 +479,10 @@ pub async fn eui_stream<'a>(
.fetch(db)
.map_err(sqlx::Error::from)
.filter_map(|eui| async move { eui.ok() })
.boxed()
}

pub async fn devaddr_range_stream<'a>(
pub fn devaddr_range_stream<'a>(
db: impl sqlx::PgExecutor<'a> + 'a + Copy,
) -> impl Stream<Item = DevAddrRange> + 'a {
sqlx::query_as::<_, DevAddrRange>(
Expand All @@ -569,6 +494,7 @@ pub async fn devaddr_range_stream<'a>(
.fetch(db)
.map_err(sqlx::Error::from)
.filter_map(|devaddr| async move { devaddr.ok() })
.boxed()
}

pub async fn get_route(
Expand Down
Loading

0 comments on commit 3c82891

Please sign in to comment.