Commit 00e5f8f3 authored by Anton Pershin's avatar Anton Pershin

Fixed numerous bugs due to the introduction of transfer functions, selectors etc.

parent dc6980a9
...@@ -202,12 +202,6 @@ def create_file_mkdir(filepath): ...@@ -202,12 +202,6 @@ def create_file_mkdir(filepath):
os.makedirs(dirpath) os.makedirs(dirpath)
return open(filepath, 'w') return open(filepath, 'w')
def get_templates_path():
'''
Returns the absolute path to templates directory. It is useful when the module is imported from elsewhere.
'''
return os.path.join(os.path.dirname(os.path.dirname(__file__)), 'templates')
def merge_dicts(*dict_args): def merge_dicts(*dict_args):
''' '''
Given any number of dicts, shallow copy and merge into a new dict, Given any number of dicts, shallow copy and merge into a new dict,
...@@ -454,3 +448,8 @@ def load_from_json(cls: Type[JsonSerializable], path_to_jsons: str = 'jsons') -> ...@@ -454,3 +448,8 @@ def load_from_json(cls: Type[JsonSerializable], path_to_jsons: str = 'jsons') ->
with open(filename, 'r') as f: with open(filename, 'r') as f:
obj_as_dict = json.load(f) obj_as_dict = json.load(f)
return cls.from_json(obj_as_dict) return cls.from_json(obj_as_dict)
def print_msg_if_allowed(msg, allow=False):
if allow:
print(msg)
import comsdk.comaux as aux
from comsdk.communication import CommunicationError
from comsdk.graph import Func
from mako.template import Template
import os import os
import posixpath import posixpath
import pickle import pickle
from typing import Sequence, Tuple, Optional
import logging
import json
from mako.template import Template
import comsdk.comaux as aux
from comsdk.communication import CommunicationError
from comsdk.graph import Func, State
dummy_predicate = Func(func=lambda d: True)
dummy_morphism = Func()
job_finished_predicate = Func(func= lambda d: d['job_finished'])
job_unfinished_predicate = Func(func= lambda d: not d['job_finished'])
class InOutMapping(object): class InOutMapping(object):
...@@ -18,6 +28,14 @@ class InOutMapping(object): ...@@ -18,6 +28,14 @@ class InOutMapping(object):
self._relative_keys = relative_keys if aux.is_sequence(relative_keys) else (relative_keys,) self._relative_keys = relative_keys if aux.is_sequence(relative_keys) else (relative_keys,)
self._keys_mapping = keys_mapping self._keys_mapping = keys_mapping
def __str__(self):
return 'Default relative key: {}\n' \
'Relative keys:\n{}\n' \
'Keys mapping:\n\tLocal -> Global\n\t----------------\n' \
'{}'.format('.'.join(self._default_relative_key),
'\n'.join(['\t' + '.'.join(k) for k in self._relative_keys]),
'\n'.join(['\t' + loc + ' -> ' + '.'.join(glo) for loc, glo in self._keys_mapping]))
def build_proxy_data(self, data, dynamic_keys_mapping={}): def build_proxy_data(self, data, dynamic_keys_mapping={}):
if self._default_relative_key == () and self._relative_keys == () and self._keys_mapping == {} and dynamic_keys_mapping == {}: if self._default_relative_key == () and self._relative_keys == () and self._keys_mapping == {} and dynamic_keys_mapping == {}:
return data return data
...@@ -25,7 +43,8 @@ class InOutMapping(object): ...@@ -25,7 +43,8 @@ class InOutMapping(object):
#print('\t{}\n\t{}\n\t{}'.format(self._relative_keys, self._keys_mapping, dynamic_keys_mapping)) #print('\t{}\n\t{}\n\t{}'.format(self._relative_keys, self._keys_mapping, dynamic_keys_mapping))
return aux.ProxyDict(data, self._relative_keys, dict(self._keys_mapping, **dynamic_keys_mapping), self._default_relative_key) return aux.ProxyDict(data, self._relative_keys, dict(self._keys_mapping, **dynamic_keys_mapping), self._default_relative_key)
class Edge(object):
class Edge:
__slots__ = [ __slots__ = [
'pred_f', 'pred_f',
'morph_f', 'morph_f',
...@@ -33,12 +52,15 @@ class Edge(object): ...@@ -33,12 +52,15 @@ class Edge(object):
'preprocess', 'preprocess',
'postprocess', 'postprocess',
'order', 'order',
'comment' 'comment',
'mandatory_keys'
] ]
def __init__(self, predicate, morphism, def __init__(self, predicate, morphism,
io_mapping=InOutMapping(), io_mapping=InOutMapping(),
order=0, order=0,
comment="" comment="",
mandatory_keys=(),
): ):
self.pred_f = predicate self.pred_f = predicate
self.morph_f = morphism self.morph_f = morphism
...@@ -47,33 +69,29 @@ class Edge(object): ...@@ -47,33 +69,29 @@ class Edge(object):
self.postprocess = lambda pd: None self.postprocess = lambda pd: None
self.order = int(0 if order is None else order) self.order = int(0 if order is None else order)
self.comment = comment self.comment = comment
self.mandatory_keys = mandatory_keys
def predicate(self, data, dynamic_keys_mapping={}): def predicate(self, data, dynamic_keys_mapping={}):
proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping) proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping)
return self.pred_f.func(proxy_data) return self.pred_f.func(proxy_data)
def morph(self, data, dynamic_keys_mapping={}): def morph(self, data, dynamic_keys_mapping={}):
# print(self.pred_name, self.morph_name, self.order) #print(self.pred_name, self.morph_name, self.order)
proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping) proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping)
# print(proxy_data) # print(proxy_data)
self.preprocess(data) self.preprocess(data)
self._throw_if_not_set(proxy_data, self.mandatory_keys)
self.morph_f.func(proxy_data) self.morph_f.func(proxy_data)
self.postprocess(data) self.postprocess(data)
def _throw_if_not_set(self, data, mandatory_keys: Sequence[str]):
# class DummyEdge(Edge): for k in mandatory_keys:
# def __init__(self): if k not in data:
# super().__init__(None, None) logging.exception('EDGE {}: key "{}" is not set whilst being mandatory.\nIOMapping:\n'
# '{}'.format(type(self).__name__, k, str(self._io_mapping)))
# def predicate(self, data, dynamic_keys_mapping={}): raise KeyError()
# return True # raise KeyError('EDGE {}: key "{}" is not set whilst being mandatory.\nIOMapping:\n'
# # '{}'.format(type(self).__name__, k, str(self._io_mapping)))
# def morph(self, data, dynamic_keys_mapping={}):
# self.preprocess(data)
# self.postprocess(data)
def DummyEdge():
return Edge(Func(), Func())
class ExecutableProgramEdge(Edge): class ExecutableProgramEdge(Edge):
''' '''
...@@ -98,7 +116,7 @@ class ExecutableProgramEdge(Edge): ...@@ -98,7 +116,7 @@ class ExecutableProgramEdge(Edge):
Finally, data must be somehow updated after finishing. This will be done by updating data according to output_dict (it is just added) Finally, data must be somehow updated after finishing. This will be done by updating data according to output_dict (it is just added)
''' '''
def __init__(self, program_name, comm, def __init__(self, program_name, comm,
predicate=None, predicate=dummy_predicate,
io_mapping=InOutMapping(), io_mapping=InOutMapping(),
output_dict={}, # output dict which will be added to the main dictionary (w.r.t. output_keys_mapping) output_dict={}, # output dict which will be added to the main dictionary (w.r.t. output_keys_mapping)
keyword_names=(), # "local keys" where keyword args are stored keyword_names=(), # "local keys" where keyword args are stored
...@@ -107,7 +125,7 @@ class ExecutableProgramEdge(Edge): ...@@ -107,7 +125,7 @@ class ExecutableProgramEdge(Edge):
remote=False, remote=False,
stdout_processor=None, stdout_processor=None,
): ):
predicate = predicate if predicate is not None else dummy_predicate #predicate = predicate if predicate is not None else dummy_predicate
self._output_dict = output_dict self._output_dict = output_dict
self._comm = comm self._comm = comm
self._program_name = program_name self._program_name = program_name
...@@ -115,8 +133,9 @@ class ExecutableProgramEdge(Edge): ...@@ -115,8 +133,9 @@ class ExecutableProgramEdge(Edge):
self._flag_names = flag_names self._flag_names = flag_names
self._trailing_args_keys = trailing_args_keys self._trailing_args_keys = trailing_args_keys
self._working_dir_key = '__REMOTE_WORKING_DIR__' if remote else '__WORKING_DIR__' self._working_dir_key = '__REMOTE_WORKING_DIR__' if remote else '__WORKING_DIR__'
mandatory_keys = [self._working_dir_key]
self._stdout_processor = stdout_processor self._stdout_processor = stdout_processor
super().__init__(predicate, self.execute, io_mapping) super().__init__(predicate, Func(func=self.execute), io_mapping, mandatory_keys=mandatory_keys)
def execute(self, data): def execute(self, data):
args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys) args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys)
...@@ -128,6 +147,7 @@ class ExecutableProgramEdge(Edge): ...@@ -128,6 +147,7 @@ class ExecutableProgramEdge(Edge):
data.update(stdout_data) data.update(stdout_data)
data.update(output_data) data.update(output_data)
class QsubScriptEdge(Edge): class QsubScriptEdge(Edge):
''' '''
Class implementing the edge which builds up the sh-script for qsub. Class implementing the edge which builds up the sh-script for qsub.
...@@ -144,20 +164,21 @@ class QsubScriptEdge(Edge): ...@@ -144,20 +164,21 @@ class QsubScriptEdge(Edge):
Data will be augmented by 'qsub_script' pointing to the local file. Data will be augmented by 'qsub_script' pointing to the local file.
''' '''
def __init__(self, program_name, local_comm, remote_comm, def __init__(self, program_name, local_comm, remote_comm,
predicate=None, predicate=dummy_predicate,
io_mapping=InOutMapping(), io_mapping=InOutMapping(),
keyword_names=(), # "local keys" where keyword args are stored keyword_names=(), # "local keys" where keyword args are stored
flag_names=(), # "local keys" where flags are stored flag_names=(), # "local keys" where flags are stored
trailing_args_keys=(), # "local keys" where trailing args are stored trailing_args_keys=(), # "local keys" where trailing args are stored
): ):
predicate = predicate if predicate is not None else dummy_predicate # predicate = predicate if predicate is not None else dummy_predicate
self._local_comm = local_comm self._local_comm = local_comm
self._remote_comm = remote_comm self._remote_comm = remote_comm
self._program_name = program_name self._program_name = program_name
self._keyword_names = keyword_names self._keyword_names = keyword_names
self._flag_names = flag_names self._flag_names = flag_names
self._trailing_args_keys = trailing_args_keys self._trailing_args_keys = trailing_args_keys
super().__init__(predicate, self.execute, io_mapping) mandatory_keys = ['__WORKING_DIR__', 'qsub_script_name', 'time_required', 'cores_required']
super().__init__(predicate, Func(func=self.execute), io_mapping, mandatory_keys=mandatory_keys)
def execute(self, data): def execute(self, data):
if isinstance(data, aux.ProxyDict): if isinstance(data, aux.ProxyDict):
...@@ -170,6 +191,7 @@ class QsubScriptEdge(Edge): ...@@ -170,6 +191,7 @@ class QsubScriptEdge(Edge):
data['cores_required'], data['time_required'], (command_line,)) data['cores_required'], data['time_required'], (command_line,))
data.update({'qsub_script': qsub_script_path}) data.update({'qsub_script': qsub_script_path})
class UploadOnRemoteEdge(Edge): class UploadOnRemoteEdge(Edge):
''' '''
Class implementing the edge which uploads the data to the remote computer. Class implementing the edge which uploads the data to the remote computer.
...@@ -188,22 +210,25 @@ class UploadOnRemoteEdge(Edge): ...@@ -188,22 +210,25 @@ class UploadOnRemoteEdge(Edge):
After edge execution, data is going to be updated such that local paths will be replaced by remote ones. After edge execution, data is going to be updated such that local paths will be replaced by remote ones.
''' '''
def __init__(self, comm, def __init__(self, comm,
predicate=None, predicate=dummy_predicate,
io_mapping=InOutMapping(), io_mapping=InOutMapping(),
local_paths_keys=(), # "local keys", needed to build a copy list local_paths_keys=(), # "local keys", needed to build a copy list
update_paths=True, update_paths=True,
already_remote_path_key=None, already_remote_path_key=None,
): ):
predicate = predicate if predicate is not None else dummy_predicate # predicate = predicate if predicate is not None else dummy_predicate
self._local_paths_keys = local_paths_keys self._local_paths_keys = local_paths_keys
self._comm = comm self._comm = comm
self._update_paths = update_paths self._update_paths = update_paths
self._already_remote_path_key = already_remote_path_key self._already_remote_path_key = already_remote_path_key
super().__init__(predicate, self.execute, io_mapping) mandatory_keys = list(self._local_paths_keys) + ['__WORKING_DIR__',
'__REMOTE_WORKING_DIR__', 'qsub_script_name', 'time_required',
'cores_required']
if self._already_remote_path_key is not None:
mandatory_keys.append(self._already_remote_path_key)
super().__init__(predicate, Func(func=self.execute), io_mapping, mandatory_keys=mandatory_keys)
def execute(self, data): def execute(self, data):
# print(data)
# print(data['c_field_path'])
if self._already_remote_path_key is not None: if self._already_remote_path_key is not None:
if data[self._already_remote_path_key]: if data[self._already_remote_path_key]:
return return
...@@ -217,10 +242,12 @@ class UploadOnRemoteEdge(Edge): ...@@ -217,10 +242,12 @@ class UploadOnRemoteEdge(Edge):
working_dir = data['__WORKING_DIR__'] working_dir = data['__WORKING_DIR__']
if isinstance(data, aux.ProxyDict): if isinstance(data, aux.ProxyDict):
print('UploadOnRemoteEdge -> {}: {}'.format(key, data._keys_mappings[key])) print('UploadOnRemoteEdge -> {}: {}'.format(key, data._keys_mappings[key]))
remote_path = self._comm.copy(os.path.join(working_dir, data[key]), remote_working_dir, mode='from_local') remote_path = self._comm.copy(os.path.join(working_dir, data[key]), remote_working_dir,
mode='from_local')
if self._update_paths: if self._update_paths:
data[key] = remote_path data[key] = remote_path
class DownloadFromRemoteEdge(Edge): class DownloadFromRemoteEdge(Edge):
''' '''
Class implementing the edge which downloads the data from the remote computer. Class implementing the edge which downloads the data from the remote computer.
...@@ -241,16 +268,19 @@ class DownloadFromRemoteEdge(Edge): ...@@ -241,16 +268,19 @@ class DownloadFromRemoteEdge(Edge):
After edge execution, data is going to be updated such that remote/relative paths will be replaced by local ones. After edge execution, data is going to be updated such that remote/relative paths will be replaced by local ones.
''' '''
def __init__(self, comm, def __init__(self, comm,
predicate=None, predicate=dummy_predicate,
io_mapping=InOutMapping(), io_mapping=InOutMapping(),
remote_paths_keys=(), # "local keys", needed to build a list for downloading remote_paths_keys=(), # "local keys", needed to build a list for downloading
update_paths=True, update_paths=True,
show_msg=False,
): ):
predicate = predicate if predicate is not None else dummy_predicate # predicate = predicate if predicate is not None else dummy_predicate
self._remote_paths_keys = remote_paths_keys self._remote_paths_keys = remote_paths_keys
self._comm = comm self._comm = comm
self._update_paths = update_paths self._update_paths = update_paths
super().__init__(predicate, self.execute, io_mapping) self._show_msg = show_msg
mandatory_keys = list(self._remote_paths_keys) + ['__WORKING_DIR__', '__REMOTE_WORKING_DIR__']
super().__init__(predicate, Func(func=self.execute), io_mapping, mandatory_keys=mandatory_keys)
def execute(self, data): def execute(self, data):
working_dir = data['__WORKING_DIR__'] working_dir = data['__WORKING_DIR__']
...@@ -259,34 +289,24 @@ class DownloadFromRemoteEdge(Edge): ...@@ -259,34 +289,24 @@ class DownloadFromRemoteEdge(Edge):
output_file_or_dir = data[key] output_file_or_dir = data[key]
local_path = None local_path = None
if output_file_or_dir == '*': if output_file_or_dir == '*':
aux.print_msg_if_allowed('\tAll possible output files will be downloaded', allow=self._show_msg)
paths = self._comm.listdir(remote_working_dir) paths = self._comm.listdir(remote_working_dir)
local_full_paths = ['/'.join([working_dir, file_or_dir]) for file_or_dir in paths] local_full_paths = ['/'.join([working_dir, file_or_dir]) for file_or_dir in paths]
remote_full_paths = ['/'.join([remote_working_dir, file_or_dir]) for file_or_dir in paths] remote_full_paths = ['/'.join([remote_working_dir, file_or_dir]) for file_or_dir in paths]
for file_or_dir in remote_full_paths: for file_or_dir in remote_full_paths:
self._comm.copy(file_or_dir, working_dir, mode='from_remote') aux.print_msg_if_allowed('\tAm going to download "{}" to "{}"'.format(file_or_dir, working_dir),
allow=self._show_msg)
self._comm.copy(file_or_dir, working_dir, mode='from_remote', show_msg=self._show_msg)
local_path = local_full_paths local_path = local_full_paths
else: else:
local_path = self._comm.copy('/'.join([remote_working_dir, output_file_or_dir]), working_dir, mode='from_remote') file_or_dir = '/'.join([remote_working_dir, output_file_or_dir])
aux.print_msg_if_allowed('\tAm going to download "{}" to "{}"'.format(file_or_dir, working_dir),
allow=self._show_msg)
local_path = self._comm.copy(file_or_dir, working_dir,
mode='from_remote', show_msg=self._show_msg)
if self._update_paths: if self._update_paths:
data[key] = local_path data[key] = local_path
'''
@todo: to be removed
'''
def dummy_edge(data):
pass
'''
@todo: to be removed
'''
def dummy_predicate(data):
return True
def job_finished_predicate(data):
return data['job_finished']
def job_unfinished_predicate(data):
return not data['job_finished']
def make_cd(key_path): def make_cd(key_path):
def _cd(d): def _cd(d):
...@@ -301,6 +321,7 @@ def make_cd(key_path): ...@@ -301,6 +321,7 @@ def make_cd(key_path):
d['__REMOTE_WORKING_DIR__'] = posixpath.join(d['__REMOTE_WORKING_DIR__'], subdir) d['__REMOTE_WORKING_DIR__'] = posixpath.join(d['__REMOTE_WORKING_DIR__'], subdir)
return _cd return _cd
def make_dump(dump_name_format, format_keys=(), omit=None): def make_dump(dump_name_format, format_keys=(), omit=None):
def _dump(d): def _dump(d):
format_params = [aux.recursive_get(d, key) for key in format_keys] format_params = [aux.recursive_get(d, key) for key in format_keys]
...@@ -312,12 +333,14 @@ def make_dump(dump_name_format, format_keys=(), omit=None): ...@@ -312,12 +333,14 @@ def make_dump(dump_name_format, format_keys=(), omit=None):
pickle.dump(dumped_d, f) pickle.dump(dumped_d, f)
return _dump return _dump
def make_composite_func(*funcs): def make_composite_func(*funcs):
def _composite(d): def _composite(d):
for func in funcs: for func in funcs:
func(d) func(d)
return _composite return _composite
def make_composite_predicate(*preds): def make_composite_predicate(*preds):
def _composite(d): def _composite(d):
for pred in preds: for pred in preds:
...@@ -326,6 +349,7 @@ def make_composite_predicate(*preds): ...@@ -326,6 +349,7 @@ def make_composite_predicate(*preds):
return True return True
return _composite return _composite
def create_local_data_from_global_data(global_data, keys_mapping): def create_local_data_from_global_data(global_data, keys_mapping):
if keys_mapping is None: if keys_mapping is None:
return global_data return global_data
...@@ -334,6 +358,7 @@ def create_local_data_from_global_data(global_data, keys_mapping): ...@@ -334,6 +358,7 @@ def create_local_data_from_global_data(global_data, keys_mapping):
else: else:
return {local_key: aux.recursive_get(global_data, global_key) for local_key, global_key in keys_mapping.items()} return {local_key: aux.recursive_get(global_data, global_key) for local_key, global_key in keys_mapping.items()}
def update_global_data_according_to_local_data(local_data, global_data, keys_mapping): def update_global_data_according_to_local_data(local_data, global_data, keys_mapping):
if keys_mapping is None: if keys_mapping is None:
global_data.update(local_data) global_data.update(local_data)
...@@ -344,6 +369,7 @@ def update_global_data_according_to_local_data(local_data, global_data, keys_map ...@@ -344,6 +369,7 @@ def update_global_data_according_to_local_data(local_data, global_data, keys_map
for local_key, global_key in keys_mapping.items(): for local_key, global_key in keys_mapping.items():
recursive_set(global_data, global_key, local_data[local_key]) recursive_set(global_data, global_key, local_data[local_key])
def build_args_line(data, keyword_names, flag_names, trailing_args_keys): def build_args_line(data, keyword_names, flag_names, trailing_args_keys):
args_str = '' args_str = ''
for keyword in keyword_names: for keyword in keyword_names:
...@@ -360,11 +386,26 @@ def build_args_line(data, keyword_names, flag_names, trailing_args_keys): ...@@ -360,11 +386,26 @@ def build_args_line(data, keyword_names, flag_names, trailing_args_keys):
args_str += ' ' args_str += ' '
return args_str return args_str
def render_sge_template(sge_template_name, sge_script_path, cores, time, commands): def render_sge_template(sge_template_name, sge_script_path, cores, time, commands):
sge_templ_path = os.path.join(aux.get_templates_path(), sge_template_name) with open('config_research.json', 'r') as f:
conf = json.load(f)
sge_templ_path = os.path.join(conf['TEMPLATES_PATH'], sge_template_name)
if not os.path.exists(sge_templ_path): # by default, templates are in templates/, but here we let the user put any path if not os.path.exists(sge_templ_path): # by default, templates are in templates/, but here we let the user put any path
sge_templ_path = sge_template_name sge_templ_path = sge_template_name
f = open(sge_templ_path, 'r') f = open(sge_templ_path, 'r')
rendered_data = Template(f.read()).render(cores=cores, time=time, commands=commands) rendered_data = Template(f.read()).render(cores=cores, time=time, commands=commands)
sge_script_file = aux.create_file_mkdir(sge_script_path) sge_script_file = aux.create_file_mkdir(sge_script_path)
sge_script_file.write(rendered_data) sge_script_file.write(rendered_data)
def connect_branches(branches: Sequence[Tuple[State, State]], edges: Optional[Sequence[Edge]] = None):
if edges is None:
edges = [dummy_edge for _ in range(len(branches) - 1)]
for i, edge in zip(range(1, len(branches)), edges):
_, prev_branch_end = branches[i - 1]
next_branch_start, _ = branches[i]
prev_branch_end.connect_to(next_branch_start, edge=edge)
dummy_edge = Edge(dummy_predicate, Func())
...@@ -4,23 +4,24 @@ from enum import Enum, auto ...@@ -4,23 +4,24 @@ from enum import Enum, auto
from functools import partial from functools import partial
import importlib as imp import importlib as imp
import comsdk.comaux as aux import comsdk.comaux as aux
ImplicitParallelizationInfo = collections.namedtuple('ImplicitParallelizationInfo', ['array_keys_mapping', 'branches_number', 'branch_i']) ImplicitParallelizationInfo = collections.namedtuple('ImplicitParallelizationInfo', ['array_keys_mapping', 'branches_number', 'branch_i'])
class Func():
class Func:
__slots__ = ( __slots__ = (
'module', 'module',
'func', 'func',
'comment', 'comment',
'name' 'name'
) )
def __init__(self, module="", name="", dummy=False,func=None, comment=''):
def __init__(self, module="", name="", dummy=False, func=None, comment=''):
self.module = module self.module = module
self.name = name self.name = name
self.comment=comment.replace("\0", " ") if comment is not None else "" self.comment = comment.replace("\0", " ") if comment is not None else ""
if module =="" or name =="" or module is None or name is None: if module == "" or name == "" or module is None or name is None:
dummy = True dummy = True
if func is not None: if func is not None:
self.func = func self.func = func
...@@ -34,24 +35,26 @@ class Func(): ...@@ -34,24 +35,26 @@ class Func():
raise Exception("Could not load function {} from {} module".format(name, module)) raise Exception("Could not load function {} from {} module".format(name, module))
def __str__(self): def __str__(self):
if self.module =="" or self.name =="": if self.module == "" or self.name == "":
return '' return self.func.__name__
return "{}_{}".format(self.module, self.name) return "{}_{}".format(self.module, self.name)
class Selector(Func): class Selector(Func):
def __init__(self, ntransf, module="", name="", dummy=False): def __init__(self, ntransf, module="", name="", dummy=False):
if module=="" and name =="": if module == "" and name == "":
dummy = True dummy = True
self.dummy = dummy self.dummy = dummy
super().__init__(module, name, func=(lambda x: [True for i in range(ntransf)]) if dummy else None) super().__init__(module, name, func=(lambda x: [True for i in range(ntransf)]) if dummy else None)
def __str__(self): def __str__(self):
if self.module =="" or self.name =="": if self.module == "" or self.name == "":
return '' return ''
return "{}_{}".format(self.module, self.name) return "{}_{}".format(self.module, self.name)
class Transfer: class Transfer:
def __init__(self, edge, output_state, order=0, comment = None): def __init__(self, edge, output_state, order=0):
self.edge = edge self.edge = edge
self.output_state = output_state self.output_state = output_state
self.order = order self.order = order
...@@ -60,10 +63,12 @@ class Transfer: ...@@ -60,10 +63,12 @@ class Transfer:
self.edge.morph(data, dynamic_keys_mapping) self.edge.morph(data, dynamic_keys_mapping)
return self.output_state return self.output_state
class IdleRunType(Enum): class IdleRunType(Enum):
INIT = auto() INIT = auto()
CLEANUP = auto() CLEANUP = auto()
class PluralState: class PluralState:
def __init__(self, states): def __init__(self, states):
self.states = states self.states = states
...@@ -73,6 +78,7 @@ class PluralState: ...@@ -73,6 +78,7 @@ class PluralState:
for init_state, term_state in zip(self.states, term_states): for init_state, term_state in zip(self.states, term_states):
init_state.transfers.append(Transfer(edge, term_state)) init_state.transfers.append(Transfer(edge, term_state))
class Graph: class Graph:
''' '''
Class describing a graph-based computational method. Graph execution must start from this object. Class describing a graph-based computational method. Graph execution must start from this object.
...@@ -124,8 +130,6 @@ class Graph: ...@@ -124,8 +130,6 @@ class Graph:
data['__WORKING_DIR__'] = data['__CURRENT_WORKING_DIR__'] data['__WORKING_DIR__'] = data['__CURRENT_WORKING_DIR__']
class State: class State:
__slots__ = [ __slots__ = [
'name', 'name',
...@@ -155,10 +159,10 @@ class State: ...@@ -155,10 +159,10 @@ class State:
self.looped_edges_number = 0 self.looped_edges_number = 0
self.activated_input_edges_number = 0 self.activated_input_edges_number = 0
self.transfers = [] self.transfers = []
self.possible_branches=[] self.possible_branches = []
self.is_term_state=False self.is_term_state = False
self._branching_states_history = None self._branching_states_history = None
self._proxy_state=None self._proxy_state = None
self.comment = None self.comment = None
def idle_run(self, idle_run_type, branching_states_history): def idle_run(self, idle_run_type, branching_states_history):
...@@ -209,7 +213,6 @@ class State: ...@@ -209,7 +213,6 @@ class State:
graph.term_state.transfers = self.transfers graph.term_state.transfers = self.transfers
graph.term_state.selector = self.selector graph.term_state.selector = self.selector
def run(self, data, implicit_parallelization_info=None): def run(self, data, implicit_parallelization_info=None):
print('STATE {}\n\tjust entered, implicit_parallelization_info: {}'.format(self.name, implicit_parallelization_info)) print('STATE {}\n\tjust entered, implicit_parallelization_info: {}'.format(self.name, implicit_parallelization_info))
# print('\t{}'.format(data)) # print('\t{}'.format(data))
...@@ -231,16 +234,24 @@ class State: ...@@ -231,16 +234,24 @@ class State:
if not selected_edges: if not selected_edges:
raise GraphUnexpectedTermination( raise GraphUnexpectedTermination(
"STATE {}: error in selector: {} ".format(self.name, selected_edges)) "STATE {}: error in selector: {} ".format(self.name, selected_edges))
selected_transfers = [self.transfers[i] for i, _ in enumerate(selected_edges) if selected_edges[i]==True] # selected_transfers = [self.transfers[i] for i, _ in enumerate(selected_edges) if selected_edges[i]]
for transf in selected_transfers: # for transf in selected_transfers:
if not transf.edge.predicate(data, dynamic_keys_mapping): # if not transf.edge.predicate(data, dynamic_keys_mapping):
raise Exception("\tERROR: predicate {} returns {} running from state {}\n data{}".format(transf.edge.pred_f.name,transf.edge.predicate(data, dynamic_keys_mapping), self.name, data)) # raise Exception("\tERROR: predicate {} returns {} running from state {}\n data{}".format(transf.edge.pred_f.name,transf.edge.predicate(data, dynamic_keys_mapping), self.name, data))
selected_transfers = [self.transfers[i] for i, _ in enumerate(selected_edges)
if selected_edges[i] and self.transfers[i].edge.predicate(data, dynamic_keys_mapping)]
if not selected_transfers:
raise GraphUnexpectedTermination('\tERROR: no transfer function has been '
'selected out of {} ones. Predicate values are {}. '
'Selector values are {}.'.format(len(self.transfers),
[t.edge.predicate(data, dynamic_keys_mapping) for t in self.transfers],
selected_edges))
return self.parallelization_policy.make_transfer_func(selected_transfers, return self.parallelization_policy.make_transfer_func(selected_transfers,
array_keys_mapping=self.array_keys_mapping, array_keys_mapping=self.array_keys_mapping,
implicit_parallelization_info=implicit_parallelization_info, state=self), \ implicit_parallelization_info=implicit_parallelization_info,
state=self), \
implicit_parallelization_info implicit_parallelization_info
def _activate_input_edge(self, implicit_parallelization_info=None): def _activate_input_edge(self, implicit_parallelization_info=None):
if implicit_parallelization_info is None or self.is_term_state: if implicit_parallelization_info is None or self.is_term_state:
self.activated_input_edges_number += 1 self.activated_input_edges_number += 1
...@@ -296,56 +307,58 @@ class SerialParallelizationPolicy: ...@@ -296,56 +307,58 @@ class SerialParallelizationPolicy:
def __init__(self): def __init__(self):
pass pass
def make_transfer_func(self, morphisms, array_keys_mapping=None, implicit_parallelization_info=None, state=None): def make_transfer_func(self, transfers, array_keys_mapping=None, implicit_parallelization_info=None, state=None):
def _morph(data): def _morph(data):
# print("MORPHING FROM {}".format(state.name)) # print("MORPHING FROM {}".format(state.name))
if array_keys_mapping is None: if array_keys_mapping is None:
dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info) dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info)
next_morphs = [partial(morphism.transfer, dynamic_keys_mapping=dynamic_keys_mapping) for morphism in morphisms] next_transfers = [partial(t.transfer, dynamic_keys_mapping=dynamic_keys_mapping) for t in transfers]
next_impl_para_infos = [implicit_parallelization_info for _ in morphisms] next_impl_para_infos = [implicit_parallelization_info for _ in transfers]
# print('\t\t {}'.format(implicit_parallelization_infos)) # print('\t\t {}'.format(implicit_parallelization_infos))
else: else:
if len(morphisms) != 1: if len(transfers) != 1:
raise BadGraphStructure('Impossible to create implicit paralleilzation in the state with {} output edges'.format(len(morphisms))) raise BadGraphStructure('Impossible to create implicit paralleilzation in the state '
'with {} output edges'.format(len(transfers)))
dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info) dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info)
proxy_data = aux.ProxyDict(data, keys_mappings=array_keys_mapping) proxy_data = aux.ProxyDict(data, keys_mappings=array_keys_mapping)
anykey = next(iter(array_keys_mapping.keys())) anykey = next(iter(array_keys_mapping.keys()))
implicit_branches_number = len(proxy_data[anykey]) implicit_branches_number = len(proxy_data[anykey])
next_morphs = [] next_transfers = []
next_impl_para_infos = [] next_impl_para_infos = []
for branch_i in range(implicit_branches_number): for branch_i in range(implicit_branches_number):
implicit_parallelization_info_ = ImplicitParallelizationInfo(array_keys_mapping, implicit_branches_number, branch_i) implicit_parallelization_info_ = ImplicitParallelizationInfo(array_keys_mapping, implicit_branches_number, branch_i)
dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info_) dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info_)
# print(dynamic_keys_mapping) # print(dynamic_keys_mapping)
next_morphs.append(partial(morphisms[0].morph, dynamic_keys_mapping=dynamic_keys_mapping)) #next_transfers.append(partial(transfers[0].edge.morph, dynamic_keys_mapping=dynamic_keys_mapping))
next_transfers.append(partial(transfers[0].transfer, dynamic_keys_mapping=dynamic_keys_mapping))
next_impl_para_infos.append(implicit_parallelization_info_) next_impl_para_infos.append(implicit_parallelization_info_)
cur_morphs = [] cur_transfers = []
cur_impl_para_infos = [] cur_impl_para_infos = []
#while len(next_morphs) != 1 or _is_implicitly_parallelized(next_impl_para_infos): #while len(next_transfers) != 1 or _is_implicitly_parallelized(next_impl_para_infos):
while len(next_morphs) != 1 or _requires_joint_of_implicit_parallelization(array_keys_mapping, next_impl_para_infos): while len(next_transfers) != 1 or _requires_joint_of_implicit_parallelization(array_keys_mapping, next_impl_para_infos):
if next_impl_para_infos == []: if next_impl_para_infos == []:
raise Exception("Morphs count on state {} is {}".format(state.name, str(len(next_morphs)))) raise Exception("Morphs count on state {} is {}".format(state.name, str(len(next_transfers))))
# print(array_keys_mapping, next_impl_para_infos) # print(array_keys_mapping, next_impl_para_infos)
cur_morphs[:] = next_morphs[:] cur_transfers[:] = next_transfers[:]
cur_impl_para_infos[:] = next_impl_para_infos[:] cur_impl_para_infos[:] = next_impl_para_infos[:]
del next_morphs[:] del next_transfers[:]
del next_impl_para_infos[:] del next_impl_para_infos[:]
for morph, impl_para_info in zip(cur_morphs, cur_impl_para_infos): for t, impl_para_info in zip(cur_transfers, cur_impl_para_infos):
next_state = morph(data) next_state = t(data)
# print('\t next_state: {}, with impl para info: {}'.format(next_state.name, impl_para_info)) # print('\t next_state: {}, with impl para info: {}'.format(next_state.name, impl_para_info))
if next_state is None: if next_state is None:
return None return None
next_morph, next_impl_para_info = _run_state(next_state, data, impl_para_info) next_t, next_impl_para_info = _run_state(next_state, data, impl_para_info)
# print('\t next_morph: {}'.format(next_morph)) # print('\t next_morph: {}'.format(next_morph))
if '__EXCEPTION__' in data: if '__EXCEPTION__' in data:
return None return None
if next_morph is not None: if next_t is not None:
next_morphs.append(next_morph) next_transfers.append(next_t)
next_impl_para_infos.append(next_impl_para_info) next_impl_para_infos.append(next_impl_para_info)
# print(array_keys_mapping, next_impl_para_infos) # print(array_keys_mapping, next_impl_para_infos)
#print(len(next_morphs)) #print(len(next_transfers))
# print('\t last morph: {}'.format(next_morphs[0])) # print('\t last morph: {}'.format(next_transfers[0]))
next_state = next_morphs[0](data) next_state = next_transfers[0](data)
# print(next_state.name, next_impl_para_infos[0]) # print(next_state.name, next_impl_para_infos[0])
return next_state return next_state
return _morph return _morph
...@@ -354,9 +367,11 @@ class SerialParallelizationPolicy: ...@@ -354,9 +367,11 @@ class SerialParallelizationPolicy:
class BadGraphStructure(Exception): class BadGraphStructure(Exception):
pass pass
class GraphUnexpectedTermination(Exception): class GraphUnexpectedTermination(Exception):
pass pass
def _requires_joint_of_implicit_parallelization(array_keys_mapping, impl_para_infos): def _requires_joint_of_implicit_parallelization(array_keys_mapping, impl_para_infos):
if array_keys_mapping is None: if array_keys_mapping is None:
return False return False
...@@ -365,16 +380,10 @@ def _requires_joint_of_implicit_parallelization(array_keys_mapping, impl_para_in ...@@ -365,16 +380,10 @@ def _requires_joint_of_implicit_parallelization(array_keys_mapping, impl_para_in
return True return True
return False return False
def _get_trues(boolean_list): def _get_trues(boolean_list):
return [i for i, val in enumerate(boolean_list) if val == True] return [i for i, val in enumerate(boolean_list) if val == True]
#def _run_state(state, data, implicit_parallelization_info=None):
# try:
# next_morphism = state.run(data, implicit_parallelization_info)
# except GraphUnexpectedTermination as e:
# data['__EXCEPTION__'] = str(e)
# return None
# return next_morphism
def _run_state(state, data, implicit_parallelization_info=None): def _run_state(state, data, implicit_parallelization_info=None):
try: try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment