Skip to content

Commit

Permalink
indexer: clean up legacy snapshot code
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Aug 22, 2024
1 parent bd0dc2f commit b563626
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ where
info!("Objects snapshot processor is updating objects snapshot table from {} to {}", first_checkpoint_seq, last_checkpoint_seq);

let changes_to_commit = object_changes_batch.into_iter().map(|obj| obj.object_changes).collect();
store.backfill_objects_snapshot(changes_to_commit)
store.persist_objects_snapshot(changes_to_commit)
.await
.unwrap_or_else(|_| panic!("Failed to backfill objects snapshot from {} to {}", first_checkpoint_seq, last_checkpoint_seq));
start_cp = last_checkpoint_seq + 1;
Expand Down
7 changes: 0 additions & 7 deletions crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ pub struct IndexerMetrics {
pub epoch_db_commit_latency: Histogram,
// latencies of slow DB update queries, now only advance epoch
pub advance_epoch_latency: Histogram,
pub update_object_snapshot_latency: Histogram,
// latencies of RPC endpoints in read.rs
pub get_transaction_block_latency: Histogram,
pub multi_get_transaction_blocks_latency: Histogram,
Expand Down Expand Up @@ -592,12 +591,6 @@ impl IndexerMetrics {
DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
update_object_snapshot_latency: register_histogram_with_registry!(
"update_object_snapshot_latency",
"Time spent in updating object snapshot",
DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
subscription_process_latency: register_histogram_with_registry!(
"subscription_process_latency",
"Time spent in process Websocket subscription",
Expand Down
7 changes: 1 addition & 6 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,11 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static {
object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Result<(), IndexerError>;

// persist objects snapshot with object changes during backfill
async fn backfill_objects_snapshot(
async fn persist_objects_snapshot(
&self,
object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Result<(), IndexerError>;

// update objects snapshot after backfill is done
async fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64)
-> Result<(), IndexerError>;

async fn persist_checkpoints(
&self,
checkpoints: Vec<IndexedCheckpoint>,
Expand Down
93 changes: 1 addition & 92 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,38 +99,6 @@ const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);

// Legacy upsert that folds `objects_history` rows into `objects_snapshot`
// for the half-open checkpoint range [$1, $2).
// with rn = 1, we only select the latest version of each object,
// so that we don't have to update the same object multiple times.
// NOTE(review): ON CONFLICT (object_id) relies on `objects_snapshot` having
// a unique constraint/index on object_id — verify against the schema.
const UPDATE_OBJECTS_SNAPSHOT_QUERY: &str = r"
INSERT INTO objects_snapshot (object_id, object_version, object_status, object_digest, checkpoint_sequence_number, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, df_name, df_object_type, df_object_id)
SELECT object_id, object_version, object_status, object_digest, checkpoint_sequence_number, owner_type, owner_id, object_type, object_type_package, object_type_module, object_type_name, serialized_object, coin_type, coin_balance, df_kind, df_name, df_object_type, df_object_id
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY object_id ORDER BY object_version DESC) as rn
FROM objects_history
WHERE checkpoint_sequence_number >= $1 AND checkpoint_sequence_number < $2
) as subquery
WHERE rn = 1
ON CONFLICT (object_id) DO UPDATE
SET object_version = EXCLUDED.object_version,
object_status = EXCLUDED.object_status,
object_digest = EXCLUDED.object_digest,
checkpoint_sequence_number = EXCLUDED.checkpoint_sequence_number,
owner_type = EXCLUDED.owner_type,
owner_id = EXCLUDED.owner_id,
object_type = EXCLUDED.object_type,
object_type_package = EXCLUDED.object_type_package,
object_type_module = EXCLUDED.object_type_module,
object_type_name = EXCLUDED.object_type_name,
serialized_object = EXCLUDED.serialized_object,
coin_type = EXCLUDED.coin_type,
coin_balance = EXCLUDED.coin_balance,
df_kind = EXCLUDED.df_kind,
df_name = EXCLUDED.df_name,
df_object_type = EXCLUDED.df_object_type,
df_object_id = EXCLUDED.df_object_id;
";

#[derive(Clone)]
pub struct PgIndexerStoreConfig {
pub parallel_chunk_size: usize,
Expand Down Expand Up @@ -627,34 +595,6 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
})
}

// Synchronously refreshes `objects_snapshot` from `objects_history` for the
// checkpoint range [start_cp, end_cp) (half-open per the SQL bounds
// `>= $1 AND < $2` in UPDATE_OBJECTS_SNAPSHOT_QUERY).
//
// First raises `work_mem`, sized from the INDEXER_PG_WORK_MEM env var
// (in GB, default 16), before running the window-function upsert.
// NOTE(review): a non-numeric INDEXER_PG_WORK_MEM panics via `.unwrap()` —
// confirm this operator knob is always an integer.
// NOTE(review): the SET and the upsert run as two separate retried
// transactions on a pooled connection; the SET may land on a different
// session than the upsert — verify this actually takes effect.
fn update_objects_snapshot(&self, start_cp: u64, end_cp: u64) -> Result<(), IndexerError> {
let work_mem_gb = std::env::var("INDEXER_PG_WORK_MEM")
.unwrap_or_else(|_e| "16".to_string())
.parse::<i64>()
.unwrap();
let pg_work_mem_query_string = format!("SET work_mem = '{}GB'", work_mem_gb);
let pg_work_mem_query = pg_work_mem_query_string.as_str();
transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| { RunQueryDsl::execute(diesel::sql_query(pg_work_mem_query), conn,) },
PG_DB_COMMIT_SLEEP_DURATION
)?;

// Run the upsert with the checkpoint bounds bound as BigInt ($1, $2).
// NOTE(review): `as i64` truncates for values above i64::MAX; checkpoint
// sequence numbers are presumably far below that — confirm.
transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| {
RunQueryDsl::execute(
diesel::sql_query(UPDATE_OBJECTS_SNAPSHOT_QUERY)
.bind::<diesel::sql_types::BigInt, _>(start_cp as i64)
.bind::<diesel::sql_types::BigInt, _>(end_cp as i64),
conn,
)
},
PG_DB_COMMIT_SLEEP_DURATION
)?;
Ok(())
}

