Skip to content

Commit

Permalink
Merge pull request #1376 from stan-dev/feature/intel-tbb-init
Browse files Browse the repository at this point in the history
integrate Intel TBB. Closes #1377
  • Loading branch information
wds15 authored Oct 10, 2019
2 parents b9ed18c + fd680d3 commit 731e9a5
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 156 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ These are distributed under the `lib/` subdirectory. Only these versions of the

Installation
------------
The Stan Math Library is largely a header-only C++ library, with
exceptions for the Sundials code.
The Stan Math Library is a C++ library which depends on the Intel TBB
library and requires for some functionality (ordinary differential
equations and root solving) on the Sundials library.

A simple hello world program using Stan Math is as follows:

Expand All @@ -45,7 +46,7 @@ If this is in the file `/path/to/foo/foo.cpp`, then you can compile and run this

```
> cd /path/to/foo
> clang++ -std=c++1y -I /path/to/stan-math -I /path/to/Eigen -I /path/to/boost -I /path/to/sundials -I /path/to/tbb -D_REENTRANT foo.cpp
> clang++ -std=c++1y -I /path/to/stan-math -I /path/to/Eigen -I /path/to/boost -I /path/to/sundials -I /path/to/tbb -L /path/to/tbb-libs -ltbb -D_REENTRANT foo.cpp
> ./a.out
log normal(1 | 2, 3)=-2.07311
```
Expand All @@ -61,10 +62,10 @@ The `-I` includes provide paths pointing to the five necessary includes:
Note that the paths should *not* include the final directories `stan`, `Eigen`, or `boost` on the paths. An example of a real instantiation:

```
clang++ -std=c++1y -I ~/stan-dev/math -I ~/stan-dev/math/lib/eigen_3.3.3/ -I ~/stan-dev/math/lib/boost_1.69.0/ -I ~/stan-dev/math/lib/sundials_4.1.0/include -I ~/stan-dev/math/lib/tbb_2019_U8/include -D_REENTRANT foo.cpp
clang++ -std=c++1y -I ~/stan-dev/math -I ~/stan-dev/math/lib/eigen_3.3.3/ -I ~/stan-dev/math/lib/boost_1.69.0/ -I ~/stan-dev/math/lib/sundials_4.1.0/include -I ~/stan-dev/math/lib/tbb_2019_U8/include -L ~/stan-dev/math/lib/tbb -ltbb -Wl,-rpath,"~/stan-dev/math/lib/tbb" -D_REENTRANT foo.cpp
```

The following directories all exist below the links given to `-I`: `~/stan-dev/math/stan` and `~/stan-dev/math/lib/eigen_3.3.3/Eigen` and `~stan-dev/math/lib/boost_1.69.0/boost` and `~stan-dev/math/lib/sundials_4.1.0/include` and `~/stan-dev/math/lib/tbb_2019_U8/include`.
The following directories all exist below the links given to `-I`: `~/stan-dev/math/stan` and `~/stan-dev/math/lib/eigen_3.3.3/Eigen` and `~/stan-dev/math/lib/boost_1.69.0/boost` and `~/stan-dev/math/lib/sundials_4.1.0/include` and `~/stan-dev/math/lib/tbb_2019_U8/include`. The `~/stan-dev/math/lib/tbb` directory is created by the stan-math makefiles automatically when running any unit test (for example with `./runTests.py test/unit/math/rev/core/agrad_test.cpp`). The `-Wl,-rpath,...` instruct the linker to hard-code the path to the Intel TBB library inside the stan-math directory into the final binary. This way the Intel TBB is found when executing the program.

Other Compilers
---------------
Expand Down
6 changes: 6 additions & 0 deletions stan/math/prim/core.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#ifndef STAN_MATH_PRIM_CORE_HPP
#define STAN_MATH_PRIM_CORE_HPP

#include <stan/math/prim/core/init_threadpool_tbb.hpp>

