Commit ca70311b authored by Anton Pershin's avatar Anton Pershin

Forgotten commit

parent 0dcaa554
......@@ -125,9 +125,10 @@ class BaseCommunication(metaclass=ABCMeta):
'''
pass
def execute_program(self, prog_name, args_str, working_dir=None):
def execute_program(self, prog_name, args_str, working_dir=None, chaining_command_at_start='',
chaining_command_at_end=''):
prog_path = self.host.get_program_launch_path(prog_name)
command = '{} {}'.format(prog_path, args_str)
command = f'{chaining_command_at_start} {prog_path} {args_str} {chaining_command_at_end}'
return self.execute(command, working_dir)
def _print_copy_msg(self, from_, to_):
......@@ -172,19 +173,20 @@ class LocalCommunication(BaseCommunication):
class SshCommunication(BaseCommunication):
def __init__(self, remote_host, username, password, machine_name='', pkey=None):
def __init__(self, remote_host, username, password, machine_name='', pkey=None, execute_after_connection=None):
if not isinstance(remote_host, RemoteHost):
Exception('Only RemoteHost can be used to build SshCommunication')
self.host = remote_host
self.username = username
self.password = password
self.pkey = pkey
self.execute_after_connection = execute_after_connection
self.ssh_client = paramiko.SSHClient()
self.sftp_client = None
#self.main_dir = '/nobackup/mmap/research'
super().__init__(self.host, machine_name)
self.connect()
paramiko.util.log_to_file('paramiko.log')
super().__init__(self.host, machine_name)
@classmethod
def create_from_config(cls, host_sid):
......@@ -200,7 +202,8 @@ class SshCommunication(BaseCommunication):
return SshCommunication(remote_host, username=hostconf['username'],
password=hostconf['password'] if 'password' in hostconf else None,
machine_name=host_sid,
pkey=hostconf['pkey'] if 'pkey' in hostconf else None)
pkey=hostconf['pkey'] if 'pkey' in hostconf else None,
execute_after_connection=hostconf['execute_after_connection'] if 'execute_after_connection' in hostconf else None)
def __getstate__(self):
return {
......@@ -208,19 +211,23 @@ class SshCommunication(BaseCommunication):
'username': self.username,
'password': self.password,
'pkey': self.pkey,
'execute_after_connection': self.execute_after_connection,
}
def __setstate__(self, state):
remote_host = RemoteHost.__new__(RemoteHost)
remote_host.__setstate__(state['host'])
self.__init__(remote_host, state['username'], state['password'], pkey=state['pkey'])
self.__init__(remote_host, state['username'], state['password'], pkey=state['pkey'],
execute_after_connection=state['execute_after_connection'])
def execute(self, command, working_dir=None):
if self.ssh_client is None:
raise Exception('Remote host is not set')
#self._print_exec_msg(command, is_remote=True)
self._print_exec_msg(command, is_remote=True)
command_line = command if working_dir is None else 'cd {}; {}'.format(working_dir, command)
command_line = command_line if self.execute_after_connection is None else f'{self.execute_after_connection}; {command_line}'
print(command_line)
def _cleanup():
print('\t\tMSG: Reboot SSH client')
......@@ -237,11 +244,11 @@ class SshCommunication(BaseCommunication):
else:
cleanup = lambda: None
cleanup()
for line in stdout:
print('\t\t' + line.strip('\n'))
for line in stderr:
print('\t\t' + line.strip('\n'))
return stdout.readlines(), stderr.readlines()
# for line in stdout:
# print('\t\t' + line.strip('\n'))
# for line in stderr:
# print('\t\t' + line.strip('\n'))
def copy(self, from_, to_, mode='from_local', show_msg=False):
if self.ssh_client is None:
......@@ -414,6 +421,7 @@ class SshCommunication(BaseCommunication):
if self.pkey is not None: # if a private key is given, first attempt to connect using it
self.ssh_client.connect(self.host.ssh_host, username=self.username, key_filename=self.pkey, timeout=10, sock=sock)
else: # otherwise try to connect via password using it is given
print(self.host.ssh_host, self.username)
self.ssh_client.connect(self.host.ssh_host, username=self.username, password=self.password, look_for_keys=False, allow_agent=False, timeout=10, sock=sock)
connected = True
except socket.timeout as e:
......@@ -424,6 +432,9 @@ class SshCommunication(BaseCommunication):
transport.packetizer.REKEY_BYTES = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
transport.packetizer.REKEY_PACKETS = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
if self.execute_after_connection is not None:
self.execute(self.execute_after_connection)
def reboot(self):
self.disconnect()
self.connect()
......
......@@ -14,8 +14,8 @@ from comsdk.graph import Func, State
dummy_predicate = Func(func=lambda d: True)
dummy_morphism = Func()
job_finished_predicate = Func(func= lambda d: d['job_finished'])
job_unfinished_predicate = Func(func= lambda d: not d['job_finished'])
job_finished_predicate = Func(func=lambda d: d['job_finished'])
job_unfinished_predicate = Func(func=lambda d: not d['job_finished'])
class InOutMapping(object):
......@@ -124,6 +124,8 @@ class ExecutableProgramEdge(Edge):
trailing_args_keys=(), # "local keys" where trailing args are stored
remote=False,
stdout_processor=None,
chaining_command_at_start='',
chaining_command_at_end='',
):
#predicate = predicate if predicate is not None else dummy_predicate
self._output_dict = output_dict
......@@ -135,12 +137,16 @@ class ExecutableProgramEdge(Edge):
self._working_dir_key = '__REMOTE_WORKING_DIR__' if remote else '__WORKING_DIR__'
mandatory_keys = [self._working_dir_key]
self._stdout_processor = stdout_processor
self.chaining_command_at_start = chaining_command_at_start
self.chaining_command_at_end = chaining_command_at_end
super().__init__(predicate, Func(func=self.execute), io_mapping, mandatory_keys=mandatory_keys)
def execute(self, data):
args_str = build_args_line(data, self._keyword_names, self._flag_names, self._trailing_args_keys)
working_dir = data[self._working_dir_key]
stdout_lines, stderr_lines = self._comm.execute_program(self._program_name, args_str, working_dir) # here we execute
stdout_lines, stderr_lines = self._comm.execute_program(self._program_name, args_str, working_dir,
self.chaining_command_at_start,
self.chaining_command_at_end)
output_data = self._output_dict
if self._stdout_processor:
stdout_data = self._stdout_processor(data, stdout_lines)
......@@ -221,9 +227,7 @@ class UploadOnRemoteEdge(Edge):
self._comm = comm
self._update_paths = update_paths
self._already_remote_path_key = already_remote_path_key
mandatory_keys = list(self._local_paths_keys) + ['__WORKING_DIR__',
'__REMOTE_WORKING_DIR__', 'qsub_script_name', 'time_required',
'cores_required']
mandatory_keys = list(self._local_paths_keys) + ['__WORKING_DIR__', '__REMOTE_WORKING_DIR__']
if self._already_remote_path_key is not None:
mandatory_keys.append(self._already_remote_path_key)
super().__init__(predicate, Func(func=self.execute), io_mapping, mandatory_keys=mandatory_keys)
......@@ -299,7 +303,13 @@ class DownloadFromRemoteEdge(Edge):
self._comm.copy(file_or_dir, working_dir, mode='from_remote', show_msg=self._show_msg)
local_path = local_full_paths
else:
file_or_dir = '/'.join([remote_working_dir, output_file_or_dir])
output_file_or_dir_as_list = []
if isinstance(output_file_or_dir, list):
output_file_or_dir_as_list = output_file_or_dir
else:
output_file_or_dir_as_list = [output_file_or_dir]
for f in output_file_or_dir_as_list:
file_or_dir = '/'.join([remote_working_dir, f])
aux.print_msg_if_allowed('\tAm going to download "{}" to "{}"'.format(file_or_dir, working_dir),
allow=self._show_msg)
local_path = self._comm.copy(file_or_dir, working_dir,
......@@ -322,15 +332,22 @@ def make_cd(key_path):
return _cd
def make_dump(dump_name_format, format_keys=(), omit=None):
def make_dump(dump_name_format, format_keys=(), omit=None, method='pickle'):
def _dump(d):
format_params = [aux.recursive_get(d, key) for key in format_keys]
with open(os.path.join(d['__WORKING_DIR__'], dump_name_format.format(*format_params)), 'wb') as f:
dump_path = os.path.join(d['__WORKING_DIR__'], dump_name_format.format(*format_params))
if omit is None:
dumped_d = d
else:
dumped_d = {key: val for key, val in d.items() if not key in omit}
if method == 'pickle':
with open(dump_path, 'wb') as f:
pickle.dump(dumped_d, f)
elif method == 'json':
with open(dump_path, 'w') as f:
json.dump(dumped_d, f)
else:
raise ValueError(f'Method "{method}" is not supported in dumping')
return _dump
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment