[sui-tool] dump-packages uses GraphQL
## Description

Replace the original implementation of the dump-packages
command (which requires access to an indexer database) with an
implementation that reads from a GraphQL service. The former is not
readily accessible, but the latter should be.

The new tool can also run incrementally: it can fetch only packages
created before a certain checkpoint, or pick up where a previous
invocation left off to fetch only the packages introduced since.
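For concreteness, a minimal sketch of driving the incremental flow through the new crate's entry point (`dump`, defined in `crates/sui-package-dump/src/lib.rs` below); the endpoint URL and the tokio runtime are assumptions of the example, not part of this change:

```rust
// Two incremental invocations of the new library entry point.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = "https://example.com/graphql".to_string(); // placeholder URL

    // First run: fetch all packages created before checkpoint 1_000_000.
    sui_package_dump::dump(url.clone(), "packages".into(), Some(1_000_000)).await?;

    // A later run against the same directory reads `packages/last-checkpoint`
    // and picks up from there, fetching only packages introduced since.
    sui_package_dump::dump(url, "packages".into(), None).await?;

    Ok(())
}
```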

## Test plan

Ran a test invocation on our experimental read replica. With a max
page size of 200, I was able to fetch 17,000 packages (all the packages
at the time the read replica was created, i.e. about 85 pages) in 3 minutes.
amnn committed Jun 19, 2024
1 parent 95cf33d commit 1c4097f
Showing 11 changed files with 862 additions and 173 deletions.
464 changes: 427 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -130,6 +130,7 @@ members = [
"crates/sui-open-rpc",
"crates/sui-open-rpc-macros",
"crates/sui-oracle",
"crates/sui-package-dump",
"crates/sui-package-management",
"crates/sui-package-resolver",
"crates/sui-proc-macros",
@@ -304,6 +305,9 @@ criterion = { version = "0.5.0", features = [
] }
crossterm = "0.25.0"
csv = "1.2.1"
# Pin to 3.4.3 as ^3.5.0 requires reqwest ^0.12
cynic = { version = "= 3.4.3", features = ["http-reqwest"] }
cynic-codegen = "= 3.4.3"
dashmap = "5.5.3"
# datatest-stable = "0.1.2"
datatest-stable = { git = "https://github.com/nextest-rs/datatest-stable.git", rev = "72db7f6d1bbe36a5407e96b9488a581f763e106f" }
@@ -622,6 +626,7 @@ sui-network = { path = "crates/sui-network" }
sui-node = { path = "crates/sui-node" }
sui-open-rpc = { path = "crates/sui-open-rpc" }
sui-open-rpc-macros = { path = "crates/sui-open-rpc-macros" }
sui-package-dump = { path = "crates/sui-package-dump" }
sui-package-management = { path = "crates/sui-package-management" }
sui-package-resolver = { path = "crates/sui-package-resolver" }
sui-proc-macros = { path = "crates/sui-proc-macros" }
22 changes: 22 additions & 0 deletions crates/sui-package-dump/Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "sui-package-dump"
version.workspace = true
authors = ["Mysten Labs <build@mystenlabs.com>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
anyhow.workspace = true
bcs.workspace = true
cynic.workspace = true
fastcrypto.workspace = true
move-core-types.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sui-types.workspace = true
tracing.workspace = true

[build-dependencies]
cynic-codegen.workspace = true
10 changes: 10 additions & 0 deletions crates/sui-package-dump/build.rs
@@ -0,0 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

fn main() {
cynic_codegen::register_schema("sui")
.from_sdl_file("../sui-graphql-rpc/schema.graphql")
.unwrap()
.as_default()
.unwrap();
}
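The registered schema is what the crate's cynic derives resolve against. The commit's `src/query.rs` is not visible in this excerpt, so the following is a hypothetical illustration (not the commit's code) of how a query module can be written on top of it:

```rust
// Expose the registered "sui" schema to cynic's derive macros.
#[cynic::schema("sui")]
mod schema {}

// Select a single field from the schema's `ServiceConfig` type.
#[derive(cynic::QueryFragment, Debug)]
pub struct ServiceConfig {
    pub max_page_size: i32,
}

// A fragment on the root `Query` type can be built into a full operation.
#[derive(cynic::QueryFragment, Debug)]
#[cynic(graphql_type = "Query")]
pub struct Limits {
    pub service_config: ServiceConfig,
}

pub fn build() -> cynic::Operation<Limits, ()> {
    use cynic::QueryBuilder;
    Limits::build(())
}
```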
39 changes: 39 additions & 0 deletions crates/sui-package-dump/src/client.rs
@@ -0,0 +1,39 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, Context, Result};
use cynic::{http::ReqwestExt, Operation, QueryBuilder};
use reqwest::IntoUrl;
use serde::{de::DeserializeOwned, Serialize};

pub(crate) struct Client {
inner: reqwest::Client,
url: reqwest::Url,
}

impl Client {
/// Create a new GraphQL client, talking to a Sui GraphQL service at `url`.
pub(crate) fn new(url: impl IntoUrl) -> Result<Self> {
Ok(Self {
inner: reqwest::Client::builder()
.user_agent(concat!("sui-package-dump/", env!("CARGO_PKG_VERSION")))
.build()
.context("Failed to create GraphQL client")?,
url: url.into_url().context("Invalid RPC URL")?,
})
}

pub(crate) async fn query<Q, V>(&self, query: Operation<Q, V>) -> Result<Q>
where
V: Serialize,
Q: DeserializeOwned + QueryBuilder<V> + 'static,
{
self.inner
.post(self.url.clone())
.run_graphql(query)
.await
.context("Failed to send GraphQL query")?
.data
.ok_or_else(|| anyhow!("Empty response to query"))
}
}
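With that in place, fetching data from elsewhere in the crate reduces to building a cynic `Operation` and handing it to `query`; a sketch, with a placeholder URL and `limits::build` assumed to come from the crate's query module:

```rust
// Sketch only: `Client` is pub(crate), so this would live inside the crate.
use crate::{client::Client, query::limits};

async fn fetch_max_page_size() -> anyhow::Result<i32> {
    let client = Client::new("https://example.com/graphql")?;
    let limits = client.query(limits::build()).await?;
    Ok(limits.service_config.max_page_size)
}
```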
240 changes: 240 additions & 0 deletions crates/sui-package-dump/src/lib.rs
@@ -0,0 +1,240 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::BTreeMap,
fs,
path::{Path, PathBuf},
};

use anyhow::{bail, ensure, Context, Result};
use client::Client;
use fastcrypto::encoding::{Base64, Encoding};
use query::{limits, packages};
use sui_types::object::Object;
use tracing::info;

mod client;
mod query;

/// Ensure all packages created before `before_checkpoint` are written to the `output_dir`ectory,
/// from the GraphQL service at `rpc_url`.
///
/// `output_dir` can be a path to a non-existent directory, in which case it will be created, or an
/// existing empty directory (in which case it will be filled), or an existing directory that has
/// been written to in the past (in which case this invocation will pick back up from where the
/// previous invocation left off).
pub async fn dump(
rpc_url: String,
output_dir: PathBuf,
before_checkpoint: Option<i32>,
) -> Result<()> {
ensure_output_directory(&output_dir)?;

let client = Client::new(rpc_url)?;
let after_checkpoint = read_last_checkpoint(&output_dir)?;
let limit = max_page_size(&client).await?;
let (last_checkpoint, packages) =
fetch_packages(&client, limit, after_checkpoint, before_checkpoint).await?;

for package in &packages {
let packages::SuiAddress(address) = &package.address;
dump_package(&output_dir, package)
.with_context(|| format!("Failed to dump package {address}"))?;
}

if let Some(last_checkpoint) = last_checkpoint {
write_last_checkpoint(&output_dir, last_checkpoint)?;
}

Ok(())
}

/// Ensure the output directory exists, either because it already exists as a writable directory, or
/// by creating a new directory.
fn ensure_output_directory(path: impl Into<PathBuf>) -> Result<()> {
let path: PathBuf = path.into();
if !path.exists() {
fs::create_dir_all(&path).context("Making output directory")?;
return Ok(());
}

ensure!(
path.is_dir(),
"Output path is not a directory: {}",
path.display()
);

let metadata = fs::metadata(&path).context("Getting metadata for output path")?;

ensure!(
!metadata.permissions().readonly(),
"Output directory is not writable: {}",
path.display()
);

Ok(())
}

/// Load the last checkpoint that was recorded by a previous run of the tool, if there was a
/// previous run.
fn read_last_checkpoint(output: &Path) -> Result<Option<i32>> {
let path = output.join("last-checkpoint");
if !path.exists() {
return Ok(None);
}

let content = fs::read_to_string(&path).context("Failed to read last checkpoint")?;
let checkpoint: i32 =
serde_json::from_str(&content).context("Failed to parse last checkpoint")?;

info!("Resuming download after checkpoint {checkpoint}");

Ok(Some(checkpoint))
}

/// Write the max checkpoint that we have seen a package from back to the output directory.
fn write_last_checkpoint(output: &Path, checkpoint: i32) -> Result<()> {
let path = output.join("last-checkpoint");
let content =
serde_json::to_string(&checkpoint).context("Failed to serialize last checkpoint")?;

fs::write(path, content).context("Failed to write last checkpoint")?;
Ok(())
}

/// Read the max page size supported by the GraphQL service.
async fn max_page_size(client: &Client) -> Result<i32> {
Ok(client
.query(limits::build())
.await
.context("Failed to fetch max page size")?
.service_config
.max_page_size)
}

/// Read all the packages between `after_checkpoint` and `before_checkpoint`, in batches of
/// `page_size` from the `client` connected to a GraphQL service.
///
/// If `after_checkpoint` is not provided, packages will be read from genesis. If
/// `before_checkpoint` is not provided, packages will be read until the latest checkpoint.
///
/// Returns the latest checkpoint that was read from in this fetch, and a list of all the packages
/// that were read.
async fn fetch_packages(
client: &Client,
page_size: i32,
after_checkpoint: Option<i32>,
before_checkpoint: Option<i32>,
) -> Result<(Option<i32>, Vec<packages::MovePackage>)> {
let packages::Query {
checkpoint: checkpoint_viewed_at,
packages:
packages::MovePackageConnection {
mut page_info,
mut nodes,
},
} = client
.query(packages::build(
page_size,
None,
after_checkpoint,
before_checkpoint,
))
.await
.with_context(|| "Failed to fetch page 1 of packages.")?;

for i in 2.. {
if !page_info.has_next_page {
break;
}

let packages = client
.query(packages::build(
page_size,
page_info.end_cursor,
after_checkpoint,
before_checkpoint,
))
.await
.with_context(|| format!("Failed to fetch page {i} of packages."))?
.packages;

nodes.extend(packages.nodes);
page_info = packages.page_info;

info!(
"Fetched page {i} ({} package{} so far).",
nodes.len(),
if nodes.len() == 1 { "" } else { "s" },
);
}

use packages::Checkpoint as C;
let last_checkpoint = match (checkpoint_viewed_at, before_checkpoint) {
(Some(C { sequence_number: v }), Some(b)) if b > 0 => Some(v.min(b - 1)),
(Some(C { sequence_number: c }), _) | (_, Some(c)) => Some(c),
_ => None,
};

Ok((last_checkpoint, nodes))
}

/// Write out `pkg` to the `output_dir`ectory, using the package's address and version as the
/// directory name. The following files are written for each package directory:
///
/// - `object.bcs` -- the BCS serialized form of the `Object` type containing the package.
///
/// - `linkage.json` -- a JSON serialization of the package's linkage table, mapping dependency
///   original IDs to the version of the dependency being depended on and the ID of the object
///   on-chain that contains that version.
///
/// - `origins.json` -- a JSON serialization of the type origin table, mapping type names contained
///   in this package to the version of the package that first introduced that type.
///
/// - `*.mv` -- the serialized bytecode of each compiled module in the package.
fn dump_package(output_dir: &Path, pkg: &packages::MovePackage) -> Result<()> {
let Some(packages::Base64(bcs)) = &pkg.bcs else {
bail!("Missing BCS");
};

let bytes = Base64::decode(bcs).context("Failed to decode BCS")?;

let object = bcs::from_bytes::<Object>(&bytes).context("Failed to deserialize")?;
let id = object.id();
let Some(package) = object.data.try_as_package() else {
bail!("Not a package");
};

let origins: BTreeMap<_, _> = package
.type_origin_table()
.iter()
.map(|o| {
(
format!("{}::{}", o.module_name, o.struct_name),
o.package.to_string(),
)
})
.collect();

let package_dir = output_dir.join(format!("{}.{}", id, package.version().value()));
fs::create_dir(&package_dir).context("Failed to make output directory")?;

let linkage_json = serde_json::to_string_pretty(package.linkage_table())
.context("Failed to serialize linkage")?;
let origins_json =
serde_json::to_string_pretty(&origins).context("Failed to serialize type origins")?;

fs::write(package_dir.join("object.bcs"), bytes).context("Failed to write object BCS")?;
fs::write(package_dir.join("linkage.json"), linkage_json).context("Failed to write linkage")?;
fs::write(package_dir.join("origins.json"), origins_json)
.context("Failed to write type origins")?;

for (module_name, module_bytes) in package.serialized_module_map() {
let module_path = package_dir.join(format!("{module_name}.mv"));
fs::write(module_path, module_bytes)
.with_context(|| format!("Failed to write module: {module_name}"))?;
}

Ok(())
}
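Putting the pieces together, a run against a fresh output directory leaves a layout like the following (the address, version, and module name are illustrative):

```
packages/
├── last-checkpoint
└── 0x...2ab4.1/
    ├── object.bcs
    ├── linkage.json
    ├── origins.json
    └── my_module.mv
```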