source: trunk/python/Rappture/queue.py @ 539

Last change on this file since 539 was 539, checked in by dkearney, 18 years ago

fixed up condor code and example so it now runs properly

File size: 22.1 KB
Line 
1# ----------------------------------------------------------------------
2#
3# ======================================================================
4#  AUTHOR:  Derrick Kearney, Purdue University
5#  Copyright (c) 2005  Purdue Research Foundation, West Lafayette, IN
6# ======================================================================
7
8import sys,os
9import re
10import time
11import shutil
12import datetime
13
14# import Rappture Related libs
15from tools import *
16
17#######################################################################
18
19# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52235
class CallbaseError(AttributeError):
    """Raised by callbase() when the requested base-class method is
    missing and the caller asked for that to be reported."""
22
def callbase(obj, base, methodname='__init__', args=(), raiseIfMissing=None):
    """Call the method named ``methodname`` of class ``base`` on ``obj``.

    obj            -- instance passed as the first argument of the method
    base           -- class on which the method is looked up
    methodname     -- name of the method to call (default '__init__')
    args           -- positional arguments forwarded to the method;
                      None is treated as "no arguments"
    raiseIfMissing -- when true, a missing method raises CallbaseError
                      instead of being silently ignored

    Returns whatever the method returns, or None when the method does
    not exist and raiseIfMissing is false.
    """
    try:
        method = getattr(base, methodname)
    except AttributeError:
        if raiseIfMissing:
            # call form of raise: valid on both Python 2 and Python 3
            raise CallbaseError(methodname)
        return None
    if args is None:
        args = ()
    return method(obj, *args)
31
class RpQueueError(Exception):
    """Error raised by the queue classes in this module for invalid
    settings or failed submissions."""
34
class queue(object):
    """Base class holding the settings shared by the batch-queue backends.

    Every setting is exposed through a combined getter/setter method:
    called with no argument (or with '' / None) it returns the current
    value, called with a value it stores that value and returns None.

    Subclasses are expected to provide submit() and getCurrentStatus();
    status() polls getCurrentStatus() until the job's flag file shows up.
    """

    def __init__(self):

        # initialize the general queue variable with safe, default values
        self._queue_vars = {}
        self.__initQueueVars__()

        # status() compares these two; subclasses reset them in submit()
        self._prevStatus = ''
        self._currStatus = ''

    def __initQueueVars__(self):
        """Fill self._queue_vars with the default value of every setting."""
        # NOTE(review): the walltime default is the integer 1 although
        # walltime() only accepts 'HH:MM:SS' strings -- kept as-is because
        # both subclasses always set an explicit walltime.
        self._queue_vars['walltime'] = 1
        self._queue_vars['resultsDir'] = ""
        self._queue_vars['executable'] = ""
        self._queue_vars['execArgs'] = ""
        self._queue_vars['nodes'] = 1
        self._queue_vars['ppn'] = 1
        self._queue_vars['queue'] = ""
        self._queue_vars['outFile'] = ""
        self._queue_vars['errFile'] = ""
        self._queue_vars['logFile'] = ""
        self._queue_vars['jobName'] = ""
        self._queue_vars['transferFiles'] = []

    def _stringSetting(self, key, value):
        """Shared getter/setter for the string-valued settings.

        '' and None (which str() turns into 'None') mean "read": the
        stored value is returned.  Anything else is stored as a string.
        """
        value = str(value)
        if value in ('', 'None'):
            return self._queue_vars[key]
        self._queue_vars[key] = value

    def _positiveIntSetting(self, key, value):
        """Shared getter/setter for the positive-integer settings.

        Raises RpQueueError when the value converts to an integer that
        is not greater than zero.
        """
        if value == '' or value is None:
            return self._queue_vars[key]
        value = int(value)
        if value <= 0:
            raise RpQueueError('%s must be a positive integer' % key)
        self._queue_vars[key] = value

    def walltime(self, walltime=''):
        """Get or set the wall-clock limit, formatted as 'HH:MM:SS'.

        Raises RpQueueError when the value does not have that format.
        """
        walltime = str(walltime)
        if walltime in ('', 'None'):
            return self._queue_vars['walltime']
        # anchored at both ends so trailing text cannot leak into the
        # generated submit scripts
        if not re.match(r'^\d{2}:\d{2}:\d{2}\Z', walltime):
            raise RpQueueError("Incorrect walltime format: should be 00:00:00")
        self._queue_vars['walltime'] = walltime

    def resultsDir(self, resultsDir=''):
        """Get or set the directory that receives the job's files."""
        return self._stringSetting('resultsDir', resultsDir)

    def executable(self, executable=''):
        """Get or set the program the job runs."""
        return self._stringSetting('executable', executable)

    def execArgs(self, execArgs=''):
        """Get or set the argument string passed to the executable."""
        return self._stringSetting('execArgs', execArgs)

    def nodes(self, nodes=''):
        """Get or set the number of nodes (a positive integer)."""
        return self._positiveIntSetting('nodes', nodes)

    def ppn(self, ppn=''):
        """Get or set the processors-per-node count (a positive integer)."""
        return self._positiveIntSetting('ppn', ppn)

    def queue(self, queue=''):
        """Get or set the name of the queue to submit to."""
        return self._stringSetting('queue', queue)

    def outFile(self, outFile=''):
        """Get or set the file that captures the job's standard output."""
        return self._stringSetting('outFile', outFile)

    def errFile(self, errFile=''):
        """Get or set the file that captures the job's standard error."""
        return self._stringSetting('errFile', errFile)

    def logFile(self, logFile=''):
        """Get or set the job's log file."""
        return self._stringSetting('logFile', logFile)

    def jobName(self, jobName=''):
        """Get or set the name of the job."""
        return self._stringSetting('jobName', jobName)

    def transferFiles(self, transferFiles=None):
        """Get or set the list of files to transfer with the job.

        None or an empty sequence means "read"; any other sequence is
        copied into a new list and stored.
        """
        if transferFiles is None:
            return self._queue_vars['transferFiles']
        transferFiles = list(transferFiles)
        if not transferFiles:
            return self._queue_vars['transferFiles']
        self._queue_vars['transferFiles'] = transferFiles

    def __convertWalltime__(self):
        """Placeholder; does nothing in the base class."""
        pass

    def submit(self):
        """Placeholder; subclasses submit the job here."""
        pass

    def __checkFiles__(self, chkFileName):
        """Return True while chkFileName does not exist as a regular file."""
        return not os.path.isfile(chkFileName)

    def status(self):
        """Block until the error file exists, reporting status changes.

        Every 10 seconds the result of getCurrentStatus() is fetched and,
        when it differs from the previous one, written to stdout together
        with the current time.
        """
        sleepTime = 10

        while self.__checkFiles__(self.errFile()):
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != self._prevStatus:
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus

            time.sleep(sleepTime)
