diff --git a/precli/cli/main.py b/precli/cli/main.py index 272b61db..8fa37965 100644 --- a/precli/cli/main.py +++ b/precli/cli/main.py @@ -6,12 +6,9 @@ import pathlib import sys import tempfile -import zipfile from argparse import ArgumentParser from datetime import datetime from importlib import metadata -from urllib.parse import urljoin -from urllib.parse import urlparse if sys.version_info >= (3, 11): import tomllib @@ -19,13 +16,7 @@ import tomli as tomllib import requests -from ignorelib import IgnoreFilterManager from rich.console import Console -from rich.progress import BarColumn -from rich.progress import DownloadColumn -from rich.progress import MofNCompleteColumn -from rich.progress import Progress -from rich.progress import TextColumn import precli from precli.core import loader @@ -183,7 +174,11 @@ def setup_arg_parser(): sys.exit(2) for target in args.targets: - if target != "-" and not pathlib.Path(target).exists(): + if ( + target != "-" + and not target.startswith("https://") + and not pathlib.Path(target).exists() + ): parser.error( f"argument targets: can't open '{target}': [Errno 2] No such " f"file or directory: '{target}'" @@ -209,142 +204,22 @@ def find_config(targets: list[str]) -> dict: return {} -def get_owner_repo(repo_url: str): - # Extract owner and repository name from the URL - path = urlparse(repo_url).path.lstrip("/").split("/") - return path[0], path[1] - - -def get_default_branch(owner: str, repo: str): - api_url = f"https://api.github.com/repos/{owner}/{repo}" - response = requests.get(api_url, timeout=5) - response.raise_for_status() - return response.json().get("default_branch") - - -def extract_github_repo(owner: str, repo: str, branch: str): - base_url = "https://api.github.com/repos" - api_url = f"{base_url}/{owner}/{repo}/zipball/{branch}" - temp_dir = tempfile.mkdtemp() - zip_path = os.path.join(temp_dir, f"{repo}.zip") - - progress = Progress( - TextColumn("[progress.description]{task.description}"), - BarColumn(), - DownloadColumn(), - ) - with progress: - with requests.get(api_url, stream=True, timeout=5) as r: - r.raise_for_status() - - # TODO: ideally set total to file size, but the Content-Length is - # not reliably sent in the response header. - task_id = progress.add_task("Downloading...", total=None) - chunk_size = 8192 - with open(zip_path, "wb") as f: - for chunk in r.iter_content(chunk_size=chunk_size): - f.write(chunk) - progress.update(task_id, advance=chunk_size) - - progress = Progress( - TextColumn("[progress.description]{task.description}"), - BarColumn(), - MofNCompleteColumn(), - ) - with progress: - with zipfile.ZipFile(zip_path, "r") as zip_ref: - name_list = zip_ref.namelist() - for name in progress.track(name_list, description="Extracting..."): - zip_ref.extract(name, temp_dir) - - os.remove(zip_path) - - for path in os.listdir(temp_dir): - if path.startswith(f"{owner}-{repo}-"): - temp_dir = os.path.join(temp_dir, path) - - return temp_dir - - -def file_to_url(owner, repo, branch, target, root, file): - target_len = len(target) - prefix = root[target_len:].lstrip("/") - urlpath = f"{owner}/{repo}/blob/{branch}" - rel_path = "/".join([urlpath, prefix, file]) - return urljoin(GITHUB_URL, rel_path) - - -def discover_files(targets: list[str], recursive: bool): - FILE_EXTS = (".go", ".java", ".py", ".pyw") +def discover_files(targets: list[str], recursive: bool) -> list[Artifact]: artifacts = [] for target in targets: if target.startswith(GITHUB_URL): - owner, repo = get_owner_repo(target) - if repo: - try: - branch = get_default_branch(owner, repo) - target = extract_github_repo(owner, repo, branch) - except requests.exceptions.ConnectionError: - owner = None - repo = None - else: - owner = None - repo = None - - if os.path.isdir(target): - gitignore_mgr = IgnoreFilterManager.build( - target, - global_ignore_file_paths=[ - os.path.join(".git", "info", "exclude"), - os.path.expanduser( - os.path.join("~", ".config", "git", "ignore") - ), - ], - global_patterns=[".git"], - ignore_file_name=".gitignore", + target_ext = loader.load_extension( + group="precli.targets", name="github" ) - preignore_mgr = IgnoreFilterManager.build( - target, - global_ignore_file_paths=[], - global_patterns=[], - ignore_file_name=".preignore", + targeter = target_ext() + else: + target_ext = loader.load_extension( + group="precli.targets", name="file" ) + targeter = target_ext() - if recursive is True: - for root, _, files in gitignore_mgr.walk(): - for file in files: - path = os.path.join(root, file) - file_path = file if os.path.isabs(path) else path - - if ( - not preignore_mgr.is_ignored(file_path) - and pathlib.Path(path).suffix in FILE_EXTS - ): - if repo: - uri = file_to_url( - owner, repo, branch, target, root, file - ) - artifact = Artifact(path, uri) - else: - artifact = Artifact(path) - artifacts.append(artifact) - else: - files = os.listdir(path=target) - for file in files: - if ( - not ( - gitignore_mgr.is_ignored(file) - or preignore_mgr.is_ignored(file) - ) - and pathlib.Path(file).suffix in FILE_EXTS - ): - artifact = Artifact(os.path.join(target, file)) - artifacts.append(artifact) - else: - if pathlib.Path(target).suffix in FILE_EXTS or target == "-": - artifact = Artifact(target) - artifacts.append(artifact) + artifacts.extend(targeter.discover(target, recursive)) return artifacts diff --git a/precli/core/loader.py b/precli/core/loader.py index bb7abc7b..31af3fd3 100644 --- a/precli/core/loader.py +++ b/precli/core/loader.py @@ -1,9 +1,11 @@ # Copyright 2024 Secure Sauce LLC # SPDX-License-Identifier: BUSL-1.1 import sys +from functools import cache from importlib.metadata import entry_points +@cache def load_extension(group: str, name: str = ""): if not name: extensions = {} diff --git a/precli/targets/__init__.py b/precli/targets/__init__.py new file mode 100644 index 00000000..ba215837 --- /dev/null +++ b/precli/targets/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2024 Secure Sauce LLC +from abc import ABC +from abc import abstractmethod + +from precli.core.artifact import Artifact + + +class Target(ABC): + def __init__(self): + self.FILE_EXTS = (".go", ".java", ".py", ".pyw") + + @abstractmethod + def discover(self, target: str, recursive: bool) -> list[Artifact]: + pass diff --git a/precli/targets/file.py b/precli/targets/file.py new file mode 100644 index 00000000..1e8a3238 --- /dev/null +++ b/precli/targets/file.py @@ -0,0 +1,60 @@ +# Copyright 2024 Secure Sauce LLC +import os +import pathlib + +from ignorelib import IgnoreFilterManager + +from precli.core.artifact import Artifact +from precli.targets import Target + + +class File(Target): + def discover(self, target: str, recursive: bool) -> list[Artifact]: + artifacts = [] + + if os.path.isdir(target): + gitignore_mgr = IgnoreFilterManager.build( + target, + global_ignore_file_paths=[ + os.path.join(".git", "info", "exclude"), + os.path.expanduser( + os.path.join("~", ".config", "git", "ignore") + ), + ], + global_patterns=[".git"], + ignore_file_name=".gitignore", + ) + preignore_mgr = IgnoreFilterManager.build( + target, + global_ignore_file_paths=[], + global_patterns=[], + ignore_file_name=".preignore", + ) + + if recursive is True: + for root, _, files in gitignore_mgr.walk(): + for file in files: + path = os.path.join(root, file) + file_path = file if os.path.isabs(path) else path + + if ( + not preignore_mgr.is_ignored(file_path) + and pathlib.Path(path).suffix in self.FILE_EXTS + ): + artifacts.append(Artifact(path)) + else: + files = os.listdir(path=target) + for file in files: + if ( + not ( + gitignore_mgr.is_ignored(file) + or preignore_mgr.is_ignored(file) + ) + and pathlib.Path(file).suffix in self.FILE_EXTS + ): + artifacts.append(Artifact(os.path.join(target, file))) + else: + if pathlib.Path(target).suffix in self.FILE_EXTS or target == "-": + artifacts.append(Artifact(target)) + + return artifacts diff --git a/precli/targets/github.py b/precli/targets/github.py new file mode 100644 index 00000000..df729566 --- /dev/null +++ b/precli/targets/github.py @@ -0,0 +1,156 @@ +# Copyright 2024 Secure Sauce LLC +import os +import pathlib +import tempfile +import zipfile +from urllib.parse import urljoin +from urllib.parse import urlparse + +import requests +from ignorelib import IgnoreFilterManager +from rich.progress import BarColumn +from rich.progress import DownloadColumn +from rich.progress import MofNCompleteColumn +from rich.progress import Progress +from rich.progress import TextColumn + +from precli.core.artifact import Artifact +from precli.targets import Target + + +GITHUB_API = "https://api.github.com" +GITHUB_URL = "https://github.com" + + +class GitHub(Target): + def get_owner_repo(self, repo_url: str) -> tuple[str, str]: + # Extract owner and repository name from the URL + path = urlparse(repo_url).path.lstrip("/").split("/") + return path[0], path[1] + + def get_default_branch(self, owner: str, repo: str) -> str: + api_url = f"{GITHUB_API}/repos/{owner}/{repo}" + response = requests.get(api_url, timeout=5) + response.raise_for_status() + return response.json().get("default_branch") + + def extract_github_repo(self, owner: str, repo: str, branch: str) -> str: + api_url = f"{GITHUB_API}/repos/{owner}/{repo}/zipball/{branch}" + temp_dir = tempfile.mkdtemp() + zip_path = os.path.join(temp_dir, f"{repo}.zip") + + progress = Progress( + TextColumn("[progress.description]{task.description}"), + BarColumn(), + DownloadColumn(), + ) + with progress: + with requests.get(api_url, stream=True, timeout=5) as r: + r.raise_for_status() + + # TODO: ideally set total to file size, but the Content-Length + # is not reliably sent in the response header. + task_id = progress.add_task("Downloading...", total=None) + chunk_size = 8192 + with open(zip_path, "wb") as f: + for chunk in r.iter_content(chunk_size=chunk_size): + f.write(chunk) + progress.update(task_id, advance=chunk_size) + + progress = Progress( + TextColumn("[progress.description]{task.description}"), + BarColumn(), + MofNCompleteColumn(), + ) + with progress: + with zipfile.ZipFile(zip_path, "r") as zip_ref: + name_list = zip_ref.namelist() + for name in progress.track( + name_list, description="Extracting..." + ): + zip_ref.extract(name, temp_dir) + + os.remove(zip_path) + + for path in os.listdir(temp_dir): + if path.startswith(f"{owner}-{repo}-"): + temp_dir = os.path.join(temp_dir, path) + + return temp_dir + + def file_to_url( + self, + owner: str, + repo: str, + branch: str, + target: str, + root: str, + file: str, + ) -> str: + target_len = len(target) + prefix = root[target_len:].lstrip("/") + urlpath = f"{owner}/{repo}/blob/{branch}" + rel_path = "/".join([urlpath, prefix, file]) + return urljoin(GITHUB_URL, rel_path) + + def discover(self, target: str, recursive: bool) -> list[Artifact]: + artifacts = [] + + owner, repo = self.get_owner_repo(target) + if repo: + try: + branch = self.get_default_branch(owner, repo) + target = self.extract_github_repo(owner, repo, branch) + except requests.exceptions.ConnectionError: + owner = None + repo = None + + if os.path.isdir(target): + gitignore_mgr = IgnoreFilterManager.build( + target, + global_ignore_file_paths=[ + os.path.join(".git", "info", "exclude"), + os.path.expanduser( + os.path.join("~", ".config", "git", "ignore") + ), + ], + global_patterns=[".git"], + ignore_file_name=".gitignore", + ) + preignore_mgr = IgnoreFilterManager.build( + target, + global_ignore_file_paths=[], + global_patterns=[], + ignore_file_name=".preignore", + ) + + if recursive is True: + for root, _, files in gitignore_mgr.walk(): + for file in files: + path = os.path.join(root, file) + file_path = file if os.path.isabs(path) else path + + if ( + not preignore_mgr.is_ignored(file_path) + and pathlib.Path(path).suffix in self.FILE_EXTS + ): + uri = self.file_to_url( + owner, repo, branch, target, root, file + ) + artifacts.append(Artifact(path, uri)) + else: + files = os.listdir(path=target) + for file in files: + if ( + not ( + gitignore_mgr.is_ignored(file) + or preignore_mgr.is_ignored(file) + ) + and pathlib.Path(file).suffix in self.FILE_EXTS + ): + artifacts.append(Artifact(os.path.join(target, file))) + else: + if pathlib.Path(target).suffix in self.FILE_EXTS: + artifacts.append(Artifact(target)) + + return artifacts diff --git a/setup.cfg b/setup.cfg index 4ef252bb..750b1756 100644 --- a/setup.cfg +++ b/setup.cfg @@ -234,3 +234,10 @@ precli.rules.python = # precli/rules/python/stdlib/ssl_no_timeout.py PY046 = precli.rules.python.stdlib.ssl_no_timeout:SslNoTimeout + +precli.targets = + # precli/targets/file.py + file = precli.targets.file:File + + # precli/targets/github.py + github = precli.targets.github:GitHub