diff --git a/pex/common.py b/pex/common.py index a4bada46a..e3e2bd601 100644 --- a/pex/common.py +++ b/pex/common.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: from typing import ( Any, + BinaryIO, Callable, DefaultDict, Iterable, @@ -34,6 +35,7 @@ Set, Sized, Tuple, + Union, ) # We use the start of MS-DOS time, which is what zipfiles use (see section 4.4.6 of @@ -220,6 +222,7 @@ def _chmod(self, info, path): @contextlib.contextmanager def open_zip(path, *args, **kwargs): + # type: (Union[str, BinaryIO], *Any, **Any) -> Iterator[PermPreservingZipFile] """A contextmanager for zip files. Passes through positional and kwargs to zipfile.ZipFile. @@ -585,7 +588,7 @@ def delete(self): def zip( self, - filename, # type: str + output_file, # type: Union[str, BinaryIO] mode="w", # type: str deterministic_timestamp=False, # type: bool exclude_file=lambda _: False, # type: Callable[[str], bool] @@ -603,7 +606,7 @@ def zip( selected_files = self.files() compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED - with open_zip(filename, mode, compression) as zf: + with open_zip(output_file, mode, compression) as zf: def write_entry( filename, # type: str diff --git a/pex/pex_builder.py b/pex/pex_builder.py index 8805b1a05..a0bf002c6 100644 --- a/pex/pex_builder.py +++ b/pex/pex_builder.py @@ -7,6 +7,8 @@ import logging import os import shutil +import subprocess +from contextlib import contextmanager from pex import pex_warnings from pex.atomic_directory import atomic_directory @@ -20,6 +22,7 @@ safe_mkdir, safe_mkdtemp, safe_open, + temporary_dir, ) from pex.compatibility import commonpath, to_bytes from pex.compiler import Compiler @@ -28,6 +31,7 @@ from pex.environment import PEXEnvironment from pex.finders import get_entry_point_from_console_script, get_script_from_distributions from pex.interpreter import PythonInterpreter +from pex.jobs import Job from pex.layout import Layout from pex.orderedset import OrderedSet from pex.pex import PEX @@ -37,9 +41,10 @@ from 
pex.tracer import TRACER from pex.typing import TYPE_CHECKING from pex.util import CacheHelper +from pex.ziputils import ZipCommand if TYPE_CHECKING: - from typing import Dict, Optional + from typing import BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple class CopyMode(Enum["CopyMode.Value"]): @@ -88,14 +93,14 @@ def __maybe_run_venv__(pex, pex_root, pex_path): venv_dir = venv_dir( pex_file=pex, - pex_root=pex_root, + pex_root=pex_root, pex_hash={pex_hash!r}, has_interpreter_constraints={has_interpreter_constraints!r}, pex_path=pex_path, ) venv_pex = os.path.join(venv_dir, 'pex') if not __execute__ or not is_exe(venv_pex): - # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter. + # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter. return venv_dir TRACER.log('Executing venv PEX for {{}} at {{}}'.format(pex, venv_pex)) @@ -434,6 +439,7 @@ def set_header(self, header): self._header = header def _add_dist_dir(self, path, dist_name, fingerprint=None): + # type: (str, str, Optional[str]) -> str target_dir = os.path.join(self._pex_info.internal_cache, dist_name) if self._copy_mode == CopyMode.SYMLINK: self._copy_or_link(path, target_dir, label=dist_name) @@ -550,6 +556,7 @@ def _copy_or_link(self, src, dst, label=None): elif self._copy_mode == CopyMode.SYMLINK: self._chroot.symlink(src, dst, label) else: + assert self._copy_mode == CopyMode.LINK self._chroot.link(src, dst, label) def _prepare_bootstrap(self): @@ -769,31 +776,144 @@ def zip_cache_dir(path): os.path.join(internal_cache, location), ) - def _build_zipapp( - self, - filename, # type: str - deterministic_timestamp=False, # type: bool - compress=True, # type: bool - ): - # type: (...) 
-> None + def _cache_dists_for_stitching(self, compress): + # type: (bool) -> Dict[str, str] + merge_deps = {} # type: Dict[str, str] + with TRACER.timed("caching dists for stitched output", V=3): + for dist_label, fingerprint in self._pex_info.distributions.items(): + cache_key = "{}-{}".format( + fingerprint, "compressed" if compress else "uncompressed" + ) + cached_zip = os.path.join( + self._pex_info.pex_root, + "stitched_dists", + cache_key, + dist_label, + ) + with atomic_directory(os.path.dirname(cached_zip)) as atomic_zip_dir: + if not atomic_zip_dir.is_finalized(): + atomic_output_file = os.path.join( + atomic_zip_dir.work_dir, os.path.basename(cached_zip) + ) + with TRACER.timed("caching single dist {}".format(dist_label), V=3): + self._chroot.zip( + atomic_output_file, + labels=(dist_label,), + deterministic_timestamp=True, + compress=compress, + exclude_file=is_pyc_temporary_file, + ) + assert os.path.isfile(cached_zip) + merge_deps[dist_label] = cached_zip + + return merge_deps + + @contextmanager + def _concatenate_cached_entries(self, zip_cmd, deterministic_timestamp, compress): + # type: (ZipCommand, bool, bool) -> Iterator[BinaryIO] + merge_deps = self._cache_dists_for_stitching(compress=compress) + uncached_labels = sorted(frozenset(self._chroot.labels()) - frozenset(merge_deps.keys())) + + with TRACER.timed("synthesize zipapp", V=6), temporary_dir() as td: + concatenated_nonzip = os.path.join(td, "concatenated.broken-zip") + with open(concatenated_nonzip, "w+b") as concat_f: + with TRACER.timed("zipping up uncached sources", V=3): + self._chroot.zip( + concat_f, + deterministic_timestamp=deterministic_timestamp, + compress=compress, + labels=uncached_labels, + ) + + with TRACER.timed("concatenating cached dist zips", V=3): + # Sort the cached zips by the prefixes of the filenames they'll be + # inserting into the merged result, to get a deterministic output. 
+ for _, path in sorted(merge_deps.items(), key=lambda x: x[0]): + with open(path, "rb") as f: + shutil.copyfileobj(f, concat_f) # type: ignore[misc] + + fixed_zip = os.path.join(td, "fixed.zip") + zip_cmd.fix_concatenated_zips(concatenated_nonzip, fixed_zip) + + with open(fixed_zip, "rb") as read_handle: + yield read_handle + + @contextmanager + def _prepare_executable_zipapp(self, filename): + # type: (str) -> Iterator[BinaryIO] with safe_open(filename, "wb") as pexfile: assert os.path.getsize(pexfile.name) == 0 pexfile.write(to_bytes("{}\n".format(self._shebang))) if self._header: pexfile.write(to_bytes(self._header)) - with TRACER.timed("Zipping PEX file."): + + yield pexfile + + chmod_plus_x(pexfile.name) + + def _uncached_zipapp( + self, + filename, # type: str + deterministic_timestamp, # type: bool + compress, # type: bool + ): + # type: (...) -> None + + # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions as + # pointers to installed wheel directories in ~/.pex/installed_wheels/... Since those + # installed wheels reside in a shared cache, they can be in-use by other processes and so + # their code may be in the process of being bytecode compiled as we attempt to zip up our + # chroot. Bytecode compilation produces ephemeral temporary pyc files that we should avoid + # copying since they are useless and inherently racy. + exclude_file = is_pyc_temporary_file + + with TRACER.timed("Zipping PEX file."), self._prepare_executable_zipapp( + filename + ) as pexfile: self._chroot.zip( - filename, - mode="a", + pexfile, deterministic_timestamp=deterministic_timestamp, - # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions - # as pointers to installed wheel directories in ~/.pex/installed_wheels/... 
Since - # those installed wheels reside in a shared cache, they can be in-use by other - # processes and so their code may be in the process of being bytecode compiled as we - # attempt to zip up our chroot. Bytecode compilation produces ephemeral temporary - # pyc files that we should avoid copying since they are useless and inherently - # racy. - exclude_file=is_pyc_temporary_file, compress=compress, + exclude_file=exclude_file, ) - chmod_plus_x(filename) + + def _build_zipapp( + self, + filename, # type: str + deterministic_timestamp=False, # type: bool + compress=True, # type: bool + ): + # type: (...) -> None + # Naively creating a compressed zipapp with many downloaded distributions would perform + # a lot of I/O on each pex invocation and spend a lot of CPU on compression. While + # `--no-compress` runs significantly faster, the result may also be over twice as large. + should_try_synthesizing_from_cache = bool(self._pex_info.distributions) and compress + if not should_try_synthesizing_from_cache: + self._uncached_zipapp( + filename, deterministic_timestamp=deterministic_timestamp, compress=compress + ) + return + + # However, if we have access to the `zip` command, we can employ a caching strategy. 
+ zip_cmd = ZipCommand.find() + if zip_cmd is None: + TRACER.log( + "`zip` command was not found, so compressed dist caches could not be used", + V=1, + ) + self._uncached_zipapp( + filename, deterministic_timestamp=deterministic_timestamp, compress=compress + ) + return + + with TRACER.timed( + "cache dists and synthesize zipapp", V=9 + ), self._concatenate_cached_entries( + zip_cmd, + deterministic_timestamp=deterministic_timestamp, + compress=compress, + ) as concatenated_zip_f: + with TRACER.timed( + "copying synthesized concatenated zip to output file", V=9 + ), self._prepare_executable_zipapp(filename) as pexfile: + shutil.copyfileobj(concatenated_zip_f, pexfile) # type: ignore[misc] diff --git a/pex/ziputils.py b/pex/ziputils.py index 7e2cd0b1c..12b78696f 100644 --- a/pex/ziputils.py +++ b/pex/ziputils.py @@ -7,11 +7,14 @@ import os import shutil import struct +import subprocess + +from pex.jobs import Job +from pex.tracer import TRACER from pex.typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import BinaryIO, Optional + from typing import Any, BinaryIO, ClassVar, Optional import attr # vendor:skip else: @@ -242,3 +245,36 @@ def isolate_zip(self, out_fp): if self.has_header: in_fp.seek(self.header_size, os.SEEK_SET) shutil.copyfileobj(in_fp, out_fp) + + +@attr.s(frozen=True) +class ZipCommand(object): + exe_path = attr.ib() # type: str + + _cached = None # type: ClassVar[Optional[ZipCommand]] + + @classmethod + def find(cls): + # type: () -> Optional[ZipCommand] + if cls._cached is not None: + return cls._cached + + import distutils.spawn + + zip_path = distutils.spawn.find_executable("zip") + if zip_path: + cls._cached = cls(exe_path=zip_path) + return cls._cached + + def __call__(self, *args, **kwargs): + # type: (*str, **Any) -> Job + command = [self.exe_path] + list(args) + zip_proc = subprocess.Popen( + command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, encoding="utf-8", **kwargs + ) + return Job(command, zip_proc) + + def 
fix_concatenated_zips(self, concatenated_zips_path, output_path): + # type: (str, str) -> None + with TRACER.timed("fixing up concatenated zips with `zip -FF`"): + self("-FF", concatenated_zips_path, "--out", output_path).wait()