Avoid stuck map operation when subprocesses crash #5976
Conversation
Hi! Do you think this can be fixed at the Pool level? Ideally it should be the Pool's responsibility to handle this, not the …
@lhoestq it makes sense to me. Just pushed a refactoring creating a …
Nice! Some additional suggestions:
`src/datasets/utils/py_utils.py` (Outdated)

```diff
 def iflatmap_unordered(
-    pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool],
+    pool: ProcessPool,
```
I think we still need to keep `Union[multiprocessing.pool.Pool, multiprocess.pool.Pool]` support in `iflatmap_unordered`.
I managed to raise an error without subclassing Pool with two additions to …

```python
original_pool = list(pool._pool)

if any(async_result._pool != original_pool for async_result in async_results) and queue.empty():
    raise RuntimeError(
        "One of the subprocesses has abruptly died during map operation. "
        "To debug the error, disable multiprocessing."
    )
```

It's still a fix that only works for …
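The check above leans on a CPython implementation detail: the pool's maintenance thread replaces a worker that dies, so the private `pool._pool` list changes. A hedged demonstration of that detail (illustrative only; it touches private attributes and timing, so it may vary across interpreter versions):

```python
import multiprocessing
import os
import signal
import time


def _sleep(seconds):
    time.sleep(seconds)


def worker_list_changes_after_kill():
    # Returns True if the pool respawned a worker after one was SIGKILLed.
    with multiprocessing.Pool(2) as pool:
        original_pool = list(pool._pool)  # private attribute: implementation detail
        pool.map_async(_sleep, [0.5, 0.5])
        time.sleep(0.1)  # let both workers pick up a task
        os.kill(original_pool[0].pid, signal.SIGKILL)
        time.sleep(1.0)  # give the maintenance thread time to respawn the worker
        changed = list(pool._pool) != original_pool
    return changed


if __name__ == "__main__":
    print(worker_list_changes_after_kill())
```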
Force-pushed: e6a19f3 → 45a5ee0 → 2ad4253 → 095d5ea
@lhoestq sorry for the delay, busy weeks here. I just pushed the change you requested. It looks closer to the original proposal, actually. It seems that …
Yes, fixing `iflatmap_unordered` does fix `Dataset.map`, but it won't fix any `Pool.map` that we may use elsewhere, so we'll have to keep this in mind.
It looks all good to me, feel free to fix code formatting by running …
Right, I agree. The best way moving forward is probably not using the buggy …

Anyway, I've run …
It looks like checking the `async_result._pool` doesn't always work, sorry about that. We might just go back to your original solution then. Would also be cool to open an issue in …
@lhoestq no problem! Reverted to the previous version. TBH, given the discussions in this Python issue, I don't think the error in …
Co-authored-by: Quentin Lhoest <42851186+lhoestq@users.noreply.github.com>
Thanks!
I've been using `Dataset.map()` with `num_proc=os.cpu_count()` to leverage multicore processing for my datasets, but from time to time I get stuck processes waiting forever. Apparently, when one of the subprocesses is abruptly killed (OOM killer, segfault, SIGKILL, etc.), the main process keeps waiting for the async task sent to that child process to finish.

It seems to be easy to reproduce the issue with the following script:
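The script itself isn't included in this excerpt. A minimal reproduction along these lines (an illustrative sketch, not necessarily the author's original script) shows the hang, using a timeout so it terminates:

```python
import multiprocessing
import os
import signal
import time


def work(i):
    time.sleep(1)
    return i


def map_gets_stuck(timeout=3.0):
    # Returns True if the map never completes after one worker is SIGKILLed.
    with multiprocessing.Pool(2) as pool:
        result = pool.map_async(work, range(4))
        time.sleep(0.2)  # let the workers start on their tasks
        os.kill(pool._pool[0].pid, signal.SIGKILL)
        try:
            result.get(timeout=timeout)  # without a timeout this waits forever
            return False
        except multiprocessing.TimeoutError:
            return True


if __name__ == "__main__":
    print("map stuck:", map_gets_stuck())
```

The killed worker's pending task result is simply lost, so `result.get()` never returns without the timeout.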
This is an old behavior in Python, which apparently was fixed a few years ago in `concurrent.futures.ProcessPoolExecutor` (ref), but not in `multiprocessing.pool.Pool` / `multiprocess.pool.Pool`, which is used by `Dataset.map` (ref).

This PR is an idea to try to detect when a child process gets killed, and raise a `RuntimeError` warning the `Dataset.map()` caller.

EDIT: Related proposal for future improvement: #5977
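For comparison, the `concurrent.futures` fix mentioned above means `ProcessPoolExecutor` raises `BrokenProcessPool` instead of hanging when a worker dies abruptly. A small sketch of that stdlib behavior:

```python
import os
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def die(_):
    # Simulate an abrupt worker death (as an OOM kill or segfault would cause).
    os.kill(os.getpid(), signal.SIGKILL)


def executor_raises():
    # Returns True if the executor surfaces the dead worker as an exception.
    try:
        with ProcessPoolExecutor(max_workers=2) as executor:
            list(executor.map(die, range(2)))
        return False
    except BrokenProcessPool:
        return True


if __name__ == "__main__":
    print("raises BrokenProcessPool:", executor_raises())
```

This is the behavior the related proposal (#5977) would bring to `Dataset.map` by moving to `concurrent.futures`.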