From ac8e83c327c2a4ce6ff3e00b5703999466a10f16 Mon Sep 17 00:00:00 2001 From: Craig de Stigter Date: Wed, 14 Apr 2021 14:52:58 +1200 Subject: [PATCH 1/4] import: Parallel fast-import processes This feeds features from the existing single import source connection to multiple (default=4) git-fast-import processes. --- sno/fast_import.py | 240 +++++++++++++++++++++++++++------------- sno/init.py | 8 ++ sno/upgrade/__init__.py | 1 + 3 files changed, 174 insertions(+), 75 deletions(-) diff --git a/sno/fast_import.py b/sno/fast_import.py index 47a08ed64..dc6f39ff5 100644 --- a/sno/fast_import.py +++ b/sno/fast_import.py @@ -2,6 +2,7 @@ import subprocess import time import uuid +from contextlib import contextmanager, ExitStack from enum import Enum, auto import click @@ -15,6 +16,8 @@ SUPPORTED_REPO_VERSION, SUPPORTED_DATASET_CLASS, ) +from .rich_tree_builder import RichTreeBuilder +from .structure import Datasets from .timestamps import minutes_to_tz_offset from .pk_generation import PkGeneratingImportSource @@ -113,11 +116,36 @@ def should_compare_imported_features_against_old_features( return False +@contextmanager +def _git_fast_import(repo, *args): + p = subprocess.Popen( + ["git", "fast-import", "--done", *args], + cwd=repo.path, + stdin=subprocess.PIPE, + env=tool_environment(), + ) + try: + yield p + p.stdin.write(b"\ndone\n") + except BrokenPipeError: + # if git-fast-import dies early, we get an EPIPE here + # we'll deal with it below + pass + else: + p.stdin.close() + p.wait() + if p.returncode != 0: + raise SubprocessError( + f"git-fast-import error! {p.returncode}", exit_code=p.returncode + ) + + def fast_import_tables( repo, sources, *, verbosity=1, + num_processes=4, header=None, message=None, replace_existing=ReplaceExisting.DONT_REPLACE, @@ -137,6 +165,7 @@ def fast_import_tables( 0: no progress information is printed to stdout. 1: basic status information 2: full output of `git-fast-import --stats ...` + num_processes: how many import processes to run in parallel header - the commit-header to supply git-fast-import. Generated if not supplied - see generate_header. message - the commit-message used when generating the header. Generated if not supplied - see generate_message. replace_existing - See ReplaceExisting enum @@ -147,6 +176,15 @@ def fast_import_tables( extra_cmd_args - any extra args for the git-fast-import command. """ + MAX_PROCESSES = 64 + if num_processes < 1: + num_processes = 1 + elif num_processes > MAX_PROCESSES: + # this is almost certainly a mistake, but also: + # we want to split 256 trees roughly evenly, and if we're trying to split them across + # too many processes it won't be very even. + raise ValueError(f"Can't import with more than {MAX_PROCESSES} processes") + # The tree this repo was at before this function was called. # May be None (repo is empty) orig_tree = repo.head_tree @@ -181,8 +219,6 @@ def fast_import_tables( sources = PkGeneratingImportSource.wrap_sources_if_needed(sources, repo) cmd = [ - "git", - "fast-import", "--done", f"--max-pack-size={max_pack_size}", f"--depth={max_delta_depth}", @@ -190,37 +226,62 @@ def fast_import_tables( if verbosity < 2: cmd.append("--quiet") - if header is None: - # import onto a temp branch. then reset the head branch afterwards. - # this allows us to check the result before updating the orig branch. 
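
git fast-import consumes a plain-text command stream on stdin, and with --done it insists on an explicit terminator, which is why _git_fast_import() writes b"\ndone\n" before closing the pipe. As a quick reference for the grammar the rest of this patch writes into each worker, here is a minimal, hypothetical stream (ref name, identity, path, payload and OID are placeholders; the real commit header comes from generate_header() and the real from-line uses orig_commit.oid):

    # Minimal sketch of the input one git-fast-import worker receives.
    message = b"Import from example.gpkg"
    payload = b'{"id": 1, "name": "example feature"}'

    stream = b"".join([
        b"commit refs/heads/import-example\n",
        b"committer An Importer <importer@example.com> 1618366378 +1200\n",
        b"data %d\n%s\n" % (len(message), message),
        # 'from' parents the new commit on an existing one; it is skipped when
        # replace_existing == ReplaceExisting.ALL.
        b"from 1111111111111111111111111111111111111111\n",
        # one inline, length-prefixed blob write per feature
        b"M 644 inline mydataset/.sno-dataset/feature/a1/b2/kU0=\n",
        b"data %d\n%s\n" % (len(payload), payload),
        # required terminator when the process is started with --done
        b"done\n",
    ])
    print(stream.decode())
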
- import_branch = f"refs/heads/{uuid.uuid4()}" - - # may be None, if head is detached - orig_branch = repo.head_branch - header = generate_header(repo, sources, message, import_branch) - else: - import_branch = None orig_commit = repo.head_commit + import_branches = [] if verbosity >= 1: click.echo("Starting git-fast-import...") - - p = subprocess.Popen( - [*cmd, *extra_cmd_args], - cwd=repo.path, - stdin=subprocess.PIPE, - env=tool_environment(), - ) - try: - if replace_existing != ReplaceExisting.ALL: - header += f"from {orig_commit.oid}\n" - p.stdin.write(header.encode("utf8")) + with ExitStack() as stack: + procs = [] + + # PARALLEL IMPORTING + # To do an import in parallel: + # * we only have one sno process, and one connection to the source. + # * we have multiple git-fast-import backend processes + # * we send all 'meta' blobs (anything that isn't a feature) to process 0 + # * we assign feature blobs to a process based on it's first subtree. + # (all features in tree `datasetname/feature/01` will go to process 1, etc) + # * after the importing is all done, we merge the trees together. + # * there should never be any conflicts in this merge process. + for i in range(num_processes): + if header is None: + # import onto a temp branch. then reset the head branch afterwards. + import_branch = f"refs/heads/{uuid.uuid4()}" + import_branches.append(import_branch) + + # may be None, if head is detached + orig_branch = repo.head_branch + generated_header = generate_header( + repo, sources, message, import_branch + ) + else: + generated_header = header + # this won't work if num_processes > 1 because we'll try and write to + # the same branch multiple times in parallel. + # luckily only upgrade script passes a header in, so there we just use 1 proc. + assert num_processes == 1 + proc = stack.enter_context(_git_fast_import(repo, *cmd)) + procs.append(proc) + if replace_existing != ReplaceExisting.ALL: + generated_header += f"from {orig_commit.oid}\n" + proc.stdin.write(generated_header.encode("utf8")) # Write the extra blob that records the repo's version: - for i, blob_path in write_blobs_to_stream(p.stdin, extra_blobs): + for i, blob_path in write_blobs_to_stream(procs[0].stdin, extra_blobs): if replace_existing != ReplaceExisting.ALL and blob_path in starting_tree: raise ValueError(f"{blob_path} already exists") + if num_processes == 1: + + def proc_for_feature_path(path): + return procs[0] + + else: + + def proc_for_feature_path(path): + first_subtree = int(path.rsplit("/", 3)[1], 16) + return procs[first_subtree % len(procs)] + for source in sources: replacing_dataset = None if replace_existing == ReplaceExisting.GIVEN: @@ -231,23 +292,34 @@ def fast_import_tables( replacing_dataset = None if replacing_dataset is not None: - if replace_ids is None: - # Delete the existing dataset, before we re-import it. - p.stdin.write(f"D {source.dest_path}\n".encode("utf8")) - else: - # delete and reimport meta/ - # we also delete the specified features, but we do it further down - # so that we don't have to iterate the IDs more than once. - p.stdin.write( - f"D {source.dest_path}/.sno-dataset/meta\n".encode("utf8") - ) - - # We just deleted the legends, but we still need them to reimport - # data efficiently. Copy them from the original dataset. - for x in write_blobs_to_stream( - p.stdin, replacing_dataset.iter_legend_blob_data() - ): - pass + for i, proc in enumerate(procs): + if replace_ids is None: + # Delete the existing dataset, before we re-import it. 
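
proc_for_feature_path() is the entire sharding scheme: a feature path's first subtree is two hex digits (00-ff), so interpreting it as an integer and taking it modulo the worker count gives every worker a stable, disjoint set of subtrees with no coordination needed. A hedged worked example (the path is illustrative; real paths come from dataset.encode_pks_to_path()):

    # How a feature blob path is routed to a worker.
    # Paths look like <dataset>/.sno-dataset/feature/<xx>/<yy>/<encoded-pk>,
    # where <xx> is the "first subtree" used for routing.
    def worker_index(path, num_processes=4):
        # rsplit("/", 3) keeps the last three components separate;
        # element [1] is the first subtree ("a1" below).
        first_subtree = int(path.rsplit("/", 3)[1], 16)
        return first_subtree % num_processes

    # 0xa1 == 161 and 161 % 4 == 1, so worker 1 owns this feature:
    print(worker_index("mydataset/.sno-dataset/feature/a1/b2/kU0="))   # -> 1
    # everything under .../feature/00 goes to worker 0, .../feature/01 to worker 1, etc.

(The MAX_PROCESSES cap above exists for the same reason: the 256 first subtrees have to spread reasonably evenly across the workers.)
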
+ proc.stdin.write(f"D {source.dest_path}\n".encode("utf8")) + else: + # delete and reimport meta/ + proc.stdin.write( + f"D {source.dest_path}/.sno-dataset/meta\n".encode( + "utf8" + ) + ) + # delete all features not pertaining to this process. + # we also delete the features that *do*, but we do it further down + # so that we don't have to iterate the IDs more than once. + for subtree in range(256): + if subtree % num_processes != i: + proc.stdin.write( + f"D {source.dest_path}/.sno-dataset/feature/{subtree:02x}\n".encode( + "utf8" + ) + ) + + # We just deleted the legends, but we still need them to reimport + # data efficiently. Copy them from the original dataset. + for x in write_blobs_to_stream( + proc.stdin, replacing_dataset.iter_legend_blob_data() + ): + pass dataset = dataset_class(tree=None, path=source.dest_path) @@ -274,7 +346,9 @@ def _ids(): for pk in replace_ids: pk = source.schema.sanitise_pks(pk) path = dataset.encode_pks_to_path(pk) - p.stdin.write(f"D {path}\n".encode("utf8")) + proc_for_feature_path(path).stdin.write( + f"D {path}\n".encode("utf8") + ) yield pk src_iterator = source.get_features(_ids(), ignore_missing=True) @@ -295,7 +369,17 @@ def _ids(): feature_blob_iter = dataset.import_iter_feature_blobs( repo, src_iterator, source ) - for i, blob_path in write_blobs_to_stream(p.stdin, feature_blob_iter): + + for i, (feature_path, blob_data) in enumerate(feature_blob_iter): + stream = proc_for_feature_path(feature_path).stdin + stream.write( + f"M 644 inline {feature_path}\ndata {len(blob_data)}\n".encode( + "utf8" + ) + ) + stream.write(blob_data) + stream.write(b"\n") + if i and progress_every and i % progress_every == 0: click.echo(f" {i:,d} features... @{time.monotonic()-t1:.1f}s") @@ -311,48 +395,54 @@ def _ids(): # Meta items - written second as certain importers generate extra metadata as they import features. for x in write_blobs_to_stream( - p.stdin, dataset.import_iter_meta_blobs(repo, source) + procs[0].stdin, dataset.import_iter_meta_blobs(repo, source) ): pass - p.stdin.write(b"\ndone\n") - except BrokenPipeError: - # if git-fast-import dies early, we get an EPIPE here - # we'll deal with it below - pass - else: - p.stdin.close() - p.wait() - if p.returncode != 0: - raise SubprocessError( - f"git-fast-import error! {p.returncode}", exit_code=p.returncode - ) t3 = time.monotonic() if verbosity >= 1: click.echo(f"Closed in {(t3-t2):.0f}s") - if import_branch is not None: - # we created a temp branch for the import above. + if import_branches: + # we created temp branches for the import above. + # each of the branches has _part_ of the import. + # we have to merge the trees together to get a sensible commit. 
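
Note that the feature loop above now frames each blob by hand instead of going through write_blobs_to_stream(), because it has to pick a different worker's stdin per feature. The framing itself is the standard fast-import filemodify with an inline, length-prefixed payload. A small sketch equivalent to the three write() calls in the loop, with io.BytesIO standing in for a worker's stdin:

    import io

    def write_inline_blob(stream, blob_path, blob_data):
        """Frame one feature blob exactly as the import loop does."""
        stream.write(f"M 644 inline {blob_path}\ndata {len(blob_data)}\n".encode("utf8"))
        stream.write(blob_data)   # exactly len(blob_data) raw bytes, no escaping needed
        stream.write(b"\n")       # a trailing LF after a counted data block is allowed

    buf = io.BytesIO()
    write_inline_blob(buf, "mydataset/.sno-dataset/feature/a1/b2/kU0=", b"\x01\x02 feature bytes")
    print(buf.getvalue())
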
try: - if orig_tree and not allow_empty: - if repo.revparse_single(import_branch).peel(pygit2.Tree) == orig_tree: + trees = [repo.revparse_single(b).peel(pygit2.Tree) for b in import_branches] + builder = RichTreeBuilder(repo, trees[0]) + for t in trees[1:]: + datasets = Datasets(t, SUPPORTED_DATASET_CLASS) + for ds in datasets: + try: + feature_tree = ds.feature_tree + except KeyError: + pass + else: + for subtree in feature_tree: + builder.insert( + f"{ds.path}/{ds.FEATURE_PATH}{subtree.name}", subtree + ) + new_tree = builder.flush() + if not allow_empty: + if new_tree == orig_tree: raise NotFound("No changes to commit", exit_code=NO_CHANGES) - latest_commit_oid = repo.references[import_branch].peel(pygit2.Commit).oid - if orig_branch: - # reset the original branch head to the import branch, so it gets the new commits - if orig_tree: - # repo was non-empty before this, and head was not detached. - # so orig_branch exists already. - # we have to delete and re-create it at the new commit. - repo.references.delete(orig_branch) - repo.references.create(orig_branch, latest_commit_oid) - else: - # head was detached before this. just update head to the new commit, - # so it's still detached. - repo.set_head(latest_commit_oid) + + # use the existing commit details we already imported, but use the new tree + existing_commit = repo.revparse_single(import_branches[0]).peel( + pygit2.Commit + ) + repo.create_commit( + orig_branch or "HEAD", + existing_commit.author, + existing_commit.committer, + existing_commit.message, + new_tree.id, + existing_commit.parent_ids, + ) finally: - # remove the import branch - repo.references.delete(import_branch) + # remove the import branches + for b in import_branches: + repo.references.delete(b) def write_blobs_to_stream(stream, blobs): diff --git a/sno/init.py b/sno/init.py index e947eb211..611dfb7cf 100644 --- a/sno/init.py +++ b/sno/init.py @@ -193,6 +193,12 @@ def convert(self, value, param, ctx): default=True, help="Whether to create a working copy once the import is finished, if no working copy exists yet.", ) +@click.option( + "--num-processes", + default=4, + type=click.INT, + help="How many git-fast-import processes to use", +) def import_( ctx, all_tables, @@ -209,6 +215,7 @@ def import_( allow_empty, max_delta_depth, do_checkout, + num_processes, ): """ Import data into a repository. @@ -327,6 +334,7 @@ def import_( else ReplaceExisting.DONT_REPLACE, replace_ids=replace_ids, allow_empty=allow_empty, + num_processes=num_processes, ) if do_checkout: diff --git a/sno/upgrade/__init__.py b/sno/upgrade/__init__.py index e3cdf86bb..c1d94df74 100644 --- a/sno/upgrade/__init__.py +++ b/sno/upgrade/__init__.py @@ -179,6 +179,7 @@ def _upgrade_commit( # We import every commit onto refs/heads/main, even though not all commits are related - this means # the main branch head will jump all over the place. git-fast-import only allows this with --force. 
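
Stepping back to the merge step in fast_import_tables() above: every worker committed the same generated header, so the temporary commits differ only in their trees. Once RichTreeBuilder has produced the joined tree, only one real commit is wanted, and the code simply re-issues the first temporary commit's metadata against that new tree. A hedged pygit2 sketch of just that final step (repo, import_branches, new_tree and orig_branch are assumed to be as in the diff):

    import pygit2

    def commit_joined_tree(repo, import_branches, new_tree, orig_branch):
        # Any temporary import commit works as a template: author, committer,
        # message and parents are identical across the workers.
        template = repo.revparse_single(import_branches[0]).peel(pygit2.Commit)
        return repo.create_commit(
            orig_branch or "HEAD",    # update the original branch, or the detached HEAD
            template.author,
            template.committer,
            template.message,
            new_tree.id,              # the tree produced by RichTreeBuilder.flush()
            template.parent_ids,      # same parent(s) the per-worker commits had
        )
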
extra_cmd_args=["--force"], + num_processes=1, ) dest_commit = dest_repo.head_commit From da8ea14ff60ac7f08ce58e40fe906adb2c6e6d18 Mon Sep 17 00:00:00 2001 From: Craig de Stigter Date: Fri, 16 Apr 2021 14:32:33 +1200 Subject: [PATCH 2/4] init --num-processes: default to available CPUs --- sno/init.py | 9 +++++++-- sno/utils.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/sno/init.py b/sno/init.py index 611dfb7cf..17763a242 100644 --- a/sno/init.py +++ b/sno/init.py @@ -1,3 +1,4 @@ +import math import os from pathlib import Path @@ -21,6 +22,7 @@ from .pk_generation import PkGeneratingImportSource from .fast_import import fast_import_tables, ReplaceExisting from .repo import SnoRepo, PotentialRepo +from .utils import get_num_available_cores from .working_copy import WorkingCopyStatus @@ -195,9 +197,8 @@ def convert(self, value, param, ctx): ) @click.option( "--num-processes", - default=4, type=click.INT, - help="How many git-fast-import processes to use", + help="How many git-fast-import processes to use. Defaults to the number of available CPU cores.", ) def import_( ctx, @@ -323,6 +324,10 @@ def import_( ImportSource.check_valid(import_sources, param_hint="tables") + if num_processes is None: + num_processes = get_num_available_cores() + # that's a float, but we need an int + num_processes = max(1, int(math.ceil(num_processes))) fast_import_tables( repo, import_sources, diff --git a/sno/utils.py b/sno/utils.py index a19084745..06b9f3c1e 100644 --- a/sno/utils.py +++ b/sno/utils.py @@ -1,5 +1,8 @@ import functools import itertools +import os +import platform +from pathlib import Path def ungenerator(cast_function): @@ -34,3 +37,35 @@ def chunk(iterable, size): if not chunk: return yield chunk + + +def get_num_available_cores(): + """ + Returns the number of available CPU cores (best effort) + * uses cgroup quotas on Linux if available + * uses processor affinity on Windows/Linux if available + * otherwise, uses total number of CPU cores + + The result is a float which may or may not be a round number, and may be less than 1. 
+ """ + if platform.system() == "Linux": + quota_f = Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") + try: + quota = quota_f.read_text() + period = Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us").read_text() + except FileNotFoundError: + pass + else: + if quota == -1: + # no quota set + pass + else: + # note: this is a float, and may not be a round number + # (it's possible to allocate half-cores) + return float(quota / period) + try: + return float(len(os.sched_getaffinity(0))) + except AttributeError: + # sched_getaffinity isn't available on some platforms (macOS mostly I think) + # Fallback to total machine CPUs + return float(os.cpu_count()) From c95eff11756b9a91175ae32014d34094a53e354f Mon Sep 17 00:00:00 2001 From: Craig de Stigter Date: Fri, 16 Apr 2021 15:06:50 +1200 Subject: [PATCH 3/4] Review changes --- sno/fast_import.py | 398 ++++++++++++++++++++++++--------------------- sno/utils.py | 7 +- 2 files changed, 218 insertions(+), 187 deletions(-) diff --git a/sno/fast_import.py b/sno/fast_import.py index dc6f39ff5..055824ef4 100644 --- a/sno/fast_import.py +++ b/sno/fast_import.py @@ -123,6 +123,7 @@ def _git_fast_import(repo, *args): cwd=repo.path, stdin=subprocess.PIPE, env=tool_environment(), + bufsize=128 * 1024, ) try: yield p @@ -140,6 +141,40 @@ def _git_fast_import(repo, *args): ) +def fast_import_clear_trees(*, procs, replace_ids, replacing_dataset, source): + """ + Clears out the appropriate trees in each of the fast_import processes, + before importing any actual data over the top. + """ + if replacing_dataset is None: + # nothing to do + return + for i, proc in enumerate(procs): + if replace_ids is None: + # Delete the existing dataset, before we re-import it. + proc.stdin.write(f"D {source.dest_path}\n".encode("utf8")) + else: + # delete and reimport meta/ + proc.stdin.write(f"D {source.dest_path}/.sno-dataset/meta\n".encode("utf8")) + # delete all features not pertaining to this process. + # we also delete the features that *do*, but we do it further down + # so that we don't have to iterate the IDs more than once. + for subtree in range(256): + if subtree % len(procs) != i: + proc.stdin.write( + f"D {source.dest_path}/.sno-dataset/feature/{subtree:02x}\n".encode( + "utf8" + ) + ) + + # We just deleted the legends, but we still need them to reimport + # data efficiently. Copy them from the original dataset. + for x in write_blobs_to_stream( + proc.stdin, replacing_dataset.iter_legend_blob_data() + ): + pass + + def fast_import_tables( repo, sources, @@ -227,210 +262,206 @@ def fast_import_tables( cmd.append("--quiet") orig_commit = repo.head_commit - import_branches = [] + import_refs = [] if verbosity >= 1: click.echo("Starting git-fast-import...") - with ExitStack() as stack: - procs = [] - - # PARALLEL IMPORTING - # To do an import in parallel: - # * we only have one sno process, and one connection to the source. - # * we have multiple git-fast-import backend processes - # * we send all 'meta' blobs (anything that isn't a feature) to process 0 - # * we assign feature blobs to a process based on it's first subtree. - # (all features in tree `datasetname/feature/01` will go to process 1, etc) - # * after the importing is all done, we merge the trees together. - # * there should never be any conflicts in this merge process. - for i in range(num_processes): - if header is None: - # import onto a temp branch. then reset the head branch afterwards. 
- import_branch = f"refs/heads/{uuid.uuid4()}" - import_branches.append(import_branch) - - # may be None, if head is detached - orig_branch = repo.head_branch - generated_header = generate_header( - repo, sources, message, import_branch - ) - else: - generated_header = header - # this won't work if num_processes > 1 because we'll try and write to - # the same branch multiple times in parallel. - # luckily only upgrade script passes a header in, so there we just use 1 proc. - assert num_processes == 1 - proc = stack.enter_context(_git_fast_import(repo, *cmd)) - procs.append(proc) - if replace_existing != ReplaceExisting.ALL: - generated_header += f"from {orig_commit.oid}\n" - proc.stdin.write(generated_header.encode("utf8")) - - # Write the extra blob that records the repo's version: - for i, blob_path in write_blobs_to_stream(procs[0].stdin, extra_blobs): - if replace_existing != ReplaceExisting.ALL and blob_path in starting_tree: - raise ValueError(f"{blob_path} already exists") - - if num_processes == 1: - - def proc_for_feature_path(path): - return procs[0] - else: + try: + with ExitStack() as stack: + procs = [] + + # PARALLEL IMPORTING + # To do an import in parallel: + # * we only have one sno process, and one connection to the source. + # * we have multiple git-fast-import backend processes + # * we send all 'meta' blobs (anything that isn't a feature) to process 0 + # * we assign feature blobs to a process based on it's first subtree. + # (all features in tree `datasetname/feature/01` will go to process 1, etc) + # * after the importing is all done, we merge the trees together. + # * there should never be any conflicts in this merge process. + for i in range(num_processes): + if header is None: + # import onto a temp branch. then reset the head branch afterwards. + import_ref = f"refs/sno-import/{uuid.uuid4()}" + import_refs.append(import_ref) + + # may be None, if head is detached + orig_branch = repo.head_branch + generated_header = generate_header( + repo, sources, message, import_ref + ) + else: + generated_header = header + # this won't work if num_processes > 1 because we'll try and write to + # the same branch multiple times in parallel. + # luckily only upgrade script passes a header in, so there we just use 1 proc. + assert num_processes == 1 + proc = stack.enter_context(_git_fast_import(repo, *cmd)) + procs.append(proc) + if replace_existing != ReplaceExisting.ALL: + generated_header += f"from {orig_commit.oid}\n" + proc.stdin.write(generated_header.encode("utf8")) + + # Write the extra blob that records the repo's version: + for i, blob_path in write_blobs_to_stream(procs[0].stdin, extra_blobs): + if ( + replace_existing != ReplaceExisting.ALL + and blob_path in starting_tree + ): + raise ValueError(f"{blob_path} already exists") - def proc_for_feature_path(path): - first_subtree = int(path.rsplit("/", 3)[1], 16) - return procs[first_subtree % len(procs)] + if num_processes == 1: - for source in sources: - replacing_dataset = None - if replace_existing == ReplaceExisting.GIVEN: - try: - replacing_dataset = repo.datasets()[source.dest_path] - except KeyError: - # no such dataset; no problem - replacing_dataset = None - - if replacing_dataset is not None: - for i, proc in enumerate(procs): - if replace_ids is None: - # Delete the existing dataset, before we re-import it. 
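
Because the worker count is only known at run time, the loop above enters each _git_fast_import() context through a shared contextlib.ExitStack, so every worker gets its done terminator, stdin close and exit-code check when the with-block unwinds, including when an error escapes mid-import. A minimal standalone sketch of that pattern on a POSIX system, with cat standing in for git fast-import (the real helper additionally tolerates BrokenPipeError if a worker dies early):

    import subprocess
    from contextlib import ExitStack, contextmanager

    @contextmanager
    def _child():
        # stand-in for _git_fast_import(): start a child, lend it to the caller,
        # then guarantee stdin is closed and the exit code is checked on the way out
        p = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL)
        try:
            yield p
        finally:
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                raise RuntimeError(f"child exited with {p.returncode}")

    with ExitStack() as stack:
        procs = [stack.enter_context(_child()) for _ in range(4)]
        for i, proc in enumerate(procs):
            proc.stdin.write(f"payload for worker {i}\n".encode("utf8"))
    # leaving the with-block unwinds the stack: all four children are closed and
    # waited on, even if one of the writes had raised
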
- proc.stdin.write(f"D {source.dest_path}\n".encode("utf8")) - else: - # delete and reimport meta/ - proc.stdin.write( - f"D {source.dest_path}/.sno-dataset/meta\n".encode( - "utf8" - ) - ) - # delete all features not pertaining to this process. - # we also delete the features that *do*, but we do it further down - # so that we don't have to iterate the IDs more than once. - for subtree in range(256): - if subtree % num_processes != i: - proc.stdin.write( - f"D {source.dest_path}/.sno-dataset/feature/{subtree:02x}\n".encode( - "utf8" - ) - ) - - # We just deleted the legends, but we still need them to reimport - # data efficiently. Copy them from the original dataset. - for x in write_blobs_to_stream( - proc.stdin, replacing_dataset.iter_legend_blob_data() - ): - pass + def proc_for_feature_path(path): + return procs[0] - dataset = dataset_class(tree=None, path=source.dest_path) + else: - with source: - if limit: - num_rows = min(limit, source.feature_count) - num_rows_text = f"{num_rows:,d} of {source.feature_count:,d}" - else: - num_rows = source.feature_count - num_rows_text = f"{num_rows:,d}" + def proc_for_feature_path(path): + first_subtree = int(path.rsplit("/", 3)[1], 16) + return procs[first_subtree % len(procs)] - if verbosity >= 1: - click.echo( - f"Importing {num_rows_text} features from {source} to {source.dest_path}/ ..." + for source in sources: + replacing_dataset = None + if replace_existing == ReplaceExisting.GIVEN: + try: + replacing_dataset = repo.datasets()[source.dest_path] + except KeyError: + # no such dataset; no problem + replacing_dataset = None + + fast_import_clear_trees( + procs=procs, + replace_ids=replace_ids, + replacing_dataset=replacing_dataset, + source=source, ) - # Features - t1 = time.monotonic() - if replace_ids is not None: - - # As we iterate over IDs, also delete them from the dataset. - # This means we don't have to load the whole list into memory. - def _ids(): - for pk in replace_ids: - pk = source.schema.sanitise_pks(pk) - path = dataset.encode_pks_to_path(pk) - proc_for_feature_path(path).stdin.write( - f"D {path}\n".encode("utf8") - ) - yield pk + dataset = dataset_class(tree=None, path=source.dest_path) - src_iterator = source.get_features(_ids(), ignore_missing=True) - else: - src_iterator = source.features() + with source: + if limit: + num_rows = min(limit, source.feature_count) + num_rows_text = f"{num_rows:,d} of {source.feature_count:,d}" + else: + num_rows = source.feature_count + num_rows_text = f"{num_rows:,d}" - progress_every = None - if verbosity >= 1: - progress_every = max(100, 100_000 // (10 ** (verbosity - 1))) + if verbosity >= 1: + click.echo( + f"Importing {num_rows_text} features from {source} to {source.dest_path}/ ..." + ) - if should_compare_imported_features_against_old_features( - repo, source, replacing_dataset - ): - feature_blob_iter = dataset.import_iter_feature_blobs( - repo, src_iterator, source, replacing_dataset=replacing_dataset - ) - else: - feature_blob_iter = dataset.import_iter_feature_blobs( - repo, src_iterator, source - ) + # Features + t1 = time.monotonic() + if replace_ids is not None: + + # As we iterate over IDs, also delete them from the dataset. + # This means we don't have to load the whole list into memory. 
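
The _ids() generator defined just below is worth a second look: replace_ids may itself be a lazy stream (for instance lines read from a file), and each ID is needed twice, once to emit a D command and once to fetch the replacement feature. Yielding the ID while writing the delete as a side effect lets both happen in a single pass, without ever materialising the full ID list. A simplified, hedged sketch of the same shape (path_for_pk stands in for dataset.encode_pks_to_path(); the real code also routes each delete to the worker that owns the path):

    import io

    def deleting_id_stream(replace_ids, stream, path_for_pk):
        """Yield each pk unchanged, writing its delete command as we go,
        so the source iterable is consumed exactly once."""
        for pk in replace_ids:
            stream.write(f"D {path_for_pk(pk)}\n".encode("utf8"))
            yield pk

    buf = io.BytesIO()
    pks = iter([3, 7, 42])   # could just as well be a generator reading a file
    fed = list(deleting_id_stream(pks, buf, lambda pk: f"mydataset/.sno-dataset/feature/{pk:02x}"))
    print(fed)                       # -> [3, 7, 42]  (what get_features() is fed)
    print(buf.getvalue().decode())   # -> three D commands, ending with .../feature/2a
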
+ def _ids(): + for pk in replace_ids: + pk = source.schema.sanitise_pks(pk) + path = dataset.encode_pks_to_path(pk) + proc_for_feature_path(path).stdin.write( + f"D {path}\n".encode("utf8") + ) + yield pk - for i, (feature_path, blob_data) in enumerate(feature_blob_iter): - stream = proc_for_feature_path(feature_path).stdin - stream.write( - f"M 644 inline {feature_path}\ndata {len(blob_data)}\n".encode( - "utf8" + src_iterator = source.get_features(_ids(), ignore_missing=True) + else: + src_iterator = source.features() + + progress_every = None + if verbosity >= 1: + progress_every = max(100, 100_000 // (10 ** (verbosity - 1))) + + if should_compare_imported_features_against_old_features( + repo, source, replacing_dataset + ): + feature_blob_iter = dataset.import_iter_feature_blobs( + repo, + src_iterator, + source, + replacing_dataset=replacing_dataset, + ) + else: + feature_blob_iter = dataset.import_iter_feature_blobs( + repo, src_iterator, source ) - ) - stream.write(blob_data) - stream.write(b"\n") - - if i and progress_every and i % progress_every == 0: - click.echo(f" {i:,d} features... @{time.monotonic()-t1:.1f}s") - - if limit is not None and i == (limit - 1): - click.secho(f" Stopping at {limit:,d} features", fg="yellow") - break - t2 = time.monotonic() - if verbosity >= 1: - click.echo(f"Added {num_rows:,d} Features to index in {t2-t1:.1f}s") - click.echo( - f"Overall rate: {(num_rows/(t2-t1 or 1E-3)):.0f} features/s)" - ) - # Meta items - written second as certain importers generate extra metadata as they import features. - for x in write_blobs_to_stream( - procs[0].stdin, dataset.import_iter_meta_blobs(repo, source) - ): - pass + for i, (feature_path, blob_data) in enumerate(feature_blob_iter): + stream = proc_for_feature_path(feature_path).stdin + stream.write( + f"M 644 inline {feature_path}\ndata {len(blob_data)}\n".encode( + "utf8" + ) + ) + stream.write(blob_data) + stream.write(b"\n") - t3 = time.monotonic() - if verbosity >= 1: - click.echo(f"Closed in {(t3-t2):.0f}s") - - if import_branches: - # we created temp branches for the import above. - # each of the branches has _part_ of the import. - # we have to merge the trees together to get a sensible commit. - try: - trees = [repo.revparse_single(b).peel(pygit2.Tree) for b in import_branches] - builder = RichTreeBuilder(repo, trees[0]) - for t in trees[1:]: - datasets = Datasets(t, SUPPORTED_DATASET_CLASS) - for ds in datasets: - try: - feature_tree = ds.feature_tree - except KeyError: - pass - else: - for subtree in feature_tree: - builder.insert( - f"{ds.path}/{ds.FEATURE_PATH}{subtree.name}", subtree + if i and progress_every and i % progress_every == 0: + click.echo( + f" {i:,d} features... @{time.monotonic()-t1:.1f}s" + ) + + if limit is not None and i == (limit - 1): + click.secho( + f" Stopping at {limit:,d} features", fg="yellow" ) - new_tree = builder.flush() + break + t2 = time.monotonic() + if verbosity >= 1: + click.echo( + f"Added {num_rows:,d} Features to index in {t2-t1:.1f}s" + ) + click.echo( + f"Overall rate: {(num_rows/(t2-t1 or 1E-3)):.0f} features/s)" + ) + + # Meta items - written second as certain importers generate extra metadata as they import features. + for x in write_blobs_to_stream( + procs[0].stdin, dataset.import_iter_meta_blobs(repo, source) + ): + pass + + t3 = time.monotonic() + if verbosity >= 1: + click.echo(f"Closed in {(t3-t2):.0f}s") + + if import_refs: + # we created temp branches for the import above. + # each of the branches has _part_ of the import. 
+ # we have to merge the trees together to get a sensible commit. + trees = [repo.revparse_single(b).peel(pygit2.Tree) for b in import_refs] + if len(import_refs) > 1: + click.echo(f"Joining {len(import_refs)} parallel-imported trees...") + builder = RichTreeBuilder(repo, trees[0]) + for t in trees[1:]: + datasets = Datasets(t, SUPPORTED_DATASET_CLASS) + for ds in datasets: + try: + feature_tree = ds.feature_tree + except KeyError: + pass + else: + for subtree in feature_tree: + builder.insert( + f"{ds.path}/{ds.FEATURE_PATH}{subtree.name}", + subtree, + ) + new_tree = builder.flush() + t4 = time.monotonic() + click.echo(f"Joined trees in {(t4-t3):.0f}s") + else: + new_tree = trees[0] + t4 = time.monotonic() if not allow_empty: if new_tree == orig_tree: raise NotFound("No changes to commit", exit_code=NO_CHANGES) # use the existing commit details we already imported, but use the new tree - existing_commit = repo.revparse_single(import_branches[0]).peel( - pygit2.Commit - ) + existing_commit = repo.revparse_single(import_refs[0]).peel(pygit2.Commit) repo.create_commit( orig_branch or "HEAD", existing_commit.author, @@ -439,9 +470,10 @@ def _ids(): new_tree.id, existing_commit.parent_ids, ) - finally: - # remove the import branches - for b in import_branches: + finally: + # remove the import branches + for b in import_refs: + if b in repo.references: repo.references.delete(b) diff --git a/sno/utils.py b/sno/utils.py index 06b9f3c1e..413e29ebe 100644 --- a/sno/utils.py +++ b/sno/utils.py @@ -49,10 +49,9 @@ def get_num_available_cores(): The result is a float which may or may not be a round number, and may be less than 1. """ if platform.system() == "Linux": - quota_f = Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") try: - quota = quota_f.read_text() - period = Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us").read_text() + quota = float(Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us").read_text()) + period = float(Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us").read_text()) except FileNotFoundError: pass else: @@ -62,7 +61,7 @@ def get_num_available_cores(): else: # note: this is a float, and may not be a round number # (it's possible to allocate half-cores) - return float(quota / period) + return quota / period try: return float(len(os.sched_getaffinity(0))) except AttributeError: From 7aa4765839a5de5f28e6bb6639a9c24dd6ceacca Mon Sep 17 00:00:00 2001 From: Craig de Stigter Date: Tue, 20 Apr 2021 11:19:12 +1200 Subject: [PATCH 4/4] `init --num-processes=N` --- CHANGELOG.md | 1 + sno/init.py | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4df630388..bb10dc188 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ _When adding new entries to the changelog, please include issue/PR numbers where ## 0.9.0 (UNRELEASED) +* `import` & `init` are often much faster now because they do imports in parallel subprocesses. Use `--num-processes` to control this behaviour. [#408](https://github.com/koordinates/sno/pull/408) * `diff` now accepts `--only-feature-count`, which produces a feature count for the diff. The feature count can be exact or a fast estimate. * `log` now accepts `--with-feature-count` which adds a feature count to each commit when used with `-o json`. The feature count can be exact or a fast estimate. * `status -o json` now shows which branch you are on, even if that branch doesn't yet have any commits yet. 
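
A note on the final shape of get_num_available_cores(): the float return value is deliberate, because a container can be granted a fraction of a core via the cgroup v1 CPU controller, where the allowance is cfs_quota_us / cfs_period_us (for example 50000 / 100000 = 0.5 cores). A standalone sketch mirroring the probe order of the patched function (cgroup v1 paths only, as in the patch; non-Linux or non-cgroup hosts simply fall through to the later checks):

    import os
    import platform
    from pathlib import Path

    def available_cores():
        """Best-effort float core count, in the same probe order as the patch."""
        if platform.system() == "Linux":
            try:
                quota = float(Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us").read_text())
                period = float(Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us").read_text())
            except FileNotFoundError:
                pass
            else:
                if quota != -1:
                    # e.g. quota=50000, period=100000 -> half a core
                    return quota / period
        try:
            # respects affinity masks (taskset, container cpusets) where supported
            return float(len(os.sched_getaffinity(0)))
        except AttributeError:
            # e.g. macOS: fall back to the total CPU count
            return float(os.cpu_count())

    print(available_cores())
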
diff --git a/sno/init.py b/sno/init.py index 17763a242..32a83f130 100644 --- a/sno/init.py +++ b/sno/init.py @@ -73,6 +73,12 @@ def convert(self, value, param, ctx): return (line.rstrip("\n") for line in fp) +def get_default_num_processes(): + num_processes = get_num_available_cores() + # that's a float, but we need an int + return max(1, int(math.ceil(num_processes))) + + @click.command("import") @click.pass_context @click.argument("source") @@ -323,11 +329,6 @@ def import_( import_sources.append(import_source) ImportSource.check_valid(import_sources, param_hint="tables") - - if num_processes is None: - num_processes = get_num_available_cores() - # that's a float, but we need an int - num_processes = max(1, int(math.ceil(num_processes))) fast_import_tables( repo, import_sources, @@ -339,7 +340,7 @@ def import_( else ReplaceExisting.DONT_REPLACE, replace_ids=replace_ids, allow_empty=allow_empty, - num_processes=num_processes, + num_processes=num_processes or get_default_num_processes(), ) if do_checkout: @@ -405,6 +406,11 @@ def import_( type=click.INT, help="--depth option to git-fast-import (advanced users only)", ) +@click.option( + "--num-processes", + type=click.INT, + help="How many git-fast-import processes to use. Defaults to the number of available CPU cores.", +) def init( ctx, message, @@ -415,6 +421,7 @@ def init( initial_branch, wc_location, max_delta_depth, + num_processes, ): """ Initialise a new repository and optionally import data. @@ -456,6 +463,7 @@ def init( sources, message=message, max_delta_depth=max_delta_depth, + num_processes=num_processes or get_default_num_processes(), ) head_commit = repo.head_commit if do_checkout and not bare:
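
One detail of the final wiring is how the default is resolved: both commands declare --num-processes with no default and fall back with num_processes or get_default_num_processes() at call time, rather than passing default=get_default_num_processes() to click. That keeps the CPU probe out of module import (and out of plain --help runs) and lets init and import share one code path. A hedged sketch of the pattern (the option and help text mirror the patch; the command body and the cores stand-in are illustrative):

    import math
    import click

    def get_num_available_cores():
        # stand-in for sno.utils.get_num_available_cores(); may be fractional
        import os
        return float(os.cpu_count() or 1)

    def get_default_num_processes():
        # fractional core count -> at least one whole worker
        return max(1, int(math.ceil(get_num_available_cores())))

    @click.command("import")
    @click.option(
        "--num-processes",
        type=click.INT,
        help="How many git-fast-import processes to use. Defaults to the number of available CPU cores.",
    )
    def import_(num_processes):
        # resolve the default lazily, at invocation time rather than import time
        num_processes = num_processes or get_default_num_processes()
        click.echo(f"importing with {num_processes} git-fast-import processes")

    if __name__ == "__main__":
        import_()

(Because of the `or`, an explicit --num-processes=0 also falls back to the detected default; fast_import_tables() clamps anything below 1 to a single worker in any case.)
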