Threading Model

Overview

TileDB manages parallel execution through the use of static thread pools.

TileDB allocates two thread pools per context: a compute thread pool and an IO thread pool. As the names suggest, the compute thread pool is used for executing compute-bound tasks while the IO thread pool is used for executing IO-bound tasks. By default, each thread pool has a concurrency level equal to the number of hardware threads. For example: on a system with 12 hardware threads, both thread pools would have a concurrency level of 12.

The motivation for this configuration is to ensure that each hardware thread has one TileDB software thread performing CPU-heavy work. TileDB keeps the software threads for IO-bound tasks in their own thread pool so that the programmer does not need to worry about overloading a single shared thread pool: IO-bound tasks that spend most of their time idle-waiting would otherwise occupy threads and block pending CPU-bound tasks from performing useful work.

The default concurrency level for each thread pool can be modified with the following configuration parameters:
"sm.compute_concurrency_level"
"sm.io_concurrency_level"

We use the term "concurrency level" so that we have the flexibility to adjust the number of allocated software threads in our thread pool implementation independent of the user configuration. As of TileDB 2.9, a concurrency level of N allocates N software threads.
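For example, a context can be given explicit concurrency levels through the standard configuration mechanism. The following is a minimal sketch using the TileDB C++ API; the parameter values are illustrative only:

#include <tiledb/tiledb>

// Configure explicit compute and IO concurrency levels.
tiledb::Config config;
config["sm.compute_concurrency_level"] = "8";
config["sm.io_concurrency_level"] = "16";

// Both thread pools are allocated when the context is created.
tiledb::Context ctx(config);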

Thread Pool Usage

Launching a Task

Tasks are launched on the thread pool using async() or (equivalently) execute(). The prototypes for async and execute are:

  template <class Fn, class... Args>
  ThreadPool::Task ThreadPool::async(Fn&& f, Args&&... args);

  template <class Fn, class... Args>
  ThreadPool::Task ThreadPool::execute(Fn&& f, Args&&... args);

The interface is similar to std::async(): one can pass a function to be executed, along with its arguments. The return type is ThreadPool::Task, which is currently aliased to std::future<Status>. Although one can invoke wait() or get() on such a future, it is strongly recommended that ThreadPool::wait_all() be used instead: future::wait() and future::get() are blocking calls whose use can lead to starvation deadlock when they are invoked by thread pool threads, whereas ThreadPool::wait_all() will not deadlock. This is discussed in more detail below.

Bulk Synchronous Usage

The following example illustrates typical bulk synchronous usage. The code snippet does the following:

  • The calling thread creates a thread pool with a concurrency level of 2.
  • The calling thread executes two tasks on the thread pool.
  • The calling thread synchronously waits for both tasks to complete.
void foo() {
  ThreadPool tp;
  tp.init(2);

  std::vector<ThreadPool::Task> tasks;

  // The lambdas capture my_int by reference so they can modify it.
  std::atomic<int> my_int{0};

  tasks.emplace_back(tp.execute([&]() {
    my_int += 1;
  }));

  tasks.emplace_back(tp.execute([&]() {
    my_int += 2;
  }));

  tp.wait_all(tasks);

  assert(my_int == 3);

  std::cout << "DONE" << std::endl;
}

The above snippet is valid for any non-zero concurrency level. By initializing the thread pool with a concurrency level of 2, the thread pool allows for up to two tasks to execute concurrently.

Asynchronous Usage

The following example illustrates typical asynchronous usage. The code snippet does the following:

  • The calling thread creates a thread pool with a concurrency level of 2.
  • The calling thread executes two tasks on the thread pool, where each task asynchronously invokes a callback bar.
  • The calling thread does not wait for the tasks to complete. Instead, the completed state is reached when the last callback is executed.
std::atomic<int> g_my_int{0};

void bar(int my_int) {
  int prev = g_my_int.fetch_add(my_int);
  if (prev + my_int == 3)
    std::cout << "DONE" << std::endl;
}

void foo() {
  ThreadPool tp;
  tp.init(2);

  tp.execute([]() {
    bar(1);
  });

  tp.execute([]() {
    bar(2);
  });

  // Assume `tp` remains in-scope until both tasks have completed.
}

Note that in the implementation of ThreadPool, the threads comprising the thread pool are spawned in the constructor and joined in the destructor. The destructor therefore blocks until all of its threads can join. In the example above, this would happen if tp went out of scope before both tasks had completed.
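If blocking in the destructor is not desired, the tasks returned by execute() can instead be retained and waited on explicitly before tp goes out of scope. A minimal sketch, reusing bar from above:

void foo() {
  ThreadPool tp;
  tp.init(2);

  std::vector<ThreadPool::Task> tasks;
  tasks.emplace_back(tp.execute([]() { bar(1); }));
  tasks.emplace_back(tp.execute([]() { bar(2); }));

  // Block here, rather than in the destructor, until both tasks complete.
  tp.wait_all(tasks);
}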

Recursive Usage (Single Instance)

When used under prescribed conditions, the ThreadPool supports arbitrary recursive usage. We use the term "arbitrary" to indicate that the programmer does not need to worry about deadlock if the total number of scheduled tasks exceeds the concurrency level. By "prescribed conditions" we mean that

  • Thread pool threads do not make any blocking calls (in particular, they do not block on futures from the same thread pool)
  • Results from thread pool tasks are waited on using ThreadPool::wait_all()

The following example illustrates arbitrary recursive usage. The code snippet does the following:

  • The calling thread creates a thread pool with a concurrency level of 2.
  • The calling thread executes Task #1 and waits.
  • Task #1 executes Task #2 and waits.
  • Task #2 executes Task #3 and waits.
  • Task #3 executes.
void foo() {
  ThreadPool tp;
  tp.init(2);

  bool done = false;
  std::vector<ThreadPool::Task> task_1;
  task_1.emplace_back(tp.execute([&]() {
    std::vector<ThreadPool::Task> task_2;
    task_2.emplace_back(tp.execute([&]() {
      std::vector<ThreadPool::Task> task_3;
      task_3.emplace_back(tp.execute([&]() {
        done = true;
      }));
      tp.wait_all(task_3);
    }));
    tp.wait_all(task_2);
  }));
  tp.wait_all(task_1);

  assert(done == true);
}

It is important to note the use of ThreadPool::wait_all in this example. If we had instead called wait() or get() on the tasks returned by execute(), the example would likely deadlock: two threads could block in wait() or get(), leaving no threads to actually make progress on the tasks being waited for. In contrast, the implementation of ThreadPool::wait_all will make progress on outstanding tasks rather than blocking, thereby preventing deadlock.
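To make the hazard concrete, the following sketch (reusing done and tp from the example above) shows the discouraged pattern; it is for illustration only. With a concurrency level of 2, the threads running Task #1 and Task #2 can both block in future::wait(), leaving no pool thread free to ever run Task #3:

// DISCOURAGED: blocking on futures from thread pool threads.
ThreadPool::Task task_1 = tp.execute([&]() {
  ThreadPool::Task task_2 = tp.execute([&]() {
    ThreadPool::Task task_3 = tp.execute([&]() {
      done = true;
    });
    task_3.wait();  // second pool thread blocks; no thread left for task_3
  });
  task_2.wait();  // first pool thread blocks here
});
task_1.wait();  // likely deadlock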

Recursive Usage (Multiple Instances)

In the same way, ThreadPool allows arbitrary recursive usage among multiple thread pool instances. This is important because the programmer does not need to worry about how the tasks on the compute thread pool and IO thread pool are nested.

The following example illustrates arbitrary recursive usage. The code snippet does the following:

  • The calling thread creates two thread pools, each with a concurrency level of 2.
  • Tasks #1 through #5 are executed recursively, alternating between the two thread pool instances.
void foo() {
  ThreadPool tp_a;
  ThreadPool tp_b;
  tp_a.init(2);
  tp_b.init(2);

  bool done = false;
  std::vector<ThreadPool::Task> task_1;
  task_1.emplace_back(tp_a.execute([&]() {
    std::vector<ThreadPool::Task> task_2;
    task_2.emplace_back(tp_b.execute([&]() {
      std::vector<ThreadPool::Task> task_3;
      task_3.emplace_back(tp_a.execute([&]() {
        std::vector<ThreadPool::Task> task_4;
        task_4.emplace_back(tp_b.execute([&]() {
          std::vector<ThreadPool::Task> task_5;
          task_5.emplace_back(tp_a.execute([&]() {
            done = true;
          }));
          tp_a.wait_all(task_5);
        }));
        tp_b.wait_all(task_4);
      }));
      tp_a.wait_all(task_3);
    }));
    tp_b.wait_all(task_2);
  }));
  tp_a.wait_all(task_1);

  assert(done == true);
}

Parallel Routines

For convenience, TileDB provides three high-level parallel routines: parallel_sort, parallel_for, and parallel_for_2d. Each routine operates on a single ThreadPool instance, determined by the caller. All routines block until the work is complete.

parallel_sort

/**
 * Sort the given iterator range, possibly in parallel.
 *
 * @tparam IterT Iterator type
 * @tparam CmpT Comparator type
 * @param tp The threadpool to use.
 * @param begin Beginning of range to sort (inclusive).
 * @param end End of range to sort (exclusive).
 * @param cmp Comparator.
 */
template <
    typename IterT,
    typename CmpT = std::less<typename std::iterator_traits<IterT>::value_type>>
void parallel_sort(
    ThreadPool* const tp, IterT begin, IterT end, const CmpT& cmp = CmpT());

The parallel_sort routine implements a semi-parallel variant of a traditional quicksort. The in-code documentation provides the following high-level overview of the algorithm:

// Sort the range using a quicksort. The algorithm is:
// 1. Pick a pivot value in the range.
// 2. Re-order the range so that all values less than the
//    pivot value are ordered left of the pivot's index.
// 3. Recursively invoke step 1.
//
// To parallelize the algorithm, step #3 is modified to execute
// the recursion on the thread pool.

Despite the verbose templating in the interface, using this routine is straightforward whenever the range's value type can be compared with the default std::less comparator.

For example, the following snippet sorts an std::vector<std::string>:

std::vector<std::string> my_strings = {
  "b",
  "d",
  "c",
  "a"
};

ThreadPool tp;
tp.init(4);
parallel_sort(&tp, my_strings.begin(), my_strings.end());

The second template type, CmpT, may be supplied to sort custom data types or to use an alternative comparator for built-in types (e.g. std::greater).
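For example, the my_strings vector from the snippet above can be sorted in descending order by supplying std::greater (a minimal sketch, reusing my_strings and tp):

// Sort in descending order; afterwards, my_strings is {"d", "c", "b", "a"}.
parallel_sort(
    &tp, my_strings.begin(), my_strings.end(), std::greater<std::string>());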

parallel_for

/**
 * Call the given function on each element in the given iterator range.
 *
 * @tparam FuncT Function type (returning Status).
 * @param tp The threadpool to use.
 * @param begin Beginning of range (inclusive).
 * @param end End of range (exclusive).
 * @param F Function to call on each item.
 * @return Vector of Status objects, one for each function invocation.
 */
template <typename FuncT>
std::vector<Status> parallel_for(
    ThreadPool* const tp, uint64_t begin, uint64_t end, const FuncT& F);

The parallel_for routine exists for executing a traditional for-loop where each iteration may execute concurrently and out-of-order. The work routine for each iteration must be thread-safe. To maximize CPU utilization by minimizing context switching among threads, the routine may partition the work into a number of tasks smaller than the total number of iterations.

The following example demonstrates how this routine may be used to create, write, and close three files in parallel. Per the interface above, the work function returns a Status for each invocation.

std::vector<std::string> file_names = {
  "a.txt",
  "b.txt",
  "c.txt"
};

ThreadPool tp;
tp.init(4);
parallel_for(&tp, 0, file_names.size(), [&](uint64_t i) {
  const std::string& file_name = file_names[i];
  create_file(file_name);
  write_file(file_name);
  close_file(file_name);
  return Status::Ok();  // report success for this iteration
});
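Because every invocation returns a Status, the caller can inspect the returned vector to detect per-iteration failures. A minimal sketch, assuming Status exposes an ok() accessor as in the TileDB source:

std::vector<Status> statuses =
    parallel_for(&tp, 0, file_names.size(), [&](uint64_t i) {
      create_file(file_names[i]);
      return Status::Ok();
    });

// Verify that every iteration succeeded.
for (const auto& st : statuses) {
  assert(st.ok());
}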

parallel_for_2d

/**
 * Call the given function on every pair (i, j) in the given i and j ranges,
 * possibly in parallel.
 *
 * @tparam FuncT Function type (returning Status).
 * @param tp The threadpool to use.
 * @param i0 Inclusive start of outer (rows) range.
 * @param i1 Exclusive end of outer range.
 * @param j0 Inclusive start of inner (cols) range.
 * @param j1 Exclusive end of inner range.
 * @param F Function to call on each (i, j) pair.
 * @return Vector of Status objects, one for each function invocation.
 */
template <typename FuncT>
std::vector<Status> parallel_for_2d(
    ThreadPool* const tp,
    uint64_t i0,
    uint64_t i1,
    uint64_t j0,
    uint64_t j1,
    const FuncT& F);

The parallel_for_2d routine is similar to parallel_for, except that it operates over a pair of index ranges. To maximize CPU utilization by minimizing context switching among threads, the routine may partition the work into a number of tasks smaller than the total number of iterations.

The following example demonstrates how the routine may be used to generate all two-element permutations of a single vector's elements. As with parallel_for, the work function returns a Status per invocation and must be thread-safe.

std::vector<std::string> elements = {
  "a",
  "b",
  "c",
  "d",
  "e"
};

std::vector<std::string> permutations;
std::mutex permutations_lock;

ThreadPool tp;
tp.init(4);
parallel_for_2d(
    &tp, 0, elements.size(), 0, elements.size(),
    [&](uint64_t i, uint64_t j) {
      if (i != j) {
        // The work function must be thread-safe: guard the shared vector.
        std::lock_guard<std::mutex> lg(permutations_lock);
        permutations.emplace_back(elements[i] + elements[j]);
      }
      return Status::Ok();
    });