diff --git a/apis/python/src/tiledbsoma/_experiment.py b/apis/python/src/tiledbsoma/_experiment.py index 2b277be62d..87cfe7c8df 100644 --- a/apis/python/src/tiledbsoma/_experiment.py +++ b/apis/python/src/tiledbsoma/_experiment.py @@ -95,6 +95,6 @@ def axis_query( # type: ignore obs_query=obs_query or query.AxisQuery(), var_query=var_query or query.AxisQuery(), index_factory=functools.partial( - tiledbsoma_build_index, context=self.context + tiledbsoma_build_index, context=self.context.native_context ), ) diff --git a/apis/python/src/tiledbsoma/_index_util.py b/apis/python/src/tiledbsoma/_index_util.py index 433e0433e0..a317c6d184 100644 --- a/apis/python/src/tiledbsoma/_index_util.py +++ b/apis/python/src/tiledbsoma/_index_util.py @@ -36,12 +36,6 @@ def tiledbsoma_build_index( Lifecycle: Experimental. """ - if context is not None: - tdb_concurrency = int( - context.tiledb_ctx.config().get("sm.compute_concurrency_level", 10) - ) - thread_count = max(1, tdb_concurrency // 2) - - reindexer = clib.IntIndexer() - reindexer.map_locations(keys, thread_count) + reindexer = clib.IntIndexer(context) + reindexer.map_locations(keys) return reindexer # type: ignore[no-any-return] diff --git a/apis/python/src/tiledbsoma/_read_iters.py b/apis/python/src/tiledbsoma/_read_iters.py index 6004fcd314..21a828ea1d 100644 --- a/apis/python/src/tiledbsoma/_read_iters.py +++ b/apis/python/src/tiledbsoma/_read_iters.py @@ -133,8 +133,11 @@ def __init__( # build indexers, as needed self.axes_to_reindex = set(range(self.ndim)) - set(self.reindex_disable_on_axis) + assert context is not None self.minor_axes_indexer = { - d: tiledbsoma_build_index(self.joinids[d].to_numpy(), context=context) + d: tiledbsoma_build_index( + self.joinids[d].to_numpy(), context=context.native_context + ) for d in (self.axes_to_reindex - set((self.major_axis,))) } @@ -251,8 +254,9 @@ def _reindexed_table_reader( col = tbl.column(f"soma_dim_{d}") if d in self.axes_to_reindex: if d == self.major_axis: + assert self.context is not None col = tiledbsoma_build_index( - coords[self.major_axis], context=self.context + coords[self.major_axis], context=self.context.native_context ).get_indexer( col.to_numpy(), ) diff --git a/apis/python/src/tiledbsoma/reindexer.cc b/apis/python/src/tiledbsoma/reindexer.cc index d9cbf6f3ba..0e9b2b2da1 100644 --- a/apis/python/src/tiledbsoma/reindexer.cc +++ b/apis/python/src/tiledbsoma/reindexer.cc @@ -135,16 +135,14 @@ void load_reindexer(py::module& m) { // between 0 and number of keys - 1) based on khash py::class_(m, "IntIndexer") .def(py::init<>()) - .def(py::init&, int>()) + .def(py::init>()) .def( "map_locations", - [](IntIndexer& indexer, - py::array_t keys, - int num_threads) { + [](IntIndexer& indexer, py::array_t keys) { auto buffer = keys.request(); int64_t* data = static_cast(buffer.ptr); size_t length = buffer.shape[0]; - indexer.map_locations(keys.data(), keys.size(), num_threads); + indexer.map_locations(keys.data(), keys.size()); }) // Perform lookup for a large input array of keys and writes the // looked up values into previously allocated array (works for the diff --git a/apis/python/tests/test_indexer.py b/apis/python/tests/test_indexer.py index 3493b62f44..b3d787b4b3 100644 --- a/apis/python/tests/test_indexer.py +++ b/apis/python/tests/test_indexer.py @@ -1,3 +1,4 @@ +import threading from typing import List, Union import numpy as np @@ -22,7 +23,7 @@ def test_duplicate_key_indexer_error( ): context = _validate_soma_tiledb_context(SOMATileDBContext()) with pytest.raises(RuntimeError, match="There are duplicate keys."): - tiledbsoma_build_index(keys, context=context) + tiledbsoma_build_index(keys, context=context.native_context) pd_index = pd.Index(keys) with pytest.raises(pd.errors.InvalidIndexError): @@ -89,8 +90,19 @@ def test_duplicate_key_indexer_error( ) def test_indexer(keys: np.array, lookups: np.array): context = _validate_soma_tiledb_context(SOMATileDBContext()) - indexer = tiledbsoma_build_index(keys, context=context) - results = indexer.get_indexer(lookups) + all_results = [] + num_threads = 10 + + def target(): + indexer = tiledbsoma_build_index(keys, context=context.native_context) + results = indexer.get_indexer(lookups) + all_results.append(results) + + for t in range(num_threads): + thread = threading.Thread(target=target, args=()) + thread.start() + thread.join() panda_indexer = pd.Index(keys) panda_results = panda_indexer.get_indexer(lookups) - np.testing.assert_equal(results.all(), panda_results.all()) + for i in range(num_threads): + np.testing.assert_equal(all_results[i].all(), panda_results.all()) diff --git a/apis/python/tests/test_reindexer_api.py b/apis/python/tests/test_reindexer_api.py index 9f24fb113f..993564963e 100644 --- a/apis/python/tests/test_reindexer_api.py +++ b/apis/python/tests/test_reindexer_api.py @@ -17,6 +17,6 @@ def test_reindexer_api_context(): keys = np.arange(3, 10, 2) ids = np.arange(3, 10, 2) expected = np.array([0, 1, 2, 3]) - indexer = tiledbsoma_build_index(keys, context=context) + indexer = tiledbsoma_build_index(keys, context=context.native_context) result = indexer.get_indexer(ids) assert np.equal(result.all(), expected.all()) diff --git a/libtiledbsoma/src/CMakeLists.txt b/libtiledbsoma/src/CMakeLists.txt index 8bd8639193..4205da0319 100644 --- a/libtiledbsoma/src/CMakeLists.txt +++ b/libtiledbsoma/src/CMakeLists.txt @@ -58,6 +58,7 @@ add_library(TILEDB_SOMA_OBJECTS OBJECT ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_collection.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_experiment.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_measurement.cc + ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_context.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dataframe.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_dense_ndarray.cc ${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_sparse_ndarray.cc diff --git a/libtiledbsoma/src/reindexer/reindexer.cc b/libtiledbsoma/src/reindexer/reindexer.cc index fbd52fcc02..8f1e4fcaa6 100644 --- a/libtiledbsoma/src/reindexer/reindexer.cc +++ b/libtiledbsoma/src/reindexer/reindexer.cc @@ -36,7 +36,7 @@ #include #include "khash.h" #include "soma/enums.h" -#include "soma/soma_array.h" +#include "soma/soma_context.h" #include "utils/arrow_adapter.h" #include "utils/common.h" #include "utils/logger.h" @@ -46,17 +46,13 @@ KHASH_MAP_INIT_INT64(m64, int64_t) namespace tiledbsoma { -void IntIndexer::map_locations( - const int64_t* keys, size_t size, size_t threads) { +void IntIndexer::map_locations(const int64_t* keys, size_t size) { map_size_ = size; // Handling edge cases if (size == 0) { return; } - if (threads == 0) { - throw std::runtime_error("Re-indexer thread count cannot be zero."); - } hash_ = kh_init(m64); kh_resize(m64, hash_, size * 1.25); @@ -64,10 +60,8 @@ void IntIndexer::map_locations( khint64_t k; int64_t counter = 0; // Hash map construction - LOG_DEBUG(fmt::format( - "[Re-indexer] Start of Map locations with {} keys and {} threads", - size, - threads)); + LOG_DEBUG( + fmt::format("[Re-indexer] Start of Map locations with {} keys", size)); for (size_t i = 0; i < size; i++) { k = kh_put(m64, hash_, keys[i], &ret); assert(k != kh_end(hash_)); @@ -79,9 +73,7 @@ void IntIndexer::map_locations( } auto hsize = kh_size(hash_); LOG_DEBUG(fmt::format("[Re-indexer] khash size = {}", hsize)); - if (threads > 1) { - tiledb_thread_pool_ = std::make_unique(threads); - } + LOG_DEBUG( fmt::format("[Re-indexer] Thread pool started and hash table created")); } @@ -90,7 +82,9 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { if (size == 0) { return; } - if (tiledb_thread_pool_ == nullptr) { // When concurrency is 1 + // Single thread checks + if (context_ == nullptr || context_->thread_pool() == nullptr || + context_->thread_pool()->concurrency_level() == 1) { for (int i = 0; i < size; i++) { auto k = kh_get(m64, hash_, keys[i]); if (k == kh_end(hash_)) { @@ -104,12 +98,13 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { } LOG_DEBUG(fmt::format( "Lookup with thread concurrency {} on data size {}", - tiledb_thread_pool_->concurrency_level(), + context_->thread_pool()->concurrency_level(), size)); std::vector tasks; - size_t thread_chunk_size = size / tiledb_thread_pool_->concurrency_level(); + size_t thread_chunk_size = size / + context_->thread_pool()->concurrency_level(); if (thread_chunk_size == 0) { thread_chunk_size = 1; } @@ -122,7 +117,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { } LOG_DEBUG(fmt::format( "Creating tileDB task for the range from {} to {} ", start, end)); - tiledbsoma::ThreadPool::Task task = tiledb_thread_pool_->execute( + tiledbsoma::ThreadPool::Task task = context_->thread_pool()->execute( [this, start, end, &results, &keys]() { for (size_t i = start; i < end; i++) { auto k = kh_get(m64, hash_, keys[i]); @@ -142,7 +137,7 @@ void IntIndexer::lookup(const int64_t* keys, int64_t* results, int size) { start, end)); } - tiledb_thread_pool_->wait_all(tasks); + context_->thread_pool()->wait_all(tasks); } IntIndexer::~IntIndexer() { @@ -151,8 +146,4 @@ IntIndexer::~IntIndexer() { } } -IntIndexer::IntIndexer(const int64_t* keys, int size, int threads) { - map_locations(keys, size, threads); -} - -} // namespace tiledbsoma \ No newline at end of file +} // namespace tiledbsoma diff --git a/libtiledbsoma/src/reindexer/reindexer.h b/libtiledbsoma/src/reindexer/reindexer.h index 5996ab5065..2a7a39cbdd 100644 --- a/libtiledbsoma/src/reindexer/reindexer.h +++ b/libtiledbsoma/src/reindexer/reindexer.h @@ -43,7 +43,7 @@ struct kh_m64_s; namespace tiledbsoma { -class ThreadPool; +class SOMAContext; class IntIndexer { public: @@ -53,9 +53,9 @@ class IntIndexer { * @param size yhr number of keys in the put * @param threads number of threads in the thread pool */ - void map_locations(const int64_t* keys, size_t size, size_t threads = 4); - void map_locations(const std::vector& keys, size_t threads = 4) { - map_locations(keys.data(), keys.size(), threads); + void map_locations(const int64_t* keys, size_t size); + void map_locations(const std::vector& keys) { + map_locations(keys.data(), keys.size()); } /** * Used for parallel lookup using khash @@ -74,12 +74,8 @@ class IntIndexer { lookup(keys.data(), results.data(), keys.size()); } IntIndexer(){}; - /** - * Constructor with the same arguments as map_locations - */ - IntIndexer(const int64_t* keys, int size, int threads); - IntIndexer(const std::vector& keys, int threads) - : IntIndexer(keys.data(), keys.size(), threads) { + IntIndexer(std::shared_ptr context) { + context_ = context; } virtual ~IntIndexer(); @@ -88,10 +84,8 @@ class IntIndexer { * The created 64bit hash table */ kh_m64_s* hash_; - /* - * TileDB threadpool - */ - std::shared_ptr tiledb_thread_pool_ = nullptr; + + std::shared_ptr context_ = nullptr; /* * Number of elements in the map set by map_locations */ diff --git a/libtiledbsoma/src/reindexer/test_indexer_dtatye_perf.py b/libtiledbsoma/src/reindexer/test_indexer_dtatye_perf.py index 6b152b7e0c..f4fd4f4b92 100644 --- a/libtiledbsoma/src/reindexer/test_indexer_dtatye_perf.py +++ b/libtiledbsoma/src/reindexer/test_indexer_dtatye_perf.py @@ -20,7 +20,7 @@ def build(keys, pandas): indexer.get_indexer([1]) else: perf_counter() - indexer = tiledbsoma_build_index(keys, context=context) + indexer = tiledbsoma_build_index(keys, context=context.native_context) return indexer diff --git a/libtiledbsoma/src/soma/soma_context.cc b/libtiledbsoma/src/soma/soma_context.cc new file mode 100644 index 0000000000..be724ad290 --- /dev/null +++ b/libtiledbsoma/src/soma/soma_context.cc @@ -0,0 +1,49 @@ +/** + * @file soma_context.h + * + * @section LICENSE + * + * The MIT License + * + * @copyright Copyright (c) 2024 TileDB, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + * @section DESCRIPTION + * + * This file defines the SOMAContext class. + */ +#include "soma_context.h" +#include + +namespace tiledbsoma { + +void SOMAContext::create_thread_pool() { + // Extracting concurrency from tiledb config + auto cfg = tiledb_config(); + int concurrency = 10; + if (cfg.find("sm.compute_concurrency_level") != cfg.end()) { + concurrency = std::stoi(cfg["sm.compute_concurrency_level"]); + } + int thread_count = std::max(1, concurrency / 2); + if (thread_count > 1) { + thread_pool_ = std::make_shared(thread_count); + } +} +} // namespace tiledbsoma diff --git a/libtiledbsoma/src/soma/soma_context.h b/libtiledbsoma/src/soma/soma_context.h index 0766113082..4c2fdc483c 100644 --- a/libtiledbsoma/src/soma/soma_context.h +++ b/libtiledbsoma/src/soma/soma_context.h @@ -34,10 +34,12 @@ #define SOMA_CONTEXT #include +#include #include #include namespace tiledbsoma { +class ThreadPool; using namespace tiledb; @@ -47,10 +49,12 @@ class SOMAContext { //= public non-static //=================================================================== SOMAContext() - : ctx_(std::make_shared(Config({}))){}; + : ctx_(std::make_shared(Config({}))) + , thread_pool_mutex_(){}; SOMAContext(std::map platform_config) - : ctx_(std::make_shared(Config(platform_config))){}; + : ctx_(std::make_shared(Config(platform_config))) + , thread_pool_mutex_(){}; bool operator==(const SOMAContext& other) const { return ctx_ == other.ctx_; @@ -67,6 +71,17 @@ class SOMAContext { return cfg; } + std::shared_ptr& thread_pool() { + const std::lock_guard lock(thread_pool_mutex_); + // The first thread that gets here will create the context thread pool + if (thread_pool_ == nullptr) { + create_thread_pool(); + } + return thread_pool_; + } + + void create_thread_pool(); + private: //=================================================================== //= private non-static @@ -74,6 +89,12 @@ class SOMAContext { // TileDB context std::shared_ptr ctx_; + + // Threadpool + std::shared_ptr thread_pool_ = nullptr; + + // Semaphore to create and use the thread_pool + std::mutex thread_pool_mutex_; }; } // namespace tiledbsoma diff --git a/libtiledbsoma/test/test_indexer.cc b/libtiledbsoma/test/test_indexer.cc index c99d13ff61..4f507940b6 100644 --- a/libtiledbsoma/test/test_indexer.cc +++ b/libtiledbsoma/test/test_indexer.cc @@ -31,8 +31,10 @@ */ #include +#include #include #include +#include #include #include #include @@ -57,8 +59,9 @@ bool run_test(int id, std::vector keys, std::vector lookups) { try { std::vector indexer_results; indexer_results.resize(lookups.size()); + auto context = std::make_shared(); - tiledbsoma::IntIndexer indexer; + tiledbsoma::IntIndexer indexer(context); indexer.map_locations(keys); auto* hash = kh_init(m64); int ret;