From 72603de6260795d5c9ed60f885a4ebe717a9430e Mon Sep 17 00:00:00 2001 From: Ge Gao <106119108+gegaowp@users.noreply.github.com> Date: Thu, 19 Sep 2024 16:08:29 -0400 Subject: [PATCH] indexer restore 3/N: restore checkpoints and chain identifier (#19341) ## Description title ## Test plan local run and verify checkpoints and chain_identifier table. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- crates/sui-archival/src/reader.rs | 38 ++++++++++++++ crates/sui-indexer/src/errors.rs | 3 ++ crates/sui-indexer/src/restorer/archives.rs | 25 +++++++-- .../src/restorer/formal_snapshot.rs | 52 ++++++++++++------- crates/sui-indexer/src/store/indexer_store.rs | 5 ++ .../sui-indexer/src/store/pg_indexer_store.rs | 41 +++++++++------ crates/sui-indexer/src/types.rs | 2 +- 7 files changed, 125 insertions(+), 41 deletions(-) diff --git a/crates/sui-archival/src/reader.rs b/crates/sui-archival/src/reader.rs index aa3196a3bc2fa..e13a7a17453ce 100644 --- a/crates/sui-archival/src/reader.rs +++ b/crates/sui-archival/src/reader.rs @@ -361,6 +361,44 @@ impl ArchiveReader { .await } + pub async fn get_summaries_for_list_no_verify( + &self, + cp_list: Vec, + ) -> Result> { + let summary_files = self.get_summary_files_for_list(cp_list.clone()).await?; + let remote_object_store = self.remote_object_store.clone(); + let stream = futures::stream::iter(summary_files.iter()) + .map(|summary_metadata| { + let remote_object_store = remote_object_store.clone(); + async move { + let summary_data = + get(&remote_object_store, &summary_metadata.file_path()).await?; + 
Ok::(summary_data) + } + }) + .boxed(); + + stream + .buffer_unordered(self.concurrency) + .try_fold(Vec::new(), |mut acc, summary_data| async move { + let summary_result: Result, anyhow::Error> = + make_iterator::>( + SUMMARY_FILE_MAGIC, + summary_data.reader(), + ) + .map(|summary_iter| summary_iter.collect::>()); + + match summary_result { + Ok(summaries) => { + acc.extend(summaries); + Ok(acc) + } + Err(e) => Err(e), + } + }) + .await + } + /// Load checkpoints+txns+effects from archive into the input store `S` for the given /// checkpoint range. If latest available checkpoint in archive is older than the start of the /// input range then this call fails with an error otherwise we load as many checkpoints as diff --git a/crates/sui-indexer/src/errors.rs b/crates/sui-indexer/src/errors.rs index 247d5e83231d6..e1c40861a2532 100644 --- a/crates/sui-indexer/src/errors.rs +++ b/crates/sui-indexer/src/errors.rs @@ -28,6 +28,9 @@ impl std::fmt::Display for DataDownloadError { #[derive(Debug, Error)] pub enum IndexerError { + #[error("Indexer failed to read from archives store with error: `{0}`")] + ArchiveReaderError(String), + #[error("Indexer failed to convert timestamp to NaiveDateTime with error: `{0}`")] DateTimeParsingError(String), diff --git a/crates/sui-indexer/src/restorer/archives.rs b/crates/sui-indexer/src/restorer/archives.rs index 90d224daa32d6..1e1fbd5f678d4 100644 --- a/crates/sui-indexer/src/restorer/archives.rs +++ b/crates/sui-indexer/src/restorer/archives.rs @@ -4,19 +4,27 @@ use std::num::NonZeroUsize; use prometheus::Registry; +use sui_types::digests::CheckpointDigest; use tracing::info; use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics}; use sui_config::node::ArchiveReaderConfig; use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType}; +use crate::errors::IndexerError; use crate::types::IndexerResult; -pub async fn read_next_checkpoint_after_epoch( +#[derive(Clone, Debug)] +pub struct RestoreCheckpointInfo { + 
pub next_checkpoint_after_epoch: u64, + pub chain_identifier: CheckpointDigest, +} + +pub async fn read_restore_checkpoint_info( cred_path: String, archive_bucket: Option, epoch: u64, -) -> IndexerResult { +) -> IndexerResult { let archive_store_config = ObjectStoreConfig { object_store: Some(ObjectStoreType::GCS), bucket: archive_bucket, @@ -39,5 +47,16 @@ pub async fn read_next_checkpoint_after_epoch( "Read from archives: next checkpoint sequence after epoch {} is: {}", epoch, next_checkpoint_after_epoch ); - Ok(next_checkpoint_after_epoch) + let cp_summaries = archive_reader + .get_summaries_for_list_no_verify(vec![0]) + .await + .map_err(|e| IndexerError::ArchiveReaderError(format!("Failed to get summaries: {}", e)))?; + let first_cp = cp_summaries + .first() + .ok_or_else(|| IndexerError::ArchiveReaderError("No checkpoint found".to_string()))?; + let chain_identifier = *first_cp.digest(); + Ok(RestoreCheckpointInfo { + next_checkpoint_after_epoch, + chain_identifier, + }) } diff --git a/crates/sui-indexer/src/restorer/formal_snapshot.rs b/crates/sui-indexer/src/restorer/formal_snapshot.rs index 190bc308c899a..6b6696b9d434f 100644 --- a/crates/sui-indexer/src/restorer/formal_snapshot.rs +++ b/crates/sui-indexer/src/restorer/formal_snapshot.rs @@ -25,9 +25,9 @@ use sui_types::accumulator::Accumulator; use crate::config::RestoreConfig; use crate::errors::IndexerError; use crate::handlers::TransactionObjectChangesToCommit; -use crate::restorer::archives::read_next_checkpoint_after_epoch; +use crate::restorer::archives::{read_restore_checkpoint_info, RestoreCheckpointInfo}; use crate::store::{indexer_store::IndexerStore, PgIndexerStore}; -use crate::types::IndexedObject; +use crate::types::{IndexedCheckpoint, IndexedObject}; pub type DigestByBucketAndPartition = BTreeMap>; pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator); @@ -37,7 +37,6 @@ pub struct IndexerFormalSnapshotRestorer { store: PgIndexerStore, reader: StateSnapshotReaderV1, 
restore_config: RestoreConfig, - next_checkpoint_after_epoch: u64, } impl IndexerFormalSnapshotRestorer { @@ -89,17 +88,9 @@ impl IndexerFormalSnapshotRestorer { restore_config.start_epoch ); - let next_checkpoint_after_epoch = read_next_checkpoint_after_epoch( - restore_config.gcs_cred_path.clone(), - Some(restore_config.gcs_archive_bucket.clone()), - restore_config.start_epoch, - ) - .await?; - Ok(Self { store, reader, - next_checkpoint_after_epoch, restore_config: restore_config.clone(), }) } @@ -125,6 +116,8 @@ impl IndexerFormalSnapshotRestorer { info!("Finished restoring move objects"); self.restore_display_table().await?; info!("Finished restoring display table"); + self.restore_cp_watermark_and_chain_id().await?; + info!("Finished restoring checkpoint info"); Ok(()) } @@ -161,7 +154,6 @@ impl IndexerFormalSnapshotRestorer { let store_clone = self.store.clone(); let bar_clone = move_object_progress_bar.clone(); let restore_config = self.restore_config.clone(); - let next_checkpoint_after_epoch = self.next_checkpoint_after_epoch; let restore_task = task::spawn(async move { let _permit = sema_limit_clone.acquire().await.unwrap(); @@ -186,14 +178,11 @@ impl IndexerFormalSnapshotRestorer { for object in obj_iter { match object { LiveObject::Normal(obj) => { - if !obj.is_package() { - let indexed_object = IndexedObject::from_object( - next_checkpoint_after_epoch, - obj, - None, - ); - move_objects.push(indexed_object); - } + // TODO: placeholder values for df_info and checkpoint_seq_num, + // will clean it up when the column cleanup is done.
+ let indexed_object = + IndexedObject::from_object(0, obj, None); + move_objects.push(indexed_object); } LiveObject::Wrapped(_) => {} } @@ -269,4 +258,27 @@ impl IndexerFormalSnapshotRestorer { self.store.restore_display(bytes).await?; Ok(()) } + + async fn restore_cp_watermark_and_chain_id(&self) -> Result<(), IndexerError> { + let restore_checkpoint_info = read_restore_checkpoint_info( + self.restore_config.gcs_cred_path.clone(), + Some(self.restore_config.gcs_archive_bucket.clone()), + self.restore_config.start_epoch, + ) + .await?; + let RestoreCheckpointInfo { + next_checkpoint_after_epoch, + chain_identifier, + } = restore_checkpoint_info; + self.store + .persist_chain_identifier(chain_identifier.into_inner().to_vec()) + .await?; + assert!(next_checkpoint_after_epoch > 0); + let last_cp = IndexedCheckpoint { + sequence_number: next_checkpoint_after_epoch - 1, + ..Default::default() + }; + self.store.persist_checkpoints(vec![last_cp]).await?; + Ok(()) + } } diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index 536b0bf8b3e17..ab463867ab12f 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -69,6 +69,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { checkpoints: Vec, ) -> Result<(), IndexerError>; + async fn persist_chain_identifier( + &self, + checkpoint_digest: Vec, + ) -> Result<(), IndexerError>; + async fn persist_transactions( &self, transactions: Vec, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 0c3c221d1bc84..13d6f63c4d946 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -651,23 +651,7 @@ impl PgIndexerStore { let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec(); self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone()) .await?; - - 
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { - async { - let checkpoint_digest = - first_checkpoint.checkpoint_digest.into_inner().to_vec(); - diesel::insert_into(chain_identifier::table) - .values(StoredChainIdentifier { checkpoint_digest }) - .on_conflict_do_nothing() - .execute(conn) - .await - .map_err(IndexerError::from) - .context("failed to write to chain_identifier table")?; - Ok::<(), IndexerError>(()) - } - .scope_boxed() - }) - .await?; + self.persist_chain_identifier(checkpoint_digest).await?; } let guard = self .metrics @@ -2144,6 +2128,29 @@ impl IndexerStore for PgIndexerStore { .await?; Ok(()) } + + async fn persist_chain_identifier( + &self, + checkpoint_digest: Vec, + ) -> Result<(), IndexerError> { + use diesel_async::RunQueryDsl; + + transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { + async { + diesel::insert_into(chain_identifier::table) + .values(StoredChainIdentifier { checkpoint_digest }) + .on_conflict_do_nothing() + .execute(conn) + .await + .map_err(IndexerError::from) + .context("failed to write to chain_identifier table")?; + Ok::<(), IndexerError>(()) + } + .scope_boxed() + }) + .await?; + Ok(()) + } } /// Construct deleted objects and mutated objects to commit. diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index 348ecee2e4291..9e5cd640d8ab0 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -27,7 +27,7 @@ use sui_types::transaction::SenderSignedData; pub type IndexerResult = Result; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct IndexedCheckpoint { pub sequence_number: u64, pub checkpoint_digest: CheckpointDigest,