edge.py 17.5 KB
Newer Older
Savva Golubitsky's avatar
Savva Golubitsky committed
1
import comsdk.comaux as aux
2
from comsdk.communication import CommunicationError
3
from comsdk.graph import Func
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

from mako.template import Template
import os
import posixpath
import pickle


class InOutMapping(object):
    '''
    Describes how an edge sees the (possibly nested) data dictionary.

    If nothing is configured, edges receive the data unchanged; otherwise the data is
    wrapped into aux.ProxyDict together with the relative keys, the combined keys
    mapping and the default relative key.

    :param keys_mapping: static keys mapping (None means no mapping); it is merged with
        the per-call dynamic mapping in build_proxy_data.
    :param relative_keys: relative keys passed to aux.ProxyDict; a single value that is
        not a sequence (per aux.is_sequence) is wrapped into a one-element tuple.
    :param default_relative_key: default relative key passed to aux.ProxyDict; wrapped
        the same way as relative_keys.
    '''
    def __init__(self,
                 keys_mapping=None,
                 relative_keys=(),
                 default_relative_key=(),
                 ):
        self._default_relative_key = default_relative_key if aux.is_sequence(default_relative_key) else (default_relative_key,)
        self._relative_keys = relative_keys if aux.is_sequence(relative_keys) else (relative_keys,)
        # None instead of a shared mutable ``{}`` default; also makes an explicit None safe
        self._keys_mapping = {} if keys_mapping is None else keys_mapping

    def build_proxy_data(self, data, dynamic_keys_mapping=None):
        '''
        Return the view of ``data`` an edge should work with.

        :param data: the data dictionary.
        :param dynamic_keys_mapping: per-call keys mapping (None means none); its entries
            override entries of the static keys mapping with the same local key.
        :return: ``data`` itself when no mapping of any kind is configured, otherwise an
            aux.ProxyDict wrapping it.
        '''
        if dynamic_keys_mapping is None:
            dynamic_keys_mapping = {}
        # emptiness via truthiness so that e.g. an empty list behaves like an empty tuple
        if not (self._default_relative_key or self._relative_keys
                or self._keys_mapping or dynamic_keys_mapping):
            return data
        # {**a, **b} (unlike dict(a, **b)) also accepts non-str keys in the dynamic mapping
        merged_mapping = {**self._keys_mapping, **dynamic_keys_mapping}
        return aux.ProxyDict(data, self._relative_keys, merged_mapping, self._default_relative_key)

class Edge(object):
    '''
    Graph edge made of a predicate and a morphism applied to the data dictionary.

    Both are called with the view of the data built by ``io_mapping``
    (InOutMapping.build_proxy_data). ``preprocess`` and ``postprocess`` are hooks called
    with the raw data right before and right after the morphism; by default they do nothing.

    :param predicate: object exposing the predicate callable as ``.func`` (e.g. Func),
        or a plain callable taking the data.
    :param morphism: same as ``predicate``, for the morphism.
    :param io_mapping: InOutMapping used to build the data view; a fresh empty one if None.
    :param order: integer ordering value; None is treated as 0.
    :param comment: free-form description of the edge.
    '''
    __slots__ = [
        'pred_f',
        'morph_f',
        '_io_mapping',
        'preprocess',
        'postprocess',
        'order',
        'comment',
    ]

    def __init__(self, predicate, morphism,
                 io_mapping=None,
                 order=0,
                 comment="",
                 ):
        self.pred_f = predicate
        self.morph_f = morphism
        # created per instance instead of one instance shared by all edges
        self._io_mapping = InOutMapping() if io_mapping is None else io_mapping
        self.preprocess = lambda pd: None
        self.postprocess = lambda pd: None
        self.order = int(0 if order is None else order)
        self.comment = comment

    @staticmethod
    def _callable_of(f):
        '''Return ``f.func`` when ``f`` has it (e.g. a Func), otherwise ``f`` itself.'''
        # Subclasses in this module pass plain callables (dummy_predicate, self.execute),
        # which have no ``.func`` attribute. NOTE: functools.partial also exposes ``.func``
        # (the bare wrapped function), so wrap partials in a lambda before passing them.
        return getattr(f, 'func', f)

    def predicate(self, data, dynamic_keys_mapping=None):
        '''Evaluate the predicate on the mapped view of ``data`` and return its result.'''
        proxy_data = self._io_mapping.build_proxy_data(
            data, {} if dynamic_keys_mapping is None else dynamic_keys_mapping)
        return self._callable_of(self.pred_f)(proxy_data)

    def morph(self, data, dynamic_keys_mapping=None):
        '''
        Apply the morphism to the mapped view of ``data``.

        The view is built first; then ``preprocess``, the morphism and ``postprocess`` run
        in that order. The hooks receive the raw ``data``, not the mapped view.
        '''
        proxy_data = self._io_mapping.build_proxy_data(
            data, {} if dynamic_keys_mapping is None else dynamic_keys_mapping)
        self.preprocess(data)
        self._callable_of(self.morph_f)(proxy_data)
        self.postprocess(data)

