Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use num_partitions to find specific stake rewards in partitions #1677

Merged
merged 9 commits into from
Jun 26, 2024
22 changes: 22 additions & 0 deletions rpc-client-api/src/custom_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub const JSON_RPC_SERVER_ERROR_TRANSACTION_SIGNATURE_LEN_MISMATCH: i64 = -32013
pub const JSON_RPC_SERVER_ERROR_BLOCK_STATUS_NOT_AVAILABLE_YET: i64 = -32014;
pub const JSON_RPC_SERVER_ERROR_UNSUPPORTED_TRANSACTION_VERSION: i64 = -32015;
pub const JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED: i64 = -32016;
pub const JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE: i64 = -32017;

#[derive(Error, Debug)]
pub enum RpcCustomError {
Expand Down Expand Up @@ -65,6 +66,12 @@ pub enum RpcCustomError {
UnsupportedTransactionVersion(u8),
#[error("MinContextSlotNotReached")]
MinContextSlotNotReached { context_slot: Slot },
#[error("EpochRewardsPeriodActive")]
EpochRewardsPeriodActive {
slot: Slot,
current_block_height: u64,
rewards_complete_block_height: u64,
},
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -79,6 +86,13 @@ pub struct MinContextSlotNotReachedErrorData {
pub context_slot: Slot,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EpochRewardsPeriodActiveErrorData {
pub current_block_height: u64,
pub rewards_complete_block_height: u64,
}

impl From<EncodeError> for RpcCustomError {
fn from(err: EncodeError) -> Self {
match err {
Expand Down Expand Up @@ -206,6 +220,14 @@ impl From<RpcCustomError> for Error {
context_slot,
})),
},
RpcCustomError::EpochRewardsPeriodActive { slot, current_block_height, rewards_complete_block_height } => Self {
code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE),
message: format!("Epoch rewards period still active at slot {slot}"),
data: Some(serde_json::json!(EpochRewardsPeriodActiveErrorData {
current_block_height,
rewards_complete_block_height,
})),
},
}
}
}
164 changes: 144 additions & 20 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use {
clock::{Slot, UnixTimestamp, MAX_PROCESSING_AGE},
commitment_config::{CommitmentConfig, CommitmentLevel},
epoch_info::EpochInfo,
epoch_rewards_hasher::EpochRewardsHasher,
epoch_schedule::EpochSchedule,
exit::Exit,
feature_set,
Expand Down Expand Up @@ -93,8 +94,9 @@ use {
solana_transaction_status::{
map_inner_instructions, BlockEncodingOptions, ConfirmedBlock,
ConfirmedTransactionStatusWithSignature, ConfirmedTransactionWithStatusMeta,
EncodedConfirmedTransactionWithStatusMeta, Reward, RewardType, TransactionBinaryEncoding,
TransactionConfirmationStatus, TransactionStatus, UiConfirmedBlock, UiTransactionEncoding,
EncodedConfirmedTransactionWithStatusMeta, Reward, RewardType, Rewards,
TransactionBinaryEncoding, TransactionConfirmationStatus, TransactionStatus,
UiConfirmedBlock, UiTransactionEncoding,
},
solana_vote_program::vote_state::{VoteState, MAX_LOCKOUT_HISTORY},
spl_token_2022::{
Expand Down Expand Up @@ -548,6 +550,34 @@ impl JsonRpcRequestProcessor {
})
}

fn filter_map_rewards<'a, F>(
rewards: &'a Option<Rewards>,
slot: Slot,
addresses: &'a [String],
reward_type_filter: &'a F,
) -> HashMap<String, (Reward, Slot)>
where
F: Fn(RewardType) -> bool,
{
Self::filter_rewards(rewards, reward_type_filter)
.filter(|reward| addresses.contains(&reward.pubkey))
.map(|reward| (reward.pubkey.clone(), (reward.clone(), slot)))
.collect()
}

fn filter_rewards<'a, F>(
rewards: &'a Option<Rewards>,
reward_type_filter: &'a F,
) -> impl Iterator<Item = &'a Reward>
where
F: Fn(RewardType) -> bool,
{
rewards
.iter()
.flatten()
.filter(move |reward| reward.reward_type.is_some_and(reward_type_filter))
}

pub async fn get_inflation_reward(
&self,
addresses: Vec<Pubkey>,
Expand Down Expand Up @@ -592,7 +622,22 @@ impl JsonRpcRequestProcessor {
slot: first_slot_in_epoch,
})?;

let Ok(Some(first_confirmed_block)) = self
// Determine if partitioned epoch rewards were enabled for the desired
// epoch
let bank = self.get_bank_with_config(context_config)?;

// DO NOT CLEAN UP with feature_set::enable_partitioned_epoch_reward
// This logic needs to be retained indefinitely to support historical
// rewards before and after feature activation.
let partitioned_epoch_reward_enabled_slot = bank
.feature_set
.activated_slot(&feature_set::enable_partitioned_epoch_reward::id());
let partitioned_epoch_reward_enabled = partitioned_epoch_reward_enabled_slot
.map(|slot| slot <= first_confirmed_block_in_epoch)
.unwrap_or(false);

// Get first block in the epoch
let Ok(Some(epoch_boundary_block)) = self
.get_block(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
Expand All @@ -605,30 +650,109 @@ impl JsonRpcRequestProcessor {
.into());
};

let addresses: Vec<String> = addresses
.into_iter()
.map(|pubkey| pubkey.to_string())
.collect();
// Collect rewards from first block in the epoch if partitioned epoch
// rewards not enabled, or address is a vote account
let mut reward_map: HashMap<String, (Reward, Slot)> = {
let addresses: Vec<String> =
addresses.iter().map(|pubkey| pubkey.to_string()).collect();
Self::filter_map_rewards(
&epoch_boundary_block.rewards,
first_confirmed_block_in_epoch,
&addresses,
&|reward_type| -> bool {
reward_type == RewardType::Voting
|| (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking)
},
)
};

let reward_hash: HashMap<String, Reward> = first_confirmed_block
.rewards
.unwrap_or_default()
.into_iter()
.filter_map(|reward| match reward.reward_type? {
RewardType::Staking | RewardType::Voting => addresses
.contains(&reward.pubkey)
.then(|| (reward.clone().pubkey, reward)),
_ => None,
})
.collect();
// Append stake account rewards from partitions if partitions epoch
// rewards is enabled
if partitioned_epoch_reward_enabled {
let num_partitions = epoch_boundary_block.num_reward_partitions.expect(
"epoch-boundary block should have num_reward_partitions after partitioned epoch \
rewards enabled",
);

let num_partitions = usize::try_from(num_partitions)
.expect("num_partitions should never exceed usize::MAX");
let hasher = EpochRewardsHasher::new(
num_partitions,
&Hash::from_str(&epoch_boundary_block.previous_blockhash)
.expect("UiConfirmedBlock::previous_blockhash should be properly formed"),
);
let mut partition_index_addresses: HashMap<usize, Vec<String>> = HashMap::new();
for address in addresses.iter() {
let address_string = address.to_string();
// Skip this address if (Voting) rewards were already found in
// the first block of the epoch
if !reward_map.contains_key(&address_string) {
let partition_index = hasher.clone().hash_address_to_partition(address);
partition_index_addresses
.entry(partition_index)
.and_modify(|list| list.push(address_string.clone()))
.or_insert(vec![address_string]);
}
}

let block_list = self
.get_blocks_with_limit(
first_confirmed_block_in_epoch + 1,
num_partitions,
Some(context_config),
)
.await?;

for (partition_index, addresses) in partition_index_addresses.iter() {
let slot = *block_list.get(*partition_index).ok_or_else(|| {
// If block_list.len() too short to contain
// partition_index, the epoch rewards period must be
// currently active.
let rewards_complete_block_height = epoch_boundary_block
.block_height
.map(|block_height| {
block_height
.saturating_add(num_partitions as u64)
.saturating_add(1)
})
.expect(
"every block after partitioned_epoch_reward_enabled should have a \
populated block_height",
);
RpcCustomError::EpochRewardsPeriodActive {
slot: bank.slot(),
current_block_height: bank.block_height(),
rewards_complete_block_height,
}
})?;

let Ok(Some(block)) = self
.get_block(
slot,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
)
.await
else {
return Err(RpcCustomError::BlockNotAvailable { slot }.into());
};
t-nelson marked this conversation as resolved.
Show resolved Hide resolved

let index_reward_map = Self::filter_map_rewards(
&block.rewards,
slot,
addresses,
&|reward_type| -> bool { reward_type == RewardType::Staking },
);
reward_map.extend(index_reward_map);
}
}

let rewards = addresses
.iter()
.map(|address| {
if let Some(reward) = reward_hash.get(address) {
if let Some((reward, slot)) = reward_map.get(&address.to_string()) {
return Some(RpcInflationReward {
epoch,
effective_slot: first_confirmed_block_in_epoch,
effective_slot: *slot,
amount: reward.lamports.unsigned_abs(),
post_balance: reward.post_balance,
commission: reward.commission,
Expand Down
5 changes: 1 addition & 4 deletions validator/src/bin/solana-test-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use {
account::AccountSharedData,
clock::Slot,
epoch_schedule::EpochSchedule,
feature_set,
native_token::sol_to_lamports,
pubkey::Pubkey,
rent::Rent,
Expand Down Expand Up @@ -352,9 +351,7 @@ fn main() {
exit(1);
});

let mut features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();
// Remove this when client support is ready for the enable_partitioned_epoch_reward feature
features_to_deactivate.push(feature_set::enable_partitioned_epoch_reward::id());
let features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();

if TestValidatorGenesis::ledger_exists(&ledger_path) {
for (name, long) in &[
Expand Down