source: trunk/python/Rappture/queue.py @ 515

Last change on this file since 515 was 515, checked in by dkearney, 18 years ago

added units of days (d), hours (h), minutes (min), /m2.
The minutes unit was not added to Tcl because Tcl interprets "min" as milli-inches.
added a few simple tests to test day/hour/minute/second conversions
adjusted python's queue module to get condor working mostly.
adjustments to a few make files

File size: 21.1 KB
Line 
1# ----------------------------------------------------------------------
2#
3# ======================================================================
4#  AUTHOR:  Derrick Kearney, Purdue University
5#  Copyright (c) 2005  Purdue Research Foundation, West Lafayette, IN
6# ======================================================================
7
8import sys,os
9import re
10import time
11import shutil
12import datetime
13
14# import Rappture Related libs
15from tools import *
16
17#######################################################################
18
19# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52235
class CallbaseError(AttributeError):
    """Raised by callbase() when the requested base-class method is missing."""


def callbase(obj, base, methodname='__init__', args=(), raiseIfMissing=None):
    """Explicitly call ``base.<methodname>`` with *obj* as ``self``.

    Parameters:
        obj            -- instance passed as the first (self) argument.
        base           -- class on which *methodname* is looked up.
        methodname     -- name of the method to call (default '__init__').
        args           -- tuple of extra positional arguments; None means ().
        raiseIfMissing -- when true, raise CallbaseError if *base* has no
                          attribute *methodname*; otherwise ignore it.

    Returns the called method's return value, or None when the method is
    missing and *raiseIfMissing* is false.

    Raises CallbaseError (a subclass of AttributeError) as described above.
    """
    try:
        method = getattr(base, methodname)
    except AttributeError:
        if raiseIfMissing:
            # call form works on both Python 2 and Python 3
            raise CallbaseError(methodname)
        return None
    if args is None:
        args = ()
    return method(obj, *args)
31
class RpQueueError(Exception):
    """Error raised by the queue classes for invalid settings (walltime,
    nodes, ppn, missing executable/mpirun) and for failed submissions."""
