source: trunk/python/Rappture/queue.py @ 829

Last change on this file since 829 was 665, checked in by dkearney, 17 years ago

Updates to Rappture::Utils::progress for all languages
removed the dependency on Rappture.Units from within number.py; it should only depend on Rappture, which will include Rappture.Units
added Rappture.Units as a module to load when people import Rappture in python.
added -V pbs variable to queue.py to include qsub environment variables in the submitted job.
updated setup.py.in to install Rappture.Utils
added progress bar to all app-fermi examples showing how to use the Rappture::Utils::progress function in all languages.
added destructor definitions to Node classes in src2/core

File size: 21.8 KB
Line 
1# ----------------------------------------------------------------------
2#
3# ======================================================================
4#  AUTHOR:  Derrick Kearney, Purdue University
5#  Copyright (c) 2005  Purdue Research Foundation, West Lafayette, IN
6# ======================================================================
7
8import sys,os
9import re
10import time
11import shutil
12import datetime
13
14# import Rappture Related libs
15from tools import *
16
17#######################################################################
18
# Recipe: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52235
class CallbaseError(AttributeError):
    """Raised by callbase() when the requested method is missing on the
    base and the caller asked for a hard failure (raiseIfMissing)."""
22
def callbase(obj, base, methodname='__init__', args=(), raiseIfMissing=None):
    """Call ``base.<methodname>`` on ``obj`` as an unbound method.

    obj            -- instance passed as the first argument to the method
    base           -- class (or other object) on which ``methodname`` is
                      looked up
    methodname     -- attribute name to call; defaults to '__init__'
    args           -- extra positional arguments; None is treated as ()
    raiseIfMissing -- when true, raise CallbaseError if ``base`` has no
                      attribute ``methodname``; otherwise return None

    Returns the called method's return value, or None when the method is
    missing and ``raiseIfMissing`` is false.
    """
    try:
        method = getattr(base, methodname)
    except AttributeError:
        if raiseIfMissing:
            # call form (not "raise E, msg") works on Python 2 and 3
            raise CallbaseError(methodname)
        return None
    if args is None:
        args = ()
    return method(obj, *args)
31
class RpQueueError(Exception):
    """Error raised when a queue setting is invalid or a job cannot be
    prepared or submitted."""
