Skip to content

Commit

Permalink
indexer restore 2/N: restore display table (#19324)
Browse files Browse the repository at this point in the history
## Description 
title


## Test plan 

Upload the table to the remote store, then restore it from the remote and verify that the two tables
match:
```
(SELECT * FROM display EXCEPT SELECT * FROM display_backup)
UNION ALL
(SELECT * FROM display_backup EXCEPT SELECT * FROM display);
```

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp committed Sep 19, 2024
1 parent 94d68bf commit 5cf3b93
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
-- Your SQL goes here
CREATE TABLE display
(
object_type text PRIMARY KEY,
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-indexer/src/models/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;
use serde::Deserialize;

use sui_types::display::DisplayVersionUpdatedEvent;

use crate::schema::display;

#[derive(Queryable, Insertable, Selectable, Debug, Clone)]
#[derive(Queryable, Insertable, Selectable, Debug, Clone, Deserialize)]
#[diesel(table_name = display)]
pub struct StoredDisplay {
pub object_type: String,
Expand Down
25 changes: 25 additions & 0 deletions crates/sui-indexer/src/restorer/formal_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_core::authority::authority_store_tables::LiveObject;
use sui_snapshot::reader::{download_bytes, LiveObjectIter, StateSnapshotReaderV1};
use sui_snapshot::FileMetadata;
use sui_storage::object_store::util::get;
use sui_storage::object_store::ObjectStoreGetExt;
use sui_types::accumulator::Accumulator;

Expand Down Expand Up @@ -122,6 +123,8 @@ impl IndexerFormalSnapshotRestorer {
)
.await?;
info!("Finished restoring move objects");
self.restore_display_table().await?;
info!("Finished restoring display table");
Ok(())
}

Expand Down Expand Up @@ -244,4 +247,26 @@ impl IndexerFormalSnapshotRestorer {
)
.await?
}

/// Downloads the display-table CSV snapshot for the configured start epoch
/// from the GCS backup bucket and loads it into the indexer store.
///
/// # Errors
/// Returns an error if the remote store cannot be constructed, the object
/// cannot be fetched, or the store fails to ingest the CSV bytes.
async fn restore_display_table(&self) -> std::result::Result<(), anyhow::Error> {
    let config = &self.restore_config;

    // Build a GCS-backed object store pointed at the display backup bucket.
    let store_config = ObjectStoreConfig {
        object_store: Some(ObjectStoreType::GCS),
        bucket: Some(config.gcs_display_bucket.clone()),
        google_service_account: Some(config.gcs_cred_path.clone()),
        object_store_connection_limit: 200,
        no_sign_request: false,
        ..Default::default()
    };
    let remote_store = store_config.make().map_err(|e| {
        IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e))
    })?;

    // Backup objects are keyed by the epoch the snapshot was taken at.
    let object_path = Path::from(format!("display_{}.csv", config.start_epoch).as_str());
    let csv_bytes: bytes::Bytes = get(&remote_store, &object_path).await?;
    self.store.restore_display(csv_bytes).await?;
    Ok(())
}
}
1 change: 1 addition & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,5 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
) -> Result<u64, IndexerError>;

async fn upload_display(&self, epoch: u64) -> Result<(), IndexerError>;
async fn restore_display(&self, bytes: bytes::Bytes) -> Result<(), IndexerError>;
}
29 changes: 17 additions & 12 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Duration;

use async_trait::async_trait;
use core::result::Result::Ok;
use csv::Writer;
use csv::{ReaderBuilder, Writer};
use diesel::dsl::{max, min};
use diesel::ExpressionMethods;
use diesel::OptionalExtension;
Expand Down Expand Up @@ -308,14 +308,14 @@ impl PgIndexerStore {

async fn persist_display_updates(
&self,
display_updates: BTreeMap<String, StoredDisplay>,
display_updates: Vec<StoredDisplay>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(display::table)
.values(display_updates.values().collect::<Vec<_>>())
.values(display_updates)
.on_conflict(display::object_type)
.do_update()
.set((
Expand Down Expand Up @@ -1846,8 +1846,8 @@ impl IndexerStore for PgIndexerStore {
if display_updates.is_empty() {
return Ok(());
}

self.persist_display_updates(display_updates).await
self.persist_display_updates(display_updates.values().cloned().collect::<Vec<_>>())
.await
}

async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
Expand Down Expand Up @@ -1979,26 +1979,21 @@ impl IndexerStore for PgIndexerStore {

async fn upload_display(&self, epoch_number: u64) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;

let mut connection = self.pool.get().await?;

let mut buffer = Cursor::new(Vec::new());
{
let mut writer = Writer::from_writer(&mut buffer);

let displays = display::table
.load::<StoredDisplay>(&mut connection)
.await
.map_err(Into::into)
.context("Failed to get display from database")?;

info!("Read {} displays", displays.len());
writer
.write_record(["object_type", "id", "version", "bcs"])
.map_err(|_| {
IndexerError::GcsError("Failed to write display to csv".to_string())
})?;

for display in displays {
writer
.write_record(&[
Expand All @@ -2009,7 +2004,6 @@ impl IndexerStore for PgIndexerStore {
])
.map_err(|_| IndexerError::GcsError("Failed to write to csv".to_string()))?;
}

writer
.flush()
.map_err(|_| IndexerError::GcsError("Failed to flush csv".to_string()))?;
Expand All @@ -2030,7 +2024,6 @@ impl IndexerStore for PgIndexerStore {
let remote_store = remote_store_config.make().map_err(|e| {
IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e))
})?;

let path = Path::from(format!("display_{}.csv", epoch_number).as_str());
put(&remote_store, &path, buffer.into_inner().into())
.await
Expand All @@ -2041,6 +2034,18 @@ impl IndexerStore for PgIndexerStore {
Ok(())
}

/// Restores the display table from raw CSV bytes (as produced by
/// `upload_display`: header row `object_type, id, version, bcs`),
/// deserializing each row into a `StoredDisplay` and upserting the batch.
///
/// # Errors
/// Returns `IndexerError::GcsError` if any CSV record fails to deserialize,
/// or propagates the persistence error from the database write.
async fn restore_display(&self, bytes: bytes::Bytes) -> Result<(), IndexerError> {
    let mut reader = ReaderBuilder::new()
        .has_headers(true)
        .from_reader(Cursor::new(bytes));

    // Stop at the first malformed record, mirroring a short-circuiting collect.
    let mut displays: Vec<StoredDisplay> = Vec::new();
    for record in reader.deserialize() {
        let row: StoredDisplay = record.map_err(|e| {
            IndexerError::GcsError(format!("Failed to deserialize display records: {}", e))
        })?;
        displays.push(row);
    }
    self.persist_display_updates(displays).await
}

async fn get_network_total_transactions_by_end_of_epoch(
&self,
epoch: u64,
Expand Down

0 comments on commit 5cf3b93

Please sign in to comment.