adding ALCF machines: Vesta, Cetus, & Mira

git-svn-id: https://subversion.assembla.com/svn/qmcdev/trunk@6314 e5b18d87-469d-4833-9cc0-8cdfa06e9491
This commit is contained in:
Jaron Krogel 2014-06-09 12:43:21 +00:00
parent 251c05e73c
commit 916572c8db
2 changed files with 251 additions and 80 deletions

View File

@ -4,7 +4,7 @@ import time
#from multiprocessing import cpu_count
from socket import gethostname
from subprocess import Popen,PIPE
from numpy import array,mod,floor,ceil
from numpy import array,mod,floor,ceil,round,log
from generic import obj
from project_base import Pobj
from debug import *
@ -35,6 +35,56 @@ def cpu_count():
class Options(Pobj):
def __init__(self,**kwargs):
self.add(**kwargs)
#end def __init__
def read(self,options):
nopts = 0
intext = False
nstart = -2
nend = -2
n = 0
for c in options:
if c=='-' and nstart!=n-1 and not intext:
prevdash=True
if nopts>0:
opt = options[nstart:n].strip()
name = opt.replace('-','').replace('=',' ').split()[0]
self[name]=opt
#end if
nopts+=1
nstart = n
elif c=='"' or c=="'":
intext=not intext
#end if
n+=1
#end for
if nopts>0:
opt = options[nstart:n].strip()
name = opt.replace('-','').replace('=',' ').split()[0]
self[name]=opt
#end if
#end def read
def add(self,**kwargs):
self.transfer_from(kwargs)
#end def add
def write(self):
s = ''
for o in self:
s += ' '+str(o)
#end for
return s
#end def write
#end class Options
class Job(Pobj):
@ -95,6 +145,9 @@ class Job(Pobj):
compiler = None,
override = None,
options = None,
app_options = None,
run_options = None,
sub_options = None,
serial = False,
days = 0,
hours = 0,
@ -105,13 +158,13 @@ class Job(Pobj):
procs = None,
processes = None,
processes_per_proc = None,
processes_per_node = None):
processes_per_node = None
):
self.directory = directory
self.subdir = sub_dir
self.app_name = app_name
self.app_command = app_command
self.app_flags = app_flags
self.app_props = app_props
self.outfile = outfile
self.errfile = errfile
@ -125,7 +178,9 @@ class Job(Pobj):
self.ppn = ppn
self.compiler = compiler
self.override = override
self.options = options
self.app_options = Options()
self.run_options = Options()
self.sub_options = Options()
self.serial = serial
self.days = days
self.hours = hours
@ -148,10 +203,22 @@ class Job(Pobj):
self.successful = False
self.finished = False
if app_flags is None:
self.app_flags = ''
if app_options != None:
self.app_options.read(app_options)
#end if
if run_options != None:
self.run_options.read(run_options)
#end if
if sub_options != None:
self.sub_options.read(sub_options)
#end if
if app_flags != None:
self.app_options.read(app_flags)
#end if
if options != None:
self.run_options.read(options)
#end if
if app_props==None:
self.app_props = []
#end if
@ -181,6 +248,8 @@ class Job(Pobj):
self.error('running batched/bundled jobs on {0} is either not possible or not yet implemented, sorry.'.format(machine.name))
#end if
#end if
self.normalize_time()
#end def __init__
@ -251,6 +320,17 @@ class Job(Pobj):
#end if
#end def set_processes
def get_time(self):
time = obj(
days = self.days,
hours = self.hours,
minutes = self.minutes,
seconds = self.seconds
)
return time
#end def get_time
def max_time(self,time):
t = time.seconds + 60*(time.minutes+60*(time.hours+24*time.days))
@ -308,23 +388,17 @@ class Job(Pobj):
if self.app_command is None:
self.error('app_command has not been provided')
#end if
spacer = ' '
if self.serial and self.processes==1:
spacer = ''
elif launcher=='mpirun':
c = self.mpirun_command()
elif launcher=='aprun':
machine = self.get_machine()
c = machine.aprun_command(self)
if launcher=='runjob':
separator = ' : '
else:
self.error(launcher+' is not yet implemented as an application launcher')
separator = ' '
#end if
options = ''
if isinstance(self.options,str):
options+=' '+self.options
if self.serial and self.processes==1:
c = ''
else:
c = launcher + self.run_options.write() + separator
#end if
c+=options
c+=spacer+self.app_command+self.app_flags
c+=self.app_command+self.app_options.write()
if redirect:
c+=' >'+self.outfile+' 2>'+self.errfile+'&'
#end if
@ -350,11 +424,6 @@ class Job(Pobj):
#end def run_command
def mpirun_command(self):
return 'mpirun -np '+str(self.processes)
#end def mpirun_command
def pbs_walltime(self):
walltime=\
str(int(self.hours )).zfill(2)+':'\
@ -367,15 +436,40 @@ class Job(Pobj):
#end def pbs_walltime
def add_option(self,opt):
if self.options is None:
self.options = opt
elif isinstance(self.options,list):
self.options.append(opt)
else:
self.options += ' '+opt
#end if
#end def add_option
def normalize_time(self):
t = self.total_seconds()
d = int(t/(24*3600))
t -= d*24*3600
h = int(t/3600)
t -= h*3600
m = int(t/60)
t -= m*60
s = int(t)
self.days = d
self.hours = h
self.minutes = m
self.seconds = s
#end def normalize_time
def total_seconds(self):
return self.seconds+60*(self.minutes+60*(self.hours+24*self.days))
#end def total_seconds
def total_minutes(self):
return int(self.total_seconds()/60)
#end def total_minutes
def total_hours(self):
return int(self.total_seconds()/3600)
#end def total_hours
def total_days(self):
return int(self.total_seconds()/(24*3600))
#end def total_days
#end class Job
@ -392,8 +486,9 @@ class Machine(Pobj):
)
mode = modes.none
batch_capable = False
requires_account = False
batch_capable = False
requires_account = False
executable_subfile = False
@staticmethod
@ -587,6 +682,8 @@ class Workstation(Machine):
#end if
job.grains = grains
job.cores = grains*self.process_granularity
job.run_options.add(np='-np '+str(self.processes))
#end def process_job
@ -930,6 +1027,34 @@ class Supercomputer(Machine):
#end if
self.process_job_extra(job)
launcher = self.app_launcher
if launcher=='mpirun':
job.run_options.add(np='-np '+str(job.processes))
elif launcher=='aprun':
if 'n' in self.aprun_options:
job.run_options.add(n='-n '+str(job.processes))
#end if
if 'd' in job.aprun_options and job.threads>1:
job.run_options.add('-d '+str(job.threads))
#end if
if 'N' in job.aprun_options and job.processes_per_node!=None:
job.run_options.add(N='-N '+str(job.processes_per_node))
#end if
if 'S' in job.aprun_options and job.processes_per_proc!=None:
job.run_options.add(S='-S '+str(job.processes_per_proc))
#end if
elif launcher=='runjob':
job.run_options.add(
np = '--np '+str(job.processes),
p = '-p '+str(job.processes_per_node),
block = '--block $COBALT_PARTNAME',
verbose = '--verbose=INFO'
)
else:
self.error(launcher+' is not yet implemented as an application launcher')
#end if
#end def process_job
@ -937,24 +1062,6 @@ class Supercomputer(Machine):
None
#end def process_job_extra
def aprun_command(self,job):
command = 'aprun'
if 'n' in self.aprun_options:
command += ' -n '+str(job.processes)
#end if
if 'd' in self.aprun_options and job.threads>1:
command += ' -d '+str(job.threads)
#end if
if 'N' in self.aprun_options and job.processes_per_node!=None:
command += ' -N '+str(job.processes_per_node)
#end if
if 'S' in self.aprun_options and job.processes_per_proc!=None:
command += ' -S '+str(job.processes_per_proc)
#end if
return command
#end def aprun_command
def query_queue(self):
@ -1045,7 +1152,7 @@ class Supercomputer(Machine):
elif not os.path.exists(job.subfile):
self.error('job submission file was not written prior to submission\n submission file: '+os.path.join(job.directory,job.subfile))
#end if
command = self.sub_launcher+' '+job.subfile
command = self.sub_launcher+job.sub_options.write()+' '+job.subfile
if self.generate_only:
self.log(pad+'Would have executed: '+command)
job.status = job.states.running
@ -1103,6 +1210,9 @@ class Supercomputer(Machine):
fobj = open(filepath,'w')
fobj.write(c)
fobj.close()
if self.executable_subfile:
os.system('chmod +x '+filepath)
#end if
#end def write_job
@ -1465,11 +1575,11 @@ class EOS(Supercomputer):
batch_capable = True
def process_job_extra(self,job):
if job.threads>1 and job.options is None:
if job.threads>1:
if job.threads<=8:
job.add_option('-ss')
job.run_options.add(ss='-ss')
#end if
job.add_option('-cc numa_node')
job.run_options.add(cc='-cc numa_node')
#end if
#end def process_job_extra
@ -1498,6 +1608,62 @@ cd $PBS_O_WORKDIR
class ALCF_Machine(Supercomputer):
requires_account = True
batch_capable = True
executable_subfile = True
base_partition = None
def process_job_extra(self,job):
job.sub_options.add(
env = '--env OMP_NUM_THREADS='+str(job.threads),
mode = '--mode script'
)
if job.nodes<self.base_partition:
self.warn('!!! ATTENTION !!!\n number of nodes on {0} cannot be less than {1}\n you requested: {2}'.format(self.name,self.base_partition,job.nodes))
else:
partition = log(float(job.nodes)/self.base_partition)/log(2.)
if abs(partition-int(partition))>1e-6:
self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be {1} times a power of two\n you requested: {2}\n nearby valid node count: {3}'.format(self.name,self.base_partition,job.nodes,self.base_partition*2**int(round(partition))))
#end if
#end if
#end def process_job_extra
def write_job_header(self,job):
if job.queue is None:
job.queue = 'default'
#end if
c= '#!/bin/bash\n'
c+='#COBALT -q {0}\n'.format(job.queue)
c+='#COBALT -A {0}\n'.format(job.account)
c+='#COBALT -n {0}\n'.format(job.nodes)
c+='#COBALT -t {0}\n'.format(job.total_minutes())
c+='#COBALT -O {0}\n'.format(job.outfile)
return c
#end def write_job_header
#end class ALCF_Machine
class Vesta(ALCF_Machine):
name = 'vesta'
base_partition = 32
#end class Vesta
class Cetus(ALCF_Machine):
name = 'cetus'
base_partition = 128
#end class Cetus
class Mira(ALCF_Machine):
name = 'mira'
base_partition = 512
#end class Mira
#Known machines
# workstations
Workstation('oahu',12,'mpirun')
@ -1520,7 +1686,9 @@ BlueWatersXK( 3072, 1, 16, 32, 100, 'aprun', 'qsub', 'qstat', 'qdel')
BlueWatersXE(22640, 2, 16, 64, 100, 'aprun', 'qsub', 'qstat', 'qdel')
Titan( 18688, 1, 16, 32, 100, 'aprun', 'qsub', 'qstat', 'qdel')
EOS( 744, 2, 8, 64, 1000, 'aprun', 'qsub', 'qstat', 'qdel')
Vesta( 2048, 1, 16, 16, 10, 'runjob', 'qsub', 'qstat', 'qdel')
Cetus( 1024, 1, 16, 16, 10, 'runjob', 'qsub', 'qstat', 'qdel')
Mira( 49152, 1, 16, 16, 10, 'runjob', 'qsub', 'qstat', 'qdel')
#machine accessor functions
get_machine_name = Machine.get_hostname

