Skip to content

Commit

Permalink
Supporting material for promotion_fund workspace
Browse files Browse the repository at this point in the history
- ingest promotion rewards, nothing will be done with them until the
  processor is added into mobile-verifier.
- dump reward files
- add sp_allocations dummy field to rewarder output
- reward indexer mobile promotion type added
  • Loading branch information
michaeldjeffrey committed Sep 20, 2024
1 parent 1b0fe2c commit 672a531
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 52 deletions.
140 changes: 93 additions & 47 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch =
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral" }
solana-client = "1.18"
solana-sdk = "1.18"
solana-program = "1.18"
Expand Down
18 changes: 18 additions & 0 deletions file_store/src/cli/dump_mobile_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{file_source, Result, Settings};
use futures::stream::StreamExt;
use helium_crypto::PublicKey;
use helium_proto::services::poc_mobile::mobile_reward_share::Reward::*;
use helium_proto::services::poc_mobile::promotion_reward::Entity;
use helium_proto::services::poc_mobile::MobileRewardShare;
use prost::Message;
use serde_json::json;
Expand All @@ -23,6 +24,7 @@ impl Cmd {
let mut subscriber_reward = vec![];
let mut service_provider_reward = vec![];
let mut unallocated_reward = vec![];
let mut promotion_reward = vec![];

while let Some(result) = file_stream.next().await {
let msg = result?;
Expand Down Expand Up @@ -60,6 +62,21 @@ impl Cmd {
"unallocated_reward_type": reward.reward_type,
"amount": reward.amount,
})),
PromotionReward(reward) => {
let entity = reward.entity.unwrap();
match entity {
Entity::SubscriberId(id) => promotion_reward.push(json!({
"subscriber_id": uuid::Uuid::from_slice(&id).unwrap(),
"service_provider_amount": reward.service_provider_amount,
"matched_amount": reward.matched_amount,
})),
Entity::GatewayKey(key) => promotion_reward.push(json!({
"gateway_key": PublicKey::try_from(key)?.to_string(),
"service_provider_amount": reward.service_provider_amount,
"matched_amount": reward.matched_amount,
})),
}
}
},
None => todo!(),
}
Expand All @@ -71,6 +88,7 @@ impl Cmd {
"gateway_reward": gateway_reward,
"subscriber_reward": subscriber_reward,
"service_provider_reward": service_provider_reward,
"promotion_reward": promotion_reward,
"unallocated_reward": unallocated_reward,
}))?;

Expand Down
15 changes: 15 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
"subscriber_verified_mapping_ingest_report";
pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
"verified_subscriber_verified_mapping_ingest_report";
pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report";
pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward";
pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -220,6 +223,9 @@ pub enum FileType {
VerifiedSPBoostedRewardsBannedRadioIngestReport,
SubscriberVerifiedMappingEventIngestReport,
VerifiedSubscriberVerifiedMappingEventIngestReport,
PromotionRewardIngestReport,
VerifiedPromotionReward,
ServiceProviderPromotionFund,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -291,6 +297,9 @@ impl fmt::Display for FileType {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport => {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
};
f.write_str(s)
}
Expand Down Expand Up @@ -365,6 +374,9 @@ impl FileType {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport => {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
}
}
}
Expand Down Expand Up @@ -439,6 +451,9 @@ impl FromStr for FileType {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT => {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport
}
PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport,
VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward,
SERVICE_PROVIDER_PROMOTION_FUND => Self::ServiceProviderPromotionFund,
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
1 change: 1 addition & 0 deletions file_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod mobile_radio_threshold;
pub mod mobile_session;
pub mod mobile_subscriber;
pub mod mobile_transfer;
pub mod promotion_reward;
pub mod reward_manifest;
mod settings;
pub mod speedtest;
Expand Down
91 changes: 91 additions & 0 deletions file_store/src/promotion_reward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::{
traits::{MsgDecode, TimestampDecode, TimestampEncode},
Error, Result,
};
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
self as proto, PromotionRewardIngestReportV1, PromotionRewardReqV1,
};

#[derive(Debug, Clone, PartialEq, Hash)]
pub enum Entity {
SubscriberId(Vec<u8>),
GatewayKey(PublicKeyBinary),
}

impl From<proto::promotion_reward_req_v1::Entity> for Entity {
fn from(entity: proto::promotion_reward_req_v1::Entity) -> Self {
match entity {
proto::promotion_reward_req_v1::Entity::SubscriberId(v) => Entity::SubscriberId(v),
proto::promotion_reward_req_v1::Entity::GatewayKey(k) => Entity::GatewayKey(k.into()),
}
}
}

impl From<Entity> for proto::promotion_reward_req_v1::Entity {
fn from(entity: Entity) -> Self {
match entity {
Entity::SubscriberId(v) => proto::promotion_reward_req_v1::Entity::SubscriberId(v),
Entity::GatewayKey(k) => proto::promotion_reward_req_v1::Entity::GatewayKey(k.into()),
}
}
}

impl From<Entity> for proto::promotion_reward::Entity {
fn from(entity: Entity) -> Self {
match entity {
Entity::SubscriberId(v) => proto::promotion_reward::Entity::SubscriberId(v),
Entity::GatewayKey(k) => proto::promotion_reward::Entity::GatewayKey(k.into()),
}
}
}

#[derive(Clone)]
pub struct PromotionReward {
pub entity: Entity,
pub shares: u64,
pub timestamp: DateTime<Utc>,
pub received_timestamp: DateTime<Utc>,
pub carrier_pub_key: PublicKeyBinary,
pub signature: Vec<u8>,
}

