Skip to content

Commit

Permalink
indexer formal restore 1/N: restore packages and move objects (#18886)
Browse files Browse the repository at this point in the history
## Description 

first of the stack PR to restore indexer from sui archives & formal
snapshot

## Test plan 

test locally with GCS buckets, local snapshot dir and local PG server

```
██████████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ 75 out of 555 move object files restored (Restored 569395 live move objects and 0 wrapped or deleted objects from epoch_500/1_53.obj)2024-09-10T14:21:39.854508Z  INFO sui_indexer::restorer::formal_snapshot: Finished downloading move object file Path { raw: "epoch_500/1_125.obj" }
2024-09-10T14:21:44.111960Z  INFO sui_indexer::restorer::formal_snapshot: Start persisting 565556 move objects from epoch_500/1_125.obj
2024-09-10T14:22:12.142760Z  INFO sui_indexer::store::pg_indexer_store: Persisted 563126 objects snapshot elapsed=7058.20103075
2024-09-10T14:22:12.142830Z  INFO sui_indexer::restorer::formal_snapshot: Finished persisting 0 wrapped or deleted objects from epoch_500/1_86.obj
[10:46:20]
```

need to benchmark in the production env



---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp authored Sep 19, 2024
1 parent 959bf75 commit f3825b1
Show file tree
Hide file tree
Showing 11 changed files with 476 additions and 97 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ anyhow.workspace = true
async-trait.workspace = true
axum.workspace = true
backoff.workspace = true
bb8 = "0.8.5"
bcs.workspace = true
bytes.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["env"] }
csv.workspace = true
diesel = { workspace = true, features = ["chrono", "serde_json"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
bb8 = "0.8.5"
futures.workspace = true
hex.workspace = true
indicatif.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
object_store.workspace = true
Expand All @@ -41,13 +42,16 @@ url.workspace = true
fastcrypto = { workspace = true, features = ["copy_key"] }
mysten-metrics.workspace = true
sui-config.workspace = true
sui-archival.workspace = true
sui-core.workspace = true
sui-data-ingestion-core.workspace = true
sui-json.workspace = true
sui-json-rpc.workspace = true
sui-json-rpc-api.workspace = true
sui-json-rpc-types.workspace = true
sui-open-rpc.workspace = true
sui-sdk.workspace = true
sui-snapshot.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-package-resolver.workspace = true
Expand Down
30 changes: 27 additions & 3 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ pub enum Command {
#[command(flatten)]
pruning_options: PruningOptions,
#[command(flatten)]
restore_config: RestoreConfig,
upload_options: UploadOptions,
},
JsonRpcService(JsonRpcConfig),
ResetDatabase {
Expand All @@ -201,6 +201,8 @@ pub enum Command {
#[command(flatten)]
backfill_config: SqlBackFillConfig,
},
/// Restore the database from formal snaphots.
Restore(RestoreConfig),
}

#[derive(Args, Default, Debug, Clone)]
Expand Down Expand Up @@ -240,11 +242,33 @@ impl Default for SnapshotLagConfig {
}

#[derive(Args, Debug, Clone, Default)]
pub struct RestoreConfig {
pub struct UploadOptions {
#[arg(long, env = "GCS_DISPLAY_BUCKET")]
pub gcs_display_bucket: Option<String>,
#[arg(long, env = "GCS_CRED_PATH")]
pub gcs_cred_path: Option<String>,
}

#[derive(Args, Debug, Clone)]
pub struct RestoreConfig {
#[arg(long, env = "GCS_ARCHIVE_BUCKET")]
pub gcs_archive_bucket: String,
#[arg(long, env = "GCS_CRED_PATH")]
pub gcs_cred_path: String,
#[arg(long, env = "GCS_DISPLAY_BUCKET")]
pub gcs_display_bucket: Option<String>,
pub gcs_display_bucket: String,
#[arg(long, env = "GCS_SNAPSHOT_DIR")]
pub gcs_snapshot_dir: String,
#[arg(long, env = "GCS_SNAPSHOT_BUCKET")]
pub gcs_snapshot_bucket: String,
#[arg(long, env = "START_EPOCH")]
pub start_epoch: u64,
#[arg(long, env = "S3_ENDPOINT")]
pub s3_endpoint: String,
#[arg(env = "OBJECT_STORE_CONCURRENT_LIMIT")]
pub object_store_concurrent_limit: usize,
#[arg(env = "OBJECT_STORE_MAX_TIMEOUT_SECS")]
pub object_store_max_timeout_secs: u64,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub mod indexer;
pub mod indexer_reader;
pub mod metrics;
pub mod models;
pub mod restorer;
pub mod schema;
pub mod sql_backfill;
pub mod store;
Expand Down
24 changes: 17 additions & 7 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;
use sui_indexer::config::Command;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use sui_indexer::config::{Command, UploadOptions};
use sui_indexer::database::ConnectionPool;
use sui_indexer::db::{check_db_migration_consistency, reset_database, run_migrations};
use sui_indexer::indexer::Indexer;
use sui_indexer::metrics::{
spawn_connection_pool_metric_collector, start_prometheus_server, IndexerMetrics,
};
use sui_indexer::restorer::formal_snapshot::IndexerFormalSnapshotRestorer;
use sui_indexer::sql_backfill::run_sql_backfill;
use sui_indexer::store::PgIndexerStore;
use tokio_util::sync::CancellationToken;
use tracing::warn;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -40,12 +41,11 @@ async fn main() -> anyhow::Result<()> {
ingestion_config,
snapshot_config,
pruning_options,
restore_config,
upload_options,
} => {
// Make sure to run all migrations on startup, and also serve as a compatibility check.
run_migrations(pool.dedicated_connection().await?).await?;

let store = PgIndexerStore::new(pool, restore_config, indexer_metrics.clone());
let store = PgIndexerStore::new(pool, upload_options, indexer_metrics.clone());

Indexer::start_writer_with_config(
&ingestion_config,
Expand Down Expand Up @@ -91,6 +91,16 @@ async fn main() -> anyhow::Result<()> {
)
.await;
}
Command::Restore(restore_config) => {
let upload_options = UploadOptions {
gcs_display_bucket: Some(restore_config.gcs_display_bucket.clone()),
gcs_cred_path: Some(restore_config.gcs_snapshot_bucket.clone()),
};
let store = PgIndexerStore::new(pool, upload_options, indexer_metrics.clone());
let mut formal_restorer =
IndexerFormalSnapshotRestorer::new(store, restore_config).await?;
formal_restorer.restore().await?;
}
}

Ok(())
Expand Down
43 changes: 43 additions & 0 deletions crates/sui-indexer/src/restorer/archives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::num::NonZeroUsize;

use prometheus::Registry;
use tracing::info;

use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
use sui_config::node::ArchiveReaderConfig;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};

use crate::types::IndexerResult;

pub async fn read_next_checkpoint_after_epoch(
cred_path: String,
archive_bucket: Option<String>,
epoch: u64,
) -> IndexerResult<u64> {
let archive_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: archive_bucket,
google_service_account: Some(cred_path.clone()),
object_store_connection_limit: 50,
no_sign_request: false,
..Default::default()
};
let archive_reader_config = ArchiveReaderConfig {
remote_store_config: archive_store_config,
download_concurrency: NonZeroUsize::new(50).unwrap(),
use_for_pruning_watermark: false,
};
let metrics = ArchiveReaderMetrics::new(&Registry::default());
let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
archive_reader.sync_manifest_once().await?;
let manifest = archive_reader.get_manifest().await?;
let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(epoch);
info!(
"Read from archives: next checkpoint sequence after epoch {} is: {}",
epoch, next_checkpoint_after_epoch
);
Ok(next_checkpoint_after_epoch)
}
Loading

0 comments on commit f3825b1

Please sign in to comment.