Commit 13297d87 authored by Anton Pershin's avatar Anton Pershin

Добавил поддержку ssh-аутентификации через RSA ключи. Добавил обработку ошибок…

Добавил поддержку ssh-аутентификации через RSA ключи. Добавил обработку ошибок при попытке скачать/загрузить/выполнить что-либо через ssh/sftp.
parent 4a430185
...@@ -5,6 +5,7 @@ import paramiko ...@@ -5,6 +5,7 @@ import paramiko
import subprocess import subprocess
import shlex import shlex
import json import json
import socket
from stat import S_ISDIR from stat import S_ISDIR
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
...@@ -160,20 +161,17 @@ class LocalCommunication(BaseCommunication): ...@@ -160,20 +161,17 @@ class LocalCommunication(BaseCommunication):
aux.rm(target) aux.rm(target)
class SshCommunication(BaseCommunication): class SshCommunication(BaseCommunication):
def __init__(self, remote_host, username, password, machine_name=''): def __init__(self, remote_host, username, password, machine_name='', pkey=None):
if not isinstance(remote_host, RemoteHost): if not isinstance(remote_host, RemoteHost):
Exception('Only RemoteHost can be used to build SshCommunication') Exception('Only RemoteHost can be used to build SshCommunication')
self.host = remote_host self.host = remote_host
self.username = username self.username = username
self.password = password self.password = password
self.pkey = pkey
self.ssh_client = paramiko.SSHClient() self.ssh_client = paramiko.SSHClient()
self.sftp_client = None self.sftp_client = None
#self.main_dir = '/nobackup/mmap/research' #self.main_dir = '/nobackup/mmap/research'
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.connect()
self.ssh_client.connect(self.host.ssh_host, username=username, password=password)
transport = self.ssh_client.get_transport()
transport.packetizer.REKEY_BYTES = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
transport.packetizer.REKEY_PACKETS = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
paramiko.util.log_to_file('paramiko.log') paramiko.util.log_to_file('paramiko.log')
super().__init__(self.host, machine_name) super().__init__(self.host, machine_name)
...@@ -189,20 +187,22 @@ class SshCommunication(BaseCommunication): ...@@ -189,20 +187,22 @@ class SshCommunication(BaseCommunication):
job_finished_checker=hostconf['job_finished_checker']) job_finished_checker=hostconf['job_finished_checker'])
_add_programs_from_config(remote_host, hostconf) _add_programs_from_config(remote_host, hostconf)
return SshCommunication(remote_host, username=hostconf['username'], return SshCommunication(remote_host, username=hostconf['username'],
password=hostconf['password'], password=hostconf['password'] if 'password' in hostconf else None,
machine_name=host_sid) machine_name=host_sid,
pkey=hostconf['pkey'] if 'pkey' in hostconf else None)
def __getstate__(self): def __getstate__(self):
return { return {
'host': self.host.__getstate__(), 'host': self.host.__getstate__(),
'username': self.username, 'username': self.username,
'password': self.password, 'password': self.password,
'pkey': self.pkey,
} }
def __setstate__(self, state): def __setstate__(self, state):
remote_host = RemoteHost.__new__(RemoteHost) remote_host = RemoteHost.__new__(RemoteHost)
remote_host.__setstate__(state['host']) remote_host.__setstate__(state['host'])
self.__init__(remote_host, state['username'], state['password']) self.__init__(remote_host, state['username'], state['password'], pkey=state['pkey'])
def execute(self, command, working_dir=None): def execute(self, command, working_dir=None):
if self.ssh_client is None: if self.ssh_client is None:
...@@ -210,7 +210,22 @@ class SshCommunication(BaseCommunication): ...@@ -210,7 +210,22 @@ class SshCommunication(BaseCommunication):
#self._print_exec_msg(command, is_remote=True) #self._print_exec_msg(command, is_remote=True)
command_line = command if working_dir is None else 'cd {}; {}'.format(working_dir, command) command_line = command if working_dir is None else 'cd {}; {}'.format(working_dir, command)
def _cleanup():
print('\t\tMSG: Reboot SSH client')
self.reboot()
cleanup = _cleanup
received = False
while not received:
try:
stdin, stdout, stderr = self.ssh_client.exec_command(command_line) stdin, stdout, stderr = self.ssh_client.exec_command(command_line)
received = True
except (OSError, socket.timeout, socket.error, paramiko.sftp.SFTPError) as e:
print('\t\tMSG: Catched {} exception while executing "{}"'.format(type(e).__name__, command_line))
print('\t\tMSG: It says: {}'.format(e))
else:
cleanup = lambda: None
cleanup()
return stdout.readlines(), stderr.readlines() return stdout.readlines(), stderr.readlines()
# for line in stdout: # for line in stdout:
# print('\t\t' + line.strip('\n')) # print('\t\t' + line.strip('\n'))
...@@ -277,11 +292,52 @@ class SshCommunication(BaseCommunication): ...@@ -277,11 +292,52 @@ class SshCommunication(BaseCommunication):
@enable_sftp @enable_sftp
def _get(self, remote_path, local_path): def _get(self, remote_path, local_path):
return self.sftp_client.get(remote_path, local_path) def _cleanup():
print('\t\tMSG: Reboot SSH client')
self.reboot()
if os.path.exists(local_path):
aux.rm(local_path)
cleanup = _cleanup
received = False
while not received:
try:
res = self.sftp_client.get(remote_path, local_path)
received = True
except FileNotFoundError as e:
if os.path.exists(local_path):
aux.rm(local_path)
raise
except (socket.timeout, socket.error, paramiko.sftp.SFTPError) as e:
print('\t\tMSG: Catched {} exception while getting "{}"'.format(type(e).__name__, remote_path))
print('\t\tMSG: It says: {}'.format(e))
else:
cleanup = lambda: None
cleanup()
return res
@enable_sftp @enable_sftp
def _put(self, local_path, remote_path): def _put(self, local_path, remote_path):
return self.sftp_client.put(local_path, remote_path) def _cleanup():
print('\t\tMSG: Reboot SSH client')
self.reboot()
self.rm(remote_path)
cleanup = _cleanup
received = False
while not received:
try:
res = self.sftp_client.put(local_path, remote_path)
received = True
except FileNotFoundError as e:
self.rm(remote_path)
raise
except (socket.timeout, socket.error, paramiko.sftp.SFTPError) as e:
print('\t\tMSG: Catched {} exception while putting "{}"'.format(type(e).__name__, remote_path))
print('\t\tMSG: It says: {}'.format(e))
else:
cleanup = lambda: None
cleanup()
return res
def _is_remote_dir(self, path): def _is_remote_dir(self, path):
try: try:
...@@ -319,11 +375,52 @@ class SshCommunication(BaseCommunication): ...@@ -319,11 +375,52 @@ class SshCommunication(BaseCommunication):
def disconnect(self): def disconnect(self):
if self.sftp_client is not None: if self.sftp_client is not None:
self.sftp_client.close() self.sftp_client.close()
self.sftp_client = None
self.ssh_client.close() self.ssh_client.close()
def connect(self):
self.ssh_client.load_system_host_keys()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
connected = False
# read ssh config. We assume that all necessary re-routing are done there via ProxyCommand
# only ProxyCommand is read; password should be passed explicitly to SshCommunication
ssh_config = paramiko.SSHConfig()
user_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(user_config_file):
with open(user_config_file) as f:
ssh_config.parse(f)
user_config = ssh_config.lookup(self.host.ssh_host)
sock = None
if 'proxycommand' in user_config:
sock = paramiko.ProxyCommand(user_config['proxycommand'])
while not connected:
try:
if self.pkey is not None: # if a private key is given, first attempt to connect using it
self.ssh_client.connect(self.host.ssh_host, username=self.username, key_filename=self.pkey, timeout=10, sock=sock)
else: # otherwise try to connect via password using it is given
self.ssh_client.connect(self.host.ssh_host, username=self.username, password=self.password, look_for_keys=False, allow_agent=False, timeout=10, sock=sock)
connected = True
except socket.timeout as e:
print('\t\tMSG: Catched {} exception while connecting'.format(type(e).__name__))
print('\t\tMSG: It says: {}'.format(e))
transport = self.ssh_client.get_transport()
transport.packetizer.REKEY_BYTES = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
transport.packetizer.REKEY_PACKETS = pow(2, 40) # 1TB max, this is a security degradation (otherwise we get "paramiko.ssh_exception.SSHException: Key-exchange timed out waiting for key negotiation")
def reboot(self):
self.disconnect()
self.connect()
self._init_sftp()
def _init_sftp(self): def _init_sftp(self):
if self.sftp_client is None: if self.sftp_client is None:
self.sftp_client = self.ssh_client.open_sftp() self.sftp_client = self.ssh_client.open_sftp()
self.sftp_client.get_channel().settimeout(10)
class CommunicationError(Exception): class CommunicationError(Exception):
pass pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment