Move ElasticWork to dfpt_works

This commit is contained in:
Matteo Giantomassi 2018-08-06 02:41:00 +02:00
parent 5fad77ae64
commit 5e94f9fec6
9 changed files with 192 additions and 114 deletions

View File

@ -477,12 +477,17 @@ class NotebookWriter(object):
if which("jupyter") is None:
raise RuntimeError("Cannot find jupyter in $PATH. Install it with `conda install jupyter or `pip install jupyter`")
# Use jupyter-lab instead of classic notebook if possible.
has_jupyterlab = which("jupyter-lab") is not None
#has_jupyterlab = False
appname = "jupyter-lab" if has_jupyterlab else "jupyter notebook"
if foreground:
return os.system("jupyter notebook %s" % nbpath)
return os.system("%s %s" % (appname, nbpath))
fd, tmpname = tempfile.mkstemp(text=True)
cmd = "jupyter notebook %s" % nbpath
cmd = "%s %s" % (appname, nbpath)
print("Executing:", cmd)
print("stdout and stderr redirected to %s" % tmpname)
import subprocess

View File

@ -377,6 +377,11 @@ class AbipyTest(PymatgenTest):
except ImportError:
return False
def run_nbpath(self, nbpath):
"""Test that the notebook in question runs all cells correctly."""
nb, errors = notebook_run(nbpath)
return nb, errors
def has_ipywidgets():
"""Return True if ipywidgets_ package is available."""
@ -507,6 +512,14 @@ class AbipyTest(PymatgenTest):
assert not errors
def abivalidate_work(self, work):
"""Invoke Abinit to test validity of the inputs of a |Work|"""
from abipy.flowtk import Flow
tmpdir = tempfile.mkdtemp()
flow = Flow(workdir=tmpdir)
return self.abivalidate_flow(flow)
def abivalidate_flow(flow):
@ -528,3 +541,36 @@ class AbipyTest(PymatgenTest):
def get_gsinput_si(*args, **kwargs):
return get_gsinput_si(*args, **kwargs)
def notebook_run(path):
Execute a notebook via nbconvert and collect output.
Taken from
path (str): file path for the notebook object
Returns: (parsed nb object, execution errors)
import nbformat
dirname, __ = os.path.split(path)
with tempfile.NamedTemporaryFile(suffix=".ipynb") as fout:
args = ["jupyter", "nbconvert", "--to", "notebook", "--execute",
"--output",, path]
nb =, nbformat.current_nbformat)
errors = [output for cell in nb.cells if "outputs" in cell
for output in cell["outputs"]\
if output.output_type == "error"]
return nb, errors

View File

@ -640,7 +640,7 @@ class GrunsNcFile(AbinitNcFile, Has_Structure, NotebookWriter):
nqsmall: Defines the homogeneous q-mesh used for the DOS. Gives the number of divisions
used to sample the smallest lattice vector. If 0, DOS is not computed and
(phbst, None) is returned.
qppa: Defines the homogeneous q-mesh used for the DOS in units of q-points per reciproval atom.
qppa: Defines the homogeneous q-mesh used for the DOS in units of q-points per reciprocal atom.
Overrides nqsmall.
ndivsm: Number of division used for the smallest segment of the q-path.
line_density: Defines the a density of k-points per reciprocal atom to plot the phonon dispersion.

View File

@ -73,4 +73,3 @@ class GrunsFileTest(AbipyTest):
ddb_list = [os.path.join(path, "mp-149_{:+d}_DDB".format(s)) for s in strains]
g = GrunsNcFile.from_ddb_list(ddb_list, ndivsm=3, nqsmall=3)

View File

@ -19,7 +19,7 @@ except ImportError:
from import Status
from import *
from import EphTask
from import EphTask, ElasticTask
from import *
from import (Flow, G0W0WithQptdmFlow, bandstructure_flow, PhononFlow,
g0w0_flow, phonon_flow, phonon_conv_flow, NonLinearCoeffFlow)
@ -27,6 +27,8 @@ from import AbinitTimerParser, AbinitTimerSection
from import GroundStateScfCycle, D2DEScfCycle
from import *
#from abipy.flowtk.gs_works import EosWork
from abipy.flowtk.dfpt_works import NscfDdksWork, ElasticWork
def flow_main(main): # pragma: no cover

View File

@ -4,7 +4,7 @@ from __future__ import print_function, division, unicode_literals, absolute_impo
import numpy as np
from .works import Work
from .works import Work, MergeDdb
class NscfDdksWork(Work):
@ -60,3 +60,111 @@ class NscfDdksWork(Work):
return new
class ElasticWork(Work, MergeDdb):
This Work computes the elastic constants and (optionally) the piezoelectric tensor.
It consists of Response function calculations for:
* rigid-atom elastic tensor
* rigid-atom piezoelectric tensor
* interatomic force constants at gamma
* Born effective charges
The structure is assumed to be already relaxed
Create a `Flow` for phonon calculations. The flow has one works with:
- 1 GS Task
- 3 DDK Task
- 4 Phonon Tasks (Gamma point)
- 6 Elastic tasks (3 uniaxial + 3 shear strain)
The Phonon tasks and the elastic task will read the DDK produced at the beginning
def from_scf_input(cls, scf_input, with_relaxed_ion=True, with_piezo=False, with_dde=False,
tolerances=None, den_deps=None, manager=None):
with_dde: Compute electric field perturbations.
tolerances: Dict of tolerances
Similar to `from_scf_task`, the difference is that this method requires
an input for SCF calculation instead of a ScfTask. All the tasks (Scf + Phonon)
are packed in a single Work whereas in the previous case we usually have multiple works.
if tolerances is None: tolerances = {}
new = cls(manager=manager)
# Register task for WFK0 calculation (either SCF or NCSCF if den_deps is given)
if den_deps is None:
wfk_task = new.register_scf_task(scf_input)
tolwfr = 1.0e-20
if "nscf" in tolerances:
tolwfr = tolerances["nscf"]["tolwfr"]
nscf_input = scf_input.new_with_vars(iscf=-2, tolwfr=tolwfr)
wfk_task = new.register_nscf_task(nscf_input, deps=den_deps)
if with_piezo or with_dde:
# Calculate the ddk wf's needed for piezoelectric tensor and Born effective charges.
#ddk_tolerance = {"tolwfr": 1.0e-20}
ddk_tolerance = tolerances.get("ddk", None)
ddk_multi = scf_input.make_ddk_inputs(tolerance=ddk_tolerance, manager=manager)
ddk_tasks = []
for inp in ddk_multi:
ddk_task = new.register_ddk_task(inp, deps={wfk_task: "WFK"})
ddk_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks}
if with_dde:
# Add tasks for electric field perturbation.
#dde_tolerance = None
dde_tolerance = tolerances.get("dde", None)
dde_multi = scf_input.make_dde_inputs(tolerance=dde_tolerance, use_symmetries=True, manager=manager)
dde_deps = {wfk_task: "WFK"}
for inp in dde_multi:
new.register_dde_task(inp, deps=dde_deps)
# Build input files for strain and (optionally) phonons.
#strain_tolerance = {"tolvrs": 1e-10}
strain_tolerance = tolerances.get("strain", None)
strain_multi = scf_input.make_strain_perts_inputs(tolerance=strain_tolerance, manager=manager,
phonon_pert=with_relaxed_ion, kptopt=2)
if with_relaxed_ion:
# Phonon perturbation (read DDK if piezo).
ph_deps = {wfk_task: "WFK"}
if with_piezo: ph_deps.update(ddk_deps)
for inp in strain_multi:
if inp.get("rfphon", 0) == 1:
new.register_phonon_task(inp, deps=ph_deps)
# Finally compute strain pertubations (read DDK if piezo).
elast_deps = {wfk_task: "WFK"}
if with_piezo: elast_deps.update(ddk_deps)
for inp in strain_multi:
if inp.get("rfstrs", 0) != 0:
new.register_elastic_task(inp, deps=elast_deps)
return new
def on_all_ok(self):
This method is called when all the tasks of the Work reach S_OK.
Ir runs `mrgddb` in sequential on the local machine to produce
the final DDB file in the outdir of the `Work`.
# Merge DDB files.
out_ddb = self.merge_ddb_files(delete_source_ddbs=False, only_dfpt_tasks=False)
results = self.Results(node=self, returncode=0, message="DDB merge done")
return results

View File

