Skip to content

Commit

Permalink
Generated resources (#2013)
Browse files Browse the repository at this point in the history
* Generate resources using class_generator script

* Generate resources using generate-class script

* Generate resources using generate-class script

* Generate resources using generate-class script

* reuse code

* Remove kubevirt deprecated resources
  • Loading branch information
myakove committed Aug 11, 2024
1 parent 014af3b commit 68efa9c
Show file tree
Hide file tree
Showing 24 changed files with 1,321 additions and 172 deletions.
158 changes: 98 additions & 60 deletions class_generator/class_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,64 @@
TESTS_MANIFESTS_DIR = "class_generator/tests/manifests"


def process_fields_args(
fields_output: str,
output_dict: Dict[str, Any],
dict_key: str,
kind: str,
debug: bool,
output_debug_file_path: str,
add_tests: bool,
debug_content: Optional[Dict[str, str]] = None,
args_to_ignore: Optional[List[str]] = None,
) -> Dict[str, Any]:
if _fields_args := re.findall(r" .*", fields_output, re.DOTALL):
for field in [_field for _field in _fields_args[0].splitlines() if _field]:
if args_to_ignore and field.split()[0] in args_to_ignore:
continue

# If line is indented 4 spaces we know that this is a field under spec
if len(re.findall(r" +", field)[0]) == 2:
output_dict[dict_key].append(
get_arg_params(
field=field.strip(),
kind=kind,
field_under_spec=True if dict_key == SPEC_STR else False,
debug=debug,
debug_content=debug_content,
output_debug_file_path=output_debug_file_path,
add_tests=add_tests,
)
)

return output_dict


def get_sections_dict(output: str) -> Dict[str, str]:
raw_resource_dict: Dict[str, str] = {}

# Get all sections from output, section is [A-Z]: for example `KIND:`
sections = re.findall(r"([A-Z]+):.*", output)

# Get all sections indexes to be able to get needed test from output by indexes later
sections_indexes = [output.index(section) for section in sections]

for idx, section_idx in enumerate(sections_indexes):
_section_name = sections[idx].strip(":")

# Get the end index of the section name, add +1 since we strip the `:`
_end_of_section_name_idx = section_idx + len(_section_name) + 1

try:
# If we have next section we get the string from output till the next section
raw_resource_dict[_section_name] = output[_end_of_section_name_idx : output.index(sections[idx + 1])]
except IndexError:
# If this is the last section get the rest of output
raw_resource_dict[_section_name] = output[_end_of_section_name_idx:]

return raw_resource_dict


def get_oc_or_kubectl() -> str:
if run_command(command=shlex.split("which oc"), check=False)[0]:
return "oc"
Expand Down Expand Up @@ -76,7 +134,7 @@ def get_kind_data_and_debug_file(kind: str, debug: bool = False, add_tests: bool
"""
Get oc/kubectl explain output for given kind, if kind is namespaced and debug filepath
"""
_, explain_out, _ = run_command(command=shlex.split(f"oc explain {kind} --recursive"))
_, explain_out, _ = run_command(command=shlex.split(f"oc explain {kind}"))

resource_kind = re.search(r".*?KIND:\s+(.*?)\n", explain_out)
resource_kind_str: str = ""
Expand Down Expand Up @@ -395,31 +453,11 @@ def parse_explain(
debug_content: Optional[Dict[str, str]] = None,
add_tests: bool = False,
) -> Dict[str, Any]:
sections: List[str] = []
resource_dict: Dict[str, Any] = {
"BASE_CLASS": "NamespacedResource" if namespaced else "Resource",
}

raw_resource_dict: Dict[str, str] = {}

# Get all sections from output, section is [A-Z]: for example `KIND:`
sections = re.findall(r"([A-Z]+):.*", output)

# Get all sections indexes to be able to get needed test from output by indexes later
sections_indexes = [output.index(section) for section in sections]

for idx, section_idx in enumerate(sections_indexes):
_section_name = sections[idx].strip(":")

# Get the end index of the section name, add +1 since we strip the `:`
_end_of_section_name_idx = section_idx + len(_section_name) + 1

try:
# If we have next section we get the string from output till the next section
raw_resource_dict[_section_name] = output[_end_of_section_name_idx : output.index(sections[idx + 1])]
except IndexError:
# If this is the last section get the rest of output
raw_resource_dict[_section_name] = output[_end_of_section_name_idx:]
raw_resource_dict = get_sections_dict(output=output)

resource_dict["KIND"] = raw_resource_dict["KIND"].strip()
resource_dict["DESCRIPTION"] = raw_resource_dict["DESCRIPTION"].strip()
Expand All @@ -435,46 +473,46 @@ def parse_explain(
resource_dict[SPEC_STR] = []
resource_dict[FIELDS_STR] = []

# Get all spec fields till spec indent is done, section indent is 2 empty spaces
# ```
# spec <ServiceSpec>
# allocateLoadBalancerNodePorts <boolean>
# type <string>
# status <ServiceStatus>
# ```
if _spec_fields := re.findall(rf" {SPEC_STR.lower()}.*(?=\n [a-z])", raw_resource_dict[FIELDS_STR], re.DOTALL):
for field in [_field for _field in _spec_fields[0].splitlines() if _field]:
# If line is indented 4 spaces we know that this is a field under spec
if len(re.findall(r" +", field)[0]) == 4:
resource_dict[SPEC_STR].append(
get_arg_params(
field=field.strip(),
kind=kind,
field_under_spec=True,
debug=debug,
debug_content=debug_content,
output_debug_file_path=output_debug_file_path,
add_tests=add_tests,
)
)
# Get kind spec
spec_out: str = ""
if debug_content:
if debug_content:
spec_out = debug_content.get("explain-spec", "")
else:
rc, spec_out, _ = run_command(command=shlex.split(f"oc explain {kind}.spec"), check=False, log_errors=False)
if not rc:
LOGGER.warning(f"{kind} spec not found, skipping")

_spec_sections_dict = get_sections_dict(output=spec_out)
if _spec_fields := _spec_sections_dict.get(FIELDS_STR):
if output_debug_file_path:
write_to_file(
data={"explain-spec": spec_out},
output_debug_file_path=output_debug_file_path,
)

if _fields := re.findall(r" .*", raw_resource_dict[FIELDS_STR], re.DOTALL):
for line in [_line for _line in _fields[0].splitlines() if _line]:
if line.split()[0] in keys_to_ignore:
continue
resource_dict = process_fields_args(
fields_output=_spec_fields,
output_dict=resource_dict,
dict_key=SPEC_STR,
kind=kind,
add_tests=add_tests,
debug=debug,
debug_content=debug_content,
output_debug_file_path=output_debug_file_path,
)

# Process only top level fields with 2 spaces indent
if len(re.findall(r" +", line)[0]) == 2:
resource_dict[FIELDS_STR].append(
get_arg_params(
field=line,
kind=kind,
debug=debug,
debug_content=debug_content,
output_debug_file_path=output_debug_file_path,
add_tests=add_tests,
)
)
resource_dict = process_fields_args(
fields_output=raw_resource_dict[FIELDS_STR],
output_dict=resource_dict,
dict_key=FIELDS_STR,
kind=kind,
add_tests=add_tests,
debug=debug,
debug_content=debug_content,
output_debug_file_path=output_debug_file_path,
args_to_ignore=keys_to_ignore,
)

api_group_real_name = resource_dict.get("GROUP")
# If API Group is not present in resource, try to get it from VERSION
Expand Down
Loading

0 comments on commit 68efa9c

Please sign in to comment.