qiskit/test/python/dagcircuit/test_collect_blocks.py

1028 lines
37 KiB
Python

# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Test functionality to collect, split and consolidate blocks from DAGCircuits."""
import unittest
import ddt
from qiskit import QuantumRegister, ClassicalRegister
from qiskit.converters import (
circuit_to_dag,
circuit_to_dagdependency,
circuit_to_instruction,
dag_to_circuit,
dagdependency_to_circuit,
)
from qiskit.circuit import QuantumCircuit, Measure, Clbit
from qiskit.dagcircuit.collect_blocks import BlockCollector, BlockSplitter, BlockCollapser
from test import QiskitTestCase # pylint: disable=wrong-import-order
@ddt.ddt
class TestCollectBlocks(QiskitTestCase):
"""Tests to verify correctness of collecting, splitting, and consolidating blocks
from DAGCircuit and DAGDependency. Additional tests appear as a part of
CollectLinearFunctions and CollectCliffords passes.
"""
def test_collect_gates_from_dagcircuit_1(self):
"""Test collecting CX gates from DAGCircuits."""
qc = QuantumCircuit(5)
qc.cx(0, 1)
qc.cx(0, 2)
qc.z(0)
qc.cx(0, 3)
qc.cx(0, 4)
block_collector = BlockCollector(circuit_to_dag(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name == "cx",
split_blocks=False,
min_block_size=2,
)
# The middle z-gate leads to two blocks of size 2 each
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 2)
self.assertEqual(len(blocks[1]), 2)
def test_collect_gates_from_dagcircuit_2(self):
"""Test collecting both CX and Z gates from DAGCircuits."""
qc = QuantumCircuit(5)
qc.cx(0, 1)
qc.cx(0, 2)
qc.z(0)
qc.cx(0, 3)
qc.cx(0, 4)
block_collector = BlockCollector(circuit_to_dag(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "z"],
split_blocks=False,
min_block_size=1,
)
# All of the gates are part of a single block
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 5)
def test_collect_gates_from_dagcircuit_3(self):
"""Test collecting CX gates from DAGCircuits."""
qc = QuantumCircuit(5)
qc.cx(0, 1)
qc.cx(0, 2)
qc.z(0)
qc.cx(1, 3)
qc.cx(0, 3)
qc.cx(0, 4)
block_collector = BlockCollector(circuit_to_dag(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx"],
split_blocks=False,
min_block_size=1,
)
# We should end up with two CX blocks: even though there is "a path
# around z(0)", without commutativity analysis z(0) prevents from
# including all CX-gates into the same block
self.assertEqual(len(blocks), 2)
def test_collect_gates_from_dagdependency_1(self):
"""Test collecting CX gates from DAGDependency."""
qc = QuantumCircuit(5)
qc.cx(0, 1)
qc.cx(0, 2)
qc.z(0)
qc.cx(0, 3)
qc.cx(0, 4)
block_collector = BlockCollector(circuit_to_dagdependency(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name == "cx",
split_blocks=False,
min_block_size=1,
)
# The middle z-gate commutes with CX-gates, which leads to a single block of length 4
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 4)
def test_collect_gates_from_dagdependency_2(self):
"""Test collecting both CX and Z gates from DAGDependency."""
qc = QuantumCircuit(5)
qc.cx(0, 1)
qc.cx(0, 2)
qc.z(0)
qc.cx(0, 3)
qc.cx(0, 4)
block_collector = BlockCollector(circuit_to_dagdependency(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "z"],
split_blocks=False,
min_block_size=1,
)
# All the gates are part of a single block
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 5)
def test_collect_and_split_gates_from_dagcircuit(self):
"""Test collecting and splitting blocks from DAGCircuit."""
qc = QuantumCircuit(6)
qc.cx(0, 1)
qc.cx(3, 5)
qc.cx(2, 4)
qc.swap(1, 0)
qc.cz(5, 3)
block_collector = BlockCollector(circuit_to_dag(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: True,
split_blocks=False,
min_block_size=1,
)
# All the gates are part of a single block
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 5)
# Split the first block into sub-blocks over disjoint qubit sets
# We should get 3 sub-blocks
split_blocks = BlockSplitter().run(blocks[0])
self.assertEqual(len(split_blocks), 3)
def test_collect_and_split_gates_from_dagdependency(self):
"""Test collecting and splitting blocks from DAGDependency."""
qc = QuantumCircuit(6)
qc.cx(0, 1)
qc.cx(3, 5)
qc.cx(2, 4)
qc.swap(1, 0)
qc.cz(5, 3)
block_collector = BlockCollector(circuit_to_dagdependency(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: True,
split_blocks=False,
min_block_size=1,
)
# All the gates are part of a single block
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 5)
# Split the first block into sub-blocks over disjoint qubit sets
# We should get 3 sub-blocks
split_blocks = BlockSplitter().run(blocks[0])
self.assertEqual(len(split_blocks), 3)
def test_circuit_has_measure(self):
"""Test that block collection works properly when there is a measure in the
middle of the circuit."""
qc = QuantumCircuit(2, 1)
qc.cx(1, 0)
qc.x(0)
qc.x(1)
qc.measure(0, 0)
qc.x(0)
qc.cx(1, 0)
block_collector = BlockCollector(circuit_to_dag(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["x", "cx"],
split_blocks=False,
min_block_size=1,
)
# measure prevents combining the two blocks
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 3)
self.assertEqual(len(blocks[1]), 2)
def test_circuit_has_measure_dagdependency(self):
"""Test that block collection works properly when there is a measure in the
middle of the circuit."""
qc = QuantumCircuit(2, 1)
qc.cx(1, 0)
qc.x(0)
qc.x(1)
qc.measure(0, 0)
qc.x(0)
qc.cx(1, 0)
block_collector = BlockCollector(circuit_to_dagdependency(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["x", "cx"],
split_blocks=False,
min_block_size=1,
)
# measure prevents combining the two blocks
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 3)
self.assertEqual(len(blocks[1]), 2)
def test_circuit_has_conditional_gates(self):
"""Test that block collection works properly when there the circuit
contains conditional gates."""
qc = QuantumCircuit(2, 1)
qc.x(0)
qc.x(1)
qc.cx(1, 0)
with self.assertWarns(DeprecationWarning):
qc.x(1).c_if(0, 1)
qc.x(0)
qc.x(1)
qc.cx(0, 1)
# If the filter_function does not look at conditions, we should collect all
# gates into the block.
block_collector = BlockCollector(circuit_to_dag(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["x", "cx"],
split_blocks=False,
min_block_size=1,
)
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 7)
# If the filter_function does look at conditions, we should not collect the middle
# conditional gate (note that x(1) following the measure is collected into the first
# block).
block_collector = BlockCollector(circuit_to_dag(qc))
with self.assertWarns(DeprecationWarning):
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["x", "cx"]
and not getattr(node.op, "condition", None),
split_blocks=False,
min_block_size=1,
)
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 4)
self.assertEqual(len(blocks[1]), 2)
def test_circuit_has_conditional_gates_dagdependency(self):
"""Test that block collection works properly when there the circuit
contains conditional gates."""
qc = QuantumCircuit(2, 1)
qc.x(0)
qc.x(1)
qc.cx(1, 0)
with self.assertWarns(DeprecationWarning):
qc.x(1).c_if(0, 1)
qc.x(0)
qc.x(1)
qc.cx(0, 1)
# If the filter_function does not look at conditions, we should collect all
# gates into the block.
block_collector = BlockCollector(circuit_to_dagdependency(qc))
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["x", "cx"],
split_blocks=False,
min_block_size=1,
)
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 7)
# If the filter_function does look at conditions, we should not collect the middle
# conditional gate (note that x(1) following the measure is collected into the first
# block).
block_collector = BlockCollector(circuit_to_dag(qc))
with self.assertWarns(DeprecationWarning):
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["x", "cx"]
and not getattr(node.op, "condition", None),
split_blocks=False,
min_block_size=1,
)
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 4)
self.assertEqual(len(blocks[1]), 2)
def test_multiple_collection_methods(self):
"""Test that block collection allows to collect blocks using several different
filter functions."""
qc = QuantumCircuit(5)
qc.cx(0, 1)
qc.cx(0, 2)
qc.swap(1, 4)
qc.swap(4, 3)
qc.z(0)
qc.z(1)
qc.z(2)
qc.z(3)
qc.z(4)
qc.swap(3, 4)
qc.cx(0, 3)
qc.cx(0, 4)
block_collector = BlockCollector(circuit_to_dag(qc))
linear_blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=1,
)
cx_blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx"],
split_blocks=False,
min_block_size=1,
)
swapz_blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["swap", "z"],
split_blocks=False,
min_block_size=1,
)
# We should end up with two linear blocks
self.assertEqual(len(linear_blocks), 2)
self.assertEqual(len(linear_blocks[0]), 4)
self.assertEqual(len(linear_blocks[1]), 3)
# We should end up with two cx blocks
self.assertEqual(len(cx_blocks), 2)
self.assertEqual(len(cx_blocks[0]), 2)
self.assertEqual(len(cx_blocks[1]), 2)
# We should end up with one swap,z blocks
self.assertEqual(len(swapz_blocks), 1)
self.assertEqual(len(swapz_blocks[0]), 8)
def test_min_block_size(self):
"""Test that the option min_block_size for collecting blocks works correctly."""
# original circuit
circuit = QuantumCircuit(2)
circuit.cx(0, 1)
circuit.h(0)
circuit.cx(0, 1)
circuit.cx(1, 0)
circuit.h(0)
circuit.cx(0, 1)
circuit.cx(1, 0)
circuit.cx(0, 1)
block_collector = BlockCollector(circuit_to_dag(circuit))
# When min_block_size = 1, we should obtain 3 linear blocks
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=1,
)
self.assertEqual(len(blocks), 3)
# When min_block_size = 2, we should obtain 2 linear blocks
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=2,
)
self.assertEqual(len(blocks), 2)
# When min_block_size = 3, we should obtain 1 linear block
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=3,
)
self.assertEqual(len(blocks), 1)
# When min_block_size = 4, we should obtain no linear blocks
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=4,
)
self.assertEqual(len(blocks), 0)
def test_split_blocks(self):
"""Test that splitting blocks of nodes into sub-blocks works correctly."""
# original circuit is linear
circuit = QuantumCircuit(5)
circuit.cx(0, 2)
circuit.cx(1, 4)
circuit.cx(2, 0)
circuit.cx(0, 3)
circuit.swap(3, 2)
circuit.swap(4, 1)
block_collector = BlockCollector(circuit_to_dag(circuit))
# If we do not split block into sub-blocks, we expect a single linear block.
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=2,
)
self.assertEqual(len(blocks), 1)
# If we do split block into sub-blocks, we expect two linear blocks:
# one over qubits {0, 2, 3}, and another over qubits {1, 4}.
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=True,
min_block_size=2,
)
self.assertEqual(len(blocks), 2)
def test_do_not_split_blocks(self):
"""Test that splitting blocks of nodes into sub-blocks works correctly."""
# original circuit is linear
circuit = QuantumCircuit(5)
circuit.cx(0, 3)
circuit.cx(0, 2)
circuit.cx(1, 4)
circuit.swap(4, 2)
block_collector = BlockCollector(circuit_to_dagdependency(circuit))
# Check that we have a single linear block
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=True,
min_block_size=1,
)
self.assertEqual(len(blocks), 1)
def test_collect_blocks_with_cargs(self):
"""Test collecting and collapsing blocks with classical bits appearing as cargs."""
qc = QuantumCircuit(3)
qc.h(0)
qc.h(1)
qc.h(2)
qc.measure_all()
dag = circuit_to_dag(qc)
# Collect all measure instructions
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: isinstance(node.op, Measure), split_blocks=False, min_block_size=1
)
# We should have a single block consisting of 3 measures
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 3)
self.assertEqual(blocks[0][0].op, Measure())
self.assertEqual(blocks[0][1].op, Measure())
self.assertEqual(blocks[0][2].op, Measure())
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dag_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 5)
self.assertEqual(collapsed_qc.data[0].operation.name, "h")
self.assertEqual(collapsed_qc.data[1].operation.name, "h")
self.assertEqual(collapsed_qc.data[2].operation.name, "h")
self.assertEqual(collapsed_qc.data[3].operation.name, "barrier")
self.assertEqual(collapsed_qc.data[4].operation.name, "COLLAPSED")
self.assertEqual(collapsed_qc.data[4].operation.definition.num_qubits, 3)
self.assertEqual(collapsed_qc.data[4].operation.definition.num_clbits, 3)
def test_collect_blocks_with_cargs_dagdependency(self):
"""Test collecting and collapsing blocks with classical bits appearing as cargs,
using DAGDependency."""
qc = QuantumCircuit(3)
qc.h(0)
qc.h(1)
qc.h(2)
qc.measure_all()
dag = circuit_to_dagdependency(qc)
# Collect all measure instructions
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: isinstance(node.op, Measure), split_blocks=False, min_block_size=1
)
# We should have a single block consisting of 3 measures
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 3)
self.assertEqual(blocks[0][0].op, Measure())
self.assertEqual(blocks[0][1].op, Measure())
self.assertEqual(blocks[0][2].op, Measure())
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dagdependency_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 5)
self.assertEqual(collapsed_qc.data[0].operation.name, "h")
self.assertEqual(collapsed_qc.data[1].operation.name, "h")
self.assertEqual(collapsed_qc.data[2].operation.name, "h")
self.assertEqual(collapsed_qc.data[3].operation.name, "barrier")
self.assertEqual(collapsed_qc.data[4].operation.name, "COLLAPSED")
self.assertEqual(collapsed_qc.data[4].operation.definition.num_qubits, 3)
self.assertEqual(collapsed_qc.data[4].operation.definition.num_clbits, 3)
def test_collect_blocks_with_clbits(self):
"""Test collecting and collapsing blocks with classical bits appearing under
condition."""
qc = QuantumCircuit(4, 3)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(0, 1)
qc.cx(2, 3)
qc.cx(1, 2)
qc.cx(0, 1)
with self.assertWarns(DeprecationWarning):
qc.cx(2, 3).c_if(1, 0)
dag = circuit_to_dag(qc)
# Collect all cx gates (including the conditional ones)
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: node.op.name == "cx", split_blocks=False, min_block_size=1
)
# We should have a single block consisting of all CX nodes
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 5)
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
with self.assertWarns(DeprecationWarning):
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dag_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 1)
self.assertEqual(collapsed_qc.data[0].operation.name, "COLLAPSED")
self.assertEqual(collapsed_qc.data[0].operation.definition.num_qubits, 4)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_clbits, 2)
def test_collect_blocks_with_clbits_dagdependency(self):
"""Test collecting and collapsing blocks with classical bits appearing
under conditions, using DAGDependency."""
qc = QuantumCircuit(4, 3)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(0, 1)
qc.cx(2, 3)
qc.cx(1, 2)
qc.cx(0, 1)
with self.assertWarns(DeprecationWarning):
qc.cx(2, 3).c_if(1, 0)
dag = circuit_to_dagdependency(qc)
# Collect all cx gates (including the conditional ones)
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: node.op.name == "cx", split_blocks=False, min_block_size=1
)
# We should have a single block consisting of all CX nodes
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 5)
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
with self.assertWarns(DeprecationWarning):
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dagdependency_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 1)
self.assertEqual(collapsed_qc.data[0].operation.name, "COLLAPSED")
self.assertEqual(collapsed_qc.data[0].operation.definition.num_qubits, 4)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_clbits, 2)
def test_collect_blocks_with_clbits2(self):
"""Test collecting and collapsing blocks with classical bits appearing under
condition."""
qreg = QuantumRegister(4, "qr")
creg = ClassicalRegister(3, "cr")
cbit = Clbit()
qc = QuantumCircuit(qreg, creg, [cbit])
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg[1], 1)
with self.assertWarns(DeprecationWarning):
qc.cx(2, 3).c_if(cbit, 0)
qc.cx(1, 2)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg[2], 1)
dag = circuit_to_dag(qc)
# Collect all cx gates (including the conditional ones)
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: node.op.name == "cx", split_blocks=False, min_block_size=1
)
# We should have a single block consisting of all CX nodes
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 4)
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
with self.assertWarns(DeprecationWarning):
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dag_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 1)
self.assertEqual(collapsed_qc.data[0].operation.name, "COLLAPSED")
self.assertEqual(collapsed_qc.data[0].operation.definition.num_qubits, 4)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_clbits, 3)
def test_collect_blocks_with_clbits2_dagdependency(self):
"""Test collecting and collapsing blocks with classical bits appearing under
condition, using DAGDependency."""
qreg = QuantumRegister(4, "qr")
creg = ClassicalRegister(3, "cr")
cbit = Clbit()
qc = QuantumCircuit(qreg, creg, [cbit])
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg[1], 1)
with self.assertWarns(DeprecationWarning):
qc.cx(2, 3).c_if(cbit, 0)
qc.cx(1, 2)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg[2], 1)
dag = circuit_to_dag(qc)
# Collect all cx gates (including the conditional ones)
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: node.op.name == "cx", split_blocks=False, min_block_size=1
)
# We should have a single block consisting of all CX nodes
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 4)
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
with self.assertWarns(DeprecationWarning):
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dag_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 1)
self.assertEqual(collapsed_qc.data[0].operation.name, "COLLAPSED")
self.assertEqual(collapsed_qc.data[0].operation.definition.num_qubits, 4)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_clbits, 3)
def test_collect_blocks_with_cregs(self):
"""Test collecting and collapsing blocks with classical registers appearing under
condition."""
qreg = QuantumRegister(4, "qr")
creg = ClassicalRegister(3, "cr")
creg2 = ClassicalRegister(2, "cr2")
qc = QuantumCircuit(qreg, creg, creg2)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg, 3)
qc.cx(1, 2)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg[2], 1)
dag = circuit_to_dag(qc)
# Collect all cx gates (including the conditional ones)
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: node.op.name == "cx", split_blocks=False, min_block_size=1
)
# We should have a single block consisting of all CX nodes
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 3)
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
with self.assertWarns(DeprecationWarning):
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dag_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 1)
self.assertEqual(collapsed_qc.data[0].operation.name, "COLLAPSED")
self.assertEqual(len(collapsed_qc.data[0].operation.definition.cregs), 1)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_qubits, 3)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_clbits, 3)
def test_collect_blocks_with_cregs_dagdependency(self):
"""Test collecting and collapsing blocks with classical registers appearing under
condition, using DAGDependency."""
qreg = QuantumRegister(4, "qr")
creg = ClassicalRegister(3, "cr")
creg2 = ClassicalRegister(2, "cr2")
qc = QuantumCircuit(qreg, creg, creg2)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg, 3)
qc.cx(1, 2)
with self.assertWarns(DeprecationWarning):
qc.cx(0, 1).c_if(creg[2], 1)
dag = circuit_to_dagdependency(qc)
# Collect all cx gates (including the conditional ones)
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda node: node.op.name == "cx", split_blocks=False, min_block_size=1
)
# We should have a single block consisting of all CX nodes
self.assertEqual(len(blocks), 1)
self.assertEqual(len(blocks[0]), 3)
def _collapse_fn(circuit):
op = circuit_to_instruction(circuit)
op.name = "COLLAPSED"
return op
# Collapse block with measures into a single "COLLAPSED" block
with self.assertWarns(DeprecationWarning):
dag = BlockCollapser(dag).collapse_to_operation(blocks, _collapse_fn)
collapsed_qc = dagdependency_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 1)
self.assertEqual(collapsed_qc.data[0].operation.name, "COLLAPSED")
self.assertEqual(len(collapsed_qc.data[0].operation.definition.cregs), 1)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_qubits, 3)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_clbits, 3)
def test_collect_blocks_backwards_dagcircuit(self):
"""Test collecting H gates from DAGCircuit in the forward vs. the reverse
directions."""
qc = QuantumCircuit(4)
qc.h(0)
qc.h(1)
qc.h(2)
qc.h(3)
qc.cx(1, 2)
qc.z(0)
qc.z(1)
qc.z(2)
qc.z(3)
block_collector = BlockCollector(circuit_to_dag(qc))
# When collecting in the forward direction, there are two blocks of
# single-qubit gates: the first of size 6, and the second of size 2.
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["h", "z"],
split_blocks=False,
min_block_size=1,
collect_from_back=False,
)
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 6)
self.assertEqual(len(blocks[1]), 2)
# When collecting in the backward direction, there are also two blocks of
# single-qubit ates: but now the first is of size 2, and the second is of size 6.
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["h", "z"],
split_blocks=False,
min_block_size=1,
collect_from_back=True,
)
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 2)
self.assertEqual(len(blocks[1]), 6)
def test_collect_blocks_backwards_dagdependency(self):
"""Test collecting H gates from DAGDependency in the forward vs. the reverse
directions."""
qc = QuantumCircuit(4)
qc.z(0)
qc.z(1)
qc.z(2)
qc.z(3)
qc.cx(1, 2)
qc.h(0)
qc.h(1)
qc.h(2)
qc.h(3)
block_collector = BlockCollector(circuit_to_dagdependency(qc))
# When collecting in the forward direction, there are two blocks of
# single-qubit gates: the first of size 6, and the second of size 2.
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["h", "z"],
split_blocks=False,
min_block_size=1,
collect_from_back=False,
)
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 6)
self.assertEqual(len(blocks[1]), 2)
# When collecting in the backward direction, there are also two blocks of
# single-qubit ates: but now the first is of size 1, and the second is of size 7
# (note that z(1) and CX(1, 2) commute).
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["h", "z"],
split_blocks=False,
min_block_size=1,
collect_from_back=True,
)
self.assertEqual(len(blocks), 2)
self.assertEqual(len(blocks[0]), 1)
self.assertEqual(len(blocks[1]), 7)
@ddt.data(circuit_to_dag, circuit_to_dagdependency)
def test_max_block_width_default(self, converter):
"""Test that not explicitly specifying ``max_block_width`` works as expected."""
# original circuit
circuit = QuantumCircuit(6)
circuit.h(0)
circuit.cx(0, 1)
circuit.cx(1, 2)
circuit.cx(2, 3)
circuit.cx(3, 4)
circuit.cx(4, 5)
block_collector = BlockCollector(converter(circuit))
# When max_block_width is not specified, we should obtain 1 block
blocks = block_collector.collect_all_matching_blocks(
lambda node: True,
min_block_size=1,
)
self.assertEqual(len(blocks), 1)
@ddt.data(
(circuit_to_dag, None, 1),
(circuit_to_dag, 2, 5),
(circuit_to_dag, 3, 3),
(circuit_to_dag, 4, 2),
(circuit_to_dag, 6, 1),
(circuit_to_dag, 10, 1),
(circuit_to_dagdependency, None, 1),
(circuit_to_dagdependency, 2, 5),
(circuit_to_dagdependency, 3, 3),
(circuit_to_dagdependency, 4, 2),
(circuit_to_dagdependency, 6, 1),
(circuit_to_dagdependency, 10, 1),
)
@ddt.unpack
def test_max_block_width(self, converter, max_block_width, num_expected_blocks):
"""Test that the option ``max_block_width`` for collecting blocks works correctly."""
# original circuit
circuit = QuantumCircuit(6)
circuit.h(0)
circuit.cx(0, 1)
circuit.cx(1, 2)
circuit.cx(2, 3)
circuit.cx(3, 4)
circuit.cx(4, 5)
block_collector = BlockCollector(converter(circuit))
blocks = block_collector.collect_all_matching_blocks(
lambda node: True,
min_block_size=1,
max_block_width=max_block_width,
)
self.assertEqual(len(blocks), num_expected_blocks)
def test_split_layers_dagcircuit(self):
"""Test that splitting blocks of nodes into layers works correctly."""
# original circuit is linear
circuit = QuantumCircuit(5)
circuit.cx(0, 2)
circuit.cx(1, 4)
circuit.cx(2, 0)
circuit.cx(0, 3)
circuit.swap(3, 2)
circuit.swap(4, 1)
block_collector = BlockCollector(circuit_to_dag(circuit))
# If we split the gates into depth-1 layers, we expect four linear blocks:
# CX(0, 2), CX(1, 4)
# CX(2, 0), CX(4, 1)
# CX(0, 3)
# CX(3, 2)
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=1,
split_layers=True,
)
self.assertEqual(len(blocks), 4)
self.assertEqual(len(blocks[0]), 2)
self.assertEqual(len(blocks[1]), 2)
self.assertEqual(len(blocks[2]), 1)
self.assertEqual(len(blocks[3]), 1)
def test_split_layers_dagdependency(self):
"""Test that splitting blocks of nodes into layers works correctly."""
# original circuit is linear
circuit = QuantumCircuit(5)
circuit.cx(0, 2)
circuit.cx(1, 4)
circuit.cx(2, 0)
circuit.cx(0, 3)
circuit.swap(3, 2)
circuit.swap(4, 1)
block_collector = BlockCollector(circuit_to_dagdependency(circuit))
# If we split the gates into depth-1 layers, we expect four linear blocks:
# CX(0, 2), CX(1, 4)
# CX(2, 0), CX(4, 1)
# CX(0, 3)
# CX(3, 2)
blocks = block_collector.collect_all_matching_blocks(
lambda node: node.op.name in ["cx", "swap"],
split_blocks=False,
min_block_size=1,
split_layers=True,
)
self.assertEqual(len(blocks), 4)
self.assertEqual(len(blocks[0]), 2)
self.assertEqual(len(blocks[1]), 2)
self.assertEqual(len(blocks[2]), 1)
self.assertEqual(len(blocks[3]), 1)
def test_block_collapser_register_condition(self):
"""Test that BlockCollapser can handle a register being used more than once."""
qc = QuantumCircuit(1, 2)
with self.assertWarns(DeprecationWarning):
qc.x(0).c_if(qc.cregs[0], 0)
with self.assertWarns(DeprecationWarning):
qc.y(0).c_if(qc.cregs[0], 1)
dag = circuit_to_dag(qc)
blocks = BlockCollector(dag).collect_all_matching_blocks(
lambda _: True, split_blocks=False, min_block_size=1
)
with self.assertWarns(DeprecationWarning):
dag = BlockCollapser(dag).collapse_to_operation(
blocks, lambda circ: circ.to_instruction()
)
collapsed_qc = dag_to_circuit(dag)
self.assertEqual(len(collapsed_qc.data), 1)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_qubits, 1)
self.assertEqual(collapsed_qc.data[0].operation.definition.num_clbits, 2)
if __name__ == "__main__":
unittest.main()