Skip to content

Commit

Permalink
[4/n][gql-performance] Indexer handles partitioning per epoch's worth…
Browse files Browse the repository at this point in the history
… of tx_sequence_number (#18224)

## Description 

To partition an epoch per `tx_sequence_number`, the last
`tx_sequence_number` of an epoch is the `network_total_transactions`
from the last checkpoint in the current epoch. The first
`tx_sequence_number` is then the `network_total_transactions` from the
last checkpoint in the previous epoch + 1, or the
`network_total_transactions` of the current epoch -
`epoch_total_transactions` of the current epoch.

Since we have `network_total_transactions` from the checkpoint to
commit, and `epoch_total_transactions` since we fetch current epoch from
db, we can derive the range without additional db fetches.

However, this approach uncovered a bug of sorts: when we read the
network total txn of epoch X - 2 from the db on the indexing side, we
`select max(network_total_transactions) from checkpoints where epoch =
{epoch}`. However, we may not have actually committed epoch X-2's data
on the committer side. This is unlikely to happen in prod because it
requires the lag between indexing and committing to be >= one epoch.
Since an epoch today consists of ~80k checkpoints, it is quite
improbable for this to occur. However, in many of our tests it's
possible to have as few as one checkpoint and/or transaction between
epochs, which brought this race condition to light.

To resolve, we can place the responsibility on `persist_epoch`. When we
get to a point to persist epoch X - 1, epoch X - 2's data must have been
indexed to db.

## Test plan 

Existing tests

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
wlmyng authored and emmazzz committed Jun 20, 2024
1 parent 18ca96b commit 7d5fe08
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ CREATE TABLE transactions (
-- number of successful commands in this transaction, bounded by the number of commands
-- in a programmable transaction.
success_command_count smallint NOT NULL,
PRIMARY KEY (tx_sequence_number, checkpoint_sequence_number)
) PARTITION BY RANGE (checkpoint_sequence_number);
PRIMARY KEY (tx_sequence_number)
) PARTITION BY RANGE (tx_sequence_number);
CREATE TABLE transactions_partition_0 PARTITION OF transactions FOR VALUES FROM (0) TO (MAXVALUE);
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start_cp BIGINT, new_epoch_start_cp BIGINT)
CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start BIGINT, new_epoch_start BIGINT)
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE format('ALTER TABLE %I DETACH PARTITION %I_partition_%s', table_name, table_name, last_epoch);
EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start_cp, new_epoch_start_cp);
EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start_cp);
EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start, new_epoch_start);
EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start);
END;
$$;

Expand Down
9 changes: 8 additions & 1 deletion crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ where
0, //first_checkpoint_id
None,
),
network_total_transactions: 0,
}));
}

Expand All @@ -241,7 +242,12 @@ where
let event = bcs::from_bytes::<SystemEpochInfoEvent>(&epoch_event.contents)?;

// Now we just entered epoch X, we want to calculate the diff between
// TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2)
// TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2). Note that on
// the indexer's chain-reading side, this is not guaranteed to have the latest data. Rather
// than impose a wait on the reading side, however, we overwrite this on the persisting
// side, where we can guarantee that the previous epoch's checkpoints have been written to
// db.

