From 5cf3b93745a8f2d6900630688825e28b1937bbd3 Mon Sep 17 00:00:00 2001 From: Ge Gao <106119108+gegaowp@users.noreply.github.com> Date: Thu, 19 Sep 2024 14:46:37 -0400 Subject: [PATCH] indexer restore 2/N: restore display table (#19324) ## Description title ## Test plan upload table to remote and restore from remote to make sure that tables match ``` (SELECT * FROM display EXCEPT SELECT * FROM display_backup) UNION ALL (SELECT * FROM display_backup EXCEPT SELECT * FROM display); ``` --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../pg/2023-10-07-160139_display/up.sql | 1 - crates/sui-indexer/src/models/display.rs | 4 ++- .../src/restorer/formal_snapshot.rs | 25 ++++++++++++++++ crates/sui-indexer/src/store/indexer_store.rs | 1 + .../sui-indexer/src/store/pg_indexer_store.rs | 29 +++++++++++-------- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/up.sql b/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/up.sql index 1ef9f37bc54f8..c82918e253c8c 100644 --- a/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/up.sql +++ b/crates/sui-indexer/migrations/pg/2023-10-07-160139_display/up.sql @@ -1,4 +1,3 @@ --- Your SQL goes here CREATE TABLE display ( object_type text PRIMARY KEY, diff --git a/crates/sui-indexer/src/models/display.rs b/crates/sui-indexer/src/models/display.rs index bb9d3e9aa792b..33a1c7c7cb0cb 100644 --- a/crates/sui-indexer/src/models/display.rs +++ b/crates/sui-indexer/src/models/display.rs @@ -2,11 +2,13 @@ // SPDX-License-Identifier: 
Apache-2.0 use diesel::prelude::*; +use serde::Deserialize; + use sui_types::display::DisplayVersionUpdatedEvent; use crate::schema::display; -#[derive(Queryable, Insertable, Selectable, Debug, Clone)] +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Deserialize)] #[diesel(table_name = display)] pub struct StoredDisplay { pub object_type: String, diff --git a/crates/sui-indexer/src/restorer/formal_snapshot.rs b/crates/sui-indexer/src/restorer/formal_snapshot.rs index a9c9b4a7f1b4e..190bc308c899a 100644 --- a/crates/sui-indexer/src/restorer/formal_snapshot.rs +++ b/crates/sui-indexer/src/restorer/formal_snapshot.rs @@ -18,6 +18,7 @@ use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType}; use sui_core::authority::authority_store_tables::LiveObject; use sui_snapshot::reader::{download_bytes, LiveObjectIter, StateSnapshotReaderV1}; use sui_snapshot::FileMetadata; +use sui_storage::object_store::util::get; use sui_storage::object_store::ObjectStoreGetExt; use sui_types::accumulator::Accumulator; @@ -122,6 +123,8 @@ impl IndexerFormalSnapshotRestorer { ) .await?; info!("Finished restoring move objects"); + self.restore_display_table().await?; + info!("Finished restoring display table"); Ok(()) } @@ -244,4 +247,26 @@ impl IndexerFormalSnapshotRestorer { ) .await? 
} + + async fn restore_display_table(&self) -> std::result::Result<(), anyhow::Error> { + let cred_path = self.restore_config.gcs_cred_path.clone(); + let bucket = self.restore_config.gcs_display_bucket.clone(); + let start_epoch = self.restore_config.start_epoch; + + let remote_store_config = ObjectStoreConfig { + object_store: Some(ObjectStoreType::GCS), + bucket: Some(bucket), + google_service_account: Some(cred_path), + object_store_connection_limit: 200, + no_sign_request: false, + ..Default::default() + }; + let remote_store = remote_store_config.make().map_err(|e| { + IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e)) + })?; + let path = Path::from(format!("display_{}.csv", start_epoch).as_str()); + let bytes: bytes::Bytes = get(&remote_store, &path).await?; + self.store.restore_display(bytes).await?; + Ok(()) + } } diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index a73a243ef8072..536b0bf8b3e17 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -101,4 +101,5 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { ) -> Result; async fn upload_display(&self, epoch: u64) -> Result<(), IndexerError>; + async fn restore_display(&self, bytes: bytes::Bytes) -> Result<(), IndexerError>; } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 97c2e9c471015..03af1a63c0860 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -8,7 +8,7 @@ use std::time::Duration; use async_trait::async_trait; use core::result::Result::Ok; -use csv::Writer; +use csv::{ReaderBuilder, Writer}; use diesel::dsl::{max, min}; use diesel::ExpressionMethods; use diesel::OptionalExtension; @@ -308,14 +308,14 @@ impl PgIndexerStore { async fn persist_display_updates( &self, - display_updates: BTreeMap, + 
+ display_updates: Vec<StoredDisplay>, ) -> Result<(), IndexerError> { use diesel_async::RunQueryDsl; transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { diesel::insert_into(display::table) - .values(display_updates.values().collect::<Vec<_>>()) + .values(display_updates) .on_conflict(display::object_type) .do_update() .set(( @@ -1846,8 +1846,8 @@ impl IndexerStore for PgIndexerStore { if display_updates.is_empty() { return Ok(()); } - - self.persist_display_updates(display_updates).await + self.persist_display_updates(display_updates.values().cloned().collect::<Vec<_>>()) + .await } async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> { @@ -1979,26 +1979,21 @@ impl IndexerStore for PgIndexerStore { async fn upload_display(&self, epoch_number: u64) -> Result<(), IndexerError> { use diesel_async::RunQueryDsl; - let mut connection = self.pool.get().await?; - let mut buffer = Cursor::new(Vec::new()); { let mut writer = Writer::from_writer(&mut buffer); - let displays = display::table .load::<StoredDisplay>(&mut connection) .await .map_err(Into::into) .context("Failed to get display from database")?; - info!("Read {} displays", displays.len()); writer .write_record(["object_type", "id", "version", "bcs"]) .map_err(|_| { IndexerError::GcsError("Failed to write display to csv".to_string()) })?; - for display in displays { writer .write_record(&[ @@ -2009,7 +2004,6 @@ impl IndexerStore for PgIndexerStore { ]) .map_err(|_| IndexerError::GcsError("Failed to write to csv".to_string()))?; } - writer .flush() .map_err(|_| IndexerError::GcsError("Failed to flush csv".to_string()))?; @@ -2030,7 +2024,6 @@ impl IndexerStore for PgIndexerStore { let remote_store = remote_store_config.make().map_err(|e| { IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e)) })?; - let path = Path::from(format!("display_{}.csv", epoch_number).as_str()); put(&remote_store, &path, buffer.into_inner().into()) .await @@ -2041,6 +2034,18 @@ impl IndexerStore for PgIndexerStore 
{ Ok(()) } + async fn restore_display(&self, bytes: bytes::Bytes) -> Result<(), IndexerError> { + let cursor = Cursor::new(bytes); + let mut csv_reader = ReaderBuilder::new().has_headers(true).from_reader(cursor); + let displays = csv_reader + .deserialize() + .collect::<Result<Vec<StoredDisplay>, csv::Error>>() + .map_err(|e| { + IndexerError::GcsError(format!("Failed to deserialize display records: {}", e)) + })?; + self.persist_display_updates(displays).await + } + async fn get_network_total_transactions_by_end_of_epoch( &self, epoch: u64,