34
class queue:
    """Base class holding the settings shared by the batch-queue back ends.

    Every setting is exposed through a combined getter/setter method: call it
    with no argument (or with '' / None) to read the current value, or with a
    value to store it.  Subclasses (pbs, condor) supply submit() and the
    getCurrentStatus() method that status() relies on.
    """

    def __init__(self):
        # initialize the general queue variables with safe, default values
        self._queue_vars = {}
        self.__initQueueVars__()

        # status() compares successive statuses; start both empty so that
        # status() cannot fail with AttributeError before submit() sets them
        self._prevStatus = ''
        self._currStatus = ''

    def __initQueueVars__(self):
        """Reset every queue setting to its default value."""
        self._queue_vars.update({
            'walltime': 1,
            'resultsDir': "",
            'executable': "",
            'execArgs': "",
            'nodes': 1,
            'ppn': 1,
            'queue': "",
            'outFile': "",
            'errFile': "",
            'logFile': "",
            'jobName': "",
        })

    def _getOrSetStr(self, key, value):
        """Shared accessor for string settings.

        *value* is converted with str(); if that gives '' or 'None' the stored
        value of *key* is returned, otherwise the string is stored (returns None).
        """
        value = str(value)
        if value in ('', 'None'):
            return self._queue_vars[key]
        self._queue_vars[key] = value

    def _getOrSetPositiveInt(self, key, value, label):
        """Shared accessor for positive-integer settings.

        '' or None returns the stored value of *key*; anything else is
        converted with int() and stored.  Raises RpQueueError if the
        converted value is not positive.
        """
        if value is None or value == '':
            return self._queue_vars[key]
        value = int(value)
        if value <= 0:
            raise RpQueueError('%s must be a positive integer' % label)
        self._queue_vars[key] = value

    def walltime(self, walltime=''):
        """Get or set the wall-clock limit as an 'HH:MM:SS' string.

        Raises RpQueueError if the new value is not exactly HH:MM:SS (trailing
        characters are rejected, since callers split and int() the fields).
        """
        walltime = str(walltime)
        if walltime in ('', 'None'):
            return self._queue_vars['walltime']
        # \Z (not $) so that a trailing newline is rejected as well
        if not re.match(r'\d{2}:\d{2}:\d{2}\Z', walltime):
            raise RpQueueError("Incorrect walltime format: should be 00:00:00")
        self._queue_vars['walltime'] = walltime

    def resultsDir(self, resultsDir=''):
        """Get or set the directory where job results are placed."""
        return self._getOrSetStr('resultsDir', resultsDir)

    def executable(self, executable=''):
        """Get or set the executable to run."""
        return self._getOrSetStr('executable', executable)

    def execArgs(self, execArgs=''):
        """Get or set the argument string passed to the executable."""
        return self._getOrSetStr('execArgs', execArgs)

    def nodes(self, nodes=''):
        """Get or set the number of nodes (positive integer)."""
        return self._getOrSetPositiveInt('nodes', nodes, 'nodes')

    def ppn(self, ppn=''):
        """Get or set the processors-per-node count (positive integer)."""
        return self._getOrSetPositiveInt('ppn', ppn, 'ppn')

    def queue(self, queue=''):
        """Get or set the destination queue name."""
        return self._getOrSetStr('queue', queue)

    def outFile(self, outFile=''):
        """Get or set the job's stdout file name."""
        return self._getOrSetStr('outFile', outFile)

    def errFile(self, errFile=''):
        """Get or set the job's stderr file name."""
        return self._getOrSetStr('errFile', errFile)

    def logFile(self, logFile=''):
        """Get or set the job's log file name."""
        return self._getOrSetStr('logFile', logFile)

    def jobName(self, jobName=''):
        """Get or set the job name."""
        return self._getOrSetStr('jobName', jobName)

    def __convertWalltime__(self):
        """Placeholder; not implemented in the base class."""
        pass

    def submit(self):
        """Placeholder; subclasses implement job submission."""
        pass

    def __checkFiles__(self, chkFileName):
        """Return True while *chkFileName* does not exist as a regular file."""
        return not os.path.isfile(chkFileName)

    def status(self):
        """Block until errFile() exists, reporting each status change.

        Every 10 seconds getCurrentStatus() (provided by the subclass) is
        queried; whenever its result changes, "<status>: <time>" is written
        to stdout.
        """
        sleepTime = 10
        while self.__checkFiles__(self.errFile()):
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != self._prevStatus:
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus
            time.sleep(sleepTime)
184
class pbs(queue):
    """Organize the information needed to submit jobs to nanoHUB's PBS
    queues through Rappture.

    Typical use::

        job = pbs(jobName, resultsDir, nodes, executable, execArgs, walltime)
        job.submit()    # writes <jobName>.pbs in resultsDir and qsub's it
        job.status()    # polls until the script touches the error file
    """

    def __init__(self,
                 jobName,
                 resultsDir,
                 nodes,
                 executable,
                 execArgs='',
                 walltime='00:01:00'):

        # Call the base initializer explicitly.  Looking it up through
        # self.__class__.__bases__ would, for a subclass of pbs, find pbs
        # itself and re-enter this __init__ without its required arguments.
        queue.__init__(self)

        self._pbs_msgs = {}
        self.__fillStatusDict__()

        # initialize pbs script vars
        self._pbs_vars = {}
        self.__initPbsScriptVars__()

        # no job submitted yet
        self._jobId = ""
        self._prevStatus = ''
        self._currStatus = ''

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)
        self.queue("@vma118.punch.purdue.edu")

    def __initPbsScriptVars__(self):
        """Reset the PBS-specific script variables."""
        self._pbs_vars['cmd'] = ""

    def __fillStatusDict__(self):
        """Map PBS job-state letters to user-facing status messages.

        States, from the qstat manpage on hamlet.rcac.purdue.edu:
          E - job is exiting after having run
          H - job is held
          Q - job is queued, eligible to run or routed
          R - job is running
          T - job is being moved to new location
          W - job is waiting for its execution time (-a option) to be reached
          S - job is suspended
        'DEFAULT' is used when no (known) state is reported.
        """
        self._pbs_msgs['E'] = 'Job is exiting after having run'
        self._pbs_msgs['H'] = 'Job is held'
        self._pbs_msgs['Q'] = 'Simulation Queued'
        self._pbs_msgs['R'] = 'Running Simulation'
        self._pbs_msgs['T'] = 'Job is being moved to new location. Please Wait...'
        self._pbs_msgs['W'] = 'Job is waiting for its execution time. Please Wait...'
        self._pbs_msgs['S'] = 'Job is suspended'
        self._pbs_msgs['DEFAULT'] = 'Returning Results...'

    def __makePbsScript__(self):
        """Build and return the text of the PBS submission script.

        Side effects: sets self._resultDirPath, fills in default out/err
        file names, deletes stale out/err files, and (if cmd() is unset)
        builds an mpirun command line.

        Raises RpQueueError if mpirun cannot be found or no executable is set.
        """
        self._resultDirPath = os.path.join(os.getcwd(), self.resultsDir())

        # set the out and err files
        if self.outFile() == '':
            self.outFile(os.path.join(self.resultsDir(), self.jobName() + ".out"))

        if self.errFile() == '':
            self.errFile(os.path.join(self.resultsDir(), self.jobName() + ".err"))

        # remove existing "FLAG" files: status() waits for the err file,
        # which the script touches when the job is done
        if os.path.isfile(self.outFile()):
            os.remove(self.outFile())
        if os.path.isfile(self.errFile()):
            os.remove(self.errFile())

        if self.cmd() == '':
            # find the mpirun command (whitespace-only output means not found)
            mpiCommand = getCommandOutput('which mpirun').strip()
            if not mpiCommand:
                raise RpQueueError('mpirun command not found')

            # check to make sure user specified an executable
            if self.executable() == '':
                raise RpQueueError('executable not set within pbs queue')

            self.cmd('%s -np %d -machinefile $PBS_NODEFILE %s %s' %
                     (mpiCommand, self.nodes(), self.executable(), self.execArgs()))

        return """#PBS -S /bin/bash
#PBS -l nodes=%s:ppn=%s
#PBS -l walltime=%s
#PBS -q %s
#PBS -o %s
#PBS -e %s
#PBS -mn
#PBS -N %s
cd $PBS_O_WORKDIR

cmd="%s"
echo == running from $HOSTNAME:
echo $cmd
$cmd

touch %s
""" % (self.nodes(),
       self.ppn(),
       self.walltime(),
       self.queue(),
       self.outFile(),
       self.errFile(),
       self.jobName(),
       self.cmd(),
       self.errFile())

    def cmd(self, cmd=''):
        """Get or set the command line run by the PBS script.

        '' or None (or the string 'None') returns the current value.
        """
        cmd = str(cmd)
        if cmd in ('None', ''):
            return self._pbs_vars['cmd']
        self._pbs_vars['cmd'] = cmd

    def getCurrentStatus(self):
        """Return "(<jobId>) <message>" describing the job's PBS state.

        Unknown states, or a job no longer listed by qstat, map to the
        'DEFAULT' message.
        """
        retVal = self._pbs_msgs['DEFAULT']
        if self._jobId:
            # the '\.' makes job 123 not match job 1234
            cmdOutput = getCommandOutput("qstat -a | grep '^ *%s\\.'" % (self._jobId))
            # parse a string that should look like this:
            # 32049.hamlet.rc kearneyd ncn      run.21557.   6104   8   8    --  24:00 R 00:00
            parseResults = cmdOutput.split()
            # results should look like this:
            # ['32049.hamlet.rc','kearneyd','ncn','run.21557.','6104','8','8','--','24:00','R','00:00']
            if len(parseResults) > 9:
                qstat = parseResults[9]
                # states not in the table (e.g. C, B) fall back to DEFAULT
                retVal = self._pbs_msgs.get(qstat, self._pbs_msgs['DEFAULT'])

        return '(' + self._jobId + ') ' + retVal

    def submit(self):
        """Write <jobName>.pbs into the results directory and qsub it.

        Raises RpQueueError if qsub's output does not start with a job id.
        """
        submitFileData = self.__makePbsScript__()
        submitFileName = os.path.join(self._resultDirPath, self.jobName() + '.pbs')
        writeFile(submitFileName, submitFileData)

        # qsub runs from the results directory; always restore the cwd
        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            cmdOutData = getCommandOutput('qsub %s' % (submitFileName))
        finally:
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission (on hamlet/lear) starts with the job number
        match = re.match(r'[0-9]+', cmdOutData)
        if match is None:
            raise RpQueueError('Submission to PBS Queue Failed')
        self._jobId = match.group()
