Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hip 118 visualization data requirements #864

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 32 additions & 209 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ sqlx = { version = "0", features = [
] }
helium-anchor-gen = { git = "https://github.com/helium/helium-anchor-gen.git" }
helium-crypto = { version = "0.8.4", features = ["multisig"] }
helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = "master" }
helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = "tmp-andymck/andymck/hip-118-visualization-data" }
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "andymck/hip-118-visualization-data-v2", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "andymck/hip-118-visualization-data-v2" }
solana-client = "1.18"
solana-sdk = "1.18"
solana-program = "1.18"
Expand Down
21 changes: 21 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ impl FileInfo {
}
}

pub const HEX_USAGE_COUNTS_INGEST_REPORT: &str = "hex_usage_counts_ingest_report";
pub const RADIO_USAGE_COUNTS_INGEST_REPORT: &str = "radio_usage_counts_ingest_report";
pub const HEX_USAGE_COUNTS_REQ: &str = "hex_usage_counts_req";
pub const RADIO_USAGE_COUNTS_REQ: &str = "radio_usage_counts_req";

pub const INVALIDATED_RADIO_THRESHOLD_REQ: &str = "invalidated_radio_threshold_req";
pub const INVALIDATED_RADIO_THRESHOLD_INGEST_REPORT: &str =
"invalidated_radio_threshold_ingest_report";
Expand Down Expand Up @@ -228,6 +233,10 @@ pub enum FileType {
PromotionRewardIngestReport,
VerifiedPromotionReward,
ServiceProviderPromotionFund,
HexUsageCountsIngestReport,
RadioUsageCountsIngestReport,
HexUsageCountsReq,
RadioUsageCountsReq,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -303,6 +312,10 @@ impl fmt::Display for FileType {
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
Self::HexUsageCountsIngestReport => HEX_USAGE_COUNTS_INGEST_REPORT,
Self::RadioUsageCountsIngestReport => RADIO_USAGE_COUNTS_INGEST_REPORT,
Self::HexUsageCountsReq => HEX_USAGE_COUNTS_REQ,
Self::RadioUsageCountsReq => RADIO_USAGE_COUNTS_REQ,
};
f.write_str(s)
}
Expand Down Expand Up @@ -381,6 +394,10 @@ impl FileType {
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
Self::HexUsageCountsIngestReport => HEX_USAGE_COUNTS_INGEST_REPORT,
Self::RadioUsageCountsIngestReport => RADIO_USAGE_COUNTS_INGEST_REPORT,
Self::HexUsageCountsReq => HEX_USAGE_COUNTS_REQ,
Self::RadioUsageCountsReq => RADIO_USAGE_COUNTS_REQ,
}
}
}
Expand Down Expand Up @@ -458,6 +475,10 @@ impl FromStr for FileType {
PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport,
VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward,
SERVICE_PROVIDER_PROMOTION_FUND => Self::ServiceProviderPromotionFund,
HEX_USAGE_COUNTS_INGEST_REPORT => Self::HexUsageCountsIngestReport,
RADIO_USAGE_COUNTS_INGEST_REPORT => Self::RadioUsageCountsIngestReport,
HEX_USAGE_COUNTS_REQ => Self::HexUsageCountsReq,
RADIO_USAGE_COUNTS_REQ => Self::RadioUsageCountsReq,
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
1 change: 1 addition & 0 deletions file_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod speedtest;
pub mod subscriber_verified_mapping_event;
pub mod subscriber_verified_mapping_event_ingest_report;
pub mod traits;
pub mod usage_counts;
pub mod verified_subscriber_verified_mapping_event_ingest_report;
pub mod wifi_heartbeat;

Expand Down
10 changes: 10 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ impl_file_sink!(
FileType::WifiHeartbeatIngestReport.to_str(),
"wifi_heartbeat_report"
);
impl_file_sink!(
poc_mobile::HexUsageCountsIngestReportV1,
FileType::HexUsageCountsIngestReport.to_str(),
"hex_usage_counts_ingest_report"
);
impl_file_sink!(
poc_mobile::RadioUsageCountsIngestReportV1,
FileType::RadioUsageCountsIngestReport.to_str(),
"hotspot_usage_counts_ingest_report"
);
impl_file_sink!(
proto::BoostedHexUpdateV1,
FileType::BoostedHexUpdate.to_str(),
Expand Down
2 changes: 2 additions & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature);
impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
impl_msg_verify!(poc_mobile::HexUsageCountsReqV1, signature);
impl_msg_verify!(poc_mobile::RadioUsageCountsReqV1, signature);

#[cfg(test)]
mod test {
Expand Down
261 changes: 261 additions & 0 deletions file_store/src/usage_counts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
use std::convert::TryFrom;

use crate::{
error::DecodeError,
traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode},
Error, Result,
};
use chrono::{DateTime, Utc};
use h3o::CellIndex;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
HexUsageCountsIngestReportV1, HexUsageCountsReqV1, RadioUsageCountsIngestReportV1,
RadioUsageCountsReqV1,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct HexUsageCountsReq {
pub hex: CellIndex,
pub service_provider_subscriber_count: u64,
pub disco_mapping_count: u64,
pub offload_count: u64,
pub service_provider_transfer_bytes: u64,
pub offload_transfer_bytes: u64,
pub epoch_start_timestamp: DateTime<Utc>,
pub epoch_end_timestamp: DateTime<Utc>,
pub timestamp: DateTime<Utc>,
pub carrier_mapping_key: PublicKeyBinary,
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioUsageCountsReq {
pub hotspot_pubkey: PublicKeyBinary,
pub cbsd_id: String,
pub service_provider_subscriber_count: u64,
pub disco_mapping_count: u64,
pub offload_count: u64,
pub service_provider_transfer_bytes: u64,
pub offload_transfer_bytes: u64,
pub epoch_start_timestamp: DateTime<Utc>,
pub epoch_end_timestamp: DateTime<Utc>,
pub timestamp: DateTime<Utc>,
pub carrier_mapping_key: PublicKeyBinary,
}

impl MsgDecode for HexUsageCountsReq {
type Msg = HexUsageCountsReqV1;
}

impl MsgDecode for RadioUsageCountsReq {
type Msg = RadioUsageCountsReqV1;
}

impl MsgTimestamp<Result<DateTime<Utc>>> for HexUsageCountsReqV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp_millis()
}
}

