Commit 12b38eb1 authored by Anton Pershin's avatar Anton Pershin

Massive refactoring of class Research: (1) enabled static type-checking using…

Massive refactoring of class Research: (1) enabled static type-checking using module 'typing' and package 'mypy', (2) changed class member names to more sensible ones and (3) introduced three properties, namely, local_research_path, remote_research_path, research_dir (they are the only public data members of the class now)
parent 5318918b
......@@ -2,6 +2,7 @@ from comsdk.comaux import find_dir_by_named_regexp
from functools import partial
import os
class DistributedStorage:
"""
Distributed storage is a set of sources containing the data. The sources must be accessible by the OS API.
......@@ -18,8 +19,8 @@ class DistributedStorage:
"""
Returns the full path to dir_ or None if dir_ is absent.
"""
dir_path_tuple = self.lookup_through_dir(dir_, \
lambda dir_path: (dir_path, dir_path) if os.path.exists(dir_path) else None)
dir_path_tuple = self.lookup_through_dir(dir_, lambda dir_path: (dir_path, dir_path)
if os.path.exists(dir_path) else None)
return dir_path_tuple[0] if dir_path_tuple is not None else None
def make_dir(self, dir_):
......
import json
import pickle
from datetime import date
from typing import Any, Mapping, Optional, Sequence, Tuple

from mypy_extensions import TypedDict

from comsdk.comaux import *
from comsdk.communication import *
from comsdk.communication import BaseCommunication, LocalCommunication, SshCommunication, Host
from comsdk.distributed_storage import *
from comsdk.edge import Edge, dummy_predicate
CopiesList = TypedDict('CopiesList', {'path': str, 'new_name': str})
class Research:
......@@ -17,19 +23,19 @@ class Research:
task_dir, if only directory name is of interest, or task_path, if the absolute path is of interest) whose name has
a very simple structure, @number@-@long_name@, so that each task is associated with its own unique number (also
called task_number within the code). Normally, one should use the task number to get any task-related information.
All the tasks are located in the research directory whose the absolute path is set by the class property
research_path. It has the following pattern: @date@_@long_name@. Finally, we associate a short Research ID with each
Research which can be accessed using the property research_id. The described structure is independent of where these
directories are located. It is assumed that there is a local root for research and its remote analog. The latter
should be available via any protocol supported by communication module. Class Research can thus be set up in two
regimes: local (remote_comm is None) and local-remote (remote_comm is not None).
All the tasks are located in the research directory whose local (remote) absolute path is set by the class
property local_research_path (remote_research_path). The research directory has the following pattern:
@date@_@long_name@. Finally, we associate a short Research ID with each Research. The described structure is
independent of where these directories are located. It is assumed that there is a local root for research and
its remote analog. The latter should be available via any protocol supported by communication module. Class Research
can thus be set up in two regimes: local (remote_comm is None) and local-remote (remote_comm is not None).
Typically, one should construct an instance of Research based on the configuration file called config_research.json.
There are two class methods for this purpose: Research.open() and Research.create(). The former creates an
instance of Research based on the existing Research (one should pass its Research ID in open()) described in the
instance of Research based on the existing Research (one should pass its Research ID to open()) described in the
configuration file and the latter creates a new Research (thus, making a new directory in the local filesystem) and
adds all the necessary information about it in the configuration file. Also, any Research instance is automatically
augmented by the properties listed in ''RESEARCH_PROPS'' dictionary in the configuration file.
augmented by the properties listed in 'RESEARCH_PROPS' dictionary in the configuration file.
For the Research constructor to understand where all the research directories are located, one must supply (either
directly in the constructor or in the configuration file) the potential root paths for the search (both for the
......@@ -44,40 +50,41 @@ class Research:
should be implemented. Possibly, the same should be done for launcher scripts.
"""
def __init__(self, name,
def __init__(self, name: str,
continuing=False,
local_research_roots=None,
remote_comm=None,
remote_research_root=None):
local_research_roots: Optional[Sequence[str]] = None,
remote_comm: Optional[BaseCommunication] = None,
remote_research_root: Optional[str] = None):
"""
:param name: research description (if continuing == False) or Research ID (if continuing == True)
:param name: research description (if continuing == False) or research directory (if continuing == True)
:param continuing: if True, an existing Research will be read from the root path. Otherwise, a new one will be created
:param local_research_roots: a list of local paths where research directories are searched for
:param remote_comm: BaseCommunication instance used for communication with remote machine
:param remote_research_root: path on the remote machine where research directories are searched for
"""
self.local_research_root = local_research_roots[0]
self.local_main_path = os.path.dirname(self.local_research_root)
self.remote_research_root = remote_research_root
self._local_research_root = local_research_roots[0]
self._remote_research_root = remote_research_root
self._tasks_number = 0
self._local_comm = LocalCommunication(Host()) # local communication created automatically, no need to pass it
self._remote_comm = remote_comm
self._distr_storage = DistributedStorage(local_research_roots, prior_storage_index=0)
self._local_research_path = None
if not continuing:
# interpret name as name without date
self._research_id = make_suitable_research_name(name)
if self._distr_storage.get_dir_path(self._research_id) is not None:
self._research_dir = make_suitable_research_dir(name)
if self._distr_storage.get_dir_path(self._research_dir) is not None:
raise ResearchAlreadyExists("Research with name '{}' already exists, "
"choose another name".format(self._research_id))
self._research_path = self._distr_storage.make_dir(self._research_id)
print('Started new research at {}'.format(self.research_path))
"choose another name".format(self._research_dir))
self._local_research_path = self._distr_storage.make_dir(self._research_dir)
print('Started new research at {}'.format(self._local_research_path))
else:
# interpret name as the full research directory name (date included)
self._research_id = name
self._research_path = self._load_research_data()
self._research_dir = name
self._local_research_path = self._load_research_data()
@classmethod
def open(cls, research_id, remote_comm=None):
def open(cls, research_id: str,
remote_comm: Optional[BaseCommunication] = None):
"""
:param research_id: Research ID used to find a relevant research
:param remote_comm: BaseCommunication instance used for communication with remote machine
......@@ -95,7 +102,8 @@ class Research:
return res
@classmethod
def create(cls, new_research_id, new_research_descr, remote_comm=None):
def create(cls, new_research_id: str, new_research_descr: str,
remote_comm: Optional[BaseCommunication] = None):
"""
:param new_research_id: Research ID (short name for this research)
:param new_research_descr: relatively long research name
......@@ -104,7 +112,7 @@ class Research:
"""
with open('config_research.json', 'r+') as f:
conf = json.load(f)
conf['RESEARCH'][new_research_id] = make_suitable_research_name(new_research_descr)
conf['RESEARCH'][new_research_id] = make_suitable_research_dir(new_research_descr)
f.seek(0)
json.dump(conf, f, indent=4)
f.truncate()
......@@ -118,55 +126,61 @@ class Research:
return res
@property
def research_path(self) -> str:
return self._research_path
def local_research_path(self) -> str:
return self._local_research_path
@property
def research_id(self) -> str:
return self._research_id
def remote_research_path(self) -> str:
return os.path.join(self._remote_research_root, self._research_dir)
@property
def research_dir(self) -> str:
return self._research_dir
def __getstate__(self) -> dict:
    """
    Returns the picklable state of the Research.

    Note that, despite their names, the keys 'local_research_path' and 'remote_research_path' hold the local and
    remote research *roots*, not the paths of the research directory itself.

    :return: dictionary with the research directory name, local/remote research roots and the state of the remote
             communication (None if the Research is set up in the local regime, i.e. without remote communication)
    """
    # In the local regime there is no remote communication to serialize; calling __getstate__() on None
    # would raise AttributeError, so None is stored instead (it is what __setstate__ checks for).
    remote_comm_state = None
    if self._remote_comm is not None:
        remote_comm_state = self._remote_comm.__getstate__()
    return {
        'research_dir': self._research_dir,
        'local_research_path': self._local_research_root,
        'remote_research_path': self._remote_research_root,
        'remote_comm': remote_comm_state,
    }
def __setstate__(self, state: Mapping[str, Any]) -> None:
    """
    Restores the Research from the state produced by __getstate__.

    The local communication and the distributed storage are re-created from scratch, the remote communication
    (if its state is present) is restored as SshCommunication, and the research data are then reloaded
    via _load_research_data().

    :param state: dictionary with keys 'research_dir' (or the legacy key 'research_id'), 'local_research_path',
                  'remote_research_path' and 'remote_comm'
    """
    self._tasks_number = 0
    self._local_comm = LocalCommunication(Host())
    self._local_research_root = state['local_research_path']
    self._remote_research_root = state['remote_research_path']
    self._remote_comm = None
    if state['remote_comm'] is not None:
        # bypass the constructor: the instance is filled in by its own __setstate__
        self._remote_comm = SshCommunication.__new__(SshCommunication)
        self._remote_comm.__setstate__(state['remote_comm'])
    self._distr_storage = DistributedStorage((self._local_research_root,), prior_storage_index=0)
    # States pickled before the member renaming store the directory name under 'research_id'
    self._research_dir = state['research_dir'] if 'research_dir' in state else state['research_id']
    # The property local_research_path reads _local_research_path, so that is the attribute to restore
    self._local_research_path = self._load_research_data()
def _add_properties(self, props):
def _add_properties(self, props: Mapping[str, Any]) -> None:
for prop_name, prop_value in props.items():
self.__setattr__(prop_name, prop_value)
def _load_research_data(self) -> str:
# find corresponding date/name
# construct object from all data inside
research_path = self._distr_storage.get_dir_path(self._research_id)
research_path = self._distr_storage.get_dir_path(self._research_dir)
if research_path is None:
# assume date was omitted in research id
regexp_for_search = '^(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)_{}'.format(self._research_id)
research_path, dir_params = self._distr_storage.find_dir_by_named_regexp('', regexp_for_search)
if dir_params is None:
raise ResearchDoesNotExist("Research '{}' does not exist".format(self._research_id))
self._research_id = '{}-{}-{}_{}'.format(dir_params['year'], dir_params['month'], dir_params['day'],
self._research_id)
raise ResearchDoesNotExist("Research '{}' does not exist".format(self._research_dir))
# if research_path is None:
# # assume date was omitted in research id
# regexp_for_search = '^(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)_{}'.format(self._research_name)
# research_path, dir_params = self._distr_storage.find_dir_by_named_regexp('', regexp_for_search)
# if dir_params is None:
# raise ResearchDoesNotExist("Research '{}' does not exist".format(self._research_name))
# self._research_name = '{}-{}-{}_{}'.format(dir_params['year'], dir_params['month'], dir_params['day'],
# self._research_name)
print('Loaded research at {}'.format(research_path))
# determine maximum task number to set the number for the next possible task
dirnames, _ = self._distr_storage.listdir(self._research_id)
dirnames, _ = self._distr_storage.listdir(self._research_dir)
self._tasks_number = 0
for dir_ in dirnames:
if dir_ != 'report':
......@@ -177,7 +191,7 @@ class Research:
print('Number of tasks in the current research: {}'.format(self._tasks_number))
return research_path
def create_task(self, name) -> int:
def create_task(self, name: str) -> int:
"""
Creates a new task in the current research making a new local directory
......@@ -189,7 +203,8 @@ class Research:
os.mkdir(local_task_dir)
return task_number
def grab_task_results(self, task_number, copies_list=[]) -> None:
def grab_task_results(self, task_number: int,
copies_list: Optional[Sequence[CopiesList]] = None):
"""
Moves task content from the remote machine to the local one. Locally, the task content will appear in the task
directory located in the research directory.
......@@ -201,7 +216,7 @@ class Research:
"""
task_results_local_path = self.get_task_path(task_number)
task_results_remote_path = self.get_task_path(task_number, self._remote_comm.host)
if len(copies_list) == 0: # copy all data
if copies_list is None: # copy all data
paths = self._remote_comm.listdir(task_results_remote_path)
for file_or_dir in paths:
self._remote_comm.copy('/'.join((task_results_remote_path, file_or_dir)), task_results_local_path,
......@@ -215,17 +230,16 @@ class Research:
os.rename(os.path.join(task_results_local_path, os.path.basename(copy_target['path'])),
os.path.join(task_results_local_path, copy_target['new_name']))
def _make_task_path(self, task_number: int, task_name: str, at_remote_host=False) -> str:
    """
    Builds the absolute path of the task directory from the task number and the task name.

    :param task_number: task number
    :param task_name: task name
    :param at_remote_host: return the path on the remote machine (if True) or on the local one (if False)
    :return: absolute task path
    """
    task_dir = get_task_full_name(task_number, task_name)
    if at_remote_host:
        # Remote paths are joined with '/' regardless of the local OS, as elsewhere in this class.
        # NOTE(review): assumes the remote machine uses POSIX-style paths -- confirm
        import posixpath
        return posixpath.join(self._remote_research_root, self._research_dir, task_dir)
    return os.path.join(self._local_research_path, task_dir)
def get_task_path(self, task_number, at_remote_host=False) -> str:
def get_task_path(self, task_number: int, at_remote_host=False) -> str:
"""
Return absolute task path based on its number
......@@ -233,16 +247,16 @@ class Research:
:param at_remote_host: return the path on the remote machine (if True) or on the local one (if False)
:return: absolute task path
"""
task_path = ''
task_path = None
task_name = self._get_task_name_by_number(task_number)
rel_task_dir = os.path.join(self._research_id, get_task_full_name(task_number, task_name))
rel_task_dir = os.path.join(self._research_dir, get_task_full_name(task_number, task_name))
if at_remote_host:
task_path = '{}/{}'.format(self.remote_research_root, rel_task_dir)
task_path = '{}/{}'.format(self._remote_research_root, rel_task_dir)
else:
task_path = self._distr_storage.get_dir_path(rel_task_dir)
return task_path
def dump_object(self, task_number, obj, obj_name) -> None:
def dump_object(self, task_number: int, obj: object, obj_name: str) -> None:
"""
Dumps any python object (using pickle) to the binary file, named obj_name + '.pyo', in the task directory
associated with the task number
......@@ -257,7 +271,7 @@ class Research:
pickle.dump(obj, f)
f.close()
def load_object(self, task_number, obj_name):
def load_object(self, task_number: int, obj_name: str):
"""
Load any python object dumped using pickle from the binary file, named obj_name + '.pyo' and located in the task
directory associated with the task number
......@@ -276,8 +290,8 @@ class Research:
self._tasks_number += 1
return self._tasks_number - 1
def _get_task_name_by_number(self, task_number) -> str:
find_data = self._distr_storage.find_dir_by_named_regexp(self._research_id,
def _get_task_name_by_number(self, task_number: int) -> str:
find_data = self._distr_storage.find_dir_by_named_regexp(self._research_dir,
'^{}-(?P<task_name>\S+)'.format(task_number))
if find_data is None:
raise Exception("No task with number '{}' is found".format(task_number))
......@@ -292,26 +306,26 @@ class ResearchDoesNotExist(Exception):
pass
def make_suitable_name(name: str) -> str:
    """
    Makes a name suitable for use as a part of a directory name: the name is split on whitespace and the parts are
    joined by underscores (so leading/trailing whitespace is dropped and whitespace runs collapse to a single '_').

    :param name: arbitrary name, possibly containing whitespace
    :return: name without whitespace
    """
    return '_'.join(name.split())
def make_suitable_research_dir(descr: str) -> str:
    """
    Builds the research directory name, @date@_@long_name@, from today's date and the research description.

    :param descr: research description (relatively long research name)
    :return: research directory name
    """
    return '_'.join([str(date.today()), make_suitable_name(descr)])


# Backward-compatible alias: the function was called make_suitable_research_name before the renaming
make_suitable_research_name = make_suitable_research_dir
def get_task_full_name(task_number: int, task_name: str) -> str:
    """
    Builds the task directory name, @number@-@long_name@, from the task number and the task name.

    :param task_number: task number
    :param task_name: task name (whitespace is replaced by underscores)
    :return: task directory name
    """
    return '{}-{}'.format(task_number, make_suitable_name(task_name))
def split_task_dir(task_dir: str) -> Tuple[int, str]:
    """
    Splits the task directory name, @number@-@long_name@, into the task number and the task name.

    :param task_dir: task directory name
    :return: tuple (task number, task name)
    :raises Exception: if task_dir does not match the pattern @number@-@long_name@
    """
    parsing_params = parse_by_named_regexp(r'^(?P<task_number>\d+)-(?P<task_name>\S+)', task_dir)
    if parsing_params is None:
        raise Exception("No task directory '{}' is found".format(task_dir))
    return int(parsing_params['task_number']), parsing_params['task_name']
def retrieve_trailing_float_from_task_dir(task_dir) -> float:
def retrieve_trailing_float_from_task_dir(task_dir: str) -> float:
matching = re.search(r'^(?P<task_number>\d+)-(?P<task_name>\S+)_(?P<float_left>\d+)\.(?P<float_right>\d+)',
task_dir)
if matching is None:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment