wiki:workflow_pegasus
Last modified on 02/11/11 12:55:58

For purposes of this discussion a workflow will be considered to mean the execution of several programs with interdependencies. Pegasus provides the ability to describe such a set of programs and their interdependencies and subsequently execute them in the proper order. The individual programs can be run locally or submitted to various grid infrastructures. There is some overlap between the functionality provided by Pegasus and that of the HUB submit infrastructure. Through the use of an example, one possible combination of Pegasus and submit is presented. Additionally, Rappture is used to provide data exchange functionality in addition to its typical GUI role.

Application example: CNTbands

As an example, consider the following application:
Rappture GUI
For purposes of this discussion only the combination of Structure = Carbon Nanotube and Simulation Method = Extended Huckel Theory will be used. The remaining five parameters can be set, within bounds, by the user. In the off-the-shelf version of the application the user specifies values of interest for the five parameters and clicks the Simulate button. Rappture then launches a process to carry out the simulation. For CNTbands the simulation consists of the execution of several small programs that exchange data through files, as illustrated here:
DAG
Each node (rectangle) represents a program that must be executed. The data exchange between nodes is represented by the labeled arrows. Labels with file extensions indicate that a file is used for data exchange; simple labels indicate the use of GUI-supplied parameters.

Workflow Approach

The approach taken to reconfigure the application as a workflow is to execute each node as an individual Rappture simulation. Rappture provides a well-defined API for data exchange with such a process. The processes can then be arbitrarily connected together to execute the overall simulation. This approach allows the individual simulations to be reused in additional applications; they could be regarded as services. The Rappture process dictates that for each node the input data be collected into a driver file, which in combination with an executable program is used to produce a run file.

Pegasus requires an XML file containing a workflow description (a DAX). Higher-level language APIs are provided to generate a proper XML file programmatically. The key information to be provided for each node is the program with any required arguments, input files, output files, and prerequisite nodes. The programs can be started as shell processes or as Condor jobs. Condor submission allows for simultaneous execution of multiple nodes when their interdependencies allow.
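As an illustration, a hand-written DAX fragment for two of the nodes might look like the following sketch (job names are taken from the DAG above; the file names, IDs, and argument strings are hypothetical, and the real file is generated by cntbandsdax.py):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<adag xmlns="http://pegasus.isi.edu/schema/DAX" version="3.2" name="cntbands">
    <!-- one job per DAG node: program, arguments, and files used -->
    <job id="ID0000001" name="unitcell">
        <argument>unitcell SOME_ID</argument>
        <uses name="driver_unitcell.xml" link="input"/>
        <uses name="results_unitcell.xml" link="output"/>
    </job>
    <job id="ID0000002" name="maketube">
        <argument>maketube SOME_ID</argument>
        <uses name="results_unitcell.xml" link="input"/>
        <uses name="results_maketube.xml" link="output"/>
    </job>
    <!-- prerequisite node: maketube runs only after unitcell completes -->
    <child ref="ID0000002">
        <parent ref="ID0000001"/>
    </child>
</adag>
```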

The Condor local universe is used to execute programs within the HUB container. This is achieved by configuring a Condor instance within the HUB container such that communication with the outside world is cut off. The configuration is generic and not specific to an application or user.
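The actual condor_config used in the container is not reproduced here; a minimal sketch of the idea, using standard Condor configuration macros with hypothetical values, would be along these lines:

```
# run only the daemons needed for local-universe jobs
DAEMON_LIST = MASTER, SCHEDD
# keep the pool self-contained within the container
CONDOR_HOST = $(FULL_HOSTNAME)
# bind to the loopback interface so no outside communication occurs
NETWORK_INTERFACE = 127.0.0.1
```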

submit is used in the standard fashion to execute programs offsite. Rappture must be installed at the offsite venue. As a test case Rappture has been installed on the Purdue clusters, but a similar installation could be made at other grid sites. Several of the programs in the CNTbands application also require that Octave 3.0.0 or later be available.

The relationship between the various building block components is illustrated below.
Component Architecture

Sample Scripts

Representative scripts are presented here to give an indication of how the various pieces fit together.

submitpegasus

This script builds the workflow XML file and submits a job to Pegasus. pegasus-rappture is a modified version of pegasus-status that repeatedly reports the workflow status until the job is complete. This functionality could be folded into pegasus-status, as there is nothing Rappture-specific about it despite what the name suggests.

#!/bin/sh

. /apps/pegasus/setup-3.0.0.sh

#set -x
#set -e

TOPDIR=`pwd`

DRIVER=$1
ID=$(md5sum ${DRIVER} | cut -d' ' -f 1)

# generate the dax
./cntbandsdax.py ${DRIVER} ${ID} > cntbands.dax

export CONDOR_CONFIG=/home/nanohub/clarksm/Condor/condor_config

# create the site catalog
cat >sites.xml <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-3.0.xsd" version="3.0">
    <site  handle="local" arch="x86" os="LINUX">
        <grid  type="gt2" contact="localhost/jobmanager-fork" scheduler="Fork" jobtype="auxillary"/>
        <grid  type="gt2" contact="localhost/jobmanager-fork" scheduler="Fork" jobtype="compute"/>
        <head-fs>
            <scratch>
                <shared>
                    <file-server protocol="file" url="file://" mount-point="$TOPDIR/outputs"/>
                    <internal-mount-point mount-point="$TOPDIR/outputs" free-size="100G" total-size="30G"/>
                </shared>
            </scratch>
            <storage>
                <shared>
                    <file-server protocol="file" url="file://" mount-point="$TOPDIR/outputs"/>
                    <internal-mount-point mount-point="$TOPDIR/outputs" free-size="100G" total-size="30G"/>
                </shared>
            </storage>
        </head-fs>
        <replica-catalog  type="LRC" url="rlsn://dummyValue.url.edu" />
        <profile namespace="env" key="HOME" >$HOME</profile>
        <profile namespace="env" key="SESSIONDIR" >$SESSIONDIR</profile>
        <profile namespace="env" key="PEGASUS_HOME" >$PEGASUS_HOME</profile>
    </site>
</sitecatalog>
EOF

# plan and submit the  workflow
PLANOUT=$(pegasus-plan -D pegasus.user.properties=pegasusrc \
                       --sites local \
                       --dir work \
                       --output local \
                       --dax cntbands.dax \
                       --submit | grep status)

RUNDIR=$(echo ${PLANOUT} | sed -e "s/.* //")
echo ${RUNDIR}
# for times when NFS is extremely slow
until [ -d ${RUNDIR} ] ; do
   sleep 2
done

pegasus-rappture --rappture ${RUNDIR}

RUNXML=run$(date +"%s%N" | cut -c -16).xml
cp -p outputs/results_mergeio_${ID}.xml ${RUNXML}
echo '=RAPPTURE-RUN=>'${RUNXML}

Node Script

The node script is what is executed by a Condor job launched by Pegasus. The example given here is used for the program nodes. Simulation may occur locally or remotely via submit.

#!/bin/bash
# bash (not plain sh) is required for the arrays and <<< redirection below

nodeNames[1]=unitcell
nodeNames[2]=CNTbands
nodeNames[3]=maketube
nodeNames[4]=maketube2
nodeNames[5]=xyz2huc
nodeNames[6]=huckel
nodeNames[7]=eht_ek
nodeNames[8]=eht_dos
nodeNames[9]=plot4
nodeNames[10]=gui_output

transferFiles[1]="-i CNTbands/bin/cnbandswr_unitcell -i CNTbands/bin/unitcell"
transferFiles[2]="-i CNTbands/bin/cnbandswr_CNTbands -i CNTbands/bin/octave/CNTbands.m"
transferFiles[3]="-i CNTbands/bin/cnbandswr_maketube -i CNTbands/bin/maketube"
transferFiles[4]="-i CNTbands/bin/cnbandswr_maketube2 -i CNTbands/bin/maketube2"
transferFiles[5]=""
transferFiles[6]="-i CNTbands/bin/cnbandswr_huckel -i CNTbands/bin/huckel -i CNTbands/data"
transferFiles[7]="-i CNTbands/bin/cnbandswr_eht_ek -i CNTbands/bin/octave/eht_ek.m -i CNTbands/bin/octave/bandExchange.m"
transferFiles[8]="-i CNTbands/bin/cnbandswr_eht_dos -i CNTbands/bin/octave/eht_dos.m"
transferFiles[9]="-i CNTbands/bin/cnbandswr_plot4 -i CNTbands/bin/octave/plot4.m"
transferFiles[10]=""

#set -e
#set -x

TOP_DIR=`pwd`

export PATH=${TOP_DIR}/CNTbands/bin:$PATH

. /apps/environ/.setup
use -e -r submit
use -e -r rappture
use -e -r octave-3.0.0

NODE=$1
ID=$2
shift 2
MERGE_XML=$@

DRIVER=driver_${NODE}_${ID}.xml
RUNXML=results_${NODE}_${ID}.xml

requiresSubmit=0
requiresOctave=0
# look up the node's transfer files (the arrays are indexed from 1)
for ((i=1 ; i <= ${#nodeNames[@]} ; i++ )) ; do
   if [ "${NODE}" == "${nodeNames[$i]}" ] ; then
      transfers=${transferFiles[$i]}
      if [ ${#transfers} -gt 0 ] ; then
         requiresSubmit=1
         if grep -q octave <<< "${transfers}" ; then
            requiresOctave=1
         fi
      fi
   fi
done

echo "mergerunxml.tcl ${NODE} ${MERGE_XML}"
MERGEXML=$(mergerunxml.tcl ${NODE} ${MERGE_XML})
MERGEXML=${MERGEXML#*=RAPPTURE-RUN=>}
echo "mv ${MERGEXML} ${DRIVER}"
mv ${MERGEXML} ${DRIVER}

if [ ${requiresSubmit} -eq 1 ] ; then
   if [ ${requiresOctave} -eq 1 ] ; then
      submit -v coates -w 10 ${transfers} \
            simsim-20101214_octave-3.2.4 -tool ${DRIVER} \
                                         -values current \
                                         -runfile ${RUNXML}
   else
      submit -v coates -w 10 ${transfers} \
            simsim-20101214 -tool ${DRIVER} \
                            -values current \
                            -runfile ${RUNXML}
   fi
else
   if [ ${requiresOctave} -eq 1 ] ; then
      export OCTAVE_LOADPATH=:${TOP_DIR}/CNTbands/bin/octave:$OCTAVE_LOADPATH
   fi
   simsim -tool ${DRIVER} \
          -values current \
          -runfile ${RUNXML}
fi
RC=$?

exit $RC

mergerunxml.tcl

This script is used to combine data from previous program runs into a Rappture driver file. Which values to pull from which files is specified via command line arguments. As an example, the command to build the driver file for the unitcell node is mergerunxml.tcl unitcell 3f15588a8ec7c0d8f3ce5439496ba308 results_gui_input_3f15588a8ec7c0d8f3ce5439496ba308.xml:CarbonTypeN,CarbonTypeM,CenterToCenter. The ID parameter was obtained by hashing the CNTbands driver file. This is not strictly necessary, but it is a start toward using the Pegasus catalog system to retrieve and reuse previously generated results. The script combines a fixed header file with outputs from one or more run files and produces a driver file. It is assumed that parameters are uniquely identified across all files, which allows for a very simple path specification during the get and put operations.

#!/usr/bin/env tclsh
# ----------------------------------------------------------------------
package require Rappture

set installdir   [file dirname [info script]]
set toolBasePath [file join $installdir .. rappture tool_base.xml]

# open the XML file containing the run parameters
set opt [Rappture::library $toolBasePath]

set nodeName [lindex $argv 0]
set toolCommand [$opt get tool.command]
regsub "_NODE" $toolCommand "_$nodeName" toolCommand
$opt put tool.command $toolCommand

foreach {arg} [lrange $argv 1 end] {
   foreach {fileName fragments} [split $arg ":"] {
      puts "$fileName"
      set tempLibrary [Rappture::library $fileName]
      foreach fragment [split $fragments ","] {
         puts "$fragment"
         set tempLibraryPath [$tempLibrary element -as component output.($fragment)]
         $opt copy input.$tempLibraryPath from $tempLibrary output.$tempLibraryPath
      }
   }
}

Rappture::result $opt 0
exit 0
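The ID naming scheme used throughout these scripts can be reproduced directly in the shell; a small sketch, with a hypothetical driver file and node name:

```shell
#!/bin/sh
# The ID is the MD5 hash of the top-level CNTbands driver file, so the
# same driver always maps to the same driver_*/results_* file names.
DRIVER=/tmp/driver_example.xml           # hypothetical driver file
printf '<run><input/></run>\n' > "${DRIVER}"
ID=$(md5sum "${DRIVER}" | cut -d' ' -f 1)
echo "driver_unitcell_${ID}.xml"
echo "results_unitcell_${ID}.xml"
```

Because the hash depends only on the driver contents, a rerun with identical inputs produces identical file names, which is what would let a catalog lookup short-circuit the computation.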

GUI view

During a simulation, workflow progress is reported as simple text.


Intermediate workflow progress

Upon completion results are rendered in the standard manner.


Results
