qiskit-aer/qiskit_aer/backends/aerbackend.py

698 lines
28 KiB
Python

# This code is part of Qiskit.
#
# (C) Copyright IBM 2018, 2019.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""
Aer qasm simulator backend.
"""
import copy
import datetime
import logging
import time
import uuid
import warnings
from abc import ABC, abstractmethod
from qiskit.circuit import QuantumCircuit, ParameterExpression, Delay
from qiskit.providers import BackendV2 as Backend
from qiskit.providers.models.backendstatus import BackendStatus
from qiskit.pulse import Schedule, ScheduleBlock
from qiskit.result import Result
from qiskit.transpiler import CouplingMap
from qiskit.transpiler.target import Target
from ..aererror import AerError
from ..jobs import AerJob
from ..noise.noise_model import NoiseModel, QuantumErrorLocation
from ..noise.errors.base_quantum_error import QuantumChannelInstruction
from .aer_compiler import compile_circuit, assemble_circuits, generate_aer_config
from .backend_utils import format_save_type, circuit_optypes
from .name_mapping import NAME_MAPPING
# pylint: disable=import-error, no-name-in-module, abstract-method
from .controller_wrappers import AerConfig
# Logger
logger = logging.getLogger(__name__)
class AerBackend(Backend, ABC):
"""Aer Backend class."""
def __init__(
self, configuration, properties=None, provider=None, target=None, backend_options=None
):
"""Aer class for backends.
This method should initialize the module and its configuration, and
raise an exception if a component of the module is
not available.
Args:
configuration (AerBackendConfiguration): backend configuration.
properties (AerBackendProperties or None): Optional, backend properties.
provider (Provider): Optional, provider responsible for this backend.
target (Target): initial target for backend
backend_options (dict or None): Optional set custom backend options.
Raises:
AerError: if there is no name in the configuration
"""
# Init configuration and provider in Backend
super().__init__(
provider=provider,
name=configuration.backend_name,
description=configuration.description,
backend_version=configuration.backend_version,
)
# Initialize backend configuration
self._properties = properties
self._configuration = configuration
# Custom option values for config
self._options_configuration = {}
self._options_properties = {}
self._target = target
self._mapping = NAME_MAPPING
if target is not None:
self._from_backend = True
else:
self._from_backend = False
# Set options from backend_options dictionary
if backend_options is not None:
self.set_options(**backend_options)
# build coupling map
if self.configuration().coupling_map is not None:
self._coupling_map = CouplingMap(self.configuration().coupling_map)
def _convert_circuit_binds(self, circuit, binds, idx_map):
parameterizations = []
def append_param_values(index, bind_pos, param):
if param in binds:
parameterizations.append([(index, bind_pos), binds[param]])
elif isinstance(param, ParameterExpression):
# If parameter expression has no unbound parameters
# it's already bound and should be skipped
if not param.parameters:
return
if not binds:
raise AerError("The element of parameter_binds is empty.")
len_vals = len(next(iter(binds.values())))
bind_list = [
{
parameter: binds[parameter][i]
for parameter in param.parameters & binds.keys()
}
for i in range(len_vals)
]
bound_values = [float(param.bind(x)) for x in bind_list]
parameterizations.append([(index, bind_pos), bound_values])
append_param_values(AerConfig.GLOBAL_PHASE_POS, -1, circuit.global_phase)
for index, instruction in enumerate(circuit.data):
if instruction.operation.is_parameterized():
for bind_pos, param in enumerate(instruction.operation.params):
append_param_values(idx_map[index] if idx_map else index, bind_pos, param)
return parameterizations
def _convert_binds(self, circuits, parameter_binds, idx_maps=None):
if isinstance(circuits, QuantumCircuit):
if len(parameter_binds) > 1:
raise AerError("More than 1 parameter table provided for a single circuit")
return [self._convert_circuit_binds(circuits, parameter_binds[0], None)]
elif len(parameter_binds) != len(circuits):
raise AerError(
"Number of input circuits does not match number of input "
"parameter bind dictionaries"
)
parameterizations = [
self._convert_circuit_binds(circuit, parameter_binds[idx], idx_maps[idx])
for idx, circuit in enumerate(circuits)
]
return parameterizations
# pylint: disable=arguments-renamed
def run(self, circuits, parameter_binds=None, **run_options):
"""Run circuits on the backend.
Args:
circuits (QuantumCircuit or list): The QuantumCircuit (or list
of QuantumCircuit objects) to run
parameter_binds (list): A list of parameter binding dictionaries.
See additional information (default: None).
run_options (kwargs): additional run time backend options.
Returns:
AerJob: The simulation job.
Raises:
TypeError: If ``parameter_binds`` is specified with an input or
has a length mismatch with the number of circuits.
Additional Information:
* Each parameter binding dictionary is of the form::
{
param_a: [val_1, val_2],
param_b: [val_3, val_1],
}
for all parameters in that circuit. The length of the value
list must be the same for all parameters, and the number of
parameter dictionaries in the list must match the length of
``circuits`` (if ``circuits`` is a single ``QuantumCircuit``
object it should a list of length 1).
* kwarg options specified in ``run_options`` will temporarily override
any set options of the same name for the current run.
Raises:
ValueError: if run is not implemented
"""
if isinstance(circuits, (QuantumCircuit, Schedule, ScheduleBlock)):
circuits = [circuits]
return self._run_circuits(circuits, parameter_binds, **run_options)
def _run_circuits(self, circuits, parameter_binds, **run_options):
"""Run circuits by generating native circuits."""
# Submit job
job_id = str(uuid.uuid4())
aer_job = AerJob(
self,
job_id,
self._execute_circuits_job,
parameter_binds=parameter_binds,
circuits=circuits,
run_options=run_options,
)
aer_job.submit()
return aer_job
def configuration(self):
"""Return the simulator backend configuration.
Returns:
BackendConfiguration: the configuration for the backend.
"""
config = copy.copy(self._configuration)
for key, val in self._options_configuration.items():
setattr(config, key, val)
# If config has custom instructions add them to
# basis gates to include them for the qiskit transpiler
if hasattr(config, "custom_instructions"):
config.basis_gates = config.basis_gates + config.custom_instructions
return config
def properties(self):
"""Return the simulator backend properties if set.
Returns:
BackendProperties: The backend properties or ``None`` if the
backend does not have properties set.
"""
properties = copy.copy(self._properties)
for key, val in self._options_properties.items():
setattr(properties, key, val)
return properties
@property
def max_circuits(self):
if hasattr(self.configuration(), "max_experiments"):
return self.configuration().max_experiments
else:
return None
@property
def target(self):
if self._from_backend:
return self._target
# make target for AerBackend
# importing packages where they are needed, to avoid cyclic-import.
# pylint: disable=cyclic-import
from qiskit.transpiler.target import InstructionProperties
from qiskit.circuit.controlflow import ForLoopOp, IfElseOp, SwitchCaseOp, WhileLoopOp
from qiskit.circuit.library.standard_gates import get_standard_gate_name_mapping
from qiskit.circuit.parameter import Parameter
from qiskit.circuit.gate import Gate
from qiskit.circuit.controlflow import CONTROL_FLOW_OP_NAMES
from qiskit.providers.backend import QubitProperties
required = ["measure", "delay"]
configuration = self.configuration()
properties = self.properties()
# Load Qiskit object representation
qiskit_inst_mapping = get_standard_gate_name_mapping()
qiskit_inst_mapping.update(NAME_MAPPING)
qiskit_control_flow_mapping = {
"if_else": IfElseOp,
"while_loop": WhileLoopOp,
"for_loop": ForLoopOp,
"switch_case": SwitchCaseOp,
}
in_data = {"num_qubits": configuration.num_qubits}
# Parse global configuration properties
if hasattr(configuration, "dt"):
in_data["dt"] = configuration.dt
if hasattr(configuration, "timing_constraints"):
in_data.update(configuration.timing_constraints)
# Create instruction property placeholder from backend configuration
basis_gates = set(getattr(configuration, "basis_gates", []))
supported_instructions = set(getattr(configuration, "supported_instructions", []))
gate_configs = {gate.name: gate for gate in configuration.gates}
all_instructions = set.union(
basis_gates, set(required), supported_instructions.intersection(CONTROL_FLOW_OP_NAMES)
)
inst_name_map = {} # type: Dict[str, Instruction]
faulty_ops = set()
faulty_qubits = set()
unsupported_instructions = []
# Create name to Qiskit instruction object repr mapping
for name in all_instructions:
if name in qiskit_control_flow_mapping:
continue
if name in qiskit_inst_mapping:
inst_name_map[name] = qiskit_inst_mapping[name]
elif name in gate_configs:
this_config = gate_configs[name]
params = list(map(Parameter, getattr(this_config, "parameters", [])))
coupling_map = getattr(this_config, "coupling_map", [])
inst_name_map[name] = Gate(
name=name,
num_qubits=len(coupling_map[0]) if coupling_map else 0,
params=params,
)
else:
warnings.warn(
f"No gate definition for {name} can be found and is being excluded "
"from the generated target.",
RuntimeWarning,
)
unsupported_instructions.append(name)
for name in unsupported_instructions:
all_instructions.remove(name)
# Create inst properties placeholder
# Without any assignment, properties value is None,
# which defines a global instruction that can be applied to any qubit(s).
# The None value behaves differently from an empty dictionary.
# See API doc of Target.add_instruction for details.
prop_name_map = dict.fromkeys(all_instructions)
for name in all_instructions:
if name in gate_configs:
if coupling_map := getattr(gate_configs[name], "coupling_map", None):
# Respect operational qubits that gate configuration defines
# This ties instruction to particular qubits even without properties information.
# Note that each instruction is considered to be ideal unless
# its spec (e.g. error, duration) is bound by the properties object.
prop_name_map[name] = dict.fromkeys(map(tuple, coupling_map))
# Populate instruction properties
if properties:
def _get_value(prop_dict, prop_name):
if ndval := prop_dict.get(prop_name, None):
return ndval[0]
return None
# is_qubit_operational is a bit of expensive operation so precache the value
faulty_qubits = {
q for q in range(configuration.num_qubits) if not properties.is_qubit_operational(q)
}
qubit_properties = []
for qi in range(0, configuration.num_qubits):
# TODO faulty qubit handling might be needed since
# faulty qubit reporting qubit properties doesn't make sense.
try:
prop_dict = properties.qubit_property(qubit=qi)
except KeyError:
continue
qubit_properties.append(
QubitProperties(
t1=prop_dict.get("T1", (None, None))[0],
t2=prop_dict.get("T2", (None, None))[0],
frequency=prop_dict.get("frequency", (None, None))[0],
)
)
in_data["qubit_properties"] = qubit_properties
for name in all_instructions:
for qubits, params in properties.gate_property(name).items():
if set.intersection(
faulty_qubits, qubits
) or not properties.is_gate_operational(name, qubits):
try:
# Qubits might be pre-defined by the gate config
# However properties objects says the qubits is non-operational
del prop_name_map[name][qubits]
except KeyError:
pass
faulty_ops.add((name, qubits))
continue
if prop_name_map[name] is None:
prop_name_map[name] = {}
prop_name_map[name][qubits] = InstructionProperties(
error=_get_value(params, "gate_error"),
duration=_get_value(params, "gate_length"),
)
if isinstance(prop_name_map[name], dict) and any(
v is None for v in prop_name_map[name].values()
):
# Properties provides gate properties only for subset of qubits
# Associated qubit set might be defined by the gate config here
logger.info(
"Gate properties of instruction %s are not provided for every qubits. "
"This gate is ideal for some qubits and the rest is with finite error. "
"Created backend target may confuse error-aware circuit optimization.",
name,
)
# Measure instruction property is stored in qubit property
prop_name_map["measure"] = {}
for qubit_idx in range(configuration.num_qubits):
if qubit_idx in faulty_qubits:
continue
qubit_prop = properties.qubit_property(qubit_idx)
prop_name_map["measure"][(qubit_idx,)] = InstructionProperties(
error=_get_value(qubit_prop, "readout_error"),
duration=_get_value(qubit_prop, "readout_length"),
)
for op in required:
# Map required ops to each operational qubit
if prop_name_map[op] is None:
prop_name_map[op] = {
(q,): None for q in range(configuration.num_qubits) if q not in faulty_qubits
}
# Add parsed properties to target
target = Target(**in_data)
for inst_name in all_instructions:
if inst_name in qiskit_control_flow_mapping:
# Control flow operator doesn't have gate property.
target.add_instruction(
instruction=qiskit_control_flow_mapping[inst_name],
name=inst_name,
)
else:
target.add_instruction(
instruction=inst_name_map[inst_name],
properties=prop_name_map.get(inst_name, None),
name=inst_name,
)
if self._coupling_map is not None:
target._coupling_graph = self._coupling_map.graph.copy()
return target
def set_max_qubits(self, max_qubits):
"""Set maximun number of qubits to be used for this backend."""
if not self._from_backend:
self._configuration.n_qubits = max_qubits
self._set_configuration_option("n_qubits", max_qubits)
def clear_options(self):
"""Reset the simulator options to default values."""
self._options = self._default_options()
self._options_configuration = {}
self._options_properties = {}
def status(self):
"""Return backend status.
Returns:
BackendStatus: the status of the backend.
"""
return BackendStatus(
backend_name=self.name,
backend_version=self.configuration().backend_version,
operational=True,
pending_jobs=0,
status_msg="",
)
def _execute_circuits_job(
self, circuits, parameter_binds, run_options, job_id="", format_result=True
):
"""Run a job"""
# Start timer
start = time.time()
# Compile circuits
circuits, noise_model = self._compile(circuits, **run_options)
if self._target is not None:
aer_circuits, idx_maps = assemble_circuits(circuits, self.configuration().basis_gates)
else:
aer_circuits, idx_maps = assemble_circuits(circuits)
if parameter_binds:
run_options["parameterizations"] = self._convert_binds(
circuits, parameter_binds, idx_maps
)
elif not all(len(circuit.parameters) == 0 for circuit in circuits):
raise AerError("circuits have parameters but parameter_binds is not specified.")
for circ_id, aer_circuit in enumerate(aer_circuits):
aer_circuit.circ_id = circ_id
config = generate_aer_config(circuits, self.options, **run_options)
# Run simulation
metadata_map = {
aer_circuit.circ_id: circuit.metadata
for aer_circuit, circuit in zip(aer_circuits, circuits)
}
output = self._execute_circuits(aer_circuits, noise_model, config)
# Validate output
if not isinstance(output, dict):
logger.error("%s: simulation failed.", self.name)
if output:
logger.error("Output: %s", output)
raise AerError("simulation terminated without returning valid output.")
# Format results
output["job_id"] = job_id
output["date"] = datetime.datetime.now().isoformat()
output["backend_name"] = self.name
output["backend_version"] = self.configuration().backend_version
# Push metadata to experiment headers
for result in output["results"]:
if "header" not in result:
continue
result["header"]["metadata"] = metadata_map[result.pop("circ_id")]
# Add execution time
output["time_taken"] = time.time() - start
# Display warning if simulation failed
if not output.get("success", False):
msg = "Simulation failed"
if "status" in output:
msg += f" and returned the following error message:\n{output['status']}"
logger.warning(msg)
if format_result:
return self._format_results(output)
return output
@staticmethod
def _format_results(output):
"""Format C++ simulator output for constructing Result"""
for result in output["results"]:
data = result.get("data", {})
metadata = result.get("metadata", {})
save_types = metadata.get("result_types", {})
save_subtypes = metadata.get("result_subtypes", {})
for key, val in data.items():
if key in save_types:
data[key] = format_save_type(val, save_types[key], save_subtypes[key])
return Result.from_dict(output)
def _compile(self, circuits, **run_options):
"""Compile circuits and noise model"""
if isinstance(circuits, (QuantumCircuit, Schedule, ScheduleBlock)):
circuits = [circuits]
optypes = [circuit_optypes(circ) for circ in circuits]
# Compile Qasm3 instructions
circuits, optypes = compile_circuit(circuits, optypes=optypes)
# run option noise model
circuits, noise_model, run_options = self._assemble_noise_model(
circuits, optypes, **run_options
)
return circuits, noise_model
def _assemble_noise_model(self, circuits, optypes, **run_options):
"""Move quantum error instructions from circuits to noise model"""
# Make a shallow copy so we can modify list elements if required
run_circuits = copy.copy(circuits)
# Flag for if we need to make a deep copy of the noise model
# This avoids unnecessarily copying the noise model for circuits
# that do not contain a quantum error
updated_noise = False
noise_model = run_options.get("noise_model", getattr(self.options, "noise_model", None))
# Add custom pass noise only to QuantumCircuit objects that contain delay
# instructions since this is the only instruction handled by the noise pass
# at present
if noise_model and all(isinstance(circ, QuantumCircuit) for circ in run_circuits):
npm = noise_model._pass_manager()
if npm is not None:
# Get indicies of circuits that need noise transpiling
transpile_idxs = [idx for idx, optype in enumerate(optypes) if Delay in optype]
# Transpile only the required circuits
transpiled_circuits = npm.run([run_circuits[i] for i in transpile_idxs])
if isinstance(transpiled_circuits, QuantumCircuit):
transpiled_circuits = [transpiled_circuits]
# Update the circuits with transpiled ones
for idx, circ in zip(transpile_idxs, transpiled_circuits):
run_circuits[idx] = circ
optypes[idx] = circuit_optypes(circ)
# Check if circuits contain quantum error instructions
for idx, circ in enumerate(run_circuits):
if QuantumChannelInstruction in optypes[idx] and not isinstance(
circ, (Schedule, ScheduleBlock)
):
updated_circ = False
new_data = []
for datum in circ.data:
inst, qargs, cargs = datum.operation, datum.qubits, datum.clbits
if isinstance(inst, QuantumChannelInstruction):
updated_circ = True
if not updated_noise:
# Deep copy noise model on first update
if noise_model is None:
noise_model = NoiseModel()
else:
noise_model = copy.deepcopy(noise_model)
updated_noise = True
# Extract error and replace with place holder
qerror = inst._quantum_error
qerror_loc = QuantumErrorLocation(qerror)
new_data.append((qerror_loc, qargs, cargs))
optypes[idx].add(QuantumErrorLocation)
# Add error to noise model
if qerror.id not in noise_model._default_quantum_errors:
noise_model.add_all_qubit_quantum_error(qerror, qerror.id)
else:
new_data.append((inst, qargs, cargs))
if updated_circ:
new_circ = circ.copy()
new_circ.data = new_data
run_circuits[idx] = new_circ
optypes[idx].discard(QuantumChannelInstruction)
# Return the possibly updated circuits and noise model
return run_circuits, noise_model, run_options
def _get_executor(self, **run_options):
"""Get the executor"""
if "executor" in run_options:
return run_options["executor"]
else:
return getattr(self._options, "executor", None)
@abstractmethod
def _execute_circuits(self, aer_circuits, noise_model, config):
"""Execute aer circuits on the backend.
Args:
aer_circuits (List of AerCircuit): simulator input.
noise_model (NoiseModel): noise model
config (Dict): configuration for simulation
Returns:
dict: return a dictionary of results.
"""
pass
def set_option(self, key, value):
"""Special handling for setting backend options.
This method should be extended by sub classes to
update special option values.
Args:
key (str): key to update
value (any): value to update.
Raises:
AerError: if key is 'method' and val isn't in available methods.
"""
# Add all other options to the options dict
# TODO: in the future this could be replaced with an options class
# for the simulators like configuration/properties to show all
# available options
if hasattr(self._configuration, key):
self._set_configuration_option(key, value)
elif hasattr(self._properties, key):
self._set_properties_option(key, value)
else:
if not hasattr(self._options, key):
raise AerError(f"Invalid option {key}")
if value is not None:
# Only add an option if its value is not None
setattr(self._options, key, value)
else:
# If setting an existing option to None reset it to default
# this is for backwards compatibility when setting it to None would
# remove it from the options dict
setattr(self._options, key, getattr(self._default_options(), key))
def set_options(self, **fields):
"""Set the simulator options"""
for key, value in fields.items():
self.set_option(key, value)
def _set_configuration_option(self, key, value):
"""Special handling for setting backend configuration options."""
if value is not None:
self._options_configuration[key] = value
elif key in self._options_configuration:
self._options_configuration.pop(key)
def _set_properties_option(self, key, value):
"""Special handling for setting backend properties options."""
if value is not None:
self._options_properties[key] = value
elif key in self._options_properties:
self._options_properties.pop(key)
def __repr__(self):
"""String representation of an AerBackend."""
name = self.__class__.__name__
display = f"'{self.name}'"
return f"{name}({display})"