Commit 5318918b authored by Anton Pershin's avatar Anton Pershin

Updated comments in research.py, refactored the names there and provided a…

Updated comments in research.py, refactored the names there and provided a detailed description of the class. Also updated the config example
parent 75bd6c9a
......@@ -6,159 +6,151 @@ from comsdk.communication import *
from comsdk.distributed_storage import *
from comsdk.edge import Edge, dummy_predicate
# Create RESEARCH-ID. It is a small research which should link different local directories (with reports and time-integration) and ssh directories (with continuation, for example)
# What is included in RESEARCH?
# 1) It should be multistep. So we can always continue RESEARCH if we want
# 2) Everything should be dated and timed!
# 3) For every research we should create local directory having format @date@_@name@. Inside we should have directory "report" and file research.log where all stuff is stored
# If there are some remote calculations, then there should be "remote" directory where all that stuff is organized. Another directory is "local" which contains local results
# Somehow we should save python script reproducing the results (think about it).
# 4)
# Local directory hierarchy:
# ROOT for us is the directory where research.py was launch (as it is a part of postproc package, postproc.research is rather used)
# ROOT/RESEARCH_REL_DIR is main directory
# ROOT/RESEARCH_REL_DIR/1-some_task
# ROOT/RESEARCH_REL_DIR/2-another_task
# ROOT/RESEARCH_REL_DIR/3-one_more_task
# As we can have multiple remotes, a remote root directory should be set somewhere out of Research.
# One possible solution is a factory for Task.
# Anyway, we create directory hierarchy relative to REMOTE_ROOT as follows:
# REMOTE_ROOT/RESEARCH_REL_DIR/1-some_task
# REMOTE_ROOT/RESEARCH_REL_DIR/2-another_task
# REMOTE_ROOT/RESEARCH_REL_DIR/3-one_more_task
# Some usual cases:
# 1) Remote calculation on some cluster.
# 1.1 COPY-EXECUTE TASK
# 1.1.1 copy input files from local to remote
# 1.1.2 copy program from remote to remote
# 1.1.3 execute program
# 1.2 wait for finishing (at this moment, we just have to wait and check)
# 1.3 COPY-TASK
# 1.3.1 copy results from remote to local
# 1.4 as result, we will have results directly in the task directory
class Research:
"""Research is the main class for interacting with the hierarchy of tasks.
It allows to:
(1) find location of a task by its number
(2) store/load object into/from a task's dir
(3) create new tasks by launching TaskExecution locally or on remotes
(4) launch TaskExecution in already existing task's directory
(5) grab task's content from remotes
The main idea behind Research is that we collect tasks in the research's dir and make
them enumerated. Each task is completely identified by its number. Its content, in turn,
is located in the task's dir which is made up by concatenation of the number and the task's
name that must be specified by a user. Next, we distinguish two logical places where tasks
are collected: local one and remote one. Both places are distributed, but in a different
manner: the local place is distributed over the locally accesible space on the current machine
(say, some tasks may be located in dir path/to/storage and another tasks in another_path/to/storage)
whereas the remote place is distributed over different machines. For the local place, we use DistributedStorage
class to access the data. The remote place is accessed by setting a necessary SshCommunication in the Research
class constructor. To avoid the mess in directories, we assume the following rules:
(1) intersection between tasks' dirs of the same research dir on different location on the local machine is empty
(2) all tasks are presented locally (in any location defined in DistributedStorage) at least as empty dirs
(3) intersection between tasks' dirs of the same research dir on different remotes is empty
(4) union of them
Therefore, all tasks located remotely must map to the local machine (but it is not true for an opposite case!).
Hence, when the task intended to be executed on a remote is created, we create both a remote directory and a local
directory for this task. To be consistent in creating tasks locally, we choose a main (master) directory
in DistributedStorage in which we create by default tasks directories whereas other local directories are assumed
to be for storing purposes.
The instance of Research is created by passing to it BaseCommunication object. If the instance is intended to launch
tasks on remotes or grab tasks from remotes, then the corresponding BaseCommunication for interaction must be passed.
By default, LocalCommunication is used and the remote interaction is thus disabled.
"""
def __init__(self, name,
Class Research is a representation of a group of different calculations collected into what we call a Research.
Each ''calculation'' corresponds to the launch of a graph-based scenario which produces a set of files which we
treat as the results of the calculation. We thus call such a calculation a task. Therefore, a Research is a
collection of tasks. Each task is associated with a single directory (within the code, it may be denoted as
task_dir, if only directory name is of interest, or task_path, if the absolute path is of interest) whose name has
a very simple structure, @number@-@long_name@, so that each task is associated with its own unique number (also
called task_number within the code). Normally, one should use the task number to get any task-related information.
All the tasks are located in the research directory whose the absolute path is set by the class property
research_path. It has the following pattern: @date@_@long_name@. Finally, we associate a short Research ID with each
Research which can be accessed using the property research_id. The described structure is independent of where these
directories are located. It is assumed that there is a local root for research and its remote analog. The latter
should be available via any protocol supported by communication module. Class Research can thus be set up in two
regimes: local (remote_comm is None) and local-remote (remote_comm is not None).
Typically, one should construct an instance of Research based on the configuration file called config_research.json.
There are two static functions for this purpose: Research.open() and Research.create(). The former creates an
instance of Research based on the existing Research (one should pass its Research ID in open()) described in the
configuration file and the latter creates a new Research (thus, making a new directory in the local filesystem) and
adds all the necessary information about it in the configuration file. Also, any Research instance is automatically
augmented by the properties listed in ''RESEARCH_PROPS'' dictionary in the configuration file.
For the Research constructor to understand where all the research directories are located, one must supply (either
directly in the constructor or in the configuration file) the potential root paths for the search (both for the
local and remote machines if the latter is specified). The first path in the list of the potential root paths is
called the default root path. A new Research will be created in the default path.
Note that different tasks belonging to the same research (i.e., they are associated with the same Research ID) may
be located at different root paths. When creating a new task, it will be located in the default root path.
.. todo::
Some way for saving auxiliary information about research and tasks (task date and description, for example)
should be implemented. Possibly, the same should be done for launcher scripts.
"""
def __init__(self, name,
continuing=False,
local_research_paths=None,
local_research_roots=None,
remote_comm=None,
remote_research_path=None):
# Always create local communication here
# Remote communication is optional then
self.local_research_path = local_research_paths[0]
self.local_main_path = os.path.dirname(self.local_research_path)
self.remote_research_path = remote_research_path
remote_research_root=None):
"""
:param name: research description (if continuing == False) or Research ID (if continuing == True)
:param continuing: if False, the Research with be read from the root path. Otherwise, a new one will be created
:param local_research_roots: a list of local paths where research directories are searched for
:param remote_comm: BaseCommunication instance used for communication with remote machine
:param remote_research_root: path on the remote machine where research directories are searched for
"""
self.local_research_root = local_research_roots[0]
self.local_main_path = os.path.dirname(self.local_research_root)
self.remote_research_root = remote_research_root
self._tasks_number = 0
self._local_comm = LocalCommunication(Host())
self._local_comm = LocalCommunication(Host()) # local communication created automatically, no need to pass it
self._remote_comm = remote_comm
self._distr_storage = DistributedStorage(local_research_paths, prior_storage_index=0)
self._distr_storage = DistributedStorage(local_research_roots, prior_storage_index=0)
if not continuing:
# interpret name as name without date
self._research_id = make_suitable_research_name(name)
if self._distr_storage.get_dir_path(self._research_id) is not None:
raise ResearchAlreadyExists("Research with name '{}' already exists, "
"choose another name".format(self._research_id))
self.research_path = self._distr_storage.make_dir(self._research_id)
self._research_path = self._distr_storage.make_dir(self._research_id)
print('Started new research at {}'.format(self.research_path))
else:
# interpret name as the full research id
self._research_id = name
self.research_path = self._load_research_data()
self._research_path = self._load_research_data()
@classmethod
def open(cls, research_sid, remote_comm=None):
def open(cls, research_id, remote_comm=None):
"""
:param research_id: Research ID used to find a relevant research
:param remote_comm: BaseCommunication instance used for communication with remote machine
:return: new Research instance
"""
with open('config_research.json', 'r') as f:
conf = json.load(f)
res = Research(conf['RESEARCH'][research_sid],
res = Research(conf['RESEARCH'][research_id],
continuing=True,
local_research_paths=conf['LOCAL_HOST']['research_paths'],
remote_comm=remote_comm,
remote_research_path=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_path']
local_research_roots=conf['LOCAL_HOST']['research_roots'],
remote_comm=remote_comm,
remote_research_root=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_root']
if remote_comm is not None else None)
res._add_properties(conf['RESEARCH_PROPS'])
return res
@classmethod
def create(cls, new_research_sid, new_research_descr, remote_comm=None):
def create(cls, new_research_id, new_research_descr, remote_comm=None):
"""
:param new_research_id: Research ID (short name for this research)
:param new_research_descr: relatively long research name
:param remote_comm: BaseCommunication instance used for communication with remote machine
:return: new Research instance
"""
with open('config_research.json', 'r+') as f:
conf = json.load(f)
conf['RESEARCH'][new_research_sid] = make_suitable_research_name(new_research_descr)
conf['RESEARCH'][new_research_id] = make_suitable_research_name(new_research_descr)
f.seek(0)
json.dump(conf, f, indent=4)
f.truncate()
res = Research(new_research_descr,
continuing=False,
local_research_paths=conf['LOCAL_HOST']['research_paths'],
local_research_roots=conf['LOCAL_HOST']['research_roots'],
remote_comm=remote_comm,
remote_research_path=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_path']
remote_research_root=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_root']
if remote_comm is not None else None)
res._add_properties(conf['RESEARCH_PROPS'])
return res
def __getstate__(self):
@property
def research_path(self) -> str:
return self._research_path
@property
def research_id(self) -> str:
return self._research_id
def __getstate__(self) -> dict:
return {
'research_id': self._research_id,
'local_research_path': self.local_research_path,
'remote_research_path': self.remote_research_path,
'local_research_path': self.local_research_root,
'remote_research_path': self.remote_research_root,
'remote_comm': self._remote_comm.__getstate__(),
}
def __setstate__(self, state):
self._tasks_number = 0
self._local_comm = LocalCommunication(Host())
self.local_research_path = state['local_research_path']
self.remote_research_path = state['remote_research_path']
self.local_research_root = state['local_research_path']
self.remote_research_root = state['remote_research_path']
self._remote_comm = None
if state['remote_comm'] is not None:
self._remote_comm = SshCommunication.__new__(SshCommunication)
self._remote_comm.__setstate__(state['remote_comm'])
self._distr_storage = DistributedStorage((self.local_research_path,), prior_storage_index=0)
self._distr_storage = DistributedStorage((self.local_research_root,), prior_storage_index=0)
self._research_id = state['research_id']
self.research_path = self._load_research_data()
self._research_path = self._load_research_data()
def _add_properties(self, props):
for prop_name, prop_value in props.items():
self.__setattr__(prop_name, prop_value)
def _load_research_data(self):
def _load_research_data(self) -> str:
# find corresponding date/name
# construct object from all data inside
research_path = self._distr_storage.get_dir_path(self._research_id)
......@@ -185,19 +177,27 @@ class Research:
print('Number of tasks in the current research: {}'.format(self._tasks_number))
return research_path
def create_task(self, name):
def create_task(self, name) -> int:
"""
Creates a new task, copies necessary data and executes the command line
Creates a new task in the current research making a new local directory
:param name: task name
:return: task number
"""
task_number = self._get_next_task_number()
local_task_dir = self._make_task_path(task_number, name)
os.mkdir(local_task_dir)
return task_number
def grab_task_results(self, task_number, copies_list=[]):
def grab_task_results(self, task_number, copies_list=[]) -> None:
"""
Moves task content from the remote to the local. Locally, the task content will appear in the task
dir located in the main research location.
Moves task content from the remote machine to the local one. Locally, the task content will appear in the task
directory located in the research directory.
:param task_number: task number
:param copies_list: a list defining which objects we wish to copy from the remote machine. It consists of
dictionaries each having keys 'path' (path of object we wish to copy relative to the task directory) and
'new_name' (path of this object on the local machine relative to the task directory)
"""
task_results_local_path = self.get_task_path(task_number)
task_results_remote_path = self.get_task_path(task_number, self._remote_comm.host)
......@@ -215,7 +215,7 @@ class Research:
os.rename(os.path.join(task_results_local_path, os.path.basename(copy_target['path'])),
os.path.join(task_results_local_path, copy_target['new_name']))
def _make_task_path(self, task_number, task_name, execution_host=None):
def _make_task_path(self, task_number, task_name, execution_host=None) -> str:
task_path = ''
task_dir = get_task_full_name(task_number, task_name)
if execution_host is None:
......@@ -225,25 +225,33 @@ class Research:
task_path = os.path.join(execution_host.research_abs_path, rel_task_dir)
return task_path
def get_task_path(self, task_number, at_remote_host=False):
def get_task_path(self, task_number, at_remote_host=False) -> str:
"""
Returns the task dir corresponding to task_number. By default, the local dir (from DistrubutedStorage) is returned.
If execution_host is specified, then the remote dir will be returned.
Return absolute task path based on its number
:param task_number: task number
:param at_remote_host: return the path on the remote machine (if True) or on the local one (if False)
:return: absolute task path
"""
task_path = ''
task_name = self._get_task_name_by_number(task_number)
rel_task_dir = os.path.join(self._research_id, get_task_full_name(task_number, task_name))
if at_remote_host:
task_path = '{}/{}'.format(self.remote_research_path, rel_task_dir)
task_path = '{}/{}'.format(self.remote_research_root, rel_task_dir)
else:
task_path = self._distr_storage.get_dir_path(rel_task_dir)
return task_path
def dump_object(self, task_number, obj, obj_name):
def dump_object(self, task_number, obj, obj_name) -> None:
"""
Dumps obj into the file whose name is obj_name + '.pyo' and locates it into the task dir corresponding to
task_number
Dumps any python object (using pickle) to the binary file, named obj_name + '.pyo', in the task directory
associated with the task number
:param task_number: task number
:param obj: any python object
:param obj_name: file name to which obj will be saved (without extension)
"""
print('Dumping ' + obj_name)
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'), 'w')
pickle.dump(obj, f)
......@@ -251,8 +259,12 @@ class Research:
def load_object(self, task_number, obj_name):
"""
Load an object dumped into the file whose name is obj_name + '.pyo' and which is located it into the task dir
corresponding to task_number
Load any python object dumped using pickle from the binary file, named obj_name + '.pyo' and located in the task
directory associated with the task number
:param task_number: task number
:param obj_name: file name from which obj will be loaded (without extension)
:return: python object
"""
print('Loading ' + obj_name)
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'), 'r')
......@@ -260,11 +272,11 @@ class Research:
f.close()
return obj
def _get_next_task_number(self):
def _get_next_task_number(self) -> int:
self._tasks_number += 1
return self._tasks_number - 1
def _get_task_name_by_number(self, task_number):
def _get_task_name_by_number(self, task_number) -> str:
find_data = self._distr_storage.find_dir_by_named_regexp(self._research_id,
'^{}-(?P<task_name>\S+)'.format(task_number))
if find_data is None:
......@@ -280,26 +292,26 @@ class ResearchDoesNotExist(Exception):
pass
def make_suitable_name(name):
def make_suitable_name(name) -> str:
return '_'.join(name.split())
def make_suitable_research_name(descr):
def make_suitable_research_name(descr) -> str:
return '_'.join([str(date.today()), make_suitable_name(descr)])
def get_task_full_name(task_number, task_name):
def get_task_full_name(task_number, task_name) -> str:
return str(task_number) + '-' + make_suitable_name(task_name)
def split_task_dir(task_dir):
def split_task_dir(task_dir) -> (int, str):
parsing_params = parse_by_named_regexp(r'^(?P<task_number>\d+)-(?P<task_name>\S+)', task_dir)
if parsing_params is None:
raise Exception("No task directory '{}' is found".format(task_dir))
return int(parsing_params['task_number']), parsing_params['task_name']
def retrieve_trailing_float_from_task_dir(task_dir):
def retrieve_trailing_float_from_task_dir(task_dir) -> float:
matching = re.search(r'^(?P<task_number>\d+)-(?P<task_name>\S+)_(?P<float_left>\d+)\.(?P<float_right>\d+)',
task_dir)
if matching is None:
......
{
"LOCAL_HOST": {
"research_path": "...",
"research_roots": "...",
"custom_programs": {
"@path_to_binaries@": ["@bin1@", "@bin2@", ...],
...
......@@ -12,6 +12,7 @@
"max_cores": ...,
"username": "...",
"password": "...",
"pkey": "...",
"research_path": "...",
"env_programs": ["@bin1@", "@bin1@", ...],
"custom_programs": {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment