Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move denylist check from loader to the runner verification list #588

Merged
merged 2 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions denylist/src/denylist.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{client::DenyListClient, models::metadata::Asset, Error, Result};
use bytes::Buf;
use helium_crypto::{PublicKey, Verify};
use helium_crypto::{PublicKey, PublicKeyBinary, Verify};
use serde::Serialize;
use std::{fs, hash::Hasher, path, str::FromStr};
use twox_hash::XxHash64;
Expand All @@ -25,6 +25,20 @@ pub struct DenyList {
pub filter: Xor32,
}

impl TryFrom<Vec<PublicKeyBinary>> for DenyList {
type Error = Error;
fn try_from(v: Vec<PublicKeyBinary>) -> Result<Self> {
let keys: Vec<u64> = v.into_iter().map(public_key_hash).collect();
let filter = Xor32::from(&keys);
let client = DenyListClient::new()?;
Ok(Self {
tag_name: 0,
client,
filter,
})
}
}

impl DenyList {
pub fn new() -> Result<Self> {
tracing::debug!("initializing new denylist");
Expand Down Expand Up @@ -89,7 +103,7 @@ impl DenyList {
Ok(())
}

pub async fn check_key<K: AsRef<[u8]>>(&self, pub_key: K) -> bool {
pub fn check_key<K: AsRef<[u8]>>(&self, pub_key: K) -> bool {
if self.filter.len() == 0 {
tracing::warn!("empty denylist filter, rejecting key");
return true;
Expand Down
12 changes: 12 additions & 0 deletions file_store/src/iot_invalid_poc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub struct IotInvalidBeaconReport {
pub received_timestamp: DateTime<Utc>,
pub reason: InvalidReason,
pub report: IotBeaconReport,
pub location: Option<u64>,
pub gain: i32,
pub elevation: i32,
}

#[derive(Serialize, Clone)]
Expand Down Expand Up @@ -75,6 +78,9 @@ impl TryFrom<LoraInvalidBeaconReportV1> for IotInvalidBeaconReport {
.report
.ok_or_else(|| Error::not_found("iot invalid beacon report v1"))?
.try_into()?,
location: v.location.parse().ok(),
gain: v.gain,
elevation: v.elevation,
})
}
}
Expand All @@ -87,6 +93,12 @@ impl From<IotInvalidBeaconReport> for LoraInvalidBeaconReportV1 {
received_timestamp,
reason: v.reason as i32,
report: Some(report),
location: v
.location
.map(|l| l.to_string())
.unwrap_or_else(String::new),
gain: v.gain,
elevation: v.elevation,
}
}
}
Expand Down
56 changes: 1 addition & 55 deletions iot_verifier/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
};
use chrono::DateTime;
use chrono::{Duration as ChronoDuration, Utc};
use denylist::DenyList;
use file_store::{
iot_beacon_report::IotBeaconIngestReport,
iot_witness_report::IotWitnessIngestReport,
Expand All @@ -17,7 +16,7 @@ use file_store::{
use futures::{stream, StreamExt};
use helium_crypto::PublicKeyBinary;
use sqlx::PgPool;
use std::{hash::Hasher, ops::DerefMut, time::Duration};
use std::{hash::Hasher, ops::DerefMut};
use tokio::{
sync::Mutex,
time::{self, MissedTickBehavior},
Expand All @@ -34,9 +33,6 @@ pub struct Loader {
window_width: ChronoDuration,
ingestor_rollup_time: ChronoDuration,
max_lookback_age: ChronoDuration,
deny_list_latest_url: String,
deny_list_trigger_interval: Duration,
deny_list: DenyList,
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -45,13 +41,10 @@ pub enum NewLoaderError {
FileStoreError(#[from] file_store::Error),
#[error("db_store error: {0}")]
DbStoreError(#[from] db_store::Error),
#[error("denylist error: {0}")]
DenyListError(#[from] denylist::Error),
}

pub enum ValidGatewayResult {
Valid,
Denied,
Unknown,
}

Expand All @@ -63,17 +56,13 @@ impl Loader {
let window_width = settings.poc_loader_window_width();
let ingestor_rollup_time = settings.ingestor_rollup_time();
let max_lookback_age = settings.loader_window_max_lookback_age();
let deny_list = DenyList::new()?;
Ok(Self {
pool,
ingest_store,
poll_time,
window_width,
ingestor_rollup_time,
max_lookback_age,
deny_list_latest_url: settings.denylist.denylist_url.clone(),
deny_list_trigger_interval: settings.denylist.trigger_interval(),
deny_list,
})
}

Expand All @@ -85,21 +74,12 @@ impl Loader {
tracing::info!("started verifier loader");
let mut report_timer = time::interval(self.poll_time);
report_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut denylist_timer = time::interval(self.deny_list_trigger_interval);
denylist_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
if shutdown.is_triggered() {
break;
}
tokio::select! {
_ = shutdown.clone() => break,
_ = denylist_timer.tick() =>
match self.handle_denylist_tick().await {
Ok(()) => (),
Err(err) => {
tracing::error!("fatal loader error, denylist_tick triggered: {err:?}");
}
},
_ = report_timer.tick() => match self.handle_report_tick(gateway_cache).await {
Ok(()) => (),
Err(err) => {
Expand All @@ -112,23 +92,6 @@ impl Loader {
Ok(())
}

async fn handle_denylist_tick(&mut self) -> anyhow::Result<()> {
tracing::info!("handling denylist tick");
// sink any errors whilst updating the denylist
// the verifier should not stop just because github
// could not be reached for example
match self
.deny_list
.update_to_latest(&self.deny_list_latest_url)
.await
{
Ok(()) => (),
Err(e) => tracing::warn!("failed to update denylist: {e}"),
}
tracing::info!("completed handling denylist tick");
Ok(())
}

async fn handle_report_tick(&self, gateway_cache: &GatewayCache) -> anyhow::Result<()> {
tracing::info!("handling report tick");
let now = Utc::now();
Expand Down Expand Up @@ -375,11 +338,6 @@ impl Loader {
};
Ok(Some(res))
}
ValidGatewayResult::Denied => {
metrics.increment_beacons_denied();
Ok(None)
}

ValidGatewayResult::Unknown => {
metrics.increment_beacons_unknown();
Ok(None)
Expand Down Expand Up @@ -410,10 +368,6 @@ impl Loader {
metrics.increment_witnesses();
Ok(Some(res))
}
ValidGatewayResult::Denied => {
metrics.increment_witnesses_denied();
Ok(None)
}
ValidGatewayResult::Unknown => {
metrics.increment_witnesses_unknown();
Ok(None)
Expand Down Expand Up @@ -445,10 +399,6 @@ impl Loader {
pub_key: &PublicKeyBinary,
gateway_cache: &GatewayCache,
) -> ValidGatewayResult {
if self.check_gw_denied(pub_key).await {
tracing::debug!("dropping denied gateway : {:?}", &pub_key);
return ValidGatewayResult::Denied;
}
if self.check_unknown_gw(pub_key, gateway_cache).await {
tracing::debug!("dropping unknown gateway: {:?}", &pub_key);
return ValidGatewayResult::Unknown;
Expand All @@ -463,10 +413,6 @@ impl Loader {
) -> bool {
gateway_cache.resolve_gateway_info(pub_key).await.is_err()
}

async fn check_gw_denied(&self, pub_key: &PublicKeyBinary) -> bool {
self.deny_list.check_key(pub_key).await
}
}

fn filter_key_hash(data: &[u8]) -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl Server {

// init da processes
let mut loader = loader::Loader::from_settings(settings, pool.clone()).await?;
let mut runner = runner::Runner::from_settings(settings, pool.clone()).await?;
let mut runner = runner::Runner::new(settings, pool.clone()).await?;
let purger = purger::Purger::from_settings(settings, pool.clone()).await?;
let mut density_scaler =
DensityScaler::from_settings(settings, pool, gateway_updater_receiver.clone()).await?;
Expand Down
Loading