
Add utilities for parallelization #8320

Merged
merged 4 commits on Jun 29, 2020
Conversation

McSinyx
Contributor

@McSinyx McSinyx commented May 25, 2020

This adds utils.parallel.map_{multiprocess,multithread}. It settles on a fallback mechanism for worker pools to resolve GH-8169. Additionally, I want to use this as the place to discuss the future use of this module. To avoid situations like GH-8161, it would be really nice if we could have parallelization as a toggle-able unstable feature, plus frequent prereleases to attract more feedback, especially from those using more obscure platforms. Edit: I forgot to run pre-commit before committing again.

cc @bmartinn on map_multiprocess

@McSinyx McSinyx marked this pull request as ready for review May 25, 2020 15:01
@McSinyx McSinyx force-pushed the pools branch 2 times, most recently from 9243eb8 to 498beac Compare May 25, 2020 15:11
@McSinyx
Contributor Author

McSinyx commented May 29, 2020

@uranusjr, I saw you giving this a thumbs-up while it was a draft. Since I've finished the tests, may I have a full review now?

@pradyunsg
Member

I do think that we should add an additional wrapper, to make using this a lot more transparent for the sequential case. We should also clean up the implementation, to remove the need for the try-except patterns that check whether things are usable on every run.

I'm basically imagining this file would do all the checks on import, and then we'd have a single entry point which can be used to dictate the processing: map_parallel(func, iterable). This would gracefully fall back to a regular map, while using whichever mechanism the user has requested.
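A minimal sketch of that single-entry-point idea (map_parallel and _HAVE_POOLS are hypothetical names for illustration, not pip's actual API; the flag would be computed once at import time, e.g. via the probes discussed further down):

from multiprocessing.dummy import Pool  # thread-based worker pool

_HAVE_POOLS = True  # placeholder for the real import-time check


def map_parallel(func, iterable, chunksize=1):
    """Apply func over iterable, degrading gracefully to the
    built-in map where worker pools are unusable."""
    if not _HAVE_POOLS:
        return list(map(func, iterable))
    with Pool() as pool:
        return pool.map(func, iterable, chunksize)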


In terms of how we do this, I think that we should just straight up assume threading support exists on the platform, and exit gracefully in every pip command if it doesn't. We shouldn't use multiprocessing, because it's very slow on some platforms, which defeats the purpose of the parallelization! (overhead > benefit)

In terms of rolling this out, a good start would be to add the logic for detecting this now, printing a warning that things might fail (when we detect that the support doesn't exist, via the deprecated helper), and switching to exiting gracefully starting in 20.3. That'll also give us the opportunity to see if we have users who care about non-threading Python -- letting them come to us and complain. If enough people complain, we'll reconsider what to do. If needed, we'll add code for doing things synchronously, by writing another function here that has the same side effects as the parallel utility, and fall back to that on such platforms.


In terms of the implementation, we need to care about all the caveats of parallelization here; that's one thing we can't escape.

I think we should not guarantee the order of the returned iterable. Instead, we should use a callback/error_callback based approach here (they're arguments on Pool methods), along with a .join() at the end of these helpers. This means that the callbacks would need to use queues for pushing information back to the main thread, which would denote progress, handle proper error messaging, etc. Anything needed to make implementing that easier (i.e. reusable bits!) would be what we'd put into this module, so that we'd have the ability to keep the final code as clean as possible.
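For illustration, a rough sketch of that callback-based shape, assuming a thread pool and queues for pushing results back (the function and variable names here are made up, not pip's):

from multiprocessing.dummy import Pool  # thread-based worker pool
from queue import Queue


def run_unordered(func, iterable):
    # Completion order is not guaranteed; results and errors are pushed
    # back to the main thread through queues by the pool's callbacks.
    results, errors = Queue(), Queue()
    pool = Pool()
    for item in iterable:
        pool.apply_async(func, (item,),
                         callback=results.put,       # called on success
                         error_callback=errors.put)  # called on failure (Python 3 only)
    pool.close()
    pool.join()  # wait for every submitted task, as suggested above
    if not errors.empty():
        raise errors.get()
    return [results.get() for _ in range(results.qsize())]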

Basically, these utilities would need some amount of effort put into them, in exchange for being closer to asyncio in terms of how-stuff-works (I know it's not exact, but it's closer), while not requiring *that* much implementation work and fitting in cleanly.


Also, timeout should probably not have a default. :P

@McSinyx
Contributor Author

McSinyx commented May 30, 2020

I do think that we should add an additional wrapper, to make using this a lot more transparent for the sequential case.

I don't think I get what you mean by the *sequential case*. Do you mean where higher-level code manually inserts jobs into the pool?

We should also clean up the implementation as well, to remove the need for the try-except patterns checking if things are usable on every run.

I'm basically imagining this file would do all the checks on-import, and then, we'd have a single entry point.

I've thought of that and I'm neither fully for nor against it:

  • It does make the implementation of the functions in this module cleaner; however, we'd need to do the if ...: def ... dance (see the sketch after this list), and while CPython actually does that a lot for wrapper code, I prefer having 2 blank lines between functions 🥺
  • Performance-wise: a check at import time makes pip start up (marginally) more slowly, while a runtime check adds (marginal) overhead (since exceptions are unlikely and are optimized as such, I think). I don't think the overhead in either case is an important factor though.
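The if ...: def ... shape referred to above, with bodies elided (purely illustrative; _HAVE_POOLS is a made-up flag that would be computed at import time):

_HAVE_POOLS = True  # placeholder for the real import-time check

if _HAVE_POOLS:
    def map_multithread(func, iterable, chunksize=1):
        ...  # pool-backed implementation
else:
    def map_multithread(func, iterable, chunksize=1):
        ...  # sequential fallback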

We shouldn't use multiprocessing because it's very slow on some platforms which defeats the purpose of the parallelization!

I think I forgot to give this PR enough context. This is not only for parallel networking, but also for CPU-intensive tasks like GH-8125. A large part of the hacks here came out of the discussion with @bmartinn over that PR.

In terms of rolling this out, a good start would be to add the logic for detecting this now, and printing a warning that things might fail [...] That'll also give us the opportunity to see if we have users who care about non-threading Python

I think you're referring to @pfmoore's comment over #8169 (comment), which doesn't fully capture the situation: the failure is due to the lack of sem_open on Android. I do not know whether threading actually works on Android though. Catching ImportError as in this PR won't fall back in case threading isn't supported, so we will know when there's user feedback. I agree with the warning too: I want to know where multiprocessing[.dummy].Pool is not usable, to gauge the community-wide impact of the speedup.

Also, timeout should probably not have a default. :P

It's an ugly hack to make KeyboardInterrupt work on Python 2 😞

I think we should not guarantee the order of the returned iterable. Instead, we should use a callback/error_callback based approach here

I originally wanted to provide both (1) the unordered and lazy variant and (2) the ordered and eager one, but due to the bug above I dropped the naïve attempt at the unordered one. I'll try to add the callback variant.

@pradyunsg
Member

this is not only for parallel networking, but also for CPU-intensive tasks like GH-8125

I presume you mean #8215? I don't think that task is CPU-intensive -- it's unpacking/moving files, which is definitely I/O bound.

I don't think subprocesses are the right approach for any of pip's "slow" parts, since all of those are I/O bound operations. :)

  • if ...: def ...

Rather, I meant:

def _parallel_whatever(...):
    assert _HAVE_THREAD_SUPPORT
    ...


def whatever(...):
    if _HAVE_THREAD_SUPPORT:
        return _parallel_whatever(...)
    # raise error, or call "non-parallel" fallback here.

I don't think I get what you mean by the *sequential case. Do you mean where higher-level code manually insert jobs to the pool?

I was referring to the case where we might want a sequential fallback for the parallel code here -- basically allowing for graceful degradation on platforms where we don't have threading support, if we want to do it that way. I don't know if we'd have to make this accommodation, and what amount of effort it'll take to "do it right"; but I do think if this might cause disruption, we should be a bunch more careful here.

a check at import time makes pip start up (marginally) more slowly, while a runtime check adds (marginal) overhead (since exceptions are unlikely and are optimized as such, I think).

We can optimize this later, by computing the specific value lazily. This feels like premature optimization and isn't a deal breaker IMO.

the failure is due to the lack of sem_open on Android.

I understand -- the effect is what matters though, that we can't use multiprocessing.dummy.Pool on that platform.

@bmartinn

I presume you mean #8215? I don't think that task is CPU-intensive -- it's unpacking/moving files, which is definitely I/O bound.

Actually this is CPU-bound: since the unzipping is Python code, threading actually hurts performance (due to the GIL). This was the reason to introduce a process pool (as opposed to the download part, which is accelerated by using threads, as it is mostly network-bound).

I'm open to other ideas on accelerating the wheel unzipping. If the use case is unzipping a single package then this is negligible, but when installing an entire environment, even if the wheels are cached, just unzipping 30 packages can take over 30 seconds.
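To make the GIL point concrete, here is a generic micro-benchmark sketch (hypothetical, not from this PR or pip): a pure-Python CPU-bound task usually sees little or no speedup from a thread pool but does from a process pool:

import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool


def work(n):
    # Pure-Python computation: it holds the GIL, so threads cannot run it in parallel.
    return sum(i * i for i in range(n))


if __name__ == '__main__':  # guard required by multiprocessing on Windows
    tasks = [10**6] * 16
    for name, pool_cls in (('threads', ThreadPool), ('processes', Pool)):
        start = time.time()
        with pool_cls(4) as pool:
            pool.map(work, tasks)
        print(name, round(time.time() - start, 2), 'seconds')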

@McSinyx
Contributor Author

McSinyx commented Jun 1, 2020

I presume you mean 8215? I don't think that task is CPU-intensive -- it's unpacking/moving files, which is definitely I/O bound.

@pradyunsg, yes 🤣 I haven't experimented with it much so I'll take @bmartinn's word for now. To @bmartinn, it would be nice if you could post the benchmark of multithreading vs multiprocessing over that PR. I'll try to catch up later by doing the same thing and we'll see if the results match.

Rather, I meant: [...]

I wonder what the benefit of doing so is, i.e. what the difference is between a conditional and exception handling. Also, I figured that we can import multiprocessing.synchronize instead of creating a Pool at the beginning of the module if we want to do it (see the sketch below).
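A sketch of the two import-time probes being compared here (illustrative only; the constant names are not necessarily what ends up in the module):

# (a) Create and discard a pool: heavier, but exercises the real code path.
try:
    from multiprocessing.dummy import Pool
    Pool(1).terminate()
    LACK_POOL = False
except (ImportError, NotImplementedError, OSError):
    LACK_POOL = True

# (b) Import multiprocessing.synchronize: cheap, and fails with ImportError
# on platforms lacking sem_open (the Android case from GH-8169).
try:
    import multiprocessing.synchronize  # noqa: F401
    LACK_SEM_OPEN = False
except ImportError:
    LACK_SEM_OPEN = True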

I don't get your point on the fallback part: this PR already falls back to map on known failures. Related to this, on error handling, I've done a local test and the error got propagated just fine (similar to when map is used). The problem was only with non-Exception exceptions like KeyboardInterrupt, and only on Python 2.

Edit: please ignore a1cdbc1; I thought of how to improve it right after pushing.

@pradyunsg pradyunsg changed the title Add utilities for paralleliztion Add utilities for parallelization Jun 2, 2020
@McSinyx McSinyx force-pushed the pools branch 2 times, most recently from a1cdbc1 to 1b4dac5 Compare June 4, 2020 15:22
@McSinyx
Contributor Author

McSinyx commented Jun 4, 2020

1b4dac5 makes every ugly hack I've ever written, even throw-away code I did for competitive programming, look pretty 😞

Quick interactive tests (mainly to save future me from rewriting them every time):

from __future__ import print_function
from pip._internal.utils.parallel import *

def ID(x): return x

# OK
map_multiprocess(print, range(10**7))

# Edit: this cannot be cancelled anyhow,
# but if print is replaced by time.sleep,
# KeyboardInterrupt works as expected.
map_multithread(print, range(10**7))

# Hangs forever; interrupt doesn't work properly
for i in imap_multiprocess(ID, range(10**7)): print(i)
for i in imap_multithread(ID, range(10**7)): print(i)

# OK, but chunksize needs to be inferred from the input size
for i in imap_multiprocess(ID, range(10**7), 10**6): print(i)
for i in imap_multithread(ID, range(10**7), 10**6): print(i)

At this point I'm not sure if the laziness is worth the amount of hacks we need to pull out. What do you think @pradyunsg?

@bmartinn

bmartinn commented Jun 4, 2020

Hi @McSinyx ,

Hang forever, interrupt doesn't work properly

Which Python version?
Do both the process and thread versions hang?

@McSinyx
Contributor Author

McSinyx commented Jun 5, 2020

@bmartinn, it's on Python 3 (the Python 2 version is just iter wrapped around the list for interface compatibility), and yes, both hang. I'm on GNU/Linux if that matters.



@contextmanager
def closing(pool):
Member


This isn't necessary, since the pool's __exit__ should handle closing and other details.

Contributor Author

@McSinyx McSinyx Jun 21, 2020


It's needed for imap* to start submitting tasks to the pool, for some reason. The non-lazy variant doesn't need it though. I think I'll add a comment explaining why it is needed. Edit: I have, but I think my pun made it unclear, so I'm gonna rephrase.
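For reference, a sketch of what such a helper might look like (close to, but not necessarily identical to, the code under review):

from contextlib import contextmanager


@contextmanager
def closing(pool):
    """Return a context manager making sure the pool closes properly."""
    try:
        yield pool
    finally:
        # For Pool.imap*, close and join are needed for the returned
        # iterator to begin yielding; terminate then cleans up the workers.
        pool.close()
        pool.join()
        pool.terminate()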

src/pip/_internal/utils/parallel.py
Comment on lines 68 to 73
"""Make an iterator applying func to each element in iterable.

This function is the sequential fallback when sem_open is unavailable.
"""
Member


I'd suggest dropping the docstrings in all the function definitions, and instead describe the functions in the module docstring (see other comment about trimming the API of this module to only 2 functions).

These internal-only function names are fairly self-explanatory, and I don't think the value-add of static analysis finding the relevant docstring is worth the duplication in this module, which makes it difficult to navigate and find relevant code.

Member


Here's a rough sample for what the docstring could be:

"""Helpers for parallelization of higher order functions.

This module provides two helper functions, with appropriate fallbacks on Py2
and on systems lacking support for synchronization mechanisms.

- ``map_multiprocess``
- ``map_multithread``

These helpers work like `map`, with 2 differences:

- They don't guarantee the order of processing of the elements of the iterable.
- The underlying process/thread pools chop the iterable into a number of chunks,
  and the (approximate) size of these chunks can be specified by passing
  an optional keyword-only argument ``chunksize`` (positive integer).
"""

Contributor Author

@McSinyx McSinyx Jun 21, 2020


I'd suggest dropping the docstrings in all the function definitions

I'm from the ask-docs camp (which is bound to K in my editor), and I'd like to support my kind! I do wonder, if we keep the function docstrings, whether we should add the docs you suggested to the module docstring or whether those should exist there exclusively.

Edit: I figure the intended API would be just either of the map_multi* functions, whose names say nothing about what happens to long input, nor about unorderedness.

Another edit: I'm going non-ReST for the module docstring.

@pradyunsg
Member

Windows isn't happy with these changes. :)

@McSinyx
Contributor Author

McSinyx commented Jun 22, 2020

Linting was also failing 😄; however, the test failing on py35 on Windows seems unrelated:

def test_rmtree_retries_for_3sec(tmpdir, monkeypatch):
    """
    Test pip._internal.utils.rmtree will retry failures for no more than 3 sec
    """
    monkeypatch.setattr(shutil, 'rmtree', Failer(duration=5).call)
    with pytest.raises(OSError):
        rmtree('foo')

Edit: but somehow it is related, hmmm...

Edit: this confuses me, the failing test on Windows was first on Python 3.5, then 2.7, then 3.6, all on i386. I think I made something flaky 😞

@McSinyx McSinyx closed this Jun 22, 2020
@McSinyx McSinyx reopened this Jun 22, 2020
@McSinyx
Contributor Author

McSinyx commented Jun 22, 2020

Could I please have your help on the failing test, @pfmoore? I can't wrap my head around its logic, and I don't know how the import mocks I introduce make the retry test flaky (my guess, since that's the only state changed by this PR).

@pfmoore
Member

pfmoore commented Jun 22, 2020

Sorry, no idea to be honest. The failure doesn't seem to be related to the changes in this PR at all...

Oh wait, you're doing some really nasty hacks with the import mechanism. I wonder whether it's an interaction between what you're doing and the pytest plugin that runs our tests in parallel?

@McSinyx
Contributor Author

McSinyx commented Jun 22, 2020

runs our tests in parallel

Oh no! I think I'll revert to the handling in d9d18ff, which was easier/less hacky to test.

@McSinyx
Contributor Author

McSinyx commented Jun 22, 2020

False alarm: that test is flaky. I'll revert back to ec6e31e then, sigh.

Edit: I think this is ready now.

Member

@pradyunsg pradyunsg left a comment


LGTM. There's still stuff that we might want to iterate on, but this is a good start. :)

@pradyunsg
Member

I'm gonna go ahead and merge this in, since there hasn't been any activity over the past 4 days here. We can iterate on this further based on inputs/learning.

_import = __import__


def reload_parallel():
Member


Ideally this should be called one last time after all the tests have run (maybe via an auto-use fixture at module scope) to make sure it is loaded without being affected by the tests' monkeypatching.
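One possible shape for that, assuming pytest and that reload_parallel() restores pip._internal.utils.parallel to its unpatched state (a sketch, not the final test code):

import pytest


@pytest.fixture(autouse=True, scope='module')
def reload_parallel_last():
    yield
    # Runs once after the last test in this module, so the parallel module
    # is re-imported without the monkeypatched __import__ in effect.
    reload_parallel()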

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Oct 12, 2021
Labels
type: feature request (Request for a new feature)

Successfully merging this pull request may close these issues.

Multithreading and unsupported platforms
5 participants