Skip to content

Commit

Permalink
indexer: gcs upload
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Sep 4, 2024
1 parent aa9dd42 commit 92f4990
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 11 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,43 @@ async-trait.workspace = true
axum.workspace = true
backoff.workspace = true
bcs.workspace = true
bytes.workspace = true
chrono.workspace = true
serde_with.workspace = true
clap = { workspace = true, features = ["env"] }
tap.workspace = true
csv.workspace = true
diesel = { workspace = true, features = ["postgres", "chrono", "r2d2", "serde_json"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
bb8 = "0.8.5"
futures.workspace = true
hex.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
object_store.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
rayon.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing.workspace = true
url.workspace = true

fastcrypto = { workspace = true, features = ["copy_key"] }
mysten-metrics.workspace = true
sui-config.workspace = true
sui-data-ingestion-core.workspace = true
sui-json.workspace = true
sui-json-rpc.workspace = true
sui-json-rpc-api.workspace = true
sui-json-rpc-types.workspace = true
sui-open-rpc.workspace = true
sui-sdk.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-package-resolver.workspace = true
sui-protocol-config.workspace = true
Expand Down
19 changes: 19 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ pub enum Command {
snapshot_config: SnapshotLagConfig,
#[command(flatten)]
pruning_options: PruningOptions,
#[command(flatten)]
restore_config: RestoreConfig,
},
JsonRpcService(JsonRpcConfig),
ResetDatabase {
Expand Down Expand Up @@ -205,6 +207,23 @@ impl Default for SnapshotLagConfig {
}
}

#[derive(Args, Debug, Clone)]
pub struct RestoreConfig {
#[arg(long, env = "GCS_CRED_PATH")]
pub gcs_cred_path: Option<String>,
#[arg(long, env = "GCS_DISPLAY_BUCKET")]
pub gcs_display_bucket: Option<String>,
}

impl Default for RestoreConfig {
fn default() -> Self {
RestoreConfig {
gcs_cred_path: None,
gcs_display_bucket: None,
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub enum IndexerError {
#[error("Indexer generic error: `{0}`")]
GenericError(String),

#[error("GCS error: `{0}`")]
GcsError(String),

#[error("Indexer failed to resolve object to move struct with error: `{0}`")]
ResolveMoveStructError(String),

Expand Down
10 changes: 10 additions & 0 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,20 @@ where
let epoch = checkpoint.epoch.clone();
batch.push(checkpoint);
next_checkpoint_sequence_number += 1;
let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch.epoch);
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
commit_checkpoints(&state, batch, epoch, &metrics, &commit_notifier).await;
batch = vec![];
}
if let Some(epoch_number) = epoch_number_option {
state.upload_display(epoch_number).await.tap_err(|e| {
error!(
"Failed to upload display table before epoch {} with error: {}",
epoch_number,
e.to_string()
);
})?;
}
}
if !batch.is_empty() && unprocessed.is_empty() {
commit_checkpoints(&state, batch, None, &metrics, &commit_notifier).await;
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ async fn main() -> Result<(), IndexerError> {
ingestion_config,
snapshot_config,
pruning_options,
restore_config,
} => {
let store = PgIndexerStore::new(connection_pool, indexer_metrics.clone());
let store =
PgIndexerStore::new(connection_pool, restore_config, indexer_metrics.clone());
Indexer::start_writer_with_config(
&ingestion_config,
store,
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,6 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
&self,
epoch: u64,
) -> Result<u64, IndexerError>;

async fn upload_display(&self, epoch: u64) -> Result<(), IndexerError>;
}
80 changes: 76 additions & 4 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,29 @@
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_map::Entry;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::io::Cursor;
use std::time::Duration;
use std::time::Instant;

use async_trait::async_trait;
use core::result::Result::Ok;
use csv::Writer;
use diesel::dsl::{max, min};
use diesel::ExpressionMethods;
use diesel::OptionalExtension;
use diesel::{QueryDsl, RunQueryDsl};
use itertools::Itertools;
use object_store::path::Path;
use tap::TapFallible;
use tracing::info;
use tracing::{info, warn};

use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_protocol_config::ProtocolConfig;
use sui_storage::object_store::util::put;
use sui_types::base_types::ObjectID;

use crate::config::RestoreConfig;
use crate::db::ConnectionPool;
use crate::errors::{Context, IndexerError};
use crate::handlers::EpochToCommit;
Expand Down Expand Up @@ -131,6 +136,8 @@ SET object_version = EXCLUDED.object_version,
pub struct PgIndexerStoreConfig {
pub parallel_chunk_size: usize,
pub parallel_objects_chunk_size: usize,
pub gcs_cred_path: Option<String>,
pub gcs_display_bucket: Option<String>,
}

pub struct PgIndexerStore {
Expand All @@ -152,7 +159,11 @@ impl Clone for PgIndexerStore {
}

impl PgIndexerStore {
pub fn new(blocking_cp: ConnectionPool, metrics: IndexerMetrics) -> Self {
pub fn new(
blocking_cp: ConnectionPool,
restore_config: RestoreConfig,
metrics: IndexerMetrics,
) -> Self {
let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
.unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
.parse::<usize>()
Expand All @@ -166,6 +177,8 @@ impl PgIndexerStore {
let config = PgIndexerStoreConfig {
parallel_chunk_size,
parallel_objects_chunk_size,
gcs_cred_path: restore_config.gcs_cred_path,
gcs_display_bucket: restore_config.gcs_display_bucket,
};

Self {
Expand Down Expand Up @@ -2164,6 +2177,65 @@ impl IndexerStore for PgIndexerStore {
Ok(())
}

async fn upload_display(&self, epoch_number: u64) -> Result<(), IndexerError> {
let mut buffer = Cursor::new(Vec::new());
{
let mut writer = Writer::from_writer(&mut buffer);

let displays = read_only_blocking!(&self.blocking_cp, |conn| {
display::table.load::<StoredDisplay>(conn)
})
.context("Failed to get display from database")?;

info!("Read {} displays", displays.len());
writer
.write_record(&["object_type", "id", "version", "bcs"])
.map_err(|_| {
IndexerError::GcsError("Failed to write display to csv".to_string())
})?;

for display in displays {
writer
.write_record(&[
display.object_type,
hex::encode(display.id),
display.version.to_string(),
hex::encode(display.bcs),
])
.map_err(|_| IndexerError::GcsError("Failed to write to csv".to_string()))?;
}

writer
.flush()
.map_err(|_| IndexerError::GcsError("Failed to flush csv".to_string()))?;
}

if let (Some(cred_path), Some(bucket)) = (
self.config.gcs_cred_path.clone(),
self.config.gcs_display_bucket.clone(),
) {
let remote_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: Some(bucket),
google_service_account: Some(cred_path),
object_store_connection_limit: 200,
no_sign_request: false,
..Default::default()
};
let remote_store = remote_store_config.make().map_err(|e| {
IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e))
})?;

let path = Path::from(format!("display_{}.csv", epoch_number).as_str());
put(&remote_store, &path, buffer.into_inner().into())
.await
.map_err(|e| IndexerError::GcsError(format!("Failed to put to GCS: {}", e)))?;
} else {
warn!("Either GCS cred path or bucket is not set, skipping display upload.");
}
Ok(())
}

async fn get_network_total_transactions_by_end_of_epoch(
&self,
epoch: u64,
Expand Down
8 changes: 7 additions & 1 deletion crates/sui-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use sui_json_rpc_types::SuiTransactionBlockResponse;

use crate::config::IngestionConfig;
use crate::config::PruningOptions;
use crate::config::RestoreConfig;
use crate::config::SnapshotLagConfig;
use crate::db::{new_connection_pool, ConnectionPoolConfig};
use crate::errors::IndexerError;
Expand Down Expand Up @@ -132,7 +133,12 @@ pub async fn start_test_indexer_impl(
}

let blocking_pool = new_connection_pool(&parsed_url, &pool_config).unwrap();
let store = PgIndexerStore::new(blocking_pool.clone(), indexer_metrics.clone());
let restore_config = RestoreConfig::default();
let store = PgIndexerStore::new(
blocking_pool.clone(),
restore_config,
indexer_metrics.clone(),
);

let handle = match reader_writer_config {
ReaderWriterConfig::Reader {
Expand Down

0 comments on commit 92f4990

Please sign in to comment.