diff --git a/Cargo.lock b/Cargo.lock index 35dc33d05..7f0a05749 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1611,7 +1611,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck%2Fsub-dao-epoch-support#f7b68c095f86afdf904f6a38db8af03af99c9a86" +source = "git+https://github.com/helium/proto?branch=andymck%2Fsub-dao-epoch-support#a3a45986b1167f730874a6b81ed94f10f731ff38" dependencies = [ "base64 0.21.7", "byteorder", @@ -1621,7 +1621,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", ] @@ -3817,7 +3817,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck%2Fsub-dao-epoch-support#f7b68c095f86afdf904f6a38db8af03af99c9a86" +source = "git+https://github.com/helium/proto?branch=andymck%2Fsub-dao-epoch-support#a3a45986b1167f730874a6b81ed94f10f731ff38" dependencies = [ "bytes", "prost", @@ -4373,6 +4373,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", + "helium-lib", "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=andymck%2Fsub-dao-epoch-support)", "hextree", "http 0.2.11", @@ -4385,6 +4386,8 @@ dependencies = [ "prost", "rand 0.8.5", "retainer", + "rust_decimal", + "rust_decimal_macros", "serde", "serde_json", "sqlx", @@ -4457,6 +4460,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", + "helium-lib", "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=andymck%2Fsub-dao-epoch-support)", "http-serde", "humantime-serde", @@ -6078,7 +6082,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.0", "itertools", "log", "multimap", @@ -10003,7 +10007,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/boost_manager/src/updater.rs b/boost_manager/src/updater.rs index 22eafda50..1b5be1310 100644 --- a/boost_manager/src/updater.rs +++ b/boost_manager/src/updater.rs @@ -173,7 +173,7 @@ where Ok(()) } - async fn confirm_txn<'a>(&self, txn_row: &TxnRow) -> Result<()> { + async fn confirm_txn(&self, txn_row: &TxnRow) -> Result<()> { if self.solana.confirm_transaction(&txn_row.txn_id).await? { tracing::info!("txn_id {} confirmed on chain, updated db", txn_row.txn_id); db::update_verified_txns_onchain(&self.pool, &txn_row.txn_id).await? diff --git a/file_store/src/cli/dump.rs b/file_store/src/cli/dump.rs index 652969307..4feac31bb 100644 --- a/file_store/src/cli/dump.rs +++ b/file_store/src/cli/dump.rs @@ -8,6 +8,7 @@ use crate::{ mobile_radio_threshold::VerifiedRadioThresholdIngestReport, mobile_session::{DataTransferSessionIngestReport, InvalidDataTransferIngestReport}, mobile_subscriber::{SubscriberLocationIngestReport, VerifiedSubscriberLocationIngestReport}, + reward_manifest::RewardManifest, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, traits::{MsgDecode, TimestampDecode}, unique_connections::UniqueConnectionReq, @@ -23,12 +24,13 @@ use helium_proto::{ services::{ packet_verifier::ValidDataTransferSession as ValidDataTransferSessionProto, poc_lora::{ + iot_reward_share::Reward as IotReward, IotRewardShare as IotRewardShareProto, LoraBeaconIngestReportV1, LoraInvalidWitnessReportV1, LoraPocV1, LoraWitnessIngestReportV1, }, poc_mobile::{ - mobile_reward_share::Reward, CellHeartbeatIngestReportV1, CellHeartbeatReqV1, - CoverageObjectV1, Heartbeat, HexUsageStatsIngestReportV1, + mobile_reward_share::Reward as MobileReward, CellHeartbeatIngestReportV1, + CellHeartbeatReqV1, CoverageObjectV1, Heartbeat, HexUsageStatsIngestReportV1, InvalidDataTransferIngestReportV1, MobileRewardShare, OracleBoostingReportV1, RadioRewardShare, RadioUsageStatsIngestReportV1, SpeedtestAvg, SpeedtestIngestReportV1, SpeedtestReqV1, UniqueConnectionsIngestReportV1, @@ -38,7 +40,7 @@ use helium_proto::{ router::PacketRouterPacketReportV1, }, BlockchainTxn, BoostedHexUpdateV1 as BoostedHexUpdateProto, Message, PriceReportV1, - RewardManifest, SubnetworkRewards, + RewardManifest as RewardManifestProto, SubnetworkRewards, }; use serde_json::json; use std::io; @@ -255,29 +257,51 @@ impl Cmd { "validity": heartbeat.validity, }))?; } + FileType::IotRewardShare => { + let reward = IotRewardShareProto::decode(msg)?; + match reward.reward { + Some(IotReward::GatewayReward(reward)) => print_json(&json!({ + "type": "gateway_reward", + "hotspot_key": PublicKey::try_from(reward.hotspot_key)?, + "dc_transfer_amount": reward.dc_transfer_amount, + "beacon_amount": reward.beacon_amount, + "witness_amount": reward.witness_amount, + }))?, + Some(IotReward::OperationalReward(reward)) => print_json(&json!({ + "type": "operational_reward", + "amount": reward.amount, + }))?, + Some(IotReward::UnallocatedReward(reward)) => print_json(&json!({ + "type": "unallocated_reward", + "unallocated_reward_type": reward.reward_type, + "amount": reward.amount, + }))?, + _ => (), + } + } FileType::MobileRewardShare => { let reward = MobileRewardShare::decode(msg)?; match reward.reward { - Some(Reward::GatewayReward(reward)) => print_json(&json!({ + Some(MobileReward::GatewayReward(reward)) => print_json(&json!({ "hotspot_key": PublicKey::try_from(reward.hotspot_key)?, "dc_transfer_reward": reward.dc_transfer_reward, }))?, - Some(Reward::RadioReward(reward)) => print_json(&json!({ + Some(MobileReward::RadioReward(reward)) => print_json(&json!({ "hotspot_key": PublicKey::try_from(reward.hotspot_key)?, "cbsd_id": reward.cbsd_id, "poc_reward": reward.poc_reward, "boosted_hexes": reward.boosted_hexes, }))?, - Some(Reward::SubscriberReward(reward)) => print_json(&json!({ + Some(MobileReward::SubscriberReward(reward)) => print_json(&json!({ "subscriber_id": reward.subscriber_id, "discovery_location_amount": reward.discovery_location_amount, "verification_mapping_amount": reward.verification_mapping_amount, }))?, - Some(Reward::ServiceProviderReward(reward)) => print_json(&json!({ + Some(MobileReward::ServiceProviderReward(reward)) => print_json(&json!({ "service_provider": reward.service_provider_id, "amount": reward.amount, }))?, - Some(Reward::UnallocatedReward(reward)) => print_json(&json!({ + Some(MobileReward::UnallocatedReward(reward)) => print_json(&json!({ "unallocated_reward_type": reward.reward_type, "amount": reward.amount, }))?, @@ -296,13 +320,9 @@ impl Cmd { }))?; } FileType::RewardManifest => { - let manifest = RewardManifest::decode(msg)?; - print_json(&json!({ - "written_files": manifest.written_files, - "start_timestamp": manifest.start_timestamp, - "end_timestamp": manifest.end_timestamp, - "reward_data": manifest.reward_data.unwrap() - }))?; + let manifest = RewardManifestProto::decode(msg)?; + let report = RewardManifest::try_from(manifest)?; + print_json(&report)?; } FileType::SignedPocReceiptTxn => { // This just outputs a binary of the txns instead of the typical decode. diff --git a/file_store/src/error.rs b/file_store/src/error.rs index 3083357cf..156219657 100644 --- a/file_store/src/error.rs +++ b/file_store/src/error.rs @@ -79,6 +79,8 @@ pub enum DecodeError { UnsupportedPacketType(String, i32), #[error("file stream try decode error: {0}")] FileStreamTryDecode(String), + #[error("unsupported token type {0}")] + UnsupportedTokenType(String, i32), } #[derive(Error, Debug)] @@ -174,6 +176,10 @@ impl DecodeError { pub const fn empty_field(field: &'static str) -> Error { Error::Decode(Self::EmptyField(field)) } + + pub fn unsupported_token_type(msg1: E, msg2: i32) -> Error { + Error::Decode(Self::UnsupportedTokenType(msg1.to_string(), msg2)) + } } impl From for Error { diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 4371c276e..67a553a10 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -141,19 +141,19 @@ pub const SPEEDTEST_AVG: &str = "speedtest_avg"; pub const VALIDATED_HEARTBEAT: &str = "validated_heartbeat"; pub const SIGNED_POC_RECEIPT_TXN: &str = "signed_poc_receipt_txn"; pub const RADIO_REWARD_SHARE: &str = "radio_reward_share"; -pub const REWARD_MANIFEST: &str = "reward_manifest"; +pub const REWARD_MANIFEST: &str = "network_reward_manifest_v1"; pub const IOT_PACKET_REPORT: &str = "packetreport"; pub const IOT_VALID_PACKET: &str = "iot_valid_packet"; pub const INVALID_PACKET: &str = "invalid_packet"; pub const NON_REWARDABLE_PACKET: &str = "non_rewardable_packet"; -pub const IOT_REWARD_SHARE: &str = "iot_reward_share"; +pub const IOT_REWARD_SHARE: &str = "iot_network_reward_shares_v1"; pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ingest_report"; pub const INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "invalid_data_transfer_session_ingest_report"; pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session"; pub const VERIFIED_DATA_TRANSFER_SESSION: &str = "verified_data_transfer_session"; pub const PRICE_REPORT: &str = "price_report"; -pub const MOBILE_REWARD_SHARE: &str = "mobile_reward_share"; +pub const MOBILE_REWARD_SHARE: &str = "mobile_network_reward_shares_v1"; pub const MAPPER_MSG: &str = "mapper_msg"; pub const COVERAGE_OBJECT: &str = "coverage_object"; pub const COVERAGE_OBJECT_INGEST_REPORT: &str = "coverage_object_ingest_report"; diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 771754e90..d2ee9d9ef 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -676,7 +676,7 @@ mod tests { .file_name() .to_str() .and_then(|file_name| FileInfo::from_str(file_name).ok()) - .map_or(false, |file_info| { + .is_some_and(|file_info| { FileType::from_str(&file_info.prefix).expect("entropy report prefix") == FileType::EntropyReport }) diff --git a/file_store/src/reward_manifest.rs b/file_store/src/reward_manifest.rs index 6942da221..8dab95160 100644 --- a/file_store/src/reward_manifest.rs +++ b/file_store/src/reward_manifest.rs @@ -1,26 +1,32 @@ use crate::{error::DecodeError, traits::MsgDecode, Error}; use chrono::{DateTime, TimeZone, Utc}; use helium_proto as proto; +use helium_proto::{IotRewardToken, MobileRewardToken}; use rust_decimal::Decimal; +use serde::Serialize; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub struct RewardManifest { pub written_files: Vec, pub start_timestamp: DateTime, pub end_timestamp: DateTime, pub reward_data: Option, + pub epoch: u64, + pub price: u64, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub enum RewardData { MobileRewardData { poc_bones_per_reward_share: Decimal, boosted_poc_bones_per_reward_share: Decimal, + token: MobileRewardToken, }, IotRewardData { poc_bones_per_beacon_reward_share: Decimal, poc_bones_per_witness_reward_share: Decimal, dc_bones_per_share: Decimal, + token: IotRewardToken, }, } @@ -46,8 +52,16 @@ impl TryFrom for RewardManifest { .ok_or(Error::Decode(DecodeError::InvalidTimestamp( value.end_timestamp, )))?, + epoch: value.epoch, + price: value.price, reward_data: match value.reward_data { Some(proto::reward_manifest::RewardData::MobileRewardData(reward_data)) => { + let token = MobileRewardToken::try_from(reward_data.token).map_err(|_| { + DecodeError::unsupported_token_type( + "mobile_reward_manifest", + reward_data.token, + ) + })?; Some(RewardData::MobileRewardData { poc_bones_per_reward_share: reward_data .poc_bones_per_reward_share @@ -63,9 +77,16 @@ impl TryFrom for RewardManifest { .value .parse() .map_err(DecodeError::from)?, + token, }) } Some(proto::reward_manifest::RewardData::IotRewardData(reward_data)) => { + let token = IotRewardToken::try_from(reward_data.token).map_err(|_| { + DecodeError::unsupported_token_type( + "iot_reward_manifest", + reward_data.token, + ) + })?; Some(RewardData::IotRewardData { poc_bones_per_beacon_reward_share: reward_data .poc_bones_per_beacon_reward_share @@ -89,6 +110,7 @@ impl TryFrom for RewardManifest { .value .parse() .map_err(DecodeError::from)?, + token, }) } None => None, diff --git a/iot_config/Cargo.toml b/iot_config/Cargo.toml index cce124e9f..a16da50c9 100644 --- a/iot_config/Cargo.toml +++ b/iot_config/Cargo.toml @@ -15,10 +15,13 @@ chrono = { workspace = true } clap = { workspace = true } config = { workspace = true } db-store = { path = "../db_store" } +rust_decimal = { workspace = true, features = ["maths"] } +rust_decimal_macros = { workspace = true } file-store = { path = "../file_store" } futures = { workspace = true } futures-util = { workspace = true } helium-crypto = { workspace = true, features = ["sqlx-postgres"] } +helium-lib = { workspace = true } helium-proto = { workspace = true } hextree = { workspace = true } http = { workspace = true } diff --git a/iot_config/src/client/mod.rs b/iot_config/src/client/mod.rs index 1fcefb498..e81425b89 100644 --- a/iot_config/src/client/mod.rs +++ b/iot_config/src/client/mod.rs @@ -10,6 +10,7 @@ use std::{sync::Arc, time::Duration}; pub mod org_client; mod settings; +pub mod sub_dao_client; pub use org_client::OrgClient; pub use settings::Settings; @@ -24,6 +25,8 @@ pub enum ClientError { Verification(#[from] file_store::Error), #[error("error resolving region params: {0}")] UndefinedRegionParams(String), + #[error("Invalid SubDaoRewardInfo proto response {0}")] + InvalidSubDaoRewardInfoProto(#[from] SubDaoRewardInfoParseError), } #[async_trait::async_trait] @@ -77,6 +80,7 @@ macro_rules! call_with_retry { }}; } +use crate::sub_dao_epoch_reward_info::SubDaoRewardInfoParseError; pub(crate) use call_with_retry; impl Client { diff --git a/iot_config/src/client/sub_dao_client.rs b/iot_config/src/client/sub_dao_client.rs new file mode 100644 index 000000000..57254d82b --- /dev/null +++ b/iot_config/src/client/sub_dao_client.rs @@ -0,0 +1,80 @@ +use super::{call_with_retry, ClientError, Settings}; +use crate::sub_dao_epoch_reward_info::EpochRewardInfo; +use file_store::traits::MsgVerify; +use helium_crypto::{Keypair, PublicKey, Sign}; +use helium_proto::{ + services::{ + sub_dao::{self, SubDaoEpochRewardInfoReqV1}, + Channel, + }, + Message, +}; +use std::{error::Error, sync::Arc, time::Duration}; +use tonic::transport::Endpoint; + +#[derive(Clone)] +pub struct SubDaoClient { + pub client: sub_dao::sub_dao_client::SubDaoClient, + signing_key: Arc, + config_pubkey: PublicKey, +} + +impl SubDaoClient { + pub fn from_settings(settings: &Settings) -> Result> { + let channel = Endpoint::from(settings.url.clone()) + .connect_timeout(Duration::from_secs(settings.connect_timeout)) + .timeout(Duration::from_secs(settings.rpc_timeout)) + .connect_lazy(); + Ok(Self { + client: sub_dao::sub_dao_client::SubDaoClient::new(channel), + signing_key: settings.signing_keypair()?, + config_pubkey: settings.config_pubkey()?, + }) + } +} + +#[async_trait::async_trait] +pub trait SubDaoEpochRewardInfoResolver: Clone + Send + Sync + 'static { + type Error: Error + Send + Sync + 'static; + + async fn resolve_info( + &self, + sub_dao: &str, + epoch: u64, + ) -> Result, Self::Error>; +} + +#[async_trait::async_trait] +impl SubDaoEpochRewardInfoResolver for SubDaoClient { + type Error = ClientError; + + async fn resolve_info( + &self, + sub_dao: &str, + epoch: u64, + ) -> Result, Self::Error> { + let mut request = SubDaoEpochRewardInfoReqV1 { + sub_dao_address: sub_dao.to_string(), + epoch, + signer: self.signing_key.public_key().into(), + signature: vec![], + }; + request.signature = self.signing_key.sign(&request.encode_to_vec())?; + tracing::debug!( + subdao = sub_dao.to_string(), + epoch = epoch, + "fetching subdao epoch info" + ); + let response = match call_with_retry!(self.client.clone().info(request.clone())) { + Ok(info_res) => { + let response = info_res.into_inner(); + response.verify(&self.config_pubkey)?; + response.info.map(EpochRewardInfo::try_from).transpose()? + } + Err(status) if status.code() == tonic::Code::NotFound => None, + Err(status) => Err(status)?, + }; + tracing::debug!(?response, "fetched subdao epoch info"); + Ok(response) + } +} diff --git a/iot_config/src/lib.rs b/iot_config/src/lib.rs index fea359fea..dadc4531d 100644 --- a/iot_config/src/lib.rs +++ b/iot_config/src/lib.rs @@ -14,12 +14,17 @@ pub mod route_service; pub mod settings; pub mod telemetry; +pub mod sub_dao_epoch_reward_info; +pub mod sub_dao_service; + pub use admin_service::AdminService; +use chrono::{DateTime, Duration, Utc}; pub use client::{Client, Settings as ClientSettings}; pub use gateway_service::GatewayService; pub use org_service::OrgService; pub use route_service::RouteService; pub use settings::Settings; +use std::ops::Range; use helium_crypto::PublicKey; use tokio::sync::broadcast; @@ -60,3 +65,19 @@ pub fn verify_public_key(bytes: &[u8]) -> Result { PublicKey::try_from(bytes) .map_err(|_| Status::invalid_argument(format!("invalid public key: {bytes:?}"))) } + +pub struct EpochInfo { + pub period: Range>, +} + +impl From for EpochInfo { + fn from(next_reward_epoch: u64) -> Self { + let start_time = DateTime::::UNIX_EPOCH + + Duration::days(next_reward_epoch as i64) + + Duration::hours(1); + let end_time = start_time + Duration::days(1); + EpochInfo { + period: start_time..end_time, + } + } +} diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index 308652ea2..02d6a1d7e 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -2,7 +2,11 @@ use anyhow::{Error, Result}; use clap::Parser; use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; -use helium_proto::services::iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer}; +use helium_proto::services::{ + iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer}, + sub_dao::SubDaoServer, +}; +use iot_config::sub_dao_service::SubDaoService; use iot_config::{ admin::AuthCache, admin_service::AdminService, db_cleaner::DbCleaner, gateway_service::GatewayService, org, org_service::OrgService, region_map::RegionMapReader, @@ -72,7 +76,7 @@ impl Daemon { let gateway_svc = GatewayService::new( settings, - metadata_pool, + metadata_pool.clone(), region_map.clone(), auth_cache.clone(), delegate_key_cache, @@ -98,6 +102,8 @@ impl Daemon { region_updater, )?; + let subdao_svc = SubDaoService::new(settings, auth_cache, metadata_pool)?; + let listen_addr = settings.listen; let pubkey = settings .signing_keypair() @@ -111,6 +117,7 @@ impl Daemon { route_svc, org_svc, admin_svc, + subdao_svc, }; let db_cleaner = DbCleaner::new(pool.clone(), settings.deleted_entry_retention); @@ -130,6 +137,7 @@ pub struct GrpcServer { route_svc: RouteService, org_svc: OrgService, admin_svc: AdminService, + subdao_svc: SubDaoService, } impl ManagedTask for GrpcServer { @@ -146,6 +154,7 @@ impl ManagedTask for GrpcServer { .add_service(OrgServer::new(self.org_svc)) .add_service(RouteServer::new(self.route_svc)) .add_service(AdminServer::new(self.admin_svc)) + .add_service(SubDaoServer::new(self.subdao_svc)) .serve(self.listen_addr) .map_err(Error::from); diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs index 349933271..ec5c78e58 100644 --- a/iot_config/src/route_service.rs +++ b/iot_config/src/route_service.rs @@ -63,11 +63,11 @@ impl RouteService { self.update_channel.clone() } - async fn verify_request_signature<'a, R>( + async fn verify_request_signature( &self, signer: &PublicKey, request: &R, - id: OrgId<'a>, + id: OrgId<'_>, ) -> Result<(), Status> where R: MsgVerify, @@ -117,11 +117,11 @@ impl RouteService { } } - async fn verify_request_signature_or_stream<'a, R>( + async fn verify_request_signature_or_stream( &self, signer: &PublicKey, request: &R, - id: OrgId<'a>, + id: OrgId<'_>, ) -> Result<(), Status> where R: MsgVerify, @@ -151,9 +151,9 @@ impl RouteService { DevAddrEuiValidator::new(route_id, admin_keys, &self.pool, check_constraints).await } - async fn validate_skf_devaddrs<'a>( + async fn validate_skf_devaddrs( &self, - route_id: &'a str, + route_id: &'_ str, updates: &[route_skf_update_req_v1::RouteSkfUpdateV1], ) -> Result<(), Status> { let ranges: Vec = route::list_devaddr_ranges_for_route(route_id, &self.pool) diff --git a/iot_config/src/sub_dao_epoch_reward_info.rs b/iot_config/src/sub_dao_epoch_reward_info.rs new file mode 100644 index 000000000..7c9c9e612 --- /dev/null +++ b/iot_config/src/sub_dao_epoch_reward_info.rs @@ -0,0 +1,126 @@ +use crate::EpochInfo; +use chrono::{DateTime, Utc}; +use file_store::traits::{TimestampDecode, TimestampEncode}; +use helium_proto::services::sub_dao::SubDaoEpochRewardInfo as SubDaoEpochRewardInfoProto; +use rust_decimal::prelude::*; +use std::ops::Range; + +#[derive(Clone, Debug)] +pub struct EpochRewardInfo { + pub epoch_day: u64, + pub epoch_address: String, + pub sub_dao_address: String, + pub epoch_period: Range>, + pub epoch_emissions: Decimal, + pub rewards_issued_at: DateTime, +} + +#[derive(Clone, Debug)] +pub struct RawSubDaoEpochRewardInfo { + epoch: u64, + epoch_address: String, + sub_dao_address: String, + hnt_rewards_issued: u64, + delegation_rewards_issued: u64, + rewards_issued_at: DateTime, +} + +#[derive(thiserror::Error, Debug)] +pub enum SubDaoRewardInfoParseError { + #[error("file_store: {0}")] + FileStore(#[from] file_store::Error), +} + +impl From for SubDaoEpochRewardInfoProto { + fn from(info: RawSubDaoEpochRewardInfo) -> Self { + Self { + epoch: info.epoch, + epoch_address: info.epoch_address, + sub_dao_address: info.sub_dao_address, + hnt_rewards_issued: info.hnt_rewards_issued, + delegation_rewards_issued: info.delegation_rewards_issued, + rewards_issued_at: info.rewards_issued_at.encode_timestamp(), + } + } +} + +impl TryFrom for EpochRewardInfo { + type Error = SubDaoRewardInfoParseError; + + fn try_from(info: SubDaoEpochRewardInfoProto) -> Result { + let epoch_period: EpochInfo = info.epoch.into(); + let epoch_rewards = Decimal::from(info.hnt_rewards_issued + info.delegation_rewards_issued); + + Ok(Self { + epoch_day: info.epoch, + epoch_address: info.epoch_address, + sub_dao_address: info.sub_dao_address, + epoch_period: epoch_period.period, + epoch_emissions: epoch_rewards, + rewards_issued_at: info.rewards_issued_at.to_timestamp()?, + }) + } +} + +pub(crate) mod db { + use crate::sub_dao_epoch_reward_info::RawSubDaoEpochRewardInfo; + use chrono::{DateTime, Utc}; + use file_store::traits::TimestampDecode; + use sqlx::postgres::PgRow; + use sqlx::{FromRow, PgExecutor, Row}; + + const GET_EPOCH_REWARD_INFO_SQL: &str = r#" + SELECT + address AS epoch_address, + sub_dao AS sub_dao_address, + epoch::BIGINT, + delegation_rewards_issued::BIGINT as hnt_rewards_issued, + delegation_rewards_issued::BIGINT, + rewards_issued_at::BIGINT + FROM sub_dao_epoch_infos + WHERE epoch = $1 AND sub_dao = $2 + "#; + + pub async fn get_info( + db: impl PgExecutor<'_>, + epoch: u64, + sub_dao: &str, + ) -> anyhow::Result> { + let mut query: sqlx::QueryBuilder = + sqlx::QueryBuilder::new(GET_EPOCH_REWARD_INFO_SQL); + let res = query + .build_query_as::() + .bind(epoch as i64) + .bind(sub_dao) + .fetch_optional(db) + .await; + tracing::info!("get_info: {:?}", res); + Ok(res?) + } + + impl FromRow<'_, PgRow> for RawSubDaoEpochRewardInfo { + fn from_row(row: &PgRow) -> sqlx::Result { + let rewards_issued_at: DateTime = (row.try_get::("rewards_issued_at")? + as u64) + .to_timestamp() + .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; + + let hnt_rewards_issued = row.get::("hnt_rewards_issued") as u64; + if hnt_rewards_issued == 0 { + return Err(sqlx::Error::Decode(Box::new(sqlx::Error::Decode( + Box::from("hnt_rewards_issued is 0"), + )))); + }; + + Ok(Self { + epoch: row.try_get::("epoch")? as u64, + epoch_address: row.try_get::("epoch_address")?, + sub_dao_address: row.try_get::("sub_dao_address")?, + hnt_rewards_issued, + delegation_rewards_issued: row.try_get::("delegation_rewards_issued")? + as u64, + rewards_issued_at, + }) + } + } +} diff --git a/iot_config/src/sub_dao_service.rs b/iot_config/src/sub_dao_service.rs new file mode 100644 index 000000000..e7885bb76 --- /dev/null +++ b/iot_config/src/sub_dao_service.rs @@ -0,0 +1,102 @@ +use crate::{ + admin::AuthCache, sub_dao_epoch_reward_info, telemetry, verify_public_key, GrpcResult, Settings, +}; +use anyhow::Result; +use chrono::Utc; +use file_store::traits::{MsgVerify, TimestampEncode}; +use helium_crypto::{Keypair, PublicKey, Sign}; +use helium_proto::services::sub_dao::{ + self, SubDaoEpochRewardInfoReqV1, SubDaoEpochRewardInfoResV1, +}; +use helium_proto::Message; +use sqlx::{Pool, Postgres}; +use std::sync::Arc; +use tonic::{Request, Response, Status}; + +pub struct SubDaoService { + auth_cache: AuthCache, + metadata_pool: Pool, + signing_key: Arc, +} + +impl SubDaoService { + pub fn new( + settings: &Settings, + auth_cache: AuthCache, + metadata_pool: Pool, + ) -> Result { + Ok(Self { + auth_cache, + metadata_pool, + signing_key: Arc::new(settings.signing_keypair()?), + }) + } + + fn sign_response(&self, response: &[u8]) -> Result, Status> { + self.signing_key + .sign(response) + .map_err(|_| Status::internal("response signing error")) + } + + fn verify_request_signature(&self, signer: &PublicKey, request: &R) -> Result<(), Status> + where + R: MsgVerify, + { + self.auth_cache + .verify_signature(signer, request) + .map_err(|_| Status::permission_denied("invalid admin signature"))?; + Ok(()) + } + + fn verify_request_signature_for_info( + &self, + request: &SubDaoEpochRewardInfoReqV1, + ) -> std::result::Result<(), Status> { + let signer = verify_public_key(&request.signer)?; + self.verify_request_signature(&signer, request) + } +} + +#[tonic::async_trait] +impl sub_dao::sub_dao_server::SubDao for SubDaoService { + async fn info( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("sub_dao_reward_info", "info"); + custom_tracing::record("sub_dao", &request.sub_dao_address); + custom_tracing::record("epoch", request.epoch); + custom_tracing::record_b58("signer", &request.signer); + + self.verify_request_signature_for_info(&request)?; + + let epoch = request.epoch; + let sub_dao = request.sub_dao_address; + tracing::info!(sub_dao = %sub_dao, epoch = epoch, "fetching sub_dao epoch reward info"); + + sub_dao_epoch_reward_info::db::get_info(&self.metadata_pool, epoch, &sub_dao) + .await + .map_err(|e| { + tracing::error!(error = %e, "error fetching sub_dao epoch reward info"); + Status::internal("error fetching sub_dao epoch reward info") + })? + .map_or_else( + || { + telemetry::count_epoch_chain_lookup("not-found"); + Err(Status::not_found(epoch.to_string())) + }, + |info| { + let info = info.into(); + let mut res = SubDaoEpochRewardInfoResV1 { + info: Some(info), + timestamp: Utc::now().encode_timestamp(), + signer: self.signing_key.public_key().into(), + signature: vec![], + }; + res.signature = self.sign_response(&res.encode_to_vec())?; + Ok(Response::new(res)) + }, + ) + } +} diff --git a/iot_config/src/telemetry.rs b/iot_config/src/telemetry.rs index d55a8adc5..9c6ce4c40 100644 --- a/iot_config/src/telemetry.rs +++ b/iot_config/src/telemetry.rs @@ -13,6 +13,8 @@ const GATEWAY_CHAIN_LOOKUP_METRIC: &str = const GATEWAY_CHAIN_LOOKUP_DURATION_METRIC: &str = concat!(env!("CARGO_PKG_NAME"), "-", "gateway-info-lookup-duration"); +const EPOCH_CHAIN_LOOKUP_METRIC: &str = concat!(env!("CARGO_PKG_NAME"), "-", "epoch-chain-lookup"); + pub fn initialize() { metrics::gauge!(STREAM_METRIC).set(0.0); } @@ -63,6 +65,10 @@ pub fn count_devaddr_updates(adds: usize, removes: usize) { metrics::counter!(DEVADDR_REMOVE_COUNT_METRIC).increment(removes as u64); } +pub fn count_epoch_chain_lookup(result: &'static str) { + metrics::counter!(EPOCH_CHAIN_LOOKUP_METRIC, "result" => result).increment(1); +} + pub fn route_stream_subscribe() { metrics::gauge!(STREAM_METRIC).increment(1.0); } diff --git a/iot_verifier/Cargo.toml b/iot_verifier/Cargo.toml index 0a8638415..716140a1a 100644 --- a/iot_verifier/Cargo.toml +++ b/iot_verifier/Cargo.toml @@ -29,6 +29,7 @@ futures = { workspace = true } futures-util = { workspace = true } prost = { workspace = true } chrono = { workspace = true } +helium-lib = { workspace = true } helium-proto = { workspace = true } helium-crypto = { workspace = true, features = ["sqlx-postgres"] } async-trait = { workspace = true } diff --git a/iot_verifier/src/lib.rs b/iot_verifier/src/lib.rs index 98e7e1440..48c44261c 100644 --- a/iot_verifier/src/lib.rs +++ b/iot_verifier/src/lib.rs @@ -20,4 +20,33 @@ mod settings; pub mod telemetry; pub mod tx_scaler; pub mod witness_updater; + +use helium_lib::keypair::Pubkey; +use rust_decimal::Decimal; pub use settings::Settings; + +#[derive(Clone, Debug)] +pub struct PriceInfo { + pub price_in_bones: u64, + pub price_per_token: Decimal, + pub price_per_bone: Decimal, + pub decimals: u8, +} + +impl PriceInfo { + pub fn new(price_in_bones: u64, decimals: u8) -> Self { + let price_per_token = + Decimal::from(price_in_bones) / Decimal::from(10_u64.pow(decimals as u32)); + let price_per_bone = price_per_token / Decimal::from(10_u64.pow(decimals as u32)); + Self { + price_in_bones, + price_per_token, + price_per_bone, + decimals, + } + } +} + +pub fn resolve_subdao_pubkey() -> Pubkey { + helium_lib::dao::SubDao::Iot.key() +} diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 4f4ee13d9..f9da511c9 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -16,6 +16,7 @@ use helium_proto::{ }, RewardManifest, }; +use iot_config::client::sub_dao_client::SubDaoClient; use iot_config::client::Client as IotConfigClient; use iot_verifier::{ entropy_loader, gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, loader, @@ -81,6 +82,7 @@ impl Server { let store_base_path = path::Path::new(&settings.cache); let iot_config_client = IotConfigClient::from_settings(&settings.iot_config_client)?; + let sub_dao_rewards_client = SubDaoClient::from_settings(&settings.iot_config_client)?; // create the witness updater to handle serialization of last witness updates to db // also exposes a cache of the last witness updates @@ -139,14 +141,15 @@ impl Server { ) .await?; - let rewarder = Rewarder { - pool: pool.clone(), + let rewarder = Rewarder::new( + pool.clone(), rewards_sink, reward_manifests_sink, - reward_period_hours: settings.reward_period, - reward_offset: settings.reward_period_offset, + settings.reward_period, + settings.reward_period_offset, price_tracker, - }; + sub_dao_rewards_client, + )?; // * // setup entropy requirements diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index eeb7dfb4a..9df9812fc 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -1,5 +1,5 @@ -use crate::poc_report::ReportType as PocReportType; -use chrono::{DateTime, Duration, Utc}; +use crate::{poc_report::ReportType as PocReportType, PriceInfo}; +use chrono::{DateTime, Utc}; use file_store::{iot_packet::IotValidPacket, iot_valid_poc::IotPoc, traits::TimestampEncode}; use futures::stream::TryStreamExt; use helium_crypto::PublicKeyBinary; @@ -13,12 +13,7 @@ use std::{collections::HashMap, ops::Range}; const DEFAULT_PREC: u32 = 15; -// rewards in IoT Bones ( iot @ 10^6 ) per 24 hours based on emission curve year 1 -// TODO: expand to cover the full multi-year emission curve lazy_static! { - // TODO: year 1 emissions allocate 30% of total to PoC with 6% to beacons and 24% to witnesses but subsequent years back - // total PoC percentage off 1.5% each year; determine how beacons and witnesses will split the subsequent years' allocations - pub static ref REWARDS_PER_DAY: Decimal = (Decimal::from(32_500_000_000_u64) / Decimal::from(365)) * Decimal::from(1_000_000); // 88_797_814_207_650.273224043715847 static ref BEACON_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.06); static ref WITNESS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.24); // Data transfer is allocated 50% of daily rewards @@ -34,45 +29,28 @@ lazy_static! { static ref DC_USD_PRICE: Decimal = dec!(0.00001); } -pub fn get_tokens_by_duration(tokens: Decimal, duration: Duration) -> Decimal { - ((tokens / Decimal::from(Duration::hours(24).num_seconds())) - * Decimal::from(duration.num_seconds())) - .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::MidpointNearestEven) -} - pub fn get_scheduled_poc_tokens( - duration: Duration, + epoch_emissions: Decimal, dc_transfer_remainder: Decimal, ) -> (Decimal, Decimal) { ( - get_tokens_by_duration(*REWARDS_PER_DAY * *BEACON_REWARDS_PER_DAY_PERCENT, duration) + epoch_emissions * *BEACON_REWARDS_PER_DAY_PERCENT + (dc_transfer_remainder * *BEACON_DC_REMAINER_PERCENT), - get_tokens_by_duration( - *REWARDS_PER_DAY * *WITNESS_REWARDS_PER_DAY_PERCENT, - duration, - ) + (dc_transfer_remainder * *WITNESS_DC_REMAINER_PERCENT), + epoch_emissions * *WITNESS_REWARDS_PER_DAY_PERCENT + + (dc_transfer_remainder * *WITNESS_DC_REMAINER_PERCENT), ) } -pub fn get_scheduled_dc_tokens(duration: Duration) -> Decimal { - get_tokens_by_duration( - *REWARDS_PER_DAY * *DATA_TRANSFER_REWARDS_PER_DAY_PERCENT, - duration, - ) +pub fn get_scheduled_dc_tokens(epoch_emissions: Decimal) -> Decimal { + epoch_emissions * *DATA_TRANSFER_REWARDS_PER_DAY_PERCENT } -pub fn get_scheduled_ops_fund_tokens(duration: Duration) -> Decimal { - get_tokens_by_duration( - *REWARDS_PER_DAY * *OPERATIONS_REWARDS_PER_DAY_PERCENT, - duration, - ) +pub fn get_scheduled_ops_fund_tokens(epoch_emissions: Decimal) -> Decimal { + epoch_emissions * *OPERATIONS_REWARDS_PER_DAY_PERCENT } -pub fn get_scheduled_oracle_tokens(duration: Duration) -> Decimal { - get_tokens_by_duration( - *REWARDS_PER_DAY * *ORACLES_REWARDS_PER_DAY_PERCENT, - duration, - ) +pub fn get_scheduled_oracle_tokens(epoch_emissions: Decimal) -> Decimal { + epoch_emissions * *ORACLES_REWARDS_PER_DAY_PERCENT } #[derive(sqlx::FromRow)] @@ -248,9 +226,9 @@ impl GatewayShares { .map(|_| ()) } - pub fn into_iot_reward_shares( + pub fn into_reward_shares( self, - reward_period: &'_ Range>, + reward_period: &Range>, beacon_rewards_per_share: Decimal, witness_rewards_per_share: Decimal, dc_transfer_rewards_per_share: Decimal, @@ -293,38 +271,40 @@ impl GatewayShares { pub async fn calculate_rewards_per_share( &self, - reward_period: &'_ Range>, - iot_price: Decimal, + epoch_emissions: Decimal, + price_info: PriceInfo, ) -> anyhow::Result<(Decimal, Decimal, Decimal)> { // the total number of shares for beacons, witnesses and data transfer // dc shares here is the sum of all spent data transfer DC this epoch let (total_beacon_shares, total_witness_shares, total_dc_shares) = self.total_shares(); - // the total number of iot rewards for dc transfer this epoch - let total_dc_transfer_rewards = - get_scheduled_dc_tokens(reward_period.end - reward_period.start); + // the max rewards for dc transfer this epoch + let total_dc_transfer_rewards = get_scheduled_dc_tokens(epoch_emissions); - // convert the total spent data transfer DC to it equiv iot bone value + // convert the total spent data transfer DC to it equiv hnt bone value // the rewards distributed to gateways will be equal to this // up to a max cap of total_dc_transfer_rewards // if the dc transfer rewards is less than total_dc_transfer_rewards // then the remainer will be added to the POC rewards allocation - let total_dc_transfer_rewards_used = dc_to_iot_bones(total_dc_shares, iot_price); + let total_dc_transfer_rewards_used = + dc_to_hnt_bones(total_dc_shares, price_info.price_per_bone); let (dc_transfer_rewards_unused, total_dc_transfer_rewards_capped) = normalize_dc_transfer_rewards( total_dc_transfer_rewards_used, total_dc_transfer_rewards, ); - // the total amounts of iot rewards this epoch for beacons, witnesses + + // the total amounts of hnt rewards this epoch for beacons, witnesses // taking into account any remaining dc transfer rewards - let (total_beacon_rewards, total_witness_rewards) = get_scheduled_poc_tokens( - reward_period.end - reward_period.start, - dc_transfer_rewards_unused, - ); + let (total_beacon_rewards, total_witness_rewards) = + get_scheduled_poc_tokens(epoch_emissions, dc_transfer_rewards_unused); + // work out the rewards per share for beacons, witnesses and dc transfer let beacon_rewards_per_share = rewards_per_share(total_beacon_rewards, total_beacon_shares); + let witness_rewards_per_share = rewards_per_share(total_witness_rewards, total_witness_shares); + let dc_transfer_rewards_per_share = rewards_per_share(total_dc_transfer_rewards_capped, total_dc_shares); @@ -356,34 +336,13 @@ impl GatewayShares { } } -/// returns the equiv iot bones value for a specified dc amount -pub fn dc_to_iot_bones(dc_amount: Decimal, iot_price: Decimal) -> Decimal { - // iot prices are supplied in 10^6 *per iot token* - // we need the price at this point per iot bones - let iot_price = iot_price_to_bones(iot_price); - // use the price per bones to get the num of bones for our - // dc USD value - let dc_in_usd = dc_amount * (*DC_USD_PRICE); - (dc_in_usd / iot_price) +/// Returns the equivalent amount of Hnt bones for a specified amount of Data Credits +pub fn dc_to_hnt_bones(dc_amount: Decimal, hnt_bone_price: Decimal) -> Decimal { + let dc_in_usd = dc_amount * *DC_USD_PRICE; + (dc_in_usd / hnt_bone_price) .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::ToPositiveInfinity) } -/// returns the equiv dc value for a specified iot bones amount -pub fn iot_bones_to_dc(iot_amount: Decimal, iot_price: Decimal) -> Decimal { - // iot prices are supplied in 10^6 *per iot token* - // we need the price at this point per iot bones - let iot_price = iot_price_to_bones(iot_price); - // use the price per bones to get the value of our bones in DC - let iot_value = iot_amount * iot_price; - (iot_value / (*DC_USD_PRICE)).round_dp_with_strategy(0, RoundingStrategy::ToNegativeInfinity) -} - -pub fn iot_price_to_bones(iot_price: Decimal) -> Decimal { - iot_price - / dec!(1_000_000) // Per Iot token - / dec!(1_000_000) // Per Bone -} - pub fn normalize_dc_transfer_rewards( total_dc_transfer_rewards_used: Decimal, total_dc_transfer_rewards: Decimal, @@ -468,7 +427,15 @@ async fn aggregate_dc_shares( #[cfg(test)] mod test { use super::*; - use crate::reward_share; + use crate::{reward_share, PriceInfo}; + use chrono::Duration; + use helium_lib::token::Token; + use iot_config::sub_dao_epoch_reward_info::EpochRewardInfo; + + pub const EPOCH_ADDRESS: &str = "112E7TxoNHV46M6tiPA8N1MkeMeQxc9ztb4JQLXBVAAUfq1kJLoF"; + pub const SUB_DAO_ADDRESS: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; + + const EMISSIONS_POOL_IN_BONES_10_MINUTES: u64 = 618_340_943_683; fn reward_shares_in_dec( beacon_shares: Decimal, @@ -486,17 +453,77 @@ mod test { } } + /// returns the equiv dc value for a specified hnt bones amount + pub fn hnt_bones_to_dc(hnt_amount: Decimal, hnt_bones_price: Decimal) -> Decimal { + let value = hnt_amount * hnt_bones_price; + (value / (*DC_USD_PRICE)).round_dp_with_strategy(0, RoundingStrategy::ToNegativeInfinity) + } + + fn default_rewards_info(total_emissions: u64, epoch_duration: Duration) -> EpochRewardInfo { + let now = Utc::now(); + EpochRewardInfo { + epoch_day: 1, + epoch_address: EPOCH_ADDRESS.into(), + sub_dao_address: SUB_DAO_ADDRESS.into(), + epoch_period: (now - epoch_duration)..now, + epoch_emissions: Decimal::from(total_emissions), + rewards_issued_at: now, + } + } + #[test] - fn test_non_gateway_reward_shares() { - let epoch_duration = Duration::hours(1); - let total_tokens_for_period = *REWARDS_PER_DAY / dec!(24); - println!("total_tokens_for_period: {total_tokens_for_period}"); - - let operation_tokens_for_period = get_scheduled_ops_fund_tokens(epoch_duration); - assert_eq!( - dec!(259_703_196_347.031963470319635), - operation_tokens_for_period - ); + fn ensure_correct_conversion_of_bytes_to_bones() { + assert_eq!(dc_to_hnt_bones(Decimal::from(1), dec!(1.0)), dec!(0.00001)); + assert_eq!(dc_to_hnt_bones(Decimal::from(2), dec!(1.0)), dec!(0.00002)); + } + + #[test] + fn test_poc_scheduled_tokens() { + // set our rewards info + let rewards_info = default_rewards_info(100_000_000_000_000, Duration::hours(1)); + let (beacon_v, witness_v) = get_scheduled_poc_tokens(rewards_info.epoch_emissions, dec!(0)); + assert_eq!(dec!(6_000_000_000_000), beacon_v); + assert_eq!(dec!(24_000_000_000_000), witness_v); + } + + #[test] + fn test_poc_scheduled_tokens_with_dc_remainder() { + // set our rewards info + let rewards_info = default_rewards_info(100_000_000_000_000, Duration::hours(1)); + let (beacon_v, witness_v) = + get_scheduled_poc_tokens(rewards_info.epoch_emissions, dec!(1_000_000_000_000)); + assert_eq!(dec!(6_200_000_000_000), beacon_v); + assert_eq!(dec!(24_800_000_000_000), witness_v); + } + + #[test] + fn test_op_fund_scheduled_tokens() { + // set our rewards info + let rewards_info = default_rewards_info(100_000_000_000_000, Duration::hours(1)); + let v = get_scheduled_ops_fund_tokens(rewards_info.epoch_emissions); + assert_eq!(dec!(7_000_000_000_000), v); + } + + #[test] + fn test_oracles_scheduled_tokens() { + // set our rewards info + let rewards_info = default_rewards_info(100_000_000_000_000, Duration::hours(1)); + let v = get_scheduled_oracle_tokens(rewards_info.epoch_emissions); + assert_eq!(dec!(7_000_000_000_000), v); + } + + #[test] + fn test_price_conversion() { + let token = Token::Hnt; + let hnt_dollar_price = dec!(1.0); + let hnt_price_from_pricer = 100000000_u64; + let hnt_dollar_bone_price = dec!(0.00000001); + + let hnt_price = PriceInfo::new(hnt_price_from_pricer, token.decimals()); + + assert_eq!(hnt_dollar_bone_price, hnt_price.price_per_bone); + assert_eq!(hnt_price_from_pricer, hnt_price.price_in_bones); + assert_eq!(hnt_dollar_price, hnt_price.price_per_token); } #[tokio::test] @@ -505,7 +532,8 @@ mod test { // total epoch dc rewards amount // this results in a significant redistribution of dc rewards to POC async fn test_reward_share_calculation_fixed_dc_spend_with_transfer_distribution() { - let iot_price = dec!(359); + let price_info = PriceInfo::new(3590000, 8); + let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() .expect("failed gw1 parse"); @@ -525,9 +553,10 @@ mod test { .parse() .expect("failed gw6 parse"); - let now = Utc::now(); - let reward_period = (now - Duration::minutes(10))..now; - let total_data_transfer_tokens_for_period = get_scheduled_dc_tokens(Duration::minutes(10)); + let reward_info = + default_rewards_info(EMISSIONS_POOL_IN_BONES_10_MINUTES, Duration::minutes(10)); + let total_data_transfer_tokens_for_period = + get_scheduled_dc_tokens(reward_info.epoch_emissions); println!("total data transfer scheduled tokens: {total_data_transfer_tokens_for_period}"); let gw1_dc_spend = dec!(502); @@ -543,7 +572,8 @@ mod test { let total_dc_spend = gw1_dc_spend + gw2_dc_spend + gw3_dc_spend + gw4_dc_spend + gw5_dc_spend + gw6_dc_spend; println!("total dc spend: {total_dc_spend}"); - let total_used_data_transfer_tokens = dc_to_iot_bones(total_dc_spend, iot_price); + let total_used_data_transfer_tokens = + dc_to_hnt_bones(total_dc_spend, price_info.price_per_bone); println!("total data transfer rewards for dc spent: {total_used_data_transfer_tokens}"); let total_unused_data_transfer_tokens = total_data_transfer_tokens_for_period - total_used_data_transfer_tokens; @@ -578,23 +608,20 @@ mod test { let gw_shares = GatewayShares::new(shares).unwrap(); let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = gw_shares - .calculate_rewards_per_share(&reward_period, iot_price) + .calculate_rewards_per_share(reward_info.epoch_emissions, price_info.clone()) .await .unwrap(); - let (total_beacon_rewards, total_witness_rewards) = reward_share::get_scheduled_poc_tokens( - reward_period.end - reward_period.start, - dec!(0.0), - ); - let total_dc_rewards = - reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let (total_beacon_rewards, total_witness_rewards) = + reward_share::get_scheduled_poc_tokens(reward_info.epoch_emissions, dec!(0.0)); + let total_dc_rewards = reward_share::get_scheduled_dc_tokens(reward_info.epoch_emissions); let total_poc_dc_reward_allocation = total_beacon_rewards + total_witness_rewards + total_dc_rewards; let mut rewards: HashMap = HashMap::new(); let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares( - &reward_period, + for (reward_amount, reward) in gw_shares.into_reward_shares( + &reward_info.epoch_period, beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share, @@ -646,16 +673,26 @@ mod test { assert_eq!(data_transfer_diff, 1); // assert the expected data transfer rewards amounts per gateway - // using the dc_to_iot_bones helper function - let gw1_expected_dc_rewards = dc_to_iot_bones(gw1_dc_spend, iot_price).to_u64().unwrap(); + // using the dc_to_hnt_bones helper function + let gw1_expected_dc_rewards = dc_to_hnt_bones(gw1_dc_spend, price_info.price_per_bone) + .to_u64() + .unwrap(); assert_eq!(gw1_expected_dc_rewards.to_u64().unwrap(), 13_983_286); - let gw2_expected_dc_rewards = dc_to_iot_bones(gw2_dc_spend, iot_price).to_u64().unwrap(); + let gw2_expected_dc_rewards = dc_to_hnt_bones(gw2_dc_spend, price_info.price_per_bone) + .to_u64() + .unwrap(); assert_eq!(gw2_expected_dc_rewards.to_u64().unwrap(), 139_275_766); - let gw3_expected_dc_rewards = dc_to_iot_bones(gw3_dc_spend, iot_price).to_u64().unwrap(); + let gw3_expected_dc_rewards = dc_to_hnt_bones(gw3_dc_spend, price_info.price_per_bone) + .to_u64() + .unwrap(); assert_eq!(gw3_expected_dc_rewards.to_u64().unwrap(), 139_275_766); - let gw5_expected_dc_rewards = dc_to_iot_bones(gw5_dc_spend, iot_price).to_u64().unwrap(); + let gw5_expected_dc_rewards = dc_to_hnt_bones(gw5_dc_spend, price_info.price_per_bone) + .to_u64() + .unwrap(); assert_eq!(gw5_expected_dc_rewards.to_u64().unwrap(), 0); - let gw6_expected_dc_rewards = dc_to_iot_bones(gw6_dc_spend, iot_price).to_u64().unwrap(); + let gw6_expected_dc_rewards = dc_to_hnt_bones(gw6_dc_spend, price_info.price_per_bone) + .to_u64() + .unwrap(); assert_eq!(gw6_expected_dc_rewards.to_u64().unwrap(), 1_392_757_660); assert_eq!(gw1_rewards.dc_transfer_amount, gw1_expected_dc_rewards); assert_eq!(gw2_rewards.dc_transfer_amount, gw2_expected_dc_rewards); @@ -666,15 +703,15 @@ mod test { // assert the beacon and witness amount, these will now have an allocation // of any unused data transfer rewards assert_eq!(rewards.get(&gw4), None); // Validate zero-amount entry filtered out - assert_eq!(gw1_rewards.beacon_amount, 2_166_977_857); + assert_eq!(gw1_rewards.beacon_amount, 2_166_977_856); assert_eq!(gw1_rewards.witness_amount, 51_442_169_996); - assert_eq!(gw2_rewards.beacon_amount, 43_339_557_140); + assert_eq!(gw2_rewards.beacon_amount, 43_339_557_139); assert_eq!(gw2_rewards.witness_amount, 94_310_644_993); assert_eq!(gw3_rewards.beacon_amount, 16_252_333_927); assert_eq!(gw3_rewards.witness_amount, 68_589_559_995); - assert_eq!(gw5_rewards.beacon_amount, 4_333_955_714); + assert_eq!(gw5_rewards.beacon_amount, 4_333_955_713); assert_eq!(gw5_rewards.witness_amount, 120_031_729_992); - assert_eq!(gw6_rewards.beacon_amount, 32_504_667_855); + assert_eq!(gw6_rewards.beacon_amount, 32_504_667_854); assert_eq!(gw6_rewards.witness_amount, 60_015_864_996); // assert the total POC rewards allocated equals TOTAL_POC_REWARDS_FOR_PERIOD @@ -690,9 +727,12 @@ mod test { + gw6_rewards.beacon_amount + gw6_rewards.witness_amount; - let (exp_total_beacon_tokens, exp_total_witness_tokens) = - get_scheduled_poc_tokens(Duration::minutes(10), total_unused_data_transfer_tokens); + let (exp_total_beacon_tokens, exp_total_witness_tokens) = get_scheduled_poc_tokens( + reward_info.epoch_emissions, + total_unused_data_transfer_tokens, + ); let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; + println!("total_unused_data_transfer_tokens: {total_unused_data_transfer_tokens}"); println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); @@ -701,13 +741,14 @@ mod test { // due to going from decimal to u64 let unallocated_poc_reward_amount = total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); - assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 3); + assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 7); } #[tokio::test] // test reward distribution where there is zero transfer of dc rewards to poc async fn test_reward_share_calculation_without_data_transfer_distribution() { - let iot_price = dec!(359); + let price_info = PriceInfo::new(3590000, 8); + let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() .expect("failed gw1 parse"); @@ -727,9 +768,10 @@ mod test { .parse() .expect("failed gw6 parse"); - let now = Utc::now(); - let reward_period = (now - Duration::minutes(10))..now; - let total_data_transfer_tokens_for_period = get_scheduled_dc_tokens(Duration::minutes(10)); + let reward_info = + default_rewards_info(EMISSIONS_POOL_IN_BONES_10_MINUTES, Duration::minutes(10)); + let total_data_transfer_tokens_for_period = + get_scheduled_dc_tokens(reward_info.epoch_emissions); println!("total data transfer scheduled tokens: {total_data_transfer_tokens_for_period}"); // get the expected total amount of dc we need to spend @@ -737,7 +779,10 @@ mod test { // distribute this amount of dc across the gateways // this results in zero unallocated dc rewards being // available to distributed to POC - let total_dc_to_spend = iot_bones_to_dc(total_data_transfer_tokens_for_period, iot_price); + let total_dc_to_spend = hnt_bones_to_dc( + total_data_transfer_tokens_for_period, + price_info.price_per_bone, + ); println!("total dc value of scheduled data transfer tokens: {total_dc_to_spend}"); // generate the rewards map @@ -784,20 +829,20 @@ mod test { let gw_shares = GatewayShares::new(shares).unwrap(); let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = gw_shares - .calculate_rewards_per_share(&reward_period, iot_price) + .calculate_rewards_per_share(reward_info.epoch_emissions, price_info) .await .unwrap(); let (total_beacon_rewards, total_witness_rewards) = - get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); - let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + get_scheduled_poc_tokens(reward_info.epoch_emissions, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_info.epoch_emissions); let total_poc_dc_reward_allocation = total_beacon_rewards + total_witness_rewards + total_dc_rewards; let mut rewards: HashMap = HashMap::new(); let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares( - &reward_period, + for (reward_amount, reward) in gw_shares.into_reward_shares( + &reward_info.epoch_period, beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share, @@ -886,7 +931,7 @@ mod test { + gw6_rewards.beacon_amount + gw6_rewards.witness_amount; let (exp_total_beacon_tokens, exp_total_witness_tokens) = - get_scheduled_poc_tokens(Duration::minutes(10), Decimal::ZERO); + get_scheduled_poc_tokens(reward_info.epoch_emissions, Decimal::ZERO); let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); @@ -902,7 +947,8 @@ mod test { #[tokio::test] // test reward distribution where there is transfer of dc rewards to poc async fn test_reward_share_calculation_with_data_transfer_distribution() { - let iot_price = dec!(359); + let price_info = PriceInfo::new(3590000, 8); + let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() .expect("failed gw1 parse"); @@ -922,15 +968,19 @@ mod test { .parse() .expect("failed gw6 parse"); - let now = Utc::now(); - let reward_period = (now - Duration::minutes(10))..now; - let total_data_transfer_tokens_for_period = get_scheduled_dc_tokens(Duration::minutes(10)); + let reward_info = + default_rewards_info(EMISSIONS_POOL_IN_BONES_10_MINUTES, Duration::minutes(10)); + let total_data_transfer_tokens_for_period = + get_scheduled_dc_tokens(reward_info.epoch_emissions); println!("total_data_transfer_tokens_for_period: {total_data_transfer_tokens_for_period}"); // get the expected total amount of dc we need to spend // spread *some* of this across the gateways and then confirm // the unallocated rewards go to poc - let total_dc_to_spend = iot_bones_to_dc(total_data_transfer_tokens_for_period, iot_price); + let total_dc_to_spend = hnt_bones_to_dc( + total_data_transfer_tokens_for_period, + price_info.price_per_bone, + ); println!("total_dc_to_spend: {total_dc_to_spend}"); // generate the rewards map @@ -971,20 +1021,20 @@ mod test { let gw_shares = GatewayShares::new(shares).unwrap(); let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = gw_shares - .calculate_rewards_per_share(&reward_period, iot_price) + .calculate_rewards_per_share(reward_info.epoch_emissions, price_info) .await .unwrap(); let (total_beacon_rewards, total_witness_rewards) = - get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); - let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + get_scheduled_poc_tokens(reward_info.epoch_emissions, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_info.epoch_emissions); let total_poc_dc_reward_allocation = total_beacon_rewards + total_witness_rewards + total_dc_rewards; let mut rewards: HashMap = HashMap::new(); let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares( - &reward_period, + for (reward_amount, reward) in gw_shares.into_reward_shares( + &reward_info.epoch_period, beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share, @@ -1070,8 +1120,10 @@ mod test { let expected_data_transfer_tokens_for_poc = total_data_transfer_tokens_for_period - Decimal::from_u64(sum_data_transfer_amounts).unwrap(); println!("expected_data_transfer_tokens_for_poc: {expected_data_transfer_tokens_for_poc}"); - let (exp_total_beacon_tokens, exp_total_witness_tokens) = - get_scheduled_poc_tokens(Duration::minutes(10), expected_data_transfer_tokens_for_poc); + let (exp_total_beacon_tokens, exp_total_witness_tokens) = get_scheduled_poc_tokens( + reward_info.epoch_emissions, + expected_data_transfer_tokens_for_poc, + ); let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); @@ -1085,18 +1137,17 @@ mod test { } #[test] - fn test_dc_iot_conversion() { - let iot_price = dec!(359); //iot per token price @ 0.000359 @ 10^6 = 359 + fn test_dc_hnt_conversion() { + //hnt per token price @ 3.59 @ 10^8 = 0.00000359 + let hnt_bone_price = dec!(0.00000359); let dc_amount = dec!(1000000); - // convert the dc amount to iot and assert - let dc_iot_amt = dc_to_iot_bones(dc_amount, iot_price); - println!("dc_iot_amt: {dc_iot_amt}"); - assert_eq!(dc_iot_amt, dec!(27855153203.342618384401115)); + // convert the dc amount to hnt and assert + let dc_hnt_amt = dc_to_hnt_bones(dc_amount, hnt_bone_price); + assert_eq!(dc_hnt_amt, dec!(2785515.320334261838441)); // convert the returned iot amount back to dc and assert // it matches our original dc amount - let iot_dc_amt = iot_bones_to_dc(dc_iot_amt, iot_price); - println!("iot_dc_amt: {iot_dc_amt}"); - assert_eq!(iot_dc_amt, dc_amount); + let hnt_dc_amt = hnt_bones_to_dc(dc_hnt_amt, hnt_bone_price); + assert_eq!(hnt_dc_amt, dc_amount); } } diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 75e0449ff..3d109f13d 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -1,11 +1,13 @@ use crate::{ + resolve_subdao_pubkey, reward_share::{self, GatewayShares}, - telemetry, + telemetry, PriceInfo, }; use chrono::{DateTime, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; use futures::future::LocalBoxFuture; +use helium_lib::{keypair::Pubkey, token::Token}; use helium_proto::{ reward_manifest::RewardData::IotRewardData, services::poc_lora::{ @@ -15,6 +17,11 @@ use helium_proto::{ IotRewardData as ManifestIotRewardData, IotRewardToken, RewardManifest, }; use humantime_serde::re::humantime; +use iot_config::{ + client::{sub_dao_client::SubDaoEpochRewardInfoResolver, ClientError}, + sub_dao_epoch_reward_info::EpochRewardInfo, + EpochInfo, +}; use price::PriceTracker; use reward_scheduler::Scheduler; use rust_decimal::prelude::*; @@ -26,13 +33,15 @@ use tokio::time::sleep; const REWARDS_NOT_CURRENT_DELAY_PERIOD: Duration = Duration::from_secs(5 * 60); -pub struct Rewarder { +pub struct Rewarder { + sub_dao: Pubkey, pub pool: Pool, pub rewards_sink: file_sink::FileSinkClient, pub reward_manifests_sink: file_sink::FileSinkClient, pub reward_period_hours: Duration, pub reward_offset: Duration, pub price_tracker: PriceTracker, + sub_dao_epoch_reward_client: A, } pub struct RewardPocDcDataPoints { @@ -41,7 +50,10 @@ pub struct RewardPocDcDataPoints { dc_transfer_rewards_per_share: Decimal, } -impl ManagedTask for Rewarder { +impl ManagedTask for Rewarder +where + A: SubDaoEpochRewardInfoResolver + Send + Sync + 'static, +{ fn start_task( self: Box, shutdown: triggered::Listener, @@ -50,63 +62,73 @@ impl ManagedTask for Rewarder { } } -impl Rewarder { - pub async fn new( +impl Rewarder +where + A: SubDaoEpochRewardInfoResolver + Send + Sync + 'static, +{ + pub fn new( pool: PgPool, rewards_sink: file_sink::FileSinkClient, reward_manifests_sink: file_sink::FileSinkClient, reward_period_hours: Duration, reward_offset: Duration, price_tracker: PriceTracker, - ) -> Self { - Self { + sub_dao_epoch_reward_client: A, + ) -> anyhow::Result { + // get the subdao address + let sub_dao = resolve_subdao_pubkey(); + tracing::info!("Iot SubDao pubkey: {}", sub_dao); + Ok(Self { + sub_dao, pool, rewards_sink, reward_manifests_sink, reward_period_hours, reward_offset, price_tracker, - } + sub_dao_epoch_reward_client, + }) } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("Starting rewarder"); - let reward_period_length = self.reward_period_hours; - loop { - let now = Utc::now(); + let next_reward_epoch = next_reward_epoch(&self.pool).await?; + let next_reward_epoch_period = EpochInfo::from(next_reward_epoch); let scheduler = Scheduler::new( - reward_period_length, - fetch_rewarded_timestamp("last_rewarded_end_time", &self.pool).await?, - fetch_rewarded_timestamp("next_rewarded_end_time", &self.pool).await?, + self.reward_period_hours, + next_reward_epoch_period.period.start, + next_reward_epoch_period.period.end, self.reward_offset, ); + let now = Utc::now(); let sleep_duration = if scheduler.should_trigger(now) { - let iot_price = self - .price_tracker - .price(&helium_proto::BlockchainTokenTypeV1::Iot) - .await?; - tracing::info!( - "Rewarding for period: {:?} with iot_price: {iot_price}", - scheduler.schedule_period - ); if self.data_current_check(&scheduler.schedule_period).await? { - self.reward(&scheduler, Decimal::from(iot_price)).await?; - scheduler.sleep_duration(Utc::now())? + match self.reward(next_reward_epoch).await { + Ok(()) => { + tracing::info!("Successfully rewarded for epoch {}", next_reward_epoch); + scheduler.sleep_duration(Utc::now())? + } + Err(e) => { + tracing::error!("Failed to reward: {}", e); + REWARDS_NOT_CURRENT_DELAY_PERIOD + } + } } else { - tracing::info!( - "rewards will be retried in {}", - humantime::format_duration(REWARDS_NOT_CURRENT_DELAY_PERIOD) - ); REWARDS_NOT_CURRENT_DELAY_PERIOD } } else { scheduler.sleep_duration(Utc::now())? }; + tracing::info!( + "rewards will be retried in {}", + humantime::format_duration(REWARDS_NOT_CURRENT_DELAY_PERIOD) + ); + let shutdown = shutdown.clone(); tokio::select! { biased; @@ -114,45 +136,69 @@ impl Rewarder { _ = sleep(sleep_duration) => (), } } - tracing::info!("stopping rewarder"); + + tracing::info!("Stopping rewarder"); Ok(()) } - pub async fn reward( - &mut self, - scheduler: &Scheduler, - iot_price: Decimal, - ) -> anyhow::Result<()> { - let reward_period = &scheduler.schedule_period; + pub async fn reward(&mut self, next_reward_epoch: u64) -> anyhow::Result<()> { + tracing::info!( + "Resolving reward info for epoch: {}, subdao: {}", + next_reward_epoch, + self.sub_dao + ); + + let reward_info = self + .sub_dao_epoch_reward_client + .resolve_info(&self.sub_dao.to_string(), next_reward_epoch) + .await? + .ok_or(anyhow::anyhow!( + "No reward info found for epoch {}", + next_reward_epoch + ))?; + + let pricer_hnt_price = self + .price_tracker + .price(&helium_proto::BlockchainTokenTypeV1::Hnt) + .await?; + + let price_info = PriceInfo::new(pricer_hnt_price, Token::Hnt.decimals()); + + tracing::info!( + "Rewarding for epoch {} period: {} to {} with hnt bone price: {} and reward pool: {}", + reward_info.epoch_day, + reward_info.epoch_period.start, + reward_info.epoch_period.end, + price_info.price_per_bone, + reward_info.epoch_emissions, + ); // process rewards for poc and dc - let poc_dc_shares = - reward_poc_and_dc(&self.pool, &self.rewards_sink, reward_period, iot_price).await?; + let poc_dc_shares = reward_poc_and_dc( + &self.pool, + &self.rewards_sink, + &reward_info, + price_info.clone(), + ) + .await?; + // process rewards for the operational fund - reward_operational(&self.rewards_sink, reward_period).await?; + reward_operational(&self.rewards_sink, &reward_info).await?; + // process rewards for the oracle - reward_oracles(&self.rewards_sink, reward_period).await?; + reward_oracles(&self.rewards_sink, &reward_info).await?; // commit the filesink let written_files = self.rewards_sink.commit().await?.await??; - // purge db let mut transaction = self.pool.begin().await?; + // Clear gateway shares table period to end of reward period - GatewayShares::clear_rewarded_shares(&mut transaction, scheduler.schedule_period.start) + GatewayShares::clear_rewarded_shares(&mut transaction, reward_info.epoch_period.start) .await?; - save_rewarded_timestamp( - "last_rewarded_end_time", - &scheduler.schedule_period.end, - &mut transaction, - ) - .await?; - save_rewarded_timestamp( - "next_rewarded_end_time", - &scheduler.next_trigger_period().end, - &mut transaction, - ) - .await?; + + save_next_reward_epoch(&mut transaction, reward_info.epoch_day + 1).await?; + transaction.commit().await?; // now that the db has been purged, safe to write out the manifest @@ -171,19 +217,19 @@ impl Rewarder { self.reward_manifests_sink .write( RewardManifest { - start_timestamp: scheduler.schedule_period.start.encode_timestamp(), - end_timestamp: scheduler.schedule_period.end.encode_timestamp(), + start_timestamp: reward_info.epoch_period.start.encode_timestamp(), + end_timestamp: reward_info.epoch_period.end.encode_timestamp(), written_files, reward_data: Some(IotRewardData(reward_data)), - epoch: 0, // TODO: replace placeholder value - price: 0, // TODO: replace placeholder value + epoch: reward_info.epoch_day, + price: price_info.price_in_bones, }, [], ) .await? .await??; self.reward_manifests_sink.commit().await?; - telemetry::last_rewarded_end_time(scheduler.schedule_period.end); + telemetry::last_rewarded_end_time(reward_info.epoch_period.end); Ok(()) } @@ -234,27 +280,27 @@ impl Rewarder { pub async fn reward_poc_and_dc( pool: &Pool, rewards_sink: &file_sink::FileSinkClient, - reward_period: &Range>, - iot_price: Decimal, + reward_info: &EpochRewardInfo, + price_info: PriceInfo, ) -> anyhow::Result { - let reward_shares = reward_share::aggregate_reward_shares(pool, reward_period).await?; + let reward_shares = + reward_share::aggregate_reward_shares(pool, &reward_info.epoch_period).await?; let gateway_shares = GatewayShares::new(reward_shares)?; let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = gateway_shares - .calculate_rewards_per_share(reward_period, iot_price) + .calculate_rewards_per_share(reward_info.epoch_emissions, price_info) .await?; // get the total poc and dc rewards for the period let (total_beacon_rewards, total_witness_rewards) = - reward_share::get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); - let total_dc_rewards = - reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + reward_share::get_scheduled_poc_tokens(reward_info.epoch_emissions, dec!(0.0)); + let total_dc_rewards = reward_share::get_scheduled_dc_tokens(reward_info.epoch_emissions); let total_poc_dc_reward_allocation = total_beacon_rewards + total_witness_rewards + total_dc_rewards; let mut allocated_gateway_rewards = 0_u64; - for (gateway_reward_amount, reward_share) in gateway_shares.into_iot_reward_shares( - reward_period, + for (gateway_reward_amount, reward_share) in gateway_shares.into_reward_shares( + &reward_info.epoch_period, beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share, @@ -276,7 +322,7 @@ pub async fn reward_poc_and_dc( rewards_sink, UnallocatedRewardType::Poc, unallocated_poc_reward_amount, - reward_period, + &reward_info.epoch_period, ) .await?; Ok(RewardPocDcDataPoints { @@ -288,10 +334,10 @@ pub async fn reward_poc_and_dc( pub async fn reward_operational( rewards_sink: &file_sink::FileSinkClient, - reward_period: &Range>, + reward_info: &EpochRewardInfo, ) -> anyhow::Result<()> { let total_operational_rewards = - reward_share::get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start); + reward_share::get_scheduled_ops_fund_tokens(reward_info.epoch_emissions); let allocated_operational_rewards = total_operational_rewards .round_dp_with_strategy(0, RoundingStrategy::ToZero) .to_u64() @@ -302,8 +348,8 @@ pub async fn reward_operational( rewards_sink .write( proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), + start_period: reward_info.epoch_period.start.encode_timestamp(), + end_period: reward_info.epoch_period.end.encode_timestamp(), reward: Some(ProtoReward::OperationalReward(op_fund_reward)), }, [], @@ -325,7 +371,7 @@ pub async fn reward_operational( rewards_sink, UnallocatedRewardType::Operation, unallocated_operation_reward_amount, - reward_period, + &reward_info.epoch_period, ) .await?; Ok(()) @@ -333,11 +379,11 @@ pub async fn reward_operational( pub async fn reward_oracles( rewards_sink: &file_sink::FileSinkClient, - reward_period: &Range>, + reward_info: &EpochRewardInfo, ) -> anyhow::Result<()> { // atm 100% of oracle rewards are assigned to 'unallocated' let total_oracle_rewards = - reward_share::get_scheduled_oracle_tokens(reward_period.end - reward_period.start); + reward_share::get_scheduled_oracle_tokens(reward_info.epoch_emissions); let allocated_oracle_rewards = 0_u64; let unallocated_oracle_reward_amount = (total_oracle_rewards - Decimal::from(allocated_oracle_rewards)) @@ -348,7 +394,7 @@ pub async fn reward_oracles( rewards_sink, UnallocatedRewardType::Oracle, unallocated_oracle_reward_amount, - reward_period, + &reward_info.epoch_period, ) .await?; Ok(()) @@ -358,7 +404,7 @@ async fn write_unallocated_reward( rewards_sink: &file_sink::FileSinkClient, unallocated_type: UnallocatedRewardType, unallocated_amount: u64, - reward_period: &'_ Range>, + reward_period: &Range>, ) -> anyhow::Result<()> { if unallocated_amount > 0 { let unallocated_reward = proto::IotRewardShare { @@ -373,20 +419,10 @@ async fn write_unallocated_reward( }; Ok(()) } - -pub async fn fetch_rewarded_timestamp( - timestamp_key: &str, - db: impl PgExecutor<'_>, -) -> db_store::Result> { - Utc.timestamp_opt(meta::fetch(db, timestamp_key).await?, 0) - .single() - .ok_or(db_store::Error::DecodeError) +pub async fn next_reward_epoch(db: &Pool) -> db_store::Result { + meta::fetch(db, "next_reward_epoch").await } -async fn save_rewarded_timestamp( - timestamp_key: &str, - value: &DateTime, - db: impl PgExecutor<'_>, -) -> db_store::Result<()> { - meta::store(db, timestamp_key, value.timestamp()).await +async fn save_next_reward_epoch(exec: impl PgExecutor<'_>, value: u64) -> db_store::Result<()> { + meta::store(exec, "next_reward_epoch", value).await } diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 5b1092c8d..98d0f06f6 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -514,7 +514,7 @@ where .witness_updater .get_last_witness(&beacon_report.report.pub_key) .await?; - Ok(last_witness.map_or(false, |lw| { + Ok(last_witness.is_some_and(|lw| { beacon_report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW })) } @@ -544,9 +544,8 @@ where ) -> anyhow::Result { let last_beacon_recip = LastBeaconReciprocity::get(&self.pool, &report.report.pub_key).await?; - Ok(last_beacon_recip.map_or(false, |lw| { - report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW - })) + Ok(last_beacon_recip + .is_some_and(|lw| report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW)) } } diff --git a/iot_verifier/src/telemetry.rs b/iot_verifier/src/telemetry.rs index 8f0a110c7..057ebdc63 100644 --- a/iot_verifier/src/telemetry.rs +++ b/iot_verifier/src/telemetry.rs @@ -1,10 +1,10 @@ use std::cell::RefCell; +use crate::{poc_report::Report, rewarder}; use chrono::{DateTime, Utc}; +use iot_config::EpochInfo; use sqlx::{Pool, Postgres}; -use crate::{poc_report::Report, rewarder}; - const PACKET_COUNTER: &str = concat!(env!("CARGO_PKG_NAME"), "_", "packet"); const NON_REWARDABLE_PACKET_COUNTER: &str = concat!(env!("CARGO_PKG_NAME"), "_", "non_rewardable_packet"); @@ -19,9 +19,10 @@ const INVALID_WITNESS_COUNTER: &str = const LAST_REWARDED_END_TIME: &str = "last_rewarded_end_time"; pub async fn initialize(db: &Pool) -> anyhow::Result<()> { - last_rewarded_end_time(rewarder::fetch_rewarded_timestamp(LAST_REWARDED_END_TIME, db).await?); + let next_reward_epoch = rewarder::next_reward_epoch(db).await?; + let epoch_period: EpochInfo = next_reward_epoch.into(); + last_rewarded_end_time(epoch_period.period.start); num_beacons(Report::count_all_beacons(db).await?); - Ok(()) } diff --git a/iot_verifier/tests/integrations/common/mod.rs b/iot_verifier/tests/integrations/common/mod.rs index cf4a63cb9..fc92800f6 100644 --- a/iot_verifier/tests/integrations/common/mod.rs +++ b/iot_verifier/tests/integrations/common/mod.rs @@ -1,5 +1,5 @@ use blake3::hash; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use file_store::{ file_sink::{FileSinkClient, Message as SinkMessage}, iot_beacon_report::{IotBeaconIngestReport, IotBeaconReport}, @@ -24,13 +24,42 @@ use iot_verifier::{ last_beacon_reciprocity::LastBeaconReciprocity, last_witness::LastWitness, poc_report::{InsertBindings, IotStatus, Report, ReportType}, + PriceInfo, }; +use helium_lib::token::Token; +use iot_config::sub_dao_epoch_reward_info::EpochRewardInfo; use prost::Message; +use rust_decimal::Decimal; +use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; use std::{self, ops::DerefMut, str::FromStr}; use tokio::{sync::mpsc::error::TryRecvError, sync::Mutex, time::timeout}; +pub const EPOCH_ADDRESS: &str = "112E7TxoNHV46M6tiPA8N1MkeMeQxc9ztb4JQLXBVAAUfq1kJLoF"; +pub const SUB_DAO_ADDRESS: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; +pub const EMISSIONS_POOL_IN_BONES_24_HOURS: u64 = 89_041_095_890_411; + +pub fn default_rewards_info(total_emissions: u64, epoch_duration: Duration) -> EpochRewardInfo { + let now = Utc::now(); + EpochRewardInfo { + epoch_day: 1, + epoch_address: EPOCH_ADDRESS.into(), + sub_dao_address: SUB_DAO_ADDRESS.into(), + epoch_period: (now - epoch_duration)..now, + epoch_emissions: Decimal::from(total_emissions), + rewards_issued_at: now, + } +} + +pub fn default_price_info() -> PriceInfo { + let token = Token::Hnt; + let price_info = PriceInfo::new(1, token.decimals()); + assert_eq!(price_info.price_per_token, dec!(0.00000001)); + assert_eq!(price_info.price_per_bone, dec!(0.0000000000000001)); + price_info +} + pub fn create_file_sink( ) -> (FileSinkClient, MockFileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(10); diff --git a/iot_verifier/tests/integrations/rewarder_operations.rs b/iot_verifier/tests/integrations/rewarder_operations.rs index 7bf9f51ca..c0326ccbb 100644 --- a/iot_verifier/tests/integrations/rewarder_operations.rs +++ b/iot_verifier/tests/integrations/rewarder_operations.rs @@ -1,5 +1,7 @@ -use crate::common::{self, MockFileSinkReceiver}; -use chrono::{Duration as ChronoDuration, Utc}; +use crate::common::{ + self, default_rewards_info, MockFileSinkReceiver, EMISSIONS_POOL_IN_BONES_24_HOURS, +}; +use chrono::Duration; use helium_proto::services::poc_lora::{IotRewardShare, OperationalReward}; use iot_verifier::{reward_share, rewarder}; use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; @@ -8,23 +10,24 @@ use rust_decimal_macros::dec; #[tokio::test] async fn test_operations() -> anyhow::Result<()> { let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); - let now = Utc::now(); - let epoch = (now - ChronoDuration::hours(24))..now; + + let reward_info = default_rewards_info(EMISSIONS_POOL_IN_BONES_24_HOURS, Duration::hours(24)); + let (_, rewards) = tokio::join!( - rewarder::reward_operational(&iot_rewards_client, &epoch), + rewarder::reward_operational(&iot_rewards_client, &reward_info), receive_expected_rewards(&mut iot_rewards) ); if let Ok(ops_reward) = rewards { // confirm the total rewards allocated matches expectations - let expected_total = reward_share::get_scheduled_ops_fund_tokens(epoch.end - epoch.start) - .to_u64() - .unwrap(); + let expected_total = + reward_share::get_scheduled_ops_fund_tokens(reward_info.epoch_emissions) + .to_u64() + .unwrap(); assert_eq!(ops_reward.amount, 6_232_876_712_328); assert_eq!(ops_reward.amount, expected_total); // confirm the ops percentage amount matches expectations - let daily_total = *reward_share::REWARDS_PER_DAY; - let ops_percent = (Decimal::from(ops_reward.amount) / daily_total) + let ops_percent = (Decimal::from(ops_reward.amount) / reward_info.epoch_emissions) .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); assert_eq!(ops_percent, dec!(0.07)); } else { diff --git a/iot_verifier/tests/integrations/rewarder_oracles.rs b/iot_verifier/tests/integrations/rewarder_oracles.rs index 72254a6a9..9aeb8766f 100644 --- a/iot_verifier/tests/integrations/rewarder_oracles.rs +++ b/iot_verifier/tests/integrations/rewarder_oracles.rs @@ -1,5 +1,7 @@ -use crate::common::{self, MockFileSinkReceiver}; -use chrono::{Duration as ChronoDuration, Utc}; +use crate::common::{ + self, default_rewards_info, MockFileSinkReceiver, EMISSIONS_POOL_IN_BONES_24_HOURS, +}; +use chrono::Duration; use helium_proto::services::poc_lora::{IotRewardShare, UnallocatedReward}; use iot_verifier::{reward_share, rewarder}; use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; @@ -9,23 +11,24 @@ use sqlx::PgPool; #[sqlx::test] async fn test_oracles(_pool: PgPool) -> anyhow::Result<()> { let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); - let now = Utc::now(); - let epoch = (now - ChronoDuration::hours(24))..now; + + let reward_info = default_rewards_info(EMISSIONS_POOL_IN_BONES_24_HOURS, Duration::hours(24)); + let (_, rewards) = tokio::join!( - rewarder::reward_oracles(&iot_rewards_client, &epoch), + rewarder::reward_oracles(&iot_rewards_client, &reward_info), receive_expected_rewards(&mut iot_rewards) ); if let Ok(unallocated_oracle_reward) = rewards { // confirm the total rewards matches expectations - let expected_total = reward_share::get_scheduled_oracle_tokens(epoch.end - epoch.start) + let expected_total = reward_share::get_scheduled_oracle_tokens(reward_info.epoch_emissions) .to_u64() .unwrap(); assert_eq!(unallocated_oracle_reward.amount, 6_232_876_712_328); assert_eq!(unallocated_oracle_reward.amount, expected_total); // confirm the ops percentage amount matches expectations - let daily_total = *reward_share::REWARDS_PER_DAY; - let oracle_percent = (Decimal::from(unallocated_oracle_reward.amount) / daily_total) + let oracle_percent = (Decimal::from(unallocated_oracle_reward.amount) + / reward_info.epoch_emissions) .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); assert_eq!(oracle_percent, dec!(0.07)); } else { diff --git a/iot_verifier/tests/integrations/rewarder_poc_dc.rs b/iot_verifier/tests/integrations/rewarder_poc_dc.rs index db822df65..1dffd7353 100644 --- a/iot_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/iot_verifier/tests/integrations/rewarder_poc_dc.rs @@ -1,5 +1,8 @@ -use crate::common::{self, MockFileSinkReceiver}; -use chrono::{DateTime, Duration as ChronoDuration, Utc}; +use crate::common::{ + self, default_price_info, default_rewards_info, MockFileSinkReceiver, + EMISSIONS_POOL_IN_BONES_24_HOURS, +}; +use chrono::{DateTime, Duration as ChronoDuration, Duration, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_lora::{ GatewayReward, IotRewardShare, UnallocatedReward, UnallocatedRewardType, @@ -23,18 +26,20 @@ const HOTSPOT_4: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; #[sqlx::test] async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); - let now = Utc::now(); - let epoch = (now - ChronoDuration::hours(24))..now; + + let reward_info = default_rewards_info(EMISSIONS_POOL_IN_BONES_24_HOURS, Duration::hours(24)); + + let price_info = default_price_info(); // seed all the things let mut txn = pool.clone().begin().await?; - seed_pocs(epoch.start, &mut txn).await?; - seed_dc(epoch.start, &mut txn).await?; + seed_pocs(reward_info.epoch_period.start, &mut txn).await?; + seed_dc(reward_info.epoch_period.start, &mut txn).await?; txn.commit().await?; // run rewards for poc and dc let (_, rewards) = tokio::join!( - rewarder::reward_poc_and_dc(&pool, &iot_rewards_client, &epoch, dec!(0.0001)), + rewarder::reward_poc_and_dc(&pool, &iot_rewards_client, &reward_info, price_info), receive_expected_rewards(&mut iot_rewards) ); if let Ok((gateway_rewards, unallocated_poc_reward)) = rewards { @@ -51,9 +56,11 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { gateway_rewards[1].hotspot_key, PublicKeyBinary::from_str(HOTSPOT_2).unwrap().as_ref() ); + assert_eq!(gateway_rewards[1].beacon_amount, 0); assert_eq!(gateway_rewards[1].witness_amount, 8_547_945_205_479); assert_eq!(gateway_rewards[1].dc_transfer_amount, 29_680_365_296_803); + // hotspot 2 should have double the dc rewards of hotspot 1 assert_eq!( gateway_rewards[1].dc_transfer_amount, @@ -96,16 +103,16 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { let dc_sum: u64 = gateway_rewards.iter().map(|r| r.dc_transfer_amount).sum(); let unallocated_sum: u64 = unallocated_poc_reward.amount; - let expected_dc = reward_share::get_scheduled_dc_tokens(epoch.end - epoch.start); + let expected_dc = reward_share::get_scheduled_dc_tokens(reward_info.epoch_emissions); let (expected_beacon_sum, expected_witness_sum) = - reward_share::get_scheduled_poc_tokens(epoch.end - epoch.start, expected_dc); + reward_share::get_scheduled_poc_tokens(reward_info.epoch_emissions, expected_dc); let expected_total = expected_beacon_sum.to_u64().unwrap() + expected_witness_sum.to_u64().unwrap(); assert_eq!(expected_total, poc_sum + dc_sum + unallocated_sum); // confirm the poc & dc percentage amount matches expectations - let daily_total = *reward_share::REWARDS_PER_DAY; - let poc_dc_percent = (Decimal::from(poc_sum + dc_sum + unallocated_sum) / daily_total) + let poc_dc_percent = (Decimal::from(poc_sum + dc_sum + unallocated_sum) + / reward_info.epoch_emissions) .round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); assert_eq!(poc_dc_percent, dec!(0.8)); } else { diff --git a/mobile_config/src/client/sub_dao_client.rs b/mobile_config/src/client/sub_dao_client.rs index adcba54ac..bb553ad40 100644 --- a/mobile_config/src/client/sub_dao_client.rs +++ b/mobile_config/src/client/sub_dao_client.rs @@ -69,6 +69,7 @@ impl SubDaoEpochRewardInfoResolver for SubDaoClient { Err(status) if status.code() == tonic::Code::NotFound => None, Err(status) => Err(status)?, }; + tracing::debug!(?response, "fetched subdao epoch info"); Ok(response) } } diff --git a/mobile_config/src/sub_dao_service.rs b/mobile_config/src/sub_dao_service.rs index 129036115..095aa5f9b 100644 --- a/mobile_config/src/sub_dao_service.rs +++ b/mobile_config/src/sub_dao_service.rs @@ -73,7 +73,10 @@ impl sub_dao::sub_dao_server::SubDao for SubDaoService { sub_dao_epoch_reward_info::db::get_info(&self.metadata_pool, epoch, &sub_dao_address) .await - .map_err(|_| Status::internal("error fetching sub_dao epoch reward info"))? + .map_err(|e| { + tracing::error!(error = %e, "error fetching sub_dao epoch reward info"); + Status::internal("error fetching sub_dao epoch reward info") + })? .map_or_else( || { telemetry::count_epoch_chain_lookup("not-found"); diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 5ef717084..a8dce5ddd 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -547,11 +547,11 @@ impl CoverageClaimTimeCache { Self { cache } } - pub async fn fetch_coverage_claim_time<'a, 'b>( + pub async fn fetch_coverage_claim_time( &self, - radio_key: KeyType<'a>, - coverage_object: &'a Option, - exec: &mut Transaction<'b, Postgres>, + radio_key: KeyType<'_>, + coverage_object: &'_ Option, + exec: &mut Transaction<'_, Postgres>, ) -> Result>, sqlx::Error> { let key = (radio_key.to_id(), *coverage_object); if let Some(coverage_claim_time) = self.cache.get(&key).await { diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 7f1940777..f8bc9a529 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -215,7 +215,7 @@ pub async fn sum_data_sessions_to_dc_by_payer<'a>( .collect::>()) } -pub async fn data_sessions_to_dc<'a>( +pub async fn data_sessions_to_dc( stream: impl Stream>, ) -> Result { tokio::pin!(stream); diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 2be924a1d..f5c005004 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -257,6 +257,12 @@ where } pub async fn reward(&self, next_reward_epoch: u64) -> anyhow::Result<()> { + tracing::info!( + "Resolving reward info for epoch: {}, subdao: {}", + next_reward_epoch, + self.sub_dao + ); + let reward_info = self .sub_dao_epoch_reward_client .resolve_info(&self.sub_dao.to_string(), next_reward_epoch) @@ -274,11 +280,12 @@ where let price_info = PriceInfo::new(pricer_hnt_price, Token::Hnt.decimals()); tracing::info!( - "Rewarding for epoch {} period: {} to {} with bone price: {}", + "Rewarding for epoch {} period: {} to {} with hnt bone price: {} and reward pool: {}", reward_info.epoch_day, reward_info.epoch_period.start, reward_info.epoch_period.end, - price_info.price_per_bone + price_info.price_per_bone, + reward_info.epoch_emissions, ); // process rewards for poc and data transfer diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 5ed549c47..746e98a8a 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -275,7 +275,7 @@ pub async fn get_latest_speedtests_for_pubkey( Ok(speedtests) } -pub async fn aggregate_epoch_speedtests<'a>( +pub async fn aggregate_epoch_speedtests( epoch_end: DateTime, exec: &sqlx::Pool, ) -> Result { diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index bc6c345a1..c6322b2ce 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -2,7 +2,10 @@ use crate::{reward_index, settings, telemetry, Settings}; use anyhow::{anyhow, bail, Result}; use chrono::Utc; use file_store::{ - file_info_poller::FileInfoStream, reward_manifest::RewardManifest, FileInfo, FileStore, + file_info_poller::FileInfoStream, + reward_manifest::RewardData::{self, IotRewardData, MobileRewardData}, + reward_manifest::RewardManifest, + FileInfo, FileStore, }; use futures::{stream, StreamExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; @@ -11,7 +14,7 @@ use helium_proto::{ poc_lora::{iot_reward_share::Reward as IotReward, IotRewardShare}, poc_mobile::{mobile_reward_share::Reward as MobileReward, MobileRewardShare}, }, - Message, ServiceProvider, + IotRewardToken, Message, MobileRewardToken, ServiceProvider, }; use poc_metrics::record_duration; use sqlx::{Pool, Postgres, Transaction}; @@ -112,6 +115,9 @@ impl Indexer { ) .boxed(); + // if the token type defined in the reward data is not HNT, then bail + self.verify_token_type(&manifest.reward_data)?; + let mut reward_shares = self.verifier_store.source_unordered(5, reward_files); let mut hotspot_rewards: HashMap = HashMap::new(); @@ -225,4 +231,27 @@ impl Indexer { } } } + + fn verify_token_type(&self, reward_data: &Option) -> Result<()> { + match reward_data { + Some(MobileRewardData { token, .. }) => { + if *token != MobileRewardToken::Hnt { + bail!( + "legacy token type defined in manifest: {}", + token.as_str_name() + ); + } + } + Some(IotRewardData { token, .. }) => { + if *token != IotRewardToken::Hnt { + bail!( + "legacy token type defined in manifest: {}", + token.as_str_name() + ); + } + } + None => bail!("missing reward data in manifest"), + } + Ok(()) + } } diff --git a/reward_scheduler/src/lib.rs b/reward_scheduler/src/lib.rs index a2aebbac0..00d4bba5f 100644 --- a/reward_scheduler/src/lib.rs +++ b/reward_scheduler/src/lib.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use std::{ops::Range, time::Duration}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Scheduler { pub period_duration: Duration, pub schedule_period: Range>,