Add BackendSampler for Sampler from an arbitrary backend (#8668)

* Add BackendSampler for Sampler from an arbitrary backend

This commit adds a new Sampler implementation to qiskit-terra,
BackendSampler. This sampler implementation enables a sampler object
to be constructed from any backend object. It can be used to enable
backends from providers that don't have native primitive implementations
(which is most providers) to leverage tooling built on primtives. The
API works exactly the same as other sampler implementations except it
takes a required argument on the constructor `backend` which is used
to specify the `Backend` object to execute circuits on.

Co-authored-by: Takashi Imamichi <31178928+t-imamichi@users.noreply.github.com>
Co-authored-by: Ikko Hamamura <ikkoham@users.noreply.github.com>

* Update qiskit/primitives/backend_sampler.py

Co-authored-by: Jake Lishman <jake@binhbar.com>

* Remove unused method

* Update qiskit/primitives/backend_sampler.py

Co-authored-by: Takashi Imamichi <31178928+t-imamichi@users.noreply.github.com>

* Update API for primitive interface changes

* Fix initial statevector handling in basic aer

* Fix type hints

* follow new interfaces

* remove deprecated methods

* remove deprecated methods

Co-authored-by: Takashi Imamichi <31178928+t-imamichi@users.noreply.github.com>
Co-authored-by: Ikko Hamamura <ikkoham@users.noreply.github.com>
Co-authored-by: Jake Lishman <jake@binhbar.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Matthew Treinish 2022-09-29 22:32:18 -04:00 committed by GitHub
parent 2f665491ae
commit 6f84c70aa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 505 additions and 0 deletions

View File

@ -40,6 +40,7 @@ Sampler
BaseSampler
Sampler
BackendSampler
Results
=======
@ -58,5 +59,6 @@ from .base_sampler import BaseSampler
from .backend_estimator import BackendEstimator
from .estimator import Estimator
from .estimator_result import EstimatorResult
from .backend_sampler import BackendSampler
from .sampler import Sampler
from .sampler_result import SamplerResult

View File

@ -0,0 +1,203 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Sampler implementation for an artibtrary Backend object."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any, cast
from qiskit.circuit.quantumcircuit import QuantumCircuit
from qiskit.providers.backend import BackendV1, BackendV2
from qiskit.providers.options import Options
from qiskit.result import QuasiDistribution, Result
from qiskit.transpiler.passmanager import PassManager
from .base_sampler import BaseSampler
from .primitive_job import PrimitiveJob
from .sampler_result import SamplerResult
from .utils import _circuit_key
class BackendSampler(BaseSampler):
"""A :class:`~.BaseSampler` implementation that provides an interface for leveraging
the sampler interface from any backend.
This class provides a sampler interface from any backend and doesn't do
any measurement mitigation, it just computes the probability distribution
from the counts.
"""
def __init__(
self,
backend: BackendV1 | BackendV2,
options: dict | None = None,
bound_pass_manager: PassManager | None = None,
skip_transpilation: bool = False,
):
"""Initialize a new BackendSampler
Args:
backend: Required: the backend to run the sampler primitive on
options: Default options.
bound_pass_manager: An optional pass manager to run after
parameter binding.
skip_transpilation: If this is set to True the internal compilation
of the input circuits is skipped and the circuit objects
will be directly executed when this objected is called.
Raises:
ValueError: If backend is not provided
"""
super().__init__(None, None, options)
self._backend = backend
self._transpile_options = Options()
self._bound_pass_manager = bound_pass_manager
self._preprocessed_circuits: list[QuantumCircuit] | None = None
self._transpiled_circuits: list[QuantumCircuit] | None = None
self._skip_transpilation = skip_transpilation
def __new__( # pylint: disable=signature-differs
cls,
backend: BackendV1 | BackendV2, # pylint: disable=unused-argument
**kwargs, # pylint: disable=unused-argument
):
self = super().__new__(cls)
return self
@property
def preprocessed_circuits(self) -> list[QuantumCircuit]:
"""
Preprocessed quantum circuits produced by preprocessing
Returns:
List of the transpiled quantum circuit
Raises:
QiskitError: if the instance has been closed.
"""
return list(self._circuits)
@property
def transpiled_circuits(self) -> list[QuantumCircuit]:
"""
Transpiled quantum circuits.
Returns:
List of the transpiled quantum circuit
Raises:
QiskitError: if the instance has been closed.
"""
if self._skip_transpilation:
self._transpiled_circuits = list(self._circuits)
elif self._transpiled_circuits is None:
# Only transpile if have not done so yet
self._transpile()
return self._transpiled_circuits
@property
def backend(self) -> BackendV1 | BackendV2:
"""
Returns:
The backend which this sampler object based on
"""
return self._backend
@property
def transpile_options(self) -> Options:
"""Return the transpiler options for transpiling the circuits."""
return self._transpile_options
def set_transpile_options(self, **fields):
"""Set the transpiler options for transpiler.
Args:
**fields: The fields to update the options.
Returns:
self.
Raises:
QiskitError: if the instance has been closed.
"""
self._transpile_options.update_options(**fields)
def _call(
self,
circuits: Sequence[int],
parameter_values: Sequence[Sequence[float]],
**run_options,
) -> SamplerResult:
# This line does the actual transpilation
transpiled_circuits = self.transpiled_circuits
bound_circuits = [
transpiled_circuits[i]
if len(value) == 0
else transpiled_circuits[i].bind_parameters((dict(zip(self._parameters[i], value))))
for i, value in zip(circuits, parameter_values)
]
bound_circuits = self._bound_pass_manager_run(bound_circuits)
# Run
result = self._backend.run(bound_circuits, **run_options).result()
return self._postprocessing(result, bound_circuits)
def _postprocessing(self, result: Result, circuits: list[QuantumCircuit]) -> SamplerResult:
counts = result.get_counts()
if not isinstance(counts, list):
counts = [counts]
shots = sum(counts[0].values())
probabilies = []
metadata: list[dict[str, Any]] = [{}] * len(circuits)
for count in counts:
prob_dist = {k: v / shots for k, v in count.int_outcomes().items()}
probabilies.append(QuasiDistribution(prob_dist))
for metadatum in metadata:
metadatum["shots"] = shots
return SamplerResult(probabilies, metadata)
def _transpile(self):
from qiskit.compiler import transpile
self._transpiled_circuits = cast(
"list[QuantumCircuit]",
transpile(
self.preprocessed_circuits,
self.backend,
**self.transpile_options.__dict__,
),
)
def _bound_pass_manager_run(self, circuits):
if self._bound_pass_manager is None:
return circuits
else:
return cast("list[QuantumCircuit]", self._bound_pass_manager.run(circuits))
def _run(
self,
circuits: Sequence[QuantumCircuit],
parameter_values: Sequence[Sequence[float]],
**run_options,
) -> PrimitiveJob:
circuit_indices = []
for circuit in circuits:
index = self._circuit_ids.get(_circuit_key(circuit))
if index is not None:
circuit_indices.append(index)
else:
circuit_indices.append(len(self._circuits))
self._circuit_ids[_circuit_key(circuit)] = len(self._circuits)
self._circuits.append(circuit)
self._parameters.append(circuit.parameters)
job = PrimitiveJob(self._call, circuit_indices, parameter_values, **run_options)
job.submit()
return job

View File

@ -0,0 +1,18 @@
---
features:
- |
Added a new :class:`~.BaseSampler` implementation, :class:`~.BackendSampler`, which is
used to create a sampler implementation from a :class:~.Backend` object. If a provider
doesn't provide native primitive implementations the :class:`~.BackendSampler` class
can be used to run anything that requires a sampler primitive object with any backend.
For example::
from qiskit.primitives import BackendSampler
from qiskit.providers.fake_provider import FakeVigo
sampler_for_retired_device = BackendSampler(backend=FakeVigo())
qc = QuantumCircuit(2)
qc.h(0)
qc.cx(0, 1)
qc.measure_all()
sampler_for_retired_device.run(qc)

View File

@ -0,0 +1,282 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Tests for BackendSampler."""
import unittest
from test import combine
import numpy as np
from ddt import ddt
from qiskit import QuantumCircuit
from qiskit.circuit.library import RealAmplitudes
from qiskit.primitives import BackendSampler, SamplerResult
from qiskit.providers import JobStatus, JobV1
from qiskit.providers.fake_provider import FakeNairobi, FakeNairobiV2
from qiskit.test import QiskitTestCase
BACKENDS = [FakeNairobi(), FakeNairobiV2()]
@ddt
class TestBackendSampler(QiskitTestCase):
"""Test BackendSampler"""
def setUp(self):
super().setUp()
hadamard = QuantumCircuit(1, 1)
hadamard.h(0)
hadamard.measure(0, 0)
bell = QuantumCircuit(2)
bell.h(0)
bell.cx(0, 1)
bell.measure_all()
self._circuit = [hadamard, bell]
self._target = [
{0: 0.5, 1: 0.5},
{0: 0.5, 3: 0.5, 1: 0, 2: 0},
]
self._pqc = RealAmplitudes(num_qubits=2, reps=2)
self._pqc.measure_all()
self._pqc2 = RealAmplitudes(num_qubits=2, reps=3)
self._pqc2.measure_all()
self._pqc_params = [[0.0] * 6, [1.0] * 6]
self._pqc_target = [{0: 1}, {0: 0.0148, 1: 0.3449, 2: 0.0531, 3: 0.5872}]
self._theta = [
[0, 1, 1, 2, 3, 5],
[1, 2, 3, 4, 5, 6],
[0, 1, 2, 3, 4, 5, 6, 7],
]
def _generate_circuits_target(self, indices):
if isinstance(indices, list):
circuits = [self._circuit[j] for j in indices]
target = [self._target[j] for j in indices]
else:
raise ValueError(f"invalid index {indices}")
return circuits, target
def _generate_params_target(self, indices):
if isinstance(indices, int):
params = self._pqc_params[indices]
target = self._pqc_target[indices]
elif isinstance(indices, list):
params = [self._pqc_params[j] for j in indices]
target = [self._pqc_target[j] for j in indices]
else:
raise ValueError(f"invalid index {indices}")
return params, target
def _compare_probs(self, prob, target):
if not isinstance(prob, list):
prob = [prob]
if not isinstance(target, list):
target = [target]
self.assertEqual(len(prob), len(target))
for p, targ in zip(prob, target):
for key, t_val in targ.items():
if key in p:
self.assertAlmostEqual(p[key], t_val, delta=0.1)
else:
self.assertAlmostEqual(t_val, 0, delta=0.1)
@combine(backend=BACKENDS)
def test_sampler_run(self, backend):
"""Test Sampler.run()."""
bell = self._circuit[1]
sampler = BackendSampler(backend=backend)
job = sampler.run(circuits=[bell])
self.assertIsInstance(job, JobV1)
result = job.result()
self.assertIsInstance(result, SamplerResult)
# print([q.binary_probabilities() for q in result.quasi_dists])
self._compare_probs(result.quasi_dists, self._target[1])
@combine(backend=BACKENDS)
def test_sample_run_multiple_circuits(self, backend):
"""Test Sampler.run() with multiple circuits."""
# executes three Bell circuits
# Argument `parameters` is optional.
bell = self._circuit[1]
sampler = BackendSampler(backend=backend)
result = sampler.run([bell, bell, bell]).result()
# print([q.binary_probabilities() for q in result.quasi_dists])
self._compare_probs(result.quasi_dists[0], self._target[1])
self._compare_probs(result.quasi_dists[1], self._target[1])
self._compare_probs(result.quasi_dists[2], self._target[1])
@combine(backend=BACKENDS)
def test_sampler_run_with_parameterized_circuits(self, backend):
"""Test Sampler.run() with parameterized circuits."""
# parameterized circuit
pqc = self._pqc
pqc2 = self._pqc2
theta1, theta2, theta3 = self._theta
sampler = BackendSampler(backend=backend)
result = sampler.run([pqc, pqc, pqc2], [theta1, theta2, theta3]).result()
# result of pqc(theta1)
prob1 = {
"00": 0.1309248462975777,
"01": 0.3608720796028448,
"10": 0.09324865232050054,
"11": 0.41495442177907715,
}
self.assertDictAlmostEqual(result.quasi_dists[0].binary_probabilities(), prob1, delta=0.1)
# result of pqc(theta2)
prob2 = {
"00": 0.06282290651933871,
"01": 0.02877144385576705,
"10": 0.606654494132085,
"11": 0.3017511554928094,
}
self.assertDictAlmostEqual(result.quasi_dists[1].binary_probabilities(), prob2, delta=0.1)
# result of pqc2(theta3)
prob3 = {
"00": 0.1880263994380416,
"01": 0.6881971261189544,
"10": 0.09326232720582443,
"11": 0.030514147237179892,
}
self.assertDictAlmostEqual(result.quasi_dists[2].binary_probabilities(), prob3, delta=0.1)
@combine(backend=BACKENDS)
def test_run_1qubit(self, backend):
"""test for 1-qubit cases"""
qc = QuantumCircuit(1)
qc.measure_all()
qc2 = QuantumCircuit(1)
qc2.x(0)
qc2.measure_all()
sampler = BackendSampler(backend=backend)
result = sampler.run([qc, qc2]).result()
self.assertIsInstance(result, SamplerResult)
self.assertEqual(len(result.quasi_dists), 2)
self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1)
self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1)
@combine(backend=BACKENDS)
def test_run_2qubit(self, backend):
"""test for 2-qubit cases"""
qc0 = QuantumCircuit(2)
qc0.measure_all()
qc1 = QuantumCircuit(2)
qc1.x(0)
qc1.measure_all()
qc2 = QuantumCircuit(2)
qc2.x(1)
qc2.measure_all()
qc3 = QuantumCircuit(2)
qc3.x([0, 1])
qc3.measure_all()
sampler = BackendSampler(backend=backend)
result = sampler.run([qc0, qc1, qc2, qc3]).result()
self.assertIsInstance(result, SamplerResult)
self.assertEqual(len(result.quasi_dists), 4)
self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1)
self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1)
self.assertDictAlmostEqual(result.quasi_dists[2], {2: 1}, 0.1)
self.assertDictAlmostEqual(result.quasi_dists[3], {3: 1}, 0.1)
@combine(backend=BACKENDS)
def test_run_errors(self, backend):
"""Test for errors"""
qc1 = QuantumCircuit(1)
qc1.measure_all()
qc2 = RealAmplitudes(num_qubits=1, reps=1)
qc2.measure_all()
sampler = BackendSampler(backend=backend)
with self.assertRaises(ValueError):
sampler.run([qc1], [[1e2]]).result()
with self.assertRaises(ValueError):
sampler.run([qc2], [[]]).result()
with self.assertRaises(ValueError):
sampler.run([qc2], [[1e2]]).result()
@combine(backend=BACKENDS)
def test_run_empty_parameter(self, backend):
"""Test for empty parameter"""
n = 5
qc = QuantumCircuit(n, n - 1)
qc.measure(range(n - 1), range(n - 1))
sampler = BackendSampler(backend=backend)
with self.subTest("one circuit"):
result = sampler.run([qc], shots=1000).result()
self.assertEqual(len(result.quasi_dists), 1)
for q_d in result.quasi_dists:
quasi_dist = {k: v for k, v in q_d.items() if v != 0.0}
self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1)
self.assertEqual(len(result.metadata), 1)
with self.subTest("two circuits"):
result = sampler.run([qc, qc], shots=1000).result()
self.assertEqual(len(result.quasi_dists), 2)
for q_d in result.quasi_dists:
quasi_dist = {k: v for k, v in q_d.items() if v != 0.0}
self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1)
self.assertEqual(len(result.metadata), 2)
@combine(backend=BACKENDS)
def test_run_numpy_params(self, backend):
"""Test for numpy array as parameter values"""
qc = RealAmplitudes(num_qubits=2, reps=2)
qc.measure_all()
k = 5
params_array = np.random.rand(k, qc.num_parameters)
params_list = params_array.tolist()
params_list_array = list(params_array)
sampler = BackendSampler(backend=backend)
target = sampler.run([qc] * k, params_list).result()
with self.subTest("ndarrary"):
result = sampler.run([qc] * k, params_array).result()
self.assertEqual(len(result.metadata), k)
for i in range(k):
self.assertDictAlmostEqual(result.quasi_dists[i], target.quasi_dists[i], delta=0.1)
with self.subTest("list of ndarray"):
result = sampler.run([qc] * k, params_list_array).result()
self.assertEqual(len(result.metadata), k)
for i in range(k):
self.assertDictAlmostEqual(result.quasi_dists[i], target.quasi_dists[i], delta=0.1)
@combine(backend=BACKENDS)
def test_run_with_shots_option(self, backend):
"""test with shots option."""
params, target = self._generate_params_target([1])
sampler = BackendSampler(backend=backend)
result = sampler.run(
circuits=[self._pqc], parameter_values=params, shots=1024, seed=15
).result()
self._compare_probs(result.quasi_dists, target)
@combine(backend=BACKENDS)
def test_primitive_job_status_done(self, backend):
"""test primitive job's status"""
bell = self._circuit[1]
sampler = BackendSampler(backend=backend)
job = sampler.run(circuits=[bell])
self.assertEqual(job.status(), JobStatus.DONE)
if __name__ == "__main__":
unittest.main()