diff --git a/Makefile b/Makefile
index 6a33bd8a80..e5d67887bd 100644
--- a/Makefile
+++ b/Makefile
@@ -91,6 +91,7 @@ schema-check:
 	# Is the generated Go code synced with the schema?
 	grep -q "$(DIGEST)" pkg/builds/cosa_v1.go
 	grep -q "$(DIGEST)" pkg/builds/schema_doc.go
+	grep -q "$(DIGEST)" src/cmd-cloud-prune
 
 install:
 	install -d $(DESTDIR)$(PREFIX)/lib/coreos-assembler
diff --git a/cmd/coreos-assembler.go b/cmd/coreos-assembler.go
index 2824864fa5..696ac0ba02 100644
--- a/cmd/coreos-assembler.go
+++ b/cmd/coreos-assembler.go
@@ -16,7 +16,7 @@ var buildCommands = []string{"init", "fetch", "build", "run", "prune", "clean",
 var advancedBuildCommands = []string{"buildfetch", "buildupload", "oc-adm-release", "push-container"}
 var buildextendCommands = []string{"aliyun", "applehv", "aws", "azure", "digitalocean", "exoscale", "extensions-container", "gcp", "hashlist-experimental", "hyperv", "ibmcloud", "kubevirt", "live", "metal", "metal4k", "nutanix", "openstack", "qemu", "secex", "virtualbox", "vmware", "vultr"}
-var utilityCommands = []string{"aws-replicate", "compress", "copy-container", "koji-upload", "kola", "push-container-manifest", "remote-build-container", "remote-prune", "remote-session", "sign", "tag", "update-variant"}
+var utilityCommands = []string{"aws-replicate", "compress", "copy-container", "koji-upload", "kola", "push-container-manifest", "remote-build-container", "cloud-prune", "remote-session", "sign", "tag", "update-variant"}
 var otherCommands = []string{"shell", "meta"}
 
 func init() {
diff --git a/src/cmd-cloud-prune b/src/cmd-cloud-prune
new file mode 100755
index 0000000000..be3eff2575
--- /dev/null
+++ b/src/cmd-cloud-prune
@@ -0,0 +1,413 @@
+#!/usr/bin/python3 -u
+
+# This script parses a policy.yaml file, which outlines the specific
+# pruning actions required for each stream and the age threshold for
+# deleting artifacts within them.
+# Example of policy.yaml:
+#   rawhide:
+#     # all cloud images
+#     cloud-uploads: 2 years
+#     # artifacts in meta.json's `images` key
+#     images: 2 years
+#     images-keep: [qemu, live-iso]
+#     build: 3 years
+
+import argparse
+import collections
+import datetime
+import json
+import os
+import subprocess
+
+import boto3
+import requests
+import yaml
+from dateutil.relativedelta import relativedelta
+from tenacity import retry, retry_if_exception_type
+
+from cosalib.gcp import remove_gcp_image
+from cosalib.s3 import S3
+from cosalib.aws import deregister_ami, delete_snapshot
+from cosalib.builds import BUILDFILES
+from cosalib.cmdlib import (
+    load_json,
+    retry_stop,
+    retry_stop_long,
+    retry_wait_long,
+    retry_boto_exception
+)
+
+retry_requests_exception = (retry_if_exception_type(requests.Timeout) |
+                            retry_if_exception_type(requests.ReadTimeout) |
+                            retry_if_exception_type(requests.ConnectTimeout) |
+                            retry_if_exception_type(requests.ConnectionError))
+
+Build = collections.namedtuple("Build", ["id", "timestamp", "images", "arch"])
+# set metadata caching to 5m
+CACHE_MAX_AGE_METADATA = 60 * 5
+
+
+def retry_callback(retry_state):
+    print(f"Retrying after {retry_state.outcome.exception()}")
+
+
+def main():
+    args = parse_args()
+
+    # Boto3 loads credentials from ~/.aws/config by default; that default
+    # location can be changed by setting the AWS_CONFIG_FILE environment
+    # variable. The alternative is to manually pass ACCESS_KEY and SECRET_KEY,
+    # which isn't favourable.
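+    # For illustration only, a minimal file passed via --aws-config-file could
+    # look like the following (region and keys are placeholders):
+    #   [default]
+    #   region = us-east-1
+    #   aws_access_key_id = ...
+    #   aws_secret_access_key = ...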
+    if args.aws_config_file:
+        os.environ["AWS_CONFIG_FILE"] = args.aws_config_file
+
+    gcp_cloud_config = {
+        'gcp': {
+            'json-key': args.gcp_json_key,
+            'project': args.gcp_project,
+        }
+    }
+
+    with open(args.policy) as f:
+        policy = yaml.safe_load(f)
+    stream = args.stream
+
+    # These lists are up to date as of schema hash
+    # 4c19aed3b3d84af278780bff63728510bb3e70613e4c4eef8cabd7939eb31bd8. If changing
+    # this hash, ensure that the list of supported and unsupported artifacts below
+    # is up to date.
+    supported = ["amis", "gcp"]
+    unsupported = []
+    FCOS_STREAMS_URL = "https://builds.coreos.fedoraproject.org/prod/streams"
+
+    if stream not in policy:
+        print(f"No pruning policy specified for the {stream} stream in policy.yaml")
+        return
+    else:
+        s3_client = boto3.client('s3', endpoint_url=args.endpoint_url)
+        # If the build key is set in the policy file, then the cloud-uploads key must
+        # also be present and its duration must not be longer than the build duration.
+        if "build" in policy[stream].keys():
+            cloud_uploads_check(policy[stream])
+        # Base URL for Fedora CoreOS builds
+        url = args.url or f'{FCOS_STREAMS_URL}/{stream}/builds'
+        if url.startswith("s3://"):
+            fetcher = S3Fetcher(url)
+        elif url.startswith("http://") or url.startswith("https://"):
+            fetcher = HTTPFetcher(url)
+        else:
+            raise Exception("Invalid scheme: only s3:// and http(s):// are supported")
+
+        if fetcher.exists('builds.json'):
+            # Check to see if the local builds.json has been modified with local builds
+            if os.path.isfile(BUILDFILES['sourcedata']) \
+                    and os.path.isfile(BUILDFILES['list']):
+                # If we have local builds, don't overwrite them by default.
+                havelocalchanges = subprocess.call(['cmp', BUILDFILES['sourcedata'], BUILDFILES['list']],
+                                                   stdout=subprocess.DEVNULL,
+                                                   stderr=subprocess.DEVNULL) != 0
+                if havelocalchanges:
+                    if args.force:
+                        print(f"Detected local modifications to {BUILDFILES['list']}")
+                        print("Forcing update as requested by --force")
+                    else:
+                        raise SystemExit(f"{BUILDFILES['list']} modified locally. "
+                                         "Run with --force to overwrite local changes")
+
+            # Download builds.json to the local builds.json
+            fetcher.fetch('builds.json', dest=BUILDFILES['list'])
+            print(f"Updated {BUILDFILES['list']}")
+            # Record the origin and original state
+            with open(BUILDFILES['sourceurl'], 'w') as f:
+                f.write(url + '\n')
+            # Copy the builds.json to the local sourcedata file so we can
+            # detect local modifications.
+            subprocess.check_call(['cp-reflink', BUILDFILES['list'], BUILDFILES['sourcedata']])
+        else:
+            print("No builds.json found")
+            return
+
+        buildJsonData = fetcher.fetch_json('builds.json')
+        # Each action is a resource type to prune for this stream (e.g. cloud-uploads, build)
+        for action in policy[stream]:
+            if action not in ["cloud-uploads", "build"]:
+                print(f"Pruning of {action} is not supported yet; skipping")
+                continue
+            duration = policy[stream][action]
+            duration_in_months = get_months(duration)
+            refDate = datetime.datetime.now() - relativedelta(months=int(duration_in_months))
+
+            print(f"Pruning {action} artifacts older than {duration} for {stream} builds")
+            # Iterate in reverse to go from the oldest build to the newest one
+            for build in reversed(buildJsonData["builds"]):
+                build_id = build["id"]
+                if "policy-cleanup" in build.keys():
+                    # If builds.json already records that the specified resources
+                    # for this build were pruned, skip it.
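+                    # For illustration, an already-processed entry might look like:
+                    #   {"id": "38.20230322.1.0", "arches": ["x86_64"],
+                    #    "policy-cleanup": ["cloud-uploads"], ...}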
+                    if action in build["policy-cleanup"]:
+                        print(f"Build {build_id} has already had {action} pruning completed")
+                        continue
+                timestamp = build_id.split('.')[1]
+                buildDate = datetime.datetime(int(timestamp[0:4]), int(timestamp[4:6]), int(timestamp[-2:]))
+                if buildDate < refDate:
+                    for arch in build["arches"]:
+                        meta_url = f"{url}/{build_id}/{arch}/meta.json"
+                        meta_json = fetch_json(meta_url)
+                        images = {
+                            "amis": meta_json.get("amis") or [],
+                            "gcp": meta_json.get("gcp") or [],
+                        }
+                        currentBuild = Build(
+                            id=build_id,
+                            timestamp=timestamp,
+                            images=images,
+                            arch=arch,
+                        )
+                        match action:
+                            case "cloud-uploads":
+                                # Prunes only AWS and GCP at the moment
+                                delete_cloud_resources(currentBuild, gcp_cloud_config, args.dry_run)
+                                if not args.dry_run:
+                                    # `build` references the entry in buildJsonData["builds"],
+                                    # so this records the cleanup in place.
+                                    build.setdefault("policy-cleanup", []).append(action)
+                            case "build":
+                                print(f"Deleting key {args.prefix}{build_id} from bucket {args.bucket}")
+                                # Delete the build's directory in S3
+                                S3().delete_object(args.bucket, f"{args.prefix}{str(currentBuild.id)}")
+                else:
+                    break
+        with open("builds/builds.json", "w") as json_file:
+            json_file.write(json.dumps(buildJsonData))
+
+        s3_copy(s3_client, BUILDFILES['list'], args.bucket, f'{args.prefix}/builds.json',
+                CACHE_MAX_AGE_METADATA, args.acl, extra_args={},
+                dry_run=args.dry_run)
+        # And now update our cached copy to note we've successfully synced.
+        with open(BUILDFILES['sourceurl'], 'w') as f:
+            f.write(f"s3://{args.bucket}/{args.prefix}\n")
+        subprocess.check_call(['cp-reflink', BUILDFILES['list'], BUILDFILES['sourcedata']])
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(prog="coreos-assembler cloud-prune")
+    parser.add_argument("--policy", default='./policy.yaml', type=str,
+                        help="Path to policy.yaml file")
+    parser.add_argument("--url", metavar='URL', default="",
+                        help="URL from which to fetch metadata")
+    parser.add_argument("--dry-run", help="Don't actually delete anything",
+                        action='store_true')
+    parser.add_argument("--stream", type=str, help="Fedora stream", required=True)
+    parser.add_argument("--force", action='store_true',
+                        help=f"Force update of {BUILDFILES['list']} even if it was modified locally")
+
+    parser.add_argument("--gcp-json-key", help="GCP Service Account JSON Auth",
+                        default=os.environ.get("GCP_JSON_AUTH"))
+    parser.add_argument("--gcp-project", help="GCP Project name",
+                        default=os.environ.get("GCP_PROJECT_NAME"))
+    parser.add_argument("--aws-config-file", help="Path to AWS config file with credentials",
+                        default=os.environ.get("AWS_CONFIG_FILE"), type=str)
+    parser.add_argument("--endpoint-url", help="S3 endpoint URL (for non-AWS S3 implementations)",
+                        default=None, type=str)
+
+    subparsers = parser.add_subparsers(dest='cmd', title='subcommands')
+    subparsers.required = True
+
+    s3 = subparsers.add_parser('s3', help='Prune S3 buckets')
+    s3.add_argument("--bucket", help="Bucket name")
+    s3.add_argument("--prefix", help="Key prefix")
+    s3.add_argument("--acl", help="ACL for objects",
+                    action='store', default='private')
+    return parser.parse_args()
+
+
+# Handling just AWS and GCP at the moment
+def delete_cloud_resources(build, gcp_cloud_config, dry_run):
+    errors = []
+    totalBuildAMIs = len(build.images.get("amis") or [])
+    GCPImage = 0
+    # Unregister AMIs and snapshots
+    if not build.images.get("amis", []):
+        print(f"No AMIs for {build.id} on {build.arch}")
+    if not build.images.get("gcp", []):
+        print(f"No GCP image for {build.id} on {build.arch}")
+    else:
+        GCPImage = 1
+
+    if dry_run:
+        print(f"Would delete {totalBuildAMIs} AMIs/snapshots and {GCPImage} GCP image for {build.id} on {build.arch}")
+        return
+
+    for ami in build.images.get("amis", []):
+        print(f"Deleting cloud uploads for {build.id}")
+        region_name = ami.get("name")
+        ami_id = ami.get("hvm")
+        snapshot_id = ami.get("snapshot")
+        if ami_id and region_name:
+            try:
+                deregister_ami(ami_id, region=region_name, dry_run=dry_run)
+            except Exception as e:
+                errors.append(e)
+        if snapshot_id and region_name:
+            try:
+                delete_snapshot(snapshot_id, region=region_name, dry_run=dry_run)
+            except Exception as e:
+                errors.append(e)
+
+    gcp = build.images.get('gcp')
+    if gcp:
+        gcp_image = gcp.get('image')
+        json_key = gcp_cloud_config.get('gcp', {}).get('json-key')
+        project = gcp_cloud_config.get('gcp', {}).get('project')
+        if gcp_image and json_key and project:
+            try:
+                remove_gcp_image(gcp_image, json_key, project)
+            except Exception as e:
+                errors.append(e)
+
+    if len(errors) != 0:
+        print(f"Found errors when removing cloud resources for build {build.id}:")
+        for e in errors:
+            print(e)
+        raise Exception(f"Failed to remove some cloud resources for build {build.id}")
+
+
+def cloud_uploads_check(actions):
+    if "cloud-uploads" not in actions:
+        raise Exception("cloud-uploads must be set in policy.yaml when build pruning is enabled")
+    cloud_uploads_duration = get_months(actions["cloud-uploads"])
+    build_duration = get_months(actions["build"])
+    # cloud-uploads must be pruned at the same time as, or before, the build itself
+    assert cloud_uploads_duration <= build_duration, \
+        "cloud-uploads pruning duration must not exceed the build pruning duration"
+
+
+def get_months(duration):
+    val, unit = duration.split(" ")
+    if unit in ["years", "year"]:
+        months = int(val) * 12
+    elif unit in ["months", "month"]:
+        months = int(val)
+    else:
+        raise Exception(f"Pruning duration must be specified in years or months, not '{unit}'")
+    return months
+
+
+def fetch_json(url):
+    response = requests.get(url)
+    response.raise_for_status()
+    return response.json()
+
+
+class Fetcher(object):
+
+    def __init__(self, url_base):
+        self.url_base = url_base
+
+    def fetch(self, path, dest=None):
+        # if no specific dest given, assume it's a path under builds/
+        if dest is None:
+            dest = f'builds/{path}'
+        # NB: `urllib.parse.urljoin()` does not do what one thinks it does.
+        # Using `os.path.join()` is a hack, but eh... we're not planning to run
+        # on Windows anytime soon.
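+        # (For example, urllib.parse.urljoin("https://example.com/stable/builds", "builds.json")
+        # yields "https://example.com/stable/builds.json", silently dropping the last path
+        # component of a base URL without a trailing slash, while os.path.join simply appends.)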
+ url = os.path.join(self.url_base, path) + print(f"Fetching: {url}") + # ensure the dir for dest file exists + # otherwise s3 download_file won't be able to write temp file + os.makedirs(os.path.dirname(dest), exist_ok=True) + self.fetch_impl(url, dest) + return dest + + def fetch_impl(self, url, dest): + raise NotImplementedError + + def exists_impl(self, url): + raise NotImplementedError + + def fetch_json(self, path): + return load_json(self.fetch(path)) + + def exists(self, path): + url = os.path.join(self.url_base, path) + return self.exists_impl(url) + +class S3Fetcher(Fetcher): + def __init__(self, url_base): + super().__init__(url_base) + self.s3_client = S3() + + def fetch_impl(self, url, dest): + assert url.startswith("s3://") + bucket, key = url[len("s3://"):].split('/', 1) + # this function does not need to be retried with the decorator as download_file would + # retry automatically based on s3config settings + self.s3_client.download_file(bucket, key, dest) + + def exists_impl(self, url): + assert url.startswith("s3://") + bucket, key = url[len("s3://"):].split('/', 1) + # sanity check that the bucket exists and we have access to it + self.s3_client.head_bucket(bucket=bucket) + return self.s3_client.head_object(bucket=bucket, key=key) + + +class HTTPFetcher(Fetcher): + + def __init__(self, url_base): + super().__init__(url_base) + + @retry(stop=retry_stop, retry=retry_requests_exception, before_sleep=retry_callback) + def fetch_impl(self, url, dest): + with requests.get(url, stream=True) as r: + r.raise_for_status() + with open(dest, mode='wb') as f: + # Stream file data from the network to the file in these size chunks. + # 30 MiB is somewhat arbitrary but should be easily supported on most systems + # without transfer slowdown. + max_chunk_size = 30 * 1024 * 1024 + + # If the HTTP headers have encoded the file transfer as chunks already, respect those instead + # of our hardcoded max size. + if 'chunked' in r.headers.get('transfer-encoding', list()): + max_chunk_size = None + + # With stream=True above, read data from the network and write it to the file in chunks + # rather than trying to put it all in RAM and then write it all to disk. + # For large ociarchive files on lower-RAM systems, this can cause a crash, and the performance + # trade-off for chunking it is usually negligible unless the files are extra huge, the disk IO cache is + # very small, and the network pipe is very large. 
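+                # (When chunk_size is None and stream=True, requests yields data as it
+                # arrives, in whatever sizes the chunks are received.)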
+ for chunk in r.iter_content(chunk_size=max_chunk_size): + f.write(chunk) + + @retry(stop=retry_stop, retry=retry_requests_exception, before_sleep=retry_callback) + def exists_impl(self, url): + with requests.head(url) as r: + if r.status_code == 200: + return True + # just treat 403 as ENOENT too; this is common for APIs to do (like + # AWS) and we don't support HTTP basic auth here anyway + if r.status_code in [404, 403]: + return False + raise Exception(f"Received rc {r.status_code} for {url}") + +@retry(stop=retry_stop_long, wait=retry_wait_long, + retry=retry_boto_exception, retry_error_callback=retry_callback) +def s3_copy(s3_client, src, bucket, key, max_age, acl, extra_args={}, dry_run=False): + extra_args = dict(extra_args) + if 'ContentType' not in extra_args: + if key.endswith('.json'): + extra_args['ContentType'] = 'application/json' + elif key.endswith('.tar'): + extra_args['ContentType'] = 'application/x-tar' + elif key.endswith('.xz'): + extra_args['ContentType'] = 'application/x-xz' + elif key.endswith('.gz'): + extra_args['ContentType'] = 'application/gzip' + elif key.endswith('.iso'): + extra_args['ContentType'] = 'application/x-iso9660-image' + else: + # use a standard MIME type for "binary blob" instead of the default + # 'binary/octet-stream' AWS slaps on + extra_args['ContentType'] = 'application/octet-stream' + upload_args = { + 'CacheControl': f'max-age={max_age}', + 'ACL': acl + } + upload_args.update(extra_args) + + print((f"{'Would upload' if dry_run else 'Uploading'} {src} to " + f"s3://{bucket}/{key} {extra_args if len(extra_args) else ''}")) + + if dry_run: + return + + s3_client.upload_file(Filename=src, Bucket=bucket, Key=key, ExtraArgs=upload_args) + + +if __name__ == "__main__": + main()