# indexer: upload display to GCS (#19196)
## Description 

Upload the display table to GCS as a CSV so that it can later be restored from there, since it is now possible to restore from a formal snapshot. A sketch of the restore direction follows below.
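
For context, the restore side would read the CSV back into display rows. Below is a minimal sketch, not part of this commit: it assumes the column layout written by `upload_display` (`object_type`, hex-encoded `id`, `version`, hex-encoded `bcs`), that `version` fits an `i16` as in `StoredDisplay`, and the `parse_display_csv` helper name is hypothetical.

```rust
// Hypothetical restore-side helper (not in this commit): parse the uploaded
// CSV back into (object_type, id, version, bcs) rows, reversing the hex
// encoding applied by upload_display.
use csv::Reader;

fn parse_display_csv(bytes: &[u8]) -> anyhow::Result<Vec<(String, Vec<u8>, i16, Vec<u8>)>> {
    let mut reader = Reader::from_reader(bytes); // first row is the header
    let mut rows = Vec::new();
    for record in reader.records() {
        let record = record?;
        rows.push((
            record[0].to_string(),    // object_type
            hex::decode(&record[1])?, // id
            record[2].parse()?,       // version
            hex::decode(&record[3])?, // bcs
        ));
    }
    Ok(rows)
}
```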

## Test plan 

Ran locally and verified that the files land in GCS:


https://console.cloud.google.com/storage/browser/mysten-mainnet-display-table;tab=objects?forceOnBucketsSortingFiltering=true&hl=en&project=fullnode-snapshot-gcs&prefix=&forceOnObjectsSortingFiltering=false

![Screenshot 2024-09-03 at 12 42 29 PM](https://github.com/user-attachments/assets/d1921af2-688c-4c1b-adef-3117bf07cb03)

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [x] Indexer: a GCS credential needs to be added before the next deployment to enable uploading.
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
gegaowp authored Sep 6, 2024 · 1 parent 6f9e7a0 · commit c2b4168
Showing 9 changed files with 129 additions and 10 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock


16 changes: 11 additions & 5 deletions crates/sui-indexer/Cargo.toml
@@ -12,37 +12,43 @@ async-trait.workspace = true
axum.workspace = true
backoff.workspace = true
bcs.workspace = true
bytes.workspace = true
chrono.workspace = true
serde_with.workspace = true
clap = { workspace = true, features = ["env"] }
tap.workspace = true
csv.workspace = true
diesel = { workspace = true, features = ["postgres", "chrono", "r2d2", "serde_json"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
bb8 = "0.8.5"
futures.workspace = true
hex.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
object_store.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
rayon.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing.workspace = true
url.workspace = true

fastcrypto = { workspace = true, features = ["copy_key"] }
mysten-metrics.workspace = true
sui-config.workspace = true
sui-data-ingestion-core.workspace = true
sui-json.workspace = true
sui-json-rpc.workspace = true
sui-json-rpc-api.workspace = true
sui-json-rpc-types.workspace = true
sui-open-rpc.workspace = true
sui-sdk.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-package-resolver.workspace = true
sui-protocol-config.workspace = true
10 changes: 10 additions & 0 deletions crates/sui-indexer/src/config.rs
@@ -152,6 +152,8 @@ pub enum Command {
snapshot_config: SnapshotLagConfig,
#[command(flatten)]
pruning_options: PruningOptions,
#[command(flatten)]
restore_config: RestoreConfig,
},
JsonRpcService(JsonRpcConfig),
ResetDatabase {
@@ -198,6 +200,14 @@ impl Default for SnapshotLagConfig {
}
}

#[derive(Args, Debug, Clone, Default)]
pub struct RestoreConfig {
#[arg(long, env = "GCS_CRED_PATH")]
pub gcs_cred_path: Option<String>,
#[arg(long, env = "GCS_DISPLAY_BUCKET")]
pub gcs_display_bucket: Option<String>,
}

#[cfg(test)]
mod test {
use super::*;
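
The two new options can be supplied either as flags or via the environment, thanks to clap's `env` attribute. A quick sketch of how they resolve; the `Cli` wrapper is hypothetical and the `sui_indexer::config` import path is an assumption:

```rust
// Hypothetical wrapper for illustration: RestoreConfig is flattened into a
// parser, so --gcs-cred-path/--gcs-display-bucket or the GCS_CRED_PATH/
// GCS_DISPLAY_BUCKET environment variables both populate it.
use clap::Parser;
use sui_indexer::config::RestoreConfig; // assumed public module path

#[derive(Parser)]
struct Cli {
    #[command(flatten)]
    restore: RestoreConfig,
}

fn main() {
    let cli = Cli::parse_from([
        "indexer",
        "--gcs-cred-path",
        "/path/to/cred.json",
        "--gcs-display-bucket",
        "mysten-mainnet-display-table",
    ]);
    assert_eq!(
        cli.restore.gcs_display_bucket.as_deref(),
        Some("mysten-mainnet-display-table")
    );
}
```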
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/errors.rs
@@ -88,6 +88,9 @@ pub enum IndexerError {
#[error("Indexer generic error: `{0}`")]
GenericError(String),

#[error("GCS error: `{0}`")]
GcsError(String),

#[error("Indexer failed to resolve object to move struct with error: `{0}`")]
ResolveMoveStructError(String),

10 changes: 10 additions & 0 deletions crates/sui-indexer/src/handlers/committer.rs
@@ -58,10 +58,20 @@
let epoch = checkpoint.epoch.clone();
batch.push(checkpoint);
next_checkpoint_sequence_number += 1;
let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch.epoch);
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
commit_checkpoints(&state, batch, epoch, &metrics, &commit_notifier).await;
batch = vec![];
}
if let Some(epoch_number) = epoch_number_option {
state.upload_display(epoch_number).await.tap_err(|e| {
error!(
"Failed to upload display table before epoch {} with error: {}",
epoch_number,
e.to_string()
);
})?;
}
}
if !batch.is_empty() && unprocessed.is_empty() {
commit_checkpoints(&state, batch, None, &metrics, &commit_notifier).await;
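
The net effect of the gating above: the upload fires once per epoch boundary, after the batch containing the epoch change has been committed. Reduced to its essence (a sketch, not the actual committer loop, which also manages batching; the import paths are assumptions):

```rust
// Sketch of the gating only: upload_display runs once per epoch boundary,
// after the epoch-change batch has been committed.
use sui_indexer::errors::IndexerError; // assumed paths
use sui_indexer::store::indexer_store::IndexerStore;

async fn on_epoch_boundary<S: IndexerStore>(
    state: &S,
    epoch_change: Option<u64>, // Some(new_epoch) only at an epoch boundary
) -> Result<(), IndexerError> {
    if let Some(epoch_number) = epoch_change {
        state.upload_display(epoch_number).await?;
    }
    Ok(())
}
```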
8 changes: 7 additions & 1 deletion crates/sui-indexer/src/main.rs
@@ -45,11 +45,17 @@ async fn main() -> anyhow::Result<()> {
ingestion_config,
snapshot_config,
pruning_options,
restore_config,
} => {
// Make sure to run all migrations on startup, and also serve as a compatibility check.
run_migrations(&mut get_pool_connection(&connection_pool)?).await?;

let store = PgIndexerStore::new(connection_pool, pool, indexer_metrics.clone());
let store = PgIndexerStore::new(
connection_pool,
pool,
restore_config,
indexer_metrics.clone(),
);

Indexer::start_writer_with_config(
&ingestion_config,
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
@@ -88,4 +88,6 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
&self,
epoch: u64,
) -> Result<u64, IndexerError>;

async fn upload_display(&self, epoch: u64) -> Result<(), IndexerError>;
}
75 changes: 72 additions & 3 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -2,24 +2,29 @@
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_map::Entry;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::io::Cursor;
use std::time::Duration;
use std::time::Instant;

use async_trait::async_trait;
use core::result::Result::Ok;
use csv::Writer;
use diesel::dsl::{max, min};
use diesel::ExpressionMethods;
use diesel::OptionalExtension;
use diesel::{QueryDsl, RunQueryDsl};
use itertools::Itertools;
use object_store::path::Path;
use tap::TapFallible;
use tracing::info;
use tracing::{info, warn};

use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_protocol_config::ProtocolConfig;
use sui_storage::object_store::util::put;
use sui_types::base_types::ObjectID;

use crate::config::RestoreConfig;
use crate::database::ConnectionPool;
use crate::db::ConnectionPool as BlockingConnectionPool;
use crate::errors::{Context, IndexerError};
@@ -100,6 +105,8 @@ const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(3600);
pub struct PgIndexerStoreConfig {
pub parallel_chunk_size: usize,
pub parallel_objects_chunk_size: usize,
pub gcs_cred_path: Option<String>,
pub gcs_display_bucket: Option<String>,
}

#[derive(Clone)]
@@ -115,6 +122,7 @@ impl PgIndexerStore {
pub fn new(
blocking_cp: BlockingConnectionPool,
pool: ConnectionPool,
restore_config: RestoreConfig,
metrics: IndexerMetrics,
) -> Self {
let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
@@ -130,6 +138,8 @@
let config = PgIndexerStoreConfig {
parallel_chunk_size,
parallel_objects_chunk_size,
gcs_cred_path: restore_config.gcs_cred_path,
gcs_display_bucket: restore_config.gcs_display_bucket,
};

Self {
@@ -2074,6 +2084,65 @@ impl IndexerStore for PgIndexerStore {
Ok(())
}

async fn upload_display(&self, epoch_number: u64) -> Result<(), IndexerError> {
let mut buffer = Cursor::new(Vec::new());
{
let mut writer = Writer::from_writer(&mut buffer);

let displays = read_only_blocking!(&self.blocking_cp, |conn| {
display::table.load::<StoredDisplay>(conn)
})
.context("Failed to get display from database")?;

info!("Read {} displays", displays.len());
writer
.write_record(["object_type", "id", "version", "bcs"])
.map_err(|_| {
IndexerError::GcsError("Failed to write display to csv".to_string())
})?;

for display in displays {
writer
.write_record(&[
display.object_type,
hex::encode(display.id),
display.version.to_string(),
hex::encode(display.bcs),
])
.map_err(|_| IndexerError::GcsError("Failed to write to csv".to_string()))?;
}

writer
.flush()
.map_err(|_| IndexerError::GcsError("Failed to flush csv".to_string()))?;
}

if let (Some(cred_path), Some(bucket)) = (
self.config.gcs_cred_path.clone(),
self.config.gcs_display_bucket.clone(),
) {
let remote_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: Some(bucket),
google_service_account: Some(cred_path),
object_store_connection_limit: 200,
no_sign_request: false,
..Default::default()
};
let remote_store = remote_store_config.make().map_err(|e| {
IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e))
})?;

let path = Path::from(format!("display_{}.csv", epoch_number).as_str());
put(&remote_store, &path, buffer.into_inner().into())
.await
.map_err(|e| IndexerError::GcsError(format!("Failed to put to GCS: {}", e)))?;
} else {
warn!("Either GCS cred path or bucket is not set, skipping display upload.");
}
Ok(())
}

async fn get_network_total_transactions_by_end_of_epoch(
&self,
epoch: u64,
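
Downstream, fetching the uploaded object back needs only the credential path, bucket name, and epoch, since the key is `display_{epoch}.csv`. A sketch of the read side using the `object_store` crate directly (the commit itself builds the store through sui_config's `ObjectStoreConfig`; `fetch_display_csv` is a hypothetical helper):

```rust
// Hypothetical read-side counterpart, not part of this commit: fetch the
// display CSV for a given epoch from the same bucket.
use object_store::{gcp::GoogleCloudStorageBuilder, path::Path, ObjectStore};

async fn fetch_display_csv(
    cred_path: &str,
    bucket: &str,
    epoch: u64,
) -> object_store::Result<Vec<u8>> {
    let store = GoogleCloudStorageBuilder::new()
        .with_service_account_path(cred_path)
        .with_bucket_name(bucket)
        .build()?;
    // Keys follow the upload convention above: display_{epoch}.csv
    let path = Path::from(format!("display_{}.csv", epoch));
    let bytes = store.get(&path).await?.bytes().await?;
    Ok(bytes.to_vec())
}
```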
9 changes: 8 additions & 1 deletion crates/sui-indexer/src/test_utils.rs
@@ -11,6 +11,7 @@ use sui_json_rpc_types::SuiTransactionBlockResponse;

use crate::config::IngestionConfig;
use crate::config::PruningOptions;
use crate::config::RestoreConfig;
use crate::config::SnapshotLagConfig;
use crate::database::ConnectionPool;
use crate::db::{get_pool_connection, new_connection_pool, ConnectionPoolConfig};
@@ -101,7 +102,13 @@
let pool = ConnectionPool::new(db_url.parse().unwrap(), pool_config)
.await
.unwrap();
let store = PgIndexerStore::new(blocking_pool.clone(), pool.clone(), indexer_metrics.clone());
let restore_config = RestoreConfig::default();
let store = PgIndexerStore::new(
blocking_pool.clone(),
pool.clone(),
restore_config,
indexer_metrics.clone(),
);

let handle = match reader_writer_config {
ReaderWriterConfig::Reader {
