Commit 5c05d4ae authored by Anna-Maria Amelkina's avatar Anna-Maria Amelkina

изменения в comsdk

parent ca70311b
import os import os
import os.path import os.path
import shutil import shutil
import paramiko
import subprocess import subprocess
import shlex import shlex
import json import json
...@@ -10,6 +9,8 @@ from stat import S_ISDIR ...@@ -10,6 +9,8 @@ from stat import S_ISDIR
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
import logging import logging
import paramiko
import comsdk.comaux as aux import comsdk.comaux as aux
...@@ -30,12 +31,21 @@ class Host(object): ...@@ -30,12 +31,21 @@ class Host(object):
self.commands[cmd_name] = cmd self.commands[cmd_name] = cmd
def get_program_launch_path(self, prog_name): def get_program_launch_path(self, prog_name):
if prog_name not in self.programs:
raise ValueError(f'Program "{prog_name}" is not recognized. '
'Please add this program to "custom_programs" '
'in the corresponding host in the config file '
'if you want to use it.')
path_to_prog = self.programs[prog_name] path_to_prog = self.programs[prog_name]
if path_to_prog is not None: if path_to_prog is not None:
return self.programs[prog_name] + '/' + prog_name print(self.programs[prog_name], prog_name)
return self.join_path(self.programs[prog_name], prog_name)
else: else:
return prog_name return prog_name
def join_path(self, *path_list):
return os.path.join(*path_list)
class RemoteHost(Host): class RemoteHost(Host):
''' '''
...@@ -69,6 +79,10 @@ class RemoteHost(Host): ...@@ -69,6 +79,10 @@ class RemoteHost(Host):
self.set_job_id = aux.load_function_from_module(state['job_setter']) self.set_job_id = aux.load_function_from_module(state['job_setter'])
self.check_task_finished = aux.load_function_from_module(state['job_finished_checker']) self.check_task_finished = aux.load_function_from_module(state['job_finished_checker'])
def join_path(self, *path_list):
# For RemoteHost, we assume that it is posix-based
return '/'.join(path_list)
# Decorator # Decorator
def enable_sftp(func): def enable_sftp(func):
...@@ -152,12 +166,20 @@ class LocalCommunication(BaseCommunication): ...@@ -152,12 +166,20 @@ class LocalCommunication(BaseCommunication):
return LocalCommunication(local_host) return LocalCommunication(local_host)
def execute(self, command, working_dir=None): def execute(self, command, working_dir=None):
command_line = command if working_dir is None else 'cd {}; {}'.format(working_dir, command) if working_dir is None:
#print('\t' + command_line) command_line = command
# use PIPEs to avoid breaking the child process when the parent process finishes else:
# (works on Linux, solution for Windows is to add creationflags=0x00000010 instead of stdout, stderr, stdin) if os.name == 'posix':
command_line = 'cd {}; {}'.format(working_dir, command)
elif os.name == 'nt':
command_line = ''
if working_dir[0] != 'C':
command_line += f'{working_dir[0]}: && '
command_line += 'cd {} && {}'.format(working_dir, command)
#self._print_exec_msg(command_line, is_remote=False) #self._print_exec_msg(command_line, is_remote=False)
subprocess.call([command_line], shell=True) #res = subprocess.call([command_line], shell=True)
# print(command_line)
res = subprocess.run(command_line, shell=True)
return [], [] return [], []
def copy(self, from_, to_, mode='from_local', show_msg=False): def copy(self, from_, to_, mode='from_local', show_msg=False):
......
...@@ -53,7 +53,8 @@ class Edge: ...@@ -53,7 +53,8 @@ class Edge:
'postprocess', 'postprocess',
'order', 'order',
'comment', 'comment',
'mandatory_keys' 'mandatory_keys',
'use_proxy_data'
] ]
def __init__(self, predicate, morphism, def __init__(self, predicate, morphism,
...@@ -70,6 +71,7 @@ class Edge: ...@@ -70,6 +71,7 @@ class Edge:
self.order = int(0 if order is None else order) self.order = int(0 if order is None else order)
self.comment = comment self.comment = comment
self.mandatory_keys = mandatory_keys self.mandatory_keys = mandatory_keys
self.use_proxy_data=False
def predicate(self, data, dynamic_keys_mapping={}): def predicate(self, data, dynamic_keys_mapping={}):
proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping) proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping)
...@@ -79,9 +81,15 @@ class Edge: ...@@ -79,9 +81,15 @@ class Edge:
#print(self.pred_name, self.morph_name, self.order) #print(self.pred_name, self.morph_name, self.order)
proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping) proxy_data = self._io_mapping.build_proxy_data(data, dynamic_keys_mapping)
# print(proxy_data) # print(proxy_data)
if (self.use_proxy_data):
self.preprocess(proxy_data)
else:
self.preprocess(data) self.preprocess(data)
self._throw_if_not_set(proxy_data, self.mandatory_keys) self._throw_if_not_set(proxy_data, self.mandatory_keys)
self.morph_f.func(proxy_data) self.morph_f.func(proxy_data)
if (self.use_proxy_data):
self.postprocess(proxy_data)
else:
self.postprocess(data) self.postprocess(data)
def _throw_if_not_set(self, data, mandatory_keys: Sequence[str]): def _throw_if_not_set(self, data, mandatory_keys: Sequence[str]):
...@@ -124,8 +132,8 @@ class ExecutableProgramEdge(Edge): ...@@ -124,8 +132,8 @@ class ExecutableProgramEdge(Edge):
trailing_args_keys=(), # "local keys" where trailing args are stored trailing_args_keys=(), # "local keys" where trailing args are stored
remote=False, remote=False,
stdout_processor=None, stdout_processor=None,
chaining_command_at_start='', chaining_command_at_start=lambda d: '',
chaining_command_at_end='', chaining_command_at_end=lambda d: '',
): ):
#predicate = predicate if predicate is not None else dummy_predicate #predicate = predicate if predicate is not None else dummy_predicate
self._output_dict = output_dict self._output_dict = output_dict
...@@ -145,8 +153,8 @@ class ExecutableProgramEdge(Edge): ...@@ -145,8 +153,8 @@ class ExecutableProgramEdge(Edge):
args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys) args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys)
working_dir = data[self._working_dir_key] working_dir = data[self._working_dir_key]
stdout_lines, stderr_lines = self._comm.execute_program(self._program_name, args_str, working_dir, stdout_lines, stderr_lines = self._comm.execute_program(self._program_name, args_str, working_dir,
self.chaining_command_at_start, self.chaining_command_at_start(data),
self.chaining_command_at_end) self.chaining_command_at_end(data))
output_data = self._output_dict output_data = self._output_dict
if self._stdout_processor: if self._stdout_processor:
stdout_data = self._stdout_processor(data, stdout_lines) stdout_data = self._stdout_processor(data, stdout_lines)
...@@ -338,6 +346,9 @@ def make_dump(dump_name_format, format_keys=(), omit=None, method='pickle'): ...@@ -338,6 +346,9 @@ def make_dump(dump_name_format, format_keys=(), omit=None, method='pickle'):
dump_path = os.path.join(d['__WORKING_DIR__'], dump_name_format.format(*format_params)) dump_path = os.path.join(d['__WORKING_DIR__'], dump_name_format.format(*format_params))
if omit is None: if omit is None:
dumped_d = d dumped_d = d
else:
if (isinstance(d, aux.ProxyDict)):
dumped_d = {key: val for key, val in d._data.items() if not key in omit}
else: else:
dumped_d = {key: val for key, val in d.items() if not key in omit} dumped_d = {key: val for key, val in d.items() if not key in omit}
if method == 'pickle': if method == 'pickle':
......
import pickle import pickle
from datetime import date from datetime import date
from typing import Sequence, Mapping from typing import Sequence, Mapping, TypedDict
from typing_extensions import TypedDict
from comsdk.comaux import * from comsdk.comaux import *
from comsdk.communication import BaseCommunication, LocalCommunication, SshCommunication, Host from comsdk.communication import BaseCommunication, LocalCommunication, SshCommunication, Host
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment