# Mirror of https://github.com/Qiskit/qiskit.git (Python source, 876 lines, 37 KiB)
# This code is part of Qiskit.
#
# (C) Copyright IBM 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
|
|
|
|
"""Tests for Backend Sampler V2."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import unittest
|
|
from test import QiskitTestCase, combine
|
|
|
|
import numpy as np
|
|
from ddt import ddt
|
|
from numpy.typing import NDArray
|
|
|
|
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
|
|
from qiskit.circuit import Parameter
|
|
from qiskit.circuit.library import RealAmplitudes, UnitaryGate
|
|
from qiskit.primitives import PrimitiveResult, PubResult, StatevectorSampler
|
|
from qiskit.primitives.backend_sampler_v2 import BackendSamplerV2
|
|
from qiskit.primitives.containers import BitArray
|
|
from qiskit.primitives.containers.data_bin import DataBin
|
|
from qiskit.primitives.containers.sampler_pub import SamplerPub
|
|
from qiskit.providers import JobStatus
|
|
from qiskit.providers.basic_provider import BasicProviderJob, BasicSimulator
|
|
from qiskit.providers.fake_provider import GenericBackendV2
|
|
from qiskit.result import Result
|
|
from qiskit.result.models import MeasReturnType, MeasLevel
|
|
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
|
|
from qiskit.utils import optionals
|
|
from ..legacy_cmaps import LAGOS_CMAP
|
|
|
|
# Backends that the ``@combine(backend=BACKENDS)`` tests below are run against:
# the plain BasicSimulator and a seeded 7-qubit GenericBackendV2 restricted to
# the LAGOS coupling map.
_BASIC_SIMULATOR = BasicSimulator()
_GENERIC_7Q_BACKEND = GenericBackendV2(
    num_qubits=7,
    basis_gates=["id", "rz", "sx", "x", "cx", "reset"],
    coupling_map=LAGOS_CMAP,
    seed=42,
)
BACKENDS = [_BASIC_SIMULATOR, _GENERIC_7Q_BACKEND]
|
|
|
|
|
|
class Level1BackendV2(GenericBackendV2):
    """Wrapper around GenericBackendV2 adding level 1 data support for testing

    GenericBackendV2 is used to run the simulation. Then level 1 data (one
    ``[real, imag]`` pair per classical bit per shot) is generated by mapping 0
    to -1 and 1 to 1 with a random number added to each shot drawn from a
    normal distribution with a standard deviation of ``level1_sigma``. The
    imaginary part of each data point is ``idx``, where ``idx`` is the index of
    the classical bit (least-significant bit first). For ``meas_return="avg"``,
    the individual shot results are still calculated and then averaged.
    """

    # Standard deviation of the Gaussian noise added to the real part of each point.
    level1_sigma = 0.1

    def run(self, run_input, **options):
        """Run ``run_input`` and return a job whose result holds synthetic level 1 data.

        Args:
            run_input: a single circuit or an iterable of circuits to execute.
            **options: run options. ``meas_level`` (must be ``1``) and
                ``meas_return`` (``"single"`` or ``"avg"``) are required and are
                consumed here; the rest is forwarded to the parent ``run`` with
                ``memory=True`` forced. ``seed_simulator``, if given, also seeds
                the noise generator.

        Returns:
            A ``BasicProviderJob`` wrapping the rewritten result.

        Raises:
            ValueError: if the level 1 options are missing or have unsupported values.
        """
        # Validate level 1 options
        if "meas_level" not in options or "meas_return" not in options:
            raise ValueError(f"{type(self)} requires 'meas_level' and 'meas_return' run options!")
        meas_level = options.pop("meas_level")
        if meas_level != 1:
            raise ValueError(f"'meas_level' must be 1, not {meas_level}")
        meas_return = options.pop("meas_return")
        if meas_return not in ("single", "avg"):
            raise ValueError(f"Unexpected value for 'meas_return': {meas_return}")

        # Per-shot bitstrings are needed to synthesize the per-shot level 1 points.
        options["memory"] = True

        rng = np.random.default_rng(seed=options.get("seed_simulator"))

        # Normalize to a list so that a single circuit is paired with its result
        # below instead of being iterated instruction by instruction.
        circuits = [run_input] if isinstance(run_input, QuantumCircuit) else list(run_input)

        inner_job = super().run(circuits, **options)
        result_dict = inner_job.result().to_dict()
        for circ, exp_result in zip(circuits, result_dict["results"]):
            num_clbits = sum(cr.size for cr in circ.cregs)
            # Memory entries are hex strings; expand them to fixed-width bitstrings.
            bitstrings = [
                format(int(x, 16), f"0{num_clbits}b") for x in exp_result["data"]["memory"]
            ]
            # reversed() puts classical bit 0 first, so ``i`` is the bit index.
            new_data = [
                [
                    [2 * int(d) - 1 + rng.normal(scale=self.level1_sigma), i]
                    for i, d in enumerate(reversed(bs))
                ]
                for bs in bitstrings
            ]
            if meas_return == "avg":
                # Average the real parts over shots; the imaginary part is the bit index.
                new_data = [
                    [sum(shot[idx][0] for shot in new_data) / len(new_data), idx]
                    for idx in range(num_clbits)
                ]
                exp_result["meas_return"] = MeasReturnType.AVERAGE
            else:
                exp_result["meas_return"] = MeasReturnType.SINGLE
            exp_result["data"] = {"memory": new_data}
            exp_result["meas_level"] = MeasLevel.KERNELED

        result = Result.from_dict(result_dict)
        return BasicProviderJob(self, inner_job.job_id(), result)
|
|
|
|
|
|
@ddt
class TestBackendSamplerV2(QiskitTestCase):
    """Test for BackendSamplerV2

    Tests decorated with ``@combine(backend=BACKENDS)`` receive each entry of
    ``BACKENDS`` as their ``backend`` argument.
    """

    def setUp(self):
        """Create the shared sampler options and the reference test cases."""
        super().setUp()
        self._shots = 10000
        self._seed = 123
        self._options = {"default_shots": self._shots, "seed_simulator": self._seed}

        # Each case is ``(circuit, parameter values or None, expected integer counts)``.
        self._cases = []
        hadamard = QuantumCircuit(1, 1, name="Hadamard")
        hadamard.h(0)
        hadamard.measure(0, 0)
        self._cases.append((hadamard, None, {0: 5000, 1: 5000}))  # case 0

        bell = QuantumCircuit(2, name="Bell")
        bell.h(0)
        bell.cx(0, 1)
        bell.measure_all()
        self._cases.append((bell, None, {0: 5000, 3: 5000}))  # case 1

        pqc = RealAmplitudes(num_qubits=2, reps=2)
        pqc.measure_all()
        self._cases.append((pqc, [0] * 6, {0: 10000}))  # case 2
        self._cases.append((pqc, [1] * 6, {0: 168, 1: 3389, 2: 470, 3: 5973}))  # case 3
        self._cases.append((pqc, [0, 1, 1, 2, 3, 5], {0: 1339, 1: 3534, 2: 912, 3: 4215}))  # case 4
        self._cases.append((pqc, [1, 2, 3, 4, 5, 6], {0: 634, 1: 291, 2: 6039, 3: 3036}))  # case 5

        pqc2 = RealAmplitudes(num_qubits=2, reps=3)
        pqc2.measure_all()
        self._cases.append(
            (pqc2, [0, 1, 2, 3, 4, 5, 6, 7], {0: 1898, 1: 6864, 2: 928, 3: 311})
        )  # case 6

    def _assert_allclose(self, bitarray: BitArray, target: NDArray | BitArray, rtol=1e-1, atol=5e2):
        """Assert that the counts in ``bitarray`` are close to ``target`` at every index.

        Args:
            bitarray: the sampled data to check.
            target: the reference, either a ``BitArray`` or an array of the same
                shape whose entries are ``{outcome: count}`` dictionaries.
            rtol: relative tolerance passed to ``np.testing.assert_allclose``.
            atol: absolute tolerance passed to ``np.testing.assert_allclose``.
        """
        self.assertEqual(bitarray.shape, target.shape)
        for idx in np.ndindex(bitarray.shape):
            int_counts = bitarray.get_int_counts(idx)
            target_counts = (
                target.get_int_counts(idx) if isinstance(target, BitArray) else target[idx]
            )
            # Compare dense histograms covering every outcome seen on either side;
            # an outcome missing from one side counts as zero. ``default`` keeps
            # the comparison well defined when both sides are empty.
            max_key = max(int_counts.keys() | target_counts.keys(), default=-1)
            ary = np.array([int_counts.get(i, 0) for i in range(max_key + 1)])
            tgt = np.array([target_counts.get(i, 0) for i in range(max_key + 1)])
            np.testing.assert_allclose(ary, tgt, rtol=rtol, atol=atol, err_msg=f"index: {idx}")

    @combine(backend=BACKENDS)
    def test_sampler_run(self, backend):
        """Test run()."""
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)

        with self.subTest("single"):
            bell, _, target = self._cases[1]
            bell = pm.run(bell)
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            job = sampler.run([bell], shots=self._shots)
            result = job.result()
            self.assertIsInstance(result, PrimitiveResult)
            self.assertIsInstance(result.metadata, dict)
            self.assertEqual(len(result), 1)
            self.assertIsInstance(result[0], PubResult)
            self.assertIsInstance(result[0].data, DataBin)
            self.assertIsInstance(result[0].data.meas, BitArray)
            self._assert_allclose(result[0].data.meas, np.array(target))

        with self.subTest("single with param"):
            pqc, param_vals, target = self._cases[2]
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            pqc = pm.run(pqc)
            # A tuple (not a generator) so the key hashes by value and can be re-read.
            params = tuple(param.name for param in pqc.parameters)
            job = sampler.run([(pqc, {params: param_vals})], shots=self._shots)
            result = job.result()
            self.assertIsInstance(result, PrimitiveResult)
            self.assertIsInstance(result.metadata, dict)
            self.assertEqual(len(result), 1)
            self.assertIsInstance(result[0], PubResult)
            self.assertIsInstance(result[0].data, DataBin)
            self.assertIsInstance(result[0].data.meas, BitArray)
            self._assert_allclose(result[0].data.meas, np.array(target))

        with self.subTest("multiple"):
            pqc, param_vals, target = self._cases[2]
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            pqc = pm.run(pqc)
            # A tuple (not a generator) so the key hashes by value and can be re-read.
            params = tuple(param.name for param in pqc.parameters)
            job = sampler.run(
                [(pqc, {params: [param_vals, param_vals, param_vals]})], shots=self._shots
            )
            result = job.result()
            self.assertIsInstance(result, PrimitiveResult)
            self.assertIsInstance(result.metadata, dict)
            self.assertEqual(len(result), 1)
            self.assertIsInstance(result[0], PubResult)
            self.assertIsInstance(result[0].data, DataBin)
            self.assertIsInstance(result[0].data.meas, BitArray)
            self._assert_allclose(result[0].data.meas, np.array([target, target, target]))

    @combine(backend=BACKENDS)
    def test_sampler_run_multiple_times(self, backend):
        """Test run() returns the same results if the same input is given."""
        bell, _, _ = self._cases[1]
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        bell = pm.run(bell)
        result1 = sampler.run([bell], shots=self._shots).result()
        meas1 = result1[0].data.meas
        result2 = sampler.run([bell], shots=self._shots).result()
        meas2 = result2[0].data.meas
        # NOTE(review): only ``rtol`` is zeroed; the default ``atol=5e2`` still
        # tolerates a difference of up to 500 counts per outcome. Confirm whether
        # exact equality (``atol=0``) is intended.
        self._assert_allclose(meas1, meas2, rtol=0)

    @combine(backend=BACKENDS)
    def test_sample_run_multiple_circuits(self, backend):
        """Test run() with multiple circuits."""
        bell, _, target = self._cases[1]
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        bell = pm.run(bell)
        result = sampler.run([bell, bell, bell], shots=self._shots).result()
        self.assertEqual(len(result), 3)
        self._assert_allclose(result[0].data.meas, np.array(target))
        self._assert_allclose(result[1].data.meas, np.array(target))
        self._assert_allclose(result[2].data.meas, np.array(target))

    @combine(backend=BACKENDS)
    def test_run_1qubit(self, backend):
        """test for 1-qubit cases"""
        qc = QuantumCircuit(1)
        qc.measure_all()
        qc2 = QuantumCircuit(1)
        qc2.x(0)
        qc2.measure_all()
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc, qc2 = pm.run([qc, qc2])

        sampler = BackendSamplerV2(backend=backend, options=self._options)
        result = sampler.run([qc, qc2], shots=self._shots).result()
        self.assertEqual(len(result), 2)
        # Circuit ``i`` prepares basis state ``i``, so every shot must yield ``i``.
        for i in range(2):
            self._assert_allclose(result[i].data.meas, np.array({i: self._shots}))

    @combine(backend=BACKENDS)
    def test_run_2qubit(self, backend):
        """test for 2-qubit cases"""
        qc0 = QuantumCircuit(2)
        qc0.measure_all()
        qc1 = QuantumCircuit(2)
        qc1.x(0)
        qc1.measure_all()
        qc2 = QuantumCircuit(2)
        qc2.x(1)
        qc2.measure_all()
        qc3 = QuantumCircuit(2)
        qc3.x([0, 1])
        qc3.measure_all()
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc0, qc1, qc2, qc3 = pm.run([qc0, qc1, qc2, qc3])

        sampler = BackendSamplerV2(backend=backend, options=self._options)
        result = sampler.run([qc0, qc1, qc2, qc3], shots=self._shots).result()
        self.assertEqual(len(result), 4)
        # Circuit ``i`` prepares basis state ``i``, so every shot must yield ``i``.
        for i in range(4):
            self._assert_allclose(result[i].data.meas, np.array({i: self._shots}))

    @combine(backend=BACKENDS)
    def test_run_single_circuit(self, backend):
        """Test for single circuit case."""
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)

        with self.subTest("No parameter"):
            circuit, _, target = self._cases[1]
            circuit = pm.run(circuit)
            param_target = [
                (None, np.array(target)),
                ({}, np.array(target)),
            ]
            for param, target in param_target:
                with self.subTest(f"{circuit.name} w/ {param}"):
                    result = sampler.run([(circuit, param)], shots=self._shots).result()
                    self.assertEqual(len(result), 1)
                    self._assert_allclose(result[0].data.meas, target)

        with self.subTest("One parameter"):
            circuit = QuantumCircuit(1, 1, name="X gate")
            param = Parameter("x")
            circuit.ry(param, 0)
            circuit.measure(0, 0)
            circuit = pm.run(circuit)
            # The same binding expressed by name / by Parameter and as scalar,
            # 0-d array, list and 1-d array.
            param_target = [
                ({"x": np.pi}, np.array({1: self._shots})),
                ({param: np.pi}, np.array({1: self._shots})),
                ({"x": np.array(np.pi)}, np.array({1: self._shots})),
                ({param: np.array(np.pi)}, np.array({1: self._shots})),
                ({"x": [np.pi]}, np.array({1: self._shots})),
                ({param: [np.pi]}, np.array({1: self._shots})),
                ({"x": np.array([np.pi])}, np.array({1: self._shots})),
                ({param: np.array([np.pi])}, np.array({1: self._shots})),
            ]
            for param, target in param_target:
                with self.subTest(f"{circuit.name} w/ {param}"):
                    result = sampler.run([(circuit, param)], shots=self._shots).result()
                    self.assertEqual(len(result), 1)
                    self._assert_allclose(result[0].data.c, target)

        with self.subTest("More than one parameter"):
            circuit, param, target = self._cases[3]
            circuit = pm.run(circuit)
            # Flat containers give a 0-d result; one extra nesting level gives shape (1,).
            param_target = [
                (param, np.array(target)),
                (tuple(param), np.array(target)),
                (np.array(param), np.array(target)),
                ((param,), np.array([target])),
                ([param], np.array([target])),
                (np.array([param]), np.array([target])),
            ]
            for param, target in param_target:
                with self.subTest(f"{circuit.name} w/ {param}"):
                    result = sampler.run([(circuit, param)], shots=self._shots).result()
                    self.assertEqual(len(result), 1)
                    self._assert_allclose(result[0].data.meas, target)

    @combine(backend=BACKENDS)
    def test_run_reverse_meas_order(self, backend):
        """test for sampler with reverse measurement order"""
        x = Parameter("x")
        y = Parameter("y")

        qc = QuantumCircuit(3, 3)
        qc.rx(x, 0)
        qc.rx(y, 1)
        qc.x(2)
        qc.measure(0, 2)
        qc.measure(1, 1)
        qc.measure(2, 0)
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc = pm.run(qc)

        sampler = BackendSamplerV2(backend=backend)
        sampler.options.seed_simulator = self._seed
        result = sampler.run([(qc, [0, 0]), (qc, [np.pi / 2, 0])], shots=self._shots).result()
        self.assertEqual(len(result), 2)

        # qc({x: 0, y: 0})
        self._assert_allclose(result[0].data.c, np.array({1: self._shots}))

        # qc({x: pi/2, y: 0})
        self._assert_allclose(result[1].data.c, np.array({1: self._shots / 2, 5: self._shots / 2}))

    @combine(backend=BACKENDS)
    def test_run_errors(self, backend):
        """Test for errors with run method"""
        qc1 = QuantumCircuit(1)
        qc1.measure_all()
        qc2 = RealAmplitudes(num_qubits=1, reps=1)
        qc2.measure_all()
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc1, qc2 = pm.run([qc1, qc2])

        sampler = BackendSamplerV2(backend=backend)
        with self.subTest("set parameter values to a non-parameterized circuit"):
            with self.assertRaises(ValueError):
                _ = sampler.run([(qc1, [1e2])]).result()
        with self.subTest("missing all parameter values for a parameterized circuit"):
            with self.assertRaises(ValueError):
                _ = sampler.run([qc2]).result()
            with self.assertRaises(ValueError):
                _ = sampler.run([(qc2, [])]).result()
            with self.assertRaises(ValueError):
                _ = sampler.run([(qc2, None)]).result()
        with self.subTest("missing some parameter values for a parameterized circuit"):
            with self.assertRaises(ValueError):
                _ = sampler.run([(qc2, [1e2])]).result()
        with self.subTest("too many parameter values for a parameterized circuit"):
            with self.assertRaises(ValueError):
                _ = sampler.run([(qc2, [1e2] * 100)]).result()
        with self.subTest("negative shots, run arg"):
            with self.assertRaises(ValueError):
                _ = sampler.run([qc1], shots=-1).result()
        with self.subTest("negative shots, pub-like"):
            with self.assertRaises(ValueError):
                _ = sampler.run([(qc1, None, -1)]).result()
        with self.subTest("negative shots, pub"):
            with self.assertRaises(ValueError):
                _ = sampler.run([SamplerPub(qc1, shots=-1)]).result()
        with self.subTest("zero shots, run arg"):
            with self.assertRaises(ValueError):
                _ = sampler.run([qc1], shots=0).result()
        with self.subTest("zero shots, pub-like"):
            with self.assertRaises(ValueError):
                _ = sampler.run([(qc1, None, 0)]).result()
        with self.subTest("zero shots, pub"):
            with self.assertRaises(ValueError):
                _ = sampler.run([SamplerPub(qc1, shots=0)]).result()
        with self.subTest("missing []"):
            with self.assertRaisesRegex(ValueError, "An invalid Sampler pub-like was given"):
                _ = sampler.run(qc1).result()
        with self.subTest("missing [] for pqc"):
            with self.assertRaisesRegex(ValueError, "Note that if you want to run a single pub,"):
                _ = sampler.run((qc2, [0, 1])).result()

    @combine(backend=BACKENDS)
    def test_run_empty_parameter(self, backend):
        """Test for empty parameter"""
        n = 5
        qc = QuantumCircuit(n, n - 1)
        qc.measure(range(n - 1), range(n - 1))
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc = pm.run(qc)
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        with self.subTest("one circuit"):
            result = sampler.run([qc], shots=self._shots).result()
            self.assertEqual(len(result), 1)
            self._assert_allclose(result[0].data.c, np.array({0: self._shots}))

        with self.subTest("two circuits"):
            result = sampler.run([qc, qc], shots=self._shots).result()
            self.assertEqual(len(result), 2)
            for i in range(2):
                self._assert_allclose(result[i].data.c, np.array({0: self._shots}))

    @combine(backend=BACKENDS)
    def test_run_numpy_params(self, backend):
        """Test for numpy array as parameter values"""
        qc = RealAmplitudes(num_qubits=2, reps=2)
        qc.measure_all()
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc = pm.run(qc)
        k = 5
        params_array = np.linspace(0, 1, k * qc.num_parameters).reshape((k, qc.num_parameters))
        params_list = params_array.tolist()
        # The StatevectorSampler result serves as the reference for both subtests.
        sampler = StatevectorSampler(seed=self._seed)
        target = sampler.run([(qc, params_list)], shots=self._shots).result()

        with self.subTest("ndarray"):
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            result = sampler.run([(qc, params_array)], shots=self._shots).result()
            self.assertEqual(len(result), 1)
            self._assert_allclose(result[0].data.meas, target[0].data.meas)

        with self.subTest("split a list"):
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            result = sampler.run(
                [(qc, params) for params in params_list], shots=self._shots
            ).result()
            self.assertEqual(len(result), k)
            for i in range(k):
                self._assert_allclose(
                    result[i].data.meas, np.array(target[0].data.meas.get_int_counts(i))
                )

    @combine(backend=BACKENDS)
    def test_run_with_shots_option(self, backend):
        """test with shots option."""
        bell, _, _ = self._cases[1]
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        bell = pm.run(bell)
        shots = 100

        with self.subTest("run arg"):
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            result = sampler.run([bell], shots=shots).result()
            self.assertEqual(len(result), 1)
            self.assertEqual(result[0].data.meas.num_shots, shots)
            self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots)

        with self.subTest("default shots"):
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            default_shots = sampler.options.default_shots
            result = sampler.run([bell]).result()
            self.assertEqual(len(result), 1)
            self.assertEqual(result[0].data.meas.num_shots, default_shots)
            self.assertEqual(sum(result[0].data.meas.get_counts().values()), default_shots)

        with self.subTest("setting default shots"):
            default_shots = 100
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            sampler.options.default_shots = default_shots
            self.assertEqual(sampler.options.default_shots, default_shots)
            result = sampler.run([bell]).result()
            self.assertEqual(len(result), 1)
            self.assertEqual(result[0].data.meas.num_shots, default_shots)
            self.assertEqual(sum(result[0].data.meas.get_counts().values()), default_shots)

        with self.subTest("pub-like"):
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            result = sampler.run([(bell, None, shots)]).result()
            self.assertEqual(len(result), 1)
            self.assertEqual(result[0].data.meas.num_shots, shots)
            self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots)

        with self.subTest("pub"):
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            result = sampler.run([SamplerPub(bell, shots=shots)]).result()
            self.assertEqual(len(result), 1)
            self.assertEqual(result[0].data.meas.num_shots, shots)
            self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots)

        with self.subTest("multiple pubs"):
            sampler = BackendSamplerV2(backend=backend, options=self._options)
            shots1 = 100
            shots2 = 200
            # The per-pub shots are expected to win over the ``shots`` run argument.
            result = sampler.run(
                [
                    SamplerPub(bell, shots=shots1),
                    SamplerPub(bell, shots=shots2),
                ],
                shots=self._shots,
            ).result()
            self.assertEqual(len(result), 2)
            self.assertEqual(result[0].data.meas.num_shots, shots1)
            self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots1)
            self.assertEqual(result[1].data.meas.num_shots, shots2)
            self.assertEqual(sum(result[1].data.meas.get_counts().values()), shots2)

    @combine(backend=BACKENDS)
    def test_run_shots_result_size(self, backend):
        """test with shots option to validate the result size"""
        n = 7  # should be less than or equal to the number of qubits of backend
        qc = QuantumCircuit(n)
        qc.h(range(n))
        qc.measure_all()
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc = pm.run(qc)
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        result = sampler.run([qc], shots=self._shots).result()
        self.assertEqual(len(result), 1)
        self.assertLessEqual(result[0].data.meas.num_shots, self._shots)
        self.assertEqual(sum(result[0].data.meas.get_counts().values()), self._shots)

    def test_run_level1(self):
        """Test running with meas_level=1"""
        nq = 2
        shots = 100

        backend = Level1BackendV2(nq)
        qc = QuantumCircuit(nq)
        qc.x(1)
        qc.measure_all()
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc = pm.run(qc)
        options = {
            "default_shots": shots,
            "seed_simulator": self._seed,
            "run_options": {
                "meas_level": 1,
                "meas_return": "single",
            },
        }
        sampler = BackendSamplerV2(backend=backend, options=options)
        result_single = sampler.run([qc]).result()

        options = {
            "default_shots": shots,
            "seed_simulator": self._seed,
            "run_options": {
                "meas_level": 1,
                "meas_return": "avg",
            },
        }
        sampler = BackendSamplerV2(backend=backend, options=options)
        result_avg = sampler.run([qc]).result()

        # Check that averaging the meas_return="single" data matches the
        # meas_return="avg" data.
        averaged_singles = np.average(result_single[0].join_data(), axis=0)
        average_data = result_avg[0].join_data()
        self.assertLessEqual(
            max(abs(averaged_singles - average_data)),
            backend.level1_sigma / np.sqrt(shots) * 6,
        )

        # Check that average data matches expected form for the circuit
        expected_average = np.array([-1, 1 + 1j])
        self.assertLessEqual(
            max(abs(expected_average - average_data)),
            backend.level1_sigma / np.sqrt(shots) * 6,
        )

    @combine(backend=BACKENDS)
    def test_primitive_job_status_done(self, backend):
        """test primitive job's status"""
        bell, _, _ = self._cases[1]
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        bell = pm.run(bell)
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        job = sampler.run([bell], shots=self._shots)
        _ = job.result()
        self.assertEqual(job.status(), JobStatus.DONE)

    @combine(backend=BACKENDS)
    def test_circuit_with_unitary(self, backend):
        """Test for circuit with unitary gate."""
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)

        with self.subTest("identity"):
            gate = UnitaryGate(np.eye(2))

            circuit = QuantumCircuit(1)
            circuit.append(gate, [0])
            circuit.measure_all()
            circuit = pm.run(circuit)

            sampler = BackendSamplerV2(backend=backend, options=self._options)
            result = sampler.run([circuit], shots=self._shots).result()
            self.assertEqual(len(result), 1)
            self._assert_allclose(result[0].data.meas, np.array({0: self._shots}))

        with self.subTest("X"):
            gate = UnitaryGate([[0, 1], [1, 0]])

            circuit = QuantumCircuit(1)
            circuit.append(gate, [0])
            circuit.measure_all()
            circuit = pm.run(circuit)

            sampler = BackendSamplerV2(backend=backend, options=self._options)
            result = sampler.run([circuit], shots=self._shots).result()
            self.assertEqual(len(result), 1)
            self._assert_allclose(result[0].data.meas, np.array({1: self._shots}))

    @combine(backend=BACKENDS)
    def test_circuit_with_multiple_cregs(self, backend):
        """Test for circuit with multiple classical registers."""
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        # Each entry is ``(subtest title, transpiled circuit, {creg name: expected counts})``.
        cases = []

        # case 1
        a = ClassicalRegister(1, "a")
        b = ClassicalRegister(2, "b")
        c = ClassicalRegister(3, "c")

        qc = QuantumCircuit(QuantumRegister(3), a, b, c)
        qc.h(range(3))
        qc.measure([0, 1, 2, 2], [0, 2, 4, 5])
        qc = pm.run(qc)
        target = {"a": {0: 5000, 1: 5000}, "b": {0: 5000, 2: 5000}, "c": {0: 5000, 6: 5000}}
        cases.append(("use all cregs", qc, target))

        # case 2
        a = ClassicalRegister(1, "a")
        b = ClassicalRegister(5, "b")
        c = ClassicalRegister(3, "c")

        qc = QuantumCircuit(QuantumRegister(3), a, b, c)
        qc.h(range(3))
        qc.measure([0, 1, 2, 2], [0, 2, 4, 5])
        qc = pm.run(qc)
        target = {
            "a": {0: 5000, 1: 5000},
            "b": {0: 2500, 2: 2500, 24: 2500, 26: 2500},
            "c": {0: 10000},
        }
        cases.append(("use only a and b", qc, target))

        # case 3
        a = ClassicalRegister(1, "a")
        b = ClassicalRegister(2, "b")
        c = ClassicalRegister(3, "c")

        qc = QuantumCircuit(QuantumRegister(3), a, b, c)
        qc.h(range(3))
        qc.measure(1, 5)
        qc = pm.run(qc)
        target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}}
        cases.append(("use only c", qc, target))

        # case 4
        a = ClassicalRegister(1, "a")
        b = ClassicalRegister(2, "b")
        c = ClassicalRegister(3, "c")

        qc = QuantumCircuit(QuantumRegister(3), a, b, c)
        qc.h(range(3))
        qc.measure([0, 1, 2], [5, 5, 5])
        qc = pm.run(qc)
        target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}}
        cases.append(("use only c multiple qubits", qc, target))

        # case 5
        a = ClassicalRegister(1, "a")
        b = ClassicalRegister(2, "b")
        c = ClassicalRegister(3, "c")

        qc = QuantumCircuit(QuantumRegister(3), a, b, c)
        qc.h(range(3))
        qc = pm.run(qc)
        target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 10000}}
        cases.append(("no measure", qc, target))

        for title, qc, target in cases:
            with self.subTest(title):
                sampler = BackendSamplerV2(backend=backend, options=self._options)
                result = sampler.run([qc], shots=self._shots).result()
                self.assertEqual(len(result), 1)
                data = result[0].data
                self.assertEqual(len(data), 3)
                for creg in qc.cregs:
                    self.assertTrue(hasattr(data, creg.name))
                    self._assert_allclose(getattr(data, creg.name), np.array(target[creg.name]))

    @unittest.skipUnless(optionals.HAS_AER, "Aer is required to simulate control flow")
    def test_circuit_with_aliased_cregs(self):
        """Test for circuit with aliased classical registers."""
        backend = GenericBackendV2(
            num_qubits=7,
            basis_gates=["id", "rz", "sx", "x", "cx", "reset"],
            coupling_map=LAGOS_CMAP,
            seed=42,
            control_flow=True,
        )
        q = QuantumRegister(3, "q")
        c1 = ClassicalRegister(1, "c1")
        c2 = ClassicalRegister(1, "c2")

        qc = QuantumCircuit(q, c1, c2)
        qc.ry(np.pi / 4, 2)
        qc.cx(2, 1)
        qc.cx(0, 1)
        qc.h(0)
        qc.measure(0, c1)
        qc.measure(1, c2)
        with qc.if_test((c1, 1)):
            qc.z(2)
        with qc.if_test((c2, 1)):
            qc.x(2)
        qc2 = QuantumCircuit(5, 5)
        qc2.compose(qc, [0, 2, 3], [2, 4], inplace=True)
        # Look the register names up on the composed circuit rather than hard-coding them.
        cregs = [creg.name for creg in qc2.cregs]
        target = {
            cregs[0]: {0: 4255, 4: 4297, 16: 720, 20: 726},
            cregs[1]: {0: 5000, 1: 5000},
            cregs[2]: {0: 8500, 1: 1500},
        }

        sampler = BackendSamplerV2(backend=backend, options=self._options)
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)
        qc2 = pm.run(qc2)
        result = sampler.run([qc2], shots=self._shots).result()
        self.assertEqual(len(result), 1)
        data = result[0].data
        self.assertEqual(len(data), 3)
        for creg_name, creg in target.items():
            self.assertTrue(hasattr(data, creg_name))
            self._assert_allclose(getattr(data, creg_name), np.array(creg))

    @combine(backend=BACKENDS)
    def test_no_cregs(self, backend):
        """Test that the sampler works when there are no classical register in the circuit."""
        qc = QuantumCircuit(2)
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        with self.assertWarns(UserWarning):
            result = sampler.run([qc]).result()

        self.assertEqual(len(result), 1)
        self.assertEqual(len(result[0].data), 0)

    @combine(backend=BACKENDS)
    def test_empty_creg(self, backend):
        """Test that the sampler works if provided a classical register with no bits."""
        # Test case for issue #12043
        q = QuantumRegister(1, "q")
        c1 = ClassicalRegister(0, "c1")
        c2 = ClassicalRegister(1, "c2")
        qc = QuantumCircuit(q, c1, c2)
        qc.h(0)
        qc.measure(0, 0)

        sampler = BackendSamplerV2(backend=backend, options=self._options)
        result = sampler.run([qc], shots=self._shots).result()
        self.assertEqual(result[0].data.c1.array.shape, (self._shots, 0))

    @combine(backend=BACKENDS)
    def test_diff_shots(self, backend):
        """Test of pubs with different shots"""
        pm = generate_preset_pass_manager(optimization_level=0, backend=backend)

        bell, _, target = self._cases[1]
        bell = pm.run(bell)
        sampler = BackendSamplerV2(backend=backend, options=self._options)
        # Two extra shots in total, i.e. one more expected count per Bell outcome.
        shots2 = self._shots + 2
        target2 = {k: v + 1 for k, v in target.items()}
        job = sampler.run([(bell, None, self._shots), (bell, None, shots2)])
        result = job.result()
        self.assertEqual(len(result), 2)
        self.assertEqual(result[0].data.meas.num_shots, self._shots)
        self._assert_allclose(result[0].data.meas, np.array(target))
        self.assertEqual(result[1].data.meas.num_shots, shots2)
        self._assert_allclose(result[1].data.meas, np.array(target2))

    def test_job_size_limit_backend_v2(self):
        """Test BackendSamplerV2 respects backend's job size limit."""

        class FakeBackendLimitedCircuits(GenericBackendV2):
            """Generic backend V2 with job size limit."""

            @property
            def max_circuits(self):
                return 1

        qc = QuantumCircuit(1)
        qc.measure_all()
        qc2 = QuantumCircuit(1)
        qc2.x(0)
        qc2.measure_all()
        sampler = BackendSamplerV2(backend=FakeBackendLimitedCircuits(num_qubits=5))
        result = sampler.run([qc, qc2], shots=self._shots).result()
        self.assertIsInstance(result, PrimitiveResult)
        self.assertEqual(len(result), 2)
        self.assertIsInstance(result[0], PubResult)
        self.assertIsInstance(result[1], PubResult)
        self._assert_allclose(result[0].data.meas, np.array({0: self._shots}))
        self._assert_allclose(result[1].data.meas, np.array({1: self._shots}))

    def test_job_size_limit_backend_v1(self):
        """Test BackendSamplerV2 running two pubs on a ``u1``/``u2``/``u3``/``cx`` backend.

        NOTE(review): despite the test name, the backend built here is a plain
        ``GenericBackendV2`` and no job size limit is configured in this test;
        presumably a leftover from a former V1-backend variant - confirm.
        """
        backend = GenericBackendV2(2, basis_gates=["cx", "u1", "u2", "u3"], seed=42)
        qc = QuantumCircuit(1)
        qc.measure_all()
        qc2 = QuantumCircuit(1)
        qc2.x(0)
        qc2.measure_all()
        sampler = BackendSamplerV2(backend=backend)
        result = sampler.run([qc, qc2], shots=self._shots).result()
        self.assertIsInstance(result, PrimitiveResult)
        self.assertEqual(len(result), 2)
        self.assertIsInstance(result[0], PubResult)
        self.assertIsInstance(result[1], PubResult)
        self._assert_allclose(result[0].data.meas, np.array({0: self._shots}))
        self._assert_allclose(result[1].data.meas, np.array({1: self._shots}))

    def test_iter_pub(self):
        """Test of an iterable of pubs"""
        backend = BasicSimulator()
        qc = QuantumCircuit(1)
        qc.measure_all()
        qc2 = QuantumCircuit(1)
        qc2.x(0)
        qc2.measure_all()
        sampler = BackendSamplerV2(backend=backend)
        # Pass a one-shot iterator rather than a list.
        result = sampler.run(iter([qc, qc2]), shots=self._shots).result()
        self.assertIsInstance(result, PrimitiveResult)
        self.assertEqual(len(result), 2)
        self.assertIsInstance(result[0], PubResult)
        self.assertIsInstance(result[1], PubResult)
        self._assert_allclose(result[0].data.meas, np.array({0: self._shots}))
        self._assert_allclose(result[1].data.meas, np.array({1: self._shots}))

    def test_metadata(self):
        """Test for metadata"""
        qc = QuantumCircuit(2)
        qc.measure_all()
        qc2 = qc.copy()
        qc2.metadata = {"a": 1}
        backend = BasicSimulator()
        sampler = BackendSamplerV2(backend=backend)
        result = sampler.run([(qc, None, 10), (qc2, None, 20)]).result()

        self.assertEqual(len(result), 2)
        self.assertEqual(result.metadata, {"version": 2})
        self.assertEqual(result[0].metadata, {"shots": 10, "circuit_metadata": qc.metadata})
        self.assertEqual(result[1].metadata, {"shots": 20, "circuit_metadata": qc2.metadata})
|
|
|
|
|
|
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|