'''
testcode2
---------
A framework for regression testing numerical programs.
:copyright: (c) 2012 James Spencer.
:license: modified BSD; see LICENSE for more details.
'''
from __future__ import print_function
import glob
import os
import pipes
import shutil
import subprocess
import sys
try:
import yaml
_HAVE_YAML = True
except ImportError:
_HAVE_YAML = False
import testcode2.dir_lock as dir_lock
import testcode2.exceptions as exceptions
import testcode2.queues as queues
import testcode2.compatibility as compat
import testcode2.util as util
import testcode2.validation as validation
DIR_LOCK = dir_lock.DirLock()
# Do not change! Bad things will happen...
_FILESTEM_TUPLE = (
('test', 'test.out'),
('error', 'test.err'),
('benchmark', 'benchmark.out'),
)
_FILESTEM_DICT = dict( _FILESTEM_TUPLE )
# We can change FILESTEM if needed.
# However, this should only be done to compare two sets of test output or two
# sets of benchmarks.
# Bad things will happen if tests are run without the default FILESTEM!
FILESTEM = dict( _FILESTEM_TUPLE )
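# Illustrative sketch (hypothetical driver code, not part of this module): to
# compare two sets of test output, a front-end might rebind the benchmark stem
# before any analysis, e.g.
#     import testcode2
#     testcode2.FILESTEM['benchmark'] = testcode2._FILESTEM_DICT['test']
# so that "benchmark" data is read from test output files produced under a
# different test id.  The default stems must be restored before running tests
# for real (see the warning above).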
class TestProgram:
'''Store and access information about the program being tested.'''
def __init__(self, name, exe, test_id, benchmark, **kwargs):
# Set sane defaults (mostly null) for keyword arguments.
self.name = name
# Running
self.exe = exe
self.test_id = test_id
self.run_cmd_template = ('tc.program tc.args tc.input > '
'tc.output 2> tc.error')
self.launch_parallel = 'mpirun -np tc.nprocs'
self.submit_pattern = 'testcode.run_cmd'
# Dummy job with default settings (e.g. tolerance).
self.default_test_settings = None
# Analysis
self.benchmark = benchmark
self.ignore_fields = []
self.data_tag = None
self.extract_cmd_template = 'tc.extract tc.args tc.file'
self.extract_program = None
self.extract_args = ''
self.extract_fmt = 'table'
self.skip_cmd_template = 'tc.skip tc.args tc.test'
self.skip_program = None
self.skip_args = ''
self.verify = False
# Info
self.vcs = None
# Set values passed in as keyword options.
for (attr, val) in list(kwargs.items()):
setattr(self, attr, val)
# If using an external verification program, then set the default
# extract command template.
if self.verify and 'extract_cmd_template' not in kwargs:
self.extract_cmd_template = 'tc.extract tc.args tc.test tc.bench'
# Can we actually extract the data?
if self.extract_fmt == 'yaml' and not _HAVE_YAML:
err = 'YAML data format cannot be used: PyYAML is not installed.'
raise exceptions.TestCodeError(err)
def run_cmd(self, input_file, args, nprocs=0):
'''Create run command.'''
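# Illustrative expansion (hypothetical names): with exe='./prog.x',
# input_file='case.in', args='--fast' and nprocs=4, the default templates
# produce a command along the lines of
#     mpirun -np 4 ./prog.x --fast case.in > <test file> 2> <error file>
# where the <test file> and <error file> names are generated by
# util.testcode_filename from FILESTEM, the test id, input file and args.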
output_file = util.testcode_filename(FILESTEM['test'], self.test_id,
input_file, args)
error_file = util.testcode_filename(FILESTEM['error'], self.test_id,
input_file, args)
# Need to escape filenames for passing them to the shell.
exe = pipes.quote(self.exe)
output_file = pipes.quote(output_file)
error_file = pipes.quote(error_file)
cmd = self.run_cmd_template.replace('tc.program', exe)
if type(input_file) is str:
input_file = pipes.quote(input_file)
cmd = cmd.replace('tc.input', input_file)
else:
cmd = cmd.replace('tc.input', '')
if type(args) is str:
cmd = cmd.replace('tc.args', args)
else:
cmd = cmd.replace('tc.args', '')
cmd = cmd.replace('tc.output', output_file)
cmd = cmd.replace('tc.error', error_file)
if nprocs > 0 and self.launch_parallel:
cmd = '%s %s' % (self.launch_parallel, cmd)
cmd = cmd.replace('tc.nprocs', str(nprocs))
return cmd
def extract_cmd(self, path, input_file, args):
'''Create extraction command(s).'''
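# Illustrative expansion (hypothetical names) of the default template
# 'tc.extract tc.args tc.file' with extract_program='./extract.sh' and
# extract_args='--energy':
#     ./extract.sh --energy <benchmark file>
#     ./extract.sh --energy <test file>
# With an external verifier (self.verify) a single command is returned
# instead, which receives both the test and benchmark filenames.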
test_file = util.testcode_filename(FILESTEM['test'], self.test_id,
input_file, args)
bench_file = self.select_benchmark_file(path, input_file, args)
cmd = self.extract_cmd_template
cmd = cmd.replace('tc.extract', pipes.quote(self.extract_program))
cmd = cmd.replace('tc.args', self.extract_args)
if self.verify:
# Single command to compare benchmark and test outputs.
cmd = cmd.replace('tc.test', pipes.quote(test_file))
cmd = cmd.replace('tc.bench', pipes.quote(bench_file))
return (cmd,)
else:
# Need to return commands to extract data from the test and
# benchmark outputs.
test_cmd = cmd.replace('tc.file', pipes.quote(test_file))
bench_cmd = cmd.replace('tc.file', pipes.quote(bench_file))
return (bench_cmd, test_cmd)
def skip_cmd(self, input_file, args):
'''Create skip command.'''
test_file = util.testcode_filename(FILESTEM['test'], self.test_id,
input_file, args)
cmd = self.skip_cmd_template
cmd = cmd.replace('tc.skip', pipes.quote(self.skip_program))
cmd = cmd.replace('tc.args', self.skip_args)
cmd = cmd.replace('tc.test', pipes.quote(test_file))
return cmd
def select_benchmark_file(self, path, input_file, args):
'''Return the first benchmark file that exists, searching the benchmark IDs in order.'''
benchmark = None
benchmarks = []
for bench_id in self.benchmark:
benchfile = util.testcode_filename(FILESTEM['benchmark'], bench_id,
input_file, args)
benchmarks.append(benchfile)
if os.path.exists(os.path.join(path, benchfile)):
benchmark = benchfile
break
if not benchmark:
err = 'No benchmark found in %s. Checked for: %s.'
raise exceptions.TestCodeError(err % (path, ', '.join(benchmarks)))
return benchmark
class Test:
'''Store and execute a test.'''
def __init__(self, name, test_program, path, **kwargs):
self.name = name
# program
self.test_program = test_program
# running
self.path = path
self.inputs_args = None
self.output = None
self.nprocs = 0
self.min_nprocs = 0
self.max_nprocs = compat.maxint
self.submit_template = None
# Run jobs in this test concurrently rather than consecutively?
# Only used when setting tests up in testcode2.config: if true then
# each pair of input file and arguments are assigned to a different
# Test object rather than a single Test object.
self.run_concurrent = False
# Analysis
self.default_tolerance = None
self.tolerances = {}
# Set values passed in as keyword options.
for (attr, val) in list(kwargs.items()):
setattr(self, attr, val)
if not self.inputs_args:
self.inputs_args = [('', '')]
self.status = dict( (inp_arg, None) for inp_arg in self.inputs_args )
# 'Decorate' functions which require a directory lock in order for file
# access to be thread-safe.
# As we use the in_dir decorator, which requires knowledge of the test
# directory (a per-instance property), we cannot use the @decorator
# syntactic sugar. Fortunately we can still modify them at
# initialisation time. Thank you python for closures!
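# A sketch of the equivalent decorator form, which cannot be used here
# because self.path is only known per instance (illustrative only):
#     @DIR_LOCK.in_dir(path)
#     def _start_job(self, ...):
#         ...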
self.start_job = DIR_LOCK.in_dir(self.path)(self._start_job)
self.move_output_to_test_output = DIR_LOCK.in_dir(self.path)(
self._move_output_to_test_output)
self.move_old_output_files = DIR_LOCK.in_dir(self.path)(
self._move_old_output_files)
self.verify_job = DIR_LOCK.in_dir(self.path)(self._verify_job)
self.skip_job = DIR_LOCK.in_dir(self.path)(self._skip_job)
def __hash__(self):
return hash(self.path)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
else:
# Compare values we care about...
cmp_vals = ['test_program', 'path', 'inputs_args', 'output',
'nprocs', 'min_nprocs', 'max_nprocs', 'submit_template',
'default_tolerance', 'tolerances', 'status']
comparison = tuple(getattr(other, cmp_val) == getattr(self, cmp_val) for cmp_val in cmp_vals)
return compat.compat_all(comparison)
def run_test(self, verbose=1, cluster_queue=None, rundir=None):
'''Run all jobs in test.'''
try:
# Construct tests.
test_cmds = []
test_files = []
for (test_input, test_arg) in self.inputs_args:
if (test_input and
not os.path.exists(os.path.join(self.path,test_input))):
err = 'Input file does not exist: %s' % (test_input,)
raise exceptions.RunError(err)
test_cmds.append(self.test_program.run_cmd(test_input, test_arg,
self.nprocs))
test_files.append(util.testcode_filename(FILESTEM['test'],
self.test_program.test_id, test_input, test_arg))
# Move files matching output pattern out of the way.
self.move_old_output_files(verbose)
# Run tests one-at-a-time locally or submit job in single submit
# file to a queueing system.
if cluster_queue:
if self.output:
for (ind, test) in enumerate(test_cmds):
# Don't quote self.output if it contains any wildcards
# (assume the user set it up correctly!)
out = self.output
if not compat.compat_any(wild in self.output for wild in
['*', '?', '[', '{']):
out = pipes.quote(self.output)
test_cmds[ind] = '%s; mv %s %s' % (test_cmds[ind],
out, pipes.quote(test_files[ind]))
test_cmds = ['\n'.join(test_cmds)]
for (ind, test) in enumerate(test_cmds):
job = self.start_job(test, cluster_queue, verbose)
job.wait()
# Analyse tests as they finish.
if cluster_queue:
# Did all of them at once.
for (test_input, test_arg) in self.inputs_args:
self.verify_job(test_input, test_arg, verbose, rundir)
else:
# Did one job at a time.
(test_input, test_arg) = self.inputs_args[ind]
err = []
if self.output:
try:
self.move_output_to_test_output(test_files[ind])
except exceptions.RunError:
err.append(sys.exc_info()[1])
status = validation.Status()
if job.returncode != 0:
err.insert(0, 'Error running job. Return code: %i'
% job.returncode)
(status, msg) = self.skip_job(test_input, test_arg,
verbose)
if status.skipped():
self._update_status(status, (test_input, test_arg))
if verbose > 0 and verbose < 3:
sys.stdout.write(
util.info_line(self.path,
test_input, test_arg, rundir)
)
status.print_status(msg, verbose)
elif err:
# re-raise first error we hit.
raise exceptions.RunError(err[0])
else:
self.verify_job(test_input, test_arg, verbose, rundir)
sys.stdout.flush()
except exceptions.RunError:
err = sys.exc_info()[1]
if verbose > 2:
err = 'Test(s) in %s failed.\n%s' % (self.path, err)
status = validation.Status([False])
self._update_status(status, (test_input, test_arg))
if verbose > 0 and verbose < 3:
info_line = util.info_line(self.path, test_input, test_arg, rundir)
sys.stdout.write(info_line)
status.print_status(err, verbose)
# Shouldn't run remaining tests after such a catastrophic failure.
# Mark all remaining tests as skipped so the user knows that they
# weren't run.
err = 'Previous test in %s caused a system failure.' % (self.path)
status = validation.Status(name='skipped')
for ((test_input, test_arg), stat) in list(self.status.items()):
if not self.status[(test_input,test_arg)]:
self._update_status(status, (test_input, test_arg))
if verbose > 2:
cmd = self.test_program.run_cmd(test_input, test_arg,
self.nprocs)
print(('Test using %s in %s' % (cmd, self.path)))
elif verbose > 0:
info_line = util.info_line(self.path, test_input,
test_arg, rundir)
sys.stdout.write(info_line)
status.print_status(err, verbose)
def _start_job(self, cmd, cluster_queue=None, verbose=1):
'''Start test running. Requires directory lock.
IMPORTANT: use self.start_job rather than self._start_job if using multiple
threads.
Decorated to start_job, which acquires directory lock and enters self.path
first, during initialisation.'''
if cluster_queue:
tp_ptr = self.test_program
submit_file = '%s.%s' % (os.path.basename(self.submit_template),
tp_ptr.test_id)
job = queues.ClusterQueueJob(submit_file, system=cluster_queue)
job.create_submit_file(tp_ptr.submit_pattern, cmd,
self.submit_template)
if verbose > 2:
print(('Submitting tests using %s (template submit file) in %s'
% (self.submit_template, self.path)))
job.start_job()
else:
# Run locally via subprocess.
if verbose > 2:
print(('Running test using %s in %s\n' % (cmd, self.path)))
try:
job = subprocess.Popen(cmd, shell=True)
except OSError:
# slightly odd syntax in order to be compatible with python 2.5
# and python 2.6/3
err = 'Execution of test failed: %s' % (sys.exc_info()[1],)
raise exceptions.RunError(err)
# Return either Popen object or ClusterQueueJob object. Both have
# a wait method which returns only once job has finished.
return job
def _move_output_to_test_output(self, test_files_out):
'''Move output to the testcode output file. Requires directory lock.
This is used when a program writes to its own named output file rather than to STDOUT.
IMPORTANT: use self.move_output_to_test_output rather than
self._move_output_to_test_output if using multiple threads.
Decorated to move_output_to_test_output, which acquires the directory lock and
enters self.path.
'''
# self.output might be a glob which works with e.g.
# mv self.output test_files[ind]
# if self.output matches only one file. Reproduce that
# here so that running tests through the queueing system
# and running tests locally have the same behaviour.
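# For example (hypothetical pattern): self.output = 'prog.out.*' is fine if
# exactly one file matches; zero or several matches raise a RunError below.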
out_files = glob.glob(self.output)
if len(out_files) == 1:
shutil.move(out_files[0], test_files_out)
else:
err = ('Output pattern (%s) matches %s files (%s).'
% (self.output, len(out_files), out_files))
raise exceptions.RunError(err)
def _move_old_output_files(self, verbose=1):
'''Move existing output files out of the way. Requires directory lock.
This prevents files matching the output pattern from a previous run being
mistaken for output of the current run.
IMPORTANT: use self.move_old_output_files rather than
self._move_old_output_files if using multiple threads.
Decorated to move_old_output_files, which acquires the directory lock and
enters self.path.
'''
if self.output:
old_out_files = glob.glob(self.output)
if old_out_files:
out_dir = 'test.prev.output.%s' % (self.test_program.test_id)
if verbose > 2:
print(('WARNING: found existing files matching output '
'pattern: %s.' % self.output))
print(('WARNING: moving existing output files (%s) to %s.\n'
% (', '.join(old_out_files), out_dir)))
if not os.path.exists(out_dir):
os.mkdir(out_dir)
for out_file in old_out_files:
shutil.move(out_file, out_dir)
def _verify_job(self, input_file, args, verbose=1, rundir=None):
'''Check job against benchmark.
Assume function is executed in self.path.
IMPORTANT: use self.verify_job rather than self._verify_job if using multiple
threads.
Decorated to verify_job, which acquires directory lock and enters self.path
first, during initialisation.'''
# We already have DIR_LOCK, so use _skip_job instead of skip_job.
(status, msg) = self._skip_job(input_file, args, verbose)
try:
if self.test_program.verify and not status.skipped():
(status, msg) = self.verify_job_external(input_file, args,
verbose)
elif not status.skipped():
(bench_out, test_out) = self.extract_data(input_file, args,
verbose)
(comparable, status, msg) = validation.compare_data(bench_out,
test_out, self.default_tolerance, self.tolerances,
self.test_program.ignore_fields)
if verbose > 2:
# Include data tables in output.
if comparable:
# Combine test and benchmark dictionaries.
data_table = util.pretty_print_table(
['benchmark', 'test'],
[bench_out, test_out])
else:
# Print dictionaries separately--couldn't even compare
# them!
data_table = '\n'.join((
util.pretty_print_table(['benchmark'], [bench_out]),
util.pretty_print_table(['test '], [test_out])))
if msg.strip():
# join data table with error message from
# validation.compare_data.
msg = '\n'.join((msg, data_table))
else:
msg = data_table
except (exceptions.AnalysisError, exceptions.TestCodeError):
if msg.strip():
msg = '%s\n%s' % (msg, sys.exc_info()[1])
else:
msg = sys.exc_info()[1]
status = validation.Status([False])
self._update_status(status, (input_file, args))
if verbose > 0 and verbose < 3:
info_line = util.info_line(self.path, input_file, args, rundir)
sys.stdout.write(info_line)
status.print_status(msg, verbose)
return (status, msg)
def _skip_job(self, input_file, args, verbose=1):
'''Run user-supplied command to check if test should be skipped.
IMPORTANT: use self.skip_job rather than self._skip_job if using multiple
threads.
Decorated to skip_job, which acquires directory lock and enters self.path
first, during initialisation.'''
status = validation.Status()
if self.test_program.skip_program:
cmd = self.test_program.skip_cmd(input_file, args)
try:
if verbose > 2:
print(('Testing whether to skip test using %s in %s.' %
(cmd, self.path)))
skip_popen = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
skip_popen.wait()
if skip_popen.returncode == 0:
# skip this test
status = validation.Status(name='skipped')
except OSError:
# slightly odd syntax in order to be compatible with python
# 2.5 and python 2.6/3
if verbose > 2:
print(('Testing whether to skip test failed: %s' % (sys.exc_info()[1],)))
return (status, '')
def verify_job_external(self, input_file, args, verbose=1):
'''Run user-supplied verifier script.
Assume function is executed in self.path.'''
verify_cmd, = self.test_program.extract_cmd(self.path, input_file, args)
try:
if verbose > 2:
print(('Analysing test using %s in %s.' %
(verify_cmd, self.path)))
verify_popen = subprocess.Popen(verify_cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
verify_popen.wait()
except OSError:
# slightly odd syntax in order to be compatible with python 2.5
# and python 2.6/3
err = 'Analysis of test failed: %s' % (sys.exc_info()[1],)
raise exceptions.AnalysisError(err)
output = verify_popen.communicate()[0].decode('utf-8')
if verbose < 2:
# Suppress output. (hackhack)
output = ''
if verify_popen.returncode == 0:
return (validation.Status([True]), output)
else:
return (validation.Status([False]), output)
def extract_data(self, input_file, args, verbose=1):
'''Extract data from output file.
Assume function is executed in self.path.'''
tp_ptr = self.test_program
if tp_ptr.data_tag:
# Using internal data extraction function.
data_files = [
tp_ptr.select_benchmark_file(self.path, input_file, args),
util.testcode_filename(FILESTEM['test'],
tp_ptr.test_id, input_file, args),
]
if verbose > 2:
print(('Analysing output using data_tag %s in %s on files %s.' %
(tp_ptr.data_tag, self.path, ' and '.join(data_files))))
outputs = [util.extract_tagged_data(tp_ptr.data_tag, dfile)
for dfile in data_files]
else:
# Using external data extraction script.
# Get extraction commands.
extract_cmds = tp_ptr.extract_cmd(self.path, input_file, args)
# Extract data.
outputs = []
for cmd in extract_cmds:
try:
if verbose > 2:
print(('Analysing output using %s in %s.' %
(cmd, self.path)))
extract_popen = subprocess.run(cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError:
# slightly odd syntax in order to be compatible with python
# 2.5 and python 2.6/3
err = 'Analysing output failed: %s' % (sys.exc_info()[1],)
raise exceptions.AnalysisError(err)
# Convert data string from extract command to dictionary format.
if extract_popen.returncode != 0:
err = extract_popen.stderr.decode('utf-8')
err = 'Analysing output failed: %s' % (err)
raise exceptions.AnalysisError(err)
data_string = extract_popen.stdout.decode('utf-8')
if self.test_program.extract_fmt == 'table':
outputs.append(util.dict_table_string(data_string))
elif self.test_program.extract_fmt == 'yaml':
outputs.append({})
# convert values to be in a tuple so the format matches
# that from dict_table_string.
# ensure all keys are strings so they can be sorted
# (different data types cause problems!)
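# For example (illustrative): the YAML document 'energy: -1.5' becomes
# {'energy': (-1.5,)} and 'forces: [0.1, 0.2]' becomes {'forces': (0.1, 0.2)}.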
for (key, val) in list(yaml.safe_load(data_string).items()):
if isinstance(val, list):
outputs[-1][str(key)] = tuple(val)
else:
outputs[-1][str(key)] = tuple((val,))
return tuple(outputs)
def create_new_benchmarks(self, benchmark, copy_files_since=None,
copy_files_path='testcode_data'):
'''Copy the test files to benchmark files.'''
oldcwd = os.getcwd()
os.chdir(self.path)
test_files = []
for (inp, arg) in self.inputs_args:
test_file = util.testcode_filename(FILESTEM['test'],
self.test_program.test_id, inp, arg)
err_file = util.testcode_filename(FILESTEM['error'],
self.test_program.test_id, inp, arg)
bench_file = util.testcode_filename(_FILESTEM_DICT['benchmark'],
benchmark, inp, arg)
test_files.extend((test_file, err_file, bench_file))
shutil.copy(test_file, bench_file)
if copy_files_since:
if not os.path.isdir(copy_files_path):
os.mkdir(copy_files_path)
if os.path.isdir(copy_files_path):
for data_file in glob.glob('*'):
if (os.path.isfile(data_file) and
os.stat(data_file)[-2] >= copy_files_since and
data_file not in test_files):
bench_data_file = os.path.join(copy_files_path,
data_file)
# shutil.copy can't overwrite files so remove old ones
# with the same name.
if os.path.exists(bench_data_file):
os.unlink(bench_data_file)
shutil.copy(data_file, bench_data_file)
os.chdir(oldcwd)
def _update_status(self, status, inp_arg):
'''Update self.status with success of a test.'''
if status:
self.status[inp_arg] = status
else:
# Something went wrong. Store a Status failed object.
self.status[inp_arg] = validation.Status([False])
def get_status(self):
'''Get the number of tasks that passed and the number that ran.'''
# If there's an object (other than None/False) in the corresponding
# dict entry in self.status, then that test must have run (albeit not
# necessarily successfully!).
status = {}
status['passed'] = sum(True for stat in list(self.status.values())
if stat and stat.passed())
status['warning'] = sum(True for stat in list(self.status.values())
if stat and stat.warning())
status['skipped'] = sum(True for stat in list(self.status.values())
if stat and stat.skipped())
status['failed'] = sum(True for stat in list(self.status.values())
if stat and stat.failed())
status['unknown'] = sum(True for stat in list(self.status.values())
if stat and stat.unknown())
status['ran'] = sum(True for stat in list(self.status.values()) if stat)
return status
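# Minimal usage sketch (illustrative only; names, paths and option values are
# hypothetical and would normally be set up by the testcode2 configuration
# layer rather than by hand):
#
#     import testcode2
#
#     prog = testcode2.TestProgram(
#         name='my_prog',
#         exe='./my_prog.x',
#         test_id='20240101',
#         benchmark=['1a2b3c'],          # list of benchmark ids to search
#     )
#     test = testcode2.Test(
#         name='quick',
#         test_program=prog,
#         path='tests/quick',
#         inputs_args=[('case.in', '')],
#         nprocs=1,
#     )
#     test.run_test(verbose=2)
#     print(test.get_status())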