##################################################################
## (c) Copyright 2015- by Jaron T. Krogel ##
##################################################################
#====================================================================#
# machines.py #
# Representations of local machine environments including #
# workstations and supercomputers and the jobs that will be #
# executed on them. #
# #
# Content summary: #
# Job #
# Class to represent a generic simulation job. #
# #
# Machine #
# Represents a generic machine. #
# Base class for specific machine types. #
# #
# Workstation #
# Represents a workstation with a fixed number of cores. #
# #
# InteractiveCluster #
# Represents a supercomputer in interactive mode. #
# Similar to a workstation with many cores. #
# #
# Supercomputer #
# Represents a generic supercomputer with a batch queue. #
# Base class for specific supercomputers. #
# See Jaguar, Kraken, Golub, OIC5, Hopper, Edison, BlueWatersXE,#
# BlueWatersXK, Titan, EOS, Vesta, Cetus, Mira, Lonestar, #
# Matisse, Komodo, and Amos #
# #
# cpu_count #
# Function to return the number of cores on the local machine. #
# #
# Options #
# Class representing command line options for a simulation job, #
# including arguments to the simulation executable, #
# the run launcher (aprun/mpirun, etc), and the job submitter #
# (e.g. qsub). #
# #
#====================================================================#
import os
import time
#from multiprocessing import cpu_count
from socket import gethostname
from subprocess import Popen,PIPE
from numpy import array,mod,floor,ceil,round,log,empty
from generic import obj
from developer import DevBase,to_str
from nexus_base import NexusCore,nexus_core
from execute import execute
from debug import *
import re,subprocess
import importlib.util
import importlib.machinery
def our_load_source(modname, filename):
"""" Replacement for the deprecated imp.load_source function"""
loader = importlib.machinery.SourceFileLoader(modname, filename)
spec = importlib.util.spec_from_file_location(modname, filename, loader=loader)
module = importlib.util.module_from_spec(spec)
# The module is always executed and not cached in sys.modules.
# Uncomment the following line to cache the module (requires 'import sys').
# sys.modules[module.__name__] = module
loader.exec_module(module)
return module
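# Illustrative usage (the path below is hypothetical):
#   mod = our_load_source('my_machines', '/path/to/my_machines.py')
#   print(mod.__name__)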
def cpu_count():
""" Number of virtual or physical CPUs on this system, i.e.
user/real as output by time(1) when called with an optimally scaling
userspace-only program"""
# Python 2.6+
try:
import multiprocessing
return multiprocessing.cpu_count()
except (ImportError,NotImplementedError):
None
#end try
# POSIX
try:
res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
if res > 0:
return res
#end if
except (AttributeError,ValueError):
None
#end try
#end def cpu_count
class Options(DevBase):
def __init__(self,**kwargs):
self.add(**kwargs)
#end def __init__
def add(self,**kwargs):
self.transfer_from(kwargs)
#end def add
def read(self,options):
if isinstance(options,(dict,obj)):
self.add(**options)
elif isinstance(options,list):
for o in options:
if not isinstance(o,str):
self.error('each option must be a string')
#end if
self[str(len(self))] = o
#end for
elif isinstance(options,str):
self[str(len(self))] = options
else:
self.error('invalid type provided to Options')
#end if
#end def read
def write(self):
s = ''
for k in sorted(self.keys()):
s += ' '+str(self[k])
#end for
return s
#end def write
#end class Options
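# A minimal usage sketch of Options (values are illustrative):
#   opt = Options()
#   opt.read('-np 4')                # stored under key '0'
#   opt.read(['--bind-to','none'])   # stored under keys '1','2'
#   opt.write()                      # -> ' -np 4 --bind-to none'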
job_defaults_assign = obj(
name = 'jobname',
type = None,
directory = None,
subdir = None,
app_name = None, # name of/path to application
app_command = None, # command used to launch application
app_props = None,
full_command = None, # custom command including e.g. mpirun
outfile = None,
errfile = None,
env = None,
user_env = True, # import user environment
presub = '', # shell text executed just prior to submission
postsub = '', # shell text executed just after submission
queue = None,
bundled_jobs = None,
relative = False,
cores = None, # number of cores for the job
nodes = None, # number of nodes for the job
threads = 1, # number of openmp threads for the job
hyperthreads = None,
ppn = None,
gpus = None, # number of gpus per node
serial = False, # run job serially, no mpi
local = False, # run job locally, no queue submission
days = 0,
hours = 0,
minutes = 0,
seconds = 0,
subfile = None,
grains = None,
procs = None,
processes = None,
processes_per_proc = None,
processes_per_node = None,
account = None,
email = None,
constraint = None, # slurm specific, Cori
core_spec = None, # slurm specific, Cori
switches = None, # slurm specific, SuperMUC-NG
alloc_flags = None, # lsf specific, Summit
filesystems = None, # pbs specific
qos = None,
group_list = None,
default_cpus_per_task = False, # optionally bypass typical nexus processing for supermucng
ntasks_per_core = None,
cpus_per_task = None,
template = None,
)
# these are not assigned directly
job_defaults_nonassign = obj(
fake = False,
app = None, # name of/path to application
machine = None,
options = None,
app_flags = None,
app_options = None,
run_options = None,
sub_options = None,
skip_machine = False,
)
job_defaults = obj(job_defaults_assign,job_defaults_nonassign)
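# job_defaults merges both sets; Job.__init__ consumes the non-assign keywords
# (app, machine, options, fake, ...) before assigning the remainder directly
# onto the Job instance.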
class Job(NexusCore):
machine = None #default machine if none is specified in settings
states = obj(
none = 0,
waiting = 1,
submitted = 2,
running = 3,
finished = 4
)
state_names = states.inverse()
job_count = 0
@staticmethod
def restore_default_settings():
Job.machine = None
#end def restore_default_settings
@staticmethod
def generate_jobid():
Job.job_count += 1
return Job.job_count
#end def generate_jobid
@classmethod
def zero_time(cls):
time = obj(days=0,hours=0,minutes=0,seconds=0)
return time
#end def zero_time
def __init__(self,**kwargs):
# rewrap keyword arguments
kw = obj(**kwargs)
# save information used to initialize job object
self.init_info = kw.copy()
# set defaults
kw.set_optional(**job_defaults)
# extract keywords not assigned
app = kw.delete('app')
machine = kw.delete('machine')
options = kw.delete('options')
app_flags = kw.delete('app_flags')
app_options = kw.delete('app_options')
run_options = kw.delete('run_options')
sub_options = kw.delete('sub_options')
env = kw.delete('env')
fake = kw.delete('fake')
skip_machine = kw.delete('skip_machine')
# assign keywords
self.set(**kw)
# assign fake job
self.fake_job = fake
# check template
if self.template is not None and not isinstance(self.template,str):
self.error('template must be a string\nReceived type: {}'.format(self.template.__class__.__name__))
#end if
# initialize other internal variables
self.app_options = Options()
self.run_options = Options()
self.sub_options = Options()
self.env = None
self.internal_id = None
self.system_id = None
self.tot_cores = None
self.identifier = None
self.submitted = False
self.status = self.states.none
self.crashed = False
self.overtime = False
self.successful = False
self.finished = False
self.user_app_command = self.app_command is not None
if app is not None:
self.app_name = app
#end if
if app_options is not None:
self.app_options.read(app_options)
#end if
if run_options is not None:
self.run_options.read(run_options)
#end if
if sub_options is not None:
self.sub_options.read(sub_options)
#end if
if app_flags is not None:
self.app_options.read(app_flags)
#end if
if options is not None:
self.run_options.read(options)
#end if
if self.app_props is None:
self.app_props = []
#end if
if self.serial:
self.cores = 1
self.nodes = None
#end if
if skip_machine:
self.machine = machine
else:
if machine is not None:
self.machine = machine
#end if
if env is not None:
self.set_environment(**env)
#end if
#check that the machine exists and have it complete the job info
self.process()
machine = self.get_machine()
self.batch_mode = machine.in_batch_mode()
if self.bundled_jobs is not None and not machine.batch_capable:
self.error('running batched/bundled jobs on {0} is either not possible or not yet implemented, sorry.'.format(machine.name))
#end if
#end if
self.normalize_time()
#end def __init__
def get_machine(self):
return Machine.get(self.machine)
#end def get_machine
def process(self,machine=None):
if self.template is not None:
return
#end if
if machine is None:
machine = self.get_machine()
#end if
machine.process_job(self)
#end def process
# test needed
def process_options(self,machine=None):
if self.template is not None:
return
#end if
if machine is None:
machine = self.get_machine()
#end if
machine.process_job_options(self)
#end def process_options
# test needed
def initialize(self,sim):
self.set_id()
self.identifier = sim.identifier
machine = self.get_machine()
if machine.prefixed_output:
sim.outfile = sim.identifier + machine.outfile_extension
sim.errfile = sim.identifier + machine.errfile_extension
#end if
if self.directory is None:
self.directory = sim.locdir
self.abs_dir = os.path.abspath(sim.locdir)
elif self.abs_dir is None:
self.abs_dir = os.path.abspath(self.directory)
#end if
if self.subdir is None:
if machine.local_directory!=None:
self.subdir = os.path.join(machine.local_directory,nexus_core.runs,sim.path)
self.abs_subdir = self.subdir
else:
self.subdir = self.directory
self.abs_subdir = self.abs_dir
#end if
#end if
if self.app_name is None:
app_name = sim.app_name
else:
app_name = self.app_name
#end if
if app_name!=None and not '/' in app_name:
ads = machine.app_directories
ad = machine.app_directory
new_app_name = None
if ads!=None and self.app_name in ads:
new_app_name = os.path.join(ads[self.app_name],app_name)
elif ad!=None:
new_app_name = os.path.join(ad,app_name)
#end if
if new_app_name!=None:
self.app_name = new_app_name
#end if
#end if
sim.set_app_name(app_name)
self.set(
name = sim.identifier,
simid = sim.simid,
outfile = sim.outfile,
errfile = sim.errfile
)
if self.app_command is None:
self.app_command = sim.app_command()
#end if
if self.app_props==None:
self.app_props = list(sim.app_props)
#end if
# ensure job is processed properly by this initialization stage
self.process()
#end def initialize
# test needed
def renew_app_command(self,sim):
if not self.user_app_command:
self.app_command = sim.app_command()
#end if
#end def renew_app_command
def set_id(self):
self.internal_id = Job.generate_jobid()
#end def set_id
# remove?
def set_processes(self):
if self.processes is None:
self.error('processes should have been set before now\ncontact the developers and have them fix this','Developer')
self.processes = int(ceil(float(self.cores)/self.threads))
#end if
#end def set_processes
def set_environment(self,limited_env=False,clear_env=False,**env):
machine = self.get_machine()
if isinstance(machine,Supercomputer):
limited_env = True
#end if
if self.env is None:
self.env = os.environ.copy()
if limited_env:
self.env.clear()
#end if
#end if
if clear_env:
self.env.clear()
#end if
for n,v in env.items():
self.env[n]=str(v)
#end for
#end def set_environment
def divert_out_err(self):
self.identifier += '_divert'
#end def divert_out_err
def get_time(self):
time = obj(
days = self.days,
hours = self.hours,
minutes = self.minutes,
seconds = self.seconds
)
return time
#end def get_time
def max_time(self,time):
t = time.seconds + 60*(time.minutes+60*(time.hours+24*time.days))
ts = self.seconds + 60*(self.minutes+60*(self.hours+24*self.days))
if ts>t:
time.days = self.days
time.hours = self.hours
time.minutes = self.minutes
time.seconds = self.seconds
#end if
return time
#end def max_time
def serial_only(self):
return 'serial' in self.app_props and len(self.app_props)==1
#end def serial_only
# remove?
def determine_end_status(self,status):
if not nexus_core.generate_only:
self.successful = False # not really implemented yet
#end if
#end def determine_end_status
# test needed
def write(self,file=False):
machine = self.get_machine()
return machine.write_job(self,file=file)
#end def write
# test needed
def submit(self):
machine = self.get_machine()
machine.add_job(self)
self.submitted = True
#end def submit
# test needed
def reenter_queue(self):
machine = self.get_machine()
machine.requeue_job(self)
#end def reenter_queue
def run_command(self,launcher=None,redirect=False,serial=False):
if self.template is not None:
return ''
#end if
machine = self.get_machine()
if launcher is None:
launcher = machine.app_launcher
#end if
c = ''
if self.bundled_jobs is None:
if self.full_command is not None:
c = self.full_command
else:
if self.app_command is None:
self.error('app_command has not been provided')
#end if
if launcher=='runjob':
separator = ' : '
else:
separator = ' '
#end if
if self.serial and self.processes==1:
c = ''
else:
c = launcher + self.run_options.write() + separator
#end if
c+=self.app_command+self.app_options.write()
if redirect:
c+=' >'+self.outfile+' 2>'+self.errfile
if not serial:
c+='&'
#end if
elif machine.redirect_output and self.outfile is not None:
c+=' >'+self.outfile+' 2>'+self.errfile
#end if
#end if
elif machine.special_bundling:
c+='\n'
c+=machine.specialized_bundle_commands(self,launcher,serial)
elif self.relative:
cdir = self.abs_subdir
c+='\n'
for job in self.bundled_jobs:
c+='\ncd '+os.path.relpath(job.abs_subdir,cdir)+'\n'
c+=job.run_command(launcher,redirect=True,serial=serial)+'\n'
cdir = job.abs_subdir
#end for
c+='\nwait\n'
else:
c+='\n'
for job in self.bundled_jobs:
c+='\ncd '+job.abs_subdir+'\n'
c+=job.run_command(launcher,redirect=True,serial=serial)+'\n'
#end for
c+='\nwait\n'
#end if
return c
#end def run_command
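# For a typical parallel job this yields a command of the form (illustrative):
#   'mpirun -np 16 qmcpack input.xml'
# i.e. launcher + run options + separator + app command + app options; serial
# single-process jobs omit the launcher entirely.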
def pbs_walltime(self):
walltime=\
str(int(self.hours )).zfill(2)+':'\
+str(int(self.minutes)).zfill(2)+':'\
+str(int(self.seconds)).zfill(2)
if self.days!=0:
walltime = str(self.days)+':'+walltime
#end if
return walltime
#end def pbs_walltime
def sbatch_walltime(self):
walltime=\
str(int(24*self.days+self.hours)).zfill(2)+':'\
+str(int(self.minutes)).zfill(2)+':'\
+str(int(self.seconds)).zfill(2)
return walltime
#end def sbatch_walltime
def ll_walltime(self):
walltime=\
str(int(24*self.days+self.hours)).zfill(2)+':'\
+str(int(self.minutes)).zfill(2)+':'\
+str(int(self.seconds)).zfill(2)
return walltime
#end def ll_walltime
def lsf_walltime(self):
walltime=\
str(int(24*self.days+self.hours)).zfill(2)+':'\
+str(int(self.minutes)).zfill(2)
return walltime
#end def lsf_walltime
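# Walltime formatting example: a job with days=1, hours=2, minutes=30 gives
#   pbs_walltime()    -> '1:02:30:00'
#   sbatch_walltime() -> '26:30:00'
#   lsf_walltime()    -> '26:30'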
def normalize_time(self):
t = self.total_seconds()
d = int(t/(24*3600))
t -= d*24*3600
h = int(t/3600)
t -= h*3600
m = int(t/60)
t -= m*60
s = int(t)
self.days = d
self.hours = h
self.minutes = m
self.seconds = s
#end def normalize_time
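# normalize_time folds excess time units upward, e.g. a job created with
# minutes=90 is stored as hours=1, minutes=30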
def total_seconds(self):
return self.seconds+60*(self.minutes+60*(self.hours+24*self.days))
#end def total_seconds
def total_minutes(self):
return int(self.total_seconds()/60)
#end def total_minutes
def total_hours(self):
return int(self.total_seconds()/3600)
#end def total_hours
def total_days(self):
return int(self.total_seconds()/(24*3600))
#end def total_days
def clone(self):
job = self.copy()
job.set_id()
return job
#end def clone
def serial_clone(self):
kw = self.init_info.copy()
kw.serial=True
return Job(**kw)
#end def serial_clone
def split_nodes(self,n):
run_options = self.run_options
if not isinstance(n,int):
self.error('cannot split job by nodes\nrequested split value must be an integer\nreceived type: {0}\nwith value: {1}'.format(n.__class__.__name__,n))
elif n<1 or n>=self.nodes:
self.error('cannot split job by nodes\nrequested split must be in the range [1,{0})\nrequested split: {1}'.format(self.nodes,n))
#end if
m = self.get_machine()
if m.app_launcher=='srun':
self.error('splitting jobs by nodes is not currently supported on machine "{0}" (SLURM)'.format(m.name))
#end if
job1 = self.clone()
job2 = self.clone()
job1.nodes = n
job2.nodes = self.nodes - n
m.process_job(job1)
m.process_job(job2)
return job1,job2
#end def split_nodes
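# e.g. job1,job2 = job.split_nodes(1) clones a 3 node job into 1 and 2 node jobs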
#end class Job
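# Illustrative Job construction (a 16-core workstation named 'ws16' is assumed
# to be defined elsewhere in this file):
#   job = Job(cores       = 16,
#             threads     = 4,
#             app_command = 'qmcpack input.xml',
#             machine     = 'ws16')
# On creation the job looks up its Machine, which fills in derived fields such
# as processes and processes_per_node via Machine.process_job.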
class Machine(NexusCore):
machines = obj()
modes = obj(
none = 0,
interactive = 1,
batch = 2
)
mode = modes.none
batch_capable = False
requires_account = False
executable_subfile = False
redirect_output = False
query_with_username = False
special_bundling = False
prefixed_output = False
outfile_extension = None
errfile_extension = None
allow_warnings = True
queue_configs = None
@staticmethod
def get_hostname():
hostname = gethostname()
if '.' in hostname:
machine_name = hostname.split('.')[0]
else:
machine_name = hostname
#end if
return machine_name.lower()
#end def get_hostname
@staticmethod
def exists(machine_name):
return machine_name in Machine.machines
#end def exists
@staticmethod
def is_unique(machine):
return id(machine)==id(Machine.machines[machine.name])
#end def is_unique
@staticmethod
def add(machine):
if not isinstance(machine,Machine):
Machine.class_error('attempted to add non-machine instance')
#end if
if not 'name' in machine:
Machine.class_error('attempted to add a machine without a name')
#end if
name = machine.name
if not name in Machine.machines:
Machine.machines[name] = machine
else:
Machine.class_error('attempted to create machine {0}, but it already exists'.format(name))
#end if
#end def add
@staticmethod
def get(machine_name):
if isinstance(machine_name,str):
machine_name = machine_name.lower()
else:
Machine.class_error('machine name must be a string, you provided a '+machine_name.__class__.__name__)
#end if
if Machine.exists(machine_name):
machine = Machine.machines[machine_name]
else:
machs = sorted(Machine.machines.keys())
Machine.class_error('attempted to get machine '+machine_name+', but it is unknown\nknown options are '+str(machs))
#end if
return machine
#end def get
def warn(self,*args,**kwargs):
if Machine.allow_warnings:
NexusCore.warn(self,*args,**kwargs)
#end if
#end def warn
def validate(self):
if Machine.exists(self.name):
if not Machine.is_unique(self):
self.error('duplicate instance of machine '+self.name+' encountered\n this is either a developer error, or you have created a duplicate machine')
#end if
else:
self.error('machine {0} id {1} was created without calling Machine.__init__() and is therefore invalid'.format(self.name,id(self)))
#end if
#end def validate
def in_batch_mode(self):
return self.mode==self.modes.batch
#end def in_batch_mode
def query_queue(self):
self.not_implemented()
#end def query_queue
def submit_jobs(self):
self.not_implemented()
#end def submit_jobs
# update all job information, must be idempotent
def process_job(self,job):
self.not_implemented()
#end def process_job
def process_job_options(self,job):
self.not_implemented()
#end def process_job_options
def write_job(self,job,file=False):
self.not_implemented()
#end def write_job
def submit_job(self,job):
self.not_implemented()
#end def submit_job
def specialized_bundle_commands(self,job,launcher,serial):
self.not_implemented()
#end def specialized_bundle_commands
def __init__(self,name,queue_size=0):
self.name = name
self.queue_size = queue_size
self.processes = obj()
self.jobs = obj()
self.waiting = set()
self.running = set()
self.finished= set()
#user defined variables
self.account = None
self.user = None
self.local_directory = None
self.app_directory = None
self.app_directories = None
if not isinstance(name,str):
self.error('machine name must be a string\nyou provided '+str(name))
#end if
Machine.add(self)
#end def __init__
def restore_default_settings(self):
self.account = None
self.user = None
self.local_directory = None
self.app_directory = None
self.app_directories = None
#end def restore_default_settings
def add_job(self,job):
if isinstance(job,Job):
if job.template is None:
self.process_job(job)
#end if
self.write_job(job)
jid = job.internal_id
self.jobs[jid] = job
job.status = job.states.waiting
self.waiting.add(jid)
#self.write_job_states('add_job')
else:
self.error('add_job received non-Job instance '+job.__class__.__name__)
#end if
#end def add_job
def requeue_job(self,job):
None
#end def requeue_job
allowed_user_info = set(['account','local_directory','app_directory','app_directories'])
def incorporate_user_info(self,infoin):
info = obj(**infoin)
vars = set(info.keys())
invalid = vars-self.allowed_user_info
if len(invalid)>0:
self.error('invalid inputs encountered in incorporate_user_info\nallowed inputs: {0}\n invalid inputs: {1}'.format(list(self.allowed_user_info),list(invalid)))
#end if
if 'app_directories' in info:
ad = info.app_directories
if not isinstance(ad,dict) and not isinstance(ad,obj):
self.error('app_directories must be of type dict or obj\nyou provided '+ad.__class__.__name__)
#end if
#end if
self.transfer_from(info)
#end def incorporate_user_info
#end class Machine
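# Machines register themselves with Machine.add() on construction and are
# retrieved later by (lowercase) name, e.g.:
#   m = Machine.get('ws16')   # assumes a machine named 'ws16' has been defined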
class Workstation(Machine):
mode = Machine.modes.interactive
batch_capable = False
def __init__(self,
name = 'workstation',
cores = None,
app_launcher = 'mpirun',
process_granularity = 1
):
Machine.__init__(self,name)
self.app_launcher = app_launcher
if cores==None:
self.cores = cpu_count()
else:
self.cores = cores
#end if
self.queue_size = self.cores
self.process_granularity = process_granularity
#end def __init__
def process_job(self,job):
if job.serial_only():
job.cores=1
elif job.cores==None:
if job.processes!=None:
job.cores = job.processes*job.threads
else:
job.cores = self.cores
#end if
#end if
job.processes = max(1,int(floor(float(job.cores)/job.threads)))
grains = int(ceil(float(job.cores)/self.process_granularity))
if abs(grains-1-float(job.cores)/self.process_granularity)<1e-6:
grains-=1
#end if
job.grains = grains
job.cores = grains*self.process_granularity
self.process_job_options(job)
#end def process_job
def process_job_options(self,job):
job.run_options.add(np='-np '+str(job.processes))
#end def process_job_options
def write_job_states(self,title=''):
self.log(title,n=2)
n=3
self.log('{0} {1} {2} job states'.format(self.__class__.__name__,self.name,id(self)),n=n )
self.log('processes',n=n+1)
for process in self.processes:
job = process.job
self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
#end for
self.log('jobs',n=n+1)
jobids = list(self.jobs.keys())
jobids.sort()
for jobid in jobids:
job = self.jobs[jobid]
self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
#end for
self.log('waiting',n=n+1)
jobids = list(self.waiting)
jobids.sort()
for jobid in jobids:
job = self.jobs[jobid]
self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
#end for
self.log('running',n=n+1)
jobids = list(self.running)
jobids.sort()
for jobid in jobids:
job = self.jobs[jobid]
self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
#end for
self.log('finished',n=n+1)
jobids = list(self.finished)
jobids.sort()
for jobid in jobids:
job = self.jobs[jobid]
self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
#end for
self.log('end job states',n=1)
#end def write_job_states
def query_queue(self):
#self.write_job_states('query queue')
self.validate()
done = []
for pid,process in self.processes.items():
if nexus_core.generate_only or not nexus_core.monitor:
qpid,status = pid,0
else:
qpid,status = os.waitpid(pid,os.WNOHANG)
#end if
if pid==qpid:
job = process.job
job.status = job.states.finished
job.finished = True
job.determine_end_status(status)
iid = job.internal_id
self.running.remove(iid)
self.finished.add(iid)
done.append(pid)
if not nexus_core.generate_only:
job.out.close()
job.err.close()
#end if
#end if
#end for
for pid in done:
del self.processes[pid]
#end for
#end def query_queue
def submit_jobs(self):
cores_used = 0
for process in self.processes:
cores_used += process.job.cores
#end for
cores_available = self.cores-cores_used
core_req = []
job_req = []
for iid in self.waiting:
job = self.jobs[iid]
job_req.append(job)
core_req.append(job.cores)
#end for
core_req = array(core_req,dtype=int)
# The following line does not work correctly under Numpy 1.10 or greater.
# It should create an ndarray of Job objects from a list of Job objects.
# Instead it creates nested ndarray's with inner type of bool
#job_req = array(job_req ,dtype=object)
job_req_tmp = job_req
job_req = empty(len(job_req_tmp) ,dtype=object)
for idx,job in enumerate(job_req_tmp):
job_req[idx] = job
#end for
order = core_req.argsort()
job_req = job_req[order]
for job in job_req:
if job.cores>self.cores and not nexus_core.generate_only:
self.error('job '+str(job.internal_id)+' is too large to run on this machine\ncores requested: '+str(job.cores)+'\nmachine cores: '+str(self.cores))
#end if
if job.cores<=cores_available:
iid = job.internal_id
self.waiting.remove(iid)
self.running.add(iid)
self.submit_job(job)
cores_available-=job.cores
elif job.cores>self.cores:
self.error('job requested more cores than are present on '+self.name+'\ncores requested: {0}\ncores present: {1}'.format(job.cores,self.cores))
else:
break
#end if
#end for
#end def submit_jobs
def job_command(self,job,pad=None):
command = 'export OMP_NUM_THREADS='+str(job.threads)+'\n'
if len(job.presub)>0:
command += job.presub+'\n'
#end if
if job.serial is not None:
command += job.run_command(self.app_launcher,serial=job.serial)
else:
command += job.run_command(self.app_launcher)
#end if
if len(job.postsub)>0:
command += job.postsub+'\n'
#end if
if pad!=None:
command = ('\n'+command).replace('\n','\n '+pad)
#end if
return command
#end def job_command
def write_job(self,job,file=False):
c = self.job_command(job)
return c
#end def write_job
def submit_job(self,job):
pad = self.enter(job.directory,msg=job.simid)
command = self.job_command(job,pad=pad)
job.status = job.states.running
process = obj()
process.job = job
if nexus_core.generate_only:
self.log(pad+'Would have executed: '+command)
job.system_id = job.internal_id
else:
if nexus_core.monitor:
self.log(pad+'Executing: '+command)
job.out = open(job.outfile,'w')
job.err = open(job.errfile,'w')
p = Popen(command,env=job.env,stdout=job.out,stderr=job.err,shell=True)
process.popen = p
job.system_id = p.pid
else:
command+=' >'+job.outfile+' 2>'+job.errfile+'&'
self.log(pad+'Executing: '+command)
os.system(command)
job.system_id = job.internal_id
#end if
#end if
self.processes[job.system_id] = process
self.leave()
#end def submit_job
#end class Workstation
# test needed
class InteractiveCluster(Workstation):
def __init__(self,*args,**kwargs):
if len(args)==0 or not isinstance(args[0],Supercomputer):
self.init_from_args(*args,**kwargs)
else:
super = args[0]
cores = args[1]
self.init_from_supercomputer(super,cores)
#end if
Machine.__init__(self,self.name,self.queue_size)
#end def __init__
def init_from_args(self,
name = 'icluster',
nodes = None,
procs_per_node = None,
cores_per_proc = None,
process_granularity = None,
ram_per_node = None,
app_launcher = None
):
self.name = name
self.nodes = nodes
self.procs_per_node = procs_per_node
self.cores_per_proc = cores_per_proc
self.process_granularity = process_granularity
self.ram_per_node = ram_per_node
self.app_launcher = app_launcher
self.cores_per_node = self.cores_per_proc*self.procs_per_node
if process_granularity is None:
self.process_granularity = self.cores_per_node
#end if
self.procs = self.procs_per_node*self.nodes
self.cores = self.cores_per_proc*self.procs
self.ram = self.ram_per_node*self.nodes
self.queue_size = self.cores
#end def init_from_args
def init_from_supercomputer(self,super,cores):
nodes = cores//super.cores_per_node
if cores-nodes*super.cores_per_node!=0:
self.error('interactive cores corresponds to a fractional number of nodes\n cores '+str(cores)+'\n cores per node '+str(super.cores_per_node))
#end if
self.init_from_args(super.name+'_interactive',nodes,super.procs_per_node,
super.cores_per_proc,super.cores_per_node,
super.ram_per_node,super.app_launcher)
#end def init_from_supercomputer
def process_job(self,job):
job.cores = min(job.cores,self.cores)
Workstation.process_job(self,job)
job.nodes = job.grains
job.procs = job.nodes*self.procs_per_node
if mod(job.processes,job.nodes)!=0:
job.processes_per_node = None
else:
job.processes_per_node = job.processes//job.nodes
#end if
if mod(job.processes,job.procs)!=0:
job.processes_per_proc = None
else:
job.processes_per_proc = job.processes//job.procs
#end if
#end def process_job
#end class InteractiveCluster
class Supercomputer(Machine):
mode = Machine.modes.batch
name = 'supercomputer'
batch_capable = False #only set to true for specific machines
aprun_options = set(['n','d'])
required_inputs = [
'nodes',
'procs_per_node',
'cores_per_proc',
'ram_per_node',
'queue_size',
'app_launcher',
'sub_launcher',
'queue_querier',
'job_remover'
]
def __init__(self,
nodes = None,
procs_per_node = None,
cores_per_proc = None,
ram_per_node = None,
queue_size = 0,
app_launcher = None,
sub_launcher = None,
queue_querier = None,
job_remover = None,
name = None,
):
if name is None:
if self.name is not None:
name = self.name
else:
name = self.__class__.__name__.lower()
#end if
#end if
Machine.__init__(self,name)
self.nodes = nodes # # of nodes
self.procs_per_node = procs_per_node # # of processors/sockets on a node
self.cores_per_proc = cores_per_proc # # of cores on a processor/socket
self.ram_per_node = ram_per_node
self.queue_size = queue_size
self.app_launcher = app_launcher
self.sub_launcher = sub_launcher
self.queue_querier = queue_querier
self.job_remover = job_remover
for var in Supercomputer.required_inputs:
if self[var] is None:
self.error('input variable '+var+' is required to initialize Supercomputer object.')
#end if
#end for
self.cores_per_node = self.cores_per_proc*self.procs_per_node
self.procs = self.procs_per_node*self.nodes
self.cores = self.cores_per_proc*self.procs
self.ram = self.ram_per_node*self.nodes
# 'complete' is the only actively used status so far
# At least one queue state should correspond to 'complete',
# though even this is not strictly necessary.
# In general if a pid is missing from the queue,
# that job is assumed to be complete.
self.system_queue = obj()
if self.queue_querier=='qstat':
self.job_states=dict(E = 'exiting',
H = 'held',
Q = 'queued',
R = 'running',
S = 'suspended',
T = 'transferring',
W = 'waiting',
C = 'complete',
F = 'complete',
B = 'has_subjob',
M = 'moved_to_another_server',
U = 'user_suspended',
X = 'subjob_completed',
)
elif self.queue_querier=='qstata':
#already gives status as queued, running, etc.
None
elif self.queue_querier=='squeue':
self.job_states=dict(CG = 'exiting',
TO = 'timeout',
CA = 'failed',
F = 'failed',
NF = 'node_fail',
PD = 'waiting',
R = 'running',
S = 'suspended',
CD = 'complete',
RD = 'held',
BF = 'failed',
CF = 'configuring',
DL = 'deadline',
OOM= 'out_of_memory',
PR = 'preempted',
RF = 'requeue_fed',
RH = 'requeue_hold',
RQ = 'requeued',
RS = 'resizing',
RV = 'revoked',
SI = 'signaling',
SE = 'special_exit',
SO = 'stage_out',
ST = 'stopped',
)
elif self.queue_querier=='sacct':
self.job_states=dict(CANCELLED = 'failed', #long form
COMPLETED = 'complete',
COMPLETING = 'exiting',
CONFIGURING = 'waiting',
FAILED = 'failed',
PREEMPTED = 'failed',
PENDING = 'waiting',
NODE_FAIL = 'failed',
RESIZING = 'resizing',
RUNNING = 'running',
SUSPENDED = 'suspended',
TIMEOUT = 'failed',
CA = 'failed', #short form
CD = 'complete',
CG = 'exiting',
CF = 'waiting',
F = 'failed',
PR = 'failed',
PD = 'waiting',
NF = 'failed',
RS = 'resizing',
R = 'running',
S = 'suspended',
TO = 'failed'
)
elif self.queue_querier=='llq':
self.job_states=dict(I = 'idle',
NQ = 'not_queued',
H = 'user_hold',
S = 'system_hold',
HS = 'user_system_hold',
D = 'deferred',
R = 'running',
P = 'pending',
ST = 'starting',
C = 'complete',
CA = 'canceled',
E = 'preempted',
EP = 'preempt_pending',
MP = 'resume_pending',
)
elif self.queue_querier=='bjobs':
self.job_states=dict(PEND = 'pending',
RUN = 'running',
DONE = 'complete',
EXIT = 'failed',
PSUSP = 'suspended',
USUSP = 'suspended',
SSUSP = 'suspended',
)
elif self.queue_querier=='test_query':
None
else:
self.error('ability to query queue with '+self.queue_querier+' has not yet been implemented')
#end if
#end def __init__
# test needed
def interactive_representation(self,cores):
return InteractiveCluster(self,cores)
#end def interactive_representation
# test needed
def requeue_job(self,job):
if isinstance(job,Job):
jid = job.internal_id
pid = job.system_id
if pid is None:
self.error('job {0} does not have a process id issued by the scheduler'.format(jid))
#end if
self.process_job(job)
self.jobs[jid] = job
job.status = job.states.running
self.running.add(jid)
process = obj(job=job)
self.processes[pid] = process
else:
self.error('requeue_job received non-Job instance '+job.__class__.__name__)
#end if
#end def requeue_job
def process_job(self,job):
if job.fake_job:
return
#end if
self.pre_process_job(job)
job.subfile = job.name+'.'+self.sub_launcher+'.in'
no_cores = job.cores is None
no_nodes = job.nodes is None
if no_cores and no_nodes:
self.error('job did not specify cores or nodes\nAt least one must be provided')
elif no_cores:
job.cores = self.cores_per_node*job.nodes
elif no_nodes:
job.nodes = int(ceil(float(job.cores)/self.cores_per_node))
if abs(job.nodes-1-float(job.cores)/self.cores_per_node)<1e-6:
job.nodes-=1
#end if
else:
job.cores = min(job.cores,job.nodes*self.cores_per_node)
#end if
if job.processes_per_node is not None:
job.processes = job.nodes*job.processes_per_node
else:
job.processes = max(1,int(float(job.cores)/job.threads))
#end if
job.tot_cores = job.nodes*self.cores_per_node
job.procs = job.nodes*self.procs_per_node
if mod(job.processes,job.nodes)!=0:
job.processes_per_node = None
else:
job.processes_per_node = job.processes//job.nodes
#end if
if mod(job.processes,job.procs)!=0:
job.processes_per_proc = None
else:
job.processes_per_proc = job.processes//job.procs
#end if
if job.ppn is None:
job.ppn = self.cores_per_node
#end if
if job.account is None:
if self.account is not None:
job.account = self.account
elif self.requires_account:
self.error('account not specified for job on '+self.name)
#end if
#end if
self.post_process_job(job)
job.set_environment(OMP_NUM_THREADS=job.threads)
self.process_job_options(job)
#end def process_job
def process_job_options(self,job):
launcher = self.app_launcher
if launcher=='mpirun':
job.run_options.add(np='-np '+str(job.processes))
elif launcher=='mpiexec':
job.run_options.add(n='-n '+str(job.processes))
elif launcher=='aprun':
if 'n' in self.aprun_options:
job.run_options.add(n='-n '+str(job.processes))
#end if
if 'd' in self.aprun_options and job.threads>1:
job.run_options.add(d='-d '+str(job.threads))
#end if
if 'N' in self.aprun_options and job.processes_per_node is not None:
job.run_options.add(N='-N '+str(job.processes_per_node))
#end if
if 'S' in self.aprun_options and job.processes_per_proc is not None:
job.run_options.add(S='-S '+str(job.processes_per_proc))
#end if
elif launcher=='runjob':
#bypass setup_environment
if job.env is not None:
envs='--envs'
for name,value in job.env.items():
envs+=' {0}={1}'.format(name,value)
#end for
job.env = None
elif 'envs' in job.run_options:
envs = job.run_options.envs
else:
self.error('failed to set env options for runjob')
#end if
job.run_options.add(
np = '--np '+str(job.processes),
p = '-p '+str(job.processes_per_node),
xlocargs = '$LOCARGS',
verbose = '--verbose=INFO',
envs = envs
)
elif launcher=='srun': # Amos contribution from Ryan McAvoy
None
elif launcher=='ibrun': # Lonestar contribution from Paul Young
job.run_options.add(
np = '-n '+str(job.processes),
p = '-o '+str(0),
)
elif launcher=='jsrun': # Summit
None # Summit class takes care of this in post_process_job
elif launcher=='lrun': # Lassen
None # Lassen class takes care of this in post_process_job
else:
self.error(launcher+' is not yet implemented as an application launcher')
#end if
#end def process_job_options
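# e.g. on an aprun machine a 32 process / 4 thread job acquires the options
# '-n 32 -d 4', plus '-N'/'-S' when per-node and per-socket counts are known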
def pre_process_job(self,job):
None
#end def pre_process_job
def post_process_job(self,job):
None
#end def post_process_job
def query_queue(self,out=None):
self.system_queue.clear()
if self.query_with_username and self.user is None:
self.error('querying queue on machine "{}" requires user name\nplease provide username via the "user" keyword in settings'.format(self.name))
#end if
if self.queue_querier=='qstat':
if out is None:
out,err,rc = execute('qstat -a')
#end if
lines = out.splitlines()
for line in lines:
tokens=line.split()
if len(tokens)>0:
if '.' in tokens[0]:
spid = tokens[0].split('.')[0]
else:
spid = tokens[0]
#endif
if spid.isdigit() and len(tokens)==11:
pid = int(spid)
jid,uname,queue,jname,sessid,nodes,tasks,mem,rtime,status,etime = tokens
if status in self.job_states:
self.system_queue[pid] = self.job_states[status]
else:
self.error('job state '+status+' is unrecognized')
#end if
#end if
#end if
#end for
elif self.queue_querier=='qstata':
if out is None:
out,err,rc = execute('qstat')
#end if
lines = out.splitlines()
for line in lines:
tokens=line.split()
if len(tokens)>0:
if '.' in tokens[0]:
spid = tokens[0].split('.')[0]
else:
spid = tokens[0]
#endif
if spid.isdigit() and len(tokens)==6:
pid = int(spid)
jid,uname,wtime,nodes,status,loc = tokens
self.system_queue[pid] = status
#end if
#end if
#end for
elif self.queue_querier=='squeue': # contributed by Ryan McAvoy
if out is None:
if isinstance(self.user,bool) and self.user==False:
extra = ''
elif self.user is not None:
extra = ' -u {}'.format(self.user)
else:
extra = ' --user=$USER'
#end if
out,err,rc = execute('squeue'+extra)
#end if
lines = out.splitlines()
for line in lines:
tokens=line.split()
if len(tokens)>0:
if '.' in tokens[0]:
spid = tokens[0].split('.')[0]
else:
spid = tokens[0]
#endif
if spid.isdigit():
pid = int(spid)
status = None
jid,loc,name,uname,status,wtime,nodes,reason = tokens[:8]
if status is not None:
if status in self.job_states:
self.system_queue[pid] = self.job_states[status]
else:
self.error('job state '+status+' is unrecognized')
#end if
#end if
#end if
#end if
#end for
elif self.queue_querier=='sacct': # contributed by Ryan McAvoy
if out is None:
out,err,rc = execute('sacct')
#end if
lines = out.splitlines()
for line in lines:
tokens=line.split()
if len(tokens)>0:
if '.' in tokens[0]:
spid = tokens[0].split('.')[0]
else:
spid = tokens[0]
#endif
if spid.isdigit() and len(tokens)==6: #if account is empty, only 6 tokens.
pid = int(spid)
jid,name,loc,cores,status,exit_code = tokens
status = status.split('+')[0] # strip any trailing '+' from the state
if status in self.job_states:
self.system_queue[pid] = self.job_states[status]
else:
self.error('job state '+status+' is unrecognized')
#end if
elif spid.isdigit() and len(tokens)==7:
pid = int(spid)
jid,name,loc,uname,cores,status,exit_code = tokens
status = status.split('+')[0] # strip any trailing '+' from the state
if status in self.job_states:
self.system_queue[pid] = self.job_states[status]
else:
self.error('job state '+status+' is unrecognized')
#end if
#end if
#end if
#end for
elif self.queue_querier=='llq':
if out is None:
out,err,rc = execute('llq')
#end if
lines = out.splitlines()
for line in lines:
tokens=line.split()
if len(tokens)>0:
if '.' in tokens[0]:
spid = tokens[0].split('.')[1]
else:
spid = tokens[0]
#endif
if spid.isdigit() and (len(tokens)==7 or len(tokens)==8):
pid = int(spid)
if len(tokens)==7:
jid,owner,subdate,subtime,status,pri,class_ = tokens
elif len(tokens)==8:
jid,owner,subdate,subtime,status,pri,class_,running_on = tokens
#end if
if status in self.job_states:
self.system_queue[pid] = self.job_states[status]
else:
self.error('job state '+status+' is unrecognized')
#end if
#end if
#end if
#end for
elif self.queue_querier=='bjobs':
if out is None:
out,err,rc = execute('bjobs')
#end if
lines = out.splitlines()
for line in lines:
tokens=line.split()
if len(tokens)>0:
spid = tokens[0]
if spid.isdigit() and len(tokens)==8:
pid = int(spid)
jid,uname,status,slots,queue,start,finish,jname = tokens
if status in self.job_states:
self.system_queue[pid] = self.job_states[status]
else:
self.error('job state '+status+' is unrecognized')
#end if
#end if
#end if
#end for
elif self.queue_querier=='test_query': # for testing
# pretend that all jobs have finished
for pid in self.processes.keys():
self.system_queue[pid] = 'complete'
#end for
else:
self.error('ability to query queue with '+self.queue_querier+' has not yet been implemented')
#end if
done = []
for pid,process in self.processes.items():
if not pid in self.system_queue or self.system_queue[pid]=='complete' or nexus_core.generate_only:
job = process.job
job.status = job.states.finished
job.finished = True
iid = job.internal_id
self.running.remove(iid)
self.finished.add(iid)
done.append(pid)
#end if
#end for
for pid in done:
del self.processes[pid]
#end for
return self.system_queue
#end def query_queue
def submit_jobs(self):
nprocesses_running = len(self.processes)
queue_slots_available = self.queue_size-nprocesses_running
remove = []
for iid in self.waiting:
if queue_slots_available>0:
remove.append(iid)
self.running.add(iid)
job = self.jobs[iid]
self.submit_job(job)
queue_slots_available -= 1
else:
break
#end if
#end for
for iid in remove:
self.waiting.remove(iid)
#end for
#end def submit_jobs
def submit_job(self,job):
pad = self.enter(job.directory,msg=job.internal_id)
if job.subfile==None:
self.error('submission file not specified for job')
elif not os.path.exists(job.subfile):
self.error('job submission file was not written prior to submission\n submission file: '+os.path.join(job.directory,job.subfile))
#end if
command = self.sub_command(job)
if nexus_core.generate_only:
self.log(pad+'Would have executed: '+command)
job.status = job.states.running
process = obj()
process.job = job
self.processes[job.internal_id] = process
else:
self.log(pad+'Executing: '+command)
job.status = job.states.running
process = obj()
process.job = job
out,err,rc = execute(command)
output=out+'\n'+err
pid = self.read_process_id(output)
if pid is None:
self.error('process id could not be determined from submission output\n output:\n'+output)
else:
self.log(pad+' pid: {0}'.format(pid))
#end if
#pid = 'fakepid_'+str(job.internal_id)
job.system_id = pid
self.processes[pid] = process
#end if
self.leave()
#end def submit_job
def sub_command(self,job):
return self.sub_launcher+job.sub_options.write()+' '+job.subfile
#end def sub_command
def remove_job(self,job):
if self.job_remover=='qdel':
command = 'qdel '+str(job.system_id)
elif self.job_remover=='scancel':
command = 'scancel '+str(job.system_id)
else:
self.error('ability to remove job using '+self.job_remover+' has not yet been implemented')
#endif
os.system(command)
#end def remove_job
def setup_environment(self,job):
env = ''
if job.env is not None:
for name,val in job.env.items():
env +='export {0}={1}\n'.format(name,val)
#end for
#end if
return env
#end def setup_environment
def write_job(self,job,file=False):
job.subfile = job.name+'.'+self.sub_launcher+'.in'
if job.template is None:
env = self.setup_environment(job)
command = job.run_command(self.app_launcher,serial=job.serial)
c = self.write_job_header(job)+'\n'
if len(job.presub)>0:
c+=job.presub+'\n'
#end if
c+=env
c+=command+'\n'
if len(job.postsub)>0:
c+=job.postsub+'\n'
#end if
else:
c = job.template
#end if
if file:
filepath = os.path.join(job.directory,job.subfile)
fobj = open(filepath,'w')
fobj.write(c)
fobj.close()
if self.executable_subfile:
os.system('chmod +x '+filepath)
#end if
#end if
return c
#end def write_job
def write_job_header(self,job):
self.not_implemented()
#end def write_job_header
@staticmethod
def walltime_to_seconds(walltime_str):
"""
Convert walltime string to total seconds
Handles formats: 'dd:hh:mm:ss', 'hh:mm:ss', 'mm:ss', 'seconds'
"""
try:
parts = walltime_str.split(':')
seconds = 0
if len(parts) == 4: # dd:hh:mm:ss
seconds += int(parts[0]) * 24 * 3600 # days
seconds += int(parts[1]) * 3600 # hours
seconds += int(parts[2]) * 60 # minutes
seconds += int(parts[3]) # seconds
elif len(parts) == 3: # hh:mm:ss
seconds += int(parts[0]) * 3600 # hours
seconds += int(parts[1]) * 60 # minutes
seconds += int(parts[2]) # seconds
elif len(parts) == 2: # mm:ss
seconds += int(parts[0]) * 60 # minutes
seconds += int(parts[1]) # seconds
elif len(parts) == 1: # seconds only
seconds += int(parts[0])
else:
raise ValueError(f"Invalid walltime format: {walltime_str}, accepted formats: 'dd:hh:mm:ss', 'hh:mm:ss', 'mm:ss', 'seconds'")
return seconds
except (ValueError, IndexError) as e:
raise ValueError(f"Failed to parse walltime '{walltime_str}': {str(e)}")
#end try
#end def walltime_to_seconds
@staticmethod
def seconds_to_walltime(seconds):
"""
Convert total seconds to walltime string
Handles formats: 'dd:hh:mm:ss', 'hh:mm:ss', 'mm:ss', 'seconds'
"""
days = seconds // 86400
hours = (seconds % 86400) // 3600
minutes = (seconds % 3600) // 60
seconds = seconds % 60
return f"{days}:{hours:02d}:{minutes:02d}:{seconds:02d}"
#end def seconds_to_walltime
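# Round-trip examples:
#   walltime_to_seconds('1:02:03')    -> 3723
#   walltime_to_seconds('2:00:00:00') -> 172800
#   seconds_to_walltime(3723)         -> '0:01:02:03'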
def validate_queue_config(self, job):
"""
Validate job against queue configuration constraints
Returns True if valid, raises an error with detailed message if invalid
Queue config format:
queue_configs = {
'default': 'queue_name',
'queue_name': {
'min_nodes' : minimum number of nodes,
'max_nodes' : maximum number of nodes,
'max_time' : maximum walltime in hours,
'cores_per_node': cores per node,
'ram_per_node' : RAM per node in GB,
'constraints' : {
'knl': {
'max_nodes': knl specific max nodes,
'max_time': knl specific max time,
...
},
'haswell': {...},
'gpu': {...},
'cpu': {...}
}
}
}
"""
if self.queue_configs is None:
return True
#end if
# Use only warnings here, as smaller computers may not have a queue
if job.queue is None:
if 'default' in self.queue_configs:
job.queue = self.queue_configs['default']
self.warn('No queue was specified. Using default queue "{}".'.format(job.queue))
else:
# neither a queue nor a default queue is specified
self.warn('No queue was specified and no default queue is defined.')
return True
#end if
#end if
# Check if queue exists
if job.queue not in self.queue_configs:
# Queue is defined but config is not available
self.warn('Queue "{}" is not available. Available queues: {}'.format(
job.queue, list(self.queue_configs.keys())))
return False
else:
# Queue is defined and config is available
config = self.queue_configs[job.queue]
#end if
errors = []
# Get constraint-specific config if applicable
constraint_config = None
if hasattr(job, 'constraint') and job.constraint is not None:
if 'constraints' in config and job.constraint in config['constraints']:
constraint_config = config['constraints'][job.constraint]
else:
valid_constraints = list(config.get('constraints', {}).keys())
if valid_constraints:
errors.append('Invalid constraint "{}". Valid constraints for queue {}: {}'.format(
job.constraint, job.queue, valid_constraints))
#end if
#end if
#end if
# Use constraint specific config if available, otherwise use base config
active_config = constraint_config if constraint_config is not None else config
# Validate nodes
if 'min_nodes' in active_config and job.nodes < active_config['min_nodes']:
errors.append('Number of nodes ({}) is below minimum ({}) for queue {}'.format(
job.nodes, active_config['min_nodes'], job.queue))
#end if
if 'max_nodes' in active_config and job.nodes > active_config['max_nodes']:
errors.append('Number of nodes ({}) exceeds maximum ({}) for queue {}'.format(
job.nodes, active_config['max_nodes'], job.queue))
#end if
# Validate cores per node
if 'cores_per_node' in active_config and job.processes_per_node is not None:
if job.processes_per_node > active_config['cores_per_node']:
errors.append('Processes per node ({}) exceeds cores per node ({}) for queue {}'.format(
job.processes_per_node, active_config['cores_per_node'], job.queue))
#end if
#end if
# Validate memory
if 'ram_per_node' in active_config and hasattr(job, 'ram_per_node'):
if job.ram_per_node > active_config['ram_per_node']:
errors.append('RAM per node ({} GB) exceeds maximum ({} GB) for queue {}'.format(
job.ram_per_node, active_config['ram_per_node'], job.queue))
#end if
#end if
# Validate walltime
if 'max_time' in active_config:
# use a local value to avoid shadowing the Job.total_hours() method
total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
if total_hours > active_config['max_time']:
errors.append('Walltime ({} hours) exceeds maximum ({} hours) for queue {}'.format(
total_hours, active_config['max_time'], job.queue))
#end if
#end if
# Validate GPU requirements if specified
if 'gpus_per_node' in active_config and hasattr(job, 'gpus_per_node'):
if job.gpus_per_node > active_config['gpus_per_node']:
errors.append('GPUs per node ({}) exceeds maximum ({}) for queue {}'.format(
job.gpus_per_node, active_config['gpus_per_node'], job.queue))
#end if
#end if
# Report all validation errors
if errors:
self.error('Queue validation failed for queue {}:\n {}'.format(
job.queue, '\n '.join(errors)))
return False
#end if
return True
#end def validate_queue_config
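# A hypothetical queue_configs definition for a machine subclass:
#   queue_configs = {
#       'default' : 'regular',
#       'regular' : {'min_nodes':1, 'max_nodes':512, 'max_time':24.0},
#       'debug'   : {'min_nodes':1, 'max_nodes':8,   'max_time':0.5},
#       }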
def read_process_id(self,output):
pid = None
lines = output.splitlines()
if self.sub_launcher=='llsubmit': # specialization for load leveler (SuperMUC)
for line in lines:
if 'llsubmit: The job' in line and '"' in line:
spid = line.split('"')[1].split('.')[1].strip()
if spid.isdigit():
pid = int(spid)
break
#end if
#end if
#end for
else: # most other machines follow the pattern below
for line in lines:
ls = line.strip()
if ls.isdigit():
pid = int(ls)
break
elif '.' in line:
spid = line.split('.')[0]
if spid.isdigit():
pid = int(spid)
break
#end if
elif ' ' in line: # specialized for Amos?
spid = line.split(' ')[-1]
if spid.isdigit():
pid = int(spid)
break
#end if
#end if
#end for
#end if
return pid
#end def read_process_id
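# Typical scheduler responses handled above (illustrative):
#   'Submitted batch job 123456'  -> pid 123456 (last whitespace-separated token)
#   '123456.sdb'                  -> pid 123456 (field before the first '.')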
#end class Supercomputer
# Load user-defined machine classes from ~/.nexus/local_machines.py
# The following is an example of such a file (see the machine classes below for more examples):
'''
from machines import Supercomputer
class Clustername(Supercomputer):
name = 'clustername'
requires_account = False
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue='batch'
#end if
c= '#!/bin/bash\n'
c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,job.ppn)
c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
c+='#PBS -N '+job.name +'\n'
c+='#PBS -o '+job.outfile+'\n'
c+='#PBS -e '+job.errfile+'\n'
c+='#PBS -V\n'
c+='#PBS -q '+job.queue+'\n'
c+=' \n '
return c
#end def write_job_header
#end class Clustername
# nodes sockets cores ram qslots qlaunch qsubmit qstatus qdelete
Clustername( 4, 1, 16, 24, 4, 'mpirun', 'qsub', 'qstat', 'qdel')
'''
try:
our_load_source('*',os.path.expanduser('~/.nexus/local_machines.py'))
except IOError:
pass
except:
raise
#end try
#Decommissioned
class Kraken(Supercomputer):
name = 'kraken'
requires_account = True
def write_job_header(self,job):
c='#!/bin/bash\n'
c+='#PBS -A '+str(job.account)+'\n'
c+='#PBS -N '+str(job.name)+'\n'
c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
c+='#PBS -l size='+str(job.tot_cores)+'\n'
c+='#PBS -o '+job.outfile+'\n'
c+='#PBS -e '+job.errfile+'\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
cd $PBS_O_WORKDIR
export MPICH_PTL_SEND_CREDITS=-1
export MPICH_MAX_SHORT_MSG_SIZE=1024
export MPICH_PTL_UNEX_EVENTS=800000
export MPICH_UNEX_BUFFER_SIZE=16M
export MPI_MSGS_PER_PROC=32768
'''
return c
#end def write_job_header
#end class Kraken
#Decommissioned
class Jaguar(Supercomputer):
name = 'jaguar'
requires_account = True
def write_job_header(self,job):
if job.queue is None:
job.queue='batch'
#end if
c='#!/bin/bash\n'
c+='#PBS -A '+str(job.account)+'\n'
c+='#PBS -q '+job.queue+'\n'
c+='#PBS -N '+str(job.name)+'\n'
c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
c+='#PBS -l size='+str(job.tot_cores)+'\n'
c+='#PBS -l gres=widow2%widow3\n'
c+='#PBS -o '+job.outfile+'\n'
c+='#PBS -e '+job.errfile+'\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
export MPICH_PTL_SEND_CREDITS=-1
export MPICH_MAX_SHORT_MSG_SIZE=1024
export MPICH_PTL_UNEX_EVENTS=800000
export MPICH_UNEX_BUFFER_SIZE=16M
export MPI_MSGS_PER_PROC=32768
'''
return c
#end def write_job_header
#end class Jaguar
#Unknown
class Golub(Supercomputer):
name = 'golub'
def write_job_header(self,job):
if job.queue is None:
job.queue='secondary'
#end if
c=''
c+='#PBS -q '+job.queue+'\n'
c+='#PBS -N '+job.name+'\n'
c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,job.ppn)
c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
c+='#PBS -e '+job.errfile+'\n'
c+='#PBS -o '+job.outfile+'\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
cd ${PBS_O_WORKDIR}
'''
return c
#end def write_job_header
#end class Golub
# Decommissioned
class OIC5(Supercomputer):
name = 'oic5'
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'mstqmc13q'
#end if
ppn = job.processes_per_node
#ppn = 32/job.threads
#if ppn*job.threads!=32:
# self.error('ppn is not being set properly for OIC5\n perhaps the number of threads requested does not evenly divide the 32 cores\n you requested {0} threads'.format(job.threads))
##end if
c='#!/bin/bash\n'
c+='#PBS -q '+job.queue+'\n'
c+='#PBS -N '+str(job.name)+'\n'
c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,ppn)
c+='#PBS -W x=\"NACCESSPOLICY:SINGLEJOB\"\n'
c+='#PBS -o '+job.outfile+'\n'
c+='#PBS -e '+job.errfile+'\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
return c
#end def write_job_header
def read_process_id(self,output):
pid = None
lines = output.splitlines()
for line in lines:
if 'oic.ornl.gov' in line:
spid = line.split('.')[0]
if spid.isdigit():
pid = int(spid)
#end if
#end if
#end for
return pid
#end def read_process_id
#end class OIC5
class NerscMachine(Supercomputer):
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'regular'
#end if
c='#!/bin/bash\n'
c+='#SBATCH -p '+job.queue+'\n'
c+='#SBATCH -J '+str(job.name)+'\n'
c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c+='#SBATCH -o '+job.outfile+'\n'
c+='#SBATCH -e '+job.errfile+'\n'
if job.user_env:
c+='#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c+='#SBATCH --export=NONE\n'
#end if
c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
return c
#end def write_job_header
#end class NerscMachine
#Decommissioned
class Cori(NerscMachine):
name = 'cori'
def pre_process_job(self,job):
if job.queue is None:
job.queue = 'regular'
#end if
if job.constraint is None:
job.constraint = 'knl'
#end if
# account for dual nature of Cori
if 'knl' in job.constraint:
self.nodes = 9688
self.procs_per_node = 1
self.cores_per_node = 68
self.ram_per_node = 96
elif 'haswell' in job.constraint:
self.nodes = 2388
self.procs_per_node = 2
self.cores_per_node = 32
self.ram_per_node = 128
elif 'amd' in job.constraint:
self.nodes = 20
self.procs_per_node = 2
self.cores_per_node = 32
self.ram_per_node = 2048
else:
self.error('SLURM input "constraint" must contain either "knl", "haswell", or "amd" on Cori\nyou provided: {0}'.format(job.constraint))
#end if
if job.core_spec is not None:
self.cores_per_node -= job.core_spec
#end if
#end def pre_process_job
def write_job_header(self,job):
self.pre_process_job(job) # sync machine view with job
if 'knl' in job.constraint:
hyperthreads = 4
elif 'haswell' in job.constraint:
hyperthreads = 2
elif 'amd' in job.constraint:
hyperthreads = 2
else:
self.error('SLURM input "constraint" must contain either "knl", "haswell" or "amd" on Cori\nyou provided: {0}'.format(job.constraint))
#end if
cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))*hyperthreads
c='#!/bin/bash\n'
if job.account is not None:
c+= '#SBATCH -A '+job.account+'\n'
#end if
c+='#SBATCH -p '+job.queue+'\n'
c+='#SBATCH -C '+str(job.constraint)+'\n'
c+='#SBATCH -J '+str(job.name)+'\n'
c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --tasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(cpus_per_task)
c+='#SBATCH -o '+job.outfile+'\n'
c+='#SBATCH -e '+job.errfile+'\n'
if job.user_env:
c+='#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c+='#SBATCH --export=NONE\n'
#end if
c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
if job.threads>1:
c+='''
export OMP_PROC_BIND=true
export OMP_PLACES=threads
'''
#end if
return c
#end def write_job_header
#end class Cori
# Active
class Perlmutter(NerscMachine):
name = 'perlmutter'
def pre_process_job(self,job):
# Set default queue and node type
if job.queue is None:
job.queue = 'regular'
#end if
if job.constraint is None:
job.constraint = 'cpu'
#end if
# Account for dual nature of Perlmutter
if 'cpu' in job.constraint:
self.nodes = 3072
self.procs_per_node = 2
self.cores_per_node = 128
self.ram_per_node = 512
elif 'gpu' in job.constraint:
self.nodes = 1536
self.procs_per_node = 1
self.cores_per_node = 64
self.ram_per_node = 256
self.gpus_per_node = 4
else:
self.error('SLURM input "constraint" must contain either "cpu" or "gpu" on Perlmutter\nyou provided: {0}'.format(job.constraint))
#end if
#end def pre_process_job
def write_job_header(self,job):
self.pre_process_job(job) # sync machine view with job
# Check if the user gave reasonable processes_per_node
if 'cpu' in job.constraint:
            if job.processes_per_node > self.cores_per_node:
                self.error('processes_per_node cannot be greater than the physical cores per node ({0})\nyou provided: {1}'.format(self.cores_per_node,job.processes_per_node))
#end if
elif 'gpu' in job.constraint:
            if job.processes_per_node > self.gpus_per_node:
                self.error('processes_per_node cannot be greater than GPUs per node ({0})\nyou provided: {1}'.format(self.gpus_per_node,job.processes_per_node))
#end if
            # Append the '_g' suffix to the account name if the user forgot it (required for GPU jobs)
            if job.account is not None and '_g' not in job.account:
                job.account = job.account + '_g'
            #end if
#end if
# Check if the user gave reasonable queue inputs
if job.queue == 'debug':
base_partition = 1
max_partition = 8
max_time = 0.5
elif job.queue == 'regular':
base_partition = 1
max_partition = self.nodes
max_time = 12
elif job.queue == 'preempt':
base_partition = 1
max_partition = 128
max_time = 24
elif job.queue == 'overrun':
base_partition = 1
max_partition = self.nodes
max_time = 12
else:
self.error('The requested queue is not implemented.')
#end if
job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
if job.total_hours > max_time:
self.error('The maximum runtime on {0} queue should not be more than {1} hours\n you requested: {2} hours'.format(job.queue,max_time,job.total_hours))
#end if
if job.nodes<base_partition:
self.error('The number of nodes on {0} queue should not be less than {1}\n you requested: {2}'.format(job.queue,base_partition,job.nodes))
elif job.nodes>max_partition:
self.error('The number of nodes on {0} queue should not be more than {1}\n you requested: {2}'.format(job.queue,max_partition,job.nodes))
#end if
# Use the user cpus_per_task if specified. If not specified, then use available cpus for each process
if job.cpus_per_task is not None:
cpus_per_task = job.cpus_per_task
else:
hyperthreads = 2 # Both CPU and GPU nodes use the same AMD EPYC 7763 (Milan) CPUs
cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))*hyperthreads
#end if
c='#!/bin/bash\n'
if job.account is not None:
c+= '#SBATCH -A '+job.account+'\n'
#end if
c+='#SBATCH -C '+str(job.constraint)+'\n'
c+='#SBATCH -q '+job.queue+'\n'
c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH -c '+str(cpus_per_task)+'\n'
c+='#SBATCH -J '+str(job.name)+'\n'
c+='#SBATCH -o '+job.outfile+'\n'
c+='#SBATCH -e '+job.errfile+'\n'
if 'gpu' in job.constraint:
gpus_per_task = int(floor(float(self.gpus_per_node)/job.processes_per_node))
c+='#SBATCH --gpus-per-task={0}\n'.format(gpus_per_task)
#end if
if job.user_env:
c+='#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c+='#SBATCH --export=NONE\n'
#end if
c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
if (job.threads>1) and ('cpu' in job.constraint):
c+='''
export OMP_PROC_BIND=true
export OMP_PLACES=threads
'''
#end if
if 'gpu' in job.constraint:
c+='''
export SLURM_CPU_BIND="cores"
'''
#end if
return c
#end def write_job_header
#end class Perlmutter
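# Illustrative example (not part of the machine definition): for a hypothetical
# 2-node GPU job with processes_per_node=4 and account='m0000', the
# Perlmutter methods above would emit header lines including
#   #SBATCH -C gpu
#   #SBATCH -q regular
#   #SBATCH -N 2
#   #SBATCH --ntasks-per-node=4
#   #SBATCH --gpus-per-task=1        (floor of 4 gpus / 4 tasks per node)
# and the account would be rewritten to 'm0000_g' since it lacks the '_g' suffix.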
# Active
class BlueWatersXK(Supercomputer):
name = 'bluewaters_xk'
requires_account = False
batch_capable = True
def write_job_header(self,job):
c='#!/bin/bash\n'
c+='#PBS -N '+str(job.name)+'\n'
c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
c+='#PBS -l nodes={0}:ppn={1}:xk\n'.format(job.nodes,job.ppn)
c+='#PBS -o '+job.outfile+'\n'
c+='#PBS -e '+job.errfile+'\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
return c
#end def write_job_header
#end class BlueWatersXK
# Active
class BlueWatersXE(Supercomputer):
name = 'bluewaters_xe'
requires_account = False
batch_capable = True
def write_job_header(self,job):
c='#!/bin/bash\n'
c+='#PBS -N '+str(job.name)+'\n'
c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
c+='#PBS -l nodes={0}:ppn={1}:xe\n'.format(job.nodes,job.ppn)
c+='#PBS -o '+job.outfile+'\n'
c+='#PBS -e '+job.errfile+'\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
return c
#end def write_job_header
#end class BlueWatersXE
# Decommissioned
class Titan(Supercomputer):
name = 'titan'
requires_account = True
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'batch'
#end if
c= '#!/bin/bash\n'
c+='#PBS -A {0}\n'.format(job.account)
c+='#PBS -q {0}\n'.format(job.queue)
c+='#PBS -N {0}\n'.format(job.name)
c+='#PBS -o {0}\n'.format(job.outfile)
c+='#PBS -e {0}\n'.format(job.errfile)
c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
c+='#PBS -l nodes={0}\n'.format(job.nodes)
#c+='#PBS -l gres=widow3\n'
c+='#PBS -l gres=atlas1\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
return c
#end def write_job_header
#end class Titan
# Active
class EOS(Supercomputer):
name = 'eos'
requires_account = True
batch_capable = True
def post_process_job(self,job):
if job.threads>1:
if job.threads<=8:
job.run_options.add(ss='-ss')
#end if
job.run_options.add(cc='-cc numa_node')
#end if
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue = 'batch'
#end if
c= '#!/bin/bash\n'
c+='#PBS -A {0}\n'.format(job.account)
c+='#PBS -q {0}\n'.format(job.queue)
c+='#PBS -N {0}\n'.format(job.name)
c+='#PBS -o {0}\n'.format(job.outfile)
c+='#PBS -e {0}\n'.format(job.errfile)
c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
c+='#PBS -l nodes={0}\n'.format(job.nodes)
c+='#PBS -l gres=atlas1\n'
if job.user_env:
c+='#PBS -V\n'
#end if
c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
return c
#end def write_job_header
#end class EOS
class ALCF_Machine(Supercomputer):
requires_account = True
batch_capable = True
executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
base_partition = None
def post_process_job(self,job):
job.sub_options.add(
env = '--env BG_SHAREDMEMSIZE=32',
mode = '--mode script'
)
#if job.processes<job.nodes: # seems like a good idea, but breaks idempotency
# job.processes_per_node=1
##end if
if job.nodes<self.base_partition:
self.warn('!!! ATTENTION !!!\n number of nodes on {0} should not be less than {1}\n you requested: {2}'.format(self.name,self.base_partition,job.nodes))
else:
partition = log(float(job.nodes)/self.base_partition)/log(2.)
if abs(partition-int(partition))>1e-6:
self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be {1} times a power of two\n you requested: {2}\n nearby valid node count: {3}'.format(self.name,self.base_partition,job.nodes,self.base_partition*2**int(round(partition))))
#end if
#end if
valid_ppn = (1,2,4,8,16,32,64)
if job.processes_per_node is None:
self.warn('job may not run properly\nplease specify processes_per_node in each job to be launched with runjob on {0}'.format(self.name))
elif job.processes_per_node not in valid_ppn:
self.warn('job may not run properly\nprocesses_per_node is not a valid value for {0}\nprocesses_per_node provided: {1}\nvalid options are: {2}'.format(self.name,job.processes_per_node,valid_ppn))
#end if
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue = 'default'
#end if
c= '#!/bin/bash\n'
c+='#COBALT -q {0}\n'.format(job.queue)
c+='#COBALT -A {0}\n'.format(job.account)
c+='#COBALT -n {0}\n'.format(job.nodes)
c+='#COBALT -t {0}\n'.format(job.total_minutes())
c+='#COBALT -O {0}\n'.format(job.identifier)
c+='\nLOCARGS="--block $COBALT_PARTNAME ${COBALT_CORNER:+--corner} $COBALT_CORNER ${COBALT_SHAPE:+--shape} $COBALT_SHAPE"\n'
c+='echo "Cobalt location args: $LOCARGS" >&2\n\n'
return c
#end def write_job_header
#end class ALCF_Machine
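# Worked example of the partition check above: on Mira (base_partition=512),
# requesting 768 nodes gives partition = log(768/512)/log(2) ~ 0.585, which is
# not an integer, so post_process_job warns and suggests the nearby valid node
# count 512*2**round(0.585) = 1024.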
# Decommissioned
class Vesta(ALCF_Machine):
name = 'vesta'
base_partition = 32
#end class Vesta
# Decommissioned
class Cetus(ALCF_Machine):
name = 'cetus'
base_partition = 128
#end class Cetus
# Decommissioned
class Mira(ALCF_Machine):
name = 'mira'
base_partition = 512
#end class Mira
# Active
class Cooley(Supercomputer):
name = 'cooley'
requires_account = True
batch_capable = True
executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def post_process_job(self,job):
#if job.processes_per_node is None and job.threads!=1:
# self.error('threads must be 1,2,3,4,6, or 12 on Cooley\nyou provided: {0}'.format(job.threads))
##end if
#job.run_options.add(
#f = '-f $COBALT_NODEFILE',
#ppn = '-ppn {0}'.format(job.processes_per_node),
#)
return
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue = 'default'
#end if
c= '#!/bin/bash\n'
c+='#COBALT -q {0}\n'.format(job.queue)
c+='#COBALT -A {0}\n'.format(job.account)
c+='#COBALT -n {0}\n'.format(job.nodes)
c+='#COBALT -t {0}\n'.format(job.total_minutes())
c+='#COBALT -O {0}\n'.format(job.identifier)
return c
#end def write_job_header
#end class Cooley
# Active
class Theta(Supercomputer):
name = 'theta'
requires_account = True
batch_capable = True
executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def post_process_job(self,job):
if job.hyperthreads is None:
job.hyperthreads = 1
#end if
job.run_options.add(
N = '-N {0}'.format(job.processes_per_node),
cc = '-cc depth',
d = '-d {0}'.format(job.threads),
j = '-j {0}'.format(job.hyperthreads),
e = '-e OMP_NUM_THREADS={0}'.format(job.threads),
)
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue = 'default'
#end if
c= '#!/bin/bash\n'
c+='#COBALT -q {0}\n'.format(job.queue)
c+='#COBALT -A {0}\n'.format(job.account)
c+='#COBALT -n {0}\n'.format(job.nodes)
c+='#COBALT -t {0}\n'.format(job.total_minutes())
c+='#COBALT -O {0}\n'.format(job.identifier)
c+='#COBALT --attrs mcdram=cache:numa=quad\n'
return c
#end def write_job_header
#end class Theta
# Active
class Lonestar(Supercomputer): # Lonestar contribution from Paul Young
name = 'lonestar' # will be converted to lowercase anyway
requires_account = False
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'batch'
#end if
c= '#!/bin/bash\n'
#c+='#$ -A {0}\n'.format(job.account)
c+='#$ -q {0}\n'.format(job.queue)
c+='#$ -N {0}\n'.format(job.name)
c+='#$ -o {0}\n'.format(job.outfile)
c+='#$ -e {0}\n'.format(job.errfile)
c+='#$ -l h_rt={0}\n'.format(job.pbs_walltime())
c+='#$ -pe 12way {0}\n'.format(job.nodes*12)
c+='#$ -cwd\n'
if job.user_env:
c+='#$ -V\n'
#end if
return c
#end def write_job_header
def read_process_id(self,output):
pid = None
lines = output.splitlines()
for line in lines:
if 'Your job' in line:
spid = line.split(' ')[2]
if spid.isdigit():
pid = int(spid)
#end if
#end if
#end for
return pid
#end def read_process_id
#end class Lonestar
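# Illustrative example of the SGE submission output parsed by
# Lonestar.read_process_id above (the job id and name are hypothetical):
#   Your job 123456 ("jobname") has been submitted
# line.split(' ')[2] == '123456', so pid = 123456.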
class ICMP_Machine(Supercomputer): # ICMP and Amos contributions from Ryan McAvoy
batch_capable = True
executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def write_job_header(self,job):
if job.queue is None:
job.queue = 'defq'
#end if
c= '#!/bin/bash -x\n'
c+='#SBATCH --export=ALL\n'
c+='#SBATCH -J {0}\n'.format(job.identifier)
c+='#SBATCH -p {0}\n'.format(job.queue)
c+='#SBATCH -o {0}\n'.format(job.outfile)
c+='#SBATCH -e {0}\n'.format(job.errfile)
c+='#SBATCH --nodes {0}\n'.format(job.nodes)
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
return c
#end def write_job_header
#end class ICMP_Machine
# Unknown
class Komodo(ICMP_Machine):
name = 'komodo'
#end class Komodo
# Unknown
class Matisse(ICMP_Machine):
name = 'matisse'
#end class Matisse
# Active
class Amos(Supercomputer):
name = 'amos'
#requires_account = True
batch_capable = True
executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def write_job_header(self,job):
if job.queue is None:
job.queue = 'debug'
#end if
        if job.queue == 'debug':
            base_partition = 1
            max_partition = 32
            max_time = 1
        elif job.queue == 'small':
            base_partition = 1
            max_partition = 64
            max_time = 24
        elif job.queue == 'medium':
            base_partition = 128
            max_partition = 512
            max_time = 12
        elif job.queue == 'large':
            base_partition = 1024
            max_partition = 2048
            max_time = 6
        elif job.queue == 'verylarge':
            base_partition = 3072
            max_partition = 4096
            max_time = 6
        else:
            self.error('unknown queue on {0}: {1}\nvalid options are: debug, small, medium, large, verylarge'.format(self.name,job.queue))
        #end if
job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
if job.total_hours > max_time:
self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
job.hours = max_time
job.minutes =0
job.seconds =0
#end if
if job.nodes<base_partition:
self.warn('!!! ATTENTION !!!\n number of nodes in {0} should not be less than {1}\n you requested: {2}'.format(job.queue,base_partition,job.nodes))
elif job.nodes>max_partition:
self.warn('!!! ATTENTION !!!\n number of nodes in {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_partition,job.nodes))
else:
            if job.queue != 'verylarge':
                partition = log(float(job.nodes)/base_partition)/log(2.)
                if abs(partition-int(partition))>1e-6:
                    self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be {1} times a power of two\n you requested: {2}\n nearby valid node count: {3}'.format(self.name,base_partition,job.nodes,base_partition*2**int(round(partition))))
                #end if
            elif job.nodes != 3072 and job.nodes != 4096:
                self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be 3072 or 4096\n you requested: {1}'.format(self.name,job.nodes))
            #end if
        #end if
c= '#!/bin/bash -x\n'
c+='#SBATCH --export=ALL\n'
#c+=#SBATCH -D /gpfs/sb/data/<project>/<user>/
c+='#SBATCH -J {0}\n'.format(job.identifier)
c+='#SBATCH -p {0}\n'.format(job.queue)
c+='#SBATCH -o {0}\n'.format(job.outfile)
c+='#SBATCH -e {0}\n'.format(job.errfile)
c+='#SBATCH --nodes {0}\n'.format(job.nodes)
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
# c+='#SBATCH --mail-type=ALL'
# c+='#SBATCH --mail-user=<{0}>'
return c
#end def write_job_header
#end class Amos
class SnlMachine(Supercomputer):
requires_account = True
batch_capable = True
#executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
#for mpiexec
def post_process_job(self,job):
        job.run_options.add(bindto="--bind-to none",npernode="--npernode {}".format(job.processes_per_node))
    #end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue='batch'
#end if
cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))
if job.qos == 'long':
max_time = 96
elif 'short' in job.queue:
max_time = 4
else:
max_time = 48
#end if
job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time: # warn if job exceeds the queue/qos time limit
if job.qos == 'long':
self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1} with --qos=\'long\'\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
elif 'short' in job.queue:
self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1} with -p short[,batch]\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
else:
self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
#end if
job.hours = max_time
job.minutes = 0
job.seconds = 0
#end if
c='#!/bin/bash\n'
c+='#SBATCH -p '+str(job.queue)+'\n'
c+='#SBATCH --job-name '+str(job.name)+'\n'
c+='#SBATCH --account='+str(job.account)+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
c+='#SBATCH -o {0}\n'.format(job.outfile)
c+='#SBATCH -e {0}\n'.format(job.errfile)
        if job.qos:
            c+='#SBATCH --qos={}\n'.format(job.qos)
        #end if
        c+='\n'
return c
#end def write_job_header
#end class SnlMachine
#Unknown
class Chama(SnlMachine):
name = 'chama'
#end class Chama
#Unknown
class Skybridge(SnlMachine):
name = 'skybridge'
#end class Skybridge
#Unknown
class Eclipse(SnlMachine):
name = 'eclipse'
#end class Eclipse
#Unknown
class Attaway(SnlMachine):
name = 'attaway'
#end class Attaway
#Unknown
class Manzano(SnlMachine):
name = 'manzano'
#end class Manzano
#Unknown
class Ghost(SnlMachine):
name = 'ghost'
#end class Ghost
#Unknown
class Amber(SnlMachine):
name = 'amber'
#end class Amber
#Unknown
class Uno(SnlMachine):
name = 'uno'
#end class Uno
#Unknown
class Solo(SnlMachine):
name = 'solo'
#end class Solo
# Active
# machines at LRZ https://www.lrz.de/english/
class SuperMUC(Supercomputer):
name = 'supermuc'
requires_account = False
batch_capable = True
query_with_username = False
def write_job_header(self,job):
if job.queue is None:
job.queue = 'general'
#end if
if job.type is None:
job.type = 'MPICH'
else:
job.type = job.type.lower()
if job.type=='mpich':
job.type=job.type.upper()
#end if
#end if
ibm = job.type=='parallel'
intel = job.type=='MPICH'
omp = isinstance(job.threads,int) and job.threads>1
if not ibm and not intel:
self.error('the only types of MPI supported are "parallel" and "MPICH"\nreceived MPI with type: {0}'.format(job.type))
#end if
c ='#!/bin/bash\n'
c+='#@ job_name = {0}\n'.format(job.name)
c+='#@ job_type = {0}\n'.format(job.type)
c+='#@ class = {0}\n'.format(job.queue)
c+='#@ node = {0}\n'.format(job.nodes)
if job.nodes<512:
icmin = 1
icmax = 1
else:
icmin = int(job.nodes//512)+1
icmax = icmin+1
#end if
c+='#@ island_count = {0},{1}\n'.format(icmin,icmax)
if intel and omp:
c+='#@ tasks_per_node = {0}\n'.format(job.processes_per_node)
else:
c+='#@ total_tasks = {0}\n'.format(job.processes)
#end if
c+='#@ wall_clock_limit = {0}\n'.format(job.ll_walltime())
c+='#@ network.MPI = sn_all,not_shared,us\n'
c+='#@ initialdir = {0}\n'.format(job.abs_dir)
c+='#@ output = {0}\n'.format(job.outfile)
c+='#@ error = {0}\n'.format(job.errfile)
c+='#@ energy_policy_tag = my_energy_tag\n'
c+='#@ minimize_time_to_solution = yes\n'
if job.email is None:
c+='#@ notification = never\n'
else:
c+='#@ notification = always\n'
c+='#@ notify_user = {0}\n'.format(job.email)
#end if
c+='#@ queue\n'
c+='. /etc/profile\n'
c+='. /etc/profile.d/modules.sh\n'
if ibm and omp:
c+='export MP_SINGLE_THREAD=no\n'
c+='export MP_TASK_AFFINITY=core:{0}\n'.format(job.threads)
elif intel and not omp:
c+='module unload mpi.ibm\n'
c+='module load mpi.intel\n'
elif intel and omp:
c+='module unload mpi.ibm\n'
c+='module load mpi.intel\n'
c+='export OMP_NUM_THREADS={0}\n'.format(job.threads)
#c+='module load mpi_pinning/hybrid_blocked\n'
#end if
return c
#end def write_job_header
#end class SuperMUC
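# Worked example of the island_count logic above: a 600-node job gives
# icmin = 600//512 + 1 = 2 and icmax = 3, producing '#@ island_count = 2,3';
# any job below 512 nodes requests exactly one island ('#@ island_count = 1,1').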
# Active
class SuperMUC_NG(Supercomputer):
name = 'supermucng'
requires_account = True
batch_capable = True
query_with_username = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'general'
#end if
if job.hyperthreads is None:
job.hyperthreads = job.processes_per_node//48
if job.hyperthreads==0:
job.hyperthreads=None
#end if
#end if
        if job.constraint is None:
            job.constraint = 'scratch&work'
        #end if
c ='#!/bin/bash\n'
c+='#SBATCH --account={}\n'.format(job.account)
c+='#SBATCH --partition={}\n'.format(job.queue)
c+='#SBATCH -J {}\n'.format(job.name)
c+='#SBATCH --time={}\n'.format(job.sbatch_walltime())
c+='#SBATCH -o ./{}\n'.format(job.outfile)
c+='#SBATCH -e ./{}\n'.format(job.errfile)
if job.switches is not None:
c+='#SBATCH --switches={}\n'.format(job.switches)
#end if
c+='#SBATCH --nodes={}\n'.format(job.nodes)
c+='#SBATCH --ntasks-per-node={}\n'.format(job.processes_per_node)
if job.ntasks_per_core is not None:
c+='#SBATCH --ntasks-per-core={}\n'.format(job.ntasks_per_core)
elif job.hyperthreads is not None:
c+='#SBATCH --ntasks-per-core={}\n'.format(job.hyperthreads)
#end if
if not job.default_cpus_per_task:
if job.cpus_per_task is None:
c+='#SBATCH --cpus-per-task={}\n'.format(job.threads)
else:
c+='#SBATCH --cpus-per-task={}\n'.format(job.cpus_per_task)
#end if
#end if
c+='#SBATCH -D ./\n'
c+='#SBATCH --no-requeue\n'
if job.constraint is not None:
            c+='#SBATCH --constraint="{}"\n'.format(job.constraint)
#end if
if job.email is not None:
c+='#SBATCH --mail-type=ALL\n'
c+='#SBATCH --mail-user={}\n'.format(job.email)
#end if
c+='#SBATCH --export=NONE\n'
if job.user_env:
c+='#SBATCH --get-user-env\n'
#end if
return c
#end def write_job_header
#end class SuperMUC_NG
# Decommissioned
class Stampede2(Supercomputer):
name = 'stampede2'
requires_account = True
batch_capable = True
#executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def write_job_header(self,job):
if job.queue is None:
job.queue='normal'
#end if
if job.queue == 'development':
max_nodes = 16
max_time = 2
elif job.queue == 'normal':
max_nodes = 256
max_time = 48
elif job.queue == 'large':
max_nodes = 2048
max_time = 48
elif job.queue == 'long':
max_nodes = 32
max_time = 96
elif job.queue == 'flat_quadrant':
max_nodes = 24
max_time = 48
elif job.queue == 'skx-dev':
max_nodes = 4
max_time = 2
elif job.queue == 'skx-normal':
max_nodes = 128
max_time = 48
        elif job.queue == 'skx-large':
            max_nodes = 868
            max_time = 48
        else:
            self.error('unknown queue on {0}: {1}'.format(self.name,job.queue))
        #end if
if 'skx' in job.queue:
max_processes_per_node = 48
else:
max_processes_per_node = 68
#end if
job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
if job.total_hours > max_time:
self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
job.hours = max_time
job.minutes =0
job.seconds =0
#end if
if job.nodes > max_nodes:
self.warn('!!! ATTENTION !!!\n the maximum nodes on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_nodes,job.nodes))
job.nodes = max_nodes
#end if
if job.processes_per_node > max_processes_per_node:
self.warn('!!! ATTENTION !!!\n the maximum number of processes per node on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_processes_per_node,job.processes_per_node))
job.processes_per_node = max_processes_per_node
#end if
c='#!/bin/bash\n'
c+='#SBATCH --job-name '+str(job.name)+'\n'
c+='#SBATCH --account='+str(job.account)+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
c+='#SBATCH -o {0}\n'.format(job.outfile)
c+='#SBATCH -e {0}\n'.format(job.errfile)
c+='#SBATCH -p {0}\n'.format(job.queue)
c+='\n'
return c
#end def write_job_header
#end class Stampede2
# Decommissioned
# CADES at ORNL
class CadesMoab(Supercomputer):
name = 'cades_moab'
requires_account = True
batch_capable = True
def post_process_job(self,job):
ppn = job.processes_per_node
if job.threads>1 and ppn is not None and ppn>1:
processes_per_socket = int(floor(job.processes_per_node/2))
job.run_options.add(npersocket='--npersocket {0}'.format(processes_per_socket))
#end if
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue = 'skylake'
#end if
if job.qos is None:
job.qos = 'std'
#end if
if job.group_list is None:
job.group_list = 'cades-'+job.account
#end if
c= '#!/bin/bash\n'
c+='#PBS -A {0}\n'.format(job.account)
c+='#PBS -W group_list={0}\n'.format(job.group_list)
c+='#PBS -q {0}\n'.format(job.queue)
c+='#PBS -N {0}\n'.format(job.name)
c+='#PBS -o {0}\n'.format(job.outfile)
c+='#PBS -e {0}\n'.format(job.errfile)
c+='#PBS -l qos={0}\n'.format(job.qos) # This could be qos=burst as well, but then it can be cancelled by others
c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes, job.ppn)
c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
return c
#end def write_job_header
#end class CadesMoab
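# Illustrative example of the socket mapping above (hypothetical job): with
# processes_per_node=36 and threads=2, post_process_job adds
# '--npersocket 18' (floor(36/2)) to the mpirun options.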
# Active
class CadesSlurm(Supercomputer):
name = 'cades'
requires_account = True
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'skylake'
#end if
c = '#!/bin/bash\n'
c += '#SBATCH -A {}\n'.format(job.account)
c += '#SBATCH -p {}\n'.format(job.queue)
c += '#SBATCH -J {}\n'.format(job.name)
c += '#SBATCH -t {}\n'.format(job.sbatch_walltime())
c += '#SBATCH -N {}\n'.format(job.nodes)
c += '#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c += '#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c += '#SBATCH --mem=0\n' # required on Cades
c += '#SBATCH -o '+job.outfile+'\n'
c += '#SBATCH -e '+job.errfile+'\n'
c += '#SBATCH --exclusive\n'
if job.user_env:
c += '#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c += '#SBATCH --export=NONE\n'
#end if
return c
#end def write_job_header
#end class CadesSlurm
# Active
# Inti at ORNL
class Inti(Supercomputer):
name = 'inti'
requires_account = False
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'QMCREGULAR'
#end if
c = '#!/bin/bash\n'
# c += '#SBATCH -A {}\n'.format(job.account)
c += '#SBATCH -p {}\n'.format(job.queue)
c += '#SBATCH -J {}\n'.format(job.name)
c += '#SBATCH -t {}\n'.format(job.sbatch_walltime())
c += '#SBATCH -N {}\n'.format(job.nodes)
c += '#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c += '#SBATCH --cpus-per-task={0}\n'.format(job.threads)
        c += '#SBATCH --mem=0\n' # request all available memory on the node
c += '#SBATCH -o '+job.outfile+'\n'
c += '#SBATCH -e '+job.errfile+'\n'
c += '#SBATCH --exclusive\n'
if job.user_env:
c += '#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c += '#SBATCH --export=NONE\n'
#end if
return c
#end def write_job_header
#end class Inti
# Active
# Baseline at ORNL https://docs.cades.olcf.ornl.gov/baseline_user_guide/baseline_user_guide.html
class Baseline(Supercomputer):
name = 'baseline'
requires_account = True
batch_capable = True
queue_configs={
'default': 'batch_cnms',
'batch': {
'max_nodes': 138,
'max_walltime': '24:00:00',
},
'batch_low_memory': {
'max_nodes': 68,
'max_walltime': '24:00:00',
},
'batch_high_memory': {
'max_nodes': 70,
'max_walltime': '24:00:00',
},
'batch_ccsi': {
'max_nodes': 20,
'max_walltime': '24:00:00',
},
'batch_cnms': {
'max_nodes': 20,
'max_walltime': '24:00:00',
},
'gpu_acmhs': {
'max_nodes': 1,
'max_walltime': '24:00:00',
}
}
def write_job_header(self,job):
self.validate_queue_config(job)
c = '#!/bin/bash\n'
c += '#SBATCH -A {}\n'.format(job.account)
c += '#SBATCH -p {}\n'.format(job.queue)
c += '#SBATCH -J {}\n'.format(job.name)
c += '#SBATCH -t {}\n'.format(job.sbatch_walltime())
c += '#SBATCH -N {}\n'.format(job.nodes)
c += '#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c += '#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c += '#SBATCH -o '+job.outfile+'\n'
c += '#SBATCH -e '+job.errfile+'\n'
if job.user_env:
c += '#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c += '#SBATCH --export=NONE\n'
#end if
return c
#end def write_job_header
#end class Baseline
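# Active
# Frontier at ORNL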
class Frontier(Supercomputer):
name = 'frontier'
requires_account = True
batch_capable = True
queue_configs = {
'default': 'batch',
'batch': {
'max_nodes': 9664,
'max_walltime': '12:00:00',
},
'extended': {
'max_nodes': 1894,
'max_walltime': '24:00:00',
},
'debug': {
'max_nodes': 1,
'max_walltime': '01:00:00',
}
}
def pre_process_job(self,job):
# Set default queue and node type
if job.queue is None:
job.queue = 'batch'
#end if
if job.constraint is None:
job.constraint = 'cpu'
#end if
# Account for dual nature of Frontier
if 'cpu' in job.constraint:
self.cores_per_node = 56
elif 'gpu' in job.constraint:
self.cores_per_node = 56
self.gpus_per_node = 4
else:
self.error('SLURM input "constraint" must contain either "cpu" or "gpu" on Frontier\nyou provided: {0}'.format(job.constraint))
#end if
#end def pre_process_job
def post_process_job(self, job):
if 'cpu' in job.constraint:
job.run_options.add(
cpu_bind='--cpu-bind=threads',
threads_per_core='--threads-per-core={0}'.format(job.threads)
)
elif 'gpu' in job.constraint:
gpus_per_task = int(floor(float(self.gpus_per_node)/job.processes_per_node))
job.run_options.add(
gpu_bind='--gpu-bind=closest',
gpus_per_task='--gpus-per-task={0}'.format(gpus_per_task)
)
#end if
job.run_options.add(
N='-N {}'.format(job.nodes),
n='-n {}'.format(job.processes),
c='-c {}'.format(job.threads),
        )
    #end def post_process_job
def write_job_header(self, job):
self.validate_queue_config(job)
if job.queue is None:
job.queue = 'batch'
elif job.queue == 'debug':
job.qos = 'debug'
job.queue = 'batch'
#end if
        c = '#!/bin/sh\n'
        c += '#SBATCH -A {account}\n'.format(account=job.account)
        c += '#SBATCH -p {queue}\n'.format(queue=job.queue)
        if job.qos is not None:
            c += '#SBATCH --qos={qos}\n'.format(qos=job.qos)
        #end if
        c += '#SBATCH -J {name}\n'.format(name=job.name)
        c += '#SBATCH -t {time}\n'.format(time=job.sbatch_walltime())
        c += '#SBATCH -N {nodes}\n'.format(nodes=job.nodes)
        c += '#SBATCH -S 8\n' # default low-noise mode layout: 8 cores reserved per node, leaving 56 of 64 usable
        c += '#SBATCH -o {name}.out\n'.format(name=job.name)
        c += '#SBATCH -e {name}.err\n'.format(name=job.name)
        return c
    #end def write_job_header
#end class Frontier
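# Illustrative example (hypothetical job): for a Frontier job with
# constraint='gpu', nodes=2, processes_per_node=4 and threads=7,
# post_process_job above yields srun options equivalent to
#   srun -N 2 -n 8 -c 7 --gpu-bind=closest --gpus-per-task=1
# where --gpus-per-task = floor(4 gpus / 4 tasks per node).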
# Active
# BESMS at ORNL
class Besms(Supercomputer):
name = 'besms'
requires_account = True
batch_capable = True
# Using sinfo to get the queue configs
queue_configs={
'default': 't92',
't92': {
'max_nodes': 10,
'max_walltime': '672:00:00',
},
't92-burst': {
'max_nodes': 25,
'max_walltime': '672:00:00',
},
'burst': {
'max_nodes': 166,
'max_walltime': '48:00:00',
},
}
def write_job_header(self,job):
self.validate_queue_config(job)
c = '#!/bin/bash\n'
c += '#SBATCH -A {}\n'.format(job.account)
c += '#SBATCH -p {}\n'.format(job.queue)
c += '#SBATCH -J {}\n'.format(job.name)
c += '#SBATCH -t {}\n'.format(job.sbatch_walltime())
c += '#SBATCH -N {}\n'.format(job.nodes)
c += '#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c += '#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c += '#SBATCH -o '+job.outfile+'\n'
c += '#SBATCH -e '+job.errfile+'\n'
c += '#SBATCH --mem=350G\n'
c += '#SBATCH --exclusive\n'
if job.user_env:
c += '#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c += '#SBATCH --export=NONE\n'
#end if
return c
#end def write_job_header
#end class Besms
# Decommissioned
# Summit at ORNL
class Summit(Supercomputer):
name = 'summit'
requires_account = True
batch_capable = True
def post_process_job(self,job):
# add the options only if the user has not supplied options
if len(job.run_options)==0:
opt = obj(
launch_dist = '-d packed',
bind = '-b rs',
)
if job.gpus is None:
job.gpus = 6 # gpus to use per node
#end if
if job.alloc_flags is None:
job.alloc_flags = 'smt1'
#end if
if job.gpus==0:
if job.processes%2==0:
resource_sets_per_node = 2
else:
resource_sets_per_node = 1
#end if
nrs = job.nodes*resource_sets_per_node
pprs = job.processes_per_node//resource_sets_per_node
gpurs = 0
else:
ppn = job.processes_per_node
if ppn is None:
self.warn('job may not run properly on Summit\nat least one mpi process should be present for each node\nplease check the generated bsub file for correctness')
ppn = 0
#end if
if ppn%job.gpus!=0:
self.warn('job may not run properly on Summit\nprocesses per node should divide evenly into number of gpus requested\nprocesses per node requested: {0}\ngpus per node requested: {1}\nplease check the generated bsub file for correctness'.format(job.processes_per_node,job.gpus))
#end if
resource_sets_per_node = job.gpus
nrs = job.nodes*resource_sets_per_node
pprs = ppn//resource_sets_per_node
gpurs = 1
#end if
opt.set(
resource_sets= '-n {0}'.format(nrs),
rs_per_node = '-r {0}'.format(resource_sets_per_node),
tasks_per_rs = '-a {0}'.format(pprs),
cpus_per_rs = '-c {0}'.format(pprs*job.threads),
gpus_per_rs = '-g {0}'.format(gpurs),
)
job.run_options.add(**opt)
#end if
#end def post_process_job
def write_job_header(self,job):
c ='#!/bin/bash\n'
c+='#BSUB -P {0}\n'.format(job.account)
if job.queue is not None:
c+='#BSUB -q {0}\n'.format(job.queue)
#end if
c+='#BSUB -J {0}\n'.format(job.name)
c+='#BSUB -o {0}\n'.format(job.outfile)
c+='#BSUB -e {0}\n'.format(job.errfile)
c+='#BSUB -W {0}\n'.format(job.lsf_walltime())
c+='#BSUB -nnodes {0}\n'.format(job.nodes)
if job.alloc_flags is not None:
c+='#BSUB -alloc_flags "{0}"\n'.format(job.alloc_flags)
#end if
return c
#end def write_job_header
def read_process_id(self,output):
pid = None
tokens = output.split()
for t in tokens:
if t.startswith('<'):
spid = t.strip('<>').strip()
if spid.isdigit():
pid = int(spid)
break
#end if
#end if
#end for
return pid
#end def read_process_id
#end class Summit
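# Worked example of the resource-set logic above: a Summit job with no
# user-supplied run options, nodes=2, processes_per_node=6, threads=7 and the
# default gpus=6 gives resource_sets_per_node=6, nrs=12, pprs=1, so jsrun
# receives options equivalent to
#   -n 12 -r 6 -a 1 -c 7 -g 1 -d packed -b rs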
# Unknown
## Added 28/11/2019 by A Zen
class Rhea(Supercomputer):
name = 'rhea'
requires_account = True
batch_capable = True
#executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def post_process_job(self,job):
job.run_options.add(
N='-N {}'.format(job.nodes),
n='-n {}'.format(job.processes),
)
if job.threads>1:
job.run_options.add(
c = '-c {}'.format(job.threads),
)
if 'cpu_bind' not in job.run_options:
if job.processes_per_node==self.cores_per_node:
cpu_bind = '--cpu-bind=threads'
else:
cpu_bind = '--cpu-bind=cores'
#end if
job.run_options.add(
cpu_bind = cpu_bind
)
#end if
#end if
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue='batch'
#end if
base_partition = None
max_partition = 384
if job.nodes <= 16:
max_time = 48
elif job.nodes <= 64:
max_time = 36
else:
            max_time = 3
        #end if
job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time: # warn if job exceeds the node-count-dependent time limit
self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
job.hours = max_time
job.minutes =0
job.seconds =0
#end if
c='#!/bin/bash\n'
c+='#SBATCH --job-name '+str(job.name)+'\n'
c+='#SBATCH --account='+str(job.account)+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
c+='#SBATCH -o {0}\n'.format(job.outfile)
c+='#SBATCH -e {0}\n'.format(job.errfile)
if job.email is not None:
c+='#SBATCH --mail-user {}\n'.format(job.email)
c+='#SBATCH --mail-type ALL\n'
#c+='#SBATCH --mail-type FAIL\n'
#end if
c+='\n'
c+='cd $SLURM_SUBMIT_DIR\n'
c+='\n'
c+='echo JobID : $SLURM_JOBID \n'
c+='echo Number of nodes requested: $SLURM_JOB_NUM_NODES \n'
c+='echo List of nodes assigned to the job: $SLURM_NODELIST \n'
c+='\n'
return c
#end def write_job_header
#end class Rhea
# Active
# Andes at ORNL
## Added 19/03/2021 by A Zen
class Andes(Supercomputer):
name = 'andes'
requires_account = True
batch_capable = True
#executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def post_process_job(self,job):
if job.threads>1:
job.run_options.add(
c = '-c {}'.format(job.threads),
)
if 'cpu_bind' not in job.run_options:
if job.processes_per_node==self.cores_per_node:
cpu_bind = '--cpu-bind=threads'
else:
cpu_bind = '--cpu-bind=cores'
#end if
job.run_options.add(
cpu_bind = cpu_bind
)
#end if
#end if
job.run_options.add(
N='-N {}'.format(job.nodes),
n='-n {}'.format(job.processes),
)
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue='batch'
#end if
base_partition = None
max_partition = 384
if job.nodes <= 16:
max_time = 48
elif job.nodes <= 64:
max_time = 36
else:
            max_time = 3
        #end if
job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time: # warn if job exceeds the node-count-dependent time limit
self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
job.hours = max_time
job.minutes =0
job.seconds =0
#end if
c='#!/bin/bash\n'
c+='#SBATCH --job-name '+str(job.name)+'\n'
c+='#SBATCH --account='+str(job.account)+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
c+='#SBATCH -o {0}\n'.format(job.outfile)
c+='#SBATCH -e {0}\n'.format(job.errfile)
if job.email is not None:
c+='#SBATCH --mail-user {}\n'.format(job.email)
c+='#SBATCH --mail-type ALL\n'
#c+='#SBATCH --mail-type FAIL\n'
#end if
c+='\n'
c+='cd $SLURM_SUBMIT_DIR\n'
c+='\n'
c+='echo JobID : $SLURM_JOBID \n'
c+='echo Number of nodes requested: $SLURM_JOB_NUM_NODES \n'
c+='echo List of nodes assigned to the job: $SLURM_NODELIST \n'
c+='\n'
return c
#end def write_job_header
#end class Andes
# Active
## Added 05/04/2022 by A Zen
class Archer2(Supercomputer):
# https://docs.archer2.ac.uk/user-guide/hardware/
name = 'archer2'
requires_account = True
batch_capable = True
#executable_subfile = True
prefixed_output = True
outfile_extension = '.output'
errfile_extension = '.error'
def post_process_job(self,job):
job.run_options.add(
distribution='--distribution=block:block',
hint='--hint=nomultithread',
N='-N {}'.format(job.nodes),
n='-n {}'.format(job.processes),
)
if job.threads>1:
job.run_options.add(
c = '-c {}'.format(job.threads),
)
# if 'cpu_bind' not in job.run_options:
# if job.processes_per_node==self.cores_per_node:
# cpu_bind = '--cpu-bind=threads'
# else:
# cpu_bind = '--cpu-bind=cores'
# #end if
# job.run_options.add(
# cpu_bind = cpu_bind
# )
#end if
#end if
#end def post_process_job
def write_job_header(self,job):
if job.qos is None:
job.qos='standard'
#end if
base_partition = None
if job.qos == 'long':
max_time = 48
max_partition = 64
elif 'short' in job.qos:
max_time = 20.0/60.0
max_partition = 32
else:
max_time = 24
max_partition = 1024
#end if
job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time:
            self.warn('!!! ATTENTION !!!\n the maximum runtime with qos {0} should not be more than {1}\n you requested: {2}'.format(job.qos,max_time,job.total_hours))
            job.hours = int(max_time)
            job.minutes = int(round(60*(max_time-job.hours)))
            job.seconds = 0
        #end if
        if job.nodes > max_partition:
            self.warn('!!! ATTENTION !!!\n the maximum nodes with qos {0} should not be more than {1}\n you requested: {2}'.format(job.qos,max_partition,job.nodes))
            job.nodes = max_partition
        #end if
c='#!/bin/bash\n'
c+='#SBATCH --job-name '+str(job.name)+'\n'
c+='#SBATCH --account='+str(job.account)+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
c+='#SBATCH -o {0}\n'.format(job.outfile)
c+='#SBATCH -e {0}\n'.format(job.errfile)
c+='#SBATCH --partition=standard\n'
c+='#SBATCH --qos={0}\n'.format(job.qos)
if job.email is not None:
c+='#SBATCH --mail-user {}\n'.format(job.email)
c+='#SBATCH --mail-type ALL\n'
#c+='#SBATCH --mail-type FAIL\n'
#end if
c+='\n'
#c+='cd $SLURM_SUBMIT_DIR\n'
#c+='\n'
c+='echo JobID : $SLURM_JOBID\n'
c+='echo Number of nodes requested: $SLURM_JOB_NUM_NODES\n'
c+='echo List of nodes assigned to the job: $SLURM_NODELIST\n'
c+='\n'
return c
#end def write_job_header
#end class Archer2
# Unknown
class Tomcat3(Supercomputer):
name = 'tomcat3'
requires_account = False
batch_capable = True
redirect_output = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'tomcat'
#end if
c = '#!/bin/bash -l\n'
c+='#SBATCH -J {}\n'.format(job.name)
c+='#SBATCH -N {}\n'.format(job.nodes)
c+='#SBATCH -t {}\n'.format(job.sbatch_walltime())
c+='#SBATCH -p {}\n'.format(job.queue)
if job.email is not None:
c+='#SBATCH --mail-user {}\n'.format(job.email)
c+='#SBATCH --mail-type ALL\n'
#end if
c+='#. /home/rcohen/.bashrc\n'
if len(job.presub)==0:
c+='unalias cd; source /mnt/beegfs/intel/parallel_studio_xe_2019.3.062/bin/psxevars.sh\n'
c+='ulimit -a\n'
#end if
return c
#end def write_job_header
#end class Tomcat3
# Active
# Polaris at ANL
class Polaris(Supercomputer):
name = 'polaris'
requires_account = True
batch_capable = True
special_bundling = True
def post_process_job(self,job):
if len(job.run_options)==0:
opt = obj(
ppn = '--ppn {}'.format(job.processes_per_node),
depth = '--depth={}'.format(job.threads),
cpubind = '--cpu-bind depth',
threads = '--env OMP_NUM_THREADS={}'.format(job.threads),
)
job.run_options.add(**opt)
#end if
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue = 'prod'
#end if
if job.filesystems is None:
job.filesystems = 'home:eagle:grand'
#end if
c= '#!/bin/sh\n'
c+='#PBS -l select={}:system=polaris\n'.format(job.nodes)
c+='#PBS -l place=scatter\n'
c+='#PBS -l filesystems={}\n'.format(job.filesystems)
c+='#PBS -l walltime={}\n'.format(job.pbs_walltime())
c+='#PBS -A {}\n'.format(job.account)
c+='#PBS -q {}\n'.format(job.queue)
c+='#PBS -N {0}\n'.format(job.name)
c+='#PBS -k doe\n'
c+='#PBS -o {0}\n'.format(job.outfile)
c+='#PBS -e {0}\n'.format(job.errfile)
c+='\n'
c+='cd ${PBS_O_WORKDIR}\n'
return c
#end def write_job_header
def specialized_bundle_commands(self,job,launcher,serial):
c = ''
j0 = job.bundled_jobs[0]
c+='split --lines={} --numeric-suffixes=1 --suffix-length=3 $PBS_NODEFILE local_hostfile.\n'.format(j0.nodes)
c+='\n'
lhfiles = ['local_hostfile.'+str(n+1).zfill(3) for n in range(len(job.bundled_jobs))]
for j,lh in zip(job.bundled_jobs,lhfiles):
c+='cp {} {}\n'.format(lh,j.abs_subdir)
#end for
for j,lh in zip(job.bundled_jobs,lhfiles):
j.run_options.add(hostfile='--hostfile '+lh)
c+='\ncd '+j.abs_subdir+'\n'
c+=j.run_command(launcher,redirect=True,serial=serial)+'\n'
#end for
c+='\nwait\n'
return c
#end def specialized_bundle_commands
#end class Polaris
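# Illustrative example (directory names are placeholders): bundling two 2-node
# jobs on Polaris produces, via specialized_bundle_commands above, a script
# fragment of the form
#   split --lines=2 --numeric-suffixes=1 --suffix-length=3 $PBS_NODEFILE local_hostfile.
#   cp local_hostfile.001 <subdir1>
#   cp local_hostfile.002 <subdir2>
# followed by a cd into each subdirectory and an mpiexec launch carrying
# '--hostfile local_hostfile.NNN', ending with 'wait'.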
# Active
# Aurora at ANL
class Aurora(Supercomputer):
name = 'aurora'
requires_account = True
batch_capable = True
special_bundling = True
def pre_process_job(self,job):
# Set default queue and node type
if job.queue is None:
job.queue = 'prod'
#end if
if job.filesystems is None:
job.filesystems = 'flare'
#end if
if job.constraint is None:
job.constraint = 'cpu'
#end if
        # Account for dual nature of Aurora
if 'cpu' in job.constraint:
self.procs_per_node = 2
self.cores_per_node = 102
self.ram_per_node = 1024
elif 'gpu' in job.constraint:
self.procs_per_node = 1
self.cores_per_node = 102
self.ram_per_node = 768
self.gpus_per_node = 6
else:
self.error('CPU or GPU constraint must be specified for Aurora')
#end if
#end def pre_process_job
def post_process_job(self,job):
if len(job.run_options)==0:
if 'cpu' in job.constraint:
threads = '--env OMP_NUM_THREADS={} --env OMP_PLACES=cores'.format(job.threads)
cpubind = '--cpu-bind depth'
elif 'gpu' in job.constraint:
threads = '--env OMP_NUM_THREADS={}'.format(job.threads)
cpubind = '--cpu-bind=list'
ind = 1
for _ in range(job.processes_per_node):
cpubind += ':{}-{}'.format(ind, ind + job.threads - 1)
if ind + 2*(job.threads - 1) > 53 and ind < 53:
ind = 53
                else:
                    ind += job.threads
                #end if
            #end for
else:
self.error('CPU or GPU constraint must be specified for Aurora')
#end if
opt = obj(
ppn = '--ppn {}'.format(job.processes_per_node),
depth = '--depth={}'.format(job.threads),
cpubind = cpubind,
threads = threads,
# affinity= '/soft/tools/mpi_wrapper_utils/gpu_tile_compact.sh',
)
job.run_options.add(**opt)
#end if
#end def post_process_job
def write_job_header(self,job):
c= '#!/bin/sh\n'
c+='#PBS -l select={}\n'.format(job.nodes)
c+='#PBS -l place=scatter\n'
c+='#PBS -l filesystems={}\n'.format(job.filesystems)
c+='#PBS -l walltime={}\n'.format(job.pbs_walltime())
c+='#PBS -A {}\n'.format(job.account)
c+='#PBS -q {}\n'.format(job.queue)
c+='#PBS -N {0}\n'.format(job.name)
c+='#PBS -k doe\n'
c+='#PBS -o {0}\n'.format(job.outfile)
c+='#PBS -e {0}\n'.format(job.errfile)
c+='\n'
c+='cd ${PBS_O_WORKDIR}\n'
if 'gpu' in job.constraint:
            c+='export MPICH_GPU_SUPPORT_ENABLED=1\n'
        #end if
return c
#end def write_job_header
def specialized_bundle_commands(self,job,launcher,serial):
c = ''
j0 = job.bundled_jobs[0]
c+='split --lines={} --numeric-suffixes=1 --suffix-length=3 $PBS_NODEFILE local_hostfile.\n'.format(j0.nodes)
c+='\n'
lhfiles = ['local_hostfile.'+str(n+1).zfill(3) for n in range(len(job.bundled_jobs))]
for j,lh in zip(job.bundled_jobs,lhfiles):
c+='cp {} {}\n'.format(lh,j.abs_subdir)
#end for
for j,lh in zip(job.bundled_jobs,lhfiles):
j.run_options.add(hostfile='--hostfile '+lh)
c+='\ncd '+j.abs_subdir+'\n'
c+=j.run_command(launcher,redirect=True,serial=serial)+'\n'
#end for
c+='\nwait\n'
return c
#end def specialized_bundle_commands
#end class Aurora
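# Worked example of the GPU cpu-bind list above (hypothetical job): with
# constraint='gpu', processes_per_node=8 and threads=6 the loop emits
#   --cpu-bind=list:1-6:7-12:13-18:19-24:25-30:31-36:37-42:43-48
# jumping ahead to core 53 only when a range would straddle that boundary.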
# Active
# Improv at ANL (LCRC)
class Improv(Supercomputer):
name = 'improv'
requires_account = True
batch_capable = True
def post_process_job(self,job):
if len(job.run_options)==0:
opt = obj(
mapby = '--map-by ppr:{}:package'.format(job.processes_per_proc),
bindto = '--bind-to socket',
)
job.run_options.add(**opt)
#end if
if job.threads>1 and job.env is None:
job.set_environment(
OMP_PLACES = 'cores',
OMP_PROC_BIND = 'close',
)
#end if
#end def post_process_job
def write_job_header(self,job):
if job.queue is None:
job.queue = 'compute'
#end if
c= '#!/bin/bash -l\n'
if job.threads>1:
c+='#PBS -l select={}:ncpus=128:mpiprocs={}:ompthreads={}\n'.format(job.nodes,job.processes_per_node,job.threads)
else:
c+='#PBS -l select={}:ncpus=128:mpiprocs={}\n'.format(job.nodes,job.processes)
#end if
c+='#PBS -l walltime={}\n'.format(job.pbs_walltime())
c+='#PBS -A {}\n'.format(job.account)
c+='#PBS -q {}\n'.format(job.queue)
c+='#PBS -N {0}\n'.format(job.name)
c+='#PBS -k doe\n'
c+='#PBS -o {0}\n'.format(job.outfile)
c+='#PBS -e {0}\n'.format(job.errfile)
c+='\n'
c+='cd ${PBS_O_WORKDIR}\n'
return c
#end def write_job_header
#end class Improv
# Active
# Kagayaki at JAIST
## Added 05/04/2023 by Tom Ichibha
class Kagayaki(Supercomputer):
name = 'kagayaki'
requires_account = False
batch_capable = True
special_bundling = False
def process_job_options(self,job):
# job.run_options.add(nodefile='-machinefile $PBS_NODEFILE', np='-np '+str(job.processes))
opt = obj(
nodefile='-machinefile $PBS_NODEFILE',
omp='-x OMP_NUM_THREADS',
np='-np {}'.format(job.processes),
)
        job.run_options.add(**opt)
    #end def process_job_options
def write_job_header(self,job):
ppn = 16 if job.queue in ['Default', 'SINGLE', 'LONG', 'DEFAULT'] else 128
c=''
c+='#!/bin/bash\n'
        if job.queue is not None:
            c+='#PBS -q ' + job.queue + '\n'
        #end if
c+='#PBS -N ' + job.name + '\n'
c+='#PBS -o ' + job.outfile +'\n'
c+='#PBS -e ' + job.errfile + '\n'
c+='#PBS -l select={0}:ncpus={1}:mpiprocs={1}\n'.format(job.nodes, ppn)
c+='cd $PBS_O_WORKDIR\n'
c+='export OMP_NUM_THREADS=' + str(job.threads) + '\n'
return c
#end def write_job_header
#end class Kagayaki
# Active
# Kestrel at NREL
class Kestrel(Supercomputer):
name = 'kestrel'
requires_account = True
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'regular'
#end if
c='#!/bin/bash\n'
c+='#SBATCH -A '+job.account+'\n'
c+='#SBATCH -p '+job.queue+'\n'
c+='#SBATCH -J '+str(job.name)+'\n'
c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c+='#SBATCH -o '+job.outfile+'\n'
c+='#SBATCH -e '+job.errfile+'\n'
if job.user_env:
c+='#SBATCH --export=ALL\n' # equiv to PBS -V
else:
c+='#SBATCH --export=NONE\n'
#end if
c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
return c
#end def write_job_header
#end class Kestrel
# Active
# Lassen at LLNL
class Lassen(Supercomputer):
name = 'lassen'
requires_account = True
batch_capable = True
def post_process_job(self,job):
# add the options only if the user has not supplied options
if len(job.run_options)==0:
opt = obj(
nodes = '-N {}'.format(job.nodes),
tasks = '-T {}'.format(job.processes_per_node),
)
if job.threads>1:
opt.threads = '--threads={}'.format(job.threads)
#end if
if job.gpus is None:
                job.gpus = 4 # gpus to use per node
#end if
#if job.alloc_flags is None:
# job.alloc_flags = 'smt1'
##end if
            if job.gpus!=0:
                opt.mgpu = '-M "-gpu"'
            #end if
job.run_options.add(**opt)
#end if
#end def post_process_job
def write_job_header(self,job):
c ='#!/bin/bash\n'
c+='#BSUB -G {0}\n'.format(job.account)
if job.queue is not None:
c+='#BSUB -q {0}\n'.format(job.queue)
#end if
c+='#BSUB -J {0}\n'.format(job.name)
c+='#BSUB -o {0}\n'.format(job.outfile)
c+='#BSUB -e {0}\n'.format(job.errfile)
c+='#BSUB -W {0}\n'.format(job.lsf_walltime())
c+='#BSUB -nnodes {0}\n'.format(job.nodes)
#if job.alloc_flags is not None:
# c+='#BSUB -alloc_flags "{0}"\n'.format(job.alloc_flags)
##end if
return c
#end def write_job_header
def read_process_id(self,output):
pid = None
tokens = output.split()
for t in tokens:
if t.startswith('<'):
spid = t.strip('<>').strip()
if spid.isdigit():
pid = int(spid)
break
#end if
#end if
#end for
return pid
#end def read_process_id
#end class Lassen
# Ruby at LLNL
class Ruby(Supercomputer):
name = 'ruby'
requires_account = True
batch_capable = True
def write_job_header(self,job):
if job.queue is None:
job.queue = 'regular'
#end if
c='#!/bin/bash\n'
c+='#SBATCH -A '+job.account+'\n'
c+='#SBATCH -p '+job.queue+'\n'
c+='#SBATCH -J '+str(job.name)+'\n'
c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
c+='#SBATCH -o '+job.outfile+'\n'
c+='#SBATCH -e '+job.errfile+'\n'
if job.user_env:
c+='#SBATCH --export=ALL\n'
else:
c+='#SBATCH --export=NONE\n'
#end if
c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
return c
#end def write_job_header
#end class Ruby
#Known machines
# workstations
for cores in range(1,128+1):
    Workstation('ws'+str(cores),cores,'mpirun')
#end for
# supercomputers and clusters
# nodes sockets cores ram qslots qlaunch qsubmit qstatus qdelete
Jaguar( 18688, 2, 8, 32, 100, 'aprun', 'qsub', 'qstat', 'qdel')
Kraken( 9408, 2, 6, 16, 100, 'aprun', 'qsub', 'qstat', 'qdel')
Golub( 512, 2, 6, 32, 1000, 'mpirun', 'qsub', 'qstat', 'qdel')
OIC5( 28, 2, 16, 128, 1000, 'mpirun', 'qsub', 'qstat', 'qdel')
Cori( 9688, 1, 68, 96, 100, 'srun', 'sbatch', 'squeue', 'scancel')
BlueWatersXK( 3072, 1, 16, 32, 100, 'aprun', 'qsub', 'qstat', 'qdel')
BlueWatersXE(22640, 2, 16, 64, 100, 'aprun', 'qsub', 'qstat', 'qdel')
Titan( 18688, 1, 16, 32, 100, 'aprun', 'qsub', 'qstat', 'qdel')
EOS( 744, 2, 8, 64, 1000, 'aprun', 'qsub', 'qstat', 'qdel')
Vesta( 2048, 1, 16, 16, 10, 'runjob', 'qsub', 'qstata', 'qdel')
Cetus( 1024, 1, 16, 16, 10, 'runjob', 'qsub', 'qstata', 'qdel')
Mira( 49152, 1, 16, 16, 10, 'runjob', 'qsub', 'qstata', 'qdel')
Cooley( 126, 2, 6, 384, 10, 'mpirun', 'qsub', 'qstata', 'qdel')
Theta( 4392, 1, 64, 192, 1000, 'aprun', 'qsub', 'qstata', 'qdel')
Lonestar( 22656, 2, 6, 12, 128, 'ibrun', 'qsub', 'qstat', 'qdel')
Matisse( 20, 2, 8, 64, 2, 'mpirun', 'sbatch', 'sacct', 'scancel')
Komodo( 24, 2, 6, 48, 2, 'mpirun', 'sbatch', 'sacct', 'scancel')
Amos( 5120, 1, 16, 16, 128, 'srun', 'sbatch', 'sacct', 'scancel')
Chama( 1232, 2, 8, 64, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Uno( 168, 2, 8, 128, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Eclipse( 1488, 2, 18, 128, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Attaway( 1488, 2, 18, 192, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Manzano( 1488, 2, 24, 192, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Ghost( 740, 2, 18, 128, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Amber( 1496, 2, 56, 256, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Skybridge( 1848, 2, 8, 64, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
Solo( 374, 2, 18, 128, 1000,'mpiexec', 'sbatch', 'squeue', 'scancel')
SuperMUC( 512, 1, 28, 256, 8,'mpiexec', 'llsubmit', 'llq','llcancel')
Stampede2( 4200, 1, 68, 96, 50, 'ibrun', 'sbatch', 'squeue', 'scancel')
CadesMoab( 156, 2, 18, 128, 100, 'mpirun', 'qsub', 'qstat', 'qdel')
CadesSlurm( 156, 2, 18, 128, 100, 'mpirun', 'sbatch', 'squeue', 'scancel')
Summit( 4608, 2, 21, 512, 100, 'jsrun', 'bsub', 'bjobs', 'bkill')
Rhea( 512, 2, 8, 128, 1000, 'srun', 'sbatch', 'squeue', 'scancel')
Andes( 704, 2, 16, 256, 1000, 'srun', 'sbatch', 'squeue', 'scancel')
Tomcat3( 8, 1, 64, 192, 1000, 'mpirun', 'sbatch', 'sacct', 'scancel')
SuperMUC_NG( 6336, 1, 48, 96, 1000,'mpiexec', 'sbatch', 'sacct', 'scancel')
Archer2( 5860, 2, 64, 512, 1000, 'srun', 'sbatch', 'squeue', 'scancel')
Polaris( 560, 1, 32, 512, 8,'mpiexec', 'qsub', 'qstat', 'qdel')
Kagayaki( 240, 2, 64, 512, 20, 'mpirun', 'qsub', 'qstat', 'qdel')
Perlmutter( 3072, 2, 128, 512, 5000, 'srun', 'sbatch', 'squeue', 'scancel')
Improv( 825, 2, 64, 256, 100, 'mpirun', 'qsub', 'qstat', 'qdel')
Lassen( 756, 2, 21, 512, 100, 'lrun', 'bsub', 'bjobs', 'bkill')
Ruby( 1480, 2, 28, 192, 100, 'srun', 'sbatch', 'squeue', 'scancel')
Kestrel( 2144, 2, 52, 256, 100, 'srun', 'sbatch', 'squeue', 'scancel')
Inti( 13, 2, 64, 256, 100, 'srun', 'sbatch', 'squeue', 'scancel')
Baseline( 128, 2, 64, 512, 100, 'srun', 'sbatch', 'squeue', 'scancel')
Besms( 166, 1, 96, 768, 1000, 'srun', 'sbatch', 'squeue', 'scancel')
Frontier( 9856, 4, 14, 64, 1000, 'srun', 'sbatch', 'squeue', 'scancel')
Aurora( 10624, 2, 102, 512, 1000,'mpiexec', 'qsub', 'qstat', 'qdel')
#machine accessor functions
get_machine_name = Machine.get_hostname
get_machine = Machine.get
#rename Job with lowercase
job=Job
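# Example usage (a sketch, assuming Machine.get resolves a registered machine
# by its lowercase name): get_machine('perlmutter') returns the Perlmutter
# instance created above, and job(...) constructs a Job bound to the machine
# detected for the current host.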