diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
index 907036005378a5..858fcf45b51e6f 100644
--- a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
+++ b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
@@ -243,7 +243,7 @@ where
 
             info!("Objects snapshot processor is updating objects snapshot table from {} to {}", first_checkpoint_seq, last_checkpoint_seq);
             let changes_to_commit = object_changes_batch.into_iter().map(|obj| obj.object_changes).collect();
-            store.backfill_objects_snapshot(changes_to_commit)
+            store.persist_objects_snapshot(changes_to_commit)
                 .await
                 .unwrap_or_else(|_| panic!("Failed to backfill objects snapshot from {} to {}", first_checkpoint_seq, last_checkpoint_seq));
             start_cp = last_checkpoint_seq + 1;
diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs
index 34978836db5d2c..c12abf8cb65d28 100644
--- a/crates/sui-indexer/src/metrics.rs
+++ b/crates/sui-indexer/src/metrics.rs
@@ -153,7 +153,6 @@ pub struct IndexerMetrics {
     pub epoch_db_commit_latency: Histogram,
     // latencies of slow DB update queries, now only advance epoch and objects_snapshot update
     pub advance_epoch_latency: Histogram,
-    pub update_object_snapshot_latency: Histogram,
     // latencies of RPC endpoints in read.rs
     pub get_transaction_block_latency: Histogram,
     pub multi_get_transaction_blocks_latency: Histogram,
@@ -592,12 +591,6 @@ impl IndexerMetrics {
                 DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
                 registry,
             ).unwrap(),
-            update_object_snapshot_latency: register_histogram_with_registry!(
-                "update_object_snapshot_latency",
-                "Time spent in updating object snapshot",
-                DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
-                registry,
-            ).unwrap(),
             subscription_process_latency: register_histogram_with_registry!(
                 "subscription_process_latency",
                 "Time spent in process Websocket subscription",
diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs
index 2fd2c1531b76ab..d4942d649ad6c3 100644
--- a/crates/sui-indexer/src/store/indexer_store.rs
+++ b/crates/sui-indexer/src/store/indexer_store.rs
@@ -49,16 +49,11 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static {
         object_changes: Vec<TransactionObjectChangesToCommit>,
     ) -> Result<(), IndexerError>;
 
-    // persist objects snapshot with object changes during backfill
-    async fn backfill_objects_snapshot(
+    async fn persist_objects_snapshot(
         &self,
         object_changes: Vec<TransactionObjectChangesToCommit>,
     ) -> Result<(), IndexerError>;
 
-    // update objects snapshot after backfill is done
-    async fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64)
-        -> Result<(), IndexerError>;
-
     async fn persist_checkpoints(
         &self,
         checkpoints: Vec<IndexedCheckpoint>,
diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs
index 7a2ac3cdb88542..2f63c17ebcd786 100644
--- a/crates/sui-indexer/src/store/pg_indexer_store.rs
+++ b/crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -99,38 +99,6 @@
 const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
 const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
 const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
-// with rn = 1, we only select the latest version of each object,
-// so that we don't have to update the same object multiple times.
-const UPDATE_OBJECTS_SNAPSHOT_QUERY: &str = r"
-INSERT INTO objects_snapshot (object_id, object_version, object_status, object_digest, checkpoint_sequence_number, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, df_name, df_object_type, df_object_id)
-SELECT object_id, object_version, object_status, object_digest, checkpoint_sequence_number, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, df_name, df_object_type, df_object_id
-FROM (
-    SELECT *,
-           ROW_NUMBER() OVER (PARTITION BY object_id ORDER BY object_version DESC) as rn
-    FROM objects_history
-    WHERE checkpoint_sequence_number >= $1 AND checkpoint_sequence_number < $2
-) as subquery
-WHERE rn = 1
-ON CONFLICT (object_id) DO UPDATE
-SET object_version = EXCLUDED.object_version,
-    object_status = EXCLUDED.object_status,
-    object_digest = EXCLUDED.object_digest,
-    checkpoint_sequence_number = EXCLUDED.checkpoint_sequence_number,
-    owner_type = EXCLUDED.owner_type,
-    owner_id = EXCLUDED.owner_id,
-    object_type = EXCLUDED.object_type,
-    object_type_package = EXCLUDED.object_type_package,
-    object_type_module = EXCLUDED.object_type_module,
-    object_type_name = EXCLUDED.object_type_name,
-    serialized_object = EXCLUDED.serialized_object,
-    coin_type = EXCLUDED.coin_type,
-    coin_balance = EXCLUDED.coin_balance,
-    df_kind = EXCLUDED.df_kind,
-    df_name = EXCLUDED.df_name,
-    df_object_type = EXCLUDED.df_object_type,
-    df_object_id = EXCLUDED.df_object_id;
-";
-
 #[derive(Clone)]
 pub struct PgIndexerStoreConfig {
     pub parallel_chunk_size: usize,
@@ -627,34 +595,6 @@ impl PgIndexerStore {
         })
     }
-    fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64) -> Result<(), IndexerError> {
-        let work_mem_gb = std::env::var("INDEXER_PG_WORK_MEM")
-            .unwrap_or_else(|_e| "16".to_string())
-            .parse::<i64>()
-            .unwrap();
-        let pg_work_mem_query_string = format!("SET work_mem = '{}GB'", work_mem_gb);
-        let pg_work_mem_query = pg_work_mem_query_string.as_str();
-        transactional_blocking_with_retry!(
-            &self.blocking_cp,
-            |conn| { RunQueryDsl::execute(diesel::sql_query(pg_work_mem_query), conn,) },
-            PG_DB_COMMIT_SLEEP_DURATION
-        )?;
-
-        transactional_blocking_with_retry!(
-            &self.blocking_cp,
-            |conn| {
-                RunQueryDsl::execute(
-                    diesel::sql_query(UPDATE_OBJECTS_SNAPSHOT_QUERY)
-                        .bind::<diesel::sql_types::BigInt, _>(start_cp as i64)
-                        .bind::<diesel::sql_types::BigInt, _>(end_cp as i64),
-                    conn,
-                )
-            },
-            PG_DB_COMMIT_SLEEP_DURATION
-        )?;
-        Ok(())
-    }
-
     fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
         let Some(first_checkpoint) = checkpoints.first() else {
             return Ok(());
         };
@@ -1772,7 +1712,7 @@ impl IndexerStore for PgIndexerStore {
         Ok(())
     }
 
-    async fn backfill_objects_snapshot(
+    async fn persist_objects_snapshot(
         &self,
         object_changes: Vec<TransactionObjectChangesToCommit>,
     ) -> Result<(), IndexerError> {
@@ -1867,37 +1807,6 @@
         Ok(())
     }
 
-    async fn update_objects_snapshot(
-        &self,
-        start_cp: u64,
-        end_cp: u64,
-    ) -> Result<(), IndexerError> {
-        let skip_snapshot = std::env::var("SKIP_OBJECT_SNAPSHOT")
-            .map(|val| val.eq_ignore_ascii_case("true"))
-            .unwrap_or(false);
-        if skip_snapshot {
-            info!("skipping object snapshot");
-            return Ok(());
-        }
-
-        let guard = self.metrics.update_object_snapshot_latency.start_timer();
-
-        self.spawn_blocking_task(move |this| this.update_objects_snapshot(start_cp, end_cp))
-            .await
-            .map_err(|e| {
-                IndexerError::PostgresWriteError(format!(
-                    "Failed to update objects snapshot: {:?}",
-                    e
-                ))
-            })??;
-        let elapsed = guard.stop_and_record();
-        info!(
-            elapsed,
-            "Persisted snapshot for checkpoints from {} to {}", start_cp, end_cp
-        );
-        Ok(())
-    }
-
     async fn persist_checkpoints(
         &self,
         checkpoints: Vec<IndexedCheckpoint>,