From 349e6c9ef65a454006a4c34e8f636ba3166facad Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 6 Nov 2024 20:40:40 -0500 Subject: [PATCH] support multiple commands, piped together --- dvc_utils/main.py | 116 +++++++++++++++++++++++++++++++++------------- 1 file changed, 83 insertions(+), 33 deletions(-) diff --git a/dvc_utils/main.py b/dvc_utils/main.py index 3522fc9..68642d6 100644 --- a/dvc_utils/main.py +++ b/dvc_utils/main.py @@ -1,14 +1,11 @@ from functools import cache from os import environ as env, getcwd - -from typing import Optional, Tuple - -import shlex from os.path import join, relpath +import shlex +from subprocess import Popen, PIPE +from typing import Optional, Tuple from click import option, argument, group -from subprocess import Popen - import click import yaml from utz import process, singleton, err @@ -52,7 +49,7 @@ def dvc_cache_dir(log: bool = False) -> str: def dvc_md5(git_ref: str, dvc_path: str, log: bool = False) -> str: dir_path = get_dir_path() dir_path = '' if dir_path == '.' else f'{dir_path}/' - dvc_spec = process.output('git', 'show', f'{git_ref}:{dir_path}{dvc_path}', log=log) + dvc_spec = process.output('git', 'show', f'{git_ref}:{dir_path}{dvc_path}', log=err if log else None) dvc_obj = yaml.safe_load(dvc_spec) out = singleton(dvc_obj['outs'], dedupe=False) md5 = out['md5'] @@ -73,15 +70,28 @@ def dvc_cache_path(ref: str, dvc_path: Optional[str] = None, log: bool = False) def diff_cmds( - cmd1: str, - cmd2: str, + cmds1: list[str], + cmds2: list[str], verbose: bool = False, color: bool = False, unified: int | None = None, ignore_whitespace: bool = False, **kwargs, ): - """Run two commands and diff their output. + """Run two sequences of piped commands and diff their output. + + Args: + cmds1: First sequence of commands to pipe together + cmds2: Second sequence of commands to pipe together + verbose: Whether to print commands being executed + color: Whether to show colored diff output + unified: Number of unified context lines, or None + ignore_whitespace: Whether to ignore whitespace changes + **kwargs: Additional arguments passed to subprocess.Popen + + Each command sequence will be piped together before being compared. + For example, if cmds1 = ['cat foo.txt', 'sort'], the function will + execute 'cat foo.txt | sort' before comparing with cmds2's output. Adapted from https://stackoverflow.com/a/28840955""" with named_pipes(n=2) as pipes: @@ -96,11 +106,45 @@ def diff_cmds( ] diff = Popen(diff_cmd) processes = [] - for path, cmd in ((pipe1, cmd1), (pipe2, cmd2)): - with open(path, 'wb', 0) as pipe: - if verbose: - err(f"Running: {cmd}") - processes.append(Popen(cmd, stdout=pipe, close_fds=True, **kwargs)) + + for pipe, cmds in ((pipe1, cmds1), (pipe2, cmds2)): + if verbose: + err(f"Running pipeline: {' | '.join(cmds)}") + + # Create the pipeline of processes + prev_process = None + for i, cmd in enumerate(cmds): + is_last = i + 1 == len(cmds) + + # For the first process, take input from the original source + stdin = None if prev_process is None else prev_process.stdout + + # For the last process, output to the named pipe + if is_last: + with open(pipe, 'wb', 0) as pipe_fd: + proc = Popen( + cmd, + stdin=stdin, + stdout=pipe_fd, + close_fds=True, + **kwargs + ) + # For intermediate processes, output to a pipe + else: + proc = Popen( + cmd, + stdin=stdin, + stdout=PIPE, + close_fds=True, + **kwargs + ) + + if prev_process is not None: + prev_process.stdout.close() + + processes.append(proc) + prev_process = proc + for p in [diff] + processes: p.wait() @@ -112,7 +156,8 @@ def diff_cmds( @option('-U', '--unified', type=int, help='Number of lines of context to show (passes through to `diff`)') @option('-v', '--verbose', is_flag=True, help="Log intermediate commands to stderr") @option('-w', '--ignore-whitespace', is_flag=True, help="Ignore whitespace differences (pass `-w` to `diff`)") -@argument('args', metavar='[cmd...] ', nargs=-1) +@option('-x', '--exec-cmd', 'exec_cmds', multiple=True, help='Command(s) to execute before diffing; alternate syntax to passing commands as positional arguments') +@argument('args', metavar='[exec_cmd...] ', nargs=-1) def dvc_utils_diff( color: bool, refspec: str | None, @@ -120,6 +165,7 @@ def dvc_utils_diff( unified: int | None, verbose: bool, ignore_whitespace: bool, + exec_cmds: Tuple[str, ...], args: Tuple[str, ...], ): """Diff a file at two commits (or one commit vs. current worktree), optionally passing both through `cmd` first @@ -134,14 +180,8 @@ def dvc_utils_diff( raise click.UsageError('Must specify [cmd...] ') shell = not no_shell - if len(args) == 2: - cmd, path = args - cmd = shlex.split(cmd) - elif len(args) == 1: - cmd = None - path, = args - else: - raise click.UsageError('Maximum 2 positional args: [cmd] ') + *cmds, path = args + cmds = list(exec_cmds) + cmds path, dvc_path = dvc_paths(path) @@ -158,17 +198,27 @@ def dvc_utils_diff( before_path = dvc_cache_path(before, dvc_path, log=log) after_path = path if after is None else dvc_cache_path(after, dvc_path, log=log) - if cmd: - def args(path: str): - arr = cmd + [path] - return shlex.join(arr) if shell else arr + if cmds: + cmd, *sub_cmds = cmds + if not shell: + sub_cmds = [ shlex.split(c) for c in sub_cmds ] + before_cmds = [ + shlex.split(f'{cmd} {before_path}'), + *sub_cmds, + ] + after_cmds = [ + shlex.split(f'{cmd} {after_path}'), + *sub_cmds, + ] + shell_kwargs = {} + else: + before_cmds = [ f'{cmd} {before_path}', *sub_cmds ] + after_cmds = [ f'{cmd} {after_path}', *sub_cmds ] + shell_kwargs = dict(shell=shell) - shell_kwargs = dict(shell=shell) if shell else {} - before_cmd = args(before_path) - after_cmd = args(after_path) diff_cmds( - before_cmd, - after_cmd, + before_cmds, + after_cmds, verbose=verbose, color=color, unified=unified,