diff --git a/Cargo.lock b/Cargo.lock index 6914afbc8..534905b80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1106,7 +1106,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#9208f4f93afa9741f4bd52a1ea12073c6905d0a0" +source = "git+https://github.com/helium/proto?branch=andymck/add-invalid-mobile-packet-support#7255e8aefe8fb7dc022d8e0a9263435fe24b9798" dependencies = [ "base64 0.21.0", "byteorder", @@ -1116,7 +1116,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", ] @@ -2025,7 +2025,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "structopt", "thiserror", "tracing", @@ -2435,7 +2435,7 @@ dependencies = [ "rust_decimal_macros", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "sqlx", "strum", "strum_macros", @@ -2853,7 +2853,7 @@ dependencies = [ "p256", "rand_core 0.6.4", "serde", - "sha2 0.10.6", + "sha2 0.9.9", "signature", "solana-sdk", "sqlx", @@ -2863,7 +2863,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#9208f4f93afa9741f4bd52a1ea12073c6905d0a0" +source = "git+https://github.com/helium/proto?branch=andymck/add-invalid-mobile-packet-support#7255e8aefe8fb7dc022d8e0a9263435fe24b9798" dependencies = [ "bytes", "prost", @@ -3184,7 +3184,7 @@ dependencies = [ "prost", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", "tokio", "tonic", @@ -3319,7 +3319,7 @@ dependencies = [ "rust_decimal_macros", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "sqlx", "thiserror", "tokio", @@ -3878,7 +3878,7 @@ dependencies = [ "poc-metrics", "prost", "serde", - "sha2 0.10.6", + "sha2 0.9.9", "solana", "sqlx", "thiserror", @@ -3922,7 +3922,7 @@ dependencies = [ "rust_decimal_macros", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "sqlx", "thiserror", "tokio", @@ -5114,7 +5114,7 @@ dependencies = [ "rust_decimal_macros", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "sqlx", "thiserror", "tokio", @@ -5741,7 +5741,7 @@ dependencies = [ "helium-sub-daos", "metrics", "serde", - "sha2 0.10.6", + "sha2 0.9.9", "solana-client", "solana-program", "solana-sdk", diff --git a/Cargo.toml b/Cargo.toml index 059bef8f7..9e8b64d8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,14 +57,14 @@ sqlx = {version = "0", features = [ ]} helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]} -helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]} +helium-proto = {git = "https://github.com/helium/proto", branch = "andymck/add-invalid-mobile-packet-support", features = ["services"]} hextree = "*" solana-client = "1.14" solana-sdk = "1.14" solana-program = "1.11" spl-token = "3.5.0" reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]} -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "andymck/add-invalid-mobile-packet-support" } humantime = "2" metrics = "0" metrics-exporter-prometheus = "0" diff --git a/file_store/src/cli/dump.rs b/file_store/src/cli/dump.rs index 5cf961ded..47d05b18f 100644 --- a/file_store/src/cli/dump.rs +++ b/file_store/src/cli/dump.rs @@ -3,7 +3,7 @@ use crate::{ file_source, heartbeat::{CellHeartbeat, CellHeartbeatIngestReport}, iot_packet::IotValidPacket, - mobile_session::DataTransferSessionIngestReport, + mobile_session::{DataTransferSessionIngestReport, InvalidDataTransferIngestReport}, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, traits::MsgDecode, FileType, Result, Settings, @@ -17,8 +17,8 @@ use helium_proto::{ poc_lora::{LoraBeaconIngestReportV1, LoraPocV1, LoraWitnessIngestReportV1}, poc_mobile::{ mobile_reward_share::Reward, CellHeartbeatIngestReportV1, CellHeartbeatReqV1, - Heartbeat, MobileRewardShare, RadioRewardShare, SpeedtestAvg, SpeedtestIngestReportV1, - SpeedtestReqV1, + Heartbeat, InvalidDataTransferIngestReportV1, MobileRewardShare, RadioRewardShare, + SpeedtestAvg, SpeedtestIngestReportV1, SpeedtestReqV1, }, router::PacketRouterPacketReportV1, }, @@ -77,6 +77,23 @@ impl Cmd { "timestamp": dtr.report.data_transfer_usage.timestamp, }))?; } + FileType::InvalidDataTransferSessionIngestReport => { + let msg: InvalidDataTransferIngestReport = + InvalidDataTransferIngestReportV1::decode(msg)?.try_into()?; + print_json(&json!({ + "invalid_reason": msg.reason, + "received_timestamp": msg.report.received_timestamp, + "reward_cancelled": msg.report.report.reward_cancelled, + "hotspot_key": msg.report.report.data_transfer_usage.pub_key, + "routing_key": msg.report.report.pub_key, + "upload_bytes": msg.report.report.data_transfer_usage.upload_bytes, + "download_bytes": msg.report.report.data_transfer_usage.download_bytes, + "radio_access_technology": msg.report.report.data_transfer_usage.radio_access_technology, + "event_id": msg.report.report.data_transfer_usage.event_id, + "payer": msg.report.report.data_transfer_usage.payer, + "timestamp": msg.report.report.data_transfer_usage.timestamp, + }))?; + } FileType::IotBeaconIngestReport => { let dec_msg = LoraBeaconIngestReportV1::decode(msg)?; let json = json!({ diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 0b1cf9a56..743db98c4 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -120,6 +120,8 @@ pub const INVALID_PACKET: &str = "invalid_packet"; pub const NON_REWARDABLE_PACKET: &str = "non_rewardable_packet"; pub const IOT_REWARD_SHARE: &str = "iot_reward_share"; pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ingest_report"; +pub const INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT: &str = + "invalid_data_transfer_session_ingest_report"; pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session"; pub const PRICE_REPORT: &str = "price_report"; pub const MOBILE_REWARD_SHARE: &str = "mobile_reward_share"; @@ -150,6 +152,7 @@ pub enum FileType { NonRewardablePacket, IotRewardShare, DataTransferSessionIngestReport, + InvalidDataTransferSessionIngestReport, ValidDataTransferSession, PriceReport, MobileRewardShare, @@ -185,6 +188,9 @@ impl fmt::Display for FileType { Self::NonRewardablePacket => NON_REWARDABLE_PACKET, Self::IotRewardShare => IOT_REWARD_SHARE, Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT, + Self::InvalidDataTransferSessionIngestReport => { + INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT + } Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, @@ -221,6 +227,9 @@ impl FileType { Self::NonRewardablePacket => NON_REWARDABLE_PACKET, Self::IotRewardShare => IOT_REWARD_SHARE, Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT, + Self::InvalidDataTransferSessionIngestReport => { + INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT + } Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, @@ -255,6 +264,9 @@ impl FromStr for FileType { NON_REWARDABLE_PACKET => Self::NonRewardablePacket, IOT_REWARD_SHARE => Self::IotRewardShare, DATA_TRANSFER_SESSION_INGEST_REPORT => Self::DataTransferSessionIngestReport, + INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT => { + Self::InvalidDataTransferSessionIngestReport + } VALID_DATA_TRANSFER_SESSION => Self::ValidDataTransferSession, PRICE_REPORT => Self::PriceReport, MOBILE_REWARD_SHARE => Self::MobileRewardShare, diff --git a/file_store/src/mobile_session.rs b/file_store/src/mobile_session.rs index a75b08846..31216b44c 100644 --- a/file_store/src/mobile_session.rs +++ b/file_store/src/mobile_session.rs @@ -1,21 +1,17 @@ use crate::{ - traits::{MsgDecode, MsgTimestamp, TimestampDecode}, + error::DecodeError, + traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode}, Error, Result, }; use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ - DataTransferEvent as DataTransferEventProto, DataTransferRadioAccessTechnology, - DataTransferSessionIngestReportV1, DataTransferSessionReqV1, + DataTransferEvent as DataTransferEventProto, DataTransferIngestReportStatus, + DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, + InvalidDataTransferIngestReportV1, }; use serde::Serialize; -impl MsgTimestamp>> for DataTransferSessionIngestReportV1 { - fn timestamp(&self) -> Result> { - self.received_timestamp.to_timestamp_millis() - } -} - #[derive(Serialize, Clone, Debug)] pub struct DataTransferSessionIngestReport { pub received_timestamp: DateTime, @@ -26,6 +22,18 @@ impl MsgDecode for DataTransferSessionIngestReport { type Msg = DataTransferSessionIngestReportV1; } +impl MsgTimestamp>> for DataTransferSessionIngestReportV1 { + fn timestamp(&self) -> Result> { + self.received_timestamp.to_timestamp_millis() + } +} + +impl MsgTimestamp for DataTransferSessionIngestReport { + fn timestamp(&self) -> u64 { + self.received_timestamp.encode_timestamp_millis() + } +} + impl TryFrom for DataTransferSessionIngestReport { type Error = Error; @@ -40,6 +48,69 @@ impl TryFrom for DataTransferSessionIngestRep } } +impl From for DataTransferSessionIngestReportV1 { + fn from(v: DataTransferSessionIngestReport) -> Self { + let received_timestamp = v.timestamp(); + let report: DataTransferSessionReqV1 = v.report.into(); + Self { + report: Some(report), + received_timestamp, + } + } +} + +#[derive(Serialize, Clone, Debug)] +pub struct InvalidDataTransferIngestReport { + pub report: DataTransferSessionIngestReport, + pub reason: DataTransferIngestReportStatus, + pub timestamp: DateTime, +} + +impl MsgDecode for InvalidDataTransferIngestReport { + type Msg = InvalidDataTransferIngestReportV1; +} + +impl MsgTimestamp>> for InvalidDataTransferIngestReportV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp_millis() + } +} + +impl MsgTimestamp for InvalidDataTransferIngestReport { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp_millis() + } +} + +impl TryFrom for InvalidDataTransferIngestReport { + type Error = Error; + fn try_from(v: InvalidDataTransferIngestReportV1) -> Result { + let reason = DataTransferIngestReportStatus::from_i32(v.reason).ok_or_else(|| { + DecodeError::unsupported_status_reason("invalid_data_transfer_session_reason", v.reason) + })?; + Ok(Self { + report: v + .report + .ok_or_else(|| Error::not_found("data transfer session ingest report"))? + .try_into()?, + timestamp: v.timestamp.to_timestamp()?, + reason, + }) + } +} + +impl From for InvalidDataTransferIngestReportV1 { + fn from(v: InvalidDataTransferIngestReport) -> Self { + let timestamp = v.timestamp(); + let report: DataTransferSessionIngestReportV1 = v.report.into(); + Self { + report: Some(report), + reason: v.reason as i32, + timestamp, + } + } +} + #[derive(Serialize, Clone, Debug)] pub struct DataTransferEvent { pub pub_key: PublicKeyBinary, @@ -58,6 +129,12 @@ impl MsgTimestamp>> for DataTransferEventProto { } } +impl MsgTimestamp for DataTransferEvent { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp_millis() + } +} + impl MsgDecode for DataTransferEvent { type Msg = DataTransferEventProto; } @@ -81,6 +158,22 @@ impl TryFrom for DataTransferEvent { } } +impl From for DataTransferEventProto { + fn from(v: DataTransferEvent) -> Self { + let timestamp = v.timestamp(); + Self { + pub_key: v.pub_key.into(), + upload_bytes: v.upload_bytes, + download_bytes: v.download_bytes, + radio_access_technology: v.radio_access_technology as i32, + event_id: v.event_id, + payer: v.payer.into(), + timestamp, + signature: v.signature, + } + } +} + #[derive(Serialize, Clone, Debug)] pub struct DataTransferSessionReq { pub data_transfer_usage: DataTransferEvent, @@ -108,3 +201,15 @@ impl TryFrom for DataTransferSessionReq { }) } } + +impl From for DataTransferSessionReqV1 { + fn from(v: DataTransferSessionReq) -> Self { + let report: DataTransferEventProto = v.data_transfer_usage.into(); + Self { + data_transfer_usage: Some(report), + reward_cancelled: v.reward_cancelled, + pub_key: v.pub_key.into(), + signature: v.signature, + } + } +} diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 72b7a901c..a302f50f1 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -1,8 +1,14 @@ use chrono::{DateTime, Utc}; -use file_store::mobile_session::{DataTransferSessionIngestReport, DataTransferSessionReq}; +use file_store::file_sink::FileSinkClient; +use file_store::mobile_session::{ + DataTransferSessionIngestReport, DataTransferSessionReq, InvalidDataTransferIngestReport, +}; use futures::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::NetworkKeyRole; +use helium_proto::services::poc_mobile::{ + DataTransferIngestReportStatus, InvalidDataTransferIngestReportV1, +}; use mobile_config::{ client::{AuthorizationClient, ClientError, GatewayClient}, gateway_info::GatewayInfoResolver, @@ -25,18 +31,23 @@ pub async fn accumulate_sessions( gateway_client: &GatewayClient, auth_client: &AuthorizationClient, conn: &mut Transaction<'_, Postgres>, + invalid_data_session_report_sink: &FileSinkClient, curr_file_ts: DateTime, reports: impl Stream, ) -> Result<(), AccumulationError> { tokio::pin!(reports); - while let Some(DataTransferSessionIngestReport { report, .. }) = reports.next().await { + while let Some(report) = reports.next().await { // If the reward has been cancelled or we cannot resolve this gateway, skip the // report - if report.reward_cancelled || !verify_report(gateway_client, auth_client, &report).await { + let report_validity = verify_report(gateway_client, auth_client, &report.report).await; + if report.report.reward_cancelled + || report_validity != DataTransferIngestReportStatus::Valid + { + write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?; continue; } - let event = report.data_transfer_usage; + let event = report.report.data_transfer_usage; sqlx::query( r#" INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, first_timestamp, last_timestamp) @@ -63,14 +74,14 @@ async fn verify_report( gateway_client: &GatewayClient, auth_client: &AuthorizationClient, report: &DataTransferSessionReq, -) -> bool { +) -> DataTransferIngestReportStatus { if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await { - return false; + return DataTransferIngestReportStatus::InvalidGatewayKey; }; if !verify_known_routing_key(auth_client, &report.pub_key).await { - return false; + return DataTransferIngestReportStatus::InvalidRoutingKey; }; - true + DataTransferIngestReportStatus::Valid } async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool { @@ -92,3 +103,21 @@ async fn verify_known_routing_key( Err(_err) => false, } } + +async fn write_invalid_report( + invalid_data_session_report_sink: &FileSinkClient, + reason: DataTransferIngestReportStatus, + report: DataTransferSessionIngestReport, +) -> Result<(), file_store::Error> { + let proto: InvalidDataTransferIngestReportV1 = InvalidDataTransferIngestReport { + report, + reason, + timestamp: Utc::now(), + } + .into(); + + invalid_data_session_report_sink + .write(proto, &[("reason", reason.as_str_name())]) + .await?; + Ok(()) +} diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index c6aaa7519..2bf27642f 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -3,6 +3,7 @@ use anyhow::{bail, Error, Result}; use chrono::{TimeZone, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkClient, file_source, file_upload, mobile_session::DataTransferSessionIngestReport, FileSinkBuilder, FileStore, FileType, @@ -24,6 +25,7 @@ pub struct Daemon { burn_period: Duration, gateway_client: GatewayClient, auth_client: AuthorizationClient, + invalid_data_session_report_sink: FileSinkClient, } impl Daemon { @@ -34,6 +36,7 @@ impl Daemon { burner: Burner, gateway_client: GatewayClient, auth_client: AuthorizationClient, + invalid_data_session_report_sink: FileSinkClient, ) -> Self { Self { pool, @@ -42,6 +45,7 @@ impl Daemon { burn_period: Duration::from_secs(60 * 60 * settings.burn_period as u64), gateway_client, auth_client, + invalid_data_session_report_sink, } } } @@ -63,8 +67,9 @@ where let ts = file.file_info.timestamp; let mut transaction = self.pool.begin().await?; let reports = file.into_stream(&mut transaction).await?; - crate::accumulate::accumulate_sessions(&self.gateway_client, &self.auth_client, &mut transaction, ts, reports).await?; + crate::accumulate::accumulate_sessions(&self.gateway_client, &self.auth_client, &mut transaction, &self.invalid_data_session_report_sink, ts, reports).await?; transaction.commit().await?; + self.invalid_data_session_report_sink.commit().await?; }, _ = sleep_until(burn_time) => { // It's time to burn @@ -127,7 +132,7 @@ impl Cmd { let (valid_sessions, mut valid_sessions_server) = FileSinkBuilder::new( FileType::ValidDataTransferSession, store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_invalid_packets"), + concat!(env!("CARGO_PKG_NAME"), "_valid_data_transfer_session"), shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) @@ -135,6 +140,17 @@ impl Cmd { .create() .await?; + let (invalid_sessions, mut invalid_sessions_server) = FileSinkBuilder::new( + FileType::InvalidDataTransferSessionIngestReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_invalid_data_transfer_session"), + shutdown_listener.clone(), + ) + .deposits(Some(file_upload_tx.clone())) + .auto_commit(false) + .create() + .await?; + let burner = Burner::new(valid_sessions, solana); let file_store = FileStore::from_settings(&settings.ingest).await?; @@ -155,11 +171,20 @@ impl Cmd { let gateway_client = GatewayClient::from_settings(&settings.config_client)?; let auth_client = AuthorizationClient::from_settings(&settings.auth_client)?; - let daemon = Daemon::new(settings, pool, reports, burner, gateway_client, auth_client); + let daemon = Daemon::new( + settings, + pool, + reports, + burner, + gateway_client, + auth_client, + invalid_sessions, + ); tokio::try_join!( source_join_handle.map_err(Error::from), valid_sessions_server.run().map_err(Error::from), + invalid_sessions_server.run().map_err(Error::from), file_upload.run(&shutdown_listener).map_err(Error::from), daemon.run(&shutdown_listener).map_err(Error::from), conn_handler.map_err(Error::from),