diff --git a/Cargo.lock b/Cargo.lock index 6a3fed2554..9b8643f9b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,22 +470,19 @@ dependencies = [ "ckb-db", "ckb-instrument", "ckb-jsonrpc-types", + "ckb-launcher", "ckb-logger", "ckb-logger-service", "ckb-memory-tracker", "ckb-metrics-service", "ckb-miner", "ckb-network", - "ckb-network-alert", "ckb-resource", - "ckb-rpc", "ckb-sentry", "ckb-shared", "ckb-store", - "ckb-sync", "ckb-types", "ckb-util", - "ckb-verification", "ckb-verification-traits", "clap", "ctrlc", @@ -720,6 +717,36 @@ dependencies = [ "serde_json", ] +[[package]] +name = "ckb-launcher" +version = "0.41.0-pre" +dependencies = [ + "ckb-app-config", + "ckb-async-runtime", + "ckb-build-info", + "ckb-chain", + "ckb-db", + "ckb-db-migration", + "ckb-db-schema", + "ckb-error", + "ckb-freezer", + "ckb-jsonrpc-types", + "ckb-logger", + "ckb-migration-template", + "ckb-network", + "ckb-network-alert", + "ckb-proposal-table", + "ckb-resource", + "ckb-rpc", + "ckb-shared", + "ckb-store", + "ckb-sync", + "ckb-types", + "ckb-verification", + "ckb-verification-traits", + "num_cpus", +] + [[package]] name = "ckb-librocksdb-sys" version = "6.11.6" @@ -1205,12 +1232,10 @@ dependencies = [ "ckb-channel", "ckb-constant", "ckb-db", - "ckb-db-migration", "ckb-db-schema", "ckb-error", "ckb-freezer", "ckb-logger", - "ckb-migration-template", "ckb-notify", "ckb-proposal-table", "ckb-snapshot", @@ -1220,8 +1245,6 @@ dependencies = [ "ckb-types", "ckb-verification", "faketime", - "num_cpus", - "tempfile", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 80ae63b7d1..722e56a4c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,12 +76,13 @@ members = [ "verification", "verification/contextual", "tx-pool", - "shared/migration-template", "shared", "chain", "sync", "util/instrument", "rpc", + "util/launcher/migration-template", + "util/launcher", "ckb-bin" ] diff --git a/ckb-bin/Cargo.toml b/ckb-bin/Cargo.toml index 6149cf546c..cd492a7331 100644 --- a/ckb-bin/Cargo.toml +++ b/ckb-bin/Cargo.toml @@ -27,19 +27,16 @@ ckb-store = { path = "../store", version = "= 0.41.0-pre" } ckb-chain-spec = {path = "../spec", version = "= 0.41.0-pre"} ckb-miner = { path = "../miner", version = "= 0.41.0-pre" } ckb-network = { path = "../network", version = "= 0.41.0-pre"} -ckb-rpc = { path = "../rpc", version = "= 0.41.0-pre"} ckb-resource = { path = "../resource", version = "= 0.41.0-pre"} -ckb-network-alert = { path = "../util/network-alert", version = "= 0.41.0-pre" } ctrlc = { version = "3.1", features = ["termination"] } -ckb-sync = { path = "../sync", version = "= 0.41.0-pre"} ckb-instrument = { path = "../util/instrument", version = "= 0.41.0-pre", features = ["progress_bar"] } ckb-build-info = { path = "../util/build-info", version = "= 0.41.0-pre" } ckb-memory-tracker = { path = "../util/memory-tracker", version = "= 0.41.0-pre" } ckb-chain-iter = { path = "../util/chain-iter", version = "= 0.41.0-pre" } -ckb-verification = { path = "../verification", version = "= 0.41.0-pre" } ckb-verification-traits = { path = "../verification/traits", version = "= 0.41.0-pre" } ckb-async-runtime = { path = "../util/runtime", version = "= 0.41.0-pre" } ckb-db = { path = "../db", version = "= 0.41.0-pre" } +ckb-launcher = { path = "../util/launcher", version = "= 0.41.0-pre" } base64 = "0.13.0" tempfile = "3.0" rayon = "1.0" @@ -49,5 +46,5 @@ atty = "0.2" [features] deadlock_detection = ["ckb-util/deadlock_detection"] profiling = ["ckb-memory-tracker/profiling"] -with_sentry = ["sentry", "ckb-network/with_sentry", 
"ckb-sync/with_sentry", "ckb-app-config/with_sentry", "ckb-logger-service/with_sentry"] +with_sentry = ["sentry", "ckb-launcher/with_sentry", "ckb-network/with_sentry", "ckb-app-config/with_sentry", "ckb-logger-service/with_sentry"] with_dns_seeding = ["ckb-network/with_dns_seeding"] diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index f8e4a8ea7c..53b8d099fe 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -10,7 +10,6 @@ use ckb_async_runtime::new_global_runtime; use ckb_build_info::Version; use setup_guard::SetupGuard; -pub(crate) const LOG_TARGET_MAIN: &str = "main"; #[cfg(feature = "with_sentry")] pub(crate) const LOG_TARGET_SENTRY: &str = "sentry"; @@ -59,7 +58,7 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { (cli::CMD_IMPORT, Some(matches)) => subcommand::import(setup.import(&matches)?, handle), (cli::CMD_STATS, Some(matches)) => subcommand::stats(setup.stats(&matches)?, handle), (cli::CMD_RESET_DATA, Some(matches)) => subcommand::reset_data(setup.reset_data(&matches)?), - (cli::CMD_MIGRATE, Some(matches)) => subcommand::migrate(setup.migrate(&matches)?, handle), + (cli::CMD_MIGRATE, Some(matches)) => subcommand::migrate(setup.migrate(&matches)?), (cli::CMD_DB_REPAIR, Some(matches)) => subcommand::db_repair(setup.db_repair(&matches)?), _ => unreachable!(), } diff --git a/ckb-bin/src/subcommand/export.rs b/ckb-bin/src/subcommand/export.rs index 9b12264528..a8fe2fd5d0 100644 --- a/ckb-bin/src/subcommand/export.rs +++ b/ckb-bin/src/subcommand/export.rs @@ -4,7 +4,7 @@ use ckb_instrument::Export; use ckb_shared::shared::SharedBuilder; pub fn export(args: ExportArgs, async_handle: Handle) -> Result<(), ExitCode> { - let (shared, _) = SharedBuilder::new(&args.config.db, None, async_handle) + let (shared, _) = SharedBuilder::new(&args.config.db, async_handle) .consensus(args.consensus) .build() .map_err(|err| { diff --git a/ckb-bin/src/subcommand/import.rs b/ckb-bin/src/subcommand/import.rs index cd6897622a..72f00366bc 100644 --- a/ckb-bin/src/subcommand/import.rs +++ b/ckb-bin/src/subcommand/import.rs @@ -5,7 +5,7 @@ use ckb_instrument::Import; use ckb_shared::shared::SharedBuilder; pub fn import(args: ImportArgs, async_handle: Handle) -> Result<(), ExitCode> { - let (shared, table) = SharedBuilder::new(&args.config.db, None, async_handle) + let (shared, table) = SharedBuilder::new(&args.config.db, async_handle) .consensus(args.consensus) .build() .map_err(|err| { diff --git a/ckb-bin/src/subcommand/migrate.rs b/ckb-bin/src/subcommand/migrate.rs index 63a8f23acf..9d94c7632a 100644 --- a/ckb-bin/src/subcommand/migrate.rs +++ b/ckb-bin/src/subcommand/migrate.rs @@ -1,25 +1,24 @@ use ckb_app_config::{ExitCode, MigrateArgs}; -use ckb_async_runtime::Handle; -use ckb_shared::shared::SharedBuilder; +use ckb_launcher::DatabaseMigration; use crate::helper::prompt; -pub fn migrate(args: MigrateArgs, async_handle: Handle) -> Result<(), ExitCode> { - let builder = SharedBuilder::new(&args.config.db, None, async_handle); +pub fn migrate(args: MigrateArgs) -> Result<(), ExitCode> { + let migration = DatabaseMigration::new(&args.config.db.path); if args.check { - if builder.migration_check() { + if migration.migration_check() { return Ok(()); } else { return Err(ExitCode::Cli); } } - if !builder.migration_check() { + if !migration.migration_check() { return Ok(()); } - if builder.require_expensive_migrations() && !args.force { + if migration.require_expensive_migrations() && !args.force { if atty::is(atty::Stream::Stdin) && atty::is(atty::Stream::Stdout) { let input = 
prompt("\ \n\ @@ -41,9 +40,10 @@ pub fn migrate(args: MigrateArgs, async_handle: Handle) -> Result<(), ExitCode> } } - let (_shared, _table) = builder.consensus(args.consensus).build().map_err(|err| { + migration.migrate().map_err(|err| { eprintln!("Run error: {:?}", err); ExitCode::Failure })?; + Ok(()) } diff --git a/ckb-bin/src/subcommand/replay.rs b/ckb-bin/src/subcommand/replay.rs index e352bad550..b33a8e5c1d 100644 --- a/ckb-bin/src/subcommand/replay.rs +++ b/ckb-bin/src/subcommand/replay.rs @@ -9,7 +9,7 @@ use ckb_verification_traits::Switch; use std::sync::Arc; pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> { - let (shared, _table) = SharedBuilder::new(&args.config.db, None, async_handle.clone()) + let (shared, _table) = SharedBuilder::new(&args.config.db, async_handle.clone()) .consensus(args.consensus.clone()) .tx_pool_config(args.config.tx_pool) .build() @@ -33,7 +33,7 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> { let mut tmp_db_config = args.config.db.clone(); tmp_db_config.path = tmp_db_dir.path().to_path_buf(); - let (tmp_shared, table) = SharedBuilder::new(&tmp_db_config, None, async_handle) + let (tmp_shared, table) = SharedBuilder::new(&tmp_db_config, async_handle) .consensus(args.consensus) .tx_pool_config(args.config.tx_pool) .build() diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index a1f8c964b0..05bc275cd7 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -1,70 +1,26 @@ use crate::helper::deadlock_detection; -use ckb_app_config::{BlockAssemblerConfig, ExitCode, RunArgs}; +use ckb_app_config::{ExitCode, RunArgs}; use ckb_async_runtime::Handle; use ckb_build_info::Version; -use ckb_chain::chain::ChainService; -use ckb_jsonrpc_types::ScriptHashType; -use ckb_logger::info_target; -use ckb_network::{ - CKBProtocol, DefaultExitHandler, ExitHandler, NetworkService, NetworkState, SupportProtocols, -}; -use ckb_network_alert::alert_relayer::AlertRelayer; -use ckb_resource::Resource; -use ckb_rpc::{RpcServer, ServiceBuilder}; -use ckb_shared::shared::{Shared, SharedBuilder}; -use ckb_store::{ChainDB, ChainStore}; -use ckb_sync::{NetTimeProtocol, Relayer, SyncShared, Synchronizer}; -use ckb_types::packed::Byte32; -use ckb_types::{core::cell::setup_system_cell_cache, prelude::*}; -use ckb_verification::GenesisVerifier; -use ckb_verification_traits::Verifier; -use std::sync::Arc; +use ckb_launcher::Launcher; +use ckb_logger::info; +use ckb_network::{DefaultExitHandler, ExitHandler}; +use ckb_store::ChainStore; +use ckb_types::core::cell::setup_system_cell_cache; -const SECP256K1_BLAKE160_SIGHASH_ALL_ARG_LEN: usize = 20; - -pub fn run(mut args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { +pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { deadlock_detection(); - info_target!(crate::LOG_TARGET_MAIN, "ckb version: {}", version); + info!("ckb version: {}", version); + + let mut launcher = Launcher::new(args, version, async_handle); - let block_assembler_config = sanitize_block_assembler_config(&args)?; + let block_assembler_config = launcher.sanitize_block_assembler_config()?; let miner_enable = block_assembler_config.is_some(); let exit_handler = DefaultExitHandler::default(); - let (shared, table) = { - let shared_builder = SharedBuilder::new( - &args.config.db, - Some(args.config.ancient.clone()), - async_handle, - ); - - if shared_builder.require_expensive_migrations() { - eprintln!( 
- "For optimal performance, CKB wants to migrate the data into new format.\n\ - You can use the old version CKB if you don't want to do the migration.\n\ - We strongly recommended you to use the latest stable version of CKB, \ - since the old versions may have unfixed vulnerabilities.\n\ - Run `ckb migrate --help` for more information about migration." - ); - return Err(ExitCode::Failure); - } - - shared_builder - .consensus(args.consensus.clone()) - .tx_pool_config(args.config.tx_pool) - .notify_config(args.config.notify.clone()) - .store_config(args.config.store) - .block_assembler_config(block_assembler_config) - .build() - .map_err(|err| { - eprintln!("Run error: {:?}", err); - ExitCode::Failure - })? - }; - - // Verify genesis every time starting node - verify_genesis(&shared)?; - check_spec(&shared, &args)?; + launcher.migrate_guard()?; + let (shared, table) = launcher.build_shared(block_assembler_config)?; // spawn freezer background process let _freezer = shared.spawn_freeze(); @@ -80,111 +36,16 @@ pub fn run(mut args: RunArgs, version: Version, async_handle: Handle) -> Result< .expect("Init the global thread pool for rayon failed"); ckb_memory_tracker::track_current_process( - args.config.memory_tracker.interval, + launcher.args.config.memory_tracker.interval, Some(shared.store().db().inner()), ); - // Check whether the data already exists in the database before starting - if let Some(ref target) = args.config.network.sync.assume_valid_target { - if shared.snapshot().block_exists(&target.pack()) { - args.config.network.sync.assume_valid_target.take(); - } - } - - let chain_service = ChainService::new(shared.clone(), table); - let chain_controller = chain_service.start(Some("ChainService")); - info_target!( - crate::LOG_TARGET_MAIN, - "chain genesis hash: {:#x}", - shared.genesis_hash() - ); - - let sync_shared = Arc::new(SyncShared::with_tmpdir( - shared.clone(), - args.config.network.sync.clone(), - args.config.tmp_dir.as_ref(), - )); - let network_state = Arc::new( - NetworkState::from_config(args.config.network).expect("Init network state failed"), - ); - let synchronizer = Synchronizer::new(chain_controller.clone(), Arc::clone(&sync_shared)); - - let relayer = Relayer::new( - chain_controller.clone(), - Arc::clone(&sync_shared), - args.config.tx_pool.min_fee_rate, - args.config.tx_pool.max_tx_verify_cycles, - ); - let net_timer = NetTimeProtocol::default(); - let alert_signature_config = args.config.alert_signature.unwrap_or_default(); - let alert_relayer = AlertRelayer::new( - version.to_string(), - shared.notify_controller().clone(), - alert_signature_config, - ); - - let alert_notifier = Arc::clone(alert_relayer.notifier()); - let alert_verifier = Arc::clone(alert_relayer.verifier()); - - let protocols = vec![ - CKBProtocol::new_with_support_protocol( - SupportProtocols::Sync, - Box::new(synchronizer.clone()), - Arc::clone(&network_state), - ), - CKBProtocol::new_with_support_protocol( - SupportProtocols::Relay, - Box::new(relayer), - Arc::clone(&network_state), - ), - CKBProtocol::new_with_support_protocol( - SupportProtocols::Time, - Box::new(net_timer), - Arc::clone(&network_state), - ), - CKBProtocol::new_with_support_protocol( - SupportProtocols::Alert, - Box::new(alert_relayer), - Arc::clone(&network_state), - ), - ]; - - let required_protocol_ids = vec![SupportProtocols::Sync.protocol_id()]; + launcher.check_assume_valid_target(&shared); - let network_controller = NetworkService::new( - Arc::clone(&network_state), - protocols, - required_protocol_ids, - 
shared.consensus().identify_name(), - version.to_string(), - exit_handler.clone(), - ) - .start(shared.async_handle()) - .expect("Start network service failed"); + let chain_controller = launcher.start_chain_service(&shared, table); - let builder = ServiceBuilder::new(&args.config.rpc) - .enable_chain(shared.clone()) - .enable_pool( - shared.clone(), - Arc::clone(&sync_shared), - args.config.tx_pool.min_fee_rate, - args.config.rpc.reject_ill_transactions, - ) - .enable_miner( - shared.clone(), - network_controller.clone(), - chain_controller.clone(), - miner_enable, - ) - .enable_net(network_controller.clone(), sync_shared) - .enable_stats(shared.clone(), synchronizer, Arc::clone(&alert_notifier)) - .enable_experiment(shared.clone()) - .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) - .enable_debug(); - let io_handler = builder.build(); - - let rpc_server = RpcServer::new(args.config.rpc, io_handler, shared.notify_controller()); + let (network_controller, rpc_server) = + launcher.start_network_and_rpc(&shared, chain_controller, &exit_handler, miner_enable); let exit_handler_clone = exit_handler.clone(); ctrlc::set_handler(move || { @@ -193,131 +54,9 @@ pub fn run(mut args: RunArgs, version: Version, async_handle: Handle) -> Result< .expect("Error setting Ctrl-C handler"); exit_handler.wait_for_exit(); - info_target!(crate::LOG_TARGET_MAIN, "Finishing work, please wait..."); + info!("Finishing work, please wait..."); drop(rpc_server); drop(network_controller); Ok(()) } - -fn verify_genesis(shared: &Shared) -> Result<(), ExitCode> { - GenesisVerifier::new() - .verify(shared.consensus()) - .map_err(|err| { - eprintln!("genesis error: {}", err); - ExitCode::Config - }) -} - -fn check_spec(shared: &Shared, args: &RunArgs) -> Result<(), ExitCode> { - let store = shared.store(); - let stored_spec_hash = store.get_chain_spec_hash(); - - if stored_spec_hash.is_none() { - // fresh yet - write_chain_spec_hash(store, &args.chain_spec_hash)?; - info_target!( - crate::LOG_TARGET_MAIN, - "Touch chain spec hash: {}", - args.chain_spec_hash - ); - } else if stored_spec_hash.as_ref() == Some(&args.chain_spec_hash) { - // stored == configured - // do nothing - } else if args.overwrite_chain_spec { - // stored != configured with --overwrite-spec - write_chain_spec_hash(store, &args.chain_spec_hash)?; - info_target!( - crate::LOG_TARGET_MAIN, - "Overwrite chain spec hash from {} to {}", - stored_spec_hash.expect("checked"), - args.overwrite_chain_spec, - ); - } else if args.skip_chain_spec_check { - // stored != configured with --skip-spec-check - // do nothing - } else { - // stored != configured - eprintln!( - "chain_spec_hash mismatch Config({}) storage({}), pass command line argument \ - --skip-spec-check if you are sure that the two different chains are compatible; \ - or pass --overwrite-spec to force overriding stored chain spec with configured chain spec", - args.chain_spec_hash, stored_spec_hash.expect("checked") - ); - return Err(ExitCode::Config); - } - Ok(()) -} - -fn write_chain_spec_hash(store: &ChainDB, chain_spec_hash: &Byte32) -> Result<(), ExitCode> { - store.put_chain_spec_hash(chain_spec_hash).map_err(|err| { - eprintln!( - "store.put_chain_spec_hash {} error: {}", - chain_spec_hash, err - ); - ExitCode::IO - }) -} - -fn sanitize_block_assembler_config( - args: &RunArgs, -) -> Result<Option<BlockAssemblerConfig>, ExitCode> { - let block_assembler_config = match ( - args.config.rpc.miner_enable(), - 
args.config.block_assembler.clone(), - ) { - (true, Some(block_assembler)) => { - let check_lock_code_hash = |code_hash| -> Result<bool, ExitCode> { - let secp_cell_data = - Resource::bundled("specs/cells/secp256k1_blake160_sighash_all".to_string()) - .get() - .map_err(|err| { - eprintln!( - "Load specs/cells/secp256k1_blake160_sighash_all error: {:?}", - err - ); - ExitCode::Failure - })?; - let genesis_cellbase = &args.consensus.genesis_block().transactions()[0]; - Ok(genesis_cellbase - .outputs() - .into_iter() - .zip(genesis_cellbase.outputs_data().into_iter()) - .any(|(output, data)| { - data.raw_data() == secp_cell_data.as_ref() - && output - .type_() - .to_opt() - .map(|script| script.calc_script_hash()) - .as_ref() - == Some(code_hash) - })) - }; - if args.block_assembler_advanced - || (block_assembler.hash_type == ScriptHashType::Type - && block_assembler.args.len() == SECP256K1_BLAKE160_SIGHASH_ALL_ARG_LEN - && check_lock_code_hash(&block_assembler.code_hash.pack())?) - { - Some(block_assembler) - } else { - info_target!( - crate::LOG_TARGET_MAIN, - "Miner is disabled because block assmebler is not a recommended lock format. \ - Edit ckb.toml or use `ckb run --ba-advanced` to use other lock scripts" - ); - - None - } - } - - _ => { - info_target!( - crate::LOG_TARGET_MAIN, - "Miner is disabled, edit ckb.toml to enable it" - ); - - None - } - }; - Ok(block_assembler_config) -} diff --git a/ckb-bin/src/subcommand/stats.rs b/ckb-bin/src/subcommand/stats.rs index 811082be52..d2c44f2602 100644 --- a/ckb-bin/src/subcommand/stats.rs +++ b/ckb-bin/src/subcommand/stats.rs @@ -18,7 +18,7 @@ struct Statics { impl Statics { pub fn build(args: StatsArgs, async_handle: Handle) -> Result<Self, ExitCode> { - let (shared, _) = SharedBuilder::new(&args.config.db, None, async_handle) + let (shared, _) = SharedBuilder::new(&args.config.db, async_handle) .consensus(args.consensus) .build() .map_err(|err| { diff --git a/shared/Cargo.toml b/shared/Cargo.toml index ef7df3e1ad..f602edf09c 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -21,15 +21,11 @@ ckb-tx-pool = { path = "../tx-pool", version = "= 0.41.0-pre" } ckb-verification = { path = "../verification", version = "= 0.41.0-pre" } ckb-notify = { path = "../notify", version = "= 0.41.0-pre" } ckb-app-config = { path = "../util/app-config", version = "= 0.41.0-pre" } -ckb-db-migration = { path = "../db-migration", version = "= 0.41.0-pre" } ckb-logger = { path = "../util/logger", version = "= 0.41.0-pre" } ckb-freezer = { path = "../freezer", version = "= 0.41.0-pre" } ckb-db-schema = { path = "../db-schema", version = "= 0.41.0-pre" } ckb-async-runtime = { path = "../util/runtime", version = "= 0.41.0-pre" } ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.41.0-pre" } ckb-channel = { path = "../util/channel", version = "= 0.41.0-pre" } -ckb-migration-template = { path = "migration-template", version = "= 0.41.0-pre" } ckb-constant = { path = "../util/constant", version = "= 0.41.0-pre" } -num_cpus = "1.10" faketime = "0.2.0" -tempfile = "3.0" diff --git a/shared/src/lib.rs b/shared/src/lib.rs index d0ca906b70..9650169756 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -1,10 +1,6 @@ //! 
TODO(doc): @quake // num_cpus is used in proc_macro -// declare here for mute ./devtools/ci/check-cargotoml.sh error -extern crate num_cpus; - -mod migrations; pub mod shared; pub use ckb_snapshot::{Snapshot, SnapshotMgr}; diff --git a/shared/src/shared.rs b/shared/src/shared.rs index 5b4ce11bdc..efd02e60af 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -1,5 +1,5 @@ //! TODO(doc): @quake -use crate::{migrations, Snapshot, SnapshotMgr}; +use crate::{Snapshot, SnapshotMgr}; use arc_swap::Guard; use ckb_app_config::{BlockAssemblerConfig, DBConfig, NotifyConfig, StoreConfig, TxPoolConfig}; use ckb_async_runtime::{new_global_runtime, Handle}; @@ -7,8 +7,7 @@ use ckb_chain_spec::consensus::Consensus; use ckb_chain_spec::SpecError; use ckb_constant::store::TX_INDEX_UPPER_BOUND; use ckb_constant::sync::MAX_TIP_AGE; -use ckb_db::{Direction, IteratorMode, ReadOnlyDB, RocksDB}; -use ckb_db_migration::{DefaultMigration, Migrations}; +use ckb_db::{Direction, IteratorMode, RocksDB}; use ckb_db_schema::COLUMN_BLOCK_BODY; use ckb_db_schema::{COLUMNS, COLUMN_NUMBER_HASH}; use ckb_error::{Error, InternalErrorKind}; @@ -503,39 +502,29 @@ impl Shared { /// TODO(doc): @quake pub struct SharedBuilder { - db_config: DBConfig, + db: RocksDB, ancient_path: Option<PathBuf>, consensus: Option<Consensus>, tx_pool_config: Option<TxPoolConfig>, store_config: Option<StoreConfig>, block_assembler_config: Option<BlockAssemblerConfig>, notify_config: Option<NotifyConfig>, - migrations: Migrations, async_handle: Handle, // async stop handle, only test will be assigned async_stop: Option<StopHandler<()>>, } -const INIT_DB_VERSION: &str = "20191127135521"; - impl SharedBuilder { /// Generates the base SharedBuilder with ancient path and async_handle - pub fn new(db_config: &DBConfig, ancient: Option<PathBuf>, async_handle: Handle) -> Self { - let mut migrations = Migrations::default(); - migrations.add_migration(Box::new(DefaultMigration::new(INIT_DB_VERSION))); - migrations.add_migration(Box::new(migrations::ChangeMoleculeTableToStruct)); - migrations.add_migration(Box::new(migrations::CellMigration)); - migrations.add_migration(Box::new(migrations::AddNumberHashMapping)); - + pub fn new(db_config: &DBConfig, async_handle: Handle) -> Self { SharedBuilder { - db_config: db_config.clone(), - ancient_path: ancient, + db: RocksDB::open(db_config, COLUMNS), + ancient_path: None, consensus: None, tx_pool_config: None, notify_config: None, store_config: None, block_assembler_config: None, - migrations, async_handle, async_stop: None, } @@ -544,20 +533,14 @@ impl SharedBuilder { /// Generates the SharedBuilder with temp db pub fn with_temp_db() -> Self { let (handle, stop) = new_global_runtime(); - let tmp_dir = tempfile::Builder::new().tempdir().unwrap(); - let db_config = DBConfig { - path: tmp_dir.path().to_path_buf(), - ..Default::default() - }; SharedBuilder { - db_config, + db: RocksDB::open_tmp(COLUMNS), ancient_path: None, consensus: None, tx_pool_config: None, notify_config: None, store_config: None, block_assembler_config: None, - migrations: Migrations::default(), async_handle: handle, async_stop: Some(stop), } @@ -565,24 +548,6 @@ impl SharedBuilder { } impl SharedBuilder { - /// Check whether database requires migration - /// - /// Return true if migration is required - pub fn migration_check(&self) -> bool { - ReadOnlyDB::open(&self.db_config.path) - .unwrap_or_else(|err| panic!("{}", err)) - .map(|db| self.migrations.check(&db)) - .unwrap_or(false) - } - - /// Check whether database requires expensive migrations. 
- pub fn require_expensive_migrations(&self) -> bool { - ReadOnlyDB::open(&self.db_config.path) - .unwrap_or_else(|err| panic!("{}", err)) - .map(|db| self.migrations.expensive(&db)) - .unwrap_or(false) - } - /// TODO(doc): @quake pub fn consensus(mut self, value: Consensus) -> Self { self.consensus = Some(value); @@ -626,18 +591,11 @@ impl SharedBuilder { let notify_config = self.notify_config.unwrap_or_else(Default::default); let store_config = self.store_config.unwrap_or_else(Default::default); - if let Some(migration_db) = - RocksDB::prepare_for_bulk_load_open(&self.db_config.path, COLUMNS)? - { - self.migrations.migrate(migration_db)?; - } - - let db = RocksDB::open(&self.db_config, COLUMNS); let store = if store_config.freezer_enable && self.ancient_path.is_some() { let freezer = Freezer::open(self.ancient_path.expect("exist checked"))?; - ChainDB::new_with_freezer(db, freezer, store_config) + ChainDB::new_with_freezer(self.db, freezer, store_config) } else { - ChainDB::new(db, store_config) + ChainDB::new(self.db, store_config) }; Shared::init( diff --git a/test/src/specs/p2p/malformed_message.rs b/test/src/specs/p2p/malformed_message.rs index a0860eae56..623613801a 100644 --- a/test/src/specs/p2p/malformed_message.rs +++ b/test/src/specs/p2p/malformed_message.rs @@ -1,5 +1,5 @@ use crate::util::mining::out_ibd_mode; -use crate::utils::wait_until; +use crate::utils::{sleep, wait_until}; use crate::{Net, Node, Spec}; use ckb_logger::info; use ckb_network::{bytes::Bytes, SupportProtocols}; @@ -75,6 +75,9 @@ impl Spec for MalformedMessageWithWhitelist { config.network.whitelist_peers = vec![net.p2p_address().parse().unwrap()] }); node0.start(); + // FIXME + // currrently, we have no idea how to fix autobind_reuse + sleep(5); net.connect(&node0); let rpc_client = node0.rpc_client(); diff --git a/util/launcher/Cargo.toml b/util/launcher/Cargo.toml new file mode 100644 index 0000000000..338fd3517c --- /dev/null +++ b/util/launcher/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "ckb-launcher" +version = "0.41.0-pre" +license = "MIT" +authors = ["Nervos Core Dev "] +edition = "2018" +description = "CKB tool to import/export chain data." 
+homepage = "https://github.com/nervosnetwork/ckb" +repository = "https://github.com/nervosnetwork/ckb" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ckb-types = { path = "../types", version = "= 0.41.0-pre" } +ckb-store = { path = "../../store", version = "= 0.41.0-pre" } +ckb-db = { path = "../../db", version = "= 0.41.0-pre" } +ckb-migration-template = { path = "migration-template", version = "= 0.41.0-pre" } +ckb-app-config = { path = "../app-config", version = "= 0.41.0-pre" } +ckb-db-migration = { path = "../../db-migration", version = "= 0.41.0-pre" } +ckb-logger = { path = "../logger", version = "= 0.41.0-pre" } +ckb-freezer = { path = "../../freezer", version = "= 0.41.0-pre" } +ckb-db-schema = { path = "../../db-schema", version = "= 0.41.0-pre" } +ckb-error = { path = "../../error", version = "= 0.41.0-pre" } +ckb-build-info = { path = "../build-info", version = "= 0.41.0-pre" } +ckb-jsonrpc-types = { path = "../jsonrpc-types", version = "= 0.41.0-pre" } +ckb-chain = { path = "../../chain", version = "= 0.41.0-pre" } +ckb-shared = { path = "../../shared", version = "= 0.41.0-pre" } +ckb-network = { path = "../../network", version = "= 0.41.0-pre"} +ckb-rpc = { path = "../../rpc", version = "= 0.41.0-pre"} +ckb-resource = { path = "../../resource", version = "= 0.41.0-pre"} +ckb-network-alert = { path = "../network-alert", version = "= 0.41.0-pre" } +ckb-sync = { path = "../../sync", version = "= 0.41.0-pre"} +ckb-verification = { path = "../../verification", version = "= 0.41.0-pre" } +ckb-verification-traits = { path = "../../verification/traits", version = "= 0.41.0-pre" } +ckb-async-runtime = { path = "../runtime", version = "= 0.41.0-pre" } +ckb-proposal-table = { path = "../proposal-table", version = "= 0.41.0-pre" } +num_cpus = "1.10" + + +[features] +with_sentry = [ "ckb-sync/with_sentry", "ckb-network/with_sentry", "ckb-app-config/with_sentry" ] diff --git a/util/launcher/migration-template/Cargo.toml b/util/launcher/migration-template/Cargo.toml new file mode 100644 index 0000000000..07e2c50094 --- /dev/null +++ b/util/launcher/migration-template/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ckb-migration-template" +version = "0.41.0-pre" +license = "MIT" +authors = ["Nervos "] +edition = "2018" +description = "Provide proc-macros to setup migration." +homepage = "https://github.com/nervosnetwork/ckb" +repository = "https://github.com/nervosnetwork/ckb" + +[lib] +proc-macro = true + +[dependencies] +quote = "1.0" +syn = { version = "1.0", features = ["full", "printing"] } diff --git a/util/launcher/migration-template/src/lib.rs b/util/launcher/migration-template/src/lib.rs new file mode 100644 index 0000000000..7f6568418a --- /dev/null +++ b/util/launcher/migration-template/src/lib.rs @@ -0,0 +1,75 @@ +//! Provide proc-macros to setup migration. + +extern crate proc_macro; + +use proc_macro::TokenStream; +use quote::quote; +use syn::parse_macro_input; + +/// multi thread migration template +#[proc_macro] +pub fn multi_thread_migration(input: TokenStream) -> TokenStream { + let block_expr = parse_macro_input!(input as syn::ExprBlock); + let expanded = quote! 
{ + const MAX_THREAD: u64 = 6; + const MIN_THREAD: u64 = 2; + const BATCH: usize = 1_000; + + let chain_db = ChainDB::new(db, StoreConfig::default()); + let tip = chain_db.get_tip_header().expect("db tip header index"); + let tip_number = tip.number(); + + let tb_num = std::cmp::max(MIN_THREAD, num_cpus::get() as u64); + let tb_num = std::cmp::min(tb_num, MAX_THREAD); + let chunk_size = tip_number / tb_num; + let remainder = tip_number % tb_num; + let _barrier = ::std::sync::Arc::new(::std::sync::Barrier::new(tb_num as usize)); + + let handles: Vec<_> = (0..tb_num).map(|i| { + let chain_db = chain_db.clone(); + let pb = ::std::sync::Arc::clone(&pb); + let barrier = Arc::clone(&_barrier); + + let last = i == (tb_num - 1); + let size = if last { + chunk_size + remainder + } else { + chunk_size + }; + let end = if last { + tip_number + 1 + } else { + (i + 1) * chunk_size + }; + + let pbi = pb(size * 2); + pbi.set_style( + ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} {spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}", + ) + .progress_chars("#>-"), + ); + pbi.set_position(0); + pbi.enable_steady_tick(5000); + ::std::thread::spawn(move || { + let mut wb = chain_db.new_write_batch(); + + #block_expr + + if !wb.is_empty() { + chain_db.write(&wb).unwrap(); + } + pbi.finish_with_message("done!"); + }) + }).collect(); + + // Wait for other threads to finish. + for handle in handles { + handle.join().unwrap(); + } + Ok(chain_db.into_inner()) + }; + + TokenStream::from(expanded) +} diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs new file mode 100644 index 0000000000..4b8b0089ab --- /dev/null +++ b/util/launcher/src/lib.rs @@ -0,0 +1,395 @@ +//! CKB launcher. +//! +//! ckb launcher is helps to launch ckb node. 
+ +// declare here for mute ./devtools/ci/check-cargotoml.sh error +extern crate num_cpus; + +mod migrations; + +use ckb_app_config::{BlockAssemblerConfig, ExitCode, RunArgs}; +use ckb_async_runtime::Handle; +use ckb_build_info::Version; +use ckb_chain::chain::{ChainController, ChainService}; +use ckb_db::{ReadOnlyDB, RocksDB}; +use ckb_db_migration::{DefaultMigration, Migrations}; +use ckb_db_schema::COLUMNS; +use ckb_error::Error; +use ckb_freezer::Freezer; +use ckb_jsonrpc_types::ScriptHashType; +use ckb_logger::info; +use ckb_network::{ + CKBProtocol, DefaultExitHandler, NetworkController, NetworkService, NetworkState, + SupportProtocols, +}; +use ckb_network_alert::alert_relayer::AlertRelayer; +use ckb_proposal_table::ProposalTable; +use ckb_resource::Resource; +use ckb_rpc::{RpcServer, ServiceBuilder}; +use ckb_shared::shared::Shared; +use ckb_store::{ChainDB, ChainStore}; +use ckb_sync::{NetTimeProtocol, Relayer, SyncShared, Synchronizer}; +use ckb_types::prelude::*; +use ckb_verification::GenesisVerifier; +use ckb_verification_traits::Verifier; +use std::path::PathBuf; +use std::sync::Arc; + +const INIT_DB_VERSION: &str = "20191127135521"; +const SECP256K1_BLAKE160_SIGHASH_ALL_ARG_LEN: usize = 20; + +/// Wrapper contains migration and db +pub struct DatabaseMigration { + migrations: Migrations, + path: PathBuf, +} + +impl DatabaseMigration { + /// Open db with bulk loading parameters, init migration + pub fn new<P: Into<PathBuf>>(path: P) -> Self { + let mut migrations = Migrations::default(); + migrations.add_migration(Box::new(DefaultMigration::new(INIT_DB_VERSION))); + migrations.add_migration(Box::new(migrations::ChangeMoleculeTableToStruct)); + migrations.add_migration(Box::new(migrations::CellMigration)); + migrations.add_migration(Box::new(migrations::AddNumberHashMapping)); + + DatabaseMigration { + migrations, + path: path.into(), + } + } + + /// Return true if migration is required + pub fn migration_check(&self) -> bool { + ReadOnlyDB::open(&self.path) + .unwrap_or_else(|err| panic!("{}", err)) + .map(|db| self.migrations.check(&db)) + .unwrap_or(false) + } + + /// Check whether database requires expensive migrations. + pub fn require_expensive_migrations(&self) -> bool { + ReadOnlyDB::open(&self.path) + .unwrap_or_else(|err| panic!("{}", err)) + .map(|db| self.migrations.expensive(&db)) + .unwrap_or(false) + } + + /// Perform migrate. + pub fn migrate(self) -> Result<(), Error> { + if let Some(db) = RocksDB::prepare_for_bulk_load_open(&self.path, COLUMNS)? { + self.migrations.migrate(db)?; + } + Ok(()) + } +} + +/// Ckb launcher helps to launch the ckb node. 
+pub struct Launcher { + /// cli `run` subcommand parsed args + pub args: RunArgs, + /// ckb node version + pub version: Version, + /// ckb global runtime handle + pub async_handle: Handle, +} + +impl Launcher { + /// Construct new Launcher from cli args + pub fn new(args: RunArgs, version: Version, async_handle: Handle) -> Self { + Launcher { + args, + version, + async_handle, + } + } + + /// Sanitize block assembler config + pub fn sanitize_block_assembler_config( + &self, + ) -> Result<Option<BlockAssemblerConfig>, ExitCode> { + let block_assembler_config = match ( + self.args.config.rpc.miner_enable(), + self.args.config.block_assembler.clone(), + ) { + (true, Some(block_assembler)) => { + let check_lock_code_hash = |code_hash| -> Result<bool, ExitCode> { + let secp_cell_data = + Resource::bundled("specs/cells/secp256k1_blake160_sighash_all".to_string()) + .get() + .map_err(|err| { + eprintln!( + "Load specs/cells/secp256k1_blake160_sighash_all error: {:?}", + err + ); + ExitCode::Failure + })?; + let genesis_cellbase = &self.args.consensus.genesis_block().transactions()[0]; + Ok(genesis_cellbase + .outputs() + .into_iter() + .zip(genesis_cellbase.outputs_data().into_iter()) + .any(|(output, data)| { + data.raw_data() == secp_cell_data.as_ref() + && output + .type_() + .to_opt() + .map(|script| script.calc_script_hash()) + .as_ref() + == Some(code_hash) + })) + }; + if self.args.block_assembler_advanced + || (block_assembler.hash_type == ScriptHashType::Type + && block_assembler.args.len() == SECP256K1_BLAKE160_SIGHASH_ALL_ARG_LEN + && check_lock_code_hash(&block_assembler.code_hash.pack())?) + { + Some(block_assembler) + } else { + info!( + "Miner is disabled because block assembler is not a recommended lock format. \ + Edit ckb.toml or use `ckb run --ba-advanced` to use other lock scripts" + ); + + None + } + } + + _ => { + info!("Miner is disabled, edit ckb.toml to enable it"); + + None + } + }; + Ok(block_assembler_config) + } + + /// Migrate prompt + pub fn migrate_guard(&self) -> Result<(), ExitCode> { + let migration = DatabaseMigration::new(&self.args.config.db.path); + if migration.require_expensive_migrations() { + eprintln!( + "For optimal performance, CKB wants to migrate the data into new format.\n\ + You can use the old version CKB if you don't want to do the migration.\n\ + We strongly recommend you to use the latest stable version of CKB, \ + since the old versions may have unfixed vulnerabilities.\n\ + Run `ckb migrate --help` for more information about migration." 
+ ); + return Err(ExitCode::Failure); + } + Ok(()) + } + + fn write_chain_spec_hash(&self, store: &ChainDB) -> Result<(), ExitCode> { + store + .put_chain_spec_hash(&self.args.chain_spec_hash) + .map_err(|err| { + eprintln!( + "store.put_chain_spec_hash {} error: {}", + self.args.chain_spec_hash, err + ); + ExitCode::IO + }) + } + + fn check_spec(&self, shared: &Shared) -> Result<(), ExitCode> { + let store = shared.store(); + let stored_spec_hash = store.get_chain_spec_hash(); + + if stored_spec_hash.is_none() { + // fresh yet + self.write_chain_spec_hash(store)?; + info!("Touch chain spec hash: {}", self.args.chain_spec_hash); + } else if stored_spec_hash.as_ref() == Some(&self.args.chain_spec_hash) { + // stored == configured + // do nothing + } else if self.args.overwrite_chain_spec { + // stored != configured with --overwrite-spec + self.write_chain_spec_hash(store)?; + info!( + "Overwrite chain spec hash from {} to {}", + stored_spec_hash.expect("checked"), + self.args.overwrite_chain_spec, + ); + } else if self.args.skip_chain_spec_check { + // stored != configured with --skip-spec-check + // do nothing + } else { + // stored != configured + eprintln!( + "chain_spec_hash mismatch Config({}) storage({}), pass command line argument \ + --skip-spec-check if you are sure that the two different chains are compatible; \ + or pass --overwrite-spec to force overriding stored chain spec with configured chain spec", + self.args.chain_spec_hash, stored_spec_hash.expect("checked") + ); + return Err(ExitCode::Config); + } + Ok(()) + } + + fn verify_genesis(&self, shared: &Shared) -> Result<(), ExitCode> { + GenesisVerifier::new() + .verify(shared.consensus()) + .map_err(|err| { + eprintln!("genesis error: {}", err); + ExitCode::Config + }) + } + + /// Build shared + pub fn build_shared( + &self, + block_assembler_config: Option<BlockAssemblerConfig>, + ) -> Result<(Shared, ProposalTable), ExitCode> { + let db = RocksDB::open(&self.args.config.db, COLUMNS); + let store = if self.args.config.store.freezer_enable { + let freezer = Freezer::open(self.args.config.ancient.clone()).map_err(|err| { + eprintln!("Freezer open error: {:?}", err); + ExitCode::Failure + })?; + ChainDB::new_with_freezer(db, freezer, self.args.config.store) + } else { + ChainDB::new(db, self.args.config.store) + }; + + let (shared, table) = Shared::init( + store, + self.args.consensus.clone(), + self.args.config.tx_pool, + self.args.config.notify.clone(), + block_assembler_config, + self.async_handle.clone(), + None, + ) + .map_err(|err| { + eprintln!("Shared init error: {:?}", err); + ExitCode::Failure + })?; + + // Verify genesis every time starting node + self.verify_genesis(&shared)?; + self.check_spec(&shared)?; + + Ok((shared, table)) + } + + /// Check whether the data already exists in the database before starting + pub fn check_assume_valid_target(&mut self, shared: &Shared) { + if let Some(ref target) = self.args.config.network.sync.assume_valid_target { + if shared.snapshot().block_exists(&target.pack()) { + self.args.config.network.sync.assume_valid_target.take(); + } + } + } + + /// Start chain service, return ChainController + pub fn start_chain_service(&self, shared: &Shared, table: ProposalTable) -> ChainController { + let chain_service = ChainService::new(shared.clone(), table); + let chain_controller = chain_service.start(Some("ChainService")); + info!("chain genesis hash: {:#x}", shared.genesis_hash()); + chain_controller + } + + /// Start network service and rpc server + pub fn start_network_and_rpc( + &self, + shared: &Shared, + 
chain_controller: ChainController, + exit_handler: &DefaultExitHandler, + miner_enable: bool, + ) -> (NetworkController, RpcServer) { + let sync_shared = Arc::new(SyncShared::with_tmpdir( + shared.clone(), + self.args.config.network.sync.clone(), + self.args.config.tmp_dir.as_ref(), + )); + let network_state = Arc::new( + NetworkState::from_config(self.args.config.network.clone()) + .expect("Init network state failed"), + ); + let synchronizer = Synchronizer::new(chain_controller.clone(), Arc::clone(&sync_shared)); + + let relayer = Relayer::new( + chain_controller.clone(), + Arc::clone(&sync_shared), + self.args.config.tx_pool.min_fee_rate, + self.args.config.tx_pool.max_tx_verify_cycles, + ); + let net_timer = NetTimeProtocol::default(); + let alert_signature_config = self.args.config.alert_signature.clone().unwrap_or_default(); + let alert_relayer = AlertRelayer::new( + self.version.to_string(), + shared.notify_controller().clone(), + alert_signature_config, + ); + + let alert_notifier = Arc::clone(alert_relayer.notifier()); + let alert_verifier = Arc::clone(alert_relayer.verifier()); + + let protocols = vec![ + CKBProtocol::new_with_support_protocol( + SupportProtocols::Sync, + Box::new(synchronizer.clone()), + Arc::clone(&network_state), + ), + CKBProtocol::new_with_support_protocol( + SupportProtocols::Relay, + Box::new(relayer), + Arc::clone(&network_state), + ), + CKBProtocol::new_with_support_protocol( + SupportProtocols::Time, + Box::new(net_timer), + Arc::clone(&network_state), + ), + CKBProtocol::new_with_support_protocol( + SupportProtocols::Alert, + Box::new(alert_relayer), + Arc::clone(&network_state), + ), + ]; + + let required_protocol_ids = vec![SupportProtocols::Sync.protocol_id()]; + + let network_controller = NetworkService::new( + Arc::clone(&network_state), + protocols, + required_protocol_ids, + shared.consensus().identify_name(), + self.version.to_string(), + exit_handler.clone(), + ) + .start(shared.async_handle()) + .expect("Start network service failed"); + + let builder = ServiceBuilder::new(&self.args.config.rpc) + .enable_chain(shared.clone()) + .enable_pool( + shared.clone(), + Arc::clone(&sync_shared), + self.args.config.tx_pool.min_fee_rate, + self.args.config.rpc.reject_ill_transactions, + ) + .enable_miner( + shared.clone(), + network_controller.clone(), + chain_controller.clone(), + miner_enable, + ) + .enable_net(network_controller.clone(), sync_shared) + .enable_stats(shared.clone(), synchronizer, Arc::clone(&alert_notifier)) + .enable_experiment(shared.clone()) + .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) + .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) + .enable_debug(); + let io_handler = builder.build(); + + let rpc_server = RpcServer::new( + self.args.config.rpc.clone(), + io_handler, + shared.notify_controller(), + ); + + (network_controller, rpc_server) + } +} diff --git a/shared/src/migrations/add_number_hash_mapping.rs b/util/launcher/src/migrations/add_number_hash_mapping.rs similarity index 100% rename from shared/src/migrations/add_number_hash_mapping.rs rename to util/launcher/src/migrations/add_number_hash_mapping.rs diff --git a/shared/src/migrations/cell.rs b/util/launcher/src/migrations/cell.rs similarity index 100% rename from shared/src/migrations/cell.rs rename to util/launcher/src/migrations/cell.rs diff --git a/shared/src/migrations/mod.rs b/util/launcher/src/migrations/mod.rs similarity index 100% rename from shared/src/migrations/mod.rs rename to 
util/launcher/src/migrations/mod.rs diff --git a/shared/src/migrations/table_to_struct.rs b/util/launcher/src/migrations/table_to_struct.rs similarity index 100% rename from shared/src/migrations/table_to_struct.rs rename to util/launcher/src/migrations/table_to_struct.rs
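
For orientation, here is a condensed sketch of the node startup flow after this refactor, assembled only from the APIs introduced in util/launcher/src/lib.rs and exercised by the new ckb-bin/src/subcommand/run.rs above. It is not part of the patch: the freezer spawn, the system-cell cache, the rayon/memory-tracker setup and the Ctrl-C wiring are elided, and the free-standing function name `launch` is illustrative rather than taken from the code.

use ckb_app_config::{ExitCode, RunArgs};
use ckb_async_runtime::Handle;
use ckb_build_info::Version;
use ckb_launcher::Launcher;
use ckb_network::{DefaultExitHandler, ExitHandler};

fn launch(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> {
    let mut launcher = Launcher::new(args, version, async_handle);

    // Decide whether the miner RPCs should be enabled, then refuse to start while an
    // expensive migration is pending (DatabaseMigration is used under the hood).
    let block_assembler_config = launcher.sanitize_block_assembler_config()?;
    let miner_enable = block_assembler_config.is_some();
    launcher.migrate_guard()?;

    // Opens RocksDB and verifies the genesis block and the stored chain-spec hash.
    let (shared, table) = launcher.build_shared(block_assembler_config)?;
    launcher.check_assume_valid_target(&shared);

    // ChainService first, then the network protocols and the RPC server on top of it.
    let chain_controller = launcher.start_chain_service(&shared, table);
    let exit_handler = DefaultExitHandler::default();
    let (network_controller, rpc_server) =
        launcher.start_network_and_rpc(&shared, chain_controller, &exit_handler, miner_enable);

    exit_handler.wait_for_exit();
    drop(rpc_server);
    drop(network_controller);
    Ok(())
}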