370
371
372
class condor(queue):
    """Submit jobs as a Condor grid-universe (gt2) job to the Purdue TeraGrid
    PBS jobmanager.

    NOTE: the original author marked this class as "not working"; treat it
    as experimental.  Pass flags=condor.USE_MPI to request an MPI job.
    """

    USE_MPI = 1

    def __init__(self,
                 jobName,
                 resultsDir,
                 nodes,
                 executable,
                 execArgs='',
                 walltime='00:01:00',
                 flags=0):

        # Call the base initializer explicitly.  Looking it up through
        # self.__class__.__bases__ would, for a subclass of condor, find
        # condor itself and re-enter this __init__ without its arguments.
        queue.__init__(self)

        self._condor_msgs = {}
        self.__fillStatusDict__()

        self._flags = flags
        self.__setWorkingDir__()

        # one "Queue" stanza per process; filled by addProcess()
        self._processList = []

        # no job submitted yet; getCurrentStatus() checks _jobId
        self._jobId = ""
        self._prevStatus = ''
        self._currStatus = ''

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)
        self.queue("tg_workq@lear")
        self.outFile("out.$(cluster).$(process)")
        self.errFile("err.$(cluster).$(process)")
        self.logFile("log.$(cluster)")

    def __fillStatusDict__(self):
        """Map condor_q status letters to user-facing status messages."""
        self._condor_msgs['I'] = 'Simulation Queued'
        self._condor_msgs['H'] = 'Simulation Held'
        self._condor_msgs['R'] = 'Running Simulation'
        self._condor_msgs['C'] = 'Simulation Complete'
        self._condor_msgs['X'] = 'Simulation Marked For Deletion'
        self._condor_msgs['DEFAULT'] = 'Returning Results...'

    def __setWorkingDir__(self):
        """Build a working-directory id "<epoch seconds>_<$SESSION>"."""
        session = os.getenv("SESSION", "00000X")
        # time.time() is the real epoch; mktime(utcnow()) is skewed by the
        # local UTC offset
        epoch = int(time.time())
        self._workingdir = str(epoch) + "_" + session

    def __makeCondorScript__(self):
        """Return the job-wide header of the condor submit description.

        Side effect: sets self._resultDirPath.  The walltime 'HH:MM:SS' is
        converted to minutes for the globus maxWallTime attribute.
        """
        self._resultDirPath = os.path.join(os.getcwd(), self.resultsDir())

        self._mpirsl = ""
        if self._flags & self.USE_MPI:
            self._mpirsl = "(jobType=mpi)(count=%d)(hostcount=%d)" % \
                           (self.nodes(), self.nodes())

        (wall_hr, wall_min, wall_sec) = self.walltime().split(":")
        walltime = int(wall_hr) * 60 + int(wall_min) + int(wall_sec) / 60.00

        return """universe=grid
    gridresource = gt2 tg-gatekeeper.purdue.teragrid.org:2120/jobmanager-pbs
    executable = %s
    output = %s
    error = %s
    should_transfer_files = YES
    when_to_transfer_output = ON_EXIT
    initialDir = %s
    log = %s
    #globusrsl = (project=TG-ECD060000N)(jobType=mpi)(count=2)(hostcount=2)(maxWallTime=WALLTIME)
    globusrsl = (project=TG-ECD060000N)(maxWallTime=%g)%s(queue=%s)
    notification = never
    """ % (self.executable(),
           self.outFile(),
           self.errFile(),
           self.resultsDir(),
           self.logFile(),
           walltime,
           self._mpirsl,
           self.queue())

    def addProcess(self, argsList=None, inputFiles=None):
        """Append one "Queue" stanza to the submit description.

        argsList   -- list of argument strings for the executable.
        inputFiles -- list of files to transfer to the execute host.

        Each process additionally receives its result tarball name
        "<jobName><procId>.tar.gz" and "<workingdir>_<procId>" as arguments,
        and the tarball is transferred back on exit.
        """
        if argsList is None:
            argsList = []
        if inputFiles is None:
            inputFiles = []
        args = " ".join(argsList)
        # one file per line; condor continues a line ending in a SINGLE
        # backslash (a doubled one would leave a stray '\' in the file name)
        transInFiles = ",\\\n\t\t".join(inputFiles)
        nextProcId = len(self._processList)
        resultsTarName = "%s%d.tar.gz" % (self.jobName(), nextProcId)
        newProcess = """arguments = %s %s %s_%d
    transfer_input_files = %s
    transfer_output_files = %s
    Queue

    """ % (args,
           resultsTarName,
           self._workingdir,
           nextProcId,
           transInFiles,
           resultsTarName)
        self._processList.append(newProcess)

    def getCurrentStatus(self):
        """Return a message for the job's current condor_q status.

        Returns the 'DEFAULT' message when nothing has been submitted, the job
        is no longer listed, or the status letter is not in the table.
        """
        retVal = self._condor_msgs['DEFAULT']
        if self._jobId:
            # the '\.' makes cluster 61 not match cluster 610
            chkCmd = "condor_q | grep '^ *%s\\.'" % (self._jobId)
            cmdOutput = getCommandOutput(chkCmd)

            # parse a string that should look like this:
            # 61.0   dkearney       12/9  22:47   0+00:00:00 I  0   0.0  nanowire.sh input-
            parseResults = cmdOutput.split()
            # results should look like this:
            # ['61.0', 'dkearney', '12/9', '22:47', '0+00:00:00', 'I', '0', '0.0', 'nanowire.sh', 'input-']
            if len(parseResults) > 6:
                qstat = parseResults[5]
                retVal = self._condor_msgs.get(qstat, self._condor_msgs['DEFAULT'])

        return retVal

    def __checkFiles__(self):
        """Return the number of processes whose result tarball is still empty.

        Raises RuntimeError if an expected result tarball does not exist.
        """
        pending = len(self._processList)
        for procId in range(len(self._processList)):
            resultTarPath = os.path.join(self.resultsDir(),
                                         "%s%d.tar.gz" % (self.jobName(), procId))

            # NOTE: possible race condition (original author's remark);
            # this might need to become a try/except around getsize().
            if not os.path.exists(resultTarPath):
                raise RuntimeError('Condor Result file not created: %s' % (resultTarPath))

            # a non-empty tarball means this process has finished
            if os.path.getsize(resultTarPath):
                pending -= 1

        return pending

    def status(self):
        """Block until every result tarball is non-empty, reporting each
        status change as "<status>: <time>" on stdout (polled every 10 s)."""
        sleepTime = 10
        while self.__checkFiles__():
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != self._prevStatus:
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus
            time.sleep(sleepTime)

    def submit(self):
        """Write <jobName>.condor into the results directory and submit it.

        A single default process is added if none was added by the caller.
        On failure the condor_submit output and an error line are printed;
        no exception is raised (the original raise was commented out).
        """
        if not self._processList:
            self.addProcess()

        submitFileData = self.__makeCondorScript__() + "\n".join(self._processList)
        submitFileName = os.path.join(self._resultDirPath, self.jobName() + '.condor')
        writeFile(submitFileName, submitFileData)

        # submit the condor job from the results directory; restore the cwd
        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            condorCmd = 'condor_submit %s 2> condor_submit.error' % (submitFileName)
            cmdOutData = getCommandOutput(condorCmd)
        finally:
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission (on hamlet/lear) follows this regexp
        match = re.search(r'cluster ([0-9]+)', cmdOutData)
        if match:
            self._jobId = match.group(1)
        else:
            sys.stdout.write("cmdOutData returned :%s:\n" % cmdOutData)
            sys.stderr.write('Submission to Condor Queue Failed\n')