let network_tx_count_prev_epoch = match system_state.epoch {
// If first epoch change, this number is 0
1 => Ok(0),
Expand All @@ -265,6 +271,7 @@ where
checkpoint_summary.sequence_number + 1, // first_checkpoint_id
Some(&event),
),
network_total_transactions: checkpoint_summary.network_total_transactions,
}))
}

Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ pub struct TransactionObjectChangesToCommit {
/// Epoch-transition data handed from the indexing side to the committer when an
/// epoch boundary is crossed.
pub struct EpochToCommit {
    /// Info about the epoch that just ended; `None` presumably when there is no
    /// prior epoch (e.g. at genesis) — TODO confirm against the producer.
    pub last_epoch: Option<IndexedEpochInfo>,
    /// Info about the epoch that just started.
    pub new_epoch: IndexedEpochInfo,
    /// `network_total_transactions` taken from the last checkpoint of the ending
    /// epoch, i.e. the cumulative transaction count at the epoch boundary. Used by
    /// the committer to derive the epoch's tx_sequence_number range without extra
    /// db reads.
    pub network_total_transactions: u64,
}
25 changes: 24 additions & 1 deletion crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,12 +989,35 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
.checkpoint_db_commit_latency_epoch
.start_timer();
let epoch_id = epoch.new_epoch.epoch;

transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| {
if let Some(last_epoch) = &epoch.last_epoch {
let last_epoch_id = last_epoch.epoch;
let last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
// Overwrites the `epoch_total_transactions` field on `epoch.last_epoch` because
// we are not guaranteed to have the latest data in db when this is set on
// indexer's chain-reading side. However, when we `persist_epoch`, the
// checkpoints from an epoch ago must have been indexed.
let previous_epoch_network_total_transactions = match epoch_id {
0 | 1 => 0,
_ => {
let prev_epoch_id = epoch_id - 2;
let result = checkpoints::table
.filter(checkpoints::epoch.eq(prev_epoch_id as i64))
.select(max(checkpoints::network_total_transactions))
.first::<Option<i64>>(conn)
.map(|o| o.unwrap_or(0))?;

result as u64
}
};

let epoch_total_transactions = epoch.network_total_transactions
- previous_epoch_network_total_transactions;

let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64);
info!(last_epoch_id, "Persisting epoch end data: {:?}", last_epoch);
on_conflict_do_update!(
epochs::table,
Expand Down
63 changes: 57 additions & 6 deletions crates/sui-indexer/src/store/pg_partition_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use diesel::r2d2::R2D2Connection;
use diesel::sql_types::{BigInt, VarChar};
use diesel::{QueryableByName, RunQueryDsl};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::time::Duration;
use tracing::{error, info};

Expand Down Expand Up @@ -44,22 +44,32 @@ GROUP BY table_name;

pub struct PgPartitionManager<T: R2D2Connection + 'static> {
cp: ConnectionPool<T>,
partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
}

impl<T: R2D2Connection> Clone for PgPartitionManager<T> {
    /// Hand-written `Clone`, presumably because `#[derive(Clone)]` would impose a
    /// `T: Clone` bound that the connection type need not satisfy — TODO confirm.
    fn clone(&self) -> PgPartitionManager<T> {
        let cp = self.cp.clone();
        let partition_strategies = self.partition_strategies.clone();
        Self {
            cp,
            partition_strategies,
        }
    }
}

/// The column family a partitioned table's range bounds are expressed in:
/// either an epoch's checkpoint sequence numbers or its transaction sequence
/// numbers.
#[derive(Clone, Copy)]
pub enum PgPartitionStrategy {
    /// Partition ranges are `[first_checkpoint, next_epoch_first_checkpoint)`.
    CheckpointSequenceNumber,
    /// Partition ranges are `[first_tx, next_epoch_first_tx)`.
    TxSequenceNumber,
}

/// Boundary data gathered at an epoch transition, carrying both the checkpoint
/// range and the tx_sequence_number range so that partitions can be advanced for
/// tables using either partitioning strategy.
#[derive(Clone, Debug)]
pub struct EpochPartitionData {
    /// Epoch that just ended.
    last_epoch: u64,
    /// Epoch that just started.
    next_epoch: u64,
    /// First checkpoint sequence number of the ended epoch.
    last_epoch_start_cp: u64,
    /// First checkpoint sequence number of the new epoch (exclusive upper bound
    /// for the ended epoch's partition).
    next_epoch_start_cp: u64,
    /// First tx_sequence_number of the ended epoch.
    last_epoch_start_tx: u64,
    /// First tx_sequence_number of the new epoch (exclusive upper bound for the
    /// ended epoch's partition).
    next_epoch_start_tx: u64,
}

impl EpochPartitionData {
Expand All @@ -68,18 +78,33 @@ impl EpochPartitionData {
let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
let next_epoch = epoch.new_epoch.epoch;
let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id;

// Determining the tx_sequence_number range for the epoch partition differs from the
// checkpoint_sequence_number range, because the former is a sum of total transactions -
// this sum already addresses the off-by-one.
let next_epoch_start_tx = epoch.network_total_transactions;
let last_epoch_start_tx =
next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64;

Self {
last_epoch,
next_epoch,
last_epoch_start_cp,
next_epoch_start_cp,
last_epoch_start_tx,
next_epoch_start_tx,
}
}
}

impl<T: R2D2Connection> PgPartitionManager<T> {
pub fn new(cp: ConnectionPool<T>) -> Result<Self, IndexerError> {
let manager = Self { cp };
let mut partition_strategies = HashMap::new();
partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
let manager = Self {
cp,
partition_strategies,
};
let tables = manager.get_table_partitions()?;
info!(
"Found {} tables with partitions : [{:?}]",
Expand Down Expand Up @@ -116,6 +141,31 @@ impl<T: R2D2Connection> PgPartitionManager<T> {
)
}

/// Tries to fetch the partitioning strategy for the given partitioned table. Defaults to
/// `CheckpointSequenceNumber` as the majority of our tables are partitioned on an epoch's
/// checkpoints today.
pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
self.partition_strategies
.get(table_name)
.copied()
.unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
}

pub fn determine_partition_range(
&self,
table_name: &str,
data: &EpochPartitionData,
) -> (u64, u64) {
match self.get_strategy(table_name) {
PgPartitionStrategy::CheckpointSequenceNumber => {
(data.last_epoch_start_cp, data.next_epoch_start_cp)
}
PgPartitionStrategy::TxSequenceNumber => {
(data.last_epoch_start_tx, data.next_epoch_start_tx)
}
}
}

pub fn advance_and_prune_epoch_partition(
&self,
table: String,
Expand All @@ -124,6 +174,7 @@ impl<T: R2D2Connection> PgPartitionManager<T> {
data: &EpochPartitionData,
epochs_to_keep: Option<u64>,
) -> Result<(), IndexerError> {
let partition_range = self.determine_partition_range(&table, data);
if data.next_epoch == 0 {
tracing::info!("Epoch 0 partition has been created in the initial setup.");
return Ok(());
Expand All @@ -138,8 +189,8 @@ impl<T: R2D2Connection> PgPartitionManager<T> {
.bind::<diesel::sql_types::Text, _>(table.clone())
.bind::<diesel::sql_types::BigInt, _>(data.last_epoch as i64)
.bind::<diesel::sql_types::BigInt, _>(data.next_epoch as i64)
.bind::<diesel::sql_types::BigInt, _>(data.last_epoch_start_cp as i64)
.bind::<diesel::sql_types::BigInt, _>(data.next_epoch_start_cp as i64),
.bind::<diesel::sql_types::BigInt, _>(partition_range.0 as i64)
.bind::<diesel::sql_types::BigInt, _>(partition_range.1 as i64),
conn,
)
},
Expand All @@ -150,13 +201,13 @@ impl<T: R2D2Connection> PgPartitionManager<T> {
transactional_blocking_with_retry!(
&self.cp,
|conn| {
RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION {table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start_cp}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start_cp = data.next_epoch_start_cp as i64, next_epoch = data.next_epoch as i64)), conn)
RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION {table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start = partition_range.1 as i64, next_epoch = data.next_epoch as i64)), conn)
},
Duration::from_secs(10)
)?;
info!(
"Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}",
table, last_partition, data.next_epoch, data.last_epoch_start_cp
table, last_partition, data.next_epoch, partition_range.0
);

// prune old partitions beyond the retention period
Expand Down
5 changes: 5 additions & 0 deletions crates/sui-indexer/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ impl IndexedCheckpoint {
}
}

/// Represents system state and summary info at the start and end of an epoch. Optional fields are
/// populated at epoch boundary, since they cannot be determined at the start of the epoch.
#[derive(Clone, Debug, Default)]
pub struct IndexedEpochInfo {
pub epoch: u64,
Expand Down Expand Up @@ -127,6 +129,9 @@ impl IndexedEpochInfo {
}
}

/// Creates `IndexedEpochInfo` for epoch X-1 at the boundary of epoch X-1 to X.
/// `network_total_tx_num_at_last_epoch_end` is needed to determine the number of transactions
/// that occurred in the epoch X-1.
pub fn from_end_of_epoch_data(
system_state_summary: &SuiSystemStateSummary,
last_checkpoint_summary: &CertifiedCheckpointSummary,
Expand Down

0 comments on commit 7d5fe08

Please sign in to comment.