research.py 18.2 KB
Newer Older
1
import pickle
2
from datetime import date
3
from typing import Sequence, Mapping, TypedDict
4

Savva Golubitsky's avatar
Savva Golubitsky committed
5
from comsdk.comaux import *
6
from comsdk.communication import BaseCommunication, LocalCommunication, SshCommunication, Host
7
from comsdk.distributed_storage import *
8 9
from comsdk.edge import Func, Edge, dummy_predicate
from comsdk.graph import Graph, State
10

11 12
class CopiesList(TypedDict):
    """Description of a single object to be copied from a task directory (used by Research.grab_task_results)."""
    # path of the object relative to the task directory
    path: str
    # name the copied object is given relative to the local task directory
    new_name: str

13

14
class Research:
15

16
    """
17 18 19 20 21 22 23
    Class Research is a representation of a group of different calculations collected into what we call a Research.
    Each ''calculation'' corresponds to the launch of a graph-based scenario which produces a set of files which we
    treat as the results of the calculation. We thus call such a calculation a task. Therefore, a Research is a
    collection of tasks. Each task is associated with a single directory (within the code, it may be denoted as
    task_dir, if only directory name is of interest, or task_path, if the absolute path is of interest) whose name has
    a very simple structure, @number@-@long_name@, so that each task is associated with its own unique number (also
    called task_number within the code). Normally, one should use the task number to get any task-related information.
24 25 26 27 28 29
    All the tasks are located in the research directory whose local (remote) absolute path is set by the class
    property local_research_path (remote_research_path). The research directory has the following pattern:
    @date@_@long_name@. Finally, we associate a short Research ID with each Research. The described structure is
    independent of where these directories are located. It is assumed that there is a local root for research and
    its remote analog. The latter should be available via any protocol supported by communication module. Class Research
    can thus be set up in two regimes: local (remote_comm is None) and local-remote (remote_comm is not None).
30 31 32

    Typically, one should construct an instance of Research based on the configuration file called config_research.json.
    There are two static functions for this purpose: Research.open() and Research.create(). The former creates an
33
    instance of Research based on the existing Research (one should pass its Research ID to open()) described in the
34 35
    configuration file and the latter creates a new Research (thus, making a new directory in the local filesystem) and
    adds all the necessary information about it in the configuration file. Also, any Research instance is automatically
36
    augmented by the properties listed in 'RESEARCH_PROPS' dictionary in the configuration file.
37 38 39 40 41 42 43 44 45 46 47 48 49 50

    For the Research constructor to understand where all the research directories are located, one must supply (either
    directly in the constructor or in the configuration file) the potential root paths for the search (both for the
    local and remote machines if the latter is specified). The first path in the list of the potential root paths is
    called the default root path. A new Research will be created in the default path.

    Note that different tasks belonging to the same research (i.e., they are associated with the same Research ID) may
    be located at different root paths. When creating a new task, it will be located in the default root path.

    .. todo::
        Some way for saving auxiliary information about research and tasks (task date and description, for example)
        should be implemented. Possibly, the same should be done for launcher scripts.

    """
51
    def __init__(self, name: str,
52
                 continuing=False,
53 54 55
                 local_research_roots: Optional[Sequence[str]] = None,
                 remote_comm: Optional[BaseCommunication] = None,
                 remote_research_root: Optional[str] = None):
56
        """
57
        :param name: research description (if continuing == False) or research directory (if continuing == True)
58 59 60 61 62
        :param continuing: if False, the Research with be read from the root path. Otherwise, a new one will be created
        :param local_research_roots: a list of local paths where research directories are searched for
        :param remote_comm: BaseCommunication instance used for communication with remote machine
        :param remote_research_root: path on the remote machine where research directories are searched for
        """
63
        self._local_research_root = local_research_roots[0]
64
        self._local_root = os.path.dirname(self._local_research_root)
65
        self._remote_research_root = remote_research_root
66
        self._tasks_number = 0
67
        self._local_comm = LocalCommunication(Host())  # local communication created automatically, no need to pass it
68
        self._remote_comm = remote_comm
69
        self._distr_storage = DistributedStorage(local_research_roots, prior_storage_index=0)
70
        self._local_research_path = None
71 72
        if not continuing:
            # interpret name as name without date
73 74
            self._research_dir = make_suitable_research_dir(name)
            if self._distr_storage.get_dir_path(self._research_dir) is not None:
75
                raise ResearchAlreadyExists("Research with name '{}' already exists, "
76 77 78
                                            "choose another name".format(self._research_dir))
            self._local_research_path = self._distr_storage.make_dir(self._research_dir)
            print('Started new research at {}'.format(self._local_research_path))
79 80
        else:
            # interpret name as the full research id
81 82
            self._research_dir = name
            self._local_research_path = self._load_research_data()
83 84

    @classmethod
    def open(cls, research_id: str,
             remote_comm: Optional[BaseCommunication] = None):
        """
        Opens an existing Research described in config_research.json (the file is looked up in the current working
        directory).

        :param research_id: Research ID used to find a relevant research
        :param remote_comm: BaseCommunication instance used for communication with remote machine
        :return: new Research instance
        :raises KeyError: if research_id (or the remote machine name) is absent from the configuration file
        """
        with open('config_research.json', 'r') as f:
            conf = json.load(f)
        remote_research_root = None
        if remote_comm is not None:
            remote_research_root = conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_root']
        # Instantiate cls (not Research) so that subclasses calling open() get instances of their own type
        res = cls(conf['RESEARCH'][research_id],
                  continuing=True,
                  local_research_roots=conf['LOCAL_HOST']['research_roots'],
                  remote_comm=remote_comm,
                  remote_research_root=remote_research_root)
        res._add_properties(conf['RESEARCH_PROPS'])
        return res

    @classmethod
    def create(cls, new_research_id: str, new_research_descr: str,
               remote_comm: Optional[BaseCommunication] = None):
        """
        Creates a new Research and registers it in config_research.json (the file is looked up in the current working
        directory). The configuration file is rewritten only after the research directory has been created, so a
        failed creation (e.g., ResearchAlreadyExists) leaves the file untouched.

        :param new_research_id: Research ID (short name for this research)
        :param new_research_descr: relatively long research name
        :param remote_comm: BaseCommunication instance used for communication with remote machine
        :return: new Research instance
        :raises ResearchAlreadyExists: if a research directory with the same name already exists
        """
        with open('config_research.json', 'r+') as f:
            conf = json.load(f)
            remote_research_root = None
            if remote_comm is not None:
                remote_research_root = conf['REMOTE_HOSTS'][remote_comm.machine_name]['research_root']
            # Instantiate cls (not Research) so that subclasses calling create() get instances of their own type
            res = cls(new_research_descr,
                      continuing=False,
                      local_research_roots=conf['LOCAL_HOST']['research_roots'],
                      remote_comm=remote_comm,
                      remote_research_root=remote_research_root)
            # Register the directory name actually used by the new Research instead of recomputing it
            conf['RESEARCH'][new_research_id] = res.research_dir
            f.seek(0)
            json.dump(conf, f, indent=4)
            f.truncate()
        res._add_properties(conf['RESEARCH_PROPS'])
        return res

127
    @property
128 129
    def local_research_path(self) -> str:
        return self._local_research_path
130 131

    @property
132 133 134
    def remote_research_path(self) -> str:
        return os.path.join(self._remote_research_root, self._research_dir)

135 136 137 138
    @property
    def local_root(self) -> str:
        return self._local_root

139 140 141
    @property
    def research_dir(self) -> str:
        return self._research_dir
142 143

    def __getstate__(self) -> dict:
