Skip to content
This repository has been archived by the owner on Apr 9, 2020. It is now read-only.

Commit

Permalink
mononoke: add the ability to set the number of theads in tokio Runtim…
Browse files Browse the repository at this point in the history
…e from CLI

Summary:
The default behavior of `tokio::runtime::Runtime` is to use the number of CPUs
as per `num_cpus::get().max(1)` (see [1], [2]). This is a reasonable default on
physical systems with unlimited resource access, but in containerized
environment Mononoke may be limited to a smaller number of CPUs than totally
available on a system. Let's add the ability to override the size of the thread
pool on the command line.

[1] https://github.com/tokio-rs/tokio/blob/tokio-udp-0.1.2/tokio-threadpool/src/builder.rs#L93
[2] tokio-rs/tokio#400

Reviewed By: farnz

Differential Revision: D19309494

fbshipit-source-id: 1fa4054b940bf225fac5f1a518f5cbb50d882fda
  • Loading branch information
Kostia Balytskyi authored and facebook-github-bot committed Jan 8, 2020
1 parent 264a815 commit 4a18d42
Show file tree
Hide file tree
Showing 16 changed files with 60 additions and 35 deletions.
22 changes: 20 additions & 2 deletions cmdlib/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use std::{
collections::{HashMap, HashSet},
io,
path::Path,
sync::Arc,
};
Expand Down Expand Up @@ -36,8 +37,8 @@ use slog_logview::LogViewDrain;
use sql_ext::MysqlOptions;

use crate::helpers::{
init_cachelib_from_settings, open_sql_with_config_and_mysql_options, setup_repo_dir,
CachelibSettings, CreateStorage,
create_runtime, init_cachelib_from_settings, open_sql_with_config_and_mysql_options,
setup_repo_dir, CachelibSettings, CreateStorage,
};
use crate::log;

Expand All @@ -52,6 +53,7 @@ const TARGET_REPO_NAME: &str = "target-repo-name";
const ENABLE_MCROUTER: &str = "enable-mcrouter";
const MYSQL_MYROUTER_PORT: &str = "myrouter-port";
const MYSQL_MASTER_ONLY: &str = "mysql-master-only";
const RUNTIME_THREADS: &str = "runtime-threads";

const CACHE_ARGS: &[(&str, &str)] = &[
("blob-cache-size", "override size of the blob cache"),
Expand Down Expand Up @@ -248,11 +250,21 @@ impl MononokeApp {
app = add_logger_args(app);
app = add_mysql_options_args(app);
app = add_cachelib_args(app, self.hide_advanced_args);
app = add_runtime_args(app);

app
}
}

pub fn add_runtime_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> {
app.arg(
Arg::with_name(RUNTIME_THREADS)
.long(RUNTIME_THREADS)
.takes_value(true)
.help("a number of threads to use in the tokio runtime"),
)
}

pub fn add_logger_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> {
app.arg(
Arg::with_name("panic-fate")
Expand Down Expand Up @@ -960,3 +972,9 @@ pub fn parse_disabled_hooks_no_repo_prefix(

disabled_hooks
}

/// Initialize a new `tokio::runtime::Runtime` with thread number parsed from the CLI
pub fn init_runtime(matches: &ArgMatches) -> io::Result<tokio::runtime::Runtime> {
let core_threads = get_usize_opt(matches, RUNTIME_THREADS);
create_runtime(None, core_threads)
}
15 changes: 11 additions & 4 deletions cmdlib/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,15 @@ where
}
}

