Skip to content

Commit

Permalink
[Indexer] Add json serialized SuiSystemStateSummary to epochs table (#…
Browse files Browse the repository at this point in the history
…19428)

## Description 

The existing system_state column is a BCS serialization of a JSON-RPC
type, which is not evolvable.
This PR adds a new column that is the JSON serialization of
SuiSystemStateSummary without BCS.

It also fixes two bugs along the way:
1. We were storing the wrong version of the system state, offsetting by
1 epoch. This PR fixes it by storing the right version of the json
column at the beginning of the epoch instead of at the end.
2. We were ignoring a deserialization error and swallow it in some
place. Made that throw instead.

## Test plan 

Added a debug assert to see that this column is populated with the
correct epoch.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Sep 19, 2024
1 parent 5cf3b93 commit 0ae6514
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE epochs DROP COLUMN system_state_summary_json;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE epochs ADD COLUMN system_state_summary_json JSONB;
15 changes: 7 additions & 8 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use sui_types::messages_checkpoint::{
};
use sui_types::object::Object;
use sui_types::object::Owner;
use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait};
use sui_types::transaction::TransactionDataAPI;

Expand Down Expand Up @@ -186,12 +185,12 @@ impl CheckpointHandler {
// Genesis epoch
if *checkpoint_summary.sequence_number() == 0 {
info!("Processing genesis epoch");
let system_state: SuiSystemStateSummary =
let system_state_summary =
get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();
return Ok(Some(EpochToCommit {
last_epoch: None,
new_epoch: IndexedEpochInfo::from_new_system_state_summary(
system_state,
system_state_summary,
0, //first_checkpoint_id
None,
),
Expand All @@ -204,7 +203,7 @@ impl CheckpointHandler {
return Ok(None);
}

let system_state: SuiSystemStateSummary =
let system_state_summary =
get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();

let epoch_event = transactions
Expand All @@ -228,11 +227,11 @@ impl CheckpointHandler {
// side, where we can guarantee that the previous epoch's checkpoints have been written to
// db.

let network_tx_count_prev_epoch = match system_state.epoch {
let network_tx_count_prev_epoch = match system_state_summary.epoch {
// If first epoch change, this number is 0
1 => Ok(0),
_ => {
let last_epoch = system_state.epoch - 2;
let last_epoch = system_state_summary.epoch - 2;
state
.get_network_total_transactions_by_end_of_epoch(last_epoch)
.await
Expand All @@ -241,13 +240,13 @@ impl CheckpointHandler {

Ok(Some(EpochToCommit {
last_epoch: Some(IndexedEpochInfo::from_end_of_epoch_data(
&system_state,
system_state_summary.clone(),
checkpoint_summary,
&event,
network_tx_count_prev_epoch,
)),
new_epoch: IndexedEpochInfo::from_new_system_state_summary(
system_state,
system_state_summary,
checkpoint_summary.sequence_number + 1, // first_checkpoint_id
Some(&event),
),
Expand Down
18 changes: 15 additions & 3 deletions crates/sui-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,26 @@ impl IndexerReader {
None => return Err(IndexerError::InvalidArgumentError("Invalid epoch".into())),
};

let system_state: SuiSystemStateSummary = bcs::from_bytes(&stored_epoch.system_state)
.map_err(|_| {
let system_state_summary: SuiSystemStateSummary =
bcs::from_bytes(&stored_epoch.system_state).map_err(|_| {
IndexerError::PersistentStorageDataCorruptionError(format!(
"Failed to deserialize `system_state` for epoch {:?}",
epoch,
))
})?;
Ok(system_state)
#[cfg(debug_assertions)]
{
let correct_system_state_summary: SuiSystemStateSummary =
serde_json::from_value(stored_epoch.system_state_summary_json.clone().unwrap())
.unwrap();
// The old system state summary is incorrect and its epoch will be offset by 1.
// This is fixed in the new system state summary. Assert it here to double check.
assert_eq!(
correct_system_state_summary.epoch,
stored_epoch.epoch as u64
);
}
Ok(system_state_summary)
}

async fn get_checkpoint_from_db(
Expand Down
17 changes: 14 additions & 3 deletions crates/sui-indexer/src/models/epoch.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use diesel::{Insertable, Queryable, Selectable};

use crate::schema::epochs;
use crate::types::IndexedEpochInfo;
use crate::{errors::IndexerError, schema::feature_flags, schema::protocol_configs};
use diesel::{Insertable, Queryable, Selectable};
use sui_json_rpc_types::{EndOfEpochInfo, EpochInfo};
use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;

Expand All @@ -31,6 +30,7 @@ pub struct StoredEpochInfo {
pub total_stake_rewards_distributed: Option<i64>,
pub leftover_storage_fund_inflow: Option<i64>,
pub epoch_commitments: Option<Vec<u8>>,
pub system_state_summary_json: Option<serde_json::Value>,
}

#[derive(Queryable, Insertable, Debug, Clone, Default)]
Expand Down Expand Up @@ -82,6 +82,9 @@ impl StoredEpochInfo {
pub fn from_epoch_beginning_info(e: &IndexedEpochInfo) -> Self {
Self {
epoch: e.epoch as i64,
system_state_summary_json: Some(
serde_json::to_value(e.system_state_summary.clone()).unwrap(),
),
first_checkpoint_id: e.first_checkpoint_id as i64,
epoch_start_timestamp: e.epoch_start_timestamp as i64,
reference_gas_price: e.reference_gas_price as i64,
Expand All @@ -95,7 +98,10 @@ impl StoredEpochInfo {
pub fn from_epoch_end_info(e: &IndexedEpochInfo) -> Self {
Self {
epoch: e.epoch as i64,
system_state: e.system_state.clone(),
// TODO: Deprecate this.
system_state: bcs::to_bytes(&e.system_state_summary.clone()).unwrap(),
// At epoch end the system state would be the state of the next epoch, so we ignore it.
system_state_summary_json: None,
epoch_total_transactions: e.epoch_total_transactions.map(|v| v as i64),
last_checkpoint_id: e.last_checkpoint_id.map(|v| v as i64),
epoch_end_timestamp: e.epoch_end_timestamp.map(|v| v as i64),
Expand Down Expand Up @@ -153,6 +159,11 @@ impl TryFrom<StoredEpochInfo> for EpochInfo {
fn try_from(value: StoredEpochInfo) -> Result<Self, Self::Error> {
let epoch = value.epoch as u64;
let end_of_epoch_info = (&value).into();
// FIXME: We should not swallow the error here.
// However currently there is a bug in the system_state column where it's always
// off-by-one, and hence there will be a period of time when
// this conversion will fail.
// We should fix this once we replace it with the new json column.
let system_state: Option<SuiSystemStateSummary> = bcs::from_bytes(&value.system_state)
.map_err(|_| {
IndexerError::PersistentStorageDataCorruptionError(format!(
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ diesel::table! {
total_stake_rewards_distributed -> Nullable<Int8>,
leftover_storage_fund_inflow -> Nullable<Int8>,
epoch_commitments -> Nullable<Bytea>,
system_state_summary_json -> Nullable<Jsonb>,
}
}

Expand Down
22 changes: 16 additions & 6 deletions crates/sui-indexer/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl IndexedCheckpoint {

/// Represents system state and summary info at the start and end of an epoch. Optional fields are
/// populated at epoch boundary, since they cannot be determined at the start of the epoch.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct IndexedEpochInfo {
pub epoch: u64,
pub first_checkpoint_id: u64,
Expand All @@ -101,7 +101,7 @@ pub struct IndexedEpochInfo {
pub protocol_version: u64,
pub total_stake: u64,
pub storage_fund_balance: u64,
pub system_state: Vec<u8>,
pub system_state_summary: SuiSystemStateSummary,
pub epoch_total_transactions: Option<u64>,
pub last_checkpoint_id: Option<u64>,
pub epoch_end_timestamp: Option<u64>,
Expand Down Expand Up @@ -132,16 +132,26 @@ impl IndexedEpochInfo {
// the event is optional b/c no such event for the first epoch.
total_stake: event.map(|e| e.total_stake).unwrap_or(0),
storage_fund_balance: event.map(|e| e.storage_fund_balance).unwrap_or(0),
system_state: bcs::to_bytes(&new_system_state_summary).unwrap(),
..Default::default()
system_state_summary: new_system_state_summary,
epoch_total_transactions: None,
last_checkpoint_id: None,
epoch_end_timestamp: None,
storage_fund_reinvestment: None,
storage_charge: None,
storage_rebate: None,
stake_subsidy_amount: None,
total_gas_fees: None,
total_stake_rewards_distributed: None,
leftover_storage_fund_inflow: None,
epoch_commitments: None,
}
}

/// Creates `IndexedEpochInfo` for epoch X-1 at the boundary of epoch X-1 to X.
/// `network_total_tx_num_at_last_epoch_end` is needed to determine the number of transactions
/// that occurred in the epoch X-1.
pub fn from_end_of_epoch_data(
system_state_summary: &SuiSystemStateSummary,
system_state_summary: SuiSystemStateSummary,
last_checkpoint_summary: &CertifiedCheckpointSummary,
event: &SystemEpochInfoEvent,
network_total_tx_num_at_last_epoch_end: u64,
Expand All @@ -165,7 +175,7 @@ impl IndexedEpochInfo {
.end_of_epoch_data
.as_ref()
.map(|e| e.epoch_commitments.clone()),
system_state: bcs::to_bytes(system_state_summary).unwrap(),
system_state_summary,
// The following felds will not and shall not be upserted
// into DB. We have them below to make compiler and diesel happy
first_checkpoint_id: 0,
Expand Down

0 comments on commit 0ae6514

Please sign in to comment.