fn persist_checkpoints(&self, checkpoints: Vec<IndexedCheckpoint>) -> Result<(), IndexerError> {
let Some(first_checkpoint) = checkpoints.first() else {
return Ok(());
Expand Down Expand Up @@ -1772,7 +1712,7 @@ impl<T: R2D2Connection> IndexerStore for PgIndexerStore<T> {
Ok(())
}

async fn backfill_objects_snapshot(
async fn persist_objects_snapshot(
&self,
object_changes: Vec<TransactionObjectChangesToCommit>,
) -> Result<(), IndexerError> {
Expand Down Expand Up @@ -1867,37 +1807,6 @@ impl<T: R2D2Connection> IndexerStore for PgIndexerStore<T> {
Ok(())
}

// Async trait entry point: refreshes the objects snapshot for checkpoints
// [start_cp, end_cp) by dispatching the blocking SQL work to a blocking
// task running the synchronous helper of the same name.
//
// Setting SKIP_OBJECT_SNAPSHOT=true (case-insensitive) turns this into a
// logged no-op; otherwise the wall-clock duration is recorded in the
// `update_object_snapshot_latency` histogram.
async fn update_objects_snapshot(
&self,
start_cp: u64,
end_cp: u64,
) -> Result<(), IndexerError> {
// Operator escape hatch to disable the (slow) snapshot update entirely.
let skip_snapshot = std::env::var("SKIP_OBJECT_SNAPSHOT")
.map(|val| val.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if skip_snapshot {
info!("skipping object snapshot");
return Ok(());
}

let guard = self.metrics.update_object_snapshot_latency.start_timer();

// First `?` propagates a spawn/join failure (wrapped as a Postgres
// write error); the second `?` propagates the inner IndexerError.
self.spawn_blocking_task(move |this| this.update_objects_snapshot(start_cp, end_cp))
.await
.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to update objects snapshot: {:?}",
e
))
})??;
let elapsed = guard.stop_and_record();
info!(
elapsed,
"Persisted snapshot for checkpoints from {} to {}", start_cp, end_cp
);
Ok(())
}

async fn persist_checkpoints(
&self,
checkpoints: Vec<IndexedCheckpoint>,
Expand Down

0 comments on commit b563626

Please sign in to comment.