144
        return {
145 146 147
            'research_dir': self._research_dir,
            'local_research_path': self._local_research_root,
            'remote_research_path': self._remote_research_root,
148 149 150 151 152 153
            'remote_comm': self._remote_comm.__getstate__(),
        }

    def __setstate__(self, state):
        """
        Restores the Research from the dictionary produced by __getstate__ and reloads the research data from the
        local filesystem.

        :param state: dictionary with keys 'research_dir', 'local_research_path', 'remote_research_path' and
        'remote_comm'
        :raises ResearchDoesNotExist: if the research directory cannot be found in the stored local root
        """
        self._tasks_number = 0
        self._local_comm = LocalCommunication(Host())
        self._local_research_root = state['local_research_path']
        # Restore the attribute behind the local_root property exactly as the constructor computes it
        self._local_root = os.path.dirname(self._local_research_root)
        self._remote_research_root = state['remote_research_path']
        self._remote_comm = None
        if state['remote_comm'] is not None:
            # NOTE(review): the remote communication is always restored as SshCommunication, whatever type was pickled
            self._remote_comm = SshCommunication.__new__(SshCommunication)
            self._remote_comm.__setstate__(state['remote_comm'])
        self._distr_storage = DistributedStorage((self._local_research_root,), prior_storage_index=0)
        self._research_dir = state['research_dir']
        # Must be stored under the name read by the local_research_path property and by _make_task_path
        self._local_research_path = self._load_research_data()
163

164
    def _add_properties(self, props: Mapping[str, Any]) -> None:
165 166 167
        for prop_name, prop_value in props.items():
            self.__setattr__(prop_name, prop_value)

168
    def _load_research_data(self) -> str:
        """
        Locates the research directory in the distributed storage and sets the number for the next task.

        :return: path of the research directory as reported by the distributed storage
        :raises ResearchDoesNotExist: if the research directory is not found
        """
        # NOTE: the research directory must be given with its date prefix; lookup by a date-less name is not supported
        located_path = self._distr_storage.get_dir_path(self._research_dir)
        if located_path is None:
            raise ResearchDoesNotExist("Research '{}' does not exist".format(self._research_dir))

        print('Loaded research at {}'.format(located_path))

        # determine maximum task number to set the number for the next possible task;
        # the 'report' directory is the only subdirectory that is not treated as a task
        subdirs, _ = self._distr_storage.listdir(self._research_dir)
        self._tasks_number = 0
        existing_numbers = (split_task_dir(subdir)[0] for subdir in subdirs if subdir != 'report')
        # NOTE(review): the value printed below is the next task number (maximum existing number plus one)
        self._tasks_number = max(existing_numbers, default=0) + 1
        print('Number of tasks in the current research: {}'.format(self._tasks_number))
        return located_path

197
    def create_task(self, name: str) -> int:
        """
        Registers a new task in the current research and makes its local directory.

        :param name: task name
        :return: task number
        """
        new_task_number = self._get_next_task_number()
        os.mkdir(self._make_task_path(new_task_number, name))
        return new_task_number

209 210
    def grab_task_results(self, task_number: int,
                          copies_list: Optional[Sequence[CopiesList]] = None):
        """
        Copies task content from the remote machine to the local one. Locally, the task content will appear in the task
        directory located in the research directory.

        :param task_number: task number
        :param copies_list: a list defining which objects we wish to copy from the remote machine. It consists of
        dictionaries each having keys 'path' (path of object we wish to copy relative to the task directory) and
        'new_name' (path of this object on the local machine relative to the task directory). The key 'new_name' may
        be omitted, in which case the copied object keeps its name. If copies_list is None, the whole content of the
        remote task directory is copied
        :raises ValueError: if remote communication is not set up
        """
        task_results_local_path = self.get_task_path(task_number)
        # at_remote_host is a boolean flag; the remote host object must not be passed in its place
        task_results_remote_path = self.get_task_path(task_number, at_remote_host=True)
        if copies_list is None:  # copy all data
            for file_or_dir in self._remote_comm.listdir(task_results_remote_path):
                self._remote_comm.copy('/'.join((task_results_remote_path, file_or_dir)), task_results_local_path,
                                       'from_remote', show_msg=True)
            return
        for copy_target in copies_list:
            # we consider copy targets as relative to task's dir
            remote_copy_target_path = '/'.join((task_results_remote_path, copy_target['path']))
            self._remote_comm.copy(remote_copy_target_path, task_results_local_path, 'from_remote', show_msg=True)
            if 'new_name' in copy_target:
                # the object is renamed in the local task directory once it has been copied
                os.rename(os.path.join(task_results_local_path, os.path.basename(copy_target['path'])),
                          os.path.join(task_results_local_path, copy_target['new_name']))

