From 356c2ba2f6cdf916c4e6fbe0702a3ecbd4006199 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 14 May 2024 19:44:40 -0400 Subject: [PATCH 01/15] Fix a typo in docstring of pytorch_tensor.py --- src/converter/pytorch_tensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/converter/pytorch_tensor.py b/src/converter/pytorch_tensor.py index 631b54a0..1f27db5d 100644 --- a/src/converter/pytorch_tensor.py +++ b/src/converter/pytorch_tensor.py @@ -29,7 +29,7 @@ def is_valid(self) -> bool: Checks if the tensor data is valid. Returns: - bool: True if tensor_data is a list of exactly five integers, + bool: True if tensor_data is a list of exactly six integers, False otherwise. """ return ( From 73dcb13f2e40a37d011819ee3bcd76b3cc5cc768 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 14 May 2024 20:20:12 -0400 Subject: [PATCH 02/15] Refactor chakra_converter --- src/converter/converter.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/converter/converter.py b/src/converter/converter.py index 2fda617f..4f47a245 100644 --- a/src/converter/converter.py +++ b/src/converter/converter.py @@ -58,7 +58,11 @@ def main() -> None: converter = PyTorchConverter(args.input_filename, args.output_filename, logger) converter.convert() else: - logger.error(f"{args.input_type} unsupported") + supported_types = ["Text", "PyTorch"] + logger.error( + f"The input type '{args.input_type}' is not supported. " + f"Supported types are: {', '.join(supported_types)}." 
+ ) sys.exit(1) except Exception: traceback.print_exc() From cd08d7ef9e9c5e86499749e9ddd501acc85e745e Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 14 May 2024 19:58:37 -0400 Subject: [PATCH 03/15] Refactor PyTorchTensor --- src/converter/pytorch_tensor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/converter/pytorch_tensor.py b/src/converter/pytorch_tensor.py index 1f27db5d..91b9e468 100644 --- a/src/converter/pytorch_tensor.py +++ b/src/converter/pytorch_tensor.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - from typing import List From f2dd4914c21a267449619b3f7867cbc537b00432 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 14 May 2024 20:05:36 -0400 Subject: [PATCH 04/15] Refactor PyTorchNode --- src/converter/pytorch_node.py | 56 +++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/src/converter/pytorch_node.py b/src/converter/pytorch_node.py index 9740f59c..16c77c42 100644 --- a/src/converter/pytorch_node.py +++ b/src/converter/pytorch_node.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 - +import traceback from enum import Enum from typing import Any, Dict, List, Optional @@ -22,22 +21,34 @@ class PyTorchNode: data_deps (List[PyTorchNode]): List of data-dependent parent nodes. children (List[PyTorchNode]): List of child nodes. gpu_children (List[PyTorchNode]): List of GPU-specific child nodes. - record_param_comms_node (Optional['PyTorchNode']): Corresponding record_param_comms node. - nccl_node (Optional['PyTorchNode']): Corresponding NCCL node. - id (int): Unique identifier of the node. + record_param_comms_node (Optional[PyTorchNode]): Corresponding + record_param_comms node. + nccl_node (Optional[PyTorchNode]): Corresponding NCCL node. + id (str): Identifier of the node. name (str): Name of the node. - parent (int): Control dependencies identifier. 
- inputs (Dict[str, Any]): Input data including values, shapes, and types. - outputs (Dict[str, Any]): Output data including values, shapes, and types. + parent (Any): Parent of the node. + inputs (Any): Inputs of the node. + outputs (Any): Outputs of the node. + inclusive_dur (Optional[float]): Inclusive duration of the node. + exclusive_dur (float): Exclusive duration of the node. + ts (Optional[float]): Timestamp of the node. + inter_thread_dep (Any): Inter-thread dependency of the node. + cat (Any): Category of the node. + stream (Any): Stream associated with the node. """ + SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4"] + def __init__(self, schema: str, node_data: Dict[str, Any]) -> None: """ - Initializes a PyTorchNode object using the node data and schema version provided. + Initializes a PyTorchNode object using the node data and schema version + provided. Args: - schema (str): The schema version based on which the node will be initialized. - node_data (Dict[str, Any]): Dictionary containing the data of the PyTorch node. + schema (str): The schema version based on which the node will be + initialized. + node_data (Dict[str, Any]): Dictionary containing the data of the + PyTorch node. """ self.schema = schema self.data_deps: List["PyTorchNode"] = [] @@ -71,15 +82,14 @@ def parse_data(self, node_data: Dict[str, Any]) -> None: Args: node_data (Dict[str, Any]): The node data to be parsed. """ - supported_versions = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4"] - if self.schema in supported_versions: + if self.schema in self.SUPPORTED_VERSIONS: if self.schema == "1.0.2-chakra.0.0.4" or self.schema == "1.0.3-chakra.0.0.4": self._parse_data_1_0_3_chakra_0_0_4(node_data) else: raise ValueError( f"Unsupported schema version '{self.schema}'. 
Please check " f"if the schema version is in the list of supported versions: " - f"{supported_versions}" + f"{self.SUPPORTED_VERSIONS}" ) def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None: @@ -88,8 +98,6 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None: self.parent = node_data["ctrl_deps"] self.inputs = node_data["inputs"] self.outputs = node_data["outputs"] - - # TODO: should be added as attributes self.inclusive_dur = node_data.get("inclusive_dur") self.exclusive_dur = node_data.get("exclusive_dur", 0) self.ts = node_data.get("ts") @@ -155,7 +163,8 @@ def add_gpu_child(self, gpu_child_node: "PyTorchNode") -> None: Adds a child GPU node for this node. Args: - gpu_child_node (Optional[PyTorchNode]): The child GPU node to be added. + gpu_child_node (Optional[PyTorchNode]): The child GPU node to be + added. """ self.gpu_children.append(gpu_child_node) @@ -164,7 +173,8 @@ def is_record_param_comms_op(self) -> bool: Checks if the node is a record_param_comms operator. Returns: - bool: True if the node is a record_param_comms operator, False otherwise. + bool: True if the node is a record_param_comms operator, False + otherwise. """ return "record_param_comms" in self.name @@ -234,4 +244,12 @@ def get_data_type_size(data_type: str) -> int: try: return data_type_size_map[data_type] except KeyError as e: - raise ValueError(f"Unsupported data type: {data_type}") from e + traceback_str = traceback.format_exc() + raise ValueError( + f"Unsupported data type: {data_type}. The data_type_size_map " + f"dictionary is used for mapping the number of bytes for a " + f"given tensor data type. This dictionary may be incomplete. " + f"Please update the data_type_size_map or report this issue " + f"to the maintainer by creating an issue. 
Traceback:\n" + f"{traceback_str}" + ) from e From 7587dbfb206989017d5c37524d36d9afb934b8c0 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 14 May 2024 19:58:50 -0400 Subject: [PATCH 05/15] Add unit tests for PyTorchTensor --- requirements-dev.txt | 1 + tests/converter/test_pytorch_tensor.py | 59 ++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 tests/converter/test_pytorch_tensor.py diff --git a/requirements-dev.txt b/requirements-dev.txt index f7e265f2..f921deac 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ pyre-check==0.9.19 pyright==1.1.359 +pytest==8.1.1 ruff==0.3.5 diff --git a/tests/converter/test_pytorch_tensor.py b/tests/converter/test_pytorch_tensor.py new file mode 100644 index 00000000..e386482a --- /dev/null +++ b/tests/converter/test_pytorch_tensor.py @@ -0,0 +1,59 @@ +from src.converter.pytorch_tensor import PyTorchTensor, list_to_pytorch_tensor + + +def test_pytorch_tensor_initialization(): + """Test initialization of PyTorchTensor object.""" + tensor_data = [1, 2, 3, 4, 5, 6] + tensor = PyTorchTensor(tensor_data) + assert tensor.tensor_data == tensor_data + + +def test_pytorch_tensor_is_valid(): + """Test the is_valid method of PyTorchTensor.""" + valid_data = [1, 2, 3, 4, 5, 6] + invalid_data_1 = [1, 2, 3, 4, 5] # Less than 6 elements + invalid_data_2 = [1, 2, 3, 4, 5, 6, 7] # More than 6 elements + invalid_data_3 = [1, 2, 3, 4, 5, "a"] # Non-integer element + + valid_tensor = PyTorchTensor(valid_data) + invalid_tensor_1 = PyTorchTensor(invalid_data_1) + invalid_tensor_2 = PyTorchTensor(invalid_data_2) + invalid_tensor_3 = PyTorchTensor(invalid_data_3) + + assert valid_tensor.is_valid() is True + assert invalid_tensor_1.is_valid() is False + assert invalid_tensor_2.is_valid() is False + assert invalid_tensor_3.is_valid() is False + + +def test_pytorch_tensor_properties(): + """Test property methods of PyTorchTensor.""" + tensor_data = 
[1, 2, 3, 4, 5, 6] + tensor = PyTorchTensor(tensor_data) + + assert tensor.tensor_id == 1 + assert tensor.storage_id == 2 + assert tensor.offset == 3 + assert tensor.num_elem == 4 + assert tensor.elem_bytes == 5 + + +def test_pytorch_tensor_has_valid_storage_id(): + """Test has_valid_storage_id method of PyTorchTensor.""" + valid_storage_id_data = [1, 2, 3, 4, 5, 6] + invalid_storage_id_data = [1, 0, 3, 4, 5, 6] # storage_id = 0 + + valid_tensor = PyTorchTensor(valid_storage_id_data) + invalid_tensor = PyTorchTensor(invalid_storage_id_data) + + assert valid_tensor.has_valid_storage_id() is True + assert invalid_tensor.has_valid_storage_id() is False + + +def test_list_to_pytorch_tensor(): + """Test list_to_pytorch_tensor function.""" + tensor_data = [1, 2, 3, 4, 5, 6] + tensor = list_to_pytorch_tensor(tensor_data) + + assert isinstance(tensor, PyTorchTensor) + assert tensor.tensor_data == tensor_data From 23fb517486000a32a0a8ee85154b1f079590811b Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 14 May 2024 21:05:31 -0400 Subject: [PATCH 06/15] Add unit tests for PyTorchNode --- tests/converter/test_pytorch_node.py | 62 +++++++++++++++++++++++++++ tests/data/1.0.2-chakra.0.0.4.tgz | Bin 0 -> 8097 bytes 2 files changed, 62 insertions(+) create mode 100644 tests/converter/test_pytorch_node.py create mode 100644 tests/data/1.0.2-chakra.0.0.4.tgz diff --git a/tests/converter/test_pytorch_node.py b/tests/converter/test_pytorch_node.py new file mode 100644 index 00000000..dda058b2 --- /dev/null +++ b/tests/converter/test_pytorch_node.py @@ -0,0 +1,62 @@ +import json +import tarfile +from pathlib import Path +from typing import Any, Dict + +import pytest + +from src.converter.pytorch_node import PyTorchNode + + +@pytest.fixture +def extract_tar_gz_file(tmp_path: Path) -> Path: + """ + Fixture to extract a tar.gz file to a temporary directory. + + Args: + tmp_path (Path): Temporary directory path provided by pytest. 
+ + Returns: + Path: Path to the extracted directory. + """ + tar_gz_file = Path("tests/data/1.0.2-chakra.0.0.4.tgz") + extracted_dir = tmp_path / "extracted" + extracted_dir.mkdir() + + with tarfile.open(tar_gz_file, "r:gz") as tar: + tar.extractall(path=extracted_dir) + + return extracted_dir + + +def load_pytorch_execution_traces(file_path: str) -> Dict[str, Any]: + """ + Loads PyTorch execution traces from a file. + + Args: + file_path (str): Path to the PyTorch execution trace file. + + Returns: + Dict[str, Any]: Parsed PyTorch execution trace data. + """ + with open(file_path, "r") as pytorch_et: + return json.load(pytorch_et) + + +def test_pytorch_node_parsing(extract_tar_gz_file: Path) -> None: + """ + Test to check if PyTorchNode can parse nodes properly from the extracted data. + + Args: + extract_tar_gz_file (Path): Path to the extracted directory containing + the PyTorch execution trace file. + """ + pytorch_trace_file = extract_tar_gz_file / "1.0.2-chakra.0.0.4.json" + pytorch_et_data = load_pytorch_execution_traces(str(pytorch_trace_file)) + + pytorch_schema = pytorch_et_data["schema"] + pytorch_nodes = pytorch_et_data["nodes"] + + for node_data in pytorch_nodes: + node = PyTorchNode(pytorch_schema, node_data) + assert node is not None # Check if node is instantiated properly diff --git a/tests/data/1.0.2-chakra.0.0.4.tgz b/tests/data/1.0.2-chakra.0.0.4.tgz new file mode 100644 index 0000000000000000000000000000000000000000..ad84aba8f1ae86ebee3d08322b274cc33add1edb GIT binary patch literal 8097 zcmZvgRZtvVxV3Q)1a}A|xI>VE;1&oToM6F&4epkK0GS~}aJRwT-QC?KxCaQ%pa;Hl zs{Wh5s;|1bc6E0>@3Yq4v|tQ`HcS~y1mvSs0~bnp?;rchIZ@1l4GOGU4IQ1atRnnb zc|N0Oi<}GkOU??5hC|_VzcraLP-J!ANg!viFPtg+EBY(@Yg;0yQDrsd>*hFW?Ku4*J2LPHv@5Pqmh@N!N3+?j@+8 zBW5QuQ65={X$s@O8+LJtfsQwtH&h9Zq(zsWdSbDc1FjPnUfX%pw1W2!S32a)J|}}q ztB*sg)c410YwMo8H&dG(#^_CT2Nj?I<|il~?w`B4Ge7vL&uaBuoJZT;@zPPcfbnU- z_S2Kq$!GsMpZh)#C?K7E6t{18H*cz|YTw)Uy%F@J+1K;r(cAm#VZase;M^c&ptV}$ 
z%ZR?s2j49kd)oinOf@!F1;1E>!}slKaJ!!TPCQ@%w{_8%x4v$>FenM>;Z)DY_WH(+ zr$8c2oH1PeeD(1`+`ryu=j14DfAz2Ed71w$GmguJQ<9wg>G{{QQ&OgF!W4h@t3reG ztKHMXvb;^${u%7J6q)J?_BvDaDNmh>7RQL~W?{K5?M@sE-Rf{*;es$G^Bs7DegJnU z;9YQ3ewc&B`dY`s*opsL&)oIXAvCnC@9^?0(y~pJ=1&u)_`O32?ZeI9HmvNJN&LRD zRq*jPS!b;6Y3#4I1`bX9>UsyB@@mkxLIWWg<*`7&j+?E}=kGl=rVYV6@#AT_i+B9> zbOZNxy11+pSM}Fx_722$y_h-F89P1L06d;lonAv#Xj4SJFYlcu&d$&7jGMi@Jg%?c zD{WsMPmf!gV(J_NT0=Ctz40mhG5vIE41{IT9cP=}Xdj$PpKM|>gchMsUt^^1^EQv> z3_u+~en4VcM5qUvv<7aM3HOI|>ezt9ZLgovXI%%)`C{bpPe1Fr_5?VgC-*7@s~zf> z3p3b_G$lIQc`^zK+X|4*j9Z@bo{4Jji-8i>&il(*fk`TdRsd%kQ0$NYD$}mxEO>is z>j>`b^g$$*b5+Gym5KCl;j5TFS6}r0(&a!Uj((b!xP7Hj+OJ=^yEB4v9Ze?y@wWL7 zAmh-o-aOJJ+Ph+JXQs{bBIv9mr})j$N_^a$>V8>C>EGl&f4}P?cBuR|SN+}nK@fPs zH`T#tcEp2=7ApW7O~)8`f4=A2oH9B`Z(N=9Sbc34##J9t8qi~|?*DZY+F$C|k#udF z@n8(PBc2Ng?SWe}buW^Z%nmKw%RYdAbgib@8=b5S`5VfsE6bm>i8B+wPb}4jkq-6r zz*84|YiXLF9$W2zRdzpA{r!YL1n?BL9!;GE<#qkDfpetcrbC&y^3GvrA!Rh9JU_9C zdN11&*?0P?NSC;hzEDM+3^PSf9g)U6jSbCAr6#1|3aSFDlO8JJ_%{cc0~=pk{Tsc! 
z(qEMo{TryV6{ZsV)2c03qZEcv&0exypSykPE!^f}d?v{-|Mf$tsywLBScSPw&qAb%`eR<&1+phHpR4@mzopqjx ztuQ-bd{xic$ew4#2g)*W&-$4s$0v<{E0L+n-d zxZxU|=2~BrsStc!CQv9N;S3`{5wkOb{s-tv1wmzvoI_2#fYh?M>L zz64;qbqkr6cnUXSp0sq~!=j#xo*aI0A<$1!)Jg!Ac;f6Qs`dcgJb}Pqj*)nMb#E_| zVH|g2EV)+A$mBO_BEi*#-`ucr2Ce!@fQzMxO2Mtwr2?#EA&fO2KiOSRLe?C$2H-g4 zJGk@{A@m1Gc@Oi7&gpGaz>@L0>y}|s&#^|W=&^1yR_=Ccfqe* zH(e>7l1_)wI*w;y_du$*q;&kaknAAfht{y-tsV=yYOm0IKhM0XNCM$i2PYgu<_sE` z20~Hx`ZyGcv5Uw#X!VPdO6MVJMNZZ`N!v(H_Bfj!9{_!CRFOh@>Wp6q1PxJ0eEbbm zQ@u|fgI>YKS54KM{j$)iaB+z3F#`<4qxS%(!?k{ERcD($&i7x1^{Hsfayuh3!q_8Y z#r=i#2rbvOdiN|b$p!HDM;bE&L~AVCHcCsTafeP6`rCZ%fbT_`roG^&6BjjMl(T18rb)&Nh-xHI`O;NQ^80RS;LHPv8Nt zdDkzD8h3lWHj74)BYTYeI3y~`-Mgg}c9XE%FAj11oXg3QzVPC_@WJ71+T z&YQln=D@OsNpp>C)7u!@F$N~=y8X**2Q~SiYC5knN}|9^{?NA7p__6;tcsYx!+6&c zVSwRuJhkCs?MwCkJDhd_GL`8D3NBt%%VVuS&SDkFut>dKfC$uM22K(DgK zB=hX3XES?yKt`=WdL|bt%WO0owBLA`f?tHTt{Ok+@g})zTk)sh^4qd zC?BrQQmWeB@Rj;5j#AzZWh(=lg0YwLR)-1sJ^S z4auc3$BQ9Tk&#X=vmHlY8gOlzzhLlT*+7CwS3&0Q7^3k7VwofK&l-wD-{*!qY3ks} zK2I2V9zl*dSC|TeVuSLVLcI$K3j1;K_KKeERk~PadoL@q*3x|&6fI}8HxWCOY!zwW zqm2jW_$1<^Vl#gL!;;kme@K%^QcR5y@?#ZzTJs00{E*%HAV^8ak1;EbX$GeHUSoG{QZ zV|14|{}`b@rVxR-jIO^z9?wG}D+3i=Qi#aMsANMb_cu~PB78%T*ua$C3!%cnU)5zK zwgVJQ)Nj8bjVe#EeZIR~6~)9~7+qYFR$lY(1*%AjDsP@3$(bQf57xi>>8U@CxM6wR zc1-AycM1`Y#%Cr1D)&G00;dzOOX}27_OggI| zr^uZfA@x@W$6r9S%(pfev8NK8^xCtR4ZGeaU1JUZ2{-7GRy31S31BPmYE-s)?=XgP z#n64UAo9_S6T@cM@B4fGKn>+ggZa=cWtQyUaqSIVrIj77+&c93!h#E04R2sP*WnYo zM`6FOyJruD_T;YC6Fv`GyWS;KJwHVDU9NS+@ zMYHnAnP)P;7_wxTwGzuigz#!e_aAQD7wRUc^>WE~a@qHm-nZIDKV*-$=u!3NE&0NK z*1B>Y;zxvdsp+6Gyr5)7Kx`h7H2RL!vmdKb@8Zo1N*Vv56rKOZwrzR^x)T7T^!lEXt)i7}zOHABe%Gk<99$A2{aiFlzY>xCwp|7d#F zt`pNP6 zo{}5BYONncMo1JP6mD_I6nwfvn!82KG!d&*n-Sig4K^%mP4A26obM?ccnaKC?{l4N&K& zREt)7n3ki&LG8A|l-?(=$)8?{TMCmp9_e6-@_;N`gN#iWRhKzwvMAZw+&*KwQT(w6 zNt?z>EoZjC1nIkxhAK1w-Khs|%&~lUdK?+i^I)Z46N&Li1{2W$kACeP!}zZG@an^5 z6a%_$sUaS`#(o&6^!|wDdT`cfQZl)cl!+RQ{--Z`6m6+vT*mW5ca(KfNXm~^t 
z6C@rIp*Qrw1B%>!g}=D$y`?0HQxVf(rvlhmmqh9JldZW0!b;lVNgMPI&MRc3dNQt% zje~FshQ-X+$F#eax-zA^`M6+L3F?yaz%=&^V|nR3Q+z##SR8OZ@l0T;F!C4qb5Q`1 zluO5-`utYcE;u{76(pEDWfwmrxBo_MGloRWqqc==Z<*6=#ePJp6 zQRp97HA`Ar%WJT#YPe!v)4*=aF~I-GPPI`*NNw`eKYXVX3sLu#;p|tSr}1&x2kiq+ z+0wCk&17mp$#o?3$D+oDi}9S0u?Tt;X{kzu{7-k7_zg5N=}l{t5XQy8r0|2#5BlrO zCR(3_6sMMDWkzAVEKqf@ji%K3FLjy{ZXBHb@Vg%QVKW)r)nY|S{ zYMYVntM((gpkEfWxJ|kYv-6i-qgbmQD9R_eA_P_wVFO^VE-Z+TYz=rjWqXUR*lMt& zOzNkGnvoI-xrn-R6Lw9P;*Y2d8;0l>b`_+~*vj+%z9W)E^^Lck?HMQ1`caWR$7t2+ z!$q!B*|rR)Jd!)VjmZX=2}t;$0zU^D`H}63&K(3kU4j|cV8kQ{TebDbjWU!B=AsE= zU;APO4#fsF7xt|v7seesycN~_3yh~AhElUXDUe{nO|z3kWO~8AN`k7>>?gQg9J?tH zFoCn7ywjI#(2aEn^2G(pgD{!!D{#Mek-4r+Nj{qE3^{_S!FU2VqshcxlW0y5=-J&T z6m4TRR5;RJ%T?>9(5tXz_Qe=x7v#GofTVjXRgm53Z{LL@aXoZ3pNtM)T(L^7^3J?0bu-A`#Cv6xbVHa2`#UuRxFq-v2^ zF%oLyrym^RtJH8Eeyp3d(tE<)oyYC3I=}(Y+kidIi6*2Hw*4(w6U{Z}tM{^?ix6E* zl&gSOOkgkun^&pE#}R5c3xme# z2PvePDo7is4jIWc?%BQx@8UiUKC`nHUVTEfZETTczB$cQ=_J*JU1vEaSyV}Cm8wx* zv1yh1sut9c_=noRIFanJ?+A?0bogTNroKMil?U_7uQG`CP!X2sytH~TijR74ZT&&801*z zIah)~*bVp+=o4Kh9zs;kU`_RrnE*jH7W75=?2i?G|Pd*}W0 zKfHnsRkXnrQm~Zd`1e2LNG52&_1;;QdLEd|uA>9}%uG6Nn(}hOK&ZB^gsQ=K87yDv5lybT@S6l_D&isxKKljf0o4|kRov>|1roIkSE4o0wnroJBQ(%4PYD*ayrdo^ zDVKNP3e4e(fn)j*?QblMdP7W8Kj+jO1i?r2pi)xU31Sz>m&{jRg%rFYh-_~*WCIkS zUMkKqFUvUy5BIm`muh zj2+-`0%=K7_zZiAfKtKLQ_Nx^ zl47yy9~xMg27LbZmC5 zHeYl;=E+u=~zyGuZfXhrwJ#2xr1 zNa+wP?NE@MYe?Au6&fD?+{Vl7`pI)xXeGZ=Qlt?dx8XW1=Q%8|Cvfo)ybYF-s5c^j zALD`xKD%>Nk9=g!ujFg@tIpVt+AolT=lF}7xs(0g&dMDROa3bO44M_@gnT3$G68V$_3tI z9VQD=-Mqzpo9JF!yoh7>U|GvKRF4r*yhV=^uqA-*u zrbRDZp>UvtYer9C)5vr$59ybyzp3Pt?T$OAx_{-;0L*gmWn1f3+l^9_#Szzn_6cp409aj}-xJIrEuB=P)ydAb0@N3*K2EbN zJycD8vwEf?WU2crdh+xfNkA{M^IRgbYv?!9!3OXU+XY1MMoUip=TO)GdaOxJq}ZGp zgF@R`=Bh#nj;=a*45IbGHka#w3QbRqobie|GjoLZ>w6%tFmI4F+AP)(+>2kMs#8>@;IceI509ZFZBU`|j=rn(<`D7rr+Qby1~+wQi`&&m2Q4HCWF zm)|P4@SVBm2%URN%V?vyTMWzXjJ;26k*?F=VZBGnIIb;w0QvF{KFzQwp zL0_U9@~A<(={@zhGu@Z9J1tMushm$txaGAEFbFRXB{lN3vMU^8NS$z 
zrsMs8n8iouh+VBuqJMtDjCi^)!46aKgsZzj{%&eU*Y>e65nDDdp&iPtI{(j7teap?Y9>F?(LOsJ8h}gp65X8AI$D)Uqi#LB-qdda0 zSI8IW_kPtr{DLUsrS`)cV~UCRQRSBJ8+o`Z!*3IapDFv)3|>QXbwHEbs{D1({t^3f z0t;_%AM4gl$d!WZH^0t6dzilVE3!&HiDxScl1hR*{*33he(!>bF@`8Hy(H$WUqy)l zugX{2=vx{K>};Ys>h{T(+P^FxB$swHFOS2P%37j@H%e?mD;rG0?e~15tG?F*{Cyk7 zlYuJJyvKFPQ&RWWQOR%l3!%5NA7l-88cVey265xH=$c)g7^7)mRnhr0f-U2BO#E=F z719GUxvTabos^f}fBVw=SzdbobRUFjI4V!<|9U^}OYc{st-9^U0FTC>&Z7O-`+ae$ ze6e6RkmcCuY_S)c@%>TAvBrl5VX-WKAce zma`sUVmt2)Qldvse^44K?%K6i^^Sl3D=XaBXz&9E3!FC50P3vl^xy{3ABo7T>ozcN z2wh`2QB6AZmn^tG=Vg~^e33a-lmEQly2x{0u9(V8aZq@90UULMoX+#zD^i-;n`Opg zK*k|NIql6U?7Z$-TkfRaLSu0lB&QS^ee_aLg4rx~C!kCGyj5F$D8LK9kDstA=T{Jf z2EAe(l3Ytr#7X);+^_{bYpiGZJi_@|V{4S#%}rw(4_V=N)*+|}EgZ*zoTqZA&wN4W zopMG7BMK^~pn{c|t5JI_lFw18%;%1($My{af_1F_>9dJB{tbSp-lfur)CocTQ%ZD@ zy5#X8GcLpxNPG0Wu`K;>V;O|6=M;+r$>*O5q!U4wvBUOuVR=th{0ee*OMnZ#OR(Zi zwbQF1*S%4;TZwjho`N|+76XZ#`WA$^R_?RtJLc)$D(d@$cI`Yx#~ILhGU!EO+^Ku$ zaP@yPK!OJ{WMhf$h{r5iu=mj8&Yt|V$AuY|M~1e?=uDqodP>^@^S%B4ug!&hSGfDF zw{U~**dUH|3zcfFqerb*ZJFL^q>l(-FFvZ|g090n_=dlcL<7ERHS$~tZ1Bkd literal 0 HcmV?d00001 From 10c85e9654a569ffe8e156360d498d50796b06a7 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 14 May 2024 19:47:45 -0400 Subject: [PATCH 07/15] Add GitHub actions workflow for running Python unit tests --- .github/workflows/python_tests.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/python_tests.yml diff --git a/.github/workflows/python_tests.yml b/.github/workflows/python_tests.yml new file mode 100644 index 00000000..399b5a91 --- /dev/null +++ b/.github/workflows/python_tests.yml @@ -0,0 +1,24 @@ +name: Python Unit Tests + +on: pull_request + +jobs: + python-tests: + runs-on: ubuntu-latest + + steps: + - name: Checkout Code + uses: actions/checkout@v2 + + - name: Setup Python Environment + uses: actions/setup-python@v2 + with: + python-version: '3.8' + + - name: Install Dependencies 
+ run: | + pip install -r requirements-dev.txt + + - name: Run Unit Tests + run: | + python -m pytest -vv tests From 0a2b563a42f1b3a9bb7078cb040aba7c736d1179 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 10:31:47 -0400 Subject: [PATCH 08/15] Split classes from trace_link.py into separate files for improved modularity --- src/trace_link/kineto_operator.py | 107 +++ src/trace_link/trace_link.py | 1054 +------------------------- src/trace_link/trace_linker.py | 883 +++++++++++++++++++++ src/trace_link/unique_id_assigner.py | 73 ++ 4 files changed, 1064 insertions(+), 1053 deletions(-) create mode 100644 src/trace_link/kineto_operator.py create mode 100644 src/trace_link/trace_linker.py create mode 100644 src/trace_link/unique_id_assigner.py diff --git a/src/trace_link/kineto_operator.py b/src/trace_link/kineto_operator.py new file mode 100644 index 00000000..22a8c18f --- /dev/null +++ b/src/trace_link/kineto_operator.py @@ -0,0 +1,107 @@ +from typing import Any, Dict, Optional + +from param_bench.train.compute.python.tools.execution_trace import ( + Node as PyTorchOperator, +) + + +class KinetoOperator: + """ + Represents a single operator extracted from the Kineto trace. + + Attributes: + op_dict (Dict[str, Any]): Dictionary containing the operator data. + category (str): Category of the operator. + name (str): Name of the operator. + phase (Optional[str]): Phase of the operator. + inclusive_dur (int): Inclusive duration of the operator in microseconds. + exclusive_dur (int): Exclusive duration of the operator in microseconds. + timestamp (int): Timestamp of the operator in microseconds. + external_id (str): External ID associated with the operator. + ev_idx (str): Event index associated with the operator. + tid (int): Thread ID associated with the operator. + pytorch_op (Optional[PyTorchOperator]): Associated PyTorch operator. 
+ parent_pytorch_op_id (Optional[int]): ID of the parent PyTorch operator. + inter_thread_dep (Optional[int]): ID of the latest CPU node from other + threads before the gap. + stream (Optional[int]): Stream ID associated with the operator. + rf_id (Optional[int]): Record function ID. + correlation (int): Correlation ID used to link CUDA runtime operations + with their GPU counterparts. + """ + + def __init__(self, kineto_op: Dict[str, Any]) -> None: + """ + Initializes a new instance of the KinetoOperator class. + + Args: + kineto_op (Dict[str, Any]): The dictionary representing the + operator data. + """ + self.op_dict = kineto_op + self.category = kineto_op.get("cat", "") + self.name = kineto_op.get("name", "") + self.phase = kineto_op.get("ph") + self.inclusive_dur = kineto_op.get("dur", 0) + self.exclusive_dur = kineto_op.get("dur", 0) + self.timestamp = kineto_op.get("ts", 0) + self.external_id = "" + self.ev_idx = "" + self.tid = kineto_op.get("tid", 0) + self.pytorch_op: Optional[PyTorchOperator] = None + self.parent_pytorch_op_id = None + self.inter_thread_dep: Optional[int] = None + self.stream: Optional[int] = None + self.rf_id: Optional[int] = None + self.correlation: int = None + + if "args" in kineto_op: + self.external_id = kineto_op["args"].get("External id") + self.ev_idx = kineto_op["args"].get("Ev Idx", "") + self.stream = kineto_op["args"].get("stream") + if "Record function id" in kineto_op["args"]: + self.rf_id = int(kineto_op["args"]["Record function id"]) + if "correlation" in kineto_op["args"]: + self.correlation = int(kineto_op["args"]["correlation"]) + + def is_valid( + self, + category: str, + name_exception: str = "ProfilerStep", + phase: Optional[str] = None, + ) -> bool: + """ + Checks if the operator matches specified filtering criteria. + + Args: + category (str): The category to check against. + name_exception (str): A name to exclude in the check. + phase (Optional[str]): The phase to check against, if any. 
+ + Returns: + bool: True if the operator matches the criteria, False otherwise. + """ + return ( + self.category is not None + and name_exception not in self.name + and self.category == category + and (phase is None or self.phase == phase) + ) + + def __repr__(self) -> str: + """ + Represent the KinetoOperator as a string. + + Returns: + str: A string representation of the KinetoOperator. + """ + return ( + f"KinetoOperator(category={self.category}, " + f"name={self.name}, phase={self.phase}, " + f"inclusive_dur={self.inclusive_dur}, " + f"exclusive_dur={self.exclusive_dur}, " + f"timestamp={self.timestamp}, external_id={self.external_id}, " + f"ev_idx={self.ev_idx}, tid={self.tid}, " + f"rf_id={self.rf_id}, " + f"parent_pytorch_op_id={self.parent_pytorch_op_id})" + ) diff --git a/src/trace_link/trace_link.py b/src/trace_link/trace_link.py index 78b6970e..479ba799 100644 --- a/src/trace_link/trace_link.py +++ b/src/trace_link/trace_link.py @@ -1,1058 +1,6 @@ import argparse -import bisect -import copy -import json -import logging -import sys -from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Any, Dict, List, Optional, Tuple -from param_bench.train.compute.python.tools.execution_trace import ( - EXECUTION_TRACE_PROCESS_ANNOTATION, - EXECUTION_TRACE_THREAD_ANNOTATION, -) -from param_bench.train.compute.python.tools.execution_trace import ( - Node as PyTorchOperator, -) -from param_bench.train.compute.python.tools.utility import ( - load_execution_trace_file, - read_dictionary_from_json_file, -) - -# Increase the recursion limit for deep PyTorch execution traces. -sys.setrecursionlimit(10**6) - - -class KinetoOperator: - """ - Represents a single operator extracted from the Kineto trace. - - Attributes: - op_dict (Dict[str, Any]): Dictionary containing the operator data. - category (str): Category of the operator. - name (str): Name of the operator. - phase (Optional[str]): Phase of the operator. 
- inclusive_dur (int): Inclusive duration of the operator in microseconds. - exclusive_dur (int): Exclusive duration of the operator in microseconds. - timestamp (int): Timestamp of the operator in microseconds. - external_id (str): External ID associated with the operator. - ev_idx (str): Event index associated with the operator. - tid (int): Thread ID associated with the operator. - pytorch_op (Optional[PyTorchOperator]): Associated PyTorch operator. - parent_pytorch_op_id (Optional[int]): ID of the parent PyTorch operator. - inter_thread_dep (Optional[int]): ID of the latest CPU node from other - threads before the gap. - stream (Optional[int]): Stream ID associated with the operator. - rf_id (Optional[int]): Record function ID. - correlation (int): Correlation ID used to link CUDA runtime operations - with their GPU counterparts. - """ - - def __init__(self, kineto_op: Dict[str, Any]) -> None: - """ - Initializes a new instance of the KinetoOperator class. - - Args: - kineto_op (Dict[str, Any]): The dictionary representing the - operator data. 
- """ - self.op_dict = kineto_op - self.category = kineto_op.get("cat", "") - self.name = kineto_op.get("name", "") - self.phase = kineto_op.get("ph") - self.inclusive_dur = kineto_op.get("dur", 0) - self.exclusive_dur = kineto_op.get("dur", 0) - self.timestamp = kineto_op.get("ts", 0) - self.external_id = "" - self.ev_idx = "" - self.tid = kineto_op.get("tid", 0) - self.pytorch_op: Optional[PyTorchOperator] = None - self.parent_pytorch_op_id = None - self.inter_thread_dep: Optional[int] = None - self.stream: Optional[int] = None - self.rf_id: Optional[int] = None - self.correlation: int = None - - if "args" in kineto_op: - self.external_id = kineto_op["args"].get("External id") - self.ev_idx = kineto_op["args"].get("Ev Idx", "") - self.stream = kineto_op["args"].get("stream") - if "Record function id" in kineto_op["args"]: - self.rf_id = int(kineto_op["args"]["Record function id"]) - if "correlation" in kineto_op["args"]: - self.correlation = int(kineto_op["args"]["correlation"]) - - def is_valid( - self, - category: str, - name_exception: str = "ProfilerStep", - phase: Optional[str] = None, - ) -> bool: - """ - Checks if the operator matches specified filtering criteria. - - Args: - category (str): The category to check against. - name_exception (str): A name to exclude in the check. - phase (Optional[str]): The phase to check against, if any. - - Returns: - bool: True if the operator matches the criteria, False otherwise. - """ - return ( - self.category is not None - and name_exception not in self.name - and self.category == category - and (phase is None or self.phase == phase) - ) - - def __repr__(self) -> str: - """ - Represent the KinetoOperator as a string. - - Returns: - str: A string representation of the KinetoOperator. 
- """ - return ( - f"KinetoOperator(category={self.category}, " - f"name={self.name}, phase={self.phase}, " - f"inclusive_dur={self.inclusive_dur}, " - f"exclusive_dur={self.exclusive_dur}, " - f"timestamp={self.timestamp}, external_id={self.external_id}, " - f"ev_idx={self.ev_idx}, tid={self.tid}, " - f"rf_id={self.rf_id}, " - f"parent_pytorch_op_id={self.parent_pytorch_op_id})" - ) - - -class UniqueIdAssigner: - """ - Assigns unique IDs to items, ensuring each item gets a distinct ID. - - This class is used to maintain a consistent and unique mapping of original - identifiers to new unique identifiers. It's particularly useful in scenarios - where the uniqueness of IDs across different entities or iterations needs to - be preserved. - - Attributes: - next_id (int): The next unique ID to be assigned. - original_to_new_ids (Dict[int, int]): A mapping from original IDs to their - corresponding new unique IDs. This helps in retrieving already assigned - unique IDs and ensures the same original ID always maps to the same - unique ID. - """ - - def __init__(self) -> None: - """ - Initializes the UniqueIdAssigner with a starting ID of 0. - """ - self.next_id = 0 - self.original_to_new_ids: Dict[int, int] = {} - - def assign_or_retrieve_id(self, original_id: int) -> int: - """ - Assigns a new unique ID to the given original ID if it doesn't have one already; - otherwise, returns the previously assigned unique ID. - - Args: - original_id (int): The original ID for which a unique ID is needed. - - Returns: - int: A unique ID corresponding to the original ID. - """ - if original_id not in self.original_to_new_ids: - self.original_to_new_ids[original_id] = self.next_id - self.next_id += 1 - - return self.original_to_new_ids[original_id] - - def generate_new_id(self) -> int: - """ - Generates a new unique ID without needing an original ID. - - This is useful for cases where new entities are created that do not - have an existing identifier. 
- - Returns: - int: A new unique ID. - """ - unique_id = self.next_id - self.next_id += 1 - return unique_id - - def lookup_new_id(self, original_id: int) -> int: - """ - Retrieves the new unique ID for a given original ID, if it has been assigned. - - This method is useful for checking if a unique ID has already been - assigned to an original ID and retrieving it. - - Args: - original_id (int): The original ID to look up. - - Returns: - int: The new unique ID if it has been assigned, otherwise returns - the original ID. - """ - return self.original_to_new_ids.get(original_id, original_id) - - -class TraceLinker: - """ - Links PyTorch Execution Traces (ET) and Kineto Traces to generate PyTorch ET plus. - - This class handles the process of loading, processing, and linking - PyTorch Execution Traces with Kineto Traces, enriching the PyTorch - Execution Trace with detailed performance data. - - Attributes: - pytorch_et_file (str): Path to the PyTorch execution trace file. - kineto_file (str): Path to the Kineto trace file. - pytorch_ops (List[PyTorchOperator]): PyTorch operators from ET trace. - kineto_ops (List[KinetoOperator]): Kineto operators from the trace. - sorted_kineto_ops (List[KinetoOperator]): Sorted list of Kineto operators based on timestamps. - sorted_ts (List[int]): Sorted list of timestamps extracted from Kineto operators for efficient temporal queries. - kineto_ops_by_tid (Dict[int, List[KinetoOperator]]): Operators grouped by thread ID. - kineto_cuda_runtime (Dict[int, KinetoOperator]): Mapping of CUDA runtime - API calls to Kineto operators, indexed by their correlation ID. This - includes operations like `cudaLaunchKernel` and `cudaMemcpyAsync`, - crucial for mapping GPU activities back to their initiating CPU calls. - kineto_ac2g_s_ops (Dict[str, KinetoOperator]): Start ops for CPU to GPU. - kineto_ac2g_f_ops (Dict[str, KinetoOperator]): Final ops for CPU to GPU. - kineto_cpu_launcher_ops (Dict[str, KinetoOperator]): CPU launcher ops. 
- kineto_gpu_ops (List[KinetoOperator]): GPU operators. - kineto_process_start_time (int): Start time of the process, based on the - earliest operator timestamp. - kineto_process_end_time (int): End time of the process, based on the - latest operator timestamp. - kineto_thread_info (Dict[int, Tuple[int, int]]): Information about threads, - mapping thread IDs to a tuple of start and end times. - kineto_rf_id_to_kineto_op_map (Dict[int, KinetoOperator]): Mapping from - rf_id to KinetoOperator instances. - pytorch_op_id_to_kineto_ops_map (Dict[int, List[KinetoOperator]]): - Map from PyTorch op IDs to Kineto GPU ops. - pytorch_op_id_to_inclusive_dur_map (Dict[int, int]): Inclusive duration map for PyTorch ops. - pytorch_op_id_to_inclusive_dur_map (Dict[int, int]): Exclusive duration map for PyTorch ops. - pytorch_op_id_to_timestamp_map (Dict[int, int]): Timestamp map for PyTorch ops. - pytorch_op_id_to_inter_thread_dep_map (Dict[int, int]): Mapping of PyTorch - operator IDs to IDs of latest CPU node from other threads before the gap. - id_assigner (UniqueIdAssigner): Assigns unique IDs to operators. - pytorch_et_plus_data (Optional[Dict]): PyTorch ET plus data. - logger (logging.Logger): Logger for the class. - """ - - def __init__(self, pytorch_et_file: str, kineto_file: str, log_level: str = "INFO") -> None: - """ - Initializes the TraceLinker with paths to the PyTorch and Kineto trace files, - and a log level. - - Args: - pytorch_et_file (str): Path to the PyTorch execution trace file. - kineto_file (str): Path to the Kineto trace file. - log_level (str): Logging level for the class. 
- """ - self.pytorch_et_file = pytorch_et_file - self.kineto_file = kineto_file - self.pytorch_ops: List[PyTorchOperator] = [] - self.kineto_ops: List[KinetoOperator] = [] - self.sorted_kineto_ops: List[KinetoOperator] = [] - self.sorted_ts: List[int] = [] - self.kineto_ops_by_tid: Dict[int, List[KinetoOperator]] = {} - self.kineto_cuda_runtime: Dict[int, KinetoOperator] = {} - self.kineto_ac2g_s_ops: Dict[str, KinetoOperator] = {} - self.kineto_ac2g_f_ops: Dict[str, KinetoOperator] = {} - self.kineto_cpu_launcher_ops: Dict[str, KinetoOperator] = {} - self.kineto_gpu_ops: List[KinetoOperator] = [] - self.kineto_process_start_time: int = 0 - self.kineto_process_end_time: int = 0 - self.kineto_thread_info: Dict[int, Tuple[int, int]] = {} - self.kineto_rf_id_to_kineto_op_map: Dict[int, KinetoOperator] = {} - self.pytorch_op_id_to_kineto_ops_map: Dict[int, List[KinetoOperator]] = {} - self.pytorch_op_id_to_inclusive_dur_map: Dict[int, int] = {} - self.pytorch_op_id_to_exclusive_dur_map: Dict[int, int] = {} - self.pytorch_op_id_to_timestamp_map: Dict[int, int] = {} - self.pytorch_op_id_to_inter_thread_dep_map: Dict[int, int] = {} - self.id_assigner = UniqueIdAssigner() - self.pytorch_et_plus_data: Optional[Dict] = None - self.logger = logging.getLogger(__name__) - self.logger.setLevel(log_level.upper()) - - def load_traces(self) -> None: - """ - Loads both PyTorch Execution Traces and Kineto Traces. - This method is a high-level orchestrator that calls specific methods to load - and process the PyTorch and Kineto traces individually. - """ - self.load_pytorch_et() - self.load_kineto_trace() - - def load_pytorch_et(self) -> None: - """ - Loads and processes the PyTorch Execution Trace. - This method handles multiple iterations in the trace and extracts the nodes, - considering the specified annotation for segmenting the iterations. 
- """ - self.logger.info("Starting to load PyTorch Execution Trace.") - pytorch_et = load_execution_trace_file(self.pytorch_et_file) - - root_node = pytorch_et.get_nodes()[1] # Root node is usually 1-based - self.pytorch_ops = self.extract_pytorch_ops(root_node) - self.logger.info(f"Original ops in PyTorch ET: {len(self.pytorch_ops)}") - self.logger.info("PyTorch Execution Trace loaded successfully.") - - def extract_pytorch_ops(self, node: PyTorchOperator) -> List[PyTorchOperator]: - """ - Extracts and sorts nodes from the PyTorch execution trace recursively. - - This method traverses the execution trace starting from the provided node, - extracting all the operator nodes recursively, and then returns them sorted - by their identifiers. - - Args: - node (PyTorchOperator): Starting node for extraction. - - Returns: - List[PyTorchOperator]: Sorted list of extracted PyTorchOperator nodes. - """ - nodes = [] - - def traverse(node: PyTorchOperator): - nodes.append(node) - for child in node.children: - traverse(child) - - traverse(node) - return sorted(nodes, key=lambda x: x.id) - - def load_kineto_trace(self) -> None: - """ - Loads and processes the Kineto Trace. - This method parses the Kineto trace file, creating KinetoOperator instances - for each operator in the trace. It then categorizes and segments these - operators for further processing and linking with PyTorch operators. 
- """ - self.logger.info("Starting to load Kineto Trace.") - kineto_trace_data = read_dictionary_from_json_file(self.kineto_file) - sorted_kineto_ops = sorted( - [KinetoOperator(op) for op in kineto_trace_data["traceEvents"]], - key=lambda op: op.timestamp, - ) - - self.categorize_and_track_kineto_ops(sorted_kineto_ops) - self.construct_kineto_rf_id_map() - self.calculate_exclusive_dur() - - self.sorted_kineto_ops = sorted(self.kineto_ops, key=lambda op: op.timestamp) - self.sorted_kineto_ts = [op.timestamp for op in self.sorted_kineto_ops] - - self.logger.info( - f"Processed Kineto trace with {len(self.kineto_ops)} CPU ops, " - f"{len(self.kineto_cpu_launcher_ops)} CPU launcher ops, " - f"and {len(self.kineto_gpu_ops)} GPU ops." - ) - self.logger.info("Kineto Trace loaded successfully.") - - def categorize_and_track_kineto_ops(self, kineto_ops: List[KinetoOperator]) -> None: - """ - Categorizes Kineto operators based on their properties and assigns them to - corresponding groups for CPU, GPU, and other operations. - - Args: - kineto_ops (List[KinetoOperator]): List of Kineto operators to categorize. - - Raises: - ValueError: If duplicate correlation IDs are found in 'cuda_runtime' - category operators. 
- """ - self.logger.info("Categorizing Kineto operators and calculating timing boundaries.") - process_start_time = sys.maxsize - process_end_time = 0 - thread_info = {} - - for op in kineto_ops: - if op.is_valid("cpu_op") or op.is_valid("user_annotation"): - self.kineto_ops.append(op) - self.kineto_ops_by_tid.setdefault(op.tid, []).append(op) - self.logger.debug(f"Added CPU or user annotation op: {op.name}") - elif op.is_valid("ac2g", phase="s"): - self._add_op_to_dict(op, self.kineto_ac2g_s_ops, "id") - elif op.is_valid("ac2g", phase="f"): - self._add_op_to_dict(op, self.kineto_ac2g_f_ops, "id") - elif ( - op.is_valid("cuda_runtime") - and op.name - in [ - "cudaLaunchKernel", - "cudaLaunchKernelExC", - "cudaMemcpyAsync", - ] - ) or (op.category == "cuda_driver" and op.name in ["cuLaunchKernel"]): - self._add_op_to_dict(op, self.kineto_cpu_launcher_ops, "args", "External id") - self.logger.debug(f"Added CPU launcher op: {op.name}") - elif op.is_valid("kernel") or op.is_valid("gpu_memcpy"): - self.kineto_gpu_ops.append(op) - self.logger.debug(f"Added GPU op: {op.name}") - - if (op.category == "cuda_runtime") or (op.category == "cuda_driver"): - if op.correlation in self.kineto_cuda_runtime: - raise ValueError(f"Duplicate correlation ID {op.correlation} found in cuda_runtime operators.") - self.kineto_cuda_runtime[op.correlation] = op - - # Update timing boundaries - if op.tid is not None: - process_start_time = min(process_start_time, op.timestamp) - process_end_time = max(process_end_time, op.timestamp + op.inclusive_dur) - thread_start_end = thread_info.setdefault(op.tid, [sys.maxsize, 0]) - thread_start_end[0] = min(thread_start_end[0], op.timestamp) - thread_start_end[1] = max(thread_start_end[1], op.timestamp + op.inclusive_dur) - - # Apply collected timing info - self.kineto_process_start_time = process_start_time - self.kineto_process_end_time = process_end_time - self.kineto_thread_info = thread_info - self.logger.info("Kineto operators categorized and 
timing boundaries calculated.") - - def construct_kineto_rf_id_map(self) -> None: - """ - Constructs a map from rf_id to KinetoOperator instances. - """ - self.kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in self.kineto_ops if op.rf_id is not None} - - def calculate_exclusive_dur(self) -> None: - """ - Calculates the exclusive duration of each operator in the Kineto traces - in parallel. The exclusive duration is defined as the total duration of - the operator minus any time spent in child operators, effectively - representing the time spent exclusively in that operator. This approach - significantly improves the performance of calculating exclusive durations, - especially for traces with a large number of operators. Additionally, by - processing each thread's operators in parallel, the method takes advantage - of concurrent execution capabilities to further speed up the computation. - """ - self.logger.info("Calculating exclusive durations for Kineto operators in parallel.") - - def process_ops_for_thread(ops: List["KinetoOperator"]) -> None: - self.logger.info(f"Processing {len(ops)} operators in thread.") - sorted_ops = sorted(ops, key=lambda op: (op.timestamp, op.inclusive_dur)) - for i, op in enumerate(sorted_ops): - exclusive_dur = op.inclusive_dur - overlapping_regions = [] - - # Identify overlapping regions with child operators - for child_op in sorted_ops[i + 1 :]: - if child_op.timestamp >= op.timestamp and (child_op.timestamp + child_op.inclusive_dur) <= ( - op.timestamp + op.inclusive_dur - ): - overlap_start = child_op.timestamp - overlap_end = child_op.timestamp + child_op.inclusive_dur - overlapping_regions.append((overlap_start, overlap_end)) - if (op.timestamp + op.inclusive_dur) < child_op.timestamp: - break - - # Merge overlapping regions and calculate exclusive duration - merged_regions = self.merge_overlapping_intervals(overlapping_regions) - for start, end in merged_regions: - exclusive_dur -= end - start - - # Check if exclusive_dur 
is not negative or zero - if exclusive_dur < 0: - error_msg = ( - f"Exclusive duration calculation error for node " - f"'{op.name}' (ts: {op.timestamp}, " - f"inclusive_dur: {op.inclusive_dur}, " - f"rf_id: {op.rf_id}): " - f"Duration cannot be less than zero." - ) - self.logger.error(error_msg) - raise ValueError(error_msg) - - op.exclusive_dur = exclusive_dur - self.logger.debug( - f"Node '{op.name}' (ts: {op.timestamp}, " - f"inclusive_dur: {op.inclusive_dur}, " - f"rf_id: {op.rf_id}) " - f"exclusive duration: {op.exclusive_dur} microseconds." - ) - - with ThreadPoolExecutor() as executor: - futures = [executor.submit(process_ops_for_thread, ops) for ops in self.kineto_ops_by_tid.values()] - - for future in as_completed(futures): - future.result() # Wait for all threads to complete and handle any exceptions - - self.logger.info("Exclusive durations for Kineto operators calculated successfully.") - - @staticmethod - def merge_overlapping_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]: - """ - Merges overlapping intervals into a single interval. - - Args: - intervals (List[Tuple[int, int]]): List of intervals. - - Returns: - List[Tuple[int, int]]: List of merged intervals. - """ - if not intervals: - return [] - - # Sort intervals based on the start time - intervals.sort(key=lambda x: x[0]) - merged = [intervals[0]] - - for current in intervals: - prev = merged[-1] - if current[0] <= prev[1]: - # There is overlap, merge the current interval with the previous one - merged[-1] = (prev[0], max(prev[1], current[1])) - else: - # No overlap, add the current interval - merged.append(current) - - return merged - - def _add_op_to_dict(self, op: KinetoOperator, target_dict: Dict, *keys: str) -> None: - """ - Adds an operator to a specific dictionary based on provided keys. - The method navigates through the operator's dictionary using the keys - and adds the operator to the target dictionary. - - Args: - op (KinetoOperator): The operator to be added. 
- target_dict (Dict): The dictionary to which the operator should be added. - *keys (str): Keys used to navigate through the operator's dictionary. - - Raises: - KeyError: If any of the keys are not found in the operator's dictionary. - """ - value = op.op_dict - for key in keys: - if key not in value: - error_msg = f"Key '{key}' not found in operator dictionary for op {op.name}." - self.logger.error(error_msg) - raise KeyError(error_msg) - value = value[key] - - target_dict[value] = op - - def enforce_inter_thread_order(self, threshold: int = 1000) -> None: - """ - Enforces order between groups of operators in different threads. In - Kineto traces with multiple threads, operators are executed in turns, - creating groups. This function identifies these groups by detecting - significant gaps in execution within each thread. It then establishes - dependencies between these groups across different threads, ensuring - the final Chakra execution traces reflect inter-thread dependencies - realistically. - - An isolated group is formed when there's a significant gap in execution - within a thread. Each new group relies on the last CPU operator from - other threads, enforcing order and dependency across threads. - - Args: - threshold (int): Threshold for significant gap detection in - microseconds, used to define group boundaries. 
- """ - self.logger.info("Enforcing inter-thread order in Kineto traces.") - - def process_thread( - tid: int, - ops: List[KinetoOperator], - ops_by_tid: Dict[int, List[KinetoOperator]], - ) -> None: - self.logger.info(f"Thread {tid}: Identifying gaps for dependency linking with threshold {threshold}us.") - sorted_ops = sorted(ops, key=lambda op: op.timestamp) - last_cpu_node_rf_id = None - - for i, op in enumerate(sorted_ops): - if ( - i == 0 - or (sorted_ops[i].timestamp - sorted_ops[i - 1].timestamp - sorted_ops[i - 1].inclusive_dur) - > threshold - ): - last_cpu_node_rf_id = self.find_last_cpu_node_before_timestamp(ops_by_tid, tid, op.timestamp) - if last_cpu_node_rf_id: - self.logger.debug( - f"Thread {tid}: Linking op '{op.name}' " - f"to CPU node before gap with rf_id " - f"'{last_cpu_node_rf_id}'." - ) - - if last_cpu_node_rf_id: - op.inter_thread_dep = last_cpu_node_rf_id - - with ThreadPoolExecutor() as executor: - futures = { - executor.submit(process_thread, tid, ops, self.kineto_ops_by_tid): tid - for tid, ops in self.kineto_ops_by_tid.items() - } - - for future in as_completed(futures): - tid = futures[future] - try: - future.result() - self.logger.debug(f"Thread {tid} dependencies processed.") - except Exception as e: - self.logger.error(f"Error processing thread {tid}: {e}") - - def find_last_cpu_node_before_timestamp( - self, - ops_by_tid: Dict[int, List[KinetoOperator]], - exclude_tid: int, - timestamp: int, - ) -> Optional[int]: - """ - Finds the last CPU node ID before a given timestamp in threads other - than the excluded one. This ID is used to establish dependencies - between groups across threads. - - Args: - ops_by_tid (Dict[int, List[KinetoOperator]]): Operators grouped by - thread ID. - exclude_tid (int): Thread ID to exclude from the search. - timestamp (int): Timestamp to compare against. - - Returns: - Optional[int]: The ID of the last CPU node found, or None if not found. 
- """ - self.logger.debug(f"Finding last CPU node before timestamp {timestamp} excluding thread {exclude_tid}.") - last_cpu_node = None - last_cpu_node_rf_id = None - latest_timestamp = 0 - for tid, ops in ops_by_tid.items(): - if tid != exclude_tid: - for op in sorted(ops, key=lambda op: op.timestamp): - if ( - (op.category in ["cpu_op", "user_annotation"]) - and (op.timestamp < timestamp) - and (op.timestamp > latest_timestamp) - ): - last_cpu_node = op - latest_timestamp = op.timestamp - last_cpu_node_rf_id = op.rf_id - if last_cpu_node: - self.logger.debug(f"Last CPU node before timestamp {timestamp} found: {last_cpu_node}") - return last_cpu_node_rf_id - - def link_traces(self) -> None: - """ - Initiates the linking process between PyTorch Execution Traces (ET) and - Kineto Traces to produce an enhanced PyTorch Execution Trace (ET+). This - process relies on the assumption of an 'exact match' between these traces. - """ - self.logger.info("Starting the process of linking PyTorch and Kineto traces.") - self.add_thread_and_process_annotations() - self.map_pytorch_to_kineto_ops() - self.construct_et_plus_data() - self.logger.info("Traces have been successfully linked.") - - def add_thread_and_process_annotations(self) -> None: - """ - Adds thread and process annotations to Kineto operators based on - previously tracked timing information. These annotations are crucial - for aligning Kineto operators with PyTorch ET nodes, ensuring - completeness and compatibility of trace data for analysis. This method - uses the process start and end times, as well as thread start and end - times, collected during the categorization process to insert - appropriate annotations directly into the Kineto operators list. - """ - self.logger.info("Adding process and thread annotations to Kineto operators.") - - # Insert process annotation operator. This operator represents the - # overall time span of the trace process. 
- process_annotation_op = KinetoOperator( - { - "name": EXECUTION_TRACE_PROCESS_ANNOTATION, - "ts": self.kineto_process_start_time, - "inclusive_dur": self.kineto_process_end_time - self.kineto_process_start_time, - "exclusive_dur": 0, # Process exclusive duration not applicable - } - ) - self.kineto_ops.insert(0, process_annotation_op) - self.logger.debug( - "Process annotation added with start time {} and duration {}.".format( - self.kineto_process_start_time, - self.kineto_process_end_time - self.kineto_process_start_time, - ) - ) - - # Insert thread annotation operators for each thread. These annotations - # are crucial for understanding thread-level execution within the trace. - for tid, (start_ts, end_ts) in self.kineto_thread_info.items(): - inclusive_dur = end_ts - start_ts - thread_annotation_op = KinetoOperator( - { - "name": EXECUTION_TRACE_THREAD_ANNOTATION, - "ts": start_ts, - "inclusive_dur": inclusive_dur, - # Exclusive duration is set to zero in the final annotation. - # This is to avoid constraining the execution schedule to the - # original trace, allowing more flexibility in analyzing - # dependencies without being bound by specific execution timings. - "exclusive_dur": 0, - } - ) - # Find the correct position to insert the thread annotation - position = next( - (i for i, op in enumerate(self.kineto_ops) if op.tid == tid and op.timestamp >= start_ts), - None, - ) - if position is not None: - self.kineto_ops.insert(position, thread_annotation_op) - else: - self.kineto_ops.append(thread_annotation_op) - self.logger.debug( - "Thread {} annotation added with start time {} and duration {}.".format(tid, start_ts, inclusive_dur) - ) - - def map_pytorch_to_kineto_ops(self) -> None: - """ - Maps PyTorch ET nodes to corresponding Kineto operators, ensuring - each PyTorch node has a matching Kineto operator. 
- """ - self.logger.info("Mapping PyTorch ET nodes to Kineto operators.") - cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers() - - pytorch_ops_count = len(self.pytorch_ops) - kineto_ops_count = len(self.kineto_ops) - if pytorch_ops_count > kineto_ops_count: - # The specific comment is placed within the if block as requested. - self.logger.warning( - f"Number of PyTorch operators ({pytorch_ops_count}) is larger " - f"than the number of Kineto operators ({kineto_ops_count}). " - f"It is expected that the number of PyTorch operators (CPU only) " - f"will be smaller than the number of Kineto operators (CPU and GPU)." - f" A warning is logged if this is not the case, which is a rare " - f"but possible scenario." - ) - - for _, pytorch_op in enumerate(self.pytorch_ops): - if (pytorch_op.rf_id is not None) and (pytorch_op.rf_id in self.kineto_rf_id_to_kineto_op_map): - kineto_op = self.kineto_rf_id_to_kineto_op_map[pytorch_op.rf_id] - if kineto_op is None: - self.logger.warning( - f"No corresponding Kineto op found for PyTorch op " - f"ID: {pytorch_op.id}, Name: '{pytorch_op.name}'." - ) - continue - self.link_ops(pytorch_op, kineto_op, cpu_ev_idx_to_gpu_ops_map) - - self.logger.info("Completed mapping of PyTorch operators to Kineto operators.") - - def group_gpu_ops_by_cpu_launchers(self) -> Dict[str, List[KinetoOperator]]: - """ - Groups GPU operators based on their corresponding CPU launchers. - - This is determined by the 'ev_idx' which links GPU operators to their - initiating CPU launcher events. - - Returns: - Dict[str, List[KinetoOperator]]: Mapping from CPU launch event indices - to GPU operators. - - Raises: - ValueError: If 'ev_idx' is missing for any GPU operator. - """ - cpu_ev_idx_to_gpu_ops_map = {} - for gpu_op in self.kineto_gpu_ops: - parent_cpu_op = self.find_parent_cpu_op(gpu_op) - if not parent_cpu_op: - warning_msg = f"Missing parent CPU operator for GPU op '{gpu_op.name}'. Orphaned GPU operator." 
- self.logger.warning(warning_msg) - continue - - if parent_cpu_op.ev_idx == "": - error_msg = ( - f"Missing 'ev_idx' for CPU operator {parent_cpu_op.name}. " - f"Cannot link to GPU op {gpu_op.name} to {parent_cpu_op.name}." - ) - self.logger.warning(error_msg) - continue - - self.logger.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'") - - cpu_ev_idx_to_gpu_ops_map.setdefault(parent_cpu_op.ev_idx, []).append(gpu_op) - - return cpu_ev_idx_to_gpu_ops_map - - def find_parent_cpu_op(self, kineto_gpu_op: KinetoOperator) -> Optional[KinetoOperator]: - """ - Finds the parent CPU operator for a given GPU operator by identifying - the corresponding CUDA runtime operator through the correlation ID. It - then locates the closest preceding CPU operator based on the CUDA runtime's - timestamp, considering the temporal distance between the GPU operation's - start and the initiating CPU operation. - - Args: - kineto_gpu_op (KinetoOperator): The GPU operator. - - Returns: - Optional[KinetoOperator]: The parent CPU operator if found. - - Raises: - ValueError: If no CUDA runtime operator is found for the given - correlation ID. - """ - if kineto_gpu_op.correlation not in self.kineto_cuda_runtime: - warning_msg = "No CUDA runtime operator found for correlation ID {kineto_gpu_op.correlation}." - self.logger.warning(warning_msg) - return None - - kineto_cuda_runtime_op = self.kineto_cuda_runtime[kineto_gpu_op.correlation] - kineto_gpu_op.tid = kineto_cuda_runtime_op.tid - self.logger.debug( - f"Found CUDA runtime operation '{kineto_cuda_runtime_op.name}' for GPU operator '{kineto_gpu_op.name}'." 
- ) - - kineto_gpu_op.timestamp = self.get_start_timestamp_for_gpu_op(kineto_gpu_op) - - # Find the closest CPU operator that precedes the CUDA runtime operation - parent_cpu_op = self.find_closest_op(kineto_gpu_op, self.sorted_kineto_ops, kineto_cuda_runtime_op.timestamp) - if not parent_cpu_op: - self.logger.warning( - f"No parent CPU operator found for GPU operator '{kineto_gpu_op.name}' " - f"linked to CUDA runtime operation '{kineto_cuda_runtime_op.name}' " - f"(ts: {kineto_cuda_runtime_op.timestamp})." - ) - - return parent_cpu_op - - def get_start_timestamp_for_gpu_op(self, kineto_gpu_op: KinetoOperator) -> int: - """ - Determines the start timestamp for a GPU operator from various sources. - - Args: - kineto_gpu_op (KinetoOperator): The GPU operator. - - Returns: - int: The start timestamp. - - Raises: - RuntimeError: If no valid timestamp is found for the GPU operator. - """ - if kineto_gpu_op.external_id in self.kineto_cpu_launcher_ops: - cpu_launcher_op = self.kineto_cpu_launcher_ops[kineto_gpu_op.external_id] - return cpu_launcher_op.timestamp + cpu_launcher_op.inclusive_dur - if kineto_gpu_op.external_id in self.kineto_ac2g_s_ops: - return self.kineto_ac2g_s_ops[kineto_gpu_op.external_id].timestamp - if kineto_gpu_op.external_id in self.kineto_ac2g_f_ops: - return self.kineto_ac2g_f_ops[kineto_gpu_op.external_id].timestamp - raise RuntimeError(f"No valid timestamp found for GPU operator: {kineto_gpu_op.name}") - - def find_closest_op( - self, kineto_gpu_op: KinetoOperator, kineto_ops: List[KinetoOperator], ts: int - ) -> Optional[KinetoOperator]: - """ - Finds the Kineto operator that is closest in start time to a given timestamp - and has a duration that covers the timestamp. - - Args: - kineto_gpu_op (KinetoOperator): The GPU operator being compared. - kineto_ops (List[KinetoOperator]): List of Kineto operators. - ts (int): The timestamp to compare against. - - Returns: - Optional[KinetoOperator]: The closest Kineto operator if found. 
- """ - # Searching for the closest timestamp index - index = bisect.bisect_left(self.sorted_kineto_ts, ts) - - if index == 0: - # All operators are later than the timestamp - return None - else: - # The operator immediately before the index is the closest one before the timestamp - closest_op = kineto_ops[index - 1] - - # Check for NCCL specifics: if it's an NCCL operation and 'nccl:coalesced' should be skipped - if "nccl" in kineto_gpu_op.name.lower() and closest_op.name == "nccl:coalesced": - # Move back to find a non-'nccl:coalesced' operator, if available - for new_index in range(index - 2, -1, -1): - potential_op = kineto_ops[new_index] - if potential_op.tid == kineto_gpu_op.tid and potential_op.name != "nccl:coalesced": - return potential_op - # If no valid alternative found before 'nccl:coalesced', continue search forward - index = index - 1 # Adjust index to skip 'nccl:coalesced' - - # After skipping 'nccl:coalesced', verify that the closest operation is on the same thread - # as the GPU operation - if closest_op.tid == kineto_gpu_op.tid: - return closest_op - - # If the tids do not match, search forward to find the closest matching tid - for i in range(index - 1, -1, -1): - op = kineto_ops[i] - if op.tid == kineto_gpu_op.tid: - if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced": - continue # Skip 'nccl:coalesced' if it's an NCCL-related GPU operation - if op.timestamp <= ts: - return op - - # If no matching tid is found going forward, return None - return None - - def link_ops( - self, - pytorch_op: PyTorchOperator, - kineto_op: KinetoOperator, - cpu_ev_idx_to_gpu_ops_map: Dict[str, List[KinetoOperator]], - ) -> None: - """ - Links a PyTorch operator to its corresponding Kineto operator and any associated GPU operators. - - Args: - pytorch_op (PyTorchOperator): PyTorch operator to link. - kineto_op (KinetoOperator): Corresponding Kineto operator. - cpu_ev_idx_to_gpu_ops_map (Dict[str, List[KinetoOperator]]): GPU ops mapping. 
- """ - kineto_op.pytorch_op = pytorch_op - if kineto_op.ev_idx in cpu_ev_idx_to_gpu_ops_map: - self.pytorch_op_id_to_kineto_ops_map[pytorch_op.id] = cpu_ev_idx_to_gpu_ops_map[kineto_op.ev_idx] - self.pytorch_op_id_to_inclusive_dur_map[pytorch_op.id] = kineto_op.inclusive_dur - self.pytorch_op_id_to_exclusive_dur_map[pytorch_op.id] = kineto_op.exclusive_dur - self.pytorch_op_id_to_timestamp_map[pytorch_op.id] = kineto_op.timestamp - if kineto_op.inter_thread_dep: - inter_thread_dep_kineto_op = self.kineto_rf_id_to_kineto_op_map[kineto_op.inter_thread_dep] - if inter_thread_dep_kineto_op.pytorch_op: - self.pytorch_op_id_to_inter_thread_dep_map[pytorch_op.id] = inter_thread_dep_kineto_op.pytorch_op.id - if kineto_op.ev_idx in cpu_ev_idx_to_gpu_ops_map: - self.link_gpu_ops(pytorch_op, cpu_ev_idx_to_gpu_ops_map[kineto_op.ev_idx]) - - def link_gpu_ops(self, pytorch_op: PyTorchOperator, kineto_gpu_ops: List[KinetoOperator]) -> None: - """ - Links GPU operators to a PyTorch operator. - - Args: - pytorch_op (PyTorchOperator): The PyTorch operator to link to. - kineto_gpu_ops (List[KinetoOperator]): GPU operators to link. - """ - for gpu_op in kineto_gpu_ops: - gpu_op.parent_pytorch_op_id = pytorch_op.id - - def construct_et_plus_data(self) -> None: - """ - Constructs the enhanced PyTorch Execution Trace (ET+) data structure by - integrating Kineto data into the original PyTorch Execution Trace. - - This method enriches the PyTorch execution trace with detailed performance - data from the Kineto trace, offering a comprehensive view of the execution. 
- """ - self.logger.info("Constructing ET+ data.") - with open(self.pytorch_et_file, "r") as file: - pytorch_et_data = json.load(file) - - sorted_nodes = sorted(pytorch_et_data["nodes"], key=lambda x: x["id"]) - gpu_ops = [] - for op in sorted_nodes: - gpu_ops += self.process_op_and_dependents(op) - pytorch_et_data["nodes"] += gpu_ops - - # Update parent-child relationships with new IDs - sorted_nodes = sorted(pytorch_et_data["nodes"], key=lambda x: x["id"]) - for op in sorted_nodes: - if "ctrl_deps" in op: - op["ctrl_deps"] = self.id_assigner.assign_or_retrieve_id(op["ctrl_deps"]) - - self.pytorch_et_plus_data = pytorch_et_data - self.logger.info("ET+ data construction completed.") - - def process_op_and_dependents(self, op: Dict) -> List[Dict]: - """ - Processes a single operator in the PyTorch ET data, assigns a new unique ID, - and processes any dependent GPU operators. - - Args: - op (Dict): The operator to be processed. - - Returns: - List[Dict]: A list of GPU operators processed and linked to the given - operator. 
- """ - orig_op_id = op["id"] - new_op_id = self.id_assigner.assign_or_retrieve_id(orig_op_id) - op["id"] = new_op_id - - # Update operator with Kineto data if available - if orig_op_id in self.pytorch_op_id_to_inclusive_dur_map: - op["inclusive_dur"] = self.pytorch_op_id_to_inclusive_dur_map[orig_op_id] - op["exclusive_dur"] = self.pytorch_op_id_to_exclusive_dur_map[orig_op_id] - op["ts"] = self.pytorch_op_id_to_timestamp_map[orig_op_id] - if orig_op_id in self.pytorch_op_id_to_inter_thread_dep_map: - op["inter_thread_dep"] = self.id_assigner.lookup_new_id( - self.pytorch_op_id_to_inter_thread_dep_map[orig_op_id] - ) - else: - op["inter_thread_dep"] = None - - # Process and append dependent GPU operators - if orig_op_id in self.pytorch_op_id_to_kineto_ops_map: - gpu_ops = self.process_dependent_gpu_ops(op, orig_op_id) - self.pytorch_op_id_to_kineto_ops_map.pop(orig_op_id) - return gpu_ops - return [] - - def process_dependent_gpu_ops(self, cpu_op: Dict, orig_op_id: int) -> List[Dict]: - """ - Creates and returns a list of GPU operators that are dependent on a - specific CPU operator, sorted by their timestamp. The GPU operators are - deep copies of the existing operators with updated IDs and other relevant - fields from the CPU operator. - - Args: - cpu_op (Dict): The PyTorch CPU operator. - orig_op_id (int): The original ID of the CPU operator. - - Returns: - List[Dict]: A list of processed GPU operators. 
- """ - updated_gpu_ops = [] - dependent_gpu_ops = self.pytorch_op_id_to_kineto_ops_map.get(orig_op_id, []) - for gpu_op in sorted(dependent_gpu_ops, key=lambda x: x.timestamp): - new_gpu_op = copy.deepcopy(cpu_op) - new_gpu_op_id = self.id_assigner.generate_new_id() - new_gpu_op.update( - { - "id": new_gpu_op_id, - "ctrl_deps": orig_op_id, - "inputs": cpu_op["inputs"], - "outputs": cpu_op["outputs"], - "cat": gpu_op.category, - "name": gpu_op.name, - "ph": gpu_op.phase, - "inclusive_dur": gpu_op.inclusive_dur, - "exclusive_dur": gpu_op.exclusive_dur, - "ts": gpu_op.timestamp, - "stream": gpu_op.stream, - } - ) - updated_gpu_ops.append(new_gpu_op) - - return updated_gpu_ops - - def dump_pytorch_execution_trace_plus(self, output_file: str) -> None: - """ - Dumps the enhanced PyTorch Execution Trace (ET+) data to a file. - - Args: - output_file (str): The file path where the ET+ data will be saved. - """ - self.logger.info(f"Starting to dump ET+ data to {output_file}.") - - if self.pytorch_et_plus_data is None: - self.logger.error("ET+ data not constructed. Please run construct_et_plus_data first.") - return - - if "nodes" in self.pytorch_et_plus_data: - self.pytorch_et_plus_data["nodes"] = sorted(self.pytorch_et_plus_data["nodes"], key=lambda x: x["id"]) - - try: - with open(output_file, "w") as file: - json.dump(self.pytorch_et_plus_data, file, indent=4) - self.logger.info(f"ET+ data dumped to {output_file}.") - except IOError as e: - self.logger.error(f"Failed to dump ET+ data to {output_file}. Error: {e}") - except Exception as e: - self.logger.error(f"An unexpected error occurred while dumping ET+ data. 
Error: {e}") +from .trace_linker import TraceLinker def main() -> None: diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py new file mode 100644 index 00000000..292f4c70 --- /dev/null +++ b/src/trace_link/trace_linker.py @@ -0,0 +1,883 @@ +import bisect +import copy +import json +import logging +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Dict, List, Optional, Tuple + +from param_bench.train.compute.python.tools.execution_trace import ( + EXECUTION_TRACE_PROCESS_ANNOTATION, + EXECUTION_TRACE_THREAD_ANNOTATION, +) +from param_bench.train.compute.python.tools.execution_trace import ( + Node as PyTorchOperator, +) +from param_bench.train.compute.python.tools.utility import ( + load_execution_trace_file, + read_dictionary_from_json_file, +) + +from .kineto_operator import KinetoOperator +from .unique_id_assigner import UniqueIdAssigner + +# Increase the recursion limit for deep PyTorch execution traces. +sys.setrecursionlimit(10**6) + + +class TraceLinker: + """ + Links PyTorch Execution Traces (ET) and Kineto Traces to generate PyTorch ET plus. + + This class handles the process of loading, processing, and linking + PyTorch Execution Traces with Kineto Traces, enriching the PyTorch + Execution Trace with detailed performance data. + + Attributes: + pytorch_et_file (str): Path to the PyTorch execution trace file. + kineto_file (str): Path to the Kineto trace file. + pytorch_ops (List[PyTorchOperator]): PyTorch operators from ET trace. + kineto_ops (List[KinetoOperator]): Kineto operators from the trace. + sorted_kineto_ops (List[KinetoOperator]): Sorted list of Kineto operators based on timestamps. + sorted_ts (List[int]): Sorted list of timestamps extracted from Kineto operators for efficient temporal queries. + kineto_ops_by_tid (Dict[int, List[KinetoOperator]]): Operators grouped by thread ID. 
+        kineto_cuda_runtime (Dict[int, KinetoOperator]): Mapping of CUDA runtime
+            API calls to Kineto operators, indexed by their correlation ID. This
+            includes operations like `cudaLaunchKernel` and `cudaMemcpyAsync`,
+            crucial for mapping GPU activities back to their initiating CPU calls.
+        kineto_ac2g_s_ops (Dict[str, KinetoOperator]): Start ops for CPU to GPU.
+        kineto_ac2g_f_ops (Dict[str, KinetoOperator]): Final ops for CPU to GPU.
+        kineto_cpu_launcher_ops (Dict[str, KinetoOperator]): CPU launcher ops.
+        kineto_gpu_ops (List[KinetoOperator]): GPU operators.
+        kineto_process_start_time (int): Start time of the process, based on the
+            earliest operator timestamp.
+        kineto_process_end_time (int): End time of the process, based on the
+            latest operator timestamp.
+        kineto_thread_info (Dict[int, Tuple[int, int]]): Information about threads,
+            mapping thread IDs to a tuple of start and end times.
+        kineto_rf_id_to_kineto_op_map (Dict[int, KinetoOperator]): Mapping from
+            rf_id to KinetoOperator instances.
+        pytorch_op_id_to_kineto_ops_map (Dict[int, List[KinetoOperator]]):
+            Map from PyTorch op IDs to Kineto GPU ops.
+        pytorch_op_id_to_inclusive_dur_map (Dict[int, int]): Inclusive duration map for PyTorch ops.
+        pytorch_op_id_to_exclusive_dur_map (Dict[int, int]): Exclusive duration map for PyTorch ops.
+        pytorch_op_id_to_timestamp_map (Dict[int, int]): Timestamp map for PyTorch ops.
+        pytorch_op_id_to_inter_thread_dep_map (Dict[int, int]): Mapping of PyTorch
+            operator IDs to IDs of latest CPU node from other threads before the gap.
+        id_assigner (UniqueIdAssigner): Assigns unique IDs to operators.
+        pytorch_et_plus_data (Optional[Dict]): PyTorch ET plus data.
+        logger (logging.Logger): Logger for the class.
+    """
+
+    def __init__(self, pytorch_et_file: str, kineto_file: str, log_level: str = "INFO") -> None:
+        """
+        Initializes the TraceLinker with paths to the PyTorch and Kineto trace files,
+        and a log level.
+
+        Args:
+            pytorch_et_file (str): Path to the PyTorch execution trace file.
+            kineto_file (str): Path to the Kineto trace file.
+            log_level (str): Logging level for the class.
+        """
+        self.pytorch_et_file = pytorch_et_file
+        self.kineto_file = kineto_file
+        self.pytorch_ops: List[PyTorchOperator] = []
+        self.kineto_ops: List[KinetoOperator] = []
+        self.sorted_kineto_ops: List[KinetoOperator] = []
+        self.sorted_kineto_ts: List[int] = []
+        self.kineto_ops_by_tid: Dict[int, List[KinetoOperator]] = {}
+        self.kineto_cuda_runtime: Dict[int, KinetoOperator] = {}
+        self.kineto_ac2g_s_ops: Dict[str, KinetoOperator] = {}
+        self.kineto_ac2g_f_ops: Dict[str, KinetoOperator] = {}
+        self.kineto_cpu_launcher_ops: Dict[str, KinetoOperator] = {}
+        self.kineto_gpu_ops: List[KinetoOperator] = []
+        self.kineto_process_start_time: int = 0
+        self.kineto_process_end_time: int = 0
+        self.kineto_thread_info: Dict[int, Tuple[int, int]] = {}
+        self.kineto_rf_id_to_kineto_op_map: Dict[int, KinetoOperator] = {}
+        self.pytorch_op_id_to_kineto_ops_map: Dict[int, List[KinetoOperator]] = {}
+        self.pytorch_op_id_to_inclusive_dur_map: Dict[int, int] = {}
+        self.pytorch_op_id_to_exclusive_dur_map: Dict[int, int] = {}
+        self.pytorch_op_id_to_timestamp_map: Dict[int, int] = {}
+        self.pytorch_op_id_to_inter_thread_dep_map: Dict[int, int] = {}
+        self.id_assigner = UniqueIdAssigner()
+        self.pytorch_et_plus_data: Optional[Dict] = None
+        self.logger = logging.getLogger(__name__)
+        self.logger.setLevel(log_level.upper())
+
+    def load_traces(self) -> None:
+        """
+        Loads both PyTorch Execution Traces and Kineto Traces.
+        This method is a high-level orchestrator that calls specific methods to load
+        and process the PyTorch and Kineto traces individually.
+        """
+        self.load_pytorch_et()
+        self.load_kineto_trace()
+
+    def load_pytorch_et(self) -> None:
+        """
+        Loads and processes the PyTorch Execution Trace.
+ This method handles multiple iterations in the trace and extracts the nodes, + considering the specified annotation for segmenting the iterations. + """ + self.logger.info("Starting to load PyTorch Execution Trace.") + pytorch_et = load_execution_trace_file(self.pytorch_et_file) + + root_node = pytorch_et.get_nodes()[1] # Root node is usually 1-based + self.pytorch_ops = self.extract_pytorch_ops(root_node) + self.logger.info(f"Original ops in PyTorch ET: {len(self.pytorch_ops)}") + self.logger.info("PyTorch Execution Trace loaded successfully.") + + def extract_pytorch_ops(self, node: PyTorchOperator) -> List[PyTorchOperator]: + """ + Extracts and sorts nodes from the PyTorch execution trace recursively. + + This method traverses the execution trace starting from the provided node, + extracting all the operator nodes recursively, and then returns them sorted + by their identifiers. + + Args: + node (PyTorchOperator): Starting node for extraction. + + Returns: + List[PyTorchOperator]: Sorted list of extracted PyTorchOperator nodes. + """ + nodes = [] + + def traverse(node: PyTorchOperator): + nodes.append(node) + for child in node.children: + traverse(child) + + traverse(node) + return sorted(nodes, key=lambda x: x.id) + + def load_kineto_trace(self) -> None: + """ + Loads and processes the Kineto Trace. + This method parses the Kineto trace file, creating KinetoOperator instances + for each operator in the trace. It then categorizes and segments these + operators for further processing and linking with PyTorch operators. 
+ """ + self.logger.info("Starting to load Kineto Trace.") + kineto_trace_data = read_dictionary_from_json_file(self.kineto_file) + sorted_kineto_ops = sorted( + [KinetoOperator(op) for op in kineto_trace_data["traceEvents"]], + key=lambda op: op.timestamp, + ) + + self.categorize_and_track_kineto_ops(sorted_kineto_ops) + self.construct_kineto_rf_id_map() + self.calculate_exclusive_dur() + + self.sorted_kineto_ops = sorted(self.kineto_ops, key=lambda op: op.timestamp) + self.sorted_kineto_ts = [op.timestamp for op in self.sorted_kineto_ops] + + self.logger.info( + f"Processed Kineto trace with {len(self.kineto_ops)} CPU ops, " + f"{len(self.kineto_cpu_launcher_ops)} CPU launcher ops, " + f"and {len(self.kineto_gpu_ops)} GPU ops." + ) + self.logger.info("Kineto Trace loaded successfully.") + + def categorize_and_track_kineto_ops(self, kineto_ops: List[KinetoOperator]) -> None: + """ + Categorizes Kineto operators based on their properties and assigns them to + corresponding groups for CPU, GPU, and other operations. + + Args: + kineto_ops (List[KinetoOperator]): List of Kineto operators to categorize. + + Raises: + ValueError: If duplicate correlation IDs are found in 'cuda_runtime' + category operators. 
+ """ + self.logger.info("Categorizing Kineto operators and calculating timing boundaries.") + process_start_time = sys.maxsize + process_end_time = 0 + thread_info = {} + + for op in kineto_ops: + if op.is_valid("cpu_op") or op.is_valid("user_annotation"): + self.kineto_ops.append(op) + self.kineto_ops_by_tid.setdefault(op.tid, []).append(op) + self.logger.debug(f"Added CPU or user annotation op: {op.name}") + elif op.is_valid("ac2g", phase="s"): + self._add_op_to_dict(op, self.kineto_ac2g_s_ops, "id") + elif op.is_valid("ac2g", phase="f"): + self._add_op_to_dict(op, self.kineto_ac2g_f_ops, "id") + elif ( + op.is_valid("cuda_runtime") + and op.name + in [ + "cudaLaunchKernel", + "cudaLaunchKernelExC", + "cudaMemcpyAsync", + ] + ) or (op.category == "cuda_driver" and op.name in ["cuLaunchKernel"]): + self._add_op_to_dict(op, self.kineto_cpu_launcher_ops, "args", "External id") + self.logger.debug(f"Added CPU launcher op: {op.name}") + elif op.is_valid("kernel") or op.is_valid("gpu_memcpy"): + self.kineto_gpu_ops.append(op) + self.logger.debug(f"Added GPU op: {op.name}") + + if (op.category == "cuda_runtime") or (op.category == "cuda_driver"): + if op.correlation in self.kineto_cuda_runtime: + raise ValueError(f"Duplicate correlation ID {op.correlation} found in cuda_runtime operators.") + self.kineto_cuda_runtime[op.correlation] = op + + # Update timing boundaries + if op.tid is not None: + process_start_time = min(process_start_time, op.timestamp) + process_end_time = max(process_end_time, op.timestamp + op.inclusive_dur) + thread_start_end = thread_info.setdefault(op.tid, [sys.maxsize, 0]) + thread_start_end[0] = min(thread_start_end[0], op.timestamp) + thread_start_end[1] = max(thread_start_end[1], op.timestamp + op.inclusive_dur) + + # Apply collected timing info + self.kineto_process_start_time = process_start_time + self.kineto_process_end_time = process_end_time + self.kineto_thread_info = thread_info + self.logger.info("Kineto operators categorized and 
timing boundaries calculated.") + + def construct_kineto_rf_id_map(self) -> None: + """ + Constructs a map from rf_id to KinetoOperator instances. + """ + self.kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in self.kineto_ops if op.rf_id is not None} + + def calculate_exclusive_dur(self) -> None: + """ + Calculates the exclusive duration of each operator in the Kineto traces + in parallel. The exclusive duration is defined as the total duration of + the operator minus any time spent in child operators, effectively + representing the time spent exclusively in that operator. This approach + significantly improves the performance of calculating exclusive durations, + especially for traces with a large number of operators. Additionally, by + processing each thread's operators in parallel, the method takes advantage + of concurrent execution capabilities to further speed up the computation. + """ + self.logger.info("Calculating exclusive durations for Kineto operators in parallel.") + + def process_ops_for_thread(ops: List["KinetoOperator"]) -> None: + self.logger.info(f"Processing {len(ops)} operators in thread.") + sorted_ops = sorted(ops, key=lambda op: (op.timestamp, op.inclusive_dur)) + for i, op in enumerate(sorted_ops): + exclusive_dur = op.inclusive_dur + overlapping_regions = [] + + # Identify overlapping regions with child operators + for child_op in sorted_ops[i + 1 :]: + if child_op.timestamp >= op.timestamp and (child_op.timestamp + child_op.inclusive_dur) <= ( + op.timestamp + op.inclusive_dur + ): + overlap_start = child_op.timestamp + overlap_end = child_op.timestamp + child_op.inclusive_dur + overlapping_regions.append((overlap_start, overlap_end)) + if (op.timestamp + op.inclusive_dur) < child_op.timestamp: + break + + # Merge overlapping regions and calculate exclusive duration + merged_regions = self.merge_overlapping_intervals(overlapping_regions) + for start, end in merged_regions: + exclusive_dur -= end - start + + # Check if exclusive_dur 
is not negative or zero + if exclusive_dur < 0: + error_msg = ( + f"Exclusive duration calculation error for node " + f"'{op.name}' (ts: {op.timestamp}, " + f"inclusive_dur: {op.inclusive_dur}, " + f"rf_id: {op.rf_id}): " + f"Duration cannot be less than zero." + ) + self.logger.error(error_msg) + raise ValueError(error_msg) + + op.exclusive_dur = exclusive_dur + self.logger.debug( + f"Node '{op.name}' (ts: {op.timestamp}, " + f"inclusive_dur: {op.inclusive_dur}, " + f"rf_id: {op.rf_id}) " + f"exclusive duration: {op.exclusive_dur} microseconds." + ) + + with ThreadPoolExecutor() as executor: + futures = [executor.submit(process_ops_for_thread, ops) for ops in self.kineto_ops_by_tid.values()] + + for future in as_completed(futures): + future.result() # Wait for all threads to complete and handle any exceptions + + self.logger.info("Exclusive durations for Kineto operators calculated successfully.") + + @staticmethod + def merge_overlapping_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]: + """ + Merges overlapping intervals into a single interval. + + Args: + intervals (List[Tuple[int, int]]): List of intervals. + + Returns: + List[Tuple[int, int]]: List of merged intervals. + """ + if not intervals: + return [] + + # Sort intervals based on the start time + intervals.sort(key=lambda x: x[0]) + merged = [intervals[0]] + + for current in intervals: + prev = merged[-1] + if current[0] <= prev[1]: + # There is overlap, merge the current interval with the previous one + merged[-1] = (prev[0], max(prev[1], current[1])) + else: + # No overlap, add the current interval + merged.append(current) + + return merged + + def _add_op_to_dict(self, op: KinetoOperator, target_dict: Dict, *keys: str) -> None: + """ + Adds an operator to a specific dictionary based on provided keys. + The method navigates through the operator's dictionary using the keys + and adds the operator to the target dictionary. + + Args: + op (KinetoOperator): The operator to be added. 
+ target_dict (Dict): The dictionary to which the operator should be added. + *keys (str): Keys used to navigate through the operator's dictionary. + + Raises: + KeyError: If any of the keys are not found in the operator's dictionary. + """ + value = op.op_dict + for key in keys: + if key not in value: + error_msg = f"Key '{key}' not found in operator dictionary for op {op.name}." + self.logger.error(error_msg) + raise KeyError(error_msg) + value = value[key] + + target_dict[value] = op + + def enforce_inter_thread_order(self, threshold: int = 1000) -> None: + """ + Enforces order between groups of operators in different threads. In + Kineto traces with multiple threads, operators are executed in turns, + creating groups. This function identifies these groups by detecting + significant gaps in execution within each thread. It then establishes + dependencies between these groups across different threads, ensuring + the final Chakra execution traces reflect inter-thread dependencies + realistically. + + An isolated group is formed when there's a significant gap in execution + within a thread. Each new group relies on the last CPU operator from + other threads, enforcing order and dependency across threads. + + Args: + threshold (int): Threshold for significant gap detection in + microseconds, used to define group boundaries. 
+ """ + self.logger.info("Enforcing inter-thread order in Kineto traces.") + + def process_thread( + tid: int, + ops: List[KinetoOperator], + ops_by_tid: Dict[int, List[KinetoOperator]], + ) -> None: + self.logger.info(f"Thread {tid}: Identifying gaps for dependency linking with threshold {threshold}us.") + sorted_ops = sorted(ops, key=lambda op: op.timestamp) + last_cpu_node_rf_id = None + + for i, op in enumerate(sorted_ops): + if ( + i == 0 + or (sorted_ops[i].timestamp - sorted_ops[i - 1].timestamp - sorted_ops[i - 1].inclusive_dur) + > threshold + ): + last_cpu_node_rf_id = self.find_last_cpu_node_before_timestamp(ops_by_tid, tid, op.timestamp) + if last_cpu_node_rf_id: + self.logger.debug( + f"Thread {tid}: Linking op '{op.name}' " + f"to CPU node before gap with rf_id " + f"'{last_cpu_node_rf_id}'." + ) + + if last_cpu_node_rf_id: + op.inter_thread_dep = last_cpu_node_rf_id + + with ThreadPoolExecutor() as executor: + futures = { + executor.submit(process_thread, tid, ops, self.kineto_ops_by_tid): tid + for tid, ops in self.kineto_ops_by_tid.items() + } + + for future in as_completed(futures): + tid = futures[future] + try: + future.result() + self.logger.debug(f"Thread {tid} dependencies processed.") + except Exception as e: + self.logger.error(f"Error processing thread {tid}: {e}") + + def find_last_cpu_node_before_timestamp( + self, + ops_by_tid: Dict[int, List[KinetoOperator]], + exclude_tid: int, + timestamp: int, + ) -> Optional[int]: + """ + Finds the last CPU node ID before a given timestamp in threads other + than the excluded one. This ID is used to establish dependencies + between groups across threads. + + Args: + ops_by_tid (Dict[int, List[KinetoOperator]]): Operators grouped by + thread ID. + exclude_tid (int): Thread ID to exclude from the search. + timestamp (int): Timestamp to compare against. + + Returns: + Optional[int]: The ID of the last CPU node found, or None if not found. 
+ """ + self.logger.debug(f"Finding last CPU node before timestamp {timestamp} excluding thread {exclude_tid}.") + last_cpu_node = None + last_cpu_node_rf_id = None + latest_timestamp = 0 + for tid, ops in ops_by_tid.items(): + if tid != exclude_tid: + for op in sorted(ops, key=lambda op: op.timestamp): + if ( + (op.category in ["cpu_op", "user_annotation"]) + and (op.timestamp < timestamp) + and (op.timestamp > latest_timestamp) + ): + last_cpu_node = op + latest_timestamp = op.timestamp + last_cpu_node_rf_id = op.rf_id + if last_cpu_node: + self.logger.debug(f"Last CPU node before timestamp {timestamp} found: {last_cpu_node}") + return last_cpu_node_rf_id + + def link_traces(self) -> None: + """ + Initiates the linking process between PyTorch Execution Traces (ET) and + Kineto Traces to produce an enhanced PyTorch Execution Trace (ET+). This + process relies on the assumption of an 'exact match' between these traces. + """ + self.logger.info("Starting the process of linking PyTorch and Kineto traces.") + self.add_thread_and_process_annotations() + self.map_pytorch_to_kineto_ops() + self.construct_et_plus_data() + self.logger.info("Traces have been successfully linked.") + + def add_thread_and_process_annotations(self) -> None: + """ + Adds thread and process annotations to Kineto operators based on + previously tracked timing information. These annotations are crucial + for aligning Kineto operators with PyTorch ET nodes, ensuring + completeness and compatibility of trace data for analysis. This method + uses the process start and end times, as well as thread start and end + times, collected during the categorization process to insert + appropriate annotations directly into the Kineto operators list. + """ + self.logger.info("Adding process and thread annotations to Kineto operators.") + + # Insert process annotation operator. This operator represents the + # overall time span of the trace process. 
+ process_annotation_op = KinetoOperator( + { + "name": EXECUTION_TRACE_PROCESS_ANNOTATION, + "ts": self.kineto_process_start_time, + "inclusive_dur": self.kineto_process_end_time - self.kineto_process_start_time, + "exclusive_dur": 0, # Process exclusive duration not applicable + } + ) + self.kineto_ops.insert(0, process_annotation_op) + self.logger.debug( + "Process annotation added with start time {} and duration {}.".format( + self.kineto_process_start_time, + self.kineto_process_end_time - self.kineto_process_start_time, + ) + ) + + # Insert thread annotation operators for each thread. These annotations + # are crucial for understanding thread-level execution within the trace. + for tid, (start_ts, end_ts) in self.kineto_thread_info.items(): + inclusive_dur = end_ts - start_ts + thread_annotation_op = KinetoOperator( + { + "name": EXECUTION_TRACE_THREAD_ANNOTATION, + "ts": start_ts, + "inclusive_dur": inclusive_dur, + # Exclusive duration is set to zero in the final annotation. + # This is to avoid constraining the execution schedule to the + # original trace, allowing more flexibility in analyzing + # dependencies without being bound by specific execution timings. + "exclusive_dur": 0, + } + ) + # Find the correct position to insert the thread annotation + position = next( + (i for i, op in enumerate(self.kineto_ops) if op.tid == tid and op.timestamp >= start_ts), + None, + ) + if position is not None: + self.kineto_ops.insert(position, thread_annotation_op) + else: + self.kineto_ops.append(thread_annotation_op) + self.logger.debug( + "Thread {} annotation added with start time {} and duration {}.".format(tid, start_ts, inclusive_dur) + ) + + def map_pytorch_to_kineto_ops(self) -> None: + """ + Maps PyTorch ET nodes to corresponding Kineto operators, ensuring + each PyTorch node has a matching Kineto operator. 
+ """ + self.logger.info("Mapping PyTorch ET nodes to Kineto operators.") + cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers() + + pytorch_ops_count = len(self.pytorch_ops) + kineto_ops_count = len(self.kineto_ops) + if pytorch_ops_count > kineto_ops_count: + # The specific comment is placed within the if block as requested. + self.logger.warning( + f"Number of PyTorch operators ({pytorch_ops_count}) is larger " + f"than the number of Kineto operators ({kineto_ops_count}). " + f"It is expected that the number of PyTorch operators (CPU only) " + f"will be smaller than the number of Kineto operators (CPU and GPU)." + f" A warning is logged if this is not the case, which is a rare " + f"but possible scenario." + ) + + for _, pytorch_op in enumerate(self.pytorch_ops): + if (pytorch_op.rf_id is not None) and (pytorch_op.rf_id in self.kineto_rf_id_to_kineto_op_map): + kineto_op = self.kineto_rf_id_to_kineto_op_map[pytorch_op.rf_id] + if kineto_op is None: + self.logger.warning( + f"No corresponding Kineto op found for PyTorch op " + f"ID: {pytorch_op.id}, Name: '{pytorch_op.name}'." + ) + continue + self.link_ops(pytorch_op, kineto_op, cpu_ev_idx_to_gpu_ops_map) + + self.logger.info("Completed mapping of PyTorch operators to Kineto operators.") + + def group_gpu_ops_by_cpu_launchers(self) -> Dict[str, List[KinetoOperator]]: + """ + Groups GPU operators based on their corresponding CPU launchers. + + This is determined by the 'ev_idx' which links GPU operators to their + initiating CPU launcher events. + + Returns: + Dict[str, List[KinetoOperator]]: Mapping from CPU launch event indices + to GPU operators. + + Raises: + ValueError: If 'ev_idx' is missing for any GPU operator. + """ + cpu_ev_idx_to_gpu_ops_map = {} + for gpu_op in self.kineto_gpu_ops: + parent_cpu_op = self.find_parent_cpu_op(gpu_op) + if not parent_cpu_op: + warning_msg = f"Missing parent CPU operator for GPU op '{gpu_op.name}'. Orphaned GPU operator." 
+                self.logger.warning(warning_msg)
+                continue
+
+            if parent_cpu_op.ev_idx == "":
+                error_msg = (
+                    f"Missing 'ev_idx' for CPU operator {parent_cpu_op.name}. "
+                    f"Cannot link GPU op {gpu_op.name} to {parent_cpu_op.name}."
+                )
+                self.logger.warning(error_msg)
+                continue
+
+            self.logger.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'")
+
+            cpu_ev_idx_to_gpu_ops_map.setdefault(parent_cpu_op.ev_idx, []).append(gpu_op)
+
+        return cpu_ev_idx_to_gpu_ops_map
+
+    def find_parent_cpu_op(self, kineto_gpu_op: KinetoOperator) -> Optional[KinetoOperator]:
+        """
+        Finds the parent CPU operator for a given GPU operator by identifying
+        the corresponding CUDA runtime operator through the correlation ID. It
+        then locates the closest preceding CPU operator based on the CUDA runtime's
+        timestamp, considering the temporal distance between the GPU operation's
+        start and the initiating CPU operation.
+
+        Args:
+            kineto_gpu_op (KinetoOperator): The GPU operator.
+
+        Returns:
+            Optional[KinetoOperator]: The parent CPU operator if found.
+
+        Raises:
+            ValueError: If no CUDA runtime operator is found for the given
+                correlation ID.
+        """
+        if kineto_gpu_op.correlation not in self.kineto_cuda_runtime:
+            warning_msg = f"No CUDA runtime operator found for correlation ID {kineto_gpu_op.correlation}."
+            self.logger.warning(warning_msg)
+            return None
+
+        kineto_cuda_runtime_op = self.kineto_cuda_runtime[kineto_gpu_op.correlation]
+        kineto_gpu_op.tid = kineto_cuda_runtime_op.tid
+        self.logger.debug(
+            f"Found CUDA runtime operation '{kineto_cuda_runtime_op.name}' for GPU operator '{kineto_gpu_op.name}'."
+ ) + + kineto_gpu_op.timestamp = self.get_start_timestamp_for_gpu_op(kineto_gpu_op) + + # Find the closest CPU operator that precedes the CUDA runtime operation + parent_cpu_op = self.find_closest_op(kineto_gpu_op, self.sorted_kineto_ops, kineto_cuda_runtime_op.timestamp) + if not parent_cpu_op: + self.logger.warning( + f"No parent CPU operator found for GPU operator '{kineto_gpu_op.name}' " + f"linked to CUDA runtime operation '{kineto_cuda_runtime_op.name}' " + f"(ts: {kineto_cuda_runtime_op.timestamp})." + ) + + return parent_cpu_op + + def get_start_timestamp_for_gpu_op(self, kineto_gpu_op: KinetoOperator) -> int: + """ + Determines the start timestamp for a GPU operator from various sources. + + Args: + kineto_gpu_op (KinetoOperator): The GPU operator. + + Returns: + int: The start timestamp. + + Raises: + RuntimeError: If no valid timestamp is found for the GPU operator. + """ + if kineto_gpu_op.external_id in self.kineto_cpu_launcher_ops: + cpu_launcher_op = self.kineto_cpu_launcher_ops[kineto_gpu_op.external_id] + return cpu_launcher_op.timestamp + cpu_launcher_op.inclusive_dur + if kineto_gpu_op.external_id in self.kineto_ac2g_s_ops: + return self.kineto_ac2g_s_ops[kineto_gpu_op.external_id].timestamp + if kineto_gpu_op.external_id in self.kineto_ac2g_f_ops: + return self.kineto_ac2g_f_ops[kineto_gpu_op.external_id].timestamp + raise RuntimeError(f"No valid timestamp found for GPU operator: {kineto_gpu_op.name}") + + def find_closest_op( + self, kineto_gpu_op: KinetoOperator, kineto_ops: List[KinetoOperator], ts: int + ) -> Optional[KinetoOperator]: + """ + Finds the Kineto operator that is closest in start time to a given timestamp + and has a duration that covers the timestamp. + + Args: + kineto_gpu_op (KinetoOperator): The GPU operator being compared. + kineto_ops (List[KinetoOperator]): List of Kineto operators. + ts (int): The timestamp to compare against. + + Returns: + Optional[KinetoOperator]: The closest Kineto operator if found. 
+ """ + # Searching for the closest timestamp index + index = bisect.bisect_left(self.sorted_kineto_ts, ts) + + if index == 0: + # All operators are later than the timestamp + return None + else: + # The operator immediately before the index is the closest one before the timestamp + closest_op = kineto_ops[index - 1] + + # Check for NCCL specifics: if it's an NCCL operation and 'nccl:coalesced' should be skipped + if "nccl" in kineto_gpu_op.name.lower() and closest_op.name == "nccl:coalesced": + # Move back to find a non-'nccl:coalesced' operator, if available + for new_index in range(index - 2, -1, -1): + potential_op = kineto_ops[new_index] + if potential_op.tid == kineto_gpu_op.tid and potential_op.name != "nccl:coalesced": + return potential_op + # If no valid alternative found before 'nccl:coalesced', continue search forward + index = index - 1 # Adjust index to skip 'nccl:coalesced' + + # After skipping 'nccl:coalesced', verify that the closest operation is on the same thread + # as the GPU operation + if closest_op.tid == kineto_gpu_op.tid: + return closest_op + + # If the tids do not match, search forward to find the closest matching tid + for i in range(index - 1, -1, -1): + op = kineto_ops[i] + if op.tid == kineto_gpu_op.tid: + if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced": + continue # Skip 'nccl:coalesced' if it's an NCCL-related GPU operation + if op.timestamp <= ts: + return op + + # If no matching tid is found going forward, return None + return None + + def link_ops( + self, + pytorch_op: PyTorchOperator, + kineto_op: KinetoOperator, + cpu_ev_idx_to_gpu_ops_map: Dict[str, List[KinetoOperator]], + ) -> None: + """ + Links a PyTorch operator to its corresponding Kineto operator and any associated GPU operators. + + Args: + pytorch_op (PyTorchOperator): PyTorch operator to link. + kineto_op (KinetoOperator): Corresponding Kineto operator. + cpu_ev_idx_to_gpu_ops_map (Dict[str, List[KinetoOperator]]): GPU ops mapping. 
+ """ + kineto_op.pytorch_op = pytorch_op + if kineto_op.ev_idx in cpu_ev_idx_to_gpu_ops_map: + self.pytorch_op_id_to_kineto_ops_map[pytorch_op.id] = cpu_ev_idx_to_gpu_ops_map[kineto_op.ev_idx] + self.pytorch_op_id_to_inclusive_dur_map[pytorch_op.id] = kineto_op.inclusive_dur + self.pytorch_op_id_to_exclusive_dur_map[pytorch_op.id] = kineto_op.exclusive_dur + self.pytorch_op_id_to_timestamp_map[pytorch_op.id] = kineto_op.timestamp + if kineto_op.inter_thread_dep: + inter_thread_dep_kineto_op = self.kineto_rf_id_to_kineto_op_map[kineto_op.inter_thread_dep] + if inter_thread_dep_kineto_op.pytorch_op: + self.pytorch_op_id_to_inter_thread_dep_map[pytorch_op.id] = inter_thread_dep_kineto_op.pytorch_op.id + if kineto_op.ev_idx in cpu_ev_idx_to_gpu_ops_map: + self.link_gpu_ops(pytorch_op, cpu_ev_idx_to_gpu_ops_map[kineto_op.ev_idx]) + + def link_gpu_ops(self, pytorch_op: PyTorchOperator, kineto_gpu_ops: List[KinetoOperator]) -> None: + """ + Links GPU operators to a PyTorch operator. + + Args: + pytorch_op (PyTorchOperator): The PyTorch operator to link to. + kineto_gpu_ops (List[KinetoOperator]): GPU operators to link. + """ + for gpu_op in kineto_gpu_ops: + gpu_op.parent_pytorch_op_id = pytorch_op.id + + def construct_et_plus_data(self) -> None: + """ + Constructs the enhanced PyTorch Execution Trace (ET+) data structure by + integrating Kineto data into the original PyTorch Execution Trace. + + This method enriches the PyTorch execution trace with detailed performance + data from the Kineto trace, offering a comprehensive view of the execution. 
+ """ + self.logger.info("Constructing ET+ data.") + with open(self.pytorch_et_file, "r") as file: + pytorch_et_data = json.load(file) + + sorted_nodes = sorted(pytorch_et_data["nodes"], key=lambda x: x["id"]) + gpu_ops = [] + for op in sorted_nodes: + gpu_ops += self.process_op_and_dependents(op) + pytorch_et_data["nodes"] += gpu_ops + + # Update parent-child relationships with new IDs + sorted_nodes = sorted(pytorch_et_data["nodes"], key=lambda x: x["id"]) + for op in sorted_nodes: + if "ctrl_deps" in op: + op["ctrl_deps"] = self.id_assigner.assign_or_retrieve_id(op["ctrl_deps"]) + + self.pytorch_et_plus_data = pytorch_et_data + self.logger.info("ET+ data construction completed.") + + def process_op_and_dependents(self, op: Dict) -> List[Dict]: + """ + Processes a single operator in the PyTorch ET data, assigns a new unique ID, + and processes any dependent GPU operators. + + Args: + op (Dict): The operator to be processed. + + Returns: + List[Dict]: A list of GPU operators processed and linked to the given + operator. 
+ """ + orig_op_id = op["id"] + new_op_id = self.id_assigner.assign_or_retrieve_id(orig_op_id) + op["id"] = new_op_id + + # Update operator with Kineto data if available + if orig_op_id in self.pytorch_op_id_to_inclusive_dur_map: + op["inclusive_dur"] = self.pytorch_op_id_to_inclusive_dur_map[orig_op_id] + op["exclusive_dur"] = self.pytorch_op_id_to_exclusive_dur_map[orig_op_id] + op["ts"] = self.pytorch_op_id_to_timestamp_map[orig_op_id] + if orig_op_id in self.pytorch_op_id_to_inter_thread_dep_map: + op["inter_thread_dep"] = self.id_assigner.lookup_new_id( + self.pytorch_op_id_to_inter_thread_dep_map[orig_op_id] + ) + else: + op["inter_thread_dep"] = None + + # Process and append dependent GPU operators + if orig_op_id in self.pytorch_op_id_to_kineto_ops_map: + gpu_ops = self.process_dependent_gpu_ops(op, orig_op_id) + self.pytorch_op_id_to_kineto_ops_map.pop(orig_op_id) + return gpu_ops + return [] + + def process_dependent_gpu_ops(self, cpu_op: Dict, orig_op_id: int) -> List[Dict]: + """ + Creates and returns a list of GPU operators that are dependent on a + specific CPU operator, sorted by their timestamp. The GPU operators are + deep copies of the existing operators with updated IDs and other relevant + fields from the CPU operator. + + Args: + cpu_op (Dict): The PyTorch CPU operator. + orig_op_id (int): The original ID of the CPU operator. + + Returns: + List[Dict]: A list of processed GPU operators. 
+ """ + updated_gpu_ops = [] + dependent_gpu_ops = self.pytorch_op_id_to_kineto_ops_map.get(orig_op_id, []) + for gpu_op in sorted(dependent_gpu_ops, key=lambda x: x.timestamp): + new_gpu_op = copy.deepcopy(cpu_op) + new_gpu_op_id = self.id_assigner.generate_new_id() + new_gpu_op.update( + { + "id": new_gpu_op_id, + "ctrl_deps": orig_op_id, + "inputs": cpu_op["inputs"], + "outputs": cpu_op["outputs"], + "cat": gpu_op.category, + "name": gpu_op.name, + "ph": gpu_op.phase, + "inclusive_dur": gpu_op.inclusive_dur, + "exclusive_dur": gpu_op.exclusive_dur, + "ts": gpu_op.timestamp, + "stream": gpu_op.stream, + } + ) + updated_gpu_ops.append(new_gpu_op) + + return updated_gpu_ops + + def dump_pytorch_execution_trace_plus(self, output_file: str) -> None: + """ + Dumps the enhanced PyTorch Execution Trace (ET+) data to a file. + + Args: + output_file (str): The file path where the ET+ data will be saved. + """ + self.logger.info(f"Starting to dump ET+ data to {output_file}.") + + if self.pytorch_et_plus_data is None: + self.logger.error("ET+ data not constructed. Please run construct_et_plus_data first.") + return + + if "nodes" in self.pytorch_et_plus_data: + self.pytorch_et_plus_data["nodes"] = sorted(self.pytorch_et_plus_data["nodes"], key=lambda x: x["id"]) + + try: + with open(output_file, "w") as file: + json.dump(self.pytorch_et_plus_data, file, indent=4) + self.logger.info(f"ET+ data dumped to {output_file}.") + except IOError as e: + self.logger.error(f"Failed to dump ET+ data to {output_file}. Error: {e}") + except Exception as e: + self.logger.error(f"An unexpected error occurred while dumping ET+ data. Error: {e}") diff --git a/src/trace_link/unique_id_assigner.py b/src/trace_link/unique_id_assigner.py new file mode 100644 index 00000000..a5179175 --- /dev/null +++ b/src/trace_link/unique_id_assigner.py @@ -0,0 +1,73 @@ +from typing import Dict + + +class UniqueIdAssigner: + """ + Assigns unique IDs to items, ensuring each item gets a distinct ID. 
+ + This class is used to maintain a consistent and unique mapping of original + identifiers to new unique identifiers. It's particularly useful in scenarios + where the uniqueness of IDs across different entities or iterations needs to + be preserved. + + Attributes: + next_id (int): The next unique ID to be assigned. + original_to_new_ids (Dict[int, int]): A mapping from original IDs to their + corresponding new unique IDs. This helps in retrieving already assigned + unique IDs and ensures the same original ID always maps to the same + unique ID. + """ + + def __init__(self) -> None: + """ + Initializes the UniqueIdAssigner with a starting ID of 0. + """ + self.next_id = 0 + self.original_to_new_ids: Dict[int, int] = {} + + def assign_or_retrieve_id(self, original_id: int) -> int: + """ + Assigns a new unique ID to the given original ID if it doesn't have one already; + otherwise, returns the previously assigned unique ID. + + Args: + original_id (int): The original ID for which a unique ID is needed. + + Returns: + int: A unique ID corresponding to the original ID. + """ + if original_id not in self.original_to_new_ids: + self.original_to_new_ids[original_id] = self.next_id + self.next_id += 1 + + return self.original_to_new_ids[original_id] + + def generate_new_id(self) -> int: + """ + Generates a new unique ID without needing an original ID. + + This is useful for cases where new entities are created that do not + have an existing identifier. + + Returns: + int: A new unique ID. + """ + unique_id = self.next_id + self.next_id += 1 + return unique_id + + def lookup_new_id(self, original_id: int) -> int: + """ + Retrieves the new unique ID for a given original ID, if it has been assigned. + + This method is useful for checking if a unique ID has already been + assigned to an original ID and retrieving it. + + Args: + original_id (int): The original ID to look up. + + Returns: + int: The new unique ID if it has been assigned, otherwise returns + the original ID. 
+ """ + return self.original_to_new_ids.get(original_id, original_id) From a836df954793bbe4021e6e85e02b7b97720beeca Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 11:21:12 -0400 Subject: [PATCH 09/15] Install Chakra and PARAM in GH actions --- .github/workflows/python_tests.yml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python_tests.yml b/.github/workflows/python_tests.yml index 399b5a91..d20e4722 100644 --- a/.github/workflows/python_tests.yml +++ b/.github/workflows/python_tests.yml @@ -13,7 +13,18 @@ jobs: - name: Setup Python Environment uses: actions/setup-python@v2 with: - python-version: '3.8' + python-version: '3.10.14' + + - name: Install Chakra + run: | + pip install . + + - name: Install PARAM + run: | + git clone https://github.com/facebookresearch/param.git + cd param/train/compute/python/ + git checkout c83ce8429110a86549c40fec5a01acbd9fbd54a4 + pip install . - name: Install Dependencies run: | From 67badeb1708e8fabbcf0a4436a66f98c62a49fef Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 10:43:48 -0400 Subject: [PATCH 10/15] Refactor trace_link --- src/trace_link/trace_link.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/trace_link/trace_link.py b/src/trace_link/trace_link.py index 479ba799..55a7e105 100644 --- a/src/trace_link/trace_link.py +++ b/src/trace_link/trace_link.py @@ -7,14 +7,12 @@ def main() -> None: """ Main function to execute the trace linking process. 
- For more detailed steps on collecting traces and converting them to Chakra - traces, visit the guide at: + For more detailed steps on collecting traces and converting them to Chakra traces, visit the guide at: https://github.com/mlcommons/chakra/wiki/Chakra-Execution-Trace-Collection-%E2%80%90-A-Comprehensive-Guide-on-Merging-PyTorch-and-Kineto-Traces """ parser = argparse.ArgumentParser( - description="Link PyTorch execution trace with Kineto trace " - "to produce Chakra traces. For more information, " - "see the guide at https://github.com/mlcommons/chakra/wiki/Chakra-Execution-Trace-Collection-%E2%80%90-A-Comprehensive-Guide-on-Merging-PyTorch-and-Kineto-Traces" + description="Link PyTorch execution trace with Kineto trace to produce Chakra traces." + "For more information, see the guide at https://github.com/mlcommons/chakra/wiki/Chakra-Execution-Trace-Collection-%E2%80%90-A-Comprehensive-Guide-on-Merging-PyTorch-and-Kineto-Traces" ) parser.add_argument( "--pytorch-et-file", From e28d3b278a121a6d3ae21465f24bad86bc8b32e7 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 11:11:19 -0400 Subject: [PATCH 11/15] Refactor KinetoOperator --- src/trace_link/kineto_operator.py | 85 +++++++++++++++---------------- 1 file changed, 41 insertions(+), 44 deletions(-) diff --git a/src/trace_link/kineto_operator.py b/src/trace_link/kineto_operator.py index 22a8c18f..4cb588fc 100644 --- a/src/trace_link/kineto_operator.py +++ b/src/trace_link/kineto_operator.py @@ -1,13 +1,15 @@ from typing import Any, Dict, Optional -from param_bench.train.compute.python.tools.execution_trace import ( - Node as PyTorchOperator, -) +from param_bench.train.compute.python.tools.execution_trace import Node as PyTorchOperator class KinetoOperator: """ - Represents a single operator extracted from the Kineto trace. + Represents a single operator in a Kineto trace by default, with fields primarily sourced + from the Kineto traces. 
In addition to the default fields from Kineto traces, additional + fields have been introduced for postprocessing purposes. These additional fields facilitate + the correlation of PyTorch operators and the enforcement of dependencies among them, + enhancing trace analysis and utility. Attributes: op_dict (Dict[str, Any]): Dictionary containing the operator data. @@ -38,31 +40,38 @@ def __init__(self, kineto_op: Dict[str, Any]) -> None: kineto_op (Dict[str, Any]): The dictionary representing the operator data. """ - self.op_dict = kineto_op - self.category = kineto_op.get("cat", "") - self.name = kineto_op.get("name", "") - self.phase = kineto_op.get("ph") - self.inclusive_dur = kineto_op.get("dur", 0) - self.exclusive_dur = kineto_op.get("dur", 0) - self.timestamp = kineto_op.get("ts", 0) - self.external_id = "" - self.ev_idx = "" - self.tid = kineto_op.get("tid", 0) + self.op_dict: Dict[str, Any] = kineto_op + self.category: str = kineto_op.get("cat", "") + self.name: str = kineto_op.get("name", "") + self.phase: Optional[str] = kineto_op.get("ph") + self.inclusive_dur: int = kineto_op.get("dur", 0) + self.exclusive_dur: int = kineto_op.get("dur", 0) + self.timestamp: int = kineto_op.get("ts", 0) + self.external_id: str = kineto_op.get("args", {}).get("External id", "") + self.ev_idx: str = kineto_op.get("args", {}).get("Ev Idx", "") + self.tid: int = kineto_op.get("tid", 0) self.pytorch_op: Optional[PyTorchOperator] = None - self.parent_pytorch_op_id = None + self.parent_pytorch_op_id: Optional[int] = None self.inter_thread_dep: Optional[int] = None - self.stream: Optional[int] = None - self.rf_id: Optional[int] = None - self.correlation: int = None + self.stream: Optional[int] = kineto_op.get("args", {}).get("stream") + self.rf_id: Optional[int] = kineto_op.get("args", {}).get("Record function id") + self.correlation: int = kineto_op.get("args", {}).get("correlation", -1) - if "args" in kineto_op: - self.external_id = kineto_op["args"].get("External id") - 
self.ev_idx = kineto_op["args"].get("Ev Idx", "") - self.stream = kineto_op["args"].get("stream") - if "Record function id" in kineto_op["args"]: - self.rf_id = int(kineto_op["args"]["Record function id"]) - if "correlation" in kineto_op["args"]: - self.correlation = int(kineto_op["args"]["correlation"]) + def __repr__(self) -> str: + """ + Represent the KinetoOperator as a string. + + Returns: + str: A string representation of the KinetoOperator. + """ + return ( + f"KinetoOperator(category={self.category}, name={self.name}, phase={self.phase}, " + f"inclusive_dur={self.inclusive_dur}, exclusive_dur={self.exclusive_dur}, " + f"timestamp={self.timestamp}, external_id={self.external_id}, ev_idx={self.ev_idx}, " + f"tid={self.tid}, parent_pytorch_op_id={self.parent_pytorch_op_id}, " + f"inter_thread_dep={self.inter_thread_dep}, stream={self.stream}, rf_id={self.rf_id}, " + f"correlation={self.correlation})" + ) def is_valid( self, @@ -73,6 +82,12 @@ def is_valid( """ Checks if the operator matches specified filtering criteria. + Comment (TODO): + This is legacy code from a previous implementation. Ideally, we should merge this logic + into trace_linker.py. The purpose of is_valid is ambiguous, and it is unclear whether + the function is essential. However, we keep it as it is to avoid breaking downstream + tools. After properly setting up CI/CD pipelines and testing, we can consider removing it. + Args: category (str): The category to check against. name_exception (str): A name to exclude in the check. @@ -87,21 +102,3 @@ def is_valid( and self.category == category and (phase is None or self.phase == phase) ) - - def __repr__(self) -> str: - """ - Represent the KinetoOperator as a string. - - Returns: - str: A string representation of the KinetoOperator. 
- """ - return ( - f"KinetoOperator(category={self.category}, " - f"name={self.name}, phase={self.phase}, " - f"inclusive_dur={self.inclusive_dur}, " - f"exclusive_dur={self.exclusive_dur}, " - f"timestamp={self.timestamp}, external_id={self.external_id}, " - f"ev_idx={self.ev_idx}, tid={self.tid}, " - f"rf_id={self.rf_id}, " - f"parent_pytorch_op_id={self.parent_pytorch_op_id})" - ) From 662cc7815fc97a1829526c47f02e621b9b799b65 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 10:52:16 -0400 Subject: [PATCH 12/15] Add unit tests for UniqueIdAssigner --- tests/trace_link/test_unique_id_assigner.py | 62 +++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 tests/trace_link/test_unique_id_assigner.py diff --git a/tests/trace_link/test_unique_id_assigner.py b/tests/trace_link/test_unique_id_assigner.py new file mode 100644 index 00000000..bb0337ba --- /dev/null +++ b/tests/trace_link/test_unique_id_assigner.py @@ -0,0 +1,62 @@ +import pytest + +from src.trace_link.unique_id_assigner import UniqueIdAssigner + + +@pytest.fixture +def assigner(): + """Fixture to create a new UniqueIdAssigner instance for each test.""" + return UniqueIdAssigner() + + +def test_assign_or_retrieve_id_new(assigner): + """ + Test that a new unique ID is correctly assigned to a new original ID. + """ + first_id = assigner.assign_or_retrieve_id(10) + assert first_id == 0 # Expect the first assigned ID to be 0 + + +def test_assign_or_retrieve_id_existing(assigner): + """ + Test that the same original ID retrieves the same unique ID upon subsequent calls. + """ + first_id = assigner.assign_or_retrieve_id(10) + second_id = assigner.assign_or_retrieve_id(10) + assert second_id == first_id # Ensure it retrieves the same ID + + +def test_assign_or_retrieve_id_distinct(assigner): + """ + Test that different original IDs receive different unique IDs. 
+ """ + first_id = assigner.assign_or_retrieve_id(10) + second_id = assigner.assign_or_retrieve_id(20) + assert second_id != first_id + assert second_id == 1 # This should be the next unique ID + + +def test_generate_new_id_sequence(assigner): + """ + Test that generate_new_id consistently returns incrementing IDs. + """ + ids = [assigner.generate_new_id() for _ in range(5)] + expected_ids = list(range(5)) + assert ids == expected_ids + + +def test_lookup_new_id_assigned(assigner): + """ + Test lookup of new IDs, ensuring assigned IDs return the correct new ID. + """ + original_id = 30 + new_id = assigner.assign_or_retrieve_id(original_id) + assert assigner.lookup_new_id(original_id) == new_id + + +def test_lookup_new_id_unassigned(assigner): + """ + Test lookup for an unassigned ID returns the original ID. + """ + unassigned_id = 40 + assert assigner.lookup_new_id(unassigned_id) == unassigned_id From 6b8d5e19f1542c9d33e734d7313e9a4672f64df3 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 11:13:41 -0400 Subject: [PATCH 13/15] Add unit tests for KinetoOperator --- tests/trace_link/test_kineto_operator.py | 60 ++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 tests/trace_link/test_kineto_operator.py diff --git a/tests/trace_link/test_kineto_operator.py b/tests/trace_link/test_kineto_operator.py new file mode 100644 index 00000000..8561609b --- /dev/null +++ b/tests/trace_link/test_kineto_operator.py @@ -0,0 +1,60 @@ +import pytest + +from src.trace_link.kineto_operator import KinetoOperator + + +@pytest.fixture +def sample_operator_data(): + """Provides sample Kineto trace data for testing.""" + return { + "cat": "Kernel", + "name": "cudaLaunchKernel", + "ph": "X", + "dur": 100, + "ts": 1590000000, + "tid": 1234, + "args": {"External id": "ext123", "Ev Idx": "ev456", "stream": 7, "Record function id": 12, "correlation": 99}, + } + + +def 
test_init_kineto_operator(sample_operator_data): + """Test the initialization and attribute assignment of KinetoOperator.""" + operator = KinetoOperator(sample_operator_data) + assert operator.category == "Kernel" + assert operator.name == "cudaLaunchKernel" + assert operator.phase == "X" + assert operator.inclusive_dur == 100 + assert operator.exclusive_dur == 100 + assert operator.timestamp == 1590000000 + assert operator.external_id == "ext123" + assert operator.ev_idx == "ev456" + assert operator.tid == 1234 + assert operator.stream == 7 + assert operator.rf_id == 12 + assert operator.correlation == 99 + assert operator.pytorch_op is None # Ensure default None + assert operator.parent_pytorch_op_id is None # Ensure default None + assert operator.inter_thread_dep is None # Ensure default None + + +def test_repr_method(sample_operator_data): + """Test the __repr__ method output.""" + operator = KinetoOperator(sample_operator_data) + expected_repr = ( + "KinetoOperator(category=Kernel, name=cudaLaunchKernel, phase=X, " + "inclusive_dur=100, exclusive_dur=100, timestamp=1590000000, external_id=ext123, ev_idx=ev456, " + "tid=1234, parent_pytorch_op_id=None, inter_thread_dep=None, stream=7, rf_id=12, " + "correlation=99)" + ) + assert repr(operator) == expected_repr + + +def test_is_valid_method(sample_operator_data): + """Test the is_valid method under various conditions.""" + operator = KinetoOperator(sample_operator_data) + assert operator.is_valid(category="Kernel") # Matching category + assert not operator.is_valid(category="Memory") # Non-matching category + assert operator.is_valid(category="Kernel", name_exception="cudaMalloc") # Matching category, name not excluded + assert not operator.is_valid(category="Kernel", name_exception="cudaLaunchKernel") # Name excluded + assert operator.is_valid(category="Kernel", phase="X") # Matching phase + assert not operator.is_valid(category="Kernel", phase="B") # Non-matching phase From 
f1a101a669e1ebcbd78fa42fda559ffaf83a1c3b Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 11:47:26 -0400 Subject: [PATCH 14/15] Update Python version in python_lint.yml --- .github/workflows/python_lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python_lint.yml b/.github/workflows/python_lint.yml index a0683f20..29b77efa 100644 --- a/.github/workflows/python_lint.yml +++ b/.github/workflows/python_lint.yml @@ -13,7 +13,7 @@ jobs: - name: Setup Python Environment uses: actions/setup-python@v2 with: - python-version: '3.8' + python-version: '3.10.14' - name: Install Dependencies run: | From 1ca3c43d04ddc6e7c6d84db8ce745826f6e956aa Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 11:48:22 -0400 Subject: [PATCH 15/15] Add nightly tests --- .github/workflows/nightly_tests.yml | 38 +++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 .github/workflows/nightly_tests.yml diff --git a/.github/workflows/nightly_tests.yml b/.github/workflows/nightly_tests.yml new file mode 100644 index 00000000..bb52bdc6 --- /dev/null +++ b/.github/workflows/nightly_tests.yml @@ -0,0 +1,38 @@ +name: Nightly Tests + +on: + schedule: + # Runs at 2 AM UTC every day + - cron: '0 2 * * *' + +jobs: + nightly-tests: + runs-on: ubuntu-latest + + steps: + - name: Checkout Code + uses: actions/checkout@v2 + + - name: Setup Python Environment + uses: actions/setup-python@v2 + with: + python-version: '3.10.14' + + - name: Install Chakra + run: | + pip install . + + - name: Install PARAM + run: | + git clone https://github.com/facebookresearch/param.git + cd param/train/compute/python/ + git checkout c83ce8429110a86549c40fec5a01acbd9fbd54a4 + pip install . + + - name: Install Dependencies + run: | + pip install -r requirements-dev.txt + + - name: Run Unit Tests + run: | + python -m pytest -vv tests