Skip to content

Commit

Permalink
Merge pull request #493 from helium/map/add-dc-transfer-reward-msg
Browse files Browse the repository at this point in the history
Add GatewayReward output from verifier
  • Loading branch information
jeffgrunewald authored Apr 28, 2023
2 parents 79dcd93 + 2990676 commit 45fecbb
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mobile_verifier/src/cli/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Cmd {
let mut total_rewards = 0_u64;
let mut owner_rewards = HashMap::<_, u64>::new();
let transfer_rewards = TransferRewards::empty();
for (reward, _) in reward_shares.into_rewards(&transfer_rewards, &epoch) {
for (reward, _) in reward_shares.into_rewards(transfer_rewards.reward_sum(), &epoch) {
total_rewards += reward.amount;
*owner_rewards
.entry(PublicKey::try_from(reward.owner_key)?)
Expand Down
6 changes: 3 additions & 3 deletions mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{
heartbeats::Heartbeats,
reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares, TransferRewards},
reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares},
speedtests::{Average, SpeedtestAverages},
Settings,
};
use anyhow::Result;
use chrono::{DateTime, NaiveDateTime, Utc};
use helium_crypto::PublicKey;
use rust_decimal::Decimal;
use serde_json::json;
use std::collections::HashMap;

Expand Down Expand Up @@ -42,8 +43,7 @@ impl Cmd {

let mut total_rewards = 0_u64;
let mut owner_rewards = HashMap::<_, u64>::new();
let transfer_rewards = TransferRewards::empty();
for (reward, _) in reward_shares.into_rewards(&transfer_rewards, &epoch) {
for (reward, _) in reward_shares.into_rewards(Decimal::ZERO, &epoch) {
total_rewards += reward.amount;
*owner_rewards
.entry(PublicKey::try_from(reward.owner_key)?)
Expand Down
65 changes: 46 additions & 19 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ impl TransferRewards {
self.reward_scale
}

pub fn reward(&self, hotspot: &PublicKeyBinary) -> Decimal {
pub fn reward_sum(&self) -> Decimal {
self.reward_sum
}

#[cfg(test)]
fn reward(&self, hotspot: &PublicKeyBinary) -> Decimal {
self.rewards.get(hotspot).copied().unwrap_or(Decimal::ZERO) * self.reward_scale
}

Expand Down Expand Up @@ -107,6 +112,34 @@ impl TransferRewards {
reward_sum: reward_sum * reward_scale,
}
}

pub fn into_rewards(
self,
epoch: &'_ Range<DateTime<Utc>>,
) -> impl Iterator<Item = proto::MobileRewardShare> + '_ {
let Self {
reward_scale,
rewards,
..
} = self;
let start_period = epoch.start.encode_timestamp();
let end_period = epoch.end.encode_timestamp();
rewards
.into_iter()
.map(move |(hotspot_key, reward)| proto::MobileRewardShare {
start_period,
end_period,
reward: Some(proto::mobile_reward_share::Reward::GatewayReward(
proto::GatewayReward {
hotspot_key: hotspot_key.into(),
dc_transfer_reward: (reward * reward_scale)
.round_dp_with_strategy(0, RoundingStrategy::ToZero)
.to_u64()
.unwrap_or(0),
},
)),
})
}
}

/// Returns the equivalent amount of Mobile bones for a specified amount of Data Credits
Expand Down Expand Up @@ -169,30 +202,28 @@ impl PocShares {
})
}

