add hook for calling platform-dependent pre-kill action on a timed out test

differential review: https://reviews.llvm.org/D24850

reviewers: clayborg, labath
llvm-svn: 282258
Todd Fiala 2016-09-23 16:10:01 +00:00
parent e9eb0913a9
commit 72525622b2
7 changed files with 351 additions and 27 deletions

@ -0,0 +1,55 @@
# pre\_kill\_hook package
## Overview
The pre\_kill\_hook package provides a per-platform method for running code
after a test process times out but before the concurrent test runner kills the
timed-out process.
## Detailed Description of Usage
If a platform defines the hook, then the hook gets called right after a timeout
is detected in a test run, but before the process is killed.
The pre-kill-hook mechanism works as follows:
* When a timeout is detected in the process_control.ProcessDriver class that
runs the per-test lldb process, a new overridable on\_timeout\_pre\_kill() method
is called on the ProcessDriver instance.
* The concurrent test driver's derived ProcessDriver overrides this method. It
checks whether a module named "lldbsuite.pre\_kill\_hook.{platform-system-name}"
exists, where platform-system-name is platform.system().lower() (e.g. "Darwin"
becomes the darwin.py module).
* If that module doesn't exist, the rest of the new behavior is skipped.
* If that module does exist, it is loaded and its
"do\_pre\_kill(process\_id, context\_dict, output\_stream)" method is called
(a minimal sketch of such a module appears after this list). If that method
raises an exception, we log it and skip any further pre-kill processing.
* The process\_id argument of the do\_pre\_kill function is the process id as
returned by the ProcessDriver.pid property.
* The output\_stream argument of the do\_pre\_kill function takes a file-like
object. Any output collected while inspecting the process to be killed should
be written to that object. The current implementation uses a six.StringIO and
then writes the collected output to {TestFilename}-{pid}.sample in the session
directory.
* Platforms where platform.system() is "Darwin" get a pre-kill action that runs
the 'sample' program on the lldb process that has timed out. That data will be
collected on CI and analyzed to determine what is happening during timeouts.
(A sample has an advantage over a core dump in that it is much smaller and
clearly shows whether the process is still making progress.)
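To make the contract concrete, here is a minimal sketch of what a hook module
for another platform might look like. It is illustrative only: this change adds
only darwin.py, and the linux.py name and the /proc-based inspection below are
assumptions, not part of the package.
```
"""Sketch of a hypothetical pre-kill hook module, e.g. linux.py."""
from __future__ import print_function

# system imports
import sys


def do_pre_kill(process_id, runner_context, output_stream):
    """Writes a crude status snapshot of process_id to output_stream."""
    # Only inspect local processes; skip if the test ran against a remote
    # platform (mirrors the check done in darwin.py).
    if runner_context is not None and runner_context.get("platform_url"):
        sys.stderr.write("warning: skipping pre-kill action for a remote "
                         "platform\n")
        return

    # /proc/<pid>/status is one cheap, Linux-specific way to see the process
    # state; a real hook could run any inspection tool instead.
    try:
        with open("/proc/{}/status".format(process_id), "r") as status_file:
            output_stream.write(status_file.read())
    except IOError as error:
        output_stream.write(
            "failed to read process status: {}\n".format(error))
```
On a Linux host, platform.system().lower() evaluates to "linux", so the
concurrent test runner would import such a module automatically if it existed.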
## Running the tests
To run the tests in the pre\_kill\_hook package, open a console, change into
this directory and run the following:
```
python -m unittest discover
```

@ -0,0 +1 @@
"""Initialize the package."""

@ -0,0 +1,46 @@
"""Provides a pre-kill method to run on macOS."""
from __future__ import print_function
# system imports
import subprocess
import sys
# third-party module imports
import six
def do_pre_kill(process_id, runner_context, output_stream, sample_time=3):
"""Samples the given process id, and puts the output to output_stream.
@param process_id the local process to sample.
@param runner_context a dictionary of details about the architectures
and platform on which the given process is running. Expected keys are
archs (array of architectures), platform_name, platform_url, and
platform_working_dir.
@param output_stream file-like object that should be used to write the
results of sampling.
@param sample_time specifies the time in seconds that should be captured.
"""
# Validate args.
if runner_context is None:
raise Exception("runner_context argument is required")
if not isinstance(runner_context, dict):
raise Exception("runner_context argument must be a dictionary")
# We will try to run sample on the local host only if there is no URL
# to a remote.
if "platform_url" in runner_context and (
runner_context["platform_url"] is not None):
import pprint
sys.stderr.write(
"warning: skipping timeout pre-kill sample invocation because we "
"don't know how to run on a remote yet. runner_context={}\n"
.format(pprint.pformat(runner_context)))
output = subprocess.check_output(['sample', six.text_type(process_id),
str(sample_time)])
output_stream.write(output)

@ -0,0 +1,107 @@
"""Test the pre-kill hook on Darwin."""
from __future__ import print_function
# system imports
from multiprocessing import Process, Queue
import platform
import re
from unittest import main, TestCase
# third party
from six import StringIO
def do_child_process(child_work_queue, parent_work_queue, verbose):
import os
pid = os.getpid()
if verbose:
print("child: pid {} started, sending to parent".format(pid))
parent_work_queue.put(pid)
if verbose:
print("child: waiting for shut-down request from parent")
child_work_queue.get()
if verbose:
print("child: received shut-down request. Child exiting.")
class DarwinPreKillTestCase(TestCase):
def __init__(self, methodName):
super(DarwinPreKillTestCase, self).__init__(methodName)
self.process = None
self.child_work_queue = None
self.verbose = False
def tearDown(self):
if self.verbose:
print("parent: sending shut-down request to child")
if self.process:
self.child_work_queue.put("hello, child")
self.process.join()
if self.verbose:
print("parent: child is fully shut down")
def test_sample(self):
# Ensure we're Darwin.
if platform.system() != 'Darwin':
self.skipTest("requires a Darwin-based OS")
# Start the child process.
self.child_work_queue = Queue()
parent_work_queue = Queue()
self.process = Process(target=do_child_process,
args=(self.child_work_queue, parent_work_queue,
self.verbose))
if self.verbose:
print("parent: starting child")
self.process.start()
# Wait for the child to report its pid. Then we know we're running.
if self.verbose:
print("parent: waiting for child to start")
child_pid = parent_work_queue.get()
# Sample the child process.
from darwin import do_pre_kill
context_dict = {
"archs": [platform.machine()],
"platform_name": None,
"platform_url": None,
"platform_working_dir": None
}
if self.verbose:
print("parent: running pre-kill action on child")
output_io = StringIO()
do_pre_kill(child_pid, context_dict, output_io)
output = output_io.getvalue()
if self.verbose:
print("parent: do_pre_kill() wrote the following output:", output)
self.assertIsNotNone(output)
# We should have a line with:
# Process: .* [{pid}]
process_re = re.compile(r"Process:[^[]+\[([^]]+)\]")
match = process_re.search(output)
self.assertIsNotNone(match, "should have found process id for "
"sampled process")
self.assertEqual(1, len(match.groups()))
self.assertEqual(child_pid, int(match.group(1)))
# We should see a Call graph: section.
callgraph_re = re.compile(r"Call graph:")
match = callgraph_re.search(output)
self.assertIsNotNone(match, "should have found the Call graph section"
"in sample output")
# We should see a Binary Images: section.
binary_images_re = re.compile(r"Binary Images:")
match = binary_images_re.search(output)
self.assertIsNotNone(match, "should have found the Binary Images "
"section in sample output")
if __name__ == "__main__":
main()

@ -46,6 +46,7 @@ import signal
import sys
import threading

from six import StringIO
from six.moves import queue

# Our packages and modules
@ -64,6 +65,8 @@ from .test_runner import process_control
# Status codes for running command with timeout.
eTimedOut, ePassed, eFailed = 124, 0, 1

g_session_dir = None
g_runner_context = None
output_lock = None
test_counter = None
total_tests = None
@ -227,6 +230,39 @@ class DoTestProcessDriver(process_control.ProcessDriver):
            failures,
            unexpected_successes)

    def on_timeout_pre_kill(self):
        # We're just about to have a timeout take effect. Here's our chance
        # to do a pre-kill action.

        # For now, we look to see if the lldbsuite.pre_kill module has a
        # runner for our platform.
        module_name = "lldbsuite.pre_kill_hook." + platform.system().lower()
        import importlib
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            # We don't have one for this platform. Skip.
            sys.stderr.write("\nwarning: no timeout handler module: " +
                             module_name)
            return

        # Try to run the pre-kill-hook method.
        try:
            # Run the pre-kill command.
            output_io = StringIO()
            module.do_pre_kill(self.pid, g_runner_context, output_io)

            # Write the output to a filename associated with the test file and
            # pid.
            basename = "{}-{}.sample".format(self.file_name, self.pid)
            sample_path = os.path.join(g_session_dir, basename)
            with open(sample_path, "w") as output_file:
                output_file.write(output_io.getvalue())
        except Exception as e:
            sys.stderr.write("caught exception while running "
                             "pre-kill action: {}".format(e))
            return

    def is_exceptional_exit(self):
        """Returns whether the process returned a timeout.
@ -635,12 +671,16 @@ def find_test_files_in_dir_tree(dir_root, found_func):
            found_func(root, tests)


def initialize_global_vars_common(num_threads, test_work_items):
    global total_tests, test_counter, test_name_len
def initialize_global_vars_common(num_threads, test_work_items, session_dir,
                                  runner_context):
    global g_session_dir, g_runner_context, total_tests, test_counter
    global test_name_len

    total_tests = sum([len(item[1]) for item in test_work_items])
    test_counter = multiprocessing.Value('i', 0)
    test_name_len = multiprocessing.Value('i', 0)
    g_session_dir = session_dir
    g_runner_context = runner_context
    if not (RESULTS_FORMATTER and RESULTS_FORMATTER.is_using_terminal()):
        print(
            "Testing: %d test suites, %d thread%s" %
@ -652,20 +692,31 @@ def initialize_global_vars_common(num_threads, test_work_items):
    update_progress()


def initialize_global_vars_multiprocessing(num_threads, test_work_items):
def initialize_global_vars_multiprocessing(num_threads, test_work_items,
                                           session_dir, runner_context):
    # Initialize the global state we'll use to communicate with the
    # rest of the flat module.
    global output_lock
    output_lock = multiprocessing.RLock()

    initialize_global_vars_common(num_threads, test_work_items)
    initialize_global_vars_common(num_threads, test_work_items, session_dir,
                                  runner_context)


def initialize_global_vars_threading(num_threads, test_work_items):
def initialize_global_vars_threading(num_threads, test_work_items, session_dir,
                                     runner_context):
    """Initializes global variables used in threading mode.

    @param num_threads specifies the number of workers used.

    @param test_work_items specifies all the work items
    that will be processed.

    @param session_dir the session directory where test-run-specific files are
    written.

    @param runner_context a dictionary of platform-related data that is passed
    to the timeout pre-kill hook.
    """
    # Initialize the global state we'll use to communicate with the
    # rest of the flat module.
@ -686,7 +737,8 @@ def initialize_global_vars_threading(num_threads, test_work_items):
    global GET_WORKER_INDEX
    GET_WORKER_INDEX = get_worker_index_threading

    initialize_global_vars_common(num_threads, test_work_items)
    initialize_global_vars_common(num_threads, test_work_items, session_dir,
                                  runner_context)


def ctrl_c_loop(main_op_func, done_func, ctrl_c_handler):
@ -833,7 +885,8 @@ def workers_and_async_done(workers, async_map):
    return True


def multiprocessing_test_runner(num_threads, test_work_items):
def multiprocessing_test_runner(num_threads, test_work_items, session_dir,
                                runner_context):
    """Provides hand-wrapped pooling test runner adapter with Ctrl-C support.

    This concurrent test runner is based on the multiprocessing
@ -847,10 +900,17 @@ def multiprocessing_test_runner(num_threads, test_work_items):
    @param test_work_items the iterable of test work item tuples
    to run.

    @param session_dir the session directory where test-run-specific files are
    written.

    @param runner_context a dictionary of platform-related data that is passed
    to the timeout pre-kill hook.
    """
    # Initialize our global state.
    initialize_global_vars_multiprocessing(num_threads, test_work_items)
    initialize_global_vars_multiprocessing(num_threads, test_work_items,
                                           session_dir, runner_context)

    # Create jobs.
    job_queue = multiprocessing.Queue(len(test_work_items))
@ -955,9 +1015,11 @@ def map_async_run_loop(future, channel_map, listener_channel):
    return map_results


def multiprocessing_test_runner_pool(num_threads, test_work_items):
def multiprocessing_test_runner_pool(num_threads, test_work_items, session_dir,
                                     runner_context):
    # Initialize our global state.
    initialize_global_vars_multiprocessing(num_threads, test_work_items)
    initialize_global_vars_multiprocessing(num_threads, test_work_items,
                                           session_dir, runner_context)

    manager = multiprocessing.Manager()
    worker_index_map = manager.dict()
@ -975,7 +1037,8 @@ def multiprocessing_test_runner_pool(num_threads, test_work_items):
        map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL)


def threading_test_runner(num_threads, test_work_items):
def threading_test_runner(num_threads, test_work_items, session_dir,
                          runner_context):
    """Provides hand-wrapped pooling threading-based test runner adapter
    with Ctrl-C support.
@ -987,10 +1050,17 @@ def threading_test_runner(num_threads, test_work_items):
    @param test_work_items the iterable of test work item tuples
    to run.
    """

    @param session_dir the session directory where test-run-specific files are
    written.

    @param runner_context a dictionary of platform-related data that is passed
    to the timeout pre-kill hook.
    """
    # Initialize our global state.
    initialize_global_vars_threading(num_threads, test_work_items)
    initialize_global_vars_threading(num_threads, test_work_items, session_dir,
                                     runner_context)

    # Create jobs.
    job_queue = queue.Queue()
@ -1038,9 +1108,11 @@ def threading_test_runner(num_threads, test_work_items):
    return test_results


def threading_test_runner_pool(num_threads, test_work_items):
def threading_test_runner_pool(num_threads, test_work_items, session_dir,
                               runner_context):
    # Initialize our global state.
    initialize_global_vars_threading(num_threads, test_work_items)
    initialize_global_vars_threading(num_threads, test_work_items, session_dir,
                                     runner_context)

    pool = multiprocessing.pool.ThreadPool(num_threads)
    map_future = pool.map_async(
@ -1060,9 +1132,10 @@ def asyncore_run_loop(channel_map):
        pass


def inprocess_exec_test_runner(test_work_items):
def inprocess_exec_test_runner(test_work_items, session_dir, runner_context):
    # Initialize our global state.
    initialize_global_vars_multiprocessing(1, test_work_items)
    initialize_global_vars_multiprocessing(1, test_work_items, session_dir,
                                           runner_context)

    # We're always worker index 0
    global GET_WORKER_INDEX
@ -1205,12 +1278,20 @@ def find(pattern, path):
    return result


def get_test_runner_strategies(num_threads):
def get_test_runner_strategies(num_threads, session_dir, runner_context):
    """Returns the test runner strategies by name in a dictionary.

    @param num_threads specifies the number of threads/processes
    that will be used for concurrent test runners.

    @param session_dir specifies the session dir to use for
    auxiliary files.

    @param runner_context a dictionary of details on the architectures and
    platform used to run the test suite. This is passed along verbatim to
    the timeout pre-kill handler, allowing that decoupled component to do
    process inspection in a platform-specific way.

    @return dictionary with key as test runner strategy name and
    value set to a callable object that takes the test work item
    and returns a test result tuple.
@ -1220,32 +1301,34 @@ def get_test_runner_strategies(num_threads):
        # multiprocessing.Pool.
        "multiprocessing":
        (lambda work_items: multiprocessing_test_runner(
            num_threads, work_items)),
            num_threads, work_items, session_dir, runner_context)),

        # multiprocessing-pool uses multiprocessing.Pool but
        # does not support Ctrl-C.
        "multiprocessing-pool":
        (lambda work_items: multiprocessing_test_runner_pool(
            num_threads, work_items)),
            num_threads, work_items, session_dir, runner_context)),

        # threading uses a hand-rolled worker pool much
        # like multiprocessing, but instead uses in-process
        # worker threads. This one supports Ctrl-C.
        "threading":
        (lambda work_items: threading_test_runner(num_threads, work_items)),
        (lambda work_items: threading_test_runner(
            num_threads, work_items, session_dir, runner_context)),

        # threading-pool uses threading for the workers (in-process)
        # and uses the multiprocessing.pool thread-enabled pool.
        # This does not properly support Ctrl-C.
        "threading-pool":
        (lambda work_items: threading_test_runner_pool(
            num_threads, work_items)),
            num_threads, work_items, session_dir, runner_context)),

        # serial uses the subprocess-based, single process
        # test runner. This provides process isolation but
        # no concurrent test execution.
        "serial":
        inprocess_exec_test_runner
        (lambda work_items: inprocess_exec_test_runner(
            work_items, session_dir, runner_context))
    }
@ -1425,7 +1508,8 @@ def default_test_runner_name(num_threads):
    return test_runner_name


def rerun_tests(test_subdir, tests_for_rerun, dotest_argv):
def rerun_tests(test_subdir, tests_for_rerun, dotest_argv, session_dir,
                runner_context):
    # Build the list of test files to rerun. Some future time we'll
    # enable re-run by test method so we can constrain the rerun set
    # to just the method(s) that were in issued within a file.
@ -1465,7 +1549,8 @@ def rerun_tests(test_subdir, tests_for_rerun, dotest_argv):
print("rerun will use the '{}' test runner strategy".format(
rerun_runner_name))
runner_strategies_by_name = get_test_runner_strategies(rerun_thread_count)
runner_strategies_by_name = get_test_runner_strategies(
rerun_thread_count, session_dir, runner_context)
rerun_runner_func = runner_strategies_by_name[
rerun_runner_name]
if rerun_runner_func is None:
@ -1546,8 +1631,19 @@ def main(num_threads, test_subdir, test_runner_name, results_formatter):
    if results_formatter is not None:
        results_formatter.set_expected_timeouts_by_basename(expected_timeout)

    # Setup the test runner context. This is a dictionary of information that
    # will be passed along to the timeout pre-kill handler and allows for loose
    # coupling of its implementation.
    runner_context = {
        "archs": configuration.archs,
        "platform_name": configuration.lldb_platform_name,
        "platform_url": configuration.lldb_platform_url,
        "platform_working_dir": configuration.lldb_platform_working_dir,
    }

    # Figure out which testrunner strategy we'll use.
    runner_strategies_by_name = get_test_runner_strategies(num_threads)
    runner_strategies_by_name = get_test_runner_strategies(
        num_threads, session_dir, runner_context)

    # If the user didn't specify a test runner strategy, determine
    # the default now based on number of threads and OS type.
@ -1594,7 +1690,8 @@ def main(num_threads, test_subdir, test_runner_name, results_formatter):
"exceeded".format(
configuration.rerun_max_file_threshold))
else:
rerun_tests(test_subdir, tests_for_rerun, dotest_argv)
rerun_tests(test_subdir, tests_for_rerun, dotest_argv,
session_dir, runner_context)
# The results formatter - if present - is done now. Tell it to
# terminate.

@ -483,6 +483,19 @@ class ProcessDriver(object):
    def on_process_exited(self, command, output, was_timeout, exit_status):
        pass

    def on_timeout_pre_kill(self):
        """Called after the timeout interval elapses but before killing it.

        This method is added to give derived classes a chance to do
        something to the process prior to it being killed. For example,
        this would be a good spot to run a program that samples the process
        to see what it was doing (or not doing).

        Do not attempt to reap the process (i.e. use wait()) in this method.
        That will interfere with the kill mechanism and return code processing.
        """
        pass

    def write(self, content):
        # pylint: disable=no-self-use
        # Intended - we want derived classes to be able to override
@ -640,6 +653,11 @@ class ProcessDriver(object):
            # Reap the child process here.
            self.returncode = self.process.wait()
        else:
            # Allow derived classes to do some work after we detected
            # a timeout but before we touch the timed-out process.
            self.on_timeout_pre_kill()

            # Prepare to stop the process
            process_terminated = completed_normally
            terminate_attempt_count = 0