diff --git a/forge/forge/python_codegen.py b/forge/forge/python_codegen.py
index 86cd2c6bf..4aab10d92 100644
--- a/forge/forge/python_codegen.py
+++ b/forge/forge/python_codegen.py
@@ -1085,7 +1085,7 @@ def write_pytest_function(
self.wl("")
self.wl("compiled_model = compile(framework_model, sample_inputs=inputs)")
self.wl("")
- self.wl("verify(inputs, framework_model, compiled_model, VerifyConfig(verify_allclose=False))")
+ self.wl("verify(inputs, framework_model, compiled_model)")
self.wl("")
self.wl("")
self.indent -= 1
diff --git a/forge/forge/tvm_unique_op_generation.py b/forge/forge/tvm_unique_op_generation.py
index bbb6a4ee2..433c45c85 100644
--- a/forge/forge/tvm_unique_op_generation.py
+++ b/forge/forge/tvm_unique_op_generation.py
@@ -2,9 +2,10 @@
# SPDX-License-Identifier: Apache-2.0
import os
+import json
from enum import Enum
from loguru import logger
-from typing import Dict, List
+from typing import Any, Dict, List, Optional
import torch
@@ -17,6 +18,14 @@ class NodeType(Enum):
Parameter = 2
Constant = 3
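+    # Serialize NodeType members by name so operand types can be exported as
+    # plain strings and reconstructed later via from_json.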
+ @classmethod
+ def to_json(cls, value):
+ return value.name
+
+ @classmethod
+ def from_json(cls, value):
+ return cls[value]
+
class Operation:
"""
@@ -34,12 +43,13 @@ class Operation:
inputs_to_delete (list): A list of inputs to delete.
loop_with (list): A list of loop variables.
src_layer (optional): The source layer associated with the operation.
+        metadata (dict): Additional information associated with the operation, such as the model name, variant, and framework.
"""
def __init__(
self,
function_name,
- output_name,
+ output_name="",
node_name="",
input_names=[],
args=[],
@@ -47,6 +57,7 @@ def __init__(
input_shapes=[],
input_dtypes=[],
input_node_types=[],
+        metadata=None,
):
self.function_name = function_name
self.node_name = node_name
@@ -60,6 +71,7 @@ def __init__(
self.inputs_to_delete = []
self.loop_with = []
self.src_layer = src_layer
+        self.metadata = metadata if metadata is not None else {}
class OpArgs(dict):
@@ -121,7 +133,7 @@ def is_empty(self):
return len(self) == 0
def __str__(self):
- return f"Opargs({super().__str__()})"
+ return super().__str__()
class OperandsInfo:
@@ -194,114 +206,130 @@ def __eq__(self, other):
def __str__(self):
if len(self.operand_types) > 0 and len(self.operand_shapes) > 0 and len(self.operand_dtypes) > 0:
- operand_info = "["
+ operands_info = "["
for operand_type, operand_shape, operand_dtype in zip(
self.operand_types, self.operand_shapes, self.operand_dtypes
):
if isinstance(operand_shape, torch.Tensor):
- operand_info += f"Operand(type={operand_type}, shape=Tensor, dtype={operand_dtype}), "
+ operands_info += f"Operand(type={operand_type}, shape=Tensor, dtype={operand_dtype}), "
else:
- operand_info += f"Operand(type={operand_type}, shape={operand_shape}, dtype={operand_dtype}), "
- operand_info += "]"
- return operand_info
+ operands_info += f"Operand(type={operand_type}, shape={operand_shape}, dtype={operand_dtype}), "
+ operands_info += "]"
+ return operands_info
else:
return "OperandsInfo is empty!"
-class OpArgsOpNames:
+class OpArgsOpMetadata:
"""
- Stores OpArgs and associated operand names.
+    Stores OpArgs and associated operation metadata.
- Initializes OpArgsOpNames with a given OpArgs and operand names.
+    Initializes OpArgsOpMetadata with a given OpArgs and operation metadata such as operand_names.
Args:
- args (OpArgs): The OpArgs object to associate with operand names.
- operand_names (list): List of operand names to associate with args.
+ args (OpArgs): The OpArgs object to associate with operation metadata.
+ operation_metadata (Dict): Operation metadata to associate with args.
Data Members:
- opargs_opnames (list of tuples): Each tuple contains an OpArgs object and a list of operand names.
+ op_args_and_metadata (list of tuples): Each tuple contains an OpArgs object and a dict of operation metadata.
"""
- def __init__(self, args: OpArgs, operand_names: List[str]):
- self.opargs_opnames = [(args, [operand_names])]
+ def __init__(self, args: OpArgs, operation_metadata: Dict[str, Any]):
+ operation_metadata = self.transform_operation_metadata(operation_metadata)
+ self.op_args_and_metadata = [(args, operation_metadata)]
+
+ def get_op_args_and_metadata(self):
+ return self.op_args_and_metadata
- def get_opargs_opnames(self):
- return self.opargs_opnames
+ def transform_operation_metadata(self, operation_metadata):
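+        # Wrap each metadata value in a single-element list so that update()
+        # can later append further distinct values for the same OpArgs.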
+ new_operation_metadata = {}
+ for name, value in operation_metadata.items():
+ new_operation_metadata[name] = [value]
+ return new_operation_metadata
- def update(self, new_args, new_operand_names):
+ def update(self, new_args, new_operation_metadata):
"""
- Append operand names if arguments match, otherwise adds new OpArgs and operand names.
+        Appends operation metadata if the arguments match; otherwise adds a new OpArgs and operation metadata entry.
Args:
new_args (OpArgs): New arguments to match against existing ones.
- new_operand_names (list): New operand names to associate if new_args matches.
+            new_operation_metadata (Dict): New operation metadata to associate if new_args matches.
"""
args_matched = False
- for idx, (arg, opnames_list) in enumerate(self.opargs_opnames):
+ for idx, (arg, metadata) in enumerate(self.op_args_and_metadata):
if (arg.is_empty() and new_args.is_empty()) or arg == new_args:
- self.opargs_opnames[idx][1].append(new_operand_names)
+                for name, value in new_operation_metadata.items():
+                    values = self.op_args_and_metadata[idx][1].setdefault(name, [])
+                    if value not in values:
+                        values.append(value)
args_matched = True
break
if not args_matched:
- self.opargs_opnames.append((new_args, [new_operand_names]))
+ new_operation_metadata = self.transform_operation_metadata(new_operation_metadata)
+ self.op_args_and_metadata.append((new_args, new_operation_metadata))
def __str__(self):
- if len(self.opargs_opnames) > 0:
- uniqueoperation_info = ""
- for idx, (args, opnames_list) in enumerate(self.opargs_opnames, start=1):
- uniqueoperation_info += f"\t\t\t\t {idx})" + str(args) + "\n"
- for opnames_idx, opnames in enumerate(opnames_list):
- uniqueoperation_info += f"\t\t\t\t\t\t {opnames_idx})" + str(opnames) + "\n"
- return uniqueoperation_info
+ if len(self.op_args_and_metadata) > 0:
+ op_args_and_metadata_info = ""
+ for idx, (args, metadata) in enumerate(self.op_args_and_metadata, start=1):
+ op_args_and_metadata_info += f"\t\t\t\t {idx})Opargs(" + str(args) + ")\n"
+ for metadata_name, metadata_values in metadata.items():
+ op_args_and_metadata_info += f"\t\t\t\t\t\t" + str(metadata_name) + ":\n"
+ for metadata_value_idx, metadata_value in enumerate(metadata_values):
+ op_args_and_metadata_info += (
+ f"\t\t\t\t\t\t\t\t {metadata_value_idx})" + str(metadata_value) + "\n"
+ )
+ return op_args_and_metadata_info
else:
- return "OpArgsOpNames is empty!"
+ return "OpArgsOpMetadata is empty!"
class UniqueOperationInfo:
"""
- Stores operands and argument associated with operand names.
+    Stores operands and arguments associated with operation metadata.
Args:
operands (OperandsInfo): Information about operand types, shapes, and dtypes.
- oparg_opnames (OpArgsOpNames): Argument associated with the operand names.
+        opargs_opmetadata (OpArgsOpMetadata): Operation arguments associated with the operation metadata.
Data Members:
- unique_operands_and_opargs_opnames (list of tuples): Each tuple contains an OperandsInfo object
- and an OpArgsOpNames object.
+ unique_operands_and_opargs_opmetadata (list of tuples): Each tuple contains an OperandsInfo object
+ and an OpArgsOpMetadata object.
"""
- def __init__(self, operands: OperandsInfo, oparg_opnames: OpArgsOpNames):
- self.unique_operands_and_opargs_opnames = [(operands, oparg_opnames)]
+ def __init__(self, operands: OperandsInfo, opargs_opmetadata: OpArgsOpMetadata):
+ self.unique_operands_and_opargs_opmetadata = [(operands, opargs_opmetadata)]
- def get_unique_operands_and_opargs_opnames(self):
- return self.unique_operands_and_opargs_opnames
+ def get_unique_operands_and_opargs_opmetadata(self):
+ return self.unique_operands_and_opargs_opmetadata
- def add_operands_args(self, new_operands, new_args, new_operand_names):
+ def add_operands_args(self, new_operands, new_args, new_operation_metadata):
"""
- Adds or updates operandsInfo and Opargs and operand names.
+        Adds or updates OperandsInfo, OpArgs, and operation metadata.
Args:
new_operands (OperandsInfo): Operands information.
new_args (OpArgs): Operation arguments.
- new_operand_names (list): Operand names.
+ new_operation_metadata (Dict): Operation metadata.
"""
operands_matched = False
- for idx, (operands, oparg_opnames) in enumerate(self.unique_operands_and_opargs_opnames):
+ for idx, (operands, opargs_opmetadata) in enumerate(self.unique_operands_and_opargs_opmetadata):
if operands == new_operands:
operands_matched = True
- self.unique_operands_and_opargs_opnames[idx][1].update(new_args, new_operand_names)
+ self.unique_operands_and_opargs_opmetadata[idx][1].update(new_args, new_operation_metadata)
break
if not operands_matched:
- self.unique_operands_and_opargs_opnames.append((new_operands, OpArgsOpNames(new_args, new_operand_names)))
+ self.unique_operands_and_opargs_opmetadata.append(
+ (new_operands, OpArgsOpMetadata(new_args, new_operation_metadata))
+ )
def __str__(self):
- if len(self.unique_operands_and_opargs_opnames) > 0:
- uniqueoperation_info = ""
- for idx, (operands, oparg_opnames) in enumerate(self.unique_operands_and_opargs_opnames, start=1):
- uniqueoperation_info += f"\t\t {idx})" + str(operands) + "\n"
- uniqueoperation_info += str(oparg_opnames) + "\n"
- return uniqueoperation_info
+ if len(self.unique_operands_and_opargs_opmetadata) > 0:
+ unique_operation_info = ""
+ for idx, (operands, opargs_opmetadata) in enumerate(self.unique_operands_and_opargs_opmetadata, start=1):
+ unique_operation_info += f"\t\t {idx})" + str(operands) + "\n"
+ unique_operation_info += str(opargs_opmetadata) + "\n"
+ return unique_operation_info
else:
return "UniqueOperationInfo is empty!"
@@ -340,15 +368,18 @@ def validate_node_types(cls, operand_names, operand_types, node_name_to_node_typ
@classmethod
def create_unique_operations(
- cls, ops: Dict[int, Operation], node_name_to_node_type: Dict[str, NodeType], named_parameters
+ cls,
+ ops: Dict[int, Operation],
+ named_parameters: Dict[str, torch.Tensor],
+ node_name_to_node_type: Optional[Dict[str, NodeType]] = None,
):
"""
- Creates unique operations by mapping operand and argument information to function names.
+        Creates unique operations by mapping operand and argument information to Forge op function names.
Args:
ops (dict): Dictionary of operation.
- node_name_to_node_type (dict): Mapping of node names to types.
named_parameters (dict): Mapping of node name to model parameters and buffers.
+            node_name_to_node_type (dict, optional): Mapping of node names to types.
Returns:
UniqueOperations: Populated UniqueOperations dictionary.
@@ -358,12 +389,18 @@ def create_unique_operations(
forge_op_function_name = ops[nid].function_name
operand_names = ops[nid].input_names
operand_types = ops[nid].input_node_types
- assert UniqueOperations.validate_node_types(
- operand_names, operand_types, node_name_to_node_type
- ), "Operand node types is not matching with node_name_to_node_type"
+ if node_name_to_node_type is not None:
+ assert UniqueOperations.validate_node_types(
+ operand_names, operand_types, node_name_to_node_type
+ ), "Operand node types is not matching with node_name_to_node_type"
operand_shapes = ops[nid].input_shapes
operand_dtypes = ops[nid].input_dtypes
args = ops[nid].args
+ metadata = ops[nid].metadata
+            operation_metadata = {"operand_names": operand_names}
+            # Merge any per-op metadata (e.g. model/variant info) into the operation metadata.
+            operation_metadata.update(metadata)
assert (
len(operand_types) == len(operand_names)
and len(operand_names) == len(operand_shapes)
@@ -378,30 +415,30 @@ def create_unique_operations(
new_operands = OperandsInfo(operand_types, operand_shapes, operand_dtypes)
new_args = OpArgs(args)
if forge_op_function_name in unique_operations.keys():
- unique_operations[forge_op_function_name].add_operands_args(new_operands, new_args, operand_names)
+ unique_operations[forge_op_function_name].add_operands_args(new_operands, new_args, operation_metadata)
else:
unique_operations[forge_op_function_name] = UniqueOperationInfo(
- new_operands, OpArgsOpNames(new_args, operand_names)
+ new_operands, OpArgsOpMetadata(new_args, operation_metadata)
)
return unique_operations
def __str__(self):
if len(self) > 0:
- uniqueoperations_info = ""
+ unique_operations_info = ""
for forge_op_function_name, unique_operation in self.items():
- uniqueoperations_info += forge_op_function_name + ": \n"
- uniqueoperations_info += str(unique_operation) + "\n"
- return uniqueoperations_info
+ unique_operations_info += forge_op_function_name + ": \n"
+ unique_operations_info += str(unique_operation) + "\n"
+ return unique_operations_info
else:
return "UniqueOperations is empty!"
-def export_unique_op_tests_details_to_excel(module_name, unique_operation_data):
- headers = ["Framework", "Op", "Operands", "Args", "Testfile"]
+def export_unique_op_configuration_info(module_name, unique_operation_data, unique_ops_metadata):
+ headers = ["Op", "Operand_Names", "Operand_Shapes", "Operand_Types", "Operand_Dtypes", "Args", "Testfile"]
rows = []
for operation_info in unique_operation_data:
- rows.append(list(operation_info.values()))
+ rows.append([operation_info[header] for header in headers])
export_tvm_generated_unique_op_tests_details_dir_path = os.getenv(
"FORGE_EXPORT_TVM_GENERATED_UNIQUE_OP_TESTS_DETAILS_DIR_PATH", f"generated_modules/unique_ops/"
@@ -414,8 +451,15 @@ def export_unique_op_tests_details_to_excel(module_name, unique_operation_data):
export_tvm_generated_unique_op_tests_details_file_path = os.path.join(
export_tvm_generated_unique_op_tests_details_dir_path,
- "tvm_generated_op_test_details.xlsx",
+ "tvm_generated_unique_op_test_details.xlsx",
+ )
+
+ unique_ops_metadata_path = os.path.join(
+ export_tvm_generated_unique_op_tests_details_dir_path,
+ "tvm_generated_unique_ops_metadata.json",
)
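+    # Persist per-module metadata (framework, module name, param/buffer file
+    # paths) so downstream analysis scripts can reload the serialized tensors.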
+ with open(unique_ops_metadata_path, "w") as json_file:
+ json.dump(unique_ops_metadata, json_file, indent=4)
create_excel_file(
title=module_name,
@@ -457,9 +501,7 @@ def generate_unique_op_tests(
named_parameters.update(named_buffers)
# Extract unique operations by comparing operands types, shapes and dtypes and arguments if any
- unique_operations = UniqueOperations.create_unique_operations(ops, node_name_to_node_type, named_parameters)
-
- logger.info(f"Unique Operations:\n{unique_operations}")
+ unique_operations = UniqueOperations.create_unique_operations(ops, named_parameters, node_name_to_node_type)
def get_param_const(name):
for nid, param in params.items():
@@ -489,23 +531,41 @@ def get_param_const(name):
writer.write_header(include_pytest_imports=True)
# Get the unique operands and operation arguments assiocated the operand names
- unique_operands_and_opargs_opnames = unique_operations[
+ unique_operands_and_opargs_opmetadata = unique_operations[
forge_op_function_name
- ].get_unique_operands_and_opargs_opnames()
+ ].get_unique_operands_and_opargs_opmetadata()
pytest_input_shapes_and_dtypes_list = []
forge_module_names = []
module_idx = 0
forge_module_list = []
test_count = 0
- for operands_idx, (operands, opargs_opnames) in enumerate(unique_operands_and_opargs_opnames):
+ for operands_idx, (operands, opargs_opmetadata) in enumerate(unique_operands_and_opargs_opmetadata):
- for args_idx, (args, opnames_list) in enumerate(opargs_opnames.get_opargs_opnames()):
+ for args_idx, (args, operation_metadata) in enumerate(opargs_opmetadata.get_op_args_and_metadata()):
operand_types = operands.get_operand_types()
operand_shapes = operands.get_operand_shapes()
operand_dtypes = operands.get_operand_dtypes()
- operand_names = opnames_list[0]
+ operand_names = operation_metadata["operand_names"][0]
+
+ if compiler_cfg.export_tvm_generated_unique_op_tests_details:
+ operation_info = {}
+ operation_info["Op"] = forge_op_function_name
+ operation_info["Operand_Names"] = str(operand_names)
+ operation_info["Operand_Shapes"] = str(
+ [
+ operand_name if operand_type == NodeType.Constant else operand_shape
+ for operand_type, operand_shape, operand_name in zip(
+ operand_types, operand_shapes, operand_names
+ )
+ ]
+ )
+ operation_info["Operand_Types"] = str(
+ [NodeType.to_json(operand_type) for operand_type in operand_types]
+ )
+ operation_info["Operand_Dtypes"] = str(operand_dtypes)
+ operation_info["Args"] = str(args)
# Check if all operands types are parameters or constants and change the operand type from
# parameters or constants to activation and pass it as activation to forge module forward function
@@ -670,25 +730,7 @@ def get_param_const(name):
pytest_input_shapes_and_dtypes_list.append(pytest_input_shapes_dtypes)
if compiler_cfg.export_tvm_generated_unique_op_tests_details:
- operation_info = {}
- operands_info = []
- for node_type, name, shape, dtype in zip(
- operand_types, operand_names, operand_shapes, operand_dtypes
- ):
- name_or_shape_val = name if node_type == NodeType.Constant else shape
- operands_info.append(
- f"Operand(type={node_type.name}, name/shape={name_or_shape_val}, dtype={dtype})"
- )
- operation_info["Framework"] = framework
- operation_info["Op"] = op_name
- operation_info["Operands"] = "\n".join(operands_info)
- if args.is_empty():
- operation_info["Args"] = ""
- else:
- operation_info["Args"] = "\n".join(
- [f"{arg_name} : {arg_value}" for arg_name, arg_value in args.items()]
- )
- operation_info["tests"] = (
+ operation_info["Testfile"] = (
writer.module_directory
+ "/"
+ writer.filename
@@ -721,4 +763,11 @@ def get_param_const(name):
writer.close_file()
if compiler_cfg.export_tvm_generated_unique_op_tests_details:
- export_unique_op_tests_details_to_excel(current_module_name, unique_operation_details)
+ unique_ops_metadata = {
+ "framework": framework,
+ "module_name": current_module_name,
+ "param_file_name": param_file_name,
+ "named_params_file_name": named_params_file_name,
+ "named_buffers_file_name": named_buffers_file_name,
+ }
+ export_unique_op_configuration_info(current_module_name, unique_operation_details, unique_ops_metadata)
diff --git a/pytest.ini b/pytest.ini
index 385d606d9..fd0a35c91 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -14,6 +14,7 @@ markers =
nightly_sweeps: marks tests as nightly_sweeps
slow: marks tests as slow # deprecated - slow tests, should not be run in push pipeline
run_in_pp: marks tests as run_in_pp # deprecated - tests that should run in push pipeline
+ model_analysis: marks tests as model_analysis
# Where pytest should look for tests
testpaths =
diff --git a/scripts/model_analysis.py b/scripts/model_analysis.py
index 58613c4a9..7acc71021 100644
--- a/scripts/model_analysis.py
+++ b/scripts/model_analysis.py
@@ -4,6 +4,7 @@
import subprocess
import os
import time
+import json
from loguru import logger
import math
import argparse
@@ -13,6 +14,11 @@
from typing import Union, Dict, List, Tuple
from dataclasses import dataclass, asdict
import inspect
+import ast
+
+import torch
+
+from forge.tvm_unique_op_generation import Operation, NodeType, UniqueOperations
class CompilerComponent(IntEnum):
@@ -333,57 +339,6 @@ def match_rule(self, exception: str):
]
-@dataclass
-class ModelVariantInfo:
- """
- Stores information about a model, variant, framework of the model, including its support rates for different compiler components.
-
- Attributes:
- model_name (str): The name of the model.
- variant_name (str): The name of the model variant.
- framework (str): The framework used for the model.
- forge_support_rate (float): The support rate for the Forge compiler component. Defaults to 0.0.
- mlir_support_rate (float): The support rate for the MLIR compiler component. Defaults to 0.0.
- ttmetal_support_rate (float): The support rate for the TT_METAL compiler component. Defaults to 0.0.
- unknown_rate (float): The support rate for an unknown compiler component. Defaults to 0.0.
- """
-
- model_name: str
- variant_name: str
- framework: str
- forge_support_rate: float = 0.0
- mlir_support_rate: float = 0.0
- ttmetal_support_rate: float = 0.0
- unknown_rate: float = 0.0
- last_update_datetime: str = ""
-
- def get_support_rate(self, compiler_component: CompilerComponent):
- # Check and return the appropriate support rate based on the compiler component.
- if compiler_component == CompilerComponent.FORGE:
- return self.forge_support_rate
- elif compiler_component == CompilerComponent.MLIR:
- return self.mlir_support_rate
- elif compiler_component == CompilerComponent.TT_METAL:
- return self.ttmetal_support_rate
- elif compiler_component == CompilerComponent.UNKNOWN:
- return self.unknown_rate
- else:
- logger.error(f"There is no compilercomponent {compiler_component.name}")
-
- def update_support_rate(self, compiler_component: CompilerComponent, support_rate: float):
- # Update the appropriate support rate based on the compiler component.
- if compiler_component == CompilerComponent.FORGE:
- self.forge_support_rate = support_rate
- elif compiler_component == CompilerComponent.MLIR:
- self.mlir_support_rate = support_rate
- elif compiler_component == CompilerComponent.TT_METAL:
- self.ttmetal_support_rate = support_rate
- elif compiler_component == CompilerComponent.UNKNOWN:
- self.unknown_rate = support_rate
- else:
- logger.error(f"There is no compilercomponent {compiler_component.name}")
-
-
class UniqueOpTestInfo:
"""
Represents information about a unique operation test, that includes op name, operands
@@ -391,8 +346,8 @@ class UniqueOpTestInfo:
Attributes:
Op (str): The name of the operation.
- Operands (str): The operands associated with the operation.
- Args (str): Operation Arguments if any
+ Operands (List[str]): List of operands associated with the operation.
+ Args (List[str]): List of Operation Arguments if any
components (dict): A dictionary indicating the support status for each compiler component.
failure_reason (str): The reason for failure, if any, during testing.
"""
@@ -400,28 +355,45 @@ class UniqueOpTestInfo:
def __init__(
self,
Op: str,
- Operands: str,
- Args: str,
+ Operands: List[str],
+ Args: List[str],
):
self.Op = str(Op)
- self.Operands = str(Operands)
- self.Args = " " if pd.isna(Args) else str(Args)
+ self.Operands = Operands
+ self.Args = Args
self.components = {}
for compiler_component in CompilerComponent:
self.components[str(compiler_component.name)] = False
self.failure_reason = ""
@classmethod
- def create_from_dict(cls, data: Dict[str, str]):
+ def create(cls, op_name, operand_names, operand_types, operand_shapes, operand_dtypes, args):
+
+ operands = UniqueOpTestInfo.create_operands(operand_names, operand_types, operand_shapes, operand_dtypes)
- # Extract the names of parameters for the __init__ method (excluding 'self').
- unique_op_test_info_params = list(inspect.signature(cls.__init__).parameters.keys())[1:]
+ args = UniqueOpTestInfo.create_args(args)
- # Filter the dictionary to include only relevant keys for initialization.
- unique_op_test_data = {key: data[key] for key in unique_op_test_info_params if key in data}
+ return cls(Op=op_name, Operands=operands, Args=args)
- # Create and return an instance of the UniqueOpTestInfo class.
- return cls(**unique_op_test_data)
+ @classmethod
+ def create_operands(cls, operand_names, operand_types, operand_shapes, operand_dtypes):
+ operands = []
+ for operand_name, operand_type, operand_shape, operand_dtype in zip(
+ operand_names, operand_types, operand_shapes, operand_dtypes
+ ):
+ if isinstance(operand_shape, torch.Tensor):
+ operands.append(f"Operand(type={operand_type}, name={operand_name}, dtype={operand_dtype})")
+ else:
+ operands.append(f"Operand(type={operand_type}, shape={operand_shape}, dtype={operand_dtype})")
+ return operands
+
+ @classmethod
+ def create_args(cls, args):
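+        # Render OpArgs as "name : value" strings for reporting.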
+ arg_info = []
+ if not args.is_empty():
+ for arg_name, arg_value in args.items():
+ arg_info.append(f"{arg_name} : {arg_value}")
+ return arg_info
def update_compiler_components(self, error_message: str = ""):
if error_message:
@@ -450,6 +422,72 @@ def __str__(self):
return f"UniqueOpTestInfo(op={self.Op}, Operands={self.Operands}, Args={self.Args}, components={self.components}, self.failure_reason={self.failure_reason})"
+@dataclass
+class ModelVariantInfo:
+ """
+ Stores information about a model, variant, framework of the model, including its support rates for different compiler components.
+
+ Attributes:
+ model_name (str): The name of the model.
+ variant_name (str): The name of the model variant.
+ framework (str): The framework used for the model.
+ unique_ops (List[UniqueOpTestInfo]): List of unique op configuration test info
+ forge_support_rate (float): The support rate for the Forge compiler component. Defaults to 0.0.
+ mlir_support_rate (float): The support rate for the MLIR compiler component. Defaults to 0.0.
+ ttmetal_support_rate (float): The support rate for the TT_METAL compiler component. Defaults to 0.0.
+ unknown_rate (float): The support rate for an unknown compiler component. Defaults to 0.0.
+ """
+
+ model_name: str
+ variant_name: str
+ framework: str
+ unique_ops: List[UniqueOpTestInfo]
+ forge_support_rate: float = 0.0
+ mlir_support_rate: float = 0.0
+ ttmetal_support_rate: float = 0.0
+ unknown_rate: float = 0.0
+ last_update_datetime: str = ""
+
+ def get_support_rate(self, compiler_component: CompilerComponent):
+ # Check and return the appropriate support rate based on the compiler component.
+ if compiler_component == CompilerComponent.FORGE:
+ return self.forge_support_rate
+ elif compiler_component == CompilerComponent.MLIR:
+ return self.mlir_support_rate
+ elif compiler_component == CompilerComponent.TT_METAL:
+ return self.ttmetal_support_rate
+ elif compiler_component == CompilerComponent.UNKNOWN:
+ return self.unknown_rate
+ else:
+ logger.error(f"There is no compilercomponent {compiler_component.name}")
+
+ def update_support_rate(self, compiler_component: CompilerComponent, support_rate: float):
+ # Update the appropriate support rate based on the compiler component.
+ if compiler_component == CompilerComponent.FORGE:
+ self.forge_support_rate = support_rate
+ elif compiler_component == CompilerComponent.MLIR:
+ self.mlir_support_rate = support_rate
+ elif compiler_component == CompilerComponent.TT_METAL:
+ self.ttmetal_support_rate = support_rate
+ elif compiler_component == CompilerComponent.UNKNOWN:
+ self.unknown_rate = support_rate
+ else:
+ logger.error(f"There is no compilercomponent {compiler_component.name}")
+
+ def __str__(self):
+ model_variant_info = ""
+ model_variant_info += f"\t\tModel : {model_name}\n"
+ model_variant_info += f"\t\tVariant : {variant_name}\n"
+ model_variant_info += f"\t\tframework : {framework}\n"
+ model_variant_info += f"\t\tforge_support_rate : {forge_support_rate}\n"
+ model_variant_info += f"\t\tmlir_support_rate : {mlir_support_rate}\n"
+ model_variant_info += f"\t\tttmetal_support_rate : {ttmetal_support_rate}\n"
+ model_variant_info += f"\t\tunknown_rate : {unknown_rate}\n"
+ model_variant_info += f"\t\tlast_update_datetime : {last_update_datetime}\n"
+ for idx, unique_op in enumerate(unique_ops):
+ model_variant_info += f"\t\t\t\t{idx}){str(unique_op)}\n"
+
+
class HtmlSymbol(Enum):
PASS = "✅" # Checkmark
FAIL = "❌" # Crossmark
@@ -491,7 +529,8 @@ def write_table(self, headers, rows):
markdown_table = tabulate(rows, headers, tablefmt="github", colalign=("center",) * len(headers))
self.write_line(markdown_table)
- def get_component_names_for_header(self, compiler_component: CompilerComponent):
+ @classmethod
+ def get_component_names_for_header(cls, compiler_component: CompilerComponent):
if compiler_component == CompilerComponent.FORGE:
return "Forge-Fe"
elif compiler_component == CompilerComponent.MLIR:
@@ -550,24 +589,32 @@ def check_path(directory_or_file_path: str):
return False
-def dump_logs(log_file_dir_path: str, log_file_name: str, content: str):
- os.makedirs(log_file_dir_path, exist_ok=True)
- log_file = os.path.join(log_file_dir_path, log_file_name)
- with open(log_file, "w") as f:
- f.write(content)
- logger.info(f"Dumped test logs in {log_file}")
+def dump_logs(log_files: Union[str, List[str]], content: str):
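+    # Accept either a single log file path or a list of paths and write the same content to each.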
+ if isinstance(log_files, str):
+ log_files = [log_files]
+ for log_file in log_files:
+ log_file_dir_path = "/".join(log_file.split("/")[:-1])
+ os.makedirs(log_file_dir_path, exist_ok=True)
+ with open(log_file, "w") as f:
+ f.write(content)
+ logger.info(f"Dumped test logs in {log_file}")
-def collect_all_model_analysis_test(directory_or_file_path, output_directory_path):
+def collect_all_model_analysis_test(directory_or_file_path: str, output_directory_path: str):
+ """
+ Collect all the tests marked with the `model_analysis` marker in a specified directory or file.
+ """
+ # Ensure the directory or file path exists
assert check_path(
directory_or_file_path
), f"The directory path for collecting test {directory_or_file_path} doesn't exists"
- logger.info(f"Collecting all test that has model_analysis marker in {directory_or_file_path}")
+ logger.info(f"Collecting all the test that has model_analysis marker in {directory_or_file_path}")
collected_test_outputs = ""
try:
+ # Run pytest to collect tests with the `model_analysis` marker
result = subprocess.run(
["pytest", directory_or_file_path, "-m", "model_analysis", "--collect-only"],
capture_output=True,
@@ -575,18 +622,20 @@ def collect_all_model_analysis_test(directory_or_file_path, output_directory_pat
check=True,
)
- collected_test_outputs += "STDOUT:\n"
- collected_test_outputs += result.stdout
- collected_test_outputs += "STDERR:\n"
- collected_test_outputs += result.stderr
+ # Append stdout and stderr to the collected outputs
+ collected_test_outputs += "STDOUT:\n" + result.stdout
+ collected_test_outputs += "STDERR:\n" + result.stderr
except subprocess.CalledProcessError as e:
collected_test_outputs += e.output
- dump_logs(output_directory_path, "collected_tests.txt", collected_test_outputs)
+ # Save the collected test outputs to a file
+ collected_test_file_path = os.path.join(output_directory_path, "collected_tests.txt")
+ dump_logs(collected_test_file_path, collected_test_outputs)
+ # Extract tests from the collected test outputs
test_list = []
- with open(os.path.join(output_directory_path, "collected_tests.txt"), "r") as collected_test_file:
+ with open(collected_test_file_path, "r") as collected_test_file:
lines = collected_test_file.readlines()
test_lines = False
for line in lines:
@@ -601,7 +650,12 @@ def collect_all_model_analysis_test(directory_or_file_path, output_directory_pat
return test_list
-def generate_and_export_unique_ops_tests(test_directory_or_file_path, unique_ops_output_directory_path):
+def generate_and_export_unique_ops_tests(test_directory_or_file_path: str, unique_ops_output_directory_path: str):
+ """
+    Collect the tests with the model_analysis marker in the test_directory_or_file_path specified by the user,
+    generate unique op tests for all the collected tests, and return the list of directory paths
+    containing each exported model's unique op configurations as xlsx files.
+ """
# Collect all the pytest inside the test_directory_or_file_path specified by the user with model_analysis marker
test_list = collect_all_model_analysis_test(test_directory_or_file_path, unique_ops_output_directory_path)
@@ -651,36 +705,18 @@ def generate_and_export_unique_ops_tests(test_directory_or_file_path, unique_ops
return model_output_dir_paths
-def run_model_unique_op_tests_and_generate_markdowns(
- model_output_dir_paths, markdown_directory_path, dump_failure_logs
-):
+def extract_unique_op_tests_from_models(model_output_dir_paths: List[str], unique_ops_output_directory_path: str):
"""
- Execute unique operation tests for specified models, gather compiler support details and
- generate detailed Markdown reports summarizing the results.
-
- Workflow:
- 1. Locate and Process Model Variants:
- - Iterate through the list of model directories (`model_output_dir_paths`).
- - For each model:
- - Identify its variants by listing the contents of the model directory.
- - Search for the unique operation tests information file (`.xlsx`) for each variant.
- 2. Extract test details and Run unique operation tests:
- - Load the `.xlsx` file for each model variant and extract the operation test information (e.g., framework, ops, operands, args, and test files).
- - For each test in the extracted details:
- - Execute the test and track pass or fail status for each compiler component (e.g., Forge-FE, MLIR, Metal) based on the test results.
- - if `dump_failure_logs` is True, save the failure logs
- - Calculate the percentage of successful tests for each compiler component.
- 3. Generate Markdown Reports:
- - Sub Markdown Files:
- - Create a Markdown file for each model variant.
- - Include details about unique operation configurations, pass/fail status for each compiler component, and failure reasons.
- - Root Markdown File:
- - Summarize the results for all models in a single file (`ModelsInfo.md`).
- - Include details such as the model name, its variants, framework, and passing rate percentages for each compiler component.
+    Extract unique op configurations across all the models, avoiding re-running redundant
+    op configurations, by using the exported unique op configuration test details and model metadata.
"""
- # List to store information about all processed model variants
- models_details = []
+ # Dictionary to store all the operations found in model variants
+ models_operations = {}
+ unique_op_count = 0
+
+ # Dictionary to store constants (name and tensor) used in the model variants
+    models_constants = {}
# Iterate through all provided model directories
for model_output_dir_path in model_output_dir_paths:
@@ -696,51 +732,156 @@ def run_model_unique_op_tests_and_generate_markdowns(
model_variant_dir_path = os.path.join(model_output_dir_path, model_variant)
- # Look for a single `.xlsx` file containing unique operation test details
- model_variant_tvm_generated_op_test_file = [
- f for f in os.listdir(model_variant_dir_path) if f.endswith(".xlsx")
- ]
- if len(model_variant_tvm_generated_op_test_file) != 1:
+ # Look for `.xlsx` and `.json` file containing unique operation details and metadata
+            model_variant_tvm_generated_unique_op_xlsx_file_path = None
+ model_variant_tvm_generated_unique_op_metadata_file_path = None
+ for f in os.listdir(model_variant_dir_path):
+ if f.endswith(".xlsx"):
+                    model_variant_tvm_generated_unique_op_xlsx_file_path = os.path.join(model_variant_dir_path, f)
+ elif f.endswith(".json"):
+ model_variant_tvm_generated_unique_op_metadata_file_path = os.path.join(model_variant_dir_path, f)
+
+ # Skip if either `.xlsx` or `.json` file is missing
+ if (
+                model_variant_tvm_generated_unique_op_xlsx_file_path is None
+ or model_variant_tvm_generated_unique_op_metadata_file_path is None
+ ):
continue
- # Read the `.xlsx` file for the model variant
- model_variant_tvm_generated_op_test_file_path = os.path.join(
- model_variant_dir_path, model_variant_tvm_generated_op_test_file[0]
- )
+            # Read the `.xlsx` file containing the model variant's unique op configuration details
model_variant_df = pd.read_excel(
- model_variant_tvm_generated_op_test_file_path,
+                model_variant_tvm_generated_unique_op_xlsx_file_path,
header=0,
- usecols=["Framework", "Op", "Operands", "Args", "Testfile"],
+ usecols=[
+ "Op",
+ "Operand_Names",
+ "Operand_Shapes",
+ "Operand_Types",
+ "Operand_Dtypes",
+ "Args",
+ "Testfile",
+ ],
)
- # Create a UniqueOpTestInfo object to store details about model and variant name and framework of the model variant.
- model_variant_info: ModelVariantInfo = ModelVariantInfo(
- model_name=model_name,
- variant_name=model_variant,
- framework=model_variant_df["Framework"].unique()[0],
- )
+            # Read the `.json` file containing the model variant's metadata
+ with open(model_variant_tvm_generated_unique_op_metadata_file_path, "r") as json_file:
+ model_variant_metadata = json.load(json_file)
- # List to store unique operation test results for the model variant
- model_variant_unique_op_tests_info = []
+            # Load the model variant's parameters and buffers as tensors from the files recorded in the metadata
+ named_parameters = torch.load(model_variant_metadata["named_params_file_name"])
+ if model_variant_metadata["param_file_name"] is not None:
+ serialized_params = torch.load(model_variant_metadata["param_file_name"])
+ named_parameters.update(serialized_params)
+ named_buffers = torch.load(model_variant_metadata["named_buffers_file_name"])
+ named_parameters.update(named_buffers)
- # Iterate over each row in the DataFrame (each row corresponds to a test for a specific operation)
+ # Process each row in the `.xlsx` file to extract operation configurations
for index, row in model_variant_df.iterrows():
row = row.to_dict()
+ unique_op_count += 1
+
+ operand_names = ast.literal_eval(row["Operand_Names"])
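+                # Operand types were exported as NodeType name strings; convert them back to enum members.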
+ operand_types = ast.literal_eval(row["Operand_Types"])
+ operand_types = [NodeType.from_json(operand_type) for operand_type in operand_types]
+
+ # Prepare metadata associated with the operation
+ metadata = {}
+ metadata["model_variant_info"] = {}
+ metadata["model_variant_info"]["model_name"] = model_name
+ metadata["model_variant_info"]["variant_name"] = model_variant_metadata["module_name"]
+ metadata["model_variant_info"]["framework"] = model_variant_metadata["framework"]
+ metadata["model_variant_info"]["Testfile"] = row["Testfile"]
+
+ # Create an Operation object with op name, shape, nodetype, dtype, arguments and operation metadata
+ models_operations[unique_op_count] = Operation(
+ function_name=row["Op"],
+ input_names=operand_names,
+ args=ast.literal_eval(row["Args"]),
+ input_shapes=ast.literal_eval(row["Operand_Shapes"]),
+ input_dtypes=ast.literal_eval(row["Operand_Dtypes"]),
+ input_node_types=operand_types,
+ metadata=metadata,
+ )
+
+                # Store tensors for operands with the Constant node type
+ for operand_type, operand_name in zip(operand_types, operand_names):
+ if operand_type == NodeType.Constant:
+                        models_constants[operand_name] = named_parameters[operand_name]
+
+    # Extract unique operation configurations across all the model variants
+    unique_operations = UniqueOperations.create_unique_operations(models_operations, models_constants)
+
+ # Dump the extracted unique operation configurations across all the model variants to a log file
+ models_unique_op_config_file_path = os.path.join(
+ unique_ops_output_directory_path, "extracted_unique_configuration_across_models.log"
+ )
+ dump_logs(models_unique_op_config_file_path, str(unique_operations))
+
+ return unique_operations
+
+
+def run_models_unique_op_tests(unique_operations, unique_ops_output_directory_path, dump_failure_logs):
+ """
+    Run the unique op configuration tests collected across all the models and populate the test results for each model variant.
+ """
+
+ models_details = {}
+
+ # Iterate over each unique operation
+ for forge_op_function_name in sorted(unique_operations):
+
+ # Extract operation name from forge op function name
+ op_name = forge_op_function_name.split(".")[-1]
+
+        # Get the unique operands and operation arguments associated with the operation metadata
+ unique_operands_and_opargs_opmetadata = unique_operations[
+ forge_op_function_name
+ ].get_unique_operands_and_opargs_opmetadata()
+
+ for operands, opargs_opmetadata in unique_operands_and_opargs_opmetadata:
+
+ for args, operation_metadata in opargs_opmetadata.get_op_args_and_metadata():
+
+                # Extract operand details such as types, shapes, and data types
+ operand_types = [NodeType.to_json(operand_type) for operand_type in operands.get_operand_types()]
+ operand_shapes = operands.get_operand_shapes()
+ operand_dtypes = operands.get_operand_dtypes()
+
+                # Extract model variant info such as model, variant, and framework name
+ model_variant_info_list = operation_metadata["model_variant_info"]
+ framework = model_variant_info_list[0]["framework"]
+ operand_names = operation_metadata["operand_names"][0]
# Create a UniqueOpTestInfo object to store details about the operation (name, operands, args)
- unique_op_test_info = UniqueOpTestInfo.create_from_dict(row)
+ unique_op_test_info = UniqueOpTestInfo.create(
+ op_name=op_name,
+ operand_names=operand_names,
+ operand_types=operand_types,
+ operand_shapes=operand_shapes,
+ operand_dtypes=operand_dtypes,
+ args=args,
+ )
# Extract the test file path
- test = row["Testfile"]
+ test = model_variant_info_list[0]["Testfile"]
logger.info(f"Running the test: {test}")
- # If dump_failure_logs is set to True, prepare the log file path for storing logs
+ # If dump_failure_logs is set to True, prepare the log file paths for storing logs
if dump_failure_logs:
- op_name = row["Op"] # Get the operation name
- log_file_dir_path = os.path.join(model_variant_dir_path, op_name)
-
- test_name = test.split("::")[-1] # Extract the test name from the test path
- log_file_name = str(test_name) + "_log.txt"
+ log_files = []
+ for model_variant_info in model_variant_info_list:
+ log_file_dir_path = os.path.join(
+ unique_ops_output_directory_path,
+ model_variant_info["model_name"],
+ model_variant_info["variant_name"],
+ op_name,
+ )
+ test_name = model_variant_info["Testfile"].split("::")[
+ -1
+ ] # Extract the test name from the test path
+ log_file_name = str(test_name) + "_log.txt"
+ log_file = os.path.join(log_file_dir_path, log_file_name)
+ log_files.append(log_file)
# Start the timer to measure test execution time
start_time = time.time()
@@ -752,7 +893,7 @@ def run_model_unique_op_tests_and_generate_markdowns(
check=True,
capture_output=True,
text=True,
- timeout=60,
+ timeout=180,
env=dict(
os.environ,
FORGE_DISABLE_REPORTIFY_DUMP="1",
@@ -780,7 +921,7 @@ def run_model_unique_op_tests_and_generate_markdowns(
# Save failure logs if dump_failure_logs is set to True
if dump_failure_logs:
- dump_logs(log_file_dir_path, log_file_name, error_message)
+ dump_logs(log_files, error_message)
else:
# If the test passed (return code is 0), update the UniqueOpTestInfo instance
@@ -792,13 +933,13 @@ def run_model_unique_op_tests_and_generate_markdowns(
except subprocess.TimeoutExpired as e:
elapsed_time = time.time() - start_time
- error_message = "Test timed out after 60 seconds"
+ error_message = "Test timed out after 180 seconds"
unique_op_test_info.update_compiler_components(error_message)
logger.info(f"\tFailed ({elapsed_time:.2f} seconds) due to {error_message}")
if dump_failure_logs:
- dump_logs(log_file_dir_path, log_file_name, error_message)
+ dump_logs(log_files, error_message)
# Do WH warm reset (potentially hang occurred)
logger.info("\tWarm reset...")
@@ -822,7 +963,7 @@ def run_model_unique_op_tests_and_generate_markdowns(
unique_op_test_info.update_compiler_components(error_message)
if dump_failure_logs:
- dump_logs(log_file_dir_path, log_file_name, error_message)
+ dump_logs(log_files, error_message)
# Handle unexpected exceptions
except Exception as ex:
@@ -834,136 +975,175 @@ def run_model_unique_op_tests_and_generate_markdowns(
logger.info(error_message)
if dump_failure_logs:
- dump_logs(log_file_dir_path, log_file_name, error_message)
+ dump_logs(log_files, error_message)
- # Append the current test's info to the list of tests for this model variant
- model_variant_unique_op_tests_info.append(unique_op_test_info)
+ # Update model details dictionary with variant name as key and ModelVariantInfo as values
+ for model_variant_info in model_variant_info_list:
+ if model_variant_info["variant_name"] in models_details.keys():
+ models_details[model_variant_info["variant_name"]].unique_ops.append(unique_op_test_info)
+ else:
+ models_details[model_variant_info["variant_name"]] = ModelVariantInfo(
+ model_name=model_variant_info["model_name"],
+ variant_name=model_variant_info["variant_name"],
+ framework=model_variant_info["framework"],
+ unique_ops=[unique_op_test_info],
+ )
- # Prepare the path for the Markdown file to store test results for this model variant
- model_variant_md_file_directory_path = os.path.join(markdown_directory_path, "Models", model_name)
+ # Calculate and update the compiler support rates for each component for all the model variants
+ for variant_name, model_variant_info in models_details.items():
+ for compiler_component in CompilerComponent:
+ compiler_component_passed_test_count = sum(
+ [
+ int(unique_op_test_info.components[str(compiler_component.name)])
+ for unique_op_test_info in model_variant_info.unique_ops
+ ]
+ )
+ total_num_of_test = len(model_variant_info.unique_ops)
+ compiler_component_pass_percentage = (
+ str(math.ceil((compiler_component_passed_test_count / total_num_of_test) * 100.0)) + " %"
+ )
+ models_details[variant_name].update_support_rate(compiler_component, compiler_component_pass_percentage)
- # Create a Markdown file for saving model variant unique op test information
- markdown_writer = MarkDownWriter(model_variant, model_variant_md_file_directory_path)
+ models_details[variant_name].last_update_datetime = time.strftime("%A, %d %b %Y %I:%M:%S %p", time.gmtime())
- # Write a heading for the HTML table in the model variant markdown file
- markdown_writer.write_html_table_heading("Unique ops configuration and compiler support info")
+ return models_details
- # Get the list of compiler component names to use in the table header
- compiler_component_names = [
- markdown_writer.get_component_names_for_header(compiler_component)
- for compiler_component in CompilerComponent
- ]
- # Define the table header with three main sections
- table_header = {
- "Operation Details": ["Name", "Operands", "Arguments"],
- "Component Passing Check": compiler_component_names,
- "Issues": ["Failure Reason"],
- }
+def generate_markdown(
+ markdown_file_name: str,
+ markdown_file_dir_path: str,
+ table_heading: str,
+ table_headers: Dict[str, List[str]],
+ table_rows: List[List[str]],
+):
+ """
+ Generates a Markdown file that contains an HTML table with the given headers and rows.
+ """
+ # Create a markdown file for summarizing the results for all models in a single file
+ markdown_writer = MarkDownWriter(markdown_file_name, markdown_file_dir_path)
- # List to store table rows
- table_rows = []
+ # Write a heading for the HTML table
+ markdown_writer.write_html_table_heading(table_heading)
- # Iterate over the unique operation test information to populate table rows
- for unique_op_test_info in model_variant_unique_op_tests_info:
+ # Generate and write the HTML table to the Markdown file
+ markdown_writer.create_html_table_and_write(headers=table_headers, rows=table_rows)
+
+ # Close the Markdown file after writing the table
+ markdown_writer.close_file()
- # Replace newline in Operands with line breaker and X character and Arguments with line breaker
- unique_op_test_info.Operands = unique_op_test_info.Operands.replace(
- "\n", "