def DummyEdge():
    '''
    Return an Edge whose predicate and morphism are default-constructed Func objects.

    Factory kept under the CapWords name of the former DummyEdge class.
    NOTE(review): presumably a default Func() is a no-op callable -- confirm against
    comsdk.graph.Func.
    '''
    return Edge(Func(), Func())

class ExecutableProgramEdge(Edge):
    '''
    Class implementing the edge which uses an external program to morph data.
    The program is launched via so-called communication (``comm``) which, among others,
    sets where the program is located and how it can be launched.
    Communication can be used to launch the program on remote resources.
    # DESCRIPTION OF KEYS MAPPINGS #
    Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
    from data which may be located in different (nested) keys of data (we will call these keys "global").
    However, it is very convenient to implement the edge imagining that there is no nested structures
    and all keys are available in the top-level of data (we will call these keys "local").
    The link between local and global keys is given by ``io_mapping`` (see InOutMapping).
    # END OF DESCRIPTION OF KEYS MAPPINGS #
    We expect that necessary input files are already on remote.
    Programs may require three types of arguments:
    1) keyword arguments (-somearg something)
    2) flags (-someflag)
    3) trailing arguments
    Local keys determining the corresponding values are located in keyword_names, flag_names and trailing_args_keys.
    The program runs in data['__REMOTE_WORKING_DIR__'] if ``remote`` is true, otherwise in data['__WORKING_DIR__'].
    Finally, data is updated after the program finishes: first with the dict returned by
    ``stdout_processor(data, stdout_lines)`` (if given), then with ``output_dict``.
    '''
    def __init__(self, program_name, comm,
                 predicate=None,
                 io_mapping=None,
                 output_dict=None, # dict added to the (mapped) data after execution; {} if None
                 keyword_names=(), # "local keys" where keyword args are stored
                 flag_names=(), # "local keys" where flags are stored
                 trailing_args_keys=(), # "local keys" where trailing args are stored
                 remote=False,
                 stdout_processor=None,
                 ):
        # Edge must accept plain callables here: dummy_predicate and self.execute have no ``.func``.
        predicate = predicate if predicate is not None else dummy_predicate
        # built per instance instead of shared default objects evaluated once at class definition
        io_mapping = io_mapping if io_mapping is not None else InOutMapping()
        self._output_dict = output_dict if output_dict is not None else {}
        self._comm = comm
        self._program_name = program_name
        self._keyword_names = keyword_names
        self._flag_names = flag_names
        self._trailing_args_keys = trailing_args_keys
        self._working_dir_key = '__REMOTE_WORKING_DIR__' if remote else '__WORKING_DIR__'
        self._stdout_processor = stdout_processor
        super().__init__(predicate, self.execute, io_mapping)

    def execute(self, data):
        '''Run the program in the working dir and merge its results into ``data``.'''
        args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys)
        working_dir = data[self._working_dir_key]
        stdout_lines, _stderr_lines = self._comm.execute_program(self._program_name, args_str, working_dir) # here we execute
        if self._stdout_processor:
            data.update(self._stdout_processor(data, stdout_lines))
        data.update(self._output_dict)

class QsubScriptEdge(Edge):
    '''
    Class implementing the edge which builds up the sh-script for qsub.
    The script is rendered locally from the SGE template of the remote host
    (remote_comm.host.sge_template_name) via render_sge_template.
    # DESCRIPTION OF KEYS MAPPINGS #
    Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
    from data which may be located in different (nested) keys of data (we will call these keys "global").
    However, it is very convenient to implement the edge imagining that there is no nested structures
    and all keys are available in the top-level of data (we will call these keys "local").
    The link between local and global keys is given by ``io_mapping`` (see InOutMapping).
    # END OF DESCRIPTION OF KEYS MAPPINGS #
    Local keys read from data: '__WORKING_DIR__', 'qsub_script_name', 'cores_required',
    'time_required', plus those listed in keyword_names, flag_names and trailing_args_keys.
    Data will be augmented by 'qsub_script' pointing to the local file.
    '''
    def __init__(self, program_name, local_comm, remote_comm,
                 predicate=None,
                 io_mapping=None,
                 keyword_names=(), # "local keys" where keyword args are stored
                 flag_names=(), # "local keys" where flags are stored
                 trailing_args_keys=(), # "local keys" where trailing args are stored
                 ):
        predicate = predicate if predicate is not None else dummy_predicate
        # built per instance instead of a shared default evaluated once at class definition
        io_mapping = io_mapping if io_mapping is not None else InOutMapping()
        self._local_comm = local_comm  # NOTE(review): not used by the code in this class
        self._remote_comm = remote_comm
        self._program_name = program_name
        self._keyword_names = keyword_names
        self._flag_names = flag_names
        self._trailing_args_keys = trailing_args_keys
        super().__init__(predicate, self.execute, io_mapping)

    def execute(self, data):
        '''Render the qsub script into the local working dir and record its path.'''
        qsub_script_path = os.path.join(data['__WORKING_DIR__'], data['qsub_script_name'])
        args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys)
        program_launch_path = self._remote_comm.host.get_program_launch_path(self._program_name)
        command_line = '{} {}'.format(program_launch_path, args_str)
        render_sge_template(self._remote_comm.host.sge_template_name, qsub_script_path,
                            data['cores_required'], data['time_required'], (command_line,))
        data.update({'qsub_script': qsub_script_path})