pub fn create_runtime(log_thread_name_prefix: Option<&str>) -> io::Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new()
.name_prefix(log_thread_name_prefix.unwrap_or("tk-"))
.build()
/// Get a tokio `Runtime` with potentially explicitly set number of core threads
pub fn create_runtime(
log_thread_name_prefix: Option<&str>,
core_threads: Option<usize>,
) -> io::Result<tokio::runtime::Runtime> {
let mut builder = tokio::runtime::Builder::new();
builder.name_prefix(log_thread_name_prefix.unwrap_or("tk-"));
if let Some(core_threads) = core_threads {
builder.core_threads(core_threads);
}
builder.build()
}
4 changes: 2 additions & 2 deletions cmds/aliasverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tokio::prelude::stream::iter_ok;
use blobrepo::BlobRepo;
use blobstore::Storable;
use changesets::SqlChangesets;
use cmdlib::{args, helpers::create_runtime};
use cmdlib::args;
use context::CoreContext;
use filestore::{self, Alias, AliasBlob, FetchKey};
use mononoke_types::{
Expand Down Expand Up @@ -369,7 +369,7 @@ fn main(fb: FacebookInit) -> Result<()> {
.verify_all(ctx, step, min_cs_db_id)
});

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;
let result = runtime.block_on(aliasimport);
// Let the runtime finish remaining work - uploading logs etc
runtime.shutdown_on_idle();
Expand Down
4 changes: 2 additions & 2 deletions cmds/backfill_derived_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use changesets::{
};
use clap::{Arg, ArgMatches, SubCommand};
use cloned::cloned;
use cmdlib::{args, helpers, helpers::create_runtime, monitoring::start_fb303_and_stats_agg};
use cmdlib::{args, helpers, monitoring::start_fb303_and_stats_agg};
use context::CoreContext;
use dbbookmarks::SqlBookmarks;
use deleted_files_manifest::{RootDeletedManifestId, RootDeletedManifestMapping};
Expand Down Expand Up @@ -200,7 +200,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {

let logger = args::init_logging(fb, &matches);
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;

let run = match matches.subcommand() {
(SUBCOMMAND_BACKFILL, Some(sub_m)) => {
Expand Down
4 changes: 2 additions & 2 deletions cmds/blobimport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use bonsai_globalrev_mapping::SqlBonsaiGlobalrevMapping;
use bytes::Bytes;
use clap::{App, Arg};
use cloned::cloned;
use cmdlib::{args, helpers::create_runtime, helpers::upload_and_show_trace};
use cmdlib::{args, helpers::upload_and_show_trace};
use context::CoreContext;
use failure_ext::SlogKVError;
use fbinit::FacebookInit;
Expand Down Expand Up @@ -318,7 +318,7 @@ fn main(fb: FacebookInit) -> Result<()> {
},
);

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;
let result = runtime.block_on(blobimport);
// Let the runtime finish remaining work - uploading logs etc
runtime.shutdown_on_idle();
Expand Down
4 changes: 2 additions & 2 deletions cmds/blobstore_healer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use blobstore::Blobstore;
use blobstore_sync_queue::{BlobstoreSyncQueue, SqlBlobstoreSyncQueue, SqlConstructors};
use clap::{value_t, App};
use cloned::cloned;
use cmdlib::{args, helpers::create_runtime, monitoring};
use cmdlib::{args, monitoring};
use configerator::ConfigeratorAPI;
use context::CoreContext;
use dummy::{DummyBlobstore, DummyBlobstoreSyncQueue};
Expand Down Expand Up @@ -338,7 +338,7 @@ fn main(fb: FacebookInit) -> Result<()> {
}
};

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;

// Thread with a thrift service is now detached
monitoring::start_fb303_and_stats_agg(fb, &mut runtime, app_name, &logger, &matches)?;
Expand Down
4 changes: 2 additions & 2 deletions cmds/bonsai_verify/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use anyhow::{bail, format_err, Error, Result};
use blobrepo_utils::{BonsaiMFVerify, BonsaiMFVerifyResult};
use clap::{App, Arg, ArgMatches, SubCommand};
use cloned::cloned;
use cmdlib::{args, helpers::create_runtime};
use cmdlib::args;
use context::CoreContext;
use failure_ext::DisplayChain;
use fbinit::FacebookInit;
Expand Down Expand Up @@ -431,6 +431,6 @@ fn subcommmand_hg_manifest_verify(
})
});

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;
runtime.block_on(run)
}
4 changes: 2 additions & 2 deletions cmds/lfs_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use anyhow::{Error, Result};
use bytes::Bytes;
use clap::Arg;
use cmdlib::{args, helpers::create_runtime};
use cmdlib::args;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::{stream, Future, IntoFuture, Stream};
Expand Down Expand Up @@ -95,7 +95,7 @@ fn main(fb: FacebookInit) -> Result<()> {
.for_each(|_| Ok(()))
});

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;
let result = runtime.block_on(import);
runtime.shutdown_on_idle();
result
Expand Down
4 changes: 2 additions & 2 deletions cmds/rechunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::stream::Stream;
use mercurial_types::{HgFileNodeId, HgNodeHash};
use std::str::FromStr;

use cmdlib::{args, helpers::create_runtime};
use cmdlib::args;

const NAME: &str = "rechunker";
const DEFAULT_NUM_JOBS: usize = 10;
Expand Down Expand Up @@ -89,7 +89,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
.for_each(|_| Ok(()))
});

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;
let result = runtime.block_on(rechunk);
runtime.shutdown_on_idle();
result
Expand Down
4 changes: 2 additions & 2 deletions cmds/statistics_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use bytes::Bytes;
use changesets::{deserialize_cs_entries, ChangesetEntry};
use clap::{App, Arg, SubCommand};
use cloned::cloned;
use cmdlib::{args, helpers::create_runtime, monitoring};
use cmdlib::{args, monitoring};
use context::CoreContext;
use fbinit::FacebookInit;
use futures::future;
Expand Down Expand Up @@ -628,7 +628,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
}
});

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;
monitoring::start_fb303_and_stats_agg(
fb,
&mut runtime,
Expand Down
4 changes: 2 additions & 2 deletions cmds/upload_globalrevs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use bytes::Bytes;
use changesets::{deserialize_cs_entries, ChangesetEntry};
use clap::{App, Arg};
use cloned::cloned;
use cmdlib::{args, helpers::create_runtime};
use cmdlib::args;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::future::{Future, IntoFuture};
Expand Down Expand Up @@ -80,6 +80,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let globalrevs_store = args::open_sql::<SqlBonsaiGlobalrevMapping>(fb, &matches);

let mut runtime = args::init_runtime(&matches)?;
let run = args::open_repo(fb, &logger, &matches)
.join(globalrevs_store)
.and_then({
Expand All @@ -90,7 +91,6 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
}
});

