Skip to content

Commit

Permalink
[python, C++] Re-indexer context thread pool
Browse files Browse the repository at this point in the history
Each C++ SOMAContext has it's own lazily created thread pool
  • Loading branch information
beroy committed Mar 1, 2024
1 parent 84f0815 commit d068793
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 62 deletions.
2 changes: 1 addition & 1 deletion apis/python/src/tiledbsoma/_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ def axis_query( # type: ignore
obs_query=obs_query or query.AxisQuery(),
var_query=var_query or query.AxisQuery(),
index_factory=functools.partial(
tiledbsoma_build_index, context=self.context
tiledbsoma_build_index, context=self.context.native_context
),
)
10 changes: 2 additions & 8 deletions apis/python/src/tiledbsoma/_index_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ def tiledbsoma_build_index(
Lifecycle:
Experimental.
"""
if context is not None:
tdb_concurrency = int(
context.tiledb_ctx.config().get("sm.compute_concurrency_level", 10)
)
thread_count = max(1, tdb_concurrency // 2)

reindexer = clib.IntIndexer()
reindexer.map_locations(keys, thread_count)
reindexer = clib.IntIndexer(context)
reindexer.map_locations(keys)
return reindexer # type: ignore[no-any-return]
8 changes: 6 additions & 2 deletions apis/python/src/tiledbsoma/_read_iters.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ def __init__(

# build indexers, as needed
self.axes_to_reindex = set(range(self.ndim)) - set(self.reindex_disable_on_axis)
assert context is not None
self.minor_axes_indexer = {
d: tiledbsoma_build_index(self.joinids[d].to_numpy(), context=context)
d: tiledbsoma_build_index(
self.joinids[d].to_numpy(), context=context.native_context
)
for d in (self.axes_to_reindex - set((self.major_axis,)))
}

Expand Down Expand Up @@ -251,8 +254,9 @@ def _reindexed_table_reader(
col = tbl.column(f"soma_dim_{d}")
if d in self.axes_to_reindex:
if d == self.major_axis:
assert self.context is not None
col = tiledbsoma_build_index(
coords[self.major_axis], context=self.context
coords[self.major_axis], context=self.context.native_context
).get_indexer(
col.to_numpy(),
)
Expand Down
8 changes: 3 additions & 5 deletions apis/python/src/tiledbsoma/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,14 @@ void load_reindexer(py::module& m) {
// between 0 and number of keys - 1) based on khash
py::class_<IntIndexer>(m, "IntIndexer")
.def(py::init<>())
.def(py::init<std::vector<int64_t>&, int>())
.def(py::init<std::shared_ptr<SOMAContext>>())
.def(
"map_locations",
[](IntIndexer& indexer,
py::array_t<int64_t> keys,
int num_threads) {
[](IntIndexer& indexer, py::array_t<int64_t> keys) {
auto buffer = keys.request();
int64_t* data = static_cast<int64_t*>(buffer.ptr);
size_t length = buffer.shape[0];
indexer.map_locations(keys.data(), keys.size(), num_threads);
indexer.map_locations(keys.data(), keys.size());
})
// Perform lookup for a large input array of keys and writes the
// looked up values into previously allocated array (works for the
Expand Down
20 changes: 16 additions & 4 deletions apis/python/tests/test_indexer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
from typing import List, Union

import numpy as np
Expand All @@ -22,7 +23,7 @@ def test_duplicate_key_indexer_error(
):
context = _validate_soma_tiledb_context(SOMATileDBContext())
with pytest.raises(RuntimeError, match="There are duplicate keys."):
tiledbsoma_build_index(keys, context=context)
tiledbsoma_build_index(keys, context=context.native_context)

pd_index = pd.Index(keys)
with pytest.raises(pd.errors.InvalidIndexError):
Expand Down Expand Up @@ -89,8 +90,19 @@ def test_duplicate_key_indexer_error(
)
def test_indexer(keys: np.array, lookups: np.array):
context = _validate_soma_tiledb_context(SOMATileDBContext())
indexer = tiledbsoma_build_index(keys, context=context)
results = indexer.get_indexer(lookups)
all_results = []
num_threads = 10

def target():
indexer = tiledbsoma_build_index(keys, context=context.native_context)
results = indexer.get_indexer(lookups)
all_results.append(results)

for t in range(num_threads):
thread = threading.Thread(target=target, args=())
thread.start()
thread.join()
panda_indexer = pd.Index(keys)
panda_results = panda_indexer.get_indexer(lookups)
np.testing.assert_equal(results.all(), panda_results.all())
for i in range(num_threads):
np.testing.assert_equal(all_results[i].all(), panda_results.all())
2 changes: 1 addition & 1 deletion apis/python/tests/test_reindexer_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ def test_reindexer_api_context():
keys = np.arange(3, 10, 2)
ids = np.arange(3, 10, 2)
expected = np.array([0, 1, 2, 3])
indexer = tiledbsoma_build_index(keys, context=context)
indexer = tiledbsoma_build_index(keys, context=context.native_context)
result = indexer.get_indexer(ids)
assert np.equal(result.all(), expected.all())
1 change: 1 addition & 0 deletions libtiledbsoma/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ add_library(TILEDB_SOMA_OBJECTS OBJECT
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_collection.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_experiment.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_measurement.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_context.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dataframe.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dense_ndarray.cc
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_sparse_ndarray.cc
Expand Down
37 changes: 14 additions & 23 deletions libtiledbsoma/src/reindexer/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#include <thread>
#include "khash.h"
#include "soma/enums.h"
#include "soma/soma_array.h"
#include "soma/soma_context.h"
#include "utils/arrow_adapter.h"
#include "utils/common.h"
#include "utils/logger.h"
Expand All @@ -46,28 +46,22 @@ KHASH_MAP_INIT_INT64(m64, int64_t)

namespace tiledbsoma {

void IntIndexer::map_locations(
const int64_t* keys, size_t size, size_t threads) {
void IntIndexer::map_locations(const int64_t* keys, size_t size) {
map_size_ = size;

// Handling edge cases
if (size == 0) {
return;
}
if (threads == 0) {
throw std::runtime_error("Re-indexer thread count cannot be zero.");
}

hash_ = kh_init(m64);
kh_resize(m64, hash_, size * 1.25);
int ret;
khint64_t k;
int64_t counter = 0;
// Hash map construction
LOG_DEBUG(fmt::format(
"[Re-indexer] Start of Map locations with {} keys and {} threads",
size,
threads));
LOG_DEBUG(
fmt::format("[Re-indexer] Start of Map locations with {} keys", size));
for (size_t i = 0; i < size; i++) {
k = kh_put(m64, hash_, keys[i], &ret);
assert(k != kh_end(hash_));
Expand All @@ -79,9 +73,7 @@ void IntIndexer::map_locations(
}
auto hsize = kh_size(hash_);
LOG_DEBUG(fmt::format("[Re-indexer] khash size = {}", hsize));
if (threads > 1) {
tiledb_thread_pool_ = std::make_unique<tiledbsoma::ThreadPool>(threads);
}

LOG_DEBUG(
fmt::format("[Re-indexer] Thread pool started and hash table created"));
}
Expand All @@ -90,7 +82,9 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
if (size == 0) {
return;
}
if (tiledb_thread_pool_ == nullptr) { // When concurrency is 1
// Single thread checks
if (context_ == nullptr || context_->thread_pool() == nullptr ||
context_->thread_pool()->concurrency_level() == 1) {
for (int i = 0; i < size; i++) {
auto k = kh_get(m64, hash_, keys[i]);
if (k == kh_end(hash_)) {
Expand All @@ -104,12 +98,13 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
}
LOG_DEBUG(fmt::format(
"Lookup with thread concurrency {} on data size {}",
tiledb_thread_pool_->concurrency_level(),
context_->thread_pool()->concurrency_level(),
size));

std::vector<tiledbsoma::ThreadPool::Task> tasks;

size_t thread_chunk_size = size / tiledb_thread_pool_->concurrency_level();
size_t thread_chunk_size = size /
context_->thread_pool()->concurrency_level();
if (thread_chunk_size == 0) {
thread_chunk_size = 1;
}
Expand All @@ -122,7 +117,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
}
LOG_DEBUG(fmt::format(
"Creating tileDB task for the range from {} to {} ", start, end));
tiledbsoma::ThreadPool::Task task = tiledb_thread_pool_->execute(
tiledbsoma::ThreadPool::Task task = context_->thread_pool()->execute(
[this, start, end, &results, &keys]() {
for (size_t i = start; i < end; i++) {
auto k = kh_get(m64, hash_, keys[i]);
Expand All @@ -142,7 +137,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) {
start,
end));
}
tiledb_thread_pool_->wait_all(tasks);
context_->thread_pool()->wait_all(tasks);
}

IntIndexer::~IntIndexer() {
Expand All @@ -151,8 +146,4 @@ IntIndexer::~IntIndexer() {
}
}

IntIndexer::IntIndexer(const int64_t* keys, int size, int threads) {
map_locations(keys, size, threads);
}

} // namespace tiledbsoma
} // namespace tiledbsoma
22 changes: 8 additions & 14 deletions libtiledbsoma/src/reindexer/reindexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct kh_m64_s;

namespace tiledbsoma {

class ThreadPool;
class SOMAContext;

class IntIndexer {
public:
Expand All @@ -53,9 +53,9 @@ class IntIndexer {
* @param size yhr number of keys in the put
* @param threads number of threads in the thread pool
*/
void map_locations(const int64_t* keys, size_t size, size_t threads = 4);
void map_locations(const std::vector<int64_t>& keys, size_t threads = 4) {
map_locations(keys.data(), keys.size(), threads);
void map_locations(const int64_t* keys, size_t size);
void map_locations(const std::vector<int64_t>& keys) {
map_locations(keys.data(), keys.size());
}
/**
* Used for parallel lookup using khash
Expand All @@ -74,12 +74,8 @@ class IntIndexer {
lookup(keys.data(), results.data(), keys.size());
}
IntIndexer(){};
/**
* Constructor with the same arguments as map_locations
*/
IntIndexer(const int64_t* keys, int size, int threads);
IntIndexer(const std::vector<int64_t>& keys, int threads)
: IntIndexer(keys.data(), keys.size(), threads) {
IntIndexer(std::shared_ptr<tiledbsoma::SOMAContext> context) {
context_ = context;
}
virtual ~IntIndexer();

Expand All @@ -88,10 +84,8 @@ class IntIndexer {
* The created 64bit hash table
*/
kh_m64_s* hash_;
/*
* TileDB threadpool
*/
std::shared_ptr<tiledbsoma::ThreadPool> tiledb_thread_pool_ = nullptr;

std::shared_ptr<SOMAContext> context_ = nullptr;
/*
* Number of elements in the map set by map_locations
*/
Expand Down
2 changes: 1 addition & 1 deletion libtiledbsoma/src/reindexer/test_indexer_dtatye_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def build(keys, pandas):
indexer.get_indexer([1])
else:
perf_counter()
indexer = tiledbsoma_build_index(keys, context=context)
indexer = tiledbsoma_build_index(keys, context=context.native_context)
return indexer


Expand Down
49 changes: 49 additions & 0 deletions libtiledbsoma/src/soma/soma_context.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* @file soma_context.h
*
* @section LICENSE
*
* The MIT License
*
* @copyright Copyright (c) 2024 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* @section DESCRIPTION
*
* This file defines the SOMAContext class.
*/
#include "soma_context.h"
#include <thread_pool/thread_pool.h>

namespace tiledbsoma {

void SOMAContext::create_thread_pool() {
// Extracting concurrency from tiledb config
auto cfg = tiledb_config();
int concurrency = 10;
if (cfg.find("sm.compute_concurrency_level") != cfg.end()) {
concurrency = std::stoi(cfg["sm.compute_concurrency_level"]);
}
int thread_count = std::max(1, concurrency / 2);
if (thread_count > 1) {
thread_pool_ = std::make_shared<ThreadPool>(thread_count);
}
}
} // namespace tiledbsoma
25 changes: 23 additions & 2 deletions libtiledbsoma/src/soma/soma_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
#define SOMA_CONTEXT

#include <map>
#include <mutex>
#include <string>
#include <tiledb/tiledb>

namespace tiledbsoma {
class ThreadPool;

using namespace tiledb;

Expand All @@ -47,10 +49,12 @@ class SOMAContext {
//= public non-static
//===================================================================
SOMAContext()
: ctx_(std::make_shared<Context>(Config({}))){};
: ctx_(std::make_shared<Context>(Config({})))

Check warning on line 52 in libtiledbsoma/src/soma/soma_context.h

View check run for this annotation

Codecov / codecov/patch

libtiledbsoma/src/soma/soma_context.h#L52

Added line #L52 was not covered by tests
, thread_pool_mutex_(){};

SOMAContext(std::map<std::string, std::string> platform_config)
: ctx_(std::make_shared<Context>(Config(platform_config))){};
: ctx_(std::make_shared<Context>(Config(platform_config)))
, thread_pool_mutex_(){};

Check warning on line 57 in libtiledbsoma/src/soma/soma_context.h

View check run for this annotation

Codecov / codecov/patch

libtiledbsoma/src/soma/soma_context.h#L56-L57

Added lines #L56 - L57 were not covered by tests

bool operator==(const SOMAContext& other) const {
return ctx_ == other.ctx_;
Expand All @@ -67,13 +71,30 @@ class SOMAContext {
return cfg;
}

std::shared_ptr<ThreadPool>& thread_pool() {
const std::lock_guard<std::mutex> lock(thread_pool_mutex_);
// The first thread that gets here will create the context thread pool
if (thread_pool_ == nullptr) {
create_thread_pool();
}
return thread_pool_;
}

void create_thread_pool();

private:
//===================================================================
//= private non-static
//===================================================================

// TileDB context
std::shared_ptr<Context> ctx_;

// Threadpool
std::shared_ptr<ThreadPool> thread_pool_ = nullptr;

Check warning on line 94 in libtiledbsoma/src/soma/soma_context.h

View check run for this annotation

Codecov / codecov/patch

libtiledbsoma/src/soma/soma_context.h#L94

Added line #L94 was not covered by tests

// Semaphore to create and use the thread_pool
std::mutex thread_pool_mutex_;
};
} // namespace tiledbsoma

Expand Down
Loading

0 comments on commit d068793

Please sign in to comment.