Skip to content

Commit

Permalink
repo: Run black on the files for lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jku committed May 30, 2023
1 parent f99b61d commit 994d74a
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 55 deletions.
71 changes: 46 additions & 25 deletions playground/repo/playground/_playground_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,25 @@
import os
import shutil
from securesystemslib.exceptions import UnverifiedSignatureError
from securesystemslib.signer import Signature, Signer, SigstoreKey, SigstoreSigner, KEY_FOR_TYPE_AND_SCHEME
from securesystemslib.signer import (
Signature,
Signer,
SigstoreKey,
SigstoreSigner,
KEY_FOR_TYPE_AND_SCHEME,
)
from sigstore.oidc import detect_credential

from tuf.api.metadata import Key, Metadata, MetaFile, Root, Snapshot, TargetFile, Targets, Timestamp
from tuf.api.metadata import (
Key,
Metadata,
MetaFile,
Root,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
from tuf.repository import AbortEdit, Repository
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer

Expand All @@ -22,11 +37,12 @@

logger = logging.getLogger(__name__)


@unique
class State(Enum):
ADDED = 0,
MODIFIED = 1,
REMOVED = 2,
ADDED = (0,)
MODIFIED = (1,)
REMOVED = (2,)


@dataclass
Expand All @@ -40,16 +56,18 @@ def __str__(self):

@dataclass
class SigningStatus:
invites: set[str] # invites to _delegations_ of the role
invites: set[str] # invites to _delegations_ of the role
signed: set[str]
missing: set[str]
threshold: int
target_changes: list[TargetState]
valid: bool
message: str | None


class SigningEventState:
"""Class to manage the .signing-event-state file"""

def __init__(self, file_path: str):
self._file_path = file_path
self._invites = {}
Expand All @@ -73,7 +91,8 @@ class PlaygroundRepository(Repository):
dir: metadata directory to operate on
prev_dir: optional known good repository directory
"""
def __init__(self, dir: str, prev_dir: str|None = None):

def __init__(self, dir: str, prev_dir: str | None = None):
self._dir = dir
self._prev_dir = prev_dir

Expand All @@ -86,7 +105,7 @@ def _get_filename(self, role: str) -> str:
def _get_keys(self, role: str) -> list[Key]:
"""Return public keys for delegated role"""
if role in ["root", "timestamp", "snapshot", "targets"]:
delegator: Root|Targets = self.root()
delegator: Root | Targets = self.root()
else:
delegator = self.targets()

Expand All @@ -99,7 +118,7 @@ def _get_keys(self, role: str) -> list[Key]:
pass
return keys

def open(self, role:str) -> Metadata:
def open(self, role: str) -> Metadata:
"""Return existing metadata, or create new metadata
This is an implementation of Repository.open()
Expand All @@ -125,7 +144,6 @@ def open(self, role:str) -> Metadata:

return md


def signing_expiry_period(self, rolename: str) -> tuple[int, int]:
"""Extracts the signing and expiry period for a role
Expand All @@ -145,7 +163,6 @@ def signing_expiry_period(self, rolename: str) -> tuple[int, int]:

return (signing_days, expiry_days)


def close(self, rolename: str, md: Metadata) -> None:
"""Write metadata to a file in repo dir
Expand All @@ -172,7 +189,7 @@ def close(self, rolename: str, md: Metadata) -> None:
md.signatures[key.keyid] = Signature(key.keyid, "")

if rolename in ["timestamp", "snapshot"]:
root_md:Metadata[Root] = self.open("root")
root_md: Metadata[Root] = self.open("root")
# repository should never write unsigned online roles
root_md.verify_delegate(rolename, md)

Expand All @@ -181,7 +198,6 @@ def close(self, rolename: str, md: Metadata) -> None:
with open(filename, "wb") as f:
f.write(data)


@property
def targets_infos(self) -> dict[str, MetaFile]:
"""Implementation of Repository.target_infos
Expand Down Expand Up @@ -212,7 +228,7 @@ def snapshot_info(self) -> MetaFile:
"""
return MetaFile(self.snapshot().version)

def open_prev(self, role:str) -> Metadata | None:
def open_prev(self, role: str) -> Metadata | None:
"""Return known good metadata for role (if it exists)"""
prev_fname = f"{self._prev_dir}/{role}.json"
if os.path.exists(prev_fname):
Expand All @@ -221,7 +237,9 @@ def open_prev(self, role:str) -> Metadata | None:

return None

def _validate_role(self, delegator: Metadata, rolename: str) -> tuple[bool, str | None]:
def _validate_role(
self, delegator: Metadata, rolename: str
) -> tuple[bool, str | None]:
"""Validate role compatibility with this repository
Returns bool for validity and optional error message"""
Expand Down Expand Up @@ -276,7 +294,9 @@ def _build_targets(target_dir: str, rolename: str) -> dict[str, TargetFile]:
targetpath = fname
else:
targetpath = f"{rolename}/{fname}"
targetfiles[targetpath] = TargetFile.from_file(targetpath, realpath, ["sha256"])
targetfiles[targetpath] = TargetFile.from_file(
targetpath, realpath, ["sha256"]
)

return targetfiles

Expand Down Expand Up @@ -314,7 +334,6 @@ def _get_target_changes(self, rolename: str) -> list[TargetState]:

return changes


def _get_signing_status(self, delegator: Metadata, rolename: str) -> SigningStatus:
"""Build signing status for role.
Expand Down Expand Up @@ -356,7 +375,9 @@ def _get_signing_status(self, delegator: Metadata, rolename: str) -> SigningStat
else:
valid, msg = self._validate_role(delegator, rolename)

return SigningStatus(invites, sigs, missing_sigs, role.threshold, target_changes, valid, msg)
return SigningStatus(
invites, sigs, missing_sigs, role.threshold, target_changes, valid, msg
)

def status(self, rolename: str) -> tuple[SigningStatus, SigningStatus | None]:
"""Returns signing status for role.
Expand Down Expand Up @@ -395,12 +416,12 @@ def publish(self, directory: str):
dst_path = os.path.join(metadata_dir, f"{snapshot.version}.snapshot.json")
shutil.copy(os.path.join(self._dir, "snapshot.json"), dst_path)

for filename, metafile in snapshot.meta.items():
for filename, metafile in snapshot.meta.items():
src_path = os.path.join(self._dir, filename)
dst_path = os.path.join(metadata_dir, f"{metafile.version}.{filename}")
shutil.copy(src_path, dst_path)

targets = self.targets(filename[:-len(".json")])
targets = self.targets(filename[: -len(".json")])
for target in targets.targets.values():
parent, sep, name = target.path.rpartition("/")
os.makedirs(os.path.join(targets_dir, parent), exist_ok=True)
Expand All @@ -409,8 +430,7 @@ def publish(self, directory: str):
dst_path = os.path.join(targets_dir, parent, f"{hash}.{name}")
shutil.copy(src_path, dst_path)


def bump_expiring(self, rolename:str) -> int | None:
def bump_expiring(self, rolename: str) -> int | None:
"""Create a new version of role if it is about to expire"""
now = datetime.utcnow()
bumped = True
Expand All @@ -427,12 +447,13 @@ def bump_expiring(self, rolename:str) -> int | None:

return signed.version if bumped else None


def update_targets(self, rolename: str) -> bool:
if rolename in ["root", "timestamp", "snapshot"]:
return False

new_target_dict = self._build_targets(os.path.join(self._dir, "..", "targets"), rolename)

new_target_dict = self._build_targets(
os.path.join(self._dir, "..", "targets"), rolename
)
with self.edit_targets(rolename) as targets:
# if targets dict has no changes, cancel the metadata edit
if targets.targets == new_target_dict:
Expand Down
20 changes: 14 additions & 6 deletions playground/repo/playground/bump_expiring.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@


def _git(cmd: list[str]) -> subprocess.CompletedProcess:
cmd = ["git", "-c", "user.name=repository-playground", "-c", "user.email=41898282+github-actions[bot]@users.noreply.github.com"] + cmd
cmd = [
"git",
"-c",
"user.name=repository-playground",
"-c",
"user.email=41898282+github-actions[bot]@users.noreply.github.com",
] + cmd
proc = subprocess.run(cmd, check=True, capture_output=True, text=True)
logger.debug("%s:\n%s", cmd, proc.stdout)
return proc
Expand All @@ -24,7 +30,7 @@ def _git(cmd: list[str]) -> subprocess.CompletedProcess:
@click.option("-v", "--verbose", count=True, default=0)
@click.option("--push/--no-push", default=False)
@click.argument("publish-dir", required=False)
def bump_online(verbose: int, push: bool, publish_dir: str|None) -> None:
def bump_online(verbose: int, push: bool, publish_dir: str | None) -> None:
"""Commit new metadata versions for online roles if needed
New versions will be signed.
Expand Down Expand Up @@ -54,7 +60,9 @@ def bump_online(verbose: int, push: bool, publish_dir: str|None) -> None:
sys.exit(1)

click.echo(msg)
_git(["commit", "-m", msg, "--", "metadata/timestamp.json", "metadata/snapshot.json"])
_git(
["commit", "-m", msg, "--", "metadata/timestamp.json", "metadata/snapshot.json"]
)
if push:
_git(["push", "origin", "HEAD"])

Expand All @@ -78,12 +86,12 @@ def bump_offline(verbose: int, push: bool) -> None:
logging.basicConfig(level=logging.WARNING - verbose * 10)

repo = PlaygroundRepository("metadata")
events=[]
events = []
for filename in glob("*.json", root_dir="metadata"):
if filename in ["timestamp.json", "snapshot.json"]:
continue

rolename = filename[:-len(".json")]
rolename = filename[: -len(".json")]
version = repo.bump_expiring(rolename)
if version is None:
logging.debug("No version bump needed for %s", rolename)
Expand All @@ -107,4 +115,4 @@ def bump_offline(verbose: int, push: bool) -> None:
_git(["reset", "--hard", "HEAD^"])

# print out list of created event branches
click.echo(" ".join(events))
click.echo(" ".join(events))
10 changes: 8 additions & 2 deletions playground/repo/playground/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@


def _git(cmd: list[str]) -> subprocess.CompletedProcess:
cmd = ["git", "-c", "user.name=repository-playground", "-c", "user.email=41898282+github-actions[bot]@users.noreply.github.com"] + cmd
cmd = [
"git",
"-c",
"user.name=repository-playground",
"-c",
"user.email=41898282+github-actions[bot]@users.noreply.github.com",
] + cmd
proc = subprocess.run(cmd, check=True, text=True)
logger.debug("%s:\n%s", cmd, proc.stdout)
return proc
Expand All @@ -24,7 +30,7 @@ def _git(cmd: list[str]) -> subprocess.CompletedProcess:
@click.option("-v", "--verbose", count=True, default=0)
@click.option("--push/--no-push", default=False)
@click.argument("publish-dir", required=False)
def snapshot(verbose: int, push: bool, publish_dir: str|None) -> None:
def snapshot(verbose: int, push: bool, publish_dir: str | None) -> None:
"""Update The TUF snapshot based on current repository content
Create a commit with the snapshot and timestamp changes (if any).
Expand Down
Loading

0 comments on commit 994d74a

Please sign in to comment.