let mut runtime = create_runtime(None)?;
runtime.block_on(run)?;
runtime.shutdown_on_idle();
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions hook_tailer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use blobrepo_factory::open_blobrepo;
use bookmarks::BookmarkName;
use clap::{App, Arg, ArgMatches};
use cloned::cloned;
use cmdlib::helpers::create_runtime;
use cmdlib::args::init_runtime;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::future::{err, ok, Future};
Expand Down Expand Up @@ -183,7 +183,7 @@ fn main(fb: FacebookInit) -> Result<()> {
}
});

let mut runtime = create_runtime(None)?;
let mut runtime = init_runtime(&matches)?;
runtime.block_on(fut)
}

Expand Down
4 changes: 2 additions & 2 deletions lfs_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio_openssl::SslAcceptorExt;

use blobrepo::BlobRepo;
use blobrepo_factory::open_blobrepo;
use cmdlib::{args, helpers::create_runtime, monitoring::start_fb303_server};
use cmdlib::{args, monitoring::start_fb303_server};
use failure_ext::chain::ChainExt;
use metaconfig_parser::RepoConfigs;
use stats::schedule_stats_aggregation;
Expand Down Expand Up @@ -310,7 +310,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
}
});

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;

let stats_aggregation = schedule_stats_aggregation()
.map_err(|_| Error::msg("Failed to create stats aggregation worker"))?
Expand Down
8 changes: 4 additions & 4 deletions pushrebase/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2988,7 +2988,7 @@ mod tests {

#[fbinit::test]
fn pushrebase_over_merge_even(fb: FacebookInit) -> Result<()> {
let mut runtime = create_runtime(None)?;
let mut runtime = create_runtime(None, None)?;
let ctx = CoreContext::test_mock(fb);
let repo = merge_even::getrepo(fb);

Expand Down Expand Up @@ -3065,7 +3065,7 @@ mod tests {

#[fbinit::test]
fn pushrebase_of_branch_merge(fb: FacebookInit) -> Result<()> {
let mut runtime = create_runtime(None)?;
let mut runtime = create_runtime(None, None)?;
let ctx = CoreContext::test_mock(fb);
let repo = blobrepo_factory::new_memblob_empty(None)?;

Expand Down Expand Up @@ -3194,7 +3194,7 @@ mod tests {

#[fbinit::test]
fn pushrebase_of_branch_merge_with_removal(fb: FacebookInit) -> Result<()> {
let mut runtime = create_runtime(None)?;
let mut runtime = create_runtime(None, None)?;
let ctx = CoreContext::test_mock(fb);
let repo = blobrepo_factory::new_memblob_empty(None)?;

Expand Down Expand Up @@ -3305,7 +3305,7 @@ mod tests {

#[fbinit::test]
fn pushrebase_of_branch_merge_with_rename(fb: FacebookInit) -> Result<()> {
let mut runtime = create_runtime(None)?;
let mut runtime = create_runtime(None, None)?;
let ctx = CoreContext::test_mock(fb);
let repo = blobrepo_factory::new_memblob_empty(None)?;

Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn main(fb: FacebookInit) {
let stats_aggregation = stats::schedule_stats_aggregation()
.expect("failed to create stats aggregation scheduler");

let mut runtime = cmdlib::helpers::create_runtime(None)?;
let mut runtime = cmdlib::args::init_runtime(&matches)?;

let config = get_config(fb, &matches)?;
let cert = matches.value_of("cert").unwrap().to_string();
Expand Down
4 changes: 2 additions & 2 deletions walker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use fbinit::FacebookInit;
use futures::IntoFuture;
use futures_ext::FutureExt;

use cmdlib::{args, helpers::create_runtime, monitoring};
use cmdlib::{args, monitoring};
use slog::{error, info};

mod blobstore;
Expand Down Expand Up @@ -46,7 +46,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
.boxify(),
};

let mut runtime = create_runtime(None)?;
let mut runtime = args::init_runtime(&matches)?;

monitoring::start_fb303_and_stats_agg(fb, &mut runtime, app_name, &logger, &matches)?;
let res = runtime.block_on(future);
Expand Down

0 comments on commit 4a18d42

Please sign in to comment.