qiskit-aer/qiskit_aer/primitives/estimator.py

707 lines
28 KiB
Python

# This code is part of Qiskit.
#
# (C) Copyright IBM 2022, 2023.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""
Estimator class.
"""
from __future__ import annotations
from collections import defaultdict
from collections.abc import Sequence
from warnings import warn
import numpy as np
from qiskit.circuit import ParameterExpression, QuantumCircuit
from qiskit.compiler import transpile
from qiskit.primitives import BaseEstimator, EstimatorResult
from qiskit.primitives.primitive_job import PrimitiveJob
from qiskit.primitives.utils import _circuit_key, _observable_key, init_observable
from qiskit.providers import Options
from qiskit.quantum_info import Pauli, PauliList
from qiskit.quantum_info.operators.base_operator import BaseOperator
from qiskit.result.models import ExperimentResult
from qiskit.transpiler import CouplingMap, PassManager
from qiskit.transpiler.passes import (
ApplyLayout,
EnlargeWithAncilla,
FullAncillaAllocation,
Optimize1qGatesDecomposition,
SetLayout,
)
from qiskit.utils import deprecate_func
from .. import AerError, AerSimulator
class Estimator(BaseEstimator):
"""
Aer implmentation of Estimator.
:Run Options:
- **shots** (None or int) --
The number of shots. If None and approximation is True, it calculates the exact
expectation values. Otherwise, it calculates expectation values with sampling.
- **seed** (int) --
Set a fixed seed for the sampling.
.. note::
Precedence of seeding for ``seed_simulator`` is as follows:
1. ``seed_simulator`` in runtime (i.e. in :meth:`run`)
2. ``seed`` in runtime (i.e. in :meth:`run`)
3. ``seed_simulator`` of ``backend_options``.
4. default.
``seed`` is also used for sampling from a normal distribution when approximation is True.
When combined with the approximation option, we get the expectation values as follows:
* shots is None and approximation=False: Return an expectation value with sampling-noise w/
warning.
* shots is int and approximation=False: Return an expectation value with sampling-noise.
* shots is None and approximation=True: Return an exact expectation value.
* shots is int and approximation=True: Return expectation value with sampling-noise using a
normal distribution approximation.
"""
def __init__(
self,
*,
backend_options: dict | None = None,
transpile_options: dict | None = None,
run_options: dict | None = None,
approximation: bool = False,
skip_transpilation: bool = False,
abelian_grouping: bool = True,
):
"""
Args:
backend_options: Options passed to AerSimulator.
transpile_options: Options passed to transpile.
run_options: Options passed to run.
approximation: If True, it calculates expectation values with normal distribution
approximation. Note that this appproximation ignores readout errors.
skip_transpilation: If True, transpilation is skipped.
abelian_grouping: Whether the observable should be grouped into commuting.
If approximation is True, this parameter is ignored and assumed to be False.
"""
warn(
"Estimator has been deprecated as of Aer 0.15, please use EstimatorV2 instead.",
DeprecationWarning,
stacklevel=3,
)
super().__init__(options=run_options)
# These three private attributes used to be created by super, but were deprecated in Qiskit
# 0.46. See https://github.com/Qiskit/qiskit/pull/11051
self._circuits = []
self._parameters = []
self._observables = []
backend_options = {} if backend_options is None else backend_options
method = (
"density_matrix" if approximation and "noise_model" in backend_options else "automatic"
)
self._backend = AerSimulator(method=method)
self._backend.set_options(**backend_options)
self._transpile_options = Options()
if transpile_options is not None:
self._transpile_options.update_options(**transpile_options)
if not approximation:
warn(
"Option approximation=False is deprecated as of qiskit-aer 0.13. "
"It will be removed no earlier than 3 months after the release date. "
"Instead, use BackendEstimator from qiskit.primitives.",
DeprecationWarning,
stacklevel=3,
)
self._approximation = approximation
self._skip_transpilation = skip_transpilation
self._cache: dict[tuple[tuple[int], tuple[int], bool], tuple[dict, dict]] = {}
self._transpiled_circuits: dict[int, QuantumCircuit] = {}
self._layouts: dict[int, list[int]] = {}
self._circuit_ids: dict[tuple, int] = {}
self._observable_ids: dict[tuple, int] = {}
self._abelian_grouping = abelian_grouping
@property
@deprecate_func(
since="0.13",
package_name="qiskit-aer",
is_property=True,
)
def approximation(self):
"""The approximation property"""
return self._approximation
@approximation.setter
@deprecate_func(
since="0.13",
package_name="qiskit-aer",
is_property=True,
)
def approximation(self, approximation):
"""Setter for approximation"""
if not approximation:
warn(
"Option approximation=False is deprecated as of qiskit-aer 0.13. "
"It will be removed no earlier than 3 months after the release date. "
"Instead, use BackendEstimator from qiskit.primitives.",
DeprecationWarning,
stacklevel=3,
)
self._approximation = approximation
def _call(
self,
circuits: Sequence[int],
observables: Sequence[int],
parameter_values: Sequence[Sequence[float]],
**run_options,
) -> EstimatorResult:
seed = run_options.pop("seed", None)
if seed is not None:
run_options.setdefault("seed_simulator", seed)
if self._approximation:
return self._compute_with_approximation(
circuits, observables, parameter_values, run_options, seed
)
else:
return self._compute(circuits, observables, parameter_values, run_options)
def _run(
self,
circuits: Sequence[QuantumCircuit],
observables: Sequence[BaseOperator],
parameter_values: Sequence[Sequence[float]],
**run_options,
) -> PrimitiveJob:
circuit_indices: list = []
for circuit in circuits:
index = self._circuit_ids.get(_circuit_key(circuit))
if index is not None:
circuit_indices.append(index)
else:
circuit_indices.append(len(self._circuits))
self._circuit_ids[_circuit_key(circuit)] = len(self._circuits)
self._circuits.append(circuit)
self._parameters.append(circuit.parameters)
observable_indices: list = []
for observable in observables:
observable = init_observable(observable)
index = self._observable_ids.get(_observable_key(observable))
if index is not None:
observable_indices.append(index)
else:
observable_indices.append(len(self._observables))
self._observable_ids[_observable_key(observable)] = len(self._observables)
self._observables.append(observable)
job = PrimitiveJob(
self._call,
circuit_indices,
observable_indices,
parameter_values,
**run_options,
)
# The public submit method was removed in Qiskit 0.46
(job.submit if hasattr(job, "submit") else job._submit)() # pylint: disable=no-member
return job
def _compute(self, circuits, observables, parameter_values, run_options):
if "shots" in run_options and run_options["shots"] is None:
warn(
"If `shots` is None and `approximation` is False, "
"the number of shots is automatically set to backend options' "
f"shots={self._backend.options.shots}.",
RuntimeWarning,
)
# Key for cache
key = (tuple(circuits), tuple(observables), self._approximation)
# Create expectation value experiments.
if key in self._cache: # Use a cache
experiments_dict, obs_maps = self._cache[key]
exp_map = self._pre_process_params(circuits, observables, parameter_values, obs_maps)
experiments, parameter_binds = self._flatten(experiments_dict, exp_map)
post_processings = self._create_post_processing(
circuits, observables, parameter_values, obs_maps, exp_map
)
else:
self._transpile_circuits(circuits)
circ_obs_map = defaultdict(list)
# Aggregate observables
for circ_ind, obs_ind in zip(circuits, observables):
circ_obs_map[circ_ind].append(obs_ind)
experiments_dict = {}
obs_maps = {} # circ_ind => obs_ind => term_ind (Original Pauli) => basis_ind
# Group and create measurement circuit
for circ_ind, obs_indices in circ_obs_map.items():
pauli_list = sum(
self._observables[obs_ind].paulis for obs_ind in obs_indices
).unique()
if self._abelian_grouping:
pauli_lists = pauli_list.group_commuting(qubit_wise=True)
else:
pauli_lists = [PauliList(pauli) for pauli in pauli_list]
obs_map = defaultdict(list)
for obs_ind in obs_indices:
for pauli in self._observables[obs_ind].paulis:
for basis_ind, pauli_list in enumerate(pauli_lists):
if pauli in pauli_list:
obs_map[obs_ind].append(basis_ind)
break
obs_maps[circ_ind] = obs_map
bases = [_paulis2basis(pauli_list) for pauli_list in pauli_lists]
if len(bases) == 1 and not bases[0].x.any() and not bases[0].z.any(): # identity
break
meas_circuits = [self._create_meas_circuit(basis, circ_ind) for basis in bases]
circuit = (
self._circuits[circ_ind]
if self._skip_transpilation
else self._transpiled_circuits[circ_ind]
)
experiments_dict[circ_ind] = self._combine_circs(circuit, meas_circuits)
self._cache[key] = experiments_dict, obs_maps
exp_map = self._pre_process_params(circuits, observables, parameter_values, obs_maps)
# Flatten
experiments, parameter_binds = self._flatten(experiments_dict, exp_map)
# Create PostProcessing
post_processings = self._create_post_processing(
circuits, observables, parameter_values, obs_maps, exp_map
)
# Run experiments
if experiments:
results = (
self._backend.run(
experiments,
parameter_binds=parameter_binds if any(parameter_binds) else None,
**run_options,
)
.result()
.results
)
else:
results = []
# Post processing (calculate expectation values)
expectation_values, metadata = zip(
*(post_processing.run(results) for post_processing in post_processings)
)
return EstimatorResult(np.real_if_close(expectation_values), list(metadata))
def _pre_process_params(self, circuits, observables, parameter_values, obs_maps):
exp_map = defaultdict(dict) # circ_ind => basis_ind => (parameter, parameter_values)
for circ_ind, obs_ind, param_val in zip(circuits, observables, parameter_values):
self._validate_parameter_length(param_val, circ_ind)
parameter = self._parameters[circ_ind]
for basis_ind in obs_maps[circ_ind][obs_ind]:
if (
circ_ind in exp_map
and basis_ind in exp_map[circ_ind]
and len(self._parameters[circ_ind]) > 0
):
param_vals = exp_map[circ_ind][basis_ind][1]
if param_val not in param_vals:
param_vals.append(param_val)
else:
exp_map[circ_ind][basis_ind] = (parameter, [param_val])
return exp_map
@staticmethod
def _flatten(experiments_dict, exp_map):
experiments_list = []
parameter_binds = []
for circ_ind in experiments_dict:
experiments_list.extend(experiments_dict[circ_ind])
for _, (parameter, param_vals) in exp_map[circ_ind].items():
parameter_binds.extend(
[
{
param: [param_val[i] for param_val in param_vals]
for i, param in enumerate(parameter)
}
]
)
return experiments_list, parameter_binds
def _create_meas_circuit(self, basis: Pauli, circuit_index: int):
qargs = np.arange(basis.num_qubits)[basis.z | basis.x]
meas_circuit = QuantumCircuit(basis.num_qubits, len(qargs))
for clbit, qarg in enumerate(qargs):
if basis.x[qarg]:
if basis.z[qarg]:
meas_circuit.sdg(qarg)
meas_circuit.h(qarg)
meas_circuit.measure(qarg, clbit)
meas_circuit.metadata = {"basis": basis}
if self._skip_transpilation:
return meas_circuit
layout = self._layouts[circuit_index]
passmanager = PassManager([SetLayout(layout)])
opt1q = Optimize1qGatesDecomposition(target=self._backend.target)
passmanager.append(opt1q)
if isinstance(self._backend.coupling_map, CouplingMap):
coupling_map = self._backend.coupling_map
passmanager.append(FullAncillaAllocation(coupling_map))
passmanager.append(EnlargeWithAncilla())
passmanager.append(ApplyLayout())
return passmanager.run(meas_circuit)
@staticmethod
def _combine_circs(circuit: QuantumCircuit, meas_circuits: list[QuantumCircuit]):
circs = []
for meas_circuit in meas_circuits:
new_circ = circuit.copy()
for creg in meas_circuit.cregs:
new_circ.add_register(creg)
new_circ.compose(meas_circuit, inplace=True)
_update_metadata(new_circ, meas_circuit.metadata)
circs.append(new_circ)
return circs
@staticmethod
def _calculate_result_index(circ_ind, obs_ind, term_ind, param_val, obs_maps, exp_map) -> int:
basis_ind = obs_maps[circ_ind][obs_ind][term_ind]
result_index = 0
for _circ_ind, basis_map in exp_map.items():
for _basis_ind, (_, (_, param_vals)) in enumerate(basis_map.items()):
if circ_ind == _circ_ind and basis_ind == _basis_ind:
result_index += param_vals.index(param_val)
return result_index
result_index += len(param_vals)
raise AerError("Bug. Please report from issue: https://github.com/Qiskit/qiskit-aer/issues")
def _create_post_processing(
self, circuits, observables, parameter_values, obs_maps, exp_map
) -> list[_PostProcessing]:
post_processings = []
for circ_ind, obs_ind, param_val in zip(circuits, observables, parameter_values):
result_indices: list[int | None] = []
paulis = []
coeffs = []
observable = self._observables[obs_ind]
for term_ind, (pauli, coeff) in enumerate(zip(observable.paulis, observable.coeffs)):
# Identity
if not pauli.x.any() and not pauli.z.any():
result_indices.append(None)
paulis.append(PauliList(pauli))
coeffs.append([coeff])
continue
result_index = self._calculate_result_index(
circ_ind, obs_ind, term_ind, param_val, obs_maps, exp_map
)
if result_index in result_indices:
i = result_indices.index(result_index)
paulis[i] += pauli
coeffs[i].append(coeff)
else:
result_indices.append(result_index)
paulis.append(PauliList(pauli))
coeffs.append([coeff])
post_processings.append(_PostProcessing(result_indices, paulis, coeffs))
return post_processings
def _compute_with_approximation(
self, circuits, observables, parameter_values, run_options, seed
):
# Key for cache
key = (tuple(circuits), tuple(observables), self._approximation)
shots = run_options.pop("shots", None)
# Create expectation value experiments.
if key in self._cache: # Use a cache
parameter_binds = defaultdict(dict)
for i, j, value in zip(circuits, observables, parameter_values):
self._validate_parameter_length(value, i)
for k, v in zip(self._parameters[i], value):
if k in parameter_binds[(i, j)]:
parameter_binds[(i, j)][k].append(v)
else:
parameter_binds[(i, j)][k] = [v]
experiment_manager = self._cache[key]
experiment_manager.parameter_binds = list(parameter_binds.values())
else:
self._transpile_circuits(circuits)
experiment_manager = _ExperimentManager()
for i, j, value in zip(circuits, observables, parameter_values):
self._validate_parameter_length(value, i)
if (i, j) in experiment_manager.keys:
key_index = experiment_manager.keys.index((i, j))
circuit = experiment_manager.experiment_circuits[key_index]
else:
circuit = (
self._circuits[i].copy()
if self._skip_transpilation
else self._transpiled_circuits[i].copy()
)
observable = self._observables[j]
if shots is None:
circuit.save_expectation_value(observable, self._layouts[i])
else:
for term_ind, pauli in enumerate(observable.paulis):
circuit.save_expectation_value(
pauli, self._layouts[i], label=str(term_ind)
)
experiment_manager.append(
key=(i, j),
parameter_bind=dict(zip(self._parameters[i], value)),
experiment_circuit=circuit,
)
self._cache[key] = experiment_manager
result = self._backend.run(
experiment_manager.experiment_circuits,
parameter_binds=experiment_manager.parameter_binds,
**run_options,
).result()
# Post processing (calculate expectation values)
if shots is None:
expectation_values = [
result.data(i)["expectation_value"] for i in experiment_manager.experiment_indices
]
metadata = [
{"simulator_metadata": result.results[i].metadata}
for i in experiment_manager.experiment_indices
]
else:
expectation_values = []
rng = np.random.default_rng(seed)
metadata = []
experiment_indices = experiment_manager.experiment_indices
for i in range(len(experiment_manager)):
combined_expval = 0.0
combined_var = 0.0
result_index = experiment_indices[i]
observable_key = experiment_manager.get_observable_key(i)
coeffs = np.real_if_close(self._observables[observable_key].coeffs)
for term_ind, expval in result.data(result_index).items():
var = 1 - expval**2
coeff = coeffs[int(term_ind)]
combined_expval += expval * coeff
combined_var += var * coeff**2
# Sampling from normal distribution
standard_error = np.sqrt(combined_var / shots)
expectation_values.append(rng.normal(combined_expval, standard_error))
metadata.append(
{
"variance": np.real_if_close(combined_var).item(),
"shots": shots,
"simulator_metadata": result.results[result_index].metadata,
}
)
return EstimatorResult(np.real_if_close(expectation_values), metadata)
def _validate_parameter_length(self, parameter, circuit_index):
if len(parameter) != len(self._parameters[circuit_index]):
raise ValueError(
f"The number of values ({len(parameter)}) does not match "
f"the number of parameters ({len(self._parameters[circuit_index])})."
)
def _transpile(self, circuits):
if self._skip_transpilation:
return circuits
return transpile(circuits, self._backend, **self._transpile_options.__dict__)
def _transpile_circuits(self, circuits):
if self._skip_transpilation:
for i in set(circuits):
num_qubits = self._circuits[i].num_qubits
self._layouts[i] = list(range(num_qubits))
return
for i in set(circuits):
if i not in self._transpiled_circuits:
circuit = self._circuits[i].copy()
circuit.measure_all()
num_qubits = circuit.num_qubits
self._backend.set_max_qubits(num_qubits)
circuit = self._transpile(circuit)
bit_map = {bit: index for index, bit in enumerate(circuit.qubits)}
layout = [bit_map[qr[0]] for _, qr, _ in circuit[-num_qubits:]]
circuit.remove_final_measurements()
self._transpiled_circuits[i] = circuit
self._layouts[i] = layout
def _expval_with_variance(counts) -> tuple[float, float]:
denom = 0
expval = 0.0
for bin_outcome, freq in counts.items():
outcome = int(bin_outcome, 16)
denom += freq
expval += freq * (-1) ** bin(outcome).count("1")
# Divide by total shots
expval /= denom
# Compute variance
variance = 1 - expval**2
return expval, variance
class _PostProcessing:
def __init__(
self,
result_indices: list[int],
paulis: list[PauliList],
coeffs: list[list[float]],
):
self._result_indices = result_indices
self._paulis = paulis
self._coeffs = [np.array(c) for c in coeffs]
def run(self, results: list[ExperimentResult]) -> tuple[float, dict]:
"""Coumpute expectation value.
Args:
results: list of ExperimentResult.
Returns:
tuple of an expectation value and metadata.
"""
combined_expval = 0.0
combined_var = 0.0
simulator_metadata = []
for c_i, paulis, coeffs in zip(self._result_indices, self._paulis, self._coeffs):
if c_i is None:
# Observable is identity
expvals, variances = np.array([1], dtype=complex), np.array([0], dtype=complex)
shots = 0
else:
result = results[c_i]
count = result.data.counts
shots = sum(count.values())
basis = result.header.metadata["basis"]
indices = np.where(basis.z | basis.x)[0]
measured_paulis = PauliList.from_symplectic(
paulis.z[:, indices], paulis.x[:, indices], 0
)
expvals, variances = _pauli_expval_with_variance(count, measured_paulis)
simulator_metadata.append(result.metadata)
combined_expval += np.dot(expvals, coeffs)
combined_var += np.dot(variances, coeffs**2)
metadata = {
"shots": shots,
"variance": np.real_if_close(combined_var).item(),
"simulator_metadata": simulator_metadata,
}
return combined_expval, metadata
def _update_metadata(circuit: QuantumCircuit, metadata: dict) -> QuantumCircuit:
if circuit.metadata:
circuit.metadata.update(metadata)
else:
circuit.metadata = metadata
return circuit
def _pauli_expval_with_variance(counts: dict, paulis: PauliList) -> tuple[np.ndarray, np.ndarray]:
# Diag indices
size = len(paulis)
diag_inds = _paulis2inds(paulis)
expvals = np.zeros(size, dtype=float)
denom = 0 # Total shots for counts dict
for hex_outcome, freq in counts.items():
outcome = int(hex_outcome, 16)
denom += freq
for k in range(size):
coeff = (-1) ** _parity(diag_inds[k] & outcome)
expvals[k] += freq * coeff
# Divide by total shots
expvals /= denom
variances = 1 - expvals**2
return expvals, variances
def _paulis2inds(paulis: PauliList) -> list[int]:
nonid = paulis.z | paulis.x
packed_vals = np.packbits(nonid, axis=1, bitorder="little").astype( # pylint:disable=no-member
object
)
power_uint8 = 1 << (8 * np.arange(packed_vals.shape[1], dtype=object))
inds = packed_vals @ power_uint8
return inds.tolist()
def _parity(integer: int) -> int:
"""Return the parity of an integer"""
return bin(integer).count("1") % 2
def _paulis2basis(paulis: PauliList) -> Pauli:
return Pauli(
(
np.logical_or.reduce(paulis.z), # pylint:disable=no-member
np.logical_or.reduce(paulis.x), # pylint:disable=no-member
)
)
class _ExperimentManager:
def __init__(self):
self.keys: list[tuple[int, int]] = []
self.experiment_circuits: list[QuantumCircuit] = []
self.parameter_binds: list[dict[ParameterExpression, list[float]]] = []
self._input_indices: list[list[int]] = []
self._num_experiment: int = 0
def __len__(self):
return self._num_experiment
@property
def experiment_indices(self):
"""indices of experiments"""
return np.argsort(sum(self._input_indices, [])).tolist()
def append(
self,
key: tuple[int, int],
parameter_bind: dict[ParameterExpression, float],
experiment_circuit: QuantumCircuit,
):
"""append experiments"""
if key in self.keys and parameter_bind:
key_index = self.keys.index(key)
for k, vs in self.parameter_binds[key_index].items():
vs.append(parameter_bind[k])
self._input_indices[key_index].append(self._num_experiment)
else:
self.experiment_circuits.append(experiment_circuit)
self.keys.append(key)
self.parameter_binds.append({k: [v] for k, v in parameter_bind.items()})
self._input_indices.append([self._num_experiment])
self._num_experiment += 1
def get_observable_key(self, index):
"""return key of observables"""
for i, inputs in enumerate(self._input_indices):
if index in inputs:
return self.keys[i][1]
raise AerError("Unexpected behavior.")