diff --git a/deploy/runtools/firesim_topology_core.py b/deploy/runtools/firesim_topology_core.py index 36ed0329..6b73ff49 100644 --- a/deploy/runtools/firesim_topology_core.py +++ b/deploy/runtools/firesim_topology_core.py @@ -4,13 +4,28 @@ topology. """ from runtools.firesim_topology_elements import * from runtools.user_topology import UserTopologies +from typing import List, Callable + class FireSimTopology(UserTopologies): """ A FireSim Topology consists of a list of root FireSimNodes, which connect to other FireSimNodes. This is designed to model tree-like topologies.""" + custom_mapper: Callable - def get_dfs_order(self): + def __init__(self, user_topology_name: str, no_net_num_nodes: int) -> None: + # This just constructs the user topology. an upper level pass manager + # will apply passes to it. + + super().__init__(no_net_num_nodes) + + # a topology can specify a custom target -> host mapping. if left as None, + # the default mapper is used, which handles no network and simple networked cases. + self.custom_mapper = None + config_func = getattr(self, user_topology_name) + config_func() + + def get_dfs_order(self) -> List[FireSimNode]: """ Return all nodes in the topology in dfs order, as a list. """ stack = list(self.roots) retlist = [] @@ -27,26 +42,16 @@ class FireSimTopology(UserTopologies): stack = list(map(lambda x: x.get_downlink_side(), nextup.downlinks)) + stack return retlist - def get_dfs_order_switches(self): + def get_dfs_order_switches(self) -> List[FireSimSwitchNode]: """ Utility function that returns only switches, in dfs order. """ return [x for x in self.get_dfs_order() if isinstance(x, FireSimSwitchNode)] - def get_dfs_order_servers(self): + def get_dfs_order_servers(self) -> List[FireSimServerNode]: """ Utility function that returns only servers, in dfs order. 
""" return [x for x in self.get_dfs_order() if isinstance(x, FireSimServerNode)] - def get_bfs_order(self): + def get_bfs_order(self) -> None: """ return the nodes in the topology in bfs order """ # don't forget to eliminate dups assert False, "TODO" - def __init__(self, user_topology_name, no_net_num_nodes): - # This just constructs the user topology. an upper level pass manager - # will apply passes to it. - - # a topology can specify a custom target -> host mapping. if left as None, - # the default mapper is used, which handles no network and simple networked cases. - self.custom_mapper = None - self.no_net_num_nodes = no_net_num_nodes - config_func = getattr(self, user_topology_name) - config_func() diff --git a/deploy/runtools/firesim_topology_elements.py b/deploy/runtools/firesim_topology_elements.py index 2cf69461..7aeacc24 100644 --- a/deploy/runtools/firesim_topology_elements.py +++ b/deploy/runtools/firesim_topology_elements.py @@ -1,16 +1,23 @@ """ Node types necessary to construct a FireSimTopology. """ import logging +import abc +from fabric.contrib.project import rsync_project # type: ignore +from fabric.api import run, local, warn_only, get # type: ignore from runtools.switch_model_config import AbstractSwitchToSwitchConfig from runtools.utils import get_local_shared_libraries +from runtools.run_farm_instances import Inst from util.streamlogger import StreamLogger -from fabric.contrib.project import rsync_project # type: ignore +from runtools.workload import WorkloadConfig, JobConfig +from runtools.runtime_config import RuntimeHWConfig +from runtools.utils import MacAddress + +from typing import Optional, List, Tuple, Sequence rootLogger = logging.getLogger() - -class FireSimLink(object): +class FireSimLink: """ This represents a link that connects different FireSimNodes. Terms: @@ -28,12 +35,16 @@ class FireSimLink(object): RootSwitch has a downlink to Sim X. 
""" - # links have a globally unique identifier, currently used for naming # shmem regions for Shmem Links - next_unique_link_identifier = 0 + next_unique_link_identifier: int = 0 + id: int + id_as_str: str + uplink_side: Optional[FireSimNode] + downlink_side: Optional[FireSimNode] + port: Optional[int] - def __init__(self, uplink_side, downlink_side): + def __init__(self, uplink_side: FireSimNode, downlink_side: FireSimNode) -> None: self.id = FireSimLink.next_unique_link_identifier FireSimLink.next_unique_link_identifier += 1 # format as 100 char hex string padded with zeroes @@ -44,45 +55,52 @@ class FireSimLink(object): self.set_uplink_side(uplink_side) self.set_downlink_side(downlink_side) - def set_uplink_side(self, fsimnode): + def set_uplink_side(self, fsimnode: FireSimNode) -> None: self.uplink_side = fsimnode - def set_downlink_side(self, fsimnode): + def set_downlink_side(self, fsimnode: FireSimNode) -> None: self.downlink_side = fsimnode - def get_uplink_side(self): + def get_uplink_side(self) -> Optional[FireSimNode]: return self.uplink_side - def get_downlink_side(self): + def get_downlink_side(self) -> Optional[FireSimNode]: return self.downlink_side - def link_hostserver_port(self): + def link_hostserver_port(self) -> int: """ Get the port used for this Link. This should only be called for links implemented with SocketPorts. """ if self.port is None: - self.port = self.get_uplink_side().host_instance.allocate_host_port() + uplink_side = self.get_uplink_side() + assert uplink_side is not None + assert uplink_side.host_instance is not None + self.port = uplink_side.host_instance.allocate_host_port() return self.port - def link_hostserver_ip(self): + def link_hostserver_ip(self) -> str: """ Get the IP address used for this Link. This should only be called for links implemented with SocketPorts. """ - assert self.get_uplink_side().host_instance.is_bound_to_real_instance(), "Instances must be bound to private IP to emit switches with uplinks. i.e. 
you must have a running Run Farm." - return self.get_uplink_side().host_instance.get_private_ip() + uplink_side = self.get_uplink_side() + assert uplink_side is not None + assert uplink_side.host_instance is not None + return uplink_side.host_instance.get_ip() - def link_crosses_hosts(self): + def link_crosses_hosts(self) -> bool: """ Return True if the user has mapped the two endpoints of this link to separate hosts. This implies a SocketServerPort / SocketClientPort will be used to implement the Link. If False, use a sharedmem port to implement the link. """ if type(self.get_downlink_side()) == FireSimDummyServerNode: return False + assert self.get_uplink_side() is not None + assert self.get_downlink_side() is not None return self.get_uplink_side().host_instance != self.get_downlink_side().host_instance - def get_global_link_id(self): + def get_global_link_id(self) -> str: """ Return the globally unique link id, used for naming shmem ports. """ return self.id_as_str -class FireSimNode(object): +class FireSimNode(metaclass=abc.ABCMeta): """ This represents a node in the high-level FireSim Simulation Topology Graph. These nodes are either @@ -99,15 +117,18 @@ class FireSimNode(object): 3) Assigning workloads to run to simulators """ + downlinks: List[FireSimLink] + uplinks: List[FireSimLink] + host_instance: Optional[Inst] - def __init__(self): + def __init__(self) -> None: self.downlinks = [] # used when there are multiple links between switches to disambiguate #self.downlinks_consumed = [] self.uplinks = [] self.host_instance = None - def add_downlink(self, firesimnode): + def add_downlink(self, firesimnode: FireSimNode) -> None: """ A "downlink" is a link that will take you further from the root of the tree. Users define a tree topology by specifying "downlinks". Uplinks are automatically inferred. 
""" @@ -116,12 +137,13 @@ class FireSimNode(object): self.downlinks.append(linkobj) #self.downlinks_consumed.append(False) - def add_downlinks(self, firesimnodes): + def add_downlinks(self, firesimnodes: Sequence[FireSimNode]) -> None: """ Just a convenience function to add multiple downlinks at once. Assumes downlinks in the supplied list are ordered. """ - [self.add_downlink(node) for node in firesimnodes] + for node in firesimnodes: + self.add_downlink(node) - def add_uplink(self, firesimlink): + def add_uplink(self, firesimlink: FireSimLink) -> None: """ This is only for internal use - uplinks are automatically populated when a node is specified as the downlink of another. @@ -129,40 +151,54 @@ class FireSimNode(object): tree.""" self.uplinks.append(firesimlink) - def num_links(self): + def num_links(self) -> int: """ Return the total number of nodes. """ return len(self.downlinks) + len(self.uplinks) - def run_node_simulation(self): - """ Override this to provide the ability to launch your simulation. """ - pass - - def terminate_node_simulation(self): - """ Override this to provide the ability to terminate your simulation. """ - pass - - def has_assigned_host_instance(self): + def has_assigned_host_instance(self) -> bool: if self.host_instance is None: return False return True - def assign_host_instance(self, host_instance_run_farm_object): + def assign_host_instance(self, host_instance_run_farm_object: Inst) -> None: self.host_instance = host_instance_run_farm_object - def get_host_instance(self): + def get_host_instance(self) -> Optional[Inst]: return self.host_instance + @abc.abstractmethod + def diagramstr(self) -> str: + raise NotImplementedError + class FireSimServerNode(FireSimNode): """ This is a simulated server instance in FireSim. 
""" - SERVERS_CREATED = 0 + SERVERS_CREATED: int = 0 + server_hardware_config: Optional[RuntimeHWConfig] + server_link_latency: Optional[int] + server_bw_max: Optional[int] + server_profile_interval: Optional[int] + trace_enable: Optional[bool] + trace_select: Optional[str] + trace_start: Optional[str] + trace_end: Optional[str] + trace_output_format: Optional[str] + autocounter_readrate: Optional[int] + zerooutdram: Optional[bool] + disable_asserts: Optional[bool] + print_start: Optional[str] + print_end: Optional[str] + print_cycle_prefix: Optional[bool] + job: Optional[JobConfig] + server_id_internal: int + mac_address: Optional[MacAddress] - def __init__(self, server_hardware_config=None, server_link_latency=None, - server_bw_max=None, server_profile_interval=None, - trace_enable=None, trace_select=None, trace_start=None, trace_end=None, trace_output_format=None, autocounter_readrate=None, - zerooutdram=None, disable_asserts=None, - print_start=None, print_end=None, print_cycle_prefix=None): - super(FireSimServerNode, self).__init__() + def __init__(self, server_hardware_config: Optional[RuntimeHWConfig] = None, server_link_latency: Optional[int] = None, + server_bw_max: Optional[int] = None, server_profile_interval: Optional[int] = None, + trace_enable: Optional[bool] = None, trace_select: Optional[str] = None, trace_start: Optional[str] = None, trace_end: Optional[str] = None, trace_output_format: Optional[str] = None, autocounter_readrate: Optional[int] = None, + zerooutdram: Optional[bool] = None, disable_asserts: Optional[bool] = None, + print_start: Optional[str] = None, print_end: Optional[str] = None, print_cycle_prefix: Optional[int] = None): + super().__init__() self.server_hardware_config = server_hardware_config self.server_link_latency = server_link_latency self.server_bw_max = server_bw_max @@ -180,21 +216,22 @@ class FireSimServerNode(FireSimNode): self.print_cycle_prefix = print_cycle_prefix self.job = None self.server_id_internal = 
FireSimServerNode.SERVERS_CREATED + self.mac_address = None FireSimServerNode.SERVERS_CREATED += 1 - def set_server_hardware_config(self, server_hardware_config): + def set_server_hardware_config(self, server_hardware_config: RuntimeHWConfig) -> None: self.server_hardware_config = server_hardware_config - def get_server_hardware_config(self): + def get_server_hardware_config(self) -> Optional[RuntimeHWConfig]: return self.server_hardware_config - def assign_mac_address(self, macaddr): + def assign_mac_address(self, macaddr: MacAddress) -> None: self.mac_address = macaddr - def get_mac_address(self): + def get_mac_address(self) -> MacAddress: return self.mac_address - def process_qcow2_rootfses(self, rootfses_list): + def process_qcow2_rootfses(self, rootfses_list: List[str]) -> Sequence[str]: """ Take in list of all rootfses on this node. For the qcow2 ones, find the allocated devices, attach the device to the qcow2 image on the remote node, and replace it in the list with that nbd device. Return @@ -207,7 +244,10 @@ class FireSimServerNode(FireSimNode): result_list = [] for rootfsname in rootfses_list: if rootfsname and rootfsname.endswith(".qcow2"): - allocd_device = self.get_host_instance().nbd_tracker.get_nbd_for_imagename(rootfsname) + host_inst = self.host_instance + assert host_inst is not None + assert isinstance(host_inst, EC2Inst) + allocd_device = host_inst.nbd_tracker.get_nbd_for_imagename(rootfsname) # connect the /dev/nbdX device to the rootfs run("""sudo qemu-nbd -c {devname} {rootfs}""".format(devname=allocd_device, rootfs=rootfsname)) @@ -215,16 +255,18 @@ class FireSimServerNode(FireSimNode): result_list.append(rootfsname) return result_list - def allocate_nbds(self): + def allocate_nbds(self) -> None: """ called by the allocate nbds pass to assign an nbd to a qcow2 image. 
""" rootfses_list = [self.get_rootfs_name()] for rootfsname in rootfses_list: if rootfsname and rootfsname.endswith(".qcow2"): - allocd_device = self.get_host_instance().nbd_tracker.get_nbd_for_imagename(rootfsname) + assert host_inst is not None + assert isinstance(host_inst, EC2Inst) + allocd_device = host_inst.nbd_tracker.get_nbd_for_imagename(rootfsname) - def diagramstr(self): + def diagramstr(self) -> str: msg = """{}:{}\n----------\nMAC: {}\n{}\n{}""".format("FireSimServerNode", str(self.server_id_internal), str(self.mac_address), @@ -232,7 +274,7 @@ class FireSimServerNode(FireSimNode): str(self.server_hardware_config)) return msg - def run_sim_start_command(self, slotno): + def run_sim_start_command(self, slotno: int) -> None: """ get/run the command to run a simulation. assumes it will be called in a directory where its required_files are already located. """ @@ -247,6 +289,12 @@ class FireSimServerNode(FireSimNode): all_bootbins = [self.get_bootbin_name()] all_shmemportnames = [shmemportname] + assert self.server_hardware_config is not None + assert (self.server_profile_interval is not None and all_bootbins is not None and self.trace_enable is not None and + self.trace_select is not None and self.trace_start is not None and self.trace_end is not None and self.trace_output_format is not None and + self.autocounter_readrate is not None and all_shmemportnames is not None and self.zerooutdram is not None and self.disable_asserts is not None and + self.print_start is not None and self.print_end is not None and self.print_cycle_prefix) + runcommand = self.server_hardware_config.get_boot_simulation_command( slotno, all_macs, all_rootfses, all_linklatencies, all_maxbws, self.server_profile_interval, all_bootbins, self.trace_enable, @@ -256,7 +304,7 @@ class FireSimServerNode(FireSimNode): run(runcommand) - def copy_back_job_results_from_run(self, slotno): + def copy_back_job_results_from_run(self, slotno: int) -> None: """ 1) Make the local directory for this 
job's output 2) Copy back UART log @@ -280,6 +328,7 @@ class FireSimServerNode(FireSimNode): rootLogger.debug("[localhost] " + str(localcap)) rootLogger.debug("[localhost] " + str(localcap.stderr)) + assert self.host_instance is not None dest_sim_dir = self.host_instance.dest_simulation_dir # mount rootfs, copy files from it back to local system @@ -291,7 +340,7 @@ class FireSimServerNode(FireSimNode): run("""sudo mkdir -p {}""".format(mountpoint)) if is_qcow2: - rfsname = self.get_host_instance().nbd_tracker.get_nbd_for_imagename(rfsname) + rfsname = self.host_instance.nbd_tracker.get_nbd_for_imagename(rfsname) else: rfsname = """{}/sim_slot_{}/{}""".format(dest_sim_dir, simserverindex, rfsname) @@ -337,25 +386,29 @@ class FireSimServerNode(FireSimNode): rootLogger.debug(rsync_cap) rootLogger.debug(rsync_cap.stderr) - def get_sim_kill_command(self, slotno): + def get_sim_kill_command(self, slotno: int) -> str: """ return the command to kill the simulation. assumes it will be called in a directory where its required_files are already located. """ + assert self.server_hardware_config is not None return self.server_hardware_config.get_kill_simulation_command() - def get_required_files_local_paths(self): + def get_required_files_local_paths(self) -> List[Tuple[str, str]]: """ Return local paths of all stuff needed to run this simulation as an array. 
""" all_paths = [] if self.get_job().rootfs_path() is not None: - all_paths.append([self.get_job().rootfs_path(), self.get_rootfs_name()]) + all_paths.append((self.get_job().rootfs_path(), self.get_rootfs_name())) - all_paths.append([self.get_job().bootbinary_path(), self.get_bootbin_name()]) + all_paths.append((self.get_job().bootbinary_path(), self.get_bootbin_name())) + + + assert self.server_hardware_config is not None driver_path = self.server_hardware_config.get_local_driver_path() - all_paths.append([driver_path, '']) - all_paths.append([self.server_hardware_config.get_local_runtime_conf_path(), '']) + all_paths.append((driver_path, '')) + all_paths.append((self.server_hardware_config.get_local_runtime_conf_path(), '')) # shared libraries all_paths += get_local_shared_libraries(driver_path) @@ -363,29 +416,30 @@ class FireSimServerNode(FireSimNode): all_paths += self.get_job().get_siminputs() return all_paths - def get_agfi(self): + def get_agfi(self) -> str: """ Return the AGFI that should be flashed. """ + assert self.server_hardware_config is not None return self.server_hardware_config.agfi - def assign_job(self, job): + def assign_job(self, job: JobConfig) -> None: """ Assign a job to this node. """ self.job = job - def get_job(self): + def get_job(self) -> JobConfig: """ Get the job assigned to this node. 
""" return self.job - def get_job_name(self): + def get_job_name(self) -> str: return self.job.jobname - def get_rootfs_name(self): + def get_rootfs_name(self) -> Optional[str]: if self.get_job().rootfs_path() is None: return None # prefix rootfs name with the job name to disambiguate in supernode # cases return self.get_job_name() + "-" + self.get_job().rootfs_path().split("/")[-1] - def get_bootbin_name(self): + def get_bootbin_name(self) -> str: # prefix bootbin name with the job name to disambiguate in supernode # cases return self.get_job_name() + "-" + self.get_job().bootbinary_path().split("/")[-1] @@ -396,10 +450,10 @@ class FireSimSuperNodeServerNode(FireSimServerNode): call out to dummy server nodes to get all the info to launch the one command line to run the FPGA sim that has N > 1 sims on one fpga.""" - def copy_back_job_results_from_run(self, slotno): + def copy_back_job_results_from_run(self, slotno: int) -> None: """ This override is to call copy back job results for all the dummy nodes too. """ # first call the original - super(FireSimSuperNodeServerNode, self).copy_back_job_results_from_run(slotno) + super().copy_back_job_results_from_run(slotno) # call on all siblings num_siblings = self.supernode_get_num_siblings_plus_one() @@ -407,27 +461,30 @@ class FireSimSuperNodeServerNode(FireSimServerNode): # TODO: for now, just hackishly give the siblings a host node. # fixing this properly is going to probably require a larger revamp # of supernode handling - super_server_host = self.get_host_instance() + super_server_host = self.host_instance for sibindex in range(1, num_siblings): sib = self.supernode_get_sibling(sibindex) sib.assign_host_instance(super_server_host) sib.copy_back_job_results_from_run(slotno) - def allocate_nbds(self): + def allocate_nbds(self) -> None: """ called by the allocate nbds pass to assign an nbd to a qcow2 image. 
""" num_siblings = self.supernode_get_num_siblings_plus_one() + assert self.get_rootfs_name() is not None + rootfses_list = [self.get_rootfs_name()] + [self.supernode_get_sibling_rootfs(x) for x in range(1, num_siblings)] for rootfsname in rootfses_list: if rootfsname.endswith(".qcow2"): - allocd_device = self.get_host_instance().nbd_tracker.get_nbd_for_imagename(rootfsname) + assert self.host_instance is not None + allocd_device = self.host_instance.nbd_tracker.get_nbd_for_imagename(rootfsname) - def supernode_get_num_siblings_plus_one(self): + def supernode_get_num_siblings_plus_one(self) -> int: """ This returns the number of siblings the supernodeservernode has, plus one (because in most places, we use siblings + 1, not just siblings) """ @@ -443,44 +500,46 @@ class FireSimSuperNodeServerNode(FireSimServerNode): count = True return siblings - def supernode_get_sibling(self, siblingindex): + def supernode_get_sibling(self, siblingindex: int) -> FireSimNode: """ return the sibling for supernode mode. siblingindex = 1 -> next sibling, 2 = second, 3 = last one.""" for index, servernode in enumerate(map( lambda x : x.get_downlink_side(), self.uplinks[0].get_uplink_side().downlinks)): if self == servernode: return self.uplinks[0].get_uplink_side().downlinks[index+siblingindex].get_downlink_side() + assert False, "Should return supernode sibling" - def supernode_get_sibling_mac_address(self, siblingindex): + def supernode_get_sibling_mac_address(self, siblingindex: int) -> str: """ return the sibling's mac address for supernode mode. siblingindex = 1 -> next sibling, 2 = second, 3 = last one.""" return self.supernode_get_sibling(siblingindex).get_mac_address() - def supernode_get_sibling_rootfs(self, siblingindex): + def supernode_get_sibling_rootfs(self, siblingindex: int) -> str: """ return the sibling's rootfs for supernode mode. 
siblingindex = 1 -> next sibling, 2 = second, 3 = last one.""" + assert self.supernode_get_sibling(siblingindex).get_rootfs_name() is not None return self.supernode_get_sibling(siblingindex).get_rootfs_name() - def supernode_get_sibling_bootbin(self, siblingindex): + def supernode_get_sibling_bootbin(self, siblingindex: int) -> str: """ return the sibling's rootfs for supernode mode. siblingindex = 1 -> next sibling, 2 = second, 3 = last one.""" return self.supernode_get_sibling(siblingindex).get_bootbin_name() - def supernode_get_sibling_rootfs_path(self, siblingindex): + def supernode_get_sibling_rootfs_path(self, siblingindex: int) -> str: return self.supernode_get_sibling(siblingindex).get_job().rootfs_path() - def supernode_get_sibling_bootbinary_path(self, siblingindex): + def supernode_get_sibling_bootbinary_path(self, siblingindex: int) -> str: return self.supernode_get_sibling(siblingindex).get_job().bootbinary_path() - def supernode_get_sibling_link_latency(self, siblingindex): + def supernode_get_sibling_link_latency(self, siblingindex: int) -> int: return self.supernode_get_sibling(siblingindex).server_link_latency - def supernode_get_sibling_bw_max(self, siblingindex): + def supernode_get_sibling_bw_max(self, siblingindex: int) -> int: return self.supernode_get_sibling(siblingindex).server_bw_max - def supernode_get_sibling_shmemportname(self, siblingindex): + def supernode_get_sibling_shmemportname(self, siblingindex: int) -> int: return self.supernode_get_sibling(siblingindex).uplinks[0].get_global_link_id() - def run_sim_start_command(self, slotno): + def run_sim_start_command(self, slotno: int) -> None: """ get/run the command to run a simulation. 
assumes it will be called in a directory where its required_files are already located.""" @@ -504,7 +563,7 @@ class FireSimSuperNodeServerNode(FireSimServerNode): run(runcommand) - def get_required_files_local_paths(self): + def get_required_files_local_paths(self) -> List[Tuple[str, str]]: """ Return local paths of all stuff needed to run this simulation as an array. """ @@ -513,6 +572,8 @@ class FireSimSuperNodeServerNode(FireSimServerNode): def local_and_remote(filepath, index): return [filepath, get_path_trailing(filepath) + str(index)] + assert self.get_rootfs_name() is not None + all_paths = [] if self.get_job().rootfs_path() is not None: all_paths.append([self.get_job().rootfs_path(), @@ -544,13 +605,11 @@ class FireSimSuperNodeServerNode(FireSimServerNode): class FireSimDummyServerNode(FireSimServerNode): """ This is a dummy server node for supernode mode. """ - def __init__(self, server_hardware_config=None, server_link_latency=None, - server_bw_max=None): - super(FireSimDummyServerNode, self).__init__(server_hardware_config, - server_link_latency, - server_bw_max) + def __init__(self, server_hardware_config: Optional[RuntimeHWConfig] = None, server_link_latency: Optional[int] = None, + server_bw_max: Optional[int] = None): + super().__init__(server_hardware_config, server_link_latency, server_bw_max) - def allocate_nbds(self): + def allocate_nbds(self) -> None: """ this is handled by the non-dummy node. 
override so it does nothing when called""" pass @@ -563,10 +622,16 @@ class FireSimSwitchNode(FireSimNode): much special configuration.""" # used to give switches a global ID - SWITCHES_CREATED = 0 + SWITCHES_CREATED: int = 0 + switch_id_internal: int + switch_table: Optional[List[int]] + switch_link_latency: Optional[int] + switch_switching_latency: Optional[int] + switch_bandwidth: Optional[int] + switch_builder: AbstractSwitchToSwitchConfig - def __init__(self, switching_latency=None, link_latency=None, bandwidth=None): - super(FireSimSwitchNode, self).__init__() + def __init__(self, switching_latency: Optional[int] = None, link_latency: Optional[int] = None, bandwidth: Optional[int] = None): + super().__init__() self.switch_id_internal = FireSimSwitchNode.SWITCHES_CREATED FireSimSwitchNode.SWITCHES_CREATED += 1 self.switch_table = None @@ -580,12 +645,12 @@ class FireSimSwitchNode(FireSimNode): #self.switch_builder = None self.switch_builder = AbstractSwitchToSwitchConfig(self) - def build_switch_sim_binary(self): + def build_switch_sim_binary(self) -> None: """ This actually emits a config and builds the switch binary that can be used to do the simulation. """ self.switch_builder.buildswitch() - def get_required_files_local_paths(self): + def get_required_files_local_paths(self) -> List[Tuple[str, str]]: """ Return local paths of all stuff needed to run this simulation as array. 
""" all_paths = [] @@ -594,13 +659,13 @@ class FireSimSwitchNode(FireSimNode): all_paths += get_local_shared_libraries(bin) return all_paths - def get_switch_start_command(self): + def get_switch_start_command(self) -> str: return self.switch_builder.run_switch_simulation_command() - def get_switch_kill_command(self): + def get_switch_kill_command(self) -> str: return self.switch_builder.kill_switch_simulation_command() - def copy_back_switchlog_from_run(self, job_results_dir, switch_slot_no): + def copy_back_switchlog_from_run(self, job_results_dir: str, switch_slot_no: int) -> None: """ Copy back the switch log for this switch @@ -623,7 +688,7 @@ class FireSimSwitchNode(FireSimNode): get(remote_path=remote_sim_run_dir + simoutputfile, local_path=job_dir) - def diagramstr(self): + def diagramstr(self) -> str: msg = """{}:{}\n---------\ndownlinks: {}\nswitchingtable: {}""".format( "FireSimSwitchNode", str(self.switch_id_internal), ", ".join(map(str, self.downlinkmacs)), ", ".join(map(str, self.switch_table))) diff --git a/deploy/runtools/firesim_topology_with_passes.py b/deploy/runtools/firesim_topology_with_passes.py index 8d48513a..92fb7db0 100644 --- a/deploy/runtools/firesim_topology_with_passes.py +++ b/deploy/runtools/firesim_topology_with_passes.py @@ -8,17 +8,23 @@ from datetime import datetime from functools import reduce import types from colorama import Fore, Style # type: ignore +from fabric.api import parallel, execute # type: ignore from runtools.switch_model_config import * from runtools.firesim_topology_core import * from runtools.utils import MacAddress from runtools.run_farm import * +from runtools.runtime_config import RuntimeHWDB +from runtools.workload import WorkloadConfig + from util.streamlogger import StreamLogger +from typing import cast + rootLogger = logging.getLogger() @parallel # type: ignore -def instance_liveness(): +def instance_liveness() -> None: """ Confirm that all instances are accessible (are running and can be ssh'ed into) 
first so that we don't run any actual firesim-related commands on only some of the run farm machines.""" rootLogger.info("""[{}] Checking if host instance is up...""".format(env.host_string)) @@ -31,15 +37,39 @@ class FireSimTopologyWithPasses: >>> tconf = FireSimTargetConfiguration("example_16config") """ + passes_used: List[str] + user_topology_name: str + no_net_num_nodes: int + run_farm: RunFarm + hwdb: RuntimeHWDB + workload: WorkloadConfig + firesimtopol: FireSimTopology + defaulthwconfig: str + defaultlinklatency: int + defaultswitchinglatency: int + defaultnetbandwidth: int + defaultprofileinterval: int + defaulttraceenable: bool + defaulttraceselect: str + defaulttracestart: str + defaulttraceend: str + defaulttraceoutputformat: str + defaultautocounterreadrate: int + defaultzerooutdram: bool + defaultdisableasserts: bool + defaultprintstart: str + defaultprintend: str + defaultprintcycleprefix: int + terminateoncompletion: bool - def __init__(self, user_topology_name, no_net_num_nodes, run_farm, hwdb, - defaulthwconfig, workload, defaultlinklatency, defaultswitchinglatency, - defaultnetbandwidth, defaultprofileinterval, - defaulttraceenable, defaulttraceselect, defaulttracestart, defaulttraceend, - defaulttraceoutputformat, - defaultautocounterreadrate, terminateoncompletion, - defaultzerooutdram, defaultdisableasserts, - defaultprintstart, defaultprintend, defaultprintcycleprefix): + def __init__(self, user_topology_name: str, no_net_num_nodes: int, run_farm: RunFarm, hwdb: RuntimeHWDB, + defaulthwconfig: str, workload: WorkloadConfig, defaultlinklatency: int, defaultswitchinglatency: int, + defaultnetbandwidth: int, defaultprofileinterval: int, + defaulttraceenable: bool, defaulttraceselect: str, defaulttracestart: str, defaulttraceend: str, + defaulttraceoutputformat: str, + defaultautocounterreadrate: int, terminateoncompletion: bool, + defaultzerooutdram: bool, defaultdisableasserts: bool, + defaultprintstart: str, defaultprintend: str, 
defaultprintcycleprefix: int): self.passes_used = [] self.user_topology_name = user_topology_name self.no_net_num_nodes = no_net_num_nodes @@ -67,12 +97,11 @@ class FireSimTopologyWithPasses: self.phase_one_passes() - def pass_return_dfs(self): + def pass_return_dfs(self) -> List[FireSimNode]: """ Just return the nodes in DFS order """ return self.firesimtopol.get_dfs_order() - - def pass_assign_mac_addresses(self): + def pass_assign_mac_addresses(self) -> None: """ DFS through the topology to assign mac addresses """ self.passes_used.append("pass_assign_mac_addresses") @@ -82,8 +111,7 @@ class FireSimTopologyWithPasses: if isinstance(node, FireSimServerNode): node.assign_mac_address(MacAddress()) - - def pass_compute_switching_tables(self): + def pass_compute_switching_tables(self) -> None: """ This creates the MAC addr -> port lists for switch nodes. a) First, a pass that computes "downlinkmacs" for each node, which @@ -126,7 +154,7 @@ class FireSimTopologyWithPasses: switch.switch_table = switchtab - def pass_create_topology_diagram(self): + def pass_create_topology_diagram(self) -> None: """ Produce a PDF that shows a diagram of the network. Useful for debugging passes to see what has been done to particular nodes. 
""" @@ -154,14 +182,14 @@ class FireSimTopologyWithPasses: gviz_graph.render(view=False) - def pass_no_net_host_mapping(self): + def pass_no_net_host_mapping(self) -> None: # only if we have no networks - pack simulations # assumes the user has provided enough or more slots servers = self.firesimtopol.get_dfs_order_servers() serverind = 0 run_farm_nodes = self.run_farm.get_all_host_nodes() - fpga_nodes = list(filter(lambda x: x.is_fpga_node(), run_farm_nodes)) + fpga_nodes = cast(List[FPGAInst], list(filter(lambda x: x.is_fpga_node(), run_farm_nodes))) fpga_nodes.sort(reverse=True, key=lambda x: x.get_num_fpga_slots_max()) # largest fpga nodes 1st # find unused fpga (starting from largest) @@ -173,13 +201,13 @@ class FireSimTopologyWithPasses: return assert serverind == len(servers), "ERR: all servers were not assigned to a host." - def pass_simple_networked_host_node_mapping(self): + def pass_simple_networked_host_node_mapping(self) -> None: """ A very simple host mapping strategy. """ switches = self.firesimtopol.get_dfs_order_switches() run_farm_nodes = self.run_farm.get_all_host_nodes() switch_nodes = list(filter(lambda x: not x.is_fpga_node(), run_farm_nodes)) - fpga_nodes = list(filter(lambda x: x.is_fpga_node(), run_farm_nodes)) + fpga_nodes = cast(List[FPGAInst], list(filter(lambda x: x.is_fpga_node(), run_farm_nodes))) fpga_nodes.sort(key=lambda x: x.get_num_fpga_slots_max()) # smallest fpga nodes 1st for switch in switches: @@ -198,29 +226,31 @@ class FireSimTopologyWithPasses: if node.get_num_fpga_slots_consumed() == 0 and node.get_num_fpga_slots_max() >= len(downlinknodes): node.add_switch(switch) for server in downlinknodes: + assert isinstance(server, FireSimServerNode) node.add_simulation(server) else: assert False, "Mixed downlinks currently not supported.""" - def mapping_use_one_fpga_node(self): + def mapping_use_one_fpga_node(self) -> None: """ Just put everything on one fpga node """ switches = self.firesimtopol.get_dfs_order_switches() 
fpga_nodes_used = 0 run_farm_nodes = self.run_farm.get_all_host_nodes() - fpga_nodes = list(filter(lambda x: x.is_fpga_node(), run_farm_nodes)) + fpga_nodes = cast(List[FPGAInst], list(filter(lambda x: x.is_fpga_node(), run_farm_nodes))) for switch in switches: fpga_nodes[fpga_nodes_used].add_switch(switch) downlinknodes = map(lambda x: x.get_downlink_side(), switch.downlinks) if all([isinstance(x, FireSimServerNode) for x in downlinknodes]): for server in downlinknodes: + assert isinstance(server, FireSimServerNode) fpga_nodes[fpga_nodes_used].add_simulation(server) elif any([isinstance(x, FireSimServerNode) for x in downlinknodes]): assert False, "MIXED DOWNLINKS NOT SUPPORTED." fpga_nodes_used += 1 - def pass_perform_host_node_mapping(self): + def pass_perform_host_node_mapping(self) -> None: """ This pass assigns host nodes to nodes in the abstract FireSim configuration tree. @@ -230,7 +260,8 @@ class FireSimTopologyWithPasses: networked config, """ # enforce that this is only no net in all other non-EC2 cases - if isinstance(self.run_farm, EC2RunFarm): + assert isinstance(self.run_farm, AWSEC2F1) + if isinstance(self.run_farm, AWSEC2F1): if self.firesimtopol.custom_mapper is None: """ Use default mapping strategy. The topol has not specified a special one. """ @@ -244,27 +275,18 @@ class FireSimTopologyWithPasses: # now, we're handling the cycle-accurate networked simulation case # currently, we only handle the case where self.pass_simple_networked_host_node_mapping() - elif type(self.firesimtopol.custom_mapper) == types.FunctionType: + elif isinstance(self.firesimtopol.custom_mapper, types.FunctionType): """ call the mapper fn defined in the topology itself. 
""" self.firesimtopol.custom_mapper(self) - elif type(self.firesimtopol.custom_mapper) == str: + elif isinstance(self.firesimtopol.custom_mapper, str): """ assume that the mapping strategy is a custom pre-defined strategy given in this class, supplied as a string in the topology """ mapperfunc = getattr(self, self.firesimtopol.custom_mapper) mapperfunc() else: assert False, "IMPROPER MAPPING CONFIGURATION" - else: - # if your roots are servers, just pack as tightly as possible, since - # you have no_net_config - if all([isinstance(x, FireSimServerNode) for x in self.firesimtopol.roots]): - # all roots are servers, so we're in no_net_config - # if the user has specified any 16xlarges, we assign to them first - self.pass_no_net_host_mapping() - else: - assert False, "Only supports no net configs" - def pass_apply_default_hwconfig(self): + def pass_apply_default_hwconfig(self) -> None: """ This is the default mapping pass for hardware configurations - it does 3 things: 1) If a node has a hardware config assigned (as a string), replace @@ -288,7 +310,7 @@ class FireSimTopologyWithPasses: # 3) server.get_server_hardware_config().get_deploytriplet_for_config() - def pass_apply_default_network_params(self): + def pass_apply_default_network_params(self) -> None: """ If the user has not set per-node network parameters in the topology, apply the defaults. """ allnodes = self.firesimtopol.get_dfs_order() @@ -334,7 +356,7 @@ class FireSimTopologyWithPasses: node.print_cycle_prefix = self.defaultprintcycleprefix - def pass_allocate_nbd_devices(self): + def pass_allocate_nbd_devices(self) -> None: """ allocate NBD devices. this must be done here to preserve the data structure for use in runworkload teardown. """ servers = self.firesimtopol.get_dfs_order_servers() @@ -342,13 +364,14 @@ class FireSimTopologyWithPasses: server.allocate_nbds() - def pass_assign_jobs(self): + def pass_assign_jobs(self) -> None: """ assign jobs to simulations. 
""" servers = self.firesimtopol.get_dfs_order_servers() - [servers[i].assign_job(self.workload.get_job(i)) for i in range(len(servers))] + for i in range(len(servers)): + servers[i].assign_job(self.workload.get_job(i)) - def phase_one_passes(self): + def phase_one_passes(self) -> None: """ These are passes that can run without requiring host-node binding. i.e. can be run before you have run launchrunfarm. They're run automatically when creating this object. """ @@ -362,7 +385,7 @@ class FireSimTopologyWithPasses: self.pass_create_topology_diagram() - def pass_build_required_drivers(self): + def pass_build_required_drivers(self) -> None: """ Build all FPGA drivers. The method we're calling here won't actually repeat the build process more than once per run of the manager. """ servers = self.firesimtopol.get_dfs_order_servers() @@ -370,7 +393,7 @@ class FireSimTopologyWithPasses: for server in servers: server.get_server_hardware_config().build_fpga_driver() - def pass_build_required_switches(self): + def pass_build_required_switches(self) -> None: """ Build all the switches required for this simulation. """ # the way the switch models are designed, this requires hosts to be # bound to instances. 
@@ -379,7 +402,7 @@ class FireSimTopologyWithPasses: switch.build_switch_sim_binary() - def infrasetup_passes(self, use_mock_instances_for_testing): + def infrasetup_passes(self, use_mock_instances_for_testing: bool) -> None: """ extra passes needed to do infrasetup """ self.run_farm.post_launch_binding(use_mock_instances_for_testing) @@ -387,15 +410,17 @@ class FireSimTopologyWithPasses: self.pass_build_required_switches() @parallel - def infrasetup_node_wrapper(runfarm): + def infrasetup_node_wrapper(runfarm: RunFarm) -> None: my_node = runfarm.lookup_by_ip_addr(env.host_string) + assert my_node is not None + assert my_node.instance_deploy_manager is not None my_node.instance_deploy_manager.infrasetup_instance() all_runfarm_ips = [x.get_ip() for x in self.run_farm.get_all_host_nodes()] execute(instance_liveness, hosts=all_runfarm_ips) execute(infrasetup_node_wrapper, self.run_farm, hosts=all_runfarm_ips) - def boot_simulation_passes(self, use_mock_instances_for_testing, skip_instance_binding=False): + def boot_simulation_passes(self, use_mock_instances_for_testing: bool, skip_instance_binding: bool = False) -> None: """ Passes that setup for boot and boot the simulation. skip instance binding lets users not call the binding pass on the run_farm again, e.g. 
if this was called by runworkload (because runworkload calls @@ -408,8 +433,10 @@ class FireSimTopologyWithPasses: self.run_farm.post_launch_binding(use_mock_instances_for_testing) @parallel - def boot_switch_wrapper(runfarm): + def boot_switch_wrapper(runfarm: RunFarm) -> None: my_node = runfarm.lookup_by_ip_addr(env.host_string) + assert my_node is not None + assert my_node.instance_deploy_manager is not None my_node.instance_deploy_manager.start_switches_instance() all_runfarm_ips = [x.get_ip() for x in self.run_farm.get_all_host_nodes()] @@ -417,32 +444,38 @@ class FireSimTopologyWithPasses: execute(boot_switch_wrapper, self.run_farm, hosts=all_runfarm_ips) @parallel - def boot_simulation_wrapper(runfarm): + def boot_simulation_wrapper(runfarm: RunFarm) -> None: my_node = runfarm.lookup_by_ip_addr(env.host_string) + assert my_node is not None + assert my_node.instance_deploy_manager is not None my_node.instance_deploy_manager.start_simulations_instance() execute(boot_simulation_wrapper, self.run_farm, hosts=all_runfarm_ips) - def kill_simulation_passes(self, use_mock_instances_for_testing, disconnect_all_nbds=True): + def kill_simulation_passes(self, use_mock_instances_for_testing: bool, disconnect_all_nbds: bool = True) -> None: """ Passes that kill the simulator. 
""" self.run_farm.post_launch_binding(use_mock_instances_for_testing) all_runfarm_ips = [x.get_ip() for x in self.run_farm.get_all_host_nodes()] @parallel - def kill_switch_wrapper(runfarm): + def kill_switch_wrapper(runfarm: RunFarm) -> None: my_node = runfarm.lookup_by_ip_addr(env.host_string) + assert my_node is not None + assert my_node.instance_deploy_manager is not None my_node.instance_deploy_manager.kill_switches_instance() @parallel - def kill_simulation_wrapper(runfarm): + def kill_simulation_wrapper(runfarm: RunFarm) -> None: my_node = runfarm.lookup_by_ip_addr(env.host_string) + assert my_node is not None + assert my_node.instance_deploy_manager is not None my_node.instance_deploy_manager.kill_simulations_instance(disconnect_all_nbds=disconnect_all_nbds) execute(kill_switch_wrapper, self.run_farm, hosts=all_runfarm_ips) execute(kill_simulation_wrapper, self.run_farm, hosts=all_runfarm_ips) - def screens(): + def screens() -> None: """ poll on screens to make sure kill succeeded. """ with warn_only(): rootLogger.info("Confirming exit...") @@ -460,14 +493,16 @@ class FireSimTopologyWithPasses: execute(screens, hosts=all_runfarm_ips) - def run_workload_passes(self, use_mock_instances_for_testing): + def run_workload_passes(self, use_mock_instances_for_testing: bool) -> None: """ extra passes needed to do runworkload. 
""" - if use_mock_instances_for_testing: - self.run_farm.bind_mock_instances_to_objects() - else: - self.run_farm.bind_real_instances_to_objects() - all_runfarm_ips = [x.get_private_ip() for x in self.run_farm.get_all_host_nodes()] + if isinstance(self.run_farm, AWSEC2F1): + if use_mock_instances_for_testing: + self.run_farm.bind_mock_instances_to_objects() + else: + self.run_farm.bind_real_instances_to_objects() + + all_runfarm_ips = [x.get_ip() for x in self.run_farm.get_all_host_nodes()] rootLogger.info("""Creating the directory: {}""".format(self.workload.job_results_dir)) with StreamLogger('stdout'), StreamLogger('stderr'): @@ -479,14 +514,15 @@ class FireSimTopologyWithPasses: self.boot_simulation_passes(False, skip_instance_binding=True) @parallel - def monitor_jobs_wrapper(runfarm, completed_jobs, teardown, terminateoncompletion, job_results_dir): + def monitor_jobs_wrapper(runfarm, completed_jobs: List[str], teardown: bool, terminateoncompletion: bool, job_results_dir: str) -> Dict[str, Dict[str, bool]]: """ on each instance, check over its switches and simulations to copy results off. """ my_node = runfarm.lookup_by_ip_addr(env.host_string) + assert my_node.instance_deploy_manager is not None return my_node.instance_deploy_manager.monitor_jobs_instance(completed_jobs, teardown, terminateoncompletion, job_results_dir) - def loop_logger(instancestates, terminateoncompletion): + def loop_logger(instancestates: Dict[str, Any], terminateoncompletion: bool) -> None: """ Print the simulation status nicely. 
""" instancestate_map = dict() @@ -526,10 +562,10 @@ class FireSimTopologyWithPasses: # clear the screen rootLogger.info('\033[2J') - rootLogger.info("""FireSim Simulation Status @ {}""".format(str(datetime.datetime.utcnow()))) + rootLogger.info("""FireSim Simulation Status @ {}""".format(str(datetime.utcnow()))) rootLogger.info("-"*80) rootLogger.info("""This workload's output is located in:\n{}""".format(self.workload.job_results_dir)) - rootLogger.info("""This run's log is located in:\n{}""".format(rootLogger.handlers[0].baseFilename)) + rootLogger.info("""This run's log is located in:\n{}""".format(rootLogger.handlers[0].filename)) rootLogger.info("""This status will update every 10s.""") rootLogger.info("-"*80) rootLogger.info("Instances") diff --git a/deploy/runtools/run_farm.py b/deploy/runtools/run_farm.py index 4008fb15..c3c5bc26 100644 --- a/deploy/runtools/run_farm.py +++ b/deploy/runtools/run_farm.py @@ -5,12 +5,11 @@ from datetime import timedelta import abc import pprint -from util.streamlogger import StreamLogger from awstools.awstools import * from runtools.run_farm_instances import * from util.inheritors import inheritors -from typing import Dict, List, Any, Optional, Sequence +from typing import Dict, List, Any, Optional rootLogger = logging.getLogger() @@ -97,8 +96,7 @@ class AWSEC2F1(RunFarm): self.f1_2s = [F1Inst(1) for x in range(num_f1_2)] self.m4_16s = [M4_16() for x in range(num_m4_16)] - allinsts = self.f1_16s + self.f1_2s + self.f1_4s + self.m4_16s - for node in allinsts: + for node in [*self.f1_16s, *self.f1_2s, *self.f1_4s, *self.m4_16s]: node.set_sim_dir(self.default_simulation_dir) def bind_mock_instances_to_objects(self) -> None: @@ -115,13 +113,8 @@ class AWSEC2F1(RunFarm): for index in range(len(self.m4_16s)): self.m4_16s[index].assign_boto3_instance_object(MockBoto3Instance()) - def post_launch_binding(self, mock: bool = False) -> None: + def bind_real_instances_to_objects(self) -> None: """ Attach running instances to the Run 
Farm. """ - - if mock: - self.bind_mock_instances_to_objects() - return - # fetch instances based on tag, # populate IP addr list for use in the rest of our tasks. # we always sort by private IP when handling instances @@ -286,7 +279,7 @@ class AWSEC2F1(RunFarm): def get_all_host_nodes(self) -> List[Inst]: """ Get inst objects for all host nodes in the run farm that are bound to a real instance. """ - allinsts = self.f1_16s + self.f1_2s + self.f1_4s + self.m4_16s + allinsts = [*self.f1_16s, *self.f1_2s, *self.f1_4s, *self.m4_16s] return [inst for inst in allinsts if inst.boto3_instance_object is not None] def lookup_by_ip_addr(self, ipaddr) -> Optional[Inst]: diff --git a/deploy/runtools/run_farm_instances.py b/deploy/runtools/run_farm_instances.py index 0eecc5fb..e98aff62 100644 --- a/deploy/runtools/run_farm_instances.py +++ b/deploy/runtools/run_farm_instances.py @@ -2,34 +2,44 @@ import re import logging - +import abc +from fabric.api import prefix, local, run, env, cd, warn_only, put, settings, hide # type: ignore from fabric.contrib.project import rsync_project # type: ignore -from util.streamlogger import StreamLogger import time +from os.path import join as pjoin + +from runtools.firesim_topology_elements import * +from util.streamlogger import StreamLogger +from awstools.awstools import terminate_instances, get_instance_ids_for_instances + +from typing import List, Dict, Optional, Union +from mypy_boto3_ec2.service_resource import Instance as EC2InstanceResource rootLogger = logging.getLogger() -def remote_kmsg(message): +def remote_kmsg(message: str) -> None: """ This will let you write whatever is passed as message into the kernel log of the remote machine. Useful for figuring what the manager is doing w.r.t output from kernel stuff on the remote node. """ commd = """echo '{}' | sudo tee /dev/kmsg""".format(message) run(commd, shell=True) -class NBDTracker(object): +class NBDTracker: """ Track allocation of NBD devices on an instance. 
Used for mounting qcow2 images.""" # max number of NBDs allowed by the nbd.ko kernel module - NBDS_MAX = 128 + NBDS_MAX: int = 128 + unallocd: List[str] + allocated_dict: Dict[str, str] - def __init__(self): + def __init__(self) -> None: self.unallocd = ["""/dev/nbd{}""".format(x) for x in range(self.NBDS_MAX)] # this is a mapping from .qcow2 image name to nbd device. self.allocated_dict = {} - def get_nbd_for_imagename(self, imagename): + def get_nbd_for_imagename(self, imagename: str) -> str: """ Call this when you need to allocate an nbd for a particular image, or when you need to know what nbd device is for that image. @@ -47,46 +57,56 @@ class MockBoto3Instance: """ This is used for testing without actually launching instances. """ # don't use 0 unless you want stuff copied to your own instance. - base_ip = 1 + base_ip: int = 1 + ip_addr_int: int + private_ip_address: str - def __init__(self): + def __init__(self) -> None: self.ip_addr_int = MockBoto3Instance.base_ip MockBoto3Instance.base_ip += 1 self.private_ip_address = ".".join([str((self.ip_addr_int >> (8*x)) & 0xFF) for x in [3, 2, 1, 0]]) -class Inst(object): +class Inst(metaclass=abc.ABCMeta): # TODO: this is leftover from when we could only support switch slots. # This can be removed once self.switch_slots is dynamically allocated. # Just make it arbitrarily large for now. 
- SWITCH_SLOTS = 100000 + SWITCH_SLOTS: int = 100000 + switch_slots: List[Optional[FireSimSwitchNode]] + switch_slots_consumed: int + _next_port: int + override_simulation_dir: Optional[str] + instance_deploy_manager: Optional[InstanceDeployManager] - def __init__(self): - super(Inst, self).__init__() + def __init__(self) -> None: + super().__init__() self.switch_slots = [None for x in range(self.SWITCH_SLOTS)] self.switch_slots_consumed = 0 self._next_port = 10000 # track ports to allocate for server switch model ports self.override_simulation_dir = None + self.instance_deploy_manager = None - def get_ip(self): + @abc.abstractmethod + def get_ip(self) -> str: raise NotImplementedError - def set_ip(self, ip): + @abc.abstractmethod + def set_ip(self, ip: str) -> None: raise NotImplementedError - def set_sim_dir(self, drctry): + def set_sim_dir(self, drctry: str) -> None: self.override_simulation_dir = drctry - def add_switch(self, firesimswitchnode): + def add_switch(self, firesimswitchnode: FireSimSwitchNode) -> None: """ Add a switch to the next available switch slot. """ assert self.switch_slots_consumed < self.SWITCH_SLOTS self.switch_slots[self.switch_slots_consumed] = firesimswitchnode firesimswitchnode.assign_host_instance(self) self.switch_slots_consumed += 1 - def get_num_switch_slots_consumed(self): + def get_num_switch_slots_consumed(self) -> int: return self.switch_slots_consumed - def allocate_host_port(self): + def allocate_host_port(self) -> int: """ Allocate a port to use for something on the host. Successive calls will return a new port. 
""" retport = self._next_port @@ -94,62 +114,67 @@ class Inst(object): self._next_port += 1 return retport - def is_fpga_node(self): + def is_fpga_node(self) -> bool: return False class EC2Inst(Inst): - def __init__(self): + boto3_instance_object: Optional[Union[EC2InstanceResource, MockBoto3Instance]] + nbd_tracker: NBDTracker + + def __init__(self) -> None: self.boto3_instance_object = None self.instance_deploy_manager = EC2InstanceDeployManager(self) self.nbd_tracker = NBDTracker() - super(EC2Inst, self).__init__() + super().__init__() - def assign_boto3_instance_object(self, boto3obj): + def assign_boto3_instance_object(self, boto3obj: Union[EC2InstanceResource, MockBoto3Instance]) -> None: self.boto3_instance_object = boto3obj - def is_bound_to_real_instance(self): + def is_bound_to_real_instance(self) -> bool: return self.boto3_instance_object is not None - def get_ip(self): - if is_on_ec2(): - return "centos@" + self.boto3_instance_object.private_ip_address - else: - return "centos@" + self.boto3_instance_object.public_ip_address + def get_ip(self) -> str: + assert self.boto3_instance_object is not None # has to be duplicated to satisfy mypy + return "centos@" + self.boto3_instance_object.private_ip_address - def set_ip(self): + def set_ip(self, ip: str) -> None: return -class FPGAInst(object): - def __init__(self): - super(FPGAInst, self).__init__() +class FPGAInst(Inst): + num_fpga_slots: int + fpga_slots: List[Optional[FireSimServerNode]] + fpga_slots_consumed: int + + def __init__(self) -> None: + super().__init__() self.num_fpga_slots = 0 self.fpga_slots = [] self.fpga_slots_consumed = 0 - def get_num_fpga_slots_max(self): + def get_num_fpga_slots_max(self) -> int: """ Get the number of fpga slots. """ return self.num_fpga_slots - def get_num_fpga_slots_consumed(self): + def get_num_fpga_slots_consumed(self) -> int: """ Get the number of fpga slots. 
""" return self.fpga_slots_consumed - def add_simulation(self, firesimservernode): + def add_simulation(self, firesimservernode: FireSimServerNode) -> None: """ Add a simulation to the next available slot. """ assert self.fpga_slots_consumed < self.num_fpga_slots self.fpga_slots[self.fpga_slots_consumed] = firesimservernode - firesimservernode.assign_host_instance(self) + firesimservernode.assign_host_instance(self) # type: ignore self.fpga_slots_consumed += 1 - def is_fpga_node(self): + def is_fpga_node(self) -> bool: return True class F1Inst(FPGAInst, EC2Inst): - instance_counter = 0 - NAME = "aws-ec2-f1" + instance_counter: int = 0 + instance_id: int - def __init__(self, num_fpga_slots): - super(F1Inst, self).__init__() + def __init__(self, num_fpga_slots: int) -> None: + super().__init__() self.num_fpga_slots = num_fpga_slots self.fpga_slots = [None for x in range(self.num_fpga_slots)] self.instance_id = F1Inst.instance_counter @@ -157,56 +182,40 @@ class F1Inst(FPGAInst, EC2Inst): self.instance_deploy_manager = EC2InstanceDeployManager(self) - class M4_16(EC2Inst): - instance_counter = 0 + instance_counter: int = 0 + instance_id: int - def __init__(self): - super(M4_16, self).__init__() + def __init__(self) -> None: + super().__init__() self.instance_id = M4_16.instance_counter M4_16.instance_counter += 1 -class VitisInst(FPGAInst, Inst): - instance_counter = 0 - NAME = "vitis" +class InstanceDeployManager(metaclass=abc.ABCMeta): + @abc.abstractmethod + def infrasetup_instance(self) -> None: + raise NotImplementedError - def __init__(self, num_fpga_slots): - super(VitisInst, self).__init__() - self.num_fpga_slots = num_fpga_slots - self.fpga_slots = [None for x in range(self.num_fpga_slots)] - self.instance_id = VitisInst.instance_counter - VitisInst.instance_counter += 1 + @abc.abstractmethod + def start_switches_instance(self) -> None: + raise NotImplementedError - self.instance_deploy_manager = VitisInstanceDeployManager(self) - self.ip_addr = None + 
@abc.abstractmethod + def start_simulations_instance(self) -> None: + raise NotImplementedError - def get_ip(self): - return self.ip_addr + @abc.abstractmethod + def kill_switches_instance(self) -> None: + raise NotImplementedError - def set_ip(self, ip): - self.ip_addr = ip + @abc.abstractmethod + def kill_simulations_instance(self, disconnect_all_nbds: bool = True) -> None: + raise NotImplementedError -class InstanceDeployManager: - def __init__(self): - return - - def infrasetup_instance(self): - return - - def start_switches_instance(self): - return - - def start_simulations_instance(self): - return - - def kill_switches_instance(self): - return - - def kill_simulations_instance(self, disconnect_all_nbds=True): - return - - def monitor_jobs_instance(self): - return + @abc.abstractmethod + def monitor_jobs_instance(self, completed_jobs: List[str], teardown: bool, terminateoncompletion: bool, + job_results_dir: str) -> Dict[str, Dict[str, bool]]: + raise NotImplementedError class EC2InstanceDeployManager(InstanceDeployManager): """ This class manages actually deploying/running stuff based on the @@ -214,16 +223,19 @@ class EC2InstanceDeployManager(InstanceDeployManager): This is in charge of managing the locations of stuff on remote nodes. """ + parentnode: EC2Inst - def __init__(self, parentnode): + def __init__(self, parentnode: EC2Inst) -> None: self.parentnode = parentnode - def instance_logger(self, logstr): + def instance_logger(self, logstr: str) -> None: rootLogger.info("""[{}] """.format(env.host_string) + logstr) - def get_and_install_aws_fpga_sdk(self): + def get_and_install_aws_fpga_sdk(self) -> None: """ Installs the aws-sdk. This gets us access to tools to flash the fpga. 
""" + assert isinstance(self.parentnode, FPGAInst) + with prefix('cd ../'), \ StreamLogger('stdout'), \ StreamLogger('stderr'): @@ -238,9 +250,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): with cd('/home/centos/aws-fpga'), StreamLogger('stdout'), StreamLogger('stderr'): run('source sdk_setup.sh') - - - def fpga_node_xdma(self): + def fpga_node_xdma(self) -> None: """ Copy XDMA infra to remote node. This assumes that the driver was already built and that a binary exists in the directory on this machine """ @@ -257,7 +267,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): run('make clean') run('make') - def fpga_node_qcow(self): + def fpga_node_qcow(self) -> None: """ Install qemu-img management tools and copy NBD infra to remote node. This assumes that the kernel module was already built and exists in the directory on this machine. @@ -270,16 +280,17 @@ class EC2InstanceDeployManager(InstanceDeployManager): # copy over kernel module put('../build/nbd.ko', '/home/centos/nbd.ko', mirror_local_mode=True) - def load_nbd_module(self): + def load_nbd_module(self) -> None: """ load the nbd module. always unload the module first to ensure it is in a clean state. """ + self.unload_nbd_module() # now load xdma self.instance_logger("Loading NBD Kernel Module.") with StreamLogger('stdout'), StreamLogger('stderr'): run("""sudo insmod /home/centos/nbd.ko nbds_max={}""".format(self.parentnode.nbd_tracker.NBDS_MAX)) - def unload_nbd_module(self): + def unload_nbd_module(self) -> None: """ unload the nbd module. """ self.instance_logger("Unloading NBD Kernel Module.") @@ -288,7 +299,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): run('sudo rmmod nbd') - def disconnect_all_nbds_instance(self): + def disconnect_all_nbds_instance(self) -> None: """ Disconnect all nbds on the instance. 
""" self.instance_logger("Disconnecting all NBDs.") @@ -301,7 +312,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): run("; ".join(fullcmd)) - def unload_xrt_and_xocl(self): + def unload_xrt_and_xocl(self) -> None: self.instance_logger("Unloading XRT-related Kernel Modules.") with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): @@ -312,7 +323,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): run('sudo yum remove -y xrt xrt-aws') remote_kmsg("removing_xrt_end") - def unload_xdma(self): + def unload_xdma(self) -> None: self.instance_logger("Unloading XDMA Driver Kernel Module.") with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): @@ -325,59 +336,61 @@ class EC2InstanceDeployManager(InstanceDeployManager): #self.instance_logger("Waiting 10 seconds after removing kernel modules (esp. xocl).") #time.sleep(10) - def clear_fpgas(self): - # we always clear ALL fpga slots - for slotno in range(self.parentnode.get_num_fpga_slots_max()): - self.instance_logger("""Clearing FPGA Slot {}.""".format(slotno)) - with StreamLogger('stdout'), StreamLogger('stderr'): - remote_kmsg("""about_to_clear_fpga{}""".format(slotno)) - run("""sudo fpga-clear-local-image -S {} -A""".format(slotno)) - remote_kmsg("""done_clearing_fpga{}""".format(slotno)) + def clear_fpgas(self) -> None: + if isinstance(self.parentnode, FPGAInst): + # we always clear ALL fpga slots + for slotno in range(self.parentnode.get_num_fpga_slots_max()): + self.instance_logger("""Clearing FPGA Slot {}.""".format(slotno)) + with StreamLogger('stdout'), StreamLogger('stderr'): + remote_kmsg("""about_to_clear_fpga{}""".format(slotno)) + run("""sudo fpga-clear-local-image -S {} -A""".format(slotno)) + remote_kmsg("""done_clearing_fpga{}""".format(slotno)) - for slotno in range(self.parentnode.get_num_fpga_slots_max()): - self.instance_logger("""Checking for Cleared FPGA Slot {}.""".format(slotno)) - with StreamLogger('stdout'), StreamLogger('stderr'): - 
remote_kmsg("""about_to_check_clear_fpga{}""".format(slotno)) - run("""until sudo fpga-describe-local-image -S {} -R -H | grep -q "cleared"; do sleep 1; done""".format(slotno)) - remote_kmsg("""done_checking_clear_fpga{}""".format(slotno)) + for slotno in range(self.parentnode.get_num_fpga_slots_max()): + self.instance_logger("""Checking for Cleared FPGA Slot {}.""".format(slotno)) + with StreamLogger('stdout'), StreamLogger('stderr'): + remote_kmsg("""about_to_check_clear_fpga{}""".format(slotno)) + run("""until sudo fpga-describe-local-image -S {} -R -H | grep -q "cleared"; do sleep 1; done""".format(slotno)) + remote_kmsg("""done_checking_clear_fpga{}""".format(slotno)) - def flash_fpgas(self): - dummyagfi = None - for firesimservernode, slotno in zip(self.parentnode.fpga_slots, range(self.parentnode.get_num_fpga_slots_consumed())): - if firesimservernode is not None: - agfi = firesimservernode.get_agfi() - dummyagfi = agfi - self.instance_logger("""Flashing FPGA Slot: {} with agfi: {}.""".format(slotno, agfi)) + def flash_fpgas(self) -> None: + if isinstance(self.parentnode, FPGAInst): + dummyagfi = None + for firesimservernode, slotno in zip(self.parentnode.fpga_slots, range(self.parentnode.get_num_fpga_slots_consumed())): + if firesimservernode is not None: + agfi = firesimservernode.get_agfi() + dummyagfi = agfi + self.instance_logger("""Flashing FPGA Slot: {} with agfi: {}.""".format(slotno, agfi)) + with StreamLogger('stdout'), StreamLogger('stderr'): + run("""sudo fpga-load-local-image -S {} -I {} -A""".format( + slotno, agfi)) + + # We only do this because XDMA hangs if some of the FPGAs on the instance + # are left in the cleared state. So, if you're only using some of the + # FPGAs on an instance, we flash the rest with one of your images + # anyway. Since the only interaction we have with an FPGA right now + # is over PCIe where the software component is mastering, this can't + # break anything. 
+ for slotno in range(self.parentnode.get_num_fpga_slots_consumed(), self.parentnode.get_num_fpga_slots_max()): + self.instance_logger("""Flashing FPGA Slot: {} with dummy agfi: {}.""".format(slotno, dummyagfi)) with StreamLogger('stdout'), StreamLogger('stderr'): run("""sudo fpga-load-local-image -S {} -I {} -A""".format( - slotno, agfi)) + slotno, dummyagfi)) - # We only do this because XDMA hangs if some of the FPGAs on the instance - # are left in the cleared state. So, if you're only using some of the - # FPGAs on an instance, we flash the rest with one of your images - # anyway. Since the only interaction we have with an FPGA right now - # is over PCIe where the software component is mastering, this can't - # break anything. - for slotno in range(self.parentnode.get_num_fpga_slots_consumed(), self.parentnode.get_num_fpga_slots_max()): - self.instance_logger("""Flashing FPGA Slot: {} with dummy agfi: {}.""".format(slotno, dummyagfi)) - with StreamLogger('stdout'), StreamLogger('stderr'): - run("""sudo fpga-load-local-image -S {} -I {} -A""".format( - slotno, dummyagfi)) + for firesimservernode, slotno in zip(self.parentnode.fpga_slots, range(self.parentnode.get_num_fpga_slots_consumed())): + if firesimservernode is not None: + self.instance_logger("""Checking for Flashed FPGA Slot: {} with agfi: {}.""".format(slotno, agfi)) + with StreamLogger('stdout'), StreamLogger('stderr'): + run("""until sudo fpga-describe-local-image -S {} -R -H | grep -q "loaded"; do sleep 1; done""".format(slotno)) - for firesimservernode, slotno in zip(self.parentnode.fpga_slots, range(self.parentnode.get_num_fpga_slots_consumed())): - if firesimservernode is not None: - self.instance_logger("""Checking for Flashed FPGA Slot: {} with agfi: {}.""".format(slotno, agfi)) + for slotno in range(self.parentnode.get_num_fpga_slots_consumed(), self.parentnode.get_num_fpga_slots_max()): + self.instance_logger("""Checking for Flashed FPGA Slot: {} with agfi: {}.""".format(slotno, dummyagfi)) 
with StreamLogger('stdout'), StreamLogger('stderr'): run("""until sudo fpga-describe-local-image -S {} -R -H | grep -q "loaded"; do sleep 1; done""".format(slotno)) - for slotno in range(self.parentnode.get_num_fpga_slots_consumed(), self.parentnode.get_num_fpga_slots_max()): - self.instance_logger("""Checking for Flashed FPGA Slot: {} with agfi: {}.""".format(slotno, dummyagfi)) - with StreamLogger('stdout'), StreamLogger('stderr'): - run("""until sudo fpga-describe-local-image -S {} -R -H | grep -q "loaded"; do sleep 1; done""".format(slotno)) - - def load_xdma(self): + def load_xdma(self) -> None: """ load the xdma kernel module. """ # fpga mgmt tools seem to force load xocl after a flash now... # xocl conflicts with the xdma driver, which we actually want to use @@ -389,7 +402,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): with StreamLogger('stdout'), StreamLogger('stderr'): run("sudo insmod /home/centos/xdma/linux_kernel_drivers/xdma/xdma.ko poll_mode=1") - def start_ila_server(self): + def start_ila_server(self) -> None: """ start the vivado hw_server and virtual jtag on simulation instance.) """ self.instance_logger("Starting Vivado hw_server.") with StreamLogger('stdout'), StreamLogger('stderr'): @@ -398,15 +411,18 @@ class EC2InstanceDeployManager(InstanceDeployManager): with StreamLogger('stdout'), StreamLogger('stderr'): run("""screen -S virtual_jtag -d -m bash -c "script -f -c 'sudo fpga-start-virtual-jtag -P 10201 -S 0'"; sleep 1""") - def kill_ila_server(self): + def kill_ila_server(self) -> None: """ Kill the vivado hw_server and virtual jtag """ with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): run("sudo pkill -SIGKILL hw_server") with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): run("sudo pkill -SIGKILL fpga-local-cmd") - def copy_sim_slot_infrastructure(self, slotno): + def copy_sim_slot_infrastructure(self, slotno: int) -> None: """ copy all the simulation infrastructure to the remote node. 
""" + + assert isinstance(self.parentnode, FPGAInst) + serv = self.parentnode.fpga_slots[slotno] if serv is None: # slot unassigned @@ -434,7 +450,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): run("""cp -r {}/* {}/""".format(remote_sim_rsync_dir, remote_sim_dir), shell=True) - def copy_switch_slot_infrastructure(self, switchslot): + def copy_switch_slot_infrastructure(self, switchslot: int) -> None: self.instance_logger("""Copying switch simulation infrastructure for switch slot: {}.""".format(switchslot)) remote_home_dir = self.parentnode.override_simulation_dir @@ -444,64 +460,75 @@ class EC2InstanceDeployManager(InstanceDeployManager): run("""mkdir -p {}""".format(remote_switch_dir)) switch = self.parentnode.switch_slots[switchslot] + assert switch is not None files_to_copy = switch.get_required_files_local_paths() for local_path, remote_path in files_to_copy: with StreamLogger('stdout'), StreamLogger('stderr'): put(local_path, pjoin(remote_switch_dir, remote_path), mirror_local_mode=True) - def start_switch_slot(self, switchslot): + def start_switch_slot(self, switchslot: int) -> None: self.instance_logger("""Starting switch simulation for switch slot: {}.""".format(switchslot)) remote_home_dir = self.parentnode.override_simulation_dir remote_switch_dir = """{}/switch_slot_{}/""".format(remote_home_dir, switchslot) switch = self.parentnode.switch_slots[switchslot] + assert switch is not None with cd(remote_switch_dir), StreamLogger('stdout'), StreamLogger('stderr'): run(switch.get_switch_start_command()) - def start_sim_slot(self, slotno): + def start_sim_slot(self, slotno: int) -> None: + assert isinstance(self.parentnode, FPGAInst) + self.instance_logger("""Starting FPGA simulation for slot: {}.""".format(slotno)) remote_home_dir = self.parentnode.override_simulation_dir remote_sim_dir = """{}/sim_slot_{}/""".format(remote_home_dir, slotno) server = self.parentnode.fpga_slots[slotno] + assert server is not None with cd(remote_sim_dir), 
StreamLogger('stdout'), StreamLogger('stderr'): server.run_sim_start_command(slotno) - def kill_switch_slot(self, switchslot): + def kill_switch_slot(self, switchslot: int) -> None: """ kill the switch in slot switchslot. """ self.instance_logger("""Killing switch simulation for switchslot: {}.""".format(switchslot)) switch = self.parentnode.switch_slots[switchslot] + assert switch is not None with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): run(switch.get_switch_kill_command()) - def kill_sim_slot(self, slotno): + def kill_sim_slot(self, slotno: int) -> None: + assert isinstance(self.parentnode, FPGAInst) + self.instance_logger("""Killing FPGA simulation for slot: {}.""".format(slotno)) server = self.parentnode.fpga_slots[slotno] + assert server is not None with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): run(server.get_sim_kill_command(slotno)) - def instance_assigned_simulations(self): + def instance_assigned_simulations(self) -> bool: """ return true if this instance has any assigned fpga simulations. """ - if not isinstance(self.parentnode, M4_16): + if isinstance(self.parentnode, FPGAInst): if any(self.parentnode.fpga_slots): return True return False - def instance_assigned_switches(self): + def instance_assigned_switches(self) -> bool: """ return true if this instance has any assigned switch simulations. """ if any(self.parentnode.switch_slots): return True return False - def infrasetup_instance(self): + def infrasetup_instance(self) -> None: """ Handle infrastructure setup for this instance. """ # check if fpga node if self.instance_assigned_simulations(): # This is an FPGA-host node. 
+ assert isinstance(self.parentnode, FPGAInst) + # copy fpga sim infrastructure for slotno in range(self.parentnode.get_num_fpga_slots_consumed()): self.copy_sim_slot_infrastructure(slotno) @@ -536,7 +563,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): self.copy_switch_slot_infrastructure(slotno) - def start_switches_instance(self): + def start_switches_instance(self) -> None: """ Boot up all the switches in a screen. """ # remove shared mem pages used by switches if self.instance_assigned_switches(): @@ -546,14 +573,15 @@ class EC2InstanceDeployManager(InstanceDeployManager): for slotno in range(self.parentnode.get_num_switch_slots_consumed()): self.start_switch_slot(slotno) - def start_simulations_instance(self): + def start_simulations_instance(self) -> None: """ Boot up all the sims in a screen. """ if self.instance_assigned_simulations(): + assert isinstance(self.parentnode, FPGAInst) # only on sim nodes for slotno in range(self.parentnode.get_num_fpga_slots_consumed()): self.start_sim_slot(slotno) - def kill_switches_instance(self): + def kill_switches_instance(self) -> None: """ Kill all the switches on this instance. """ if self.instance_assigned_switches(): for slotno in range(self.parentnode.get_num_switch_slots_consumed()): @@ -561,9 +589,10 @@ class EC2InstanceDeployManager(InstanceDeployManager): with StreamLogger('stdout'), StreamLogger('stderr'): run("sudo rm -rf /dev/shm/*") - def kill_simulations_instance(self, disconnect_all_nbds=True): + def kill_simulations_instance(self, disconnect_all_nbds: bool = True) -> None: """ Kill all simulations on this instance. 
""" if self.instance_assigned_simulations(): + assert isinstance(self.parentnode, FPGAInst) # only on sim nodes for slotno in range(self.parentnode.get_num_fpga_slots_consumed()): self.kill_sim_slot(slotno) @@ -571,7 +600,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): # disconnect all NBDs self.disconnect_all_nbds_instance() - def running_simulations(self): + def running_simulations(self) -> Dict[str, List[str]]: """ collect screen results from node to see what's running on it. """ simdrivers = [] switches = [] @@ -581,16 +610,20 @@ class EC2InstanceDeployManager(InstanceDeployManager): if "(Detached)" in line or "(Attached)" in line: line_stripped = line.strip() if "fsim" in line: - line_stripped = re.search('fsim([0-9][0-9]*)', line_stripped).group(0) + re_search_results = re.search('fsim([0-9][0-9]*)', line_stripped) + assert re_search_results is not None + line_stripped = re_search_results.group(0) line_stripped = line_stripped.replace('fsim', '') simdrivers.append(line_stripped) elif "switch" in line: - line_stripped = re.search('switch([0-9][0-9]*)', line_stripped).group(0) + re_search_results = re.search('switch([0-9][0-9]*)', line_stripped) + assert re_search_results is not None + line_stripped = re_search_results.group(0) switches.append(line_stripped) return {'switches': switches, 'simdrivers': simdrivers} - def monitor_jobs_instance(self, completed_jobs, teardown, terminateoncompletion, - job_results_dir): + def monitor_jobs_instance(self, completed_jobs: List[str], teardown: bool, terminateoncompletion: bool, + job_results_dir: str) -> Dict[str, Dict[str, bool]]: """ Job monitoring for this instance. 
""" # make a local copy of completed_jobs, so that we can update it completed_jobs = list(completed_jobs) @@ -609,6 +642,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): numswitchesused = self.parentnode.get_num_switch_slots_consumed() for counter in range(numswitchesused): switchsim = self.parentnode.switch_slots[counter] + assert switchsim is not None switchsim.copy_back_switchlog_from_run(job_results_dir, counter) if terminateoncompletion: @@ -623,6 +657,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): # not teardown - just get the status of the switch sims switchescompleteddict = {k: False for k in self.running_simulations()['switches']} for switchsim in self.parentnode.switch_slots[:self.parentnode.get_num_switch_slots_consumed()]: + assert switchsim is not None swname = switchsim.switch_builder.switch_binary_name() if swname not in switchescompleteddict.keys(): switchescompleteddict[swname] = True @@ -631,13 +666,15 @@ class EC2InstanceDeployManager(InstanceDeployManager): if self.instance_assigned_simulations(): # this node has fpga sims attached + assert isinstance(self.parentnode, FPGAInst) + # first, figure out which jobs belong to this instance. # if they are all completed already. RETURN, DON'T TRY TO DO ANYTHING # ON THE INSTNACE. parentslots = self.parentnode.fpga_slots rootLogger.debug("parentslots " + str(parentslots)) num_parentslots_used = self.parentnode.fpga_slots_consumed - jobnames = [slot.get_job_name() for slot in parentslots[0:num_parentslots_used]] + jobnames = [slot.get_job_name() for slot in parentslots[0:num_parentslots_used] if slot is not None] rootLogger.debug("jobnames " + str(jobnames)) already_done = all([job in completed_jobs for job in jobnames]) rootLogger.debug("already done? 
" + str(already_done)) @@ -655,6 +692,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): if self.instance_assigned_switches(): # fill in whether switches have terminated for some reason for switchsim in self.parentnode.switch_slots[:self.parentnode.get_num_switch_slots_consumed()]: + assert switchsim is not None swname = switchsim.switch_builder.switch_binary_name() if swname not in switchescompleteddict.keys(): switchescompleteddict[swname] = True @@ -667,7 +705,9 @@ class EC2InstanceDeployManager(InstanceDeployManager): if str(slotno) not in slotsrunning and jobname not in completed_jobs: self.instance_logger("Slot " + str(slotno) + " completed! copying results.") # NOW, we must copy off the results of this sim, since it just exited - parentslots[slotno].copy_back_job_results_from_run(slotno) + parent = parentslots[slotno] + assert parent is not None + parent.copy_back_job_results_from_run(slotno) # add our job to our copy of completed_jobs, so that next, # we can test again to see if this instance is "done" and # can be terminated @@ -694,6 +734,7 @@ class EC2InstanceDeployManager(InstanceDeployManager): self.kill_switches_instance() for counter, switchsim in enumerate(self.parentnode.switch_slots[:self.parentnode.get_num_switch_slots_consumed()]): + assert switchsim is not None switchsim.copy_back_switchlog_from_run(job_results_dir, counter) if now_done and terminateoncompletion: @@ -704,293 +745,5 @@ class EC2InstanceDeployManager(InstanceDeployManager): return {'switches': switchescompleteddict, 'sims': jobs_done_q} -class VitisInstanceDeployManager(InstanceDeployManager): - """ This class manages actually deploying/running stuff based on the - definition of an instance and the simulations/switches assigned to it. - - This is in charge of managing the locations of stuff on remote nodes. 
- """ - - def __init__(self, parentnode): - self.parentnode = parentnode - - def instance_logger(self, logstr): - rootLogger.info("""[{}] """.format(env.host_string) + logstr) - - def clear_fpgas(self): - self.instance_logger("""Clearing all FPGA Slots.""") - - card_bdfs = [] - with settings(warn_only=True), hide('everything'): - collect = run('xbutil examine') - for line in collect.splitlines(): - line_stripped = line.strip() - match = re.search('\[(.*)\]', line_stripped) - if match: - card_bdfs.append(match.group(1)) - - for card_bdf in card_bdfs: - with StreamLogger('stdout'), StreamLogger('stderr'): - run("xbutil validate --device {} --run quick".format(card_bdf)) - - def copy_sim_slot_infrastructure(self, slotno): - """ copy all the simulation infrastructure to the remote node. """ - serv = self.parentnode.fpga_slots[slotno] - if serv is None: - # slot unassigned - return - - self.instance_logger("""Copying FPGA simulation infrastructure for slot: {}.""".format(slotno)) - - remote_home_dir = self.parentnode.override_simulation_dir - - remote_sim_dir = """{}/sim_slot_{}/""".format(remote_home_dir, slotno) - remote_sim_rsync_dir = remote_sim_dir + "rsyncdir/" - with StreamLogger('stdout'), StreamLogger('stderr'): - run("""mkdir -p {}""".format(remote_sim_rsync_dir)) - - files_to_copy = serv.get_required_files_local_paths() - for filename in files_to_copy: - # here, filename is a pair of (local path, remote path) - with StreamLogger('stdout'), StreamLogger('stderr'): - # -z --inplace - rsync_cap = rsync_project(local_dir=filename[0], remote_dir=remote_sim_rsync_dir + '/' + filename[1], - ssh_opts="-o StrictHostKeyChecking=no", extra_opts="-L", capture=True) - rootLogger.debug(rsync_cap) - rootLogger.debug(rsync_cap.stderr) - - with StreamLogger('stdout'), StreamLogger('stderr'): - run("""cp -r {}/* {}/""".format(remote_sim_rsync_dir, remote_sim_dir), shell=True) - - - def copy_switch_slot_infrastructure(self, switchslot): - self.instance_logger("""Copying switch 
simulation infrastructure for switch slot: {}.""".format(switchslot)) - - remote_home_dir = self.parentnode.override_simulation_dir - - remote_switch_dir = """{}/switch_slot_{}/""".format(remote_home_dir, switchslot) - with StreamLogger('stdout'), StreamLogger('stderr'): - run("""mkdir -p {}""".format(remote_switch_dir)) - - switch = self.parentnode.switch_slots[switchslot] - files_to_copy = switch.get_required_files_local_paths() - for filename in files_to_copy: - with StreamLogger('stdout'), StreamLogger('stderr'): - put(filename, remote_switch_dir, mirror_local_mode=True) - - def start_switch_slot(self, switchslot): - self.instance_logger("""Starting switch simulation for switch slot: {}.""".format(switchslot)) - - remote_home_dir = self.parentnode.override_simulation_dir - - remote_switch_dir = """{}/switch_slot_{}/""".format(remote_home_dir, switchslot) - switch = self.parentnode.switch_slots[switchslot] - with cd(remote_switch_dir), StreamLogger('stdout'), StreamLogger('stderr'): - run(switch.get_switch_start_command()) - - def start_sim_slot(self, slotno): - self.instance_logger("""Starting FPGA simulation for slot: {}.""".format(slotno)) - - remote_home_dir = self.parentnode.override_simulation_dir - - remote_sim_dir = """{}/sim_slot_{}/""".format(remote_home_dir, slotno) - server = self.parentnode.fpga_slots[slotno] - with cd(remote_sim_dir), StreamLogger('stdout'), StreamLogger('stderr'): - server.run_sim_start_command(slotno) - - def kill_switch_slot(self, switchslot): - """ kill the switch in slot switchslot. 
""" - self.instance_logger("""Killing switch simulation for switchslot: {}.""".format(switchslot)) - switch = self.parentnode.switch_slots[switchslot] - with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): - run(switch.get_switch_kill_command()) - - def kill_sim_slot(self, slotno): - self.instance_logger("""Killing FPGA simulation for slot: {}.""".format(slotno)) - server = self.parentnode.fpga_slots[slotno] - with warn_only(), StreamLogger('stdout'), StreamLogger('stderr'): - run(server.get_sim_kill_command(slotno)) - - def instance_assigned_simulations(self): - """ return true if this instance has any assigned fpga simulations. """ - if any(self.parentnode.fpga_slots): - return True - return False - - def instance_assigned_switches(self): - """ return true if this instance has any assigned switch simulations. """ - if any(self.parentnode.switch_slots): - return True - return False - - def infrasetup_instance(self): - """ Handle infrastructure setup for this instance. """ - # check if fpga node - if self.instance_assigned_simulations(): - # This is an FPGA-host node. - - # copy fpga sim infrastructure - for slotno in range(self.parentnode.get_num_fpga_slots_consumed()): - self.copy_sim_slot_infrastructure(slotno) - - self.clear_fpgas() - - if self.instance_assigned_switches(): - # all nodes could have a switch - for slotno in range(self.parentnode.get_num_switch_slots_consumed()): - self.copy_switch_slot_infrastructure(slotno) - - def start_switches_instance(self): - """ Boot up all the switches in a screen. """ - # remove shared mem pages used by switches - if self.instance_assigned_switches(): - with StreamLogger('stdout'), StreamLogger('stderr'): - run("sudo rm -rf /dev/shm/*") - - for slotno in range(self.parentnode.get_num_switch_slots_consumed()): - self.start_switch_slot(slotno) - - def start_simulations_instance(self): - """ Boot up all the sims in a screen. 
""" - if self.instance_assigned_simulations(): - # only on sim nodes - for slotno in range(self.parentnode.get_num_fpga_slots_consumed()): - self.start_sim_slot(slotno) - - def kill_switches_instance(self): - """ Kill all the switches on this instance. """ - if self.instance_assigned_switches(): - for slotno in range(self.parentnode.get_num_switch_slots_consumed()): - self.kill_switch_slot(slotno) - with StreamLogger('stdout'), StreamLogger('stderr'): - run("sudo rm -rf /dev/shm/*") - - def kill_simulations_instance(self, disconnect_all_nbds=True): - """ Kill all simulations on this instance. """ - if self.instance_assigned_simulations(): - # only on sim nodes - for slotno in range(self.parentnode.get_num_fpga_slots_consumed()): - self.kill_sim_slot(slotno) - - def running_simulations(self): - """ collect screen results from node to see what's running on it. """ - simdrivers = [] - switches = [] - with settings(warn_only=True), hide('everything'): - collect = run('screen -ls') - for line in collect.splitlines(): - if "(Detached)" in line or "(Attached)" in line: - line_stripped = line.strip() - if "fsim" in line: - line_stripped = re.search('fsim([0-9][0-9]*)', line_stripped).group(0) - line_stripped = line_stripped.replace('fsim', '') - simdrivers.append(line_stripped) - elif "switch" in line: - line_stripped = re.search('switch([0-9][0-9]*)', line_stripped).group(0) - switches.append(line_stripped) - return {'switches': switches, 'simdrivers': simdrivers} - - def monitor_jobs_instance(self, completed_jobs, teardown, terminateoncompletion, - job_results_dir): - """ Job monitoring for this instance. 
""" - # make a local copy of completed_jobs, so that we can update it - completed_jobs = list(completed_jobs) - - rootLogger.debug("completed jobs " + str(completed_jobs)) - - if not self.instance_assigned_simulations() and self.instance_assigned_switches(): - # this node hosts ONLY switches and not fpga sims - # - # just confirm that our switches are still running - # switches will never trigger shutdown in the cycle-accurate - - # they should run forever until torn down - if teardown: - # handle the case where we're just tearing down nodes that have - # ONLY switches - numswitchesused = self.parentnode.get_num_switch_slots_consumed() - for counter in range(numswitchesused): - switchsim = self.parentnode.switch_slots[counter] - switchsim.copy_back_switchlog_from_run(job_results_dir, counter) - - # don't really care about the return val in the teardown case - return {'switches': dict(), 'sims': dict()} - - # not teardown - just get the status of the switch sims - switchescompleteddict = {k: False for k in self.running_simulations()['switches']} - for switchsim in self.parentnode.switch_slots[:self.parentnode.get_num_switch_slots_consumed()]: - swname = switchsim.switch_builder.switch_binary_name() - if swname not in switchescompleteddict.keys(): - switchescompleteddict[swname] = True - return {'switches': switchescompleteddict, 'sims': dict()} - - if self.instance_assigned_simulations(): - # this node has fpga sims attached - - # first, figure out which jobs belong to this instance. - # if they are all completed already. RETURN, DON'T TRY TO DO ANYTHING - # ON THE INSTNACE. - parentslots = self.parentnode.fpga_slots - rootLogger.debug("parentslots " + str(parentslots)) - num_parentslots_used = self.parentnode.fpga_slots_consumed - jobnames = [slot.get_job_name() for slot in parentslots[0:num_parentslots_used]] - rootLogger.debug("jobnames " + str(jobnames)) - already_done = all([job in completed_jobs for job in jobnames]) - rootLogger.debug("already done? 
" + str(already_done)) - if already_done: - # in this case, all of the nodes jobs have already completed. do nothing. - # this can never happen in the cycle-accurate case at a point where we care - # about switch status, so don't bother to populate it - jobnames_to_completed = {jname: True for jname in jobnames} - return {'sims': jobnames_to_completed, 'switches': dict()} - - # at this point, all jobs are NOT completed. so, see how they're doing now: - instance_screen_status = self.running_simulations() - switchescompleteddict = {k: False for k in instance_screen_status['switches']} - - if self.instance_assigned_switches(): - # fill in whether switches have terminated for some reason - for switchsim in self.parentnode.switch_slots[:self.parentnode.get_num_switch_slots_consumed()]: - swname = switchsim.switch_builder.switch_binary_name() - if swname not in switchescompleteddict.keys(): - switchescompleteddict[swname] = True - - slotsrunning = [x for x in instance_screen_status['simdrivers']] - - rootLogger.debug("slots running") - rootLogger.debug(slotsrunning) - for slotno, jobname in enumerate(jobnames): - if str(slotno) not in slotsrunning and jobname not in completed_jobs: - self.instance_logger("Slot " + str(slotno) + " completed! copying results.") - # NOW, we must copy off the results of this sim, since it just exited - parentslots[slotno].copy_back_job_results_from_run(slotno) - # add our job to our copy of completed_jobs, so that next, - # we can test again to see if this instance is "done" and - # can be terminated - completed_jobs.append(jobname) - - # determine if we're done now. - jobs_done_q = {job: job in completed_jobs for job in jobnames} - now_done = all(jobs_done_q.values()) - rootLogger.debug("now done: " + str(now_done)) - if now_done and self.instance_assigned_switches(): - # we're done AND we have switches running here, so kill them, - # then copy off their logs. 
this handles the case where you - # have a node with one simulation and some switches, to make - # sure the switch logs are copied off. - # - # the other cases are when you have multiple sims and a cycle-acc network, - # in which case the all() will never actually happen (unless someone builds - # a workload where two sims exit at exactly the same time, which we should - # advise users not to do) - # - # a last use case is when there's no network, in which case - # instance_assigned_switches won't be true, so this won't be called - - self.kill_switches_instance() - - for counter, switchsim in enumerate(self.parentnode.switch_slots[:self.parentnode.get_num_switch_slots_consumed()]): - switchsim.copy_back_switchlog_from_run(job_results_dir, counter) - - return {'switches': switchescompleteddict, 'sims': jobs_done_q} - - + # default return + return {'switches': dict(), 'sims': dict()} diff --git a/deploy/runtools/runtime_config.py b/deploy/runtools/runtime_config.py index 78c24721..84806966 100644 --- a/deploy/runtools/runtime_config.py +++ b/deploy/runtools/runtime_config.py @@ -3,7 +3,6 @@ simulation tasks. 
""" from __future__ import print_function -import argparse from datetime import timedelta from time import strftime, gmtime import pprint @@ -11,6 +10,7 @@ import logging import yaml import os import sys +from fabric.api import prefix, local # type: ignore from awstools.awstools import * from awstools.afitools import * @@ -20,6 +20,9 @@ from runtools.run_farm import RunFarm from util.streamlogger import StreamLogger from util.inheritors import inheritors +from typing import Dict, List, Any, Optional +import argparse + LOCAL_DRIVERS_BASE = "../sim/output/" LOCAL_SYSROOT_LIB = "../sim/lib-install/lib/" CUSTOM_RUNTIMECONFS_BASE = "../sim/custom-runtime-configs/" @@ -28,8 +31,14 @@ rootLogger = logging.getLogger() class RuntimeHWConfig: """ A pythonic version of the entires in config_hwdb.ini """ + name: str + platform: str + agfi: str + deploytriplet: Optional[str] + customruntimeconfig: str + driver_built: bool - def __init__(self, name, hwconfig_dict): + def __init__(self, name: str, hwconfig_dict: Dict[str, Any]) -> None: self.name = name # TODO: this will change based on the "what-to-build" PR @@ -50,7 +59,7 @@ class RuntimeHWConfig: # note whether we've built a copy of the simulation driver for this hwconf self.driver_built = False - def get_deploytriplet_for_config(self): + def get_deploytriplet_for_config(self) -> str: """ Get the deploytriplet for this configuration. This memoizes the request to the AWS AGFI API.""" if self.deploytriplet is not None: @@ -58,28 +67,30 @@ class RuntimeHWConfig: rootLogger.debug("Setting deploytriplet by querying the AGFI's description.") self.deploytriplet = get_firesim_tagval_for_agfi(self.agfi, 'firesim-deploytriplet') - def get_design_name(self): + return self.deploytriplet + + def get_design_name(self) -> str: """ Returns the name used to prefix MIDAS-emitted files. 
(The DESIGN make var) """ my_deploytriplet = self.get_deploytriplet_for_config() my_design = my_deploytriplet.split("-")[0] return my_design - def get_local_driver_binaryname(self): + def get_local_driver_binaryname(self) -> str: """ Get the name of the driver binary. """ return self.get_design_name() + "-" + self.platform - def get_local_driver_path(self): + def get_local_driver_path(self) -> str: """ return relative local path of the driver used to run this sim. """ my_deploytriplet = self.get_deploytriplet_for_config() drivers_software_base = LOCAL_DRIVERS_BASE + "/" + self.platform + "/" + my_deploytriplet + "/" fpga_driver_local = drivers_software_base + self.get_local_driver_binaryname() return fpga_driver_local - def get_local_runtimeconf_binaryname(self): + def get_local_runtimeconf_binaryname(self) -> str: """ Get the name of the runtimeconf file. """ return "runtime.conf" if self.customruntimeconfig is None else os.path.basename(self.customruntimeconfig) - def get_local_runtime_conf_path(self): + def get_local_runtime_conf_path(self) -> str: """ return relative local path of the runtime conf used to run this sim. 
""" my_deploytriplet = self.get_deploytriplet_for_config() drivers_software_base = LOCAL_DRIVERS_BASE + "/" + self.platform + "/" + my_deploytriplet + "/" @@ -90,16 +101,16 @@ class RuntimeHWConfig: runtime_conf_local = CUSTOM_RUNTIMECONFS_BASE + my_runtimeconfig return runtime_conf_local - def get_boot_simulation_command(self, slotid, all_macs, - all_rootfses, all_linklatencies, - all_netbws, profile_interval, - all_bootbinaries, trace_enable, - trace_select, trace_start, trace_end, - trace_output_format, - autocounter_readrate, all_shmemportnames, - enable_zerooutdram, disable_asserts_arg, - print_start, print_end, - enable_print_cycle_prefix): + def get_boot_simulation_command(self, slotid: int, all_macs: List[Optional[MacAddress]], + all_rootfses: List[Optional[str]], all_linklatencies: List[Optional[str]], + all_netbws: List[Optional[str]], profile_interval: str, + all_bootbinaries: List[str], trace_enable: str, + trace_select: str, trace_start: str, trace_end: str, + trace_output_format: str, + autocounter_readrate: str, all_shmemportnames: str, + enable_zerooutdram: bool, disable_asserts_arg: bool, + print_start: str, print_end: str, + enable_print_cycle_prefix: bool) -> str: """ return the command used to boot the simulation. this has to have some external params passed to it, because not everything is contained in a runtimehwconfig. 
TODO: maybe runtimehwconfig should be renamed to @@ -155,14 +166,12 @@ class RuntimeHWConfig: return basecommand - - - def get_kill_simulation_command(self): + def get_kill_simulation_command(self) -> str: driver = self.get_local_driver_binaryname() # Note that pkill only works for names <=15 characters return """pkill -SIGKILL {driver}""".format(driver=driver[:15]) - def build_fpga_driver(self): + def build_fpga_driver(self) -> None: """ Build FPGA driver for running simulation """ if self.driver_built: # we already built the driver at some point @@ -195,15 +204,16 @@ class RuntimeHWConfig: self.driver_built = True - def __str__(self): + def __str__(self) -> str: return """RuntimeHWConfig: {}\nDeployTriplet: {}\nAGFI: {}\nCustomRuntimeConf: {}""".format(self.name, self.deploytriplet, self.agfi, str(self.customruntimeconfig)) class RuntimeHWDB: """ This class manages the hardware configurations that are available as endpoints on the simulation. """ + hwconf_dict: Dict[str, RuntimeHWConfig] - def __init__(self, hardwaredbconfigfile): + def __init__(self, hardwaredbconfigfile: str) -> None: agfidb_configfile = None with open(hardwaredbconfigfile, "r") as yaml_file: @@ -213,17 +223,41 @@ class RuntimeHWDB: self.hwconf_dict = {s: RuntimeHWConfig(s, v) for s, v in agfidb_dict.items()} - def get_runtimehwconfig_from_name(self, name): + def get_runtimehwconfig_from_name(self, name: str) -> RuntimeHWConfig: return self.hwconf_dict[name] - def __str__(self): + def __str__(self) -> str: return pprint.pformat(vars(self)) class InnerRuntimeConfiguration: """ Pythonic version of config_runtime.yaml """ + run_farm_requested_name: str + run_farm_dispatcher: RunFarm + topology: str + no_net_num_nodes: int + linklatency: int + switchinglatency: int + netbandwidth: int + profileinterval: int + launch_timeout: timedelta + always_expand: bool + trace_enable: bool + trace_select: str + trace_start: str + trace_end: str + trace_output_format: str + autocounter_readrate: int + 
zerooutdram: bool + disable_asserts: bool + print_start: str + print_end: str + print_cycle_prefix: int + workload_name: str + suffixtag: Optional[str] + terminateoncompletion: bool - def __init__(self, runtimeconfigfile, runfarmconfigfile, configoverridedata): + def __init__(self, runtimeconfigfile: str, runfarmconfigfile: str, configoverridedata: str) -> None: runtime_configfile = None with open(runtimeconfigfile, "r") as yaml_file: @@ -232,10 +266,9 @@ class InnerRuntimeConfiguration: runtime_dict = runtime_configfile # override parts of the runtime conf if specified - configoverrideval = configoverridedata - if configoverrideval != "": + if configoverridedata != "": ## handle overriding part of the runtime conf - configoverrideval = configoverrideval.split() + configoverrideval = configoverridedata.split() overridesection = configoverrideval[0] overridefield = configoverrideval[1] overridevalue = configoverrideval[2] @@ -307,14 +340,21 @@ class InnerRuntimeConfiguration: self.suffixtag = runtime_dict['workload']['suffix_tag'] if 'suffix_tag' in runtime_dict['workload'] else None self.terminateoncompletion = runtime_dict['workload']['terminate_on_completion'] == "yes" - def __str__(self): + def __str__(self) -> str: return pprint.pformat(vars(self)) class RuntimeConfig: """ This class manages the overall configuration of the manager for running simulation tasks. """ + launch_time: str + args: argparse.Namespace + runtimehwdb: RuntimeHWDB + innerconf: InnerRuntimeConfiguration + run_farm: RunFarm + workload: WorkloadConfig + firesim_topology_with_passes: FireSimTopologyWithPasses - def __init__(self, args: argparse.Namespace): + def __init__(self, args: argparse.Namespace) -> None: """ This reads runtime configuration files, massages them into formats that the rest of the manager expects, and keeps track of other info.
""" self.launch_time = strftime("%Y-%m-%d--%H-%M-%S", gmtime()) @@ -356,35 +396,32 @@ class RuntimeConfig: self.innerconf.print_start, self.innerconf.print_end, self.innerconf.print_cycle_prefix) - def launch_run_farm(self): + def launch_run_farm(self) -> None: """ directly called by top-level launchrunfarm command. """ self.run_farm.launch_run_farm() - def terminate_run_farm(self): + def terminate_run_farm(self) -> None: """ directly called by top-level terminaterunfarm command. """ args = self.args self.run_farm.terminate_run_farm(args.terminatesomef116, args.terminatesomef14, args.terminatesomef12, args.terminatesomem416, args.forceterminate) - def infrasetup(self): + def infrasetup(self) -> None: """ directly called by top-level infrasetup command. """ # set this to True if you want to use mock boto3 instances for testing # the manager. use_mock_instances_for_testing = False self.firesim_topology_with_passes.infrasetup_passes(use_mock_instances_for_testing) - def boot(self): + def boot(self) -> None: """ directly called by top-level boot command. 
""" use_mock_instances_for_testing = False self.firesim_topology_with_passes.boot_simulation_passes(use_mock_instances_for_testing) - def kill(self): + def kill(self) -> None: use_mock_instances_for_testing = False self.firesim_topology_with_passes.kill_simulation_passes(use_mock_instances_for_testing) - def run_workload(self): + def run_workload(self) -> None: use_mock_instances_for_testing = False self.firesim_topology_with_passes.run_workload_passes(use_mock_instances_for_testing) - - - diff --git a/deploy/runtools/switch_model_config.py b/deploy/runtools/switch_model_config.py index a879f6e7..d638717e 100644 --- a/deploy/runtools/switch_model_config.py +++ b/deploy/runtools/switch_model_config.py @@ -5,10 +5,11 @@ import subprocess import random import string import logging - from fabric.api import local # type: ignore from util.streamlogger import StreamLogger +from runtools.firesim_topology_elements import FireSimSwitchNode + rootLogger = logging.getLogger() class AbstractSwitchToSwitchConfig: @@ -17,15 +18,17 @@ class AbstractSwitchToSwitchConfig: that behaves as defined in the FireSimSwitchNode. 
This assumes that the switch has already been assigned to a host.""" + fsimswitchnode: FireSimSwitchNode + build_disambiguate: str - def __init__(self, fsimswitchnode): + def __init__(self, fsimswitchnode: FireSimSwitchNode) -> None: """ Construct the switch's config file """ self.fsimswitchnode = fsimswitchnode # this lets us run many builds in parallel without conflict across # parallel experiments which may have overlapping switch ids self.build_disambiguate = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(64)) - def emit_init_for_uplink(self, uplinkno): + def emit_init_for_uplink(self, uplinkno: int) -> str: """ Emit an init for a switch to talk to it's uplink.""" linkobj = self.fsimswitchnode.uplinks[uplinkno] @@ -43,7 +46,7 @@ class AbstractSwitchToSwitchConfig: linkbasename = linkobj.get_global_link_id() return "new ShmemPort(" + str(target_local_portno) + ', "' + linkbasename + '", true);\n' - def emit_init_for_downlink(self, downlinkno): + def emit_init_for_downlink(self, downlinkno: int) -> str: """ emit an init for the specified downlink. 
""" downlinkobj = self.fsimswitchnode.downlinks[downlinkno] downlink = downlinkobj.get_downlink_side() @@ -56,7 +59,7 @@ class AbstractSwitchToSwitchConfig: linkbasename = downlinkobj.get_global_link_id() return "new ShmemPort(" + str(downlinkno) + ', "' + linkbasename + '", false);\n' - def emit_switch_configfile(self): + def emit_switch_configfile(self) -> str: """ Produce a config file for the switch generator for this switch """ constructedstring = "" constructedstring += self.get_header() @@ -66,11 +69,12 @@ class AbstractSwitchToSwitchConfig: return constructedstring # produce mac2port array portion of config - def get_mac2port(self): + def get_mac2port(self) -> str: """ This takes a python array that represents the mac to port mapping, and converts it to a C++ array """ mac2port_pythonarray = self.fsimswitchnode.switch_table + assert mac2port_pythonarray is not None commaseparated = "" for elem in mac2port_pythonarray: @@ -87,13 +91,13 @@ class AbstractSwitchToSwitchConfig: """.format(len(mac2port_pythonarray), commaseparated) return retstr - def get_header(self): + def get_header(self) -> str: """ Produce file header. """ retstr = """// THIS FILE IS MACHINE GENERATED. SEE deploy/buildtools/switchmodelconfig.py """ return retstr - def get_numclientsconfig(self): + def get_numclientsconfig(self) -> str: """ Emit constants for num ports. """ numdownlinks = len(self.fsimswitchnode.downlinks) numuplinks = len(self.fsimswitchnode.uplinks) @@ -107,7 +111,7 @@ class AbstractSwitchToSwitchConfig: #endif""".format(totalports, numdownlinks, numuplinks) return retstr - def get_portsetup(self): + def get_portsetup(self) -> str: """ emit port intialisations. 
""" initstring = "" for downlinkno in range(len(self.fsimswitchnode.downlinks)): @@ -125,10 +129,10 @@ class AbstractSwitchToSwitchConfig: """.format(initstring) return retstr - def switch_binary_name(self): + def switch_binary_name(self) -> str: return "switch" + str(self.fsimswitchnode.switch_id_internal) - def buildswitch(self): + def buildswitch(self) -> None: """ Generate the config file, build the switch.""" configfile = self.emit_switch_configfile() @@ -141,7 +145,7 @@ class AbstractSwitchToSwitchConfig: rootLogger.debug(str(configfile)) - def local_logged(command): + def local_logged(command: str) -> None: """ Run local command with logging. """ with StreamLogger('stdout'), StreamLogger('stderr'): localcap = local(command, capture=True) @@ -160,7 +164,7 @@ class AbstractSwitchToSwitchConfig: local_logged("cd " + switchbuilddir + " && make") local_logged("mv " + switchbuilddir + "switch " + switchbuilddir + binaryname) - def run_switch_simulation_command(self): + def run_switch_simulation_command(self) -> str: """ Return the command to boot the switch.""" switchlatency = self.fsimswitchnode.switch_switching_latency linklatency = self.fsimswitchnode.switch_link_latency @@ -168,15 +172,15 @@ class AbstractSwitchToSwitchConfig: # insert gdb -ex run --args between sudo and ./ below to start switches in gdb return """screen -S {} -d -m bash -c "script -f -c 'sudo ./{} {} {} {}' switchlog"; sleep 1""".format(self.switch_binary_name(), self.switch_binary_name(), linklatency, switchlatency, bandwidth) - def kill_switch_simulation_command(self): + def kill_switch_simulation_command(self) -> str: """ Return the command to kill the switch. """ return """sudo pkill {}""".format(self.switch_binary_name()) - def switch_build_local_dir(self): + def switch_build_local_dir(self) -> str: """ get local build dir of the switch. 
""" return "../target-design/switch/" - def switch_binary_local_path(self): + def switch_binary_local_path(self) -> str: """ return the full local path where the switch binary lives. """ binaryname = self.switch_binary_name() switchorigdir = self.switch_build_local_dir() diff --git a/deploy/runtools/user_topology.py b/deploy/runtools/user_topology.py index 811365cc..bff279d0 100644 --- a/deploy/runtools/user_topology.py +++ b/deploy/runtools/user_topology.py @@ -1,14 +1,26 @@ """ Define your additional topologies here. The FireSimTopology class inherits from UserToplogies and thus can instantiate your topology. """ +import types + from runtools.firesim_topology_elements import * +from runtools.firesim_topology_with_passes import FireSimTopologyWithPasses +from runtools.run_farm_instances import FPGAInst +from typing import Callable, List, Any, Union, Sequence, cast -class UserTopologies(object): +class UserTopologies: """ A class that just separates out user-defined/configurable topologies from the rest of the boilerplate in FireSimTopology() """ + custom_mapper: Optional[Union[types.FunctionType, str]] - def clos_m_n_r(self, m, n, r): + roots: Sequence[Union[FireSimSwitchNode, FireSimServerNode]] + no_net_num_nodes: int + + def __init__(self, no_net_num_nodes: int) -> None: + self.no_net_num_nodes = no_net_num_nodes + + def clos_m_n_r(self, m: int, n: int, r: int) -> None: """ DO NOT USE THIS DIRECTLY, USE ONE OF THE INSTANTIATIONS BELOW. 
""" """ Clos topol where: m = number of root switches @@ -35,10 +47,10 @@ class UserTopologies(object): for leafswitch, servergroup in zip(leafswitches, servers): leafswitch.add_downlinks(servergroup) - def custom_mapper(fsim_topol_with_passes): + def custom_mapper(fsim_topol_with_passes: FireSimTopologyWithPasses) -> None: run_farm_nodes = fsim_topol_with_passes.run_farm.get_all_host_nodes() switch_nodes = list(filter(lambda x: not x.is_fpga_node(), run_farm_nodes)) - fpga_nodes = list(filter(lambda x: x.is_fpga_node(), run_farm_nodes)) + fpga_nodes = cast(List[FPGAInst], list(filter(lambda x: x.is_fpga_node(), run_farm_nodes))) for i, rswitch in enumerate(rootswitches): switch_nodes[i].add_switch(rswitch) @@ -48,23 +60,23 @@ class UserTopologies(object): for sim in servers[j]: fpga_nodes[j].add_simulation(sim) - self.custom_mapper = custom_mapper + self.custom_mapper = custom_mapper # type: ignore - def clos_2_8_2(self): + def clos_2_8_2(self) -> None: """ clos topol with: 2 roots 8 nodes/leaf 2 leaves. """ self.clos_m_n_r(2, 8, 2) - def clos_8_8_16(self): + def clos_8_8_16(self) -> None: """ clos topol with: 8 roots 8 nodes/leaf 16 leaves. 
= 128 nodes.""" self.clos_m_n_r(8, 8, 16) - def fat_tree_4ary(self): + def fat_tree_4ary(self) -> None: # 4-ary fat tree as described in # http://ccr.sigcomm.org/online/files/p63-alfares.pdf coreswitches = [FireSimSwitchNode() for x in range(4)] @@ -75,8 +87,7 @@ class UserTopologies(object): for switchno in range(len(coreswitches)): core = coreswitches[switchno] base = 0 if switchno < 2 else 1 - dls = range(base, 8, 2) - dls = map(lambda x: aggrswitches[x], dls) + dls = list(map(lambda x: aggrswitches[x], range(base, 8, 2))) core.add_downlinks(dls) for switchbaseno in range(0, len(aggrswitches), 2): switchno = switchbaseno + 0 @@ -89,7 +100,7 @@ class UserTopologies(object): edgeswitches[edgeno].add_downlinks([servers[edgeno*2], servers[edgeno*2+1]]) - def custom_mapper(fsim_topol_with_passes): + def custom_mapper(fsim_topol_with_passes: FireSimTopologyWithPasses) -> None: """ In a custom mapper, you have access to the firesim topology with passes, where you can access the run_farm nodes: @@ -104,7 +115,7 @@ class UserTopologies(object): run_farm_nodes = fsim_topol_with_passes.run_farm.get_all_host_nodes() switch_nodes = list(filter(lambda x: not x.is_fpga_node(), run_farm_nodes)) - fpga_nodes = list(filter(lambda x: x.is_fpga_node(), run_farm_nodes)) + fpga_nodes = cast(List[FPGAInst], list(filter(lambda x: x.is_fpga_node(), run_farm_nodes))) # map the fat tree onto one switch node (i.e m4.16xlarge) (for core switches) # and two fpga nodes with 8 fpgas (i.e. 
f1.16xlarges) (two pods of aggr/edge/4sims per fpga node) @@ -126,9 +137,9 @@ class UserTopologies(object): for sim in servers[8:]: fpga_nodes[1].add_simulation(sim) - self.custom_mapper = custom_mapper + self.custom_mapper = custom_mapper # type: ignore - def example_multilink(self): + def example_multilink(self) -> None: self.roots = [FireSimSwitchNode()] midswitch = FireSimSwitchNode() lowerlayer = [midswitch for x in range(16)] @@ -136,7 +147,7 @@ class UserTopologies(object): servers = [FireSimServerNode()] midswitch.add_downlinks(servers) - def example_multilink_32(self): + def example_multilink_32(self) -> None: self.roots = [FireSimSwitchNode()] midswitch = FireSimSwitchNode() lowerlayer = [midswitch for x in range(32)] @@ -144,7 +155,7 @@ class UserTopologies(object): servers = [FireSimServerNode()] midswitch.add_downlinks(servers) - def example_multilink_64(self): + def example_multilink_64(self) -> None: self.roots = [FireSimSwitchNode()] midswitch = FireSimSwitchNode() lowerlayer = [midswitch for x in range(64)] @@ -152,7 +163,7 @@ class UserTopologies(object): servers = [FireSimServerNode()] midswitch.add_downlinks(servers) - def example_cross_links(self): + def example_cross_links(self) -> None: self.roots = [FireSimSwitchNode() for x in range(2)] midswitches = [FireSimSwitchNode() for x in range(2)] self.roots[0].add_downlinks(midswitches) @@ -161,8 +172,8 @@ class UserTopologies(object): midswitches[0].add_downlinks([servers[0]]) midswitches[1].add_downlinks([servers[1]]) - def small_hierarchy_8sims(self): - self.custom_mapper = 'mapping_use_one_fpga_node' + def small_hierarchy_8sims(self) -> None: + self.custom_mapper = 'mapping_use_one_fpga_node' # type: ignore self.roots = [FireSimSwitchNode()] midlevel = [FireSimSwitchNode() for x in range(4)] servers = [[FireSimServerNode() for x in range(2)] for x in range(4)] @@ -170,8 +181,8 @@ class UserTopologies(object): for swno in range(len(midlevel)): midlevel[swno].add_downlinks(servers[swno]) - def 
small_hierarchy_2sims(self): - self.custom_mapper = 'mapping_use_one_fpga_node' + def small_hierarchy_2sims(self) -> None: + self.custom_mapper = 'mapping_use_one_fpga_node' # type: ignore self.roots = [FireSimSwitchNode()] midlevel = [FireSimSwitchNode() for x in range(1)] servers = [[FireSimServerNode() for x in range(2)] for x in range(1)] @@ -179,27 +190,27 @@ class UserTopologies(object): for swno in range(len(midlevel)): midlevel[swno].add_downlinks(servers[swno]) - def example_1config(self): + def example_1config(self) -> None: self.roots = [FireSimSwitchNode()] servers = [FireSimServerNode() for y in range(1)] self.roots[0].add_downlinks(servers) - def example_2config(self): + def example_2config(self) -> None: self.roots = [FireSimSwitchNode()] servers = [FireSimServerNode() for y in range(2)] self.roots[0].add_downlinks(servers) - def example_4config(self): + def example_4config(self) -> None: self.roots = [FireSimSwitchNode()] servers = [FireSimServerNode() for y in range(4)] self.roots[0].add_downlinks(servers) - def example_8config(self): + def example_8config(self) -> None: self.roots = [FireSimSwitchNode()] servers = [FireSimServerNode() for y in range(8)] self.roots[0].add_downlinks(servers) - def example_16config(self): + def example_16config(self) -> None: self.roots = [FireSimSwitchNode()] level2switches = [FireSimSwitchNode() for x in range(2)] servers = [[FireSimServerNode() for y in range(8)] for x in range(2)] @@ -210,7 +221,7 @@ class UserTopologies(object): for l2switchNo in range(len(level2switches)): level2switches[l2switchNo].add_downlinks(servers[l2switchNo]) - def example_32config(self): + def example_32config(self) -> None: self.roots = [FireSimSwitchNode()] level2switches = [FireSimSwitchNode() for x in range(4)] servers = [[FireSimServerNode() for y in range(8)] for x in range(4)] @@ -221,7 +232,7 @@ class UserTopologies(object): for l2switchNo in range(len(level2switches)): 
level2switches[l2switchNo].add_downlinks(servers[l2switchNo]) - def example_64config(self): + def example_64config(self) -> None: self.roots = [FireSimSwitchNode()] level2switches = [FireSimSwitchNode() for x in range(8)] servers = [[FireSimServerNode() for y in range(8)] for x in range(8)] @@ -232,7 +243,7 @@ class UserTopologies(object): for l2switchNo in range(len(level2switches)): level2switches[l2switchNo].add_downlinks(servers[l2switchNo]) - def example_128config(self): + def example_128config(self) -> None: self.roots = [FireSimSwitchNode()] level1switches = [FireSimSwitchNode() for x in range(2)] level2switches = [[FireSimSwitchNode() for x in range(8)] for x in range(2)] @@ -247,7 +258,7 @@ class UserTopologies(object): for switchno in range(len(level2switches[switchgroupno])): level2switches[switchgroupno][switchno].add_downlinks(servers[switchgroupno][switchno]) - def example_256config(self): + def example_256config(self) -> None: self.roots = [FireSimSwitchNode()] level1switches = [FireSimSwitchNode() for x in range(4)] level2switches = [[FireSimSwitchNode() for x in range(8)] for x in range(4)] @@ -263,35 +274,38 @@ class UserTopologies(object): level2switches[switchgroupno][switchno].add_downlinks(servers[switchgroupno][switchno]) @staticmethod - def supernode_flatten(arr): - res = [] + def supernode_flatten(arr: List[Any]) -> List[Any]: + res: List[Any] = [] for x in arr: res = res + x return res - def supernode_example_6config(self): + def supernode_example_6config(self) -> None: self.roots = [FireSimSwitchNode()] servers = [FireSimSuperNodeServerNode()] + [FireSimDummyServerNode() for x in range(5)] self.roots[0].add_downlinks(servers) - def supernode_example_4config(self): + def supernode_example_4config(self) -> None: self.roots = [FireSimSwitchNode()] servers = [FireSimSuperNodeServerNode()] + [FireSimDummyServerNode() for x in range(3)] self.roots[0].add_downlinks(servers) - def supernode_example_8config(self): + + def 
supernode_example_8config(self) -> None: self.roots = [FireSimSwitchNode()] servers = UserTopologies.supernode_flatten([[FireSimSuperNodeServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode()] for y in range(2)]) self.roots[0].add_downlinks(servers) - def supernode_example_16config(self): + + def supernode_example_16config(self) -> None: self.roots = [FireSimSwitchNode()] servers = UserTopologies.supernode_flatten([[FireSimSuperNodeServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode()] for y in range(4)]) self.roots[0].add_downlinks(servers) - def supernode_example_32config(self): + + def supernode_example_32config(self) -> None: self.roots = [FireSimSwitchNode()] servers = UserTopologies.supernode_flatten([[FireSimSuperNodeServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode()] for y in range(8)]) self.roots[0].add_downlinks(servers) - def supernode_example_64config(self): + def supernode_example_64config(self) -> None: self.roots = [FireSimSwitchNode()] level2switches = [FireSimSwitchNode() for x in range(2)] servers = [UserTopologies.supernode_flatten([[FireSimSuperNodeServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode()] for y in range(8)]) for x in range(2)] @@ -300,7 +314,7 @@ class UserTopologies(object): for l2switchNo in range(len(level2switches)): level2switches[l2switchNo].add_downlinks(servers[l2switchNo]) - def supernode_example_128config(self): + def supernode_example_128config(self) -> None: self.roots = [FireSimSwitchNode()] level2switches = [FireSimSwitchNode() for x in range(4)] servers = [UserTopologies.supernode_flatten([[FireSimSuperNodeServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode()] for y in range(8)]) for x in range(4)] @@ -309,7 +323,7 @@ class UserTopologies(object): for l2switchNo in range(len(level2switches)): 
level2switches[l2switchNo].add_downlinks(servers[l2switchNo]) - def supernode_example_256config(self): + def supernode_example_256config(self) -> None: self.roots = [FireSimSwitchNode()] level2switches = [FireSimSwitchNode() for x in range(8)] servers = [UserTopologies.supernode_flatten([[FireSimSuperNodeServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode(), FireSimDummyServerNode()] for y in range(8)]) for x in range(8)] @@ -318,7 +332,7 @@ class UserTopologies(object): for l2switchNo in range(len(level2switches)): level2switches[l2switchNo].add_downlinks(servers[l2switchNo]) - def supernode_example_512config(self): + def supernode_example_512config(self) -> None: self.roots = [FireSimSwitchNode()] level1switches = [FireSimSwitchNode() for x in range(2)] level2switches = [[FireSimSwitchNode() for x in range(8)] for x in range(2)] @@ -330,7 +344,7 @@ class UserTopologies(object): for switchno in range(len(level2switches[switchgroupno])): level2switches[switchgroupno][switchno].add_downlinks(servers[switchgroupno][switchno]) - def supernode_example_1024config(self): + def supernode_example_1024config(self) -> None: self.roots = [FireSimSwitchNode()] level1switches = [FireSimSwitchNode() for x in range(4)] level2switches = [[FireSimSwitchNode() for x in range(8)] for x in range(4)] @@ -342,7 +356,7 @@ class UserTopologies(object): for switchno in range(len(level2switches[switchgroupno])): level2switches[switchgroupno][switchno].add_downlinks(servers[switchgroupno][switchno]) - def supernode_example_deep64config(self): + def supernode_example_deep64config(self) -> None: self.roots = [FireSimSwitchNode()] level1switches = [FireSimSwitchNode() for x in range(2)] level2switches = [[FireSimSwitchNode() for x in range(1)] for x in range(2)] @@ -354,17 +368,17 @@ class UserTopologies(object): for switchno in range(len(level2switches[switchgroupno])): level2switches[switchgroupno][switchno].add_downlinks(servers[switchgroupno][switchno]) - def 
dual_example_8config(self): + def dual_example_8config(self) -> None: """ two separate 8-node clusters for experiments, e.g. memcached mutilate. """ - self.roots = [FireSimSwitchNode(), FireSimSwitchNode()] + self.roots = [FireSimSwitchNode() for x in range(2)] servers = [FireSimServerNode() for y in range(8)] servers2 = [FireSimServerNode() for y in range(8)] self.roots[0].add_downlinks(servers) self.roots[1].add_downlinks(servers2) - def triple_example_8config(self): + def triple_example_8config(self) -> None: """ three separate 8-node clusters for experiments, e.g. memcached mutilate. """ - self.roots = [FireSimSwitchNode(), FireSimSwitchNode(), FireSimSwitchNode()] + self.roots = [FireSimSwitchNode() for x in range(3)] servers = [FireSimServerNode() for y in range(8)] servers2 = [FireSimServerNode() for y in range(8)] servers3 = [FireSimServerNode() for y in range(8)] @@ -372,17 +386,18 @@ self.roots[1].add_downlinks(servers2) self.roots[2].add_downlinks(servers3) - def no_net_config(self): + def no_net_config(self) -> None: self.roots = [FireSimServerNode() for x in range(self.no_net_num_nodes)] - # Spins up all of the precompiled, unnetworked targets - def all_no_net_targets_config(self): - hwdb_entries = [ - "firesim_boom_singlecore_no_nic_l2_llc4mb_ddr3", - "firesim_rocket_quadcore_no_nic_l2_llc4mb_ddr3", - ] - assert len(hwdb_entries) == self.no_net_num_nodes - self.roots = [FireSimServerNode(hwdb_entries[x]) for x in range(self.no_net_num_nodes)] + # TODO: busted since FireSimServerNode needs a RuntimeHWConfig to work (not a str) + ## Spins up all of the precompiled, unnetworked targets + #def all_no_net_targets_config(self) -> None: + # hwdb_entries = [ + # "firesim_boom_singlecore_no_nic_l2_llc4mb_ddr3", + # "firesim_rocket_quadcore_no_nic_l2_llc4mb_ddr3", + # ] + # assert len(hwdb_entries) == self.no_net_num_nodes + # self.roots = [FireSimServerNode(hwdb_entries[x]) for x in range(self.no_net_num_nodes)] # ######Used only for tutorial
purposes#################### diff --git a/deploy/runtools/utils.py b/deploy/runtools/utils.py index 01b66923..2dd90e9a 100644 --- a/deploy/runtools/utils.py +++ b/deploy/runtools/utils.py @@ -6,9 +6,11 @@ from os import fspath from os.path import realpath from pathlib import Path +from typing import List, Tuple, Type + rootLogger = logging.getLogger() -def get_local_shared_libraries(elf): +def get_local_shared_libraries(elf: str) -> List[Tuple[str, str]]: """ Given path to executable `exe`, returns a list of path tuples, (A, B), where: A is the local file path on the manager instance to the library B is the destination file path on the runfarm instance relative to the driver @@ -360,10 +362,11 @@ def get_local_shared_libraries(elf): ] libs = list() - rootLogger.debug(f"Identifying ldd dependencies for:{elf}") + rootLogger.debug(f"Identifying ldd dependencies for: {elf}") for dso in lddwrap.list_dependencies(Path(elf)): if dso.soname is None: - assert '/ld-linux' in fspath(dso.path), f"dynamic linker is only allowed no soname, not: {dso}" + if dso.path is not None: + assert '/ld-linux' in fspath(dso.path), f"dynamic linker is only allowed no soname, not: {dso}" continue if 'linux-vdso.so' in dso.soname: continue @@ -399,10 +402,12 @@ class MacAddress(): >>> mac.as_int_no_prefix() 3 """ - next_mac_alloc = 2 - eecs_mac_prefix = 0x00126d000000 + next_mac_alloc: int = 2 + eecs_mac_prefix: int = 0x00126d000000 + mac_without_prefix_as_int: int + mac_as_int: int - def __init__(self): + def __init__(self) -> None: """ Allocate a new mac address, store it, then increment nextmacalloc.""" assert MacAddress.next_mac_alloc < 2**24, "Too many MAC addresses allocated" self.mac_without_prefix_as_int = MacAddress.next_mac_alloc @@ -411,12 +416,12 @@ class MacAddress(): # increment for next call MacAddress.next_mac_alloc += 1 - def as_int_no_prefix(self): + def as_int_no_prefix(self) -> int: """ Return the MAC address as an int. WITHOUT THE PREFIX! 
Used by the MAC tables in switch models.""" return self.mac_without_prefix_as_int - def __str__(self): + def __str__(self) -> str: """ Return the MAC address in the "regular format": colon separated, show all leading zeroes.""" # format as 12 char hex with leading zeroes @@ -428,12 +433,12 @@ return ":".join(split_str_ver) @classmethod - def reset_allocator(cls): + def reset_allocator(cls: Type["MacAddress"]) -> None: """ Reset allocator back to default value. """ cls.next_mac_alloc = 2 @classmethod - def next_mac_to_allocate(cls): + def next_mac_to_allocate(cls: Type["MacAddress"]) -> int: """ Return the next mac that will be allocated. This basically tells you how many entries you need in your switching tables. """ return cls.next_mac_alloc diff --git a/deploy/runtools/workload.py b/deploy/runtools/workload.py index f5cb8aeb..985848a7 100644 --- a/deploy/runtools/workload.py +++ b/deploy/runtools/workload.py @@ -3,6 +3,8 @@ import json import os +from typing import List, Optional, Dict, Any, Tuple + class JobConfig: """ A single job that runs on a simulation. E.g. one spec benchmark, one of the risc-v tests, etc. @@ -13,24 +15,31 @@ This essentially describes the local pieces that need to be fed to simulations and the remote outputs that need to be copied back.
""" - filesystemsuffix = ".ext2" + filesystemsuffix: str = ".ext2" + parent_workload: WorkloadConfig + jobname: str + outputs: List[str] + simoutputs: List[str] + siminputs: List[str] + bootbinary: str + rootfs: Optional[str] - def __init__(self, singlejob_dict, parent_workload, index=0): + def __init__(self, singlejob_dict: Dict[str, Any], parent_workload: WorkloadConfig, index: int = 0) -> None: self.parent_workload = parent_workload self.jobname = singlejob_dict.get("name", self.parent_workload.workload_name + str(index)) # ignore files, command, we assume they are used only to build rootfses # eventually this functionality will be merged into the manager too joboutputs = singlejob_dict.get("outputs", []) - self.outputs = joboutputs + parent_workload.common_outputs + self.outputs = joboutputs + self.parent_workload.common_outputs simoutputs = singlejob_dict.get("simulation_outputs", []) - self.simoutputs = simoutputs + parent_workload.common_simulation_outputs + self.simoutputs = simoutputs + self.parent_workload.common_simulation_outputs siminputs = singlejob_dict.get("simulation_inputs", []) - self.siminputs = siminputs + parent_workload.common_simulation_inputs + self.siminputs = siminputs + self.parent_workload.common_simulation_inputs if singlejob_dict.get("bootbinary") is not None: - self.bootbinary = singlejob_dict.get("bootbinary") + self.bootbinary = singlejob_dict["bootbinary"] else: - self.bootbinary = parent_workload.common_bootbinary + self.bootbinary = self.parent_workload.common_bootbinary if 'rootfs' in singlejob_dict: if singlejob_dict['rootfs'] is None: @@ -38,30 +47,30 @@ class JobConfig: self.rootfs = None else: # Explicit per-job rootfs - self.rootfs = parent_workload.workload_input_base_dir + singlejob_dict['rootfs'] + self.rootfs = self.parent_workload.workload_input_base_dir + singlejob_dict['rootfs'] else: # No explicit per-job rootfs, inherit from workload - if parent_workload.derive_rootfs: + if self.parent_workload.derive_rootfs: # No 
explicit workload rootfs, derive path from job name self.rootfs = self.parent_workload.workload_input_base_dir + self.jobname + self.filesystemsuffix - elif parent_workload.common_rootfs is None: + elif self.parent_workload.common_rootfs is None: # Don't include a rootfs self.rootfs = None else: # Explicit rootfs path from workload self.rootfs = self.parent_workload.workload_input_base_dir + self.parent_workload.common_rootfs - def bootbinary_path(self): + def bootbinary_path(self) -> str: return self.parent_workload.workload_input_base_dir + self.bootbinary - def get_siminputs(self): + def get_siminputs(self) -> List[Tuple[str, str]]: # remote filename for a siminput gets prefixed with the job's name return list(map(lambda x: (self.parent_workload.workload_input_base_dir + "/" + x, self.jobname + "-" + x), self.siminputs)) - def rootfs_path(self): + def rootfs_path(self) -> Optional[str]: return self.rootfs - def __str__(self): + def __str__(self) -> str: return self.jobname class WorkloadConfig: @@ -72,10 +81,23 @@ 2) there is one "job" - a binary/rootfs combo to be run on all sims """ - workloadinputs = 'workloads/' - workloadoutputs = 'results-workloads/' + workloadinputs: str = 'workloads/' + workloadoutputs: str = 'results-workloads/' + workloadfilename: str + common_rootfs: Optional[str] + derive_rootfs: bool + common_bootbinary: str + workload_name: str + common_outputs: List[str] + common_simulation_outputs: List[str] + common_simulation_inputs: List[str] + workload_input_base_dir: str + uniform_mode: bool + jobs: List[JobConfig] + post_run_hook: str + job_results_dir: str - def __init__(self, workloadfilename: str, launch_time: str, suffixtag: str) -> None: self.workloadfilename = self.workloadinputs + workloadfilename workloadjson = None with open(self.workloadfilename) as json_data: @@ -120,13 +142,13 @@ #import code #code.interact(local=locals()) - def
get_job(self, index): + def get_job(self, index: int) -> JobConfig: if not self.uniform_mode: return self.jobs[index] else: return JobConfig(dict(), self, index) - def are_all_jobs_assigned(self, numjobsassigned): + def are_all_jobs_assigned(self, numjobsassigned: int) -> bool: """ Return True if each job is assigned to at least one simulation. In the uniform case, always return True """ if not self.uniform_mode: