Skip to content

Commit

Permalink
Adding input param of num_cores to ThreadPool::new()
Browse files Browse the repository at this point in the history
  • Loading branch information
robinbernon committed Mar 7, 2021
1 parent fb9ee3b commit ce88de4
Show file tree
Hide file tree
Showing 30 changed files with 70 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target
**/*.rs.bk
Cargo.lock
/.idea
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct LogLine {

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None)?;
let pool = ThreadPool::new(None, None)?;

let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
Expand Down Expand Up @@ -133,7 +133,7 @@ use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None)?;
let pool = ThreadPool::new(None, None)?;

let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
Expand Down Expand Up @@ -169,7 +169,7 @@ use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None)?;
let pool = ThreadPool::new(None, None)?;
let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
Expand Down
2 changes: 1 addition & 1 deletion benches/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
.build()
.unwrap()
});
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None, None).unwrap());

#[derive(Data, Clone, Deserialize, PartialEq, PartialOrd, Debug)]
struct GameDerived {
Expand Down
2 changes: 1 addition & 1 deletion benches/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
.build()
.unwrap()
});
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None, None).unwrap());

#[bench]
fn vec(b: &mut Bencher) {
Expand Down
2 changes: 1 addition & 1 deletion benches/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
.build()
.unwrap()
});
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None, None).unwrap());

#[derive(Data, Clone, PartialEq, Debug)]
struct TenKayVeeTwo {
Expand Down
2 changes: 1 addition & 1 deletion examples/cloudfront_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use amadeus::prelude::*;
#[allow(unreachable_code)]
#[tokio::main]
async fn main() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let rows = Cloudfront::new_with(
AwsRegion::UsEast1,
Expand Down
2 changes: 1 addition & 1 deletion examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use amadeus::prelude::*;

#[tokio::main]
async fn main() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let rows = Cloudfront::new_with(
AwsRegion::UsEast1,
Expand Down
2 changes: 1 addition & 1 deletion examples/commoncrawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use amadeus::{data::Webpage, dist::prelude::*};
async fn main() {
return; // TODO: runs for a long time

let pool = ThreadPool::new(None).unwrap();
let pool = ThreadPool::new(None, None).unwrap();

let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/pool/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl ProcessPoolInner {
let receiver = Receiver::<Option<Request>>::new(parent);
let sender = Sender::<Result<Response, Panicked>>::new(parent);

let thread_pool = ThreadPool::new(tasks_per_core).unwrap();
let thread_pool = ThreadPool::new(tasks_per_core, None).unwrap();

while let Some(work) = receiver.recv().await.unwrap() {
let ret = panic::catch_unwind(panic::AssertUnwindSafe(|| {
Expand Down
23 changes: 15 additions & 8 deletions src/pool/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,21 @@ struct ThreadPoolInner {
#[derive(Debug)]
pub struct ThreadPool(Arc<ThreadPoolInner>);
impl ThreadPool {
pub fn new(tasks_per_core: Option<usize>) -> io::Result<Self> {
let logical_cores = if !cfg!(target_arch = "wasm32") {
num_cpus::get()
} else {
1
};
let tasks_per_core = tasks_per_core.unwrap_or(DEFAULT_TASKS_PER_CORE);
#[cfg(not(target_arch = "wasm32"))]
pub fn new(tasks_per_core: Option<usize>, num_cores: Option<usize>) -> io::Result<Self> {
let logical_cores = if let Some(cores) = num_cores {
cores
} else if !cfg!(target_arch = "wasm32") {
num_cpus::get()
} else {
1
};

let tasks_per_core = tasks_per_core.unwrap_or(DEFAULT_TASKS_PER_CORE);

println!("Logical cores: {}", logical_cores);
println!("Tasks per core: {}", tasks_per_core);

#[cfg(not(target_arch = "wasm32"))]
let pool = Pool::new(logical_cores);
Ok(ThreadPool(Arc::new(ThreadPoolInner {
logical_cores,
Expand Down
2 changes: 1 addition & 1 deletion tests/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::SystemTime;
#[tokio::test(threaded_scheduler)]
#[cfg_attr(miri, ignore)]
async fn cloudfront() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let start = SystemTime::now();

Expand Down
2 changes: 1 addition & 1 deletion tests/cloudfront_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/commoncrawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use amadeus::{data::Webpage, prelude::*};
async fn commoncrawl() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();
let _ = webpages
Expand Down
2 changes: 1 addition & 1 deletion tests/commoncrawl_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use amadeus::prelude::*;
async fn csv() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
struct GameDerived {
Expand Down
2 changes: 1 addition & 1 deletion tests/csv_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/csv_wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn csv() {
let timer = web_sys::window().unwrap().performance().unwrap();
let start = timer.now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
struct GameDerived {
Expand Down
2 changes: 1 addition & 1 deletion tests/into_par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use amadeus::prelude::*;
#[tokio::test(threaded_scheduler)]
#[cfg_attr(miri, ignore)]
async fn into_par_stream() {
let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

<&[usize] as IntoParallelStream>::into_par_stream(&[1, 2, 3])
.map(|a: usize| a)
Expand Down
2 changes: 1 addition & 1 deletion tests/into_par_stream_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use amadeus::prelude::*;
async fn json() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();
let tasks = 100;

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion tests/json_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool, 100).await
};
#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use amadeus::prelude::*;
async fn panic() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let res = AssertUnwindSafe((0i32..1_000).into_par_stream().for_each(pool, |i| {
if i == 500 {
Expand Down
2 changes: 1 addition & 1 deletion tests/panic_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use amadeus::prelude::*;
async fn parquet() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let rows = Parquet::<_, Value>::new(vec![
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
Expand Down
2 changes: 1 addition & 1 deletion tests/parquet_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
Expand Down
2 changes: 1 addition & 1 deletion tests/parquet_wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn parquet() {
let timer = web_sys::window().unwrap().performance().unwrap();
let start = timer.now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

let rows = Parquet::<_, Value>::new(vec![
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
Expand Down
2 changes: 1 addition & 1 deletion tests/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use amadeus::prelude::*;
async fn postgres() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
struct Weather {
Expand Down
2 changes: 1 addition & 1 deletion tests/postgres_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool).await
};
#[cfg(feature = "constellation")]
Expand Down
27 changes: 25 additions & 2 deletions tests/threads.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use futures::future::join_all;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{
convert::TryInto, time::{Duration, SystemTime}
convert::TryInto, time::{Duration, SystemTime},
collections::HashSet,
};
use tokio::time::delay_for as sleep;

Expand All @@ -12,7 +13,7 @@ use amadeus::dist::prelude::*;
async fn threads() {
let start = SystemTime::now();

let pool = &ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None, None).unwrap();
let parallel = 1000;

join_all((0..parallel).map(|i| async move {
Expand All @@ -29,3 +30,25 @@ async fn threads() {

println!("in {:?}", start.elapsed().unwrap());
}

#[tokio::test(threaded_scheduler)]
#[cfg_attr(miri, ignore)]
async fn user_set_core_count() {
let num_cores = 4;

let pool = &ThreadPool::new(Some(1), Some(num_cores)).unwrap();
let parallel = 1000;

let ret = join_all((0..parallel).map(|_| async move {
pool
.spawn(move || async move {
format!("{:?}", std::thread::current().id())
})
.await.unwrap()
})).await;

let unique_thread_ids: HashSet<String> = ret.into_iter().collect();

println!("Number of cores used: {}", unique_thread_ids.len());
assert_eq!(unique_thread_ids.len(), num_cores);
}
2 changes: 1 addition & 1 deletion tests/threads_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() {
.unwrap()
.block_on(async {
let thread_pool_time = {
let thread_pool = ThreadPool::new(None).unwrap();
let thread_pool = ThreadPool::new(None, None).unwrap();
run(&thread_pool, 1000).await
};
#[cfg(feature = "constellation")]
Expand Down

0 comments on commit ce88de4

Please sign in to comment.