236 237
    def _make_task_path(self, task_number: int, task_name: str, at_remote_host=False) -> str:
        """
        Builds the path of a task directory from the task number and name without checking that it exists.

        :param task_number: task number
        :param task_name: task name
        :param at_remote_host: build the path on the remote machine (if True) or on the local one (if False)
        :return: task path
        """
        task_dir = get_task_full_name(task_number, task_name)
        if at_remote_host:
            # Remote paths are joined with '/' as in grab_task_results and get_task_path; os.path.join would insert
            # the local separator, which is wrong when the local machine runs Windows
            return '/'.join((self._remote_research_root, self._research_dir, task_dir))
        return os.path.join(self._local_research_path, task_dir)

245
    def get_task_path(self, task_number: int, at_remote_host=False) -> str:
        """
        Finds the path of an existing task by the task number.

        :param task_number: task number
        :param at_remote_host: return the path on the remote machine (if True) or on the local one (if False)
        :return: task path (on the local machine, whatever the distributed storage reports for the task directory)
        :raises ValueError: if the remote path is requested but remote communication is not set up
        """
        task_dir = get_task_full_name(task_number, self._get_task_name_by_number(task_number))
        rel_task_dir = os.path.join(self._research_dir, task_dir)
        if not at_remote_host:
            return self._distr_storage.get_dir_path(rel_task_dir)
        if self._remote_comm is None:
            raise ValueError('Cannot get a task path on the remote: remote communication is not set up')
        return '{}/{}'.format(self._remote_research_root, rel_task_dir)

264
    def dump_object(self, task_number: int, obj: object, obj_name: str) -> None:
265
        """
266 267 268 269 270 271
        Dumps any python object (using pickle) to the binary file, named obj_name + '.pyo', in the task directory
        associated with the task number

        :param task_number: task number
        :param obj: any python object
        :param obj_name: file name to which obj will be saved (without extension)
272
        """
273

274
        print('Dumping ' + obj_name)
275
        f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'), 'w')
276 277 278
        pickle.dump(obj, f)
        f.close()

279
    def load_object(self, task_number: int, obj_name: str):
280
        """
281 282 283 284 285 286
        Load any python object dumped using pickle from the binary file, named obj_name + '.pyo' and located in the task
        directory associated with the task number

        :param task_number: task number
        :param obj_name: file name from which obj will be loaded (without extension)
        :return: python object
287
        """
288
        print('Loading ' + obj_name)
289
        f = open(os.path.join(self.get_task_path(task_number), obj_name + '.pyo'), 'r')
290 291 292 293
        obj = pickle.load(f)
        f.close()
        return obj

294
    def _get_next_task_number(self) -> int:
295 296 297
        self._tasks_number += 1
        return self._tasks_number - 1

298 299
    def _get_task_name_by_number(self, task_number: int) -> str:
        find_data = self._distr_storage.find_dir_by_named_regexp(self._research_dir,
300
                                                                 '^{}-(?P<task_name>\S+)'.format(task_number))
301 302 303 304 305 306 307 308
        if find_data is None:
            raise Exception("No task with number '{}' is found".format(task_number))
        return find_data[1]['task_name']


class ResearchAlreadyExists(Exception):
    """Raised when a new Research is requested but its directory already exists."""

309

310 311 312
class ResearchDoesNotExist(Exception):
    """Raised when the directory of a requested Research cannot be found."""

313

314
def make_suitable_name(name: str) -> str:
    """
    Replaces every run of whitespace in name by a single hyphen (leading and trailing whitespace is dropped).

    :param name: arbitrary name
    :return: name with its whitespace-separated words joined by '-'
    """
    words = name.split()
    return '-'.join(words)


def make_suitable_task_name(name: str) -> str:
    """
    Replaces every run of whitespace in name by a single underscore (leading and trailing whitespace is dropped).

    :param name: arbitrary task name
    :return: task name with its whitespace-separated words joined by '_'
    """
    words = name.split()
    return '_'.join(words)


322
def make_suitable_research_dir(descr: str) -> str:
    """
    Builds the name of a research directory from today's date and the research description.

    :param descr: research description
    :return: directory name of the form @date@-@name@, where @name@ is produced by make_suitable_name
    """
    # NOTE(review): the date and the name are joined by '-', whereas the Research docstring describes the
    # pattern @date@_@long_name@ -- confirm which one is intended
    return '{}-{}'.format(date.today(), make_suitable_name(descr))