class UploadOnRemoteEdge(Edge):
    '''
    Class implementing the edge which uploads the data to the remote computer.
    Uploading is done via ``comm.copy(..., mode='from_local')``.
    # DESCRIPTION OF KEYS MAPPINGS #
    Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
    from data which may be located in different (nested) keys of data (we will call these keys "global").
    However, it is very convenient to implement the edge imagining that there is no nested structures
    and all keys are available in the top-level of data (we will call these keys "local").
    The link between local and global keys is given by ``io_mapping`` (see InOutMapping).
    # END OF DESCRIPTION OF KEYS MAPPINGS #
    Files for uploading must be found in local_paths_keys which is a list of local data keys corresponding to these files.
    Each path is first tried as given; if comm.copy raises CommunicationError, it is retried
    relative to data['__WORKING_DIR__'].
    They will be uploaded in remote working dir which must be in data['__REMOTE_WORKING_DIR__'].
    If update_paths is true, data is updated after execution such that local paths are replaced by remote ones.
    If already_remote_path_key is given and data under that key is truthy, nothing is uploaded.
    '''
    def __init__(self, comm,
                 predicate=None,
                 io_mapping=None,
                 local_paths_keys=(), # "local keys", needed to build a copy list
                 update_paths=True,
                 already_remote_path_key=None,
                 ):
        predicate = predicate if predicate is not None else dummy_predicate
        # built per instance instead of a shared default evaluated once at class definition
        io_mapping = io_mapping if io_mapping is not None else InOutMapping()
        self._local_paths_keys = local_paths_keys
        self._comm = comm
        self._update_paths = update_paths
        self._already_remote_path_key = already_remote_path_key
        super().__init__(predicate, self.execute, io_mapping)

    def execute(self, data):
        '''Upload every file listed in local_paths_keys to the remote working dir.'''
        if self._already_remote_path_key is not None and data[self._already_remote_path_key]:
            return
        remote_working_dir = data['__REMOTE_WORKING_DIR__']
        for key in self._local_paths_keys:
            try:
                # try data[key] as an absolute path
                remote_path = self._comm.copy(data[key], remote_working_dir, mode='from_local')
            except CommunicationError:
                # try data[key] as a path relative to the local working dir
                local_path = os.path.join(data['__WORKING_DIR__'], data[key])
                remote_path = self._comm.copy(local_path, remote_working_dir, mode='from_local')
            # honour update_paths in both branches (the absolute-path branch used to ignore it)
            if self._update_paths:
                data[key] = remote_path

class DownloadFromRemoteEdge(Edge):
    '''
    Class implementing the edge which downloads the data from the remote computer.
    Downloading is done via ``comm.copy(..., mode='from_remote')`` (and ``comm.listdir``).
    # DESCRIPTION OF KEYS MAPPINGS #
    Since data structure is hierarchical, we introduced keys mappings. The edge needs to use some variables
    from data which may be located in different (nested) keys of data (we will call these keys "global").
    However, it is very convenient to implement the edge imagining that there is no nested structures
    and all keys are available in the top-level of data (we will call these keys "local").
    The link between local and global keys is given by ``io_mapping`` (see InOutMapping).
    # END OF DESCRIPTION OF KEYS MAPPINGS #
    Files for downloading must be found in remote_paths_keys which is a list of local data keys corresponding to these files.
    All these files are relative to the remote working dir and will be downloaded into local working dir.
    A value of '*' downloads every entry listed in the remote working dir.
    Local working dir must be in data['__WORKING_DIR__'].
    Remote working dir must be in data['__REMOTE_WORKING_DIR__'].
    If update_paths is true, data is updated after execution such that remote/relative paths
    are replaced by local ones (a list of local paths for '*').
    '''
    def __init__(self, comm,
                 predicate=None,
                 io_mapping=None,
                 remote_paths_keys=(), # "local keys", needed to build a list for downloading
                 update_paths=True,
                 ):
        predicate = predicate if predicate is not None else dummy_predicate
        # built per instance instead of a shared default evaluated once at class definition
        io_mapping = io_mapping if io_mapping is not None else InOutMapping()
        self._remote_paths_keys = remote_paths_keys
        self._comm = comm
        self._update_paths = update_paths
        super().__init__(predicate, self.execute, io_mapping)

    def execute(self, data):
        '''Download every file listed in remote_paths_keys into the local working dir.'''
        working_dir = data['__WORKING_DIR__']
        remote_working_dir = data['__REMOTE_WORKING_DIR__']
        for key in self._remote_paths_keys:
            output_file_or_dir = data[key]
            if output_file_or_dir == '*':
                # materialize once: the listing is iterated twice below
                names = list(self._comm.listdir(remote_working_dir))
                for name in names:
                    # remote paths are POSIX; local ones follow the host OS (as in make_cd)
                    self._comm.copy(posixpath.join(remote_working_dir, name), working_dir, mode='from_remote')
                local_path = [os.path.join(working_dir, name) for name in names]
            else:
                local_path = self._comm.copy(posixpath.join(remote_working_dir, output_file_or_dir),
                                             working_dir, mode='from_remote')
            if self._update_paths:
                data[key] = local_path

