179 lines
5.8 KiB
Python
179 lines
5.8 KiB
Python
from qiskit_aer import AerSimulator
|
|
import logging
|
|
from typing import Optional
|
|
import time
|
|
import numpy as np
|
|
from scipy.optimize import minimize
|
|
|
|
from qiskit import QuantumCircuit
|
|
from qiskit_ibm_runtime import (
|
|
EstimatorV2 as Estimator,
|
|
SamplerV2 as Sampler,
|
|
QiskitRuntimeService,
|
|
Session,
|
|
)
|
|
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
|
|
from qiskit_serverless import (
|
|
distribute_task,
|
|
get_arguments,
|
|
get,
|
|
save_result,
|
|
)
|
|
|
|
def build_callback(ansatz, hamiltonian, estimator, callback_dict):
|
|
"""Return callback function that uses Estimator instance,
|
|
and stores intermediate values into a dictionary.
|
|
|
|
Parameters:
|
|
ansatz (QuantumCircuit): Parameterized ansatz circuit
|
|
hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
|
|
estimator (Estimator): Estimator primitive instance
|
|
callback_dict (dict): Mutable dict for storing values
|
|
|
|
Returns:
|
|
Callable: Callback function object
|
|
"""
|
|
|
|
def callback(current_vector):
|
|
"""Callback function storing previous solution vector,
|
|
computing the intermediate cost value, and displaying number
|
|
of completed iterations and average time per iteration.
|
|
|
|
Values are stored in pre-defined 'callback_dict' dictionary.
|
|
|
|
Parameters:
|
|
current_vector (ndarray): Current vector of parameters
|
|
returned by optimizer
|
|
"""
|
|
# Keep track of the number of iterations
|
|
callback_dict["iters"] += 1
|
|
# Set the prev_vector to the latest one
|
|
callback_dict["prev_vector"] = current_vector
|
|
# Compute the value of the cost function at the current vector
|
|
callback_dict["cost_history"].append(
|
|
estimator.run([(ansatz, hamiltonian, current_vector)])
|
|
.result()[0]
|
|
)
|
|
# Grab the current time
|
|
current_time = time.perf_counter()
|
|
# Find the total time of the execute (after the 1st iteration)
|
|
if callback_dict["iters"] > 1:
|
|
callback_dict["_total_time"] += current_time - callback_dict["_prev_time"]
|
|
# Set the previous time to the current time
|
|
callback_dict["_prev_time"] = current_time
|
|
# Compute the average time per iteration and round it
|
|
time_str = (
|
|
round(callback_dict["_total_time"] / (callback_dict["iters"] - 1), 2)
|
|
if callback_dict["_total_time"]
|
|
else "-"
|
|
)
|
|
# Print to screen on single line
|
|
print(
|
|
"Iters. done: {} [Avg. time per iter: {}]".format(
|
|
callback_dict["iters"], time_str
|
|
),
|
|
end="\r",
|
|
flush=True,
|
|
)
|
|
|
|
return callback
|
|
|
|
|
|
def cost_func(params, ansatz, hamiltonian, estimator):
|
|
"""Return estimate of energy from estimator
|
|
|
|
Parameters:
|
|
params (ndarray): Array of ansatz parameters
|
|
ansatz (QuantumCircuit): Parameterized ansatz circuit
|
|
hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
|
|
estimator (Estimator): Estimator primitive instance
|
|
|
|
Returns:
|
|
float: Energy estimate
|
|
"""
|
|
energy = (
|
|
estimator.run([(ansatz, hamiltonian, params)]).result()[0].data.evs
|
|
)
|
|
return energy
|
|
|
|
|
|
def run_vqe(initial_parameters, ansatz, operator, estimator, method):
|
|
callback_dict = {
|
|
"prev_vector": None,
|
|
"iters": 0,
|
|
"cost_history": [],
|
|
"_total_time": 0,
|
|
"_prev_time": None,
|
|
}
|
|
callback = build_callback(ansatz, operator, estimator, callback_dict)
|
|
result = minimize(
|
|
cost_func,
|
|
initial_parameters,
|
|
args=(ansatz, operator, estimator),
|
|
method=method,
|
|
callback=callback,
|
|
)
|
|
return result, callback_dict
|
|
|
|
|
|
if __name__ == "__main__":
|
|
arguments = get_arguments()
|
|
|
|
service = arguments.get("service")
|
|
|
|
ansatz = arguments.get("ansatz")
|
|
operator = arguments.get("operator")
|
|
method = arguments.get("method", "COBYLA")
|
|
initial_parameters = arguments.get("initial_parameters")
|
|
if service:
|
|
backend = service.least_busy(operational=True, simulator=False)
|
|
else:
|
|
backend = AerSimulator()
|
|
if initial_parameters is None:
|
|
initial_parameters = 2 * np.pi * np.random.rand(ansatz.num_parameters)
|
|
pm = generate_preset_pass_manager(backend=backend, optimization_level=1)
|
|
ansatz_isa = pm.run(ansatz)
|
|
operator_isa = operator.apply_layout(ansatz_isa.layout)
|
|
|
|
if service:
|
|
with Session(service=service, backend=backend) as session:
|
|
estimator = Estimator(session=session)
|
|
vqe_result, callback_dict = run_vqe(
|
|
initial_parameters=initial_parameters,
|
|
ansatz=ansatz_isa,
|
|
operator=operator_isa,
|
|
estimator=estimator,
|
|
method=method,
|
|
)
|
|
else:
|
|
estimator = Estimator(backend=backend)
|
|
vqe_result, callback_dict = run_vqe(
|
|
initial_parameters=initial_parameters,
|
|
ansatz=ansatz_isa,
|
|
operator=operator_isa,
|
|
estimator=estimator,
|
|
method=method,
|
|
)
|
|
|
|
|
|
qc = ansatz.assign_parameters(vqe_result.x)
|
|
qc.measure_all()
|
|
qc_isa = pm.run(qc)
|
|
|
|
if service:
|
|
with Session(service=service, backend=backend) as session:
|
|
sampler = Sampler(session=session)
|
|
samp_dist = sampler.run([qc_isa], shots=int(1e4)).result()[0].data.meas.get_counts()
|
|
else:
|
|
sampler = Sampler(backend=backend)
|
|
samp_dist = sampler.run([qc_isa], shots=int(1e4)).result()[0].data.meas.get_counts()
|
|
|
|
save_result(
|
|
{
|
|
"result": samp_dist,
|
|
"optimal_point": vqe_result.x.tolist(),
|
|
"optimal_value": vqe_result.fun,
|
|
"optimizer_time": callback_dict.get("_total_time", 0),
|
|
}
|
|
)
|