Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the github target logic into an extension #675

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 14 additions & 139 deletions precli/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,17 @@
import pathlib
import sys
import tempfile
import zipfile
from argparse import ArgumentParser
from datetime import datetime
from importlib import metadata
from urllib.parse import urljoin
from urllib.parse import urlparse

if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib

import requests
from ignorelib import IgnoreFilterManager
from rich.console import Console
from rich.progress import BarColumn
from rich.progress import DownloadColumn
from rich.progress import MofNCompleteColumn
from rich.progress import Progress
from rich.progress import TextColumn

import precli
from precli.core import loader
Expand Down Expand Up @@ -183,7 +174,11 @@ def setup_arg_parser():
sys.exit(2)

for target in args.targets:
if target != "-" and not pathlib.Path(target).exists():
if (
target != "-"
and not target.startswith("https://")
and not pathlib.Path(target).exists()
):
parser.error(
f"argument targets: can't open '{target}': [Errno 2] No such "
f"file or directory: '{target}'"
Expand All @@ -209,142 +204,22 @@ def find_config(targets: list[str]) -> dict:
return {}


def get_owner_repo(repo_url: str):
# Extract owner and repository name from the URL
path = urlparse(repo_url).path.lstrip("/").split("/")
return path[0], path[1]


def get_default_branch(owner: str, repo: str):
api_url = f"https://api.github.com/repos/{owner}/{repo}"
response = requests.get(api_url, timeout=5)
response.raise_for_status()
return response.json().get("default_branch")


def extract_github_repo(owner: str, repo: str, branch: str):
base_url = "https://api.github.com/repos"
api_url = f"{base_url}/{owner}/{repo}/zipball/{branch}"
temp_dir = tempfile.mkdtemp()
zip_path = os.path.join(temp_dir, f"{repo}.zip")

progress = Progress(
TextColumn("[progress.description]{task.description}"),
BarColumn(),
DownloadColumn(),
)
with progress:
with requests.get(api_url, stream=True, timeout=5) as r:
r.raise_for_status()

# TODO: ideally set total to file size, but the Content-Length is
# not reliably sent in the response header.
task_id = progress.add_task("Downloading...", total=None)
chunk_size = 8192
with open(zip_path, "wb") as f:
for chunk in r.iter_content(chunk_size=chunk_size):
f.write(chunk)
progress.update(task_id, advance=chunk_size)

progress = Progress(
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
)
with progress:
with zipfile.ZipFile(zip_path, "r") as zip_ref:
name_list = zip_ref.namelist()
for name in progress.track(name_list, description="Extracting..."):
zip_ref.extract(name, temp_dir)

os.remove(zip_path)

for path in os.listdir(temp_dir):
if path.startswith(f"{owner}-{repo}-"):
temp_dir = os.path.join(temp_dir, path)

return temp_dir


def file_to_url(owner, repo, branch, target, root, file):
target_len = len(target)
prefix = root[target_len:].lstrip("/")
urlpath = f"{owner}/{repo}/blob/{branch}"
rel_path = "/".join([urlpath, prefix, file])
return urljoin(GITHUB_URL, rel_path)


def discover_files(targets: list[str], recursive: bool):
FILE_EXTS = (".go", ".java", ".py", ".pyw")
def discover_files(targets: list[str], recursive: bool) -> list[Artifact]:
artifacts = []

for target in targets:
if target.startswith(GITHUB_URL):
owner, repo = get_owner_repo(target)
if repo:
try:
branch = get_default_branch(owner, repo)
target = extract_github_repo(owner, repo, branch)
except requests.exceptions.ConnectionError:
owner = None
repo = None
else:
owner = None
repo = None

if os.path.isdir(target):
gitignore_mgr = IgnoreFilterManager.build(
target,
global_ignore_file_paths=[
os.path.join(".git", "info", "exclude"),
os.path.expanduser(
os.path.join("~", ".config", "git", "ignore")
),
],
global_patterns=[".git"],
ignore_file_name=".gitignore",
target_ext = loader.load_extension(
group="precli.targets", name="github"
)
preignore_mgr = IgnoreFilterManager.build(
target,
global_ignore_file_paths=[],
global_patterns=[],
ignore_file_name=".preignore",
targeter = target_ext()
else:
target_ext = loader.load_extension(
group="precli.targets", name="file"
)
targeter = target_ext()

if recursive is True:
for root, _, files in gitignore_mgr.walk():
for file in files:
path = os.path.join(root, file)
file_path = file if os.path.isabs(path) else path

if (
not preignore_mgr.is_ignored(file_path)
and pathlib.Path(path).suffix in FILE_EXTS
):
if repo:
uri = file_to_url(
owner, repo, branch, target, root, file
)
artifact = Artifact(path, uri)
else:
artifact = Artifact(path)
artifacts.append(artifact)
else:
files = os.listdir(path=target)
for file in files:
if (
not (
gitignore_mgr.is_ignored(file)
or preignore_mgr.is_ignored(file)
)
and pathlib.Path(file).suffix in FILE_EXTS
):
artifact = Artifact(os.path.join(target, file))
artifacts.append(artifact)
else:
if pathlib.Path(target).suffix in FILE_EXTS or target == "-":
artifact = Artifact(target)
artifacts.append(artifact)
artifacts.extend(targeter.discover(target, recursive))

return artifacts

Expand Down
2 changes: 2 additions & 0 deletions precli/core/loader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Copyright 2024 Secure Sauce LLC
# SPDX-License-Identifier: BUSL-1.1
import sys
from functools import cache
from importlib.metadata import entry_points


@cache
def load_extension(group: str, name: str = ""):
if not name:
extensions = {}
Expand Down
14 changes: 14 additions & 0 deletions precli/targets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright 2024 Secure Sauce LLC
from abc import ABC
from abc import abstractmethod

from precli.core.artifact import Artifact


class Target(ABC):
def __init__(self):
self.FILE_EXTS = (".go", ".java", ".py", ".pyw")

@abstractmethod
def discover(self, target: str, recursive: bool) -> list[Artifact]:
pass
60 changes: 60 additions & 0 deletions precli/targets/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2024 Secure Sauce LLC
import os
import pathlib

from ignorelib import IgnoreFilterManager

from precli.core.artifact import Artifact
from precli.targets import Target


class File(Target):
def discover(self, target: str, recursive: bool) -> list[Artifact]:
artifacts = []

if os.path.isdir(target):
gitignore_mgr = IgnoreFilterManager.build(
target,
global_ignore_file_paths=[
os.path.join(".git", "info", "exclude"),
os.path.expanduser(
os.path.join("~", ".config", "git", "ignore")
),
],
global_patterns=[".git"],
ignore_file_name=".gitignore",
)
preignore_mgr = IgnoreFilterManager.build(
target,
global_ignore_file_paths=[],
global_patterns=[],
ignore_file_name=".preignore",
)

if recursive is True:
for root, _, files in gitignore_mgr.walk():
for file in files:
path = os.path.join(root, file)
file_path = file if os.path.isabs(path) else path

if (
not preignore_mgr.is_ignored(file_path)
and pathlib.Path(path).suffix in self.FILE_EXTS
):
artifacts.append(Artifact(path))
else:
files = os.listdir(path=target)
for file in files:
if (
not (
gitignore_mgr.is_ignored(file)
or preignore_mgr.is_ignored(file)
)
and pathlib.Path(file).suffix in self.FILE_EXTS
):
artifacts.append(Artifact(os.path.join(target, file)))
else:
if pathlib.Path(target).suffix in self.FILE_EXTS or target == "-":
artifacts.append(Artifact(target))

return artifacts
Loading