Source code for paws.core.workflows.WfManager
from __future__ import print_function
from multiprocessing import Process,Pool
from collections import OrderedDict
import copy
import traceback
import time
from ..operations import optools
from ..operations.OpManager import OpManager
from ..plugins.PluginManager import PluginManager
from .Workflow import Workflow
from ..operations import Operation as opmod
from ..operations.Operation import Operation
from .. import pawstools
[docs]class WfManager(object):
"""
Manager for paws Workflows.
Stores a list of Workflow objects,
performs operations on them.
Keeps a reference to a PluginManager
for access to PawsPlugins.
"""
def __init__(self,op_manager=None,plugin_manager=None):
"""Initialize a workflow manager.
Parameters
----------
op_manager : OpManager (optional)
an operations manager (paws.core.operations.OpManager.OpManager)-
if not provided, a default OpManager will be created.
plugin_manager : PluginManager (optional)
a plugins manager (paws.core.plugins.PluginManager.PluginManager)-
if not provided, a default PluginManager will be created.
"""
super(WfManager,self).__init__()
if not op_manager:
op_manager = OpManager()
if not plugin_manager:
plugin_manager = PluginManager()
self.op_manager = op_manager
self.plugin_manager = plugin_manager
self.workflows = OrderedDict()
# dict of workflow clones for executing across threads:
self.wf_clones = OrderedDict()
# dict of bools to keep track of who is at work:
self.wf_running = OrderedDict()
self.message_callback = print
self.wf_threads = OrderedDict()
[docs] def add_workflow(self,wf_name):
"""Name and add a workflow.
If `wf_name` is not unique (i.e. a workflow with that name already exists),
this method will overwrite the existing workflow with a new one.
Parameters
----------
wf_name : str
name to give to the new Workflow
Returns
-------
wf : Workflow
a reference to the new Workflow
"""
wf = Workflow()
if not wf.is_tag_valid(wf_name):
raise pawstools.WfNameError(wf.tag_error_message(wf_name))
wf.message_callback = self.message_callback
self.workflows[wf_name] = wf
self.wf_running[wf_name] = False
return wf
[docs] def n_workflows(self):
"""Return the current number of Workflows"""
return len(self.workflows)
[docs] def run_workflow(self,wf_name,pool=None):
"""Execute the workflow indicated by `wf_name`"""
wf = self.workflows[wf_name]
self.message_callback('preparing workflow {} for execution'.format(wf_name))
stk,diag = wf.execution_stack()
self.prepare_wf(wf,stk)
if pool is None:
self.wf_running[wf_name] = True
wf.run()
self.message_callback('execution finished')
else:
wf_clone = wf.build_clone()
wf_clone.message_callback = wf.message_callback
wf_clone.data_callback = wf.set_item
# TODO: how do we know when a cloned wf is finished?
# need to implement a finished_callback?
# or is there something in the Process that can help?
#wf_clone.wfFinished.connect( partial(self.stop_workflow,wf_name) )
for op_name,op in wf_clone.operations.items():
op.message_callback = wf.message_callback
op.data_callback = partial( wf.set_op_item,op_name )
self.wf_clones[wf_name] = wf_clone
# temporary for debugging:
wf_clone.run()
self.message_callback('execution finished')
#wf_proc = Process(target=wf_clone.run,name=wf_name)
#pool[wf_name] = wf_proc
[docs] def stop_workflow(self,wf_name):
"""Stop the workflow indicated by `wf_name`"""
self.message_callback('stopping workflow {}'.format(wf_name))
if wf_name in self.wf_clones.keys():
wf = self.wf_clones.pop(wf_name)
wf.stop()
else:
wf = self.workflows[wf_name]
wf.stop()
self.wf_running[wf_name] = False
[docs] def prepare_wf(self,wf,stk):
"""
For all of the operations in stack stk,
load all inputs that are not workflow items.
"""
for lst in stk:
for op_tag in lst:
op = wf.get_data_from_uri(op_tag)
for inpname,il in op.input_locator.items():
if il.tp not in [opmod.runtime_type,opmod.workflow_item]:
# NOTE 1: runtime inputs should be set directly, without using il.val.
# this is because, when saving a workflow, il.val gets serialized.
# NOTE 2: workflow_item inputs should be set later, during execution.
wf.set_op_item(op_tag,'inputs.'+inpname,self.locate_input(il))
[docs] def load_workflow(self,wf_name,wf_setup_dict):
"""Load a workflow from a dict that specifies its parameters.
If `wf_name` is not unique, self.workflows[wf_name] is overwritten.
Parameters
----------
wf_name : str
name to be given to the new workflow
wf_setup_dict : dict
dict specifying workflow setup
"""
self.add_workflow(wf_name)
wfins = wf_setup_dict.pop('WORKFLOW_INPUTS')
wfouts = wf_setup_dict.pop('WORKFLOW_OUTPUTS')
opflags = wf_setup_dict.pop('OP_ENABLE_FLAGS')
for op_tag, op_setup_dict in wf_setup_dict.items():
self.workflows[wf_name].load_operation(op_tag,op_setup_dict,self.op_manager)
if not opflags[op_tag]:
self.workflows[wf_name].disable_op(op_tag)
for inpname,inpval in wfins.items():
self.workflows[wf_name].connect_input(inpname,inpval)
for outname,outval in wfouts.items():
self.workflows[wf_name].connect_output(outname,outval)