Skip to content

Commit

Permalink
output paper trail for verified subscriber location reports
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jun 21, 2023
1 parent 8eb3c8a commit 6fda679
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 42 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ sqlx = {version = "0", features = [
]}

helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "andymck/verified-subscriber-report", features = ["services"]}
hextree = "*"
solana-client = "1.14"
solana-sdk = "1.14"
solana-program = "1.11"
spl-token = "3.5.0"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "andymck/verified-subscriber-report" }
humantime = "2"
metrics = "0"
metrics-exporter-prometheus = "0"
Expand Down
8 changes: 8 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl FileInfo {

pub const SUBSCRIBER_LOCATION_REQ: &str = "subscriber_location_req";
pub const SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "subscriber_location_report";
pub const VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "verified_subscriber_location_report";
pub const CELL_HEARTBEAT: &str = "cell_heartbeat";
pub const CELL_SPEEDTEST: &str = "cell_speedtest";
pub const CELL_HEARTBEAT_INGEST_REPORT: &str = "heartbeat_report";
Expand Down Expand Up @@ -155,13 +156,17 @@ pub enum FileType {
MobileRewardShare,
SubscriberLocationReq,
SubscriberLocationIngestReport,
VerifiedSubscriberLocationIngestReport,
}

impl fmt::Display for FileType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
Self::SubscriberLocationReq => SUBSCRIBER_LOCATION_REQ,
Self::SubscriberLocationIngestReport => SUBSCRIBER_LOCATION_INGEST_REPORT,
Self::VerifiedSubscriberLocationIngestReport => {
VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT
}
Self::CellHeartbeat => CELL_HEARTBEAT,
Self::CellSpeedtest => CELL_SPEEDTEST,
Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT,
Expand Down Expand Up @@ -198,6 +203,9 @@ impl FileType {
match self {
Self::SubscriberLocationReq => SUBSCRIBER_LOCATION_REQ,
Self::SubscriberLocationIngestReport => SUBSCRIBER_LOCATION_INGEST_REPORT,
Self::VerifiedSubscriberLocationIngestReport => {
VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT
}
Self::CellHeartbeat => CELL_HEARTBEAT,
Self::CellSpeedtest => CELL_SPEEDTEST,
Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT,
Expand Down
102 changes: 97 additions & 5 deletions file_store/src/mobile_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::{
traits::{MsgDecode, MsgTimestamp, TimestampDecode},
error::DecodeError,
traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode},
Error, Result,
};
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
SubscriberLocationIngestReportV1, SubscriberLocationReqV1,
SubscriberLocationIngestReportV1, SubscriberLocationReqV1, SubscriberReportVerificationStatus,
VerifiedSubscriberLocationIngestReportV1,
};
use serde::{Deserialize, Serialize};

Expand All @@ -22,6 +24,13 @@ pub struct SubscriberLocationIngestReport {
pub report: SubscriberLocationReq,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VerifiedSubscriberLocationIngestReport {
pub report: SubscriberLocationIngestReport,
pub status: SubscriberReportVerificationStatus,
pub timestamp: DateTime<Utc>,
}

impl MsgDecode for SubscriberLocationReq {
type Msg = SubscriberLocationReqV1;
}
Expand All @@ -30,6 +39,10 @@ impl MsgDecode for SubscriberLocationIngestReport {
type Msg = SubscriberLocationIngestReportV1;
}

impl MsgDecode for VerifiedSubscriberLocationIngestReport {
type Msg = VerifiedSubscriberLocationIngestReportV1;
}

impl TryFrom<SubscriberLocationReqV1> for SubscriberLocationReq {
type Error = Error;
fn try_from(v: SubscriberLocationReqV1) -> Result<Self> {
Expand All @@ -41,12 +54,54 @@ impl TryFrom<SubscriberLocationReqV1> for SubscriberLocationReq {
}
}

impl From<SubscriberLocationReq> for SubscriberLocationReqV1 {
fn from(v: SubscriberLocationReq) -> Self {
let timestamp = v.timestamp();
Self {
subscriber_id: v.subscriber_id,
timestamp,
carrier_pub_key: v.carrier_pub_key.into(),
signature: vec![],
}
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for SubscriberLocationReqV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for SubscriberLocationReq {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp()
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for SubscriberLocationIngestReportV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.received_timestamp.to_timestamp_millis()
}
}

impl MsgTimestamp<u64> for SubscriberLocationIngestReport {
fn timestamp(&self) -> u64 {
self.received_timestamp.encode_timestamp_millis()
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for VerifiedSubscriberLocationIngestReportV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp_millis()
}
}

impl MsgTimestamp<u64> for VerifiedSubscriberLocationIngestReport {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp_millis()
}
}

impl TryFrom<SubscriberLocationIngestReportV1> for SubscriberLocationIngestReport {
type Error = Error;
fn try_from(v: SubscriberLocationIngestReportV1) -> Result<Self> {
Expand All @@ -60,8 +115,45 @@ impl TryFrom<SubscriberLocationIngestReportV1> for SubscriberLocationIngestRepor
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for SubscriberLocationIngestReportV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.received_timestamp.to_timestamp_millis()
impl From<SubscriberLocationIngestReport> for SubscriberLocationIngestReportV1 {
fn from(v: SubscriberLocationIngestReport) -> Self {
let received_timestamp = v.timestamp();
let report: SubscriberLocationReqV1 = v.report.into();
Self {
received_timestamp,
report: Some(report),
}
}
}

impl TryFrom<VerifiedSubscriberLocationIngestReportV1> for VerifiedSubscriberLocationIngestReport {
type Error = Error;
fn try_from(v: VerifiedSubscriberLocationIngestReportV1) -> Result<Self> {
let status = SubscriberReportVerificationStatus::from_i32(v.status).ok_or_else(|| {
DecodeError::unsupported_status_reason(
"verified_subscriber_location_ingest_report_v1",
v.status,
)
})?;
Ok(Self {
report: v
.report
.ok_or_else(|| Error::not_found("ingest subscriber location ingest report"))?
.try_into()?,
status,
timestamp: v.timestamp.to_timestamp()?,
})
}
}

impl From<VerifiedSubscriberLocationIngestReport> for VerifiedSubscriberLocationIngestReportV1 {
fn from(v: VerifiedSubscriberLocationIngestReport) -> Self {
let timestamp = v.timestamp();
let report: SubscriberLocationIngestReportV1 = v.report.into();
Self {
report: Some(report),
status: v.status as i32,
timestamp,
}
}
}
16 changes: 16 additions & 0 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,24 @@ impl Cmd {
.start(shutdown_listener.clone())
.await?;

let (verified_subscriber_location, mut verified_subscriber_location_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedSubscriberLocationIngestReport,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_verified_subscriber_location"),
shutdown_listener.clone(),
)
.deposits(Some(file_upload_tx.clone()))
.auto_commit(false)
.create()
.await?;

let subscriber_location_ingestor = SubscriberLocationIngestor::new(
pool.clone(),
auth_client.clone(),
entity_client.clone(),
subscriber_location_ingest,
verified_subscriber_location,
);

// data transfers
Expand All @@ -188,6 +201,9 @@ impl Cmd {
mobile_rewards_server.run().map_err(Error::from),
file_upload.run(&shutdown_listener).map_err(Error::from),
reward_manifests_server.run().map_err(Error::from),
verified_subscriber_location_server
.run()
.map_err(Error::from),
subscriber_location_ingestor
.run(&shutdown_listener)
.map_err(Error::from),
Expand Down
1 change: 0 additions & 1 deletion mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use anyhow::bail;
use chrono::{DateTime, Duration, TimeZone, Utc};
use db_store::meta;
use file_store::{file_sink::FileSinkClient, traits::TimestampEncode};

use helium_proto::services::poc_mobile::mobile_reward_share::Reward as ProtoReward;
use helium_proto::RewardManifest;
use price::PriceTracker;
Expand Down
Loading

0 comments on commit 6fda679

Please sign in to comment.