impl MsgDecode for PromotionReward {
type Msg = PromotionRewardIngestReportV1;
}

impl TryFrom<PromotionRewardIngestReportV1> for PromotionReward {
type Error = Error;

fn try_from(v: PromotionRewardIngestReportV1) -> Result<Self> {
let received_timestamp = v.received_timestamp.to_timestamp_millis()?;
let Some(v) = v.report else {
return Err(Error::NotFound("report".to_string()));
};
Ok(Self {
entity: if let Some(entity) = v.entity {
entity.into()
} else {
return Err(Error::NotFound("entity".to_string()));
},
shares: v.shares,
timestamp: v.timestamp.to_timestamp()?,
received_timestamp,
carrier_pub_key: v.carrier_pub_key.into(),
signature: v.signature,
})
}
}

impl From<PromotionReward> for PromotionRewardReqV1 {
fn from(v: PromotionReward) -> Self {
Self {
entity: Some(v.entity.into()),
shares: v.shares,
timestamp: v.timestamp.encode_timestamp(),
carrier_pub_key: v.carrier_pub_key.into(),
signature: v.signature,
}
}
}
15 changes: 15 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,18 @@ impl_file_sink!(
FileType::RewardManifest.to_str(),
"reward_manifest"
);
impl_file_sink!(
proto::ServiceProviderPromotionFundV1,
FileType::ServiceProviderPromotionFund.to_str(),
"service_provider_promotion_fund"
);
impl_file_sink!(
poc_mobile::PromotionRewardIngestReportV1,
FileType::PromotionRewardIngestReport.to_str(),
"promotion_reward_ingest_report"
);
impl_file_sink!(
poc_mobile::VerifiedPromotionRewardV1,
FileType::VerifiedPromotionReward.to_str(),
"verified_promotion_reward"
);
1 change: 1 addition & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature);
impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
impl_msg_verify!(poc_mobile::PromotionRewardReqV1, signature);

#[cfg(test)]
mod test {
Expand Down
42 changes: 41 additions & 1 deletion ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use helium_proto::services::poc_mobile::{
CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1,
InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, PromotionRewardIngestReportV1, PromotionRewardReqV1,
PromotionRewardRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1,
SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1,
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct GrpcServer {
sp_boosted_rewards_ban_sink:
FileSinkClient<ServiceProviderBoostedRewardsBannedRadioIngestReportV1>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
promotion_reward_sink: FileSinkClient<PromotionRewardIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand Down Expand Up @@ -85,6 +87,7 @@ impl GrpcServer {
ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
promotion_reward_sink: FileSinkClient<PromotionRewardIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand All @@ -100,6 +103,7 @@ impl GrpcServer {
coverage_object_report_sink,
sp_boosted_rewards_ban_sink,
subscriber_mapping_event_sink,
promotion_reward_sink,
required_network,
address,
api_token,
Expand Down Expand Up @@ -437,6 +441,30 @@ impl poc_mobile::PocMobile for GrpcServer {
let id = timestamp.to_string();
Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id }))
}

async fn submit_promotion_reward(
&self,
request: Request<PromotionRewardReqV1>,
) -> GrpcResult<PromotionRewardRespV1> {
let received_timestamp: u64 = Utc::now().timestamp_millis() as u64;
let event = request.into_inner();

custom_tracing::record_b58("pub_key", &event.carrier_pub_key);

let report = self
.verify_public_key(event.carrier_pub_key.as_ref())
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))
.map(|(_, event)| PromotionRewardIngestReportV1 {
received_timestamp,
report: Some(event),
})?;

let _ = self.promotion_reward_sink.write(report, []).await;

let id = received_timestamp.to_string();
Ok(Response::new(PromotionRewardRespV1 { id }))
}
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
Expand Down Expand Up @@ -546,6 +574,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
)
.await?;

let (subscriber_referral_eligibility_sink, subscriber_referral_eligibility_server) =
PromotionRewardIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;

let Some(api_token) = settings
.token
.as_ref()
Expand All @@ -565,6 +603,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
coverage_object_report_sink,
sp_boosted_rewards_ban_sink,
subscriber_mapping_event_sink,
subscriber_referral_eligibility_sink,
settings.network,
settings.listen_addr,
api_token,
Expand All @@ -588,6 +627,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(coverage_object_report_sink_server)
.add_task(sp_boosted_rewards_ban_sink_server)
.add_task(subscriber_mapping_event_server)
.add_task(subscriber_referral_eligibility_server)
.add_task(grpc_server)
.build()
.start()
Expand Down
2 changes: 2 additions & 0 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10);
let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10);
let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10);
let (promotion_rewards_tx, _rx) = tokio::sync::mpsc::channel(10);

tokio::spawn(async move {
let grpc_server = GrpcServer::new(
Expand All @@ -57,6 +58,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
FileSinkClient::new(coverage_obj_tx, "noop"),
FileSinkClient::new(sp_boosted_tx, "noop"),
FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"),
FileSinkClient::new(promotion_rewards_tx, "noop"),
Network::MainNet,
socket_addr,
api_token,
Expand Down
2 changes: 2 additions & 0 deletions mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ where
boosted_poc_bones_per_reward_share: Some(helium_proto::Decimal {
value: poc_dc_shares.boost.to_string(),
}),
// TODO: Filled in with the next PR
sp_allocations: vec![],
};
self.reward_manifests
.write(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE reward_type ADD VALUE 'mobile_promotion';
Loading

0 comments on commit 672a531

Please sign in to comment.