impl MsgTimestamp<u64> for HexUsageCountsReq {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp_millis()
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for RadioUsageCountsReqV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp_millis()
}
}

impl MsgTimestamp<u64> for RadioUsageCountsReq {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp_millis()
}
}

impl TryFrom<HexUsageCountsReqV1> for HexUsageCountsReq {
type Error = Error;
fn try_from(v: HexUsageCountsReqV1) -> Result<Self> {
let timestamp = v.timestamp()?;
let epoch_start_timestamp = v.epoch_start_timestamp.to_timestamp_millis()?;
let epoch_end_timestamp = v.epoch_end_timestamp.to_timestamp_millis()?;
let hex = CellIndex::try_from(v.hex).map_err(|_| {
DecodeError::FileStreamTryDecode(format!("invalid CellIndex {}", v.hex))
})?;
Ok(Self {
hex,
service_provider_subscriber_count: v.service_provider_subscriber_count,
disco_mapping_count: v.disco_mapping_count,
offload_count: v.offload_count,
service_provider_transfer_bytes: v.service_provider_transfer_bytes,
offload_transfer_bytes: v.offload_transfer_bytes,
epoch_start_timestamp,
epoch_end_timestamp,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
})
}
}

impl From<HexUsageCountsReq> for HexUsageCountsReqV1 {
fn from(v: HexUsageCountsReq) -> Self {
let timestamp = v.timestamp();
let epoch_start_timestamp = v.epoch_start_timestamp.encode_timestamp_millis();
let epoch_end_timestamp = v.epoch_end_timestamp.encode_timestamp_millis();

HexUsageCountsReqV1 {
hex: v.hex.into(),
service_provider_subscriber_count: v.service_provider_subscriber_count,
disco_mapping_count: v.disco_mapping_count,
offload_count: v.offload_count,
service_provider_transfer_bytes: v.service_provider_transfer_bytes,
offload_transfer_bytes: v.offload_transfer_bytes,
epoch_start_timestamp,
epoch_end_timestamp,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
signature: vec![],
}
}
}

