From 7d5fe088edc5c140afe700a205dcc9d242c06e95 Mon Sep 17 00:00:00 2001 From: wlmyng <127570466+wlmyng@users.noreply.github.com> Date: Tue, 18 Jun 2024 12:43:08 -0700 Subject: [PATCH] [4/n][gql-performance] Indexer handles partitioning per epoch's worth of tx_sequence_number (#18224) ## Description To partition an epoch per `tx_sequence_number`, the last `tx_sequence_number` of an epoch is the `network_total_transactions` from the last checkpoint in the current epoch. The first `tx_sequence_number` is then the `network_total_transactions` from the last checkpoint in the previous epoch + 1, or the `network_total_transactions` of the current epoch - `epoch_total_transactions` of the current epoch. Since we have `network_total_transactions` from the checkpoint to commit, and `epoch_total_transactions` since we fetch current epoch from db, we can derive the range without additional db fetches. However, this approach uncovered a bug of sorts: when we read the network total txn of epoch X - 2 from the db on the indexing side, we `select max(network_total_transactions) from checkpoints where epoch = {epoch}`. However, we may not have actually committed epoch X-2's data on the committer side. This is unlikely to happen in prod because it requires the lag between indexing and committing to be >= one epoch. Since an epoch today consists of ~80k checkpoints, it is quite improbable for this to occur. However, in many of our tests it's possible to have as few as one checkpoint and/or transaction between epochs, which brought this race condition to light. To resolve, we can place the responsibility on `persist_epoch`. When we get to a point to persist epoch X - 1, epoch X - 2's data must have been indexed to db. ## Test plan Existing tests --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. 
For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- .../pg/2023-08-19-044026_transactions/up.sql | 4 +- .../up.sql | 6 +- .../src/handlers/checkpoint_handler.rs | 9 ++- crates/sui-indexer/src/handlers/mod.rs | 1 + .../sui-indexer/src/store/pg_indexer_store.rs | 25 +++++++- .../src/store/pg_partition_manager.rs | 63 +++++++++++++++++-- crates/sui-indexer/src/types.rs | 5 ++ 7 files changed, 100 insertions(+), 13 deletions(-) diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql index e9e04cd5cb25e..f5404e3610751 100644 --- a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql @@ -18,6 +18,6 @@ CREATE TABLE transactions ( -- number of successful commands in this transaction, bound by number of command -- in a programmaable transaction. 
success_command_count smallint NOT NULL, - PRIMARY KEY (tx_sequence_number, checkpoint_sequence_number) -) PARTITION BY RANGE (checkpoint_sequence_number); + PRIMARY KEY (tx_sequence_number) +) PARTITION BY RANGE (tx_sequence_number); CREATE TABLE transactions_partition_0 PARTITION OF transactions FOR VALUES FROM (0) TO (MAXVALUE); diff --git a/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql index cb24af8e09934..8ca64b86a7081 100644 --- a/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql @@ -1,10 +1,10 @@ -CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start_cp BIGINT, new_epoch_start_cp BIGINT) +CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start BIGINT, new_epoch_start BIGINT) LANGUAGE plpgsql AS $$ BEGIN EXECUTE format('ALTER TABLE %I DETACH PARTITION %I_partition_%s', table_name, table_name, last_epoch); - EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start_cp, new_epoch_start_cp); - EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start_cp); + EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start, new_epoch_start); + EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start); END; $$; diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 
1999998e10c6d..2ce65bce4376d 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -215,6 +215,7 @@ where 0, //first_checkpoint_id None, ), + network_total_transactions: 0, })); } @@ -241,7 +242,12 @@ where let event = bcs::from_bytes::(&epoch_event.contents)?; // Now we just entered epoch X, we want to calculate the diff between - // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2) + // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2). Note that on + // the indexer's chain-reading side, this is not guaranteed to have the latest data. Rather + // than impose a wait on the reading side, however, we overwrite this on the persisting + // side, where we can guarantee that the previous epoch's checkpoints have been written to + // db. + let network_tx_count_prev_epoch = match system_state.epoch { // If first epoch change, this number is 0 1 => Ok(0), @@ -265,6 +271,7 @@ where checkpoint_summary.sequence_number + 1, // first_checkpoint_id Some(&event), ), + network_total_transactions: checkpoint_summary.network_total_transactions, })) } diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index 587a13f22bb95..193c6d828e5c9 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -39,4 +39,5 @@ pub struct TransactionObjectChangesToCommit { pub struct EpochToCommit { pub last_epoch: Option, pub new_epoch: IndexedEpochInfo, + pub network_total_transactions: u64, } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index ef603238ad1c1..0f1ae25be69fb 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -989,12 +989,35 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_epoch .start_timer(); let epoch_id = epoch.new_epoch.epoch; + 
transactional_blocking_with_retry!( &self.blocking_cp, |conn| { if let Some(last_epoch) = &epoch.last_epoch { let last_epoch_id = last_epoch.epoch; - let last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); + // Overwrites the `epoch_total_transactions` field on `epoch.last_epoch` because + // we are not guaranteed to have the latest data in db when this is set on + // indexer's chain-reading side. However, when we `persist_epoch`, the + // checkpoints from an epoch ago must have been indexed. + let previous_epoch_network_total_transactions = match epoch_id { + 0 | 1 => 0, + _ => { + let prev_epoch_id = epoch_id - 2; + let result = checkpoints::table + .filter(checkpoints::epoch.eq(prev_epoch_id as i64)) + .select(max(checkpoints::network_total_transactions)) + .first::>(conn) + .map(|o| o.unwrap_or(0))?; + + result as u64 + } + }; + + let epoch_total_transactions = epoch.network_total_transactions + - previous_epoch_network_total_transactions; + + let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); + last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64); info!(last_epoch_id, "Persisting epoch end data: {:?}", last_epoch); on_conflict_do_update!( epochs::table, diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs index fc92299be3356..0af894d4640ab 100644 --- a/crates/sui-indexer/src/store/pg_partition_manager.rs +++ b/crates/sui-indexer/src/store/pg_partition_manager.rs @@ -4,7 +4,7 @@ use diesel::r2d2::R2D2Connection; use diesel::sql_types::{BigInt, VarChar}; use diesel::{QueryableByName, RunQueryDsl}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::time::Duration; use tracing::{error, info}; @@ -44,22 +44,32 @@ GROUP BY table_name; pub struct PgPartitionManager { cp: ConnectionPool, + partition_strategies: HashMap<&'static str, PgPartitionStrategy>, } impl Clone for PgPartitionManager { fn clone(&self) -> 
PgPartitionManager { Self { cp: self.cp.clone(), + partition_strategies: self.partition_strategies.clone(), } } } +#[derive(Clone, Copy)] +pub enum PgPartitionStrategy { + CheckpointSequenceNumber, + TxSequenceNumber, +} + #[derive(Clone, Debug)] pub struct EpochPartitionData { last_epoch: u64, next_epoch: u64, last_epoch_start_cp: u64, next_epoch_start_cp: u64, + last_epoch_start_tx: u64, + next_epoch_start_tx: u64, } impl EpochPartitionData { @@ -68,18 +78,33 @@ impl EpochPartitionData { let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64; let next_epoch = epoch.new_epoch.epoch; let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id; + + // Determining the tx_sequence_number range for the epoch partition differs from the + // checkpoint_sequence_number range, because the former is a sum of total transactions - + // this sum already addresses the off-by-one. + let next_epoch_start_tx = epoch.network_total_transactions; + let last_epoch_start_tx = + next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64; + Self { last_epoch, next_epoch, last_epoch_start_cp, next_epoch_start_cp, + last_epoch_start_tx, + next_epoch_start_tx, } } } impl PgPartitionManager { pub fn new(cp: ConnectionPool) -> Result { - let manager = Self { cp }; + let mut partition_strategies = HashMap::new(); + partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber); + let manager = Self { + cp, + partition_strategies, + }; let tables = manager.get_table_partitions()?; info!( "Found {} tables with partitions : [{:?}]", @@ -116,6 +141,31 @@ impl PgPartitionManager { ) } + /// Tries to fetch the partitioning strategy for the given partitioned table. Defaults to + /// `CheckpointSequenceNumber` as the majority of our tables are partitioned on an epoch's + /// checkpoints today. 
+ pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy { + self.partition_strategies + .get(table_name) + .copied() + .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber) + } + + pub fn determine_partition_range( + &self, + table_name: &str, + data: &EpochPartitionData, + ) -> (u64, u64) { + match self.get_strategy(table_name) { + PgPartitionStrategy::CheckpointSequenceNumber => { + (data.last_epoch_start_cp, data.next_epoch_start_cp) + } + PgPartitionStrategy::TxSequenceNumber => { + (data.last_epoch_start_tx, data.next_epoch_start_tx) + } + } + } + pub fn advance_and_prune_epoch_partition( &self, table: String, @@ -124,6 +174,7 @@ impl PgPartitionManager { data: &EpochPartitionData, epochs_to_keep: Option, ) -> Result<(), IndexerError> { + let partition_range = self.determine_partition_range(&table, data); if data.next_epoch == 0 { tracing::info!("Epoch 0 partition has been created in the initial setup."); return Ok(()); @@ -138,8 +189,8 @@ impl PgPartitionManager { .bind::(table.clone()) .bind::(data.last_epoch as i64) .bind::(data.next_epoch as i64) - .bind::(data.last_epoch_start_cp as i64) - .bind::(data.next_epoch_start_cp as i64), + .bind::(partition_range.0 as i64) + .bind::(partition_range.1 as i64), conn, ) }, @@ -150,13 +201,13 @@ impl PgPartitionManager { transactional_blocking_with_retry!( &self.cp, |conn| { - RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION {table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start_cp}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start_cp = data.next_epoch_start_cp as i64, next_epoch = data.next_epoch as i64)), conn) + RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION 
{table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start = partition_range.1 as i64, next_epoch = data.next_epoch as i64)), conn) }, Duration::from_secs(10) )?; info!( "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}", - table, last_partition, data.next_epoch, data.last_epoch_start_cp + table, last_partition, data.next_epoch, partition_range.0 ); // prune old partitions beyond the retention period diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index a99729b146953..db6ad9f155f0c 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -82,6 +82,8 @@ impl IndexedCheckpoint { } } +/// Represents system state and summary info at the start and end of an epoch. Optional fields are +/// populated at epoch boundary, since they cannot be determined at the start of the epoch. #[derive(Clone, Debug, Default)] pub struct IndexedEpochInfo { pub epoch: u64, @@ -127,6 +129,9 @@ impl IndexedEpochInfo { } } + /// Creates `IndexedEpochInfo` for epoch X-1 at the boundary of epoch X-1 to X. + /// `network_total_tx_num_at_last_epoch_end` is needed to determine the number of transactions + /// that occurred in the epoch X-1. pub fn from_end_of_epoch_data( system_state_summary: &SuiSystemStateSummary, last_checkpoint_summary: &CertifiedCheckpointSummary,