diff --git a/.dockerignore b/.dockerignore index a6274ffa46194..7777cad9ecf3f 100644 --- a/.dockerignore +++ b/.dockerignore @@ -25,7 +25,7 @@ !aptos-move/framework/ !aptos-move/move-examples/hello_blockchain/ !crates/aptos/src/move_tool/*.bpl -!crates/aptos/src/node/local_testnet/hasura_metadata.json +!crates/aptos-localnet/src/hasura_metadata.json !crates/aptos-faucet/doc/ !crates/transaction-emitter-lib/src/emitter/test_proofs_for_localnet_txn_emitter.txt !api/doc/ diff --git a/Cargo.lock b/Cargo.lock index f2757456dbe81..9c5d1f9def8f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,7 @@ dependencies = [ "aptos-indexer-grpc-utils", "aptos-keygen", "aptos-ledger", + "aptos-localnet", "aptos-logger", "aptos-move-debugger", "aptos-network-checker", @@ -313,6 +314,7 @@ dependencies = [ "aptos-vm-genesis", "aptos-vm-logging", "aptos-vm-types", + "aptos-workspace-server", "async-trait", "base64 0.13.1", "bcs 0.1.4", @@ -369,7 +371,6 @@ dependencies = [ "tracing", "tracing-subscriber 0.3.18", "url", - "version-compare", ] [[package]] @@ -2721,6 +2722,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "aptos-localnet" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-indexer-grpc-utils", + "aptos-protos 1.3.1", + "aptos-rest-client", + "bollard", + "diesel", + "diesel-async", + "dirs 5.0.1", + "futures", + "processor", + "reqwest 0.11.23", + "serde", + "serde_json", + "tokio", + "tonic 0.11.0", + "tracing", + "version-compare", +] + [[package]] name = "aptos-log-derive" version = "0.1.0" @@ -4713,10 +4737,10 @@ name = "aptos-workspace-server" version = "0.1.0" dependencies = [ "anyhow", - "aptos", "aptos-cached-packages", "aptos-config", "aptos-faucet-core", + "aptos-localnet", "aptos-node", "aptos-types", "bollard", diff --git a/Cargo.toml b/Cargo.toml index bca2044043628..85473f6bb3de5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ members = [ "crates/aptos-jwk-consensus", "crates/aptos-keygen", "crates/aptos-ledger", + "crates/aptos-localnet", "crates/aptos-log-derive", "crates/aptos-logger", "crates/aptos-metrics-core", @@ -380,6 +381,7 @@ aptos-jwk-utils = { path = "crates/jwk-utils" } aptos-keygen = { path = "crates/aptos-keygen" } aptos-language-e2e-tests = { path = "aptos-move/e2e-tests" } aptos-ledger = { path = "crates/aptos-ledger" } +aptos-localnet = { path = "crates/aptos-localnet" } aptos-log-derive = { path = "crates/aptos-log-derive" } aptos-logger = { path = "crates/aptos-logger" } aptos-memory-usage-tracker = { path = "aptos-move/aptos-memory-usage-tracker" } @@ -466,6 +468,7 @@ aptos-vm-logging = { path = "aptos-move/aptos-vm-logging" } aptos-vm-genesis = { path = "aptos-move/vm-genesis" } aptos-vm-types = { path = "aptos-move/aptos-vm-types" } aptos-vm-validator = { path = "vm-validator" } +aptos-workspace-server = { path = "aptos-move/aptos-workspace-server" } aptos-warp-webserver = { path = "crates/aptos-warp-webserver" } aptos-cargo-cli = { path = "devtools/aptos-cargo-cli" } diff --git a/aptos-move/aptos-workspace-server/Cargo.toml b/aptos-move/aptos-workspace-server/Cargo.toml index e72f59d6b7d2a..505a4e8620c5b 100644 --- a/aptos-move/aptos-workspace-server/Cargo.toml +++ b/aptos-move/aptos-workspace-server/Cargo.toml @@ -13,10 +13,10 @@ rust-version = { workspace = true } [dependencies] # aptos deps -aptos = { workspace = true } aptos-cached-packages = { workspace = true } aptos-config = { workspace = true } aptos-faucet-core = { workspace = true } +aptos-localnet = { workspace = true } aptos-node = { workspace = true } aptos-types = 
{ workspace = true } diff --git a/aptos-move/aptos-workspace-server/src/lib.rs b/aptos-move/aptos-workspace-server/src/lib.rs new file mode 100644 index 0000000000000..f27102d335ea0 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/lib.rs @@ -0,0 +1,183 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! This library runs and manages a set of services that makes up a local Aptos network. +//! - node +//! - node API +//! - indexer grpc +//! - faucet +//! - indexer +//! - postgres db +//! - processors +//! - indexer API +//! +//! The services are bound to unique OS-assigned ports to allow for multiple local networks +//! to operate simultaneously, enabling testing and development in isolated environments. +//! +//! ## Key Features: +//! - Shared Futures +//! - The code makes extensive use of shared futures across multiple services, +//! ensuring orderly startup while maximizing parallel execution. +//! - Graceful Shutdown +//! - When a `Ctrl-C` signal is received or if any of the services fail to start +//! or exit unexpectedly, the system attempts to gracefully shut down all services, +//! cleaning up resources like Docker containers, volumes and networks. + +mod common; +mod services; + +use anyhow::{Context, Result}; +use common::make_shared; +use futures::TryFutureExt; +use services::{ + docker_common::create_docker_network, indexer_api::start_indexer_api, + processors::start_all_processors, +}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +pub async fn run_all_services() -> Result<()> { + let test_dir = tempfile::tempdir()?; + let test_dir = test_dir.path(); + println!("Created test directory: {}", test_dir.display()); + + let instance_id = Uuid::new_v4(); + + // Phase 0: Register the signal handler for ctrl-c. + let shutdown = CancellationToken::new(); + { + // TODO: Find a way to register the signal handler in a blocking manner without + // waiting for it to trigger. + let shutdown = shutdown.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + + println!("\nCtrl-C received. Shutting down services. This may take a while.\n"); + + shutdown.cancel(); + }); + } + + // Phase 1: Start all services. 
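Phase 1 below wires every service through `make_shared`, whose implementation (in the `common` module) is not part of this diff. A plausible sketch, assuming it simply wraps `futures::FutureExt::shared` and `Arc`-wraps the error so the output is `Clone` (which would also explain why the join in Phase 2 converts errors back with `anyhow::Error::msg`):

```rust
use std::sync::Arc;
use futures::{Future, FutureExt};

/// Sketch only: wrap a fallible future so that many consumers can await it.
/// `Shared` requires the output to be `Clone`, so the error side is wrapped
/// in an `Arc`, which consumers later convert back with `anyhow::Error::msg`.
pub fn make_shared<F, T>(fut: F) -> impl Future<Output = Result<T, Arc<anyhow::Error>>> + Clone
where
    T: Clone,
    F: Future<Output = anyhow::Result<T>>,
{
    fut.map(|res| res.map_err(Arc::new)).shared()
}
```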
+ // Node + let (fut_node_api, fut_indexer_grpc, fut_node_finish) = services::node::start_node(test_dir)?; + + let fut_node_api = make_shared(fut_node_api); + let fut_indexer_grpc = make_shared(fut_indexer_grpc); + + // Faucet + let (fut_faucet, fut_faucet_finish) = services::faucet::start_faucet( + test_dir.to_owned(), + fut_node_api.clone(), + fut_indexer_grpc.clone(), + ); + + // Docker Network + let docker_network_name = format!("aptos-workspace-{}", instance_id); + let (fut_docker_network, fut_docker_network_clean_up) = + create_docker_network(shutdown.clone(), docker_network_name); + + // Indexer part 1: postgres db + let (fut_postgres, fut_postgres_finish, fut_postgres_clean_up) = + services::postgres::start_postgres( + shutdown.clone(), + fut_docker_network.clone(), + instance_id, + ); + let fut_postgres = make_shared(fut_postgres); + + // Indexer part 2: processors + let (fut_all_processors_ready, fut_any_processor_finish) = start_all_processors( + fut_node_api.clone(), + fut_indexer_grpc.clone(), + fut_postgres.clone(), + ); + let fut_all_processors_ready = make_shared(fut_all_processors_ready); + + // Indexer part 3: indexer API + let (fut_indexer_api, fut_indexer_api_finish, fut_indexer_api_clean_up) = start_indexer_api( + instance_id, + shutdown.clone(), + fut_docker_network.clone(), + fut_postgres.clone(), + fut_all_processors_ready.clone(), + ); + + // Phase 2: Wait for all services to be up. + let all_services_up = async move { + tokio::try_join!( + fut_node_api.map_err(anyhow::Error::msg), + fut_indexer_grpc.map_err(anyhow::Error::msg), + fut_faucet, + fut_postgres.map_err(anyhow::Error::msg), + fut_all_processors_ready.map_err(anyhow::Error::msg), + fut_indexer_api, + ) + }; + let clean_up_all = async move { + eprintln!("Running shutdown steps"); + fut_indexer_api_clean_up.await; + fut_postgres_clean_up.await; + fut_docker_network_clean_up.await; + }; + tokio::select! { + _ = shutdown.cancelled() => { + clean_up_all.await; + + return Ok(()) + } + res = all_services_up => { + match res.context("one or more services failed to start") { + Ok(_) => println!("ALL SERVICES UP"), + Err(err) => { + eprintln!("\nOne or more services failed to start, will run shutdown steps\n"); + clean_up_all.await; + + return Err(err) + } + } + } + } + + // Phase 3: Wait for services to stop, which should only happen in case of an error, or + // the shutdown signal to be received. + tokio::select! 
{ + _ = shutdown.cancelled() => (), + res = fut_node_finish => { + eprintln!("Node exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_faucet_finish => { + eprintln!("Faucet exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_postgres_finish => { + eprintln!("Postgres exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_any_processor_finish => { + eprintln!("One of the processors exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_indexer_api_finish => { + eprintln!("Indexer API exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + } + + clean_up_all.await; + + println!("Finished running all services"); + + Ok(()) +} diff --git a/aptos-move/aptos-workspace-server/src/main.rs b/aptos-move/aptos-workspace-server/src/main.rs index 34a66ea96f942..a142293ef5ae1 100644 --- a/aptos-move/aptos-workspace-server/src/main.rs +++ b/aptos-move/aptos-workspace-server/src/main.rs @@ -1,191 +1,11 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -//! This binary runs and manages a set of services that makes up a local Aptos network. -//! - node -//! - node API -//! - indexer grpc -//! - faucet -//! - indexer -//! - postgres db -//! - processors -//! - indexer API -//! -//! The services are bound to unique OS-assigned ports to allow for multiple local networks -//! to operate simultaneously, enabling testing and development in isolated environments. -//! -//! ## Key Features: -//! - Shared Futures -//! - The code makes extensive use of shared futures across multiple services, -//! ensuring orderly startup while maximizing parallel execution. -//! - Graceful Shutdown -//! - When a `Ctrl-C` signal is received or if any of the services fail to start -//! or exit unexpectedly, the system attempts to gracefully shut down all services, -//! cleaning up resources like Docker containers, volumes and networks. - -mod common; -mod services; - -use anyhow::{Context, Result}; -use common::make_shared; -use futures::TryFutureExt; -use services::{ - docker_common::create_docker_network, indexer_api::start_indexer_api, - processors::start_all_processors, -}; -use std::path::Path; -use tokio_util::sync::CancellationToken; -use uuid::Uuid; - -async fn run_all_services(test_dir: &Path) -> Result<()> { - let instance_id = Uuid::new_v4(); - - // Phase 0: Register the signal handler for ctrl-c. - let shutdown = CancellationToken::new(); - { - // TODO: Find a way to register the signal handler in a blocking manner without - // waiting for it to trigger. - let shutdown = shutdown.clone(); - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - - println!("\nCtrl-C received. Shutting down services. This may take a while.\n"); - - shutdown.cancel(); - }); - } - - // Phase 1: Start all services. 
- // Node - let (fut_node_api, fut_indexer_grpc, fut_node_finish) = services::node::start_node(test_dir)?; - - let fut_node_api = make_shared(fut_node_api); - let fut_indexer_grpc = make_shared(fut_indexer_grpc); - - // Faucet - let (fut_faucet, fut_faucet_finish) = services::faucet::start_faucet( - test_dir.to_owned(), - fut_node_api.clone(), - fut_indexer_grpc.clone(), - ); - - // Docker Network - let docker_network_name = format!("aptos-workspace-{}", instance_id); - let (fut_docker_network, fut_docker_network_clean_up) = - create_docker_network(shutdown.clone(), docker_network_name); - - // Indexer part 1: postgres db - let (fut_postgres, fut_postgres_finish, fut_postgres_clean_up) = - services::postgres::start_postgres( - shutdown.clone(), - fut_docker_network.clone(), - instance_id, - ); - let fut_postgres = make_shared(fut_postgres); - - // Indexer part 2: processors - let (fut_all_processors_ready, fut_any_processor_finish) = start_all_processors( - fut_node_api.clone(), - fut_indexer_grpc.clone(), - fut_postgres.clone(), - ); - let fut_all_processors_ready = make_shared(fut_all_processors_ready); - - // Indexer part 3: indexer API - let (fut_indexer_api, fut_indexer_api_finish, fut_indexer_api_clean_up) = start_indexer_api( - instance_id, - shutdown.clone(), - fut_docker_network.clone(), - fut_postgres.clone(), - fut_all_processors_ready.clone(), - ); - - // Phase 2: Wait for all services to be up. - let all_services_up = async move { - tokio::try_join!( - fut_node_api.map_err(anyhow::Error::msg), - fut_indexer_grpc.map_err(anyhow::Error::msg), - fut_faucet, - fut_postgres.map_err(anyhow::Error::msg), - fut_all_processors_ready.map_err(anyhow::Error::msg), - fut_indexer_api, - ) - }; - let clean_up_all = async move { - eprintln!("Running shutdown steps"); - fut_indexer_api_clean_up.await; - fut_postgres_clean_up.await; - fut_docker_network_clean_up.await; - }; - tokio::select! { - _ = shutdown.cancelled() => { - clean_up_all.await; - - return Ok(()) - } - res = all_services_up => { - match res.context("one or more services failed to start") { - Ok(_) => println!("ALL SERVICES UP"), - Err(err) => { - eprintln!("\nOne or more services failed to start, will run shutdown steps\n"); - clean_up_all.await; - - return Err(err) - } - } - } - } - - // Phase 3: Wait for services to stop, which should only happen in case of an error, or - // the shutdown signal to be received. - tokio::select! 
{ - _ = shutdown.cancelled() => (), - res = fut_node_finish => { - eprintln!("Node exited unexpectedly"); - if let Err(err) = res { - eprintln!("Error: {}", err); - } - } - res = fut_faucet_finish => { - eprintln!("Faucet exited unexpectedly"); - if let Err(err) = res { - eprintln!("Error: {}", err); - } - } - res = fut_postgres_finish => { - eprintln!("Postgres exited unexpectedly"); - if let Err(err) = res { - eprintln!("Error: {}", err); - } - } - res = fut_any_processor_finish => { - eprintln!("One of the processors exited unexpectedly"); - if let Err(err) = res { - eprintln!("Error: {}", err); - } - } - res = fut_indexer_api_finish => { - eprintln!("Indexer API exited unexpectedly"); - if let Err(err) = res { - eprintln!("Error: {}", err); - } - } - } - - clean_up_all.await; - - Ok(()) -} +use anyhow::Result; #[tokio::main] async fn main() -> Result<()> { - let test_dir = tempfile::tempdir()?; - - println!("Test directory: {}", test_dir.path().display()); - - run_all_services(test_dir.path()).await?; - - println!("Finished running all services"); + aptos_workspace_server::run_all_services().await?; Ok(()) } diff --git a/aptos-move/aptos-workspace-server/src/services/docker_common.rs b/aptos-move/aptos-workspace-server/src/services/docker_common.rs index 7315a0fd28391..c49da313502fe 100644 --- a/aptos-move/aptos-workspace-server/src/services/docker_common.rs +++ b/aptos-move/aptos-workspace-server/src/services/docker_common.rs @@ -3,7 +3,7 @@ use crate::common::make_shared; use anyhow::{anyhow, bail, Context, Result}; -use aptos::node::local_testnet::docker; +use aptos_localnet::docker; use bollard::{ container::{CreateContainerOptions, InspectContainerOptions, StartContainerOptions}, network::CreateNetworkOptions, diff --git a/aptos-move/aptos-workspace-server/src/services/faucet.rs b/aptos-move/aptos-workspace-server/src/services/faucet.rs index d44c8a7d4bd71..ed9ea036e68b9 100644 --- a/aptos-move/aptos-workspace-server/src/services/faucet.rs +++ b/aptos-move/aptos-workspace-server/src/services/faucet.rs @@ -3,8 +3,8 @@ use crate::common::IP_LOCAL_HOST; use anyhow::{anyhow, Context, Result}; -use aptos::node::local_testnet::HealthChecker; use aptos_faucet_core::server::{FunderKeyEnum, RunConfig}; +use aptos_localnet::health_checker::HealthChecker; use futures::channel::oneshot; use std::{future::Future, path::PathBuf, sync::Arc}; use url::Url; diff --git a/aptos-move/aptos-workspace-server/src/services/indexer_api.rs b/aptos-move/aptos-workspace-server/src/services/indexer_api.rs index 159470709912b..62043db8cf407 100644 --- a/aptos-move/aptos-workspace-server/src/services/indexer_api.rs +++ b/aptos-move/aptos-workspace-server/src/services/indexer_api.rs @@ -7,10 +7,10 @@ use super::{ }; use crate::common::{make_shared, IP_LOCAL_HOST}; use anyhow::{anyhow, Context, Result}; -use aptos::node::local_testnet::{ +use aptos_localnet::{ docker, + health_checker::HealthChecker, indexer_api::{post_metadata, HASURA_IMAGE, HASURA_METADATA}, - HealthChecker, }; use bollard::{ container::{CreateContainerOptions, WaitContainerOptions}, diff --git a/aptos-move/aptos-workspace-server/src/services/node.rs b/aptos-move/aptos-workspace-server/src/services/node.rs index 0ceb165606426..3809023c05de5 100644 --- a/aptos-move/aptos-workspace-server/src/services/node.rs +++ b/aptos-move/aptos-workspace-server/src/services/node.rs @@ -3,8 +3,8 @@ use crate::common::IP_LOCAL_HOST; use anyhow::{bail, Result}; -use aptos::node::local_testnet::HealthChecker; use aptos_config::config::{NodeConfig, 
TableInfoServiceMode}; +use aptos_localnet::health_checker::HealthChecker; use aptos_node::{load_node_config, start_and_report_ports}; use aptos_types::network_address::{NetworkAddress, Protocol}; use futures::channel::oneshot; diff --git a/aptos-move/aptos-workspace-server/src/services/postgres.rs b/aptos-move/aptos-workspace-server/src/services/postgres.rs index 1a84798a626d1..3ff53471ada94 100644 --- a/aptos-move/aptos-workspace-server/src/services/postgres.rs +++ b/aptos-move/aptos-workspace-server/src/services/postgres.rs @@ -1,10 +1,12 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::docker_common::{create_docker_volume, create_start_and_inspect_container}; -use crate::common::{make_shared, IP_LOCAL_HOST}; +use crate::{ + common::{make_shared, IP_LOCAL_HOST}, + services::docker_common::{create_docker_volume, create_start_and_inspect_container}, +}; use anyhow::{anyhow, Context, Result}; -use aptos::node::local_testnet::{docker, HealthChecker}; +use aptos_localnet::{docker, health_checker::HealthChecker}; use bollard::{ container::{CreateContainerOptions, WaitContainerOptions}, secret::{ContainerInspectResponse, HostConfig, PortBinding}, diff --git a/aptos-move/aptos-workspace-server/src/services/processors.rs b/aptos-move/aptos-workspace-server/src/services/processors.rs index 97a0dc38a64c0..70d99b7b88b46 100644 --- a/aptos-move/aptos-workspace-server/src/services/processors.rs +++ b/aptos-move/aptos-workspace-server/src/services/processors.rs @@ -4,7 +4,7 @@ use super::{node::get_data_service_url, postgres::get_postgres_connection_string}; use crate::common::make_shared; use anyhow::{anyhow, Context, Result}; -use aptos::node::local_testnet::{processors::get_processor_config, HealthChecker}; +use aptos_localnet::{health_checker::HealthChecker, processors::get_processor_config}; use diesel::Connection; use diesel_async::{async_connection_wrapper::AsyncConnectionWrapper, pg::AsyncPgConnection}; use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt}; diff --git a/crates/aptos-localnet/Cargo.toml b/crates/aptos-localnet/Cargo.toml new file mode 100644 index 0000000000000..370ddebccdf73 --- /dev/null +++ b/crates/aptos-localnet/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "aptos-localnet" +version = "0.1.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +aptos-indexer-grpc-utils = { workspace = true } +bollard = { workspace = true } +dirs = { workspace = true } +futures = { workspace = true } +tracing = { workspace = true } +version-compare = { workspace = true } + +aptos-protos = { workspace = true } +aptos-rest-client = { workspace = true } +diesel = { workspace = true, features = [ + "postgres_backend", +] } +diesel-async = { workspace = true } +processor = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "51a34901b40d7f75767ac907b4d2478104d6a515", default-features = false } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } diff --git a/crates/aptos-localnet/src/docker.rs b/crates/aptos-localnet/src/docker.rs new file mode 100644 index 0000000000000..4a899cb7cc57e --- /dev/null +++ b/crates/aptos-localnet/src/docker.rs @@ -0,0 +1,92 @@ 
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::{Context, Result};
+use bollard::Docker;
+#[cfg(unix)]
+use bollard::API_DEFAULT_VERSION;
+use tracing::{info, warn};
+use version_compare::Version;
+
+const ERROR_MESSAGE: &str = "Docker is not available, confirm it is installed and running. See https://aptos.dev/guides/local-development-network#faq for assistance.";
+
+/// This function returns a Docker client. Before returning, it confirms that it can
+/// actually query the API and checks that the API version is sufficient. It first
+/// tries to connect at the default socket location and if that fails, it tries to find
+/// a socket in the user's home directory. On Windows NT it doesn't try that since
+/// there is no second location, there is just the one named pipe.
+pub async fn get_docker() -> Result<Docker> {
+    let docker = Docker::connect_with_local_defaults()
+        .context(format!("{} (init_default)", ERROR_MESSAGE))
+        .inspect_err(|e| eprintln!("{:#}", e))?;
+
+    // We have to specify the type because the compiler can't figure out the error
+    // in the case where the system is Unix.
+    let out: Result<(Docker, bollard::system::Version), bollard::errors::Error> =
+        match docker.version().await {
+            Ok(version) => Ok((docker, version)),
+            Err(err) => {
+                warn!(
+                    "Received this error trying to use default Docker socket location: {:#}",
+                    err
+                );
+                // Look for the socket in ~/.docker/run
+                // We don't have to do this if this issue gets addressed:
+                // https://github.com/fussybeaver/bollard/issues/345
+                #[cfg(unix)]
+                {
+                    let path = dirs::home_dir()
+                        .context(format!("{} (home_dir)", ERROR_MESSAGE))?
+                        .join(".docker")
+                        .join("run")
+                        .join("docker.sock");
+                    info!("Looking for Docker socket at {}", path.display());
+                    let path = path.to_str().context(format!("{} (path)", ERROR_MESSAGE))?;
+                    let docker = Docker::connect_with_socket(path, 120, API_DEFAULT_VERSION)
+                        .context(format!("{} (init_home)", ERROR_MESSAGE))?;
+                    let version = docker
+                        .version()
+                        .await
+                        .context(format!("{} (version_home)", ERROR_MESSAGE))?;
+                    Ok((docker, version))
+                }
+                // Just return the original error.
+                #[cfg(not(unix))]
+                Err(err)
+            },
+        };
+    let (docker, version) = out?;
+
+    // Try to warn the user about their Docker version being too old. We don't error
+    // out if the version is too old in case we're wrong about the minimum version
+    // for their particular system. We just print a warning.
+    match version.api_version {
+        Some(current_api_version) => match Version::from(&current_api_version) {
+            Some(current_api_version) => {
+                let minimum_api_version = Version::from("1.42").unwrap();
+                if current_api_version < minimum_api_version {
+                    eprintln!(
+                        "WARNING: Docker API version {} is too old, minimum required version is {}. Please update Docker!",
+                        current_api_version,
+                        minimum_api_version,
+                    );
+                } else {
+                    info!("Docker version is sufficient: {}", current_api_version);
+                }
+            },
+            None => {
+                eprintln!(
+                    "WARNING: Failed to parse Docker API version: {}",
+                    current_api_version
+                );
+            },
+        },
+        None => {
+            eprintln!(
+                "WARNING: Failed to determine Docker version, confirm your Docker is up to date!"
+            );
+        },
+    }
+
+    Ok(docker)
+}
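With `get_docker` now exported from `aptos-localnet`, other tools can reuse the same connection logic. A minimal sketch of a consumer, assuming only the standard bollard `list_containers` API:

```rust
use aptos_localnet::docker::get_docker;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // get_docker() has already verified connectivity and warned about
    // old API versions by the time it returns.
    let docker = get_docker().await?;
    let containers = docker.list_containers::<String>(None).await?;
    println!("{} container(s) visible", containers.len());
    Ok(())
}
```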
diff --git a/crates/aptos/src/node/local_testnet/hasura_metadata.json b/crates/aptos-localnet/src/hasura_metadata.json
similarity index 100%
rename from crates/aptos/src/node/local_testnet/hasura_metadata.json
rename to crates/aptos-localnet/src/hasura_metadata.json
diff --git a/crates/aptos-localnet/src/health_checker.rs b/crates/aptos-localnet/src/health_checker.rs
new file mode 100644
index 0000000000000..19edd2fb561aa
--- /dev/null
+++ b/crates/aptos-localnet/src/health_checker.rs
@@ -0,0 +1,216 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::indexer_api::confirm_metadata_applied;
+use anyhow::{anyhow, Context, Result};
+use aptos_protos::indexer::v1::GetTransactionsRequest;
+use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
+use diesel_async::{pg::AsyncPgConnection, AsyncConnection, RunQueryDsl};
+use futures::StreamExt;
+use processor::schema::processor_status;
+use reqwest::Url;
+use serde::Serialize;
+use std::time::Duration;
+use tokio::time::Instant;
+use tracing::info;
+
+const MAX_WAIT_S: u64 = 60;
+const WAIT_INTERVAL_MS: u64 = 200;
+
+/// This provides a single place to define a variety of different healthchecks. In
+/// cases where the name of the service being checked isn't obvious, the enum will take
+/// a string arg that names it.
+#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize)]
+pub enum HealthChecker {
+    /// Check that a HTTP API is up. The second param is the name of the HTTP service.
+    Http(Url, String),
+    /// Check that the node API is up. This is just a specific case of Http for extra
+    /// guarantees around liveliness.
+    NodeApi(Url),
+    /// Check that a data service GRPC stream is up.
+    DataServiceGrpc(Url),
+    /// Check that a postgres instance is up.
+    Postgres(String),
+    /// Check that a processor is successfully processing txns. The first value is the
+    /// postgres connection string. The second is the name of the processor. We check
+    /// that the last_success_version in the processor_status table is present and > 0.
+    Processor(String, String),
+    /// Check that the indexer API is up and the metadata has been applied. We only use
+    /// this one in the ready server.
+    IndexerApiMetadata(Url),
+}
+
+impl HealthChecker {
+    pub async fn check(&self) -> Result<()> {
+        match self {
+            HealthChecker::Http(url, _) => {
+                reqwest::get(Url::clone(url))
+                    .await
+                    .with_context(|| format!("Failed to GET {}", url))?;
+                Ok(())
+            },
+            HealthChecker::NodeApi(url) => {
+                aptos_rest_client::Client::new(Url::clone(url))
+                    .get_index()
+                    .await?;
+                Ok(())
+            },
+            HealthChecker::DataServiceGrpc(url) => {
+                let mut client = aptos_indexer_grpc_utils::create_data_service_grpc_client(
+                    url.clone(),
+                    Some(Duration::from_secs(5)),
+                )
+                .await?;
+                let request = tonic::Request::new(GetTransactionsRequest {
+                    starting_version: Some(0),
+                    ..Default::default()
+                });
+                // Make sure we can stream the first message from the stream.
+                client
+                    .get_transactions(request)
+                    .await
+                    .context("GRPC connection error")?
+                    .into_inner()
+                    .next()
+                    .await
+                    .context("Did not receive init signal from data service GRPC stream")?
+ .context("Error processing first message from GRPC stream")?; + Ok(()) + }, + HealthChecker::Postgres(connection_string) => { + AsyncPgConnection::establish(connection_string) + .await + .context("Failed to connect to postgres to check DB liveness")?; + Ok(()) + }, + HealthChecker::Processor(connection_string, processor_name) => { + let mut connection = AsyncPgConnection::establish(connection_string) + .await + .context("Failed to connect to postgres to check processor status")?; + let result = processor_status::table + .select((processor_status::last_success_version,)) + .filter(processor_status::processor.eq(processor_name)) + .first::<(i64,)>(&mut connection) + .await + .optional() + .context("Failed to look up processor status")?; + match result { + Some(result) => { + // This is last_success_version. + if result.0 > 0 { + info!( + "Processor {} started processing successfully (currently at version {})", + processor_name, result.0 + ); + Ok(()) + } else { + Err(anyhow!( + "Processor {} found in DB but last_success_version is zero", + processor_name + )) + } + }, + None => Err(anyhow!( + "Processor {} has not processed any transactions", + processor_name + )), + } + }, + HealthChecker::IndexerApiMetadata(url) => { + confirm_metadata_applied(url.clone()).await?; + Ok(()) + }, + } + } + + /// Wait up to MAX_WAIT_S seconds for a service to start up. + pub async fn wait( + &self, + // The service, if any, waiting for this service to start up. + waiting_service: Option<&str>, + ) -> Result<()> { + let prefix = self.to_string(); + wait_for_startup(|| self.check(), match waiting_service { + Some(waiting_service) => { + format!( + "{} at {} did not start up before {}", + prefix, + self.address_str(), + waiting_service, + ) + }, + None => format!("{} at {} did not start up", prefix, self.address_str()), + }) + .await + } + + /// This is only ever used for display purposes. If possible, this should be the + /// endpoint of the service that this HealthChecker is checking. + pub fn address_str(&self) -> &str { + match self { + HealthChecker::Http(url, _) => url.as_str(), + HealthChecker::NodeApi(url) => url.as_str(), + HealthChecker::DataServiceGrpc(url) => url.as_str(), + HealthChecker::Postgres(url) => url.as_str(), + HealthChecker::Processor(_, processor_name) => processor_name.as_str(), + HealthChecker::IndexerApiMetadata(url) => url.as_str(), + } + } + + /// Given a port, make an instance of HealthChecker::Http targeting 127.0.0.1. 
+    pub fn http_checker_from_port(port: u16, name: String) -> Self {
+        Self::Http(
+            Url::parse(&format!("http://127.0.0.1:{}", port,)).unwrap(),
+            name,
+        )
+    }
+}
+
+impl std::fmt::Display for HealthChecker {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            HealthChecker::Http(_, name) => write!(f, "{}", name),
+            HealthChecker::NodeApi(_) => write!(f, "Node API"),
+            HealthChecker::DataServiceGrpc(_) => write!(f, "Transaction stream"),
+            HealthChecker::Postgres(_) => write!(f, "Postgres"),
+            HealthChecker::Processor(_, processor_name) => write!(f, "{}", processor_name),
+            HealthChecker::IndexerApiMetadata(_) => write!(f, "Indexer API with metadata applied"),
+        }
+    }
+}
+
+async fn wait_for_startup<F, Fut>(check_fn: F, error_message: String) -> Result<()>
+where
+    F: Fn() -> Fut,
+    Fut: futures::Future<Output = Result<()>>,
+{
+    let max_wait = Duration::from_secs(MAX_WAIT_S);
+    let wait_interval = Duration::from_millis(WAIT_INTERVAL_MS);
+
+    let start = Instant::now();
+    let mut started_successfully = false;
+
+    let mut last_error_message = None;
+    while start.elapsed() < max_wait {
+        match check_fn().await {
+            Ok(_) => {
+                started_successfully = true;
+                break;
+            },
+            Err(err) => {
+                last_error_message = Some(format!("{:#}", err));
+            },
+        }
+        tokio::time::sleep(wait_interval).await
+    }
+
+    if !started_successfully {
+        let error_message = match last_error_message {
+            Some(last_error_message) => format!("{}: {}", error_message, last_error_message),
+            None => error_message,
+        };
+        return Err(anyhow!(error_message));
+    }
+
+    Ok(())
+}
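A sketch of how these checkers are meant to be driven, with placeholder ports and service names (both `wait` and `check` are defined above):

```rust
use aptos_localnet::health_checker::HealthChecker;
use reqwest::Url;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Poll the node API (up to MAX_WAIT_S) before starting a dependent service.
    let node_api = HealthChecker::NodeApi(Url::parse("http://127.0.0.1:8080")?);
    node_api.wait(Some("faucet")).await?;

    // One-shot check, without the retry loop.
    HealthChecker::http_checker_from_port(8090, "faucet".to_string())
        .check()
        .await?;
    Ok(())
}
```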
diff --git a/crates/aptos-localnet/src/indexer_api.rs b/crates/aptos-localnet/src/indexer_api.rs
new file mode 100644
index 0000000000000..673add052b68a
--- /dev/null
+++ b/crates/aptos-localnet/src/indexer_api.rs
@@ -0,0 +1,122 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::{anyhow, Context, Result};
+use reqwest::Url;
+use tracing::info;
+
+pub const HASURA_IMAGE: &str = "hasura/graphql-engine:v2.44.0-ce";
+
+/// This Hasura metadata originates from the aptos-indexer-processors repo.
+///
+/// This metadata here should come from the same revision as the `processor` dep.
+///
+/// The metadata file is not taken verbatim, it is currently edited by hand to remove
+/// any references to tables that aren't created by the Rust processor migrations.
+///
+/// To arrive at the final edited file I normally start with the new metadata file,
+/// try to start the localnet, and check .aptos/testnet/main/tracing.log to
+/// see what error Hasura returned. Remove the culprit from the metadata, which is
+/// generally a few tables and relations to those tables, and try again. Repeat until
+/// it accepts the metadata.
+///
+/// This works fine today since all the key processors you'd need in a localnet
+/// are in the set of processors written in Rust. If this changes, we can explore
+/// alternatives, e.g. running processors in other languages using containers.
+pub const HASURA_METADATA: &str = include_str!("hasura_metadata.json");
+
+/// This submits a POST request to apply metadata to a Hasura API.
+pub async fn post_metadata(url: Url, metadata_content: &str) -> Result<()> {
+    // Parse the metadata content as JSON.
+    let metadata_json: serde_json::Value = serde_json::from_str(metadata_content)?;
+
+    // Make the request.
+    info!("Submitting request to apply Hasura metadata");
+    let response =
+        make_hasura_metadata_request(url, "replace_metadata", Some(metadata_json)).await?;
+    info!(
+        "Received response for applying Hasura metadata: {:?}",
+        response
+    );
+
+    // Confirm that the metadata was applied successfully and there is no inconsistency
+    // between the schema and the underlying DB schema.
+    if let Some(obj) = response.as_object() {
+        if let Some(is_consistent_val) = obj.get("is_consistent") {
+            if is_consistent_val.as_bool() == Some(true) {
+                return Ok(());
+            }
+        }
+    }
+
+    Err(anyhow!(
+        "Something went wrong applying the Hasura metadata, perhaps it is not consistent with the DB. Response: {:#?}",
+        response
+    ))
+}
+
+/// This confirms that the metadata has been applied. We use this in the health
+/// checker.
+pub async fn confirm_metadata_applied(url: Url) -> Result<()> {
+    // Make the request.
+    info!("Confirming Hasura metadata applied...");
+    let response = make_hasura_metadata_request(url, "export_metadata", None).await?;
+    info!(
+        "Received response for confirming Hasura metadata applied: {:?}",
+        response
+    );
+
+    // If the sources field is set it means the metadata was applied successfully.
+    if let Some(obj) = response.as_object() {
+        if let Some(sources) = obj.get("sources") {
+            if let Some(sources) = sources.as_array() {
+                if !sources.is_empty() {
+                    return Ok(());
+                }
+            }
+        }
+    }
+
+    Err(anyhow!(
+        "The Hasura metadata has not been applied yet. Response: {:#?}",
+        response
+    ))
+}
+
+/// The /v1/metadata endpoint supports a few different operations based on the `type`
+/// field in the request body. All requests have a similar format, with these `type`
+/// and `args` fields.
+pub async fn make_hasura_metadata_request(
+    mut url: Url,
+    typ: &str,
+    args: Option<serde_json::Value>,
+) -> Result<serde_json::Value> {
+    let client = reqwest::Client::new();
+
+    // Update the query path.
+    url.set_path("/v1/metadata");
+
+    // Construct the payload.
+    let mut payload = serde_json::Map::new();
+    payload.insert(
+        "type".to_string(),
+        serde_json::Value::String(typ.to_string()),
+    );
+
+    // If args is provided, use that. Otherwise use an empty object. We have to set it
+    // no matter what because the API expects the args key to be set.
+    let args = match args {
+        Some(args) => args,
+        None => serde_json::Value::Object(serde_json::Map::new()),
+    };
+    payload.insert("args".to_string(), args);
+
+    // Send the POST request.
+    let response = client.post(url).json(&payload).send().await?;
+
+    // Return the response as a JSON value.
+    response
+        .json()
+        .await
+        .context("Failed to parse response as JSON")
+}
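These helpers are now public, so a caller outside the crate can apply and verify the bundled metadata. A sketch with a placeholder Hasura URL:

```rust
use aptos_localnet::indexer_api::{confirm_metadata_applied, post_metadata, HASURA_METADATA};
use reqwest::Url;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = Url::parse("http://127.0.0.1:8090")?;
    // Apply the bundled metadata, then verify Hasura reports it as applied.
    post_metadata(url.clone(), HASURA_METADATA).await?;
    confirm_metadata_applied(url).await?;
    Ok(())
}
```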
diff --git a/crates/aptos-localnet/src/lib.rs b/crates/aptos-localnet/src/lib.rs
new file mode 100644
index 0000000000000..cfcf78d6d8fb3
--- /dev/null
+++ b/crates/aptos-localnet/src/lib.rs
@@ -0,0 +1,13 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+//! This is a library that provides the functionality required for running a local Aptos network,
+//! used by `aptos-workspace-server` and the CLI's localnet command.
+//!
+//! Currently it only contains some utility functions, but more code will be moved here over
+//! time.
+
+pub mod docker;
+pub mod health_checker;
+pub mod indexer_api;
+pub mod processors;
diff --git a/crates/aptos-localnet/src/processors.rs b/crates/aptos-localnet/src/processors.rs
new file mode 100644
index 0000000000000..350de43e2d667
--- /dev/null
+++ b/crates/aptos-localnet/src/processors.rs
@@ -0,0 +1,66 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use anyhow::{bail, Result};
+use processor::processors::{
+    objects_processor::ObjectsProcessorConfig, stake_processor::StakeProcessorConfig,
+    token_v2_processor::TokenV2ProcessorConfig, ProcessorConfig, ProcessorName,
+};
+
+pub fn get_processor_config(processor_name: &ProcessorName) -> Result<ProcessorConfig> {
+    Ok(match processor_name {
+        ProcessorName::AccountTransactionsProcessor => {
+            ProcessorConfig::AccountTransactionsProcessor
+        },
+        ProcessorName::AnsProcessor => {
+            bail!("ANS processor is not supported in the localnet")
+        },
+        ProcessorName::DefaultProcessor => ProcessorConfig::DefaultProcessor,
+        ProcessorName::EventsProcessor => ProcessorConfig::EventsProcessor,
+        ProcessorName::FungibleAssetProcessor => ProcessorConfig::FungibleAssetProcessor,
+        ProcessorName::MonitoringProcessor => {
+            bail!("Monitoring processor is not supported in the localnet")
+        },
+        ProcessorName::NftMetadataProcessor => {
+            bail!("NFT Metadata processor is not supported in the localnet")
+        },
+        ProcessorName::ObjectsProcessor => {
+            ProcessorConfig::ObjectsProcessor(ObjectsProcessorConfig {
+                query_retries: Default::default(),
+                query_retry_delay_ms: Default::default(),
+            })
+        },
+        ProcessorName::ParquetDefaultProcessor => {
+            bail!("ParquetDefaultProcessor is not supported in the localnet")
+        },
+        ProcessorName::ParquetFungibleAssetProcessor => {
+            bail!("ParquetFungibleAssetProcessor is not supported in the localnet")
+        },
+        ProcessorName::ParquetTransactionMetadataProcessor => {
+            bail!("ParquetTransactionMetadataProcessor is not supported in the localnet")
+        },
+        ProcessorName::ParquetAnsProcessor => {
+            bail!("ParquetAnsProcessor is not supported in the localnet")
+        },
+        ProcessorName::ParquetEventsProcessor => {
+            bail!("ParquetEventsProcessor is not supported in the localnet")
+        },
+        ProcessorName::ParquetTokenV2Processor => {
+            bail!("ParquetTokenV2Processor is not supported in the localnet")
+        },
+        ProcessorName::StakeProcessor => ProcessorConfig::StakeProcessor(StakeProcessorConfig {
+            query_retries: Default::default(),
+            query_retry_delay_ms: Default::default(),
+        }),
+        ProcessorName::TokenV2Processor => {
+            ProcessorConfig::TokenV2Processor(TokenV2ProcessorConfig {
+                query_retries: Default::default(),
+                query_retry_delay_ms: Default::default(),
+            })
+        },
+        ProcessorName::TransactionMetadataProcessor => {
+            ProcessorConfig::TransactionMetadataProcessor
+        },
+        ProcessorName::UserTransactionProcessor => ProcessorConfig::UserTransactionProcessor,
+    })
+}
diff --git a/crates/aptos/Cargo.toml b/crates/aptos/Cargo.toml
index 9970cad41819d..4e8c393472f65 100644
--- a/crates/aptos/Cargo.toml
+++ b/crates/aptos/Cargo.toml
@@ -33,6 +33,7 @@ aptos-indexer-grpc-server-framework = { workspace = true }
 aptos-indexer-grpc-utils = { workspace = true }
 aptos-keygen = { workspace = true }
 aptos-ledger = { workspace = true }
+aptos-localnet = { workspace = true }
 aptos-logger = { workspace = true }
 aptos-move-debugger = { workspace = true }
 aptos-network-checker = { workspace = true }
@@ -49,6 +50,7 @@ aptos-vm-environment = { workspace = true }
 aptos-vm-genesis = { workspace = true }
 aptos-vm-logging = { workspace = true }
 aptos-vm-types = { workspace = true }
+aptos-workspace-server = { workspace = true }
 async-trait = { workspace = true }
 base64 = { workspace = true }
 bcs = { workspace = true }
@@ -107,7 +109,6 @@ tonic = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
 url = { workspace = true }
-version-compare = { workspace = true }
 
 [target.'cfg(unix)'.dependencies]
 jemallocator = { workspace = true }
diff --git a/crates/aptos/src/lib.rs b/crates/aptos/src/lib.rs
index 49a0d3c79816b..5dbe6413482d2 100644
--- a/crates/aptos/src/lib.rs
+++ b/crates/aptos/src/lib.rs
@@ -16,6 +16,7 @@ pub mod stake;
 #[cfg(any(test, feature = "fuzzing"))]
 pub mod test;
 pub mod update;
+pub mod workspace;
 
 use crate::common::{
     types::{CliCommand, CliResult, CliTypedResult},
@@ -51,6 +52,8 @@ pub enum Tool {
     Stake(stake::StakeTool),
     #[clap(subcommand)]
     Update(update::UpdateTool),
+    #[clap(subcommand, hide(true))]
+    Workspace(workspace::WorkspaceTool),
 }
 
 impl Tool {
@@ -70,6 +73,7 @@ impl Tool {
             Node(tool) => tool.execute().await,
             Stake(tool) => tool.execute().await,
             Update(tool) => tool.execute().await,
+            Workspace(tool) => tool.execute().await,
         }
     }
 }
diff --git a/crates/aptos/src/node/local_testnet/docker.rs b/crates/aptos/src/node/local_testnet/docker.rs
index a118c6eea598c..c1eeff465fc59 100644
--- a/crates/aptos/src/node/local_testnet/docker.rs
+++ b/crates/aptos/src/node/local_testnet/docker.rs
@@ -3,107 +3,21 @@
 use super::traits::ShutdownStep;
 use anyhow::{Context, Result};
+pub use aptos_localnet::docker::get_docker;
 use async_trait::async_trait;
-#[cfg(unix)]
-use bollard::API_DEFAULT_VERSION;
 use bollard::{
     container::{RemoveContainerOptions, StopContainerOptions},
     errors::Error as BollardError,
     image::CreateImageOptions,
     network::CreateNetworkOptions,
     volume::{CreateVolumeOptions, RemoveVolumeOptions},
-    Docker,
 };
 use futures::TryStreamExt;
 use std::{fs::create_dir_all, path::Path};
 use tracing::{info, warn};
-use version_compare::Version;
-
-const ERROR_MESSAGE: &str = "Docker is not available, confirm it is installed and running. See https://aptos.dev/guides/local-development-network#faq for assistance.";
 
 pub const CONTAINER_NETWORK_NAME: &str = "aptos-local-testnet-network";
 
-/// This function returns a Docker client. Before returning, it confirms that it can
-/// actually query the API and checks that the API version is sufficient. It first
-/// tries to connect at the default socket location and if that fails, it tries to find
-/// a socket in the user's home directory. On Windows NT it doesn't try that since
-/// there is no second location, there is just the one named pipe.
-pub async fn get_docker() -> Result<Docker> {
-    let docker = Docker::connect_with_local_defaults()
-        .context(format!("{} (init_default)", ERROR_MESSAGE))
-        .inspect_err(|e| eprintln!("{:#}", e))?;
-
-    // We have to specify the type because the compiler can't figure out the error
-    // in the case where the system is Unix.
-    let out: Result<(Docker, bollard::system::Version), bollard::errors::Error> =
-        match docker.version().await {
-            Ok(version) => Ok((docker, version)),
-            Err(err) => {
-                warn!(
-                    "Received this error trying to use default Docker socket location: {:#}",
-                    err
-                );
-                // Look for the socket in ~/.docker/run
-                // We don't have to do this if this issue gets addressed:
-                // https://github.com/fussybeaver/bollard/issues/345
-                #[cfg(unix)]
-                {
-                    let path = dirs::home_dir()
-                        .context(format!("{} (home_dir)", ERROR_MESSAGE))?
-                        .join(".docker")
-                        .join("run")
-                        .join("docker.sock");
-                    info!("Looking for Docker socket at {}", path.display());
-                    let path = path.to_str().context(format!("{} (path)", ERROR_MESSAGE))?;
-                    let docker = Docker::connect_with_socket(path, 120, API_DEFAULT_VERSION)
-                        .context(format!("{} (init_home)", ERROR_MESSAGE))?;
-                    let version = docker
-                        .version()
-                        .await
-                        .context(format!("{} (version_home)", ERROR_MESSAGE))?;
-                    Ok((docker, version))
-                }
-                // Just return the original error.
-                #[cfg(not(unix))]
-                Err(err)
-            },
-        };
-    let (docker, version) = out?;
-
-    // Try to warn the user about their Docker version being too old. We don't error
-    // out if the version is too old in case we're wrong about the minimum version
-    // for their particular system. We just print a warning.
-    match version.api_version {
-        Some(current_api_version) => match Version::from(&current_api_version) {
-            Some(current_api_version) => {
-                let minimum_api_version = Version::from("1.42").unwrap();
-                if current_api_version < minimum_api_version {
-                    eprintln!(
-                        "WARNING: Docker API version {} is too old, minimum required version is {}. Please update Docker!",
-                        current_api_version,
-                        minimum_api_version,
-                    );
-                } else {
-                    info!("Docker version is sufficient: {}", current_api_version);
-                }
-            },
-            None => {
-                eprintln!(
-                    "WARNING: Failed to parse Docker API version: {}",
-                    current_api_version
-                );
-            },
-        },
-        None => {
-            eprintln!(
-                "WARNING: Failed to determine Docker version, confirm your Docker is up to date!"
-            );
-        },
-    }
-
-    Ok(docker)
-}
-
 /// Delete a container. If the container doesn't exist, that's fine, just move on.
 pub async fn delete_container(container_name: &str) -> Result<()> {
     info!(
diff --git a/crates/aptos/src/node/local_testnet/indexer_api.rs b/crates/aptos/src/node/local_testnet/indexer_api.rs
index 7c9dc6dafe3ed..5d0ca124f0ddf 100644
--- a/crates/aptos/src/node/local_testnet/indexer_api.rs
+++ b/crates/aptos/src/node/local_testnet/indexer_api.rs
@@ -11,6 +11,9 @@ use super::{
     RunLocalnet,
 };
 use anyhow::{anyhow, Context, Result};
+pub use aptos_localnet::indexer_api::{
+    make_hasura_metadata_request, post_metadata, HASURA_IMAGE, HASURA_METADATA,
+};
 use async_trait::async_trait;
 use bollard::{
     container::{Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions},
@@ -24,25 +27,6 @@ use std::{collections::HashSet, path::PathBuf, time::Duration};
 use tracing::{info, warn};
 
 const INDEXER_API_CONTAINER_NAME: &str = "local-testnet-indexer-api";
-pub const HASURA_IMAGE: &str = "hasura/graphql-engine:v2.44.0-ce";
-
-/// This Hasura metadata originates from the aptos-indexer-processors repo.
-///
-/// This metadata here should come from the same revision as the `processor` dep.
-///
-/// The metadata file is not taken verbatim, it is currently edited by hand to remove
-/// any references to tables that aren't created by the Rust processor migrations.
-///
-/// To arrive at the final edited file I normally start with the new metadata file,
-/// try to start the localnet, and check .aptos/testnet/main/tracing.log to
-/// see what error Hasura returned. Remove the culprit from the metadata, which is
-/// generally a few tables and relations to those tables, and try again. Repeat until
-/// it accepts the metadata.
-///
-/// This works fine today since all the key processors you'd need in a localnet
-/// are in the set of processors written in Rust. If this changes, we can explore
-/// alternatives, e.g. running processors in other languages using containers.
-pub const HASURA_METADATA: &str = include_str!("hasura_metadata.json");
 
 /// Args related to running an indexer API for the localnet.
 #[derive(Debug, Parser)]
@@ -336,36 +320,6 @@ impl ServiceManager for IndexerApiManager {
     }
 }
 
-/// This submits a POST request to apply metadata to a Hasura API.
-pub async fn post_metadata(url: Url, metadata_content: &str) -> Result<()> {
-    // Parse the metadata content as JSON.
-    let metadata_json: serde_json::Value = serde_json::from_str(metadata_content)?;
-
-    // Make the request.
-    info!("Submitting request to apply Hasura metadata");
-    let response =
-        make_hasura_metadata_request(url, "replace_metadata", Some(metadata_json)).await?;
-    info!(
-        "Received response for applying Hasura metadata: {:?}",
-        response
-    );
-
-    // Confirm that the metadata was applied successfully and there is no inconsistency
-    // between the schema and the underlying DB schema.
-    if let Some(obj) = response.as_object() {
-        if let Some(is_consistent_val) = obj.get("is_consistent") {
-            if is_consistent_val.as_bool() == Some(true) {
-                return Ok(());
-            }
-        }
-    }
-
-    Err(anyhow!(
-        "Something went wrong applying the Hasura metadata, perhaps it is not consistent with the DB. Response: {:#?}",
-        response
-    ))
-}
-
 /// This confirms that the metadata has been applied. We use this in the health
 /// checker.
 pub async fn confirm_metadata_applied(url: Url) -> Result<()> {
@@ -393,41 +347,3 @@ pub async fn confirm_metadata_applied(url: Url) -> Result<()> {
         response
     ))
 }
-
-/// The /v1/metadata endpoint supports a few different operations based on the `type`
-/// field in the request body. All requests have a similar format, with these `type`
-/// and `args` fields.
-async fn make_hasura_metadata_request(
-    mut url: Url,
-    typ: &str,
-    args: Option<serde_json::Value>,
-) -> Result<serde_json::Value> {
-    let client = reqwest::Client::new();
-
-    // Update the query path.
-    url.set_path("/v1/metadata");
-
-    // Construct the payload.
-    let mut payload = serde_json::Map::new();
-    payload.insert(
-        "type".to_string(),
-        serde_json::Value::String(typ.to_string()),
-    );
-
-    // If args is provided, use that. Otherwise use an empty object. We have to set it
-    // no matter what because the API expects the args key to be set.
-    let args = match args {
-        Some(args) => args,
-        None => serde_json::Value::Object(serde_json::Map::new()),
-    };
-    payload.insert("args".to_string(), args);
-
-    // Send the POST request.
-    let response = client.post(url).json(&payload).send().await?;
-
-    // Return the response as a JSON value.
-    response
-        .json()
-        .await
-        .context("Failed to parse response as JSON")
-}
diff --git a/crates/aptos/src/node/local_testnet/processors.rs b/crates/aptos/src/node/local_testnet/processors.rs
index 4be69ac2e5bf2..e8fbf21ab376b 100644
--- a/crates/aptos/src/node/local_testnet/processors.rs
+++ b/crates/aptos/src/node/local_testnet/processors.rs
@@ -3,19 +3,15 @@
 use super::{health_checker::HealthChecker, traits::ServiceManager, RunLocalnet};
 use anyhow::{bail, Context, Result};
+pub use aptos_localnet::processors::get_processor_config;
 use async_trait::async_trait;
 use clap::Parser;
 use diesel::Connection;
 use diesel_async::{async_connection_wrapper::AsyncConnectionWrapper, pg::AsyncPgConnection};
 use maplit::hashset;
 use processor::{
-    gap_detectors::DEFAULT_GAP_DETECTION_BATCH_SIZE,
-    processors::{
-        objects_processor::ObjectsProcessorConfig, stake_processor::StakeProcessorConfig,
-        token_v2_processor::TokenV2ProcessorConfig, ProcessorConfig, ProcessorName,
-    },
-    utils::database::run_pending_migrations,
-    IndexerGrpcProcessorConfig,
+    gap_detectors::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorName,
+    utils::database::run_pending_migrations, IndexerGrpcProcessorConfig,
 };
 use reqwest::Url;
 use server_framework::RunnableConfig;
@@ -57,64 +53,6 @@ pub struct ProcessorManager {
     prerequisite_health_checkers: HashSet<HealthChecker>,
 }
 
-pub fn get_processor_config(processor_name: &ProcessorName) -> Result<ProcessorConfig> {
-    Ok(match processor_name {
-        ProcessorName::AccountTransactionsProcessor => {
-            ProcessorConfig::AccountTransactionsProcessor
-        },
-        ProcessorName::AnsProcessor => {
-            bail!("ANS processor is not supported in the localnet")
-        },
-        ProcessorName::DefaultProcessor => ProcessorConfig::DefaultProcessor,
-        ProcessorName::EventsProcessor => ProcessorConfig::EventsProcessor,
-        ProcessorName::FungibleAssetProcessor => ProcessorConfig::FungibleAssetProcessor,
-        ProcessorName::MonitoringProcessor => {
-            bail!("Monitoring processor is not supported in the localnet")
-        },
-        ProcessorName::NftMetadataProcessor => {
-            bail!("NFT Metadata processor is not supported in the localnet")
-        },
-        ProcessorName::ObjectsProcessor => {
-            ProcessorConfig::ObjectsProcessor(ObjectsProcessorConfig {
-                query_retries: Default::default(),
-                query_retry_delay_ms: Default::default(),
-            })
-        },
-        ProcessorName::ParquetDefaultProcessor => {
-            bail!("ParquetDefaultProcessor is not supported in the localnet")
-        },
-        ProcessorName::ParquetFungibleAssetProcessor => {
-            bail!("ParquetFungibleAssetProcessor is not supported in the localnet")
-        },
-        ProcessorName::ParquetTransactionMetadataProcessor => {
-            bail!("ParquetTransactionMetadataProcessor is not supported in the localnet")
-        },
-        ProcessorName::ParquetAnsProcessor => {
-            bail!("ParquetAnsProcessor is not supported in the localnet")
-        },
-        ProcessorName::ParquetEventsProcessor => {
-            bail!("ParquetEventsProcessor is not supported in the localnet")
-        },
-        ProcessorName::ParquetTokenV2Processor => {
-            bail!("ParquetTokenV2Processor is not supported in the localnet")
-        },
-        ProcessorName::StakeProcessor => ProcessorConfig::StakeProcessor(StakeProcessorConfig {
-            query_retries: Default::default(),
-            query_retry_delay_ms: Default::default(),
-        }),
-        ProcessorName::TokenV2Processor => {
-            ProcessorConfig::TokenV2Processor(TokenV2ProcessorConfig {
-                query_retries: Default::default(),
-                query_retry_delay_ms: Default::default(),
-            })
-        },
-        ProcessorName::TransactionMetadataProcessor => {
-            ProcessorConfig::TransactionMetadataProcessor
-        },
-        ProcessorName::UserTransactionProcessor => ProcessorConfig::UserTransactionProcessor,
-    })
-}
 
 impl ProcessorManager {
     fn new(
         processor_name: &ProcessorName,
diff --git a/crates/aptos/src/workspace/mod.rs b/crates/aptos/src/workspace/mod.rs
new file mode 100644
index 0000000000000..8b4827d8f8c3c
--- /dev/null
+++ b/crates/aptos/src/workspace/mod.rs
@@ -0,0 +1,38 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::common::types::{CliCommand, CliResult, CliTypedResult};
+use async_trait::async_trait;
+use clap::Parser;
+
+/// Tool for operations related to Aptos Workspace
+#[derive(Parser)]
+pub enum WorkspaceTool {
+    Run(RunWorkspace),
+}
+
+impl WorkspaceTool {
+    pub async fn execute(self) -> CliResult {
+        use WorkspaceTool::*;
+
+        match self {
+            Run(cmd) => cmd.execute_serialized_without_logger().await,
+        }
+    }
+}
+
+#[derive(Parser)]
+pub struct RunWorkspace;
+
+#[async_trait]
+impl CliCommand<()> for RunWorkspace {
+    fn command_name(&self) -> &'static str {
+        "RunWorkspace"
+    }
+
+    async fn execute(self) -> CliTypedResult<()> {
+        aptos_workspace_server::run_all_services().await?;
+
+        Ok(())
+    }
+}
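With `run_all_services` exposed as a library entry point, the hidden `aptos workspace run` subcommand above and the thin `main.rs` earlier in this diff are both one-liners over the same call; any other tool can embed the localnet the same way:

```rust
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Runs node, faucet, postgres, processors and the indexer API until
    // Ctrl-C is received or a service exits, then cleans up Docker resources.
    aptos_workspace_server::run_all_services().await
}
```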