import json
import os
import pickle
import re
import shutil
from datetime import datetime, date

from comsdk.aux import *
from comsdk.communication import *
from comsdk.distributed_storage import *
from comsdk.edge import Edge, dummy_predicate

# Create RESEARCH-ID. It is a small research which should link different local directories (with reports and
# time-integration) and ssh directories (with continuation, for example)
# What is included in RESEARCH?
# 1) It should be multistep. So we can always continue RESEARCH if we want
# 2) Everything should be dated and timed!
# 3) For every research we should create local directory having format @date@_@name@. Inside we should have
#    directory "report" and file research.log where all stuff is stored. If there are some remote calculations,
#    then there should be "remote" directory where all that stuff is organized. Another directory is "local"
#    which contains local results. Somehow we should save python script reproducing the results (think about it).
# 4)
# Local directory hierarchy:
# ROOT for us is the directory where research.py was launched (as it is a part of postproc package,
# postproc.research is rather used)
# ROOT/RESEARCH_REL_DIR is main directory
# ROOT/RESEARCH_REL_DIR/research.log
# ROOT/RESEARCH_REL_DIR/report
# ROOT/RESEARCH_REL_DIR/1-some_task
# ROOT/RESEARCH_REL_DIR/2-another_task
# ROOT/RESEARCH_REL_DIR/3-one_more_task
# As we can have multiple remotes, a remote root directory should be set somewhere out of Research.
# One possible solution is a factory for Task.
# Anyway, we create directory hierarchy relative to REMOTE_ROOT as follows:
# REMOTE_ROOT/RESEARCH_REL_DIR/1-some_task
# REMOTE_ROOT/RESEARCH_REL_DIR/2-another_task
# REMOTE_ROOT/RESEARCH_REL_DIR/3-one_more_task
# Some usual cases:
# 1) Remote calculation on some cluster.
# 1.1 COPY-EXECUTE TASK # 1.1.1 copy input files from local to remote # 1.1.2 copy program from remote to remote # 1.1.3 execute program # 1.2 wait for finishing (at this moment, we just have to wait and check) # 1.3 COPY-TASK # 1.3.1 copy results from remote to local # 1.4 as result, we will have results directly in the task directory class Research: """Research is the main class for interacting with the hierarchy of tasks. It allows to: (1) find location of a task by its number (2) store/load object into/from a task's dir (3) create new tasks by launching TaskExecution locally or on remotes (4) launch TaskExecution in already existing task's directory (5) grab task's content from remotes The main idea behind Research is that we collect tasks in the research's dir and make them enumerated. Each task is completely identified by its number. Its content, in turn, is located in the task's dir which is made up by concatenation of the number and the task's name that must be specified by a user. Next, we distinguish two logical places where tasks are collected: local one and remote one. Both places are distributed, but in a different manner: the local place is distributed over the locally accesible space on the current machine (say, some tasks may be located in dir path/to/storage and another tasks in another_path/to/storage) whereas the remote place is distributed over different machines. For the local place, we use DistributedStorage class to access the data. The remote place is accessed by setting a necessary SshCommunication in the Research class constructor. 
To avoid the mess in directories, we assume the following rules: (1) intersection between tasks' dirs of the same research dir on different location on the local machine is empty (2) all tasks are presented locally (in any location defined in DistributedStorage) at least as empty dirs (3) intersection between tasks' dirs of the same research dir on different remotes is empty (4) union of them Therefore, all tasks located remotely must map to the local machine (but it is not true for an opposite case!). Hence, when the task intended to be executed on a remote is created, we create both a remote directory and a local directory for this task. To be consistent in creating tasks locally, we choose a main (master) directory in DistributedStorage in which we create by default tasks directories whereas other local directories are assumed to be for storing purposes. The instance of Research is created by passing to it BaseCommunication object. If the instance is intended to launch tasks on remotes or grab tasks from remotes, then the corresponding BaseCommunication for interaction must be passed. By default, LocalCommunication is used and the remote interaction is thus disabled. 
""" def __init__(self, name, continuing=False, local_research_path=None, remote_comm=None, remote_research_path=None, ): # Always create local communication here # Remote communication is optional then self.local_research_path = local_research_path self.local_main_path = os.path.dirname(local_research_path) self.remote_research_path = remote_research_path self._tasks_number = 0 self._local_comm = LocalCommunication(Host()) self._remote_comm = remote_comm # NOTE: at the moment we do not need features of the distributed storage #self._distr_storage = DistributedStorage((rset.LOCAL_HOST['main_research_path'], rset.LOCAL_HOST['storage_research_path']), prior_storage_index=1) self._distr_storage = DistributedStorage((self.local_research_path,), prior_storage_index=0) suitable_name = self._make_suitable_name(name) if not continuing: # interpret name as name without date self._research_id = str(date.today()) + '_' + suitable_name if self._distr_storage.get_dir_path(self._research_id) is not None: raise ResearchAlreadyExists("Research with name '{}' already exists, choose another name".format(self._research_id)) self.research_path = self._distr_storage.make_dir(self._research_id) print('Started new research at {}'.format(self.research_path)) else: # interpret name as the full research id self._research_id = suitable_name self.research_path = self._load_research_data() @classmethod def start_research(cls, name, local_research_path=None, remote_comm=None, remote_research_path=None, ): return Research(name, local_research_path=local_research_path, remote_comm=remote_comm, remote_research_path=remote_research_path) @classmethod def continue_research(cls, name, local_research_path=None, remote_comm=None, remote_research_path=None, ): return Research(name, continuing=True, local_research_path=local_research_path, remote_comm=remote_comm, remote_research_path=remote_research_path) @classmethod def create_from_config(cls, research_sid, remote_comm=None): with 
open('config_research.json', 'r') as f: conf = json.load(f) res = Research(conf['RESEARCH'][research_sid], continuing=True, local_research_path=conf['LOCAL_HOST']['research_path'], remote_comm=remote_comm, remote_research_path=conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_path']) res._add_properties(conf['RESEARCH_PROPS']) return res def __getstate__(self): return { 'research_id': self._research_id, 'local_research_path': self.local_research_path, 'remote_research_path': self.remote_research_path, 'remote_comm': self._remote_comm.__getstate__(), } def __setstate__(self, state): self._tasks_number = 0 self._local_comm = LocalCommunication(Host()) self.local_research_path = state['local_research_path'] self.remote_research_path = state['remote_research_path'] self._remote_comm = None if state['remote_comm'] is not None: self._remote_comm = SshCommunication.__new__(SshCommunication) self._remote_comm.__setstate__(state['remote_comm']) self._distr_storage = DistributedStorage((self.local_research_path,), prior_storage_index=0) self._research_id = state['research_id'] self.research_path = self._load_research_data() def _add_properties(self, props): for prop_name, prop_value in props.items(): self.__setattr__(prop_name, prop_value) def _load_research_data(self): # find corresponding date/name # construct object from all data inside research_path = self._distr_storage.get_dir_path(self._research_id) if research_path is None: # assume date was omitted in research id research_path, dir_params = self._distr_storage.find_dir_by_named_regexp('', '^(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)_{}'.format(self._research_id)) if dir_params is None: raise ResearchDoesNotExist("Research '{}' does not exist".format(self._research_id)) self._research_id = '{}-{}-{}_{}'.format(dir_params['year'], dir_params['month'], dir_params['day'], self._research_id) print('Loaded research at {}'.format(research_path)) # determine maximum task number to set the number for the next possible 
task dirnames, _ = self._distr_storage.listdir(self._research_id) self._tasks_number = 0 for dir_ in dirnames: if dir_ != 'report': task_number, _ = self._split_task_dir(dir_) if task_number > self._tasks_number: self._tasks_number = task_number self._tasks_number += 1 print('Number of tasks in the current research: {}'.format(self._tasks_number)) return research_path def create_task(self, name): ''' Creates a new task, copies necessary data and executes the command line ''' task_number = self._get_next_task_number() local_task_dir = self._make_task_path(task_number, name) os.mkdir(local_task_dir) return task_number def grab_task_results(self, task_number, copies_list=[]): ''' Moves task content from the remote to the local. Locally, the task content will appear in the task dir located in the main research location. ''' task_results_local_path = self.get_task_path(task_number) task_results_remote_path = self.get_task_path(task_number, self._exec_comm.host) if len(copies_list) == 0: # copy all data pathes = self._exec_comm.listdir(task_results_remote_path) for file_or_dir in pathes: self._exec_comm.copy('/'.join((task_results_remote_path, file_or_dir)), task_results_local_path, 'from_remote') else: for copy_target in copies_list: remote_copy_target_path = '/'.join((task_results_remote_path, copy_target['path'])) # we consider copy targets as relative to task's dir self._exec_comm.copy(remote_copy_target_path, task_results_local_path, 'from_remote') if 'new_name' in copy_target: os.rename(os.path.join(task_results_local_path, os.path.basename(copy_target['path'])), \ os.path.join(task_results_local_path, copy_target['new_name'])) def _make_task_path(self, task_number, task_name, execution_host=None): task_path = '' rel_task_dir = os.path.join(self._research_id, self._get_task_full_name(task_number, task_name)) if execution_host is None: task_path = os.path.join(self.local_research_path, rel_task_dir) else: task_path = os.path.join(execution_host.research_abs_path, 
rel_task_dir) return task_path def get_task_path(self, task_number, at_remote_host=False): ''' Returns the task dir corresponding to task_number. By default, the local dir (from DistrubutedStorage) is returned. If execution_host is specified, then the remote dir will be returned. ''' task_path = '' task_name = self._get_task_name_by_number(task_number) rel_task_dir = os.path.join(self._research_id, self._get_task_full_name(task_number, task_name)) if at_remote_host: task_path = '{}/{}'.format(self.remote_research_path, rel_task_dir) else: task_path = self._distr_storage.get_dir_path(rel_task_dir) return task_path def dump_object(self, task_number, obj, obj_name): ''' Dumps obj into the file whose name is obj_name + '.pyo' and locates it into the task dir corresponding to task_number ''' print('Dumping ' + obj_name) f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'),'w') pickle.dump(obj, f) f.close() def load_object(self, task_number, obj_name): ''' Load an object dumped into the file whose name is obj_name + '.pyo' and which is located it into the task dir corresponding to task_number ''' print('Loading ' + obj_name) f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'),'r') obj = pickle.load(f) f.close() return obj def _get_next_task_number(self): self._tasks_number += 1 return self._tasks_number - 1 def _get_task_full_name(self, task_number, task_name): return str(task_number) + '-' + self._make_suitable_name(task_name) def _get_task_name_by_number(self, task_number): find_data = self._distr_storage.find_dir_by_named_regexp(self._research_id, '^{}-(?P<task_name>\S+)'.format(task_number)) if find_data is None: raise Exception("No task with number '{}' is found".format(task_number)) return find_data[1]['task_name'] def _split_task_dir(self, task_dir): parsing_params = parse_by_named_regexp('^(?P<task_number>\d+)-(?P<task_name>\S+)', task_dir) if parsing_params is None: raise Exception("No task directory '{}' is 
found".format(task_dir)) return int(parsing_params['task_number']), parsing_params['task_name'] def _make_suitable_name(self, name): return '_'.join(name.split()) class ResearchAlreadyExists(Exception): pass class ResearchDoesNotExist(Exception): pass def get_all_research_ids(): return os.listdir('.' + self.local_research_path) def retrieve_trailing_float_from_task_dir(task_dir): matching = re.search('^(?P<task_number>\d+)-(?P<task_name>\S+)_(?P<float_left>\d+)\.(?P<float_right>\d+)', task_dir) if matching is None: raise Exception('Incorrect task directory is given') return float('{}.{}'.format(matching.group('float_left'), matching.group('float_right'))) class CreateTaskEdge(Edge): def __init__(self, res, task_name_maker, predicate=dummy_predicate): self._res = res self._task_name_maker = task_name_maker super().__init__(predicate, self.execute) def execute(self, data): task_name = self._task_name_maker(data) task_number = self._res.create_task(task_name) data['__WORKING_DIR__'] = self._res.get_task_path(task_number) data['__REMOTE_WORKING_DIR__'] = self._res.get_task_path(task_number, at_remote_host=True)