@ -3,10 +3,9 @@ from __future__ import print_function, division, unicode_literals, absolute_impo
import as abidata
import abipy.flowtk as flowtk
#import abipy.flowtk
from abipy.core.testing import AbipyTest
from abipy.flowtk import dfpt_works
#from abipy.flowtk import dfpt_works
class TestDfptWorks(AbipyTest):
@ -14,6 +13,29 @@ class TestDfptWorks(AbipyTest):
def test_nscfddkswork(self):
"""Testing NscfDdksWork."""
scf_task = self.get_gsinput_si(as_task=True)
work = dfpt_works.NscfDdksWork.from_scf_task(scf_task, ddk_ngkpt=[8, 8, 8],
work = flowtk.NscfDdksWork.from_scf_task(scf_task, ddk_ngkpt=[8, 8, 8],
ddk_shiftk=[0, 0, 0], ddk_nband=10)
assert len(work) == 4
def test_elastic_work(self):
"""Testing ElasticWork."""
scf_task = self.get_gsinput_si(as_task=True)
scf_input = scf_task.input
den_deps = {scf_task: "DEN"}
tolerances = dict(nscf={"tolwfr": 1.0e-10}, ddk={"tolwfr": 1.0e-12}, strain={"tolvrs": 1.0e-10})
work = flowtk.ElasticWork.from_scf_input(scf_input,
with_relaxed_ion=True, with_piezo=True, with_dde=True, tolerances=tolerances,
den_deps=den_deps, manager=None)
#assert len(work) == 4
assert work[0].input["iscf"] == -2
assert work[0].input["tolwfr"] == tolerances["nscf"]["tolwfr"]
assert isinstance(work[0], flowtk.NscfTask)
for task in work[1:]:
assert task.input["kptopt"] == 2
assert all(isinstance(task, flowtk.ElasticTask) for task in work[-6:])
for task in work[-6:]:
assert task.input["tolvrs"] == tolerances["strain"]["tolvrs"]

View File

@ -12,7 +12,7 @@ from abipy.flowtk import gs_works
class TestGsWorks(AbipyTest):
def test_eoswork(self):
def test_eos_work(self):
"""Testing EosWork."""
scf_input = self.get_gsinput_si()
work = gs_works.EosWork.from_scf_input(scf_input, npoints=4, deltap_vol=0.25, ecutsm=2.0, move_atoms=True)

View File

@ -4,107 +4,3 @@ from __future__ import print_function, division, unicode_literals, absolute_impo
import numpy as np
from import Work, MergeDdb
class ElasticWork(Work, MergeDdb):
This Work computes the elastic constants and (optionally) the piezoelectric tensor.
It consists of Response function calculations for:
* rigid-atom elastic tensor
* rigid-atom piezoelectric tensor
* interatomic force constants at gamma
* Born effective charges
The structure is assumed to be already relaxed
Create a `Flow` for phonon calculations. The flow has one works with:
- 1 GS Task
- 3 DDK Task
- 4 Phonon Tasks (Gamma point)
- 6 Elastic tasks (3 uniaxial + 3 shear strain)
The Phonon tasks and the elastic task will read the DDK produced at the beginning
def from_scf_input(cls, scf_input, with_relaxed_ion=True, with_piezo=False, with_dde=False,
tolerances=None, den_deps=None, manager=None):
with_dde: Compute electric field perturbations.
tolerances: Dict of tolerances
Similar to `from_scf_task`, the difference is that this method requires
an input for SCF calculation instead of a ScfTask. All the tasks (Scf + Phonon)
are packed in a single Work whereas in the previous case we usually have multiple works.
if tolerances is None: tolerances = {}
new = cls(manager=manager)
# Register task for WFK0 calculation (either SCF or NCSCF if den_deps is given.
if den_deps is None:
wfk_task = new.register_scf_task(scf_input)
wfk_task = new.register_nscf_task(scf_input.new_with_vars(iscf=-2), deps=den_deps)
if with_piezo or with_dde:
# Calculate the ddk wf's needed for piezoelectric tensor and Born effective charges.
ddk_tolerance = {"tolwfr": 1.0e-20}
#ddk_tolerance = tolerances.get("ddk", None)
ddk_multi = scf_input.make_ddk_inputs(tolerance=ddk_tolerance, manager=manager)
ddk_tasks = []
for inp in ddk_multi:
ddk_task = new.register_ddk_task(inp, deps={wfk_task: "WFK"})
ddk_deps = {ddk_task: "DDK" for ddk_task in ddk_tasks}
if with_dde:
# Add tasks for electric field perturbation.
dde_tolerance = None
#dde_tolerance = tolerances.get("dde", None)
dde_multi = scf_input.make_dde_inputs(tolerance=dde_tolerance, use_symmetries=True, manager=manager)
dde_deps = {wfk_task: "WFK"}
for inp in dde_multi:
new.register_dde_task(inp, deps=dde_deps)
# Build input files for strain and (optionally) phonons.
strain_tolerance = {"tolvrs": 1e-10}
#strain_tolerance = tolerances.get("strain", None)
strain_multi = scf_input.make_strain_perts_inputs(tolerance=strain_tolerance, manager=manager,
phonon_pert=with_relaxed_ion, kptopt=2)
if with_relaxed_ion:
# Phonon perturbation (read DDK if piezo).
ph_deps = {wfk_task: "WFK"}
if with_piezo: ph_deps.update(ddk_deps)
for inp in strain_multi:
if inp.get("rfphon", 0) == 1:
new.register_phonon_task(inp, deps=ph_deps)
# Finally compute strain pertubations (read DDK if piezo).
elast_deps = {wfk_task: "WFK"}
if with_piezo: elast_deps.update(ddk_deps)
for inp in strain_multi:
if inp.get("rfstrs", 0) != 0:
new.register_elastic_task(inp, deps=elast_deps)
return new
def on_all_ok(self):
This method is called when all the tasks of the Work reach S_OK.
Ir runs `mrgddb` in sequential on the local machine to produce
the final DDB file in the outdir of the `Work`.
# Merge DDB files.
out_ddb = self.merge_ddb_files(delete_source_ddbs=False, only_dfpt_tasks=False)
results = self.Results(node=self, returncode=0, message="DDB merge done")
return results