source: trunk/lang/python/Rappture/queue.py @ 3177

Last change on this file since 3177 was 3177, checked in by mmc, 12 years ago

Updated all of the copyright notices to reference the transfer to
the new HUBzero Foundation, LLC.

File size: 21.8 KB
Line 
1# ----------------------------------------------------------------------
2#
3# ======================================================================
4#  AUTHOR:  Derrick Kearney, Purdue University
5#  Copyright (c) 2004-2012  HUBzero Foundation, LLC
6# ======================================================================
7
8import sys,os
9import re
10import time
11import shutil
12import datetime
13
14# import Rappture Related libs
15from tools import *
16
17#######################################################################
18
# Based on the ActiveState Python Cookbook recipe 52235:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52235
class CallbaseError(AttributeError):
    """Raised by callbase() when the requested method does not exist on the
    base class and the caller asked for that to be treated as an error."""
    pass

def callbase(obj, base, methodname='__init__', args=(), raiseIfMissing=None):
    """Call ``base.methodname(obj, *args)`` and return its result.

    Used to invoke a base class's method (by default its __init__) on an
    instance explicitly.

    obj            -- instance passed as the method's first argument
    base           -- class on which ``methodname`` is looked up
    methodname     -- name of the method to call (default '__init__')
    args           -- tuple of extra positional arguments; None means ()
    raiseIfMissing -- if true, a missing method raises CallbaseError;
                      otherwise a missing method makes this return None

    Raises CallbaseError (a subclass of AttributeError) as described above.
    """
    try:
        method = getattr(base, methodname)
    except AttributeError:
        if raiseIfMissing:
            # the call form works on Python 2 and 3; "raise E, v" is a
            # syntax error on Python 3
            raise CallbaseError(methodname)
        return None
    if args is None:
        args = ()
    return method(obj, *args)

class RpQueueError(Exception):
    """Raised by the queue classes for invalid settings (walltime, nodes,
    ppn, missing mpirun/executable) and for failed PBS submissions."""
    pass
