Commit 75bd6c9a authored by Anton Pershin's avatar Anton Pershin

Fixed the ability to create new research in Research and introduced static…

Fixed the ability to create new research in Research and introduced static methods Research.open() and Research.create() instead of Research.create_from_config(). Also refactored the code and comments slightly.
parent 75e9d95a
import os
import pickle
import shutil
from datetime import datetime, date
from datetime import date
from comsdk.comaux import *
from comsdk.communication import *
from comsdk.distributed_storage import *
......@@ -19,8 +18,6 @@ from comsdk.edge import Edge, dummy_predicate
# Local directory hierarchy:
# ROOT for us is the directory where research.py was launch (as it is a part of postproc package, postproc.research is rather used)
# ROOT/RESEARCH_REL_DIR is main directory
# ROOT/RESEARCH_REL_DIR/research.log
# ROOT/RESEARCH_REL_DIR/report
# ROOT/RESEARCH_REL_DIR/1-some_task
# ROOT/RESEARCH_REL_DIR/2-another_task
# ROOT/RESEARCH_REL_DIR/3-one_more_task
......@@ -43,6 +40,7 @@ from comsdk.edge import Edge, dummy_predicate
# 1.3.1 copy results from remote to local
# 1.4 as result, we will have results directly in the task directory
class Research:
"""Research is the main class for interacting with the hierarchy of tasks.
......@@ -82,8 +80,7 @@ class Research:
continuing=False,
local_research_paths=None,
remote_comm=None,
remote_research_path=None,
):
remote_research_path=None):
# Always create local communication here
# Remote communication is optional then
self.local_research_path = local_research_paths[0]
......@@ -92,49 +89,47 @@ class Research:
self._tasks_number = 0
self._local_comm = LocalCommunication(Host())
self._remote_comm = remote_comm
# NOTE: at the moment we do not need features of the distributed storage
#self._distr_storage = DistributedStorage((rset.LOCAL_HOST['main_research_path'], rset.LOCAL_HOST['storage_research_path']), prior_storage_index=1)
self._distr_storage = DistributedStorage(local_research_paths, prior_storage_index=0)
suitable_name = self._make_suitable_name(name)
if not continuing:
# interpret name as name without date
self._research_id = str(date.today()) + '_' + suitable_name
self._research_id = make_suitable_research_name(name)
if self._distr_storage.get_dir_path(self._research_id) is not None:
raise ResearchAlreadyExists("Research with name '{}' already exists, choose another name".format(self._research_id))
raise ResearchAlreadyExists("Research with name '{}' already exists, "
"choose another name".format(self._research_id))
self.research_path = self._distr_storage.make_dir(self._research_id)
print('Started new research at {}'.format(self.research_path))
else:
# interpret name as the full research id
self._research_id = suitable_name
self._research_id = name
self.research_path = self._load_research_data()
@classmethod
def start_research(cls, name,
local_research_path=None,
remote_comm=None,
remote_research_path=None,
):
return Research(name,
local_research_path=local_research_path, remote_comm=remote_comm, remote_research_path=remote_research_path)
@classmethod
def continue_research(cls, name,
local_research_path=None,
remote_comm=None,
remote_research_path=None,
):
return Research(name,
continuing=True, local_research_path=local_research_path, remote_comm=remote_comm, remote_research_path=remote_research_path)
@classmethod
def create_from_config(cls, research_sid, remote_comm=None):
def open(cls, research_sid, remote_comm=None):
with open('config_research.json', 'r') as f:
conf = json.load(f)
res = Research(conf['RESEARCH'][research_sid],
continuing=True,
local_research_paths=conf['LOCAL_HOST']['research_paths'],
remote_comm=remote_comm,
remote_research_path=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_path'] if remote_comm is not None else None)
remote_research_path=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_path']
if remote_comm is not None else None)
res._add_properties(conf['RESEARCH_PROPS'])
return res
@classmethod
def create(cls, new_research_sid, new_research_descr, remote_comm=None):
with open('config_research.json', 'r+') as f:
conf = json.load(f)
conf['RESEARCH'][new_research_sid] = make_suitable_research_name(new_research_descr)
f.seek(0)
json.dump(conf, f, indent=4)
f.truncate()
res = Research(new_research_descr,
continuing=False,
local_research_paths=conf['LOCAL_HOST']['research_paths'],
remote_comm=remote_comm,
remote_research_path=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_path']
if remote_comm is not None else None)
res._add_properties(conf['RESEARCH_PROPS'])
return res
......@@ -169,10 +164,12 @@ class Research:
research_path = self._distr_storage.get_dir_path(self._research_id)
if research_path is None:
# assume date was omitted in research id
research_path, dir_params = self._distr_storage.find_dir_by_named_regexp('', '^(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)_{}'.format(self._research_id))
regexp_for_search = '^(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)_{}'.format(self._research_id)
research_path, dir_params = self._distr_storage.find_dir_by_named_regexp('', regexp_for_search)
if dir_params is None:
raise ResearchDoesNotExist("Research '{}' does not exist".format(self._research_id))
self._research_id = '{}-{}-{}_{}'.format(dir_params['year'], dir_params['month'], dir_params['day'], self._research_id)
self._research_id = '{}-{}-{}_{}'.format(dir_params['year'], dir_params['month'], dir_params['day'],
self._research_id)
print('Loaded research at {}'.format(research_path))
......@@ -181,7 +178,7 @@ class Research:
self._tasks_number = 0
for dir_ in dirnames:
if dir_ != 'report':
task_number, _ = self._split_task_dir(dir_)
task_number, _ = split_task_dir(dir_)
if task_number > self._tasks_number:
self._tasks_number = task_number
self._tasks_number += 1
......@@ -189,36 +186,38 @@ class Research:
return research_path
def create_task(self, name):
'''
"""
Creates a new task, copies necessary data and executes the command line
'''
"""
task_number = self._get_next_task_number()
local_task_dir = self._make_task_path(task_number, name)
os.mkdir(local_task_dir)
return task_number
def grab_task_results(self, task_number, copies_list=[]):
'''
"""
Moves task content from the remote to the local. Locally, the task content will appear in the task
dir located in the main research location.
'''
"""
task_results_local_path = self.get_task_path(task_number)
task_results_remote_path = self.get_task_path(task_number, self._remote_comm.host)
if len(copies_list) == 0: # copy all data
pathes = self._remote_comm.listdir(task_results_remote_path)
for file_or_dir in pathes:
self._remote_comm.copy('/'.join((task_results_remote_path, file_or_dir)), task_results_local_path, 'from_remote', show_msg=True)
paths = self._remote_comm.listdir(task_results_remote_path)
for file_or_dir in paths:
self._remote_comm.copy('/'.join((task_results_remote_path, file_or_dir)), task_results_local_path,
'from_remote', show_msg=True)
else:
for copy_target in copies_list:
remote_copy_target_path = '/'.join((task_results_remote_path, copy_target['path'])) # we consider copy targets as relative to task's dir
# we consider copy targets as relative to task's dir
remote_copy_target_path = '/'.join((task_results_remote_path, copy_target['path']))
self._remote_comm.copy(remote_copy_target_path, task_results_local_path, 'from_remote', show_msg=True)
if 'new_name' in copy_target:
os.rename(os.path.join(task_results_local_path, os.path.basename(copy_target['path'])), \
os.rename(os.path.join(task_results_local_path, os.path.basename(copy_target['path'])),
os.path.join(task_results_local_path, copy_target['new_name']))
def _make_task_path(self, task_number, task_name, execution_host=None):
task_path = ''
task_dir = self._get_task_full_name(task_number, task_name)
task_dir = get_task_full_name(task_number, task_name)
if execution_host is None:
task_path = os.path.join(self.research_path, task_dir)
else:
......@@ -227,13 +226,13 @@ class Research:
return task_path
def get_task_path(self, task_number, at_remote_host=False):
'''
"""
Returns the task dir corresponding to task_number. By default, the local dir (from DistrubutedStorage) is returned.
If execution_host is specified, then the remote dir will be returned.
'''
"""
task_path = ''
task_name = self._get_task_name_by_number(task_number)
rel_task_dir = os.path.join(self._research_id, self._get_task_full_name(task_number, task_name))
rel_task_dir = os.path.join(self._research_id, get_task_full_name(task_number, task_name))
if at_remote_host:
task_path = '{}/{}'.format(self.remote_research_path, rel_task_dir)
else:
......@@ -241,22 +240,22 @@ class Research:
return task_path
def dump_object(self, task_number, obj, obj_name):
'''
"""
Dumps obj into the file whose name is obj_name + '.pyo' and locates it into the task dir corresponding to
task_number
'''
"""
print('Dumping ' + obj_name)
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'),'w')
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'), 'w')
pickle.dump(obj, f)
f.close()
def load_object(self, task_number, obj_name):
'''
"""
Load an object dumped into the file whose name is obj_name + '.pyo' and which is located it into the task dir
corresponding to task_number
'''
"""
print('Loading ' + obj_name)
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'),'r')
f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'), 'r')
obj = pickle.load(f)
f.close()
return obj
......@@ -265,39 +264,49 @@ class Research:
self._tasks_number += 1
return self._tasks_number - 1
def _get_task_full_name(self, task_number, task_name):
return str(task_number) + '-' + self._make_suitable_name(task_name)
def _get_task_name_by_number(self, task_number):
find_data = self._distr_storage.find_dir_by_named_regexp(self._research_id, '^{}-(?P<task_name>\S+)'.format(task_number))
find_data = self._distr_storage.find_dir_by_named_regexp(self._research_id,
'^{}-(?P<task_name>\S+)'.format(task_number))
if find_data is None:
raise Exception("No task with number '{}' is found".format(task_number))
return find_data[1]['task_name']
def _split_task_dir(self, task_dir):
parsing_params = parse_by_named_regexp('^(?P<task_number>\d+)-(?P<task_name>\S+)', task_dir)
if parsing_params is None:
raise Exception("No task directory '{}' is found".format(task_dir))
return int(parsing_params['task_number']), parsing_params['task_name']
def _make_suitable_name(self, name):
return '_'.join(name.split())
class ResearchAlreadyExists(Exception):
pass
class ResearchDoesNotExist(Exception):
pass
def get_all_research_ids():
return os.listdir('.' + self.research_path)
def make_suitable_name(name):
return '_'.join(name.split())
def make_suitable_research_name(descr):
return '_'.join([str(date.today()), make_suitable_name(descr)])
def get_task_full_name(task_number, task_name):
return str(task_number) + '-' + make_suitable_name(task_name)
def split_task_dir(task_dir):
parsing_params = parse_by_named_regexp(r'^(?P<task_number>\d+)-(?P<task_name>\S+)', task_dir)
if parsing_params is None:
raise Exception("No task directory '{}' is found".format(task_dir))
return int(parsing_params['task_number']), parsing_params['task_name']
def retrieve_trailing_float_from_task_dir(task_dir):
matching = re.search('^(?P<task_number>\d+)-(?P<task_name>\S+)_(?P<float_left>\d+)\.(?P<float_right>\d+)', task_dir)
matching = re.search(r'^(?P<task_number>\d+)-(?P<task_name>\S+)_(?P<float_left>\d+)\.(?P<float_right>\d+)',
task_dir)
if matching is None:
raise Exception('Incorrect task directory is given')
return float('{}.{}'.format(matching.group('float_left'), matching.group('float_right')))
class CreateTaskEdge(Edge):
def __init__(self, res, task_name_maker, predicate=dummy_predicate):
self._res = res
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment