From 4f5495aa5954880c79b4ab305a9d60507eab2847 Mon Sep 17 00:00:00 2001 From: Ikko Hamamura Date: Thu, 28 Mar 2024 18:32:09 +0900 Subject: [PATCH] Add EstimatorV2 (#2088) * add EstimatorV2 * lint * stds * add simulator_metadata * improve rng * backend_options and run_options --------- Co-authored-by: Jun Doi --- qiskit_aer/primitives/__init__.py | 1 + qiskit_aer/primitives/estimator_v2.py | 146 +++++++++ test/terra/primitives/test_estimator_v2.py | 352 +++++++++++++++++++++ 3 files changed, 499 insertions(+) create mode 100644 qiskit_aer/primitives/estimator_v2.py create mode 100644 test/terra/primitives/test_estimator_v2.py diff --git a/qiskit_aer/primitives/__init__.py b/qiskit_aer/primitives/__init__.py index 503cdadd8..857de5f36 100644 --- a/qiskit_aer/primitives/__init__.py +++ b/qiskit_aer/primitives/__init__.py @@ -32,4 +32,5 @@ Classes """ from .estimator import Estimator +from .estimator_v2 import EstimatorV2 from .sampler import Sampler diff --git a/qiskit_aer/primitives/estimator_v2.py b/qiskit_aer/primitives/estimator_v2.py new file mode 100644 index 000000000..f3a116727 --- /dev/null +++ b/qiskit_aer/primitives/estimator_v2.py @@ -0,0 +1,146 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Estimator V2 implementation for Aer.""" + +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass, field + +import numpy as np +from qiskit.primitives.base import BaseEstimatorV2 +from qiskit.primitives.containers import EstimatorPubLike, PrimitiveResult, PubResult +from qiskit.primitives.containers.estimator_pub import EstimatorPub +from qiskit.primitives.primitive_job import PrimitiveJob +from qiskit.quantum_info import Pauli + +from qiskit_aer import AerSimulator + + +@dataclass +class Options: + """Options for :class:`~.EstimatorV2`.""" + + default_precision: float = 0.0 + """The default precision to use if none are specified in :meth:`~run`. + """ + + backend_options: dict = field(default_factory=dict) + """backend_options: Options passed to AerSimulator.""" + + run_options: dict = field(default_factory=dict) + """run_options: Options passed to run.""" + + +class EstimatorV2(BaseEstimatorV2): + """Evaluates expectation values for provided quantum circuit and observable combinations. + + Run a fast simulation using Aer. + Sampling from a normal distribution ``N(expval, precison)`` when set to ``precision``. + + * ``default_precision``: The default precision to use if none are specified in :meth:`~run`. + Default: 0.0. + + * ``backend_options``: Options passed to AerSimulator. + Default: {}. + + * ``run_options``: Options passed to :meth:`AerSimulator.run`. + Default: {}. + """ + + def __init__( + self, + *, + options: dict | None = None, + ): + """ + Args: + options: The options to control the default precision (``default_precision``), + the backend options (``backend_options``), and + the runtime options (``run_options``). + """ + self._options = Options(**options) if options else Options() + method = "density_matrix" if "noise_model" in self.options.backend_options else "automatic" + self._backend = AerSimulator(method=method, **self.options.backend_options) + + @property + def options(self) -> Options: + """Return the options""" + return self._options + + def run( + self, pubs: Iterable[EstimatorPubLike], *, precision: float | None = None + ) -> PrimitiveJob[PrimitiveResult[PubResult]]: + if precision is None: + precision = self._options.default_precision + coerced_pubs = [EstimatorPub.coerce(pub, precision) for pub in pubs] + self._validate_pubs(coerced_pubs) + job = PrimitiveJob(self._run, coerced_pubs) + job._submit() + return job + + def _validate_pubs(self, pubs: list[EstimatorPub]): + for i, pub in enumerate(pubs): + if pub.precision < 0.0: + raise ValueError( + f"The {i}-th pub has precision less than 0 ({pub.precision}). ", + "But precision should be equal to or larger than 0.", + ) + + def _run(self, pubs: list[EstimatorPub]) -> PrimitiveResult[PubResult]: + return PrimitiveResult([self._run_pub(pub) for pub in pubs]) + + def _run_pub(self, pub: EstimatorPub) -> PubResult: + circuit = pub.circuit.copy() + observables = pub.observables + parameter_values = pub.parameter_values + precision = pub.precision + + # calculate broadcasting of parameters and observables + param_shape = parameter_values.shape + param_indices = np.fromiter(np.ndindex(param_shape), dtype=object).reshape(param_shape) + bc_param_ind, bc_obs = np.broadcast_arrays(param_indices, observables) + + parameter_binds = {} + param_array = parameter_values.as_array(circuit.parameters) + parameter_binds = {p: param_array[..., i].ravel() for i, p in enumerate(circuit.parameters)} + + # save expval + paulis = {pauli for obs_dict in observables.ravel() for pauli in obs_dict.keys()} + for pauli in paulis: + circuit.save_expectation_value( + Pauli(pauli), qubits=range(circuit.num_qubits), label=pauli + ) + result = self._backend.run( + circuit, parameter_binds=[parameter_binds], **self.options.run_options + ).result() + + # calculate expectation values (evs) and standard errors (stds) + rng = np.random.default_rng(self.options.run_options.get("seed_simulator")) + flat_indices = list(param_indices.ravel()) + evs = np.zeros_like(bc_param_ind, dtype=float) + stds = np.full(bc_param_ind.shape, precision) + for index in np.ndindex(*bc_param_ind.shape): + param_index = bc_param_ind[index] + flat_index = flat_indices.index(param_index) + for pauli, coeff in bc_obs[index].items(): + expval = result.data(flat_index)[pauli] + evs[index] += expval * coeff + if precision > 0: + evs = rng.normal(evs, precision) + data_bin_cls = self._make_data_bin(pub) + data_bin = data_bin_cls(evs=evs, stds=stds) + return PubResult( + data_bin, + metadata={"target_precision": precision, "simulator_metadata": result.metadata}, + ) diff --git a/test/terra/primitives/test_estimator_v2.py b/test/terra/primitives/test_estimator_v2.py new file mode 100644 index 000000000..10f06f70b --- /dev/null +++ b/test/terra/primitives/test_estimator_v2.py @@ -0,0 +1,352 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Tests for Estimator V2.""" + +from __future__ import annotations + +import unittest +from test.terra.common import QiskitAerTestCase + +import numpy as np +from qiskit.circuit import Parameter, QuantumCircuit +from qiskit.circuit.library import RealAmplitudes +from qiskit.primitives import StatevectorEstimator +from qiskit.primitives.containers.bindings_array import BindingsArray +from qiskit.primitives.containers.estimator_pub import EstimatorPub +from qiskit.primitives.containers.observables_array import ObservablesArray +from qiskit.quantum_info import SparsePauliOp +from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager + +from qiskit_aer import AerSimulator +from qiskit_aer.primitives import EstimatorV2 + + +class TestEstimatorV2(QiskitAerTestCase): + """Test Estimator""" + + def setUp(self): + super().setUp() + self._precision = 5e-3 + self._rtol = 3e-1 + self._seed = 15 + self._rng = np.random.default_rng(self._seed) + self._options = { + "run_options": {"seed_simulator": self._seed}, + "default_precision": self._precision, + } + self.ansatz = RealAmplitudes(num_qubits=2, reps=2) + self.observable = SparsePauliOp.from_list( + [ + ("II", -1.052373245772859), + ("IZ", 0.39793742484318045), + ("ZI", -0.39793742484318045), + ("ZZ", -0.01128010425623538), + ("XX", 0.18093119978423156), + ] + ) + self.expvals = -1.0284380963435145, -1.284366511861733 + + self.psi = (RealAmplitudes(num_qubits=2, reps=2), RealAmplitudes(num_qubits=2, reps=3)) + self.params = tuple(psi.parameters for psi in self.psi) + self.hamiltonian = ( + SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]), + SparsePauliOp.from_list([("IZ", 1)]), + SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]), + ) + self.theta = ( + [0, 1, 1, 2, 3, 5], + [0, 1, 1, 2, 3, 5, 8, 13], + [1, 2, 3, 4, 5, 6], + ) + self.backend = AerSimulator() + + def test_estimator_run(self): + """Test Estimator.run()""" + psi1, psi2 = self.psi + hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian + theta1, theta2, theta3 = self.theta + pm = generate_preset_pass_manager(optimization_level=0, backend=self.backend) + psi1, psi2 = pm.run([psi1, psi2]) + estimator = EstimatorV2(options=self._options) + # Specify the circuit and observable by indices. + # calculate [ ] + ham1 = hamiltonian1.apply_layout(psi1.layout) + job = estimator.run([(psi1, ham1, [theta1])]) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.5555572817900956], rtol=self._rtol) + + # Objects can be passed instead of indices. + # Note that passing objects has an overhead + # since the corresponding indices need to be searched. + # User can append a circuit and observable. + # calculate [ ] + ham1 = hamiltonian1.apply_layout(psi2.layout) + result2 = estimator.run([(psi2, ham1, theta2)]).result() + np.testing.assert_allclose(result2[0].data.evs, [2.97797666], rtol=self._rtol) + + # calculate [ , ] + ham2 = hamiltonian2.apply_layout(psi1.layout) + ham3 = hamiltonian3.apply_layout(psi1.layout) + result3 = estimator.run([(psi1, [ham2, ham3], theta1)]).result() + np.testing.assert_allclose(result3[0].data.evs, [-0.551653, 0.07535239], rtol=self._rtol) + + # calculate [ [, + # ], + # [] ] + ham1 = hamiltonian1.apply_layout(psi1.layout) + ham3 = hamiltonian3.apply_layout(psi1.layout) + ham2 = hamiltonian2.apply_layout(psi2.layout) + result4 = estimator.run( + [ + (psi1, [ham1, ham3], [theta1, theta3]), + (psi2, ham2, theta2), + ] + ).result() + np.testing.assert_allclose(result4[0].data.evs, [1.55555728, -1.08766318], rtol=self._rtol) + np.testing.assert_allclose(result4[1].data.evs, [0.17849238], rtol=self._rtol) + + def test_estimator_with_pub(self): + """Test estimator with explicit EstimatorPubs.""" + psi1, psi2 = self.psi + hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian + theta1, theta2, theta3 = self.theta + pm = generate_preset_pass_manager(optimization_level=0, backend=self.backend) + psi1, psi2 = pm.run([psi1, psi2]) + + ham1 = hamiltonian1.apply_layout(psi1.layout) + ham3 = hamiltonian3.apply_layout(psi1.layout) + obs1 = ObservablesArray.coerce([ham1, ham3]) + bind1 = BindingsArray.coerce({tuple(psi1.parameters): [theta1, theta3]}) + pub1 = EstimatorPub(psi1, obs1, bind1) + + ham2 = hamiltonian2.apply_layout(psi2.layout) + obs2 = ObservablesArray.coerce(ham2) + bind2 = BindingsArray.coerce({tuple(psi2.parameters): theta2}) + pub2 = EstimatorPub(psi2, obs2, bind2) + + estimator = EstimatorV2(options=self._options) + result4 = estimator.run([pub1, pub2]).result() + np.testing.assert_allclose(result4[0].data.evs, [1.55555728, -1.08766318], rtol=self._rtol) + np.testing.assert_allclose(result4[1].data.evs, [0.17849238], rtol=self._rtol) + + def test_estimator_run_no_params(self): + """test for estimator without parameters""" + circuit = self.ansatz.assign_parameters([0, 1, 1, 2, 3, 5]) + pm = generate_preset_pass_manager(optimization_level=0, backend=self.backend) + circuit = pm.run(circuit) + est = EstimatorV2(options=self._options) + observable = self.observable.apply_layout(circuit.layout) + result = est.run([(circuit, observable)]).result() + np.testing.assert_allclose(result[0].data.evs, [-1.284366511861733], rtol=self._rtol) + + def test_run_single_circuit_observable(self): + """Test for single circuit and single observable case.""" + est = EstimatorV2(options=self._options) + pm = generate_preset_pass_manager(optimization_level=0, target=self.backend.target) + + with self.subTest("No parameter"): + qc = QuantumCircuit(1) + qc.x(0) + qc = pm.run(qc) + op = SparsePauliOp("Z") + op = op.apply_layout(qc.layout) + param_vals = [None, [], [[]], np.array([]), np.array([[]]), [np.array([])]] + target = [-1] + for val in param_vals: + self.subTest(f"{val}") + result = est.run([(qc, op, val)]).result() + np.testing.assert_allclose(result[0].data.evs, target, rtol=self._rtol) + self.assertEqual(result[0].metadata["target_precision"], self._precision) + + with self.subTest("One parameter"): + param = Parameter("x") + qc = QuantumCircuit(1) + qc.ry(param, 0) + qc = pm.run(qc) + op = SparsePauliOp("Z") + op = op.apply_layout(qc.layout) + param_vals = [ + [np.pi], + np.array([np.pi]), + ] + target = [-1] + for val in param_vals: + self.subTest(f"{val}") + result = est.run([(qc, op, val)]).result() + np.testing.assert_allclose(result[0].data.evs, target, rtol=self._rtol) + self.assertEqual(result[0].metadata["target_precision"], self._precision) + + with self.subTest("More than one parameter"): + qc = self.psi[0] + qc = pm.run(qc) + op = self.hamiltonian[0] + op = op.apply_layout(qc.layout) + param_vals = [ + self.theta[0], + [self.theta[0]], + np.array(self.theta[0]), + np.array([self.theta[0]]), + [np.array(self.theta[0])], + ] + target = [1.5555572817900956] + for val in param_vals: + self.subTest(f"{val}") + result = est.run([(qc, op, val)]).result() + np.testing.assert_allclose(result[0].data.evs, target, rtol=self._rtol) + self.assertEqual(result[0].metadata["target_precision"], self._precision) + + def test_run_1qubit(self): + """Test for 1-qubit cases""" + qc = QuantumCircuit(1) + qc2 = QuantumCircuit(1) + qc2.x(0) + pm = generate_preset_pass_manager(optimization_level=0, backend=self.backend) + qc, qc2 = pm.run([qc, qc2]) + + op = SparsePauliOp.from_list([("I", 1)]) + op2 = SparsePauliOp.from_list([("Z", 1)]) + + est = EstimatorV2(options=self._options) + op_1 = op.apply_layout(qc.layout) + result = est.run([(qc, op_1)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_2 = op2.apply_layout(qc.layout) + result = est.run([(qc, op_2)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_3 = op.apply_layout(qc2.layout) + result = est.run([(qc2, op_3)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_4 = op2.apply_layout(qc2.layout) + result = est.run([(qc2, op_4)]).result() + np.testing.assert_allclose(result[0].data.evs, [-1], rtol=self._rtol) + + def test_run_2qubits(self): + """Test for 2-qubit cases (to check endian)""" + qc = QuantumCircuit(2) + qc2 = QuantumCircuit(2) + qc2.x(0) + pm = generate_preset_pass_manager(optimization_level=0, backend=self.backend) + qc, qc2 = pm.run([qc, qc2]) + + op = SparsePauliOp.from_list([("II", 1)]) + op2 = SparsePauliOp.from_list([("ZI", 1)]) + op3 = SparsePauliOp.from_list([("IZ", 1)]) + + est = EstimatorV2(options=self._options) + op_1 = op.apply_layout(qc.layout) + result = est.run([(qc, op_1)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_2 = op.apply_layout(qc2.layout) + result = est.run([(qc2, op_2)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_3 = op2.apply_layout(qc.layout) + result = est.run([(qc, op_3)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_4 = op2.apply_layout(qc2.layout) + result = est.run([(qc2, op_4)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_5 = op3.apply_layout(qc.layout) + result = est.run([(qc, op_5)]).result() + np.testing.assert_allclose(result[0].data.evs, [1], rtol=self._rtol) + + op_6 = op3.apply_layout(qc2.layout) + result = est.run([(qc2, op_6)]).result() + np.testing.assert_allclose(result[0].data.evs, [-1], rtol=self._rtol) + + def test_run_errors(self): + """Test for errors""" + qc = QuantumCircuit(1) + qc2 = QuantumCircuit(2) + + op = SparsePauliOp.from_list([("I", 1)]) + op2 = SparsePauliOp.from_list([("II", 1)]) + + est = EstimatorV2(options=self._options) + with self.assertRaises(ValueError): + est.run([(qc, op2)]).result() + with self.assertRaises(ValueError): + est.run([(qc, op, [[1e4]])]).result() + with self.assertRaises(ValueError): + est.run([(qc2, op2, [[1, 2]])]).result() + with self.assertRaises(ValueError): + est.run([(qc, [op, op2], [[1]])]).result() + with self.assertRaises(ValueError): + est.run([(qc, op)], precision=-1).result() + with self.assertRaises(ValueError): + est.run([(qc, 1j * op)], precision=0.1).result() + # precision < 0 + with self.assertRaises(ValueError): + est.run([(qc, op, None, -1)]).result() + with self.assertRaises(ValueError): + est.run([(qc, op)], precision=-1).result() + # Comment out after Qiskit 1.0.3 + # with self.subTest("missing []"): + # with self.assertRaisesRegex(ValueError, "An invalid Estimator pub-like was given"): + # _ = est.run((qc, op)).result() + + def test_run_numpy_params(self): + """Test for numpy array as parameter values""" + qc = RealAmplitudes(num_qubits=2, reps=2) + pm = generate_preset_pass_manager(optimization_level=0, backend=self.backend) + qc = pm.run(qc) + op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) + op = op.apply_layout(qc.layout) + k = 5 + params_array = self._rng.random((k, qc.num_parameters)) + params_list = params_array.tolist() + params_list_array = list(params_array) + statevector_estimator = StatevectorEstimator(seed=123) + target = statevector_estimator.run([(qc, op, params_list)]).result() + + estimator = EstimatorV2(options=self._options) + + with self.subTest("ndarrary"): + result = estimator.run([(qc, op, params_array)]).result() + self.assertEqual(result[0].data.evs.shape, (k,)) + np.testing.assert_allclose(result[0].data.evs, target[0].data.evs, rtol=self._rtol) + + with self.subTest("list of ndarray"): + result = estimator.run([(qc, op, params_list_array)]).result() + self.assertEqual(result[0].data.evs.shape, (k,)) + np.testing.assert_allclose(result[0].data.evs, target[0].data.evs, rtol=self._rtol) + + def test_precision(self): + """Test for precision""" + estimator = EstimatorV2(options=self._options) + pm = generate_preset_pass_manager(optimization_level=0, backend=self.backend) + psi1 = pm.run(self.psi[0]) + hamiltonian1 = self.hamiltonian[0].apply_layout(psi1.layout) + theta1 = self.theta[0] + job = estimator.run([(psi1, hamiltonian1, [theta1])]) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.901141473854881], rtol=self._rtol) + # The result of the second run is the same + job = estimator.run([(psi1, hamiltonian1, [theta1]), (psi1, hamiltonian1, [theta1])]) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.901141473854881], rtol=self._rtol) + np.testing.assert_allclose(result[1].data.evs, [1.901141473854881], rtol=self._rtol) + # apply smaller precision value + job = estimator.run([(psi1, hamiltonian1, [theta1])], precision=self._precision * 0.5) + result = job.result() + np.testing.assert_allclose(result[0].data.evs, [1.5555572817900956], rtol=self._rtol) + + +if __name__ == "__main__": + unittest.main()