Add JUnit/XUnit-formatted output to the lldb test run system

Also introduces the test event system into our test runner framework.
See the following for details:
http://reviews.llvm.org/D12831

llvm-svn: 247722
This commit is contained in:
Todd Fiala 2015-09-15 21:38:04 +00:00
parent c26bbfe6b6
commit 68615ce606
6 changed files with 1537 additions and 114 deletions

View File

@ -32,6 +32,7 @@ ulimit -c unlimited
echo core.%p | sudo tee /proc/sys/kernel/core_pattern
"""
import asyncore
import fnmatch
import multiprocessing
import multiprocessing.pool
@ -44,10 +45,9 @@ import subprocess
import sys
import threading
import dotest_channels
import dotest_args
from optparse import OptionParser
def get_timeout_command():
"""Search for a suitable timeout command."""
@ -76,6 +76,9 @@ test_name_len = None
dotest_options = None
output_on_success = False
RESULTS_FORMATTER = None
RUNNER_PROCESS_ASYNC_MAP = None
RESULTS_LISTENER_CHANNEL = None
def setup_global_variables(lock, counter, total, name_len, options):
global output_lock, test_counter, total_tests, test_name_len
@ -147,12 +150,39 @@ def parse_test_results(output):
return passes, failures, unexpected_successes
def inferior_session_interceptor(forwarding_func, event):
"""Intercepts session begin/end events, passing through everyting else.
@param forwarding_func a callable object to pass along the event if it
is not one that gets intercepted.
@param event the test result event received.
"""
if event is not None and isinstance(event, dict):
if "event" in event:
if event["event"] == "session_begin":
# Swallow it. Could report on inferior here if we
# cared.
return
elif event["event"] == "session_end":
# Swallow it. Could report on inferior here if we
# cared. More usefully, we can verify that the
# inferior went down hard if we don't receive this.
return
# Pass it along.
forwarding_func(event)
def call_with_timeout(command, timeout, name, inferior_pid_events):
"""Run command with a timeout if possible."""
"""-s QUIT will create a coredump if they are enabled on your system"""
"""Run command with a timeout if possible.
-s QUIT will create a coredump if they are enabled on your system
"""
process = None
if timeout_command and timeout != "0":
command = [timeout_command, '-s', 'QUIT', timeout] + command
# Specifying a value for close_fds is unsupported on Windows when using
# subprocess.PIPE
if os.name != "nt":
@ -170,7 +200,14 @@ def call_with_timeout(command, timeout, name, inferior_pid_events):
if inferior_pid_events:
inferior_pid_events.put_nowait(('created', inferior_pid))
output = process.communicate()
# The inferior should now be entirely wrapped up.
exit_status = process.returncode
if exit_status is None:
raise Exception(
"no exit status available after the inferior dotest.py "
"should have completed")
if inferior_pid_events:
inferior_pid_events.put_nowait(('destroyed', inferior_pid))
@ -180,6 +217,10 @@ def call_with_timeout(command, timeout, name, inferior_pid_events):
# only stderr does.
report_test_pass(name, output[1])
else:
# TODO need to differentiate a failing test from a run that
# was broken out of by a SIGTERM/SIGKILL, reporting those as
# an error. If a signal-based completion, need to call that
# an error.
report_test_failure(name, command, output[1])
return name, exit_status, passes, failures, unexpected_successes
@ -250,9 +291,7 @@ def process_dir_worker_multiprocessing_pool(args):
return process_dir(*args)
def process_dir_worker_threading(
a_test_counter, a_total_tests, a_test_name_len,
a_dotest_options, job_queue, result_queue, inferior_pid_events):
def process_dir_worker_threading(job_queue, result_queue, inferior_pid_events):
"""Worker thread main loop when in threading mode.
This one supports the hand-rolled pooling support.
@ -413,6 +452,150 @@ def initialize_global_vars_threading(num_threads, test_work_items):
initialize_global_vars_common(num_threads, test_work_items)
def ctrl_c_loop(main_op_func, done_func, ctrl_c_handler):
"""Provides a main loop that is Ctrl-C protected.
The main loop calls the main_op_func() repeatedly until done_func()
returns true. The ctrl_c_handler() method is called with a single
int parameter that contains the number of times the ctrl_c has been
hit (starting with 1). The ctrl_c_handler() should mutate whatever
it needs to have the done_func() return True as soon as it is desired
to exit the loop.
"""
done = False
ctrl_c_count = 0
while not done:
try:
# See if we're done. Start with done check since it is
# the first thing executed after a Ctrl-C handler in the
# following loop.
done = done_func()
if not done:
# Run the main op once.
main_op_func()
except KeyboardInterrupt:
ctrl_c_count += 1
ctrl_c_handler(ctrl_c_count)
def pump_workers_and_asyncore_map(workers, asyncore_map):
"""Prunes out completed workers and maintains the asyncore loop.
The asyncore loop contains the optional socket listener
and handlers. When all workers are complete, this method
takes care of stopping the listener. It also runs the
asyncore loop for the given async map for 10 iterations.
@param workers the list of worker Thread/Process instances.
@param asyncore_map the asyncore threading-aware map that
indicates which channels are in use and still alive.
"""
# Check on all the workers, removing them from the workers
# list as they complete.
dead_workers = []
for worker in workers:
# This non-blocking join call is what allows us
# to still receive keyboard interrupts.
worker.join(0.01)
if not worker.is_alive():
dead_workers.append(worker)
# Clear out the completed workers
for dead_worker in dead_workers:
workers.remove(dead_worker)
# If there are no more workers and there is a listener,
# close the listener.
global RESULTS_LISTENER_CHANNEL
if len(workers) == 0 and RESULTS_LISTENER_CHANNEL is not None:
RESULTS_LISTENER_CHANNEL.close()
RESULTS_LISTENER_CHANNEL = None
# Pump the asyncore map if it isn't empty.
if len(asyncore_map) > 0:
asyncore.loop(0.1, False, asyncore_map, 10)
def handle_ctrl_c(ctrl_c_count, job_queue, workers, inferior_pid_events,
stop_all_inferiors_func):
"""Performs the appropriate ctrl-c action for non-pool parallel test runners
@param ctrl_c_count starting with 1, indicates the number of times ctrl-c
has been intercepted. The value is 1 on the first intercept, 2 on the
second, etc.
@param job_queue a Queue object that contains the work still outstanding
(i.e. hasn't been assigned to a worker yet).
@param workers list of Thread or Process workers.
@param inferior_pid_events specifies a Queue of inferior process
construction and destruction events. Used to build the list of inferior
processes that should be killed if we get that far.
@param stop_all_inferiors_func a callable object that takes the
workers and inferior_pid_events parameters (in that order) if a hard
stop is to be used on the workers.
"""
# Print out which Ctrl-C we're handling.
key_name = [
"first",
"second",
"third",
"many"]
if ctrl_c_count < len(key_name):
name_index = ctrl_c_count - 1
else:
name_index = len(key_name) - 1
message = "\nHandling {} KeyboardInterrupt".format(key_name[name_index])
with output_lock:
print message
if ctrl_c_count == 1:
# Remove all outstanding items from the work queue so we stop
# doing any more new work.
while not job_queue.empty():
try:
# Just drain it to stop more work from being started.
job_queue.get_nowait()
except Queue.Empty:
pass
with output_lock:
print "Stopped more work from being started."
elif ctrl_c_count == 2:
# Try to stop all inferiors, even the ones currently doing work.
stop_all_inferiors_func(workers, inferior_pid_events)
else:
with output_lock:
print "All teardown activities kicked off, should finish soon."
def workers_and_async_done(workers, async_map):
"""Returns True if the workers list and asyncore channels are all done.
@param workers list of workers (threads/processes). These must adhere
to the threading Thread or multiprocessing.Process interface.
@param async_map the threading-aware asyncore channel map to check
for live channels.
@return False if the workers list exists and has any entries in it, or
if the async_map exists and has any entries left in it; otherwise, True.
"""
if workers is not None and len(workers) > 0:
# We're not done if we still have workers left.
return False
if async_map is not None and len(async_map) > 0:
return False
# We're done.
return True
def multiprocessing_test_runner(num_threads, test_work_items):
"""Provides hand-wrapped pooling test runner adapter with Ctrl-C support.
@ -464,36 +647,72 @@ def multiprocessing_test_runner(num_threads, test_work_items):
worker.start()
workers.append(worker)
# Wait for all workers to finish, handling ^C as needed.
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
# First try to drain the queue of work and let the
# running tests complete.
while not job_queue.empty():
try:
# Just drain it to stop more work from being started.
job_queue.get_nowait()
except Queue.Empty:
pass
# Main loop: wait for all workers to finish and wait for
# the socket handlers to wrap up.
ctrl_c_loop(
# Main operation of loop
lambda: pump_workers_and_asyncore_map(
workers, RUNNER_PROCESS_ASYNC_MAP),
print ('\nFirst KeyboardInterrupt received, stopping '
'future work. Press again to hard-stop existing tests.')
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
print ('\nSecond KeyboardInterrupt received, killing '
'all worker process trees.')
kill_all_worker_processes(workers, inferior_pid_events)
# Return True when we're done with the main loop.
lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP),
# Indicate what we do when we receive one or more Ctrl-Cs.
lambda ctrl_c_count: handle_ctrl_c(
ctrl_c_count, job_queue, workers, inferior_pid_events,
kill_all_worker_processes))
# Reap the test results.
test_results = []
while not result_queue.empty():
test_results.append(result_queue.get(block=False))
return test_results
def map_async_run_loop(future, channel_map, listener_channel):
"""Blocks until the Pool.map_async completes and the channel completes.
@param future an AsyncResult instance from a Pool.map_async() call.
@param channel_map the asyncore dispatch channel map that should be pumped.
Optional: may be None.
@param listener_channel the channel representing a listener that should be
closed once the map_async results are available.
@return the results from the async_result instance.
"""
map_results = None
done = False
while not done:
# Check if we need to reap the map results.
if map_results is None:
if future.ready():
# Get the results.
map_results = future.get()
# Close the runner process listener channel if we have
# one: no more connections will be incoming.
if listener_channel is not None:
listener_channel.close()
# Pump the asyncore loop if we have a listener socket.
if channel_map is not None:
asyncore.loop(0.01, False, channel_map, 10)
# Figure out if we're done running.
done = map_results is not None
if channel_map is not None:
# We have a runner process async map. Check if it
# is complete.
if len(channel_map) > 0:
# We still have an asyncore channel running. Not done yet.
done = False
return map_results
def multiprocessing_test_runner_pool(num_threads, test_work_items):
# Initialize our global state.
initialize_global_vars_multiprocessing(num_threads, test_work_items)
@ -503,7 +722,12 @@ def multiprocessing_test_runner_pool(num_threads, test_work_items):
initializer=setup_global_variables,
initargs=(output_lock, test_counter, total_tests, test_name_len,
dotest_options))
return pool.map(process_dir_worker_multiprocessing_pool, test_work_items)
# Start the map operation (async mode).
map_future = pool.map_async(
process_dir_worker_multiprocessing_pool, test_work_items)
return map_async_run_loop(
map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL)
def threading_test_runner(num_threads, test_work_items):
@ -541,53 +765,28 @@ def threading_test_runner(num_threads, test_work_items):
for _ in range(num_threads):
worker = threading.Thread(
target=process_dir_worker_threading,
args=(test_counter,
total_tests,
test_name_len,
dotest_options,
job_queue,
args=(job_queue,
result_queue,
inferior_pid_events))
worker.start()
workers.append(worker)
# Wait for all workers to finish, handling ^C as needed.
try:
# We do some trickery here to ensure we can catch keyboard
# interrupts.
while len(workers) > 0:
# Make a pass throug the workers, checking for who is done.
dead_workers = []
for worker in workers:
# This non-blocking join call is what allows us
# to still receive keyboard interrupts.
worker.join(0.01)
if not worker.isAlive():
dead_workers.append(worker)
# Clear out the completed workers
for dead_worker in dead_workers:
workers.remove(dead_worker)
# Main loop: wait for all workers to finish and wait for
# the socket handlers to wrap up.
ctrl_c_loop(
# Main operation of loop
lambda: pump_workers_and_asyncore_map(
workers, RUNNER_PROCESS_ASYNC_MAP),
except KeyboardInterrupt:
# First try to drain the queue of work and let the
# running tests complete.
while not job_queue.empty():
try:
# Just drain it to stop more work from being started.
job_queue.get_nowait()
except Queue.Empty:
pass
# Return True when we're done with the main loop.
lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP),
print ('\nFirst KeyboardInterrupt received, stopping '
'future work. Press again to hard-stop existing tests.')
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
print ('\nSecond KeyboardInterrupt received, killing '
'all worker process trees.')
kill_all_worker_threads(workers, inferior_pid_events)
# Indicate what we do when we receive one or more Ctrl-Cs.
lambda ctrl_c_count: handle_ctrl_c(
ctrl_c_count, job_queue, workers, inferior_pid_events,
kill_all_worker_threads))
# Reap the test results.
test_results = []
while not result_queue.empty():
test_results.append(result_queue.get(block=False))
@ -598,20 +797,49 @@ def threading_test_runner_pool(num_threads, test_work_items):
# Initialize our global state.
initialize_global_vars_threading(num_threads, test_work_items)
pool = multiprocessing.pool.ThreadPool(
num_threads
# initializer=setup_global_variables,
# initargs=(output_lock, test_counter, total_tests, test_name_len,
# dotest_options)
)
return pool.map(process_dir_worker_threading_pool, test_work_items)
pool = multiprocessing.pool.ThreadPool(num_threads)
map_future = pool.map_async(
process_dir_worker_threading_pool, test_work_items)
return map_async_run_loop(
map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL)
def asyncore_run_loop(channel_map):
try:
asyncore.loop(None, False, channel_map)
except:
# Swallow it, we're seeing:
# error: (9, 'Bad file descriptor')
# when the listener channel is closed. Shouldn't be the case.
pass
def inprocess_exec_test_runner(test_work_items):
# Initialize our global state.
initialize_global_vars_multiprocessing(1, test_work_items)
return map(process_dir_mapper_inprocess, test_work_items)
# Run the listener and related channel maps in a separate thread.
# global RUNNER_PROCESS_ASYNC_MAP
global RESULTS_LISTENER_CHANNEL
if RESULTS_LISTENER_CHANNEL is not None:
socket_thread = threading.Thread(
target=lambda: asyncore_run_loop(RUNNER_PROCESS_ASYNC_MAP))
socket_thread.start()
# Do the work.
test_results = map(process_dir_mapper_inprocess, test_work_items)
# If we have a listener channel, shut it down here.
if RESULTS_LISTENER_CHANNEL is not None:
# Close down the channel.
RESULTS_LISTENER_CHANNEL.close()
RESULTS_LISTENER_CHANNEL = None
# Wait for the listener and handlers to complete.
socket_thread.join()
return test_results
def walk_and_invoke(test_directory, test_subdir, dotest_argv,
test_runner_func):
@ -624,6 +852,22 @@ def walk_and_invoke(test_directory, test_subdir, dotest_argv,
test_subdir - lldb/test/ or a subfolder with the tests we're interested in
running
"""
# The async_map is important to keep all thread-related asyncore
# channels distinct when we call asyncore.loop() later on.
global RESULTS_LISTENER_CHANNEL, RUNNER_PROCESS_ASYNC_MAP
RUNNER_PROCESS_ASYNC_MAP = {}
# If we're outputting side-channel test results, create the socket
# listener channel and tell the inferior to send results to the
# port on which we'll be listening.
if RESULTS_FORMATTER is not None:
forwarding_func = lambda event: inferior_session_interceptor(
RESULTS_FORMATTER.process_event, event)
RESULTS_LISTENER_CHANNEL = (
dotest_channels.UnpicklingForwardingListenerChannel(
RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
dotest_argv.append("--results-port")
dotest_argv.append(str(RESULTS_LISTENER_CHANNEL.address[1]))
# Collect the test files that we'll run.
test_work_items = []
@ -654,7 +898,7 @@ def getExpectedTimeouts(platform_name):
if platform_name is None:
target = sys.platform
else:
m = re.search('remote-(\w+)', platform_name)
m = re.search(r'remote-(\w+)', platform_name)
target = m.group(1)
expected_timeout = set()
@ -759,20 +1003,94 @@ def get_test_runner_strategies(num_threads):
# threading-pool uses threading for the workers (in-process)
# and uses the multiprocessing.pool thread-enabled pool.
# This does not properly support Ctrl-C.
"threading-pool":
(lambda work_items: threading_test_runner_pool(
num_threads, work_items)),
# serial uses the subprocess-based, single process
# test runner. This provides process isolation but
# no concurrent test running.
# no concurrent test execution.
"serial":
inprocess_exec_test_runner
}
def _remove_option(args, option_name, removal_count):
"""Removes option and related option arguments from args array.
@param args the array of command line arguments (in/out)
@param option_name the full command line representation of the
option that will be removed (including '--' or '-').
@param the count of elements to remove. A value of 1 will remove
just the found option, while 2 will remove the option and its first
argument.
"""
try:
index = args.index(option_name)
# Handle the exact match case.
del args[index:index+removal_count]
return
except ValueError:
# Thanks to argparse not handling options with known arguments
# like other options parsing libraries (see
# https://bugs.python.org/issue9334), we need to support the
# --results-formatter-options={second-level-arguments} (note
# the equal sign to fool the first-level arguments parser into
# not treating the second-level arguments as first-level
# options). We're certainly at risk of getting this wrong
# since now we're forced into the business of trying to figure
# out what is an argument (although I think this
# implementation will suffice).
regex_string = "^" + option_name + "="
regex = re.compile(regex_string)
for index in range(len(args)):
match = regex.match(args[index])
if match:
print "found matching option= at index {}".format(index)
del args[index]
return
print "failed to find regex '{}'".format(regex_string)
# We didn't find the option but we should have.
raise Exception("failed to find option '{}' in args '{}'".format(
option_name, args))
def adjust_inferior_options(dotest_argv):
"""Adjusts the commandline args array for inferiors.
This method adjusts the inferior dotest commandline options based
on the parallel test runner's options. Some of the inferior options
will need to change to properly handle aggregation functionality.
"""
global dotest_options
# If we don't have a session directory, create one.
if not dotest_options.s:
# no session log directory, we need to add this to prevent
# every dotest invocation from creating its own directory
import datetime
# The windows platforms don't like ':' in the pathname.
timestamp_started = datetime.datetime.now().strftime("%F-%H_%M_%S")
dotest_argv.append('-s')
dotest_argv.append(timestamp_started)
dotest_options.s = timestamp_started
# Adjust inferior results formatter options - if the parallel
# test runner is collecting into the user-specified test results,
# we'll have inferiors spawn with the --results-port option and
# strip the original test runner options.
if dotest_options.results_file is not None:
_remove_option(dotest_argv, "--results-file", 2)
if dotest_options.results_port is not None:
_remove_option(dotest_argv, "--results-port", 2)
if dotest_options.results_formatter is not None:
_remove_option(dotest_argv, "--results-formatter", 2)
if dotest_options.results_formatter_options is not None:
_remove_option(dotest_argv, "--results-formatter-options", 2)
def main(print_details_on_success, num_threads, test_subdir,
test_runner_name):
test_runner_name, results_formatter):
"""Run dotest.py in inferior mode in parallel.
@param print_details_on_success the parsed value of the output-on-success
@ -793,53 +1111,31 @@ def main(print_details_on_success, num_threads, test_subdir,
system to choose the most appropriate test runner given desired
thread count and OS type.
@param results_formatter if specified, provides the TestResultsFormatter
instance that will format and output test result data from the
side-channel test results. When specified, inferior dotest calls
will send test results side-channel data over a socket to the parallel
test runner, which will forward them on to results_formatter.
"""
dotest_argv = sys.argv[1:]
global output_on_success
global output_on_success, RESULTS_FORMATTER
output_on_success = print_details_on_success
RESULTS_FORMATTER = results_formatter
# We can't use sys.path[0] to determine the script directory
# because it doesn't work under a debugger
test_directory = os.path.dirname(os.path.realpath(__file__))
parser = OptionParser(usage="""\
Run lldb test suite using a separate process for each test file.
Each test will run with a time limit of 10 minutes by default.
Override the default time limit of 10 minutes by setting
the environment variable LLDB_TEST_TIMEOUT.
E.g., export LLDB_TEST_TIMEOUT=10m
Override the time limit for individual tests by setting
the environment variable LLDB_[TEST NAME]_TIMEOUT.
E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m
Set to "0" to run without time limit.
E.g., export LLDB_TEST_TIMEOUT=0
or export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0
""")
parser = dotest_args.create_parser()
global dotest_options
dotest_options = dotest_args.parse_args(parser, dotest_argv)
if not dotest_options.s:
# no session log directory, we need to add this to prevent
# every dotest invocation from creating its own directory
import datetime
# The windows platforms don't like ':' in the pathname.
timestamp_started = datetime.datetime.now().strftime("%F-%H_%M_%S")
dotest_argv.append('-s')
dotest_argv.append(timestamp_started)
dotest_options.s = timestamp_started
adjust_inferior_options(dotest_argv)
session_dir = os.path.join(os.getcwd(), dotest_options.s)
# The root directory was specified on the command line
test_directory = os.path.dirname(os.path.realpath(__file__))
if test_subdir and len(test_subdir) > 0:
test_subdir = os.path.join(test_directory, test_subdir)
else:

