Skip to content

Commit

Permalink
Merge pull request #567 from helium/jg/stream-locked-route-stuffs
Browse files Browse the repository at this point in the history
IoT Config locked route updates and shutdown listeners
  • Loading branch information
jeffgrunewald authored Jul 17, 2023
2 parents 213f103 + cc6886a commit 0f6dc3f
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 262 deletions.
30 changes: 18 additions & 12 deletions iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct GatewayService {
region_map: RegionMapReader,
signing_key: Arc<Keypair>,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
}

impl GatewayService {
Expand All @@ -44,6 +45,7 @@ impl GatewayService {
region_map: RegionMapReader,
auth_cache: AuthCache,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
let gateway_cache = Arc::new(Cache::new());
let cache_clone = gateway_cache.clone();
Expand All @@ -56,6 +58,7 @@ impl GatewayService {
region_map,
signing_key: Arc::new(settings.signing_keypair()?),
delegate_cache,
shutdown,
})
}

Expand Down Expand Up @@ -275,18 +278,21 @@ impl iot_config::Gateway for GatewayService {
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let region_map = self.region_map.clone();
let shutdown_listener = self.shutdown.clone();

let (tx, rx) = tokio::sync::mpsc::channel(20);

tokio::spawn(async move {
stream_all_gateways_info(
&pool,
tx.clone(),
&signing_key,
region_map.clone(),
batch_size,
)
.await
tokio::select! {
_ = shutdown_listener => (),
_ = stream_all_gateways_info(
&pool,
tx.clone(),
&signing_key,
region_map.clone(),
batch_size,
) => (),
}
});

Ok(Response::new(GrpcStreamResult::new(rx)))
Expand All @@ -313,24 +319,24 @@ async fn stream_all_gateways_info(
})
.collect();

let mut response = GatewayInfoStreamResV1 {
let mut gateway = GatewayInfoStreamResV1 {
gateways: gateway_infos,
timestamp,
signer: signer.clone(),
signature: vec![],
};

response = match signing_key.sign(&response.encode_to_vec()) {
gateway = match signing_key.sign(&gateway.encode_to_vec()) {
Ok(signature) => GatewayInfoStreamResV1 {
signature,
..response
..gateway
},
Err(_) => {
continue;
}
};

tx.send(Ok(response)).await?;
tx.send(Ok(gateway)).await?;
}
Ok(())
}
7 changes: 5 additions & 2 deletions iot_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ pub fn update_channel<T: Clone>() -> broadcast::Sender<T> {
update_tx
}

pub async fn broadcast_update<T>(
pub async fn broadcast_update<T: std::fmt::Debug>(
message: T,
sender: broadcast::Sender<T>,
) -> Result<(), broadcast::error::SendError<T>> {
while !enqueue_update(sender.len()) {
tokio::time::sleep(tokio::time::Duration::from_millis(25)).await
}
sender.send(message).map(|_| ())
sender.send(message).map(|_| ()).map_err(|err| {
tracing::error!(error = ?err, "failed to broadcast routing update");
err
})
}

fn enqueue_update(queue_size: usize) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl Daemon {
region_map.clone(),
auth_cache.clone(),
delegate_key_cache,
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(
settings,
Expand All @@ -108,6 +109,7 @@ impl Daemon {
pool.clone(),
route_svc.clone_update_channel(),
delegate_key_updater,
shutdown_listener.clone(),
)?;
let admin_svc = AdminService::new(
settings,
Expand Down
107 changes: 42 additions & 65 deletions iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
admin::{AuthCache, KeyType},
helium_netids, lora_field, org,
broadcast_update, helium_netids, lora_field, org,
route::list_routes,
telemetry, verify_public_key, GrpcResult, Settings,
};
Expand All @@ -26,6 +26,7 @@ pub struct OrgService {
route_update_tx: broadcast::Sender<RouteStreamResV1>,
signing_key: Keypair,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -41,13 +42,15 @@ impl OrgService {
pool: Pool<Postgres>,
route_update_tx: broadcast::Sender<RouteStreamResV1>,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
Ok(Self {
auth_cache,
pool,
route_update_tx,
signing_key: settings.signing_keypair()?,
delegate_updater,
shutdown,
})
}

Expand Down Expand Up @@ -111,6 +114,38 @@ impl OrgService {
.sign(response)
.map_err(|_| Status::internal("response signing error"))
}

async fn stream_org_routes_enable_disable(&self, oui: u64) -> Result<(), Status> {
let routes = list_routes(oui, &self.pool).await.map_err(|err| {
tracing::error!(org = oui, reason = ?err, "failed to list org routes for streaming update");
Status::internal(format!("error retrieving routes for updated org: {}", oui))
})?;
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in routes {
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id,
"all subscribers disconnected; org routes update incomplete"
);
break;
};
tracing::debug!(route_id, "route updated");
}
Ok(())
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -431,38 +466,9 @@ impl iot_config::Org for OrgService {
Status::internal(format!("org disable failed for: {}", request.oui))
})?;

let org_routes = list_routes(request.oui, &self.pool).await.map_err(|err| {
tracing::error!(
org = request.oui,
reason = ?err,
"failed to list org routes for streaming disable update"
);
Status::internal(format!(
"error retrieving routes for disabled org: {}",
request.oui
))
})?;

let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route disable incomplete"
);
break;
};
tracing::debug!(route_id = route_id, "route disabled");
tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}

Expand Down Expand Up @@ -499,38 +505,9 @@ impl iot_config::Org for OrgService {
Status::internal(format!("org enable failed for: {}", request.oui))
})?;

let org_routes = list_routes(request.oui, &self.pool).await.map_err(|err| {
tracing::error!(
org = request.oui,
reason = ?err,
"failed to list routes for streaming enable update"
);
Status::internal(format!(
"error retrieving routes for enabled org: {}",
request.oui
))
})?;

let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route enable incomplete"
);
break;
};
tracing::debug!(route_id = route_id, "route enabled");
tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}

Expand Down
59 changes: 30 additions & 29 deletions iot_config/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,28 @@ pub async fn create_route(

transaction.commit().await?;

if new_route.active && !new_route.locked {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = signing_key
.sign(&update.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
.and_then(|signature| {
update.signature = signature;
update_tx.send(update).map_err(|err| {
tracing::warn!("error broadcasting route stream response: {err:?}")
})
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx)
.map_err(|_| anyhow!("failed broadcasting route create"))
})
.await;

Ok(new_route)
}
Expand Down Expand Up @@ -213,15 +213,17 @@ pub async fn update_route(
signature: vec![],
};

_ = signing_key
.sign(&update_res.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
update_tx
.send(update_res)
.map_err(|err| tracing::warn!("error broadcasting route stream response: {err:?}"))
});
broadcast_update(update_res, update_tx)
.map_err(|_| anyhow!("failed broadcasting route update"))
})
.await;

Ok(updated_route)
}
Expand Down Expand Up @@ -523,7 +525,6 @@ pub fn active_route_stream<'a>(
select r.id, r.oui, r.net_id, r.max_copies, r.server_host, r.server_port, r.server_protocol_opts, r.active, r.ignore_empty_skf, o.locked
from routes r
join organizations o on r.oui = o.oui
where o.locked = false and r.active = true
group by r.id, o.locked
"#,
)
Expand Down
Loading

0 comments on commit 0f6dc3f

Please sign in to comment.