34
class queue:
    """Base class for batch-queue job submission helpers.

    Holds the settings shared by every queue type (walltime, node counts,
    file names, ...).  Each setting is read and written through one method:
    call it with no argument (or with None / '') to get the current value,
    or with a value to set it.  Subclasses provide submit() and
    getCurrentStatus(); status() polls the latter until the job finishes.
    """

    def __init__(self):
        # initialize the general queue variables with safe, default values
        self._queue_vars = {}
        self.__initQueueVars__()

        # status() compares successive messages against these; set them up
        # front so status() does not raise AttributeError when it is called
        # before a subclass's submit() has initialized them
        self._prevStatus = ''
        self._currStatus = ''

    def __initQueueVars__(self):
        """Reset every queue setting to its default value."""
        self._queue_vars['walltime'] = 1
        self._queue_vars['resultsDir'] = ""
        self._queue_vars['executable'] = ""
        self._queue_vars['execArgs'] = ""
        self._queue_vars['nodes'] = 1
        self._queue_vars['ppn'] = 1
        self._queue_vars['queue'] = ""
        self._queue_vars['outFile'] = ""
        self._queue_vars['errFile'] = ""
        self._queue_vars['logFile'] = ""
        self._queue_vars['jobName'] = ""

    def _strSetting(self, key, value):
        """Get or set the string-valued setting ``key``.

        ``value`` is converted with str(); if the result is '' or 'None' the
        stored value is returned, otherwise it is stored and None returned.
        """
        value = str(value)
        if value in ('', 'None'):
            return self._queue_vars[key]
        self._queue_vars[key] = value

    def _positiveIntSetting(self, key, value):
        """Get or set the positive-integer setting ``key``.

        None or '' returns the stored value.  Anything else goes through
        int() (so non-numeric input raises ValueError/TypeError) and is
        stored if greater than zero; otherwise RpQueueError is raised.
        """
        if value is None or value == '':
            return self._queue_vars[key]
        value = int(value)
        if value <= 0:
            raise RpQueueError('%s must be a positive integer' % key)
        self._queue_vars[key] = value

    def walltime(self, walltime=''):
        """Get or set the wall-clock limit.

        A new value must look exactly like HH:MM:SS (e.g. '01:30:00'),
        otherwise RpQueueError is raised.  '' or None returns the current
        value, which defaults to the integer 1 until set.
        """
        walltime = str(walltime)
        if walltime in ('', 'None'):
            return self._queue_vars['walltime']
        # anchored at the end as well, so values such as '01:00:00junk' are
        # rejected instead of being copied into the generated job script
        if not re.match(r'^\d{2}:\d{2}:\d{2}$', walltime):
            raise RpQueueError("Incorrect walltime format: should be 00:00:00")
        self._queue_vars['walltime'] = walltime

    def resultsDir(self, resultsDir=''):
        """Get or set the directory where job output is placed."""
        return self._strSetting('resultsDir', resultsDir)

    def executable(self, executable=''):
        """Get or set the program the job runs."""
        return self._strSetting('executable', executable)

    def execArgs(self, execArgs=''):
        """Get or set the argument string passed to the executable."""
        return self._strSetting('execArgs', execArgs)

    def nodes(self, nodes=''):
        """Get or set the number of nodes (a positive integer)."""
        return self._positiveIntSetting('nodes', nodes)

    def ppn(self, ppn=''):
        """Get or set the processors per node (a positive integer)."""
        return self._positiveIntSetting('ppn', ppn)

    def queue(self, queue=''):
        """Get or set the name of the batch queue to submit to."""
        return self._strSetting('queue', queue)

    def outFile(self, outFile=''):
        """Get or set the job's stdout file name."""
        return self._strSetting('outFile', outFile)

    def errFile(self, errFile=''):
        """Get or set the job's stderr file name."""
        return self._strSetting('errFile', errFile)

    def logFile(self, logFile=''):
        """Get or set the job's log file name."""
        return self._strSetting('logFile', logFile)

    def jobName(self, jobName=''):
        """Get or set the job's name."""
        return self._strSetting('jobName', jobName)

    def __convertWalltime__(self):
        # placeholder; not implemented
        pass

    def submit(self):
        # overridden by concrete queue classes
        pass

    def __checkFiles__(self, chkFileName):
        """Return True while ``chkFileName`` does NOT exist as a file yet,
        i.e. "keep waiting" for status()."""
        return not os.path.isfile(chkFileName)

    def status(self):
        """Block until errFile() exists, reporting each status change.

        Every 10 seconds getCurrentStatus() (supplied by the subclass) is
        queried; whenever its message differs from the previous one it is
        written to stdout with a timestamp.
        """
        sleepTime = 10

        while self.__checkFiles__(self.errFile()):
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != self._prevStatus:
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus

            time.sleep(sleepTime)
184
class pbs(queue):
    """Organize the information needed to submit jobs to nanoHUB's PBS
    queues through Rappture.

    Typical use: construct, optionally adjust settings (cmd(), walltime(),
    ppn(), ...), then call submit() followed by status().
    """

    def __init__(   self,
                    jobName,
                    resultsDir,
                    nodes,
                    executable,
                    execArgs='',
                    walltime='00:01:00' ):

        # call the base classes' __init__.  Use pbs.__bases__ rather than
        # self.__class__.__bases__: for a subclass of pbs the latter is
        # (pbs,), which would re-enter this __init__ with no arguments.
        for base in pbs.__bases__:
            callbase(self, base)

        self._pbs_msgs = {}
        self.__fillStatusDict__()

        # initialize pbs script vars
        self._pbs_vars = {}
        self.__initPbsScriptVars__()

        # set by submit() once qsub returns a job number
        self._jobId = ""

        # submit() resets these as well; initialize them here so status()
        # is safe to call on a freshly constructed object
        self._prevStatus = ''
        self._currStatus = ''

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)

        # a specific PBS queue (optionally "queue@server") may be supplied
        # through the environment
        nanoHUBQ = os.getenv("NANOHUB_PBS_QUEUE")
        if nanoHUBQ is not None:
            self.queue(nanoHUBQ)

    def __initPbsScriptVars__(self):
        """Reset PBS-specific script settings (currently just cmd)."""
        self._pbs_vars['cmd'] = ""

    def __fillStatusDict__(self):
        """Fill self._pbs_msgs: PBS state letter -> user-facing message."""
        # Possible PBS Job States
        # From qstat manpage on hamlet.rcac.purdue.edu
        #   E -  Job is exiting after having run.
        #   H -  Job is held.
        #   Q -  job is queued, eligible to run or routed.
        #   R -  job is running.
        #   T -  job is being moved to new location.
        #   W -  job is waiting for its execution time
        #         (-a option) to be reached.
        #   S -  job is suspended.

        self._pbs_msgs['E'] = 'Job is exiting after having run'
        self._pbs_msgs['H'] = 'Job is held'
        self._pbs_msgs['Q'] = 'Simulation Queued'
        self._pbs_msgs['R'] = 'Running Simulation'
        self._pbs_msgs['T'] = 'Job is being moved to new location. Please Wait...'
        self._pbs_msgs['W'] = 'Job is waiting for its execution time. Please Wait...'
        self._pbs_msgs['S'] = 'Job is suspended'
        # used when no (recognized) state is available
        self._pbs_msgs['DEFAULT'] = 'Returning Results...'

    def __makePbsScript__(self):
        """Build and return the text of the PBS job script.

        Side effects: sets self._resultDirPath; fills in default out/err file
        names (<resultsDir>/<jobName>.out and .err); deletes existing out/err
        files; and, if cmd() is empty, sets it to an mpirun command line built
        from nodes(), executable() and execArgs().

        Raises RpQueueError if cmd() is empty and mpirun cannot be found or
        no executable has been set.
        """
        # absolute path of the directory where output should be placed
        self._resultDirPath = os.path.join(os.getcwd(),self.resultsDir())

        # set the out and err files
        if self.outFile() == '':
            self.outFile(os.path.join(self.resultsDir(),self.jobName()+".out"))

        if self.errFile() == '':
            self.errFile(os.path.join(self.resultsDir(),self.jobName()+".err"))

        # remove existing "FLAG" files: the script below touches errFile()
        # when the job ends, and status() waits for that file to appear
        for flagFile in (self.outFile(), self.errFile()):
            if os.path.isfile(flagFile):
                os.remove(flagFile)

        if self.cmd() == '':
            # find the mpirun command
            mpiCommand = getCommandOutput('which mpirun')
            if mpiCommand == '':
                # command not found?
                raise RpQueueError('mpirun command not found')
            mpiCommand = mpiCommand.strip()

            # check to make sure user specified an executable
            if self._queue_vars['executable'] == '':
                raise RpQueueError('executable not set within pbs queue')

            # NOTE(review): -np uses nodes() only, while the script requests
            # nodes()*ppn() slots; confirm this is intended when ppn() > 1
            self.cmd('%s -np %d -machinefile $PBS_NODEFILE %s %s' % \
                (mpiCommand, self.nodes(), self.executable(), self.execArgs()))

        script = """#PBS -S /bin/bash
#PBS -l nodes=%s:ppn=%s
#PBS -l walltime=%s
""" % (self.nodes(),self.ppn(),self.walltime())
        if self.queue():
            script += "#PBS -q %s" % (self.queue())
        script += """
#PBS -o %s
#PBS -e %s
#PBS -mn
#PBS -N %s
#PBS -V
cd $PBS_O_WORKDIR

cmd="%s"
echo == running from $HOSTNAME:
echo $cmd
$cmd

touch %s
""" % (self.outFile(),self.errFile(),self.jobName(),self.cmd(),self.errFile())

        return script

    def cmd(self,cmd=''):
        """Get or set the command line the job script runs.  When left empty,
        the script is built around an mpirun command line instead."""
        cmd = str(cmd)
        if (cmd == 'None') or (cmd == ''):
            return self._pbs_vars['cmd']
        else:
            self._pbs_vars['cmd'] = cmd

    def getCurrentStatus(self):
        """Return "(<jobId>) <message>" describing the job's PBS state.

        The state letter is the 10th column of this job's line in
        `qstat -a` output and is translated through self._pbs_msgs.  If
        there is no job id yet, no matching qstat line, or a state letter
        missing from the table, the DEFAULT message is used (previously an
        unknown letter raised KeyError).
        """
        retVal = self._pbs_msgs['DEFAULT']
        if self._jobId:
            # a queue named "queue@server" means qstat must query that server
            pbsServer = ''
            nanoHUBQ = self.queue()
            if nanoHUBQ:
                atLocation = nanoHUBQ.find('@')
                if atLocation > -1:
                    pbsServer = nanoHUBQ[atLocation+1:]

            if pbsServer == '':
                cmd = "qstat -a | grep \'^ *%s\'" % (self._jobId)
            else:
                cmd = "qstat @%s -a | grep \'^ *%s\'" % (pbsServer,self._jobId)
            cmdOutput = getCommandOutput(cmd)

            # each line should look like this:
            # 32049.hamlet.rc kearneyd ncn      run.21557.   6104   8   8    --  24:00 R 00:00
            # grep only matches a prefix (job 320 also matches 3201.host), so
            # require the id before the first '.' to equal ours exactly
            for line in cmdOutput.splitlines():
                fields = line.split()
                if len(fields) > 9 and fields[0].split('.')[0] == self._jobId:
                    retVal = self._pbs_msgs.get(fields[9], retVal)
                    break

        return '('+self._jobId+') '+retVal

    def submit(self):
        """Write <resultsDir>/<jobName>.pbs and submit it with qsub.

        On success self._jobId is set to the leading job number printed by
        qsub.  Raises RpQueueError if qsub's output does not start with a
        number.
        """
        submitFileData = self.__makePbsScript__()
        submitFileName = os.path.join(self._resultDirPath, self.jobName()+'.pbs')
        writeFile(submitFileName,submitFileData)

        # qsub runs from the results directory; always restore the caller's
        # working directory, even if the command raises
        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            cmdOutData = getCommandOutput('qsub %s' % (submitFileName))
        finally:
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission (on hamlet/lear) starts with the job number
        match = re.match(r'[0-9]+',cmdOutData)
        if match:
            self._jobId = match.group()
        else:
            raise RpQueueError('Submission to PBS Queue Failed')
384
385
386
class condor (queue):
    """Submit jobs to a remote PBS queue through Condor's grid universe
    (gt2 jobmanager-pbs on tg-gatekeeper.purdue.teragrid.org).

    Typical use: construct, optionally addProcess() one or more times, then
    submit() and status().
    """
    # this class is not working!

    # bit flag for the ``flags`` constructor argument: request an MPI job
    USE_MPI = 1

    def __init__(   self,
                    jobName,
                    resultsDir,
                    nodes,
                    executable,
                    ppn=2,
                    execArgs='',
                    walltime='00:01:00',
                    flags=0             ):

        # call the base classes' __init__.  Use condor.__bases__ rather than
        # self.__class__.__bases__, which for a subclass of condor would be
        # (condor,) and re-enter this __init__ with no arguments.
        for base in condor.__bases__:
            callbase(self, base)

        self._condor_msgs = {}
        self.__fillStatusDict__()

        self._flags = flags
        self.__setWorkingDir__()

        # submit-file fragments, one per queued process (see addProcess)
        self._processList = []

        # set by a successful submit(); initialized so getCurrentStatus()
        # and status() work before (or after a failed) submission
        self._jobId = ""
        self._prevStatus = ''
        self._currStatus = ''

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.ppn(ppn)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)
        self.queue("tg_workq@lear")
        self.outFile("out.$(cluster).$(process)")
        self.errFile("err.$(cluster).$(process)")
        self.logFile("log.$(cluster)")

    def __fillStatusDict__(self):
        """Fill self._condor_msgs: condor_q state letter -> message."""
        self._condor_msgs['I'] = 'Simulation Queued'
        self._condor_msgs['H'] = 'Simulation Held'
        self._condor_msgs['R'] = 'Running Simulation'
        self._condor_msgs['C'] = 'Simulation Complete'
        self._condor_msgs['X'] = 'Simulation Marked For Deletion'
        # used when no (recognized) state is available
        self._condor_msgs['DEFAULT'] = 'Returning Results...'

    def __setWorkingDir__(self):
        """Set self._workingdir to "<epoch seconds>_<$SESSION>" ($SESSION
        defaults to '00000X'); addProcess() passes it to each process."""
        session = os.getenv("SESSION","00000X")
        # time.time() is the real Unix epoch; the former
        # mktime(utcnow().timetuple()) was off by the local UTC offset
        epoch = int(time.time())
        self._workingdir = str(epoch) + "_" + session

    def __makeCondorScript__(self):
        """Return the submit-file header (everything before the per-process
        entries) and set self._resultDirPath.

        With USE_MPI in self._flags, an RSL fragment requesting nodes()
        machines with ppn() processors each is added to globusrsl.  The
        HH:MM:SS walltime is converted to minutes for maxWallTime.
        """
        self._resultDirPath = os.path.join(os.getcwd(),self.resultsDir())

        self._mpirsl = ""
        if (self._flags & self.USE_MPI):
            # host_xcount is the number of machines
            # xcount is the number of procs per machine
            self._mpirsl = "(jobType=mpi)(xcount=%d)(host_xcount=%d)" % \
                            (self.ppn(), self.nodes())

        # HH:MM:SS -> minutes
        (wall_hr,wall_min,wall_sec) = self.walltime().split(":")
        walltime =  int(wall_hr)*60 + int(wall_min) + int(wall_sec)/60.00

        return """universe=grid
    gridresource = gt2 tg-gatekeeper.purdue.teragrid.org/jobmanager-pbs
    executable = %s
    output = %s
    error = %s
    should_transfer_files = YES
    when_to_transfer_output = ON_EXIT
    initialDir = %s
    log = %s
    #globusrsl = (project=TG-ECD060000N)(jobType=mpi)(xcount=2)(hostxcount=2)(maxWallTime=WALLTIME)
    globusrsl = (project=TG-ECD060000N)(maxWallTime=%g)%s(queue=%s)
    notification = never
    """ % ( self.executable(),
            self.outFile(),
            self.errFile(),
            self.resultsDir(),
            self.logFile(),
            walltime,
            self._mpirsl,
            self.queue()    )

    def _resultTarName(self, procId):
        """File name of the result tarball process ``procId`` sends back."""
        return "%s%d.tar.gz" % (self.jobName(), procId)

    def addProcess(self,argsList=None,inputFiles=None):
        """Append a submit-file entry that queues one more process.

        argsList   -- arguments for the executable (joined with spaces); the
                      result tarball name and "<workingdir>_<index>" are
                      appended after them
        inputFiles -- files Condor should transfer in for this process
        None (the default) means no arguments / no input files.
        """
        if argsList is None:
            argsList = []
        if inputFiles is None:
            inputFiles = []
        args = " ".join(argsList)
        # NOTE(review): this puts two backslashes before each newline, while
        # a submit-file line continuation is a single trailing backslash --
        # confirm what Condor expects here
        transInFiles = ",\\\\\n\t\t".join(inputFiles)
        nextProcId = len(self._processList)
        resultsTarName = self._resultTarName(nextProcId)
        newProcess = """arguments = %s %s %s_%d
    transfer_input_files = %s
    transfer_output_files = %s
    Queue

    """ % ( args,
            resultsTarName,
            self._workingdir,
            nextProcId,
            transInFiles,
            resultsTarName  )
        self._processList.append(newProcess)

    def getCurrentStatus(self):
        """Return a message describing the job's Condor state.

        The state letter is the 6th column of this cluster's first line in
        condor_q output.  With no job id, no matching line, or an unknown
        letter, the DEFAULT message is returned (previously these cases
        raised UnboundLocalError or KeyError).
        """
        retVal = self._condor_msgs['DEFAULT']
        if self._jobId:
            chkCmd = "condor_q | grep \'^ *%s\'" % (self._jobId)
            cmdOutput = getCommandOutput(chkCmd)

            # each line should look like this:
            # 61.0   dkearney       12/9  22:47   0+00:00:00 I  0   0.0  nanowire.sh input-
            # grep only matches a prefix (cluster 61 also matches 610.0), so
            # require the cluster number before the '.' to equal ours
            for line in cmdOutput.splitlines():
                fields = line.split()
                if len(fields) > 6 and fields[0].split('.')[0] == self._jobId:
                    retVal = self._condor_msgs.get(fields[5], retVal)
                    break

        return retVal

    def __checkFiles__(self):
        """Return how many processes have not yet produced a non-empty
        result tarball in resultsDir() (0 means every process is done).

        Raises RuntimeError if an expected tarball does not exist at all.
        """
        remaining = len(self._processList)
        for procId in range(len(self._processList)):
            resultTarPath = os.path.join(self.resultsDir(), self._resultTarName(procId))

            # NOTE(review): a missing file is treated as fatal, i.e. this
            # assumes the tarballs exist (possibly empty) while the job runs;
            # the original author flagged a possible race condition here
            if not os.path.exists(resultTarPath):
                raise RuntimeError('Condor Result file not created: %s' % (resultTarPath))

            # a non-empty tarball means that process has finished
            if os.path.getsize(resultTarPath):
                remaining -= 1

        return remaining

    def status(self):
        """Block until every process has returned a non-empty result
        tarball, reporting each status change to stdout every 10 seconds."""
        sleepTime = 10

        while self.__checkFiles__():
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != self._prevStatus:
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus

            time.sleep(sleepTime)

    def submit(self):
        """Write <resultsDir>/<jobName>.condor and run condor_submit on it.

        If no process was added, one process with no arguments and no input
        files is queued first.  On success self._jobId is set to the cluster
        number.  On failure the condor_submit output is written to stdout and
        self._jobId is left unchanged.  NOTE(review): failures are only
        reported, not raised; consider raising RpQueueError.
        """
        if len(self._processList) == 0:
            self.addProcess()

        # __makeCondorScript__() also sets self._resultDirPath
        submitFileData = self.__makeCondorScript__() + "\n".join(self._processList)
        submitFileName = os.path.join(self._resultDirPath, self.jobName()+'.condor')
        writeFile(submitFileName,submitFileData)

        # submit from the results directory (stderr goes to
        # condor_submit.error there); always restore the caller's cwd
        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            condorCmd = 'condor_submit %s 2> condor_submit.error' % (submitFileName)
            cmdOutData = getCommandOutput(condorCmd)
        finally:
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission reports "... cluster <N>"
        match = re.search(r'cluster ([0-9]+)',cmdOutData)
        if match:
            self._jobId = match.group(1)
        else:
            sys.stdout.write("cmdOutData returned :%s:\n" % cmdOutData)
