diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
index cb12150a0c5634..911f51a6332389 100644
--- a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
+++ b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
@@ -218,8 +218,7 @@ impl ObjectsSnapshotProcessor {
 
                 let last_checkpoint_seq = batch.last().as_ref().unwrap().checkpoint_sequence_number;
                 info!("Objects snapshot processor is updating objects snapshot table from {} to {}", first_checkpoint_seq, last_checkpoint_seq);
-
-                store.backfill_objects_snapshot(batch.drain(..).map(|obj| obj.object_changes).collect())
+                store.persist_objects_snapshot(batch.drain(..).map(|obj| obj.object_changes).collect())
                     .await
                     .unwrap_or_else(|_| panic!("Failed to backfill objects snapshot from {} to {}", first_checkpoint_seq, last_checkpoint_seq));
                 start_cp = last_checkpoint_seq + 1;
diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs
index 6b2d8318635230..7f4f229ddcda98 100644
--- a/crates/sui-indexer/src/metrics.rs
+++ b/crates/sui-indexer/src/metrics.rs
@@ -130,7 +130,6 @@ pub struct IndexerMetrics {
     pub epoch_db_commit_latency: Histogram,
     // latencies of slow DB update queries, now only advance epoch and objects_snapshot update
     pub advance_epoch_latency: Histogram,
-    pub update_object_snapshot_latency: Histogram,
     // latencies of RPC endpoints in read.rs
     pub get_transaction_block_latency: Histogram,
     pub multi_get_transaction_blocks_latency: Histogram,
@@ -569,12 +568,6 @@ impl IndexerMetrics {
                 DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
                 registry,
             ).unwrap(),
-            update_object_snapshot_latency: register_histogram_with_registry!(
-                "update_object_snapshot_latency",
-                "Time spent in updating object snapshot",
-                DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
-                registry,
-            ).unwrap(),
             subscription_process_latency: register_histogram_with_registry!(
                 "subscription_process_latency",
                 "Time spent in process Websocket subscription",
diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs
index b7edb28a0fbd19..743c2471f58ea9 100644
--- a/crates/sui-indexer/src/store/indexer_store.rs
+++ b/crates/sui-indexer/src/store/indexer_store.rs
@@ -48,16 +48,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
         object_changes: Vec<TransactionObjectChangesToCommit>,
     ) -> Result<(), IndexerError>;
 
-    // persist objects snapshot with object changes during backfill
-    async fn backfill_objects_snapshot(
+    async fn persist_objects_snapshot(
         &self,
         object_changes: Vec<TransactionObjectChangesToCommit>,
     ) -> Result<(), IndexerError>;
 
-    // update objects snapshot after backfill is done
-    async fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64)
-        -> Result<(), IndexerError>;
-
     async fn persist_checkpoints(
         &self,
         checkpoints: Vec<IndexedCheckpoint>,
diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs
index 2d3a889a490dfa..4b4a21cb447152 100644
--- a/crates/sui-indexer/src/store/pg_indexer_store.rs
+++ b/crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -96,38 +96,6 @@ const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
 const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
 const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
 
-// with rn = 1, we only select the latest version of each object,
-// so that we don't have to update the same object multiple times.
-const UPDATE_OBJECTS_SNAPSHOT_QUERY: &str = r"
-INSERT INTO objects_snapshot (object_id, object_version, object_status, object_digest, checkpoint_sequence_number, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, df_name, df_object_type, df_object_id)
-SELECT object_id, object_version, object_status, object_digest, checkpoint_sequence_number, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, df_name, df_object_type, df_object_id
-FROM (
-    SELECT *,
-           ROW_NUMBER() OVER (PARTITION BY object_id ORDER BY object_version DESC) as rn
-    FROM objects_history
-    WHERE checkpoint_sequence_number >= $1 AND checkpoint_sequence_number < $2
-) as subquery
-WHERE rn = 1
-ON CONFLICT (object_id) DO UPDATE
-SET object_version = EXCLUDED.object_version,
-    object_status = EXCLUDED.object_status,
-    object_digest = EXCLUDED.object_digest,
-    checkpoint_sequence_number = EXCLUDED.checkpoint_sequence_number,
-    owner_type = EXCLUDED.owner_type,
-    owner_id = EXCLUDED.owner_id,
-    object_type = EXCLUDED.object_type,
-    object_type_package = EXCLUDED.object_type_package,
-    object_type_module = EXCLUDED.object_type_module,
-    object_type_name = EXCLUDED.object_type_name,
-    serialized_object = EXCLUDED.serialized_object,
-    coin_type = EXCLUDED.coin_type,
-    coin_balance = EXCLUDED.coin_balance,
-    df_kind = EXCLUDED.df_kind,
-    df_name = EXCLUDED.df_name,
-    df_object_type = EXCLUDED.df_object_type,
-    df_object_id = EXCLUDED.df_object_id;
-";
-
 #[derive(Clone)]
 pub struct PgIndexerStoreConfig {
     pub parallel_chunk_size: usize,
@@ -619,34 +587,6 @@ impl PgIndexerStore {
         })
     }
 
-    fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64) -> Result<(), IndexerError> {
-        let work_mem_gb = std::env::var("INDEXER_PG_WORK_MEM")
-            .unwrap_or_else(|_e| "16".to_string())
-            .parse::<i64>()
-            .unwrap();
-        let pg_work_mem_query_string = format!("SET work_mem = '{}GB'", work_mem_gb);
-        let pg_work_mem_query = pg_work_mem_query_string.as_str();
-        transactional_blocking_with_retry!(
-            &self.blocking_cp,
-            |conn| { RunQueryDsl::execute(diesel::sql_query(pg_work_mem_query), conn,) },
-            PG_DB_COMMIT_SLEEP_DURATION
-        )?;
-
-        transactional_blocking_with_retry!(
-            &self.blocking_cp,
-            |conn| {
-                RunQueryDsl::execute(
-                    diesel::sql_query(UPDATE_OBJECTS_SNAPSHOT_QUERY)
-                        .bind::<diesel::sql_types::BigInt, _>(start_cp as i64)
-                        .bind::<diesel::sql_types::BigInt, _>(end_cp as i64),
-                    conn,
-                )
-            },
-            PG_DB_COMMIT_SLEEP_DURATION
-        )?;
-        Ok(())
-    }
-
     fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
         let Some(first_checkpoint) = checkpoints.first() else {
             return Ok(());
@@ -1764,7 +1704,7 @@ impl IndexerStore for PgIndexerStore {
         Ok(())
     }
 
-    async fn backfill_objects_snapshot(
+    async fn persist_objects_snapshot(
         &self,
         object_changes: Vec<TransactionObjectChangesToCommit>,
     ) -> Result<(), IndexerError> {
@@ -1859,37 +1799,6 @@
         Ok(())
     }
 
-    async fn update_objects_snapshot(
-        &self,
-        start_cp: u64,
-        end_cp: u64,
-    ) -> Result<(), IndexerError> {
-        let skip_snapshot = std::env::var("SKIP_OBJECT_SNAPSHOT")
-            .map(|val| val.eq_ignore_ascii_case("true"))
-            .unwrap_or(false);
-        if skip_snapshot {
-            info!("skipping object snapshot");
-            return Ok(());
-        }
-
-        let guard = self.metrics.update_object_snapshot_latency.start_timer();
-
-        self.spawn_blocking_task(move |this| this.update_objects_snapshot(start_cp, end_cp))
-            .await
-            .map_err(|e| {
-                IndexerError::PostgresWriteError(format!(
-                    "Failed to update objects snapshot: {:?}",
-                    e
-                ))
-            })??;
-        let elapsed = guard.stop_and_record();
-        info!(
-            elapsed,
-            "Persisted snapshot for checkpoints from {} to {}", start_cp, end_cp
-        );
-        Ok(())
-    }
-
     async fn persist_checkpoints(
         &self,
         checkpoints: Vec<IndexedCheckpoint>,