mirror of
https://github.com/jtesta/ssh-audit.git
synced 2024-11-25 12:01:40 +01:00
Send peer a list of supported algorithms after the banner exchange. Fixes not only the weird case of an ssh-audit client hanging against an ssh-audit server, but perhaps some real-world hangs as well.
This commit is contained in:
parent
6d84cfdc31
commit
1123ac718c
51
ssh-audit.py
51
ssh-audit.py
@ -45,7 +45,7 @@ from typing import Dict, List, Set, Sequence, Tuple, Iterable
|
|||||||
from typing import Callable, Optional, Union, Any
|
from typing import Callable, Optional, Union, Any
|
||||||
|
|
||||||
VERSION = 'v2.2.1-dev'
|
VERSION = 'v2.2.1-dev'
|
||||||
SSH_HEADER = 'SSH-{0}-OpenSSH_8.0' # SSH software to impersonate
|
SSH_HEADER = 'SSH-{0}-OpenSSH_8.2' # SSH software to impersonate
|
||||||
GITHUB_ISSUES_URL = 'https://github.com/jtesta/ssh-audit/issues' # The URL to the Github issues tracker.
|
GITHUB_ISSUES_URL = 'https://github.com/jtesta/ssh-audit/issues' # The URL to the Github issues tracker.
|
||||||
|
|
||||||
# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively.
|
# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively.
|
||||||
@ -1133,6 +1133,12 @@ class SSH2: # pylint: disable=too-few-public-methods
|
|||||||
hostkey_modulus_size = 0
|
hostkey_modulus_size = 0
|
||||||
ca_modulus_size = 0
|
ca_modulus_size = 0
|
||||||
|
|
||||||
|
# If the connection still exists, close it so we can test
|
||||||
|
# using a clean slate (otherwise it may exist in a non-testable
|
||||||
|
# state).
|
||||||
|
if s.is_connected():
|
||||||
|
s.close()
|
||||||
|
|
||||||
# For each host key type...
|
# For each host key type...
|
||||||
for host_key_type in host_key_types:
|
for host_key_type in host_key_types:
|
||||||
# Skip those already handled (i.e.: those in the RSA family, as testing one tests them all).
|
# Skip those already handled (i.e.: those in the RSA family, as testing one tests them all).
|
||||||
@ -1146,7 +1152,10 @@ class SSH2: # pylint: disable=too-few-public-methods
|
|||||||
|
|
||||||
# If the connection is closed, re-open it and get the kex again.
|
# If the connection is closed, re-open it and get the kex again.
|
||||||
if not s.is_connected():
|
if not s.is_connected():
|
||||||
s.connect()
|
err = s.connect()
|
||||||
|
if err is not None:
|
||||||
|
return
|
||||||
|
|
||||||
unused = None # pylint: disable=unused-variable
|
unused = None # pylint: disable=unused-variable
|
||||||
unused2 = None # pylint: disable=unused-variable
|
unused2 = None # pylint: disable=unused-variable
|
||||||
unused, unused2, err = s.get_banner()
|
unused, unused2, err = s.get_banner()
|
||||||
@ -1222,7 +1231,10 @@ class SSH2: # pylint: disable=too-few-public-methods
|
|||||||
if s.is_connected():
|
if s.is_connected():
|
||||||
return True
|
return True
|
||||||
|
|
||||||
s.connect()
|
err = s.connect()
|
||||||
|
if err is not None:
|
||||||
|
return False
|
||||||
|
|
||||||
unused = None # pylint: disable=unused-variable
|
unused = None # pylint: disable=unused-variable
|
||||||
unused2 = None # pylint: disable=unused-variable
|
unused2 = None # pylint: disable=unused-variable
|
||||||
unused, unused2, err = s.get_banner()
|
unused, unused2, err = s.get_banner()
|
||||||
@ -2445,7 +2457,8 @@ class SSH: # pylint: disable=too-few-public-methods
|
|||||||
c.settimeout(self.__timeout)
|
c.settimeout(self.__timeout)
|
||||||
self.__sock = c
|
self.__sock = c
|
||||||
|
|
||||||
def connect(self) -> None:
|
def connect(self) -> Optional[str]:
|
||||||
|
'''Returns None on success, or an error string.'''
|
||||||
err = None
|
err = None
|
||||||
for af, addr in self._resolve(self.__ipvo):
|
for af, addr in self._resolve(self.__ipvo):
|
||||||
s = None
|
s = None
|
||||||
@ -2454,7 +2467,7 @@ class SSH: # pylint: disable=too-few-public-methods
|
|||||||
s.settimeout(self.__timeout)
|
s.settimeout(self.__timeout)
|
||||||
s.connect(addr)
|
s.connect(addr)
|
||||||
self.__sock = s
|
self.__sock = s
|
||||||
return
|
return None
|
||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
err = e
|
err = e
|
||||||
self._close_socket(s)
|
self._close_socket(s)
|
||||||
@ -2463,8 +2476,7 @@ class SSH: # pylint: disable=too-few-public-methods
|
|||||||
else:
|
else:
|
||||||
errt = (self.__host, self.__port, err)
|
errt = (self.__host, self.__port, err)
|
||||||
errm = 'cannot connect to {} port {}: {}'.format(*errt)
|
errm = 'cannot connect to {} port {}: {}'.format(*errt)
|
||||||
out.fail('[exception] {}'.format(errm))
|
return '[exception] {}'.format(errm)
|
||||||
sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
|
|
||||||
|
|
||||||
def get_banner(self, sshv: int = 2) -> Tuple[Optional['SSH.Banner'], List[str], Optional[str]]:
|
def get_banner(self, sshv: int = 2) -> Tuple[Optional['SSH.Banner'], List[str], Optional[str]]:
|
||||||
if self.__sock is None:
|
if self.__sock is None:
|
||||||
@ -2522,6 +2534,23 @@ class SSH: # pylint: disable=too-few-public-methods
|
|||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
return -1, str(e.args[-1])
|
return -1, str(e.args[-1])
|
||||||
|
|
||||||
|
def send_algorithms(self) -> None:
|
||||||
|
'''Sends the list of supported host keys, key exchanges, ciphers, and MACs. Emulates OpenSSH v8.2.'''
|
||||||
|
|
||||||
|
key_exchanges = ['curve25519-sha256', 'curve25519-sha256@libssh.org', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521', 'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512', 'diffie-hellman-group18-sha512', 'diffie-hellman-group14-sha256']
|
||||||
|
hostkeys = ['rsa-sha2-512', 'rsa-sha2-256', 'ssh-rsa', 'ecdsa-sha2-nistp256', 'ssh-ed25519']
|
||||||
|
ciphers = ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com']
|
||||||
|
macs = ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']
|
||||||
|
compressions = ['none', 'zlib@openssh.com']
|
||||||
|
languages = ['']
|
||||||
|
|
||||||
|
kexparty = SSH2.KexParty(ciphers, macs, compressions, languages)
|
||||||
|
kex = SSH2.Kex(os.urandom(16), key_exchanges, hostkeys, kexparty, kexparty, False, 0)
|
||||||
|
|
||||||
|
self.write_byte(SSH.Protocol.MSG_KEXINIT)
|
||||||
|
kex.write(self)
|
||||||
|
self.send_packet()
|
||||||
|
|
||||||
def send_banner(self, banner: str) -> None:
|
def send_banner(self, banner: str) -> None:
|
||||||
self.send(banner.encode() + b'\r\n')
|
self.send(banner.encode() + b'\r\n')
|
||||||
if self.__state < self.SM_BANNER_SENT:
|
if self.__state < self.SM_BANNER_SENT:
|
||||||
@ -3659,7 +3688,11 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
|
|||||||
if aconf.client_audit:
|
if aconf.client_audit:
|
||||||
s.listen_and_accept()
|
s.listen_and_accept()
|
||||||
else:
|
else:
|
||||||
s.connect()
|
err = s.connect()
|
||||||
|
if err is not None:
|
||||||
|
out.fail(err)
|
||||||
|
sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR)
|
||||||
|
|
||||||
if sshv is None:
|
if sshv is None:
|
||||||
sshv = 2 if aconf.ssh2 else 1
|
sshv = 2 if aconf.ssh2 else 1
|
||||||
err = None
|
err = None
|
||||||
@ -3670,6 +3703,8 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
|
|||||||
else:
|
else:
|
||||||
err = '[exception] did not receive banner: {}'.format(err)
|
err = '[exception] did not receive banner: {}'.format(err)
|
||||||
if err is None:
|
if err is None:
|
||||||
|
s.send_algorithms() # Send the algorithms we support (except we don't since this isn't a real SSH connection).
|
||||||
|
|
||||||
packet_type, payload = s.read_packet(sshv)
|
packet_type, payload = s.read_packet(sshv)
|
||||||
if packet_type < 0:
|
if packet_type < 0:
|
||||||
try:
|
try:
|
||||||
|
Loading…
Reference in New Issue
Block a user