diff --git a/project_suite/library/machines.py b/project_suite/library/machines.py index 8d81d7ad8..6ea01adac 100644 --- a/project_suite/library/machines.py +++ b/project_suite/library/machines.py @@ -4,7 +4,7 @@ import time #from multiprocessing import cpu_count from socket import gethostname from subprocess import Popen,PIPE -from numpy import array,mod,floor,ceil +from numpy import array,mod,floor,ceil,round,log from generic import obj from project_base import Pobj from debug import * @@ -35,6 +35,56 @@ def cpu_count(): +class Options(Pobj): + def __init__(self,**kwargs): + self.add(**kwargs) + #end def __init__ + + + def read(self,options): + nopts = 0 + intext = False + nstart = -2 + nend = -2 + n = 0 + for c in options: + if c=='-' and nstart!=n-1 and not intext: + prevdash=True + if nopts>0: + opt = options[nstart:n].strip() + name = opt.replace('-','').replace('=',' ').split()[0] + self[name]=opt + #end if + nopts+=1 + nstart = n + elif c=='"' or c=="'": + intext=not intext + #end if + n+=1 + #end for + if nopts>0: + opt = options[nstart:n].strip() + name = opt.replace('-','').replace('=',' ').split()[0] + self[name]=opt + #end if + #end def read + + + def add(self,**kwargs): + self.transfer_from(kwargs) + #end def add + + + def write(self): + s = '' + for o in self: + s += ' '+str(o) + #end for + return s + #end def write +#end class Options + + class Job(Pobj): @@ -95,6 +145,9 @@ class Job(Pobj): compiler = None, override = None, options = None, + app_options = None, + run_options = None, + sub_options = None, serial = False, days = 0, hours = 0, @@ -105,13 +158,13 @@ class Job(Pobj): procs = None, processes = None, processes_per_proc = None, - processes_per_node = None): + processes_per_node = None + ): self.directory = directory self.subdir = sub_dir self.app_name = app_name self.app_command = app_command - self.app_flags = app_flags self.app_props = app_props self.outfile = outfile self.errfile = errfile @@ -125,7 +178,9 @@ class Job(Pobj): self.ppn = ppn self.compiler = compiler self.override = override - self.options = options + self.app_options = Options() + self.run_options = Options() + self.sub_options = Options() self.serial = serial self.days = days self.hours = hours @@ -148,10 +203,22 @@ class Job(Pobj): self.successful = False self.finished = False - - if app_flags is None: - self.app_flags = '' + if app_options != None: + self.app_options.read(app_options) #end if + if run_options != None: + self.run_options.read(run_options) + #end if + if sub_options != None: + self.sub_options.read(sub_options) + #end if + if app_flags != None: + self.app_options.read(app_flags) + #end if + if options != None: + self.run_options.read(options) + #end if + if app_props==None: self.app_props = [] #end if @@ -181,6 +248,8 @@ class Job(Pobj): self.error('running batched/bundled jobs on {0} is either not possible or not yet implemented, sorry.'.format(machine.name)) #end if #end if + + self.normalize_time() #end def __init__ @@ -251,6 +320,17 @@ class Job(Pobj): #end if #end def set_processes + + def get_time(self): + time = obj( + days = self.days, + hours = self.hours, + minutes = self.minutes, + seconds = self.seconds + ) + return time + #end def get_time + def max_time(self,time): t = time.seconds + 60*(time.minutes+60*(time.hours+24*time.days)) @@ -308,23 +388,17 @@ class Job(Pobj): if self.app_command is None: self.error('app_command has not been provided') #end if - spacer = ' ' - if self.serial and self.processes==1: - spacer = '' - elif launcher=='mpirun': - c = self.mpirun_command() - elif launcher=='aprun': - machine = self.get_machine() - c = machine.aprun_command(self) + if launcher=='runjob': + separator = ' : ' else: - self.error(launcher+' is not yet implemented as an application launcher') + separator = ' ' #end if - options = '' - if isinstance(self.options,str): - options+=' '+self.options + if self.serial and self.processes==1: + c = '' + else: + c = launcher + self.run_options.write() + separator #end if - c+=options - c+=spacer+self.app_command+self.app_flags + c+=self.app_command+self.app_options.write() if redirect: c+=' >'+self.outfile+' 2>'+self.errfile+'&' #end if @@ -350,11 +424,6 @@ class Job(Pobj): #end def run_command - def mpirun_command(self): - return 'mpirun -np '+str(self.processes) - #end def mpirun_command - - def pbs_walltime(self): walltime=\ str(int(self.hours )).zfill(2)+':'\ @@ -367,15 +436,40 @@ class Job(Pobj): #end def pbs_walltime - def add_option(self,opt): - if self.options is None: - self.options = opt - elif isinstance(self.options,list): - self.options.append(opt) - else: - self.options += ' '+opt - #end if - #end def add_option + def normalize_time(self): + t = self.total_seconds() + d = int(t/(24*3600)) + t -= d*24*3600 + h = int(t/3600) + t -= h*3600 + m = int(t/60) + t -= m*60 + s = int(t) + self.days = d + self.hours = h + self.minutes = m + self.seconds = s + #end def normalize_time + + + def total_seconds(self): + return self.seconds+60*(self.minutes+60*(self.hours+24*self.days)) + #end def total_seconds + + + def total_minutes(self): + return int(self.total_seconds()/60) + #end def total_minutes + + + def total_hours(self): + return int(self.total_seconds()/3600) + #end def total_hours + + + def total_days(self): + return int(self.total_seconds()/(24*3600)) + #end def total_days #end class Job @@ -392,8 +486,9 @@ class Machine(Pobj): ) mode = modes.none - batch_capable = False - requires_account = False + batch_capable = False + requires_account = False + executable_subfile = False @staticmethod @@ -587,6 +682,8 @@ class Workstation(Machine): #end if job.grains = grains job.cores = grains*self.process_granularity + + job.run_options.add(np='-np '+str(self.processes)) #end def process_job @@ -930,6 +1027,34 @@ class Supercomputer(Machine): #end if self.process_job_extra(job) + + + launcher = self.app_launcher + if launcher=='mpirun': + job.run_options.add(np='-np '+str(job.processes)) + elif launcher=='aprun': + if 'n' in self.aprun_options: + job.run_options.add(n='-n '+str(job.processes)) + #end if + if 'd' in job.aprun_options and job.threads>1: + job.run_options.add('-d '+str(job.threads)) + #end if + if 'N' in job.aprun_options and job.processes_per_node!=None: + job.run_options.add(N='-N '+str(job.processes_per_node)) + #end if + if 'S' in job.aprun_options and job.processes_per_proc!=None: + job.run_options.add(S='-S '+str(job.processes_per_proc)) + #end if + elif launcher=='runjob': + job.run_options.add( + np = '--np '+str(job.processes), + p = '-p '+str(job.processes_per_node), + block = '--block $COBALT_PARTNAME', + verbose = '--verbose=INFO' + ) + else: + self.error(launcher+' is not yet implemented as an application launcher') + #end if #end def process_job @@ -937,24 +1062,6 @@ class Supercomputer(Machine): None #end def process_job_extra - - def aprun_command(self,job): - command = 'aprun' - if 'n' in self.aprun_options: - command += ' -n '+str(job.processes) - #end if - if 'd' in self.aprun_options and job.threads>1: - command += ' -d '+str(job.threads) - #end if - if 'N' in self.aprun_options and job.processes_per_node!=None: - command += ' -N '+str(job.processes_per_node) - #end if - if 'S' in self.aprun_options and job.processes_per_proc!=None: - command += ' -S '+str(job.processes_per_proc) - #end if - return command - #end def aprun_command - def query_queue(self): @@ -1045,7 +1152,7 @@ class Supercomputer(Machine): elif not os.path.exists(job.subfile): self.error('job submission file was not written prior to submission\n submission file: '+os.path.join(job.directory,job.subfile)) #end if - command = self.sub_launcher+' '+job.subfile + command = self.sub_launcher+job.sub_options.write()+' '+job.subfile if self.generate_only: self.log(pad+'Would have executed: '+command) job.status = job.states.running @@ -1103,6 +1210,9 @@ class Supercomputer(Machine): fobj = open(filepath,'w') fobj.write(c) fobj.close() + if self.executable_subfile: + os.system('chmod +x '+filepath) + #end if #end def write_job @@ -1465,11 +1575,11 @@ class EOS(Supercomputer): batch_capable = True def process_job_extra(self,job): - if job.threads>1 and job.options is None: + if job.threads>1: if job.threads<=8: - job.add_option('-ss') + job.run_options.add(ss='-ss') #end if - job.add_option('-cc numa_node') + job.run_options.add(cc='-cc numa_node') #end if #end def process_job_extra @@ -1498,6 +1608,62 @@ cd $PBS_O_WORKDIR +class ALCF_Machine(Supercomputer): + requires_account = True + batch_capable = True + executable_subfile = True + + base_partition = None + + def process_job_extra(self,job): + job.sub_options.add( + env = '--env OMP_NUM_THREADS='+str(job.threads), + mode = '--mode script' + ) + if job.nodes1e-6: + self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be {1} times a power of two\n you requested: {2}\n nearby valid node count: {3}'.format(self.name,self.base_partition,job.nodes,self.base_partition*2**int(round(partition)))) + #end if + #end if + #end def process_job_extra + + def write_job_header(self,job): + if job.queue is None: + job.queue = 'default' + #end if + c= '#!/bin/bash\n' + c+='#COBALT -q {0}\n'.format(job.queue) + c+='#COBALT -A {0}\n'.format(job.account) + c+='#COBALT -n {0}\n'.format(job.nodes) + c+='#COBALT -t {0}\n'.format(job.total_minutes()) + c+='#COBALT -O {0}\n'.format(job.outfile) + return c + #end def write_job_header +#end class ALCF_Machine + + +class Vesta(ALCF_Machine): + name = 'vesta' + base_partition = 32 +#end class Vesta + +class Cetus(ALCF_Machine): + name = 'cetus' + base_partition = 128 +#end class Cetus + +class Mira(ALCF_Machine): + name = 'mira' + base_partition = 512 +#end class Mira + + + + + #Known machines # workstations Workstation('oahu',12,'mpirun') @@ -1520,7 +1686,9 @@ BlueWatersXK( 3072, 1, 16, 32, 100, 'aprun', 'qsub', 'qstat', 'qdel') BlueWatersXE(22640, 2, 16, 64, 100, 'aprun', 'qsub', 'qstat', 'qdel') Titan( 18688, 1, 16, 32, 100, 'aprun', 'qsub', 'qstat', 'qdel') EOS( 744, 2, 8, 64, 1000, 'aprun', 'qsub', 'qstat', 'qdel') - +Vesta( 2048, 1, 16, 16, 10, 'runjob', 'qsub', 'qstat', 'qdel') +Cetus( 1024, 1, 16, 16, 10, 'runjob', 'qsub', 'qstat', 'qdel') +Mira( 49152, 1, 16, 16, 10, 'runjob', 'qsub', 'qstat', 'qdel') #machine accessor functions get_machine_name = Machine.get_hostname diff --git a/project_suite/library/pwscf_analyzer.py b/project_suite/library/pwscf_analyzer.py index b047fc34b..12a25f9f4 100644 --- a/project_suite/library/pwscf_analyzer.py +++ b/project_suite/library/pwscf_analyzer.py @@ -60,7 +60,7 @@ def pwscf_time(tsin): class PwscfAnalyzer(SimulationAnalyzer): - def __init__(self,arg0=None,infile_name=None,outfile_name=None,pw2c_outfile_name=None,analyze=False,xml=False): + def __init__(self,arg0=None,infile_name=None,outfile_name=None,pw2c_outfile_name=None,analyze=False,xml=False,warn=False): if isinstance(arg0,Simulation): sim = arg0 path = sim.locdir @@ -77,7 +77,7 @@ class PwscfAnalyzer(SimulationAnalyzer): self.abspath = os.path.abspath(path) self.pw2c_outfile_name = pw2c_outfile_name - self.info = obj(xml=xml) + self.info = obj(xml=xml,warn=warn) self.input = PwscfInput(os.path.join(self.path,self.infile_name)) @@ -140,6 +140,9 @@ class PwscfAnalyzer(SimulationAnalyzer): except Exception: eigs = array([]) occs = array([]) + if self.info.warn: + self.warn('band read failed, line: '+l) + #end if #end try @@ -154,24 +157,24 @@ class PwscfAnalyzer(SimulationAnalyzer): occs = occs ) #end if - #end while - try: - seigs = '' - for j in range(i+1,i_occ): - seigs+=lines[j] - #end for - seigs = seigs.strip() - eigs = array(seigs.split(),dtype=float) - - soccs = '' - for j in range(i_occ+1,i_occ+1+(i_occ-i)-2): - soccs+= lines[j] - #end for - occs = array(soccs.split(),dtype=float) - except Exception: - eigs = array([]) - occs = array([]) - #end try + #end if + #try: + # seigs = '' + # for j in range(i+1,i_occ): + # seigs+=lines[j] + # #end for + # seigs = seigs.strip() + # eigs = array(seigs.split(),dtype=float) + # + # soccs = '' + # for j in range(i_occ+1,i_occ+1+(i_occ-i)-2): + # soccs+= lines[j] + # #end for + # occs = array(soccs.split(),dtype=float) + #except Exception: + # eigs = array([]) + # occs = array([]) + ##end try if nfound==1: bands.up = obj(