191
class pbs(queue):
    """Organize the information needed to submit jobs to nanoHUB's PBS
    queues through Rappture.

    The job is described by a generated PBS script that runs the command
    and then touches the error file; status() (inherited) treats the
    appearance of that file as the end of the job.
    """

    def __init__(   self,
                    jobName,
                    resultsDir,
                    nodes,
                    executable,
                    execArgs='',
                    walltime='00:01:00' ):
        """Store the job description.

        The queue name is taken from the NANOHUB_PBS_QUEUE environment
        variable, falling back to a built-in default when it is unset.
        Raises RpQueueError for an invalid nodes or walltime value.
        """

        # call the base class's init directly; walking
        # self.__class__.__bases__ would pick the wrong class as soon as
        # pbs itself is subclassed
        queue.__init__(self)

        self._pbs_msgs = {}
        self.__fillStatusDict__()

        # initialize pbs script vars
        self._pbs_vars = {}
        self.__initPbsScriptVars__()

        # initialize jobId for this object
        self._jobId = ""

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)
        nanoHUBQ = os.getenv("NANOHUB_PBS_QUEUE")
        if nanoHUBQ is None:
            self.queue("@vma118.punch.purdue.edu")
        else:
            self.queue(nanoHUBQ)

    def __initPbsScriptVars__(self):
        """Reset the PBS-specific settings to their defaults."""
        self._pbs_vars['cmd'] = ""

    def __fillStatusDict__(self):
        """Map qstat job-state letters to the messages shown to the user."""
        # Possible PBS Job States
        # From qstat manpage on hamlet.rcac.purdue.edu
        #   E -  Job is exiting after having run.
        #   H -  Job is held.
        #   Q -  job is queued, eligible to run or routed.
        #   R -  job is running.
        #   T -  job is being moved to new location.
        #   W -  job is waiting for its execution time
        #         (-a option) to be reached.
        #   S -  job is suspended.

        self._pbs_msgs['E'] = 'Job is exiting after having run'
        self._pbs_msgs['H'] = 'Job is held'
        self._pbs_msgs['Q'] = 'Simulation Queued'
        self._pbs_msgs['R'] = 'Running Simulation'
        self._pbs_msgs['T'] = 'Job is being moved to new location. Please Wait...'
        self._pbs_msgs['W'] = 'Job is waiting for its execution time. Please Wait...'
        self._pbs_msgs['S'] = 'Job is suspended'
        self._pbs_msgs['DEFAULT'] = 'Returning Results...'

    def __makePbsScript__(self):
        """Return the text of the PBS submit script for this job.

        Side effects: records the absolute results directory in
        self._resultDirPath, fills in default out/err file names, removes
        stale out/err files, and builds an mpirun command line when no
        explicit cmd() was given.

        Raises RpQueueError when mpirun cannot be found or no executable
        was set (only checked when the command has to be built).
        """
        # the directory where output should be placed
        self._resultDirPath = os.path.join(os.getcwd(), self.resultsDir())

        # set the out and err files
        if self.outFile() == '':
            self.outFile(os.path.join(self.resultsDir(), self.jobName() + ".out"))

        if self.errFile() == '':
            self.errFile(os.path.join(self.resultsDir(), self.jobName() + ".err"))

        # remove existing "FLAG" files if they exist; status() waits for
        # the err file to (re)appear
        for flagFile in (self.outFile(), self.errFile()):
            if os.path.isfile(flagFile):
                os.remove(flagFile)

        if self.cmd() == '':
            # find the mpirun command
            mpiCommand = getCommandOutput('which mpirun')
            if mpiCommand == '':
                # command not found?
                raise RpQueueError('mpirun command not found')
            mpiCommand = mpiCommand.strip()

            # check to make sure user specified an executable
            if self.executable() == '':
                raise RpQueueError('executable not set within pbs queue')

            self.cmd('%s -np %d -machinefile $PBS_NODEFILE %s %s' % \
                (mpiCommand, self.nodes(), self.executable(), self.execArgs()))

        # the trailing "touch" creates the err file even when the job
        # wrote nothing to stderr, which is what ends the status() loop
        return """#PBS -S /bin/bash
#PBS -l nodes=%s:ppn=%s
#PBS -l walltime=%s
#PBS -q %s
#PBS -o %s
#PBS -e %s
#PBS -mn
#PBS -N %s
cd $PBS_O_WORKDIR

cmd="%s"
echo == running from $HOSTNAME:
echo $cmd
$cmd

touch %s
""" % ( self.nodes(),
        self.ppn(),
        self.walltime(),
        self.queue(),
        self.outFile(),
        self.errFile(),
        self.jobName(),
        self.cmd(),
        self.errFile()   )

    def cmd(self, cmd=''):
        """Get or set the full command line run by the PBS script.

        '' and None mean "read"; anything else is stored as a string.
        """
        cmd = str(cmd)
        if cmd in ('', 'None'):
            return self._pbs_vars['cmd']
        self._pbs_vars['cmd'] = cmd

    def getCurrentStatus(self):
        """Return '(<jobId>) <message>' describing the job's qstat state.

        The default message is used when no job id is known, when the
        qstat output cannot be parsed, or when the state letter is not
        one of the known ones.
        """
        retVal = self._pbs_msgs['DEFAULT']
        if self._jobId:
            # the server is whatever follows '@' in the queue name;
            # start from '' so a queue without '@' queries the default
            # server instead of hitting an unbound variable
            pbsServer = ''
            nanoHUBQ = self.queue()
            atLocation = nanoHUBQ.find('@')
            if atLocation > -1:
                pbsServer = nanoHUBQ[atLocation+1:]

            if pbsServer == '':
                cmd = "qstat -a | grep '^ *%s'" % (self._jobId)
            else:
                cmd = "qstat @%s -a | grep '^ *%s'" % (pbsServer, self._jobId)
            cmdOutput = getCommandOutput(cmd)

            # parse a string that should look like this:
            # 32049.hamlet.rc kearneyd ncn      run.21557.   6104   8   8    --  24:00 R 00:00
            parseResults = cmdOutput.split()
            # results should look like this:
            # ['32049.hamlet.rc','kearneyd','ncn','run.21557.','6104','8','8','--','24:00','R','00:00']

            if len(parseResults) > 9:
                # unknown state letters fall back to the default message
                retVal = self._pbs_msgs.get(parseResults[9], retVal)

        return '(' + self._jobId + ') ' + retVal

    def submit(self):
        """Write the PBS script into the results directory and qsub it.

        On success the numeric job id is stored in self._jobId.
        Raises RpQueueError when the qsub output does not start with a
        job number.
        """
        submitFileData = self.__makePbsScript__()
        submitFileName = os.path.join(self._resultDirPath, self.jobName() + '.pbs')
        writeFile(submitFileName, submitFileData)

        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            cmdOutData = getCommandOutput('qsub %s' % (submitFileName))
        finally:
            # always return to the caller's directory, even if qsub fails
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission (on hamlet/lear) starts with the job number
        jobIdMatch = re.match(r'[0-9]+', cmdOutData)
        if jobIdMatch is None:
            raise RpQueueError('Submission to PBS Queue Failed')
        self._jobId = jobIdMatch.group()
