diff --git a/apis/python/src/tiledbsoma/_index_util.py b/apis/python/src/tiledbsoma/_index_util.py index 433e0433e0..94e67a4ce7 100644 --- a/apis/python/src/tiledbsoma/_index_util.py +++ b/apis/python/src/tiledbsoma/_index_util.py @@ -40,8 +40,8 @@ def tiledbsoma_build_index( tdb_concurrency = int( context.tiledb_ctx.config().get("sm.compute_concurrency_level", 10) ) - thread_count = max(1, tdb_concurrency // 2) + max(1, tdb_concurrency // 2) reindexer = clib.IntIndexer() - reindexer.map_locations(keys, thread_count) + reindexer.map_locations(keys, context) return reindexer # type: ignore[no-any-return] diff --git a/apis/python/src/tiledbsoma/reindexer.cc b/apis/python/src/tiledbsoma/reindexer.cc index 575c246ef6..5ed8c45ab3 100644 --- a/apis/python/src/tiledbsoma/reindexer.cc +++ b/apis/python/src/tiledbsoma/reindexer.cc @@ -46,23 +46,16 @@ void load_reindexer(py::module &m) { // between 0 and number of keys - 1) based on khash py::class_(m, "IntIndexer") .def(py::init<>()) - .def(py::init&, int>()) + .def(py::init&, std::shared_ptr>()) .def( "map_locations", [](IntIndexer& indexer, py::array_t keys, - int num_threads) { + std::shared_ptr context) { auto buffer = keys.request(); int64_t* data = static_cast(buffer.ptr); size_t length = buffer.shape[0]; - indexer.map_locations(keys.data(), keys.size(), num_threads); - }) - .def( - "map_locations", - [](IntIndexer& indexer, - std::vector keys, - int num_threads) { - indexer.map_locations(keys.data(), keys.size(), num_threads); + indexer.map_locations(keys.data(), keys.size(), context); }) // Perform lookup for a large input array of keys and return the looked // up value array (passing ownership from C++ to python) diff --git a/libtiledbsoma/src/reindexer/reindexer.cc b/libtiledbsoma/src/reindexer/reindexer.cc index fbd52fcc02..96164c3a6f 100644 --- a/libtiledbsoma/src/reindexer/reindexer.cc +++ b/libtiledbsoma/src/reindexer/reindexer.cc @@ -47,16 +47,15 @@ KHASH_MAP_INIT_INT64(m64, int64_t) namespace tiledbsoma { void IntIndexer::map_locations( - const int64_t* keys, size_t size, size_t threads) { + const int64_t* keys, + size_t size, + std::shared_ptr context) { map_size_ = size; // Handling edge cases if (size == 0) { return; } - if (threads == 0) { - throw std::runtime_error("Re-indexer thread count cannot be zero."); - } hash_ = kh_init(m64); kh_resize(m64, hash_, size * 1.25); @@ -64,6 +63,7 @@ void IntIndexer::map_locations( khint64_t k; int64_t counter = 0; // Hash map construction + int threads = 5; LOG_DEBUG(fmt::format( "[Re-indexer] Start of Map locations with {} keys and {} threads", size, @@ -79,18 +79,32 @@ void IntIndexer::map_locations( } auto hsize = kh_size(hash_); LOG_DEBUG(fmt::format("[Re-indexer] khash size = {}", hsize)); - if (threads > 1) { - tiledb_thread_pool_ = std::make_unique(threads); + + if (threads == 0) { + throw std::runtime_error("Re-indexer thread count cannot be zero."); } LOG_DEBUG( fmt::format("[Re-indexer] Thread pool started and hash table created")); + context_ = context; } void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { if (size == 0) { return; } - if (tiledb_thread_pool_ == nullptr) { // When concurrency is 1 + if (context_->thread_pool() == nullptr) { + auto cfg = context_->tiledb_config(); + int concurrency = 10; + if (cfg.find("sm.compute_concurrency_level") != cfg.end()) { + concurrency = std::stoi(cfg["sm.compute_concurrency_level"]); + } + int thread_count = std::max(1, concurrency / 2); + if (thread_count > 1) { + context_->thread_pool() = std::make_shared( + thread_count); + } + } + if (context_->thread_pool() == nullptr) { // When concurrency is 1 for (int i = 0; i < size; i++) { auto k = kh_get(m64, hash_, keys[i]); if (k == kh_end(hash_)) { @@ -104,12 +118,13 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { } LOG_DEBUG(fmt::format( "Lookup with thread concurrency {} on data size {}", - tiledb_thread_pool_->concurrency_level(), + context_->thread_pool()->concurrency_level(), size)); std::vector tasks; - size_t thread_chunk_size = size / tiledb_thread_pool_->concurrency_level(); + size_t thread_chunk_size = size / + context_->thread_pool()->concurrency_level(); if (thread_chunk_size == 0) { thread_chunk_size = 1; } @@ -122,7 +137,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { } LOG_DEBUG(fmt::format( "Creating tileDB task for the range from {} to {} ", start, end)); - tiledbsoma::ThreadPool::Task task = tiledb_thread_pool_->execute( + tiledbsoma::ThreadPool::Task task = context_->thread_pool()->execute( [this, start, end, &results, &keys]() { for (size_t i = start; i < end; i++) { auto k = kh_get(m64, hash_, keys[i]); @@ -142,7 +157,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { start, end)); } - tiledb_thread_pool_->wait_all(tasks); + context_->thread_pool()->wait_all(tasks); } IntIndexer::~IntIndexer() { @@ -151,8 +166,11 @@ IntIndexer::~IntIndexer() { } } -IntIndexer::IntIndexer(const int64_t* keys, int size, int threads) { - map_locations(keys, size, threads); +IntIndexer::IntIndexer( + const int64_t* keys, + int size, + std::shared_ptr context) { + map_locations(keys, size, context); } } // namespace tiledbsoma \ No newline at end of file diff --git a/libtiledbsoma/src/reindexer/reindexer.h b/libtiledbsoma/src/reindexer/reindexer.h index 5996ab5065..24d1215de6 100644 --- a/libtiledbsoma/src/reindexer/reindexer.h +++ b/libtiledbsoma/src/reindexer/reindexer.h @@ -44,6 +44,7 @@ struct kh_m64_s; namespace tiledbsoma { class ThreadPool; +class SOMAContext; class IntIndexer { public: @@ -53,9 +54,14 @@ class IntIndexer { * @param size yhr number of keys in the put * @param threads number of threads in the thread pool */ - void map_locations(const int64_t* keys, size_t size, size_t threads = 4); - void map_locations(const std::vector& keys, size_t threads = 4) { - map_locations(keys.data(), keys.size(), threads); + void map_locations( + const int64_t* keys, + size_t size, + std::shared_ptr context); + void map_locations( + const std::vector& keys, + std::shared_ptr context) { + map_locations(keys.data(), keys.size(), context); } /** * Used for parallel lookup using khash @@ -77,9 +83,14 @@ class IntIndexer { /** * Constructor with the same arguments as map_locations */ - IntIndexer(const int64_t* keys, int size, int threads); - IntIndexer(const std::vector& keys, int threads) - : IntIndexer(keys.data(), keys.size(), threads) { + IntIndexer( + const int64_t* keys, + int size, + std::shared_ptr context); + IntIndexer( + const std::vector& keys, + std::shared_ptr context) + : IntIndexer(keys.data(), keys.size(), context) { } virtual ~IntIndexer(); @@ -88,10 +99,8 @@ class IntIndexer { * The created 64bit hash table */ kh_m64_s* hash_; - /* - * TileDB threadpool - */ - std::shared_ptr tiledb_thread_pool_ = nullptr; + + std::shared_ptr context_ = nullptr; /* * Number of elements in the map set by map_locations */ diff --git a/libtiledbsoma/src/soma/soma_array.h b/libtiledbsoma/src/soma/soma_array.h index 695b53a7b9..b4c17ee6c4 100644 --- a/libtiledbsoma/src/soma/soma_array.h +++ b/libtiledbsoma/src/soma/soma_array.h @@ -122,7 +122,7 @@ class SOMAArray : public SOMAObject { //=================================================================== //= public non-static //=================================================================== - + /** * @brief Construct a new SOMAArray object * diff --git a/libtiledbsoma/src/soma/soma_context.h b/libtiledbsoma/src/soma/soma_context.h index 0766113082..d05962cccd 100644 --- a/libtiledbsoma/src/soma/soma_context.h +++ b/libtiledbsoma/src/soma/soma_context.h @@ -38,6 +38,7 @@ #include namespace tiledbsoma { +class ThreadPool; using namespace tiledb; @@ -67,6 +68,10 @@ class SOMAContext { return cfg; } + std::shared_ptr& thread_pool() { + return thread_pool_; + } + private: //=================================================================== //= private non-static @@ -74,6 +79,9 @@ class SOMAContext { // TileDB context std::shared_ptr ctx_; + + // Threadpool + std::shared_ptr thread_pool_ = nullptr; }; } // namespace tiledbsoma diff --git a/libtiledbsoma/test/test_indexer.cc b/libtiledbsoma/test/test_indexer.cc index c99d13ff61..784297dc48 100644 --- a/libtiledbsoma/test/test_indexer.cc +++ b/libtiledbsoma/test/test_indexer.cc @@ -31,8 +31,10 @@ */ #include +#include #include #include +#include #include #include #include @@ -57,9 +59,10 @@ bool run_test(int id, std::vector keys, std::vector lookups) { try { std::vector indexer_results; indexer_results.resize(lookups.size()); + auto context = std::make_shared(); tiledbsoma::IntIndexer indexer; - indexer.map_locations(keys); + indexer.map_locations(keys, context); auto* hash = kh_init(m64); int ret; khint64_t k;