From 5ad4663c8e2fc35fee4accfce1c6683d1b076762 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Tue, 29 Oct 2024 10:30:13 -0700 Subject: [PATCH] Service Provider Promotions again (#877) * Remove unused files * Update service provider promotions types in file_store * Rework service provider promotion rewards We now get incentive fund bps and active promotions from mobile-config list incentive promotions from mobile-config client * Start to fill out listing incentive programs for service providers from mobile-config * Redo carrier client listing incentive promotions * Raise dc_sessions and sp_promotions sp_promotions needs to be written to the reward manifest. If we pull it up a level, we can uncouple service provider rewards from carrier_client and the database. * Remove funds container, replace with promotions that has bps and promotions * Use ServiceProviderPromotions wrapper * clean up test structure * clean up exported naming from service provider * update to use published proto types * Add metadata db to carrier service Fill out query to return all active service provider promotions * Keep importing RadioReward until we stop reading them * Use request timestamp to filter active promotions returned * update to rebased proto branch * go back to proto master --- Cargo.lock | 16 +- file_store/src/cli/dump_mobile_rewards.rs | 24 +- file_store/src/lib.rs | 1 - file_store/src/promotion_reward.rs | 91 --- file_store/src/traits/file_sink_write.rs | 15 - file_store/src/traits/msg_verify.rs | 3 +- ingest/src/server_mobile.rs | 42 +- ingest/tests/common/mod.rs | 2 - mobile_config/src/carrier_service.rs | 101 ++- .../src/client/carrier_service_client.rs | 36 +- mobile_config/src/main.rs | 8 +- mobile_verifier/src/cli/mod.rs | 1 - mobile_verifier/src/cli/promotion_funds.rs | 59 -- mobile_verifier/src/cli/server.rs | 15 - mobile_verifier/src/main.rs | 4 +- mobile_verifier/src/reward_shares.rs | 9 +- mobile_verifier/src/rewarder.rs | 27 +- .../src/service_provider/dc_sessions.rs | 12 +- mobile_verifier/src/service_provider/mod.rs | 34 +- .../src/service_provider/promotions.rs | 51 ++ .../src/service_provider/promotions/daemon.rs | 262 ------- .../src/service_provider/promotions/funds.rs | 99 --- .../src/service_provider/promotions/mod.rs | 3 - .../service_provider/promotions/rewards.rs | 180 ----- .../src/service_provider/reward.rs | 734 +++++++++--------- mobile_verifier/src/settings.rs | 2 - .../tests/integrations/common/mod.rs | 23 +- .../tests/integrations/rewarder_sp_rewards.rs | 299 ++++--- 28 files changed, 748 insertions(+), 1405 deletions(-) delete mode 100644 file_store/src/promotion_reward.rs delete mode 100644 mobile_verifier/src/cli/promotion_funds.rs create mode 100644 mobile_verifier/src/service_provider/promotions.rs delete mode 100644 mobile_verifier/src/service_provider/promotions/daemon.rs delete mode 100644 mobile_verifier/src/service_provider/promotions/funds.rs delete mode 100644 mobile_verifier/src/service_provider/promotions/mod.rs delete mode 100644 mobile_verifier/src/service_provider/promotions/rewards.rs diff --git a/Cargo.lock b/Cargo.lock index ba84a1667..74c492ed7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,7 +1615,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#ad18fa2ac0864a2ec0da1c362139778cfd512ce2" +source = "git+https://github.com/helium/proto?branch=master#0452523b68781b85ea2aba2d1c06edabd5898159" dependencies = [ "base64 0.21.7", "byteorder", @@ -1625,7 +1625,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", ] @@ -3755,7 +3755,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.2.10", + "getrandom 0.1.16", "k256", "lazy_static", "multihash", @@ -3822,7 +3822,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#ad18fa2ac0864a2ec0da1c362139778cfd512ce2" +source = "git+https://github.com/helium/proto?branch=master#0452523b68781b85ea2aba2d1c06edabd5898159" dependencies = [ "bytes", "prost", @@ -5521,7 +5521,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate 1.1.3", "proc-macro2", "quote", "syn 2.0.58", @@ -6061,7 +6061,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.0", "itertools", "log", "multimap", @@ -9316,7 +9316,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.8.5", + "rand 0.7.3", "static_assertions", ] @@ -9986,7 +9986,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/file_store/src/cli/dump_mobile_rewards.rs b/file_store/src/cli/dump_mobile_rewards.rs index 8c4e83f14..649b7524e 100644 --- a/file_store/src/cli/dump_mobile_rewards.rs +++ b/file_store/src/cli/dump_mobile_rewards.rs @@ -24,7 +24,7 @@ impl Cmd { let mut subscriber_reward = vec![]; let mut service_provider_reward = vec![]; let mut unallocated_reward = vec![]; - // let mut promotion_reward = vec![]; + let mut promotion_reward = vec![]; while let Some(result) = file_stream.next().await { let msg = result?; @@ -62,21 +62,11 @@ impl Cmd { "unallocated_reward_type": reward.reward_type, "amount": reward.amount, })), - // PromotionReward(reward) => { - // let entity = reward.entity.unwrap(); - // match entity { - // Entity::SubscriberId(id) => promotion_reward.push(json!({ - // "subscriber_id": uuid::Uuid::from_slice(&id).unwrap(), - // "service_provider_amount": reward.service_provider_amount, - // "matched_amount": reward.matched_amount, - // })), - // Entity::GatewayKey(key) => promotion_reward.push(json!({ - // "gateway_key": PublicKey::try_from(key)?.to_string(), - // "service_provider_amount": reward.service_provider_amount, - // "matched_amount": reward.matched_amount, - // })), - // } - // } + PromotionReward(reward) => promotion_reward.push(json!({ + "entity": reward.entity, + "service_provider_amount": reward.service_provider_amount, + "matched_amount": reward.matched_amount, + })), }, None => todo!(), } @@ -88,7 +78,7 @@ impl Cmd { "gateway_reward": gateway_reward, "subscriber_reward": subscriber_reward, "service_provider_reward": service_provider_reward, - // "promotion_reward": promotion_reward, + "promotion_reward": promotion_reward, "unallocated_reward": unallocated_reward, }))?; diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 9c5a132cd..477c0dea9 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -20,7 +20,6 @@ pub mod mobile_radio_threshold; pub mod mobile_session; pub mod mobile_subscriber; pub mod mobile_transfer; -pub mod promotion_reward; pub mod reward_manifest; mod settings; pub mod speedtest; diff --git a/file_store/src/promotion_reward.rs b/file_store/src/promotion_reward.rs deleted file mode 100644 index c89132355..000000000 --- a/file_store/src/promotion_reward.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::{ - traits::{MsgDecode, TimestampDecode, TimestampEncode}, - Error, Result, -}; -use chrono::{DateTime, Utc}; -use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile::{ - self as proto, PromotionRewardIngestReportV1, PromotionRewardReqV1, -}; - -#[derive(Debug, Clone, PartialEq, Hash)] -pub enum Entity { - SubscriberId(Vec), - GatewayKey(PublicKeyBinary), -} - -impl From for Entity { - fn from(entity: proto::promotion_reward_req_v1::Entity) -> Self { - match entity { - proto::promotion_reward_req_v1::Entity::SubscriberId(v) => Entity::SubscriberId(v), - proto::promotion_reward_req_v1::Entity::GatewayKey(k) => Entity::GatewayKey(k.into()), - } - } -} - -impl From for proto::promotion_reward_req_v1::Entity { - fn from(entity: Entity) -> Self { - match entity { - Entity::SubscriberId(v) => proto::promotion_reward_req_v1::Entity::SubscriberId(v), - Entity::GatewayKey(k) => proto::promotion_reward_req_v1::Entity::GatewayKey(k.into()), - } - } -} - -impl From for proto::promotion_reward::Entity { - fn from(entity: Entity) -> Self { - match entity { - Entity::SubscriberId(v) => proto::promotion_reward::Entity::SubscriberId(v), - Entity::GatewayKey(k) => proto::promotion_reward::Entity::GatewayKey(k.into()), - } - } -} - -#[derive(Clone)] -pub struct PromotionReward { - pub entity: Entity, - pub shares: u64, - pub timestamp: DateTime, - pub received_timestamp: DateTime, - pub carrier_pub_key: PublicKeyBinary, - pub signature: Vec, -} - -impl MsgDecode for PromotionReward { - type Msg = PromotionRewardIngestReportV1; -} - -impl TryFrom for PromotionReward { - type Error = Error; - - fn try_from(v: PromotionRewardIngestReportV1) -> Result { - let received_timestamp = v.received_timestamp.to_timestamp_millis()?; - let Some(v) = v.report else { - return Err(Error::NotFound("report".to_string())); - }; - Ok(Self { - entity: if let Some(entity) = v.entity { - entity.into() - } else { - return Err(Error::NotFound("entity".to_string())); - }, - shares: v.shares, - timestamp: v.timestamp.to_timestamp()?, - received_timestamp, - carrier_pub_key: v.carrier_pub_key.into(), - signature: v.signature, - }) - } -} - -impl From for PromotionRewardReqV1 { - fn from(v: PromotionReward) -> Self { - Self { - entity: Some(v.entity.into()), - shares: v.shares, - timestamp: v.timestamp.encode_timestamp(), - carrier_pub_key: v.carrier_pub_key.into(), - signature: v.signature, - } - } -} diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index 54ea53100..f8166b024 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -273,18 +273,3 @@ impl_file_sink!( FileType::RewardManifest.to_str(), "reward_manifest" ); -impl_file_sink!( - proto::ServiceProviderPromotionFundV1, - FileType::ServiceProviderPromotionFund.to_str(), - "service_provider_promotion_fund" -); -impl_file_sink!( - poc_mobile::PromotionRewardIngestReportV1, - FileType::PromotionRewardIngestReport.to_str(), - "promotion_reward_ingest_report" -); -impl_file_sink!( - poc_mobile::VerifiedPromotionRewardV1, - FileType::VerifiedPromotionReward.to_str(), - "verified_promotion_reward" -); diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 017a1e315..f4c05eeba 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -84,6 +84,8 @@ impl_msg_verify!(mobile_config::AuthorizationListReqV1, signature); impl_msg_verify!(mobile_config::AuthorizationListResV1, signature); impl_msg_verify!(mobile_config::EntityVerifyReqV1, signature); impl_msg_verify!(mobile_config::EntityVerifyResV1, signature); +impl_msg_verify!(mobile_config::CarrierIncentivePromotionListReqV1, signature); +impl_msg_verify!(mobile_config::CarrierIncentivePromotionListResV1, signature); impl_msg_verify!(mobile_config::CarrierKeyToEntityReqV1, signature); impl_msg_verify!(mobile_config::CarrierKeyToEntityResV1, signature); impl_msg_verify!(mobile_config::GatewayInfoReqV1, signature); @@ -95,7 +97,6 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); -impl_msg_verify!(poc_mobile::PromotionRewardReqV1, signature); #[cfg(test)] mod test { diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 71bf2bf07..c1e91ded1 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -14,8 +14,7 @@ use helium_proto::services::poc_mobile::{ CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1, InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1, - InvalidatedRadioThresholdReportRespV1, PromotionRewardIngestReportV1, PromotionRewardReqV1, - PromotionRewardRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, + InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1, SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, @@ -47,7 +46,6 @@ pub struct GrpcServer { sp_boosted_rewards_ban_sink: FileSinkClient, subscriber_mapping_event_sink: FileSinkClient, - promotion_reward_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -87,7 +85,6 @@ impl GrpcServer { ServiceProviderBoostedRewardsBannedRadioIngestReportV1, >, subscriber_mapping_event_sink: FileSinkClient, - promotion_reward_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -103,7 +100,6 @@ impl GrpcServer { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, - promotion_reward_sink, required_network, address, api_token, @@ -441,30 +437,6 @@ impl poc_mobile::PocMobile for GrpcServer { let id = timestamp.to_string(); Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id })) } - - async fn submit_promotion_reward( - &self, - request: Request, - ) -> GrpcResult { - let received_timestamp: u64 = Utc::now().timestamp_millis() as u64; - let event = request.into_inner(); - - custom_tracing::record_b58("pub_key", &event.carrier_pub_key); - - let report = self - .verify_public_key(event.carrier_pub_key.as_ref()) - .and_then(|public_key| self.verify_network(public_key)) - .and_then(|public_key| self.verify_signature(public_key, event)) - .map(|(_, event)| PromotionRewardIngestReportV1 { - received_timestamp, - report: Some(event), - })?; - - let _ = self.promotion_reward_sink.write(report, []).await; - - let id = received_timestamp.to_string(); - Ok(Response::new(PromotionRewardRespV1 { id })) - } } pub async fn grpc_server(settings: &Settings) -> Result<()> { @@ -574,16 +546,6 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ) .await?; - let (subscriber_referral_eligibility_sink, subscriber_referral_eligibility_server) = - PromotionRewardIngestReportV1::file_sink( - store_base_path, - file_upload.clone(), - FileSinkCommitStrategy::Automatic, - FileSinkRollTime::Duration(settings.roll_time), - env!("CARGO_PKG_NAME"), - ) - .await?; - let Some(api_token) = settings .token .as_ref() @@ -603,7 +565,6 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, - subscriber_referral_eligibility_sink, settings.network, settings.listen_addr, api_token, @@ -627,7 +588,6 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .add_task(coverage_object_report_sink_server) .add_task(sp_boosted_rewards_ban_sink_server) .add_task(subscriber_mapping_event_server) - .add_task(subscriber_referral_eligibility_server) .add_task(grpc_server) .build() .start() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 5584416a0..52512eba1 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -44,7 +44,6 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10); let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10); let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10); - let (promotion_rewards_tx, _rx) = tokio::sync::mpsc::channel(10); tokio::spawn(async move { let grpc_server = GrpcServer::new( @@ -58,7 +57,6 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { FileSinkClient::new(coverage_obj_tx, "noop"), FileSinkClient::new(sp_boosted_tx, "noop"), FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"), - FileSinkClient::new(promotion_rewards_tx, "noop"), Network::MainNet, socket_addr, api_token, diff --git a/mobile_config/src/carrier_service.rs b/mobile_config/src/carrier_service.rs index f80ddd397..5bc96c7ec 100644 --- a/mobile_config/src/carrier_service.rs +++ b/mobile_config/src/carrier_service.rs @@ -1,25 +1,38 @@ +use std::{collections::HashMap, str::FromStr}; + use crate::{key_cache::KeyCache, telemetry, verify_public_key, GrpcResult}; use chrono::Utc; use file_store::traits::{MsgVerify, TimestampEncode}; use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::{ - services::mobile_config::{self, CarrierKeyToEntityReqV1, CarrierKeyToEntityResV1}, - Message, + service_provider_promotions::Promotion, + services::mobile_config::{ + self, CarrierIncentivePromotionListReqV1, CarrierIncentivePromotionListResV1, + CarrierKeyToEntityReqV1, CarrierKeyToEntityResV1, + }, + Message, ServiceProvider, ServiceProviderPromotions, }; -use sqlx::{Pool, Postgres}; +use sqlx::{prelude::FromRow, Pool, Postgres}; use tonic::{Request, Response, Status}; pub struct CarrierService { key_cache: KeyCache, - pool: Pool, + mobile_config_db: Pool, + metadata_db: Pool, signing_key: Keypair, } impl CarrierService { - pub fn new(key_cache: KeyCache, pool: Pool, signing_key: Keypair) -> Self { + pub fn new( + key_cache: KeyCache, + mobile_config_db: Pool, + metadata_db: Pool, + signing_key: Keypair, + ) -> Self { Self { key_cache, - pool, + mobile_config_db, + metadata_db, signing_key, } } @@ -46,11 +59,61 @@ impl CarrierService { " select entity_key from carrier_keys where pubkey = $1 ", ) .bind(pubkey) - .fetch_one(&self.pool) + .fetch_one(&self.mobile_config_db) .await .map_err(|_| Status::internal("carrier entity key not found"))?; Ok(entity_key) } + + async fn fetch_incentive_promotions( + &self, + timestamp: i64, + ) -> Result, Status> { + #[derive(Debug, FromRow)] + struct Local { + carrier_name: String, + incentive_escrow_fund_bps: i32, + start_ts: i64, + stop_ts: i64, + shares: i32, + promo_name: String, + } + + let rows = sqlx::query_as::<_, Local>( + r#" + SELECT + c.name as carrier_name, c.incentive_escrow_fund_bps, + iep.carrier, iep.start_ts, iep.stop_ts, iep.shares, iep.name as promo_name + FROM carriers c + JOIN incentive_escrow_programs iep + on c.address = iep.carrier + WHERE + iep.start_ts < $1 + AND iep.stop_ts > $1 + "#, + ) + .bind(timestamp) + .fetch_all(&self.metadata_db) + .await + .map_err(|_| Status::internal("could not fetch incentive programs"))?; + + let mut sp_promotions: HashMap = HashMap::new(); + for row in rows { + let m = sp_promotions.entry(row.carrier_name.clone()).or_default(); + m.service_provider = ServiceProvider::from_str(&row.carrier_name) + .map_err(|err| Status::internal(format!("unknown carrier: {err:?}")))? + .into(); + m.incentive_escrow_fund_bps = row.incentive_escrow_fund_bps as u32; + m.promotions.push(Promotion { + entity: row.promo_name, + start_ts: row.start_ts as u64, + end_ts: row.stop_ts as u64, + shares: row.shares as u32, + }); + } + + Ok(sp_promotions.into_values().collect()) + } } #[tonic::async_trait] @@ -77,4 +140,28 @@ impl mobile_config::CarrierService for CarrierService { response.signature = self.sign_response(&response.encode_to_vec())?; Ok(Response::new(response)) } + + async fn list_incentive_promotions( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("carrier_service", "list_incentive_promotions"); + custom_tracing::record_b58("signer", &request.signer); + + let signer = verify_public_key(&request.signature)?; + self.verify_request_signature(&signer, &request)?; + + let timestamp = request.timestamp; + let promotions = self.fetch_incentive_promotions(timestamp as i64).await?; + + let mut response = CarrierIncentivePromotionListResV1 { + service_provider_promotions: promotions, + signer: self.signing_key.public_key().into(), + signature: vec![], + }; + response.signature = self.sign_response(&response.encode_to_vec())?; + + Ok(Response::new(response)) + } } diff --git a/mobile_config/src/client/carrier_service_client.rs b/mobile_config/src/client/carrier_service_client.rs index 3a55ad4dd..69e9be28c 100644 --- a/mobile_config/src/client/carrier_service_client.rs +++ b/mobile_config/src/client/carrier_service_client.rs @@ -1,10 +1,11 @@ use super::{call_with_retry, ClientError, Settings, CACHE_EVICTION_FREQUENCY}; use async_trait::async_trait; -use file_store::traits::MsgVerify; +use chrono::Utc; +use file_store::traits::{MsgVerify, TimestampEncode}; use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::{ services::{mobile_config, Channel}, - Message, ServiceProvider, + Message, ServiceProvider, ServiceProviderPromotions, }; use retainer::Cache; use std::{str::FromStr, sync::Arc, time::Duration}; @@ -15,6 +16,10 @@ pub trait CarrierServiceVerifier { &self, payer: &str, ) -> Result; + + async fn list_incentive_promotions( + &self, + ) -> Result, Self::Error>; } #[derive(Clone)] pub struct CarrierServiceClient { @@ -61,6 +66,33 @@ impl CarrierServiceVerifier for CarrierServiceClient { .await; Ok(response) } + + async fn list_incentive_promotions( + &self, + ) -> Result, Self::Error> { + let mut request = mobile_config::CarrierIncentivePromotionListReqV1 { + timestamp: Utc::now().encode_timestamp_millis(), + signer: self.signing_key.public_key().into(), + signature: vec![], + }; + request.signature = self.signing_key.sign(&request.encode_to_vec())?; + + let response = match call_with_retry!(self + .client + .clone() + .list_incentive_promotions(request.clone())) + { + Ok(verify_res) => { + let response = verify_res.into_inner(); + response.verify(&self.config_pubkey)?; + response.service_provider_promotions + } + + Err(status) => Err(status)?, + }; + + Ok(response) + } } impl CarrierServiceClient { diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index d12fd5233..e81127ab0 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -82,8 +82,12 @@ impl Daemon { metadata_pool.clone(), settings.signing_keypair()?, ); - let carrier_svc = - CarrierService::new(key_cache.clone(), pool.clone(), settings.signing_keypair()?); + let carrier_svc = CarrierService::new( + key_cache.clone(), + pool.clone(), + metadata_pool.clone(), + settings.signing_keypair()?, + ); let hex_boosting_svc = HexBoostingService::new( key_cache.clone(), diff --git a/mobile_verifier/src/cli/mod.rs b/mobile_verifier/src/cli/mod.rs index 36cc1a1df..611db001b 100644 --- a/mobile_verifier/src/cli/mod.rs +++ b/mobile_verifier/src/cli/mod.rs @@ -1,4 +1,3 @@ -pub mod promotion_funds; pub mod reward_from_db; pub mod server; pub mod verify_disktree; diff --git a/mobile_verifier/src/cli/promotion_funds.rs b/mobile_verifier/src/cli/promotion_funds.rs deleted file mode 100644 index 0801a9074..000000000 --- a/mobile_verifier/src/cli/promotion_funds.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::{ - service_provider::promotions::funds::{ - delete_promotion_fund, fetch_promotion_funds, save_promotion_fund, - }, - Settings, -}; - -#[derive(Debug, clap::Args)] -pub struct Cmd { - #[clap(subcommand)] - sub_command: SubCommand, -} - -#[derive(Debug, clap::Subcommand)] -enum SubCommand { - /// Print Service Provider promotions in mobile-verifier db - List, - /// Set Service Provider promotion in mobile-verifier db - Set { - service_provider_id: i32, - basis_points: u16, - }, - /// Remove Service Provider promotion allocation from mobile-verifier db - Unset { service_provider_id: i32 }, -} - -impl Cmd { - pub async fn run(&self, settings: &Settings) -> anyhow::Result<()> { - let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; - - match self.sub_command { - SubCommand::List => { - let funds = fetch_promotion_funds(&pool).await?; - println!("{funds:?}"); - } - SubCommand::Set { - service_provider_id, - basis_points, - } => { - let mut txn = pool.begin().await?; - save_promotion_fund(&mut txn, service_provider_id, basis_points).await?; - txn.commit().await?; - - let funds = fetch_promotion_funds(&pool).await?; - println!("{funds:?}"); - } - SubCommand::Unset { - service_provider_id, - } => { - delete_promotion_fund(&pool, service_provider_id).await?; - - let funds = fetch_promotion_funds(&pool).await?; - println!("{funds:?}"); - } - } - - Ok(()) - } -} diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 50d4fb61e..b7ab815fd 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -8,7 +8,6 @@ use crate::{ heartbeats::{cbrs::CbrsHeartbeatDaemon, wifi::WifiHeartbeatDaemon}, radio_threshold::RadioThresholdIngestor, rewarder::Rewarder, - service_provider, sp_boosted_rewards_bans::ServiceProviderBoostedRewardsBanIngestor, speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, @@ -46,7 +45,6 @@ impl Cmd { let store_base_path = std::path::Path::new(&settings.cache); let report_ingest = FileStore::from_settings(&settings.ingest).await?; - let promotion_fund_ingest = FileStore::from_settings(&settings.promotion_ingest).await?; // mobile config clients let gateway_client = GatewayClient::from_settings(&settings.config_client)?; @@ -195,19 +193,6 @@ impl Cmd { ) .await?, ) - .add_task( - service_provider::PromotionDaemon::create_managed_task( - pool.clone(), - settings, - file_upload.clone(), - report_ingest.clone(), - promotion_fund_ingest, - gateway_client.clone(), - auth_client.clone(), - entity_client.clone(), - ) - .await?, - ) .add_task(DataSessionIngestor::create_managed_task(pool.clone(), settings).await?) .add_task( ServiceProviderBoostedRewardsBanIngestor::create_managed_task( diff --git a/mobile_verifier/src/main.rs b/mobile_verifier/src/main.rs index 43d6a3ba2..3b3c991b7 100644 --- a/mobile_verifier/src/main.rs +++ b/mobile_verifier/src/main.rs @@ -1,7 +1,7 @@ use anyhow::Result; use clap::Parser; use mobile_verifier::{ - cli::{promotion_funds, reward_from_db, server, verify_disktree}, + cli::{reward_from_db, server, verify_disktree}, Settings, }; use std::path; @@ -37,7 +37,6 @@ pub enum Cmd { /// Go through every cell and ensure it's value can be turned into an Assignment. /// NOTE: This can take a very long time. Run with a --release binary. VerifyDisktree(verify_disktree::Cmd), - PromotionFunds(promotion_funds::Cmd), } impl Cmd { @@ -46,7 +45,6 @@ impl Cmd { Self::Server(cmd) => cmd.run(&settings).await, Self::RewardFromDb(cmd) => cmd.run(&settings).await, Self::VerifyDisktree(cmd) => cmd.run(&settings).await, - Self::PromotionFunds(cmd) => cmd.run(&settings).await, } } } diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index 713438634..b2bdb9b38 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -772,10 +772,7 @@ mod test { heartbeats::{HeartbeatReward, KeyType, OwnedKeyType}, reward_shares, service_provider::{ - self, - dc_sessions::ServiceProviderDCSessions, - promotions::{funds::ServiceProviderFunds, rewards::ServiceProviderPromotions}, - ServiceProviderRewardInfos, + self, ServiceProviderDCSessions, ServiceProviderPromotions, ServiceProviderRewardInfos, }, speedtests::Speedtest, speedtests_average::SpeedtestAverage, @@ -2232,7 +2229,6 @@ mod test { let total_sp_rewards = service_provider::get_scheduled_tokens(&epoch); let sp_reward_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(sp1, dec!(1000))]), - ServiceProviderFunds::default(), ServiceProviderPromotions::default(), total_sp_rewards, mobile_bone_price, @@ -2278,7 +2274,6 @@ mod test { let sp_reward_infos = ServiceProviderRewardInfos::new( // force the service provider to have spend more DC than total rewardable ServiceProviderDCSessions::from([(sp1, total_rewards_value_in_dc * dec!(2.0))]), - ServiceProviderFunds::default(), ServiceProviderPromotions::default(), total_rewards_value_in_dc, mobile_bone_price, @@ -2324,7 +2319,6 @@ mod test { let sp_reward_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(sp1, dec!(100_000_000))]), - ServiceProviderFunds::default(), ServiceProviderPromotions::default(), total_sp_rewards_in_bones, mobile_bone_price, @@ -2370,7 +2364,6 @@ mod test { let sp_reward_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(sp1, dec!(100_000_000_000))]), - ServiceProviderFunds::default(), ServiceProviderPromotions::default(), total_sp_rewards_in_bones, mobile_bone_price, diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index fb6fa2b3d..a433594e7 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -7,7 +7,8 @@ use crate::{ self, CalculatedPocRewardShares, CoverageShares, DataTransferAndPocAllocatedRewardBuckets, MapperShares, TransferRewards, }, - service_provider, sp_boosted_rewards_bans, speedtests, + service_provider::{self, ServiceProviderDCSessions, ServiceProviderPromotions}, + sp_boosted_rewards_bans, speedtests, speedtests_average::SpeedtestAverages, subscriber_location, subscriber_verified_mapping_event, telemetry, Settings, }; @@ -273,9 +274,13 @@ where reward_mappers(&self.pool, &self.mobile_rewards, reward_period).await?; // process rewards for service providers + let dc_sessions = + service_provider::get_dc_sessions(&self.pool, &self.carrier_client, reward_period) + .await?; + let sp_promotions = service_provider::get_promotions(&self.carrier_client).await?; reward_service_providers( - &self.pool, - &self.carrier_client, + dc_sessions, + sp_promotions.clone(), &self.mobile_rewards, reward_period, mobile_bone_price, @@ -296,8 +301,6 @@ where coverage::clear_coverage_objects(&mut transaction, &reward_period.start).await?; sp_boosted_rewards_bans::clear_bans(&mut transaction, reward_period.start).await?; subscriber_verified_mapping_event::clear(&mut transaction, &reward_period.start).await?; - service_provider::db::clear_promotion_rewards(&mut transaction, &reward_period.start) - .await?; // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; let next_reward_period = scheduler.next_reward_period(); @@ -313,7 +316,7 @@ where boosted_poc_bones_per_reward_share: Some(helium_proto::Decimal { value: poc_dc_shares.boost.to_string(), }), - // sp_allocations: service_provider::reward_data_sp_allocations(&self.pool).await?, + service_provider_promotions: sp_promotions.into_proto(), }; self.reward_manifests .write( @@ -597,23 +600,19 @@ pub async fn reward_oracles( } pub async fn reward_service_providers( - pool: &Pool, - carrier_client: &impl CarrierServiceVerifier, + dc_sessions: ServiceProviderDCSessions, + sp_promotions: ServiceProviderPromotions, mobile_rewards: &FileSinkClient, reward_period: &Range>, mobile_bone_price: Decimal, ) -> anyhow::Result<()> { - use service_provider::{db, ServiceProviderRewardInfos}; - let dc_sessions = db::fetch_dc_sessions(pool, carrier_client, reward_period).await?; - let promo_funds = db::fetch_promotion_funds(pool).await?; - let promo_rewards = db::fetch_promotion_rewards(pool, carrier_client, reward_period).await?; + use service_provider::ServiceProviderRewardInfos; let total_sp_rewards = service_provider::get_scheduled_tokens(reward_period); let sps = ServiceProviderRewardInfos::new( dc_sessions, - promo_funds, - promo_rewards, + sp_promotions, total_sp_rewards, mobile_bone_price, reward_period.clone(), diff --git a/mobile_verifier/src/service_provider/dc_sessions.rs b/mobile_verifier/src/service_provider/dc_sessions.rs index 1888c3bfa..3c1de37bf 100644 --- a/mobile_verifier/src/service_provider/dc_sessions.rs +++ b/mobile_verifier/src/service_provider/dc_sessions.rs @@ -12,7 +12,7 @@ use crate::{ use super::ServiceProviderId; -pub async fn fetch_dc_sessions( +pub async fn get_dc_sessions( pool: &PgPool, carrier_client: &impl CarrierServiceVerifier, reward_period: &Range>, @@ -106,7 +106,7 @@ where pub mod tests { use chrono::Duration; - use helium_proto::ServiceProvider; + use helium_proto::{ServiceProvider, ServiceProviderPromotions}; use crate::data_session::HotspotDataSession; @@ -133,6 +133,12 @@ pub mod tests { ) -> Result { Ok(ServiceProvider::HeliumMobile) } + + async fn list_incentive_promotions( + &self, + ) -> Result, Self::Error> { + Ok(vec![]) + } } // Save multiple data sessions with different payers @@ -161,7 +167,7 @@ pub mod tests { let epoch = now - Duration::hours(24)..now; // dc sessions should represent single payer, and all dc is combined - let map = fetch_dc_sessions(&pool, &MockClient, &epoch).await?; + let map = get_dc_sessions(&pool, &MockClient, &epoch).await?; assert_eq!(map.len(), 1); assert_eq!(map.all_transfer(), Decimal::from(4_000)); diff --git a/mobile_verifier/src/service_provider/mod.rs b/mobile_verifier/src/service_provider/mod.rs index 9a6f90885..19439df37 100644 --- a/mobile_verifier/src/service_provider/mod.rs +++ b/mobile_verifier/src/service_provider/mod.rs @@ -1,20 +1,14 @@ use std::ops::Range; use chrono::{DateTime, Utc}; -use helium_proto::ServiceProviderAllocation; -pub use promotions::daemon::PromotionDaemon; -pub use reward::ServiceProviderRewardInfos; -use sqlx::PgPool; -pub mod dc_sessions; -pub mod promotions; -pub mod reward; +pub use dc_sessions::{get_dc_sessions, ServiceProviderDCSessions}; +pub use promotions::{get_promotions, ServiceProviderPromotions}; +pub use reward::ServiceProviderRewardInfos; -pub mod db { - pub use super::dc_sessions::fetch_dc_sessions; - pub use super::promotions::funds::fetch_promotion_funds; - pub use super::promotions::rewards::{clear_promotion_rewards, fetch_promotion_rewards}; -} +mod dc_sessions; +mod promotions; +mod reward; // This type is used in lieu of the helium_proto::ServiceProvider enum so we can // handle more than a single value without adding a hard deploy dependency to @@ -25,19 +19,3 @@ pub fn get_scheduled_tokens(reward_period: &Range>) -> rust_decima let duration = reward_period.end - reward_period.start; crate::reward_shares::get_scheduled_tokens_for_service_providers(duration) } - -pub async fn reward_data_sp_allocations( - pool: &PgPool, -) -> anyhow::Result> { - let funds = db::fetch_promotion_funds(pool).await?; - let mut sp_allocations = vec![]; - - for (sp_id, bps) in funds.0.into_iter() { - sp_allocations.push(ServiceProviderAllocation { - service_provider: sp_id, - incentive_escrow_fund_bps: bps as u32, - }); - } - - Ok(sp_allocations) -} diff --git a/mobile_verifier/src/service_provider/promotions.rs b/mobile_verifier/src/service_provider/promotions.rs new file mode 100644 index 000000000..a731f2f2c --- /dev/null +++ b/mobile_verifier/src/service_provider/promotions.rs @@ -0,0 +1,51 @@ +use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; +use rust_decimal::Decimal; +use rust_decimal_macros::dec; + +use crate::service_provider::ServiceProviderId; + +mod proto { + pub use helium_proto::{service_provider_promotions::Promotion, ServiceProviderPromotions}; +} + +pub async fn get_promotions( + client: &impl CarrierServiceVerifier, +) -> anyhow::Result { + let promos = client.list_incentive_promotions().await?; + Ok(ServiceProviderPromotions(promos)) +} + +#[derive(Debug, Default, Clone)] +pub struct ServiceProviderPromotions(Vec); + +impl ServiceProviderPromotions { + pub fn into_proto(self) -> Vec { + self.0 + } + + pub(crate) fn get_fund_percent(&self, sp_id: ServiceProviderId) -> Decimal { + for promo in &self.0 { + if promo.service_provider == sp_id { + return Decimal::from(promo.incentive_escrow_fund_bps) / dec!(10_000); + } + } + + dec!(0) + } + + pub(crate) fn get_active_promotions(&self, sp_id: ServiceProviderId) -> Vec { + for promo in &self.0 { + if promo.service_provider == sp_id { + return promo.promotions.clone(); + } + } + + vec![] + } +} + +impl From> for ServiceProviderPromotions { + fn from(value: Vec) -> Self { + Self(value) + } +} diff --git a/mobile_verifier/src/service_provider/promotions/daemon.rs b/mobile_verifier/src/service_provider/promotions/daemon.rs deleted file mode 100644 index 57bf11c4f..000000000 --- a/mobile_verifier/src/service_provider/promotions/daemon.rs +++ /dev/null @@ -1,262 +0,0 @@ -use std::time::{Duration, Instant}; - -use chrono::Utc; -use file_store::{ - file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::FileSinkClient, - file_source, - file_upload::FileUpload, - promotion_reward::{Entity, PromotionReward}, - traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, - FileType, -}; -use futures::{StreamExt, TryFutureExt}; -use helium_proto::{ - services::{ - mobile_config::NetworkKeyRole, - poc_mobile::{ - PromotionRewardIngestReportV1, PromotionRewardStatus, VerifiedPromotionRewardV1, - }, - }, - ServiceProviderPromotionFundV1, -}; -use mobile_config::{ - client::{ - authorization_client::AuthorizationVerifier, entity_client::EntityVerifier, - AuthorizationClient, EntityClient, - }, - GatewayClient, -}; -use sqlx::PgPool; -use task_manager::{ManagedTask, TaskManager}; -use tokio::sync::mpsc::Receiver; - -use crate::{ - service_provider::promotions::{funds, rewards}, - GatewayResolver, Settings, -}; - -pub struct PromotionDaemon { - pool: PgPool, - gateway_info_resolver: GatewayClient, - authorization_verifier: AuthorizationClient, - entity_verifier: EntityClient, - promotion_funds: Receiver>, - promotion_rewards: Receiver>, - promotion_rewards_sink: FileSinkClient, -} - -impl ManagedTask for PromotionDaemon { - fn start_task( - self: Box, - shutdown: triggered::Listener, - ) -> futures::prelude::future::LocalBoxFuture<'static, anyhow::Result<()>> { - let handle = tokio::spawn(self.run(shutdown)); - Box::pin( - handle - .map_err(anyhow::Error::from) - .and_then(|result| async move { result.map_err(anyhow::Error::from) }), - ) - } -} - -impl PromotionDaemon { - #[allow(clippy::too_many_arguments)] - pub async fn create_managed_task( - pool: PgPool, - settings: &Settings, - file_upload: FileUpload, - report_file_store: file_store::FileStore, - promotion_file_store: file_store::FileStore, - gateway_info_resolver: GatewayClient, - authorization_verifier: AuthorizationClient, - entity_verifier: EntityClient, - ) -> anyhow::Result { - let (promotion_rewards_sink, valid_promotion_rewards_server) = - VerifiedPromotionRewardV1::file_sink( - settings.store_base_path(), - file_upload.clone(), - FileSinkCommitStrategy::Automatic, - FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), - env!("CARGO_PKG_NAME"), - ) - .await?; - - let (promotion_rewards, promotion_rewards_server) = - file_source::Continuous::msg_source::() - .state(pool.clone()) - .store(report_file_store.clone()) - .lookback(LookbackBehavior::StartAfter(settings.start_after)) - .prefix(FileType::PromotionRewardIngestReport.to_string()) - .create() - .await?; - - let (promotion_funds, promotion_funds_server) = - file_source::Continuous::prost_source::() - .state(pool.clone()) - .store(promotion_file_store) - .lookback(LookbackBehavior::StartAfter(settings.start_after)) - .prefix(FileType::ServiceProviderPromotionFund.to_string()) - .create() - .await?; - - let promotion_reward_daemon = Self { - pool, - gateway_info_resolver, - authorization_verifier, - entity_verifier, - promotion_funds, - promotion_rewards, - promotion_rewards_sink, - }; - - Ok(TaskManager::builder() - .add_task(valid_promotion_rewards_server) - .add_task(promotion_funds_server) - .add_task(promotion_rewards_server) - .add_task(promotion_reward_daemon) - .build()) - } - - async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - loop { - tokio::select! { - _ = shutdown.clone() => { - tracing::info!("PromotionRewardDaemon shutting down"); - break; - } - Some(file) = self.promotion_rewards.recv() => { - let start = Instant::now(); - self.process_rewards_file(file).await?; - metrics::histogram!("promotion_reward_processing_time").record(start.elapsed()); - } - Some(file) = self.promotion_funds.recv() => { - let start = Instant::now(); - self.process_funds_file(file).await?; - metrics::histogram!("promotion_funds_processing_time").record(start.elapsed()); - } - } - } - - Ok(()) - } - - async fn process_rewards_file( - &self, - file: FileInfoStream, - ) -> anyhow::Result<()> { - tracing::info!(key = file.file_info.key, "Processing promotion reward file"); - - let mut transaction = self.pool.begin().await?; - let mut promotion_rewards = file.into_stream(&mut transaction).await?; - - while let Some(promotion_reward) = promotion_rewards.next().await { - let promotion_reward_status = validate_promotion_reward( - &promotion_reward, - &self.authorization_verifier, - &self.gateway_info_resolver, - &self.entity_verifier, - ) - .await?; - - if promotion_reward_status == PromotionRewardStatus::Valid { - rewards::save_promotion_reward(&mut transaction, &promotion_reward).await?; - } - - write_promotion_reward( - &self.promotion_rewards_sink, - &promotion_reward, - promotion_reward_status, - ) - .await?; - } - - self.promotion_rewards_sink.commit().await?; - transaction.commit().await?; - - Ok(()) - } - - async fn process_funds_file( - &self, - file: FileInfoStream, - ) -> anyhow::Result<()> { - tracing::info!(key = file.file_info.key, "Processing promotion funds file"); - - let mut txn = self.pool.begin().await?; - - let mut promotion_funds = file.into_stream(&mut txn).await?; - while let Some(promotion_fund) = promotion_funds.next().await { - funds::save_promotion_fund( - &mut txn, - promotion_fund.service_provider, - promotion_fund.bps as u16, - ) - .await?; - } - - txn.commit().await?; - - Ok(()) - } -} - -async fn validate_promotion_reward( - promotion_reward: &PromotionReward, - authorization_verifier: &impl AuthorizationVerifier, - gateway_info_resolver: &impl GatewayResolver, - entity_verifier: &impl EntityVerifier, -) -> anyhow::Result { - if authorization_verifier - .verify_authorized_key( - &promotion_reward.carrier_pub_key, - NetworkKeyRole::MobileCarrier, - ) - .await - .is_err() - { - return Ok(PromotionRewardStatus::InvalidCarrierKey); - } - match &promotion_reward.entity { - Entity::SubscriberId(ref subscriber_id) - if entity_verifier - .verify_rewardable_entity(subscriber_id) - .await - .is_err() => - { - Ok(PromotionRewardStatus::InvalidSubscriberId) - } - Entity::GatewayKey(ref gateway_key) - if gateway_info_resolver - .resolve_gateway(gateway_key) - .await? - .is_not_found() => - { - Ok(PromotionRewardStatus::InvalidGatewayKey) - } - _ => Ok(PromotionRewardStatus::Valid), - } -} - -async fn write_promotion_reward( - file_sink: &FileSinkClient, - promotion_reward: &PromotionReward, - status: PromotionRewardStatus, -) -> anyhow::Result<()> { - file_sink - .write( - VerifiedPromotionRewardV1 { - report: Some(PromotionRewardIngestReportV1 { - received_timestamp: promotion_reward - .received_timestamp - .encode_timestamp_millis(), - report: Some(promotion_reward.clone().into()), - }), - status: status as i32, - timestamp: Utc::now().encode_timestamp_millis(), - }, - &[("validity", status.as_str_name())], - ) - .await?; - Ok(()) -} diff --git a/mobile_verifier/src/service_provider/promotions/funds.rs b/mobile_verifier/src/service_provider/promotions/funds.rs deleted file mode 100644 index 70a0aa28e..000000000 --- a/mobile_verifier/src/service_provider/promotions/funds.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::collections::HashMap; - -use chrono::Utc; -use rust_decimal::Decimal; -use rust_decimal_macros::dec; -use sqlx::{PgPool, Postgres, Transaction}; - -use crate::service_provider::ServiceProviderId; - -#[derive(Debug, Default)] -pub struct ServiceProviderFunds(pub(crate) HashMap); - -impl ServiceProviderFunds { - pub fn get_fund_percent(&self, service_provider_id: ServiceProviderId) -> Decimal { - let bps = self - .0 - .get(&service_provider_id) - .cloned() - .unwrap_or_default(); - Decimal::from(bps) / dec!(10_000) - } -} - -impl From for ServiceProviderFunds -where - F: IntoIterator, - I: Into, -{ - fn from(funds: F) -> Self { - Self(funds.into_iter().map(|(k, v)| (k.into(), v)).collect()) - } -} - -pub async fn fetch_promotion_funds(pool: &PgPool) -> anyhow::Result { - #[derive(Debug, sqlx::FromRow)] - struct PromotionFund { - #[sqlx(try_from = "i64")] - pub service_provider: ServiceProviderId, - #[sqlx(try_from = "i64")] - pub basis_points: u16, - } - - let funds = sqlx::query_as::<_, PromotionFund>( - r#" - SELECT - service_provider, basis_points - FROM - service_provider_promotion_funds - "#, - ) - .fetch_all(pool) - .await?; - - let funds = funds - .into_iter() - .map(|fund| (fund.service_provider, fund.basis_points)) - .collect(); - - Ok(ServiceProviderFunds(funds)) -} - -pub async fn save_promotion_fund( - transaction: &mut Transaction<'_, Postgres>, - service_provider_id: ServiceProviderId, - basis_points: u16, -) -> anyhow::Result<()> { - sqlx::query( - r#" - INSERT INTO service_provider_promotion_funds - (service_provider, basis_points, inserted_at) - VALUES - ($1, $2, $3) - "#, - ) - .bind(service_provider_id) - .bind(basis_points as i64) - .bind(Utc::now()) - .execute(transaction) - .await?; - - Ok(()) -} - -pub async fn delete_promotion_fund( - pool: &PgPool, - service_provider_id: ServiceProviderId, -) -> anyhow::Result<()> { - sqlx::query( - r#" - DELETE FROM service_provider_promotion_funds - WHERE service_provider = $1 - "#, - ) - .bind(service_provider_id) - .execute(pool) - .await?; - - Ok(()) -} diff --git a/mobile_verifier/src/service_provider/promotions/mod.rs b/mobile_verifier/src/service_provider/promotions/mod.rs deleted file mode 100644 index 766b23940..000000000 --- a/mobile_verifier/src/service_provider/promotions/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod daemon; -pub mod funds; -pub mod rewards; diff --git a/mobile_verifier/src/service_provider/promotions/rewards.rs b/mobile_verifier/src/service_provider/promotions/rewards.rs deleted file mode 100644 index 644acf85a..000000000 --- a/mobile_verifier/src/service_provider/promotions/rewards.rs +++ /dev/null @@ -1,180 +0,0 @@ -use std::ops::Range; - -use chrono::{DateTime, Utc}; -use file_store::promotion_reward::{Entity, PromotionReward}; - -use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; -use rust_decimal::Decimal; -use sqlx::{PgPool, Postgres, Transaction}; - -use crate::service_provider::ServiceProviderId; - -#[derive(Debug, Default, Clone, PartialEq)] -pub struct ServiceProviderPromotions(Vec); - -impl ServiceProviderPromotions { - pub fn for_service_provider( - &self, - service_provider_id: ServiceProviderId, - ) -> ServiceProviderPromotions { - let promotions = self - .0 - .iter() - .filter(|x| x.service_provider_id == service_provider_id) - .cloned() - .collect(); - Self(promotions) - } - - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - pub fn total_shares(&self) -> Decimal { - self.0.iter().map(|x| Decimal::from(x.shares)).sum() - } - - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } -} - -impl From for ServiceProviderPromotions -where - F: IntoIterator, -{ - fn from(promotions: F) -> Self { - Self(promotions.into_iter().collect()) - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct PromotionRewardShare { - pub service_provider_id: ServiceProviderId, - pub rewardable_entity: Entity, - pub shares: u64, -} - -pub async fn save_promotion_reward( - transaction: &mut Transaction<'_, Postgres>, - promotion_reward: &PromotionReward, -) -> anyhow::Result<()> { - match &promotion_reward.entity { - Entity::SubscriberId(subscriber_id) => { - sqlx::query( - r#" - INSERT INTO subscriber_promotion_rewards (time_of_reward, subscriber_id, carrier_key, shares) - VALUES ($1, $2, $3, $4) - ON CONFLICT DO NOTHING - "# - ) - .bind(promotion_reward.timestamp) - .bind(subscriber_id) - .bind(&promotion_reward.carrier_pub_key) - .bind(promotion_reward.shares as i64) - .execute(&mut *transaction) - .await?; - } - Entity::GatewayKey(gateway_key) => { - sqlx::query( - r#" - INSERT INTO gateway_promotion_rewards (time_of_reward, gateway_key, carrier_key, shares) - VALUES ($1, $2, $3, $4) - ON CONFLICT DO NOTHING - "# - ) - .bind(promotion_reward.timestamp) - .bind(gateway_key) - .bind(&promotion_reward.carrier_pub_key) - .bind(promotion_reward.shares as i64) - .execute(&mut *transaction) - .await?; - } - } - Ok(()) -} - -pub async fn clear_promotion_rewards( - tx: &mut Transaction<'_, Postgres>, - timestamp: &DateTime, -) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM subscriber_promotion_rewards WHERE time_of_reward < $1") - .bind(timestamp) - .execute(&mut *tx) - .await?; - sqlx::query("DELETE FROM gateway_promotion_rewards WHERE time_of_reward < $1") - .bind(timestamp) - .execute(&mut *tx) - .await?; - Ok(()) -} - -pub async fn fetch_promotion_rewards( - _pool: &PgPool, - _carrier: &impl CarrierServiceVerifier, - _epoch: &Range>, -) -> anyhow::Result { - Ok(ServiceProviderPromotions::default()) - - // let rewards = sqlx::query_as( - // r#" - // SELECT - // subscriber_id, NULL as gateway_key, SUM(shares)::bigint as shares, carrier_key - // FROM - // subscriber_promotion_rewards - // WHERE - // time_of_reward >= $1 AND time_of_reward < $2 - // GROUP BY - // subscriber_id, carrier_key - // UNION - // SELECT - // NULL as subscriber_id, gateway_key, SUM(shares)::bigint as shares, carrier_key - // FROM - // gateway_promotion_rewards - // WHERE - // time_of_reward >= $1 AND time_of_reward < $2 - // GROUP - // BY gateway_key, carrier_key - // "#, - // ) - // .bind(epoch.start) - // .bind(epoch.end) - // .fetch(pool) - // .map_err(anyhow::Error::from) - // .and_then(|x: DbPromotionRewardShares| async move { - // let service_provider_id = carrier - // .payer_key_to_service_provider(&x.carrier_key.to_string()) - // .await?; - // Ok(PromotionRewardShare { - // service_provider_id: service_provider_id as ServiceProviderId, - // rewardable_entity: x.rewardable_entity, - // shares: x.shares, - // }) - // }) - // .try_collect() - // .await?; - - // Ok(ServiceProviderPromotions(rewards)) -} - -// struct DbPromotionRewardShares { -// pub carrier_key: PublicKeyBinary, -// pub rewardable_entity: Entity, -// pub shares: u64, -// } - -// impl sqlx::FromRow<'_, PgRow> for DbPromotionRewardShares { -// fn from_row(row: &PgRow) -> sqlx::Result { -// let subscriber_id: Option> = row.try_get("subscriber_id")?; -// let shares: i64 = row.try_get("shares")?; -// Ok(Self { -// rewardable_entity: if let Some(subscriber_id) = subscriber_id { -// Entity::SubscriberId(subscriber_id) -// } else { -// Entity::GatewayKey(row.try_get("gateway_key")?) -// }, -// shares: shares as u64, -// carrier_key: row.try_get("carrier_key")?, -// }) -// } -// } diff --git a/mobile_verifier/src/service_provider/reward.rs b/mobile_verifier/src/service_provider/reward.rs index b16ec3057..9bb11af79 100644 --- a/mobile_verifier/src/service_provider/reward.rs +++ b/mobile_verifier/src/service_provider/reward.rs @@ -8,14 +8,14 @@ use rust_decimal_macros::dec; use crate::reward_shares::{dc_to_mobile_bones, DEFAULT_PREC}; -use super::{ - dc_sessions::ServiceProviderDCSessions, - promotions::{funds::ServiceProviderFunds, rewards::ServiceProviderPromotions}, -}; +use super::{dc_sessions::ServiceProviderDCSessions, promotions::ServiceProviderPromotions}; mod proto { - pub use helium_proto::services::poc_mobile::{ - mobile_reward_share::Reward, MobileRewardShare, ServiceProviderReward, + pub use helium_proto::{ + service_provider_promotions::Promotion, + services::poc_mobile::{ + mobile_reward_share::Reward, MobileRewardShare, PromotionReward, ServiceProviderReward, + }, }; } @@ -66,15 +66,14 @@ struct RewardInfo { // % matched promotions from unallocated, can never exceed realized_promo_perc matched_promo_perc: Decimal, - // Rewards for the epoch - promotion_rewards: ServiceProviderPromotions, + // Active promotions for the epoch + promotions: Vec, } impl ServiceProviderRewardInfos { pub fn new( dc_sessions: ServiceProviderDCSessions, - promo_funds: ServiceProviderFunds, - rewards: ServiceProviderPromotions, + promotions: ServiceProviderPromotions, total_sp_allocation: Decimal, mobile_bone_price: Decimal, reward_epoch: Range>, @@ -83,24 +82,28 @@ impl ServiceProviderRewardInfos { let mut me = Self { coll: vec![], - all_transfer, total_sp_allocation, + all_transfer, mobile_bone_price, reward_epoch, }; let used_allocation = total_sp_allocation.max(all_transfer); for (dc_session, dc_transfer) in dc_sessions.iter() { - let promo_fund_perc = promo_funds.get_fund_percent(dc_session); + let promo_fund_perc = promotions.get_fund_percent(dc_session); + let promos = promotions.get_active_promotions(dc_session); + me.coll.push(RewardInfo::new( dc_session, dc_transfer, promo_fund_perc, used_allocation, - rewards.for_service_provider(dc_session), + promos, )); } + me.coll.sort_by_key(|x| x.sp_id); + distribute_unallocated(&mut me.coll); me @@ -113,6 +116,7 @@ impl ServiceProviderRewardInfos { self.mobile_bone_price, ); let sp_rewards = self.total_sp_allocation * rewards_per_share; + // NOTE(mj): `rewards_per_share * self.dc` vs `sp_rewards * self.dc_perc` // They're veeeeery close. But the % multiplication produces a floating point number // that will typically be rounded down. @@ -125,20 +129,20 @@ impl ServiceProviderRewardInfos { rewards }) .filter(|(amount, _r)| *amount > 0) - .collect() + .collect::>() } } impl RewardInfo { - pub fn new( + fn new( sp_id: i32, dc_transfer: Decimal, promo_fund_perc: Decimal, total_sp_allocation: Decimal, - rewards: ServiceProviderPromotions, + promotions: Vec, ) -> Self { let dc_perc = dc_transfer / total_sp_allocation; - let realized_promo_perc = if rewards.is_empty() { + let realized_promo_perc = if promotions.is_empty() { dec!(0) } else { dc_perc * promo_fund_perc @@ -155,7 +159,7 @@ impl RewardInfo { realized_dc_perc, matched_promo_perc: dec!(0), - promotion_rewards: rewards, + promotions, } } @@ -183,46 +187,49 @@ impl RewardInfo { pub fn promo_rewards( &self, - _total_allocation: Decimal, - _reward_period: &Range>, + total_allocation: Decimal, + reward_period: &Range>, ) -> Vec<(u64, proto::MobileRewardShare)> { - vec![] - // if self.promotion_rewards.is_empty() { - // return vec![]; - // } - - // let mut rewards = vec![]; - - // let sp_amount = total_allocation * self.realized_promo_perc; - // let matched_amount = total_allocation * self.matched_promo_perc; - - // let total_shares = self.promotion_rewards.total_shares(); - // let sp_amount_per_share = sp_amount / total_shares; - // let matched_amount_per_share = matched_amount / total_shares; - - // for r in self.promotion_rewards.iter() { - // let shares = Decimal::from(r.shares); + if self.promotions.is_empty() { + return vec![]; + } - // let service_provider_amount = (sp_amount_per_share * shares).to_u64_rounded(); - // let matched_amount = (matched_amount_per_share * shares).to_u64_rounded(); + let mut rewards = vec![]; - // let total_amount = service_provider_amount + matched_amount; + let sp_amount = total_allocation * self.realized_promo_perc; + let matched_amount = total_allocation * self.matched_promo_perc; - // rewards.push(( - // total_amount, - // proto::MobileRewardShare { - // start_period: reward_period.start.encode_timestamp(), - // end_period: reward_period.end.encode_timestamp(), - // reward: Some(proto::Reward::PromotionReward(proto::PromotionReward { - // service_provider_amount, - // matched_amount, - // entity: Some(r.rewardable_entity.clone().into()), - // })), - // }, - // )) - // } + let total_shares = self + .promotions + .iter() + .map(|x| Decimal::from(x.shares)) + .sum::(); + let sp_amount_per_share = sp_amount / total_shares; + let matched_amount_per_share = matched_amount / total_shares; + + for r in self.promotions.iter() { + let shares = Decimal::from(r.shares); + + let service_provider_amount = (sp_amount_per_share * shares).to_u64_rounded(); + let matched_amount = (matched_amount_per_share * shares).to_u64_rounded(); + + let total_amount = service_provider_amount + matched_amount; + + rewards.push(( + total_amount, + proto::MobileRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(proto::Reward::PromotionReward(proto::PromotionReward { + service_provider_amount, + matched_amount, + entity: r.entity.to_owned(), + })), + }, + )) + } - // rewards + rewards } } @@ -232,7 +239,7 @@ fn distribute_unallocated(coll: &mut [RewardInfo]) { let maybe_matching_perc = coll .iter() - .filter(|x| !x.promotion_rewards.is_empty()) + .filter(|x| !x.promotions.is_empty()) .map(|x| x.realized_promo_perc) .sum::(); @@ -248,7 +255,7 @@ fn distribute_unalloc_over_limit(coll: &mut [RewardInfo], unallocated_perc: Deci let total = coll.iter().map(|x| x.realized_promo_perc).sum::() * dec!(100); for sp in coll.iter_mut() { - if sp.promotion_rewards.is_empty() { + if sp.promotions.is_empty() { continue; } let shares = sp.realized_promo_perc * dec!(100); @@ -258,7 +265,7 @@ fn distribute_unalloc_over_limit(coll: &mut [RewardInfo], unallocated_perc: Deci fn distribute_unalloc_under_limit(coll: &mut [RewardInfo]) { for sp in coll.iter_mut() { - if sp.promotion_rewards.is_empty() { + if sp.promotions.is_empty() { continue; } sp.matched_promo_perc = sp.realized_promo_perc @@ -282,134 +289,84 @@ impl DecimalRoundingExt for Decimal { #[cfg(test)] mod tests { use chrono::Duration; - use file_store::promotion_reward::Entity; - use helium_proto::services::poc_mobile::MobileRewardShare; + use helium_proto::services::poc_mobile::{MobileRewardShare, PromotionReward}; - use crate::service_provider::{self, promotions::rewards::PromotionRewardShare}; + use crate::service_provider; use super::*; - use super::ServiceProviderRewardInfos; - fn epoch() -> Range> { let now = Utc::now(); now - Duration::hours(24)..now } - impl ServiceProviderRewardInfos { - fn iter_sp_rewards(&self, sp_id: i32) -> Vec { - let rewards_per_share = rewards_per_share( - self.all_transfer, - self.total_sp_allocation, - self.mobile_bone_price, - ); - let sp_rewards = self.total_sp_allocation * rewards_per_share; - - for info in self.coll.iter() { - if info.sp_id == sp_id { - let mut result = info.promo_rewards(sp_rewards, &self.reward_epoch); - result.push(info.carrier_reward(sp_rewards, &self.reward_epoch)); - return result.into_iter().map(|(_, x)| x).collect(); - } - } - vec![] - } - - fn single_sp_rewards(&self, sp_id: i32) -> proto::ServiceProviderReward { - let binding = self.iter_sp_rewards(sp_id); - let mut rewards = binding.iter(); - rewards.next().cloned().unwrap().sp_reward() - } - - // fn single_sp_rewards( - // &self, - // sp_id: i32, - // ) -> (proto::PromotionReward, proto::ServiceProviderReward) { - // let binding = self.iter_sp_rewards(sp_id); - // let mut rewards = binding.iter(); - - // let promo = rewards.next().cloned().unwrap().promotion_reward(); - // let sp = rewards.next().cloned().unwrap().sp_reward(); + #[test] + fn no_promotions() { + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, dec!(12)), (1, dec!(6))]), + ServiceProviderPromotions::default(), + dec!(100), + dec!(0.00001), + epoch(), + ); - // (promo, sp) - // } - } + let mut iter = sp_infos.iter_rewards().into_iter(); - trait RewardExt { - // fn promotion_reward(self) -> proto::PromotionReward; - fn sp_reward(self) -> proto::ServiceProviderReward; - } + let sp_1 = iter.next().unwrap().1.sp_reward(); + let sp_2 = iter.next().unwrap().1.sp_reward(); - impl RewardExt for proto::MobileRewardShare { - // fn promotion_reward(self) -> proto::PromotionReward { - // match self.reward { - // Some(proto::Reward::PromotionReward(promo)) => promo.clone(), - // other => panic!("expected promotion reward, got {other:?}"), - // } - // } + assert_eq!(sp_1.amount, 12); + assert_eq!(sp_2.amount, 6); - fn sp_reward(self) -> proto::ServiceProviderReward { - match self.reward { - Some(proto::Reward::ServiceProviderReward(promo)) => promo.clone(), - other => panic!("expected sp reward, got {other:?}"), - } - } + assert_eq!(None, iter.next()); } #[test] fn unallocated_reward_scaling_1() { let sp_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(0, dec!(12)), (1, dec!(6))]), - ServiceProviderFunds::from([(0, 5000), (1, 5000)]), - ServiceProviderPromotions::from([ - PromotionRewardShare { - service_provider_id: 0, - rewardable_entity: Entity::SubscriberId(vec![0]), - shares: 1, - }, - PromotionRewardShare { - service_provider_id: 1, - rewardable_entity: Entity::SubscriberId(vec![1]), - shares: 1, - }, + ServiceProviderPromotions::from(vec![ + make_test_promotion(0, "promo-0", 5000, 1), + make_test_promotion(1, "promo-1", 5000, 1), ]), dec!(100), dec!(0.00001), epoch(), ); - let sp_1 = sp_infos.single_sp_rewards(0); + let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); + assert_eq!(promo_1.service_provider_amount, 6); + assert_eq!(promo_1.matched_amount, 6); assert_eq!(sp_1.amount, 6); - let sp_2 = sp_infos.single_sp_rewards(1); + let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); + assert_eq!(promo_2.service_provider_amount, 3); + assert_eq!(promo_2.matched_amount, 3); assert_eq!(sp_2.amount, 3); - - // let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); - // assert_eq!(promo_1.service_provider_amount, 6); - // assert_eq!(promo_1.matched_amount, 6); - // assert_eq!(sp_1.amount, 6); - - // let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); - // assert_eq!(promo_2.service_provider_amount, 3); - // assert_eq!(promo_2.matched_amount, 3); - // assert_eq!(sp_2.amount, 3); } #[test] fn unallocated_reward_scaling_2() { let sp_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(0, dec!(12)), (1, dec!(6))]), - ServiceProviderFunds::from([(0, 5000), (1, 10000)]), - ServiceProviderPromotions::from([ - PromotionRewardShare { - service_provider_id: 0, - rewardable_entity: Entity::SubscriberId(vec![0]), - shares: 1, + ServiceProviderPromotions::from(vec![ + helium_proto::ServiceProviderPromotions { + service_provider: 0, + incentive_escrow_fund_bps: 5000, + promotions: vec![helium_proto::service_provider_promotions::Promotion { + entity: "promo-0".to_string(), + shares: 1, + ..Default::default() + }], }, - PromotionRewardShare { - service_provider_id: 1, - rewardable_entity: Entity::SubscriberId(vec![1]), - shares: 1, + helium_proto::ServiceProviderPromotions { + service_provider: 1, + incentive_escrow_fund_bps: 10000, + promotions: vec![helium_proto::service_provider_promotions::Promotion { + entity: "promo-1".to_string(), + shares: 1, + ..Default::default() + }], }, ]), dec!(100), @@ -417,72 +374,46 @@ mod tests { epoch(), ); - let sp_1 = sp_infos.single_sp_rewards(0); + let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); + assert_eq!(promo_1.service_provider_amount, 6); + assert_eq!(promo_1.matched_amount, 6); assert_eq!(sp_1.amount, 6); - let sp_2 = sp_infos.single_sp_rewards(1); + let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); + assert_eq!(promo_2.service_provider_amount, 6); + assert_eq!(promo_2.matched_amount, 6); assert_eq!(sp_2.amount, 0); - - // let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); - // assert_eq!(promo_1.service_provider_amount, 6); - // assert_eq!(promo_1.matched_amount, 6); - // assert_eq!(sp_1.amount, 6); - - // let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); - // assert_eq!(promo_2.service_provider_amount, 6); - // assert_eq!(promo_2.matched_amount, 6); - // assert_eq!(sp_2.amount, 0); } #[test] fn unallocated_reward_scaling_3() { let sp_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(0, dec!(10)), (1, dec!(1000))]), - ServiceProviderFunds::from([(0, 10000), (1, 200)]), - ServiceProviderPromotions::from([ - PromotionRewardShare { - service_provider_id: 0, - rewardable_entity: Entity::SubscriberId(vec![0]), - shares: 1, - }, - PromotionRewardShare { - service_provider_id: 1, - rewardable_entity: Entity::SubscriberId(vec![1]), - shares: 1, - }, + ServiceProviderPromotions::from(vec![ + make_test_promotion(0, "promo-0", 10000, 1), + make_test_promotion(1, "promo-1", 200, 1), ]), dec!(2000), dec!(0.00001), epoch(), ); - let sp_1 = sp_infos.single_sp_rewards(0); + let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); + assert_eq!(promo_1.service_provider_amount, 10); + assert_eq!(promo_1.matched_amount, 10); assert_eq!(sp_1.amount, 0); - let sp_2 = sp_infos.single_sp_rewards(1); + let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); + assert_eq!(promo_2.service_provider_amount, 20); + assert_eq!(promo_2.matched_amount, 20); assert_eq!(sp_2.amount, 980); - - // let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); - // assert_eq!(promo_1.service_provider_amount, 10); - // assert_eq!(promo_1.matched_amount, 10); - // assert_eq!(sp_1.amount, 0); - - // let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); - // assert_eq!(promo_2.service_provider_amount, 20); - // assert_eq!(promo_2.matched_amount, 20); - // assert_eq!(sp_2.amount, 980); } #[test] fn no_rewards_if_none_allocated() { let sp_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(0, dec!(100))]), - ServiceProviderFunds::from([(0, 5000)]), - ServiceProviderPromotions::from([PromotionRewardShare { - service_provider_id: 0, - rewardable_entity: Entity::SubscriberId(vec![0]), - shares: 1, - }]), + ServiceProviderPromotions::from(vec![make_test_promotion(0, "promo-0", 5000, 1)]), dec!(0), dec!(0.0001), epoch(), @@ -491,188 +422,177 @@ mod tests { assert!(sp_infos.iter_rewards().is_empty()); } - // #[test] - // fn no_matched_rewards_if_no_unallocated() { - // let total_rewards = dec!(1000); - - // let sp_infos = ServiceProviderRewardInfos::new( - // ServiceProviderDCSessions::from([(0, total_rewards)]), - // ServiceProviderFunds::from([(0, 5000)]), - // ServiceProviderPromotions::from([PromotionRewardShare { - // service_provider_id: 0, - // rewardable_entity: Entity::SubscriberId(vec![0]), - // shares: 1, - // }]), - // total_rewards, - // dec!(0.001), - // epoch(), - // ); - - // let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); - - // assert!(!promo_rewards.is_empty()); - // for reward in promo_rewards { - // assert_eq!(reward.matched_amount, 0); - // } - // } - - // #[test] - // fn single_sp_unallocated_less_than_matched_distributed_by_shares() { - // // 100 unallocated - // let total_rewards = dec!(1100); - // let sp_session = dec!(1000); - - // let sp_infos = ServiceProviderRewardInfos::new( - // ServiceProviderDCSessions::from([(0, sp_session)]), - // ServiceProviderFunds::from([(0, 10000)]), // All rewards allocated to promotions - // ServiceProviderPromotions::from([ - // PromotionRewardShare { - // service_provider_id: 0, - // rewardable_entity: Entity::SubscriberId(vec![0]), - // shares: 1, - // }, - // PromotionRewardShare { - // service_provider_id: 0, - // rewardable_entity: Entity::SubscriberId(vec![1]), - // shares: 2, - // }, - // ]), - // total_rewards, - // dec!(0.00001), - // epoch(), - // ); - - // let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); - // assert_eq!(2, promo_rewards.len()); - - // assert_eq!(promo_rewards[0].service_provider_amount, 333); - // assert_eq!(promo_rewards[0].matched_amount, 33); - // // - // assert_eq!(promo_rewards[1].service_provider_amount, 666); - // assert_eq!(promo_rewards[1].matched_amount, 66); - // } - - // #[test] - // fn single_sp_unallocated_more_than_matched_promotion() { - // // 1,000 unallocated - // let total_rewards = dec!(11_000); - // let sp_session = dec!(1000); - - // let sp_infos = ServiceProviderRewardInfos::new( - // ServiceProviderDCSessions::from([(0, sp_session)]), - // ServiceProviderFunds::from([(0, 10000)]), // All rewards allocated to promotions - // ServiceProviderPromotions::from([ - // PromotionRewardShare { - // service_provider_id: 0, - // rewardable_entity: Entity::SubscriberId(vec![0]), - // shares: 1, - // }, - // PromotionRewardShare { - // service_provider_id: 0, - // rewardable_entity: Entity::SubscriberId(vec![1]), - // shares: 2, - // }, - // ]), - // total_rewards, - // dec!(0.00001), - // epoch(), - // ); - - // let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); - // assert_eq!(2, promo_rewards.len()); - - // assert_eq!(promo_rewards[0].service_provider_amount, 333); - // assert_eq!(promo_rewards[0].matched_amount, 333); - // // - // assert_eq!(promo_rewards[1].service_provider_amount, 666); - // assert_eq!(promo_rewards[1].matched_amount, 666); - // } - - // #[test] - // fn unallocated_matching_does_not_exceed_promotion() { - // // 100 unallocated - // let total_rewards = dec!(1100); - // let sp_session = dec!(1000); - - // let sp_infos = ServiceProviderRewardInfos::new( - // ServiceProviderDCSessions::from([(0, sp_session)]), - // ServiceProviderFunds::from([(0, 100)]), // Severely limit promotion rewards - // ServiceProviderPromotions::from([ - // PromotionRewardShare { - // service_provider_id: 0, - // rewardable_entity: Entity::SubscriberId(vec![0]), - // shares: 1, - // }, - // PromotionRewardShare { - // service_provider_id: 0, - // rewardable_entity: Entity::SubscriberId(vec![1]), - // shares: 2, - // }, - // ]), - // total_rewards, - // dec!(0.00001), - // epoch(), - // ); - - // let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); - // assert_eq!(2, promo_rewards.len()); - - // assert_eq!(promo_rewards[0].service_provider_amount, 3); - // assert_eq!(promo_rewards[0].matched_amount, 3); - // // - // assert_eq!(promo_rewards[1].service_provider_amount, 6); - // assert_eq!(promo_rewards[1].matched_amount, 6); - // } - - // trait PromoRewardFiltersExt { - // fn only_promotion_rewards(&self) -> Vec; - // } - - // impl PromoRewardFiltersExt for Vec<(u64, MobileRewardShare)> { - // fn only_promotion_rewards(&self) -> Vec { - // self.clone() - // .into_iter() - // .filter_map(|(_, r)| { - // if let Some(proto::Reward::PromotionReward(reward)) = r.reward { - // Some(reward) - // } else { - // None - // } - // }) - // .collect() - // } - // } + #[test] + fn no_matched_rewards_if_no_unallocated() { + let total_rewards = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, total_rewards)]), + ServiceProviderPromotions::from(vec![make_test_promotion(0, "promo-0", 5000, 1)]), + total_rewards, + dec!(0.001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + + assert!(!promo_rewards.is_empty()); + for reward in promo_rewards { + assert_eq!(reward.matched_amount, 0); + } + } + + #[test] + fn single_sp_unallocated_less_than_matched_distributed_by_shares() { + // 100 unallocated + let total_rewards = dec!(1100); + let sp_session = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, sp_session)]), + ServiceProviderPromotions::from(vec![helium_proto::ServiceProviderPromotions { + service_provider: 0, + incentive_escrow_fund_bps: 10000, + promotions: vec![ + helium_proto::service_provider_promotions::Promotion { + entity: "promo-0".to_string(), + shares: 1, + ..Default::default() + }, + helium_proto::service_provider_promotions::Promotion { + entity: "promo-1".to_string(), + shares: 2, + ..Default::default() + }, + ], + }]), + total_rewards, + dec!(0.00001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + assert_eq!(2, promo_rewards.len()); + + assert_eq!(promo_rewards[0].service_provider_amount, 333); + assert_eq!(promo_rewards[0].matched_amount, 33); + // + assert_eq!(promo_rewards[1].service_provider_amount, 666); + assert_eq!(promo_rewards[1].matched_amount, 66); + } + + #[test] + fn single_sp_unallocated_more_than_matched_promotion() { + // 1,000 unallocated + let total_rewards = dec!(11_000); + let sp_session = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, sp_session)]), + ServiceProviderPromotions::from(vec![helium_proto::ServiceProviderPromotions { + service_provider: 0, + incentive_escrow_fund_bps: 10000, + promotions: vec![ + helium_proto::service_provider_promotions::Promotion { + entity: "promo-0".to_string(), + shares: 1, + ..Default::default() + }, + helium_proto::service_provider_promotions::Promotion { + entity: "promo-1".to_string(), + shares: 2, + ..Default::default() + }, + ], + }]), + total_rewards, + dec!(0.00001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + assert_eq!(2, promo_rewards.len()); + + assert_eq!(promo_rewards[0].service_provider_amount, 333); + assert_eq!(promo_rewards[0].matched_amount, 333); + // + assert_eq!(promo_rewards[1].service_provider_amount, 666); + assert_eq!(promo_rewards[1].matched_amount, 666); + } + + #[test] + fn unallocated_matching_does_not_exceed_promotion() { + // 100 unallocated + let total_rewards = dec!(1100); + let sp_session = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, sp_session)]), + ServiceProviderPromotions::from(vec![helium_proto::ServiceProviderPromotions { + service_provider: 0, + incentive_escrow_fund_bps: 100, // severely limit promotions + promotions: vec![ + helium_proto::service_provider_promotions::Promotion { + entity: "promo-0".to_string(), + shares: 1, + ..Default::default() + }, + helium_proto::service_provider_promotions::Promotion { + entity: "promo-1".to_string(), + shares: 2, + ..Default::default() + }, + ], + }]), + total_rewards, + dec!(0.00001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + assert_eq!(2, promo_rewards.len()); + + assert_eq!(promo_rewards[0].service_provider_amount, 3); + assert_eq!(promo_rewards[0].matched_amount, 3); + // + assert_eq!(promo_rewards[1].service_provider_amount, 6); + assert_eq!(promo_rewards[1].matched_amount, 6); + } use proptest::prelude::*; prop_compose! { - fn arb_share()(sp_id in 0..10_i32, ent_id in 0..200u8, shares in 1..=100u64) -> PromotionRewardShare { - PromotionRewardShare { - service_provider_id: sp_id, - rewardable_entity: Entity::SubscriberId(vec![ent_id]), - shares - } + fn arb_promotion()(entity: String, shares in 1..=100u32) -> helium_proto::service_provider_promotions::Promotion { + proto::Promotion { entity, shares, ..Default::default() } } } prop_compose! { - fn arb_dc_session()( + fn arb_sp_promotion()( sp_id in 0..10_i32, - // below 1 trillion - dc_session in (0..=1_000_000_000_000_u64).prop_map(Decimal::from) - ) -> (i32, Decimal) { - (sp_id, dc_session) + bps in arb_bps(), + promotions in prop::collection::vec(arb_promotion(), 0..10) + ) -> helium_proto::ServiceProviderPromotions { + helium_proto::ServiceProviderPromotions { + service_provider: sp_id, + incentive_escrow_fund_bps: bps, + promotions + } } } prop_compose! { - fn arb_fund()(sp_id in 0..10_i32, bps in arb_bps()) -> (i32, u16) { - (sp_id, bps) - } + fn arb_bps()(bps in 0..=10_000u32) -> u32 { bps } } prop_compose! { - fn arb_bps()(bps in 0..=10_000u16) -> u16 { bps } + fn arb_dc_session()( + sp_id in 0..10_i32, + // below 1 trillion + dc_session in (0..=1_000_000_000_000_u64).prop_map(Decimal::from) + ) -> (i32, Decimal) { + (sp_id, dc_session) + } } proptest! { @@ -681,15 +601,13 @@ mod tests { #[test] fn single_provider_does_not_overallocate( dc_session in any::().prop_map(Decimal::from), - fund_bps in arb_bps(), - shares in prop::collection::vec(arb_share(), 0..10), + promotions in prop::collection::vec(arb_sp_promotion(), 0..10), total_allocation in any::().prop_map(Decimal::from) ) { let sp_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from([(0, dc_session)]), - ServiceProviderFunds::from([(0, fund_bps)]), - ServiceProviderPromotions::from(shares), + ServiceProviderPromotions::from(promotions), total_allocation, dec!(0.00001), epoch() @@ -705,18 +623,17 @@ mod tests { assert!(allocated <= total_allocation); } + #[test] fn multiple_provider_does_not_overallocate( dc_sessions in prop::collection::vec(arb_dc_session(), 0..10), - funds in prop::collection::vec(arb_fund(), 0..10), - promotions in prop::collection::vec(arb_share(), 0..100), + promotions in prop::collection::vec(arb_sp_promotion(), 0..10), ) { let epoch = epoch(); let total_allocation = service_provider::get_scheduled_tokens(&epoch); let sp_infos = ServiceProviderRewardInfos::new( ServiceProviderDCSessions::from(dc_sessions), - ServiceProviderFunds::from(funds), ServiceProviderPromotions::from(promotions), total_allocation, dec!(0.00001), @@ -741,6 +658,46 @@ mod tests { } + trait RewardExt { + fn promotion_reward(self) -> proto::PromotionReward; + fn sp_reward(self) -> proto::ServiceProviderReward; + } + + impl RewardExt for proto::MobileRewardShare { + fn promotion_reward(self) -> proto::PromotionReward { + match self.reward { + Some(proto::Reward::PromotionReward(promo)) => promo.clone(), + other => panic!("expected promotion reward, got {other:?}"), + } + } + + fn sp_reward(self) -> proto::ServiceProviderReward { + match self.reward { + Some(proto::Reward::ServiceProviderReward(promo)) => promo.clone(), + other => panic!("expected sp reward, got {other:?}"), + } + } + } + + trait PromoRewardFiltersExt { + fn only_promotion_rewards(&self) -> Vec; + } + + impl PromoRewardFiltersExt for Vec<(u64, MobileRewardShare)> { + fn only_promotion_rewards(&self) -> Vec { + self.clone() + .into_iter() + .filter_map(|(_, r)| { + if let Some(proto::Reward::PromotionReward(reward)) = r.reward { + Some(reward) + } else { + None + } + }) + .collect() + } + } + impl RewardInfo { fn total_percent(&self) -> Decimal { self.realized_dc_perc + self.realized_promo_perc + self.matched_promo_perc @@ -751,5 +708,54 @@ mod tests { fn total_percent(&self) -> Decimal { self.coll.iter().map(|x| x.total_percent()).sum() } + + fn iter_sp_rewards(&self, sp_id: i32) -> Vec { + let rewards_per_share = rewards_per_share( + self.all_transfer, + self.total_sp_allocation, + self.mobile_bone_price, + ); + let sp_rewards = self.total_sp_allocation * rewards_per_share; + + for info in self.coll.iter() { + if info.sp_id == sp_id { + let mut result = info.promo_rewards(sp_rewards, &self.reward_epoch); + result.push(info.carrier_reward(sp_rewards, &self.reward_epoch)); + return result.into_iter().map(|(_, x)| x).collect(); + } + } + vec![] + } + + fn single_sp_rewards( + &self, + sp_id: i32, + ) -> (proto::PromotionReward, proto::ServiceProviderReward) { + let binding = self.iter_sp_rewards(sp_id); + let mut rewards = binding.iter(); + + let promo = rewards.next().cloned().unwrap().promotion_reward(); + let sp = rewards.next().cloned().unwrap().sp_reward(); + + (promo, sp) + } + } + + fn make_test_promotion( + sp_id: i32, + entity: &str, + incentive_escrow_fund_bps: u32, + shares: u32, + ) -> helium_proto::ServiceProviderPromotions { + helium_proto::ServiceProviderPromotions { + service_provider: sp_id, + incentive_escrow_fund_bps, + promotions: vec![helium_proto::service_provider_promotions::Promotion { + entity: entity.to_string(), + start_ts: Utc::now().encode_timestamp_millis(), + end_ts: Utc::now().encode_timestamp_millis(), + shares, + }], + } } } diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index 0ad90d158..54a31d9dd 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -25,8 +25,6 @@ pub struct Settings { pub database: db_store::Settings, pub ingest: file_store::Settings, pub data_transfer_ingest: file_store::Settings, - /// S3 ingest reading Service Provider Promotion Funds - pub promotion_ingest: file_store::Settings, pub output: file_store::Settings, /// S3 bucket from which new data sets are downloaded for oracle boosting /// assignments diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 13d8b88c4..d1e73dd47 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -9,8 +9,9 @@ use helium_proto::services::{ mobile_config::NetworkKeyRole, poc_mobile::{ mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward, - MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, RadioReward, - RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, UnallocatedReward, + MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, PromotionReward, + RadioReward, RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, + UnallocatedReward, }, }; use hex_assignments::{Assignment, HexAssignment, HexBoostData}; @@ -162,15 +163,15 @@ impl MockFileSinkReceiver { } } - // pub async fn receive_promotion_reward(&mut self) -> PromotionReward { - // match self.receive("receive_promotion_reward").await { - // Some(mobile_reward) => match mobile_reward.reward { - // Some(MobileReward::PromotionReward(r)) => r, - // _ => panic!("failed to get promotion reward"), - // }, - // None => panic!("failed to receive promotion reward"), - // } - // } + pub async fn receive_promotion_reward(&mut self) -> PromotionReward { + match self.receive("receive_promotion_reward").await { + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::PromotionReward(r)) => r, + _ => panic!("failed to get promotion reward"), + }, + None => panic!("failed to receive promotion reward"), + } + } pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { match self.receive("receive_unallocated_reward").await { diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index fdd3299d3..f118466c5 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -4,10 +4,11 @@ use std::string::ToString; use async_trait::async_trait; use chrono::{DateTime, Duration as ChronoDuration, Utc}; use helium_proto::{ + service_provider_promotions::Promotion, services::poc_mobile::{ MobileRewardShare, ServiceProviderReward, UnallocatedReward, UnallocatedRewardType, }, - ServiceProvider, + ServiceProvider, ServiceProviderPromotions, }; use rust_decimal::prelude::*; use rust_decimal_macros::dec; @@ -15,7 +16,7 @@ use sqlx::{PgPool, Postgres, Transaction}; use crate::common::{self, MockFileSinkReceiver}; use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; -use mobile_verifier::{data_session, reward_shares, rewarder}; +use mobile_verifier::{data_session, reward_shares, rewarder, service_provider}; const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; const HOTSPOT_2: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; @@ -28,11 +29,19 @@ pub type ValidSpMap = HashMap; #[derive(Debug, Clone)] pub struct MockCarrierServiceClient { pub valid_sps: ValidSpMap, + pub promotions: Vec, } impl MockCarrierServiceClient { fn new(valid_sps: ValidSpMap) -> Self { - Self { valid_sps } + Self { + valid_sps, + promotions: vec![], + } + } + + fn with_promotions(self, promotions: Vec) -> Self { + Self { promotions, ..self } } } @@ -50,6 +59,12 @@ impl CarrierServiceVerifier for MockCarrierServiceClient { None => Err(ClientError::UnknownServiceProvider(pubkey.to_string())), } } + + async fn list_incentive_promotions( + &self, + ) -> Result, Self::Error> { + Ok(self.promotions.clone()) + } } #[sqlx::test] @@ -67,10 +82,13 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { seed_hotspot_data(epoch.end, &mut txn).await?; txn.commit().await?; + let dc_sessions = service_provider::get_dc_sessions(&pool, &carrier_client, &epoch).await?; + let sp_promotions = carrier_client.list_incentive_promotions().await?; + let (_, rewards) = tokio::join!( rewarder::reward_service_providers( - &pool, - &carrier_client, + dc_sessions, + sp_promotions.into(), &mobile_rewards_client, &epoch, dec!(0.0001), @@ -112,14 +130,13 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { } #[sqlx::test] -async fn test_service_provider_rewards_invalid_sp(pool: PgPool) -> anyhow::Result<()> { +async fn test_service_provider_rewards_halt_on_invalid_sp(pool: PgPool) -> anyhow::Result<()> { // only payer 1 has a corresponding SP key // data sessions from payer 2 will result in an error, halting rewards let mut valid_sps = HashMap::::new(); valid_sps.insert(PAYER_1.to_string(), SP_1.to_string()); let carrier_client = MockCarrierServiceClient::new(valid_sps); - let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); let now = Utc::now(); let epoch = (now - ChronoDuration::hours(24))..now; @@ -127,127 +144,114 @@ async fn test_service_provider_rewards_invalid_sp(pool: PgPool) -> anyhow::Resul seed_hotspot_data_invalid_sp(epoch.end, &mut txn).await?; txn.commit().await.expect("db txn failed"); - let resp = rewarder::reward_service_providers( - &pool.clone(), - &carrier_client.clone(), - &mobile_rewards_client, - &epoch, - dec!(0.0001), - ) - .await; + let dc_sessions = service_provider::get_dc_sessions(&pool, &carrier_client, &epoch).await; assert_eq!( - resp.unwrap_err().to_string(), - "unknown service provider ".to_string() + PAYER_2 + dc_sessions.unwrap_err().to_string(), + format!("unknown service provider {PAYER_2}") ); + // This is where rewarding would happen if we could properly fetch dc_sessions - // confirm we get no msgs as rewards halted - mobile_rewards.assert_no_messages(); Ok(()) } -// #[sqlx::test] -// async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result<()> { -// // Single SP has allocated shares for a few of their subscribers. -// // Rewards are matched by the unallocated SP rewards for the subscribers - -// let valid_sps = HashMap::from_iter([(PAYER_1.to_string(), SP_1.to_string())]); -// let carrier_client = MockCarrierServiceClient::new(valid_sps); - -// let now = Utc::now(); -// let epoch = (now - ChronoDuration::hours(24))..now; -// let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); - -// let mut txn = pool.begin().await?; -// seed_hotspot_data(epoch.end, &mut txn).await?; // DC transferred == 6,000 reward amount -// seed_sp_promotion_rewards_with_random_subscribers( -// PAYER_1.to_string().parse().unwrap(), -// &[1, 2, 3], -// &mut txn, -// ) -// .await?; -// // promotions allocated 10.00% -// seed_sp_promotion_rewards_funds(&[(0, 1500)], &mut txn).await?; -// txn.commit().await?; - -// let (_, rewards) = tokio::join!( -// rewarder::reward_service_providers( -// &pool, -// &carrier_client, -// &mobile_rewards_client, -// &epoch, -// dec!(0.00001) -// ), -// async move { -// let mut promos = vec![ -// mobile_rewards.receive_promotion_reward().await, -// mobile_rewards.receive_promotion_reward().await, -// mobile_rewards.receive_promotion_reward().await, -// ]; -// // sort by awarded amount least -> most -// promos.sort_by_key(|a| a.service_provider_amount); - -// let sp_reward = mobile_rewards.receive_service_provider_reward().await; -// let unallocated = mobile_rewards.receive_unallocated_reward().await; - -// mobile_rewards.assert_no_messages(); - -// (promos, sp_reward, unallocated) -// } -// ); - -// let (promos, sp_reward, unallocated) = rewards; -// let promo_reward_1 = promos[0].clone(); -// let promo_reward_2 = promos[1].clone(); -// let promo_reward_3 = promos[2].clone(); - -// // 1 share -// assert_eq!(promo_reward_1.service_provider_amount, 1_500); -// assert_eq!(promo_reward_1.matched_amount, 1_500); - -// // 2 shares -// assert_eq!(promo_reward_2.service_provider_amount, 3_000); -// assert_eq!(promo_reward_2.matched_amount, 3_000); - -// // 3 shares -// assert_eq!(promo_reward_3.service_provider_amount, 4_500); -// assert_eq!(promo_reward_3.matched_amount, 4_500); - -// // dc_percentage * total_sp_allocation rounded down -// assert_eq!(sp_reward.amount, 50_999); - -// let unallocated_sp_rewards = get_unallocated_sp_rewards(&epoch); -// let expected_unallocated = unallocated_sp_rewards -// - 50_999 // 85% service provider rewards rounded down -// - 9_000 // 15% service provider promotions -// - 9_000; // matched promotion - -// assert_eq!(unallocated.amount, expected_unallocated); - -// // Ensure the cleanup job can run -// let mut txn = pool.begin().await?; - -// service_provider::db::clear_promotion_rewards(&mut txn, &Utc::now()).await?; -// txn.commit().await?; - -// let promos = service_provider::db::fetch_promotion_rewards( -// &pool, -// &carrier_client, -// &(epoch.start..Utc::now()), -// ) -// .await?; -// assert!(promos.is_empty()); - -// let sp_allocations = service_provider::reward_data_sp_allocations(&pool).await?; -// assert_eq!( -// vec![helium_proto::ServiceProviderAllocation { -// service_provider: 0, -// incentive_escrow_fund_bps: 1500 -// }], -// sp_allocations -// ); - -// Ok(()) -// } +#[sqlx::test] +async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result<()> { + // Single SP has allocated shares for a few of their subscribers. + // Rewards are matched by the unallocated SP rewards for the subscribers + + let valid_sps = HashMap::from_iter([(PAYER_1.to_string(), SP_1.to_string())]); + // promotions allocated 15.00% + let carrier_client = + MockCarrierServiceClient::new(valid_sps).with_promotions(vec![ServiceProviderPromotions { + service_provider: 0, + incentive_escrow_fund_bps: 1500, + promotions: vec![ + Promotion { + entity: "one".to_string(), + shares: 1, + ..Default::default() + }, + Promotion { + entity: "two".to_string(), + shares: 2, + ..Default::default() + }, + Promotion { + entity: "three".to_string(), + shares: 3, + ..Default::default() + }, + ], + }]); + + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + + let mut txn = pool.begin().await?; + seed_hotspot_data(epoch.end, &mut txn).await?; // DC transferred == 6,000 reward amount + + txn.commit().await?; + + let dc_sessions = service_provider::get_dc_sessions(&pool, &carrier_client, &epoch).await?; + let sp_promotions = carrier_client.list_incentive_promotions().await?; + + let (_, rewards) = tokio::join!( + rewarder::reward_service_providers( + dc_sessions, + sp_promotions.into(), + &mobile_rewards_client, + &epoch, + dec!(0.00001) + ), + async move { + let mut promos = vec![ + mobile_rewards.receive_promotion_reward().await, + mobile_rewards.receive_promotion_reward().await, + mobile_rewards.receive_promotion_reward().await, + ]; + // sort by awarded amount least -> most + promos.sort_by_key(|a| a.service_provider_amount); + + let sp_reward = mobile_rewards.receive_service_provider_reward().await; + let unallocated = mobile_rewards.receive_unallocated_reward().await; + + mobile_rewards.assert_no_messages(); + + (promos, sp_reward, unallocated) + } + ); + + let (promos, sp_reward, unallocated) = rewards; + let promo_reward_1 = promos[0].clone(); + let promo_reward_2 = promos[1].clone(); + let promo_reward_3 = promos[2].clone(); + + // 1 share + assert_eq!(promo_reward_1.service_provider_amount, 1_500); + assert_eq!(promo_reward_1.matched_amount, 1_500); + + // 2 shares + assert_eq!(promo_reward_2.service_provider_amount, 3_000); + assert_eq!(promo_reward_2.matched_amount, 3_000); + + // 3 shares + assert_eq!(promo_reward_3.service_provider_amount, 4_500); + assert_eq!(promo_reward_3.matched_amount, 4_500); + + // dc_percentage * total_sp_allocation rounded down + assert_eq!(sp_reward.amount, 50_999); + + let unallocated_sp_rewards = get_unallocated_sp_rewards(&epoch); + let expected_unallocated = unallocated_sp_rewards + - 50_999 // 85% service provider rewards rounded down + - 9_000 // 15% service provider promotions + - 9_000; // matched promotion + + assert_eq!(unallocated.amount, expected_unallocated); + + Ok(()) +} async fn receive_expected_rewards( mobile_rewards: &mut MockFileSinkReceiver, @@ -321,47 +325,10 @@ async fn seed_hotspot_data_invalid_sp( Ok(()) } -// // Service Provider promotion rewards are verified during ingest. When you write -// // directly to the database, the assumption is the entity and the payer are -// // valid. -// async fn seed_sp_promotion_rewards_with_random_subscribers( -// payer: PublicKeyBinary, -// sub_shares: &[u64], -// txn: &mut Transaction<'_, Postgres>, -// ) -> anyhow::Result<()> { -// for &shares in sub_shares { -// save_promotion_reward( -// txn, -// &PromotionReward { -// entity: promotion_reward::Entity::SubscriberId(Uuid::new_v4().into()), -// shares, -// timestamp: Utc::now() - chrono::Duration::hours(2), -// received_timestamp: Utc::now(), -// carrier_pub_key: payer.clone(), -// signature: vec![], -// }, -// ) -// .await?; -// } - -// Ok(()) -// } - -// async fn seed_sp_promotion_rewards_funds( -// sp_fund_allocations: &[(ServiceProviderId, u16)], -// txn: &mut Transaction<'_, Postgres>, -// ) -> anyhow::Result<()> { -// for (sp_id, basis_points) in sp_fund_allocations { -// save_promotion_fund(txn, *sp_id, *basis_points).await?; -// } - -// Ok(()) -// } - -// // Helper for turning Decimal -> u64 to compare against output rewards -// fn get_unallocated_sp_rewards(epoch: &std::ops::Range>) -> u64 { -// reward_shares::get_scheduled_tokens_for_service_providers(epoch.end - epoch.start) -// .round_dp_with_strategy(0, RoundingStrategy::ToZero) -// .to_u64() -// .unwrap_or(0) -// } +// Helper for turning Decimal -> u64 to compare against output rewards +fn get_unallocated_sp_rewards(epoch: &std::ops::Range>) -> u64 { + reward_shares::get_scheduled_tokens_for_service_providers(epoch.end - epoch.start) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0) +}