607
608#######################################################################
609# main python script
610#######################################################################
611
612#######################################################################
613
def createDir(dirName):
    """Make a fresh, empty directory called dirName inside the current
    working directory, wiping out any existing one with that name.

    Returns the directory's path, formed as cwd + "/" + dirName.
    """
    target = "/".join((os.getcwd(), dirName))
    if os.path.exists(target):
        # discard whatever a previous run left behind
        shutil.rmtree(target)
    os.mkdir(target)
    return target
620
if __name__ == '__main__':

    def _submitAndWait(job):
        # submit the job, block until status() sees it finish, flush output
        job.submit()
        job.status()
        sys.stdout.flush()

    nodes = 5
    walltime = '01:00:00'

    # the PBS runs below need mpirun on the PATH
    mpiCommand = getCommandOutput('which mpirun')
    if mpiCommand == '':
        # command not found?
        sys.exit(-1)
    mpiCommand = mpiCommand.strip()

    # MPI "hello" through condor
    sys.stdout.write("testing hello with condor\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('4321')
    executable = './hello.sh'
    for fileName in ('hello/hello.sh', 'hello/hello'):
        shutil.copy(fileName, resultsDir)
    _submitAndWait(condor(jobName, resultsDir, 2, executable,
                          walltime=walltime, flags=condor.USE_MPI))

    # Rappture "hello" through PBS, with the driver file as its argument
    sys.stdout.write("testing hello\n")
    jobName = 'helloRP_MPITest'
    resultsDir = createDir('abab')
    executable = 'helloRP'
    driver = "driver7878.xml"
    for fileName in ('hello/helloRP', driver):
        shutil.copy(fileName, resultsDir)
    _submitAndWait(pbs(jobName, resultsDir, nodes, executable, driver))

    # plain MPI "hello" through PBS
    sys.stdout.write("testing hello\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('1234')
    executable = 'hello'
    shutil.copy('hello/hello', resultsDir)
    _submitAndWait(pbs(jobName, resultsDir, nodes, executable))

    # "cycle" through PBS using an explicit command line
    sys.stdout.write("testing cycle\n")
    jobName = 'cycleMPITest'
    resultsDir = createDir('5678')
    cmd = '%s -np %d -machinefile $PBS_NODEFILE cycle' % (mpiCommand, nodes)
    shutil.copy('cycle/cycle', resultsDir)
    myPBSObj = pbs(jobName, resultsDir, nodes, "")
    myPBSObj.cmd(cmd)
    myPBSObj.walltime('00:02:00')
    _submitAndWait(myPBSObj)

    # exit the program
    sys.exit()
681
682#######################################################################
Note: See TracBrowser for help on using the repository browser.