34
class queue:
    """Base class for the Rappture batch-queue back-ends (pbs, condor).

    Every setting is read and written through a method of the same name:
    call it with no argument (or with '' / None) to get the current value,
    or with a value to set it.  Subclasses supply submit() and
    getCurrentStatus(); status() here polls until the job is finished.
    """

    # a walltime must look like HH:MM:SS (hours may use more than 2 digits)
    _WALLTIME_RE = re.compile(r'^\d+:\d{2}:\d{2}$')

    def __init__(self):
        # initialize the general queue variables with safe, default values
        self._queue_vars = {}
        self.__initQueueVars__()
        # status() compares against the last status it printed.  submit()
        # resets these too, but setting them here means status() cannot
        # fail with AttributeError when submit() was never called.
        self._prevStatus = ''
        self._currStatus = ''

    def __initQueueVars__(self):
        """Reset every queue setting to its default value."""
        # kept in the same HH:MM:SS form the walltime() setter enforces,
        # because callers split the walltime on ':'
        self._queue_vars['walltime'] = '00:01:00'
        self._queue_vars['resultsDir'] = ""
        self._queue_vars['executable'] = ""
        self._queue_vars['execArgs'] = ""
        self._queue_vars['nodes'] = 1
        self._queue_vars['ppn'] = 1
        self._queue_vars['queue'] = ""
        self._queue_vars['outFile'] = ""
        self._queue_vars['errFile'] = ""
        self._queue_vars['logFile'] = ""
        self._queue_vars['jobName'] = ""

    def _strVar(self, name, value):
        """Shared getter/setter for the string-valued settings.

        Returns the stored value of ``name`` when ``value`` is '' or None
        (i.e. str(value) is '' or 'None'); otherwise stores str(value)
        and returns None.
        """
        value = str(value)
        if value in ('', 'None'):
            return self._queue_vars[name]
        self._queue_vars[name] = value

    def _positiveIntVar(self, name, value):
        """Shared getter/setter for the positive-integer settings.

        Returns the stored value when ``value`` is '' or None; otherwise
        stores int(value).  Raises RpQueueError if that is not > 0 (and
        lets int() raise ValueError/TypeError for non-numeric input).
        """
        if value is None or value == '':
            return self._queue_vars[name]
        value = int(value)
        if value > 0:
            self._queue_vars[name] = value
        else:
            raise RpQueueError('%s must be a positive integer' % name)

    def walltime(self, walltime=''):
        """Get or set the maximum run time as an 'HH:MM:SS' string.

        Surrounding whitespace is ignored.  Raises RpQueueError if the
        value is not entirely of the form HH:MM:SS.
        """
        walltime = str(walltime).strip()
        if walltime in ('', 'None'):
            return self._queue_vars['walltime']
        # the pattern is anchored at both ends, so trailing junk such as
        # '00:01:00abc' is rejected instead of being written to the script
        if self._WALLTIME_RE.match(walltime):
            self._queue_vars['walltime'] = walltime
        else:
            raise RpQueueError("Incorrect walltime format: should be 00:00:00")

    def resultsDir(self, resultsDir=''):
        """Get or set the results directory."""
        return self._strVar('resultsDir', resultsDir)

    def executable(self, executable=''):
        """Get or set the executable to run."""
        return self._strVar('executable', executable)

    def execArgs(self, execArgs=''):
        """Get or set the argument string passed to the executable."""
        return self._strVar('execArgs', execArgs)

    def nodes(self, nodes=''):
        """Get or set the number of nodes (positive integer)."""
        return self._positiveIntVar('nodes', nodes)

    def ppn(self, ppn=''):
        """Get or set the processors per node (positive integer)."""
        return self._positiveIntVar('ppn', ppn)

    def queue(self, queue=''):
        """Get or set the name of the queue to submit to."""
        return self._strVar('queue', queue)

    def outFile(self, outFile=''):
        """Get or set the path of the job's stdout file."""
        return self._strVar('outFile', outFile)

    def errFile(self, errFile=''):
        """Get or set the path of the job's stderr file (polled by status())."""
        return self._strVar('errFile', errFile)

    def logFile(self, logFile=''):
        """Get or set the path of the job's log file."""
        return self._strVar('logFile', logFile)

    def jobName(self, jobName=''):
        """Get or set the job name."""
        return self._strVar('jobName', jobName)

    def __convertWalltime__(self):
        # placeholder; not implemented in this class
        pass

    def submit(self):
        # placeholder; the pbs and condor subclasses provide submit()
        pass

    def __checkFiles__(self, chkFileName):
        """Return True while ``chkFileName`` is not an existing regular file.

        status() polls this with errFile() to decide when the job is done.
        """
        return not os.path.isfile(chkFileName)

    def status(self):
        """Block until errFile() exists, printing each change of status.

        Polls every 10 seconds.  Whenever the subclass's getCurrentStatus()
        reports something different from the last printed status, a line
        '<status>: <ctime>' is written to stdout.
        """
        sleepTime = 10
        while self.__checkFiles__(self.errFile()):
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != getattr(self, '_prevStatus', ''):
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus
            time.sleep(sleepTime)
184
class pbs(queue):
    """Organize the information needed to submit jobs to nanoHUB's PBS
    queues through Rappture.

    Typical use: construct, optionally adjust settings (cmd(), walltime(),
    ...), then call submit() followed by status(), which blocks until the
    job's error file appears.
    """

    def __init__(self,
                 jobName,
                 resultsDir,
                 nodes,
                 executable,
                 execArgs='',
                 walltime='00:01:00'):
        """Configure a PBS job.

        jobName    -- PBS job name; also the stem of the .pbs/.out/.err files
        resultsDir -- directory (relative to the cwd) that qsub is run from
                      and that holds the default .out/.err files
        nodes      -- number of nodes to request (positive integer)
        executable -- program launched through mpirun unless cmd() is set
        execArgs   -- argument string appended to the executable
        walltime   -- maximum run time, 'HH:MM:SS'

        If the NANOHUB_PBS_QUEUE environment variable is set it selects the
        queue (optionally of the form 'queue@server').
        """
        # Call the base-class initializer directly.  Looping over
        # self.__class__.__bases__ breaks for subclasses of pbs, whose only
        # base is pbs itself (this __init__ would be re-entered without args).
        queue.__init__(self)

        self._pbs_msgs = {}
        self.__fillStatusDict__()

        # initialize pbs script vars
        self._pbs_vars = {}
        self.__initPbsScriptVars__()

        # no job has been submitted yet
        self._jobId = ""
        # last/current status lines used by status(); submit() resets them
        self._prevStatus = ''
        self._currStatus = ''

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)
        nanoHUBQ = os.getenv("NANOHUB_PBS_QUEUE")
        if nanoHUBQ is not None:
            self.queue(nanoHUBQ)

    def __initPbsScriptVars__(self):
        """Reset PBS-specific settings ('' cmd means: build it from executable)."""
        self._pbs_vars['cmd'] = ""

    def __fillStatusDict__(self):
        """Map PBS job-state letters to the messages printed by status()."""
        # Possible PBS Job States
        # From qstat manpage on hamlet.rcac.purdue.edu
        #   E -  Job is exiting after having run.
        #   H -  Job is held.
        #   Q -  job is queued, eligible to run or routed.
        #   R -  job is running.
        #   T -  job is being moved to new location.
        #   W -  job is waiting for its execution time
        #         (-a option) to be reached.
        #   S -  job is suspended.
        self._pbs_msgs['E'] = 'Job is exiting after having run'
        self._pbs_msgs['H'] = 'Job is held'
        self._pbs_msgs['Q'] = 'Simulation Queued'
        self._pbs_msgs['R'] = 'Running Simulation'
        self._pbs_msgs['T'] = 'Job is being moved to new location. Please Wait...'
        self._pbs_msgs['W'] = 'Job is waiting for its execution time. Please Wait...'
        self._pbs_msgs['S'] = 'Job is suspended'
        # used when the job is not listed or its state letter is unknown
        self._pbs_msgs['DEFAULT'] = 'Returning Results...'

    def __makePbsScript__(self):
        """Build and return the text of the PBS submission script.

        Side effects: stores the absolute results directory in
        self._resultDirPath; fills in <resultsDir>/<jobName>.out and .err
        when outFile()/errFile() are unset; deletes stale copies of those
        files (errFile() is the completion flag status() waits for); and,
        when cmd() is unset, builds an mpirun command line from
        executable() and execArgs().

        Raises RpQueueError if mpirun cannot be found or no executable is set.
        """
        self._resultDirPath = os.path.join(os.getcwd(), self.resultsDir())

        # set the out and err files
        if self.outFile() == '':
            self.outFile(os.path.join(self.resultsDir(), self.jobName() + ".out"))
        if self.errFile() == '':
            self.errFile(os.path.join(self.resultsDir(), self.jobName() + ".err"))

        # remove existing "FLAG" files so status() does not see a stale one
        for flagFile in (self.outFile(), self.errFile()):
            if os.path.isfile(flagFile):
                os.remove(flagFile)

        if self.cmd() == '':
            # strip first so whitespace-only `which` output counts as "not found"
            mpiCommand = (getCommandOutput('which mpirun') or '').strip()
            if mpiCommand == '':
                raise RpQueueError('mpirun command not found')

            # check to make sure user specified an executable
            if self._queue_vars['executable'] == '':
                raise RpQueueError('executable not set within pbs queue')

            self.cmd('%s -np %d -machinefile $PBS_NODEFILE %s %s' %
                     (mpiCommand, self.nodes(), self.executable(), self.execArgs()))

        script = """#PBS -S /bin/bash
#PBS -l nodes=%s:ppn=%s
#PBS -l walltime=%s
""" % (self.nodes(), self.ppn(), self.walltime())
        if self.queue():
            script += "#PBS -q %s" % (self.queue())
        # the final `touch` creates errFile(), the flag status() polls for
        script += """
#PBS -o %s
#PBS -e %s
#PBS -mn
#PBS -N %s
#PBS -V
cd $PBS_O_WORKDIR

cmd="%s"
echo == running from $HOSTNAME:
echo $cmd
$cmd

touch %s
""" % (self.outFile(), self.errFile(), self.jobName(), self.cmd(), self.errFile())

        return script

    def cmd(self, cmd=''):
        """Get (no argument, '' or None) or set the shell command the job runs."""
        cmd = str(cmd)
        if cmd in ('None', ''):
            return self._pbs_vars['cmd']
        self._pbs_vars['cmd'] = cmd

    def _parseQstatState(self, qstatOutput):
        """Return this job's state letter from `qstat -a` output, or None.

        Expected line layout (whitespace separated), state in field 10:
          32049.hamlet.rc kearneyd ncn run.21557. 6104 8 8 -- 24:00 R 00:00
        Only a line whose job id (text before the first '.') equals
        self._jobId is used, so job 123 is not confused with job 1234,
        which the grep prefix pattern would also match.
        """
        for line in (qstatOutput or '').splitlines():
            fields = line.split()
            if len(fields) > 9 and fields[0].split('.')[0] == self._jobId:
                return fields[9]
        return None

    def getCurrentStatus(self):
        """Return '(<jobId>) <message>' describing the job's PBS state.

        Unknown or missing states map to the 'DEFAULT' message instead of
        raising KeyError.
        """
        retVal = self._pbs_msgs['DEFAULT']
        if self._jobId:
            # a queue named 'queue@server' means qstat must ask that server
            pbsServer = ''
            nanoHUBQ = self.queue()
            if nanoHUBQ:
                atLocation = nanoHUBQ.find('@')
                if atLocation > -1:
                    pbsServer = nanoHUBQ[atLocation + 1:]

            if pbsServer == '':
                cmd = "qstat -a | grep \'^ *%s\'" % (self._jobId)
            else:
                cmd = "qstat @%s -a | grep \'^ *%s\'" % (pbsServer, self._jobId)

            state = self._parseQstatState(getCommandOutput(cmd))
            if state is not None:
                retVal = self._pbs_msgs.get(state, self._pbs_msgs['DEFAULT'])

        return '(' + self._jobId + ') ' + retVal

    def submit(self):
        """Write <resultsDir>/<jobName>.pbs and submit it with qsub.

        qsub runs from the results directory; the caller's working
        directory is restored even if submission raises.  Raises
        RpQueueError if qsub's output does not start with a job number.
        """
        submitFileData = self.__makePbsScript__()
        submitFileName = os.path.join(self._resultDirPath, self.jobName() + '.pbs')
        writeFile(submitFileName, submitFileData)

        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            cmdOutData = getCommandOutput('qsub %s' % (submitFileName))
        finally:
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission (on hamlet/lear) starts with the job number
        match = re.match(r'[0-9]+', cmdOutData or '')
        if match:
            self._jobId = match.group()
        else:
            raise RpQueueError('Submission to PBS Queue Failed')
384
385
386
class condor(queue):
    """Submit Rappture jobs through a Condor grid universe to the Purdue
    TeraGrid gatekeeper (see __makeCondorScript__).

    Use addProcess() once per process (submit() adds a single empty one if
    none were added), then submit() and status().
    """
    # this class is not working!

    # flag bit: request an MPI job type in the Globus RSL
    USE_MPI = 1

    def __init__(self,
                 jobName,
                 resultsDir,
                 nodes,
                 executable,
                 ppn=2,
                 execArgs='',
                 walltime='00:01:00',
                 flags=0):
        """Configure a Condor job.

        jobName    -- stem of the result tarballs and the .condor file
        resultsDir -- Condor initialDir, where result tarballs are expected
        nodes      -- host count used in the MPI RSL (positive integer)
        executable -- program Condor runs
        ppn        -- processes per host used in the MPI RSL
        execArgs   -- stored via execArgs(); not used by __makeCondorScript__
        walltime   -- maximum run time, 'HH:MM:SS'
        flags      -- bit mask; condor.USE_MPI adds the MPI RSL clause
        """
        # Call the base-class initializer directly; looping over
        # self.__class__.__bases__ breaks for subclasses of condor.
        queue.__init__(self)

        self._condor_msgs = {}
        self.__fillStatusDict__()

        self._flags = flags
        self.__setWorkingDir__()

        # setup the empty process lists
        self._processList = []

        # no job submitted yet; getCurrentStatus()/status() rely on these
        self._jobId = ""
        self._prevStatus = ''
        self._currStatus = ''

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.ppn(ppn)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)
        self.queue("tg_workq@lear")
        self.outFile("out.$(cluster).$(process)")
        self.errFile("err.$(cluster).$(process)")
        self.logFile("log.$(cluster)")

    def __fillStatusDict__(self):
        """Map condor_q state letters to the messages printed by status()."""
        self._condor_msgs['I'] = 'Simulation Queued'
        self._condor_msgs['H'] = 'Simulation Held'
        self._condor_msgs['R'] = 'Running Simulation'
        self._condor_msgs['C'] = 'Simulation Complete'
        self._condor_msgs['X'] = 'Simulation Marked For Deletion'
        # used when the job is not listed or its state letter is unknown
        self._condor_msgs['DEFAULT'] = 'Returning Results...'

    def __setWorkingDir__(self):
        """Set self._workingdir to '<epoch seconds>_<$SESSION>'."""
        session = os.getenv("SESSION", "00000X")
        # time.time() is already seconds since the epoch; the former
        # time.mktime(utcnow().timetuple()) was skewed by the UTC offset
        epoch = int(time.time())
        self._workingdir = str(epoch) + "_" + session

    def __makeCondorScript__(self):
        """Return the header of the Condor submit description.

        Also stores the absolute results directory in self._resultDirPath.
        The walltime is converted to minutes for maxWallTime.
        """
        self._resultDirPath = os.path.join(os.getcwd(), self.resultsDir())

        self._mpirsl = ""
        if self._flags & self.USE_MPI:
            # host_xcount is the number of machines
            # xcount is the number of procs per machine
            self._mpirsl = "(jobType=mpi)(xcount=%d)(host_xcount=%d)" % \
                           (self.ppn(), self.nodes())

        (wall_hr, wall_min, wall_sec) = self.walltime().split(":")
        walltime = int(wall_hr) * 60 + int(wall_min) + int(wall_sec) / 60.00

        return """universe=grid
    gridresource = gt2 tg-gatekeeper.purdue.teragrid.org/jobmanager-pbs
    executable = %s
    output = %s
    error = %s
    should_transfer_files = YES
    when_to_transfer_output = ON_EXIT
    initialDir = %s
    log = %s
    #globusrsl = (project=TG-ECD060000N)(jobType=mpi)(xcount=2)(hostxcount=2)(maxWallTime=WALLTIME)
    globusrsl = (project=TG-ECD060000N)(maxWallTime=%g)%s(queue=%s)
    notification = never
    """ % (self.executable(),
           self.outFile(),
           self.errFile(),
           self.resultsDir(),
           self.logFile(),
           walltime,
           self._mpirsl,
           self.queue())

    def addProcess(self, argsList=None, inputFiles=None):
        """Append one 'Queue' entry to the submit description.

        argsList   -- list of argument strings for the executable
        inputFiles -- list of files Condor should transfer to the job

        Process N receives the arguments
          <argsList...> <jobName><N>.tar.gz <workingdir>_<N>
        and must return <jobName><N>.tar.gz, which __checkFiles__() looks for.
        """
        # None defaults avoid sharing one mutable list between calls
        if argsList is None:
            argsList = []
        if inputFiles is None:
            inputFiles = []

        args = " ".join(argsList)
        # NOTE(review): this separator contains two backslash characters
        # before the newline; confirm Condor treats it as intended.
        transInFiles = ",\\\\\n\t\t".join(inputFiles)
        nextProcId = len(self._processList)
        resultsTarName = "%s%d.tar.gz" % (self.jobName(), nextProcId)
        newProcess = """arguments = %s %s %s_%d
    transfer_input_files = %s
    transfer_output_files = %s
    Queue

    """ % (args,
           resultsTarName,
           self._workingdir,
           nextProcId,
           transInFiles,
           resultsTarName)
        self._processList.append(newProcess)

    def _parseCondorQState(self, condorQOutput):
        """Return this job's state letter from condor_q output, or None.

        Expected line layout (whitespace separated), state in field 6:
          61.0   dkearney   12/9  22:47   0+00:00:00 I  0   0.0  nanowire.sh input-
        Only lines whose cluster id (text before the first '.') equals
        self._jobId are used, so cluster 61 is not confused with 610.
        """
        for line in (condorQOutput or '').splitlines():
            fields = line.split()
            if len(fields) > 6 and fields[0].split('.')[0] == self._jobId:
                return fields[5]
        return None

    def getCurrentStatus(self):
        """Return a message describing the job's condor_q state.

        Falls back to the 'DEFAULT' message when no job id is known, the
        job is not listed, or its state letter is not in the table.
        """
        retVal = ''
        if self._jobId:
            chkCmd = "condor_q | grep \'^ *%s\'" % (self._jobId)
            state = self._parseCondorQState(getCommandOutput(chkCmd))
            if state is not None:
                retVal = self._condor_msgs.get(state, '')

        if retVal == '':
            retVal = self._condor_msgs['DEFAULT']

        return retVal

    def __checkFiles__(self):
        """Return how many processes have not yet produced a non-empty tarball.

        Raises RuntimeError if any expected tarball does not exist.
        """
        numJobs = len(self._processList)
        retVal = numJobs
        for i in range(numJobs):
            resultTarPath = os.path.join(self.resultsDir(),
                                         "%s%d.tar.gz" % (self.jobName(), i))

            # NOTE(review): output is transferred ON_EXIT, so while the job
            # is still running this file may not exist yet and this raises
            # on the first poll -- confirm intended (original comment:
            # "this might be the point of a race condition").
            if not os.path.exists(resultTarPath):
                raise RuntimeError('Condor Result file not created: %s' % (resultTarPath))

            # a non-empty tarball counts as a finished process
            if os.path.getsize(resultTarPath):
                retVal -= 1

        return retVal

    def status(self):
        """Block until every result tarball is non-empty, printing status changes.

        Polls every 10 seconds; whenever getCurrentStatus() differs from
        the last printed status, '<status>: <ctime>' is written to stdout.
        """
        sleepTime = 10
        while self.__checkFiles__():
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != getattr(self, '_prevStatus', ''):
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus
            time.sleep(sleepTime)

    def submit(self):
        """Write <resultsDir>/<jobName>.condor and run condor_submit on it.

        The caller's working directory is restored even if submission
        raises.  A failed submission is reported on stdout, not raised.
        """
        if len(self._processList) == 0:
            self.addProcess()

        # __makeCondorScript__ also sets self._resultDirPath, used below
        submitFileData = self.__makeCondorScript__() + "\n".join(self._processList)
        submitFileName = os.path.join(self._resultDirPath, self.jobName() + '.condor')
        writeFile(submitFileName, submitFileData)

        # submit the condor job
        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            condorCmd = 'condor_submit %s 2> condor_submit.error' % (submitFileName)
            cmdOutData = getCommandOutput(condorCmd)
        finally:
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission (on hamlet/lear) reports "cluster <id>"
        match = re.search(r'cluster ([0-9]+)', cmdOutData or '')
        if match:
            self._jobId = match.group(1)
        else:
            sys.stdout.write("cmdOutData returned :%s:\n" % cmdOutData)
            # deliberately not raised in the original:
            # raise RpQueueError('Submission to Condor Queue Failed')
607
608#######################################################################
609# main python script
610#######################################################################
611
612#######################################################################
613
def createDir(dirName):
    """Create an empty directory ``dirName`` relative to the current cwd.

    Anything already at that path is removed first: a real directory tree
    with shutil.rmtree, a plain file or symlink with os.remove.
    Returns the full path of the new directory.
    """
    resultDirPath = os.path.join(os.getcwd(), dirName)
    if os.path.isdir(resultDirPath) and not os.path.islink(resultDirPath):
        shutil.rmtree(resultDirPath)
    elif os.path.lexists(resultDirPath):
        # rmtree refuses files and symlinks; remove them directly
        os.remove(resultDirPath)
    os.mkdir(resultDirPath)
    return resultDirPath
620
if __name__ == '__main__':
    # Manual smoke test: submits sample Condor and PBS jobs and waits for
    # each one.  Expects hello/, cycle/ and driver7878.xml in the cwd.

    def _runJob(job):
        """Submit ``job``, block in status() until done, then flush stdout."""
        job.submit()
        job.status()
        sys.stdout.flush()

    nodes = 5
    walltime = '01:00:00'
    mpiCommand = getCommandOutput('which mpirun')

    if mpiCommand == '':
        # mpirun is needed for the PBS job commands; say why we stop
        # instead of exiting silently
        sys.stderr.write("mpirun command not found\n")
        sys.exit(-1)
    else:
        mpiCommand = mpiCommand.strip()

    sys.stdout.write("testing hello with condor\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('4321')
    executable = './hello.sh'
    shutil.copy('hello/hello.sh', resultsDir)
    shutil.copy('hello/hello', resultsDir)
    myCondorObj = condor(jobName, resultsDir, 2, executable,
                         walltime=walltime, flags=condor.USE_MPI)
    _runJob(myCondorObj)

    sys.stdout.write("testing hello\n")
    jobName = 'helloRP_MPITest'
    resultsDir = createDir('abab')
    executable = 'helloRP'
    driver = "driver7878.xml"
    shutil.copy('hello/helloRP', resultsDir)
    shutil.copy(driver, resultsDir)
    # the driver file name is passed as the executable's argument string
    myPBSObj = pbs(jobName, resultsDir, nodes, executable, driver)
    _runJob(myPBSObj)

    sys.stdout.write("testing hello\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('1234')
    executable = 'hello'
    shutil.copy('hello/hello', resultsDir)
    myPBSObj = pbs(jobName, resultsDir, nodes, executable)
    _runJob(myPBSObj)

    sys.stdout.write("testing cycle\n")
    jobName = 'cycleMPITest'
    resultsDir = createDir('5678')
    # explicit command line, so no executable is given to pbs()
    cmd = '%s -np %d -machinefile $PBS_NODEFILE cycle' % (mpiCommand, nodes)
    shutil.copy('cycle/cycle', resultsDir)
    myPBSObj = pbs(jobName, resultsDir, nodes, "")
    myPBSObj.cmd(cmd)
    myPBSObj.walltime('00:02:00')
    _runJob(myPBSObj)

    # exit the program
    sys.exit()
681
682#######################################################################
Note: See TracBrowser for help on using the repository browser.