From ef4f1f0aef08ac2a3c70e3421afed387b360c786 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Tue, 4 Apr 2023 17:38:24 +0100 Subject: [PATCH 1/9] refactor to enable multiple reward types, output operational fund rewards --- Cargo.lock | 4 +- file_store/src/file_info.rs | 10 +- iot_verifier/src/main.rs | 7 +- iot_verifier/src/reward_share.rs | 155 +++++++++++++++++++------------ iot_verifier/src/rewarder.rs | 24 +++-- reward_index/src/indexer.rs | 40 +++++--- reward_index/src/reward_index.rs | 7 +- reward_index/src/settings.rs | 5 + 8 files changed, 160 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce0baa226..e3977999c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1105,7 +1105,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/gateway-rs.git?branch=main#e362b7537ad852c39f5c09523f4c721829c8617d" +source = "git+https://github.com/helium/gateway-rs.git?branch=main#3ad4f7ea4d0fd2f058340ad070982ca2b2f4a883" dependencies = [ "base64 0.21.0", "byteorder", @@ -2909,7 +2909,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#6edba809feafb605b59b377fa15800b4443b5508" +source = "git+https://github.com/helium/proto?branch=master#4a5ac3f020dc18bbf075c6738375fb09c0dcd443" dependencies = [ "bytes", "prost", diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index c0b08f96b..34dc194aa 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -115,7 +115,7 @@ pub const REWARD_MANIFEST: &str = "reward_manifest"; pub const IOT_PACKET_REPORT: &str = "packetreport"; pub const IOT_VALID_PACKET: &str = "iot_valid_packet"; pub const INVALID_PACKET: &str = "invalid_packet"; -pub const GATEWAY_REWARD_SHARE: &str = "gateway_reward_share"; +pub const IOT_REWARD_SHARE: &str = "iot_reward_share"; pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ingest_report"; pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session"; pub const PRICE_REPORT: &str = "price_report"; @@ -143,7 +143,7 @@ pub enum FileType { IotPacketReport, IotValidPacket, InvalidPacket, - GatewayRewardShare, + IotRewardShare, DataTransferSessionIngestReport, ValidDataTransferSession, PriceReport, @@ -172,7 +172,7 @@ impl fmt::Display for FileType { Self::IotPacketReport => IOT_PACKET_REPORT, Self::IotValidPacket => IOT_VALID_PACKET, Self::InvalidPacket => INVALID_PACKET, - Self::GatewayRewardShare => GATEWAY_REWARD_SHARE, + Self::IotRewardShare => IOT_REWARD_SHARE, Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT, Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, Self::PriceReport => PRICE_REPORT, @@ -204,7 +204,7 @@ impl FileType { Self::IotPacketReport => IOT_PACKET_REPORT, Self::IotValidPacket => IOT_VALID_PACKET, Self::InvalidPacket => INVALID_PACKET, - Self::GatewayRewardShare => GATEWAY_REWARD_SHARE, + Self::IotRewardShare => IOT_REWARD_SHARE, Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT, Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, Self::PriceReport => PRICE_REPORT, @@ -236,7 +236,7 @@ impl FromStr for FileType { IOT_PACKET_REPORT => Self::IotPacketReport, IOT_VALID_PACKET => Self::IotValidPacket, INVALID_PACKET => Self::InvalidPacket, - GATEWAY_REWARD_SHARE => Self::GatewayRewardShare, + IOT_REWARD_SHARE => Self::IotRewardShare, DATA_TRANSFER_SESSION_INGEST_REPORT => Self::DataTransferSessionIngestReport, VALID_DATA_TRANSFER_SESSION => Self::ValidDataTransferSession, PRICE_REPORT => Self::PriceReport, diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 51f937231..7066ae2f0 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -20,6 +20,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Debug, clap::Parser)] #[clap(version = env!("CARGO_PKG_VERSION"))] #[clap(about = "Helium POC IOT Verifier")] + pub struct Cli { /// Optional configuration file to use. If present the toml file at the /// given path will be loaded. Environemnt variables can override the @@ -93,8 +94,8 @@ impl Server { let store_base_path = std::path::Path::new(&settings.cache); // Gateway reward shares sink - let (gateway_rewards_sink, mut gateway_rewards_server) = file_sink::FileSinkBuilder::new( - FileType::GatewayRewardShare, + let (rewards_sink, mut gateway_rewards_server) = file_sink::FileSinkBuilder::new( + FileType::IotRewardShare, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_gateway_reward_shares"), ) @@ -116,7 +117,7 @@ impl Server { let rewarder = Rewarder { pool: pool.clone(), - gateway_rewards_sink, + rewards_sink, reward_manifests_sink, reward_period_hours: settings.rewards, reward_offset: settings.reward_offset_duration(), diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 243c19529..d6f873dc5 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -4,7 +4,7 @@ use file_store::{iot_packet::IotValidPacket, iot_valid_poc::IotPoc, traits::Time use futures::stream::TryStreamExt; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_lora as proto; - +use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward; use lazy_static::lazy_static; use rust_decimal::prelude::*; use rust_decimal_macros::dec; @@ -23,6 +23,8 @@ lazy_static! { static ref WITNESS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.24); // Data transfer is allocated 50% of daily rewards static ref DATA_TRANSFER_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.50); + // Operations fund is allocated 7% of daily rewards + static ref OPERATIONS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.07); // dc remainer distributed at ration of 4:1 in favour of witnesses // ie WITNESS_REWARDS_PER_DAY_PERCENT:BEACON_REWARDS_PER_DAY_PERCENT static ref WITNESS_DC_REMAINER_PERCENT: Decimal = dec!(0.80); @@ -62,6 +64,16 @@ fn get_scheduled_dc_tokens(duration: Duration) -> Decimal { ) } +fn get_scheduled_ops_fund_tokens(duration: Duration) -> u64 { + get_tokens_by_duration( + *REWARDS_PER_DAY * *OPERATIONS_REWARDS_PER_DAY_PERCENT, + duration, + ) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0) +} + #[derive(sqlx::FromRow)] pub struct GatewayPocShare { pub hotspot_key: PublicKeyBinary, @@ -295,7 +307,7 @@ impl GatewayShares { self, reward_period: &'_ Range>, iot_price: Decimal, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ { // the total number of shares for beacons, witnesses and data transfer // dc shares here is the sum of all spent data transfer DC this epoch let (total_beacon_shares, total_witness_shares, total_dc_shares) = self.total_shares(); @@ -330,30 +342,52 @@ impl GatewayShares { // compute the awards per hotspot self.shares .into_iter() - .map( - move |(hotspot_key, reward_shares)| proto::GatewayRewardShare { - hotspot_key: hotspot_key.into(), - beacon_amount: compute_rewards( - beacon_rewards_per_share, - reward_shares.beacon_shares, - ), - witness_amount: compute_rewards( - witness_rewards_per_share, - reward_shares.witness_shares, - ), - dc_transfer_amount: compute_rewards( - dc_transfer_rewards_per_share, - reward_shares.dc_shares, - ), - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - }, - ) + .map(move |(hotspot_key, reward_shares)| proto::GatewayReward { + hotspot_key: hotspot_key.into(), + beacon_amount: compute_rewards( + beacon_rewards_per_share, + reward_shares.beacon_shares, + ), + witness_amount: compute_rewards( + witness_rewards_per_share, + reward_shares.witness_shares, + ), + dc_transfer_amount: compute_rewards( + dc_transfer_rewards_per_share, + reward_shares.dc_shares, + ), + }) .filter(|reward_share| { reward_share.beacon_amount > 0 || reward_share.witness_amount > 0 || reward_share.dc_transfer_amount > 0 }) + .map(|gateway_reward| proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::GatewayReward(gateway_reward)), + }) + } +} + +#[derive(Default)] +pub struct OperationalRewards { + pub rewards: Vec, +} + +impl OperationalRewards { + pub fn compute(reward_period: &Range>) -> Self { + let mut operational_rewards = Self::default(); + let op_fund_reward = proto::OperationalReward { + amount: get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start), + }; + let op_fund_share = proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::OperationalReward(op_fund_reward)) + }; + operational_rewards.rewards.push(op_fund_share); + operational_rewards } } @@ -424,6 +458,16 @@ mod test { } } + #[test] + fn test_non_gateway_reward_shares() { + let epoch_duration = Duration::hours(1); + let total_tokens_for_period = *REWARDS_PER_DAY / dec!(24); + println!("total_tokens_for_period: {total_tokens_for_period}"); + + let operation_tokens_for_period = get_scheduled_ops_fund_tokens(epoch_duration); + assert_eq!(479452054794, operation_tokens_for_period); + } + #[test] // test reward distribution where there is a fixed dc spend per gateway // with the total dc spend across all gateways being significantly lower than the @@ -499,21 +543,20 @@ mod test { gw6.clone(), reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; - let rewards: HashMap = gw_shares + let gw_shares = GatewayShares { shares }; + let mut rewards: HashMap = HashMap::new(); + let gw_reward_shares: Vec = gw_shares .into_gateway_reward_shares(&reward_period, iot_price) - .map(|reward| { - ( - reward - .hotspot_key - .clone() - .try_into() - .expect("failed to decode hotspot_key"), - reward, - ) - }) .collect(); + for reward in gw_reward_shares { + if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { + rewards.insert( + gateway_reward.hotspot_key.clone().try_into().unwrap(), + gateway_reward, + ); + } + } let gw1_rewards = rewards .get(&gw1) @@ -679,21 +722,20 @@ mod test { gw6.clone(), reward_shares_in_dec(dec!(150), dec!(350), total_dc_to_spend * dec!(0.8)), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; - let rewards: HashMap = gw_shares + let gw_shares = GatewayShares { shares }; + let mut rewards: HashMap = HashMap::new(); + let gw_reward_shares: Vec = gw_shares .into_gateway_reward_shares(&reward_period, iot_price) - .map(|reward| { - ( - reward - .hotspot_key - .clone() - .try_into() - .expect("failed to decode hotspot_key"), - reward, - ) - }) .collect(); + for reward in gw_reward_shares { + if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { + rewards.insert( + gateway_reward.hotspot_key.clone().try_into().unwrap(), + gateway_reward, + ); + } + } let gw1_rewards = rewards .get(&gw1) @@ -840,21 +882,20 @@ mod test { gw6.clone(), reward_shares_in_dec(dec!(150), dec!(350), total_dc_to_spend * dec!(0.2)), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; - let rewards: HashMap = gw_shares + let gw_shares = GatewayShares { shares }; + let mut rewards: HashMap = HashMap::new(); + let gw_reward_shares: Vec = gw_shares .into_gateway_reward_shares(&reward_period, iot_price) - .map(|reward| { - ( - reward - .hotspot_key - .clone() - .try_into() - .expect("failed to decode hotspot_key"), - reward, - ) - }) .collect(); + for reward in gw_reward_shares { + if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { + rewards.insert( + gateway_reward.hotspot_key.clone().try_into().unwrap(), + gateway_reward, + ); + } + } let gw1_rewards = rewards .get(&gw1) diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index b930696a1..404b3c953 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -1,4 +1,4 @@ -use crate::{reward_share::GatewayShares, scheduler::Scheduler}; +use crate::{reward_share::GatewayShares, reward_share::OperationalRewards, scheduler::Scheduler}; use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; @@ -10,7 +10,7 @@ use tokio::time::sleep; pub struct Rewarder { pub pool: Pool, - pub gateway_rewards_sink: file_sink::FileSinkClient, + pub rewards_sink: file_sink::FileSinkClient, pub reward_manifests_sink: file_sink::FileSinkClient, pub reward_period_hours: i64, pub reward_offset: Duration, @@ -67,19 +67,29 @@ impl Rewarder { scheduler: &Scheduler, iot_price: Decimal, ) -> anyhow::Result<()> { - let reward_shares = GatewayShares::aggregate(&self.pool, &scheduler.reward_period).await?; + let gateway_reward_shares = + GatewayShares::aggregate(&self.pool, &scheduler.reward_period).await?; + + let operational_rewards = OperationalRewards::compute(&scheduler.reward_period); + for reward_share in - reward_shares.into_gateway_reward_shares(&scheduler.reward_period, iot_price) + gateway_reward_shares.into_gateway_reward_shares(&scheduler.reward_period, iot_price) { - self.gateway_rewards_sink + self.rewards_sink + .write(reward_share, []) + .await? + // Await the returned oneshot to ensure we wrote the file + .await??; + } + for reward_share in operational_rewards.rewards { + self.rewards_sink .write(reward_share, []) .await? // Await the returned oneshot to ensure we wrote the file .await??; } - let written_files = self.gateway_rewards_sink.commit().await?.await??; - + let written_files = self.rewards_sink.commit().await?.await??; // Write the rewards manifest for the completed period self.reward_manifests_sink .write( diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index f24e59fc2..70a612614 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -1,13 +1,13 @@ use crate::{reward_index, settings, Settings}; -use anyhow::Result; +use anyhow::{Result, bail}; use file_store::{ file_info_poller::FileInfoStream, reward_manifest::RewardManifest, FileInfo, FileStore, }; use futures::{stream, StreamExt, TryStreamExt}; -use helium_crypto::PublicKey; use helium_proto::{ - services::poc_lora::GatewayRewardShare, services::poc_mobile::RadioRewardShare, Message, + services::poc_mobile::RadioRewardShare, services::poc_lora::IotRewardShare, Message, }; +use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward; use poc_metrics::record_duration; use sqlx::{Pool, Postgres, Transaction}; use std::{collections::HashMap, str::FromStr}; @@ -17,6 +17,7 @@ pub struct Indexer { pool: Pool, verifier_store: FileStore, mode: settings::Mode, + op_fund_key: Vec, } impl Indexer { @@ -25,6 +26,7 @@ impl Indexer { mode: settings.mode, verifier_store: FileStore::from_settings(&settings.verifier).await?, pool, + op_fund_key: settings.operation_fund_key(), }) } @@ -79,31 +81,41 @@ impl Indexer { let mut hotspot_rewards: HashMap, u64> = HashMap::new(); while let Some(msg) = reward_shares.try_next().await? { - let (hotspot_key, amount) = extract_reward_share(&self.mode, &msg)?; - *hotspot_rewards.entry(hotspot_key).or_default() += amount; + let (key, amount) = extract_reward_share(&self.mode, &self.op_fund_key, &msg)?; + *hotspot_rewards.entry(key).or_default() += amount; } - for (address, amount) in hotspot_rewards { - let pub_key = PublicKey::try_from(address)?; - reward_index::insert(&mut *txn, &pub_key, amount, &manifest_time).await?; + for (key, amount) in hotspot_rewards { + reward_index::insert(&mut *txn, &key, amount, &manifest_time).await?; } Ok(()) } } -fn extract_reward_share(mode: &settings::Mode, msg: &[u8]) -> Result<(Vec, u64)> { +fn extract_reward_share(mode: &settings::Mode, op_fund_key: &[u8], msg: &[u8]) -> Result<(Vec, u64)> { match mode { settings::Mode::Mobile => { let share = RadioRewardShare::decode(msg)?; Ok((share.hotspot_key, share.amount)) } settings::Mode::Iot => { - let share = GatewayRewardShare::decode(msg)?; - Ok(( - share.hotspot_key, - share.witness_amount + share.beacon_amount + share.dc_transfer_amount, - )) + let share = IotRewardShare::decode(msg)?; + match share.reward { + Some(ProtoReward::GatewayReward(r)) => { + Ok(( + r.hotspot_key, + r.witness_amount + r.beacon_amount + r.dc_transfer_amount, + )) + }, + Some(ProtoReward::OperationalReward(r)) => { + Ok(( + op_fund_key.to_vec(), + r.amount + )) + }, + _ => bail!("got an invalid reward share"), + } } } } diff --git a/reward_index/src/reward_index.rs b/reward_index/src/reward_index.rs index 5ddb2e4af..2edae8dcd 100644 --- a/reward_index/src/reward_index.rs +++ b/reward_index/src/reward_index.rs @@ -1,9 +1,8 @@ use chrono::{DateTime, Utc}; -use helium_crypto::PublicKey; pub async fn insert<'c, E>( executor: E, - address: &PublicKey, + address: &Vec, amount: u64, timestamp: &DateTime, ) -> Result<(), sqlx::Error> @@ -18,8 +17,8 @@ where sqlx::query( r#" insert into reward_index ( - address, - rewards, + address, + rewards, last_reward ) values ($1, $2, $3) on conflict(address) do update set diff --git a/reward_index/src/settings.rs b/reward_index/src/settings.rs index 8ba88b16c..16981708c 100644 --- a/reward_index/src/settings.rs +++ b/reward_index/src/settings.rs @@ -35,6 +35,7 @@ pub struct Settings { pub database: db_store::Settings, pub verifier: file_store::Settings, pub metrics: poc_metrics::Settings, + pub operation_fund_key: String, } pub fn default_log() -> String { @@ -67,6 +68,10 @@ impl Settings { pub fn interval(&self) -> Duration { Duration::seconds(self.interval) } + + pub fn operation_fund_key(&self) -> Vec { + self.operation_fund_key.clone().into_bytes() + } } fn default_interval() -> i64 { From 9e83369b63c49d61cd1620930b1d2c7d22046e04 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 7 Apr 2023 16:24:00 +0100 Subject: [PATCH 2/9] add support for reward type to db --- .../migrations/5_add_type_to_index.sql | 1 + reward_index/src/indexer.rs | 101 ++++++++++++------ reward_index/src/reward_index.rs | 5 +- 3 files changed, 75 insertions(+), 32 deletions(-) create mode 100644 reward_index/migrations/5_add_type_to_index.sql diff --git a/reward_index/migrations/5_add_type_to_index.sql b/reward_index/migrations/5_add_type_to_index.sql new file mode 100644 index 000000000..fb3f33555 --- /dev/null +++ b/reward_index/migrations/5_add_type_to_index.sql @@ -0,0 +1 @@ +ALTER TABLE reward_index ADD reward_type text; diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index 70a612614..b6c11e8b7 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -1,15 +1,16 @@ use crate::{reward_index, settings, Settings}; -use anyhow::{Result, bail}; +use anyhow::{bail, Result}; use file_store::{ file_info_poller::FileInfoStream, reward_manifest::RewardManifest, FileInfo, FileStore, }; use futures::{stream, StreamExt, TryStreamExt}; +use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward; use helium_proto::{ - services::poc_mobile::RadioRewardShare, services::poc_lora::IotRewardShare, Message, + services::poc_lora::IotRewardShare, services::poc_mobile::RadioRewardShare, Message, }; -use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward; use poc_metrics::record_duration; use sqlx::{Pool, Postgres, Transaction}; +use std::fmt; use std::{collections::HashMap, str::FromStr}; use tokio::sync::mpsc::Receiver; @@ -20,6 +21,29 @@ pub struct Indexer { op_fund_key: Vec, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum RewardType { + Mobile, + IotGateway, + IotOperational, +} + +impl fmt::Display for RewardType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Mobile => f.write_str("mobile"), + Self::IotGateway => f.write_str("iot_gateway"), + Self::IotOperational => f.write_str("iot_operational"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RewardKey { + key: Vec, + reward_type: RewardType, +} + impl Indexer { pub async fn new(settings: &Settings, pool: Pool) -> Result { Ok(Self { @@ -77,44 +101,59 @@ impl Indexer { .boxed(); let mut reward_shares = self.verifier_store.source_unordered(5, reward_files); - - let mut hotspot_rewards: HashMap, u64> = HashMap::new(); + let mut hotspot_rewards: HashMap = HashMap::new(); while let Some(msg) = reward_shares.try_next().await? { - let (key, amount) = extract_reward_share(&self.mode, &self.op_fund_key, &msg)?; + let (key, amount) = self.extract_reward_share(&msg)?; *hotspot_rewards.entry(key).or_default() += amount; } - for (key, amount) in hotspot_rewards { - reward_index::insert(&mut *txn, &key, amount, &manifest_time).await?; + for (reward_key, amount) in hotspot_rewards { + reward_index::insert( + &mut *txn, + &reward_key.key, + amount, + reward_key.reward_type.to_string(), + &manifest_time, + ) + .await?; } Ok(()) } -} -fn extract_reward_share(mode: &settings::Mode, op_fund_key: &[u8], msg: &[u8]) -> Result<(Vec, u64)> { - match mode { - settings::Mode::Mobile => { - let share = RadioRewardShare::decode(msg)?; - Ok((share.hotspot_key, share.amount)) - } - settings::Mode::Iot => { - let share = IotRewardShare::decode(msg)?; - match share.reward { - Some(ProtoReward::GatewayReward(r)) => { - Ok(( - r.hotspot_key, - r.witness_amount + r.beacon_amount + r.dc_transfer_amount, - )) - }, - Some(ProtoReward::OperationalReward(r)) => { - Ok(( - op_fund_key.to_vec(), - r.amount - )) - }, - _ => bail!("got an invalid reward share"), + fn extract_reward_share(&self, msg: &[u8]) -> Result<(RewardKey, u64)> { + match self.mode { + settings::Mode::Mobile => { + let share = RadioRewardShare::decode(msg)?; + let key = RewardKey { + key: share.hotspot_key, + reward_type: RewardType::Mobile, + }; + Ok((key, share.amount)) + } + settings::Mode::Iot => { + let share = IotRewardShare::decode(msg)?; + match share.reward { + Some(ProtoReward::GatewayReward(r)) => { + let key = RewardKey { + key: r.hotspot_key, + reward_type: RewardType::IotGateway, + }; + Ok(( + key, + r.witness_amount + r.beacon_amount + r.dc_transfer_amount, + )) + } + Some(ProtoReward::OperationalReward(r)) => { + let key = RewardKey { + key: self.op_fund_key.clone().to_vec(), + reward_type: RewardType::IotOperational, + }; + Ok((key, r.amount)) + } + _ => bail!("got an invalid reward share"), + } } } } diff --git a/reward_index/src/reward_index.rs b/reward_index/src/reward_index.rs index 2edae8dcd..2eed681f8 100644 --- a/reward_index/src/reward_index.rs +++ b/reward_index/src/reward_index.rs @@ -4,6 +4,7 @@ pub async fn insert<'c, E>( executor: E, address: &Vec, amount: u64, + reward_type: String, timestamp: &DateTime, ) -> Result<(), sqlx::Error> where @@ -20,7 +21,8 @@ where address, rewards, last_reward - ) values ($1, $2, $3) + reward_type + ) values ($1, $2, $3, $4) on conflict(address) do update set rewards = reward_index.rewards + EXCLUDED.rewards, last_reward = EXCLUDED.last_reward @@ -29,6 +31,7 @@ where .bind(address) .bind(amount as i64) .bind(timestamp) + .bind(reward_type) .execute(executor) .await?; From 42bf397b8dedbd8817b0321a24a7ff7c00300d09 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 7 Apr 2023 16:42:58 +0100 Subject: [PATCH 3/9] fix fmting --- iot_verifier/src/reward_share.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index d6f873dc5..78cbc69a6 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -384,7 +384,7 @@ impl OperationalRewards { let op_fund_share = proto::IotRewardShare { start_period: reward_period.start.encode_timestamp(), end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::OperationalReward(op_fund_reward)) + reward: Some(ProtoReward::OperationalReward(op_fund_reward)), }; operational_rewards.rewards.push(op_fund_share); operational_rewards From 606d431042dc89bb4bbc3e5f427aa70d1a0c19de Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 10 Apr 2023 10:43:43 -0400 Subject: [PATCH 4/9] Small refactor and making operation fund key an optional setting --- iot_verifier/src/reward_share.rs | 8 ++++---- iot_verifier/src/rewarder.rs | 2 +- reward_index/src/indexer.rs | 9 +++++++-- reward_index/src/settings.rs | 8 +++++--- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 78cbc69a6..aba158e81 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -303,7 +303,7 @@ impl GatewayShares { Ok(()) } - pub fn into_gateway_reward_shares( + pub fn into_iot_reward_shares( self, reward_period: &'_ Range>, iot_price: Decimal, @@ -547,7 +547,7 @@ mod test { let gw_shares = GatewayShares { shares }; let mut rewards: HashMap = HashMap::new(); let gw_reward_shares: Vec = gw_shares - .into_gateway_reward_shares(&reward_period, iot_price) + .into_iot_reward_shares(&reward_period, iot_price) .collect(); for reward in gw_reward_shares { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { @@ -726,7 +726,7 @@ mod test { let gw_shares = GatewayShares { shares }; let mut rewards: HashMap = HashMap::new(); let gw_reward_shares: Vec = gw_shares - .into_gateway_reward_shares(&reward_period, iot_price) + .into_iot_reward_shares(&reward_period, iot_price) .collect(); for reward in gw_reward_shares { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { @@ -886,7 +886,7 @@ mod test { let gw_shares = GatewayShares { shares }; let mut rewards: HashMap = HashMap::new(); let gw_reward_shares: Vec = gw_shares - .into_gateway_reward_shares(&reward_period, iot_price) + .into_iot_reward_shares(&reward_period, iot_price) .collect(); for reward in gw_reward_shares { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 404b3c953..cef4fdb4e 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -73,7 +73,7 @@ impl Rewarder { let operational_rewards = OperationalRewards::compute(&scheduler.reward_period); for reward_share in - gateway_reward_shares.into_gateway_reward_shares(&scheduler.reward_period, iot_price) + gateway_reward_shares.into_iot_reward_shares(&scheduler.reward_period, iot_price) { self.rewards_sink .write(reward_share, []) diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index b6c11e8b7..12767e672 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -1,5 +1,5 @@ use crate::{reward_index, settings, Settings}; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use file_store::{ file_info_poller::FileInfoStream, reward_manifest::RewardManifest, FileInfo, FileStore, }; @@ -50,7 +50,12 @@ impl Indexer { mode: settings.mode, verifier_store: FileStore::from_settings(&settings.verifier).await?, pool, - op_fund_key: settings.operation_fund_key(), + op_fund_key: match settings.mode { + settings::Mode::Iot => settings + .operation_fund_key() + .ok_or_else(|| anyhow!("operation fund key is required for IOT mode"))?, + settings::Mode::Mobile => vec![], + }, }) } diff --git a/reward_index/src/settings.rs b/reward_index/src/settings.rs index 16981708c..eba8d92e6 100644 --- a/reward_index/src/settings.rs +++ b/reward_index/src/settings.rs @@ -35,7 +35,7 @@ pub struct Settings { pub database: db_store::Settings, pub verifier: file_store::Settings, pub metrics: poc_metrics::Settings, - pub operation_fund_key: String, + pub operation_fund_key: Option, } pub fn default_log() -> String { @@ -69,8 +69,10 @@ impl Settings { Duration::seconds(self.interval) } - pub fn operation_fund_key(&self) -> Vec { - self.operation_fund_key.clone().into_bytes() + pub fn operation_fund_key(&self) -> Option> { + self.operation_fund_key + .clone() + .map(|string| string.into_bytes()) } } From 23b8a7fe7f51fccf46849266ba0947a5dffe6029 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 10 Apr 2023 11:10:08 -0400 Subject: [PATCH 5/9] Change OperationalRewards to be mod that returns single IotRewardShare --- iot_verifier/src/reward_share.rs | 16 +++++----------- iot_verifier/src/rewarder.rs | 17 +++++++---------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index aba158e81..91e2c8ed7 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -370,24 +370,18 @@ impl GatewayShares { } } -#[derive(Default)] -pub struct OperationalRewards { - pub rewards: Vec, -} +pub mod operational_rewards { + use super::*; -impl OperationalRewards { - pub fn compute(reward_period: &Range>) -> Self { - let mut operational_rewards = Self::default(); + pub fn compute(reward_period: &Range>) -> proto::IotRewardShare { let op_fund_reward = proto::OperationalReward { amount: get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start), }; - let op_fund_share = proto::IotRewardShare { + proto::IotRewardShare { start_period: reward_period.start.encode_timestamp(), end_period: reward_period.end.encode_timestamp(), reward: Some(ProtoReward::OperationalReward(op_fund_reward)), - }; - operational_rewards.rewards.push(op_fund_share); - operational_rewards + } } } diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index cef4fdb4e..bb96dd215 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -1,4 +1,4 @@ -use crate::{reward_share::GatewayShares, reward_share::OperationalRewards, scheduler::Scheduler}; +use crate::{reward_share::operational_rewards, reward_share::GatewayShares, scheduler::Scheduler}; use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; @@ -70,8 +70,6 @@ impl Rewarder { let gateway_reward_shares = GatewayShares::aggregate(&self.pool, &scheduler.reward_period).await?; - let operational_rewards = OperationalRewards::compute(&scheduler.reward_period); - for reward_share in gateway_reward_shares.into_iot_reward_shares(&scheduler.reward_period, iot_price) { @@ -81,13 +79,12 @@ impl Rewarder { // Await the returned oneshot to ensure we wrote the file .await??; } - for reward_share in operational_rewards.rewards { - self.rewards_sink - .write(reward_share, []) - .await? - // Await the returned oneshot to ensure we wrote the file - .await??; - } + + self.rewards_sink + .write(operational_rewards::compute(&scheduler.reward_period), []) + .await? + // Await the returned oneshot to ensure we wrote the file + .await??; let written_files = self.rewards_sink.commit().await?.await??; // Write the rewards manifest for the completed period From 835662340ce216de5cfefd28870fb17bb2967411 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 10 Apr 2023 11:30:04 -0400 Subject: [PATCH 6/9] Change new reward_type field in db to be an enum --- reward_index/migrations/5_add_type_to_index.sql | 4 +++- reward_index/src/indexer.rs | 16 +++------------- reward_index/src/reward_index.rs | 3 ++- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/reward_index/migrations/5_add_type_to_index.sql b/reward_index/migrations/5_add_type_to_index.sql index fb3f33555..e578872cc 100644 --- a/reward_index/migrations/5_add_type_to_index.sql +++ b/reward_index/migrations/5_add_type_to_index.sql @@ -1 +1,3 @@ -ALTER TABLE reward_index ADD reward_type text; +CREATE TYPE reward_type as enum('mobile', 'iot_gateway', 'iot_operational'); + +ALTER TABLE reward_index ADD reward_type reward_type; diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index 12767e672..156481dfe 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -10,7 +10,6 @@ use helium_proto::{ }; use poc_metrics::record_duration; use sqlx::{Pool, Postgres, Transaction}; -use std::fmt; use std::{collections::HashMap, str::FromStr}; use tokio::sync::mpsc::Receiver; @@ -21,23 +20,14 @@ pub struct Indexer { op_fund_key: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(sqlx::Type, Debug, Clone, PartialEq, Eq, Hash)] +#[sqlx(type_name = "reward_type", rename_all = "snake_case")] pub enum RewardType { Mobile, IotGateway, IotOperational, } -impl fmt::Display for RewardType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Mobile => f.write_str("mobile"), - Self::IotGateway => f.write_str("iot_gateway"), - Self::IotOperational => f.write_str("iot_operational"), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RewardKey { key: Vec, @@ -118,7 +108,7 @@ impl Indexer { &mut *txn, &reward_key.key, amount, - reward_key.reward_type.to_string(), + reward_key.reward_type, &manifest_time, ) .await?; diff --git a/reward_index/src/reward_index.rs b/reward_index/src/reward_index.rs index 2eed681f8..f073424d2 100644 --- a/reward_index/src/reward_index.rs +++ b/reward_index/src/reward_index.rs @@ -1,10 +1,11 @@ +use crate::indexer::RewardType; use chrono::{DateTime, Utc}; pub async fn insert<'c, E>( executor: E, address: &Vec, amount: u64, - reward_type: String, + reward_type: RewardType, timestamp: &DateTime, ) -> Result<(), sqlx::Error> where From 7958053a721955b77e05b263224318dd4e589e86 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 10 Apr 2023 12:09:25 -0400 Subject: [PATCH 7/9] Change RewardType::Mobile to RewardType::MobileGateway --- reward_index/migrations/5_add_type_to_index.sql | 2 +- reward_index/src/indexer.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reward_index/migrations/5_add_type_to_index.sql b/reward_index/migrations/5_add_type_to_index.sql index e578872cc..6ef94512f 100644 --- a/reward_index/migrations/5_add_type_to_index.sql +++ b/reward_index/migrations/5_add_type_to_index.sql @@ -1,3 +1,3 @@ -CREATE TYPE reward_type as enum('mobile', 'iot_gateway', 'iot_operational'); +CREATE TYPE reward_type as enum('mobile_gateway', 'iot_gateway', 'iot_operational'); ALTER TABLE reward_index ADD reward_type reward_type; diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index 156481dfe..c92954871 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -23,7 +23,7 @@ pub struct Indexer { #[derive(sqlx::Type, Debug, Clone, PartialEq, Eq, Hash)] #[sqlx(type_name = "reward_type", rename_all = "snake_case")] pub enum RewardType { - Mobile, + MobileGateway, IotGateway, IotOperational, } @@ -123,7 +123,7 @@ impl Indexer { let share = RadioRewardShare::decode(msg)?; let key = RewardKey { key: share.hotspot_key, - reward_type: RewardType::Mobile, + reward_type: RewardType::MobileGateway, }; Ok((key, share.amount)) } From 0d43f6f6de8e418df9467fcb36aa08f4f380819b Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 10 Apr 2023 12:11:46 -0400 Subject: [PATCH 8/9] Fix imports --- iot_verifier/src/rewarder.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index bb96dd215..4c78d7e3b 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -1,4 +1,7 @@ -use crate::{reward_share::operational_rewards, reward_share::GatewayShares, scheduler::Scheduler}; +use crate::{ + reward_share::{operational_rewards, GatewayShares}, + scheduler::Scheduler, +}; use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; From 74d92fcd5e8cd756d57a3b9a91334640500a311d Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 10 Apr 2023 14:05:33 -0400 Subject: [PATCH 9/9] Update reward_index/src/reward_index.rs Co-authored-by: Matthew Plant --- reward_index/src/reward_index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reward_index/src/reward_index.rs b/reward_index/src/reward_index.rs index f073424d2..57430cb5f 100644 --- a/reward_index/src/reward_index.rs +++ b/reward_index/src/reward_index.rs @@ -21,7 +21,7 @@ where insert into reward_index ( address, rewards, - last_reward + last_reward, reward_type ) values ($1, $2, $3, $4) on conflict(address) do update set