pub fn into_rewards<'a>(
pub fn into_rewards(
self,
transfer_rewards: &'a TransferRewards,
epoch: &'a Range<DateTime<Utc>>,
) -> impl Iterator<Item = (proto::RadioRewardShare, proto::MobileRewardShare)> + 'a {
transfer_rewards_sum: Decimal,
epoch: &'_ Range<DateTime<Utc>>,
) -> impl Iterator<Item = (proto::RadioRewardShare, proto::MobileRewardShare)> + '_ {
let total_shares = self.total_shares();
let available_poc_rewards = get_scheduled_tokens_for_poc_and_dc(epoch.end - epoch.start)
- transfer_rewards.reward_sum;
let available_poc_rewards =
get_scheduled_tokens_for_poc_and_dc(epoch.end - epoch.start) - transfer_rewards_sum;
let poc_rewards_per_share = available_poc_rewards / total_shares;
let start_period = epoch.start.encode_timestamp();
let end_period = epoch.end.encode_timestamp();
self.hotspot_shares.into_iter().flat_map(
move |(hotspot_key, RadioShares { radio_shares })| {
let mut dc_transfer_reward = Some(transfer_rewards.reward(&hotspot_key));
radio_shares
.into_iter()
.map(move |(cbsd_id, amount)| {
let start_period = epoch.start.encode_timestamp();
let end_period = epoch.end.encode_timestamp();
let poc_reward = poc_rewards_per_share * amount;
let dc_transfer_reward = dc_transfer_reward.take().unwrap_or(Decimal::ZERO);
let hotspot_key: Vec<u8> = hotspot_key.clone().into();
let radio_reward_share = proto::RadioRewardShare {
owner_key: Vec::new(),
cbsd_id: cbsd_id.clone(),
amount: (poc_reward + dc_transfer_reward)
amount: poc_reward
.round_dp_with_strategy(0, RoundingStrategy::ToZero)
.to_u64()
.unwrap_or(0),
Expand All @@ -207,14 +238,11 @@ impl PocShares {
proto::RadioReward {
hotspot_key,
cbsd_id,
dc_transfer_reward: dc_transfer_reward
.round_dp_with_strategy(0, RoundingStrategy::ToZero)
.to_u64()
.unwrap_or(0),
poc_reward: poc_reward
.round_dp_with_strategy(0, RoundingStrategy::ToZero)
.to_u64()
.unwrap_or(0),
..Default::default()
},
)),
};
Expand Down Expand Up @@ -724,10 +752,9 @@ mod test {
// calculate the rewards for the sample group
let mut owner_rewards = HashMap::<PublicKeyBinary, u64>::new();
let epoch = (now - Duration::hours(1))..now;
let transfer_rewards = TransferRewards::empty();
for (_, mobile_reward) in PocShares::aggregate(heartbeats, speedtest_avgs)
.await
.into_rewards(&transfer_rewards, &epoch)
.into_rewards(Decimal::ZERO, &epoch)
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down Expand Up @@ -809,7 +836,7 @@ mod test {
let epoch = now - Duration::hours(1)..now;
let transfer_rewards = TransferRewards::empty();
let expected_hotspot = gw1;
for (_, mobile_reward) in owner_shares.into_rewards(&transfer_rewards, &epoch) {
for (_, mobile_reward) in owner_shares.into_rewards(transfer_rewards.reward_sum(), &epoch) {
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
_ => unreachable!(),
Expand Down
10 changes: 9 additions & 1 deletion mobile_verifier/src/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl VerifierDaemon {
metrics::gauge!("data_transfer_rewards_scale", scale);

for (radio_reward_share, mobile_reward_share) in
poc_rewards.into_rewards(&transfer_rewards, &scheduler.reward_period)
poc_rewards.into_rewards(transfer_rewards.reward_sum(), &scheduler.reward_period)
{
self.radio_rewards
.write(radio_reward_share, [])
Expand All @@ -168,6 +168,14 @@ impl VerifierDaemon {
.await??;
}

for mobile_reward_share in transfer_rewards.into_rewards(&scheduler.reward_period) {
self.mobile_rewards
.write(mobile_reward_share, [])
.await?
// Await the returned one shot to ensure that we wrote the file
.await??;
}

let written_files = self.mobile_rewards.commit().await?.await??;

let mut transaction = self.pool.begin().await?;
Expand Down
10 changes: 9 additions & 1 deletion reward_index/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Indexer {
#[derive(sqlx::Type, Debug, Clone, PartialEq, Eq, Hash)]
#[sqlx(type_name = "reward_type", rename_all = "snake_case")]
pub enum RewardType {
MobileRadio,
MobileGateway,
IotGateway,
IotOperational,
Expand Down Expand Up @@ -126,11 +127,18 @@ impl Indexer {
let share = MobileRewardShare::decode(msg)?;
match share.reward {
Some(MobileReward::RadioReward(r)) => Ok((
RewardKey {
key: PublicKeyBinary::from(r.hotspot_key).to_string(),
reward_type: RewardType::MobileRadio,
},
r.poc_reward,
)),
Some(MobileReward::GatewayReward(r)) => Ok((
RewardKey {
key: PublicKeyBinary::from(r.hotspot_key).to_string(),
reward_type: RewardType::MobileGateway,
},
r.dc_transfer_reward + r.poc_reward,
r.dc_transfer_reward,
)),
_ => bail!("got an invalid mobile reward share"),
}
Expand Down

0 comments on commit 45fecbb

Please sign in to comment.