View File

@ -20,15 +20,20 @@ Type:
for available options.
"""
import atexit
import commands
import importlib
import os
import dotest_args
import errno
import platform
import progress
import signal
import socket
import subprocess
import sys
import test_results
from test_results import EventBuilder
import inspect
import unittest2
import lldbtest_config
@ -251,6 +256,14 @@ output_on_success = False
no_multiprocess_test_runner = False
test_runner_name = None
# Test results handling globals
results_filename = None
results_port = None
results_file_object = None
results_formatter_name = None
results_formatter_object = None
results_formatter_options = None
def usage(parser):
parser.print_help()
if verbose > 0:
@ -497,6 +510,10 @@ def parseOptionsAndInitTestdirs():
global output_on_success
global no_multiprocess_test_runner
global test_runner_name
global results_filename
global results_formatter_name
global results_formatter_options
global results_port
do_help = False
@ -782,6 +799,24 @@ def parseOptionsAndInitTestdirs():
if args.test_runner_name:
test_runner_name = args.test_runner_name
# Capture test results-related args.
if args.results_file:
results_filename = args.results_file
if args.results_port:
results_port = args.results_port
if args.results_file and args.results_port:
sys.stderr.write(
"only one of --results-file and --results-port should "
"be specified\n")
usage(args)
if args.results_formatter:
results_formatter_name = args.results_formatter
if args.results_formatter_options:
results_formatter_options = args.results_formatter_options
if args.lldb_platform_name:
lldb_platform_name = args.lldb_platform_name
if args.lldb_platform_url:
@ -886,6 +921,85 @@ def getXcodeOutputPaths(lldbRootDirectory):
return result
def createSocketToLocalPort(port):
def socket_closer(s):
"""Close down an opened socket properly."""
s.shutdown(socket.SHUT_RDWR)
s.close()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", port))
return (sock, lambda: socket_closer(sock))
def setupTestResults():
"""Sets up test results-related objects based on arg settings."""
global results_filename
global results_file_object
global results_formatter_name
global results_formatter_object
global results_formatter_options
global results_port
default_formatter_name = None
cleanup_func = None
if results_filename:
# Open the results file for writing.
results_file_object = open(results_filename, "w")
cleanup_func = results_file_object.close
default_formatter_name = "test_results.XunitFormatter"
elif results_port:
# Connect to the specified localhost port.
results_file_object, cleanup_func = createSocketToLocalPort(
results_port)
default_formatter_name = "test_results.RawPickledFormatter"
if results_file_object:
# We care about the formatter. Choose user-specified or, if
# none specified, use the default for the output type.
if results_formatter_name:
formatter_name = results_formatter_name
else:
formatter_name = default_formatter_name
# Create an instance of the class. First figure out the package/module.
components = formatter_name.split(".")
module = importlib.import_module(".".join(components[:-1]))
# Create the class name we need to load.
clazz = getattr(module, components[-1])
# Handle formatter options for the results formatter class.
formatter_arg_parser = clazz.arg_parser()
if results_formatter_options and len(results_formatter_options) > 0:
import shlex
split_options = shlex.split(results_formatter_options)
else:
split_options = []
formatter_options = formatter_arg_parser.parse_args(split_options)
# Create the TestResultsFormatter given the processed options.
results_formatter_object = clazz(results_file_object, formatter_options)
# Start the results formatter session - we'll only have one
# during a given dotest process invocation.
results_formatter_object.begin_session()
def shutdown_formatter():
# Tell the formatter to write out anything it may have
# been saving until the very end (e.g. xUnit results
# can't complete its output until this point).
results_formatter_object.end_session()
# And now close out the output file-like object.
cleanup_func()
atexit.register(shutdown_formatter)
def getOutputPaths(lldbRootDirectory):
"""
Returns typical build output paths for the lldb executable
@ -1293,13 +1407,20 @@ if __name__ == "__main__":
#
parseOptionsAndInitTestdirs()
# Setup test results (test results formatter and output handling).
setupTestResults()
# If we are running as the multiprocess test runner, kick off the
# multiprocess test runner here.
if isMultiprocessTestRunner():
import dosep
dosep.main(output_on_success, num_threads, multiprocess_test_subdir,
test_runner_name)
test_runner_name, results_formatter_object)
raise Exception("should never get here")
elif is_inferior_test_runner:
# Shut off Ctrl-C processing in inferiors. The parallel
# test runner handles this more holistically.
signal.signal(signal.SIGINT, signal.SIG_IGN)
setupSysPath()
setupCrashInfoHook()
@ -1653,6 +1774,7 @@ if __name__ == "__main__":
self.progressbar = progress.ProgressWithEvents(stdout=self.stream,start=0,end=suite.countTestCases(),width=width-10)
except:
self.progressbar = None
self.results_formatter = results_formatter_object
def _config_string(self, test):
compiler = getattr(test, "getCompiler", None)
@ -1729,12 +1851,18 @@ if __name__ == "__main__":
if self.showAll:
self.stream.write(self.fmt % self.counter)
super(LLDBTestResult, self).startTest(test)
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_start(test))
def addSuccess(self, test):
global parsable
super(LLDBTestResult, self).addSuccess(test)
if parsable:
self.stream.write("PASS: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_success(test))
def addError(self, test, err):
global sdir_has_content
@ -1746,6 +1874,9 @@ if __name__ == "__main__":
method()
if parsable:
self.stream.write("FAIL: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_error(test, err))
def addCleanupError(self, test, err):
global sdir_has_content
@ -1757,6 +1888,10 @@ if __name__ == "__main__":
method()
if parsable:
self.stream.write("CLEANUP ERROR: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_cleanup_error(
test, err))
def addFailure(self, test, err):
global sdir_has_content
@ -1776,6 +1911,10 @@ if __name__ == "__main__":
failuresPerCategory[category] = failuresPerCategory[category] + 1
else:
failuresPerCategory[category] = 1
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_failure(test, err))
def addExpectedFailure(self, test, err, bugnumber):
global sdir_has_content
@ -1787,6 +1926,10 @@ if __name__ == "__main__":
method(err, bugnumber)
if parsable:
self.stream.write("XFAIL: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_expected_failure(
test, err, bugnumber))
def addSkip(self, test, reason):
global sdir_has_content
@ -1798,6 +1941,9 @@ if __name__ == "__main__":
method()
if parsable:
self.stream.write("UNSUPPORTED: LLDB (%s) :: %s (%s) \n" % (self._config_string(test), str(test), reason))
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_skip(test, reason))
def addUnexpectedSuccess(self, test, bugnumber):
global sdir_has_content
@ -1809,6 +1955,11 @@ if __name__ == "__main__":
method(bugnumber)
if parsable:
self.stream.write("XPASS: LLDB (%s) :: %s\n" % (self._config_string(test), str(test)))
if self.results_formatter:
self.results_formatter.process_event(
EventBuilder.event_for_unexpected_success(
test, bugnumber))
if parsable:
v = 0

View File

@ -140,6 +140,31 @@ def create_parser():
' multiprocessing-pool, serial, threading, threading-pool')
)
# Test results support.
group = parser.add_argument_group('Test results options')
group.add_argument(
'--results-file',
action='store',
help=('Specifies the file where test results will be written '
'according to the results-formatter class used'))
group.add_argument(
'--results-port',
action='store',
type=int,
help=('Specifies the localhost port to which the results '
'formatted output should be sent'))
group.add_argument(
'--results-formatter',
action='store',
help=('Specifies the full package/module/class name used to translate '
'test events into some kind of meaningful report, written to '
'the designated output results file-like object'))
group.add_argument(
'--results-formatter-options',
action='store',
help=('Specify comma-separated options to pass to the formatter. '
'Use --results-formatter-options="--option1[,--option2[,...]]" '
'syntax. Note the "=" is critical, and don\'t use whitespace.'))
# Remove the reference to our helper function
del X

View File

@ -0,0 +1,173 @@
"""
The LLVM Compiler Infrastructure
This file is distributed under the University of Illinois Open Source
License. See LICENSE.TXT for details.
Sync lldb and related source from a local machine to a remote machine.
This facilitates working on the lldb sourcecode on multiple machines
and multiple OS types, verifying changes across all.
This module provides asyncore channels used within the LLDB test
framework.
"""
import asyncore
import cPickle
import socket
class UnpicklingForwardingReaderChannel(asyncore.dispatcher):
"""Provides an unpickling, forwarding asyncore dispatch channel reader.
Inferior dotest.py processes with side-channel-based test results will
send test result event data in a pickled format, one event at a time.
This class supports reconstructing the pickled data and forwarding it
on to its final destination.
The channel data is written in the form:
{num_payload_bytes}#{payload_bytes}
The bulk of this class is devoted to reading and parsing out
the payload bytes.
"""
def __init__(self, file_object, async_map, forwarding_func):
asyncore.dispatcher.__init__(self, sock=file_object, map=async_map)
self.header_contents = ''
self.packet_bytes_remaining = 0
self.reading_header = True
self.ibuffer = ''
self.forwarding_func = forwarding_func
if forwarding_func is None:
# This whole class is useless if we do nothing with the
# unpickled results.
raise Exception("forwarding function must be set")
def deserialize_payload(self):
"""Unpickles the collected input buffer bytes and forwards."""
if len(self.ibuffer) > 0:
self.forwarding_func(cPickle.loads(self.ibuffer))
self.ibuffer = ''
def consume_header_bytes(self, data):
"""Consumes header bytes from the front of data.
@param data the incoming data stream bytes
@return any data leftover after consuming header bytes.
"""
# We're done if there is no content.
if not data or (len(data) == 0):
return None
for index in range(len(data)):
byte = data[index]
if byte != '#':
# Header byte.
self.header_contents += byte
else:
# End of header.
self.packet_bytes_remaining = int(self.header_contents)
self.header_contents = ''
self.reading_header = False
return data[(index+1):]
# If we made it here, we've exhausted the data and
# we're still parsing header content.
return None
def consume_payload_bytes(self, data):
"""Consumes payload bytes from the front of data.
@param data the incoming data stream bytes
@return any data leftover after consuming remaining payload bytes.
"""
if not data or (len(data) == 0):
# We're done and there's nothing to do.
return None
data_len = len(data)
if data_len <= self.packet_bytes_remaining:
# We're consuming all the data provided.
self.ibuffer += data
self.packet_bytes_remaining -= data_len
# If we're no longer waiting for payload bytes,
# we flip back to parsing header bytes and we
# unpickle the payload contents.
if self.packet_bytes_remaining < 1:
self.reading_header = True
self.deserialize_payload()
# We're done, no more data left.
return None
else:
# We're only consuming a portion of the data since
# the data contains more than the payload amount.
self.ibuffer += data[:self.packet_bytes_remaining]
data = data[self.packet_bytes_remaining:]
# We now move on to reading the header.
self.reading_header = True
self.packet_bytes_remaining = 0
# And we can deserialize the payload.
self.deserialize_payload()
# Return the remaining data.
return data
def handle_read(self):
data = self.recv(8192)
# print 'driver socket READ: %d bytes' % len(data)
while data and (len(data) > 0):
# If we're reading the header, gather header bytes.
if self.reading_header:
data = self.consume_header_bytes(data)
else:
data = self.consume_payload_bytes(data)
def handle_close(self):
# print "socket reader: closing port"
self.close()
class UnpicklingForwardingListenerChannel(asyncore.dispatcher):
"""Provides a socket listener asyncore channel for unpickling/forwarding.
This channel will listen on a socket port (use 0 for host-selected). Any
client that connects will have an UnpicklingForwardingReaderChannel handle
communication over the connection.
The dotest parallel test runners, when collecting test results, open the
test results side channel over a socket. This channel handles connections
from inferiors back to the test runner. Each worker fires up a listener
for each inferior invocation. This simplifies the asyncore.loop() usage,
one of the reasons for implementing with asyncore. This listener shuts
down once a single connection is made to it.
"""
def __init__(self, async_map, host, port, forwarding_func):
asyncore.dispatcher.__init__(self, map=async_map)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.address = self.socket.getsockname()
self.listen(5)
self.handler = None
self.async_map = async_map
self.forwarding_func = forwarding_func
if forwarding_func is None:
# This whole class is useless if we do nothing with the
# unpickled results.
raise Exception("forwarding function must be set")
def handle_accept(self):
(sock, addr) = self.socket.accept()
if sock and addr:
# print 'Incoming connection from %s' % repr(addr)
self.handler = UnpicklingForwardingReaderChannel(
sock, self.async_map, self.forwarding_func)
def handle_close(self):
self.close()