impl TryFrom<RadioUsageCountsReqV1> for RadioUsageCountsReq {
type Error = Error;
fn try_from(v: RadioUsageCountsReqV1) -> Result<Self> {
let timestamp = v.timestamp()?;
let epoch_start_timestamp = v.epoch_start_timestamp.to_timestamp_millis()?;
let epoch_end_timestamp = v.epoch_end_timestamp.to_timestamp_millis()?;
Ok(Self {
hotspot_pubkey: v.hotspot_pubkey.into(),
cbsd_id: v.cbsd_id,
service_provider_subscriber_count: v.service_provider_subscriber_count,
disco_mapping_count: v.disco_mapping_count,
offload_count: v.offload_count,
service_provider_transfer_bytes: v.service_provider_transfer_bytes,
offload_transfer_bytes: v.offload_transfer_bytes,
epoch_start_timestamp,
epoch_end_timestamp,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
})
}
}

impl From<RadioUsageCountsReq> for RadioUsageCountsReqV1 {
fn from(v: RadioUsageCountsReq) -> Self {
let timestamp = v.timestamp();
let epoch_start_timestamp = v.epoch_start_timestamp.encode_timestamp_millis();
let epoch_end_timestamp = v.epoch_end_timestamp.encode_timestamp_millis();

RadioUsageCountsReqV1 {
hotspot_pubkey: v.hotspot_pubkey.into(),
cbsd_id: v.cbsd_id,
service_provider_subscriber_count: v.service_provider_subscriber_count,
disco_mapping_count: v.disco_mapping_count,
offload_count: v.offload_count,
service_provider_transfer_bytes: v.service_provider_transfer_bytes,
offload_transfer_bytes: v.offload_transfer_bytes,
epoch_start_timestamp,
epoch_end_timestamp,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
signature: vec![],
}
}
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct HexUsageCountsIngestReport {
pub report: HexUsageCountsReq,
pub received_timestamp: DateTime<Utc>,
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioUsageCountsIngestReport {
pub report: RadioUsageCountsReq,
pub received_timestamp: DateTime<Utc>,
}

impl MsgDecode for HexUsageCountsIngestReport {
type Msg = HexUsageCountsIngestReportV1;
}

impl MsgDecode for RadioUsageCountsIngestReport {
type Msg = RadioUsageCountsIngestReportV1;
}

impl MsgTimestamp<Result<DateTime<Utc>>> for HexUsageCountsIngestReportV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.received_timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for HexUsageCountsIngestReport {
fn timestamp(&self) -> u64 {
self.received_timestamp.encode_timestamp()
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for RadioUsageCountsIngestReportV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.received_timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for RadioUsageCountsIngestReport {
fn timestamp(&self) -> u64 {
self.received_timestamp.encode_timestamp()
}
}

impl TryFrom<HexUsageCountsIngestReportV1> for HexUsageCountsIngestReport {
type Error = Error;
fn try_from(v: HexUsageCountsIngestReportV1) -> Result<Self> {
Ok(Self {
report: v
.clone()
.report
.ok_or_else(|| Error::not_found("ingest HexUsageCountsIngestReport report"))?
.try_into()?,
received_timestamp: v.timestamp()?,
})
}
}

impl From<HexUsageCountsIngestReport> for HexUsageCountsIngestReportV1 {
fn from(v: HexUsageCountsIngestReport) -> Self {
let received_timestamp = v.received_timestamp;
let report: HexUsageCountsReqV1 = v.report.into();
Self {
report: Some(report),
received_timestamp: received_timestamp.encode_timestamp(),
}
}
}

impl TryFrom<RadioUsageCountsIngestReportV1> for RadioUsageCountsIngestReport {
type Error = Error;
fn try_from(v: RadioUsageCountsIngestReportV1) -> Result<Self> {
Ok(Self {
report: v
.clone()
.report
.ok_or_else(|| Error::not_found("ingest RadioUsageCountsIngestReport report"))?
.try_into()?,
received_timestamp: v.timestamp()?,
})
}
}

impl From<RadioUsageCountsIngestReport> for RadioUsageCountsIngestReportV1 {
fn from(v: RadioUsageCountsIngestReport) -> Self {
let received_timestamp = v.received_timestamp;
let report: RadioUsageCountsReqV1 = v.report.into();
Self {
report: Some(report),
received_timestamp: received_timestamp.encode_timestamp(),
}
}
}
1 change: 1 addition & 0 deletions ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ file-store = { path = "../file_store" }
poc-metrics = { path = "../metrics" }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
mobile-config = { path = "../mobile_config" }
task-manager = { path = "../task_manager" }
rand = { workspace = true }
custom-tracing = { path = "../custom_tracing", features = ["grpc"] }
Expand Down
Loading
Loading