Skip to content

Commit

Permalink
verify routing keys of incoming data session requests
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jun 23, 2023
1 parent be6ba55 commit 96c355d
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 23 deletions.
4 changes: 2 additions & 2 deletions mobile_config/src/client/authorization_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl AuthorizationClient {
}

pub async fn verify_authorized_key(
&mut self,
&self,
pubkey: &PublicKeyBinary,
role: mobile_config::NetworkKeyRole,
) -> Result<bool, ClientError> {
Expand All @@ -53,7 +53,7 @@ impl AuthorizationClient {
};
request.signature = self.signing_key.sign(&request.encode_to_vec())?;
tracing::debug!(pubkey = pubkey.to_string(), role = ?role, "verifying authorized key registered");
let response = match self.client.verify(request).await {
let response = match self.client.clone().verify(request).await {
Ok(verify_res) => {
let response = verify_res.into_inner();
response.verify(&self.config_pubkey)?;
Expand Down
4 changes: 2 additions & 2 deletions mobile_config/src/client/gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl gateway_info::GatewayInfoResolver for GatewayClient {
type Error = ClientError;

async fn resolve_gateway_info(
&mut self,
&self,
address: &PublicKeyBinary,
) -> Result<Option<gateway_info::GatewayInfo>, Self::Error> {
if let Some(cached_response) = self.cache.get(address).await {
Expand All @@ -60,7 +60,7 @@ impl gateway_info::GatewayInfoResolver for GatewayClient {
};
request.signature = self.signing_key.sign(&request.encode_to_vec())?;
tracing::debug!(pubkey = address.to_string(), "fetching gateway info");
let response = match self.client.info(request).await {
let response = match self.client.clone().info(request).await {
Ok(info_res) => {
let response = info_res.into_inner();
response.verify(&self.config_pubkey)?;
Expand Down
2 changes: 1 addition & 1 deletion mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub trait GatewayInfoResolver {
type Error;

async fn resolve_gateway_info(
&mut self,
&self,
address: &PublicKeyBinary,
) -> Result<Option<GatewayInfo>, Self::Error>;

Expand Down
52 changes: 42 additions & 10 deletions mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use chrono::{DateTime, Utc};
use file_store::mobile_session::DataTransferSessionIngestReport;
use file_store::mobile_session::{DataTransferSessionIngestReport, DataTransferSessionReq};
use futures::{Stream, StreamExt};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::mobile_config::NetworkKeyRole;
use mobile_config::{
client::{ClientError, GatewayClient},
client::{AuthorizationClient, ClientError, GatewayClient},
gateway_info::GatewayInfoResolver,
};
use sqlx::{Postgres, Transaction};
Expand All @@ -20,25 +22,21 @@ pub enum AccumulationError {
}

pub async fn accumulate_sessions(
config_client: &mut GatewayClient,
gateway_client: &GatewayClient,
auth_client: &AuthorizationClient,
conn: &mut Transaction<'_, Postgres>,
curr_file_ts: DateTime<Utc>,
reports: impl Stream<Item = DataTransferSessionIngestReport>,
) -> Result<(), AccumulationError> {
tokio::pin!(reports);

while let Some(DataTransferSessionIngestReport { report, .. }) = reports.next().await {
let event = report.data_transfer_usage;
// If the reward has been cancelled or we cannot resolve this gateway, skip the
// report
if report.reward_cancelled
|| config_client
.resolve_gateway_info(&event.pub_key)
.await?
.is_none()
{
if report.reward_cancelled || !verify_report(gateway_client, auth_client, &report).await {
continue;
}
let event = report.data_transfer_usage;
sqlx::query(
r#"
INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, first_timestamp, last_timestamp)
Expand All @@ -60,3 +58,37 @@ pub async fn accumulate_sessions(

Ok(())
}

async fn verify_report(
gateway_client: &GatewayClient,
auth_client: &AuthorizationClient,
report: &DataTransferSessionReq,
) -> bool {
if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await {
return false;
};
if !verify_known_routing_key(auth_client, &report.pub_key).await {
return false;
};
true
}

async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool {
match gateway_client.resolve_gateway_info(public_key).await {
Ok(res) => res.is_some(),
Err(_err) => false,
}
}

async fn verify_known_routing_key(
auth_client: &AuthorizationClient,
public_key: &PublicKeyBinary,
) -> bool {
match auth_client
.verify_authorized_key(public_key, NetworkKeyRole::MobileRouter)
.await
{
Ok(res) => res,
Err(_err) => false,
}
}
18 changes: 11 additions & 7 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use file_store::{
FileSinkBuilder, FileStore, FileType,
};
use futures_util::TryFutureExt;
use mobile_config::GatewayClient;
use mobile_config::{client::AuthorizationClient, GatewayClient};
use solana::{SolanaNetwork, SolanaRpc};
use sqlx::{Pool, Postgres};
use tokio::{
Expand All @@ -22,7 +22,8 @@ pub struct Daemon<S> {
burner: Burner<S>,
reports: Receiver<FileInfoStream<DataTransferSessionIngestReport>>,
burn_period: Duration,
config_client: GatewayClient,
gateway_client: GatewayClient,
auth_client: AuthorizationClient,
}

impl<S> Daemon<S> {
Expand All @@ -31,14 +32,16 @@ impl<S> Daemon<S> {
pool: Pool<Postgres>,
reports: Receiver<FileInfoStream<DataTransferSessionIngestReport>>,
burner: Burner<S>,
config_client: GatewayClient,
gateway_client: GatewayClient,
auth_client: AuthorizationClient,
) -> Self {
Self {
pool,
burner,
reports,
burn_period: Duration::from_secs(60 * 60 * settings.burn_period as u64),
config_client,
gateway_client,
auth_client,
}
}
}
Expand All @@ -60,7 +63,7 @@ where
let ts = file.file_info.timestamp;
let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
crate::accumulate::accumulate_sessions(&mut self.config_client, &mut transaction, ts, reports).await?;
crate::accumulate::accumulate_sessions(&self.gateway_client, &self.auth_client, &mut transaction, ts, reports).await?;
transaction.commit().await?;
},
_ = sleep_until(burn_time) => {
Expand Down Expand Up @@ -149,9 +152,10 @@ impl Cmd {
.start(shutdown_listener.clone())
.await?;

let config_client = GatewayClient::from_settings(&settings.config_client)?;
let gateway_client = GatewayClient::from_settings(&settings.config_client)?;
let auth_client = AuthorizationClient::from_settings(&settings.auth_client)?;

let daemon = Daemon::new(settings, pool, reports, burner, config_client);
let daemon = Daemon::new(settings, pool, reports, burner, gateway_client, auth_client);

tokio::try_join!(
source_join_handle.map_err(Error::from),
Expand Down
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Settings {
pub enable_solana_integration: bool,
pub solana: Option<solana::Settings>,
pub config_client: mobile_config::ClientSettings,
pub auth_client: mobile_config::ClientSettings,
#[serde(default = "default_start_after")]
pub start_after: u64,
}
Expand Down
1 change: 0 additions & 1 deletion mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ impl SpeedtestRollingAverage {

Ok(futures::stream::iter(speedtests.into_iter())
.then(move |(rolling_average, cell_speedtests)| {
let mut gateway_client = gateway_client.clone();
async move {
// If we get back some gateway info for the given address, it's a valid address
if gateway_client
Expand Down

0 comments on commit 96c355d

Please sign in to comment.