Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Example

Christopher Dunn edited this page Nov 29, 2015 · 3 revisions
#!/usr/bin/env python2.7
from pypeflow.controller import PypeThreadWorkflow
from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
from pypeflow.task import PypeTask, PypeThreadTaskBase
import os, sys, logging
# Install the logging module's default handler configuration, then turn the
# root logger (getLogger() with no name) up to DEBUG so every message logged
# through `log` below is emitted.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)

def system(call, check=False):
    """Run `call` via os.system, log the outcome, and return its status.

    Args:
        call: Command string handed to os.system.
        check: When true, a non-zero status raises instead of only warning.

    Returns:
        The value returned by os.system for the command.

    Raises:
        Exception: If the status is non-zero and `check` is true.
    """
    log.debug('$(%s)' % repr(call))
    status = os.system(call)
    report = "Call %r returned %d." % (call, status)

    # Success path first: a zero status is only worth a debug line.
    if not status:
        log.debug(report)
        return status

    # Failure is always logged as a warning; it is fatal only on request.
    log.warning(report)
    if check:
        raise Exception(report)
    return status

def create_sleeper(t, id):
    """Build a task that sleeps for `t` seconds and then touches ./foo-<id>.

    Args:
        t: Number of seconds handed to time.sleep inside the task.
        id: Identifier embedded in both the output file name ('foo-<id>')
            and the task URL ('task://localhost/func/<id>').

    Returns:
        The object produced by applying the PypeTask decorator to the task
        body.
    """
    # The file's import line brings in only os, sys and logging, so `time`
    # must be imported here; without it the task body raises NameError the
    # moment it runs.
    import time

    foo = makePypeLocalFile(os.path.join('.', 'foo-{}'.format(id)))

    @PypeTask(inputs = [],
              outputs =  {'name': foo},
              TaskType = PypeThreadTaskBase,
              URL = 'task://localhost/func/{}'.format(id))
    def func(self):
        log.debug('Sleeping for {} seconds.'.format(t))
        time.sleep(t)
        # NOTE(review): self.name presumably resolves to the 'name' output
        # declared above -- confirm against pypeflow's PypeTask behavior.
        system('touch %s' % fn(self.name))
    return func

def create(n, t):
    """Lazily yield `n` sleeper tasks, each sleeping `t` seconds.

    Task identifiers are the integers 0 .. n-1, in order.
    """
    for task_id in xrange(n):
        sleeper = create_sleeper(t, task_id)
        yield sleeper

def main():
    """Create a thread workflow with 40 one-second sleeper tasks and run it."""
    workflow = PypeThreadWorkflow()
    sleepers = list(create(n=40, t=1))
    workflow.addTasks(sleepers)
    workflow.refreshTargets(updateFreq=1, exitOnFailure=True)

# Run the example immediately. There is no `__name__ == '__main__'` guard, so
# this also executes whenever the file is imported.
main()
Clone this wiki locally