[c++/python] Use a shared threadpool for the reindexer #2148
Conversation
Codecov Report

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #2148      +/-   ##
==========================================
- Coverage   81.37%   72.01%    -9.36%
==========================================
  Files          84      103      +19
  Lines        6335     6872     +537
  Branches      213      214       +1
==========================================
- Hits         5155     4949     -206
- Misses       1081     1824     +743
  Partials       99       99
```
This looks like it is going to have one single, global thread pool, across contexts and everything else. Is this what we actually want?
```python
if soma_indexer_concurrency is not None:
    clib.IntIndexer.start_thread_pool(soma_indexer_concurrency)
elif tiledb_ctx is not None:
    tdb_concurrency = int(
        tiledb_ctx.config().get("sm.compute_concurrency_level", 10)
    )
    thread_count = max(1, tdb_concurrency // 2)
    clib.IntIndexer.start_thread_pool(thread_count)
else:
    clib.IntIndexer.start_thread_pool(5)
```
This and `__del__` lead to several race conditions. If I have two threads, and an indexer running in one, then starting a second one will lead to the following:
- Start indexer A. It starts the global thread pool.
- Indexer A runs.
- Start indexer B. It starts a new global thread pool, thus deleting the old one.
- Indexer B runs. (What happens to the threads of indexer A that are still running under the old pool?)
- Indexer A completes and is garbage collected (assuming some other problem does not arise). It deletes the thread pool entirely in `__del__`.
- Indexer B cannot continue to run. (Does it crash? Just freeze?)
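The failure mode described above can be sketched with a toy Python stand-in for the global pool (the `Indexer` class and its methods here are hypothetical illustrations, not the real `clib.IntIndexer` API; `concurrent.futures.ThreadPoolExecutor` stands in for the C++ `ThreadPool`):

```python
from concurrent.futures import ThreadPoolExecutor

_global_pool = None  # mimics the single static thread pool


class Indexer:
    """Toy stand-in for an indexer that starts/stops a *global* pool."""

    def __init__(self, threads=2):
        global _global_pool
        # start_thread_pool(): clobbers whatever pool already exists
        _global_pool = ThreadPoolExecutor(max_workers=threads)

    def lookup(self, x):
        # submits work to the shared global pool
        return _global_pool.submit(lambda: x + 1).result()

    def __del__(self):
        # stop_thread_pool(): tears down the pool for *everyone*
        if _global_pool is not None:
            _global_pool.shutdown()


a = Indexer()
assert a.lookup(1) == 2
b = Indexer()          # replaces A's pool with a new global pool
assert b.lookup(2) == 3
del a                  # A's __del__ shuts down the pool B is now using
try:
    b.lookup(3)        # B can no longer run
    raise AssertionError("expected RuntimeError")
except RuntimeError:
    pass               # "cannot schedule new futures after shutdown"
```

In this sketch indexer B fails with an exception rather than crashing, but it illustrates the lifetime coupling: B's ability to run depends on A's destructor not having fired yet.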
@thetorpedodog thanks for the comments. Thinking about them, some clarifications:
1. The thread pool lifetime is tied to creation and deletion of the SOMA context, not the indexer.
2. The thread pool currently supports parallel indexers with no race issues, assuming they are both attached to the same context (it uses indexers as keys when running lookups). I can add unit tests for parallel lookups and constructions to show that.
3. I believe the only situation that can lead to the race condition you mentioned is when we have two parallel indexers, each with a different context. In that case the scenario you described can take place.

If you think (3) is likely, I can change the design to use one concurrent hashmap (`context_map`) relating each SOMA context object to its corresponding thread pool. Here are the steps:
1. When a context starts, it creates a new thread pool and puts `<context, threadpool>` into the `context_map`.
2. When a new indexer is initialized (`map_locations`), it creates its hash table and, using its context, looks up its thread pool.
3. When looking up values, the indexer uses its thread pool to submit its lookup tasks.
4. When the context gets deleted, it kills the corresponding thread pool.

Please let me know what you think.
Already discussed in DMs but x-posting here: instead of a global map, we could add `tiledb_threadpool` as a property of `TileDBSOMAContext`. It could also be lazily loaded, just like `TileDBSOMAContext.tiledb_ctx`. Unless I'm misunderstanding something, there isn't a preexisting thread pool that can be directly accessed via `tiledb::Context`, so one has to be instantiated within `TileDBSOMAContext` in Python or the upcoming `SOMAContext` in C++.
We have the start of a thread pool in somacore (a `threadpool` member on the `ContextBase` protocol), but it seems to have not made it fully to TileDB-SOMA yet? https://github.com/single-cell-data/SOMA/blob/76b2d2bbca7c8baf543b5cc9cf71fc5c9138a39a/python-spec/src/somacore/types.py#L87
OIC - that makes more sense now. Yeah, `SOMATileDBContext` doesn't inherit from `soma.ContextBase` yet, but that'll be easy to add, and then we can start utilizing the context's threadpool.
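A minimal sketch of that direction, assuming the `ContextBase` protocol only requires a `threadpool` attribute (the class body here is hypothetical and heavily trimmed; the real `SOMATileDBContext` carries TileDB config, timestamps, and more):

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SOMATileDBContext:
    """Sketch only: shows a lazily created per-context thread pool."""

    def __init__(self, thread_count=4):
        self._thread_count = thread_count
        self._threadpool = None
        self._lock = threading.Lock()

    @property
    def threadpool(self):
        # Lazily created, mirroring how tiledb_ctx is lazily loaded; the
        # lock guards the check-then-create against concurrent callers.
        with self._lock:
            if self._threadpool is None:
                self._threadpool = ThreadPoolExecutor(
                    max_workers=self._thread_count
                )
            return self._threadpool
```

Every access returns the same pool object, and two contexts created side by side get fully independent pools.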
OK, for the time being I'm gonna implement the design with the thread pool on top of the C++ SOMA context.
```cpp
std::shared_ptr<tiledbsoma::ThreadPool> IntIndexer::tiledb_thread_pool_ = nullptr;

void IntIndexer::start_thread_pool(int thread_count) {
    tiledb_thread_pool_ = std::make_unique<tiledbsoma::ThreadPool>(thread_count);
    LOG_DEBUG(fmt::format(
        "[Re-indexer] Thread pool started with {} threads", thread_count));
}

void IntIndexer::stop_thread_pool() {
    tiledb_thread_pool_.reset();
    LOG_DEBUG("[Re-indexer] Thread stopped");
}
```
Apart from the problems noted above, the way this thread pool is accessed globally would need lock protection—otherwise, the same pointer can be racily accessed and reset across multiple threads with no guarantee of atomicity, leading to the dreaded Undefined Behavior.
I'll work on implementing the design I mentioned as an add-on to this PR. @thetorpedodog @bkmartinjr

I think it would be better to have a per-SOMATileDBContext thread pool, following the tiledb.Ctx design pattern. Open question: is the context passed to the indexer init and bound at indexer-create time, or passed to get_indexer? For Pandas compat, I prefer the former.

@bkmartinjr, this PR assumes there is only one SOMATileDBContext existing at any given point. In other words, we support parallel indexers as long as there is no overlapping SOMATileDBContext. Does this assumption make sense? And wrt your question, the context is not bound to
No: the idea of the SOMATileDBContext is that multiple contexts may exist which have access to different sets of resources, etc., within the same process. Having implicit global state very much contravenes that design. Since it's possible to access Python objects from within C++ code, it seems like the indexer should be able to run its tasks using the thread pool that already exists within the context. One way to do this might be:
Apologies for slow response - out of pocket today. I agree with Paul's summary of the design goal.
@thetorpedodog and @bkmartinjr: @nguyenv is working on a way for the context thread pool to be directly accessible from the C++. So with that in place, on the C++ side, when creating an indexer I just need to grab the TileDB context thread pool and keep it in the indexer object to be used at lookup time. This would require minimal changes to the indexer as well.
I think I need clarification. Currently, there is a unique
Yes, I believe you have it correct. The goal is a thread pool associated with each context, with indexer lookups occurring within that context (much like queries, etc.), submitting to the context's thread pool.
Now with @nguyenv's work on sharing the context between C++ and Python, I think I can achieve this goal by adding thread pools to the C++ TileDBSOMAContext, so each indexer keeps track of its own context + thread pool (lazily creating the pool when a lookup is needed for the first time).
@bkmartinjr, @thetorpedodog, and @nguyenv: I updated the PR, rebasing onto @nguyenv's C++ context and using it to lazily create the thread pool.
@beroy - can I suggest you base this PR on the
Primary request is for the handling of the IntIndexer.context_ state: it should be set during construction, and only accessed read-only by map_locations and lookup. The current code, which sets it in map_locations, will potentially race if we have multiple threads calling map_locations simultaneously. (I know this is currently unsupported API use, but the change will protect us if we ever allow that.)
```cpp
std::shared_ptr<ThreadPool>& thread_pool() {
    return thread_pool_;
}
```
I would still prefer to see `create_thread_pool` get called in this `SOMAContext` class rather than in the `IntIndexer`. As mentioned above by @bkmartinjr, we'll also want this to be thread-safe - I would probably look into using `std::unique_lock`.
```cpp
context->create_thread_pool();
thread_pool_ = context->thread_pool();
```
I don't think we should call `create_thread_pool` in here. See other comment.
[sc-42198]

This pull request has been linked to Shortcut Story #42198: tiledbsoma 1.7.3.
@nguyenv and @bkmartinjr, I updated the PR based on your last set of reviews. I'd appreciate feedback.
My primary remaining question is around lazy-creation of the thread pool. It looks like @nguyenv recommended this, and it makes sense to me. As I read the code, it looks like that was not completed?
Each C++ SOMAContext has its own lazily created thread pool.
The backport to release-1.7 failed. To backport manually, run these commands in your terminal:

```shell
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-1.7 release-1.7
# Navigate to the new working tree
cd .worktrees/backport-release-1.7
# Create a new branch
git switch --create backport-2148-to-release-1.7
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick --mainline 1 a8da3a8a735307a730a8384d6a7e5ca1030a7f73
# Push it to GitHub
git push --set-upstream origin backport-2148-to-release-1.7
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-1.7
```

Then, create a pull request where the
The backport to release-1.8 failed. To backport manually, run these commands in your terminal:

```shell
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-1.8 release-1.8
# Navigate to the new working tree
cd .worktrees/backport-release-1.8
# Create a new branch
git switch --create backport-2148-to-release-1.8
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick --mainline 1 a8da3a8a735307a730a8384d6a7e5ca1030a7f73
# Push it to GitHub
git push --set-upstream origin backport-2148-to-release-1.8
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-1.8
```

Then, create a pull request where the
```diff
 SOMAContext(std::map<std::string, std::string> platform_config)
-    : ctx_(std::make_shared<Context>(Config(platform_config))){};
+    : ctx_(std::make_shared<Context>(Config(platform_config)))
+    , thread_pool_mutex_(){};
```
Does `thread_pool_mutex_()` need to be explicitly constructed here?
I'm not really sure it is necessary.
```cpp
std::shared_ptr<ThreadPool>& thread_pool() {
    const std::lock_guard<std::mutex> lock(thread_pool_mutex_);
    // The first thread that gets here will create the context thread pool
    if (thread_pool_ == nullptr) {
        create_thread_pool();
    }
    return thread_pool_;
}
```
My preference would be to have just a declaration here and for the implementation to actually go in soma_context.cc.
```cpp
    return thread_pool_;
}

void create_thread_pool();
```
This should not be in the header file.
Why is that? It is just a declaration and not global or static.
```diff
 IntIndexer(){};
 /**
  * Constructor with the same arguments as map_locations
  */
-IntIndexer(const int64_t* keys, int size, int threads);
-IntIndexer(const std::vector<int64_t>& keys, int threads)
-    : IntIndexer(keys.data(), keys.size(), threads) {
+IntIndexer(std::shared_ptr<tiledbsoma::SOMAContext> context) {
+    context_ = context;
```
I think that we should always have a context.

```cpp
IntIndexer() : context_(make_shared<SOMAContext>()){};  // or IntIndexer() = delete;?
IntIndexer(std::shared_ptr<tiledbsoma::SOMAContext> context) : context_(context){};
```
I wouldn't create a new context in IntIndexer; I agree with the other suggestions. Also, we must keep the default constructor needed by pybind.
```cpp
if (context_ == nullptr || context_->thread_pool() == nullptr ||
    context_->thread_pool()->concurrency_level() == 1) {
```
I think we can just check `context_->thread_pool()->concurrency_level() == 1`.
I get your point, but in cases like this I really prefer to be ultra-conservative.
Backports failed, again :(
Issue and/or context: #2149
Changes:
Notes for Reviewer: