Example Project¶
Basics¶
The signac-flow package is designed to operate with a scheduler. We submit operations to the scheduler by generating a script, which contains the commands to execute these.
Important
The following example is designed to teach how to use a FlowProject. To implement an actual project, it is usually better to fork the signac-project-template instead of starting from scratch.
For the sake of example, let’s make the following assumptions:
- We operate on a signac data space spanned by parameters
a
andb
.- We initialize the data space in an
init.py
script by creating aninit.txt
file for each job.- After initialization we want to execute a job-operation on a cluster using the
foo
program which requires the name of the input file and a parametera
as arguments.- The output of
foo
will be stored in a file calledout.txt
.- The
foo
program may be parallelized with MPI on up tob
ranks.- Finally, we want to label our jobs, so that we can assess the project’s overall state.
Initialization¶
To get started we implement an initialization routine:
# init.py
import signac
project = signac.init_project('MyProject')
for a in range(10):
for b in range(10):
with project.open_job(dict(a=a, b=b)) as job:
with open('init.txt', 'w') as file:
# Some initialization routine
Workflow logic¶
Then we will implement the workflow logic in a module called project.py
by specializing a FlowProject
:
# project.py
from math import ceil
from flow import FlowProject
from flow import JobOperation
class MyProject(FlowProject):
# The classification of our job workspace is simple:
def classify(self, job):
if job.isfile('init.txt'):
yield 'initialized'
if job.isfile('out.txt'):
yield 'processed'
# There is only one operation in this project:
def next_operation(self, job):
labels = set(self.classify(job))
# We can't run any operation if the job has not been properly initialized.
if 'initialized' not in labels:
return
# If 'processed' is not among the labels, we return the command that
# will execute the operation.
if 'processed' not in labels:
return JobOperation('process', job, 'foo -a {} init.txt > out.txt'.format(job.sp.a))
# The FlowProject will automatically gather and bundle operations that are eligible
# for execution. The user needs to specify how exactly these bundled operations
# are executed on the cluster.
def submit_user(self, env, _id, operations, **kwargs):
# First we define a helper function to determine the # of processors
# required for a specific operation
def np(op):
return op.job.sp.b if op.name == 'process' else 1
# We use this function to determine the total number of required processors.
np_total = sum(map(op, operations))
# Then we calculate the # of required nodes based on the
# processors per node(ppn) provided during submission.
nn = ceil(np_total / ppn)
# We start writing the job submission script by creating a JobScript instance
# from the environment (env). The JobScript class is a thin io.StringIO wrapper,
# essentially a in-memory text file.
#
# The JobScript instance contains a environment specific header and the exact
# arguments required depend on the specific environment, but most environments
# require a name (_id), the number of nodes (nn) and processors per node (ppn).
sscript = env.script(_id, nn=nn, ppn=ppn)
# Then we write the command for each operation to the script, assuming that we
# want to execute the operation from within the job's workspace.
#
# The `write_cmd()` method will write the command to the script, wrapping it with
# the environment specific mpi execution command if np is larger than one and making
# sure that the command is executed in the background if bg is True.
for op in operations:
sscript.writeline('cd {}'.format(op.job.workspace()))
sscript.write_cmd(op.cmd, np=np(op), bg=True)
# Finally, we want to wait until all processes have finished before exiting the script.
sscript.writeline('wait')
Workflow Execution¶
We can initialize the project’s data space by executing the init.py
script:
$ python init.py
To print the status of the project to screen, execute:
>>> project = MyProject()
>>> project.print_status(detailed=True, params=('a',))
Status project 'test-project':
Total # of jobs: 10
label progress
------------ --------------------------------------------------
initialized |########################################| 100.00%
processed |##########################--------------| 66.67%
Detailed view:
job_id S next_job a labels
-------------------------------- - -------- - -------------------------
108ef78ec381244447a108f931fe80db U 1 initialized, processed
be01a9fd6b3044cf12c4a83ee9612f84 U 2 initialized, processed
32764c28ef130baefebeba76a158ac4e U process 3 initialized
# ...
>>>
We can submit the operations to the cluster by executing the project.submit()
function.
In many cases it is useful to create a separate submit.py
script, which allows to submit
operations directly from the command line, for example like this:
# submit.py
import argparse
from flow import get_environment
from project import MyProject
project = MyProject.get_project()
env = get_environment()
parser = argparser.ArgumentParser()
MyProject.add_submit_arguments(parser)
args = parser.parse_args()
project.submit(env, **vars(args))
This means we can execute the submission like this:
$ python submit.py
To get help concerning possible command line options, use the --help
argument:
$ python submit.py --help