373
374#    def status(self):
375#        min = 0
376#        tCount = 0
377#        sleepTime = 10
378#
379#        while self.__checkFiles__(self.errFile()):
380#            if tCount == 60 :
381#                min += 1
382#                tCount = 0
383#
384#            self._currStatus = self.getCurrentStatus()
385#            if (self._currStatus != self._prevStatus):
386#                sys.stdout.write("%s: %s\n" % ( self._currStatus, time.ctime() ) )
387#                sys.stdout.flush()
388#                self._prevStatus = self._currStatus
389#
390#            time.sleep(sleepTime)
391#            tCount += sleepTime
392
393
394
class condor (queue):
    """Organize the information needed to submit grid-universe jobs
    through condor_submit.

    A submission consists of a common header (see __makeCondorScript__)
    followed by one "Queue" section per process added with addProcess().
    Each process is expected to produce <jobName><index>.tar.gz in the
    results directory; status() waits until all of them are non-empty.
    """

    # bit flag for the ``flags`` constructor argument: request an MPI job
    USE_MPI = 1

    def __init__(   self,
                    jobName,
                    resultsDir,
                    nodes,
                    executable,
                    ppn=2,
                    execArgs='',
                    walltime='00:01:00',
                    transferFiles=None,
                    flags=0             ):
        """Store the job description.

        transferFiles -- input files used for the default process that
                         submit() creates when addProcess() was never
                         called (None means no files)
        flags         -- bitwise OR of the class flag constants (USE_MPI)

        Raises RpQueueError for an invalid nodes, ppn or walltime value.
        """

        # call the base class's init directly; walking
        # self.__class__.__bases__ would pick the wrong class as soon as
        # condor itself is subclassed
        queue.__init__(self)

        self._condor_msgs = {}
        self.__fillStatusDict__()

        self._flags = flags
        self.__setWorkingDir__()

        # setup the empty process lists
        self._processList = []

        # no job submitted yet; getCurrentStatus() checks this
        self._jobId = ""

        # set vars based on user input
        self.resultsDir(resultsDir)
        self.jobName(jobName)
        self.nodes(nodes)
        self.ppn(ppn)
        self.walltime(walltime)
        self.executable(executable)
        self.execArgs(execArgs)
        self.queue("tg_workq@lear")
        self.outFile("out.$(cluster).$(process)")
        self.errFile("err.$(cluster).$(process)")
        self.logFile("log.$(cluster)")
        # stored directly so that an empty list is recorded as well; the
        # accessor treats an empty argument as a read
        self._queue_vars['transferFiles'] = list(transferFiles or [])

    def __fillStatusDict__(self):
        """Map condor_q job-state letters to the messages shown to the user."""
        self._condor_msgs['I'] = 'Simulation Queued'
        self._condor_msgs['H'] = 'Simulation Held'
        self._condor_msgs['R'] = 'Running Simulation'
        self._condor_msgs['C'] = 'Simulation Complete'
        self._condor_msgs['X'] = 'Simulation Marked For Deletion'
        self._condor_msgs['DEFAULT'] = 'Returning Results...'

    def __setWorkingDir__(self):
        """Build the working-directory tag '<seconds>_<session>'.

        The session comes from the SESSION environment variable
        ("00000X" when unset).
        """
        session = os.getenv("SESSION", "00000X")
        # NOTE(review): mktime() interprets the UTC tuple as local time,
        # so this is offset from the true epoch by the UTC offset; it
        # appears to be used only as a unique tag -- confirm before changing.
        epoch = int(time.mktime(datetime.datetime.utcnow().timetuple()))
        self._workingdir = str(epoch) + "_" + session

    def __makeCondorScript__(self):
        """Return the common header of the condor submit file.

        Side effects: records the absolute results directory in
        self._resultDirPath and the MPI part of the RSL in self._mpirsl.
        The walltime is converted from 'HH:MM:SS' to minutes.
        """
        self._resultDirPath = os.path.join(os.getcwd(), self.resultsDir())

        self._mpirsl = ""
        if (self._flags & self.USE_MPI):
            # host_xcount is the number of machines
            # xcount is the number of procs per machine
            self._mpirsl = "(jobType=mpi)(xcount=%d)(host_xcount=%d)" % \
                            (self.ppn(), self.nodes())

        (wall_hr, wall_min, wall_sec) = self.walltime().split(":")
        # maxWallTime is written in minutes
        walltime = int(wall_hr)*60 + int(wall_min) + int(wall_sec)/60.00

        return """universe=grid
    gridresource = gt2 tg-gatekeeper.purdue.teragrid.org/jobmanager-pbs
    executable = %s
    output = %s
    error = %s
    should_transfer_files = YES
    when_to_transfer_output = ON_EXIT
    initialDir = %s
    log = %s
    #globusrsl = (project=TG-ECD060000N)(jobType=mpi)(xcount=2)(hostxcount=2)(maxWallTime=WALLTIME)
    globusrsl = (project=TG-ECD060000N)(maxWallTime=%g)%s(queue=%s)
    notification = never
    """ % ( self.executable(),
            self.outFile(),
            self.errFile(),
            self.resultsDir(),
            self.logFile(),
            walltime,
            self._mpirsl,
            self.queue()    )

    def addProcess(self, argsList=None, inputFiles=None):
        """Append one "Queue" section to the submission.

        argsList   -- strings placed before the automatically added
                      arguments (result tarball name and working-dir id)
        inputFiles -- files listed in transfer_input_files

        The process's result tarball is named <jobName><index>.tar.gz,
        where index is the position of the process in the list.
        """
        args = " ".join(argsList or [])
        transInFiles = ",\\\\\n\t\t".join(inputFiles or [])
        nextProcId = len(self._processList)
        resultsTarName = "%s%d.tar.gz" % (self.jobName(), nextProcId)
        newProcess = """arguments = %s %s %s_%d
    transfer_input_files = %s
    transfer_output_files = %s
    Queue

    """ % ( args,
            resultsTarName,
            self._workingdir,
            nextProcId,
            transInFiles,
            resultsTarName  )
        self._processList.append(newProcess)

    def getCurrentStatus(self):
        """Return the message for the job's current condor_q state.

        The default message is returned when no job id is known, when
        the condor_q output cannot be parsed, or when the state letter is
        not one of the known ones.
        """
        retVal = self._condor_msgs['DEFAULT']
        if self._jobId:
            chkCmd = "condor_q | grep '^ *%s'" % (self._jobId)
            cmdOutput = getCommandOutput(chkCmd)

            # parse a string that should look like this:
            # 61.0   dkearney       12/9  22:47   0+00:00:00 I  0   0.0  nanowire.sh input-
            parseResults = cmdOutput.split()
            # results should look like this:
            # ['61.0', 'dkearney', '12/9', '22:47', '0+00:00:00', 'I', '0', '0.0', 'nanowire.sh', 'input-']

            if len(parseResults) > 6:
                retVal = self._condor_msgs.get(parseResults[5], retVal)

        return retVal

    def __checkFiles__(self):
        """Return the number of processes whose result tarball is still empty.

        Raises RuntimeError when a result tarball does not exist at all.
        """
        pending = len(self._processList)
        for i in range(len(self._processList)):
            resultTarPath = self.resultsDir() + "/" + self.jobName() + str(i) + ".tar.gz"

            # ask for the size directly instead of testing for existence
            # first, so the file cannot vanish between the two calls
            try:
                resultTarSize = os.path.getsize(resultTarPath)
            except OSError:
                raise RuntimeError('Condor Result file not created: %s' % (resultTarPath))

            # a non-empty tarball means this process has delivered results
            if resultTarSize:
                pending -= 1

        return pending

    def status(self):
        """Block until every result tarball is non-empty, reporting
        status changes.

        Every 10 seconds the result of getCurrentStatus() is fetched and,
        when it differs from the previous one, written to stdout together
        with the current time.
        """
        sleepTime = 10

        while self.__checkFiles__():
            self._currStatus = self.getCurrentStatus()
            if self._currStatus != self._prevStatus:
                sys.stdout.write("%s: %s\n" % (self._currStatus, time.ctime()))
                sys.stdout.flush()
                self._prevStatus = self._currStatus

            time.sleep(sleepTime)

    def submit(self):
        """Write the condor submit file into the results directory and
        run condor_submit on it.

        When no process was added, one default process using the stored
        transfer files is created first.  On success the cluster number
        is stored in self._jobId; on failure the command output is
        printed and no exception is raised.
        """
        if len(self._processList) == 0:
            self.addProcess(inputFiles=self.transferFiles())

        submitFileData = self.__makeCondorScript__() + "\n".join(self._processList)
        submitFileName = os.path.join(self._resultDirPath, self.jobName() + '.condor')
        writeFile(submitFileName, submitFileData)

        # submit the condor job
        myCWD = os.getcwd()
        os.chdir(self._resultDirPath)
        try:
            condorCmd = 'condor_submit %s 2> condor_submit.error' % (submitFileName)
            cmdOutData = getCommandOutput(condorCmd)
        finally:
            # always return to the caller's directory, even on failure
            os.chdir(myCWD)

        self._prevStatus = ''
        self._currStatus = ''

        # a successful submission reports the cluster number
        clusterMatch = re.search(r'cluster ([0-9]+)', cmdOutData)
        if clusterMatch:
            self._jobId = clusterMatch.group(1)
        else:
            # the original deliberately reports instead of raising here
            sys.stdout.write("cmdOutData returned :%s:\n" % cmdOutData)
616
617#######################################################################
618# main python script
619#######################################################################
620
621#######################################################################
622
def createDir(dirName):
    """Create an empty directory named dirName below the current working
    directory and return its path.

    Anything that already exists at that path is removed with
    shutil.rmtree() first.
    """
    freshPath = "/".join([os.getcwd(), dirName])
    alreadyThere = os.path.exists(freshPath)
    if alreadyThere:
        shutil.rmtree(freshPath)
    os.mkdir(freshPath)
    return freshPath
629
if __name__ == '__main__':
    # Manual smoke test: submits one condor job and three PBS jobs, one
    # after the other, waiting for each to finish.  It expects the
    # hello/ and cycle/ example programs and driver7878.xml to exist
    # relative to the current directory.

    nodes = 5
    walltime = '01:00:00'
    mpiCommand = getCommandOutput('which mpirun')

    if mpiCommand == '':
        # command not found: say why before giving up
        sys.stderr.write("mpirun command not found\n")
        sys.exit(-1)
    else :
        mpiCommand = mpiCommand.strip()

    sys.stdout.write("testing hello with condor\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('4321')
    executable = './hello.sh'
    txFileList = ['hello']
    shutil.copy('hello/hello.sh',resultsDir)
    shutil.copy('hello/hello',resultsDir)
    myCondorObj = condor(jobName,resultsDir,2,executable,ppn=1,walltime=walltime,transferFiles=txFileList,flags=condor.USE_MPI)
    myCondorObj.submit()
    myCondorObj.status()
    sys.stdout.flush()

    sys.stdout.write("testing hello\n")
    jobName = 'helloRP_MPITest'
    resultsDir = createDir('abab')
    executable = 'helloRP'
    driver = "driver7878.xml"
    shutil.copy('hello/helloRP',resultsDir)
    shutil.copy(driver,resultsDir)
    # the driver file name is passed as the executable's argument string
    myPBSObj =  pbs(jobName,resultsDir,nodes,executable,driver)
    myPBSObj.submit()
    myPBSObj.status()
    sys.stdout.flush()


    sys.stdout.write("testing hello\n")
    jobName = 'helloMPITest'
    resultsDir = createDir('1234')
    executable = 'hello'
    shutil.copy('hello/hello',resultsDir)
    myPBSObj =  pbs(jobName,resultsDir,nodes,executable)
    myPBSObj.submit()
    myPBSObj.status()
    sys.stdout.flush()

    sys.stdout.write("testing cycle\n")
    jobName = 'cycleMPITest'
    resultsDir = createDir('5678')
    # explicit command line: with cmd() set, pbs does not build its own
    # mpirun invocation, so the empty executable below is never checked
    cmd = '%s -np %d -machinefile $PBS_NODEFILE cycle' % (mpiCommand,nodes)
    shutil.copy('cycle/cycle',resultsDir)
    myPBSObj =  pbs(jobName,resultsDir,nodes,"")
    myPBSObj.cmd(cmd)
    myPBSObj.walltime('00:02:00')
    myPBSObj.submit()
    myPBSObj.status()
    sys.stdout.flush()

    # exit the program
    sys.exit()
691
692#######################################################################
Note: See TracBrowser for help on using the repository browser.