From 6cc51cbdc7bc2047528375bd3c4d483b83886ccd Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Tue, 16 Jul 2024 11:35:13 -0700 Subject: [PATCH] Prevent indexer from breaking when it encounters mobile_reward_v2 Both `mobile_reward` and `mobile_reward_v2` will be written out for 90 days. Within that 90 days, Indexer will switch to using `mobile_reward_v2` as the source of reward values, then `mobile_reward`s will stop being written. If both messages were used radios would be double rewarded. --- reward_index/src/indexer.rs | 46 ++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index c30e1438b..e4f6fb847 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -113,8 +113,9 @@ impl Indexer { let mut hotspot_rewards: HashMap = HashMap::new(); while let Some(msg) = reward_shares.try_next().await? { - let (key, amount) = self.extract_reward_share(&msg)?; - *hotspot_rewards.entry(key).or_default() += amount; + if let Some((key, amount)) = self.extract_reward_share(&msg)? { + *hotspot_rewards.entry(key).or_default() += amount; + }; } for (reward_key, amount) in hotspot_rewards { @@ -131,79 +132,86 @@ impl Indexer { Ok(()) } - fn extract_reward_share(&self, msg: &[u8]) -> Result<(RewardKey, u64)> { + fn extract_reward_share(&self, msg: &[u8]) -> Result> { match self.mode { settings::Mode::Mobile => { let share = MobileRewardShare::decode(msg)?; match share.reward { - Some(MobileReward::RadioReward(r)) => Ok(( + Some(MobileReward::RadioReward(r)) => Ok(Some(( RewardKey { key: PublicKeyBinary::from(r.hotspot_key).to_string(), reward_type: RewardType::MobileGateway, }, r.poc_reward, - )), - Some(MobileReward::GatewayReward(r)) => Ok(( + ))), + Some(MobileReward::RadioRewardV2(_)) => { + // NOTE: Eventually radio_reward_v2 will replace + // radio_reward as the source of rewards, then + // radio_reward will cease to be written. + Ok(None) + } + + Some(MobileReward::GatewayReward(r)) => Ok(Some(( RewardKey { key: PublicKeyBinary::from(r.hotspot_key).to_string(), reward_type: RewardType::MobileGateway, }, r.dc_transfer_reward, - )), - Some(MobileReward::SubscriberReward(r)) => Ok(( + ))), + Some(MobileReward::SubscriberReward(r)) => Ok(Some(( RewardKey { key: bs58::encode(&r.subscriber_id).into_string(), reward_type: RewardType::MobileSubscriber, }, r.discovery_location_amount, - )), + ))), Some(MobileReward::ServiceProviderReward(r)) => { ServiceProvider::try_from(r.service_provider_id) .map(|sp| { - Ok(( + Ok(Some(( RewardKey { key: sp.to_string(), reward_type: RewardType::MobileServiceProvider, }, r.amount, - )) + ))) }) .map_err(|_| anyhow!("failed to decode service provider"))? } - Some(MobileReward::UnallocatedReward(r)) => Ok(( + Some(MobileReward::UnallocatedReward(r)) => Ok(Some(( RewardKey { key: self.unallocated_reward_key.clone(), reward_type: RewardType::MobileUnallocated, }, r.amount, - )), + ))), _ => bail!("got an invalid reward share"), } } settings::Mode::Iot => { let share = IotRewardShare::decode(msg)?; match share.reward { - Some(IotReward::GatewayReward(r)) => Ok(( + Some(IotReward::GatewayReward(r)) => Ok(Some(( RewardKey { key: PublicKeyBinary::from(r.hotspot_key).to_string(), reward_type: RewardType::IotGateway, }, r.witness_amount + r.beacon_amount + r.dc_transfer_amount, - )), - Some(IotReward::OperationalReward(r)) => Ok(( + ))), + Some(IotReward::OperationalReward(r)) => Ok(Some(( RewardKey { key: self.op_fund_key.clone(), reward_type: RewardType::IotOperational, }, r.amount, - )), - Some(IotReward::UnallocatedReward(r)) => Ok(( + ))), + Some(IotReward::UnallocatedReward(r)) => Ok(Some(( RewardKey { key: self.unallocated_reward_key.clone(), reward_type: RewardType::IotUnallocated, }, r.amount, - )), + ))), _ => bail!("got an invalid iot reward share"), } }