mirror of https://github.com/Qiskit/qiskit.git
Setting limits for CSPLayout (#3515)
* limit in the solver * cleaning up * lint * add no limit for call_limit=None * add time limit * adding reason to stop * but on reason * limit tests * remove qasm * docstring * Update qiskit/transpiler/passes/mapping/csp_layout.py Co-Authored-By: Kevin Krsulich <kevin@krsulich.net> * rename the property CSP_stop_reason * release notes
This commit is contained in:
parent
06e56a900e
commit
9c140b4894
|
@ -18,6 +18,7 @@ satisfy the circuit, i.e. no further swap is needed. If no solution is
|
|||
found, no ``property_set['layout']`` is set.
|
||||
"""
|
||||
import random
|
||||
from time import time
|
||||
|
||||
from qiskit.transpiler.layout import Layout
|
||||
from qiskit.transpiler.basepasses import AnalysisPass
|
||||
|
@ -29,20 +30,33 @@ class CSPLayout(AnalysisPass):
|
|||
If possible, chooses a Layout as a CSP, using backtracking.
|
||||
"""
|
||||
|
||||
def __init__(self, coupling_map, strict_direction=False, seed=None):
|
||||
def __init__(self, coupling_map, strict_direction=False, seed=None, call_limit=1000,
|
||||
time_limit=10):
|
||||
"""
|
||||
If possible, chooses a Layout as a CSP, using backtracking. If not possible,
|
||||
does not set the layout property.
|
||||
does not set the layout property. In all the cases, the property ``CSPLayout_stop_reason``
|
||||
will be added with one of the following values:
|
||||
- solution found: If a perfect layout was found.
|
||||
- nonexistent solution: If no perfect layout was found and every combination was checked.
|
||||
- call limit reached: If no perfect layout was found and the call limit was reached.
|
||||
- time limit reached: If no perfect layout was found and the time limit was reached.
|
||||
|
||||
Args:
|
||||
coupling_map (Coupling): Directed graph representing a coupling map.
|
||||
strict_direction (bool): If True, considers the direction of the coupling map.
|
||||
Default is False.
|
||||
seed (int): Sets the seed of the PRNG.
|
||||
call_limit (int): Amount of times that
|
||||
``constraint.RecursiveBacktrackingSolver.recursiveBacktracking`` will be called.
|
||||
None means no call limit. Default: 1000.
|
||||
time_limit (int): Amount of seconds that the pass will try to find a solution.
|
||||
None means no time limit. Default: 10 seconds.
|
||||
"""
|
||||
super().__init__()
|
||||
self.coupling_map = coupling_map
|
||||
self.strict_direction = strict_direction
|
||||
self.call_limit = call_limit
|
||||
self.time_limit = time_limit
|
||||
self.seed = seed
|
||||
|
||||
def run(self, dag):
|
||||
|
@ -59,10 +73,54 @@ class CSPLayout(AnalysisPass):
|
|||
qubits.index(gate.qargs[1])))
|
||||
edges = self.coupling_map.get_edges()
|
||||
|
||||
problem = Problem(RecursiveBacktrackingSolver())
|
||||
class CustomSolver(RecursiveBacktrackingSolver):
|
||||
"""A wrap to RecursiveBacktrackingSolver to support ``call_limit``"""
|
||||
|
||||
def __init__(self, call_limit=None, time_limit=None):
|
||||
self.call_limit = call_limit
|
||||
self.time_limit = time_limit
|
||||
self.call_current = None
|
||||
self.time_start = None
|
||||
self.time_current = None
|
||||
super().__init__()
|
||||
|
||||
def limit_reached(self):
|
||||
"""Checks if a limit is reached."""
|
||||
if self.call_current is not None:
|
||||
self.call_current += 1
|
||||
if self.call_current > self.call_limit:
|
||||
return True
|
||||
if self.time_start is not None:
|
||||
self.time_current = time() - self.time_start
|
||||
if self.time_current > self.time_limit:
|
||||
return True
|
||||
return False
|
||||
|
||||
def getSolution(self, # pylint: disable=invalid-name
|
||||
domains, constraints, vconstraints):
|
||||
"""Wrap RecursiveBacktrackingSolver.getSolution to add the limits."""
|
||||
if self.call_limit is not None:
|
||||
self.call_current = 0
|
||||
if self.time_limit is not None:
|
||||
self.time_start = time()
|
||||
return super().getSolution(domains, constraints, vconstraints)
|
||||
|
||||
def recursiveBacktracking(self, # pylint: disable=invalid-name
|
||||
solutions, domains, vconstraints, assignments, single):
|
||||
"""Like ``constraint.RecursiveBacktrackingSolver.recursiveBacktracking`` but
|
||||
limited in the amount of calls by ``self.call_limit`` """
|
||||
if self.limit_reached():
|
||||
return None
|
||||
return super().recursiveBacktracking(solutions, domains, vconstraints, assignments,
|
||||
single)
|
||||
|
||||
if self.time_limit is None and self.call_limit is None:
|
||||
solver = RecursiveBacktrackingSolver()
|
||||
else:
|
||||
solver = CustomSolver(call_limit=self.call_limit, time_limit=self.time_limit)
|
||||
|
||||
problem = Problem(solver)
|
||||
problem.addVariables(list(range(len(qubits))), self.coupling_map.physical_qubits)
|
||||
|
||||
problem.addConstraint(AllDifferentConstraint()) # each wire is map to a single qbit
|
||||
|
||||
if self.strict_direction:
|
||||
|
@ -79,6 +137,14 @@ class CSPLayout(AnalysisPass):
|
|||
solution = problem.getSolution()
|
||||
|
||||
if solution is None:
|
||||
return
|
||||
stop_reason = 'nonexistent solution'
|
||||
if isinstance(solver, CustomSolver):
|
||||
if solver.time_limit is not None and solver.time_current >= self.time_limit:
|
||||
stop_reason = 'time limit reached'
|
||||
elif solver.call_limit is not None and solver.call_current >= self.call_limit:
|
||||
stop_reason = 'call limit reached'
|
||||
else:
|
||||
stop_reason = 'solution found'
|
||||
self.property_set['layout'] = Layout({v: qubits[k] for k, v in solution.items()})
|
||||
|
||||
self.property_set['layout'] = Layout({v: qubits[k] for k, v in solution.items()})
|
||||
self.property_set['CSPLayout_stop_reason'] = stop_reason
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
---
|
||||
upgrade:
|
||||
- |
|
||||
The pass ``CSPLayout`` was extended with two new parameters: ``call_limit`` and ``time_limit``.
|
||||
These options allow to limit how long this pass will run. The option ``call_limit`` limits the
|
||||
amount of time that the recursive function in the backtracking solver is called. Similarly,
|
||||
``call_limit`` limits how long (in seconds) the solver will be running. The defaults
|
||||
are ``1000`` calls and ``10`` seconds respectively.
|
|
@ -15,13 +15,14 @@
|
|||
"""Test the CSPLayout pass"""
|
||||
|
||||
import unittest
|
||||
from time import process_time
|
||||
|
||||
from qiskit import QuantumRegister, QuantumCircuit
|
||||
from qiskit.transpiler import CouplingMap
|
||||
from qiskit.transpiler.passes import CSPLayout
|
||||
from qiskit.converters import circuit_to_dag
|
||||
from qiskit.test import QiskitTestCase
|
||||
from qiskit.test.mock import FakeTenerife, FakeRueschlikon
|
||||
from qiskit.test.mock import FakeTenerife, FakeRueschlikon, FakeTokyo
|
||||
|
||||
try:
|
||||
import constraint # pylint: disable=unused-import, import-error
|
||||
|
@ -52,6 +53,7 @@ class TestCSPLayout(QiskitTestCase):
|
|||
|
||||
self.assertEqual(layout[qr[0]], 0)
|
||||
self.assertEqual(layout[qr[1]], 1)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'solution found')
|
||||
|
||||
def test_3q_circuit_5q_coupling(self):
|
||||
""" 3 qubits in Tenerife, without considering the direction
|
||||
|
@ -77,6 +79,7 @@ class TestCSPLayout(QiskitTestCase):
|
|||
self.assertEqual(layout[qr[0]], 0)
|
||||
self.assertEqual(layout[qr[1]], 1)
|
||||
self.assertEqual(layout[qr[2]], 2)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'solution found')
|
||||
|
||||
def test_9q_circuit_16q_coupling(self):
|
||||
""" 9 qubits in Rueschlikon, without considering the direction
|
||||
|
@ -107,6 +110,7 @@ class TestCSPLayout(QiskitTestCase):
|
|||
self.assertEqual(layout[qr1[2]], 7)
|
||||
self.assertEqual(layout[qr1[3]], 3)
|
||||
self.assertEqual(layout[qr1[4]], 15)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'solution found')
|
||||
|
||||
def test_2q_circuit_2q_coupling_sd(self):
|
||||
""" A simple example, considering the direction
|
||||
|
@ -124,6 +128,7 @@ class TestCSPLayout(QiskitTestCase):
|
|||
|
||||
self.assertEqual(layout[qr[0]], 1)
|
||||
self.assertEqual(layout[qr[1]], 0)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'solution found')
|
||||
|
||||
def test_3q_circuit_5q_coupling_sd(self):
|
||||
""" 3 qubits in Tenerife, considering the direction
|
||||
|
@ -149,6 +154,7 @@ class TestCSPLayout(QiskitTestCase):
|
|||
self.assertEqual(layout[qr[0]], 1)
|
||||
self.assertEqual(layout[qr[1]], 2)
|
||||
self.assertEqual(layout[qr[2]], 0)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'solution found')
|
||||
|
||||
def test_9q_circuit_16q_coupling_sd(self):
|
||||
""" 9 qubits in Rueschlikon, considering the direction
|
||||
|
@ -179,6 +185,7 @@ class TestCSPLayout(QiskitTestCase):
|
|||
self.assertEqual(layout[qr1[2]], 7)
|
||||
self.assertEqual(layout[qr1[3]], 3)
|
||||
self.assertEqual(layout[qr1[4]], 15)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'solution found')
|
||||
|
||||
def test_5q_circuit_16q_coupling_no_solution(self):
|
||||
""" 5 qubits in Rueschlikon, no solution
|
||||
|
@ -200,6 +207,71 @@ class TestCSPLayout(QiskitTestCase):
|
|||
pass_.run(dag)
|
||||
layout = pass_.property_set['layout']
|
||||
self.assertIsNone(layout)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'nonexistent solution')
|
||||
|
||||
@staticmethod
|
||||
def create_hard_dag():
|
||||
"""Creates a particularly hard circuit (returns its dag) for Tokyo"""
|
||||
circuit = QuantumCircuit(20)
|
||||
circuit.cx(13, 12)
|
||||
circuit.cx(6, 0)
|
||||
circuit.cx(5, 10)
|
||||
circuit.cx(10, 7)
|
||||
circuit.cx(5, 12)
|
||||
circuit.cx(2, 15)
|
||||
circuit.cx(16, 18)
|
||||
circuit.cx(6, 4)
|
||||
circuit.cx(10, 3)
|
||||
circuit.cx(11, 10)
|
||||
circuit.cx(18, 16)
|
||||
circuit.cx(5, 12)
|
||||
circuit.cx(4, 0)
|
||||
circuit.cx(18, 16)
|
||||
circuit.cx(2, 15)
|
||||
circuit.cx(7, 8)
|
||||
circuit.cx(9, 6)
|
||||
circuit.cx(16, 17)
|
||||
circuit.cx(9, 3)
|
||||
circuit.cx(14, 12)
|
||||
circuit.cx(2, 15)
|
||||
circuit.cx(1, 16)
|
||||
circuit.cx(5, 3)
|
||||
circuit.cx(8, 12)
|
||||
circuit.cx(2, 1)
|
||||
circuit.cx(5, 3)
|
||||
circuit.cx(13, 5)
|
||||
circuit.cx(12, 14)
|
||||
circuit.cx(12, 13)
|
||||
circuit.cx(6, 4)
|
||||
circuit.cx(15, 18)
|
||||
circuit.cx(15, 18)
|
||||
return circuit_to_dag(circuit)
|
||||
|
||||
def test_time_limit(self):
|
||||
"""Hard to solve situations hit the time limit"""
|
||||
dag = TestCSPLayout.create_hard_dag()
|
||||
coupling_map = CouplingMap(FakeTokyo().configuration().coupling_map)
|
||||
pass_ = CSPLayout(coupling_map, call_limit=None, time_limit=1)
|
||||
|
||||
start = process_time()
|
||||
pass_.run(dag)
|
||||
runtime = process_time() - start
|
||||
|
||||
self.assertLess(runtime, 2)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'time limit reached')
|
||||
|
||||
def test_call_limit(self):
|
||||
"""Hard to solve situations hit the call limit"""
|
||||
dag = TestCSPLayout.create_hard_dag()
|
||||
coupling_map = CouplingMap(FakeTokyo().configuration().coupling_map)
|
||||
pass_ = CSPLayout(coupling_map, call_limit=1, time_limit=None)
|
||||
|
||||
start = process_time()
|
||||
pass_.run(dag)
|
||||
runtime = process_time() - start
|
||||
|
||||
self.assertLess(runtime, 1)
|
||||
self.assertEqual(pass_.property_set['CSPLayout_stop_reason'], 'call limit reached')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue