Skip to content

Commit

Permalink
Implement pruning.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsirois committed Oct 6, 2024
1 parent fdc4dfa commit 5c86209
Showing 1 changed file with 152 additions and 54 deletions.
206 changes: 152 additions & 54 deletions pex/cache/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import itertools
import os.path
import sqlite3
from collections import OrderedDict
from contextlib import closing, contextmanager

from pex.atomic_directory import atomic_directory
Expand All @@ -20,7 +21,7 @@
)
from pex.common import CopyMode
from pex.dist_metadata import ProjectNameAndVersion
from pex.typing import TYPE_CHECKING
from pex.typing import TYPE_CHECKING, overload

if TYPE_CHECKING:
from typing import (
Expand Down Expand Up @@ -58,6 +59,8 @@
bootstrap_hash TEXT NOT NULL,
code_hash TEXT NOT NULL
) WITHOUT ROWID;
CREATE INDEX zipapps_idx_bootstrap_hash ON zipapps (bootstrap_hash ASC);
CREATE INDEX zipapps_idx_code_hash ON zipapps (code_hash ASC);
CREATE TABLE zipapp_deps (
pex_hash TEXT NOT NULL REFERENCES zipapps(pex_hash) ON DELETE CASCADE,
Expand All @@ -83,21 +86,24 @@


@contextmanager
def _db_connection(conn=None):
    # type: (Optional[sqlite3.Connection]) -> Iterator[sqlite3.Connection]
    """Yield a connection to the cache dependency database.

    :param conn: An already open connection to re-use. When given, it is yielded as-is and is
        neither committed nor closed here; its lifecycle stays with whoever opened it.
    :return: A context manager yielding the connection to use.

    When no connection is passed, the database is created from `_SCHEMA` if needed and a fresh
    connection is opened with the `synchronous=NORMAL` and `foreign_keys=ON` pragmas applied. On
    exit the pending transaction is committed (or rolled back on error) and the connection is
    closed.
    """
    if conn is not None:
        yield conn
        return

    db_dir = CacheDir.DBS.path("deps")
    with atomic_directory(db_dir) as atomic_dir:
        if not atomic_dir.is_finalized():
            # N.B.: `with connection:` only commits / rolls back; it never closes the connection,
            # so `closing` is needed to release the database file handle.
            with closing(
                sqlite3.connect(os.path.join(atomic_dir.work_dir, "deps.db"))
            ) as schema_conn:
                with schema_conn:
                    schema_conn.executescript(_SCHEMA).close()

    # The database is opened from its final location only after the atomic directory block above
    # has exited.
    with closing(sqlite3.connect(os.path.join(db_dir, "deps.db"))) as db_conn:
        with db_conn:
            db_conn.executescript(
                """
                PRAGMA synchronous=NORMAL;
                PRAGMA foreign_keys=ON;
                """
            ).close()
            yield db_conn


@contextmanager
Expand Down Expand Up @@ -207,24 +213,44 @@ def record_venv(coon_or_cursor):
_K = TypeVar("_K")


@overload
def _iter_key_chunks(items):
    # type: (Sequence[_K]) -> Iterator[Tuple[str, Sequence[_K]]]
    pass


@overload
def _iter_key_chunks(
    items,  # type: Sequence[_I]
    extract_key,  # type: Callable[[_I], _K]
):
    # type: (...) -> Iterator[Tuple[str, Sequence[_K]]]
    pass


def _iter_key_chunks(
    items,  # type: Sequence
    extract_key=None,  # type: Optional[Callable[[_I], _K]]
):
    # type: (...) -> Iterator[Tuple[str, Sequence[_K]]]
    """Split items into chunks of SQL parameters suitable for an `IN (...)` clause.

    :param items: The items to chunk; must support `len` and slicing.
    :param extract_key: Optional function mapping an item to its key. When omitted, the items
        themselves are used as the keys.
    :return: An iterator of `(placeholders, keys)` pairs where `placeholders` is a
        comma-separated string of `?` with one entry per key in the `keys` tuple. Nothing is
        yielded for empty `items`.
    """

    # N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit
    # to an even lower chunk size to be safe: https://www.sqlite.org/limits.html
    chunk_size = 100
    for start in range(0, len(items), chunk_size):
        item_chunk = items[start : start + chunk_size]
        if extract_key is not None:
            keys = tuple(map(extract_key, item_chunk))
        else:
            keys = tuple(item_chunk)
        placeholders = ", ".join(itertools.repeat("?", len(keys)))
        yield placeholders, keys


def _zipapp_deps(unzip_dirs):
# type: (Sequence[UnzipDir]) -> Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]
with _db_connection() as conn:
def _zipapp_deps(
unzip_dirs, # type: Sequence[UnzipDir]
connection=None, # type: Optional[sqlite3.Connection]
):
# type: (...) -> Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]

with _db_connection(conn=connection) as conn:
for placeholders, keys in _iter_key_chunks(unzip_dirs, extract_key=lambda u: u.pex_hash):
with closing(
conn.execute(
Expand Down Expand Up @@ -259,10 +285,13 @@ def _zipapp_deps(unzip_dirs):
)


def _venv_deps(venv_dirs):
# type: (Sequence[VenvDirs]) -> Iterator[InstalledWheelDir]
def _venv_deps(
venv_dirs, # type: Sequence[VenvDirs]
connection=None, # type: Optional[sqlite3.Connection]
):
# type: (...) -> Iterator[InstalledWheelDir]

with _db_connection() as conn:
with _db_connection(conn=connection) as conn:
for placeholders, keys in _iter_key_chunks(venv_dirs, extract_key=lambda v: v.short_hash):
with closing(
conn.execute(
Expand All @@ -283,54 +312,123 @@ def _venv_deps(venv_dirs):
)


def dir_dependencies(
    pex_dirs,  # type: Iterable[Union[UnzipDir, VenvDirs]]
    connection=None,  # type: Optional[sqlite3.Connection]
):
    # type: (...) -> Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]
    """Iterate over the unique dependencies of the given PEX directories.

    :param pex_dirs: The unzipped PEX and venv directories to look up dependencies for.
    :param connection: An optional open database connection to re-use for all queries.
    :return: An iterator over each distinct dependency, zipapp dependencies first and then venv
        dependencies, with repeats filtered out.
    """

    # `pex_dirs` is only promised to be an Iterable and is partitioned twice below; materialize
    # it once so a one-shot iterator is not exhausted by the first pass.
    all_dirs = tuple(pex_dirs)
    unzip_dirs = [pex_dir for pex_dir in all_dirs if isinstance(pex_dir, UnzipDir)]
    venv_dirs = [pex_dir for pex_dir in all_dirs if isinstance(pex_dir, VenvDirs)]

    seen = set()
    with _db_connection(conn=connection) as conn:
        for dep in _zipapp_deps(unzip_dirs, connection=conn):
            if dep not in seen:
                seen.add(dep)
                yield dep

        for dep in _venv_deps(venv_dirs, connection=conn):
            if dep not in seen:
                seen.add(dep)
                yield dep


@contextmanager
def delete(pex_dirs):
    # type: (Iterable[Union[UnzipDir, VenvDirs]]) -> Iterator[Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]]
    """Delete the database records of the given PEX directories.

    :param pex_dirs: The unzipped PEX and venv directories whose records should be deleted.
    :return: A context manager yielding an iterator over the dependencies of `pex_dirs`. The
        iterator is lazy and reads from the connection held by this context manager, so it should
        be consumed inside the `with` block.

    The `zipapps` and `venvs` rows are deleted only after the `with` block body completes
    normally, using the same connection the dependencies were read from.
    """

    # `pex_dirs` is only promised to be an Iterable and is traversed several times (once by the
    # dependency lookup and twice for the deletes); materialize it so a one-shot iterator does not
    # leave the deletes with nothing to do.
    all_dirs = tuple(pex_dirs)
    unzip_dirs = [pex_dir for pex_dir in all_dirs if isinstance(pex_dir, UnzipDir)]
    venv_dirs = [pex_dir for pex_dir in all_dirs if isinstance(pex_dir, VenvDirs)]

    with _db_connection() as conn:
        yield dir_dependencies(all_dirs, connection=conn)

        for placeholders, keys in _iter_key_chunks(unzip_dirs, extract_key=lambda u: u.pex_hash):
            conn.execute(
                "DELETE FROM zipapps WHERE pex_hash IN ({keys})".format(keys=placeholders), keys
            ).close()

        for placeholders, keys in _iter_key_chunks(venv_dirs, extract_key=lambda v: v.short_hash):
            conn.execute(
                "DELETE FROM venvs WHERE short_hash IN ({keys})".format(keys=placeholders), keys
            ).close()


def _discard_referenced(
    conn,  # type: sqlite3.Connection
    candidates_by_hash,  # type: OrderedDict
    query_template,  # type: str
):
    # type: (...) -> None
    """Remove from `candidates_by_hash` every hash the given query reports as still in use.

    `query_template` must select a single hash column and contain a `{keys}` slot for the
    parameter placeholders.
    """
    for placeholders, keys in _iter_key_chunks(tuple(candidates_by_hash)):
        with closing(conn.execute(query_template.format(keys=placeholders), keys)) as cursor:
            # Rows come back as 1-tuples, so unpack the hash before using it as a key. The same
            # hash may be reported more than once (e.g. several zipapps sharing a bootstrap), so
            # tolerate it already having been removed.
            for (referenced_hash,) in cursor:
                candidates_by_hash.pop(referenced_hash, None)


@contextmanager
def prune(
    deps,  # type: Iterable[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]
):
    # type: (...) -> Iterator[Iterator[AtomicCacheDir]]
    """Determine which of the given dependencies are safe to prune.

    :param deps: Candidate bootstrap, user code and installed wheel directories.
    :return: A context manager yielding an iterator over the subset of `deps` that no zipapp or
        venv recorded in the database still depends on: bootstraps first, then user code, then
        wheels.

    After the `with` block body completes normally, the `wheels` rows of the prunable wheels are
    deleted from the database.
    """

    # `deps` is only promised to be an Iterable and is partitioned three times below.
    all_deps = tuple(deps)

    with _db_connection() as conn:
        bootstraps_by_hash = OrderedDict(
            (dep.bootstrap_hash, dep) for dep in all_deps if isinstance(dep, BootstrapDir)
        )  # type: OrderedDict[str, BootstrapDir]
        _discard_referenced(
            conn,
            bootstraps_by_hash,
            """
            SELECT bootstrap_hash FROM zipapps WHERE bootstrap_hash IN ({keys})
            """,
        )

        user_code_by_hash = OrderedDict(
            (dep.code_hash, dep) for dep in all_deps if isinstance(dep, UserCodeDir)
        )  # type: OrderedDict[str, UserCodeDir]
        _discard_referenced(
            conn,
            user_code_by_hash,
            """
            SELECT code_hash FROM zipapps WHERE code_hash IN ({keys})
            """,
        )

        wheels_by_hash = OrderedDict(
            (dep.install_hash, dep) for dep in all_deps if isinstance(dep, InstalledWheelDir)
        )  # type: OrderedDict[str, InstalledWheelDir]
        _discard_referenced(
            conn,
            wheels_by_hash,
            """
            SELECT DISTINCT wheels.install_hash
            FROM wheels
            LEFT JOIN zipapp_deps ON zipapp_deps.wheel_install_hash = wheels.install_hash
            LEFT JOIN venv_deps ON venv_deps.wheel_install_hash = wheels.install_hash
            WHERE wheels.install_hash IN ({keys}) AND (
                zipapp_deps.pex_hash IS NOT NULL OR venv_deps.venv_hash IS NOT NULL
            )
            """,
        )

        yield itertools.chain(
            bootstraps_by_hash.values(), user_code_by_hash.values(), wheels_by_hash.values()
        )

        # Only wheels are deleted here; the bootstrap and user code hashes are queried from
        # columns of the zipapps table above rather than from tables of their own.
        for placeholders, keys in _iter_key_chunks(tuple(wheels_by_hash.keys())):
            conn.execute(
                "DELETE FROM wheels WHERE install_hash in ({keys})".format(keys=placeholders), keys
            ).close()

0 comments on commit 5c86209

Please sign in to comment.