import: Parallel fast-import processes
This feeds features from the existing single import source connection
to multiple (default=4) git-fast-import processes.
craigds committed Apr 15, 2021
1 parent 985a32b commit f98c06b
Showing 3 changed files with 174 additions and 75 deletions.
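A minimal sketch of the routing rule this commit introduces (the helper name and example path below are illustrative, not part of the commit): every feature blob path contains a two-hex-digit subtree directory, and that value modulo the number of processes picks which git-fast-import process receives the blob; anything that isn't a feature always goes to process 0.

    def process_index_for_feature(path, num_processes):
        # e.g. "roads/.sno-dataset/feature/a3/b1/<encoded-pk>" -> subtree "a3" -> 0xa3 == 163
        first_subtree = int(path.rsplit("/", 3)[1], 16)
        return first_subtree % num_processes

    # With 4 processes, subtree 0xa3 lands on process 163 % 4 == 3.
    assert process_index_for_feature("roads/.sno-dataset/feature/a3/b1/kU0=", 4) == 3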
240 changes: 165 additions & 75 deletions sno/fast_import.py
@@ -2,6 +2,7 @@
import subprocess
import time
import uuid
from contextlib import contextmanager, ExitStack
from enum import Enum, auto

import click
@@ -15,6 +16,8 @@
SUPPORTED_REPO_VERSION,
SUPPORTED_DATASET_CLASS,
)
from .rich_tree_builder import RichTreeBuilder
from .structure import Datasets
from .timestamps import minutes_to_tz_offset
from .pk_generation import PkGeneratingImportSource

@@ -113,11 +116,36 @@ def should_compare_imported_features_against_old_features(
return False


@contextmanager
def _git_fast_import(repo, *args):
p = subprocess.Popen(
["git", "fast-import", "--done", *args],
cwd=repo.path,
stdin=subprocess.PIPE,
env=tool_environment(),
)
try:
yield p
p.stdin.write(b"\ndone\n")
except BrokenPipeError:
# if git-fast-import dies early, we get an EPIPE here
# we'll deal with it below
pass
else:
p.stdin.close()
p.wait()
if p.returncode != 0:
raise SubprocessError(
f"git-fast-import error! {p.returncode}", exit_code=p.returncode
)


def fast_import_tables(
repo,
sources,
*,
verbosity=1,
num_processes=4,
header=None,
message=None,
replace_existing=ReplaceExisting.DONT_REPLACE,
@@ -137,6 +165,7 @@ def fast_import_tables(
0: no progress information is printed to stdout.
1: basic status information
2: full output of `git-fast-import --stats ...`
num_processes: how many import processes to run in parallel
header - the commit-header to supply git-fast-import. Generated if not supplied - see generate_header.
message - the commit-message used when generating the header. Generated if not supplied - see generate_message.
replace_existing - See ReplaceExisting enum
@@ -147,6 +176,15 @@
extra_cmd_args - any extra args for the git-fast-import command.
"""

MAX_PROCESSES = 64
if num_processes < 1:
num_processes = 1
elif num_processes > MAX_PROCESSES:
# this is almost certainly a mistake, but also:
# we want to split 256 trees roughly evenly, and if we're trying to split them across
# too many processes it won't be very even.
raise ValueError(f"Can't import with more than {MAX_PROCESSES} processes")

# The tree this repo was at before this function was called.
# May be None (repo is empty)
orig_tree = repo.head_tree
@@ -181,46 +219,69 @@
sources = PkGeneratingImportSource.wrap_sources_if_needed(sources, repo)

cmd = [
"git",
"fast-import",
"--done",
f"--max-pack-size={max_pack_size}",
f"--depth={max_delta_depth}",
]
if verbosity < 2:
cmd.append("--quiet")

if header is None:
# import onto a temp branch. then reset the head branch afterwards.
# this allows us to check the result before updating the orig branch.
import_branch = f"refs/heads/{uuid.uuid4()}"

# may be None, if head is detached
orig_branch = repo.head_branch
header = generate_header(repo, sources, message, import_branch)
else:
import_branch = None
orig_commit = repo.head_commit
import_branches = []

if verbosity >= 1:
click.echo("Starting git-fast-import...")

p = subprocess.Popen(
[*cmd, *extra_cmd_args],
cwd=repo.path,
stdin=subprocess.PIPE,
env=tool_environment(),
)
try:
if replace_existing != ReplaceExisting.ALL:
header += f"from {orig_commit.oid}\n"
p.stdin.write(header.encode("utf8"))
with ExitStack() as stack:
procs = []

# PARALLEL IMPORTING
# To do an import in parallel:
# * we only have one sno process, and one connection to the source.
# * we have multiple git-fast-import backend processes
# * we send all 'meta' blobs (anything that isn't a feature) to process 0
#  * we assign feature blobs to a process based on its first subtree.
# (all features in tree `datasetname/feature/01` will go to process 1, etc)
# * after the importing is all done, we merge the trees together.
# * there should never be any conflicts in this merge process.
for i in range(num_processes):
if header is None:
# import onto a temp branch. then reset the head branch afterwards.
import_branch = f"refs/heads/{uuid.uuid4()}"
import_branches.append(import_branch)

# may be None, if head is detached
orig_branch = repo.head_branch
generated_header = generate_header(
repo, sources, message, import_branch
)
else:
generated_header = header
# this won't work if num_processes > 1 because we'll try to write to
# the same branch multiple times in parallel.
# luckily only the upgrade script passes a header in, so there we just use 1 proc.
assert num_processes == 1
proc = stack.enter_context(_git_fast_import(repo, *cmd))
procs.append(proc)
if replace_existing != ReplaceExisting.ALL:
generated_header += f"from {orig_commit.oid}\n"
proc.stdin.write(generated_header.encode("utf8"))

# Write the extra blob that records the repo's version:
for i, blob_path in write_blobs_to_stream(p.stdin, extra_blobs):
for i, blob_path in write_blobs_to_stream(procs[0].stdin, extra_blobs):
if replace_existing != ReplaceExisting.ALL and blob_path in starting_tree:
raise ValueError(f"{blob_path} already exists")

if num_processes == 1:

def proc_for_feature_path(path):
return procs[0]

else:

def proc_for_feature_path(path):
first_subtree = int(path.rsplit("/", 3)[1], 16)
return procs[first_subtree % len(procs)]

for source in sources:
replacing_dataset = None
if replace_existing == ReplaceExisting.GIVEN:
@@ -231,23 +292,34 @@ def fast_import_tables(
replacing_dataset = None

if replacing_dataset is not None:
if replace_ids is None:
# Delete the existing dataset, before we re-import it.
p.stdin.write(f"D {source.dest_path}\n".encode("utf8"))
else:
# delete and reimport meta/
# we also delete the specified features, but we do it further down
# so that we don't have to iterate the IDs more than once.
p.stdin.write(
f"D {source.dest_path}/.sno-dataset/meta\n".encode("utf8")
)

# We just deleted the legends, but we still need them to reimport
# data efficiently. Copy them from the original dataset.
for x in write_blobs_to_stream(
p.stdin, replacing_dataset.iter_legend_blob_data()
):
pass
for i, proc in enumerate(procs):
if replace_ids is None:
# Delete the existing dataset, before we re-import it.
proc.stdin.write(f"D {source.dest_path}\n".encode("utf8"))
else:
# delete and reimport meta/
proc.stdin.write(
f"D {source.dest_path}/.sno-dataset/meta\n".encode(
"utf8"
)
)
# delete all features not pertaining to this process.
# we also delete the features that *do*, but we do it further down
# so that we don't have to iterate the IDs more than once.
for subtree in range(256):
if subtree % num_processes != i:
proc.stdin.write(
f"D {source.dest_path}/.sno-dataset/feature/{subtree:02x}\n".encode(
"utf8"
)
)

# We just deleted the legends, but we still need them to reimport
# data efficiently. Copy them from the original dataset.
for x in write_blobs_to_stream(
proc.stdin, replacing_dataset.iter_legend_blob_data()
):
pass

dataset = dataset_class(tree=None, path=source.dest_path)

@@ -274,7 +346,9 @@ def _ids():
for pk in replace_ids:
pk = source.schema.sanitise_pks(pk)
path = dataset.encode_pks_to_path(pk)
p.stdin.write(f"D {path}\n".encode("utf8"))
proc_for_feature_path(path).stdin.write(
f"D {path}\n".encode("utf8")
)
yield pk

src_iterator = source.get_features(_ids(), ignore_missing=True)
@@ -295,7 +369,17 @@ def _ids():
feature_blob_iter = dataset.import_iter_feature_blobs(
repo, src_iterator, source
)
for i, blob_path in write_blobs_to_stream(p.stdin, feature_blob_iter):

for i, (feature_path, blob_data) in enumerate(feature_blob_iter):
stream = proc_for_feature_path(feature_path).stdin
stream.write(
f"M 644 inline {feature_path}\ndata {len(blob_data)}\n".encode(
"utf8"
)
)
stream.write(blob_data)
stream.write(b"\n")

if i and progress_every and i % progress_every == 0:
click.echo(f" {i:,d} features... @{time.monotonic()-t1:.1f}s")

@@ -311,48 +395,54 @@ def _ids():

# Meta items - written second as certain importers generate extra metadata as they import features.
for x in write_blobs_to_stream(
p.stdin, dataset.import_iter_meta_blobs(repo, source)
procs[0].stdin, dataset.import_iter_meta_blobs(repo, source)
):
pass

p.stdin.write(b"\ndone\n")
except BrokenPipeError:
# if git-fast-import dies early, we get an EPIPE here
# we'll deal with it below
pass
else:
p.stdin.close()
p.wait()
if p.returncode != 0:
raise SubprocessError(
f"git-fast-import error! {p.returncode}", exit_code=p.returncode
)
t3 = time.monotonic()
if verbosity >= 1:
click.echo(f"Closed in {(t3-t2):.0f}s")

if import_branch is not None:
# we created a temp branch for the import above.
if import_branches:
# we created temp branches for the import above.
# each of the branches has _part_ of the import.
# we have to merge the trees together to get a sensible commit.
try:
if orig_tree and not allow_empty:
if repo.revparse_single(import_branch).peel(pygit2.Tree) == orig_tree:
trees = [repo.revparse_single(b).peel(pygit2.Tree) for b in import_branches]
builder = RichTreeBuilder(repo, trees[0])
for t in trees[1:]:
datasets = Datasets(t, SUPPORTED_DATASET_CLASS)
for ds in datasets:
try:
feature_tree = ds.feature_tree
except KeyError:
pass
else:
for subtree in feature_tree:
builder.insert(
f"{ds.path}/{ds.FEATURE_PATH}{subtree.name}", subtree
)
new_tree = builder.flush()
if not allow_empty:
if new_tree == orig_tree:
raise NotFound("No changes to commit", exit_code=NO_CHANGES)
latest_commit_oid = repo.references[import_branch].peel(pygit2.Commit).oid
if orig_branch:
# reset the original branch head to the import branch, so it gets the new commits
if orig_tree:
# repo was non-empty before this, and head was not detached.
# so orig_branch exists already.
# we have to delete and re-create it at the new commit.
repo.references.delete(orig_branch)
repo.references.create(orig_branch, latest_commit_oid)
else:
# head was detached before this. just update head to the new commit,
# so it's still detached.
repo.set_head(latest_commit_oid)

# use the existing commit details we already imported, but use the new tree
existing_commit = repo.revparse_single(import_branches[0]).peel(
pygit2.Commit
)
repo.create_commit(
orig_branch or "HEAD",
existing_commit.author,
existing_commit.committer,
existing_commit.message,
new_tree.id,
existing_commit.parent_ids,
)
finally:
# remove the import branch
repo.references.delete(import_branch)
# remove the import branches
for b in import_branches:
repo.references.delete(b)


def write_blobs_to_stream(stream, blobs):
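The merge step at the end of fast_import.py relies on the per-process ownership of feature subtrees being disjoint: with the modulo assignment, each of the 256 subtrees is written by exactly one process, so grafting the per-process trees back together can never produce conflicting paths. A standalone check of that invariant (illustrative only, not part of the commit):

    num_processes = 4
    owners = {subtree: subtree % num_processes for subtree in range(256)}
    for i in range(num_processes):
        owned = sum(1 for owner in owners.values() if owner == i)
        print(f"process {i}: {owned} of 256 feature subtrees")  # 64 each when num_processes=4
    # every subtree 00..ff has exactly one owner
    assert set(owners) == set(range(256))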
8 changes: 8 additions & 0 deletions sno/init.py
@@ -193,6 +193,12 @@ def convert(self, value, param, ctx):
default=True,
help="Whether to create a working copy once the import is finished, if no working copy exists yet.",
)
@click.option(
"--num-processes",
default=4,
type=click.INT,
help="How many git-fast-import processes to use",
)
def import_(
ctx,
all_tables,
@@ -209,6 +215,7 @@ def import_(
allow_empty,
max_delta_depth,
do_checkout,
num_processes,
):
"""
Import data into a repository.
@@ -327,6 +334,7 @@ def import_(
else ReplaceExisting.DONT_REPLACE,
replace_ids=replace_ids,
allow_empty=allow_empty,
num_processes=num_processes,
)

if do_checkout:
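With the option wired through import_, the degree of parallelism can be chosen per invocation, for example (the GeoPackage source below is a placeholder; only --num-processes comes from this diff):

    sno import GPKG:example.gpkg --num-processes 8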
1 change: 1 addition & 0 deletions sno/upgrade/__init__.py
@@ -179,6 +179,7 @@ def _upgrade_commit(
# We import every commit onto refs/heads/main, even though not all commits are related - this means
# the main branch head will jump all over the place. git-fast-import only allows this with --force.
extra_cmd_args=["--force"],
num_processes=1,
)

dest_commit = dest_repo.head_commit
