diff --git a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql index e9e04cd5cb25e..f5404e3610751 100644 --- a/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-08-19-044026_transactions/up.sql @@ -18,6 +18,6 @@ CREATE TABLE transactions ( -- number of successful commands in this transaction, bound by number of command -- in a programmaable transaction. success_command_count smallint NOT NULL, - PRIMARY KEY (tx_sequence_number, checkpoint_sequence_number) -) PARTITION BY RANGE (checkpoint_sequence_number); + PRIMARY KEY (tx_sequence_number) +) PARTITION BY RANGE (tx_sequence_number); CREATE TABLE transactions_partition_0 PARTITION OF transactions FOR VALUES FROM (0) TO (MAXVALUE); diff --git a/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql index cb24af8e09934..8ca64b86a7081 100644 --- a/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-11-29-193859_advance_partition/up.sql @@ -1,10 +1,10 @@ -CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start_cp BIGINT, new_epoch_start_cp BIGINT) +CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start BIGINT, new_epoch_start BIGINT) LANGUAGE plpgsql AS $$ BEGIN EXECUTE format('ALTER TABLE %I DETACH PARTITION %I_partition_%s', table_name, table_name, last_epoch); - EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start_cp, new_epoch_start_cp); - EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, 
table_name, new_epoch_start_cp); + EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start, new_epoch_start); + EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start); END; $$; diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 1999998e10c6d..2ce65bce4376d 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -215,6 +215,7 @@ where 0, //first_checkpoint_id None, ), + network_total_transactions: 0, })); } @@ -241,7 +242,12 @@ where let event = bcs::from_bytes::(&epoch_event.contents)?; // Now we just entered epoch X, we want to calculate the diff between - // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2) + // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2). Note that on + // the indexer's chain-reading side, this is not guaranteed to have the latest data. Rather + // than impose a wait on the reading side, however, we overwrite this on the persisting + // side, where we can guarantee that the previous epoch's checkpoints have been written to + // db. 
+ let network_tx_count_prev_epoch = match system_state.epoch { // If first epoch change, this number is 0 1 => Ok(0), @@ -265,6 +271,7 @@ where checkpoint_summary.sequence_number + 1, // first_checkpoint_id Some(&event), ), + network_total_transactions: checkpoint_summary.network_total_transactions, })) } diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index 587a13f22bb95..193c6d828e5c9 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -39,4 +39,5 @@ pub struct TransactionObjectChangesToCommit { pub struct EpochToCommit { pub last_epoch: Option, pub new_epoch: IndexedEpochInfo, + pub network_total_transactions: u64, } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index ef603238ad1c1..0f1ae25be69fb 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -989,12 +989,35 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_epoch .start_timer(); let epoch_id = epoch.new_epoch.epoch; + transactional_blocking_with_retry!( &self.blocking_cp, |conn| { if let Some(last_epoch) = &epoch.last_epoch { let last_epoch_id = last_epoch.epoch; - let last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); + // Overwrites the `epoch_total_transactions` field on `epoch.last_epoch` because + // we are not guaranteed to have the latest data in db when this is set on + // indexer's chain-reading side. However, when we `persist_epoch`, the + // checkpoints from an epoch ago must have been indexed. 
+ let previous_epoch_network_total_transactions = match epoch_id { + 0 | 1 => 0, + _ => { + let prev_epoch_id = epoch_id - 2; + let result = checkpoints::table + .filter(checkpoints::epoch.eq(prev_epoch_id as i64)) + .select(max(checkpoints::network_total_transactions)) + .first::>(conn) + .map(|o| o.unwrap_or(0))?; + + result as u64 + } + }; + + let epoch_total_transactions = epoch.network_total_transactions + - previous_epoch_network_total_transactions; + + let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch); + last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64); info!(last_epoch_id, "Persisting epoch end data: {:?}", last_epoch); on_conflict_do_update!( epochs::table, diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs index fc92299be3356..0af894d4640ab 100644 --- a/crates/sui-indexer/src/store/pg_partition_manager.rs +++ b/crates/sui-indexer/src/store/pg_partition_manager.rs @@ -4,7 +4,7 @@ use diesel::r2d2::R2D2Connection; use diesel::sql_types::{BigInt, VarChar}; use diesel::{QueryableByName, RunQueryDsl}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::time::Duration; use tracing::{error, info}; @@ -44,22 +44,32 @@ GROUP BY table_name; pub struct PgPartitionManager { cp: ConnectionPool, + partition_strategies: HashMap<&'static str, PgPartitionStrategy>, } impl Clone for PgPartitionManager { fn clone(&self) -> PgPartitionManager { Self { cp: self.cp.clone(), + partition_strategies: self.partition_strategies.clone(), } } } +#[derive(Clone, Copy)] +pub enum PgPartitionStrategy { + CheckpointSequenceNumber, + TxSequenceNumber, +} + #[derive(Clone, Debug)] pub struct EpochPartitionData { last_epoch: u64, next_epoch: u64, last_epoch_start_cp: u64, next_epoch_start_cp: u64, + last_epoch_start_tx: u64, + next_epoch_start_tx: u64, } impl EpochPartitionData { @@ -68,18 +78,33 @@ impl EpochPartitionData { let 
last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64; let next_epoch = epoch.new_epoch.epoch; let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id; + + // Determining the tx_sequence_number range for the epoch partition differs from the + // checkpoint_sequence_number range, because the former is a sum of total transactions - + // this sum already addresses the off-by-one. + let next_epoch_start_tx = epoch.network_total_transactions; + let last_epoch_start_tx = + next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64; + Self { last_epoch, next_epoch, last_epoch_start_cp, next_epoch_start_cp, + last_epoch_start_tx, + next_epoch_start_tx, } } } impl PgPartitionManager { pub fn new(cp: ConnectionPool) -> Result { - let manager = Self { cp }; + let mut partition_strategies = HashMap::new(); + partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber); + let manager = Self { + cp, + partition_strategies, + }; let tables = manager.get_table_partitions()?; info!( "Found {} tables with partitions : [{:?}]", @@ -116,6 +141,31 @@ impl PgPartitionManager { ) } + /// Tries to fetch the partitioning strategy for the given partitioned table. Defaults to + /// `CheckpointSequenceNumber` as the majority of our tables are partitioned on an epoch's + /// checkpoints today. 
+ pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy { + self.partition_strategies + .get(table_name) + .copied() + .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber) + } + + pub fn determine_partition_range( + &self, + table_name: &str, + data: &EpochPartitionData, + ) -> (u64, u64) { + match self.get_strategy(table_name) { + PgPartitionStrategy::CheckpointSequenceNumber => { + (data.last_epoch_start_cp, data.next_epoch_start_cp) + } + PgPartitionStrategy::TxSequenceNumber => { + (data.last_epoch_start_tx, data.next_epoch_start_tx) + } + } + } + pub fn advance_and_prune_epoch_partition( &self, table: String, @@ -124,6 +174,7 @@ impl PgPartitionManager { data: &EpochPartitionData, epochs_to_keep: Option, ) -> Result<(), IndexerError> { + let partition_range = self.determine_partition_range(&table, data); if data.next_epoch == 0 { tracing::info!("Epoch 0 partition has been created in the initial setup."); return Ok(()); @@ -138,8 +189,8 @@ impl PgPartitionManager { .bind::(table.clone()) .bind::(data.last_epoch as i64) .bind::(data.next_epoch as i64) - .bind::(data.last_epoch_start_cp as i64) - .bind::(data.next_epoch_start_cp as i64), + .bind::(partition_range.0 as i64) + .bind::(partition_range.1 as i64), conn, ) }, @@ -150,13 +201,13 @@ impl PgPartitionManager { transactional_blocking_with_retry!( &self.cp, |conn| { - RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION {table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start_cp}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start_cp = data.next_epoch_start_cp as i64, next_epoch = data.next_epoch as i64)), conn) + RunQueryDsl::execute(diesel::sql_query(format!("ALTER TABLE {table_name} REORGANIZE PARTITION {table_name}_partition_{last_epoch} INTO (PARTITION 
{table_name}_partition_{last_epoch} VALUES LESS THAN ({next_epoch_start}), PARTITION {table_name}_partition_{next_epoch} VALUES LESS THAN MAXVALUE)", table_name = table.clone(), last_epoch = data.last_epoch as i64, next_epoch_start = partition_range.1 as i64, next_epoch = data.next_epoch as i64)), conn) }, Duration::from_secs(10) )?; info!( "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}", table, last_partition, data.next_epoch, partition_range.0 ); // prune old partitions beyond the retention period diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index a99729b146953..db6ad9f155f0c 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -82,6 +82,8 @@ impl IndexedCheckpoint { } } +/// Represents system state and summary info at the start and end of an epoch. Optional fields are +/// populated at epoch boundary, since they cannot be determined at the start of the epoch. #[derive(Clone, Debug, Default)] pub struct IndexedEpochInfo { pub epoch: u64, @@ -127,6 +129,9 @@ impl IndexedEpochInfo { } } + /// Creates `IndexedEpochInfo` for epoch X-1 at the boundary of epoch X-1 to X. + /// `network_total_tx_num_at_last_epoch_end` is needed to determine the number of transactions + /// that occurred in epoch X-1. pub fn from_end_of_epoch_data( system_state_summary: &SuiSystemStateSummary, last_checkpoint_summary: &CertifiedCheckpointSummary,