def dummy_edge(data):
    '''
    No-op morphism: takes the data and leaves it untouched.

    @todo: to be removed
    '''

def dummy_predicate(data):
    '''
    Predicate that always holds, whatever the data.

    @todo: to be removed
    '''
    always_true = True
    return always_true

def job_finished_predicate(data):
    '''Return data['job_finished'] unchanged (used as an edge predicate).'''
    finished = data['job_finished']
    return finished

def job_unfinished_predicate(data):
    '''Return the boolean negation of data['job_finished'].'''
    return False if data['job_finished'] else True

def make_cd(key_path):
    '''
    Build a callback that changes the working directories stored in the data dictionary.

    With ``key_path == '..'`` the callback moves '__WORKING_DIR__' (and
    '__REMOTE_WORKING_DIR__', if present) one level up. Otherwise it descends into the
    sub-directory named by the value found at ``key_path`` (via aux.recursive_get).
    The local directory is handled with os.path, the remote one with posixpath.

    :param key_path: '..' or a key (path) whose value is the sub-directory name.
    :return: callable that mutates the given data dictionary in place.
    '''
    def _cd(d):
        if key_path == '..':
            step_local = os.path.dirname
            step_remote = posixpath.dirname
        else:
            subdir = aux.recursive_get(d, key_path)

            def step_local(path):
                return os.path.join(path, subdir)

            def step_remote(path):
                return posixpath.join(path, subdir)

        d['__WORKING_DIR__'] = step_local(d['__WORKING_DIR__'])
        if '__REMOTE_WORKING_DIR__' in d:
            d['__REMOTE_WORKING_DIR__'] = step_remote(d['__REMOTE_WORKING_DIR__'])
    return _cd

def make_dump(dump_name_format, format_keys=(), omit=None):
    '''
    Build a callback that pickles the data dictionary into its working directory.

    The file name is ``dump_name_format.format(*params)``, where ``params`` are the values
    found at ``format_keys`` (via aux.recursive_get); the file is written into
    d['__WORKING_DIR__'].

    :param dump_name_format: str.format template of the dump file name.
    :param format_keys: keys (paths) whose values fill the template, in order.
    :param omit: optional container of top-level keys to leave out of the dump
        (None dumps the whole dictionary).
    :return: callable taking the data dictionary.
    '''
    def _dump(d):
        format_params = [aux.recursive_get(d, key) for key in format_keys]
        dump_path = os.path.join(d['__WORKING_DIR__'], dump_name_format.format(*format_params))
        # Build the payload before opening (and thereby truncating) the file, so a failure
        # while filtering does not destroy a previous dump and leave an empty file behind.
        if omit is None:
            dumped_d = d
        else:
            dumped_d = {key: val for key, val in d.items() if key not in omit}
        with open(dump_path, 'wb') as f:
            pickle.dump(dumped_d, f)
    return _dump

def make_composite_func(*funcs):
    '''
    Chain several data callbacks into one.

    :param funcs: callables taking the data dictionary.
    :return: callable that calls each of ``funcs`` on its argument, in the given order,
        and returns None.
    '''
    chain = funcs

    def _composite(d):
        for step in chain:
            step(d)

    return _composite

def make_composite_predicate(*preds):
    '''
    Combine several predicates with logical AND.

    :param preds: predicates taking the data dictionary.
    :return: predicate that returns True iff every one of ``preds`` returns a truthy value
        (True when ``preds`` is empty); evaluation stops at the first falsy result.
    '''
    def _composite(d):
        return all(pred(d) for pred in preds)
    return _composite

