Library of Unparameterizable Standard Gates Commutations (#11192)

* library of standard gates commutations

* Changed transpiler passes to use SessionCommutationChecker instead of a new CommutationChecker each time

* addresses code review

* change back to max_num_qubits=3 for commutation resolution

* up

* up

* fix lint

* Update commutation_checker.py

* improved documentation

* black

* Update releasenotes/notes/add-commutation-library-88b7ff65b3d35f9a.yaml

Co-authored-by: Alexander Ivrii <alexi@il.ibm.com>

* Update releasenotes/notes/add-commutation-library-88b7ff65b3d35f9a.yaml

Co-authored-by: Alexander Ivrii <alexi@il.ibm.com>

* move the commutation library generation to the root tools directory

* up

---------

Co-authored-by: Alexander Ivrii <alexi@il.ibm.com>
This commit is contained in:
Sebastian Brandhofer 2024-01-31 20:02:45 +01:00 committed by GitHub
parent 43ea026bab
commit eda0435895
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 3916 additions and 199 deletions

File diff suppressed because it is too large Load Diff

View File

@ -13,9 +13,10 @@
"""Code from commutative_analysis pass that checks commutation relations between DAG nodes."""
from functools import lru_cache
from typing import List
from typing import List, Union
import numpy as np
from qiskit.circuit import Qubit
from qiskit.circuit.operation import Operation
from qiskit.circuit.controlflow import ControlFlowOp
from qiskit.quantum_info.operators import Operator
@ -37,32 +38,20 @@ class CommutationChecker:
evicting from the cache less useful entries, etc.
"""
def __init__(self):
def __init__(self, standard_gate_commutations: dict = None, cache_max_entries: int = 10**6):
super().__init__()
self.cache = {}
if standard_gate_commutations is None:
self._standard_commutations = {}
else:
self._standard_commutations = standard_gate_commutations
self._cache_max_entries = cache_max_entries
def _hashable_parameters(self, params):
"""Convert the parameters of a gate into a hashable format for lookup in a dictionary.
This aims to be fast in common cases, and is not intended to work outside of the lifetime of a
single commutation pass; it does not handle mutable state correctly if the state is actually
changed."""
try:
hash(params)
return params
except TypeError:
pass
if isinstance(params, (list, tuple)):
return tuple(self._hashable_parameters(x) for x in params)
if isinstance(params, np.ndarray):
# We trust that the arrays will not be mutated during the commutation pass, since nothing
# would work if they were anyway. Using the id can potentially cause some additional cache
# misses if two UnitaryGate instances are being compared that have been separately
# constructed to have the same underlying matrix, but in practice the cost of string-ifying
# the matrix to get a cache key is far more expensive than just doing a small matmul.
return (np.ndarray, id(params))
# Catch anything else with a slow conversion.
return ("fallback", str(params))
# self._cached_commutation has the same structure as standard_gate_commutations, i.e. a
# dict[pair of gate names][relative placement][tuple of gate parameters] := True/False
self._cached_commutations = {}
self._current_cache_entries = 0
self._cache_miss = 0
self._cache_hit = 0
def commute(
self,
@ -93,84 +82,322 @@ class CommutationChecker:
Returns:
bool: whether two operations commute.
"""
# pylint: disable=too-many-return-statements
structural_commutation = _commutation_precheck(
op1, qargs1, cargs1, op2, qargs2, cargs2, max_num_qubits
)
# We don't support commutation of conditional gates for now due to bugs in
# CommutativeCancellation. See gh-8553.
if (
getattr(op1, "condition", None) is not None
or getattr(op2, "condition", None) is not None
):
return False
if structural_commutation is not None:
return structural_commutation
# Commutation of ControlFlow gates also not supported yet. This may be
# pending a control flow graph.
if isinstance(op1, ControlFlowOp) or isinstance(op2, ControlFlowOp):
return False
first_op_tuple, second_op_tuple = _order_operations(
op1, qargs1, cargs1, op2, qargs2, cargs2
)
first_op, first_qargs, _ = first_op_tuple
second_op, second_qargs, _ = second_op_tuple
first_params = first_op.params
second_params = second_op.params
# These lines are adapted from dag_dependency and say that two gates over
# different quantum and classical bits necessarily commute. This is more
# permissive that the check from commutation_analysis, as for example it
# allows to commute X(1) and Measure(0, 0).
# Presumably this check was not present in commutation_analysis as
# it was only called on pairs of connected nodes from DagCircuit.
intersection_q = set(qargs1).intersection(set(qargs2))
intersection_c = set(cargs1).intersection(set(cargs2))
if not (intersection_q or intersection_c):
return True
commutation_lookup = self.check_commutation_entries(
first_op, first_qargs, second_op, second_qargs
)
# Skip the check if the number of qubits for either operation is too large
if len(qargs1) > max_num_qubits or len(qargs2) > max_num_qubits:
return False
if commutation_lookup is not None:
return commutation_lookup
# These lines are adapted from commutation_analysis, which is more restrictive than the
# check from dag_dependency when considering nodes with "_directive". It would be nice to
# think which optimizations from dag_dependency can indeed be used.
for op in [op1, op2]:
if (
getattr(op, "_directive", False)
or op.name in {"measure", "reset", "delay"}
or (getattr(op, "is_parameterized", False) and op.is_parameterized())
):
return False
# Compute commutation via matrix multiplication
is_commuting = _commute_matmul(first_op, first_qargs, second_op, second_qargs)
# The main code is adapted from commutative analysis.
# Assign indices to each of the qubits such that all `node1`'s qubits come first, followed by
# any _additional_ qubits `node2` addresses. This helps later when we need to compose one
# operator with the other, since we can easily expand `node1` with a suitable identity.
qarg = {q: i for i, q in enumerate(qargs1)}
num_qubits = len(qarg)
for q in qargs2:
if q not in qarg:
qarg[q] = num_qubits
num_qubits += 1
qarg1 = tuple(qarg[q] for q in qargs1)
qarg2 = tuple(qarg[q] for q in qargs2)
# Store result in this session's commutation_library
# TODO implement LRU cache or similar
# Rebuild cache if current cache exceeded max size
if self._current_cache_entries >= self._cache_max_entries:
self.clear_cached_commutations()
node1_key = (op1.name, self._hashable_parameters(op1.params), qarg1)
node2_key = (op2.name, self._hashable_parameters(op2.params), qarg2)
try:
# We only need to try one orientation of the keys, since if we've seen the compound key
# before, we've set it in both orientations.
return self.cache[node1_key, node2_key]
except KeyError:
pass
operator_1 = Operator(op1, input_dims=(2,) * len(qarg1), output_dims=(2,) * len(qarg1))
operator_2 = Operator(op2, input_dims=(2,) * len(qarg2), output_dims=(2,) * len(qarg2))
if qarg1 == qarg2:
# Use full composition if possible to get the fastest matmul paths.
op12 = operator_1.compose(operator_2)
op21 = operator_2.compose(operator_1)
if len(first_params) > 0 or len(second_params) > 0:
self._cached_commutations.setdefault((first_op.name, second_op.name), {}).setdefault(
_get_relative_placement(first_qargs, second_qargs), {}
)[
(_hashable_parameters(first_params), _hashable_parameters(second_params))
] = is_commuting
else:
# Expand operator_1 to be large enough to contain operator_2 as well; this relies on qargs1
# being the lowest possible indices so the identity can be tensored before it.
extra_qarg2 = num_qubits - len(qarg1)
if extra_qarg2:
id_op = _identity_op(extra_qarg2)
operator_1 = id_op.tensor(operator_1)
op12 = operator_1.compose(operator_2, qargs=qarg2, front=False)
op21 = operator_1.compose(operator_2, qargs=qarg2, front=True)
self.cache[node1_key, node2_key] = self.cache[node2_key, node1_key] = ret = op12 == op21
return ret
self._cached_commutations.setdefault((first_op.name, second_op.name), {})[
_get_relative_placement(first_qargs, second_qargs)
] = is_commuting
self._current_cache_entries += 1
return is_commuting
def num_cached_entries(self):
"""Returns number of cached entries"""
return self._current_cache_entries
def clear_cached_commutations(self):
"""Clears the dictionary holding cached commutations"""
self._current_cache_entries = 0
self._cache_miss = 0
self._cache_hit = 0
self._cached_commutations = {}
def check_commutation_entries(
self,
first_op: Operation,
first_qargs: List,
second_op: Operation,
second_qargs: List,
) -> Union[bool, None]:
"""Returns stored commutation relation if any
Args:
first_op: first operation.
first_qargs: first operation's qubits.
second_op: second operation.
second_qargs: second operation's qubits.
Return:
bool: True if the gates commute and false if it is not the case.
"""
# We don't precompute commutations for parameterized gates, yet
commutation = _query_commutation(
first_op,
first_qargs,
second_op,
second_qargs,
self._standard_commutations,
)
if commutation is not None:
return commutation
commutation = _query_commutation(
first_op,
first_qargs,
second_op,
second_qargs,
self._cached_commutations,
)
if commutation is None:
self._cache_miss += 1
else:
self._cache_hit += 1
return commutation
def _hashable_parameters(params):
"""Convert the parameters of a gate into a hashable format for lookup in a dictionary."""
try:
hash(params)
return params
except TypeError:
pass
if isinstance(params, (list, tuple)):
return tuple(_hashable_parameters(x) for x in params)
if isinstance(params, np.ndarray):
# Using the bytes of the matrix as key is runtime efficient but requires more space: 128 bits
# times the number of parameters instead of a single 64 bit id. However, by using the bytes as
# an id, we can reuse the cached commutations between different passes.
return (np.ndarray, params.tobytes())
# Catch anything else with a slow conversion.
return ("fallback", str(params))
_skipped_op_names = {"measure", "reset", "delay"}
def _commutation_precheck(
op1: Operation,
qargs1: List,
cargs1: List,
op2: Operation,
qargs2: List,
cargs2: List,
max_num_qubits,
):
# pylint: disable=too-many-return-statements
# We don't support commutation of conditional gates for now due to bugs in
# CommutativeCancellation. See gh-8553.
if getattr(op1, "condition", None) is not None or getattr(op2, "condition", None) is not None:
return False
# Commutation of ControlFlow gates also not supported yet. This may be
# pending a control flow graph.
if isinstance(op1, ControlFlowOp) or isinstance(op2, ControlFlowOp):
return False
# These lines are adapted from dag_dependency and say that two gates over
# different quantum and classical bits necessarily commute. This is more
# permissive that the check from commutation_analysis, as for example it
# allows to commute X(1) and Measure(0, 0).
# Presumably this check was not present in commutation_analysis as
# it was only called on pairs of connected nodes from DagCircuit.
intersection_q = set(qargs1).intersection(set(qargs2))
intersection_c = set(cargs1).intersection(set(cargs2))
if not (intersection_q or intersection_c):
return True
# Skip the check if the number of qubits for either operation is too large
if len(qargs1) > max_num_qubits or len(qargs2) > max_num_qubits:
return False
# These lines are adapted from commutation_analysis, which is more restrictive than the
# check from dag_dependency when considering nodes with "_directive". It would be nice to
# think which optimizations from dag_dependency can indeed be used.
if op1.name in _skipped_op_names or op2.name in _skipped_op_names:
return False
if getattr(op1, "_directive", False) or getattr(op2, "_directive", False):
return False
if (getattr(op1, "is_parameterized", False) and op1.is_parameterized()) or (
getattr(op2, "is_parameterized", False) and op2.is_parameterized()
):
return False
return None
def _get_relative_placement(first_qargs: List[Qubit], second_qargs: List[Qubit]) -> tuple:
"""Determines the relative qubit placement of two gates. Note: this is NOT symmetric.
Args:
first_qargs (DAGOpNode): first gate
second_qargs (DAGOpNode): second gate
Return:
A tuple that describes the relative qubit placement. The relative placement is defined by the
gate qubit arrangements as q2^{-1}[q1[i]] where q1[i] is the ith qubit of the first gate and
q2^{-1}[q] returns the qubit index of qubit q in the second gate (possibly 'None'). E.g.
_get_relative_placement(CX(0, 1), CX(1, 2)) would return (None, 0) as there is no overlap on
the first qubit of the first gate but there is an overlap on the second qubit of the first gate,
i.e. qubit 0 of the second gate. Likewise, _get_relative_placement(CX(1, 2), CX(0, 1)) would
return (1, None)
"""
qubits_g2 = {q_g1: i_g1 for i_g1, q_g1 in enumerate(second_qargs)}
return tuple(qubits_g2.get(q_g0, None) for q_g0 in first_qargs)
@lru_cache(maxsize=10**3)
def _persistent_id(op_name: str) -> int:
"""Returns an integer id of a string that is persistent over different python executions (note that
hash() can not be used, i.e. its value can change over two python executions)
Args:
op_name (str): The string whose integer id should be determined.
Return:
The integer id of the input string.
"""
return int.from_bytes(bytes(op_name, encoding="ascii"), byteorder="big", signed=True)
def _order_operations(
op1: Operation, qargs1: List, cargs1: List, op2: Operation, qargs2: List, cargs2: List
):
"""Orders two operations in a canonical way that is persistent over
@different python versions and executions
Args:
op1: first operation.
qargs1: first operation's qubits.
cargs1: first operation's clbits.
op2: second operation.
qargs2: second operation's qubits.
cargs2: second operation's clbits.
Return:
The input operations in a persistent, canonical order.
"""
op1_tuple = (op1, qargs1, cargs1)
op2_tuple = (op2, qargs2, cargs2)
least_qubits_op, most_qubits_op = (
(op1_tuple, op2_tuple) if op1.num_qubits < op2.num_qubits else (op2_tuple, op1_tuple)
)
# prefer operation with the least number of qubits as first key as this results in shorter keys
if op1.num_qubits != op2.num_qubits:
return least_qubits_op, most_qubits_op
else:
return (
(op1_tuple, op2_tuple)
if _persistent_id(op1.name) < _persistent_id(op2.name)
else (op2_tuple, op1_tuple)
)
def _query_commutation(
first_op: Operation,
first_qargs: List,
second_op: Operation,
second_qargs: List,
_commutation_lib: dict,
) -> Union[bool, None]:
"""Queries and returns the commutation of a pair of operations from a provided commutation library
Args:
first_op: first operation.
first_qargs: first operation's qubits.
first_cargs: first operation's clbits.
second_op: second operation.
second_qargs: second operation's qubits.
second_cargs: second operation's clbits.
_commutation_lib (dict): dictionary of commutation relations
Return:
True if first_op and second_op commute, False if they do not commute and
None if the commutation is not in the library
"""
commutation = _commutation_lib.get((first_op.name, second_op.name), None)
# Return here if the commutation is constant over all relative placements of the operations
if commutation is None or isinstance(commutation, bool):
return commutation
# If we arrive here, there is an entry in the commutation library but it depends on the
# placement of the operations and also possibly on operation parameters
if isinstance(commutation, dict):
commutation_after_placement = commutation.get(
_get_relative_placement(first_qargs, second_qargs), None
)
# if we have another dict in commutation_after_placement, commutation depends on params
if isinstance(commutation_after_placement, dict):
# Param commutation entry exists and must be a dict
return commutation_after_placement.get(
(_hashable_parameters(first_op.params), _hashable_parameters(second_op.params)),
None,
)
else:
# queried commutation is True, False or None
return commutation_after_placement
else:
raise ValueError("Expected commutation to be None, bool or a dict")
def _commute_matmul(
first_ops: Operation, first_qargs: List, second_op: Operation, second_qargs: List
):
qarg = {q: i for i, q in enumerate(first_qargs)}
num_qubits = len(qarg)
for q in second_qargs:
if q not in qarg:
qarg[q] = num_qubits
num_qubits += 1
first_qarg = tuple(qarg[q] for q in first_qargs)
second_qarg = tuple(qarg[q] for q in second_qargs)
operator_1 = Operator(
first_ops, input_dims=(2,) * len(first_qarg), output_dims=(2,) * len(first_qarg)
)
operator_2 = Operator(
second_op, input_dims=(2,) * len(second_qarg), output_dims=(2,) * len(second_qarg)
)
if first_qarg == second_qarg:
# Use full composition if possible to get the fastest matmul paths.
op12 = operator_1.compose(operator_2)
op21 = operator_2.compose(operator_1)
else:
# Expand operator_1 to be large enough to contain operator_2 as well; this relies on qargs1
# being the lowest possible indices so the identity can be tensored before it.
extra_qarg2 = num_qubits - len(first_qarg)
if extra_qarg2:
id_op = _identity_op(extra_qarg2)
operator_1 = id_op.tensor(operator_1)
op12 = operator_1.compose(operator_2, qargs=second_qarg, front=False)
op21 = operator_1.compose(operator_2, qargs=second_qarg, front=True)
ret = op12 == op21
return ret

View File

@ -0,0 +1,20 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2023.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Provides a commutation checker that caches the determined commutation results during this session """
from qiskit.circuit import CommutationChecker
from qiskit.circuit._standard_gates_commutations import standard_gates_commutations
StandardGateCommutations = standard_gates_commutations
SessionCommutationChecker = CommutationChecker(StandardGateCommutations)

View File

@ -19,12 +19,12 @@ from collections import OrderedDict, defaultdict
import rustworkx as rx
from qiskit.circuit.commutation_library import SessionCommutationChecker as scc
from qiskit.circuit.controlflow import condition_resources
from qiskit.circuit.quantumregister import QuantumRegister, Qubit
from qiskit.circuit.classicalregister import ClassicalRegister, Clbit
from qiskit.dagcircuit.exceptions import DAGDependencyError
from qiskit.dagcircuit.dagdepnode import DAGDepNode
from qiskit.circuit.commutation_checker import CommutationChecker
# ToDo: DagDependency needs to be refactored:
@ -112,7 +112,7 @@ class DAGDependency:
self.duration = None
self.unit = "dt"
self.comm_checker = CommutationChecker()
self.comm_checker = scc
@property
def global_phase(self):

View File

@ -14,9 +14,9 @@
from collections import defaultdict
from qiskit.circuit.commutation_library import SessionCommutationChecker as scc
from qiskit.dagcircuit import DAGOpNode
from qiskit.transpiler.basepasses import AnalysisPass
from qiskit.circuit.commutation_checker import CommutationChecker
class CommutationAnalysis(AnalysisPass):
@ -29,7 +29,7 @@ class CommutationAnalysis(AnalysisPass):
def __init__(self):
super().__init__()
self.comm_checker = CommutationChecker()
self.comm_checker = scc
def run(self, dag):
"""Run the CommutationAnalysis pass on `dag`.

View File

@ -11,13 +11,11 @@
# that they have been altered from the originals.
"""Cancel pairs of inverse gates exploiting commutation relations."""
from qiskit.circuit.commutation_library import SessionCommutationChecker as scc
from qiskit.dagcircuit import DAGCircuit, DAGOpNode
from qiskit.quantum_info import Operator
from qiskit.quantum_info.operators.predicates import matrix_equal
from qiskit.transpiler.basepasses import TransformationPass
from qiskit.circuit.commutation_checker import CommutationChecker
class CommutativeInverseCancellation(TransformationPass):
@ -94,8 +92,8 @@ class CommutativeInverseCancellation(TransformationPass):
removed = [False for _ in range(circ_size)]
cc = scc
phase_update = 0
cc = CommutationChecker()
for idx1 in range(0, circ_size):
if self._skip_node(topo_sorted_nodes[idx1]):

View File

@ -0,0 +1,13 @@
---
features:
- |
Adds a commutation library to the :class:`.CommutationChecker`. The commutation library stores all
commutation relations of unparameterizable standard gates into a dictionary that allows for efficient
lookup at runtime. Furthermore, the :class:`.CommutationChecker` was refactored and an upper limit was
set to the number of cached commutation relations that are not in the commutation library. A session
commutation checker was added, that can be used to cache commutations computed during one qiskit
execution. Addresses `#8020 <https://github.com/Qiskit/qiskit-terra/issues/8020>_` and
`#7101 <https://github.com/Qiskit/qiskit-terra/issues/7101>_`
- |
Adds a `SessionCommutationChecker`, i.e. a commutation checker with commutations that are cached
consistently during the runtime of a python execution.

View File

@ -18,7 +18,8 @@ import numpy as np
from qiskit import ClassicalRegister
from qiskit.circuit import QuantumRegister, Parameter, Qubit
from qiskit.circuit import CommutationChecker
from qiskit.circuit.commutation_library import SessionCommutationChecker as scc
from qiskit.circuit.library import (
ZGate,
XGate,
@ -41,42 +42,40 @@ class TestCommutationChecker(QiskitTestCase):
"""Check simple commutation relations between gates, experimenting with
different orders of gates, different orders of qubits, different sets of
qubits over which gates are defined, and so on."""
comm_checker = CommutationChecker()
# should commute
res = comm_checker.commute(ZGate(), [0], [], CXGate(), [0, 1], [])
res = scc.commute(ZGate(), [0], [], CXGate(), [0, 1], [])
self.assertTrue(res)
# should not commute
res = comm_checker.commute(ZGate(), [1], [], CXGate(), [0, 1], [])
res = scc.commute(ZGate(), [1], [], CXGate(), [0, 1], [])
self.assertFalse(res)
# should not commute
res = comm_checker.commute(XGate(), [0], [], CXGate(), [0, 1], [])
res = scc.commute(XGate(), [0], [], CXGate(), [0, 1], [])
self.assertFalse(res)
# should commute
res = comm_checker.commute(XGate(), [1], [], CXGate(), [0, 1], [])
res = scc.commute(XGate(), [1], [], CXGate(), [0, 1], [])
self.assertTrue(res)
# should not commute
res = comm_checker.commute(XGate(), [1], [], CXGate(), [1, 0], [])
res = scc.commute(XGate(), [1], [], CXGate(), [1, 0], [])
self.assertFalse(res)
# should commute
res = comm_checker.commute(XGate(), [0], [], CXGate(), [1, 0], [])
res = scc.commute(XGate(), [0], [], CXGate(), [1, 0], [])
self.assertTrue(res)
# should commute
res = comm_checker.commute(CXGate(), [1, 0], [], XGate(), [0], [])
res = scc.commute(CXGate(), [1, 0], [], XGate(), [0], [])
self.assertTrue(res)
# should not commute
res = comm_checker.commute(CXGate(), [1, 0], [], XGate(), [1], [])
res = scc.commute(CXGate(), [1, 0], [], XGate(), [1], [])
self.assertFalse(res)
# should commute
res = comm_checker.commute(
res = scc.commute(
CXGate(),
[1, 0],
[],
@ -87,7 +86,7 @@ class TestCommutationChecker(QiskitTestCase):
self.assertTrue(res)
# should not commute
res = comm_checker.commute(
res = scc.commute(
CXGate(),
[1, 0],
[],
@ -98,7 +97,7 @@ class TestCommutationChecker(QiskitTestCase):
self.assertFalse(res)
# should commute
res = comm_checker.commute(
res = scc.commute(
CXGate(),
[1, 0],
[],
@ -109,7 +108,7 @@ class TestCommutationChecker(QiskitTestCase):
self.assertTrue(res)
# should not commute
res = comm_checker.commute(
res = scc.commute(
CXGate(),
[1, 0],
[],
@ -120,7 +119,7 @@ class TestCommutationChecker(QiskitTestCase):
self.assertFalse(res)
# should commute
res = comm_checker.commute(
res = scc.commute(
CXGate(),
[1, 0],
[],
@ -130,74 +129,123 @@ class TestCommutationChecker(QiskitTestCase):
)
self.assertTrue(res)
res = comm_checker.commute(XGate(), [2], [], CCXGate(), [0, 1, 2], [])
res = scc.commute(XGate(), [2], [], CCXGate(), [0, 1, 2], [])
self.assertTrue(res)
res = comm_checker.commute(CCXGate(), [0, 1, 2], [], CCXGate(), [0, 2, 1], [])
res = scc.commute(CCXGate(), [0, 1, 2], [], CCXGate(), [0, 2, 1], [])
self.assertFalse(res)
def test_passing_quantum_registers(self):
"""Check that passing QuantumRegisters works correctly."""
comm_checker = CommutationChecker()
qr = QuantumRegister(4)
# should commute
res = comm_checker.commute(CXGate(), [qr[1], qr[0]], [], CXGate(), [qr[1], qr[2]], [])
res = scc.commute(CXGate(), [qr[1], qr[0]], [], CXGate(), [qr[1], qr[2]], [])
self.assertTrue(res)
# should not commute
res = comm_checker.commute(CXGate(), [qr[0], qr[1]], [], CXGate(), [qr[1], qr[2]], [])
res = scc.commute(CXGate(), [qr[0], qr[1]], [], CXGate(), [qr[1], qr[2]], [])
self.assertFalse(res)
def test_standard_gates_commutations(self):
"""Check that commutativity checker uses standard gates commutations as expected."""
scc.clear_cached_commutations()
scc.clear_cached_commutations()
res = scc.commute(ZGate(), [0], [], CXGate(), [0, 1], [])
self.assertTrue(res)
self.assertEqual(scc.num_cached_entries(), 0)
def test_caching_positive_results(self):
"""Check that hashing positive results in commutativity checker works as expected."""
scc.clear_cached_commutations()
NewGateCX = type("MyClass", (CXGate,), {"content": {}})
NewGateCX.name = "cx_new"
comm_checker = CommutationChecker()
res = comm_checker.commute(ZGate(), [0], [], CXGate(), [0, 1], [])
res = scc.commute(ZGate(), [0], [], NewGateCX(), [0, 1], [])
self.assertTrue(res)
self.assertGreater(len(comm_checker.cache), 0)
self.assertGreater(len(scc._cached_commutations), 0)
def test_caching_lookup_with_non_overlapping_qubits(self):
"""Check that commutation lookup with non-overlapping qubits works as expected."""
scc.clear_cached_commutations()
res = scc.commute(CXGate(), [0, 2], [], CXGate(), [0, 1], [])
self.assertTrue(res)
res = scc.commute(CXGate(), [0, 1], [], CXGate(), [1, 2], [])
self.assertFalse(res)
self.assertEqual(len(scc._cached_commutations), 0)
def test_caching_store_and_lookup_with_non_overlapping_qubits(self):
"""Check that commutations storing and lookup with non-overlapping qubits works as expected."""
cc_lenm = scc.num_cached_entries()
NewGateCX = type("MyClass", (CXGate,), {"content": {}})
NewGateCX.name = "cx_new"
res = scc.commute(NewGateCX(), [0, 2], [], CXGate(), [0, 1], [])
self.assertTrue(res)
res = scc.commute(NewGateCX(), [0, 1], [], CXGate(), [1, 2], [])
self.assertFalse(res)
res = scc.commute(NewGateCX(), [1, 4], [], CXGate(), [1, 6], [])
self.assertTrue(res)
res = scc.commute(NewGateCX(), [5, 3], [], CXGate(), [3, 1], [])
self.assertFalse(res)
self.assertEqual(scc.num_cached_entries(), cc_lenm + 2)
def test_caching_negative_results(self):
"""Check that hashing negative results in commutativity checker works as expected."""
scc.clear_cached_commutations()
NewGateCX = type("MyClass", (CXGate,), {"content": {}})
NewGateCX.name = "cx_new"
comm_checker = CommutationChecker()
res = comm_checker.commute(XGate(), [0], [], CXGate(), [0, 1], [])
res = scc.commute(XGate(), [0], [], NewGateCX(), [0, 1], [])
self.assertFalse(res)
self.assertGreater(len(comm_checker.cache), 0)
self.assertGreater(len(scc._cached_commutations), 0)
def test_caching_different_qubit_sets(self):
"""Check that hashing same commutativity results over different qubit sets works as expected."""
comm_checker = CommutationChecker()
scc.clear_cached_commutations()
NewGateCX = type("MyClass", (CXGate,), {"content": {}})
NewGateCX.name = "cx_new"
# All the following should be cached in the same way
# though each relation gets cached twice: (A, B) and (B, A)
comm_checker.commute(XGate(), [0], [], CXGate(), [0, 1], [])
comm_checker.commute(XGate(), [10], [], CXGate(), [10, 20], [])
comm_checker.commute(XGate(), [10], [], CXGate(), [10, 5], [])
comm_checker.commute(XGate(), [5], [], CXGate(), [5, 7], [])
self.assertEqual(len(comm_checker.cache), 2)
scc.commute(XGate(), [0], [], NewGateCX(), [0, 1], [])
scc.commute(XGate(), [10], [], NewGateCX(), [10, 20], [])
scc.commute(XGate(), [10], [], NewGateCX(), [10, 5], [])
scc.commute(XGate(), [5], [], NewGateCX(), [5, 7], [])
self.assertEqual(len(scc._cached_commutations), 1)
self.assertEqual(scc._cache_miss, 1)
self.assertEqual(scc._cache_hit, 3)
def test_cache_with_param_gates(self):
"""Check commutativity between (non-parameterized) gates with parameters."""
scc.clear_cached_commutations()
res = scc.commute(RZGate(0), [0], [], XGate(), [0], [])
self.assertTrue(res)
res = scc.commute(RZGate(np.pi / 2), [0], [], XGate(), [0], [])
self.assertFalse(res)
res = scc.commute(RZGate(np.pi / 2), [0], [], RZGate(0), [0], [])
self.assertTrue(res)
res = scc.commute(RZGate(np.pi / 2), [1], [], XGate(), [1], [])
self.assertFalse(res)
self.assertEqual(scc.num_cached_entries(), 3)
self.assertEqual(scc._cache_miss, 3)
self.assertEqual(scc._cache_hit, 1)
def test_gates_with_parameters(self):
"""Check commutativity between (non-parameterized) gates with parameters."""
comm_checker = CommutationChecker()
res = comm_checker.commute(RZGate(0), [0], [], XGate(), [0], [])
res = scc.commute(RZGate(0), [0], [], XGate(), [0], [])
self.assertTrue(res)
res = comm_checker.commute(RZGate(np.pi / 2), [0], [], XGate(), [0], [])
res = scc.commute(RZGate(np.pi / 2), [0], [], XGate(), [0], [])
self.assertFalse(res)
res = comm_checker.commute(RZGate(np.pi / 2), [0], [], RZGate(0), [0], [])
res = scc.commute(RZGate(np.pi / 2), [0], [], RZGate(0), [0], [])
self.assertTrue(res)
def test_parameterized_gates(self):
"""Check commutativity between parameterized gates, both with free and with
bound parameters."""
comm_checker = CommutationChecker()
# gate that has parameters but is not considered parameterized
rz_gate = RZGate(np.pi / 2)
self.assertEqual(len(rz_gate.params), 1)
@ -215,23 +263,23 @@ class TestCommutationChecker(QiskitTestCase):
self.assertFalse(cx_gate.is_parameterized())
# We should detect that these gates commute
res = comm_checker.commute(rz_gate, [0], [], cx_gate, [0, 1], [])
res = scc.commute(rz_gate, [0], [], cx_gate, [0, 1], [])
self.assertTrue(res)
# We should detect that these gates commute
res = comm_checker.commute(rz_gate, [0], [], rz_gate, [0], [])
res = scc.commute(rz_gate, [0], [], rz_gate, [0], [])
self.assertTrue(res)
# We should detect that parameterized gates over disjoint qubit subsets commute
res = comm_checker.commute(rz_gate_theta, [0], [], rz_gate_theta, [1], [])
res = scc.commute(rz_gate_theta, [0], [], rz_gate_theta, [1], [])
self.assertTrue(res)
# We should detect that parameterized gates over disjoint qubit subsets commute
res = comm_checker.commute(rz_gate_theta, [0], [], rz_gate_phi, [1], [])
res = scc.commute(rz_gate_theta, [0], [], rz_gate_phi, [1], [])
self.assertTrue(res)
# We should detect that parameterized gates over disjoint qubit subsets commute
res = comm_checker.commute(rz_gate_theta, [2], [], cx_gate, [1, 3], [])
res = scc.commute(rz_gate_theta, [2], [], cx_gate, [1, 3], [])
self.assertTrue(res)
# However, for now commutativity checker should return False when checking
@ -239,114 +287,93 @@ class TestCommutationChecker(QiskitTestCase):
# the two gates are over intersecting qubit subsets.
# This check should be changed if commutativity checker is extended to
# handle parameterized gates better.
res = comm_checker.commute(rz_gate_theta, [0], [], cx_gate, [0, 1], [])
res = scc.commute(rz_gate_theta, [0], [], cx_gate, [0, 1], [])
self.assertFalse(res)
res = comm_checker.commute(rz_gate_theta, [0], [], rz_gate, [0], [])
res = scc.commute(rz_gate_theta, [0], [], rz_gate, [0], [])
self.assertFalse(res)
def test_measure(self):
"""Check commutativity involving measures."""
comm_checker = CommutationChecker()
# Measure is over qubit 0, while gate is over a disjoint subset of qubits
# We should be able to swap these.
res = comm_checker.commute(Measure(), [0], [0], CXGate(), [1, 2], [])
res = scc.commute(Measure(), [0], [0], CXGate(), [1, 2], [])
self.assertTrue(res)
# Measure and gate have intersecting set of qubits
# We should not be able to swap these.
res = comm_checker.commute(Measure(), [0], [0], CXGate(), [0, 2], [])
res = scc.commute(Measure(), [0], [0], CXGate(), [0, 2], [])
self.assertFalse(res)
# Measures over different qubits and clbits
res = comm_checker.commute(Measure(), [0], [0], Measure(), [1], [1])
res = scc.commute(Measure(), [0], [0], Measure(), [1], [1])
self.assertTrue(res)
# Measures over different qubits but same classical bit
# We should not be able to swap these.
res = comm_checker.commute(Measure(), [0], [0], Measure(), [1], [0])
res = scc.commute(Measure(), [0], [0], Measure(), [1], [0])
self.assertFalse(res)
# Measures over same qubits but different classical bit
# ToDo: can we swap these?
# Currently checker takes the safe approach and returns False.
res = comm_checker.commute(Measure(), [0], [0], Measure(), [0], [1])
res = scc.commute(Measure(), [0], [0], Measure(), [0], [1])
self.assertFalse(res)
def test_barrier(self):
"""Check commutativity involving barriers."""
comm_checker = CommutationChecker()
# A gate should not commute with a barrier
# (at least if these are over intersecting qubit sets).
res = comm_checker.commute(Barrier(4), [0, 1, 2, 3], [], CXGate(), [1, 2], [])
res = scc.commute(Barrier(4), [0, 1, 2, 3], [], CXGate(), [1, 2], [])
self.assertFalse(res)
# Does it even make sense to have a barrier over a subset of qubits?
# Though in this case, it probably makes sense to say that barrier and gate can be swapped.
res = comm_checker.commute(Barrier(4), [0, 1, 2, 3], [], CXGate(), [5, 6], [])
res = scc.commute(Barrier(4), [0, 1, 2, 3], [], CXGate(), [5, 6], [])
self.assertTrue(res)
def test_reset(self):
"""Check commutativity involving resets."""
comm_checker = CommutationChecker()
# A gate should not commute with reset when the qubits intersect.
res = comm_checker.commute(Reset(), [0], [], CXGate(), [0, 2], [])
res = scc.commute(Reset(), [0], [], CXGate(), [0, 2], [])
self.assertFalse(res)
# A gate should commute with reset when the qubits are disjoint.
res = comm_checker.commute(Reset(), [0], [], CXGate(), [1, 2], [])
res = scc.commute(Reset(), [0], [], CXGate(), [1, 2], [])
self.assertTrue(res)
def test_conditional_gates(self):
"""Check commutativity involving conditional gates."""
comm_checker = CommutationChecker()
qr = QuantumRegister(3)
cr = ClassicalRegister(2)
# Currently, in all cases commutativity checker should returns False.
# This is definitely suboptimal.
res = comm_checker.commute(
CXGate().c_if(cr[0], 0), [qr[0], qr[1]], [], XGate(), [qr[2]], []
)
res = scc.commute(CXGate().c_if(cr[0], 0), [qr[0], qr[1]], [], XGate(), [qr[2]], [])
self.assertFalse(res)
res = comm_checker.commute(
CXGate().c_if(cr[0], 0), [qr[0], qr[1]], [], XGate(), [qr[1]], []
)
res = scc.commute(CXGate().c_if(cr[0], 0), [qr[0], qr[1]], [], XGate(), [qr[1]], [])
self.assertFalse(res)
res = comm_checker.commute(
res = scc.commute(
CXGate().c_if(cr[0], 0), [qr[0], qr[1]], [], CXGate().c_if(cr[0], 0), [qr[0], qr[1]], []
)
self.assertFalse(res)
res = comm_checker.commute(
XGate().c_if(cr[0], 0), [qr[0]], [], XGate().c_if(cr[0], 1), [qr[0]], []
)
res = scc.commute(XGate().c_if(cr[0], 0), [qr[0]], [], XGate().c_if(cr[0], 1), [qr[0]], [])
self.assertFalse(res)
res = comm_checker.commute(XGate().c_if(cr[0], 0), [qr[0]], [], XGate(), [qr[0]], [])
res = scc.commute(XGate().c_if(cr[0], 0), [qr[0]], [], XGate(), [qr[0]], [])
self.assertFalse(res)
def test_complex_gates(self):
"""Check commutativity involving more complex gates."""
comm_checker = CommutationChecker()
lf1 = LinearFunction([[0, 1, 0], [1, 0, 0], [0, 0, 1]])
lf2 = LinearFunction([[1, 0, 0], [0, 0, 1], [0, 1, 0]])
# lf1 is equivalent to swap(0, 1), and lf2 to swap(1, 2).
# These do not commute.
res = comm_checker.commute(lf1, [0, 1, 2], [], lf2, [0, 1, 2], [])
res = scc.commute(lf1, [0, 1, 2], [], lf2, [0, 1, 2], [])
self.assertFalse(res)
lf3 = LinearFunction([[0, 1, 0], [0, 0, 1], [1, 0, 0]])
@ -354,27 +381,27 @@ class TestCommutationChecker(QiskitTestCase):
# lf3 is permutation 1->2, 2->3, 3->1.
# lf3 is the inverse permutation 1->3, 2->1, 3->2.
# These commute.
res = comm_checker.commute(lf3, [0, 1, 2], [], lf4, [0, 1, 2], [])
res = scc.commute(lf3, [0, 1, 2], [], lf4, [0, 1, 2], [])
self.assertTrue(res)
def test_c7x_gate(self):
"""Test wide gate works correctly."""
qargs = [Qubit() for _ in [None] * 8]
res = CommutationChecker().commute(XGate(), qargs[:1], [], XGate().control(7), qargs, [])
res = scc.commute(XGate(), qargs[:1], [], XGate().control(7), qargs, [])
self.assertFalse(res)
def test_wide_gates_over_nondisjoint_qubits(self):
"""Test that checking wide gates does not lead to memory problems."""
res = CommutationChecker().commute(MCXGate(29), list(range(30)), [], XGate(), [0], [])
res = scc.commute(MCXGate(29), list(range(30)), [], XGate(), [0], [])
self.assertFalse(res)
res = CommutationChecker().commute(XGate(), [0], [], MCXGate(29), list(range(30)), [])
res = scc.commute(XGate(), [0], [], MCXGate(29), list(range(30)), [])
self.assertFalse(res)
def test_wide_gates_over_disjoint_qubits(self):
"""Test that wide gates still commute when they are over disjoint sets of qubits."""
res = CommutationChecker().commute(MCXGate(29), list(range(30)), [], XGate(), [30], [])
res = scc.commute(MCXGate(29), list(range(30)), [], XGate(), [30], [])
self.assertTrue(res)
res = CommutationChecker().commute(XGate(), [30], [], MCXGate(29), list(range(30)), [])
res = scc.commute(XGate(), [30], [], MCXGate(29), list(range(30)), [])
self.assertTrue(res)

View File

@ -16,6 +16,7 @@
import unittest
import numpy as np
from qiskit.circuit.commutation_library import SessionCommutationChecker as scc
from qiskit import QuantumRegister, QuantumCircuit
from qiskit.circuit import Parameter
from qiskit.quantum_info import Operator
@ -779,6 +780,7 @@ class TestTemplateMatching(QiskitTestCase):
clifford_3_1(),
]
pm = PassManager(TemplateOptimization(template_list=template_list))
scc.clear_cached_commutations()
for seed in range(10):
qc = random_clifford_circuit(
num_qubits=5,
@ -788,6 +790,8 @@ class TestTemplateMatching(QiskitTestCase):
)
qc_opt = pm.run(qc)
self.assertTrue(Operator(qc) == Operator(qc_opt))
# All of these gates are in the commutation library, i.e. the cache should not be used
self.assertEqual(scc.num_cached_entries(), 0)
if __name__ == "__main__":

View File

@ -0,0 +1,164 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2023.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Determines a commutation library over the unparameterizable standard gates, i.e. a dictionary for
each pair of parameterizable standard gates and all qubit overlaps that maps to either True or False,
depending on the present commutation relation.
"""
import itertools
from functools import lru_cache
from typing import List
from qiskit.circuit.commutation_checker import _get_relative_placement, _order_operations
from qiskit.circuit import Gate, CommutationChecker
import qiskit.circuit.library.standard_gates as stdg
from qiskit.dagcircuit import DAGOpNode
@lru_cache(None)
def _get_unparameterizable_gates() -> List[Gate]:
"""Retrieve a list of non-parmaterized gates with up to 3 qubits, using the python inspection module
Return:
A list of non-parameterized gates to be considered in the commutation library
"""
# These two gates may require a large runtime in later processing steps
# blocked_types = [C3SXGate, C4XGate]
gates = list(stdg.get_standard_gate_name_mapping().values())
return [g for g in gates if len(g.params) == 0]
def _generate_commutation_dict(considered_gates: List[Gate] = None) -> dict:
"""Compute the commutation relation of considered gates
Args:
considered_gates List[Gate]: a list of gates between which the commutation should be determined
Return:
A dictionary that includes the commutation relation for each
considered pair of operations and each relative placement
"""
commutations = {}
cc = CommutationChecker()
for gate0 in considered_gates:
node0 = DAGOpNode(op=gate0, qargs=list(range(gate0.num_qubits)), cargs=[])
for gate1 in considered_gates:
# only consider canonical entries
(
(
first_gate,
_,
_,
),
(second_gate, _, _),
) = _order_operations(gate0, None, None, gate1, None, None)
if (first_gate, second_gate) != (gate0, gate1) and gate0.name != gate1.name:
continue
# enumerate all relative gate placements with overlap between gate qubits
gate_placements = itertools.permutations(
range(gate0.num_qubits + gate1.num_qubits - 1), gate0.num_qubits
)
gate_pair_commutation = {}
for permutation in gate_placements:
permutation_list = list(permutation)
gate1_qargs = []
# use idx_non_overlapping qubits to represent qubits on g1 that are not connected to g0
next_non_overlapping_qubit_idx = gate0.num_qubits
for i in range(gate1.num_qubits):
if i in permutation_list:
gate1_qargs.append(permutation_list.index(i))
else:
gate1_qargs.append(next_non_overlapping_qubit_idx)
next_non_overlapping_qubit_idx += 1
node1 = DAGOpNode(op=gate1, qargs=gate1_qargs, cargs=[])
# replace non-overlapping qubits with None to act as a key in the commutation library
relative_placement = _get_relative_placement(node0.qargs, node1.qargs)
if not gate0.is_parameterized() and not gate1.is_parameterized():
# if no gate includes parameters, compute commutation relation using
# matrix multiplication
op1 = node0.op
qargs1 = node0.qargs
cargs1 = node0.cargs
op2 = node1.op
qargs2 = node1.qargs
cargs2 = node1.cargs
commutation_relation = cc.commute(
op1, qargs1, cargs1, op2, qargs2, cargs2, max_num_qubits=4
)
else:
pass
# TODO
gate_pair_commutation[relative_placement] = commutation_relation
commutations[gate0.name, gate1.name] = gate_pair_commutation
return commutations
def _simplify_commuting_dict(commuting_dict: dict) -> dict:
"""Compress some of the commutation library entries
Args:
commuting_dict (dict): A commutation dictionary
Return:
commuting_dict (dict): A commutation dictionary with simplified entries
"""
# Remove relative placement key if commutation is independent of relative placement
for ops in commuting_dict.keys():
gates_commutations = set(commuting_dict[ops].values())
if len(gates_commutations) == 1:
commuting_dict[ops] = next(iter(gates_commutations))
return commuting_dict
def _dump_commuting_dict_as_python(
commutations: dict, file_name: str = "../_standard_gates_commutations.py"
):
"""Write commutation dictionary as python object to ./qiskit/circuit/_standard_gates_commutations.py.
Args:
commutations (dict): a dictionary that includes the commutation relation for
each considered pair of operations
"""
with open(file_name, "w") as fp:
dir_str = "standard_gates_commutations = {\n"
for k, v in commutations.items():
if not isinstance(v, dict):
dir_str += ' ("{}", "{}"): {},\n'.format(*k, v)
else:
dir_str += ' ("{}", "{}"): {{\n'.format(*k)
for entry_key, entry_val in v.items():
dir_str += " {}: {},\n".format(entry_key, entry_val)
dir_str += " },\n"
dir_str += "}\n"
fp.write(dir_str.replace("'", ""))
if __name__ == "__main__":
cgates = [
g for g in _get_unparameterizable_gates() if g.name not in ["reset", "measure", "delay"]
]
commutation_dict = _generate_commutation_dict(considered_gates=cgates)
commutation_dict = _simplify_commuting_dict(commutation_dict)
_dump_commuting_dict_as_python(commutation_dict)