324 325


326
def get_task_full_name(task_number: int, task_name: str) -> str:
    """
    Builds the name of a task directory.

    :param task_number: task number
    :param task_name: task name (its whitespace is replaced by make_suitable_task_name)
    :return: directory name of the form @number@-@name@
    """
    return '{}-{}'.format(task_number, make_suitable_task_name(task_name))
328 329


330
def split_task_dir(task_dir: str) -> (int, str):
    """
    Splits the name of a task directory into the task number and the task name.

    :param task_dir: task directory name of the form @number@-@name@
    :return: tuple (task number, task name)
    :raises Exception: if task_dir does not follow the expected pattern
    """
    parsed = parse_by_named_regexp(r'^(?P<task_number>\d+)-(?P<task_name>\S+)', task_dir)
    if parsed is not None:
        return int(parsed['task_number']), parsed['task_name']
    raise Exception("No task directory '{}' is found".format(task_dir))

336

337
def retrieve_trailing_float_from_task_dir(task_dir: str) -> float:
    """
    Extracts a float written as @digits@.@digits@ after an underscore in the name of a task directory.

    :param task_dir: task directory name of the form @number@-@name@_@float@
    :return: extracted float
    :raises Exception: if task_dir does not follow the expected pattern
    """
    pattern = r'^(?P<task_number>\d+)-(?P<task_name>\S+)_(?P<float_left>\d+)\.(?P<float_right>\d+)'
    found = re.search(pattern, task_dir)
    if found is None:
        raise Exception('Incorrect task directory is given')
    integer_part, fractional_part = found.group('float_left', 'float_right')
    return float(integer_part + '.' + fractional_part)

344

345
class CreateTaskEdge(Edge):
    """
    Edge which creates a new task in the given research and stores the task path(s) in the data passed to it.
    """
    def __init__(self, res, task_name_maker, predicate=dummy_predicate, remote=False):
        """
        :param res: research in which tasks are created (its create_task() and get_task_path() are used)
        :param task_name_maker: callable building the task name from the data
        :param predicate: predicate passed to the Edge constructor
        :param remote: if True, the remote task path is stored in the data as well
        """
        self._res = res
        self._task_name_maker = task_name_maker
        self._remote = remote
        super().__init__(predicate, Func(func=self.execute))

    def execute(self, data):
        """
        Creates a task and sets data['__WORKING_DIR__'] (and data['__REMOTE_WORKING_DIR__'] if remote is True).

        :param data: data passed along the edge; it is modified in place
        """
        new_task_number = self._res.create_task(self._task_name_maker(data))
        data['__WORKING_DIR__'] = self._res.get_task_path(new_task_number)
        if not self._remote:
            return
        data['__REMOTE_WORKING_DIR__'] = self._res.get_task_path(new_task_number, at_remote_host=True)


class CreateTaskGraph(Graph):
    """
    Graph consisting of two states connected by a single CreateTaskEdge.
    """
    def __init__(self, res, task_name_maker, array_keys_mapping=None, remote=False):
        """
        :param res: research passed to CreateTaskEdge
        :param task_name_maker: callable passed to CreateTaskEdge
        :param array_keys_mapping: passed to the initial State as is
        :param remote: passed to CreateTaskEdge
        """
        initial_state, terminal_state = self.create_branch(res, task_name_maker,
                                                           array_keys_mapping=array_keys_mapping, remote=remote)
        super().__init__(initial_state, terminal_state)

    @staticmethod
    def create_branch(res, task_name_maker, array_keys_mapping=None, remote=False):
        """
        Builds the states 'READY_FOR_TASK_CREATION' and 'TASK_CREATED' connected by a CreateTaskEdge.

        :return: tuple (initial state, terminal state)
        """
        create_task_edge = CreateTaskEdge(res, task_name_maker=task_name_maker, remote=remote)
        initial_state = State('READY_FOR_TASK_CREATION', array_keys_mapping=array_keys_mapping)
        terminal_state = State('TASK_CREATED')
        initial_state.connect_to(terminal_state, edge=create_task_edge)
        return initial_state, terminal_state