Skip to content

Commit

Permalink
chore: update SyncCacContentTask
Browse files Browse the repository at this point in the history
  • Loading branch information
qduanmu committed Jan 15, 2025
1 parent 545dea9 commit 1506fee
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
6 changes: 1 addition & 5 deletions trestlebot/cli/commands/sync_cac_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from trestlebot.cli.options.common import common_options, git_options
from trestlebot.cli.utils import run_bot
from trestlebot.const import ERROR_EXIT_CODE
from trestlebot.tasks.authored.compdef import AuthoredComponentDefinition
from trestlebot.tasks.base_task import TaskBase
from trestlebot.tasks.sync_cac_content_task import SyncCacContentTask

Expand Down Expand Up @@ -70,16 +69,13 @@ def sync_cac_content_cmd(ctx: click.Context, **kwargs: Any) -> None:

try:
pre_tasks: List[TaskBase] = []
authored_comp: AuthoredComponentDefinition = AuthoredComponentDefinition(
trestle_root=working_dir,
)
sync_cac_content_task = SyncCacContentTask(
product,
cac_profile,
cac_content_root,
component_definition_type,
oscal_profile,
authored_comp,
working_dir,
)
pre_tasks.append(sync_cac_content_task)
results = run_bot(pre_tasks, kwargs)
Expand Down
33 changes: 16 additions & 17 deletions trestlebot/tasks/sync_cac_content_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@

"""Trestle Bot Sync CaC Content Tasks"""

import datetime
import logging
import os
import pathlib
from typing import List

# from ssg.products import get_all
from ssg.profiles import get_profiles_from_products
from trestle.common import const as trestle_const
from trestle.common.list_utils import none_if_empty
from trestle.common.model_utils import ModelUtils
from trestle.core.generators import generate_sample_model
from trestle.core.models.file_content_type import FileContentType
from trestle.oscal.common import Property
from trestle.oscal.component import ComponentDefinition, DefinedComponent

from trestlebot import const
from trestlebot.tasks.authored.base_authored import AuthoredObjectBase
from trestlebot.tasks.base_task import TaskBase
from trestlebot.transformers.cac_transformer import (
RuleInfo,
Expand All @@ -41,7 +40,7 @@ def __init__(
cac_content_root: str,
compdef_type: str,
oscal_profile: str,
authored_object: AuthoredObjectBase,
working_dir: str,
) -> None:
"""Initialize CaC content sync task."""
self.product: str = product
Expand All @@ -50,8 +49,6 @@ def __init__(
self.compdef_type: str = compdef_type
self.rules: List[str] = []

self._authored_object = authored_object
working_dir = self._authored_object.get_trestle_root()
super().__init__(working_dir, None)

def _collect_rules(self) -> None:
Expand Down Expand Up @@ -97,8 +94,13 @@ def _create_or_update_compdef(self, compdef_type: str = "service") -> None:
oscal_component.description = full_name
oscal_component.props = props
repo_path = pathlib.Path(self.working_dir)
cd_dir = repo_path.joinpath(f"{trestle_const.MODEL_DIR_COMPDEF}/{self.product}")
cd_json = cd_dir / "component-definition.json"
cd_json: pathlib.Path = ModelUtils.get_model_path_for_name_and_class(
repo_path,
self.product,
ComponentDefinition,
FileContentType.JSON,
)

if cd_json.exists():
logger.info(f"The component definition for {self.product} exists.")
compdef = ComponentDefinition.oscal_read(cd_json)
Expand All @@ -110,15 +112,14 @@ def _create_or_update_compdef(self, compdef_type: str = "service") -> None:
if component.title == oscal_component.title:
if component.props != oscal_component.props:
logger.info(
f"Start to update the props of the component {component.title}"
f"Start to update props of the component {component.title}"
)
compdef.components[index].props = oscal_component.props
updated = True
compdef.oscal_write(cd_json)
break
# If the component doesn't exist, append this component
if oscal_component.title not in components_titles:
logger.info(f"Start to append the component {oscal_component.title}")
logger.info(f"Start to append component {oscal_component.title}")
compdef.components.append(oscal_component)
compdef.oscal_write(cd_json)
updated = True
Expand All @@ -127,16 +128,12 @@ def _create_or_update_compdef(self, compdef_type: str = "service") -> None:
compdef.metadata.version = str(
"{:.1f}".format(float(compdef.metadata.version) + 0.1)
)
compdef.metadata.last_modified = (
datetime.datetime.now(datetime.timezone.utc)
.replace(microsecond=0)
.isoformat()
)
ModelUtils.update_last_modified(compdef)
compdef.oscal_write(cd_json)
else:
logger.info(f"Creating component definition for product {self.product}")
cd_dir = pathlib.Path(os.path.dirname(cd_json))
cd_dir.mkdir(exist_ok=True, parents=True)
cd_json = cd_dir / "component-definition.json"
component_definition.components.append(oscal_component)
component_definition.oscal_write(cd_json)

Expand All @@ -147,8 +144,10 @@ def execute(self) -> int:
# all_products = list(set().union(*get_all(self.cac_content_root)))
# if self.product not in all_products:
# raise TaskException(f"Product {self.product} does not exist.")

# Collect all selected rules in product profile
self._collect_rules()
# Create or update product component definition
self._create_or_update_compdef()

return const.SUCCESS_EXIT_CODE

0 comments on commit 1506fee

Please sign in to comment.