Terminate outside inst. deploy manager | Fix monitor loop

abejgonzalez 2022-12-06 18:33:44 -08:00 committed by Abraham Gonzalez
parent 98fa2f9b25
commit 964ab8ca0b
5 changed files with 179 additions and 141 deletions

View File

@@ -521,13 +521,18 @@ class FireSimTopologyWithPasses:
self.boot_simulation_passes(False, skip_instance_binding=True)
@parallel
def monitor_jobs_wrapper(run_farm, completed_jobs: List[str], teardown: bool, terminateoncompletion: bool, job_results_dir: str) -> Dict[str, Dict[str, bool]]:
def monitor_jobs_wrapper(
run_farm: RunFarm,
prior_completed_jobs: List[str],
is_final_loop: bool,
is_networked: bool,
terminateoncompletion: bool,
job_results_dir: str) -> Dict[str, Dict[str, bool]]:
""" on each instance, check over its switches and simulations
to copy results off. """
my_node = run_farm.lookup_by_host(env.host_string)
assert my_node.instance_deploy_manager is not None
return my_node.instance_deploy_manager.monitor_jobs_instance(completed_jobs, teardown, terminateoncompletion, job_results_dir)
return my_node.instance_deploy_manager.monitor_jobs_instance(prior_completed_jobs, is_final_loop, is_networked, terminateoncompletion, job_results_dir)
def loop_logger(instancestates: Dict[str, Any], terminateoncompletion: bool) -> None:
""" Print the simulation status nicely. """
@@ -601,8 +606,8 @@ class FireSimTopologyWithPasses:
rootLogger.info("""{}/{} simulations are still running.""".format(runningsims, totalsims))
rootLogger.info("-"*80)
# teardown is required if roots are switches
teardown_required = isinstance(self.firesimtopol.roots[0], FireSimSwitchNode)
# is networked if a switch node is the root
is_networked = isinstance(self.firesimtopol.roots[0], FireSimSwitchNode)
# run polling loop
while True:
@@ -612,24 +617,21 @@ class FireSimTopologyWithPasses:
def get_jobs_completed_local_info():
# this is a list of jobs completed, since any completed job will have
# a directory within this directory.
jobscompleted = os.listdir(self.workload.job_monitoring_dir)
rootLogger.debug("Monitoring dir jobs completed: " + str(jobscompleted))
return jobscompleted
monitored_jobs_completed = os.listdir(self.workload.job_monitoring_dir)
rootLogger.debug(f"Monitoring dir jobs completed: {monitored_jobs_completed}")
return monitored_jobs_completed
jobscompleted = get_jobs_completed_local_info()
# this job on the instance should return all the state about the instance
# e.g.:
# if an instance has been terminated (really - is termination
# requested and no jobs are left, then we will have implicitly
# terminated
teardown = False
instancestates = execute(monitor_jobs_wrapper, self.run_farm,
jobscompleted, teardown,
self.terminateoncompletion,
self.workload.job_results_dir,
hosts=all_run_farm_ips)
# return all the state about the instance (potentially copy back results and/or terminate)
is_final_run = False
monitored_jobs_completed = get_jobs_completed_local_info()
instancestates = execute(monitor_jobs_wrapper,
self.run_farm,
monitored_jobs_completed,
is_final_run,
is_networked,
self.terminateoncompletion,
self.workload.job_results_dir,
hosts=all_run_farm_ips)
# log sim state, raw
rootLogger.debug(pprint.pformat(instancestates))
@@ -637,31 +639,37 @@ class FireSimTopologyWithPasses:
# log sim state, properly
loop_logger(instancestates, self.terminateoncompletion)
jobs_complete_dict = dict()
jobs_complete_dict = {}
simstates = [x['sims'] for x in instancestates.values()]
for x in simstates:
jobs_complete_dict.update(x)
global_status = jobs_complete_dict.values()
rootLogger.debug("jobs complete dict " + str(jobs_complete_dict))
rootLogger.debug("global status: " + str(global_status))
rootLogger.debug(f"Jobs complete: {jobs_complete_dict}")
rootLogger.debug(f"Global status: {global_status}")
if is_networked and any(global_status):
# at least one simulation has finished
if teardown_required and any(global_status):
# in this case, do the teardown, then call exec again, then exit
rootLogger.info("Teardown required, manually tearing down...")
rootLogger.info("Networked simulation, manually tearing down all instances...")
# do not disconnect nbds, because we may need them for copying
# results. the process of copying results will tear them down anyway
self.kill_simulation_passes(use_mock_instances_for_testing, disconnect_all_nbds=False)
rootLogger.debug("continuing one more loop to fully copy results and terminate")
teardown = True
# get latest local info about jobs completed. avoid extra copy
jobscompleted = get_jobs_completed_local_info()
instancestates = execute(monitor_jobs_wrapper, self.run_farm,
jobscompleted, teardown,
self.terminateoncompletion,
self.workload.job_results_dir,
hosts=all_run_farm_ips)
rootLogger.debug("One more loop to fully copy results and terminate.")
is_final_run = True
monitored_jobs_completed = get_jobs_completed_local_info()
instancestates = execute(monitor_jobs_wrapper,
self.run_farm,
monitored_jobs_completed,
is_final_run,
is_networked,
self.terminateoncompletion,
self.workload.job_results_dir,
hosts=all_run_farm_ips)
break
if not teardown_required and all(global_status):
if not is_networked and all(global_status):
break
time.sleep(10)
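Putting the hunks above together, the reworked polling loop reduces to roughly the following shape. This is a simplified sketch, not FireSim code: poll_hosts stands in for the execute(monitor_jobs_wrapper, ...) fan-out and kill_all_sims for kill_simulation_passes(...); both names are hypothetical.

import time

def polling_loop_sketch(is_networked, poll_hosts, kill_all_sims):
    # poll_hosts(is_final_loop) -> dict mapping job name to completion status
    while True:
        job_status = poll_hosts(is_final_loop=False)
        if is_networked and any(job_status.values()):
            # at least one sim finished: tear down all sims/switches, then run
            # one final monitoring pass so hosts copy results back and
            # (optionally) terminate themselves
            kill_all_sims()
            poll_hosts(is_final_loop=True)
            break
        if not is_networked and all(job_status.values()):
            # non-networked: hosts terminate themselves as their jobs finish
            break
        time.sleep(10)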

View File

@@ -18,8 +18,8 @@ from util.io import firesim_input
from runtools.run_farm_deploy_managers import InstanceDeployManager, EC2InstanceDeployManager
from typing import Any, Dict, Optional, List, Union, Set, Type, Tuple, TYPE_CHECKING
from mypy_boto3_ec2.service_resource import Instance as EC2InstanceResource
if TYPE_CHECKING:
from mypy_boto3_ec2.service_resource import Instance as EC2InstanceResource
from runtools.firesim_topology_elements import FireSimSwitchNode, FireSimServerNode
rootLogger = logging.getLogger()
@@ -28,6 +28,7 @@ class Inst(metaclass=abc.ABCMeta):
"""Run farm hosts that can hold simulations or switches.
Attributes:
run_farm: handle to run farm this instance is a part of
MAX_SWITCH_SLOTS_ALLOWED: max switch slots allowed (hardcoded)
switch_slots: switch node slots
_next_switch_port: next switch port to assign
@@ -39,6 +40,8 @@ class Inst(metaclass=abc.ABCMeta):
metasimulation_enabled: true if this instance will be running metasimulations
"""
run_farm: RunFarm
# switch variables
# restricted by default security group network model port alloc (10000 to 11000)
MAX_SWITCH_SLOTS_ALLOWED: int = 1000
@@ -58,8 +61,11 @@ class Inst(metaclass=abc.ABCMeta):
metasimulation_enabled: bool
def __init__(self, max_sim_slots_allowed: int, instance_deploy_manager: Type[InstanceDeployManager], sim_dir: Optional[str] = None, metasimulation_enabled: bool = False) -> None:
def __init__(self, run_farm: RunFarm, max_sim_slots_allowed: int, instance_deploy_manager: Type[InstanceDeployManager], sim_dir: Optional[str] = None, metasimulation_enabled: bool = False) -> None:
super().__init__()
self.run_farm = run_farm
self.switch_slots = []
self._next_switch_port = 10000 # track ports to allocate for server switch model ports
@@ -111,6 +117,10 @@ class Inst(metaclass=abc.ABCMeta):
""" Return True iff any simulation on this Inst requires qcow2. """
return any([x.qcow2_support_required() for x in self.sim_slots])
def terminate_self(self) -> None:
""" Terminate the current host for the Inst. """
self.run_farm.terminate_by_inst(self)
class RunFarm(metaclass=abc.ABCMeta):
"""Abstract class to represent how to manage run farm hosts (similar to `BuildFarm`).
In addition to having to implement how to spawn/terminate nodes, the child classes must
@@ -250,6 +260,11 @@ class RunFarm(metaclass=abc.ABCMeta):
"""Return run farm host based on host."""
raise NotImplementedError
@abc.abstractmethod
def terminate_by_inst(self, inst: Inst) -> None:
"""Terminate run farm host based on Inst object."""
raise NotImplementedError
def invert_filter_sort(input_dict: Dict[str, int]) -> List[Tuple[int, str]]:
"""Take a dict, convert to list of pairs, flip key and value,
remove all keys equal to zero, then sort on the new key."""
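The net effect of the Inst.terminate_self and RunFarm.terminate_by_inst additions is that deploy managers no longer call boto3 directly: termination is routed through the Inst to its run farm, which decides how (or whether) to terminate the host. A minimal sketch of the chain with stub classes (not FireSim code):

class RunFarmSketch:
    def terminate_by_inst(self, inst) -> None:
        # AWSEC2F1 looks up the matching boto3 instance and terminates it;
        # ExternallyProvisioned just logs a warning and returns
        print(f"terminating host {inst.host}")

class InstSketch:
    def __init__(self, run_farm, host):
        self.run_farm = run_farm
        self.host = host

    def terminate_self(self) -> None:
        self.run_farm.terminate_by_inst(self)

class DeployManagerSketch:
    def __init__(self, parent_node):
        self.parent_node = parent_node

    def terminate_instance(self) -> None:
        self.parent_node.terminate_self()

farm = RunFarmSketch()
mgr = DeployManagerSketch(InstSketch(farm, "10.0.0.1"))
mgr.terminate_instance()   # -> "terminating host 10.0.0.1"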
@@ -350,7 +365,7 @@ class AWSEC2F1(RunFarm):
insts: List[Tuple[Inst, Optional[Union[EC2InstanceResource, MockBoto3Instance]]]] = []
for _ in range(num_insts):
insts.append((Inst(num_sim_slots, dispatch_dict[platform], simulation_dir, self.metasimulation_enabled), None))
insts.append((Inst(self, num_sim_slots, dispatch_dict[platform], simulation_dir, self.metasimulation_enabled), None))
self.run_farm_hosts_dict[inst_handle] = insts
self.mapper_consumed[inst_handle] = 0
@@ -496,6 +511,18 @@ class AWSEC2F1(RunFarm):
return host_node
assert False, f"Unable to find host node by {host}"
def terminate_by_inst(self, inst: Inst) -> None:
"""Terminate run farm host based on host."""
for sim_host_handle in sorted(self.SIM_HOST_HANDLE_TO_MAX_FPGA_SLOTS):
inst_list = self.run_farm_hosts_dict[sim_host_handle]
for inner_inst, boto in inst_list:
if inner_inst.get_host() == inst.get_host():
# EC2InstanceResource can only be used for typing checks
# preventing its use for the isinstance() check
assert boto is not None and not isinstance(boto, MockBoto3Instance)
instanceids = get_instance_ids_for_instances([boto])
terminate_instances(instanceids, dryrun=False)
class ExternallyProvisioned(RunFarm):
"""This manages the set of externally provisioned instances. This class doesn't manage
launch/terminating instances. It is assumed that the instances are "ready to use".
@@ -552,7 +579,7 @@ class ExternallyProvisioned(RunFarm):
platform = host_spec.get("override_platform", default_platform)
simulation_dir = host_spec.get("override_simulation_dir", self.default_simulation_dir)
inst = Inst(num_sims, dispatch_dict[platform], simulation_dir, self.metasimulation_enabled)
inst = Inst(self, num_sims, dispatch_dict[platform], simulation_dir, self.metasimulation_enabled)
inst.set_host(ip_addr)
assert not ip_addr in self.run_farm_hosts_dict, f"Duplicate host name found in 'run_farm_hosts': {ip_addr}"
self.run_farm_hosts_dict[ip_addr] = [(inst, None)]
@@ -586,3 +613,7 @@ class ExternallyProvisioned(RunFarm):
if host_node.get_host() == host:
return host_node
assert False, f"Unable to find host node by {host} host name"
def terminate_by_inst(self, inst: Inst) -> None:
rootLogger.info(f"WARNING: Skipping terminate_by_inst since run hosts are externally provisioned.")
return

View File

@@ -11,11 +11,9 @@ from fabric.contrib.project import rsync_project # type: ignore
import time
from os.path import join as pjoin
from awstools.awstools import terminate_instances, get_instance_ids_for_instances
from runtools.utils import has_sudo
from typing import List, Dict, Optional, Union, TYPE_CHECKING
from mypy_boto3_ec2.service_resource import Instance as EC2InstanceResource
if TYPE_CHECKING:
from runtools.firesim_topology_elements import FireSimSwitchNode, FireSimServerNode
from runtools.run_farm import Inst
@@ -96,9 +94,12 @@ class InstanceDeployManager(metaclass=abc.ABCMeta):
"""
raise NotImplementedError
def instance_logger(self, logstr: str) -> None:
def instance_logger(self, logstr: str, debug: bool = False) -> None:
""" Log with this host's info as prefix. """
rootLogger.info("""[{}] """.format(env.host_string) + logstr)
if debug:
rootLogger.debug("""[{}] """.format(env.host_string) + logstr)
else:
rootLogger.info("""[{}] """.format(env.host_string) + logstr)
def sim_node_qcow(self) -> None:
""" If NBD is available and qcow2 support is required, install qemu-img
@@ -305,122 +306,125 @@ class InstanceDeployManager(metaclass=abc.ABCMeta):
switches.append(line_stripped)
return {'switches': switches, 'simdrivers': simdrivers}
def monitor_jobs_instance(self, completed_jobs: List[str], teardown: bool, terminateoncompletion: bool,
def monitor_jobs_instance(self,
prior_completed_jobs: List[str],
is_final_loop: bool,
is_networked: bool,
terminateoncompletion: bool,
job_results_dir: str) -> Dict[str, Dict[str, bool]]:
""" Job monitoring for this host. """
# make a local copy of completed_jobs, so that we can update it
completed_jobs = list(completed_jobs)
self.instance_logger(f"Final loop?: {is_final_loop} Is networked?: {is_networked} Terminateoncomplete: {terminateoncompletion}", debug=True)
self.instance_logger(f"Prior completed jobs: {prior_completed_jobs}", debug=True)
def do_terminate():
if (not is_networked) or (is_networked and is_final_loop):
if terminateoncompletion:
self.terminate_instance()
rootLogger.debug("completed jobs " + str(completed_jobs))
if not self.instance_assigned_simulations() and self.instance_assigned_switches():
# this node hosts ONLY switches and not sims
#
self.instance_logger(f"Polling switch-only node", debug=True)
# just confirm that our switches are still running
# switches will never trigger shutdown in the cycle-accurate -
# they should run forever until torn down
if teardown:
# handle the case where we're just tearing down nodes that have
# ONLY switches
if is_final_loop:
self.instance_logger(f"Completing copies for switch-only node", debug=True)
for counter in range(len(self.parent_node.switch_slots)):
switchsim = self.parent_node.switch_slots[counter]
switchsim.copy_back_switchlog_from_run(job_results_dir, counter)
if terminateoncompletion:
# terminate the instance since teardown is called and instance
# termination is enabled
self.terminate_instance()
do_terminate()
# don't really care about the return val in the teardown case
return {'switches': dict(), 'sims': dict()}
# not teardown - just get the status of the switch sims
switchescompleteddict = {k: False for k in self.running_simulations()['switches']}
for switchsim in self.parent_node.switch_slots:
swname = switchsim.switch_builder.switch_binary_name()
if swname not in switchescompleteddict.keys():
switchescompleteddict[swname] = True
return {'switches': switchescompleteddict, 'sims': dict()}
if self.instance_assigned_simulations():
# this node has sims attached
# first, figure out which jobs belong to this instance.
# if they are all completed already. RETURN, DON'T TRY TO DO ANYTHING
# ON THE INSTANCE.
parentslots = self.parent_node.sim_slots
rootLogger.debug("parentslots " + str(parentslots))
jobnames = [slot.get_job_name() for slot in parentslots if slot is not None]
rootLogger.debug("jobnames " + str(jobnames))
already_done = all([job in completed_jobs for job in jobnames])
rootLogger.debug("already done? " + str(already_done))
if already_done:
# in this case, all of the nodes jobs have already completed. do nothing.
# this can never happen in the cycle-accurate case at a point where we care
# about switch status, so don't bother to populate it
jobnames_to_completed = {jname: True for jname in jobnames}
return {'sims': jobnames_to_completed, 'switches': dict()}
# at this point, all jobs are NOT completed. so, see how they're doing now:
instance_screen_status = self.running_simulations()
switchescompleteddict = {k: False for k in instance_screen_status['switches']}
if self.instance_assigned_switches():
# fill in whether switches have terminated for some reason
return {'switches': {}, 'sims': {}}
else:
# get the status of the switch sims
switchescompleteddict = {k: False for k in self.running_simulations()['switches']}
for switchsim in self.parent_node.switch_slots:
swname = switchsim.switch_builder.switch_binary_name()
if swname not in switchescompleteddict.keys():
switchescompleteddict[swname] = True
slotsrunning = [x for x in instance_screen_status['simdrivers']]
return {'switches': switchescompleteddict, 'sims': {}}
rootLogger.debug("slots running")
rootLogger.debug(slotsrunning)
if self.instance_assigned_simulations():
# this node has sims attached
self.instance_logger(f"Polling node with simulations (and potentially switches)", debug=True)
sim_slots = self.parent_node.sim_slots
jobnames = [slot.get_job_name() for slot in sim_slots]
all_jobs_completed = all([(job in prior_completed_jobs) for job in jobnames])
self.instance_logger(f"jobnames: {jobnames}", debug=True)
self.instance_logger(f"All jobs completed?: {all_jobs_completed}", debug=True)
if all_jobs_completed:
do_terminate()
# in this case, all of the nodes jobs have already completed. do nothing.
# this can never happen in the cycle-accurate case at a point where we care
# about switch status, so don't bother to populate it
jobnames_to_completed = {jname: True for jname in jobnames}
return {'sims': jobnames_to_completed, 'switches': {}}
# at this point, all jobs are NOT completed. so, see how they're doing now:
instance_screen_status = self.running_simulations()
switchescompleteddict = {k: False for k in instance_screen_status['switches']}
slotsrunning = [x for x in instance_screen_status['simdrivers']]
self.instance_logger(f"Switch Slots running: {switchescompleteddict}", debug=True)
self.instance_logger(f"Sim Slots running: {slotsrunning}", debug=True)
if self.instance_assigned_switches():
# fill in whether switches have terminated
for switchsim in self.parent_node.switch_slots:
sw_name = switchsim.switch_builder.switch_binary_name()
if sw_name not in switchescompleteddict.keys():
switchescompleteddict[sw_name] = True
# fill in whether sims have terminated
completed_jobs = prior_completed_jobs.copy() # create local copy to append to
for slotno, jobname in enumerate(jobnames):
if str(slotno) not in slotsrunning and jobname not in completed_jobs:
self.instance_logger("Slot " + str(slotno) + " completed! copying results.")
# NOW, we must copy off the results of this sim, since it just exited
parent = parentslots[slotno]
parent.copy_back_job_results_from_run(slotno, has_sudo())
# add our job to our copy of completed_jobs, so that next,
# we can test again to see if this instance is "done" and
# can be terminated
if (str(slotno) not in slotsrunning) and (jobname not in completed_jobs):
self.instance_logger(f"Slot {slotno}, Job {jobname} completed!")
completed_jobs.append(jobname)
# determine if we're done now.
jobs_done_q = {job: job in completed_jobs for job in jobnames}
now_done = all(jobs_done_q.values())
rootLogger.debug("now done: " + str(now_done))
if now_done and self.instance_assigned_switches():
# we're done AND we have switches running here, so kill them,
# then copy off their logs. this handles the case where you
# have a node with one simulation and some switches, to make
# sure the switch logs are copied off.
#
# the other cases are when you have multiple sims and a cycle-acc network,
# in which case the all() will never actually happen (unless someone builds
# a workload where two sims exit at exactly the same time, which we should
# advise users not to do)
#
# a last use case is when there's no network, in which case
# instance_assigned_switches won't be true, so this won't be called
# this writes the job monitoring file
sim_slots[slotno].copy_back_job_results_from_run(slotno, has_sudo())
self.kill_switches_instance()
jobs_complete_dict = {job: job in completed_jobs for job in jobnames}
now_all_jobs_complete = all(jobs_complete_dict.values())
self.instance_logger(f"Now done?: {now_all_jobs_complete}", debug=True)
for counter, switchsim in enumerate(self.parent_node.switch_slots):
switchsim.copy_back_switchlog_from_run(job_results_dir, counter)
if now_all_jobs_complete:
if self.instance_assigned_switches():
# we have switches running here, so kill them,
# then copy off their logs. this handles the case where you
# have a node with one simulation and some switches, to make
# sure the switch logs are copied off.
#
# the other cases are when you have multiple sims and a cycle-acc network,
# in which case the all() will never actually happen (unless someone builds
# a workload where two sims exit at exactly the same time, which we should
# advise users not to do)
#
# a last use case is when there's no network, in which case
# instance_assigned_switches won't be true, so this won't be called
if now_done and terminateoncompletion:
# terminate the instance since everything is done and instance
# termination is enabled
self.terminate_instance()
self.kill_switches_instance()
return {'switches': switchescompleteddict, 'sims': jobs_done_q}
for counter, switch_slot in enumerate(self.parent_node.switch_slots):
switch_slot.copy_back_switchlog_from_run(job_results_dir, counter)
do_terminate()
return {'switches': switchescompleteddict, 'sims': jobs_complete_dict}
assert False
def remote_kmsg(message: str) -> None:
""" This will let you write whatever is passed as message into the kernel
log of the remote machine. Useful for figuring what the manager is doing
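Two pieces of the rewritten monitor_jobs_instance are worth isolating: the termination gate in do_terminate() and the detection of newly finished slots. Condensed sketches of both follow (helper names are hypothetical; the behavior mirrors the diff above):

def should_terminate(is_networked: bool, is_final_loop: bool,
                     terminateoncompletion: bool) -> bool:
    # non-networked hosts may terminate as soon as their own jobs are done;
    # networked hosts only terminate on the extra final loop, after switches
    # and sims have been killed and results have been copied back
    return ((not is_networked) or is_final_loop) and terminateoncompletion

def newly_completed_jobs(jobnames, slotsrunning, prior_completed_jobs):
    # a slot whose simdriver screen is gone and whose job was not already
    # reported complete has just finished; its results get copied back
    completed = list(prior_completed_jobs)
    for slotno, jobname in enumerate(jobnames):
        if str(slotno) not in slotsrunning and jobname not in completed:
            completed.append(jobname)
    return completed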
@@ -435,11 +439,9 @@ class EC2InstanceDeployManager(InstanceDeployManager):
This is in charge of managing the locations of stuff on remote nodes.
"""
boto3_instance_object: Optional[Union[EC2InstanceResource, MockBoto3Instance]]
def __init__(self, parent_node: Inst) -> None:
super().__init__(parent_node)
self.boto3_instance_object = None
self.nbd_tracker = NBDTracker()
def get_and_install_aws_fpga_sdk(self) -> None:
@@ -618,10 +620,8 @@ class EC2InstanceDeployManager(InstanceDeployManager):
self.copy_switch_slot_infrastructure(slotno)
def terminate_instance(self) -> None:
assert isinstance(self.boto3_instance_object, EC2InstanceResource)
instanceids = get_instance_ids_for_instances([self.boto3_instance_object])
terminate_instances(instanceids, dryrun=False)
self.instance_logger("Terminating instance", debug=True)
self.parent_node.terminate_self()
class VitisInstanceDeployManager(InstanceDeployManager):
""" This class manages a Vitis-enabled instance """
@@ -665,5 +665,4 @@ class VitisInstanceDeployManager(InstanceDeployManager):
def terminate_instance(self) -> None:
""" VitisInstanceDeployManager machines cannot be terminated. """
pass
return

View File

@@ -38,7 +38,7 @@ autocounter:
workload:
workload_name: linux-poweroff-uniform.json
terminate_on_completion: no
terminate_on_completion: yes
suffix_tag: null
host_debug:

View File

@@ -36,7 +36,7 @@ autocounter:
workload:
workload_name: linux-poweroff-uniform.json
terminate_on_completion: no
terminate_on_completion: yes
suffix_tag: null
host_debug: