Skip to content

Commit

Permalink
add Red Hat converter
Browse files Browse the repository at this point in the history
  • Loading branch information
jasinner committed Sep 9, 2024
1 parent 28de9aa commit 695c5e4
Show file tree
Hide file tree
Showing 11 changed files with 2,165 additions and 0 deletions.
12 changes: 12 additions & 0 deletions tools/redhat/Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"

[packages]
jsonschema = "*"
requests = "*"

[dev-packages]
pylint = "*"
yapf = "*"
377 changes: 377 additions & 0 deletions tools/redhat/Pipfile.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions tools/redhat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Red Hat CSAF to OSV Converter

## Setup

~~~
$ pipenv sync
$ pipenv shell
~~~

## Usage

Needs to be run in a folder where the Red Hat CSAF documents to convert already exist. Files can be downloaded from the [Red Hat Customer Portal Security Data section](https://access.redhat.com/security/data/csaf/v2/advisories/).
~~~
$ ./convert_redhat.py csaf/rhsa-2024_4546.json
~~~

OSV documents will be output in the `osv` directory by default. Override the default with the `--output_directory` option.

## Tests

~~~
$ python3 -m unittest *_test.py
~~~
72 changes: 72 additions & 0 deletions tools/redhat/convert_redhat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python3

# Convert a CSAF document to OSV format
# e.g. https://access.redhat.com/security/data/csaf/v2/advisories/2024/rhsa-2024_4546.json
import argparse
import json
import sys
from datetime import datetime

import requests
from csaf import CSAF
from jsonschema import validate
from osv import OSV, OSVEncoder

class RedHatConverter:
    """
    Class which converts and validates a CSAF string to an OSV string
    """
    SCHEMA = (f"https://raw.githubusercontent.com/ossf/osv-schema/v{OSV.SCHEMA_VERSION}"
              "/validation/schema.json")
    REQUEST_TIMEOUT = 60

    def __init__(self):
        """
        Downloads the OSV validation schema matching OSV.SCHEMA_VERSION.
        Raises requests.HTTPError if the schema download returns an error status.
        """
        schema_content = requests.get(self.SCHEMA, timeout=self.REQUEST_TIMEOUT)
        # Surface a failed download as an HTTP error here instead of trying to decode an
        # error page as JSON on the next line.
        schema_content.raise_for_status()
        self.osv_schema = schema_content.json()

    def convert(self, csaf_content: str, modified: str, published: str = "") -> tuple[str, str]:
        """
        Converts csaf_content json string into an OSV json string
        returns an OSV ID and the json string content of the OSV file
        the json string content will be empty if no content is applicable
        throws a validation error if the schema doesn't validate correctly.
        The modified value for osv is passed in so it matches what's in all.json
        Raises ValueError if CSAF file can't be parsed
        """
        csaf = CSAF(csaf_content)
        osv = OSV(csaf, modified, published)

        # We convert from an OSV object to a JSON string here in order to use the OSVEncoder
        # Once we have the OSV json string data we validate it using the OSV schema
        osv_content = json.dumps(osv, cls=OSVEncoder, indent=2)
        osv_data = json.loads(osv_content)
        validate(osv_data, schema=self.osv_schema)

        return osv.id, osv_content


def main():
    """
    Given a Red Hat CSAF document, convert it to OSV. Writes the OSV file to disk at 'osv' by
    default; the directory is created if it does not exist yet.
    Exits with status 1 when the converter returns no OSV content.
    """
    # Function-scope imports: only this entry point needs them
    import os
    from datetime import timezone

    parser = argparse.ArgumentParser(description='CSAF to OSV Converter')
    parser.add_argument("csaf", metavar="FILE", help='CSAF file to process')
    parser.add_argument('--output_directory', dest='out_dir', default="osv")

    args = parser.parse_args()

    with open(args.csaf, "r", encoding="utf-8") as in_f:
        csaf_data = in_f.read()

    converter = RedHatConverter()
    # The OSV schema defines timestamps as UTC, so take the current time in UTC rather than
    # the machine's naive local time.
    modified = datetime.now(timezone.utc).strftime(OSV.DATE_FORMAT)
    osv_id, osv_data = converter.convert(csaf_data, modified)

    if not osv_data:
        sys.exit(1)

    # open() does not create missing directories, so make sure the target exists first
    os.makedirs(args.out_dir, exist_ok=True)
    with open(f"{args.out_dir}/{osv_id}.json", "w", encoding="utf-8") as out_f:
        out_f.write(osv_data)


# Run the converter only when this file is executed as a script, not when it is imported
if "__main__" == __name__:
    main()
27 changes: 27 additions & 0 deletions tools/redhat/convert_redhat_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import unittest
from datetime import datetime
from convert_redhat import RedHatConverter
from osv import OSV
import json


class TestRedHatConverter(unittest.TestCase):
    """End-to-end check of RedHatConverter against a stored CSAF advisory."""

    def test_convert_redhat(self):
        """Converting the sample CSAF file yields the expected OSV ID and OSV document."""
        modified_time = datetime.strptime("2024-09-02T14:30:00", "%Y-%m-%dT%H:%M:%S")
        csaf_file = "testdata/rhsa-2024_4546.json"
        expected_file = "testdata/RHSA-2024_4546.json"

        with open(csaf_file, "r", encoding="utf-8") as fp:
            csaf_data = fp.read()
        converter = RedHatConverter()
        osv_id, osv_content = converter.convert(csaf_data,
                                                modified_time.strftime(OSV.DATE_FORMAT))

        # unittest assertions are used instead of bare asserts: they still run under
        # `python -O` and report a readable diff on failure.
        self.assertEqual(osv_id, "RHSA-2024:4546")
        result_data = json.loads(osv_content)

        with open(expected_file, "r", encoding="utf-8") as fp:
            expected_data = json.load(fp)
        self.assertEqual(expected_data, result_data)

# Allow running this test module directly as a script
if "__main__" == __name__:
    unittest.main()
144 changes: 144 additions & 0 deletions tools/redhat/csaf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import json
from typing import Any, Iterable

class Remediation:
    """
    class to handle remediation advice in CSAF data

    Splits a CSAF product ID of the form '<product>:<NEVRA>' into product, component and fixed
    version, then resolves the product's CPE and the component's purl from the given lookups.
    Raises ValueError if the product ID cannot be split, is missing from the lookups, or
    resolves to a purl that is not an RPM.
    """

    # pylint: disable=too-few-public-methods
    # This class is used for initialization and encapsulation of Remediation data

    def __init__(self, csaf_product_id: str, cpes: dict[str, str], purls: dict[str, str]):
        if ":" not in csaf_product_id:
            raise ValueError(f"Did not find ':' in product_id: {csaf_product_id}")
        (self.product, self.product_version) = csaf_product_id.split(":", maxsplit=1)

        # NEVRA stands for Name Epoch Version Release and Architecture
        # We split the name from the rest of the 'version' data (EVRA). We store name as component.
        split_component_version = self.product_version.rsplit("-", maxsplit=2)
        if len(split_component_version) < 3:
            raise ValueError(f"Could not convert component into NEVRA: {self.product_version}")
        # RHEL Modules have 4 colons in the name part of the NEVRA. If we detect a modular RPM
        # product ID, discard the module part of the name and look for that in the purl dict.
        # Ideally we would keep the module information and use it when scanning a RHEL system,
        # however this is not done today by Clair: https://github.com/quay/claircore/pull/901/files
        if split_component_version[0].count(":") == 4:
            self.component = split_component_version[0].rsplit(":")[-1]
        else:
            self.component = split_component_version[0]
        self.fixed_version = "-".join((split_component_version[1], split_component_version[2]))

        # purls are keyed by NEVRA (without any module prefix), cpes by product
        nevra = f"{self.component}-{self.fixed_version}"
        try:
            self.purl = purls[nevra]
            self.cpe = cpes[self.product]
        except KeyError as err:
            # Chain the KeyError so the traceback shows which lookup key was missing
            raise ValueError(f"Did not find {csaf_product_id} in product branches") from err

        # There are many pkg:oci/ remediations in Red Hat data. However there are no strict
        # rules enforced on versioning Red Hat containers, therefore we can't compare container
        # versions to each other with 100% accuracy at this time.
        if not self.purl.startswith("pkg:rpm/"):
            raise ValueError("Non RPM remediations are not supported in OSV at this time")


class Vulnerability:
    """
    class to handle vulnerability information

    Reads the CVE ID, CVSS v3 score, references and fixed-product remediations from one entry
    of a CSAF document's vulnerabilities list.
    """

    # pylint: disable=too-few-public-methods
    # This class encapsulates Red Hat CSAF Vulnerability data
    # Only initialization is required because data retrieval is via JSON encoding

    def __init__(self, csaf_vuln: dict[str, Any], cpes: dict[str, str], purls: dict[str, str]):
        self.cve_id = csaf_vuln["cve"]
        # The cvss_v3_* attributes are only set when a score entry carries "cvss_v3"
        for score in csaf_vuln.get("scores", []):
            if "cvss_v3" in score:
                self.cvss_v3_vector = score["cvss_v3"]["vectorString"]
                self.cvss_v3_base_score = score["cvss_v3"]["baseScore"]
        self.references = csaf_vuln["references"]
        self.remediations = []
        for product_id in csaf_vuln["product_status"]["fixed"]:
            # Product IDs that cannot be turned into a Remediation are reported and skipped
            try:
                self.remediations.append(Remediation(product_id, cpes, purls))
            except ValueError as e:
                # print() does not interpolate %-style placeholders, so build the message here
                print(f"Could not parse product_id: {product_id}. {e}")


def gen_dict_extract(key, var: Iterable):
    """
    Given a key value and dictionary or list, traverses that dictionary or list yielding every
    value stored under the given key, at any nesting depth.
    A matching value is yielded first and then, if it is itself a dictionary or list, searched
    for further matches. Anything that is neither a list nor a mapping yields nothing.
    From https://stackoverflow.com/questions/9807634/
    find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists
    """
    if isinstance(var, list):
        # Lists carry no keys of their own; search each element in order
        for item in var:
            yield from gen_dict_extract(key, item)
    elif hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            if isinstance(v, (dict, list)):
                yield from gen_dict_extract(key, v)


def build_product_maps(product_tree_branches: dict) -> tuple[dict[str, str], dict[str, str]]:
    """
    Collects the identification helpers of every "product" entry found anywhere in the given
    CSAF product tree.
    Returns a tuple of two dictionaries keyed by product ID: CPEs first, PURLs second.
    """
    cpes_by_id = {}
    purls_by_id = {}
    for entry in gen_dict_extract("product", product_tree_branches):
        entry_id = entry["product_id"]
        if "product_identification_helper" not in entry:
            continue
        id_helper = entry["product_identification_helper"]
        # Each helper feeds exactly one map: a CPE wins over a PURL when both are present
        if "cpe" in id_helper:
            cpes_by_id[entry_id] = id_helper["cpe"]
        elif "purl" in id_helper:
            purls_by_id[entry_id] = id_helper["purl"]
    return cpes_by_id, purls_by_id


class CSAF:
    """
    Parsed view of a CSAF document supplied as a JSON string.

    Only csaf_vex 2.0 documents are accepted. Raises ValueError when the content is not valid
    JSON, is empty, or is a different category/version.
    """

    def __init__(self, csaf_content: str):
        parsed = json.loads(csaf_content)
        if not parsed:
            raise ValueError("Unable to load CSAF JSON data.")

        document = parsed["document"]
        self.doc = document
        self.csaf = {"type": document["category"], "csaf_version": document["csaf_version"]}

        # Only support csaf_vex 2.0
        supported = {"type": "csaf_vex", "csaf_version": "2.0"}
        if not self.csaf == supported:
            raise ValueError(f"Can only handle csaf_vex 2.0 documents. Got: {self.csaf}")

        # Lookup tables shared by every vulnerability in the document
        product_maps = build_product_maps(parsed["product_tree"])
        self.cpes, self.purls = product_maps

        vulnerabilities = []
        for raw_vuln in parsed["vulnerabilities"]:
            vulnerabilities.append(Vulnerability(raw_vuln, self.cpes, self.purls))
        self.vulnerabilities = vulnerabilities

    @property
    def title(self):
        """
        Title of the CSAF document
        """
        document = self.doc
        return document["title"]

    @property
    def references(self):
        """
        References of the CSAF document
        """
        document = self.doc
        return document["references"]
23 changes: 23 additions & 0 deletions tools/redhat/csaf_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import unittest

from csaf import Remediation


class CSAFTest(unittest.TestCase):
    """Unit tests for the CSAF helper classes."""

    def test_parse_remediation(self):
        """A modular RPM product ID resolves to the CPE and purl held in the lookup maps."""
        expected_cpe = "cpe:/a:redhat:rhel_tus:8.4::appstream"
        expected_purl = (
            "pkg:rpm/redhat/buildah@1.19.9-1.module%2Bel8.4.0%2B21078%2Ba96cfbf6?arch=src"
        )
        # Product ID carrying a module prefix (four colons) in front of the RPM name
        modular_product_id = (
            "AppStream-8.4.0.Z.TUS:container-tools:3.0:8040020240104111259:c0c392d5"
            ":buildah-0:1.19.9-1.module+el8.4.0+21078+a96cfbf6.src"
        )
        remediation = Remediation(
            modular_product_id,
            {"AppStream-8.4.0.Z.TUS": expected_cpe},
            {"buildah-0:1.19.9-1.module+el8.4.0+21078+a96cfbf6.src": expected_purl},
        )
        self.assertEqual(remediation.cpe, expected_cpe)
        self.assertEqual(remediation.purl, expected_purl)


# Allow running this test module directly as a script
if "__main__" == __name__:
    unittest.main()
Loading

0 comments on commit 695c5e4

Please sign in to comment.