#endif
95 changes: 95 additions & 0 deletions stan/math/prim/core/init_threadpool_tbb.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#ifndef STAN_MATH_PRIM_CORE_INIT_THREADPOOL_TBB_HPP
#define STAN_MATH_PRIM_CORE_INIT_THREADPOOL_TBB_HPP

#include <stan/math/prim/scal/err/invalid_argument.hpp>

#include <boost/lexical_cast.hpp>

#include <tbb/task_scheduler_init.h>

#include <cstdlib>
#include <thread>

namespace stan {
namespace math {
namespace internal {

/**
* Get number of threads to use. The function uses the environment
* variable STAN_NUM_THREADS and follows these conventions:
*
* - STAN_NUM_THREADS is not defined => num_threads=1
* - STAN_NUM_THREADS is positive => num_threads is set to the
* specified number
* - STAN_NUM_THREADS is set to -1 => num_threads is the number of
* available cores on the machine
* - STAN_NUM_THREADS < -1, STAN_NUM_THREADS = 0 or STAN_NUM_THREADS is
* not numeric => throws an exception
*
* @return number of threads to use
* @throws std::invalid_argument if the value of STAN_NUM_THREADS env. variable
* is invalid
*/
inline int get_num_threads() {
int num_threads = 1;
const char* env_stan_num_threads = std::getenv("STAN_NUM_THREADS");
if (env_stan_num_threads != nullptr) {
try {
const int env_num_threads
= boost::lexical_cast<int>(env_stan_num_threads);
if (env_num_threads > 0) {
num_threads = env_num_threads;
} else if (env_num_threads == -1) {
num_threads = std::thread::hardware_concurrency();
} else {
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be positive or -1");
}
} catch (boost::bad_lexical_cast) {
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be a positive number or -1");
}
}
return num_threads;
}

} // namespace internal

/**
* Initialize the Intel TBB threadpool and global scheduler through
* the tbb::task_scheduler_init object. In case an instance of the
* tbb::task_scheduler_object has been instantiated prior to calling
* this function, then any subsequent initialization is ignored by the
* Intel TBB.
*
* The maximal number of threads is read from the environment variable
* STAN_NUM_THREADS using internal::get_num_threads. See conventions
* of get_num_threads. The TBB scheduler will be activated by calling
* this function.
*
* The function returns a reference to the static
* tbb::task_scheduler_init instance.
*
* @param stack_size sets the stack size of each thread; the default 0
* let's the TBB choose the stack size
* @return reference to the static tbb::task_scheduler_init
* @throws std::runtime_error if the value of STAN_NUM_THREADS env. variable
* is invalid
*/
inline tbb::task_scheduler_init& init_threadpool_tbb(
tbb::stack_size_type stack_size = 0) {
int tbb_max_threads = internal::get_num_threads();

static tbb::task_scheduler_init tbb_scheduler(tbb_max_threads, stack_size);

return tbb_scheduler;
}

} // namespace math
} // namespace stan

#endif
2 changes: 2 additions & 0 deletions stan/math/prim/mat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define STAN_MATH_PRIM_MAT_HPP

#include <stan/math/prim/mat/fun/Eigen.hpp>

#include <stan/math/prim/core.hpp>
#include <stan/math/prim/meta.hpp>

#include <stan/math/prim/mat/err/check_cholesky_factor.hpp>
Expand Down
56 changes: 0 additions & 56 deletions stan/math/prim/mat/functor/map_rect_concurrent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,12 @@
#include <stan/math/prim/meta.hpp>
#include <stan/math/prim/mat/fun/typedefs.hpp>

#include <stan/math/prim/scal/err/invalid_argument.hpp>
#include <boost/lexical_cast.hpp>

#include <cstdlib>
#include <vector>
#include <thread>

namespace stan {
namespace math {
namespace internal {

/**
* Get number of threads to use for num_jobs jobs. The function uses
* the environment variable STAN_NUM_THREADS and follows these
* conventions:
*
* - STAN_NUM_THREADS is not defined => num_threads=1
* - STAN_NUM_THREADS is positive => num_threads is set to the
* specified number
* - STAN_NUM_THREADS is set to -1 => num_threads is the number of
* available cores on the machine
* - STAN_NUM_THREADS < -1, STAN_NUM_THREADS = 0 or STAN_NUM_THREADS is
* not numeric => throws an exception
*
* Should num_threads exceed the number of jobs, then num_threads will
* be set equal to the number of jobs.
*
* @param num_jobs number of jobs
* @return number of threads to use
* @throws std::runtime_error if the value of STAN_NUM_THREADS env. variable
* is invalid
*/
inline int get_num_threads(int num_jobs) {
int num_threads = 1;
#ifdef STAN_THREADS
const char* env_stan_num_threads = std::getenv("STAN_NUM_THREADS");
if (env_stan_num_threads != nullptr) {
try {
const int env_num_threads
= boost::lexical_cast<int>(env_stan_num_threads);
if (env_num_threads > 0)
num_threads = env_num_threads;
else if (env_num_threads == -1)
num_threads = std::thread::hardware_concurrency();
else
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be positive or -1");
} catch (boost::bad_lexical_cast) {
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be a positive number or -1");
}
}
if (num_threads > num_jobs)
num_threads = num_jobs;
#endif
return num_threads;
}

template <int call_id, typename F, typename T_shared_param,
typename T_job_param>
Eigen::Matrix<return_type_t<T_shared_param, T_job_param>, Eigen::Dynamic, 1>
Expand Down
59 changes: 55 additions & 4 deletions stan/math/rev/core/init_chainablestack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,67 @@

#include <stan/math/rev/core/chainablestack.hpp>

#include <tbb/task_scheduler_observer.h>

#include <mutex>
#include <unordered_map>
#include <utility>
#include <thread>
#include <tuple>

namespace stan {
namespace math {
namespace {

/**
* Initializes the AD stack for the main process. See
* autodiffstackstorage.hpp for more explanations.
* TBB observer object which is a callback hook called whenever the
* TBB scheduler adds a new thread to the TBB managed threadpool. This
* hook ensures that each worker thread has an initialized AD tape
* ready for use.
*
* Refer to https://software.intel.com/en-us/node/506314 for details
* on the observer concept.
*/
ChainableStack global_stack_instance_init;
class ad_tape_observer : public tbb::task_scheduler_observer {
using stack_ptr = std::unique_ptr<ChainableStack>;
using ad_map = std::unordered_map<std::thread::id, stack_ptr>;

public:
ad_tape_observer() : tbb::task_scheduler_observer(), thread_tape_map_() {
on_scheduler_entry(true); // register current process
observe(true); // activates the observer
}

void on_scheduler_entry(bool worker) {
std::lock_guard<std::mutex> thread_tape_map_lock(thread_tape_map_mutex_);
const std::thread::id thread_id = std::this_thread::get_id();
if (thread_tape_map_.find(thread_id) == thread_tape_map_.end()) {
ad_map::iterator insert_elem;
bool status = false;
std::tie(insert_elem, status)
= thread_tape_map_.emplace(ad_map::value_type{thread_id, nullptr});
insert_elem->second = stack_ptr(new ChainableStack());
}
}

void on_scheduler_exit(bool worker) {
std::lock_guard<std::mutex> thread_tape_map_lock(thread_tape_map_mutex_);
auto elem = thread_tape_map_.find(std::this_thread::get_id());
if (elem != thread_tape_map_.end()) {
thread_tape_map_.erase(elem);
}
}

private:
ad_map thread_tape_map_;
std::mutex thread_tape_map_mutex_;
};

namespace {

ad_tape_observer global_observer;

} // namespace
} // namespace math
} // namespace stan

#endif
78 changes: 25 additions & 53 deletions stan/math/rev/mat/functor/map_rect_concurrent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
#include <stan/math/prim/mat/functor/map_rect_combine.hpp>
#include <stan/math/rev/core/chainablestack.hpp>

#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>

#include <algorithm>
#include <vector>
#include <thread>
#include <future>

namespace stan {
namespace math {
Expand All @@ -31,68 +33,38 @@ map_rect_concurrent(

const int num_jobs = job_params.size();
const vector_d shared_params_dbl = value_of(shared_params);
std::vector<std::future<std::vector<matrix_d>>> futures;

auto execute_chunk = [&](int start, int size) -> std::vector<matrix_d> {
#ifdef STAN_THREADS
ChainableStack thread_stack_instance;
#endif
const int end = start + size;
std::vector<matrix_d> chunk_f_out;
chunk_f_out.reserve(size);
for (int i = start; i != end; i++) {
chunk_f_out.push_back(ReduceF()(
shared_params_dbl, value_of(job_params[i]), x_r[i], x_i[i], msgs));
std::vector<matrix_d> job_output(num_jobs);
std::vector<int> world_f_out(num_jobs, 0);

auto execute_chunk = [&](std::size_t start, std::size_t end) -> void {
for (std::size_t i = start; i != end; ++i) {
job_output[i] = ReduceF()(shared_params_dbl, value_of(job_params[i]),
x_r[i], x_i[i], msgs);
world_f_out[i] = job_output[i].cols();
}
return chunk_f_out;
};

int num_threads = get_num_threads(num_jobs);
int num_jobs_per_thread = num_jobs / num_threads;
futures.emplace_back(
std::async(std::launch::deferred, execute_chunk, 0, num_jobs_per_thread));

#ifdef STAN_THREADS
if (num_threads > 1) {
const int num_big_threads = num_jobs % num_threads;
const int first_big_thread = num_threads - num_big_threads;
for (int i = 1, job_start = num_jobs_per_thread, job_size = 0;
i < num_threads; ++i, job_start += job_size) {
job_size = i >= first_big_thread ? num_jobs_per_thread + 1
: num_jobs_per_thread;
futures.emplace_back(
std::async(std::launch::async, execute_chunk, job_start, job_size));
}
}
tbb::parallel_for(tbb::blocked_range<std::size_t>(0, num_jobs),
[&](const tbb::blocked_range<size_t>& r) {
execute_chunk(r.begin(), r.end());
});
#else
execute_chunk(0, num_jobs);
#endif

// collect results
std::vector<int> world_f_out;
world_f_out.reserve(num_jobs);
matrix_d world_output(0, 0);
const int num_world_output
= std::accumulate(world_f_out.begin(), world_f_out.end(), 0);
matrix_d world_output(job_output[0].rows(), num_world_output);

int offset = 0;
for (std::size_t i = 0; i < futures.size(); ++i) {
const std::vector<matrix_d>& chunk_result = futures[i].get();
if (i == 0) {
world_output.resize(chunk_result[0].rows(),
num_jobs * chunk_result[0].cols());
}

for (const auto& job_result : chunk_result) {
const int num_job_outputs = job_result.cols();
world_f_out.push_back(num_job_outputs);
for (const auto& job : job_output) {
const int num_job_outputs = job.cols();

if (world_output.cols() < offset + num_job_outputs) {
world_output.conservativeResize(Eigen::NoChange,
2 * (offset + num_job_outputs));
}
world_output.block(0, offset, world_output.rows(), num_job_outputs) = job;

world_output.block(0, offset, world_output.rows(), num_job_outputs)
= job_result;

offset += num_job_outputs;
}
offset += num_job_outputs;
}
CombineF combine(shared_params, job_params);
return combine(world_output, world_f_out);
Expand Down
Loading

0 comments on commit 731e9a5

Please sign in to comment.