# Source code for paws.core.operations.optools

"""
Various tools for working with Workflows and Operations
"""

import glob
import copy
from collections import Iterator
from collections import OrderedDict

from . import Operation as opmod 

class FileSystemIterator(Iterator):
    """Polling iterator over paths in a directory that match a glob pattern.

    Every call to :meth:`next` re-runs ``glob.glob`` on
    ``dirpath + '/' + regex`` and returns the first matching path that has
    not been returned before.  When there is nothing new, ``None`` is
    returned; ``StopIteration`` is never raised, so the iterator can be
    polled again later to pick up paths that appear afterwards.

    Parameters
    ----------
    dirpath : str
        Directory to search.
    regex : str
        Pattern appended to `dirpath`.  Despite the name it is handed to
        ``glob.glob``, so it is interpreted as a glob pattern.
    include_existing_files : bool
        If False, paths that already match at construction time are
        recorded as done and will never be returned.
    """

    def __init__(self,dirpath,regex,include_existing_files=True):
        self.dirpath = dirpath
        self.rx = regex
        # Paths already handed out (or deliberately skipped), in order.
        self.paths_done = []
        if not include_existing_files:
            self.paths_done = glob.glob(self.dirpath+'/'+self.rx)
        super(FileSystemIterator,self).__init__()

    def next(self):
        """Return the next unseen matching path, or None if there is none."""
        batch_list = glob.glob(self.dirpath+'/'+self.rx)
        for path in batch_list:
            if path not in self.paths_done:
                self.paths_done.append(path)
                return path
        return None

    def __next__(self):
        """Python 3 iterator protocol entry point; delegates to :meth:`next`.

        Without this method the Iterator ABC leaves the class abstract on
        Python 3 and it cannot be instantiated.  Delegating (rather than
        aliasing) keeps subclasses that override ``next`` working.
        """
        return self.next()
class ExecutionError(Exception):
    """Exception carrying a single message string.

    NOTE(review): presumably raised when executing an Operation or Workflow
    fails -- the raising sites are not visible here, confirm with callers.

    Parameters
    ----------
    msg : str
        Human-readable description of the failure; becomes ``args[0]`` and
        the result of ``str(err)``.
    """

    def __init__(self,msg):
        # Pass only the message: super() already binds self, so passing it
        # again would store the exception inside its own args tuple.
        super(ExecutionError,self).__init__(msg)
def get_uri_from_dict(uri,d):
    """Fetch the item addressed by a dot-separated uri from nested dicts.

    The uri is split on ``'.'`` and each piece is used in turn as a key,
    descending one dict level per piece.

    Parameters
    ----------
    uri : str
        Dot-separated sequence of keys, e.g. ``'a.b.c'``.
    d : dict
        Outermost dict to search.

    Returns
    -------
    object
        The value found under the last key of the uri.

    Raises
    ------
    KeyError
        If a level that should be descended into is not a dict, or if a
        key is missing at its level.
    """
    itm = d
    for k in uri.split('.'):
        if not isinstance(itm,dict):
            msg = 'something in {} is not a dict'.format(uri)
            raise KeyError(msg)
        # Test membership on the dict itself rather than on .keys().
        if k not in itm:
            msg = 'did not find uri {} in dict'.format(uri)
            raise KeyError(msg)
        itm = itm[k]
    return itm
def dict_contains_uri(uri,d):
    """Report whether a dot-separated uri can be resolved in nested dicts.

    The uri is split on ``'.'`` and each piece is looked up in turn,
    descending one dict level per piece.

    Parameters
    ----------
    uri : str
        Dot-separated sequence of keys, e.g. ``'a.b.c'``.
    d : dict
        Outermost dict to search.

    Returns
    -------
    bool
        True if every key of the uri is found at its level; False if a key
        is missing or if a level that should be descended into is not a
        dict.
    """
    itm = d
    for k in uri.split('.'):
        # A non-dict level cannot contain the rest of the uri; return False
        # instead of failing on a missing .keys() attribute.
        if not isinstance(itm,dict):
            return False
        if k not in itm:
            return False
        itm = itm[k]
    return True