def create_local_data_from_global_data(global_data, keys_mapping):
    '''
    Extract the "local" data described by ``keys_mapping`` from ``global_data``.

    :param global_data: the (possibly nested) data dictionary.
    :param keys_mapping: None -> ``global_data`` itself is returned;
        a sequence (per aux.is_sequence) -> the value at that key path;
        otherwise a mapping local key -> global key path, producing a new dict.
    '''
    if keys_mapping is None:
        return global_data
    if aux.is_sequence(keys_mapping):
        return aux.recursive_get(global_data, keys_mapping)
    local_data = {}
    for local_key, global_key in keys_mapping.items():
        local_data[local_key] = aux.recursive_get(global_data, global_key)
    return local_data

def update_global_data_according_to_local_data(local_data, global_data, keys_mapping):
    '''
    Write ``local_data`` back into ``global_data`` in place, according to ``keys_mapping``.

    :param local_data: dictionary of "local" keys and values.
    :param global_data: the (possibly nested) data dictionary to update.
    :param keys_mapping: None -> ``global_data`` is updated directly;
        a sequence (per aux.is_sequence) -> the sub-dictionary at that key path is updated;
        otherwise a mapping local key -> global key path, each value set individually.
    '''
    if keys_mapping is None:
        global_data.update(local_data)
        return
    if aux.is_sequence(keys_mapping):
        target = aux.recursive_get(global_data, keys_mapping)
        target.update(local_data)
        return
    for local_key, global_key in keys_mapping.items():
        # NOTE(review): recursive_set is neither imported nor defined in the visible part of
        # this module (elsewhere only aux.recursive_get is used) -- confirm it resolves,
        # otherwise this branch raises NameError.
        recursive_set(global_data, global_key, local_data[local_key])

def build_args_line(data, keyword_names, flag_names, trailing_args_keys):
    '''
    Build the command-line argument string of a program from ``data``.

    Pieces are emitted in this order, each followed by a single space:
      - ``-<name> <value>`` for every keyword name present in ``data``;
      - ``-<name>`` for every flag name present in ``data`` with a truthy value;
      - the value of every trailing-argument key present in ``data``; a sequence value
        (per aux.is_sequence) is expanded into space-separated items.
    Keys missing from ``data`` are skipped, so the result is '' or ends with a space.

    NOTE(review): values are inserted verbatim (no quoting); if the result is passed to a
    shell, values containing spaces or shell metacharacters are not escaped.
    '''
    parts = []
    for keyword in keyword_names:
        if keyword in data:
            parts.append('-{} {} '.format(keyword, data[keyword]))
    for flag in flag_names:
        if flag in data and data[flag]:
            parts.append('-{} '.format(flag))
    for trailing_arg_key in trailing_args_keys:
        if trailing_arg_key in data:
            trailing_arg = data[trailing_arg_key]
            if aux.is_sequence(trailing_arg):
                parts.append(' '.join(map(str, trailing_arg)))
            else:
                # str() so that scalar (e.g. numeric) trailing args no longer raise TypeError
                parts.append(str(trailing_arg))
            parts.append(' ')
    return ''.join(parts)

def render_sge_template(sge_template_name, sge_script_path, cores, time, commands):
    '''
    Render an SGE (qsub) script from a mako template and write it to ``sge_script_path``.

    The template is looked up in aux.get_templates_path() first; if no such file exists,
    ``sge_template_name`` itself is used as the template path. The template is rendered
    with the variables ``cores``, ``time`` and ``commands``.
    '''
    sge_templ_path = os.path.join(aux.get_templates_path(), sge_template_name)
    if not os.path.exists(sge_templ_path): # by default, templates are in templates/, but here we let the user put any path
        sge_templ_path = sge_template_name
    # close the template file instead of leaking the handle
    with open(sge_templ_path, 'r') as f:
        template_text = f.read()
    rendered_data = Template(template_text).render(cores=cores, time=time, commands=commands)
    # assumes aux.create_file_mkdir returns an open file object (it is used via .write());
    # closing it guarantees the script is flushed to disk before the caller uses it
    with aux.create_file_mkdir(sge_script_path) as sge_script_file:
        sge_script_file.write(rendered_data)