Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to support IotRewardShare and multiple sub reward types. Add support for operation fund rewards #429

Merged
merged 9 commits into from
Apr 10, 2023
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub const REWARD_MANIFEST: &str = "reward_manifest";
pub const IOT_PACKET_REPORT: &str = "packetreport";
pub const IOT_VALID_PACKET: &str = "iot_valid_packet";
pub const INVALID_PACKET: &str = "invalid_packet";
pub const GATEWAY_REWARD_SHARE: &str = "gateway_reward_share";
pub const IOT_REWARD_SHARE: &str = "iot_reward_share";
pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ingest_report";
pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session";
pub const PRICE_REPORT: &str = "price_report";
Expand Down Expand Up @@ -143,7 +143,7 @@ pub enum FileType {
IotPacketReport,
IotValidPacket,
InvalidPacket,
GatewayRewardShare,
IotRewardShare,
DataTransferSessionIngestReport,
ValidDataTransferSession,
PriceReport,
Expand Down Expand Up @@ -172,7 +172,7 @@ impl fmt::Display for FileType {
Self::IotPacketReport => IOT_PACKET_REPORT,
Self::IotValidPacket => IOT_VALID_PACKET,
Self::InvalidPacket => INVALID_PACKET,
Self::GatewayRewardShare => GATEWAY_REWARD_SHARE,
Self::IotRewardShare => IOT_REWARD_SHARE,
Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT,
Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION,
Self::PriceReport => PRICE_REPORT,
Expand Down Expand Up @@ -204,7 +204,7 @@ impl FileType {
Self::IotPacketReport => IOT_PACKET_REPORT,
Self::IotValidPacket => IOT_VALID_PACKET,
Self::InvalidPacket => INVALID_PACKET,
Self::GatewayRewardShare => GATEWAY_REWARD_SHARE,
Self::IotRewardShare => IOT_REWARD_SHARE,
Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT,
Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION,
Self::PriceReport => PRICE_REPORT,
Expand Down Expand Up @@ -236,7 +236,7 @@ impl FromStr for FileType {
IOT_PACKET_REPORT => Self::IotPacketReport,
IOT_VALID_PACKET => Self::IotValidPacket,
INVALID_PACKET => Self::InvalidPacket,
GATEWAY_REWARD_SHARE => Self::GatewayRewardShare,
IOT_REWARD_SHARE => Self::IotRewardShare,
DATA_TRANSFER_SESSION_INGEST_REPORT => Self::DataTransferSessionIngestReport,
VALID_DATA_TRANSFER_SESSION => Self::ValidDataTransferSession,
PRICE_REPORT => Self::PriceReport,
Expand Down
7 changes: 4 additions & 3 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Debug, clap::Parser)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
#[clap(about = "Helium POC IOT Verifier")]

pub struct Cli {
/// Optional configuration file to use. If present the toml file at the
/// given path will be loaded. Environemnt variables can override the
Expand Down Expand Up @@ -93,8 +94,8 @@ impl Server {

let store_base_path = std::path::Path::new(&settings.cache);
// Gateway reward shares sink
let (gateway_rewards_sink, mut gateway_rewards_server) = file_sink::FileSinkBuilder::new(
FileType::GatewayRewardShare,
let (rewards_sink, mut gateway_rewards_server) = file_sink::FileSinkBuilder::new(
FileType::IotRewardShare,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_gateway_reward_shares"),
)
Expand All @@ -116,7 +117,7 @@ impl Server {

let rewarder = Rewarder {
pool: pool.clone(),
gateway_rewards_sink,
rewards_sink,
reward_manifests_sink,
reward_period_hours: settings.rewards,
reward_offset: settings.reward_offset_duration(),
Expand Down
157 changes: 96 additions & 61 deletions iot_verifier/src/reward_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use file_store::{iot_packet::IotValidPacket, iot_valid_poc::IotPoc, traits::Time
use futures::stream::TryStreamExt;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_lora as proto;

use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward;
use lazy_static::lazy_static;
use rust_decimal::prelude::*;
use rust_decimal_macros::dec;
Expand All @@ -23,6 +23,8 @@ lazy_static! {
static ref WITNESS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.24);
// Data transfer is allocated 50% of daily rewards
static ref DATA_TRANSFER_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.50);
// Operations fund is allocated 7% of daily rewards
static ref OPERATIONS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.07);
// dc remainer distributed at ration of 4:1 in favour of witnesses
// ie WITNESS_REWARDS_PER_DAY_PERCENT:BEACON_REWARDS_PER_DAY_PERCENT
static ref WITNESS_DC_REMAINER_PERCENT: Decimal = dec!(0.80);
Comment on lines 23 to 30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these can be constants instead of lazy statics

Expand Down Expand Up @@ -62,6 +64,16 @@ fn get_scheduled_dc_tokens(duration: Duration) -> Decimal {
)
}

fn get_scheduled_ops_fund_tokens(duration: Duration) -> u64 {
get_tokens_by_duration(
*REWARDS_PER_DAY * *OPERATIONS_REWARDS_PER_DAY_PERCENT,
duration,
)
.round_dp_with_strategy(0, RoundingStrategy::ToZero)
jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved
.to_u64()
.unwrap_or(0)
}

#[derive(sqlx::FromRow)]
pub struct GatewayPocShare {
pub hotspot_key: PublicKeyBinary,
Expand Down Expand Up @@ -291,11 +303,11 @@ impl GatewayShares {
Ok(())
}

pub fn into_gateway_reward_shares(
pub fn into_iot_reward_shares(
self,
reward_period: &'_ Range<DateTime<Utc>>,
iot_price: Decimal,
) -> impl Iterator<Item = proto::GatewayRewardShare> + '_ {
) -> impl Iterator<Item = proto::IotRewardShare> + '_ {
// the total number of shares for beacons, witnesses and data transfer
// dc shares here is the sum of all spent data transfer DC this epoch
let (total_beacon_shares, total_witness_shares, total_dc_shares) = self.total_shares();
Expand Down Expand Up @@ -330,30 +342,46 @@ impl GatewayShares {
// compute the awards per hotspot
self.shares
.into_iter()
.map(
move |(hotspot_key, reward_shares)| proto::GatewayRewardShare {
hotspot_key: hotspot_key.into(),
beacon_amount: compute_rewards(
beacon_rewards_per_share,
reward_shares.beacon_shares,
),
witness_amount: compute_rewards(
witness_rewards_per_share,
reward_shares.witness_shares,
),
dc_transfer_amount: compute_rewards(
dc_transfer_rewards_per_share,
reward_shares.dc_shares,
),
start_period: reward_period.start.encode_timestamp(),
end_period: reward_period.end.encode_timestamp(),
},
)
.map(move |(hotspot_key, reward_shares)| proto::GatewayReward {
hotspot_key: hotspot_key.into(),
beacon_amount: compute_rewards(
beacon_rewards_per_share,
reward_shares.beacon_shares,
),
witness_amount: compute_rewards(
witness_rewards_per_share,
reward_shares.witness_shares,
),
dc_transfer_amount: compute_rewards(
dc_transfer_rewards_per_share,
reward_shares.dc_shares,
),
})
.filter(|reward_share| {
reward_share.beacon_amount > 0
|| reward_share.witness_amount > 0
|| reward_share.dc_transfer_amount > 0
})
.map(|gateway_reward| proto::IotRewardShare {
start_period: reward_period.start.encode_timestamp(),
end_period: reward_period.end.encode_timestamp(),
reward: Some(ProtoReward::GatewayReward(gateway_reward)),
})
}
}

pub mod operational_rewards {
use super::*;

pub fn compute(reward_period: &Range<DateTime<Utc>>) -> proto::IotRewardShare {
let op_fund_reward = proto::OperationalReward {
amount: get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start),
};
proto::IotRewardShare {
start_period: reward_period.start.encode_timestamp(),
end_period: reward_period.end.encode_timestamp(),
reward: Some(ProtoReward::OperationalReward(op_fund_reward)),
}
}
}

Expand Down Expand Up @@ -424,6 +452,16 @@ mod test {
}
}

#[test]
fn test_non_gateway_reward_shares() {
let epoch_duration = Duration::hours(1);
let total_tokens_for_period = *REWARDS_PER_DAY / dec!(24);
println!("total_tokens_for_period: {total_tokens_for_period}");

let operation_tokens_for_period = get_scheduled_ops_fund_tokens(epoch_duration);
assert_eq!(479452054794, operation_tokens_for_period);
}

#[test]
// test reward distribution where there is a fixed dc spend per gateway
// with the total dc spend across all gateways being significantly lower than the
Expand Down Expand Up @@ -499,21 +537,20 @@ mod test {
gw6.clone(),
reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend),
); // 0.0150, 0.0350
let gw_shares = GatewayShares { shares };

let rewards: HashMap<PublicKeyBinary, proto::GatewayRewardShare> = gw_shares
.into_gateway_reward_shares(&reward_period, iot_price)
.map(|reward| {
(
reward
.hotspot_key
.clone()
.try_into()
.expect("failed to decode hotspot_key"),
reward,
)
})
let gw_shares = GatewayShares { shares };
let mut rewards: HashMap<PublicKeyBinary, proto::GatewayReward> = HashMap::new();
let gw_reward_shares: Vec<proto::IotRewardShare> = gw_shares
.into_iot_reward_shares(&reward_period, iot_price)
.collect();
for reward in gw_reward_shares {
if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward {
rewards.insert(
gateway_reward.hotspot_key.clone().try_into().unwrap(),
gateway_reward,
);
}
}

let gw1_rewards = rewards
.get(&gw1)
Expand Down Expand Up @@ -679,21 +716,20 @@ mod test {
gw6.clone(),
reward_shares_in_dec(dec!(150), dec!(350), total_dc_to_spend * dec!(0.8)),
); // 0.0150, 0.0350
let gw_shares = GatewayShares { shares };

let rewards: HashMap<PublicKeyBinary, proto::GatewayRewardShare> = gw_shares
.into_gateway_reward_shares(&reward_period, iot_price)
.map(|reward| {
(
reward
.hotspot_key
.clone()
.try_into()
.expect("failed to decode hotspot_key"),
reward,
)
})
let gw_shares = GatewayShares { shares };
let mut rewards: HashMap<PublicKeyBinary, proto::GatewayReward> = HashMap::new();
let gw_reward_shares: Vec<proto::IotRewardShare> = gw_shares
.into_iot_reward_shares(&reward_period, iot_price)
.collect();
for reward in gw_reward_shares {
if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward {
rewards.insert(
gateway_reward.hotspot_key.clone().try_into().unwrap(),
gateway_reward,
);
}
}

let gw1_rewards = rewards
.get(&gw1)
Expand Down Expand Up @@ -840,21 +876,20 @@ mod test {
gw6.clone(),
reward_shares_in_dec(dec!(150), dec!(350), total_dc_to_spend * dec!(0.2)),
); // 0.0150, 0.0350
let gw_shares = GatewayShares { shares };

let rewards: HashMap<PublicKeyBinary, proto::GatewayRewardShare> = gw_shares
.into_gateway_reward_shares(&reward_period, iot_price)
.map(|reward| {
(
reward
.hotspot_key
.clone()
.try_into()
.expect("failed to decode hotspot_key"),
reward,
)
})
let gw_shares = GatewayShares { shares };
let mut rewards: HashMap<PublicKeyBinary, proto::GatewayReward> = HashMap::new();
let gw_reward_shares: Vec<proto::IotRewardShare> = gw_shares
.into_iot_reward_shares(&reward_period, iot_price)
.collect();
for reward in gw_reward_shares {
if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward {
rewards.insert(
gateway_reward.hotspot_key.clone().try_into().unwrap(),
gateway_reward,
);
}
}

let gw1_rewards = rewards
.get(&gw1)
Expand Down
19 changes: 13 additions & 6 deletions iot_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{reward_share::GatewayShares, scheduler::Scheduler};
use crate::{reward_share::operational_rewards, reward_share::GatewayShares, scheduler::Scheduler};
bbalser marked this conversation as resolved.
Show resolved Hide resolved
use chrono::{DateTime, Duration, TimeZone, Utc};
use db_store::meta;
use file_store::{file_sink, traits::TimestampEncode};
Expand All @@ -10,7 +10,7 @@ use tokio::time::sleep;

pub struct Rewarder {
pub pool: Pool<Postgres>,
pub gateway_rewards_sink: file_sink::FileSinkClient,
pub rewards_sink: file_sink::FileSinkClient,
pub reward_manifests_sink: file_sink::FileSinkClient,
pub reward_period_hours: i64,
pub reward_offset: Duration,
Expand Down Expand Up @@ -67,19 +67,26 @@ impl Rewarder {
scheduler: &Scheduler,
iot_price: Decimal,
) -> anyhow::Result<()> {
let reward_shares = GatewayShares::aggregate(&self.pool, &scheduler.reward_period).await?;
let gateway_reward_shares =
GatewayShares::aggregate(&self.pool, &scheduler.reward_period).await?;

for reward_share in
reward_shares.into_gateway_reward_shares(&scheduler.reward_period, iot_price)
gateway_reward_shares.into_iot_reward_shares(&scheduler.reward_period, iot_price)
{
self.gateway_rewards_sink
self.rewards_sink
.write(reward_share, [])
.await?
// Await the returned oneshot to ensure we wrote the file
.await??;
}

let written_files = self.gateway_rewards_sink.commit().await?.await??;
self.rewards_sink
.write(operational_rewards::compute(&scheduler.reward_period), [])
.await?
// Await the returned oneshot to ensure we wrote the file
.await??;

let written_files = self.rewards_sink.commit().await?.await??;
// Write the rewards manifest for the completed period
self.reward_manifests_sink
.write(
Expand Down
3 changes: 3 additions & 0 deletions reward_index/migrations/5_add_type_to_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TYPE reward_type as enum('mobile', 'iot_gateway', 'iot_operational');
bbalser marked this conversation as resolved.
Show resolved Hide resolved

ALTER TABLE reward_index ADD reward_type reward_type;
Loading