View File

@ -60,7 +60,7 @@ def pwscf_time(tsin):
class PwscfAnalyzer(SimulationAnalyzer):
def __init__(self,arg0=None,infile_name=None,outfile_name=None,pw2c_outfile_name=None,analyze=False,xml=False):
def __init__(self,arg0=None,infile_name=None,outfile_name=None,pw2c_outfile_name=None,analyze=False,xml=False,warn=False):
if isinstance(arg0,Simulation):
sim = arg0
path = sim.locdir
@ -77,7 +77,7 @@ class PwscfAnalyzer(SimulationAnalyzer):
self.abspath = os.path.abspath(path)
self.pw2c_outfile_name = pw2c_outfile_name
self.info = obj(xml=xml)
self.info = obj(xml=xml,warn=warn)
self.input = PwscfInput(os.path.join(self.path,self.infile_name))
@ -140,6 +140,9 @@ class PwscfAnalyzer(SimulationAnalyzer):
except Exception:
eigs = array([])
occs = array([])
if self.info.warn:
self.warn('band read failed, line: '+l)
#end if
#end try
@ -154,24 +157,24 @@ class PwscfAnalyzer(SimulationAnalyzer):
occs = occs
)
#end if
#end while
try:
seigs = ''
for j in range(i+1,i_occ):
seigs+=lines[j]
#end for
seigs = seigs.strip()
eigs = array(seigs.split(),dtype=float)
soccs = ''
for j in range(i_occ+1,i_occ+1+(i_occ-i)-2):
soccs+= lines[j]
#end for
occs = array(soccs.split(),dtype=float)
except Exception:
eigs = array([])
occs = array([])
#end try
#end if
#try:
# seigs = ''
# for j in range(i+1,i_occ):
# seigs+=lines[j]
# #end for
# seigs = seigs.strip()
# eigs = array(seigs.split(),dtype=float)
#
# soccs = ''
# for j in range(i_occ+1,i_occ+1+(i_occ-i)-2):
# soccs+= lines[j]
# #end for
# occs = array(soccs.split(),dtype=float)
#except Exception:
# eigs = array([])
# occs = array([])
##end try
if nfound==1:
bands.up = obj(