589
590#######################################################################
591# main python script
592#######################################################################
593
594#######################################################################
595
def createDir(dirName):
    """Create an empty directory *dirName* under the current working directory.

    Any existing directory of that name is deleted first (with everything
    in it).  Returns the path "<cwd>/<dirName>".
    """
    newDir = "/".join([os.getcwd(), dirName])
    alreadyThere = os.path.exists(newDir)
    if alreadyThere:
        # start from a clean slate: drop results left by an earlier run
        shutil.rmtree(newDir)
    os.mkdir(newDir)
    return newDir
602
if __name__ == '__main__':

    def _runAndWait(job):
        """Submit *job*, poll its status until it finishes, then flush stdout."""
        job.submit()
        job.status()
        sys.stdout.flush()

    nodes = 5
    walltime = '01:00:00'

    # the PBS tests build mpirun command lines, so mpirun must be on the PATH;
    # whitespace-only output from `which` also counts as "not found"
    mpiCommand = getCommandOutput('which mpirun').strip()
    if not mpiCommand:
        sys.stderr.write("mpirun command not found\n")
        sys.exit(-1)

    sys.stdout.write("testing hello with condor\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('4321')
    executable = 'hello.sh'
    shutil.copy('hello/hello.sh', resultsDir)
    shutil.copy('hello/hello', resultsDir)
    _runAndWait(condor(jobName, resultsDir, 2, executable,
                       walltime=walltime, flags=condor.USE_MPI))

    sys.stdout.write("testing hello\n")
    jobName = 'helloRP_MPITest'
    resultsDir = createDir('abab')
    executable = 'helloRP'
    driver = "driver7878.xml"
    shutil.copy('hello/helloRP', resultsDir)
    shutil.copy(driver, resultsDir)
    # the driver file is passed as the executable's argument
    _runAndWait(pbs(jobName, resultsDir, nodes, executable, driver))

    sys.stdout.write("testing hello\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('1234')
    executable = 'hello'
    shutil.copy('hello/hello', resultsDir)
    _runAndWait(pbs(jobName, resultsDir, nodes, executable))

    sys.stdout.write("testing cycle\n")
    jobName = 'cycleMPITest'
    resultsDir = createDir('5678')
    cmd = '%s -np %d -machinefile $PBS_NODEFILE cycle' % (mpiCommand, nodes)
    shutil.copy('cycle/cycle', resultsDir)
    # no executable: an explicit cmd() replaces the generated mpirun line
    myPBSObj = pbs(jobName, resultsDir, nodes, "")
    myPBSObj.cmd(cmd)
    myPBSObj.walltime('00:02:00')
    _runAndWait(myPBSObj)

    # exit the program
    sys.exit()
663
664#######################################################################
Note: See TracBrowser for help on using the repository browser.