Commit 1b6fe950 authored by Anton Pershin

Added working version files

parent c9658e69
# ignore custom config file
config_research.json
from functools import reduce
import os
import shutil
import re
import collections
from copy import deepcopy
import importlib
import numpy as np
ArrayItemGetter = collections.namedtuple('ArrayItemGetter', ['key_path_to_array', 'i'])
class ProxyDict(object):
'''
Class that gives access to a dict through proxy mappings, using the same interface as a regular dict.
It supports two types of proxy mappings:
1) relative_keys
2) keys_mappings
and also generalises a simple key to a key_path. For example, the sequence of keys leading to d['a']['b']['c']
corresponds to the key_path ('a', 'b', 'c').
Proxy mapping relative_keys is a sequence of key_path leading to subdicts. The content of these subdicts
is treated as located in the root of the proxy dict. For example, suppose we have d = {'a': 1, 'b':{'c': 2, 'd': 3}}.
A proxy dict with relative_key ('b',) shall be pd = {'a': 1, 'c': 2, 'd': 3, 'b':{'c': 2, 'd': 3}}.
Proxy mapping keys_mappings is a dict linking a (new) key in the root of proxy dict to key_path in original dict.
For example, for dict d, a proxy dict with keys_mappings {'d': ('b', 'd')} shall be pd = {'a': 1, 'd': 3, 'b':{'c': 2, 'd': 3}}.
Finally, we have default_relative_key which is a key_path leading to a subdict to which new elements must be added.
For example, for dict d, proxy dict pd and default_relative_key ('b',), operation pd['z'] = 0 leads to the following change in d:
d = {'a': 1, 'b':{'c': 2, 'd': 3, 'z': 0}}
The order of the proxy mappings (the higher mapping overwrites the lower):
1) keys_mappings
2) relative_keys
3) original dict (root)
'''
def __init__(self, data,
relative_keys=(),
keys_mappings={},
default_relative_key=(),
):
self._data = data
self._default_relative_key = list(default_relative_key)
self._keys_mappings = {key: key for key in self._data.keys()}
for rel_key in relative_keys:
for inner_key in recursive_get(data, rel_key).keys():
self._keys_mappings[inner_key] = list(rel_key) + [inner_key]
self._keys_mappings.update(keys_mappings)
def __repr__(self):
res = '{'
for key in self._keys_mappings.keys():
res += '{}: {}, '.format(key, self.__getitem__(key))
return res + '}'
def __contains__(self, key):
return key in self._keys_mappings.keys()
def __getitem__(self, key):
# x[key] => x.__getitem__(key)
return recursive_get(self._data, self._keys_mappings[key])
def __setitem__(self, key, value):
# x[key] = value => x.__setitem__(key, value)
if key in self._keys_mappings:
recursive_set(self._data, self._keys_mappings[key], value)
else:
recursive_set(self._data, self._default_relative_key + [key], value)
self._keys_mappings[key] = self._default_relative_key + [key]
def __delitem__(self, key):
# del x[key] => x.__delitem__(key)
key_path = self._keys_mappings.pop(key)
if is_sequence(key_path):
del recursive_get(self._data, key_path[:-1])[key_path[-1]]
else:
del self._data[key_path]
def update(self, mapping):
for key in mapping.keys():
self.__setitem__(key, mapping[key])
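# --- Usage sketch (illustrative, not part of the module API) ---
# A minimal example of how ProxyDict resolves keys, mirroring the docstring above;
# the dict contents and the extra key 'dd' are hypothetical.
def _proxy_dict_example():  # pragma: no cover
    d = {'a': 1, 'b': {'c': 2, 'd': 3}}
    pd = ProxyDict(d, relative_keys=(('b',),), keys_mappings={'dd': ('b', 'd')})
    assert pd['c'] == 2       # reached through the relative key ('b',)
    assert pd['dd'] == 3      # reached through keys_mappings
    pd['z'] = 0               # goes to default_relative_key, i.e. the root of d here
    assert d['z'] == 0
    return pd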
def recursive_get(d, keys):
if isinstance(keys, ArrayItemGetter):
array_ = recursive_get(d, keys.key_path_to_array)
return array_[keys.i]
elif is_sequence(keys):
return reduce(lambda d_, key_: d_.get(key_, {}), keys, d)
else:
return d[keys]
def recursive_set(d, keys, val):
if isinstance(keys, ArrayItemGetter):
array_ = recursive_get(d, keys.key_path_to_array)
array_[keys.i] = val
elif is_sequence(keys):
last_dict = reduce(lambda d_, key_: d_.setdefault(key_, {}), keys[:-1], d)
last_dict[keys[-1]] = val
else:
d[keys] = val
def is_sequence(obj):
'''
Checks whether obj is a sequence (string does not count as a sequence)
'''
return isinstance(obj, collections.abc.Sequence) and (not hasattr(obj, 'strip'))
def cp(from_, to_):
'''
Copies from_ to to_, where from_ may be a file or a dir and to_ must be a dir.
Returns new path.
'''
if os.path.isfile(from_):
shutil.copy(from_, to_)
else:
shutil.copytree(from_, to_)
return os.path.join(to_, os.path.basename(from_))
def rm(target):
'''
Removes target which may be file or dir.
'''
if os.path.isfile(target):
os.remove(target)
else:
shutil.rmtree(target)
def remove_if_exists(path):
try:
os.remove(path)
return True
except FileNotFoundError as e:
return False
def create_file_mkdir(filepath):
"""Opens a filepath in a write mode (i.e., creates/overwrites it). If the path does not exists,
subsequent directories will be created.
"""
dirpath = os.path.dirname(filepath)
if not os.path.exists(dirpath):
os.makedirs(dirpath)
return open(filepath, 'w')
def get_templates_path():
'''
Returns the absolute path to templates directory. It is useful when the module is imported from elsewhere.
'''
return os.path.join(os.path.dirname(os.path.dirname(__file__)), 'templates')
def find_dir_by_named_regexp(regexp, where):
"""Search for dir in where which satisfies regexp. If successful, parses the dir according to named regexp.
Returns a tuple (found_dir, params_from_named_regexp) or None if not found.
"""
dirnames = next(os.walk(where))[1]
for dir_ in dirnames:
parsing_params = parse_by_named_regexp(regexp, dir_)
if parsing_params is not None:
return dir_, parsing_params
return None
def find_all_dirs_by_named_regexp(regexp, where):
"""Search for dirs in where which satisfies regexp. If successful, parses them according to named regexp.
Returns a list of tuples (found_dir, params_from_named_regexp).
"""
dirnames = next(os.walk(where))[1]
datas = []
for dir_ in dirnames:
parsing_params = parse_by_named_regexp(regexp, dir_)
if parsing_params is not None:
datas.append((dir_, parsing_params))
return datas
def parse_by_named_regexp(regexp, val):
"""Parses val according to named regexp. Return a dictionary of params.
"""
matching = re.search(regexp, val)
if matching is None:
return None
return matching.groupdict()
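# Illustrative sketch: a 'named regexp' above is simply a pattern with named groups;
# the pattern and the directory name below are hypothetical.
def _named_regexp_example():  # pragma: no cover
    params = parse_by_named_regexp(r'^run_(?P<run_id>\d+)_Re_(?P<Re>\d+)$', 'run_12_Re_180')
    assert params == {'run_id': '12', 'Re': '180'}
    # find_dir_by_named_regexp(r'^run_(?P<run_id>\d+)_Re_(?P<Re>\d+)$', '/path/to/runs')
    # would return ('run_12_Re_180', {'run_id': '12', 'Re': '180'}) if such a dir exists there
    return params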
def parse_datafile(path, data_names, transform_funcs, cols_to_parse=[]):
"""Parses a data file given by path and structured as a table where rows are separated by \n
and columns are separated by any of whitespaces. The first line in the file will be ignored.
Processed columns are given by cols_to_parse (all columns will be processed if it is empty).
Corresponding names and transformation functions for columns in cols_to_parse are given by
data_names and transform_funcs. Transformation function must be a mapping string -> type.
Returns a dictionary where a key corresponds to a column name (i.e., taken from data_names)
and a value corresponds to a list of the columns values taken from all rows.
"""
if cols_to_parse == []:
cols_to_parse = range(len(data_names))
if len(data_names) != len(transform_funcs) or len(data_names) != len(cols_to_parse):
raise Exception('Number of data names, transform functions and columns to be parsed is inconsistent')
data = collections.OrderedDict()
for data_name in data_names:
data[data_name] = []
f = open(path, 'r') # if not found, an exception will be raised anyway
lines = f.readlines()
for line in lines[1:]: # skip the first line
tmp = line.split()
if len(tmp) < len(data_names):
raise Exception('Number of given data names is larger than number of columns we have in the data file.')
for i, data_name in enumerate(data_names):
val = tmp[cols_to_parse[i]]
data[data_name].append(transform_funcs[i](val))
return data
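# Illustrative sketch of parse_datafile usage; the file name and its contents are hypothetical
# (a whitespace-separated table preceded by a one-line header).
def _parse_datafile_example():  # pragma: no cover
    # Suppose 'energy.dat' contains three lines: a header, '0.0 1.5' and '1.0 1.2'.
    data = parse_datafile('energy.dat', ['t', 'E'], [float, float])
    # data == OrderedDict([('t', [0.0, 1.0]), ('E', [1.5, 1.2])])
    return data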
def parse_timed_numdatafile(path):
"""Parses a data file given by path and structured as a table where rows are separated by \n
and columns are separated by any of whitespaces. The table here has an interpretation of a matrix whose
rows axis corresponds to time axis and columns axis corresponds to data axis. Moreover, the first column
contains the time values so the data is contained in columns starting from the second one.
Returns time_list (a list of times from the first column) and data_matrix (a list of numpy arrays of data where
list's index corresponds to the time index).
"""
time = []
data = []
f = open(path, 'r') # if not found, an exception will be raised anyway
lines = f.readlines()
for line in lines[1:]: # skip the first line
tmp = line.split()
time.append(float(tmp[0]))
timed_data = np.zeros((len(tmp) - 1, ))
for i, val in enumerate(tmp[1:]):
timed_data[i] = float(val)
data.append(timed_data)
return time, data
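# Illustrative sketch of parse_timed_numdatafile usage; the file name and its contents are hypothetical.
def _parse_timed_numdatafile_example():  # pragma: no cover
    # Suppose 'series.dat' contains a header line followed by '0.0 1.0 2.0' and '0.5 1.1 2.1';
    # the first column is time and the remaining columns are data.
    time, data = parse_timed_numdatafile('series.dat')
    # time == [0.0, 0.5]; data[0] == np.array([1.0, 2.0]); data[1] == np.array([1.1, 2.1])
    return time, data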
def write_datafile(path, data):
keys = list(data.keys())
# print(keys)
values = list(data.values())
with open(path, 'w') as f:
f.write(r'% ' + '\t'.join(keys) + '\n')
for t_i in range(len(values[0])):
line = '\t'.join([str(array[t_i]) for array in values]) + '\n'
f.write(line)
def write_timed_numdatafile(path, time, data):
with open(path, 'w') as f:
for i in range(len(time)):
line = '{}\t'.format(time[i]) + '\t'.join([str(data[i][j]) for j in range(data.shape[1])]) + '\n'
f.write(line)
def load_function_from_module(full_function_name):
module_name, function_name = full_function_name.rsplit('.', 1)
module_ = importlib.import_module(module_name)
return getattr(module_, function_name)
import os
import os.path
import shutil
import paramiko
import subprocess
import shlex
import json
from stat import S_ISDIR
from abc import ABCMeta, abstractmethod
from comsdk.aux import load_function_from_module, cp, rm
class Host(object):
'''
Class storing all necessary information about the host of execution.
'''
def __init__(self):
self.programs = {}
def add_program(self, prog_name,
path_to_prog=None,
):
self.programs[prog_name] = path_to_prog
def get_program_launch_path(self, prog_name):
path_to_prog = self.programs[prog_name]
if path_to_prog is not None:
return self.programs[prog_name] + '/' + prog_name
else:
return prog_name
class RemoteHost(Host):
'''
RemoteHost extends Host with information about the ssh host and the number of cores.
'''
def __init__(self, ssh_host, cores, sge_template_name, job_setter, job_finished_checker):
self.ssh_host = ssh_host
self.cores = cores
self.sge_template_name = sge_template_name
self.set_job_id = load_function_from_module(job_setter)
self.check_task_finished = load_function_from_module(job_finished_checker)
self._job_setter = job_setter
self._job_finished_checker = job_finished_checker
super().__init__()
def __getstate__(self):
return {
'ssh_host': self.ssh_host,
'cores': self.cores,
'programs': self.programs,
'sge_template_name': self.sge_template_name,
'job_setter': self._job_setter,
'job_finished_checker': self._job_finished_checker,
}
def __setstate__(self, state):
self.ssh_host = state['ssh_host']
self.cores = state['cores']
self.programs = state['programs']
self.sge_template_name = state['sge_template_name']
self.set_job_id = load_function_from_module(state['job_setter'])
self.check_task_finished = load_function_from_module(state['job_finished_checker'])
# Decorator
def enable_sftp(func):
def wrapped_func(self, *args, **kwds):
self._init_sftp()
return func(self, *args, **kwds)
return wrapped_func
class BaseCommunication(metaclass=ABCMeta):
'''
BaseCommunication is an abstract class which can be used to implement the simplest access to a machine.
A concrete class ought to use a concrete method of communication (e.g., OS API or ssh) allowing it to access
the filesystem (copy and remove files) and execute a command line on the machine.
Since a machine can be, in particular, the local machine itself, and at the same time communication is always established
between the local machine and the machine being accessed, we have to sort the terminology out. We shall call the latter the communicated
machine whilst the former remains the local machine.
Generally, two types of file exchange are possible:
(1) between the local machine and a communicated machine,
(2) within a communicated machine.
Since for now only copying reflects this division, we introduce so-called 'modes of copying': from_local, from_remote
and all_remote (see copy()).
'''
def __init__(self, host, machine_name):
self.host = host
self.machine_name = machine_name
@abstractmethod
def execute(self, command, working_dir=None):
pass
@abstractmethod
def copy(self, from_, to_, mode='from_local'):
'''
Copies from_ to to_ which are interpreted according to mode:
(1) from_local (default) -> from_ is local path, to_ is a path on a communicated machine
(2) from_remote -> from_ is a path on a communicated machine, to_ local path
(3) all_remote -> from_ and to_ are paths on a communicated machine
from_ and to_ can be dirs or files according to the following combinations:
(1) from_ is dir, to_ is dir
(2) from_ is file, to_ is dir
(3) from_ is file, to_ is file
'''
pass
@abstractmethod
def rm(self, target):
'''
Removes target which can be a dir or file
'''
pass
def execute_program(self, prog_name, args_str, working_dir=None):
prog_path = self.host.get_program_launch_path(prog_name)
command = '{} {}'.format(prog_path, args_str)
return self.execute(command, working_dir)
def _print_copy_msg(self, from_, to_):
print('\tCopying %s to %s' % (from_, to_))
def _print_exec_msg(self, cmd, is_remote):
where = '@' + self.machine_name if is_remote else ''
print('\tExecuting %s: %s' % (where, cmd))
class LocalCommunication(BaseCommunication):
def __init__(self, local_host, machine_name='laptop'):
super(LocalCommunication, self).__init__(local_host, machine_name)
@classmethod
def create_from_config(cls):
with open('config_research.json', 'r') as f:
conf = json.load(f)
local_host = Host()
_add_programs_from_config(local_host, conf['LOCAL_HOST'])
return LocalCommunication(local_host)
def execute(self, command, working_dir=None):
command_line = command if working_dir is None else 'cd {}; {}'.format(working_dir, command)
#print('\t' + command_line)
# use PIPEs to avoid breaking the child process when the parent process finishes
# (works on Linux, solution for Windows is to add creationflags=0x00000010 instead of stdout, stderr, stdin)
#self._print_exec_msg(command_line, is_remote=False)
subprocess.call([command_line], shell=True)
return [], []
def copy(self, from_, to_, mode='from_local'):
'''
Any mode is ignored since the copying shall be within a local machine anyway
'''
#self._print_copy_msg(from_, to_)
return cp(from_, to_)
def rm(self, target):
rm(target)
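# Illustrative sketch: executing a command and copying a file through LocalCommunication;
# the command and the paths are hypothetical.
def _local_communication_example():  # pragma: no cover
    comm = LocalCommunication(Host())
    comm.execute('ls -la', working_dir='/tmp')
    new_path = comm.copy('/tmp/input.dat', '/tmp/backup')  # mode is ignored locally
    comm.rm(new_path)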
class SshCommunication(BaseCommunication):
def __init__(self, remote_host, username, password, machine_name=''):
if not isinstance(remote_host, RemoteHost):
raise Exception('Only RemoteHost can be used to build SshCommunication')
self.host = remote_host
self.username = username
self.password = password
self.ssh_client = paramiko.SSHClient()
self.sftp_client = None
#self.main_dir = '/nobackup/mmap/research'
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh_client.connect(self.host.ssh_host, username=username, password=password)
transport = self.ssh_client.get_transport()
transport.packetizer.REKEY_BYTES = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
transport.packetizer.REKEY_PACKETS = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
paramiko.util.log_to_file('paramiko.log')
super().__init__(self.host, machine_name)
@classmethod
def create_from_config(cls, host_sid):
with open('config_research.json', 'r') as f:
conf = json.load(f)
hostconf = conf['REMOTE_HOSTS'][host_sid]
remote_host = RemoteHost(ssh_host=hostconf['ssh_host'],
cores=hostconf['max_cores'],
sge_template_name=hostconf['sge_template_name'],
job_setter=hostconf['job_setter'],
job_finished_checker=hostconf['job_finished_checker'])
_add_programs_from_config(remote_host, hostconf)
return SshCommunication(remote_host, username=hostconf['username'],
password=hostconf['password'],
machine_name=host_sid)
def __getstate__(self):
return {
'host': self.host.__getstate__(),
'username': self.username,
'password': self.password,
}
def __setstate__(self, state):
remote_host = RemoteHost.__new__(RemoteHost)
remote_host.__setstate__(state['host'])
self.__init__(remote_host, state['username'], state['password'])
def execute(self, command, working_dir=None):
if self.ssh_client is None:
raise Exception('Remote host is not set')
#self._print_exec_msg(command, is_remote=True)
command_line = command if working_dir is None else 'cd {}; {}'.format(working_dir, command)
stdin, stdout, stderr = self.ssh_client.exec_command(command_line)
return stdout.readlines(), stderr.readlines()
# for line in stdout:
# print('\t\t' + line.strip('\n'))
# for line in stderr:
# print('\t\t' + line.strip('\n'))
def copy(self, from_, to_, mode='from_local'):
if self.ssh_client is None:
raise Exception('Remote host is not set')
self._init_sftp()
new_path = None
if mode == 'from_local':
new_path = self._copy_from_local(from_, to_)
elif mode == 'from_remote':
new_path = self._copy_from_remote(from_, to_)
elif mode == 'all_remote':
# self._print_copy_msg(self._machine_name + ':' + from_, self._machine_name + ':' + to_)
self._mkdirp(to_)
self.execute('cp -r %s %s' % (from_, to_))
else:
raise Exception("Incorrect mode '%s'" % mode)
return new_path
def rm(self, target):
if self.ssh_client is None:
raise Exception('Remote host is not set')
self._init_sftp()
self.execute('rm -r %s' % target)
@enable_sftp
def mkdir(self, path):
self.sftp_client.mkdir(path)
@enable_sftp
def listdir(self, path_on_remote):
return self.sftp_client.listdir(path_on_remote)
@enable_sftp
def _chdir(self, path=None):
self.sftp_client.chdir(path)
def _mkdirp(self, path):
path_list = path.split('/')
cur_dir = ''
if (path_list[0] == '') or (path_list[0] == '~'): # path is absolute or relative to the user's home dir => no need to check the obvious
cur_dir = path_list.pop(0) + '/'
start_creating = False # just to exclude unnecessary stat() calls when we catch non-existing dir
for dir_ in path_list:
if dir_ == '': # trailing slash or double slash, can skip
continue
cur_dir += dir_
if start_creating or (not self._is_remote_dir(cur_dir)):
self.mkdir(cur_dir)
if not start_creating:
start_creating = True
cur_dir += '/'
@enable_sftp
def _open(self, filename, mode='r'):
return self.sftp_client.open(filename, mode)
@enable_sftp
def _get(self, remote_path, local_path):
return self.sftp_client.get(remote_path, local_path)
@enable_sftp
def _put(self, local_path, remote_path):
return self.sftp_client.put(local_path, remote_path)
def _is_remote_dir(self, path):
try:
return S_ISDIR(self.sftp_client.stat(path).st_mode)
except IOError:
return False
def _copy_from_local(self, from_, to_):
new_path_on_remote = to_ + '/' + os.path.basename(from_)
if os.path.isfile(from_):
self._mkdirp(to_)
# self._print_copy_msg(from_, self._machine_name + ':' + to_)
self._put(from_, new_path_on_remote)
elif os.path.isdir(from_):
self.mkdir(new_path_on_remote)
for dir_or_file in os.listdir(from_):
self._copy_from_local(os.path.join(from_, dir_or_file), new_path_on_remote)
else:
raise CommunicationError("Path %s does not exist" % from_)
return new_path_on_remote
def _copy_from_remote(self, from_, to_):
new_path_on_local = os.path.join(to_, os.path.basename(from_))
if not self._is_remote_dir(from_):
# self._print_copy_msg(self._machine_name + ':' + from_, to_)
self._get(from_, new_path_on_local)
else:
os.mkdir(new_path_on_local)
for dir_or_file in self.sftp_client.listdir(from_):
self._copy_from_remote(from_ + '/' + dir_or_file, new_path_on_local)
return new_path_on_local
def disconnect(self):
if self.sftp_client is not None:
self.sftp_client.close()
self.ssh_client.close()
def _init_sftp(self):
if self.sftp_client is None:
self.sftp_client = self.ssh_client.open_sftp()
class CommunicationError(Exception):
pass
def _add_programs_from_config(host, hostconf):
if 'custom_programs' in hostconf:
paths = hostconf['custom_programs']
for path, programs in paths.items():
for program in programs:
host.add_program(program, path)
if 'env_programs' in hostconf:
for program in hostconf['env_programs']:
host.add_program(program)
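# Illustrative sketch: the structure of config_research.json expected by
# create_from_config() and _add_programs_from_config(); every concrete value
# below (host names, paths, program names, module names) is hypothetical.
_EXAMPLE_CONFIG = {
    'LOCAL_HOST': {
        'custom_programs': {'/home/user/bin': ['integrate']},
        'env_programs': ['python3'],
    },
    'REMOTE_HOSTS': {
        'cluster': {
            'ssh_host': 'login.cluster.example.org',
            'max_cores': 24,
            'username': 'user',
            'password': 'secret',
            'sge_template_name': 'sge_template.sh',
            'job_setter': 'mymodule.set_job_id',
            'job_finished_checker': 'mymodule.check_task_finished',
            'custom_programs': {'/home/user/remote/bin': ['integrate']},
            'env_programs': ['qsub'],
        },
    },
}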
from comsdk.aux import find_dir_by_named_regexp
from functools import partial
import os
class DistributedStorage:
"""
Distributed storage is a set of sources containing the data. The sources must be accessible via the OS API.
It is assumed that the data somewhat overlap, namely, they should overlap in terms of the directory hierarchy.
However, this implementation does not guarantee the uniqueness of data: instead, it uses a priority to prefer
one source over another while looking up. Even though duplicates are acceptable, any duplicates found will be printed
out to draw the user's attention.
"""
def __init__(self, abs_storage_paths, prior_storage_index=0):
self.storage_paths = abs_storage_paths
self.prior_storage_index = prior_storage_index
def get_dir_path(self, dir_):
"""
Returns the full path to dir_ or None if dir_ is absent.
"""
dir_path_tuple = self.lookup_through_dir(dir_, \
lambda dir_path: (dir_path, dir_path) if os.path.exists(dir_path) else None)
return dir_path_tuple[0] if dir_path_tuple is not None else None
def make_dir(self, dir_):
"""
Creates dir_ in prior storage. Returns the full path to it.
"""
path_ = os.path.join(self.storage_paths[self.prior_storage_index], dir_)
os.makedirs(path_)
return path_
def find_dir_by_named_regexp(self, parent_dir, regexp):
"""
Finds a directory in parent_dir fulfilling regexp. Returns a tuple (full_path_to_found_dir, named_params_from_regexp).
"""
return self.lookup_through_dir(parent_dir, partial(find_dir_by_named_regexp, regexp))
def lookup_through_dir(self, dir_, lookup_func):
"""
Looks up the data in dir_ by executing lookup_func on dir_. Returns a tuple (full_path_to_dir, some_data_regarding_dir)
which must, in turn, be returned by lookup_func. lookup_func must take a single argument -- full path to the dir.
"""
possible_paths = [os.path.join(source, dir_) if dir_ != '' else source for source in self.storage_paths]
found_data = None
prior_found = False
for path_i in range(len(possible_paths)):
path_ = possible_paths[path_i]
if os.path.exists(possible_paths[path_i]):
tmp_found_data = lookup_func(possible_paths[path_i])
if tmp_found_data is not None:
tmp_found_path = os.path.join(possible_paths[path_i], tmp_found_data[0])
if found_data is not None:
print("Duplicate distributed dir is found: '{}' and '{}'".format(tmp_found_path, found_data[0]))
if not prior_found:
found_data = (tmp_found_path, tmp_found_data[1])
if path_i == self.prior_storage_index:
prior_found = True
return found_data
def listdir(self, dir_):
"""
Lists the content of dir_. Returns a tuple (dirnames, filenames) which are obtained by simple union of the content of sources.
Therefore, there might be copies whose detection must be performed elsewhere.
"""
dirnames = []
filenames = []
for storage_path in self.storage_paths:
if os.path.exists(os.path.join(storage_path, dir_)):
_, dirnames_, filenames_ = next(os.walk(os.path.join(storage_path, dir_)))
dirnames += dirnames_
filenames += filenames_
return dirnames, filenames
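# Illustrative sketch: looking up a directory spread over two storages; the storage paths,
# the directory name and the regexp are hypothetical.
def _distributed_storage_example():  # pragma: no cover
    ds = DistributedStorage(('/media/hdd1/research', '/media/hdd2/research'), prior_storage_index=0)
    path = ds.get_dir_path('2019-05-01_TestResearch')   # full path to the dir if found, or None
    found = ds.find_dir_by_named_regexp('', r'^(?P<date>\d{4}-\d{2}-\d{2})_.*')
    dirnames, filenames = ds.listdir('')
    return path, found, dirnames, filenames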
import comsdk.aux as aux
from comsdk.communication import CommunicationError
from mako.template import Template
import os
import posixpath
import pickle
class InOutMapping(object):
def __init__(self,
keys_mapping={},
relative_keys=(),
default_relative_key=(),
):
self._default_relative_key = default_relative_key if aux.is_sequence(default_relative_key) else (default_relative_key,)
self._relative_keys = relative_keys if aux.is_sequence(relative_keys) else (relative_keys,)
self._keys_mapping = keys_mapping
def build_proxy_data(self, data, dynamic_keys_mapping={}):
if self._default_relative_key == () and self._relative_keys == () and self._keys_mapping == {} and dynamic_keys_mapping == {}:
return data
else:
#print('\t{}\n\t{}\n\t{}'.format(self._relative_keys, self._keys_mapping, dynamic_keys_mapping))
return aux.ProxyDict(data, self._relative_keys, dict(self._keys_mapping, **dynamic_keys_mapping), self._default_relative_key)
class Edge(object):
__slots__ = [
'_predicate',
'_morphism',
'_io_mapping',
'preprocess',
'postprocess',
]
def __init__(self, predicate, morphism,
io_mapping=InOutMapping(),
):
self._predicate = predicate
self._morphism = morphism
self._io_mapping = io_mapping
self.preprocess = lambda pd: None
self.postprocess = lambda pd: None
def predicate(self, data, dynamic_keys_mapping={}):
proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping)
return self._predicate(proxy_data)
def morph(self, data, dynamic_keys_mapping={}):
#print(dynamic_keys_mapping)
proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping)
#print(proxy_data)
self.preprocess(data)
self._morphism(proxy_data)
self.postprocess(data)
class DummyEdge(Edge):
def __init__(self):
super().__init__(None, None)
def predicate(self, data, dynamic_keys_mapping={}):
return True
def morph(self, data, dynamic_keys_mapping={}):
self.preprocess(data)
self.postprocess(data)
class ExecutableProgramEdge(Edge):
'''
Class implementing an edge which uses an external program to morph data.
The program is launched via a so-called communication which, among other things, determines where the program is located and how it can be launched.
A remote communication can be used to launch the program on remote resources.
# DESCRIPTION OF KEYS MAPPINGS #
Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
from data which may be located in different (nested) keys of data (we will call these keys "global").
However, it is very convenient to implement the edge as if there were no nested structures
and all keys were available at the top level of data (we will call these keys "local").
To link global and local keys, we introduce keys mapping, which are either dictionaries (local key string -> sequence) or sequences.
If the keys mapping is sequence, we treat it as a relative "path" to all needed keys.
Therefore, we have keys mappings for input and output keys.
# END OF DESCRIPTION OF KEYS MAPPINGS #
We expect that necessary input files are already on remote.
Programs may require three types of arguments:
1) keyword arguments (-somearg something)
2) flags (-someflag)
3) trailing arguments
Local keys determining the corresponding values are located in keyword_names, flag_names and trailing_args_keys.
Finally, data must be updated after the program finishes. This is done by merging output_dict into data (it is just added).
'''
def __init__(self, program_name, comm,
predicate=None,
io_mapping=InOutMapping(),
output_dict={}, # output dict which will be added to the main dictionary (w.r.t. output_keys_mapping)
keyword_names=(), # "local keys" where keyword args are stored
flag_names=(), # "local keys" where flags are stored
trailing_args_keys=(), # "local keys" where trailing args are stored
remote=False,
stdout_processor=None,
):
predicate = predicate if predicate is not None else dummy_predicate
self._output_dict = output_dict
self._comm = comm
self._program_name = program_name
self._keyword_names = keyword_names
self._flag_names = flag_names
self._trailing_args_keys = trailing_args_keys
self._working_dir_key = '__REMOTE_WORKING_DIR__' if remote else '__WORKING_DIR__'
self._stdout_processor = stdout_processor
super().__init__(predicate, self.execute, io_mapping)
def execute(self, data):
args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys)
working_dir = data[self._working_dir_key]
stdout_lines, stderr_lines = self._comm.execute_program(self._program_name, args_str, working_dir) # here we execute
output_data = self._output_dict
if self._stdout_processor:
stdout_data = self._stdout_processor(data, stdout_lines)
data.update(stdout_data)
data.update(output_data)
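# Illustrative sketch: running a hypothetical program 'integrate -T 100 -verbose field.dat'
# through an ExecutableProgramEdge; the program name, data keys and values are made up,
# and config_research.json is assumed to exist in the working directory.
def _executable_program_edge_example():  # pragma: no cover
    from comsdk.communication import LocalCommunication
    comm = LocalCommunication.create_from_config()
    edge = ExecutableProgramEdge('integrate', comm,
                                 keyword_names=('T',),
                                 flag_names=('verbose',),
                                 trailing_args_keys=('field_path',),
                                 output_dict={'integrated': True})
    data = {'__WORKING_DIR__': '/tmp', 'T': 100, 'verbose': True, 'field_path': 'field.dat'}
    edge.morph(data)   # executes the program, then merges output_dict into data
    assert data['integrated']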
class QsubScriptEdge(Edge):
'''
Class implementing the edge which builds up the sh-script for qsub.
The script is created via communication.
# DESCRIPTION OF KEYS MAPPINGS #
Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
from data which may be located in different (nested) keys of data (we will call these keys "global").
However, it is very convenient to implement the edge as if there were no nested structures
and all keys were available at the top level of data (we will call these keys "local").
To link global and local keys, we introduce keys mapping, which are either dictionaries (local key string -> sequence) or sequences.
If the keys mapping is sequence, we treat it as a relative "path" to all needed keys.
Therefore, we have keys mappings for input and output keys.
# END OF DESCRIPTION OF KEYS MAPPINGS #
Data will be augmented by 'qsub_script' pointing to the local file.
'''
def __init__(self, program_name, local_comm, remote_comm,
predicate=None,
io_mapping=InOutMapping(),
keyword_names=(), # "local keys" where keyword args are stored
flag_names=(), # "local keys" where flags are stored
trailing_args_keys=(), # "local keys" where trailing args are stored
):
predicate = predicate if predicate is not None else dummy_predicate
self._local_comm = local_comm
self._remote_comm = remote_comm
self._program_name = program_name
self._keyword_names = keyword_names
self._flag_names = flag_names
self._trailing_args_keys = trailing_args_keys
super().__init__(predicate, self.execute, io_mapping)
def execute(self, data):
if isinstance(data, aux.ProxyDict):
print('QsubScriptEdge -> {}: {}'.format('qsub_script_name', data._keys_mappings['qsub_script_name']))
qsub_script_path = os.path.join(data['__WORKING_DIR__'], data['qsub_script_name'])
args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys)
program_launch_path = self._remote_comm.host.get_program_launch_path(self._program_name)
command_line = '{} {}'.format(program_launch_path, args_str)
render_sge_template(self._remote_comm.host.sge_template_name, qsub_script_path,
data['cores_required'], data['time_required'], (command_line,))
data.update({'qsub_script': qsub_script_path})
class UploadOnRemoteEdge(Edge):
'''
Class implementing the edge which uploads the data to the remote computer.
It is done via environment which must provide the interface for that.
# DESCRIPTION OF KEYS MAPPINGS #
Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
from data which may be located in different (nested) keys of data (we will call these keys "global").
However, it is very convenient to implement the edge as if there were no nested structures
and all keys were available at the top level of data (we will call these keys "local").
To link global and local keys, we introduce keys mapping, which are either dictionaries (local key string -> sequence) or sequences.
If the keys mapping is sequence, we treat it as a relative "path" to all needed keys.
Therefore, we have keys mappings for input and output keys.
# END OF DESCRIPTION OF KEYS MAPPINGS #
Files for uploading must be found in input_files_keys which is a list of local data keys corresponding to these files.
They will be uploaded in remote working dir which must be in data['__REMOTE_WORKING_DIR__'].
After edge execution, data is going to be updated such that local paths will be replaced by remote ones.
'''
def __init__(self, comm,
predicate=None,
io_mapping=InOutMapping(),
local_paths_keys=(), # "local keys", needed to build a copy list
update_paths=True,
already_remote_path_key=None,
):
predicate = predicate if predicate is not None else dummy_predicate
self._local_paths_keys = local_paths_keys
self._comm = comm
self._update_paths = update_paths
self._already_remote_path_key = already_remote_path_key
super().__init__(predicate, self.execute, io_mapping)
def execute(self, data):
# print(data)
# print(data['c_field_path'])
if self._already_remote_path_key is not None:
if data[self._already_remote_path_key]:
return
remote_working_dir = data['__REMOTE_WORKING_DIR__']
for key in self._local_paths_keys:
try:
# try data[key] as an absolute path
data[key] = self._comm.copy(data[key], remote_working_dir, mode='from_local')
except CommunicationError as e:
# try data[key] as a relative path
working_dir = data['__WORKING_DIR__']
if isinstance(data, aux.ProxyDict):
print('UploadOnRemoteEdge -> {}: {}'.format(key, data._keys_mappings[key]))
remote_path = self._comm.copy(os.path.join(working_dir, data[key]), remote_working_dir, mode='from_local')
if self._update_paths:
data[key] = remote_path
class DownloadFromRemoteEdge(Edge):
'''
Class implementing the edge which downloads the data from the remote computer.
It is done via environment which must provide the interface for that.
# DESCRIPTION OF KEYS MAPPINGS #
Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
from data which may be located in different (nested) keys of data (we will call these keys "global").
However, it is very convenient to implement the edge imagining that there is no nested structures
and all keys are available in the top-level of data (we will call these keys "local").
To link global and local keys, we introduce keys mapping, which are either dictionaries (local key string -> sequence) or sequences.
If the keys mapping is sequence, we treat it as a relative "path" to all needed keys.
Therefore, we have keys mappings for input and output keys.
# END OF DESCRIPTION OF KEYS MAPPINGS #
Files for downloading must be found in output_files_keys which is a list of local data keys corresponding to these files.
All these files are relative to the remote working dir and will be downloaded into local working dir
Local working dir must be in data['__LOCAL_WORKING_DIR__'].
Remote working dir must be in data['__REMOTE_WORKING_DIR__'].
After edge execution, data is going to be updated such that remote/relative paths will be replaced by local ones.
'''
def __init__(self, comm,
predicate=None,
io_mapping=InOutMapping(),
remote_paths_keys=(), # "local keys", needed to build a list for downloading
update_paths=True,
):
predicate = predicate if predicate is not None else dummy_predicate
self._remote_paths_keys = remote_paths_keys
self._comm = comm
self._update_paths = update_paths
super().__init__(predicate, self.execute, io_mapping)
def execute(self, data):
working_dir = data['__WORKING_DIR__']
remote_working_dir = data['__REMOTE_WORKING_DIR__']
for key in self._remote_paths_keys:
output_file_or_dir = data[key]
local_path = None
if output_file_or_dir == '*':
paths = self._comm.listdir(remote_working_dir)
local_full_paths = ['/'.join([working_dir, file_or_dir]) for file_or_dir in paths]
remote_full_paths = ['/'.join([remote_working_dir, file_or_dir]) for file_or_dir in paths]
for file_or_dir in remote_full_paths:
self._comm.copy(file_or_dir, working_dir, mode='from_remote')
local_path = local_full_paths
else:
local_path = self._comm.copy('/'.join([remote_working_dir, output_file_or_dir]), working_dir, mode='from_remote')
if self._update_paths:
data[key] = local_path
'''
@todo: to be removed
'''
def dummy_edge(data):
pass
'''
@todo: to be removed
'''
def dummy_predicate(data):
return True
def job_finished_predicate(data):
return data['job_finished']
def job_unfinished_predicate(data):
return not data['job_finished']
def make_cd(key_path):
def _cd(d):
if key_path == '..':
d['__WORKING_DIR__'] = os.path.dirname(d['__WORKING_DIR__'])
if '__REMOTE_WORKING_DIR__' in d:
d['__REMOTE_WORKING_DIR__'] = posixpath.dirname(d['__REMOTE_WORKING_DIR__'])
else:
subdir = aux.recursive_get(d, key_path)
d['__WORKING_DIR__'] = os.path.join(d['__WORKING_DIR__'], subdir)
if '__REMOTE_WORKING_DIR__' in d:
d['__REMOTE_WORKING_DIR__'] = posixpath.join(d['__REMOTE_WORKING_DIR__'], subdir)
return _cd
def make_dump(dump_name_format, format_keys=(), omit=None):
def _dump(d):
format_params = [aux.recursive_get(d, key) for key in format_keys]
with open(os.path.join(d['__WORKING_DIR__'], dump_name_format.format(*format_params)), 'wb') as f:
if omit is None:
dumped_d = d
else:
dumped_d = {key: val for key, val in d.items() if not key in omit}
pickle.dump(dumped_d, f)
return _dump
def make_composite_func(*funcs):
def _composite(d):
for func in funcs:
func(d)
return _composite
def create_local_data_from_global_data(global_data, keys_mapping):
if keys_mapping is None:
return global_data
elif aux.is_sequence(keys_mapping):
return aux.recursive_get(global_data, keys_mapping)
else:
return {local_key: aux.recursive_get(global_data, global_key) for local_key, global_key in keys_mapping.items()}
def update_global_data_according_to_local_data(local_data, global_data, keys_mapping):
if keys_mapping is None:
global_data.update(local_data)
elif aux.is_sequence(keys_mapping):
relative_data = aux.recursive_get(global_data, keys_mapping)
relative_data.update(local_data)
else:
for local_key, global_key in keys_mapping.items():
aux.recursive_set(global_data, global_key, local_data[local_key])
def build_args_line(data, keyword_names, flag_names, trailing_args_keys):
args_str = ''
for keyword in keyword_names:
if keyword in data:
args_str += '-{} {} '.format(keyword, data[keyword])
for flag in flag_names:
if flag in data and data[flag]:
args_str += '-{} '.format(flag)
for place_i, trailing_arg_key in enumerate(trailing_args_keys):
# if we have a sequence under the key, we expand it
if trailing_arg_key in data:
trailing_arg = data[trailing_arg_key]
args_str += ' '.join(map(str, trailing_arg)) if aux.is_sequence(trailing_arg) else trailing_arg
args_str += ' '
return args_str
def render_sge_template(sge_template_name, sge_script_path, cores, time, commands):
sge_templ_path = os.path.join(aux.get_templates_path(), sge_template_name)
if not os.path.exists(sge_templ_path): # by default, templates are in templates/, but here we let the user put any path
sge_templ_path = sge_template_name
with open(sge_templ_path, 'r') as f:
rendered_data = Template(f.read()).render(cores=cores, time=time, commands=commands)
sge_script_file = aux.create_file_mkdir(sge_script_path)
sge_script_file.write(rendered_data)
sge_script_file.close()
import os
import subprocess
from comsdk.aux import cp, rm
class BaseEnvironment(object):
def __init__(self):
self._programs = {}
def preprocess(self, working_dir, input_copies_list):
raise NotImplementedError()
def execute(self, working_dir, prog_name, command_line):
raise NotImplementedError()
def postprocess(self, working_dir, output_copies_list):
raise NotImplementedError()
def add_program(self, prog_name, path_to_prog):
self._programs[prog_name] = path_to_prog
# def _print_copy_msg(self, from_, to_):
# print('\tCopying %s to %s' % (from_, to_))
#
# def _print_exec_msg(self, cmd, is_remote):
# where = '@' + self._machine_name if is_remote else ''
# print('\tExecuting %s: %s' % (where, cmd))
class LocalEnvironment(BaseEnvironment):
def __init__(self):
super().__init__()
def preprocess(self, working_dir, input_copies_list):
for copy_target in input_copies_list:
self._copy(copy_target, working_dir)
def execute(self, working_dir, prog_name, args_str):
prog_path = os.path.join(self._programs[prog_name], prog_name)
command_line = 'cd {}; {} {}'.format(working_dir, prog_path, args_str)
# use PIPEs to avoid breaking the child process when the parent process finishes
# (works on Linux, solution for Windows is to add creationflags=0x00000010 instead of stdout, stderr, stdin)
# self._print_exec_msg(command_line, is_remote=False)
#pid = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
#print(pid)
subprocess.call([command_line], shell=True)
def postprocess(self, working_dir, output_copies_list):
pass
def _copy(self, from_, to_, mode='from_local'):
"""Any mode is ignored since the copying shall be within a local machine anyway
"""
cp(from_, to_)
# self._print_copy_msg(from_, to_)
def rm(self, target):
rm(target)
import collections
import os
from enum import Enum, auto
from functools import partial
import comsdk.aux as aux
ImplicitParallelizationInfo = collections.namedtuple('ImplicitParallelizationInfo', ['array_keys_mapping', 'branches_number', 'branch_i'])
class Morphism:
def __init__(self, edge, output_state):
self.edge = edge
self.output_state = output_state
def morph(self, data, dynamic_keys_mapping={}):
#print(dynamic_keys_mapping)
self.edge.morph(data, dynamic_keys_mapping)
#return self.output_state, None
return self.output_state
class IdleRunType(Enum):
INIT = auto()
CLEANUP = auto()
class GraphFactory:
def __init__(self):
pass
def create_state(self):
pass
def create_edge(self):
# Here we should somehow pass the argument for "special" edges
# Essentially, we change only io_mapping
pass
def make_graph(self):
pass
class PluralGraphFactory:
def __init__(self, plural_keys_mappings, parallel_graphs_number):
self.plural_keys_mappings = plural_keys_mappings
self.parallel_graphs_number = parallel_graphs_number
self.init_state = None
def create_state(self, state):
if self.init_state is None:
self.init_state = state
def create_edge(self):
# Here we should somehow pass the argument for "special" edges
# Essentially, we change only io_mapping
pass
def make_graph(self):
pass
class PluralState:
def __init__(self, states):
self.states = states
pass
def connect_to(self, term_states, edge):
for init_state, term_state in zip(self.states, term_states):
init_state.output_morphisms.append(Morphism(edge, term_state))
class Graph:
def __init__(self, init_state):
self._init_state = init_state
'''
Class describing a graph-based computational method. Graph execution must start from this object.
'''
def __init__(self, init_state,
term_state=None,
):
self.init_state = init_state
self.term_state = term_state
if self.term_state is not None:
self.term_state.is_term_state = True
self._initialized = False
def run(self, data):
if not self._initialized:
self._init_state.idle_run([self._init_state.name], initialize_state=True)
self._initialized = True
cur_state = self._init_state
'''
Goes through the graph and returns boolean denoting whether the graph has finished successfully.
It runs twice -- the first run is idle (needed for initialization) and the second run is real.
The input data will be augmented by metadata:
1) '__CURRENT_WORKING_DIR__' -- absolute path to the current working directory as defined by the OS
2) '__WORKING_DIR__' -- absolute path to the directory from which external binaries or resources will be launched.
It will be set only if it is not yet set in data
3) '__EXCEPTION__' if any error occurs
'''
self.init_graph(data)
cur_state = self.init_state
implicit_parallelization_info = None
while cur_state is not None:
morph = _run_state(cur_state, data)
# print('1) In main loop', implicit_parallelization_info)
# morph = _run_state(cur_state, data, implicit_parallelization_info)
morph, implicit_parallelization_info = _run_state(cur_state, data, implicit_parallelization_info)
# print('2) In main loop', implicit_parallelization_info)
if '__EXCEPTION__' in data:
return data, False
return False
# cur_state, implicit_parallelization_info = morph(data)
cur_state = morph(data)
# print(morph)
if '__EXCEPTION__' in data:
return data, False
return data, True
return False
return True
def init_graph(self, data):
if not self._initialized:
self.init_state.idle_run(IdleRunType.INIT, [self.init_state.name])
self._initialized = True
else:
self.init_state.idle_run(IdleRunType.CLEANUP, [self.init_state.name])
data['__CURRENT_WORKING_DIR__'] = os.getcwd()
if not '__WORKING_DIR__' in data:
data['__WORKING_DIR__'] = data['__CURRENT_WORKING_DIR__']
class State:
__slots__ = [
@@ -25,105 +120,150 @@ class State:
'input_edges_number',
'looped_edges_number',
'activated_input_edges_number',
'output_edges',
'output_morphisms',
'parallelization_policy',
'parallel_branches_selection_policy',
'selection_policy',
'is_term_state',
'array_keys_mapping',
'_branching_states_history',
'_proxy_state',
]
def __init__(self, name,
parallelization_policy=None,
parallel_branches_selection_policy=None,
selection_policy=None,
array_keys_mapping=None, # if array_keys_mapping is not None, we have implicit parallelization in this state
):
self.name = name
self.parallelization_policy = parallelization_policy
self.parallel_branches_selection_policy = parallel_branches_selection_policy
self.parallelization_policy = SerialParallelizationPolicy() if parallelization_policy is None else parallelization_policy
self.selection_policy = OnlyOneSelectionPolicy() if selection_policy is None else selection_policy
self.array_keys_mapping = array_keys_mapping
self.input_edges_number = 0
self.looped_edges_number = 0
self.activated_input_edges_number = 0
self.output_edges = []
self.output_morphisms = []
self.is_term_state=False
self._branching_states_history = None
self._proxy_state=None
def idle_run(self, branching_states_history, initialize_state=False):
def idle_run(self, idle_run_type, branching_states_history):
if self._proxy_state is not None:
return self._proxy_state.idle_run(idle_run_type, branching_states_history)
# print('{} {} -> '.format(self.name, branching_states_history), end='')
if initialize_state:
if idle_run_type == IdleRunType.INIT:
self.input_edges_number += 1
if self.input_edges_number != 1:
if self._is_looped_branch(branching_states_history):
# print('Looping found')
self.looped_edges_number += 1
# else:
# print('Branches joint found')
# print('\tStop going further')
return # no need to go further if we already were there
if self._branching_states_history is None:
self._branching_states_history = branching_states_history
elif idle_run_type == IdleRunType.CLEANUP:
self.activated_input_edges_number = 0
# print('\tCLEANUP STATE {}, active: {}, branches_story: {}'.format(self.name, self.activated_input_edges_number, self._branching_states_history))
if self._branching_states_history is not None and self._is_looped_branch(branching_states_history):
# print('\tqwer')
self._branching_states_history = None
return
if self._branching_states_history is None:
self._branching_states_history = branching_states_history
else:
self.activated_input_edges_number += 1 # BUG: here we need to choose somehow whether we proceed or not
# if len(self.output_edges) == 0:
# print('Terminate state found')
if len(self.output_edges) == 1:
self.output_edges[0].identity().idle_run(branching_states_history, initialize_state)
if len(self.output_morphisms) == 1:
self.output_morphisms[0].output_state.idle_run(idle_run_type, branching_states_history)
else:
for i, edge in enumerate(self.output_edges):
next_state = edge.identity()
next_state.idle_run(branching_states_history + [next_state.name], initialize_state)
for i, morphism in enumerate(self.output_morphisms):
next_state = morphism.output_state
next_state.idle_run(idle_run_type, branching_states_history + [next_state.name])
def connect_to(self, term_state, edge):
edge.set_output_state(term_state)
self.output_edges.append(edge)
self.output_morphisms.append(Morphism(edge, term_state))
# edge.set_output_state(term_state)
# self.output_edges.append(edge)
def run(self, data):
# print(self.name, data['a'])
self.activated_input_edges_number += 1
if not self._ready_to_morph():
return None # it means that this state waits for some incoming edges (it is a point of collision of several edges)
self._reset_activity()
if len(self.output_edges) == 0:
return morphism_to_termination
def replace_with_graph(self, graph):
self._proxy_state = graph.init_state
graph.term_state.output_morphisms = self.output_morphisms
def run(self, data, implicit_parallelization_info=None):
print('STATE {}, just entered, implicit_parallelization_info: {}'.format(self.name, implicit_parallelization_info))
if self._proxy_state is not None:
return self._proxy_state.run(data, implicit_parallelization_info)
self._activate_input_edge(implicit_parallelization_info)
#self.activated_input_edges_number += 1
print('STATE {}, required input: {}, active: {}, looped: {}'.format(self.name, self.input_edges_number, self.activated_input_edges_number, self.looped_edges_number))
# print('qwer')
if not self._ready_to_morph(implicit_parallelization_info):
return None, None # it means that this state waits for some incoming edges (it is a point of collision of several edges)
self._reset_activity(implicit_parallelization_info)
if self.is_term_state:
implicit_parallelization_info = None
#print(self.name)
if len(self.output_morphisms) == 0:
return morphism_to_termination, None
predicate_values = []
for edge in self.output_edges:
predicate_values.append(edge.predicate(data))
selected_edge_indices = self.parallel_branches_selection_policy.select(predicate_values)
dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info)
for morphism in self.output_morphisms:
predicate_values.append(morphism.edge.predicate(data, dynamic_keys_mapping))
selected_edge_indices = self.selection_policy.select(predicate_values)
if not selected_edge_indices:
raise GraphUnexpectedTermination(
'State {}: Predicate values {} do not conform selection policy'.format(self.name, predicate_values))
selected_edges = [self.output_edges[i] for i in selected_edge_indices]
return self.parallelization_policy.make_morphism(selected_edges)
selected_morphisms = [self.output_morphisms[i] for i in selected_edge_indices]
return self.parallelization_policy.make_morphism(selected_morphisms,
array_keys_mapping=self.array_keys_mapping,
implicit_parallelization_info=implicit_parallelization_info,), \
implicit_parallelization_info
def _ready_to_morph(self):
required_activated_input_edges_number = self.input_edges_number - self.looped_edges_number
#print(self.input_edges_number, self.looped_edges_number)
return self.activated_input_edges_number == required_activated_input_edges_number
# return self.parallelization_policy.make_morphism(selected_morphisms,
# array_keys_mapping=self.array_keys_mapping,
# implicit_parallelization_info=implicit_parallelization_info,)
def _reset_activity(self):
self.activated_input_edges_number = 0
def _is_looped_branch(self, branching_states_history):
return set(self._branching_states_history).issubset(branching_states_history)
def _activate_input_edge(self, implicit_parallelization_info=None):
if implicit_parallelization_info is None or self.is_term_state:
self.activated_input_edges_number += 1
else:
if isinstance(self.activated_input_edges_number, int):
self.activated_input_edges_number = [0 for i in range(implicit_parallelization_info.branches_number)]
self.activated_input_edges_number[implicit_parallelization_info.branch_i] += 1
class Edge:
__slots__ = [
'_predicate',
'_morphism',
'_output_state',
]
def __init__(self, predicate, morphism, output_state=None):
self._predicate = predicate
self._morphism = morphism
self._output_state = output_state
def _ready_to_morph(self, implicit_parallelization_info=None):
required_activated_input_edges_number = self.input_edges_number - self.looped_edges_number
if implicit_parallelization_info is not None:
if self.is_term_state:
required_activated_input_edges_number = implicit_parallelization_info.branches_number
return self.activated_input_edges_number == required_activated_input_edges_number
return self.activated_input_edges_number[implicit_parallelization_info.branch_i] == required_activated_input_edges_number
else:
return self.activated_input_edges_number == required_activated_input_edges_number
def set_output_state(self, output_state):
self._output_state = output_state
# if implicit_parallelization_info is None or self.is_term_state:
# if self.is_term_state:
# required_activated_input_edges_number = implicit_parallelization_info.branches_number
# return self.activated_input_edges_number == required_activated_input_edges_number
# else:
# return self.activated_input_edges_number[implicit_parallelization_info.branch_i] == required_activated_input_edges_number
def predicate(self, data):
return self._predicate(data)
def _reset_activity(self, implicit_parallelization_info=None):
self._branching_states_history = None
if self._ready_to_morph(implicit_parallelization_info) and self._has_loop():
if implicit_parallelization_info is None or self.is_term_state:
self.activated_input_edges_number -= 1
else:
self.activated_input_edges_number[implicit_parallelization_info.branch_i] -= 1
else:
# self.activated_input_edges_number = 0
if implicit_parallelization_info is None or self.is_term_state:
self.activated_input_edges_number = 0
else:
self.activated_input_edges_number[implicit_parallelization_info.branch_i] = 0
def morph(self, data):
self._morphism(data)
return self._output_state
def _is_looped_branch(self, branching_states_history):
return set(self._branching_states_history).issubset(branching_states_history)
def identity(self):
return self._output_state
def _has_loop(self):
return self.looped_edges_number != 0
def morphism_to_termination(data):
return None
@@ -134,26 +274,108 @@ class SerialParallelizationPolicy:
def __init__(self):
pass
def make_morphism(self, edges):
# def make_morphism(self, morphisms, array_keys_mapping=None, implicit_parallelization_info=None):
# def _morph(data):
# if array_keys_mapping is None:
# dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info)
# next_morphs = [partial(morphism.morph, dynamic_keys_mapping=dynamic_keys_mapping) for morphism in morphisms]
# next_impl_para_infos = [implicit_parallelization_info for _ in morphisms]
# # print('\t\t {}'.format(implicit_parallelization_infos))
# else:
# if len(morphisms) != 1:
# raise BadGraphStructure('Impossible to create implicit parallelization in the state with {} output edges'.format(len(morphisms)))
# dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info)
# proxy_data = aux.ProxyDict(data, keys_mappings=array_keys_mapping)
# anykey = next(iter(array_keys_mapping.keys()))
# implicit_branches_number = len(proxy_data[anykey])
# next_morphs = []
# next_impl_para_infos = []
# for branch_i in range(implicit_branches_number):
# implicit_parallelization_info_ = ImplicitParallelizationInfo(array_keys_mapping, implicit_branches_number, branch_i)
# dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info_)
# next_morphs.append(partial(morphisms[0].morph, dynamic_keys_mapping=dynamic_keys_mapping))
# next_impl_para_infos.append(implicit_parallelization_info_)
# cur_morphs = []
# cur_impl_para_infos = []
# while len(next_morphs) != 1:
# cur_morphs[:] = next_morphs[:]
# cur_impl_para_infos[:] = next_impl_para_infos[:]
# del next_morphs[:]
# del next_impl_para_infos[:]
# # WE DO NOT UPDATE implicit_parallelization_infos !!!
# for morph, impl_para_info in zip(cur_morphs, cur_impl_para_infos):
# next_state, _ = morph(data)
## print('\t next_state: {}, with impl para info: {}'.format(next_state.name, impl_para_info))
# if next_state is None:
# return None, None
# next_morph = _run_state(next_state, data, impl_para_info)
## print('\t next_morph: {}'.format(next_morph))
# if '__EXCEPTION__' in data:
# return None, None
# if next_morph is not None:
# next_morphs.append(next_morph)
# next_impl_para_infos.append(impl_para_info)
# #print(len(next_morphs))
## print('\t last morph: {}'.format(next_morphs[0]))
# next_state, _ = next_morphs[0](data)
# print(next_state.name, next_impl_para_infos[0])
# return next_state, next_impl_para_infos[0]
# return _morph
def make_morphism(self, morphisms, array_keys_mapping=None, implicit_parallelization_info=None):
def _morph(data):
next_morphisms = [edge.morph for edge in edges]
cur_morphisms = []
while len(next_morphisms) != 1:
cur_morphisms[:] = next_morphisms[:]
del next_morphisms[:]
for morph in cur_morphisms:
if array_keys_mapping is None:
dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info)
next_morphs = [partial(morphism.morph, dynamic_keys_mapping=dynamic_keys_mapping) for morphism in morphisms]
next_impl_para_infos = [implicit_parallelization_info for _ in morphisms]
# print('\t\t {}'.format(implicit_parallelization_infos))
else:
if len(morphisms) != 1:
raise BadGraphStructure('Impossible to create implicit parallelization in the state with {} output edges'.format(len(morphisms)))
dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info)
proxy_data = aux.ProxyDict(data, keys_mappings=array_keys_mapping)
anykey = next(iter(array_keys_mapping.keys()))
implicit_branches_number = len(proxy_data[anykey])
next_morphs = []
next_impl_para_infos = []
for branch_i in range(implicit_branches_number):
implicit_parallelization_info_ = ImplicitParallelizationInfo(array_keys_mapping, implicit_branches_number, branch_i)
dynamic_keys_mapping = build_dynamic_keys_mapping(implicit_parallelization_info_)
# print(dynamic_keys_mapping)
next_morphs.append(partial(morphisms[0].morph, dynamic_keys_mapping=dynamic_keys_mapping))
next_impl_para_infos.append(implicit_parallelization_info_)
cur_morphs = []
cur_impl_para_infos = []
#while len(next_morphs) != 1 or _is_implicitly_parallelized(next_impl_para_infos):
while len(next_morphs) != 1 or _requires_joint_of_implicit_parallelization(array_keys_mapping, next_impl_para_infos):
if next_impl_para_infos == []:
raise Exception(str(len(next_morphs)))
# print(array_keys_mapping, next_impl_para_infos)
cur_morphs[:] = next_morphs[:]
cur_impl_para_infos[:] = next_impl_para_infos[:]
del next_morphs[:]
del next_impl_para_infos[:]
for morph, impl_para_info in zip(cur_morphs, cur_impl_para_infos):
next_state = morph(data)
# print('\t next_state: {}, with impl para info: {}'.format(next_state.name, impl_para_info))
if next_state is None:
return None
next_morph, next_impl_para_info = _run_state(next_state, data, impl_para_info)
# print('\t next_morph: {}'.format(next_morph))
if '__EXCEPTION__' in data:
return None
if next_morph is not None:
next_morphs.append(next_morph)
next_impl_para_infos.append(next_impl_para_info)
# print(array_keys_mapping, next_impl_para_infos)
#print(len(next_morphs))
# print('\t last morph: {}'.format(next_morphs[0]))
next_state = next_morphs[0](data)
# print(next_state.name, next_impl_para_infos[0])
return next_state
return _morph
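# Illustrative sketch (not used anywhere in the package): how an array_keys_mapping such as
# {'a': 'a'} splits a state into per-element branches, which is what _morph above does when
# array_keys_mapping is given. The sample data below is made up.
def _sketch_implicit_branching():
    data = {'a': [3, 5, 7]}
    array_keys_mapping = {'a': 'a'}
    proxy_data = aux.ProxyDict(data, keys_mappings=array_keys_mapping)
    anykey = next(iter(array_keys_mapping.keys()))
    branches_number = len(proxy_data[anykey])  # 3, one branch per element of data['a']
    branch_values = []
    for branch_i in range(branches_number):
        info = ImplicitParallelizationInfo(array_keys_mapping, branches_number, branch_i)
        dynamic_keys_mapping = build_dynamic_keys_mapping(info)
        # dynamic_keys_mapping['a'] is an aux.ArrayItemGetter selecting data['a'][branch_i],
        # so each branch sees only its own element of the array
        branch_values.append(aux.recursive_get(data, dynamic_keys_mapping['a']))
    return branch_values  # [3, 5, 7]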
class OnlyOneSelectionPolicy:
def __init__(self):
pass
class GraphUnexpectedTermination(Exception):
pass
def _requires_joint_of_implicit_parallelization(array_keys_mapping, impl_para_infos):
if array_keys_mapping is None:
return False
for obj in impl_para_infos:
if obj is not None:
return True
return False
def _get_trues(boolean_list):
return [i for i, val in enumerate(boolean_list) if val == True]
#def _run_state(state, data, implicit_parallelization_info=None):
# try:
# next_morphism = state.run(data, implicit_parallelization_info)
# except GraphUnexpectedTermination as e:
# data['__EXCEPTION__'] = str(e)
# return None
# return next_morphism
def _run_state(state, data, implicit_parallelization_info=None):
try:
next_morphism, next_impl_para_info = state.run(data, implicit_parallelization_info)
except GraphUnexpectedTermination as e:
data['__EXCEPTION__'] = str(e)
return None, None
return next_morphism, next_impl_para_info
def build_dynamic_keys_mapping(implicit_parallelization_info=None):
if implicit_parallelization_info is None:
return {}
dynamic_keys_mapping = {}
for key, keys_path in implicit_parallelization_info.array_keys_mapping.items():
dynamic_keys_mapping[key] = aux.ArrayItemGetter(keys_path, implicit_parallelization_info.branch_i)
return dynamic_keys_mapping
import json
import os
import re
import pickle
import shutil
from datetime import datetime, date
from comsdk.aux import *
from comsdk.communication import *
from comsdk.distributed_storage import *
from comsdk.edge import Edge, dummy_predicate
# Create RESEARCH-ID. It is a small research which should link different local directories (with reports and time-integration) and ssh directories (with continuation, for example)
# What is included in RESEARCH?
# 1) It should be multistep. So we can always continue RESEARCH if we want
# 2) Everything should be dated and timed!
# 3) For every research we should create local directory having format @date@_@name@. Inside we should have directory "report" and file research.log where all stuff is stored
# If there are some remote calculations, then there should be "remote" directory where all that stuff is organized. Another directory is "local" which contains local results
# Somehow we should save python script reproducing the results (think about it).
# 4)
# Local directory hierarchy:
# ROOT for us is the directory where research.py was launched (as it is a part of postproc package, postproc.research is rather used)
# ROOT/RESEARCH_REL_DIR is main directory
# ROOT/RESEARCH_REL_DIR/research.log
# ROOT/RESEARCH_REL_DIR/report
# ROOT/RESEARCH_REL_DIR/1-some_task
# ROOT/RESEARCH_REL_DIR/2-another_task
# ROOT/RESEARCH_REL_DIR/3-one_more_task
# As we can have multiple remotes, a remote root directory should be set somewhere out of Research.
# One possible solution is a factory for Task.
# Anyway, we create directory hierarchy relative to REMOTE_ROOT as follows:
# REMOTE_ROOT/RESEARCH_REL_DIR/1-some_task
# REMOTE_ROOT/RESEARCH_REL_DIR/2-another_task
# REMOTE_ROOT/RESEARCH_REL_DIR/3-one_more_task
# Some usual cases:
# 1) Remote calculation on some cluster.
# 1.1 COPY-EXECUTE TASK
# 1.1.1 copy input files from local to remote
# 1.1.2 copy program from remote to remote
# 1.1.3 execute program
# 1.2 wait for finishing (at this moment, we just have to wait and check)
# 1.3 COPY-TASK
# 1.3.1 copy results from remote to local
# 1.4 as result, we will have results directly in the task directory
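# A minimal sketch of the directory layout described above; the roots and names below are
# made-up examples and are not read from any config.
def _sketch_directory_layout():
    local_root = '/home/user/research_root'        # hypothetical local research root
    remote_root = '/nobackup/user/research_root'   # hypothetical remote research root
    research_rel_dir = '2018-09-20_some_research'  # @date@_@name@
    task_dir = '1-some_task'                       # @number@-@task_name@
    local_task_path = os.path.join(local_root, research_rel_dir, task_dir)
    remote_task_path = '/'.join((remote_root, research_rel_dir, task_dir))
    return local_task_path, remote_task_path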
class Research:
"""Research is the main class for interacting with the hierarchy of tasks.
It allows one to:
(1) find the location of a task by its number
(2) store/load an object into/from a task's dir
(3) create new tasks by launching TaskExecution locally or on remotes
(4) launch TaskExecution in an already existing task's directory
(5) grab a task's content from remotes
The main idea behind Research is that we collect tasks in the research's dir and enumerate them.
Each task is completely identified by its number. Its content, in turn, is located in the task's dir
whose name is the concatenation of the task number and a task name specified by the user. Next, we
distinguish two logical places where tasks are collected: a local one and a remote one. Both places
are distributed, but in a different manner: the local place is distributed over the locally accessible
space on the current machine (say, some tasks may be located in dir path/to/storage and other tasks
in another_path/to/storage), whereas the remote place is distributed over different machines. For the
local place, we use the DistributedStorage class to access the data. The remote place is accessed by
passing the necessary SshCommunication to the Research class constructor. To avoid a mess in the
directories, we assume the following rules:
(1) the intersection between tasks' dirs of the same research dir at different locations on the local machine is empty
(2) all tasks are present locally (in any location defined in DistributedStorage) at least as empty dirs
(3) the intersection between tasks' dirs of the same research dir on different remotes is empty
(4) the union of tasks' dirs over all remotes is a subset of the tasks' dirs present locally
Therefore, all tasks located remotely must map to the local machine (but the opposite is not true!).
Hence, when a task intended to be executed on a remote is created, we create both a remote directory and
a local directory for this task. To be consistent in creating tasks locally, we choose a main (master)
directory in DistributedStorage in which task directories are created by default, whereas the other local
directories are assumed to be used for storage only.
An instance of Research is created by passing a BaseCommunication object to it. If the instance is intended
to launch tasks on remotes or grab tasks from remotes, then the corresponding BaseCommunication for this
interaction must be passed. By default, LocalCommunication is used and remote interaction is thus disabled.
"""
def __init__(self, name,
continuing=False,
local_research_path=None,
remote_comm=None,
remote_research_path=None,
):
# Always create local communication here
# Remote communication is optional then
self.local_research_path = local_research_path
self.local_main_path = os.path.dirname(local_research_path)
self.remote_research_path = remote_research_path
self._tasks_number = 0
self._local_comm = LocalCommunication(Host())
self._remote_comm = remote_comm
# NOTE: at the moment we do not need features of the distributed storage
#self._distr_storage = DistributedStorage((rset.LOCAL_HOST['main_research_path'], rset.LOCAL_HOST['storage_research_path']), prior_storage_index=1)
self._distr_storage = DistributedStorage((self.local_research_path,), prior_storage_index=0)
suitable_name = self._make_suitable_name(name)
if not continuing:
# interpret name as name without date
self._research_id = str(date.today()) + '_' + suitable_name
if self._distr_storage.get_dir_path(self._research_id) is not None:
raise ResearchAlreadyExists("Research with name '{}' already exists, choose another name".format(self._research_id))
self.research_path = self._distr_storage.make_dir(self._research_id)
print('Started new research at {}'.format(self.research_path))
else:
# interpret name as the full research id
self._research_id = suitable_name
self.research_path = self._load_research_data()
@classmethod
def start_research(cls, name,
local_research_path=None,
remote_comm=None,
remote_research_path=None,
):
return Research(name,
local_research_path=local_research_path, remote_comm=remote_comm, remote_research_path=remote_research_path)
@classmethod
def continue_research(cls, name,
local_research_path=None,
remote_comm=None,
remote_research_path=None,
):
return Research(name,
continuing=True, local_research_path=local_research_path, remote_comm=remote_comm, remote_research_path=remote_research_path)
@classmethod
def create_from_config(cls, research_sid, remote_comm=None):
with open('config_research.json', 'r') as f:
conf = json.load(f)
res = Research(conf['RESEARCH'][research_sid],
continuing=True,
local_research_path=conf['LOCAL_HOST']['research_path'],
remote_comm=remote_comm,
remote_research_path=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_path'])
res._add_properties(conf['RESEARCH_PROPS'])
return res
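# A minimal sketch of what create_from_config reads from config_research.json; the research
# sid and all values are made up, only the key names follow the config template shipped with
# the package.
def _sketch_read_config(research_sid='some_sid'):
    with open('config_research.json', 'r') as f:
        conf = json.load(f)
    full_name = conf['RESEARCH'][research_sid]                 # e.g. '2018-09-20_some_research'
    local_research_path = conf['LOCAL_HOST']['research_path']
    # for remote launches, the remote path is taken from the corresponding remote host entry:
    # conf['REMOTE_HOSTS'][<machine_name>]['research_path']
    return full_name, local_research_path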
def __getstate__(self):
return {
'research_id': self._research_id,
'local_research_path': self.local_research_path,
'remote_research_path': self.remote_research_path,
'remote_comm': self._remote_comm.__getstate__(),
}
def __setstate__(self, state):
self._tasks_number = 0
self._local_comm = LocalCommunication(Host())
self.local_research_path = state['local_research_path']
self.remote_research_path = state['remote_research_path']
self._remote_comm = None
if state['remote_comm'] is not None:
self._remote_comm = SshCommunication.__new__(SshCommunication)
self._remote_comm.__setstate__(state['remote_comm'])
self._distr_storage = DistributedStorage((self.local_research_path,), prior_storage_index=0)
self._research_id = state['research_id']
self.research_path = self._load_research_data()
def _add_properties(self, props):
for prop_name, prop_value in props.items():
self.__setattr__(prop_name, prop_value)
def _load_research_data(self):
# find corresponding date/name
# construct object from all data inside
research_path = self._distr_storage.get_dir_path(self._research_id)
if research_path is None:
# assume date was omitted in research id
research_path, dir_params = self._distr_storage.find_dir_by_named_regexp('', '^(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)_{}'.format(self._research_id))
if dir_params is None:
raise ResearchDoesNotExist("Research '{}' does not exist".format(self._research_id))
self._research_id = '{}-{}-{}_{}'.format(dir_params['year'], dir_params['month'], dir_params['day'], self._research_id)
print('Loaded research at {}'.format(research_path))
# determine maximum task number to set the number for the next possible task
dirnames, _ = self._distr_storage.listdir(self._research_id)
self._tasks_number = 0
for dir_ in dirnames:
if dir_ != 'report':
task_number, _ = self._split_task_dir(dir_)
if task_number > self._tasks_number:
self._tasks_number = task_number
self._tasks_number += 1
print('Number of tasks in the current research: {}'.format(self._tasks_number))
return research_path
def create_task(self, name):
'''
Creates a new task directory for the next task number and returns this number
'''
task_number = self._get_next_task_number()
local_task_dir = self._make_task_path(task_number, name)
os.mkdir(local_task_dir)
return task_number
def grab_task_results(self, task_number, copies_list=[]):
'''
Moves task content from the remote to the local. Locally, the task content will appear in the task
dir located in the main research location.
'''
task_results_local_path = self.get_task_path(task_number)
task_results_remote_path = self.get_task_path(task_number, self._exec_comm.host)
if len(copies_list) == 0: # copy all data
pathes = self._exec_comm.listdir(task_results_remote_path)
for file_or_dir in pathes:
self._exec_comm.copy('/'.join((task_results_remote_path, file_or_dir)), task_results_local_path, 'from_remote')
else:
for copy_target in copies_list:
remote_copy_target_path = '/'.join((task_results_remote_path, copy_target['path'])) # we consider copy targets as relative to task's dir
self._exec_comm.copy(remote_copy_target_path, task_results_local_path, 'from_remote')
if 'new_name' in copy_target:
os.rename(os.path.join(task_results_local_path, os.path.basename(copy_target['path'])), \
os.path.join(task_results_local_path, copy_target['new_name']))
def _make_task_path(self, task_number, task_name, execution_host=None):
task_path = ''
rel_task_dir = os.path.join(self._research_id, self._get_task_full_name(task_number, task_name))
if execution_host is None:
task_path = os.path.join(self.local_research_path, rel_task_dir)
else:
task_path = os.path.join(execution_host.research_abs_path, rel_task_dir)
return task_path
def get_task_path(self, task_number, at_remote_host=False):
'''
Returns the task dir corresponding to task_number. By default, the local dir (from DistributedStorage) is returned.
If at_remote_host is True, then the remote dir will be returned.
'''
task_path = ''
task_name = self._get_task_name_by_number(task_number)
rel_task_dir = os.path.join(self._research_id, self._get_task_full_name(task_number, task_name))
if at_remote_host:
task_path = '{}/{}'.format(self.remote_research_path, rel_task_dir)
else:
task_path = self._distr_storage.get_dir_path(rel_task_dir)
return task_path
def dump_object(self, task_number, obj, obj_name):
'''
Dumps obj into the file whose name is obj_name + '.pyo' and locates it into the task dir corresponding to
task_number
'''
print('Dumping ' + obj_name)
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'),'wb')
pickle.dump(obj, f)
f.close()
def load_object(self, task_number, obj_name):
'''
Loads an object dumped into the file whose name is obj_name + '.pyo' and which is located in the task dir
corresponding to task_number
'''
print('Loading ' + obj_name)
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'),'rb')
obj = pickle.load(f)
f.close()
return obj
def _get_next_task_number(self):
self._tasks_number += 1
return self._tasks_number - 1
def _get_task_full_name(self, task_number, task_name):
return str(task_number) + '-' + self._make_suitable_name(task_name)
def _get_task_name_by_number(self, task_number):
find_data = self._distr_storage.find_dir_by_named_regexp(self._research_id, '^{}-(?P<task_name>\S+)'.format(task_number))
if find_data is None:
raise Exception("No task with number '{}' is found".format(task_number))
return find_data[1]['task_name']
def _split_task_dir(self, task_dir):
parsing_params = parse_by_named_regexp('^(?P<task_number>\d+)-(?P<task_name>\S+)', task_dir)
if parsing_params is None:
raise Exception("No task directory '{}' is found".format(task_dir))
return int(parsing_params['task_number']), parsing_params['task_name']
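# A small sketch of the task directory naming convention; the task name is a made-up example.
def _sketch_task_dir_naming(research):
    full_name = research._get_task_full_name(2, 'another task')  # -> '2-another_task'
    number, name = research._split_task_dir(full_name)           # -> (2, 'another_task')
    return number, name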
def _make_suitable_name(self, name):
return '_'.join(name.split())
class ResearchAlreadyExists(Exception):
pass
class ResearchDoesNotExist(Exception):
pass
def get_all_research_ids(local_research_path):
return os.listdir(local_research_path)
def retrieve_trailing_float_from_task_dir(task_dir):
matching = re.search('^(?P<task_number>\d+)-(?P<task_name>\S+)_(?P<float_left>\d+)\.(?P<float_right>\d+)', task_dir)
if matching is None:
raise Exception('Incorrect task directory is given')
return float('{}.{}'.format(matching.group('float_left'), matching.group('float_right')))
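# A small sketch of the expected task directory pattern; the directory name is a made-up example.
def _sketch_retrieve_trailing_float():
    # '3-find_soln_170.320' matches @number@-@task_name@_@float@ and yields 170.32
    return retrieve_trailing_float_from_task_dir('3-find_soln_170.320')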
class CreateTaskEdge(Edge):
def __init__(self, res, task_name_maker, predicate=dummy_predicate):
self._res = res
self._task_name_maker = task_name_maker
super().__init__(predicate, self.execute)
def execute(self, data):
task_name = self._task_name_maker(data)
task_number = self._res.create_task(task_name)
data['__WORKING_DIR__'] = self._res.get_task_path(task_number)
data['__REMOTE_WORKING_DIR__'] = self._res.get_task_path(task_number, at_remote_host=True)
{
"LOCAL_HOST": {
"research_path": "...",
"custom_programs": {
"@path_to_binaries@": ["@bin1@", "@bin2@", ...],
...
}
},
"REMOTE_HOSTS": {
"@remote_host_sid@": {
"ssh_host": "...",
"max_cores": ...,
"username": "...",
"password": "...",
"research_path": "...",
"env_programs": ["@bin1@", "@bin1@", ...],
"custom_programs": {
"@path_to_binaries@": ["@bin1@", "@bin2@", ...],
...
},
"sge_template_name": "...",
"job_setter": "...",
"job_finished_checker": "..."
},
...
},
"RESEARCH": {
"@research_sid@": "@research_full_name@",
...
},
"RESEARCH_PROPS": {
...
}
}
#include<fstream>
#include<string>
using namespace std;
int main(int argc, char* argv[])
{
string input_file_path(argv[1]);
string output_file_path("b.dat");
ifstream f_in(input_file_path);
int x;
f_in >> x;
ofstream f_out(output_file_path);
f_out << x*x;
return 0;
}
#$ -cwd -V
#$ -l h_rt=12:00:00
#$ -pe smp 12
/home/home01/mmap/tests/square/square /home/home01/mmap/tests/square_test_dir/a.dat
#$ -cwd -V
#$ -l h_rt=12:00:00
#$ -pe smp 12
./findsoln -symms reflect_symmetry.asc -R 170.320 -o find-170.320 -es 1e-15 -eqb find-170.330/ubest.h5
qsub fe_170.315.sh
import unittest
from copy import deepcopy
import subprocess
import os
import random
from comsdk.graph import *
from comsdk.edge import *
from comsdk.communication import *
def dummy_edge(data):
pass
def increment_a_edge(data):
data['a'] += 1
def increment_a_array_edge(data):
for i in range(len(data['a'])):
data['a'][i] += 1
def increment_b_edge(data):
data['b'] += 1
def decrement_a_edge(data):
data['a'] -= 1
def dummy_predicate(data):
return True
def write_a_edge(data):
a_filename = os.path.join(data['__CURRENT_WORKING_DIR__'], 'tests/square_test_dir/input/a.dat')
with open(a_filename, 'w') as f:
f.write(str(data['a']))
data['a_file'] = a_filename
def load_b_edge(data):
b_filename = os.path.join(data['__WORKING_DIR__'], data['b_file'])
with open(b_filename, 'r') as f:
data['b'] = int(f.read())
def nonzero_predicate(data):
return True if data['a'] != 0 else False
print('exception data: {}'.format(exc_data))
print('current state of data: {}'.format(data))
def print_stdout(data, stdout_lines):
# print(stdout)
return {}
def check_task_finished(data, stdout_lines):
'''
Example:
job-ID prior name user state submit/start at queue slots ja-task-ID
-----------------------------------------------------------------------------------------------------------------
663565 0.00053 RT700-tran scegr r 09/19/2018 23:51:22 24core-128G.q@dc2s2b1a.arc3.le 24
663566 0.00053 RT800-tran scegr r 09/19/2018 23:51:22 24core-128G.q@dc3s5b1a.arc3.le 24
663567 0.00053 RT900-tran scegr r 09/20/2018 00:00:22 24core-128G.q@dc4s2b1b.arc3.le 24
663569 0.00053 RT1000-tra scegr r 09/20/2018 00:05:07 24core-128G.q@dc1s1b3d.arc3.le 24
'''
job_finished = True
for line in stdout_lines[2:]:
items = line.split()
if int(items[0]) == data['job_ID']:
job_finished = False
return {'job_finished': job_finished}
def set_job_id(data, stdout_lines):
return {'job_ID': int(stdout_lines[0].split()[2])} # example: 'Your job 664989 ("fe_170.310.sh") has been submitted'
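# A small sketch of the two stdout processors above applied to made-up command output
# (the job numbers and queue names are invented).
def _sketch_stdout_processors():
    qsub_stdout = ['Your job 664989 ("fe_170.310.sh") has been submitted']
    job_data = set_job_id({}, qsub_stdout)                          # {'job_ID': 664989}
    qstat_stdout = [
        'job-ID  prior   name       user  state submit/start at     queue          slots',
        '--------------------------------------------------------------------------------',
        '663565 0.00053 RT700-tran scegr r     09/19/2018 23:51:22 24core-128G.q  24',
    ]
    status = check_task_finished({'job_ID': 664989}, qstat_stdout)  # {'job_finished': True}
    return job_data, status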
def _create_data_from_dict(d):
data = deepcopy(d)
data['__CURRENT_WORKING_DIR__'] = os.getcwd()
if not '__WORKING_DIR__' in data:
data['__WORKING_DIR__'] = data['__CURRENT_WORKING_DIR__']
return data
class GraphGoodCheck(unittest.TestCase):
initial_conditions = range(-10, 10)
@classmethod
def setUpClass(cls):
command_line = 'cd tests/square; g++ square.cpp -o square'
subprocess.call([command_line], shell=True)
local_host = Host()
local_host.add_program('square', os.path.join(os.getcwd(), 'tests', 'square'))
cls.local_comm = LocalCommunication(local_host)
cls.ssh_host = 'arc3.leeds.ac.uk'
cls.ssh_cores = 24
cls.ssh_user = 'mmap'
cls.ssh_pswd = '1bdwbzsc'
cls.ssh_path_to_tests = '/home/home01/mmap/tests'
remote_host = RemoteHost(ssh_host='arc3.leeds.ac.uk',
cores=24,
)
remote_host.add_program('square', '{}/square'.format(cls.ssh_path_to_tests))
remote_host.add_program('qsub')
remote_host.add_program('qstat')
cls.ssh_comm = SshCommunication(remote_host,
username=cls.ssh_user,
password=cls.ssh_pswd,
)
cls.ssh_comm.mkdir('{}/square_test_dir'.format(cls.ssh_path_to_tests))
@classmethod
def tearDownClass(cls):
aux.remove_if_exists('tests/square_test_dir/input/a.dat')
aux.remove_if_exists('tests/square_test_dir/output/b.dat')
cls.ssh_comm.rm('{}/square_test_dir'.format(cls.ssh_path_to_tests))
def test_trivial_serial_graph(self):
initial_datas = [{'a': ic} for ic in self.initial_conditions]
invalid_initial_datas = [{'a': ic} for ic in (-1, 0)]
initial_state, term_state, correct_outputs = self._get_trivial_serial_graph(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, invalid_initial_datas, correct_outputs)
def test_trivial_parallel_graph(self):
initial_datas = [{'a': ic, 'b': ic} for ic in self.initial_conditions]
invalid_initial_datas = [{'a': ic, 'b': ic} for ic in (-2, -1, 0)]
initial_state, term_state, correct_outputs = self._get_trivial_parallel_graph(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, invalid_initial_datas, correct_outputs)
def test_trivial_cycled_graph(self):
initial_datas = [{'a': ic} for ic in self.initial_conditions]
initial_state, term_state, correct_outputs = self._get_trivial_cycled_graph(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def test_complex_graph_made_from_trivial_ones_using_dummy_edges(self):
'''
serial graph + parallel graph + cycled graph
'''
initial_datas = [{'a': ic, 'b': ic} for ic in self.initial_conditions]
invalid_initial_datas = [{'a': ic, 'b': ic} for ic in (-4, -3, -2, -1, 0)]
s_1, s_2, correct_outputs = self._get_trivial_serial_graph(initial_datas)
s_3, s_4, correct_outputs = self._get_trivial_parallel_graph(correct_outputs)
s_5, s_6, correct_outputs = self._get_trivial_cycled_graph(correct_outputs)
s_2.connect_to(s_3, edge=Edge(dummy_predicate, dummy_edge))
s_4.connect_to(s_5, edge=Edge(dummy_predicate, dummy_edge))
self._run_graph(s_1, s_6, initial_datas, invalid_initial_datas, correct_outputs)
def test_trivial_serial_graph_with_subgraph(self):
initial_datas = [{'a': ic} for ic in self.initial_conditions]
initial_state, term_state, correct_outputs = self._get_trivial_serial_graph_with_subgraph(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def test_trivial_parallel_graph_with_subgraph(self):
initial_datas = [{'a': ic, 'b': ic} for ic in self.initial_conditions]
initial_state, term_state, correct_outputs = self._get_trivial_parallel_graph_with_subgraph(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def test_complex_graph_made_from_trivial_ones_using_subgraphs(self):
'''
serial graph + parallel graph + cycled graph
'''
initial_datas = [{'a': ic, 'b': ic} for ic in self.initial_conditions]
invalid_initial_datas = [{'a': ic, 'b': ic} for ic in (-4, -3, -2, -1, 0)]
s_1, s_2, correct_outputs = self._get_trivial_serial_graph(initial_datas)
s_3, s_4, correct_outputs = self._get_trivial_parallel_graph(correct_outputs)
s_5, s_6, correct_outputs = self._get_trivial_cycled_graph(correct_outputs)
s_2.replace_with_graph(Graph(s_3, s_4))
s_4.replace_with_graph(Graph(s_5, s_6))
print(correct_outputs)
self._run_graph(s_1, s_6, initial_datas, invalid_initial_datas, correct_outputs)
def test_trivial_graph_with_implicit_parallelization(self):
'''
s_1 -> s_2 -> s_3, with dummy edges
s_2 = s_11 -> s_12 -> s_13, with +1 edges for a
three implicitly parallel branches appear instead of s_2
'''
initial_datas = [{'a': [ic**i for i in range(1, 4)]} for ic in self.initial_conditions]
initial_state, term_state, correct_outputs = self._get_trivial_graph_with_implicit_parallelization(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def test_cycled_graph_with_implicit_parallelization(self):
random_neg_ics = [[random.randrange(-20, -3) for _ in range(3)] for _ in range(10)]
initial_datas = [{'a': random_neg_ic} for random_neg_ic in random_neg_ics]
#initial_datas = [{'a': [-4, -12]},]
#initial_datas = [{'a': [-3, -3]},]
initial_state, term_state, correct_outputs = self._get_cycled_graph_with_implicit_parallelization(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def test_trivial_graph_with_external_local_program(self):
initial_datas = [{'a': ic, '__WORKING_DIR__': os.path.join(os.getcwd(), 'tests', 'square_test_dir', 'output')} for ic in self.initial_conditions]
initial_state, term_state, correct_outputs = self._get_trivial_graph_with_external_local_program(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def test_trivial_graph_with_external_remote_program(self):
initial_datas = [{'a': ic,
'__WORKING_DIR__': os.path.join(os.getcwd(), 'tests', 'square_test_dir', 'output'),
'__REMOTE_WORKING_DIR__': '{}/square_test_dir'.format(self.ssh_path_to_tests)}
for ic in self.initial_conditions]
initial_state, term_state, correct_outputs = self._get_trivial_graph_with_external_remote_program(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def test_trivial_graph_with_external_remote_program_using_grid_engine(self):
initial_datas = [{'a': ic,
'user': self.ssh_user,
'cores_required': 12,
'time_required': '12:00:00',
'qsub_script_name': 'square.sh',
'__WORKING_DIR__': os.path.join(os.getcwd(), 'tests', 'square_test_dir', 'output'),
'__REMOTE_WORKING_DIR__': '{}/square_test_dir'.format(self.ssh_path_to_tests)}
for ic in self.initial_conditions[0:2]]
initial_state, term_state, correct_outputs = self._get_trivial_graph_with_external_remote_program_using_grid_engine(initial_datas)
self._run_graph(initial_state, term_state, initial_datas, (), correct_outputs)
def _get_trivial_serial_graph(self, initial_conditions):
'''
s_1 -> s_2 -> s_3
p_12 = p_23 := a not 0
f_12 = f_23 := a + 1
'''
s_1 = State('serial_s_1')
s_2 = State('serial_s_2')
s_3 = State('serial_s_3')
s_1.connect_to(s_2, edge=Edge(nonzero_predicate, increment_a_edge))
s_2.connect_to(s_3, edge=Edge(nonzero_predicate, increment_a_edge))
#correct_outputs = [{'a': ic + 2} for ic in initial_conditions]
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a'] += 2
correct_outputs.append(output)
return s_1, s_3, correct_outputs
def _get_trivial_parallel_graph(self, initial_conditions):
'''
s_1 -> s_2 -> s_3 -> s_6
   -> s_4 -> s_5 ->
all p_ij := a not 0
f_12 = f_23 = f_36 := a + 1
f_14 = f_45 = f_56 := b + 1
'''
s_1 = State('nonparallel_s_1', selection_policy=AllSelectionPolicy())
s_2 = State('parallel_s_2')
s_3 = State('parallel_s_3')
s_4 = State('parallel_s_4')
s_5 = State('parallel_s_5')
s_6 = State('nonparallel_s_6')
s_1.connect_to(s_2, edge=Edge(nonzero_predicate, increment_a_edge))
#correct_outputs = [{'a': ic + 2, 'b': ic + 2} for ic in self.initial_conditions]
s_2.connect_to(s_3, edge=Edge(nonzero_predicate, increment_a_edge))
s_3.connect_to(s_6, edge=Edge(nonzero_predicate, increment_a_edge))
s_1.connect_to(s_4, edge=Edge(nonzero_predicate, increment_b_edge))
s_4.connect_to(s_5, edge=Edge(nonzero_predicate, increment_b_edge))
s_5.connect_to(s_6, edge=Edge(nonzero_predicate, increment_b_edge))
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a'] += 3
output['b'] += 3
correct_outputs.append(output)
return s_1, s_6, correct_outputs
def _get_trivial_cycled_graph(self, initial_conditions):
'''
s_1 -> s_2 -> s_3
   <-
p_12 := dummy
p_23 := a > 0
p_21 := a <= 0
f_12 = f_23 = f_21 := a + 1
'''
s_1 = State('cycled_s_1')
s_2 = State('cycled_s_2')
s_3 = State('cycled_s_3')
s_1.connect_to(s_2, edge=Edge(dummy_predicate, increment_a_edge))
s_2.connect_to(s_3, edge=Edge(positiveness_predicate, increment_a_edge))
s_2.connect_to(s_1, edge=Edge(nonpositiveness_predicate, increment_a_edge))
# correct_outputs = [{'a': ic + 2} if ic >=0 else {'a': ic%2 + 2} for ic in self.initial_conditions]
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
if output['a'] >= 0:
output['a'] += 2
else:
# a negative value keeps cycling through s_2 -> s_1 until it becomes positive,
# so the result depends only on the parity of the initial value
output['a'] = output['a']%2 + 2
correct_outputs.append(output)
return s_1, s_3, correct_outputs
def _get_trivial_graph_with_external_local_program(self, initial_conditions):
'''
s_1 -> s_2 -> s_3 -> s_4,
p_12 = p_23 = dummy
f_12 = write(a) into a.dat
f_23 = a**2
f_34 = read b.dat from the working dir into b
'''
square_edge = ExecutableProgramEdge('square', self.local_comm,
output_dict={'b_file': 'b.dat'},
trailing_args_keys=('a_file',),
)
s_1 = State('external_s_1')
s_2 = State('external_s_2')
s_3 = State('external_s_3')
s_4 = State('external_s_4')
s_1.connect_to(s_2, edge=Edge(dummy_predicate, write_a_edge))
s_2.connect_to(s_3, edge=square_edge)
s_3.connect_to(s_4, edge=Edge(dummy_predicate, load_b_edge))
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a_file'] = os.path.join(os.getcwd(), 'tests/square_test_dir/input/a.dat')
output['b'] = output['a']**2
output['b_file'] = 'b.dat'
correct_outputs.append(output)
return s_1, s_4, correct_outputs
def _get_trivial_graph_with_external_remote_program(self, initial_conditions):
'''
s_1 -> s_2 -> s_3 -> s_4 -> s_5 -> s_6,
all predicates are dummy
f_12 = write(a) into a.dat
f_23 = upload a.dat into the working dir on remote
f_34 = a**2
f_45 = download b.dat from the working dir on remote to the working dir on local
f_56 = read the downloaded b.dat from the working dir on local into b
'''
upload_edge = UploadOnRemoteEdge(self.ssh_comm,
local_paths_keys=('a_file',),
)
square_edge = ExecutableProgramEdge('square', self.ssh_comm,
output_dict={'b_file': 'b.dat'},
trailing_args_keys=('a_file',),
remote=True,
)
download_edge = DownloadFromRemoteEdge(self.ssh_comm,
remote_paths_keys=('b_file',),
)
s_1 = State('remote_s_1')
s_2 = State('remote_s_2')
s_3 = State('remote_s_3')
s_4 = State('remote_s_4')
s_5 = State('remote_s_5')
s_6 = State('remote_s_6')
s_1.connect_to(s_2, edge=Edge(dummy_predicate, write_a_edge))
s_2.connect_to(s_3, edge=upload_edge)
s_3.connect_to(s_4, edge=square_edge)
s_4.connect_to(s_5, edge=download_edge)
s_5.connect_to(s_6, edge=Edge(dummy_predicate, load_b_edge))
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a_file'] = os.path.join(ic['__REMOTE_WORKING_DIR__'], 'a.dat')
output['b'] = output['a']**2
output['b_file'] = os.path.join(ic['__WORKING_DIR__'], 'b.dat')
correct_outputs.append(output)
return s_1, s_6, correct_outputs
def _get_trivial_graph_with_external_remote_program_using_grid_engine(self, initial_conditions):
'''
s_1 -> s_2 -> s_3 -> s_4 -> s_5 -> s_6 -> s_7 -> s_8 -> s_9,
<->
all predicates, except p_66 and p_67, are dummy
p_66 = job unfinished
p_67 = job finished
f_12 = write(a) into a.dat
f_23 = upload a.dat into the working dir on remote
f_34 = make up qsub script launching square
f_45 = upload qsub script
f_56 = send job (a**2) via qsub
f_66 = check job finished via qstat
f_67 = download b.dat from the working dir on remote to the working dir on local
f_78 = read the downloaded b.dat from the working dir on local into b
f_89 = filter out a_file, b_file, job_ID, qsub_script
'''
make_up_qsub_script_edge = QsubScriptEdge('square', self.local_comm, self.ssh_comm,
trailing_args_keys=('a_file',),
)
upload_a_edge = UploadOnRemoteEdge(self.ssh_comm,
local_paths_keys=('a_file',),
)
upload_qsub_script_edge = UploadOnRemoteEdge(self.ssh_comm,
local_paths_keys=('qsub_script',),
)
qsub_edge = ExecutableProgramEdge('qsub', self.ssh_comm,
trailing_args_keys=('qsub_script',),
output_dict={'job_finished': False, 'b_file': 'b.dat'},
stdout_processor=set_job_id,
remote=True,
)
qstat_edge = ExecutableProgramEdge('qstat', self.ssh_comm,
predicate=job_unfinished_predicate,
io_mapping=InOutMapping(keys_mapping={'u': 'user', 'job_ID': 'job_ID'}),
keyword_names=('u',),
remote=True,
stdout_processor=check_task_finished,
)
download_edge = DownloadFromRemoteEdge(self.ssh_comm,
predicate=job_finished_predicate,
remote_paths_keys=('b_file',),
)
s_1 = State('remote_s_1')
s_2 = State('remote_s_2')
s_3 = State('remote_s_3')
s_4 = State('remote_s_4')
s_5 = State('remote_s_5')
s_6 = State('remote_s_6')
s_7 = State('remote_s_7')
s_8 = State('remote_s_8')
s_9 = State('remote_s_9')
s_1.connect_to(s_2, edge=Edge(dummy_predicate, write_a_edge))
s_2.connect_to(s_3, edge=upload_a_edge)
s_3.connect_to(s_4, edge=make_up_qsub_script_edge)
s_4.connect_to(s_5, edge=upload_qsub_script_edge)
s_5.connect_to(s_6, edge=qsub_edge)
s_6.connect_to(s_6, edge=qstat_edge)
s_6.connect_to(s_7, edge=download_edge)
s_7.connect_to(s_8, edge=Edge(dummy_predicate, load_b_edge))
def filter_data(data):
del data['a_file']
del data['b_file']
del data['job_ID']
del data['qsub_script']
s_8.connect_to(s_9, edge=Edge(dummy_predicate, filter_data))
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['b'] = output['a']**2
output['job_finished'] = True
correct_outputs.append(output)
return s_1, s_9, correct_outputs
def _get_trivial_serial_graph_with_subgraph(self, initial_conditions):
'''
s_1 -> s_2,
where s_2 is replaced by s_1 -> s_2
p_12 = p_23 := dummy
f_12 := a + 1
'''
s_1 = State('s_1')
s_2 = State('s_2')
s_3 = State('s_3')
s_1.connect_to(s_2, edge=Edge(dummy_predicate, increment_a_edge))
s_2.connect_to(s_3, edge=Edge(dummy_predicate, increment_a_edge))
sub_s_1 = State('sub_s_1')
sub_s_2 = State('sub_s_2')
sub_s_1.connect_to(sub_s_2, edge=Edge(dummy_predicate, increment_a_edge))
s_2.replace_with_graph(Graph(sub_s_1, sub_s_2))
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a'] += 3
correct_outputs.append(output)
return s_1, s_3, correct_outputs
def _get_trivial_parallel_graph_with_subgraph(self, initial_conditions):
'''
s_1 -> s_2 -> s_4
-> s_3 ->
where s_2 and s_3 are replaced by s_5 -> s_6 -> s_7
all predicates are dummy
f_12 = f_24 := a + 1
f_13 = f_34 := b + 1
f_56 = f_67 := a + 1
'''
asp = AllSelectionPolicy()
s_1 = State('s_1', selection_policy=AllSelectionPolicy())
s_2 = State('s_2')
s_3 = State('s_3')
s_4 = State('s_4')
s_1.connect_to(s_2, edge=Edge(dummy_predicate, increment_a_edge))
s_1.connect_to(s_3, edge=Edge(dummy_predicate, increment_b_edge))
s_2.connect_to(s_4, edge=Edge(dummy_predicate, increment_a_edge))
s_3.connect_to(s_4, edge=Edge(dummy_predicate, increment_b_edge))
sub1_s_5 = State('s_2_sub_s_5')
sub1_s_6 = State('s_2_sub_s_6')
sub1_s_7 = State('s_2_sub_s_7')
sub2_s_5 = State('s_3_sub_s_5')
sub2_s_6 = State('s_3_sub_s_6')
sub2_s_7 = State('s_3_sub_s_7')
sub1_s_5.connect_to(sub1_s_6, edge=Edge(dummy_predicate, increment_a_edge))
sub1_s_6.connect_to(sub1_s_7, edge=Edge(dummy_predicate, increment_a_edge))
sub2_s_5.connect_to(sub2_s_6, edge=Edge(dummy_predicate, increment_a_edge))
sub2_s_6.connect_to(sub2_s_7, edge=Edge(dummy_predicate, increment_a_edge))
s_2.replace_with_graph(Graph(sub1_s_5, sub1_s_7))
s_3.replace_with_graph(Graph(sub2_s_5, sub2_s_7))
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a'] += 6
output['b'] += 2
correct_outputs.append(output)
return s_1, s_4, correct_outputs
def _get_trivial_graph_with_implicit_parallelization(self, initial_conditions):
'''
s_1 -> s_2 -> s_3, with dummy edges,
where s_2 is replaced by a subgraph sub_s_1 -> sub_s_2 -> sub_s_3 with +1 edges for a;
sub_s_1 carries array_keys_mapping={'a': 'a'}, so the subgraph runs as implicitly
parallel branches, one per element of a
'''
#asp = AllSelectionPolicy()
sub_s_1 = State('sub_s_1', array_keys_mapping={'a': 'a'})
sub_s_2 = State('sub_s_2')
sub_s_3 = State('sub_s_3')
subgraph = Graph(sub_s_1, sub_s_3)
s_1 = State('s_1')
s_2 = State('s_2')
s_3 = State('s_3')
s_1.connect_to(s_2, edge=Edge(dummy_predicate, dummy_edge))
s_2.connect_to(s_3, edge=Edge(dummy_predicate, dummy_edge))
sub_s_1.connect_to(sub_s_2, edge=Edge(dummy_predicate, increment_a_edge))
sub_s_2.connect_to(sub_s_3, edge=Edge(dummy_predicate, increment_a_edge))
s_2.replace_with_graph(subgraph)
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a'] = [output['a'][i] + 2 for i in range(len(output['a']))]
correct_outputs.append(output)
return s_1, s_3, correct_outputs
def _get_cycled_graph_with_implicit_parallelization(self, initial_conditions):
'''
s_1 -> s_2 -> s_3 -> s_4
<-
p_23 := a > 0
p_22 := a <= 0
all other predicates are dummy
f_11 = f_22 = f_23 = f_34 := a + 1
'''
s_sub_1 = State('s_sub_1', array_keys_mapping={'a': 'a'})
s_sub_2 = State('s_sub_2')
s_sub_3 = State('s_sub_3')
s_1 = State('s_1')
s_2 = State('s_2')
s_sub_1.connect_to(s_sub_2, edge=Edge(dummy_predicate, increment_a_edge))
s_sub_2.connect_to(s_sub_2, edge=Edge(lambda d: d['a'] <= 0, increment_a_edge))
s_sub_2.connect_to(s_sub_3, edge=Edge(lambda d: d['a'] > 0, increment_a_edge))
subgraph = Graph(s_sub_1, s_sub_3)
s_1.connect_to(s_2, edge=Edge(dummy_predicate, increment_a_array_edge))
s_1.replace_with_graph(subgraph)
correct_outputs = []
for ic in initial_conditions:
output = _create_data_from_dict(ic)
output['a'] = [3 for i in range(len(output['a']))]
correct_outputs.append(output)
return s_1, s_2, correct_outputs
def _run_graph(self, initial_state, term_state, initial_datas, invalid_initial_datas, correct_outputs):
graph = Graph(initial_state, term_state)
for initial_data, correct_output in zip(initial_datas, correct_outputs):
print('Doing ic = {}'.format(initial_data))
data = deepcopy(initial_data)
okay = graph.run(data)
#print(data['__EXCEPTION__'])
if initial_data in invalid_initial_datas:
self.assertEqual('__EXCEPTION__' in data, True)
self.assertEqual(okay, False)
else:
self.assertEqual(okay, True)
self.assertEqual(data, correct_output)
if __name__ == '__main__':
unittest.main()