View File

@ -165,7 +165,7 @@ class SettingsCommandTestCase(TestBase):
self.expect("settings show auto-confirm", SETTING_MSG("auto-confirm"),
startstr = "auto-confirm (boolean) = false")
@skipUnlessArch(['x86-64', 'i386', 'i686'])
@skipUnlessArch(['x86_64', 'i386', 'i686'])
def test_disassembler_settings(self):
"""Test that user options for the disassembler take effect."""
self.buildDefault()
@ -449,7 +449,7 @@ class SettingsCommandTestCase(TestBase):
SETTING_MSG("disassembly-format"),
substrs = [ 'disassembly-format (format-string) = "foo "'])
self.runCmd("settings clear disassembly-format", check=False)
def test_all_settings_exist (self):
self.expect ("settings show",
substrs = [ "auto-confirm",

778
lldb/test/test_results.py Normal file
View File

@ -0,0 +1,778 @@
"""
The LLVM Compiler Infrastructure
This file is distributed under the University of Illinois Open Source
License. See LICENSE.TXT for details.
Provides classes used by the test results reporting infrastructure
within the LLDB test suite.
"""
import argparse
import cPickle
import inspect
import os
import sys
import threading
import time
import xml.sax.saxutils
class EventBuilder(object):
"""Helper class to build test result event dictionaries."""
@staticmethod
def _get_test_name_info(test):
"""Returns (test-class-name, test-method-name) from a test case instance.
@param test a unittest.TestCase instance.
@return tuple containing (test class name, test method name)
"""
test_class_components = test.id().split(".")
test_class_name = ".".join(test_class_components[:-1])
test_name = test_class_components[-1]
return (test_class_name, test_name)
@staticmethod
def _event_dictionary_common(test, event_type):
"""Returns an event dictionary setup with values for the given event type.
@param test the unittest.TestCase instance
@param event_type the name of the event type (string).
@return event dictionary with common event fields set.
"""
test_class_name, test_name = EventBuilder._get_test_name_info(test)
return {
"event": event_type,
"test_class": test_class_name,
"test_name": test_name,
"event_time": time.time()
}
@staticmethod
def _error_tuple_class(error_tuple):
"""Returns the unittest error tuple's error class as a string.
@param error_tuple the error tuple provided by the test framework.
@return the error type (typically an exception) raised by the
test framework.
"""
type_var = error_tuple[0]
module = inspect.getmodule(type_var)
if module:
return "{}.{}".format(module.__name__, type_var.__name__)
else:
return type_var.__name__
@staticmethod
def _error_tuple_message(error_tuple):
"""Returns the unittest error tuple's error message.
@param error_tuple the error tuple provided by the test framework.
@return the error message provided by the test framework.
"""
return str(error_tuple[1])
@staticmethod
def _event_dictionary_test_result(test, status):
"""Returns an event dictionary with common test result fields set.
@param test a unittest.TestCase instance.
@param status the status/result of the test
(e.g. "success", "failure", etc.)
@return the event dictionary
"""
event = EventBuilder._event_dictionary_common(test, "test_result")
event["status"] = status
return event
@staticmethod
def _event_dictionary_issue(test, status, error_tuple):
"""Returns an event dictionary with common issue-containing test result
fields set.
@param test a unittest.TestCase instance.
@param status the status/result of the test
(e.g. "success", "failure", etc.)
@param error_tuple the error tuple as reported by the test runner.
This is of the form (type<error>, error).
@return the event dictionary
"""
event = EventBuilder._event_dictionary_test_result(test, status)
event["issue_class"] = EventBuilder._error_tuple_class(error_tuple)
event["issue_message"] = EventBuilder._error_tuple_message(error_tuple)
return event
@staticmethod
def event_for_start(test):
"""Returns an event dictionary for the test start event.
@param test a unittest.TestCase instance.
@return the event dictionary
"""
return EventBuilder._event_dictionary_common(test, "test_start")
@staticmethod
def event_for_success(test):
"""Returns an event dictionary for a successful test.
@param test a unittest.TestCase instance.
@return the event dictionary
"""
return EventBuilder._event_dictionary_test_result(test, "success")
@staticmethod
def event_for_unexpected_success(test, bugnumber):
"""Returns an event dictionary for a test that succeeded but was
expected to fail.
@param test a unittest.TestCase instance.
@param bugnumber the issue identifier for the bug tracking the
fix request for the test expected to fail (but is in fact
passing here).
@return the event dictionary
"""
event = EventBuilder._event_dictionary_test_result(
test, "unexpected_success")
if bugnumber:
event["bugnumber"] = str(bugnumber)
return event
@staticmethod
def event_for_failure(test, error_tuple):
"""Returns an event dictionary for a test that failed.
@param test a unittest.TestCase instance.
@param error_tuple the error tuple as reported by the test runner.
This is of the form (type<error>, error).
@return the event dictionary
"""
return EventBuilder._event_dictionary_issue(
test, "failure", error_tuple)
@staticmethod
def event_for_expected_failure(test, error_tuple, bugnumber):
"""Returns an event dictionary for a test that failed as expected.
@param test a unittest.TestCase instance.
@param error_tuple the error tuple as reported by the test runner.
This is of the form (type<error>, error).
@param bugnumber the issue identifier for the bug tracking the
fix request for the test expected to fail.
@return the event dictionary
"""
event = EventBuilder._event_dictionary_issue(
test, "expected_failure", error_tuple)
if bugnumber:
event["bugnumber"] = str(bugnumber)
return event
@staticmethod
def event_for_skip(test, reason):
"""Returns an event dictionary for a test that was skipped.
@param test a unittest.TestCase instance.
@param reason the reason why the test is being skipped.
@return the event dictionary
"""
event = EventBuilder._event_dictionary_test_result(test, "skip")
event["skip_reason"] = reason
return event
@staticmethod
def event_for_error(test, error_tuple):
"""Returns an event dictionary for a test that hit a test execution error.
@param test a unittest.TestCase instance.
@param error_tuple the error tuple as reported by the test runner.
This is of the form (type<error>, error).
@return the event dictionary
"""
return EventBuilder._event_dictionary_issue(test, "error", error_tuple)
@staticmethod
def event_for_cleanup_error(test, error_tuple):
"""Returns an event dictionary for a test that hit a test execution error
during the test cleanup phase.
@param test a unittest.TestCase instance.
@param error_tuple the error tuple as reported by the test runner.
This is of the form (type<error>, error).
@return the event dictionary
"""
event = EventBuilder._event_dictionary_issue(
test, "error", error_tuple)
event["issue_phase"] = "cleanup"
return event
class ResultsFormatter(object):
"""Provides interface to formatting test results out to a file-like object.
This class allows the LLDB test framework's raw test-realted
events to be processed and formatted in any manner desired.
Test events are represented by python dictionaries, formatted
as in the EventBuilder class above.
ResultFormatter instances are given a file-like object in which
to write their results.
ResultFormatter lifetime looks like the following:
# The result formatter is created.
# The argparse options dictionary is generated from calling
# the SomeResultFormatter.arg_parser() with the options data
# passed to dotest.py via the "--results-formatter-options"
# argument. See the help on that for syntactic requirements
# on getting that parsed correctly.
formatter = SomeResultFormatter(file_like_object, argpared_options_dict)
# Single call to session start, before parsing any events.
formatter.begin_session()
# Zero or more calls specified for events recorded during the test session.
# The parallel test runner manages getting results from all the inferior
# dotest processes, so from a new format perspective, don't worry about
# that. The formatter will be presented with a single stream of events
# sandwiched between a single begin_session()/end_session() pair in the
# parallel test runner process/thread.
for event in zero_or_more_test_events():
formatter.process_event(event)
# Single call to session end. Formatters that need all the data before
# they can print a correct result (e.g. xUnit/JUnit), this is where
# the final report can be generated.
formatter.end_session()
It is not the formatter's responsibility to close the file_like_object.
(i.e. do not close it).
The lldb test framework passes these test events in real time, so they
arrive as they come in.
In the case of the parallel test runner, the dotest inferiors
add a 'pid' field to the dictionary that indicates which inferior
pid generated the event.
Note more events may be added in the future to support richer test
reporting functionality. One example: creating a true flaky test
result category so that unexpected successes really mean the test
is marked incorrectly (either should be marked flaky, or is indeed
passing consistently now and should have the xfail marker
removed). In this case, a flaky_success and flaky_fail event
likely will be added to capture these and support reporting things
like percentages of flaky test passing so we can see if we're
making some things worse/better with regards to failure rates.
Another example: announcing all the test methods that are planned
to be run, so we can better support redo operations of various kinds
(redo all non-run tests, redo non-run tests except the one that
was running [perhaps crashed], etc.)
Implementers are expected to override all the public methods
provided in this class. See each method's docstring to see
expectations about when the call should be chained.
"""
@classmethod
def arg_parser(cls):
"""@return arg parser used to parse formatter-specific options."""
parser = argparse.ArgumentParser(
description='{} options'.format(cls.__name__),
usage=('dotest.py --results-formatter-options='
'"--option1 value1 [--option2 value2 [...]]"'))
return parser
def __init__(self, out_file, options):
super(ResultsFormatter, self).__init__()
self.out_file = out_file
self.options = options
if not self.out_file:
raise Exception("ResultsFormatter created with no file object")
self.start_time_by_test = {}
# Lock that we use while mutating inner state, like the
# total test count and the elements. We minimize how
# long we hold the lock just to keep inner state safe, not
# entirely consistent from the outside.
self.lock = threading.Lock()
def begin_session(self):
"""Begins a test session.
All process_event() calls must be sandwiched between
begin_session() and end_session() calls.
Derived classes may override this but should call this first.
"""
pass
def end_session(self):
"""Ends a test session.
All process_event() calls must be sandwiched between
begin_session() and end_session() calls.
All results formatting should be sent to the output
file object by the end of this call.
Derived classes may override this but should call this after
the dervied class's behavior is complete.
"""
pass
def process_event(self, test_event):
"""Processes the test event for collection into the formatter output.
Derived classes may override this but should call down to this
implementation first.
@param test_event the test event as formatted by one of the
event_for_* calls.
"""
pass
def track_start_time(self, test_class, test_name, start_time):
"""Tracks the start time of a test so elapsed time can be computed.
This alleviates the need for test results to be processed serially
by test. It will save the start time for the test so that
elapsed_time_for_test() can compute the elapsed time properly.
"""
if test_class is None or test_name is None:
return
test_key = "{}.{}".format(test_class, test_name)
with self.lock:
self.start_time_by_test[test_key] = start_time
def elapsed_time_for_test(self, test_class, test_name, end_time):
"""Returns the elapsed time for a test.
This function can only be called once per test and requires that
the track_start_time() method be called sometime prior to calling
this method.
"""
if test_class is None or test_name is None:
return -2.0
test_key = "{}.{}".format(test_class, test_name)
with self.lock:
if test_key not in self.start_time_by_test:
return -1.0
else:
start_time = self.start_time_by_test[test_key]
del self.start_time_by_test[test_key]
return end_time - start_time
class XunitFormatter(ResultsFormatter):
"""Provides xUnit-style formatted output.
"""
# Result mapping arguments
RM_IGNORE = 'ignore'
RM_SUCCESS = 'success'
RM_FAILURE = 'failure'
RM_PASSTHRU = 'passthru'
@staticmethod
def _quote_attribute(text):
"""Returns the given text in a manner safe for usage in an XML attribute.
@param text the text that should appear within an XML attribute.
@return the attribute-escaped version of the input text.
"""
return xml.sax.saxutils.quoteattr(text)
@classmethod
def arg_parser(cls):
"""@return arg parser used to parse formatter-specific options."""
parser = super(XunitFormatter, cls).arg_parser()
# These are valid choices for results mapping.
results_mapping_choices = [
XunitFormatter.RM_IGNORE,
XunitFormatter.RM_SUCCESS,
XunitFormatter.RM_FAILURE,
XunitFormatter.RM_PASSTHRU]
parser.add_argument(
"--xpass", action="store", choices=results_mapping_choices,
default=XunitFormatter.RM_FAILURE,
help=('specify mapping from unexpected success to jUnit/xUnit '
'result type'))
parser.add_argument(
"--xfail", action="store", choices=results_mapping_choices,
default=XunitFormatter.RM_IGNORE,
help=('specify mapping from expected failure to jUnit/xUnit '
'result type'))
return parser
def __init__(self, out_file, options):
"""Initializes the XunitFormatter instance.
@param out_file file-like object where formatted output is written.
@param options_dict specifies a dictionary of options for the
formatter.
"""
# Initialize the parent
super(XunitFormatter, self).__init__(out_file, options)
self.text_encoding = "UTF-8"
self.total_test_count = 0
self.elements = {
"successes": [],
"errors": [],
"failures": [],
"skips": [],
"unexpected_successes": [],
"expected_failures": [],
"all": []
}
self.status_handlers = {
"success": self._handle_success,
"failure": self._handle_failure,
"error": self._handle_error,
"skip": self._handle_skip,
"expected_failure": self._handle_expected_failure,
"unexpected_success": self._handle_unexpected_success
}
def begin_session(self):
super(XunitFormatter, self).begin_session()
def process_event(self, test_event):
super(XunitFormatter, self).process_event(test_event)
event_type = test_event["event"]
if event_type is None:
return
if event_type == "test_start":
self.track_start_time(
test_event["test_class"],
test_event["test_name"],
test_event["event_time"])
elif event_type == "test_result":
self._process_test_result(test_event)
else:
sys.stderr.write("unknown event type {} from {}\n".format(
event_type, test_event))
def _handle_success(self, test_event):
"""Handles a test success.
@param test_event the test event to handle.
"""
result = self._common_add_testcase_entry(test_event)
with self.lock:
self.elements["successes"].append(result)
def _handle_failure(self, test_event):
"""Handles a test failure.
@param test_event the test event to handle.
"""
result = self._common_add_testcase_entry(
test_event,
inner_content='<failure type={} message={} />'.format(
XunitFormatter._quote_attribute(test_event["issue_class"]),
XunitFormatter._quote_attribute(test_event["issue_message"])))
with self.lock:
self.elements["failures"].append(result)
def _handle_error(self, test_event):
"""Handles a test error.
@param test_event the test event to handle.
"""
result = self._common_add_testcase_entry(
test_event,
inner_content='<error type={} message={} />'.format(
XunitFormatter._quote_attribute(test_event["issue_class"]),
XunitFormatter._quote_attribute(test_event["issue_message"])))
with self.lock:
self.elements["errors"].append(result)
def _handle_skip(self, test_event):
"""Handles a skipped test.
@param test_event the test event to handle.
"""
result = self._common_add_testcase_entry(
test_event,
inner_content='<skipped message={} />'.format(
XunitFormatter._quote_attribute(test_event["skip_reason"])))
with self.lock:
self.elements["skips"].append(result)
def _handle_expected_failure(self, test_event):
"""Handles a test that failed as expected.
@param test_event the test event to handle.
"""
if self.options.xfail == XunitFormatter.RM_PASSTHRU:
# This is not a natively-supported junit/xunit
# testcase mode, so it might fail a validating
# test results viewer.
if "bugnumber" in test_event:
bug_id_attribute = 'bug-id={} '.format(
XunitFormatter._quote_attribute(test_event["bugnumber"]))
else:
bug_id_attribute = ''
result = self._common_add_testcase_entry(
test_event,
inner_content=(
'<expected-failure {}type={} message={} />'.format(
bug_id_attribute,
XunitFormatter._quote_attribute(
test_event["issue_class"]),
XunitFormatter._quote_attribute(
test_event["issue_message"]))
))
with self.lock:
self.elements["expected_failures"].append(result)
elif self.options.xfail == XunitFormatter.RM_SUCCESS:
result = self._common_add_testcase_entry(test_event)
with self.lock:
self.elements["successes"].append(result)
elif self.options.xfail == XunitFormatter.RM_FAILURE:
result = self._common_add_testcase_entry(
test_event,
inner_content='<failure type={} message={} />'.format(
XunitFormatter._quote_attribute(test_event["issue_class"]),
XunitFormatter._quote_attribute(
test_event["issue_message"])))
with self.lock:
self.elements["failures"].append(result)
elif self.options.xfail == XunitFormatter.RM_IGNORE:
pass
else:
raise Exception(
"unknown xfail option: {}".format(self.options.xfail))
def _handle_unexpected_success(self, test_event):
"""Handles a test that passed but was expected to fail.
@param test_event the test event to handle.
"""
if self.options.xpass == XunitFormatter.RM_PASSTHRU:
# This is not a natively-supported junit/xunit
# testcase mode, so it might fail a validating
# test results viewer.
result = self._common_add_testcase_entry(
test_event,
inner_content=("<unexpected-success />"))
with self.lock:
self.elements["unexpected_successes"].append(result)
elif self.options.xpass == XunitFormatter.RM_SUCCESS:
# Treat the xpass as a success.
result = self._common_add_testcase_entry(test_event)
with self.lock:
self.elements["successes"].append(result)
elif self.options.xpass == XunitFormatter.RM_FAILURE:
# Treat the xpass as a failure.
if "bugnumber" in test_event:
message = "unexpected success (bug_id:{})".format(
test_event["bugnumber"])
else:
message = "unexpected success (bug_id:none)"
result = self._common_add_testcase_entry(
test_event,
inner_content='<failure type={} message={} />'.format(
XunitFormatter._quote_attribute("unexpected_success"),
XunitFormatter._quote_attribute(message)))
with self.lock:
self.elements["failures"].append(result)
elif self.options.xpass == XunitFormatter.RM_IGNORE:
# Ignore the xpass result as far as xUnit reporting goes.
pass
else:
raise Exception("unknown xpass option: {}".format(
self.options.xpass))
def _process_test_result(self, test_event):
"""Processes the test_event known to be a test result.
This categorizes the event appropriately and stores the data needed
to generate the final xUnit report. This method skips events that
cannot be represented in xUnit output.
"""
if "status" not in test_event:
raise Exception("test event dictionary missing 'status' key")
status = test_event["status"]
if status not in self.status_handlers:
raise Exception("test event status '{}' unsupported".format(
status))
# Call the status handler for the test result.
self.status_handlers[status](test_event)
def _common_add_testcase_entry(self, test_event, inner_content=None):
"""Registers a testcase result, and returns the text created.
The caller is expected to manage failure/skip/success counts
in some kind of appropriate way. This call simply constructs
the XML and appends the returned result to the self.all_results
list.
@param test_event the test event dictionary.
@param inner_content if specified, gets included in the <testcase>
inner section, at the point before stdout and stderr would be
included. This is where a <failure/>, <skipped/>, <error/>, etc.
could go.
@return the text of the xml testcase element.
"""
# Get elapsed time.
test_class = test_event["test_class"]
test_name = test_event["test_name"]
event_time = test_event["event_time"]
time_taken = self.elapsed_time_for_test(
test_class, test_name, event_time)
# Plumb in stdout/stderr once we shift over to only test results.
test_stdout = ''
test_stderr = ''
# Formulate the output xml.
if not inner_content:
inner_content = ""
result = (
'<testcase classname="{}" name="{}" time="{:.3f}">'
'{}{}{}</testcase>'.format(
test_class,
test_name,
time_taken,
inner_content,
test_stdout,
test_stderr))
# Save the result, update total test count.
with self.lock:
self.total_test_count += 1
self.elements["all"].append(result)
return result
def _end_session_internal(self):
"""Flushes out the report of test executions to form valid xml output.
xUnit output is in XML. The reporting system cannot complete the
formatting of the output without knowing when there is no more input.
This call addresses notifcation of the completed test run and thus is
when we can finish off the report output.
"""
# Figure out the counts line for the testsuite. If we have
# been counting either unexpected successes or expected
# failures, we'll output those in the counts, at the risk of
# being invalidated by a validating test results viewer.
# These aren't counted by default so they won't show up unless
# the user specified a formatter option to include them.
xfail_count = len(self.elements["expected_failures"])
xpass_count = len(self.elements["unexpected_successes"])
if xfail_count > 0 or xpass_count > 0:
extra_testsuite_attributes = (
' expected-failures="{}"'
' unexpected-successes="{}"'.format(xfail_count, xpass_count))
else:
extra_testsuite_attributes = ""
# Output the header.
self.out_file.write(
'<?xml version="1.0" encoding="{}"?>\n'
'<testsuite name="{}" tests="{}" errors="{}" failures="{}" '
'skip="{}"{}>\n'.format(
self.text_encoding,
"LLDB test suite",
self.total_test_count,
len(self.elements["errors"]),
len(self.elements["failures"]),
len(self.elements["skips"]),
extra_testsuite_attributes))
# Output each of the test result entries.
for result in self.elements["all"]:
self.out_file.write(result + '\n')
# Close off the test suite.
self.out_file.write('</testsuite>\n')
super(XunitFormatter, self).end_session()
def end_session(self):
with self.lock:
self._end_session_internal()
class RawPickledFormatter(ResultsFormatter):
"""Formats events as a pickled stream.
The parallel test runner has inferiors pickle their results and send them
over a socket back to the parallel test. The parallel test runner then
aggregates them into the final results formatter (e.g. xUnit).
"""
@classmethod
def arg_parser(cls):
"""@return arg parser used to parse formatter-specific options."""
parser = super(RawPickledFormatter, cls).arg_parser()
return parser
def __init__(self, out_file, options):
super(RawPickledFormatter, self).__init__(out_file, options)
self.pid = os.getpid()
def begin_session(self):
super(RawPickledFormatter, self).begin_session()
self.process_event({
"event": "session_begin",
"event_time": time.time(),
"pid": self.pid
})
def process_event(self, test_event):
super(RawPickledFormatter, self).process_event(test_event)
# Add pid to the event for tracking.
# test_event["pid"] = self.pid
# Send it as {serialized_length_of_serialized_bytes}#{serialized_bytes}
pickled_message = cPickle.dumps(test_event)
self.out_file.send(
"{}#{}".format(len(pickled_message), pickled_message))
def end_session(self):
self.process_event({
"event": "session_end",
"event_time": time.time(),
"pid": self.pid
})
super(RawPickledFormatter, self).end_session()