From 96c355db4e7014eb421c9826099c8ec4ecaba648 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 23 Jun 2023 10:58:35 +0100 Subject: [PATCH] verify routing keys of incoming data session requests --- .../src/client/authorization_client.rs | 4 +- mobile_config/src/client/gateway_client.rs | 4 +- mobile_config/src/gateway_info.rs | 2 +- mobile_packet_verifier/src/accumulate.rs | 52 +++++++++++++++---- mobile_packet_verifier/src/daemon.rs | 18 ++++--- mobile_packet_verifier/src/settings.rs | 1 + mobile_verifier/src/speedtests.rs | 1 - 7 files changed, 59 insertions(+), 23 deletions(-) diff --git a/mobile_config/src/client/authorization_client.rs b/mobile_config/src/client/authorization_client.rs index 5a350d5e9..41ded77de 100644 --- a/mobile_config/src/client/authorization_client.rs +++ b/mobile_config/src/client/authorization_client.rs @@ -37,7 +37,7 @@ impl AuthorizationClient { } pub async fn verify_authorized_key( - &mut self, + &self, pubkey: &PublicKeyBinary, role: mobile_config::NetworkKeyRole, ) -> Result { @@ -53,7 +53,7 @@ impl AuthorizationClient { }; request.signature = self.signing_key.sign(&request.encode_to_vec())?; tracing::debug!(pubkey = pubkey.to_string(), role = ?role, "verifying authorized key registered"); - let response = match self.client.verify(request).await { + let response = match self.client.clone().verify(request).await { Ok(verify_res) => { let response = verify_res.into_inner(); response.verify(&self.config_pubkey)?; diff --git a/mobile_config/src/client/gateway_client.rs b/mobile_config/src/client/gateway_client.rs index 38a7b6e73..cfca23d29 100644 --- a/mobile_config/src/client/gateway_client.rs +++ b/mobile_config/src/client/gateway_client.rs @@ -46,7 +46,7 @@ impl gateway_info::GatewayInfoResolver for GatewayClient { type Error = ClientError; async fn resolve_gateway_info( - &mut self, + &self, address: &PublicKeyBinary, ) -> Result, Self::Error> { if let Some(cached_response) = self.cache.get(address).await { @@ -60,7 +60,7 @@ impl gateway_info::GatewayInfoResolver for GatewayClient { }; request.signature = self.signing_key.sign(&request.encode_to_vec())?; tracing::debug!(pubkey = address.to_string(), "fetching gateway info"); - let response = match self.client.info(request).await { + let response = match self.client.clone().info(request).await { Ok(info_res) => { let response = info_res.into_inner(); response.verify(&self.config_pubkey)?; diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 5d724e847..2126d2efb 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -22,7 +22,7 @@ pub trait GatewayInfoResolver { type Error; async fn resolve_gateway_info( - &mut self, + &self, address: &PublicKeyBinary, ) -> Result, Self::Error>; diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 14745a062..72b7a901c 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -1,8 +1,10 @@ use chrono::{DateTime, Utc}; -use file_store::mobile_session::DataTransferSessionIngestReport; +use file_store::mobile_session::{DataTransferSessionIngestReport, DataTransferSessionReq}; use futures::{Stream, StreamExt}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::mobile_config::NetworkKeyRole; use mobile_config::{ - client::{ClientError, GatewayClient}, + client::{AuthorizationClient, ClientError, GatewayClient}, gateway_info::GatewayInfoResolver, }; use sqlx::{Postgres, Transaction}; @@ -20,7 +22,8 @@ pub enum AccumulationError { } pub async fn accumulate_sessions( - config_client: &mut GatewayClient, + gateway_client: &GatewayClient, + auth_client: &AuthorizationClient, conn: &mut Transaction<'_, Postgres>, curr_file_ts: DateTime, reports: impl Stream, @@ -28,17 +31,12 @@ pub async fn accumulate_sessions( tokio::pin!(reports); while let Some(DataTransferSessionIngestReport { report, .. }) = reports.next().await { - let event = report.data_transfer_usage; // If the reward has been cancelled or we cannot resolve this gateway, skip the // report - if report.reward_cancelled - || config_client - .resolve_gateway_info(&event.pub_key) - .await? - .is_none() - { + if report.reward_cancelled || !verify_report(gateway_client, auth_client, &report).await { continue; } + let event = report.data_transfer_usage; sqlx::query( r#" INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, first_timestamp, last_timestamp) @@ -60,3 +58,37 @@ pub async fn accumulate_sessions( Ok(()) } + +async fn verify_report( + gateway_client: &GatewayClient, + auth_client: &AuthorizationClient, + report: &DataTransferSessionReq, +) -> bool { + if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await { + return false; + }; + if !verify_known_routing_key(auth_client, &report.pub_key).await { + return false; + }; + true +} + +async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool { + match gateway_client.resolve_gateway_info(public_key).await { + Ok(res) => res.is_some(), + Err(_err) => false, + } +} + +async fn verify_known_routing_key( + auth_client: &AuthorizationClient, + public_key: &PublicKeyBinary, +) -> bool { + match auth_client + .verify_authorized_key(public_key, NetworkKeyRole::MobileRouter) + .await + { + Ok(res) => res, + Err(_err) => false, + } +} diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index b2e90d304..c6aaa7519 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -8,7 +8,7 @@ use file_store::{ FileSinkBuilder, FileStore, FileType, }; use futures_util::TryFutureExt; -use mobile_config::GatewayClient; +use mobile_config::{client::AuthorizationClient, GatewayClient}; use solana::{SolanaNetwork, SolanaRpc}; use sqlx::{Pool, Postgres}; use tokio::{ @@ -22,7 +22,8 @@ pub struct Daemon { burner: Burner, reports: Receiver>, burn_period: Duration, - config_client: GatewayClient, + gateway_client: GatewayClient, + auth_client: AuthorizationClient, } impl Daemon { @@ -31,14 +32,16 @@ impl Daemon { pool: Pool, reports: Receiver>, burner: Burner, - config_client: GatewayClient, + gateway_client: GatewayClient, + auth_client: AuthorizationClient, ) -> Self { Self { pool, burner, reports, burn_period: Duration::from_secs(60 * 60 * settings.burn_period as u64), - config_client, + gateway_client, + auth_client, } } } @@ -60,7 +63,7 @@ where let ts = file.file_info.timestamp; let mut transaction = self.pool.begin().await?; let reports = file.into_stream(&mut transaction).await?; - crate::accumulate::accumulate_sessions(&mut self.config_client, &mut transaction, ts, reports).await?; + crate::accumulate::accumulate_sessions(&self.gateway_client, &self.auth_client, &mut transaction, ts, reports).await?; transaction.commit().await?; }, _ = sleep_until(burn_time) => { @@ -149,9 +152,10 @@ impl Cmd { .start(shutdown_listener.clone()) .await?; - let config_client = GatewayClient::from_settings(&settings.config_client)?; + let gateway_client = GatewayClient::from_settings(&settings.config_client)?; + let auth_client = AuthorizationClient::from_settings(&settings.auth_client)?; - let daemon = Daemon::new(settings, pool, reports, burner, config_client); + let daemon = Daemon::new(settings, pool, reports, burner, gateway_client, auth_client); tokio::try_join!( source_join_handle.map_err(Error::from), diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index 0ee241cf4..4745f8a74 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -22,6 +22,7 @@ pub struct Settings { pub enable_solana_integration: bool, pub solana: Option, pub config_client: mobile_config::ClientSettings, + pub auth_client: mobile_config::ClientSettings, #[serde(default = "default_start_after")] pub start_after: u64, } diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index a8dfdd143..918af24b3 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -188,7 +188,6 @@ impl SpeedtestRollingAverage { Ok(futures::stream::iter(speedtests.into_iter()) .then(move |(rolling_average, cell_speedtests)| { - let mut gateway_client = gateway_client.clone(); async move { // If we get back some gateway info for the given address, it's a valid address if gateway_client