From 4a18d424223399d193efe4cb7a0f13d4a604400c Mon Sep 17 00:00:00 2001 From: Kostia Balytskyi Date: Wed, 8 Jan 2020 06:13:44 -0800 Subject: [PATCH] mononoke: add the ability to set the number of theads in tokio Runtime from CLI Summary: The default behavior of `tokio::runtime::Runtime` is to use the number of CPUs as per `num_cpus::get().max(1)` (see [1], [2]). This is a reasonable default on physical systems with unlimited resource access, but in containerized environment Mononoke may be limited to a smaller number of CPUs than totally available on a system. Let's add the ability to override the size of the thread pool on the command line. [1] https://github.com/tokio-rs/tokio/blob/tokio-udp-0.1.2/tokio-threadpool/src/builder.rs#L93 [2] https://github.com/tokio-rs/tokio/issues/400 Reviewed By: farnz Differential Revision: D19309494 fbshipit-source-id: 1fa4054b940bf225fac5f1a518f5cbb50d882fda --- cmdlib/src/args.rs | 22 ++++++++++++++++++++-- cmdlib/src/helpers.rs | 15 +++++++++++---- cmds/aliasverify.rs | 4 ++-- cmds/backfill_derived_data.rs | 4 ++-- cmds/blobimport.rs | 4 ++-- cmds/blobstore_healer/main.rs | 4 ++-- cmds/bonsai_verify/main.rs | 4 ++-- cmds/lfs_import.rs | 4 ++-- cmds/rechunker.rs | 4 ++-- cmds/statistics_collector.rs | 4 ++-- cmds/upload_globalrevs.rs | 4 ++-- hook_tailer/main.rs | 4 ++-- lfs_server/src/main.rs | 4 ++-- pushrebase/src/lib.rs | 8 ++++---- server/src/main.rs | 2 +- walker/src/main.rs | 4 ++-- 16 files changed, 60 insertions(+), 35 deletions(-) diff --git a/cmdlib/src/args.rs b/cmdlib/src/args.rs index 61e82707..1ff2e16c 100644 --- a/cmdlib/src/args.rs +++ b/cmdlib/src/args.rs @@ -8,6 +8,7 @@ use std::{ collections::{HashMap, HashSet}, + io, path::Path, sync::Arc, }; @@ -36,8 +37,8 @@ use slog_logview::LogViewDrain; use sql_ext::MysqlOptions; use crate::helpers::{ - init_cachelib_from_settings, open_sql_with_config_and_mysql_options, setup_repo_dir, - CachelibSettings, CreateStorage, + create_runtime, init_cachelib_from_settings, open_sql_with_config_and_mysql_options, + setup_repo_dir, CachelibSettings, CreateStorage, }; use crate::log; @@ -52,6 +53,7 @@ const TARGET_REPO_NAME: &str = "target-repo-name"; const ENABLE_MCROUTER: &str = "enable-mcrouter"; const MYSQL_MYROUTER_PORT: &str = "myrouter-port"; const MYSQL_MASTER_ONLY: &str = "mysql-master-only"; +const RUNTIME_THREADS: &str = "runtime-threads"; const CACHE_ARGS: &[(&str, &str)] = &[ ("blob-cache-size", "override size of the blob cache"), @@ -248,11 +250,21 @@ impl MononokeApp { app = add_logger_args(app); app = add_mysql_options_args(app); app = add_cachelib_args(app, self.hide_advanced_args); + app = add_runtime_args(app); app } } +pub fn add_runtime_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> { + app.arg( + Arg::with_name(RUNTIME_THREADS) + .long(RUNTIME_THREADS) + .takes_value(true) + .help("a number of threads to use in the tokio runtime"), + ) +} + pub fn add_logger_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> { app.arg( Arg::with_name("panic-fate") @@ -960,3 +972,9 @@ pub fn parse_disabled_hooks_no_repo_prefix( disabled_hooks } + +/// Initialize a new `tokio::runtime::Runtime` with thread number parsed from the CLI +pub fn init_runtime(matches: &ArgMatches) -> io::Result { + let core_threads = get_usize_opt(matches, RUNTIME_THREADS); + create_runtime(None, core_threads) +} diff --git a/cmdlib/src/helpers.rs b/cmdlib/src/helpers.rs index 1412a2c1..02e2cb8f 100644 --- a/cmdlib/src/helpers.rs +++ b/cmdlib/src/helpers.rs @@ -308,8 +308,15 @@ where } } -pub fn create_runtime(log_thread_name_prefix: Option<&str>) -> io::Result { - tokio::runtime::Builder::new() - .name_prefix(log_thread_name_prefix.unwrap_or("tk-")) - .build() +/// Get a tokio `Runtime` with potentially explicitly set number of core threads +pub fn create_runtime( + log_thread_name_prefix: Option<&str>, + core_threads: Option, +) -> io::Result { + let mut builder = tokio::runtime::Builder::new(); + builder.name_prefix(log_thread_name_prefix.unwrap_or("tk-")); + if let Some(core_threads) = core_threads { + builder.core_threads(core_threads); + } + builder.build() } diff --git a/cmds/aliasverify.rs b/cmds/aliasverify.rs index 92962145..1c3911aa 100644 --- a/cmds/aliasverify.rs +++ b/cmds/aliasverify.rs @@ -27,7 +27,7 @@ use tokio::prelude::stream::iter_ok; use blobrepo::BlobRepo; use blobstore::Storable; use changesets::SqlChangesets; -use cmdlib::{args, helpers::create_runtime}; +use cmdlib::args; use context::CoreContext; use filestore::{self, Alias, AliasBlob, FetchKey}; use mononoke_types::{ @@ -369,7 +369,7 @@ fn main(fb: FacebookInit) -> Result<()> { .verify_all(ctx, step, min_cs_db_id) }); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; let result = runtime.block_on(aliasimport); // Let the runtime finish remaining work - uploading logs etc runtime.shutdown_on_idle(); diff --git a/cmds/backfill_derived_data.rs b/cmds/backfill_derived_data.rs index a303506a..820e43fc 100644 --- a/cmds/backfill_derived_data.rs +++ b/cmds/backfill_derived_data.rs @@ -20,7 +20,7 @@ use changesets::{ }; use clap::{Arg, ArgMatches, SubCommand}; use cloned::cloned; -use cmdlib::{args, helpers, helpers::create_runtime, monitoring::start_fb303_and_stats_agg}; +use cmdlib::{args, helpers, monitoring::start_fb303_and_stats_agg}; use context::CoreContext; use dbbookmarks::SqlBookmarks; use deleted_files_manifest::{RootDeletedManifestId, RootDeletedManifestMapping}; @@ -200,7 +200,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> { let logger = args::init_logging(fb, &matches); let ctx = CoreContext::new_with_logger(fb, logger.clone()); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; let run = match matches.subcommand() { (SUBCOMMAND_BACKFILL, Some(sub_m)) => { diff --git a/cmds/blobimport.rs b/cmds/blobimport.rs index 1760ba68..b843e670 100644 --- a/cmds/blobimport.rs +++ b/cmds/blobimport.rs @@ -15,7 +15,7 @@ use bonsai_globalrev_mapping::SqlBonsaiGlobalrevMapping; use bytes::Bytes; use clap::{App, Arg}; use cloned::cloned; -use cmdlib::{args, helpers::create_runtime, helpers::upload_and_show_trace}; +use cmdlib::{args, helpers::upload_and_show_trace}; use context::CoreContext; use failure_ext::SlogKVError; use fbinit::FacebookInit; @@ -318,7 +318,7 @@ fn main(fb: FacebookInit) -> Result<()> { }, ); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; let result = runtime.block_on(blobimport); // Let the runtime finish remaining work - uploading logs etc runtime.shutdown_on_idle(); diff --git a/cmds/blobstore_healer/main.rs b/cmds/blobstore_healer/main.rs index f3fba1c3..258fed65 100644 --- a/cmds/blobstore_healer/main.rs +++ b/cmds/blobstore_healer/main.rs @@ -17,7 +17,7 @@ use blobstore::Blobstore; use blobstore_sync_queue::{BlobstoreSyncQueue, SqlBlobstoreSyncQueue, SqlConstructors}; use clap::{value_t, App}; use cloned::cloned; -use cmdlib::{args, helpers::create_runtime, monitoring}; +use cmdlib::{args, monitoring}; use configerator::ConfigeratorAPI; use context::CoreContext; use dummy::{DummyBlobstore, DummyBlobstoreSyncQueue}; @@ -338,7 +338,7 @@ fn main(fb: FacebookInit) -> Result<()> { } }; - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; // Thread with a thrift service is now detached monitoring::start_fb303_and_stats_agg(fb, &mut runtime, app_name, &logger, &matches)?; diff --git a/cmds/bonsai_verify/main.rs b/cmds/bonsai_verify/main.rs index d37c56da..f98126e0 100644 --- a/cmds/bonsai_verify/main.rs +++ b/cmds/bonsai_verify/main.rs @@ -14,7 +14,7 @@ use anyhow::{bail, format_err, Error, Result}; use blobrepo_utils::{BonsaiMFVerify, BonsaiMFVerifyResult}; use clap::{App, Arg, ArgMatches, SubCommand}; use cloned::cloned; -use cmdlib::{args, helpers::create_runtime}; +use cmdlib::args; use context::CoreContext; use failure_ext::DisplayChain; use fbinit::FacebookInit; @@ -431,6 +431,6 @@ fn subcommmand_hg_manifest_verify( }) }); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; runtime.block_on(run) } diff --git a/cmds/lfs_import.rs b/cmds/lfs_import.rs index b9a2c7e5..89f9c208 100644 --- a/cmds/lfs_import.rs +++ b/cmds/lfs_import.rs @@ -9,7 +9,7 @@ use anyhow::{Error, Result}; use bytes::Bytes; use clap::Arg; -use cmdlib::{args, helpers::create_runtime}; +use cmdlib::args; use context::CoreContext; use fbinit::FacebookInit; use futures::{stream, Future, IntoFuture, Stream}; @@ -95,7 +95,7 @@ fn main(fb: FacebookInit) -> Result<()> { .for_each(|_| Ok(())) }); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; let result = runtime.block_on(import); runtime.shutdown_on_idle(); result diff --git a/cmds/rechunker.rs b/cmds/rechunker.rs index cb616e84..1be076a8 100644 --- a/cmds/rechunker.rs +++ b/cmds/rechunker.rs @@ -19,7 +19,7 @@ use futures::stream::Stream; use mercurial_types::{HgFileNodeId, HgNodeHash}; use std::str::FromStr; -use cmdlib::{args, helpers::create_runtime}; +use cmdlib::args; const NAME: &str = "rechunker"; const DEFAULT_NUM_JOBS: usize = 10; @@ -89,7 +89,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> { .for_each(|_| Ok(())) }); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; let result = runtime.block_on(rechunk); runtime.shutdown_on_idle(); result diff --git a/cmds/statistics_collector.rs b/cmds/statistics_collector.rs index 20f8bd90..0e16f773 100644 --- a/cmds/statistics_collector.rs +++ b/cmds/statistics_collector.rs @@ -14,7 +14,7 @@ use bytes::Bytes; use changesets::{deserialize_cs_entries, ChangesetEntry}; use clap::{App, Arg, SubCommand}; use cloned::cloned; -use cmdlib::{args, helpers::create_runtime, monitoring}; +use cmdlib::{args, monitoring}; use context::CoreContext; use fbinit::FacebookInit; use futures::future; @@ -628,7 +628,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> { } }); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; monitoring::start_fb303_and_stats_agg( fb, &mut runtime, diff --git a/cmds/upload_globalrevs.rs b/cmds/upload_globalrevs.rs index 933c0a69..60f0dd80 100644 --- a/cmds/upload_globalrevs.rs +++ b/cmds/upload_globalrevs.rs @@ -15,7 +15,7 @@ use bytes::Bytes; use changesets::{deserialize_cs_entries, ChangesetEntry}; use clap::{App, Arg}; use cloned::cloned; -use cmdlib::{args, helpers::create_runtime}; +use cmdlib::args; use context::CoreContext; use fbinit::FacebookInit; use futures::future::{Future, IntoFuture}; @@ -80,6 +80,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> { let ctx = CoreContext::new_with_logger(fb, logger.clone()); let globalrevs_store = args::open_sql::(fb, &matches); + let mut runtime = args::init_runtime(&matches)?; let run = args::open_repo(fb, &logger, &matches) .join(globalrevs_store) .and_then({ @@ -90,7 +91,6 @@ fn main(fb: FacebookInit) -> Result<(), Error> { } }); - let mut runtime = create_runtime(None)?; runtime.block_on(run)?; runtime.shutdown_on_idle(); Ok(()) diff --git a/hook_tailer/main.rs b/hook_tailer/main.rs index f9904cd6..b61ff4b1 100644 --- a/hook_tailer/main.rs +++ b/hook_tailer/main.rs @@ -16,7 +16,7 @@ use blobrepo_factory::open_blobrepo; use bookmarks::BookmarkName; use clap::{App, Arg, ArgMatches}; use cloned::cloned; -use cmdlib::helpers::create_runtime; +use cmdlib::args::init_runtime; use context::CoreContext; use fbinit::FacebookInit; use futures::future::{err, ok, Future}; @@ -183,7 +183,7 @@ fn main(fb: FacebookInit) -> Result<()> { } }); - let mut runtime = create_runtime(None)?; + let mut runtime = init_runtime(&matches)?; runtime.block_on(fut) } diff --git a/lfs_server/src/main.rs b/lfs_server/src/main.rs index f93ec6c0..40f76c33 100644 --- a/lfs_server/src/main.rs +++ b/lfs_server/src/main.rs @@ -33,7 +33,7 @@ use tokio_openssl::SslAcceptorExt; use blobrepo::BlobRepo; use blobrepo_factory::open_blobrepo; -use cmdlib::{args, helpers::create_runtime, monitoring::start_fb303_server}; +use cmdlib::{args, monitoring::start_fb303_server}; use failure_ext::chain::ChainExt; use metaconfig_parser::RepoConfigs; use stats::schedule_stats_aggregation; @@ -310,7 +310,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> { } }); - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; let stats_aggregation = schedule_stats_aggregation() .map_err(|_| Error::msg("Failed to create stats aggregation worker"))? diff --git a/pushrebase/src/lib.rs b/pushrebase/src/lib.rs index 5633c419..7aec61dd 100644 --- a/pushrebase/src/lib.rs +++ b/pushrebase/src/lib.rs @@ -2988,7 +2988,7 @@ mod tests { #[fbinit::test] fn pushrebase_over_merge_even(fb: FacebookInit) -> Result<()> { - let mut runtime = create_runtime(None)?; + let mut runtime = create_runtime(None, None)?; let ctx = CoreContext::test_mock(fb); let repo = merge_even::getrepo(fb); @@ -3065,7 +3065,7 @@ mod tests { #[fbinit::test] fn pushrebase_of_branch_merge(fb: FacebookInit) -> Result<()> { - let mut runtime = create_runtime(None)?; + let mut runtime = create_runtime(None, None)?; let ctx = CoreContext::test_mock(fb); let repo = blobrepo_factory::new_memblob_empty(None)?; @@ -3194,7 +3194,7 @@ mod tests { #[fbinit::test] fn pushrebase_of_branch_merge_with_removal(fb: FacebookInit) -> Result<()> { - let mut runtime = create_runtime(None)?; + let mut runtime = create_runtime(None, None)?; let ctx = CoreContext::test_mock(fb); let repo = blobrepo_factory::new_memblob_empty(None)?; @@ -3305,7 +3305,7 @@ mod tests { #[fbinit::test] fn pushrebase_of_branch_merge_with_rename(fb: FacebookInit) -> Result<()> { - let mut runtime = create_runtime(None)?; + let mut runtime = create_runtime(None, None)?; let ctx = CoreContext::test_mock(fb); let repo = blobrepo_factory::new_memblob_empty(None)?; diff --git a/server/src/main.rs b/server/src/main.rs index 58f2f559..743e79da 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -77,7 +77,7 @@ fn main(fb: FacebookInit) { let stats_aggregation = stats::schedule_stats_aggregation() .expect("failed to create stats aggregation scheduler"); - let mut runtime = cmdlib::helpers::create_runtime(None)?; + let mut runtime = cmdlib::args::init_runtime(&matches)?; let config = get_config(fb, &matches)?; let cert = matches.value_of("cert").unwrap().to_string(); diff --git a/walker/src/main.rs b/walker/src/main.rs index 385226f7..07ba4472 100644 --- a/walker/src/main.rs +++ b/walker/src/main.rs @@ -14,7 +14,7 @@ use fbinit::FacebookInit; use futures::IntoFuture; use futures_ext::FutureExt; -use cmdlib::{args, helpers::create_runtime, monitoring}; +use cmdlib::{args, monitoring}; use slog::{error, info}; mod blobstore; @@ -46,7 +46,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> { .boxify(), }; - let mut runtime = create_runtime(None)?; + let mut runtime = args::init_runtime(&matches)?; monitoring::start_fb303_and_stats_agg(fb, &mut runtime, app_name, &logger, &matches)?; let res = runtime.block_on(future);