diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py
index 71696ca..32f7696 100644
--- a/src/trace_link/trace_linker.py
+++ b/src/trace_link/trace_linker.py
@@ -541,7 +541,7 @@ def map_host_to_device_ops(
     ]:
         """Map Chakra host operators to corresponding device operators."""
         logging.debug("Mapping Charka host operators to corresponding device operators.")
-        cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers(
+        cpu_external_id_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers(
             kineto_gpu_ops, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts
         )

@@ -569,7 +569,7 @@ def map_host_to_device_ops(
             ) = self.link_ops(
                 host_op,
                 kineto_op,
-                cpu_ev_idx_to_gpu_ops_map,
+                cpu_external_id_to_gpu_ops_map,
                 kineto_rf_id_to_device_op_map,
                 kineto_external_id_to_kineto_op_map,
             )
@@ -593,7 +593,7 @@ def group_gpu_ops_by_cpu_launchers(
         """
         Group GPU operators based on their corresponding CPU launchers.

-        This is determined by the 'ev_idx' which links GPU operators to their initiating CPU launcher events.
+        This is determined by the 'external_id' which links GPU operators to their initiating CPU launcher events.

         Args:
             kineto_gpu_ops (List[KinetoOperator]): List of Kineto GPU operators.
@@ -607,9 +607,9 @@ def group_gpu_ops_by_cpu_launchers(
             Dict[int, List[KinetoOperator]]: Mapping from CPU launch event indices to GPU operators.

         Raises:
-            ValueError: If 'ev_idx' is missing for any GPU operator.
+            ValueError: If 'external_id' is missing for any GPU operator.
         """
-        cpu_ev_idx_to_gpu_ops_map = {}
+        cpu_external_id_to_gpu_ops_map = {}
         for gpu_op in kineto_gpu_ops:
             parent_cpu_op = self.find_parent_cpu_op(
                 gpu_op, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts
@@ -619,9 +619,9 @@ def group_gpu_ops_by_cpu_launchers(
             )
             if not parent_cpu_op:
                 logging.warning(warning_msg)
                 continue
-            if parent_cpu_op.ev_idx == "":
+            if parent_cpu_op.external_id == "":
                 error_msg = (
-                    f"Missing 'ev_idx' for CPU operator {parent_cpu_op.name}. "
+                    f"Missing 'external_id' for CPU operator {parent_cpu_op.name}. "
                     f"Cannot link GPU op {gpu_op.name} to {parent_cpu_op.name}."
                 )
                 logging.warning(error_msg)
@@ -629,9 +629,9 @@ def group_gpu_ops_by_cpu_launchers(

             logging.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'")

-            cpu_ev_idx_to_gpu_ops_map.setdefault(parent_cpu_op.ev_idx, []).append(gpu_op)
+            cpu_external_id_to_gpu_ops_map.setdefault(parent_cpu_op.external_id, []).append(gpu_op)

-        return cpu_ev_idx_to_gpu_ops_map
+        return cpu_external_id_to_gpu_ops_map

     def find_parent_cpu_op(
         self,
@@ -713,48 +713,35 @@ def find_closest_op(
         Returns:
             Optional[KinetoOperator]: The closest Kineto operator if found.
""" - # Searching for the closest timestamp index + # Step 1: Find the initial closest index index = bisect.bisect_left(sorted_kineto_cpu_op_ts, ts) if index == 0: # All operators are later than the timestamp return None - else: - # The operator immediately before the index is the closest one before the timestamp - closest_op = sorted_kineto_cpu_ops[index - 1] - - # Check for NCCL specifics: if it's an NCCL operation and 'nccl:coalesced' should be skipped - if "nccl" in kineto_gpu_op.name.lower() and closest_op.name == "nccl:coalesced": - # Move back to find a non-'nccl:coalesced' operator, if available - for new_index in range(index - 2, -1, -1): - potential_op = sorted_kineto_cpu_ops[new_index] - if potential_op.tid == kineto_gpu_op.tid and potential_op.name != "nccl:coalesced": - return potential_op - # If no valid alternative found before 'nccl:coalesced', continue search forward - index = index - 1 # Adjust index to skip 'nccl:coalesced' - - # After skipping 'nccl:coalesced', verify that the closest operation is on the same thread - # as the GPU operation - if closest_op.tid == kineto_gpu_op.tid: - return closest_op - - # If the tids do not match, search forward to find the closest matching tid - for i in range(index - 1, -1, -1): - op = sorted_kineto_cpu_ops[i] - if op.tid == kineto_gpu_op.tid: - if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced": - continue # Skip 'nccl:coalesced' if it's an NCCL-related GPU operation - if op.timestamp <= ts: - return op - - # If no matching tid is found going forward, return None - return None + + # Step 2: Find the closest operator + tid_only_match = None # Track the best operator with matching tid + for i in range(index - 1, -1, -1): + op = sorted_kineto_cpu_ops[i] + # Skip 'nccl:coalesced' for NCCL-related GPU operations + if "nccl" in kineto_gpu_op.name.lower() and op.name == "nccl:coalesced": + continue + # Return the operator matching both tid and external_id + if op.tid == kineto_gpu_op.tid and op.external_id == kineto_gpu_op.external_id: + return op + # Track the tid_only_match operator with matching tid if no full match is found + if tid_only_match is None and op.tid == kineto_gpu_op.tid: + tid_only_match = op + + # Step 3: Return the best match or None if no match is found + return tid_only_match def link_ops( self, host_op: PyTorchOperator, kineto_op: KinetoOperator, - cpu_ev_idx_to_gpu_ops_map: Dict[int, List[KinetoOperator]], + cpu_external_id_to_gpu_ops_map: Dict[int, List[KinetoOperator]], kineto_rf_id_to_device_op_map: Dict[int, KinetoOperator], kineto_external_id_to_kineto_op_map: Dict[int, KinetoOperator], ) -> Tuple[List[KinetoOperator], int, int, int, Optional[int]]: @@ -764,7 +751,7 @@ def link_ops( Args: host_op (PyTorchOperator): Chakra host operator to link. kineto_op (KinetoOperator): Corresponding Kineto operator. - cpu_ev_idx_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping. + cpu_external_id_to_gpu_ops_map (Dict[int, List[KinetoOperator]]): GPU ops mapping. kineto_rf_id_to_device_op_map (Dict[int, KinetoOperator]): Kineto operator mapping. kineto_external_id_to_kineto_op_map (Dict[int, KinetoOperator]): Mapping from external id to KinetoOperators. @@ -779,7 +766,7 @@ def link_ops( - List[int]: List of synchronization dependency IDs. 
""" kineto_op.host_op = host_op - linked_gpu_ops = cpu_ev_idx_to_gpu_ops_map.get(kineto_op.ev_idx, []) + linked_gpu_ops = cpu_external_id_to_gpu_ops_map.get(kineto_op.external_id, []) inclusive_dur = kineto_op.inclusive_dur exclusive_dur = kineto_op.exclusive_dur timestamp = kineto_op.timestamp diff --git a/tests/trace_link/test_trace_linker.py b/tests/trace_link/test_trace_linker.py index a0441ae..8430867 100644 --- a/tests/trace_link/test_trace_linker.py +++ b/tests/trace_link/test_trace_linker.py @@ -381,14 +381,14 @@ def test_group_gpu_ops_by_cpu_launchers(trace_linker): kineto_gpu_op2.tid = 2 kineto_runtime_op1 = MagicMock(spec=KinetoOperator) - kineto_runtime_op1.ev_idx = "cpu_op1" + kineto_runtime_op1.external_id = "cpu_op1" kineto_runtime_op1.timestamp = 100 kineto_runtime_op1.tid = 1 kineto_runtime_op1.name = "runtime_op1" kineto_runtime_op1.correlation = 123 kineto_runtime_op2 = MagicMock(spec=KinetoOperator) - kineto_runtime_op2.ev_idx = "cpu_op2" + kineto_runtime_op2.external_id = "cpu_op2" kineto_runtime_op2.timestamp = 200 kineto_runtime_op2.tid = 2 kineto_runtime_op2.name = "runtime_op2" @@ -445,7 +445,7 @@ def test_find_parent_cpu_op(mock_find_closest_op, trace_linker): MagicMock(spec=PyTorchOperator, id=1), MagicMock( spec=KinetoOperator, - ev_idx="1", + external_id="1", inclusive_dur=100, exclusive_dur=50, timestamp=123456, @@ -461,7 +461,7 @@ def test_find_parent_cpu_op(mock_find_closest_op, trace_linker): MagicMock(spec=PyTorchOperator, id=2), MagicMock( spec=KinetoOperator, - ev_idx="2", + external_id="2", inclusive_dur=200, exclusive_dur=150, timestamp=223456, @@ -491,7 +491,7 @@ def test_link_ops( ): mock_get_inter_thread_dep.return_value = expected_inter_thread_dep - cpu_ev_idx_to_gpu_ops_map = {kineto_op.ev_idx: expected_linked_gpu_ops} + cpu_external_id_to_gpu_ops_map = {kineto_op.external_id: expected_linked_gpu_ops} kineto_rf_id_to_kineto_op_map = {1: MagicMock(spec=KinetoOperator, host_op=MagicMock(id=42))} kineto_external_id_to_kineto_op_map = { 2: MagicMock(spec=KinetoOperator, host_op=MagicMock(id=3)), @@ -501,7 +501,7 @@ def test_link_ops( result = trace_linker.link_ops( host_op, kineto_op, - cpu_ev_idx_to_gpu_ops_map, + cpu_external_id_to_gpu_ops_map, kineto_rf_id_to_kineto_op_map, kineto_external_id_to_kineto_op_map, ) @@ -520,7 +520,7 @@ def test_link_ops_with_no_gpu_ops(trace_linker): host_op = MagicMock(spec=PyTorchOperator, id=1) kineto_op = MagicMock( spec=KinetoOperator, - ev_idx="1", + external_id="1", inclusive_dur=100, exclusive_dur=50, timestamp=123456, @@ -529,14 +529,14 @@ def test_link_ops_with_no_gpu_ops(trace_linker): sync_dep=[], ) - cpu_ev_idx_to_gpu_ops_map = {} + cpu_external_id_to_gpu_ops_map = {} kineto_rf_id_to_kineto_op_map = {} kineto_external_id_to_kineto_op_map = {} result = trace_linker.link_ops( host_op, kineto_op, - cpu_ev_idx_to_gpu_ops_map, + cpu_external_id_to_gpu_ops_map, kineto_rf_id_to_kineto_op_map, kineto_external_id_to_kineto_op_map, )