diff --git a/cirq-core/cirq/transformers/routing/mapping_manager.py b/cirq-core/cirq/transformers/routing/mapping_manager.py index 2b21be25ca4..aee2eab7757 100644 --- a/cirq-core/cirq/transformers/routing/mapping_manager.py +++ b/cirq-core/cirq/transformers/routing/mapping_manager.py @@ -14,21 +14,25 @@ """Manages the mapping from logical to physical qubits during a routing procedure.""" -from typing import Dict, Sequence, TYPE_CHECKING +from typing import List, Dict, Sequence, TYPE_CHECKING import networkx as nx -from cirq import protocols, value +import numpy as np if TYPE_CHECKING: import cirq -@value.value_equality class MappingManager: """Class that manages the mapping from logical to physical qubits. - Convenience methods over distance and mapping queries of the physical qubits are also provided. - All such public methods of this class expect logical qubits. + For efficiency, the mapping manager maps all logical and physical qubits to integers, and + maintains a mapping from logical qubit integers to physical qubit integers. This speedup is + important to avoid qubit hashing in hot-paths like querying distance of two logical qubits + on the device (via `dist_on_device` method). + + All public methods of this class expect logical qubits (or corresponding integers that the + logical qubits are mapped to, via `self.logical_qid_to_int` map). """ def __init__( @@ -36,132 +40,170 @@ def __init__( ) -> None: """Initializes MappingManager. - Sorts the nodes and edges in the device graph to guarantee graph equality. If undirected, - also sorts the nodes within each edge. - Args: device_graph: connectivity graph of qubits in the hardware device. initial_mapping: the initial mapping of logical (keys) to physical qubits (values). """ - # make sure edge insertion order is the same amongst equivalent graphs. - if nx.is_directed(device_graph): - self.device_graph = nx.DiGraph() - self.device_graph.add_nodes_from(sorted(list(device_graph.nodes(data=True)))) - self.device_graph.add_edges_from(sorted(list(device_graph.edges))) - else: - self.device_graph = nx.Graph() - self.device_graph.add_nodes_from(sorted(list(device_graph.nodes(data=True)))) - self.device_graph.add_edges_from( - sorted(list(sorted(edge) for edge in device_graph.edges)) - ) - - self._map = initial_mapping.copy() - self._inverse_map = {v: k for k, v in self._map.items()} - self._induced_subgraph = nx.induced_subgraph(self.device_graph, self._map.values()) + # Map both logical and physical qubits to integers. + self._logical_qid_to_int = {q: i for i, q in enumerate(sorted(initial_mapping.keys()))} + self._int_to_logical_qid = sorted( + self._logical_qid_to_int.keys(), key=lambda x: self._logical_qid_to_int[x] + ) + self._physical_qid_to_int = {q: i for i, q in enumerate(sorted(initial_mapping.values()))} + self._int_to_physical_qid = sorted( + self._physical_qid_to_int.keys(), key=lambda x: self._physical_qid_to_int[x] + ) + logical_qubits, physical_qubits = ( + zip(*[(k, v) for k, v in initial_mapping.items()]) if initial_mapping else ([], []) + ) + num_qubits = len(logical_qubits) + self._logical_to_physical = np.asarray( + [ + self._physical_qid_to_int[physical_qubits[i]] + for i in sorted( + range(num_qubits), key=lambda x: self._logical_qid_to_int[logical_qubits[x]] + ) + ] + ) + self._physical_to_logical = np.asarray( + [ + self._logical_qid_to_int[logical_qubits[i]] + for i in sorted( + range(num_qubits), key=lambda x: self._physical_qid_to_int[physical_qubits[x]] + ) + ] + ) + # Construct the induced subgraph (on integers) and corresponding distance matrix. + self._induced_subgraph_int = nx.relabel_nodes( + nx.induced_subgraph(device_graph, initial_mapping.values()), + {q: self._physical_qid_to_int[q] for q in initial_mapping.values()}, + ) + # Compute floyd warshall dictionary. self._predecessors, self._distances = nx.floyd_warshall_predecessor_and_distance( - self._induced_subgraph + self._induced_subgraph_int ) @property - def map(self) -> Dict['cirq.Qid', 'cirq.Qid']: - """The mapping of logical qubits (keys) to physical qubits (values).""" - return self._map + def physical_qid_to_int(self) -> Dict['cirq.Qid', int]: + """Mapping of physical qubits, that were part of the initial mapping, to unique integers.""" + return self._physical_qid_to_int + + @property + def int_to_physical_qid(self) -> List['cirq.Qid']: + """Inverse mapping of unique integers to corresponding physical qubits. + + `self.physical_qid_to_int[self.int_to_physical_qid[i]] == i` for each i. + """ + return self._int_to_physical_qid + + @property + def logical_qid_to_int(self) -> Dict['cirq.Qid', int]: + """Mapping of logical qubits, that were part of the initial mapping, to unique integers.""" + return self._logical_qid_to_int @property - def inverse_map(self) -> Dict['cirq.Qid', 'cirq.Qid']: - """The mapping of physical qubits (keys) to logical qubits (values).""" - return self._inverse_map + def int_to_logical_qid(self) -> List['cirq.Qid']: + """Inverse mapping of unique integers to corresponding physical qubits. + + `self.logical_qid_to_int[self.int_to_logical_qid[i]] == i` for each i. + """ + return self._int_to_logical_qid + + @property + def logical_to_physical(self) -> np.ndarray: + """The mapping of logical qubit integers to physical qubit integers. + + Let `lq: cirq.Qid` be a logical qubit. Then the corresponding physical qubit that it + maps to can be obtained by: + `self.int_to_physical_qid[self.logical_to_physical[self.logical_qid_to_int[lq]]]` + """ + return self._logical_to_physical + + @property + def physical_to_logical(self) -> np.ndarray: + """The mapping of physical qubits integers to logical qubits integers. + + Let `pq: cirq.Qid` be a physical qubit. Then the corresponding logical qubit that it + maps to can be obtained by: + `self.int_to_logical_qid[self.physical_to_logical[self.physical_qid_to_int[pq]]]` + """ + return self._physical_to_logical @property - def induced_subgraph(self) -> nx.Graph: - """The induced subgraph on the set of physical qubits which are part of `self.map`.""" - return self._induced_subgraph + def induced_subgraph_int(self) -> nx.Graph: + """Induced subgraph on physical qubit integers present in `self.logical_to_physical`.""" + return self._induced_subgraph_int - def dist_on_device(self, lq1: 'cirq.Qid', lq2: 'cirq.Qid') -> int: + def dist_on_device(self, lq1: int, lq2: int) -> int: """Finds distance between logical qubits 'lq1' and 'lq2' on the device. Args: - lq1: the first logical qubit. - lq2: the second logical qubit. + lq1: integer corresponding to the first logical qubit. + lq2: integer corresponding to the second logical qubit. Returns: The shortest path distance. """ - return self._distances[self._map[lq1]][self._map[lq2]] + return self._distances[self.logical_to_physical[lq1]][self.logical_to_physical[lq2]] - def can_execute(self, op: 'cirq.Operation') -> bool: - """Finds whether the given operation acts on qubits that are adjacent on the device. + def is_adjacent(self, lq1: int, lq2: int) -> bool: + """Finds whether logical qubits `lq1` and `lq2` are adjacent on the device. Args: - op: an operation on logical qubits. + lq1: integer corresponding to the first logical qubit. + lq2: integer corresponding to the second logical qubit. Returns: - True, if physical qubits corresponding to logical qubits `op.qubits` are adjacent on + True, if physical qubits corresponding to `lq1` and `lq2` are adjacent on the device. """ - return protocols.num_qubits(op) < 2 or self.dist_on_device(*op.qubits) == 1 + return self.dist_on_device(lq1, lq2) == 1 - def apply_swap(self, lq1: 'cirq.Qid', lq2: 'cirq.Qid') -> None: + def apply_swap(self, lq1: int, lq2: int) -> None: """Updates the mapping to simulate inserting a swap operation between `lq1` and `lq2`. Args: - lq1: the first logical qubit. - lq2: the second logical qubit. + lq1: integer corresponding to the first logical qubit. + lq2: integer corresponding to the second logical qubit. Raises: - ValueError: whenever lq1 and lq2 are no adjacent on the device. + ValueError: whenever lq1 and lq2 are not adjacent on the device. """ if self.dist_on_device(lq1, lq2) > 1: raise ValueError( f"q1: {lq1} and q2: {lq2} are not adjacent on the device. Cannot swap them." ) - pq1, pq2 = self._map[lq1], self._map[lq2] - self._map[lq1], self._map[lq2] = self._map[lq2], self._map[lq1] - - self._inverse_map[pq1], self._inverse_map[pq2] = ( - self._inverse_map[pq2], - self._inverse_map[pq1], - ) + pq1, pq2 = self.logical_to_physical[lq1], self.logical_to_physical[lq2] + self._logical_to_physical[[lq1, lq2]] = self._logical_to_physical[[lq2, lq1]] + self._physical_to_logical[[pq1, pq2]] = self._physical_to_logical[[pq2, pq1]] def mapped_op(self, op: 'cirq.Operation') -> 'cirq.Operation': - """Transforms the given operation with the qubits in self._map. + """Transforms the given logical operation to act on corresponding physical qubits. Args: - op: an operation on logical qubits. + op: logical operation acting on logical qubits. Returns: - The same operation on corresponding physical qubits.""" - return op.transform_qubits(self._map) + The same operation acting on corresponding physical qubits. + """ + logical_ints = [self._logical_qid_to_int[q] for q in op.qubits] + physical_ints = self.logical_to_physical[logical_ints] + qubit_map: Dict['cirq.Qid', 'cirq.Qid'] = { + q: self._int_to_physical_qid[physical_ints[i]] for i, q in enumerate(op.qubits) + } + return op.transform_qubits(qubit_map) - def shortest_path(self, lq1: 'cirq.Qid', lq2: 'cirq.Qid') -> Sequence['cirq.Qid']: - """Find the shortest path between two logical qubits on the device given their mapping. + def shortest_path(self, lq1: int, lq2: int) -> Sequence[int]: + """Find the shortest path between two logical qubits on the device, given their mapping. Args: - lq1: the first logical qubit. - lq2: the second logical qubit. + lq1: integer corresponding to the first logical qubit. + lq2: integer corresponding to the second logical qubit. Returns: - A sequence of logical qubits on the shortest path from lq1 to lq2. + A sequence of logical qubit integers on the shortest path from `lq1` to `lq2`. """ - return [ - self._inverse_map[pq] - for pq in nx.reconstruct_path(self._map[lq1], self._map[lq2], self._predecessors) + return self.physical_to_logical[ + nx.reconstruct_path(*self.logical_to_physical[[lq1, lq2]], self._predecessors) ] - - def _value_equality_values_(self): - graph_equality = ( - tuple(self.device_graph.nodes), - tuple(self.device_graph.edges), - nx.is_directed(self.device_graph), - ) - map_equality = tuple(sorted(self._map.items())) - return (graph_equality, map_equality) - - def __repr__(self) -> str: - graph_type = type(self.device_graph).__name__ - return ( - f'cirq.MappingManager(' - f'nx.{graph_type}({dict(self.device_graph.adjacency())}),' - f' {self._map})' - ) diff --git a/cirq-core/cirq/transformers/routing/mapping_manager_test.py b/cirq-core/cirq/transformers/routing/mapping_manager_test.py index 87b4696cf42..d4ef161d119 100644 --- a/cirq-core/cirq/transformers/routing/mapping_manager_test.py +++ b/cirq-core/cirq/transformers/routing/mapping_manager_test.py @@ -50,12 +50,15 @@ def test_induced_subgraph(): (cirq.NamedQubit("c"), cirq.NamedQubit("d")), ] ) - assert graphs_equal(mm.induced_subgraph, expected_induced_subgraph) + assert graphs_equal( + mm.induced_subgraph_int, nx.relabel_nodes(expected_induced_subgraph, mm.physical_qid_to_int) + ) def test_mapped_op(): device_graph, initial_mapping, q = construct_device_graph_and_mapping() mm = cirq.MappingManager(device_graph, initial_mapping) + q_int = [mm.logical_qid_to_int[q[i]] if q[i] in initial_mapping else -1 for i in range(len(q))] assert mm.mapped_op(cirq.CNOT(q[1], q[3])).qubits == ( cirq.NamedQubit("a"), @@ -68,7 +71,7 @@ def test_mapped_op(): ) # correctly changes mapped qubits when swapped - mm.apply_swap(q[2], q[3]) + mm.apply_swap(q_int[2], q_int[3]) assert mm.mapped_op(cirq.CNOT(q[1], q[2])).qubits == ( cirq.NamedQubit("a"), cirq.NamedQubit("b"), @@ -80,146 +83,70 @@ def test_mapped_op(): ) -def test_distance_on_device_and_can_execute(): +def test_distance_on_device_and_is_adjacent(): device_graph, initial_mapping, q = construct_device_graph_and_mapping() mm = cirq.MappingManager(device_graph, initial_mapping) + q_int = [mm.logical_qid_to_int[q[i]] if q[i] in initial_mapping else -1 for i in range(len(q))] # adjacent qubits have distance 1 and are thus executable - assert mm.dist_on_device(q[1], q[3]) == 1 - assert mm.can_execute(cirq.CNOT(q[1], q[3])) + assert mm.dist_on_device(q_int[1], q_int[3]) == 1 + assert mm.is_adjacent(q_int[1], q_int[3]) # non-adjacent qubits with distance > 1 are not executable - assert mm.dist_on_device(q[1], q[2]) == 2 - assert mm.can_execute(cirq.CNOT(q[1], q[2])) is False + assert mm.dist_on_device(q_int[1], q_int[2]) == 2 + assert mm.is_adjacent(q_int[1], q_int[2]) is False # 'dist_on_device' does not use cirq.NamedQubit("e") to find shorter shortest path - assert mm.dist_on_device(q[1], q[4]) == 3 + assert mm.dist_on_device(q_int[1], q_int[4]) == 3 # distance changes after applying swap - mm.apply_swap(q[2], q[3]) - assert mm.dist_on_device(q[1], q[3]) == 2 - assert mm.can_execute(cirq.CNOT(q[1], q[3])) is False - assert mm.dist_on_device(q[1], q[2]) == 1 - assert mm.can_execute(cirq.CNOT(q[1], q[2])) + mm.apply_swap(q_int[2], q_int[3]) + assert mm.dist_on_device(q_int[1], q_int[3]) == 2 + assert mm.is_adjacent(q_int[1], q_int[3]) is False + assert mm.dist_on_device(q_int[1], q_int[2]) == 1 + assert mm.is_adjacent(q_int[1], q_int[2]) # distance between other qubits doesn't change - assert mm.dist_on_device(q[1], q[4]) == 3 + assert mm.dist_on_device(q_int[1], q_int[4]) == 3 def test_apply_swap(): device_graph, initial_mapping, q = construct_device_graph_and_mapping() mm = cirq.MappingManager(device_graph, initial_mapping) + q_int = [mm.logical_qid_to_int[q[i]] if q[i] in initial_mapping else -1 for i in range(len(q))] # swapping non-adjacent qubits raises error with pytest.raises(ValueError): - mm.apply_swap(q[1], q[2]) + mm.apply_swap(q_int[1], q_int[2]) # applying swap on same qubit does nothing - map_before_swap = mm.map.copy() - mm.apply_swap(q[1], q[1]) - assert map_before_swap == mm.map + logical_to_physical_before_swap = mm.logical_to_physical.copy() + mm.apply_swap(q_int[1], q_int[1]) + assert all(logical_to_physical_before_swap == mm.logical_to_physical) # applying same swap twice does nothing - mm.apply_swap(q[1], q[3]) - mm.apply_swap(q[1], q[3]) - assert map_before_swap == mm.map + mm.apply_swap(q_int[1], q_int[3]) + mm.apply_swap(q_int[1], q_int[3]) + assert all(logical_to_physical_before_swap == mm.logical_to_physical) # qubits in inverse map get swapped correctly - assert mm.inverse_map == {v: k for k, v in mm.map.items()} + for i in range(len(mm.logical_to_physical)): + assert mm.logical_to_physical[mm.physical_to_logical[i]] == i + assert mm.physical_to_logical[mm.logical_to_physical[i]] == i def test_shortest_path(): device_graph, initial_mapping, q = construct_device_graph_and_mapping() mm = cirq.MappingManager(device_graph, initial_mapping) - - one_to_four = [q[1], q[3], q[2], q[4]] - assert mm.shortest_path(q[1], q[2]) == one_to_four[:3] - assert mm.shortest_path(q[1], q[4]) == one_to_four + q_int = [mm.logical_qid_to_int[q[i]] if q[i] in initial_mapping else -1 for i in range(len(q))] + one_to_four = [q_int[1], q_int[3], q_int[2], q_int[4]] + assert all(mm.shortest_path(q_int[1], q_int[2]) == one_to_four[:3]) + assert all(mm.shortest_path(q_int[1], q_int[4]) == one_to_four) # shortest path on symmetric qubit reverses the list - assert mm.shortest_path(q[4], q[1]) == one_to_four[::-1] + assert all(mm.shortest_path(q_int[4], q_int[1]) == one_to_four[::-1]) # swapping changes shortest paths involving the swapped qubits - mm.apply_swap(q[3], q[2]) + mm.apply_swap(q_int[3], q_int[2]) one_to_four[1], one_to_four[2] = one_to_four[2], one_to_four[1] - assert mm.shortest_path(q[1], q[4]) == one_to_four - assert mm.shortest_path(q[1], q[2]) == [q[1], q[2]] - - -def test_value_equality(): - equals_tester = cirq.testing.EqualsTester() - device_graph, initial_mapping, q = construct_device_graph_and_mapping() - - mm = cirq.MappingManager(device_graph, initial_mapping) - - # same as 'device_graph' but with different insertion order of edges - diff_edge_order = nx.Graph( - [ - (cirq.NamedQubit("a"), cirq.NamedQubit("b")), - (cirq.NamedQubit("e"), cirq.NamedQubit("d")), - (cirq.NamedQubit("c"), cirq.NamedQubit("d")), - (cirq.NamedQubit("a"), cirq.NamedQubit("e")), - (cirq.NamedQubit("b"), cirq.NamedQubit("c")), - ] - ) - mm_edge_order = cirq.MappingManager(diff_edge_order, initial_mapping) - equals_tester.add_equality_group(mm, mm_edge_order) - - # same as 'device_graph' but with directed edges (DiGraph) - device_digraph = nx.DiGraph( - [ - (cirq.NamedQubit("a"), cirq.NamedQubit("b")), - (cirq.NamedQubit("b"), cirq.NamedQubit("c")), - (cirq.NamedQubit("c"), cirq.NamedQubit("d")), - (cirq.NamedQubit("a"), cirq.NamedQubit("e")), - (cirq.NamedQubit("e"), cirq.NamedQubit("d")), - ] - ) - mm_digraph = cirq.MappingManager(device_digraph, initial_mapping) - equals_tester.add_equality_group(mm_digraph) - - # same as 'device_graph' but with an added isolated node - isolated_vertex_graph = nx.Graph( - [ - (cirq.NamedQubit("a"), cirq.NamedQubit("b")), - (cirq.NamedQubit("b"), cirq.NamedQubit("c")), - (cirq.NamedQubit("c"), cirq.NamedQubit("d")), - (cirq.NamedQubit("a"), cirq.NamedQubit("e")), - (cirq.NamedQubit("e"), cirq.NamedQubit("d")), - ] - ) - isolated_vertex_graph.add_node(cirq.NamedQubit("z")) - mm = cirq.MappingManager(isolated_vertex_graph, initial_mapping) - equals_tester.add_equality_group(isolated_vertex_graph) - - # mapping manager with same initial graph and initial mapping as 'mm' but with different - # current state - mm_with_swap = cirq.MappingManager(device_graph, initial_mapping) - mm_with_swap.apply_swap(q[1], q[3]) - equals_tester.add_equality_group(mm_with_swap) - - -def test_repr(): - device_graph, initial_mapping, _ = construct_device_graph_and_mapping() - mm = cirq.MappingManager(device_graph, initial_mapping) - cirq.testing.assert_equivalent_repr(mm, setup_code='import cirq\nimport networkx as nx') - - device_digraph = nx.DiGraph( - [ - (cirq.NamedQubit("a"), cirq.NamedQubit("b")), - (cirq.NamedQubit("b"), cirq.NamedQubit("c")), - (cirq.NamedQubit("c"), cirq.NamedQubit("d")), - (cirq.NamedQubit("a"), cirq.NamedQubit("e")), - (cirq.NamedQubit("e"), cirq.NamedQubit("d")), - ] - ) - mm_digraph = cirq.MappingManager(device_digraph, initial_mapping) - cirq.testing.assert_equivalent_repr(mm_digraph, setup_code='import cirq\nimport networkx as nx') - - -def test_str(): - device_graph, initial_mapping, _ = construct_device_graph_and_mapping() - mm = cirq.MappingManager(device_graph, initial_mapping) - assert ( - str(mm) - == f'cirq.MappingManager(nx.Graph({dict(device_graph.adjacency())}), {initial_mapping})' - ) + assert all(mm.shortest_path(q_int[1], q_int[4]) == one_to_four) + assert all(mm.shortest_path(q_int[1], q_int[2]) == [q_int[1], q_int[2]]) diff --git a/cirq-core/cirq/transformers/routing/route_circuit_cqc.py b/cirq-core/cirq/transformers/routing/route_circuit_cqc.py index 7a35824e5d1..32868af9338 100644 --- a/cirq-core/cirq/transformers/routing/route_circuit_cqc.py +++ b/cirq-core/cirq/transformers/routing/route_circuit_cqc.py @@ -14,7 +14,7 @@ """Heuristic qubit routing algorithm based on arxiv:1902.08091.""" -from typing import Any, Dict, List, Optional, Set, Tuple, TYPE_CHECKING +from typing import Any, Dict, List, Optional, Set, Sequence, Tuple, TYPE_CHECKING from itertools import combinations import networkx as nx @@ -25,22 +25,21 @@ if TYPE_CHECKING: import cirq -QidPair = Tuple['cirq.Qid', 'cirq.Qid'] +QidIntPair = Tuple[int, int] -def disjoint_nc2_combinations(qubit_pairs: List[QidPair]) -> List[Tuple[QidPair, ...]]: +def _disjoint_nc2_combinations( + qubit_pairs: Sequence[QidIntPair], +) -> List[Tuple[QidIntPair, QidIntPair]]: """Gets disjoint pair combinations of qubits pairs. For example: - >>> q = cirq.LineQubit.range(5) - >>> disjoint_swaps = cirq.transformers.routing.route_circuit_cqc.disjoint_nc2_combinations( + >>> q = [*range(5)] + >>> disjoint_swaps = cirq.transformers.routing.route_circuit_cqc._disjoint_nc2_combinations( ... [(q[0], q[1]), (q[2], q[3]), (q[1], q[4])] ... ) - >>> disjoint_swaps == [ - ... ((cirq.LineQubit(0), cirq.LineQubit(1)), (cirq.LineQubit(2), cirq.LineQubit(3))), - ... ((cirq.LineQubit(2), cirq.LineQubit(3)), (cirq.LineQubit(1), cirq.LineQubit(4))) - ... ] + >>> disjoint_swaps == [((q[0], q[1]), (q[2], q[3])), ((q[2], q[3]), (q[1], q[4]))] True Args: @@ -48,7 +47,6 @@ def disjoint_nc2_combinations(qubit_pairs: List[QidPair]) -> List[Tuple[QidPair, Returns: All 2-combinations between qubit pairs that are disjoint. - """ return [pair for pair in combinations(qubit_pairs, 2) if set(pair[0]).isdisjoint(pair[1])] @@ -232,7 +230,10 @@ def route_circuit( return ( circuits.Circuit(circuits.Circuit(m) for m in routed_ops), initial_mapping, - {initial_mapping[k]: v for k, v in mm.map.items()}, + { + initial_mapping[mm.int_to_logical_qid[k]]: mm.int_to_physical_qid[v] + for k, v in enumerate(mm.logical_to_physical) + }, ) @classmethod @@ -288,19 +289,29 @@ def _route( Returns: a list of lists corresponding to timesteps of the routed circuit. """ + two_qubit_ops_ints: List[List[QidIntPair]] = [ + [ + (mm.logical_qid_to_int[op.qubits[0]], mm.logical_qid_to_int[op.qubits[1]]) + for op in timestep_ops + ] + for timestep_ops in two_qubit_ops + ] + routed_ops: List[List['cirq.Operation']] = [] def process_executable_two_qubit_ops(timestep: int) -> int: - unexecutable_ops = [] - for op in two_qubit_ops[timestep]: - if mm.can_execute(op): + unexecutable_ops: List['cirq.Operation'] = [] + unexecutable_ops_ints: List[QidIntPair] = [] + for op, op_ints in zip(two_qubit_ops[timestep], two_qubit_ops_ints[timestep]): + if mm.is_adjacent(*op_ints): routed_ops[timestep].append(mm.mapped_op(op)) else: unexecutable_ops.append(op) + unexecutable_ops_ints.append(op_ints) two_qubit_ops[timestep] = unexecutable_ops + two_qubit_ops_ints[timestep] = unexecutable_ops_ints return len(unexecutable_ops) strats = [cls._choose_single_swap, cls._choose_pair_of_swaps] - routed_ops: List[List['cirq.Operation']] = [] for timestep in range(len(two_qubit_ops)): # Add single-qubit ops with qubits given by the current mapping. @@ -308,22 +319,24 @@ def process_executable_two_qubit_ops(timestep: int) -> int: # swaps applied in the current timestep thus far. This ensures the same swaps # don't get executed twice in the same timestep. - seen: Set[Tuple[Tuple['cirq.Qid', cirq.Qid], ...]] = set() + seen: Set[Tuple[QidIntPair, ...]] = set() while process_executable_two_qubit_ops(timestep): - chosen_swaps: Optional[Tuple[QidPair, ...]] = None + chosen_swaps: Optional[Tuple[QidIntPair, ...]] = None for strat in strats: - chosen_swaps = strat(mm, two_qubit_ops, timestep, lookahead_radius) + chosen_swaps = strat(mm, two_qubit_ops_ints, timestep, lookahead_radius) if chosen_swaps is not None: break if chosen_swaps is None or chosen_swaps in seen: - chosen_swaps = cls._brute_force_strategy(mm, two_qubit_ops, timestep) + chosen_swaps = cls._brute_force_strategy(mm, two_qubit_ops_ints, timestep) else: seen.add(chosen_swaps) for swap in chosen_swaps: - inserted_swap = mm.mapped_op(ops.SWAP(*swap)) + inserted_swap = mm.mapped_op( + ops.SWAP(mm.int_to_logical_qid[swap[0]], mm.int_to_logical_qid[swap[1]]) + ) if tag_inserted_swaps: inserted_swap = inserted_swap.with_tags(ops.RoutingSwapTag()) routed_ops[timestep].append(inserted_swap) @@ -335,56 +348,58 @@ def process_executable_two_qubit_ops(timestep: int) -> int: def _brute_force_strategy( cls, mm: mapping_manager.MappingManager, - two_qubit_ops: List[List['cirq.Operation']], + two_qubit_ops_ints: Sequence[Sequence[QidIntPair]], timestep: int, - ) -> Tuple[QidPair, ...]: + ) -> Tuple[QidIntPair, ...]: """Inserts SWAPS along the shortest path of the qubits that are the farthest. Since swaps along the shortest path are being executed one after the other, in order to achieve the physical swaps (M[q1], M[q2]), (M[q2], M[q3]), ..., (M[q_{i-1}], M[q_i]), we must execute the logical swaps (q1, q2), (q1, q3), ..., (q_1, qi). """ - furthest_op = max(two_qubit_ops[timestep], key=lambda op: mm.dist_on_device(*op.qubits)) - path = mm.shortest_path(*furthest_op.qubits) + furthest_op = max(two_qubit_ops_ints[timestep], key=lambda op: mm.dist_on_device(*op)) + path = mm.shortest_path(*furthest_op) return tuple([(path[0], path[i + 1]) for i in range(len(path) - 2)]) @classmethod def _choose_pair_of_swaps( cls, mm: mapping_manager.MappingManager, - two_qubit_ops: List[List['cirq.Operation']], + two_qubit_ops_ints: Sequence[Sequence[QidIntPair]], timestep: int, lookahead_radius: int, - ) -> Optional[Tuple[QidPair, ...]]: + ) -> Optional[Tuple[QidIntPair, ...]]: """Computes cost function with pairs of candidate swaps that act on disjoint qubits.""" - pair_sigma = disjoint_nc2_combinations( - cls._initial_candidate_swaps(mm, two_qubit_ops[timestep]) + pair_sigma = _disjoint_nc2_combinations( + cls._initial_candidate_swaps(mm, two_qubit_ops_ints[timestep]) + ) + return cls._choose_optimal_swap( + mm, two_qubit_ops_ints, timestep, lookahead_radius, pair_sigma ) - return cls._choose_optimal_swap(mm, two_qubit_ops, timestep, lookahead_radius, pair_sigma) @classmethod def _choose_single_swap( cls, mm: mapping_manager.MappingManager, - two_qubit_ops: List[List['cirq.Operation']], + two_qubit_ops_ints: Sequence[Sequence[QidIntPair]], timestep: int, lookahead_radius: int, - ) -> Optional[Tuple[QidPair, ...]]: + ) -> Optional[Tuple[QidIntPair, ...]]: """Computes cost function with list of single candidate swaps.""" - sigma: List[Tuple[QidPair, ...]] = [ - (swap,) for swap in cls._initial_candidate_swaps(mm, two_qubit_ops[timestep]) + sigma: List[Tuple[QidIntPair, ...]] = [ + (swap,) for swap in cls._initial_candidate_swaps(mm, two_qubit_ops_ints[timestep]) ] - return cls._choose_optimal_swap(mm, two_qubit_ops, timestep, lookahead_radius, sigma) + return cls._choose_optimal_swap(mm, two_qubit_ops_ints, timestep, lookahead_radius, sigma) @classmethod def _choose_optimal_swap( cls, mm: mapping_manager.MappingManager, - two_qubit_ops: List[List['cirq.Operation']], + two_qubit_ops_ints: Sequence[Sequence[QidIntPair]], timestep: int, lookahead_radius: int, - sigma: List[Tuple[QidPair, ...]], - ) -> Optional[Tuple[QidPair, ...]]: + sigma: Sequence[Tuple[QidIntPair, ...]], + ) -> Optional[Tuple[QidIntPair, ...]]: """Optionally returns the swap with minimum cost from a list of n-tuple candidate swaps. Computes a cost (as defined by the overridable function `_cost`) for each candidate swap @@ -393,37 +408,39 @@ def _choose_optimal_swap( timestep. Iterate this this looking ahead process up to the next `lookahead_radius` timesteps. If there still doesn't exist a unique swap with minial cost then returns None. """ - for s in range(timestep, min(lookahead_radius + timestep, len(two_qubit_ops))): + for s in range(timestep, min(lookahead_radius + timestep, len(two_qubit_ops_ints))): if len(sigma) <= 1: break costs = {} for swaps in sigma: - costs[swaps] = cls._cost(mm, swaps, two_qubit_ops[s]) + costs[swaps] = cls._cost(mm, swaps, two_qubit_ops_ints[s]) _, min_cost = min(costs.items(), key=lambda x: x[1]) sigma = [swaps for swaps, cost in costs.items() if cost == min_cost] return ( None - if len(sigma) > 1 and timestep + lookahead_radius <= len(two_qubit_ops) + if len(sigma) > 1 and timestep + lookahead_radius <= len(two_qubit_ops_ints) else sigma[0] ) @classmethod def _initial_candidate_swaps( - cls, mm: mapping_manager.MappingManager, two_qubit_ops: List['cirq.Operation'] - ) -> List[QidPair]: + cls, mm: mapping_manager.MappingManager, two_qubit_ops: Sequence[QidIntPair] + ) -> List[QidIntPair]: """Finds all feasible SWAPs between qubits involved in 2-qubit operations.""" - physical_qubits = (mm.map[op.qubits[i]] for op in two_qubit_ops for i in range(2)) - physical_swaps = mm.induced_subgraph.edges(nbunch=physical_qubits) - return [(mm.inverse_map[q1], mm.inverse_map[q2]) for q1, q2 in physical_swaps] + physical_qubits = (mm.logical_to_physical[lq[i]] for lq in two_qubit_ops for i in range(2)) + physical_swaps = mm.induced_subgraph_int.edges(nbunch=physical_qubits) + return [ + (mm.physical_to_logical[q1], mm.physical_to_logical[q2]) for q1, q2 in physical_swaps + ] @classmethod def _cost( cls, mm: mapping_manager.MappingManager, - swaps: Tuple[QidPair, ...], - two_qubit_ops: List['cirq.Operation'], + swaps: Tuple[QidIntPair, ...], + two_qubit_ops: Sequence[QidIntPair], ) -> Any: """Computes the cost function for the given list of swaps over the current timestep ops. @@ -433,9 +450,8 @@ def _cost( for swap in swaps: mm.apply_swap(*swap) max_length, sum_length = 0, 0 - for op in two_qubit_ops: - q1, q2 = op.qubits - dist = mm.dist_on_device(q1, q2) + for lq in two_qubit_ops: + dist = mm.dist_on_device(*lq) max_length = max(max_length, dist) sum_length += dist for swap in swaps: diff --git a/dev_tools/notebooks/isolated_notebook_test.py b/dev_tools/notebooks/isolated_notebook_test.py index 521ab3b3bda..c131994a7b9 100644 --- a/dev_tools/notebooks/isolated_notebook_test.py +++ b/dev_tools/notebooks/isolated_notebook_test.py @@ -55,6 +55,8 @@ 'docs/noise/qcvv/xeb_calibration_example.ipynb', 'docs/named_topologies.ipynb', 'docs/start/intro.ipynb', + # Circuit routing + 'docs/transform/routing_transformer.ipynb', ] # By default all notebooks should be tested, however, this list contains exceptions to the rule diff --git a/docs/transform/routing_transformer.ipynb b/docs/transform/routing_transformer.ipynb index 39cb1827148..fc7695965a2 100644 --- a/docs/transform/routing_transformer.ipynb +++ b/docs/transform/routing_transformer.ipynb @@ -69,7 +69,8 @@ "id": "RrRN9ilV0Ltg" }, "source": [ - "## Setup" + "## Setup\n", + "Note: this notebook relies on unreleased Cirq features. If you want to try these features, make sure you install cirq via `pip install cirq --pre`." ] }, { @@ -399,22 +400,22 @@ }, "outputs": [], "source": [ - "from typing import Tuple, List, Any\n", + "from typing import Tuple, Sequence, Any\n", "\n", - "QidPair = QidPair = Tuple['cirq.Qid', 'cirq.Qid']\n", + "QidIntPair = Tuple[int, int]\n", "\n", "class RouteCQCSimpleCostFunction(cirq.RouteCQC):\n", " @classmethod\n", " def _cost(\n", " cls,\n", " mm: cirq.MappingManager,\n", - " swaps: Tuple[QidPair, ...],\n", - " two_qubit_ops: List[cirq.Operation],\n", + " swaps: Tuple[QidIntPair, ...],\n", + " two_qubit_ops: Sequence[QidIntPair],\n", " ) -> Any:\n", " \"\"\"Computes the # of 2-qubit gates executable after applying SWAPs.\"\"\"\n", " for swap in swaps:\n", " mm.apply_swap(*swap)\n", - " ret = sum(1 for op in two_qubit_ops if mm.can_execute(op))\n", + " ret = sum(1 for op_ints in two_qubit_ops if mm.is_adjacent(*op_ints))\n", " for swap in swaps:\n", " mm.apply_swap(*swap)\n", " return ret"