Skip to content

Commit

Permalink
indexer restore 3/N: restore checkpoints and chain identifier (#19341)
Browse files Browse the repository at this point in the history
## Description 
title

## Test plan 

Ran the restore locally and verified the contents of the `checkpoints` and `chain_identifier` tables.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp committed Sep 19, 2024
1 parent b7182a9 commit 72603de
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 41 deletions.
38 changes: 38 additions & 0 deletions crates/sui-archival/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,44 @@ impl ArchiveReader {
.await
}

pub async fn get_summaries_for_list_no_verify(
&self,
cp_list: Vec<CheckpointSequenceNumber>,
) -> Result<Vec<CertifiedCheckpointSummary>> {
let summary_files = self.get_summary_files_for_list(cp_list.clone()).await?;
let remote_object_store = self.remote_object_store.clone();
let stream = futures::stream::iter(summary_files.iter())
.map(|summary_metadata| {
let remote_object_store = remote_object_store.clone();
async move {
let summary_data =
get(&remote_object_store, &summary_metadata.file_path()).await?;
Ok::<Bytes, anyhow::Error>(summary_data)
}
})
.boxed();

stream
.buffer_unordered(self.concurrency)
.try_fold(Vec::new(), |mut acc, summary_data| async move {
let summary_result: Result<Vec<CertifiedCheckpointSummary>, anyhow::Error> =
make_iterator::<CertifiedCheckpointSummary, Reader<Bytes>>(
SUMMARY_FILE_MAGIC,
summary_data.reader(),
)
.map(|summary_iter| summary_iter.collect::<Vec<_>>());

match summary_result {
Ok(summaries) => {
acc.extend(summaries);
Ok(acc)
}
Err(e) => Err(e),
}
})
.await
}

/// Load checkpoints+txns+effects from archive into the input store `S` for the given
/// checkpoint range. If latest available checkpoint in archive is older than the start of the
/// input range then this call fails with an error otherwise we load as many checkpoints as
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ impl std::fmt::Display for DataDownloadError {

#[derive(Debug, Error)]
pub enum IndexerError {
#[error("Indexer failed to read from archives store with error: `{0}`")]
ArchiveReaderError(String),

#[error("Indexer failed to convert timestamp to NaiveDateTime with error: `{0}`")]
DateTimeParsingError(String),

Expand Down
25 changes: 22 additions & 3 deletions crates/sui-indexer/src/restorer/archives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,27 @@
use std::num::NonZeroUsize;

use prometheus::Registry;
use sui_types::digests::CheckpointDigest;
use tracing::info;

use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
use sui_config::node::ArchiveReaderConfig;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};

use crate::errors::IndexerError;
use crate::types::IndexerResult;

pub async fn read_next_checkpoint_after_epoch(
/// Checkpoint information read from the archive store that is needed to finish a restore.
#[derive(Clone, Debug)]
pub struct RestoreCheckpointInfo {
/// Sequence number of the next checkpoint after the requested epoch, as read from archives.
pub next_checkpoint_after_epoch: u64,
/// Digest of the first checkpoint summary returned by the archive reader for checkpoint
/// list `[0]`; used as the chain identifier.
pub chain_identifier: CheckpointDigest,
}

pub async fn read_restore_checkpoint_info(
cred_path: String,
archive_bucket: Option<String>,
epoch: u64,
) -> IndexerResult<u64> {
) -> IndexerResult<RestoreCheckpointInfo> {
let archive_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: archive_bucket,
Expand All @@ -39,5 +47,16 @@ pub async fn read_next_checkpoint_after_epoch(
"Read from archives: next checkpoint sequence after epoch {} is: {}",
epoch, next_checkpoint_after_epoch
);
Ok(next_checkpoint_after_epoch)
let cp_summaries = archive_reader
.get_summaries_for_list_no_verify(vec![0])
.await
.map_err(|e| IndexerError::ArchiveReaderError(format!("Failed to get summaries: {}", e)))?;
let first_cp = cp_summaries
.first()
.ok_or_else(|| IndexerError::ArchiveReaderError("No checkpoint found".to_string()))?;
let chain_identifier = *first_cp.digest();
Ok(RestoreCheckpointInfo {
next_checkpoint_after_epoch,
chain_identifier,
})
}
52 changes: 32 additions & 20 deletions crates/sui-indexer/src/restorer/formal_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use sui_types::accumulator::Accumulator;
use crate::config::RestoreConfig;
use crate::errors::IndexerError;
use crate::handlers::TransactionObjectChangesToCommit;
use crate::restorer::archives::read_next_checkpoint_after_epoch;
use crate::restorer::archives::{read_restore_checkpoint_info, RestoreCheckpointInfo};
use crate::store::{indexer_store::IndexerStore, PgIndexerStore};
use crate::types::IndexedObject;
use crate::types::{IndexedCheckpoint, IndexedObject};

pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
Expand All @@ -37,7 +37,6 @@ pub struct IndexerFormalSnapshotRestorer {
store: PgIndexerStore,
reader: StateSnapshotReaderV1,
restore_config: RestoreConfig,
next_checkpoint_after_epoch: u64,
}

impl IndexerFormalSnapshotRestorer {
Expand Down Expand Up @@ -89,17 +88,9 @@ impl IndexerFormalSnapshotRestorer {
restore_config.start_epoch
);

let next_checkpoint_after_epoch = read_next_checkpoint_after_epoch(
restore_config.gcs_cred_path.clone(),
Some(restore_config.gcs_archive_bucket.clone()),
restore_config.start_epoch,
)
.await?;

Ok(Self {
store,
reader,
next_checkpoint_after_epoch,
restore_config: restore_config.clone(),
})
}
Expand All @@ -125,6 +116,8 @@ impl IndexerFormalSnapshotRestorer {
info!("Finished restoring move objects");
self.restore_display_table().await?;
info!("Finished restoring display table");
self.restore_cp_watermark_and_chain_id().await?;
info!("Finished restoring checkpoint info");
Ok(())
}

Expand Down Expand Up @@ -161,7 +154,6 @@ impl IndexerFormalSnapshotRestorer {
let store_clone = self.store.clone();
let bar_clone = move_object_progress_bar.clone();
let restore_config = self.restore_config.clone();
let next_checkpoint_after_epoch = self.next_checkpoint_after_epoch;

let restore_task = task::spawn(async move {
let _permit = sema_limit_clone.acquire().await.unwrap();
Expand All @@ -186,14 +178,11 @@ impl IndexerFormalSnapshotRestorer {
for object in obj_iter {
match object {
LiveObject::Normal(obj) => {
if !obj.is_package() {
let indexed_object = IndexedObject::from_object(
next_checkpoint_after_epoch,
obj,
None,
);
move_objects.push(indexed_object);
}
// TODO: placeholder values for df_info and checkpoint_seq_num,
// will clean this up once the column cleanup is done.
let indexed_object =
IndexedObject::from_object(0, obj, None);
move_objects.push(indexed_object);
}
LiveObject::Wrapped(_) => {}
}
Expand Down Expand Up @@ -269,4 +258,27 @@ impl IndexerFormalSnapshotRestorer {
self.store.restore_display(bytes).await?;
Ok(())
}

async fn restore_cp_watermark_and_chain_id(&self) -> Result<(), IndexerError> {
let restore_checkpoint_info = read_restore_checkpoint_info(
self.restore_config.gcs_cred_path.clone(),
Some(self.restore_config.gcs_archive_bucket.clone()),
self.restore_config.start_epoch,
)
.await?;
let RestoreCheckpointInfo {
next_checkpoint_after_epoch,
chain_identifier,
} = restore_checkpoint_info;
self.store
.persist_chain_identifier(chain_identifier.into_inner().to_vec())
.await?;
assert!(next_checkpoint_after_epoch > 0);
let last_cp = IndexedCheckpoint {
sequence_number: next_checkpoint_after_epoch - 1,
..Default::default()
};
self.store.persist_checkpoints(vec![last_cp]).await?;
Ok(())
}
}
5 changes: 5 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
checkpoints: Vec<IndexedCheckpoint>,
) -> Result<(), IndexerError>;

async fn persist_chain_identifier(
&self,
checkpoint_digest: Vec<u8>,
) -> Result<(), IndexerError>;

async fn persist_transactions(
&self,
transactions: Vec<IndexedTransaction>,
Expand Down
41 changes: 24 additions & 17 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,23 +651,7 @@ impl PgIndexerStore {
let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())
.await?;

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
let checkpoint_digest =
first_checkpoint.checkpoint_digest.into_inner().to_vec();
diesel::insert_into(chain_identifier::table)
.values(StoredChainIdentifier { checkpoint_digest })
.on_conflict_do_nothing()
.execute(conn)
.await
.map_err(IndexerError::from)
.context("failed to write to chain_identifier table")?;
Ok::<(), IndexerError>(())
}
.scope_boxed()
})
.await?;
self.persist_chain_identifier(checkpoint_digest).await?;
}
let guard = self
.metrics
Expand Down Expand Up @@ -2144,6 +2128,29 @@ impl IndexerStore for PgIndexerStore {
.await?;
Ok(())
}

/// Writes `checkpoint_digest` as a row of the `chain_identifier` table.
///
/// The insert uses `on_conflict_do_nothing`, so a conflicting existing row is left untouched
/// and the call still succeeds. The statement is run through `transaction_with_retry` with
/// `PG_DB_COMMIT_SLEEP_DURATION`.
///
/// Returns an `IndexerError` (with the context "failed to write to chain_identifier table")
/// if the insert fails, or whatever error `transaction_with_retry` reports.
async fn persist_chain_identifier(
&self,
checkpoint_digest: Vec<u8>,
) -> Result<(), IndexerError> {
// NOTE(review): presumably imported locally so `.execute(conn)` below resolves to the
// diesel_async trait method — confirm against the file-level imports.
use diesel_async::RunQueryDsl;

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(chain_identifier::table)
.values(StoredChainIdentifier { checkpoint_digest })
// A conflicting existing row is not an error; it is simply kept.
.on_conflict_do_nothing()
.execute(conn)
.await
.map_err(IndexerError::from)
.context("failed to write to chain_identifier table")?;
Ok::<(), IndexerError>(())
}
.scope_boxed()
})
.await?;
Ok(())
}
}

/// Construct deleted objects and mutated objects to commit.
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use sui_types::transaction::SenderSignedData;

pub type IndexerResult<T> = Result<T, IndexerError>;

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct IndexedCheckpoint {
pub sequence_number: u64,
pub checkpoint_digest: CheckpointDigest,
Expand Down

0 comments on commit 72603de

Please sign in to comment.