Skip to content

Commit

Permalink
Merge pull request #91 from jku/target-mgmt-v4
Browse files Browse the repository at this point in the history
Targets management version 4
  • Loading branch information
jku authored May 30, 2023
2 parents a674ff6 + 994d74a commit b9b6e41
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 265 deletions.
33 changes: 12 additions & 21 deletions playground/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,18 @@ Notes on remotes configured in `.playground-sign.ini`:

### Modify target files

1. Make target file changes in the signing event git branch
* Choose a signing event name, create a branch
```
git fetch origin
git switch -C sign/my-target-changes origin/main
```
* Make changes with tools of your choosing:
```
echo "test content" > targets/file.txt
git add targets/file.txt
git commit -m "Add a target file"
```
* Submit changes to a signing event branch on the repository (by pushing to repository
or by using a PR to a signing event branch): This starts a signing event
```
git push origin sign/my-target-changes
```
1. Update targets metadata
```
playground-sign sign/my-target-changes
```
Make target file changes in the signing event git branch using tools and review processes
of your choice.

```
git fetch origin
git switch -C sign/my-target-changes origin/main
echo "test content" > targets/file.txt
git commit -m "Add a target file" -- targets/file.txt
git push origin sign/my-target-changes
```

This starts a signing event (or updates an existing signing event).

### Sign changes made by others

Expand Down
156 changes: 138 additions & 18 deletions playground/repo/playground/_playground_repository.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum, unique
from glob import glob
import json
import logging
import os
import shutil
from securesystemslib.exceptions import UnverifiedSignatureError
from securesystemslib.signer import Signature, Signer, SigstoreKey, SigstoreSigner, KEY_FOR_TYPE_AND_SCHEME
from securesystemslib.signer import (
Signature,
Signer,
SigstoreKey,
SigstoreSigner,
KEY_FOR_TYPE_AND_SCHEME,
)
from sigstore.oidc import detect_credential

from tuf.api.metadata import Key, Metadata, MetaFile, Root, Snapshot, Targets, Timestamp
from tuf.api.metadata import (
Key,
Metadata,
MetaFile,
Root,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
from tuf.repository import AbortEdit, Repository
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer

Expand All @@ -21,17 +37,37 @@

logger = logging.getLogger(__name__)


@unique
class State(Enum):
ADDED = (0,)
MODIFIED = (1,)
REMOVED = (2,)


@dataclass
class TargetState:
target: TargetFile
state: State

def __str__(self):
return f"{self.target.path}: {self.state.name}"


@dataclass
class SigningStatus:
invites: set[str] # invites to _delegations_ of the role
invites: set[str] # invites to _delegations_ of the role
signed: set[str]
missing: set[str]
threshold: int
target_changes: list[TargetState]
valid: bool
message: str | None


class SigningEventState:
"""Class to manage the .signing-event-state file"""

def __init__(self, file_path: str):
self._file_path = file_path
self._invites = {}
Expand All @@ -55,7 +91,8 @@ class PlaygroundRepository(Repository):
dir: metadata directory to operate on
prev_dir: optional known good repository directory
"""
def __init__(self, dir: str, prev_dir: str|None = None):

def __init__(self, dir: str, prev_dir: str | None = None):
self._dir = dir
self._prev_dir = prev_dir

Expand All @@ -68,7 +105,7 @@ def _get_filename(self, role: str) -> str:
def _get_keys(self, role: str) -> list[Key]:
"""Return public keys for delegated role"""
if role in ["root", "timestamp", "snapshot", "targets"]:
delegator: Root|Targets = self.root()
delegator: Root | Targets = self.root()
else:
delegator = self.targets()

Expand All @@ -81,7 +118,7 @@ def _get_keys(self, role: str) -> list[Key]:
pass
return keys

def open(self, role:str) -> Metadata:
def open(self, role: str) -> Metadata:
"""Return existing metadata, or create new metadata
This is an implementation of Repository.open()
Expand All @@ -107,7 +144,6 @@ def open(self, role:str) -> Metadata:

return md


def signing_expiry_period(self, rolename: str) -> tuple[int, int]:
"""Extracts the signing and expiry period for a role
Expand All @@ -127,7 +163,6 @@ def signing_expiry_period(self, rolename: str) -> tuple[int, int]:

return (signing_days, expiry_days)


def close(self, rolename: str, md: Metadata) -> None:
"""Write metadata to a file in repo dir
Expand All @@ -154,7 +189,7 @@ def close(self, rolename: str, md: Metadata) -> None:
md.signatures[key.keyid] = Signature(key.keyid, "")

if rolename in ["timestamp", "snapshot"]:
root_md:Metadata[Root] = self.open("root")
root_md: Metadata[Root] = self.open("root")
# repository should never write unsigned online roles
root_md.verify_delegate(rolename, md)

Expand All @@ -163,7 +198,6 @@ def close(self, rolename: str, md: Metadata) -> None:
with open(filename, "wb") as f:
f.write(data)


@property
def targets_infos(self) -> dict[str, MetaFile]:
"""Implementation of Repository.target_infos
Expand Down Expand Up @@ -194,7 +228,7 @@ def snapshot_info(self) -> MetaFile:
"""
return MetaFile(self.snapshot().version)

def open_prev(self, role:str) -> Metadata | None:
def open_prev(self, role: str) -> Metadata | None:
"""Return known good metadata for role (if it exists)"""
prev_fname = f"{self._prev_dir}/{role}.json"
if os.path.exists(prev_fname):
Expand All @@ -203,7 +237,9 @@ def open_prev(self, role:str) -> Metadata | None:

return None

def _validate_role(self, delegator: Metadata, rolename: str) -> tuple[bool, str | None]:
def _validate_role(
self, delegator: Metadata, rolename: str
) -> tuple[bool, str | None]:
"""Validate role compatibility with this repository
Returns bool for validity and optional error message"""
Expand Down Expand Up @@ -238,6 +274,66 @@ def _validate_role(self, delegator: Metadata, rolename: str) -> tuple[bool, str

return True, None

@staticmethod
def _build_targets(target_dir: str, rolename: str) -> dict[str, TargetFile]:
"""Build a roles dict of TargetFile based on target files in a directory"""
targetfiles = {}

if rolename == "targets":
root_dir = target_dir
else:
root_dir = os.path.join(target_dir, rolename)

for fname in glob("*", root_dir=root_dir):
realpath = os.path.join(root_dir, fname)
if not os.path.isfile(realpath):
continue

# targetpath is a URL path, not OS path
if rolename == "targets":
targetpath = fname
else:
targetpath = f"{rolename}/{fname}"
targetfiles[targetpath] = TargetFile.from_file(
targetpath, realpath, ["sha256"]
)

return targetfiles

def _known_good_targets(self, rolename: str) -> Targets:
"""Return Targets from the known good version (signing event start point)"""
prev_path = os.path.join(self._prev_dir, f"{rolename}.json")
if os.path.exists(prev_path):
with open(prev_path, "rb") as f:
md = Metadata.from_bytes(f.read())
assert isinstance(md.signed, Targets)
return md.signed
else:
# this role did not exist: return an empty one for comparison purposes
return Targets()

def _get_target_changes(self, rolename: str) -> list[TargetState]:
"""Compare targetfiles in known good version and signing event version:
return list of changes"""

if rolename in ["root", "timestamp", "snapshot"]:
return []

changes = []

known_good_targetfiles = self._known_good_targets(rolename).targets
for targetfile in self.targets(rolename).targets.values():
if targetfile.path not in known_good_targetfiles:
changes.append(TargetState(targetfile, State.ADDED))
elif targetfile != known_good_targetfiles[targetfile.path]:
changes.append(TargetState(targetfile, State.MODIFIED))
del known_good_targetfiles[targetfile.path]

for targetfile in known_good_targetfiles.values():
changes.append(TargetState(targetfile, State.REMOVED))

return changes

def _get_signing_status(self, delegator: Metadata, rolename: str) -> SigningStatus:
"""Build signing status for role.
Expand Down Expand Up @@ -270,10 +366,18 @@ def _get_signing_status(self, delegator: Metadata, rolename: str) -> SigningStat
except (KeyError, UnverifiedSignatureError):
missing_sigs.add(keyowner)

# Document changes to targets metadata in this signing event
target_changes = self._get_target_changes(rolename)

# Just to be sure: double check that delegation threshold is reached
valid, msg = self._validate_role(delegator, rolename)
if invites:
valid, msg = False, None
else:
valid, msg = self._validate_role(delegator, rolename)

return SigningStatus(invites, sigs, missing_sigs, role.threshold, valid, msg)
return SigningStatus(
invites, sigs, missing_sigs, role.threshold, target_changes, valid, msg
)

def status(self, rolename: str) -> tuple[SigningStatus, SigningStatus | None]:
"""Returns signing status for role.
Expand Down Expand Up @@ -312,12 +416,12 @@ def publish(self, directory: str):
dst_path = os.path.join(metadata_dir, f"{snapshot.version}.snapshot.json")
shutil.copy(os.path.join(self._dir, "snapshot.json"), dst_path)

for filename, metafile in snapshot.meta.items():
for filename, metafile in snapshot.meta.items():
src_path = os.path.join(self._dir, filename)
dst_path = os.path.join(metadata_dir, f"{metafile.version}.{filename}")
shutil.copy(src_path, dst_path)

targets = self.targets(filename[:-len(".json")])
targets = self.targets(filename[: -len(".json")])
for target in targets.targets.values():
parent, sep, name = target.path.rpartition("/")
os.makedirs(os.path.join(targets_dir, parent), exist_ok=True)
Expand All @@ -326,8 +430,7 @@ def publish(self, directory: str):
dst_path = os.path.join(targets_dir, parent, f"{hash}.{name}")
shutil.copy(src_path, dst_path)


def bump_expiring(self, rolename:str) -> int | None:
def bump_expiring(self, rolename: str) -> int | None:
"""Create a new version of role if it is about to expire"""
now = datetime.utcnow()
bumped = True
Expand All @@ -343,3 +446,20 @@ def bump_expiring(self, rolename:str) -> int | None:
raise AbortEdit

return signed.version if bumped else None

def update_targets(self, rolename: str) -> bool:
if rolename in ["root", "timestamp", "snapshot"]:
return False

new_target_dict = self._build_targets(
os.path.join(self._dir, "..", "targets"), rolename
)
with self.edit_targets(rolename) as targets:
# if targets dict has no changes, cancel the metadata edit
if targets.targets == new_target_dict:
raise AbortEdit("No target changes needed")

targets.targets = new_target_dict
return True

return False
20 changes: 14 additions & 6 deletions playground/repo/playground/bump_expiring.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@


def _git(cmd: list[str]) -> subprocess.CompletedProcess:
cmd = ["git", "-c", "user.name=repository-playground", "-c", "user.email=41898282+github-actions[bot]@users.noreply.github.com"] + cmd
cmd = [
"git",
"-c",
"user.name=repository-playground",
"-c",
"user.email=41898282+github-actions[bot]@users.noreply.github.com",
] + cmd
proc = subprocess.run(cmd, check=True, capture_output=True, text=True)
logger.debug("%s:\n%s", cmd, proc.stdout)
return proc
Expand All @@ -24,7 +30,7 @@ def _git(cmd: list[str]) -> subprocess.CompletedProcess:
@click.option("-v", "--verbose", count=True, default=0)
@click.option("--push/--no-push", default=False)
@click.argument("publish-dir", required=False)
def bump_online(verbose: int, push: bool, publish_dir: str|None) -> None:
def bump_online(verbose: int, push: bool, publish_dir: str | None) -> None:
"""Commit new metadata versions for online roles if needed
New versions will be signed.
Expand Down Expand Up @@ -54,7 +60,9 @@ def bump_online(verbose: int, push: bool, publish_dir: str|None) -> None:
sys.exit(1)

click.echo(msg)
_git(["commit", "-m", msg, "--", "metadata/timestamp.json", "metadata/snapshot.json"])
_git(
["commit", "-m", msg, "--", "metadata/timestamp.json", "metadata/snapshot.json"]
)
if push:
_git(["push", "origin", "HEAD"])

Expand All @@ -78,12 +86,12 @@ def bump_offline(verbose: int, push: bool) -> None:
logging.basicConfig(level=logging.WARNING - verbose * 10)

repo = PlaygroundRepository("metadata")
events=[]
events = []
for filename in glob("*.json", root_dir="metadata"):
if filename in ["timestamp.json", "snapshot.json"]:
continue

rolename = filename[:-len(".json")]
rolename = filename[: -len(".json")]
version = repo.bump_expiring(rolename)
if version is None:
logging.debug("No version bump needed for %s", rolename)
Expand All @@ -107,4 +115,4 @@ def bump_offline(verbose: int, push: bool) -> None:
_git(["reset", "--hard", "HEAD^"])

# print out list of created event branches
click.echo(" ".join(events))
click.echo(" ".join(events))
Loading

0 comments on commit b9b6e41

Please sign in to comment.