Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add basic parallel support for listing packages #422

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nixpkgs_review/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ def common_flags() -> list[CommonFlag]:
help="Extra nixpkgs config to pass to `import <nixpkgs>`",
),
CommonFlag(
"--num-procs-eval",
"--num-parallel-evals",
type=int,
default=1,
help="Number of parallel `nix-env` processes to run simultaneously (warning, can imply heavy RAM usage)",
help="Number of parallel `nix-env`/`nix eval` processes to run simultaneously (warning, can imply heavy RAM usage)",
),
]

Expand Down
2 changes: 1 addition & 1 deletion nixpkgs_review/cli/pr.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def pr_command(args: argparse.Namespace) -> str:
build_graph=args.build_graph,
nixpkgs_config=nixpkgs_config,
extra_nixpkgs_config=args.extra_nixpkgs_config,
n_procs_eval=args.num_procs_eval,
num_parallel_evals=args.num_parallel_evals,
)
contexts.append((pr, builddir.path, review.build_pr(pr)))
except NixpkgsReviewError as e:
Expand Down
46 changes: 21 additions & 25 deletions nixpkgs_review/nix.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import concurrent.futures
import json
import multiprocessing as mp
import os
import shlex
import shutil
import subprocess
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from sys import platform
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -271,32 +270,29 @@ def nix_eval(
os.unlink(attr_json.name)


def nix_eval_thread(
system: System,
attr_names: set[str],
allow: AllowedFeatures,
nix_path: str,
) -> tuple[System, list[Attr]]:
return system, nix_eval(attr_names, system, allow, nix_path)


def multi_system_eval(
attr_names_per_system: dict[System, set[str]],
allow: AllowedFeatures,
nix_path: str,
n_procs: int,
n_threads: int,
) -> dict[System, list[Attr]]:
nix_eval_partial = partial(
nix_eval_thread,
allow=allow,
nix_path=nix_path,
)

args: list[tuple[System, set[str]]] = list(attr_names_per_system.items())
with mp.Pool(n_procs) as pool:
results: list[tuple[System, list[Attr]]] = pool.starmap(nix_eval_partial, args)

return {system: attrs for system, attrs in results}
results: dict[System, list[Attr]] = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
future_to_system = {
executor.submit(
nix_eval,
attrs=attrs,
system=system,
allow=allow,
nix_path=nix_path,
): system
for system, attrs in attr_names_per_system.items()
}
for future in concurrent.futures.as_completed(future_to_system):
system = future_to_system[future]
results[system] = future.result()

return results


def nix_build(
Expand All @@ -309,7 +305,7 @@ def nix_build(
build_graph: str,
nix_path: str,
nixpkgs_config: Path,
n_procs_eval: int,
n_threads: int,
) -> dict[System, list[Attr]]:
if not attr_names_per_system:
info("Nothing to be built.")
Expand All @@ -319,7 +315,7 @@ def nix_build(
attr_names_per_system,
allow,
nix_path,
n_procs=n_procs_eval,
n_threads=n_threads,
)

filtered_per_system: dict[System, list[str]] = {}
Expand Down
75 changes: 46 additions & 29 deletions nixpkgs_review/review.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import concurrent.futures
import os
import subprocess
import sys
Expand Down Expand Up @@ -107,7 +108,7 @@ def __init__(
skip_packages_regex: list[Pattern[str]] = [],
checkout: CheckoutOption = CheckoutOption.MERGE,
sandbox: bool = False,
n_procs_eval: int = 1,
num_parallel_evals: int = 1,
) -> None:
self.builddir = builddir
self.build_args = build_args
Expand Down Expand Up @@ -140,7 +141,7 @@ def __init__(
self.build_graph = build_graph
self.nixpkgs_config = nixpkgs_config
self.extra_nixpkgs_config = extra_nixpkgs_config
self.n_procs_eval = n_procs_eval
self.num_parallel_evals = num_parallel_evals

def worktree_dir(self) -> str:
return str(self.builddir.worktree_dir)
Expand Down Expand Up @@ -181,36 +182,26 @@ def build_commit(
self.git_worktree(base_commit)

# TODO: nix-eval-jobs ?
# parallel version: returning a dict[System, list[Package]]
# base_packages = list_packages(
# self.builddir.nix_path,
# self.systems,
# self.allow,
# )
base_packages = {
system: list_packages(
self.builddir.nix_path,
system,
self.allow,
)
for system in self.systems
}
base_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
n_threads=self.num_parallel_evals,
)

if reviewed_commit is None:
self.apply_unstaged(staged)
else:
self.git_merge(reviewed_commit)

# TODO: nix-eval-jobs ?
merged_packages = {
system: list_packages(
self.builddir.nix_path,
system,
self.allow,
check_meta=True,
)
for system in self.systems
}
merged_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
n_threads=self.num_parallel_evals,
check_meta=True,
)

# Systems ordered correctly (x86_64-linux, aarch64-linux, x86_64-darwin, aarch64-darwin)
sorted_systems: list[System] = sorted(
Expand Down Expand Up @@ -268,7 +259,7 @@ def build(
self.build_graph,
self.builddir.nix_path,
self.nixpkgs_config,
self.n_procs_eval,
self.num_parallel_evals,
)

def build_pr(self, pr_number: int) -> dict[System, list[Attr]]:
Expand Down Expand Up @@ -419,9 +410,9 @@ def parse_packages_xml(stdout: IO[str]) -> list[Package]:
return packages


def list_packages(
def _list_packages_system(
system: System,
nix_path: str,
system: str,
allow: AllowedFeatures,
check_meta: bool = False,
) -> list[Package]:
Expand Down Expand Up @@ -458,6 +449,32 @@ def list_packages(
return parse_packages_xml(f)


def list_packages(
nix_path: str,
systems: set[System],
allow: AllowedFeatures,
n_threads: int,
check_meta: bool = False,
) -> dict[System, list[Package]]:
results: dict[System, list[Package]] = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
future_to_system = {
executor.submit(
_list_packages_system,
system=system,
nix_path=nix_path,
allow=allow,
check_meta=check_meta,
): system
for system in systems
}
for future in concurrent.futures.as_completed(future_to_system):
system = future_to_system[future]
results[system] = future.result()

return results


def package_attrs(
package_set: set[str],
system: str,
Expand Down Expand Up @@ -633,7 +650,7 @@ def review_local_revision(
build_graph=args.build_graph,
nixpkgs_config=nixpkgs_config,
extra_nixpkgs_config=args.extra_nixpkgs_config,
n_procs_eval=args.num_procs_eval,
num_parallel_evals=args.num_parallel_evals,
)
review.review_commit(builddir.path, args.branch, commit, staged, print_result)
return builddir.path