mirror of
https://github.com/jtesta/ssh-audit.git
synced 2024-11-21 18:11:40 +01:00
Added RSA certificate auditing.
This commit is contained in:
parent
33ae2946ea
commit
ee5dde1cde
357
ssh-audit.py
357
ssh-audit.py
@ -462,14 +462,14 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
# type: () -> int
|
||||
return self.__unused
|
||||
|
||||
def set_rsa_hostkey_size(self, rsa_type, rsa_hostkey_size):
|
||||
self.__rsa_key_sizes[rsa_type] = rsa_hostkey_size;
|
||||
def set_rsa_key_size(self, rsa_type, hostkey_size, ca_size=-1):
|
||||
self.__rsa_key_sizes[rsa_type] = (hostkey_size, ca_size)
|
||||
|
||||
def rsa_hostkey_sizes(self):
|
||||
def rsa_key_sizes(self):
|
||||
return self.__rsa_key_sizes
|
||||
|
||||
def set_dh_modulus_size(self, gex_alg, modulus_size):
|
||||
self.__dh_modulus_sizes[gex_alg] = modulus_size
|
||||
self.__dh_modulus_sizes[gex_alg] = (modulus_size, -1)
|
||||
|
||||
def dh_modulus_sizes(self):
|
||||
return self.__dh_modulus_sizes
|
||||
@ -522,9 +522,10 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
# Obtains RSA host keys and checks their size.
|
||||
class RSAKeyTest(object):
|
||||
RSA_TYPES = ['ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512']
|
||||
RSA_CA_TYPES = ['ssh-rsa-cert-v01@openssh.com']
|
||||
|
||||
@staticmethod
|
||||
def run(s, kex):
|
||||
def run(s, server_kex):
|
||||
KEX_TO_DHGROUP = {
|
||||
'diffie-hellman-group1-sha1': KexGroup1,
|
||||
'diffie-hellman-group14-sha1': KexGroup14_SHA1,
|
||||
@ -543,57 +544,97 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
|
||||
# Pick the first kex algorithm that the server supports, which we
|
||||
# happen to support as well.
|
||||
selected_kex_str = None
|
||||
kex_str = None
|
||||
kex_group = None
|
||||
for server_kex_alg in kex.kex_algorithms:
|
||||
for server_kex_alg in server_kex.kex_algorithms:
|
||||
if server_kex_alg in KEX_TO_DHGROUP:
|
||||
selected_kex_str = server_kex_alg
|
||||
kex_group = KEX_TO_DHGROUP[server_kex_alg]()
|
||||
kex_str = server_kex_alg
|
||||
kex_group = KEX_TO_DHGROUP[kex_str]()
|
||||
break
|
||||
|
||||
if kex_str is not None:
|
||||
SSH2.RSAKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.RSAKeyTest.RSA_TYPES)
|
||||
SSH2.RSAKeyTest.__test(s, server_kex, kex_str, kex_group, SSH2.RSAKeyTest.RSA_CA_TYPES, ca=True)
|
||||
|
||||
@staticmethod
|
||||
def __test(s, server_kex, kex_str, kex_group, rsa_types, ca=False):
|
||||
# If the server supports one of the RSA types, extract its key size.
|
||||
modulus_size = 0
|
||||
if selected_kex_str is not None:
|
||||
for rsa_type in SSH2.RSAKeyTest.RSA_TYPES:
|
||||
if rsa_type in kex.key_algorithms:
|
||||
hostkey_modulus_size = 0
|
||||
ca_modulus_size = 0
|
||||
ran_test = False
|
||||
|
||||
# Send the server our KEXINIT message, using only our
|
||||
# selected kex and RSA type. Send the server's own
|
||||
# list of ciphers and MACs back to it (this doesn't
|
||||
# matter, really).
|
||||
client_kex = SSH2.Kex(os.urandom(16), [selected_kex_str], [rsa_type], kex.client, kex.server, 0, 0)
|
||||
# If the connection is closed, re-open it and get the kex again.
|
||||
if not s.is_connected():
|
||||
s.connect()
|
||||
unused1, unused2, err = s.get_banner()
|
||||
if err is not None:
|
||||
s.close()
|
||||
return
|
||||
|
||||
s.write_byte(SSH.Protocol.MSG_KEXINIT)
|
||||
client_kex.write(s)
|
||||
s.send_packet()
|
||||
# Parse the server's initial KEX.
|
||||
packet_type, payload = s.read_packet()
|
||||
SSH2.Kex.parse(payload)
|
||||
|
||||
# Do the initial DH exchange. The server responds back
|
||||
# with the host key and its length. Bingo.
|
||||
kex_group.send_init(s)
|
||||
kex_group.recv_reply(s)
|
||||
modulus_size = kex_group.get_hostkey_size()
|
||||
|
||||
# We only need to test one RSA type, since the others
|
||||
# will all be the same.
|
||||
for rsa_type in rsa_types:
|
||||
if rsa_type in server_kex.key_algorithms:
|
||||
ran_test = True
|
||||
|
||||
# Send the server our KEXINIT message, using only our
|
||||
# selected kex and RSA type. Send the server's own
|
||||
# list of ciphers and MACs back to it (this doesn't
|
||||
# matter, really).
|
||||
client_kex = SSH2.Kex(os.urandom(16), [kex_str], [rsa_type], server_kex.client, server_kex.server, 0, 0)
|
||||
|
||||
s.write_byte(SSH.Protocol.MSG_KEXINIT)
|
||||
client_kex.write(s)
|
||||
s.send_packet()
|
||||
|
||||
# Do the initial DH exchange. The server responds back
|
||||
# with the host key and its length. Bingo.
|
||||
kex_group.send_init(s)
|
||||
kex_group.recv_reply(s)
|
||||
|
||||
hostkey_modulus_size = kex_group.get_hostkey_size()
|
||||
ca_modulus_size = kex_group.get_ca_size()
|
||||
|
||||
# If we're not working with the CA types, we only need to
|
||||
# test one RSA key, since the others will all be the same.
|
||||
if ca is False:
|
||||
break
|
||||
|
||||
if modulus_size > 0:
|
||||
|
||||
if hostkey_modulus_size > 0 or ca_modulus_size > 0:
|
||||
# Set the hostkey size for all RSA key types since 'ssh-rsa',
|
||||
# 'rsa-sha2-256', etc. are all using the same host key.
|
||||
for rsa_type in SSH2.RSAKeyTest.RSA_TYPES:
|
||||
kex.set_rsa_hostkey_size(rsa_type, modulus_size)
|
||||
# Note, however, that this may change in the future.
|
||||
if ca is False:
|
||||
for rsa_type in rsa_types:
|
||||
server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size)
|
||||
else:
|
||||
server_kex.set_rsa_key_size(rsa_type, hostkey_modulus_size, ca_modulus_size)
|
||||
|
||||
# Keys smaller than 2048 result in a failure.
|
||||
fail = False
|
||||
if modulus_size < 2048:
|
||||
if hostkey_modulus_size < 2048 or (ca_modulus_size < 2048 and ca_modulus_size > 0):
|
||||
fail = True
|
||||
|
||||
# If this is a bad key size, update the database accordingly.
|
||||
if fail:
|
||||
for rsa_type in SSH2.RSAKeyTest.RSA_TYPES:
|
||||
if ca is False:
|
||||
for rsa_type in SSH2.RSAKeyTest.RSA_TYPES:
|
||||
alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type]
|
||||
alg_list.append(['using small %d-bit modulus' % hostkey_modulus_size])
|
||||
else:
|
||||
alg_list = SSH2.KexDB.ALGORITHMS['key'][rsa_type]
|
||||
alg_list.append(['using small %d-bit modulus' % modulus_size])
|
||||
|
||||
min_modulus = min(hostkey_modulus_size, ca_modulus_size)
|
||||
min_modulus = min_modulus if min_modulus > 0 else max(hostkey_modulus_size, ca_modulus_size)
|
||||
alg_list.append(['using small %d-bit modulus' % min_modulus])
|
||||
|
||||
# If we ran any tests, close the socket, as the connection has
|
||||
# been put in a state that later tests can't use.
|
||||
if ran_test:
|
||||
s.close()
|
||||
|
||||
# Performs DH group exchanges to find what moduli are supported, and checks
|
||||
# their size.
|
||||
@ -602,13 +643,15 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
# Creates a new connection to the server. Returns an SSH.Socket, or
|
||||
# None on failure.
|
||||
@staticmethod
|
||||
def reconnect(ipvo, host, port, gex_alg):
|
||||
s = SSH.Socket(host, port)
|
||||
s.connect(ipvo)
|
||||
def reconnect(s, gex_alg):
|
||||
if s.is_connected():
|
||||
return
|
||||
|
||||
s.connect()
|
||||
unused, unused, err = s.get_banner()
|
||||
if err is not None:
|
||||
s.close()
|
||||
return None
|
||||
return False
|
||||
|
||||
# Parse the server's initial KEX.
|
||||
packet_type, payload = s.read_packet(2)
|
||||
@ -620,32 +663,31 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
s.write_byte(SSH.Protocol.MSG_KEXINIT)
|
||||
client_kex.write(s)
|
||||
s.send_packet()
|
||||
return s
|
||||
return True
|
||||
|
||||
# Runs the DH moduli test against the specified target.
|
||||
@staticmethod
|
||||
def run(ipvo, host, port, s, kex):
|
||||
def run(s, kex):
|
||||
GEX_ALGS = {
|
||||
'diffie-hellman-group-exchange-sha1': KexGroupExchange_SHA1,
|
||||
'diffie-hellman-group-exchange-sha256': KexGroupExchange_SHA256,
|
||||
}
|
||||
|
||||
# The previous RSA tests put the server in a state we can't
|
||||
# test. So we need a new connection to start with a clean
|
||||
# slate.
|
||||
if s.is_connected():
|
||||
s.close()
|
||||
|
||||
# Check if the server supports any of the group-exchange
|
||||
# algorithms. If so, test each one.
|
||||
for gex_alg in GEX_ALGS:
|
||||
if gex_alg in kex.kex_algorithms:
|
||||
|
||||
# The previous RSA tests put the server in a state we can't
|
||||
# test. So we need a new connection to start with a clean
|
||||
# slate.
|
||||
if s is not None:
|
||||
s.close()
|
||||
s = None
|
||||
|
||||
s = SSH2.GEXTest.reconnect(ipvo, host, port, gex_alg)
|
||||
if s is None:
|
||||
if SSH2.GEXTest.reconnect(s, gex_alg) is False:
|
||||
break
|
||||
|
||||
|
||||
kex_group = GEX_ALGS[gex_alg]()
|
||||
smallest_modulus = -1
|
||||
|
||||
@ -657,12 +699,11 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
# Its been observed that servers will return a group
|
||||
# larger than the requested max. So just because we
|
||||
# got here, doesn't mean the server is vulnerable...
|
||||
smallest_modulus = kex_group.get_modulus_size()
|
||||
smallest_modulus = kex_group.get_dh_modulus_size()
|
||||
except Exception as e:
|
||||
pass
|
||||
finally:
|
||||
s.close()
|
||||
s = None
|
||||
|
||||
# Try an array of specific modulus sizes... one at a time.
|
||||
reconnect_failed = False
|
||||
@ -673,21 +714,22 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
if smallest_modulus > 0 and bits >= smallest_modulus:
|
||||
break
|
||||
|
||||
if s is None:
|
||||
s = SSH2.GEXTest.reconnect(ipvo, host, port, gex_alg)
|
||||
if s is None:
|
||||
reconnect_failed = True
|
||||
break
|
||||
if SSH2.GEXTest.reconnect(s, gex_alg) is False:
|
||||
reconnect_failed = True
|
||||
break
|
||||
|
||||
try:
|
||||
kex_group.send_init(s, bits, bits, bits)
|
||||
kex_group.recv_reply(s)
|
||||
smallest_modulus = kex_group.get_modulus_size()
|
||||
smallest_modulus = kex_group.get_dh_modulus_size()
|
||||
except Exception as e:
|
||||
pass
|
||||
finally:
|
||||
# The server is in a state that is not re-testable,
|
||||
# so there's nothing else to do with this open
|
||||
# connection.
|
||||
s.close()
|
||||
s = None
|
||||
|
||||
|
||||
if smallest_modulus > 0:
|
||||
kex.set_dh_modulus_size(gex_alg, smallest_modulus)
|
||||
@ -699,7 +741,7 @@ class SSH2(object): # pylint: disable=too-few-public-methods
|
||||
# For 'diffie-hellman-group-exchange-sha256', add
|
||||
# a failure reason.
|
||||
if len(lst) == 1:
|
||||
lst.append(text)
|
||||
lst.append([text])
|
||||
# For 'diffie-hellman-group-exchange-sha1', delete
|
||||
# the existing failure reason (which is vague), and
|
||||
# insert our own.
|
||||
@ -975,6 +1017,10 @@ class ReadBuf(object):
|
||||
# type: () -> text_type
|
||||
return self._buf.readline().rstrip().decode('utf-8', 'replace')
|
||||
|
||||
def reset(self):
|
||||
self._buf = BytesIO()
|
||||
self._len = 0
|
||||
super(ReadBuf, self).reset()
|
||||
|
||||
class WriteBuf(object):
|
||||
def __init__(self, data=None):
|
||||
@ -1064,6 +1110,9 @@ class WriteBuf(object):
|
||||
self._wbuf.seek(0)
|
||||
return payload
|
||||
|
||||
def reset(self):
|
||||
self._wbuf = BytesIO()
|
||||
|
||||
|
||||
class SSH(object): # pylint: disable=too-few-public-methods
|
||||
class Protocol(object): # pylint: disable=too-few-public-methods
|
||||
@ -1567,10 +1616,8 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
||||
for alg_type, alg_list in alg_pair.items():
|
||||
if alg_type == 'aut':
|
||||
continue
|
||||
rec[sshv][alg_type] = {'add': {}, 'del': {}}
|
||||
rec[sshv][alg_type] = {'add': {}, 'del': {}, 'chg': {}}
|
||||
for n, alg_desc in alg_db[alg_type].items():
|
||||
if alg_type == 'key' and '-cert-' in n:
|
||||
continue
|
||||
versions = alg_desc[0]
|
||||
if len(versions) == 0 or versions[0] is None:
|
||||
continue
|
||||
@ -1597,18 +1644,19 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
||||
if fc > 0:
|
||||
faults += pow(10, 2 - i) * fc
|
||||
if n not in alg_list:
|
||||
if faults > 0:
|
||||
if faults > 0 or (alg_type == 'key' and '-cert-' in n):
|
||||
continue
|
||||
rec[sshv][alg_type]['add'][n] = 0
|
||||
else:
|
||||
if faults == 0:
|
||||
continue
|
||||
if n == 'diffie-hellman-group-exchange-sha256':
|
||||
if software.compare_version('7.3') < 0:
|
||||
continue
|
||||
rec[sshv][alg_type]['del'][n] = faults
|
||||
if n in ['diffie-hellman-group-exchange-sha256', 'ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512', 'ssh-rsa-cert-v01@openssh.com']:
|
||||
rec[sshv][alg_type]['chg'][n] = faults
|
||||
else:
|
||||
rec[sshv][alg_type]['del'][n] = faults
|
||||
add_count = len(rec[sshv][alg_type]['add'])
|
||||
del_count = len(rec[sshv][alg_type]['del'])
|
||||
chg_count = len(rec[sshv][alg_type]['chg'])
|
||||
new_alg_count = len(alg_list) + add_count - del_count
|
||||
if new_alg_count < 1 and del_count > 0:
|
||||
mf = min(rec[sshv][alg_type]['del'].values())
|
||||
@ -1626,6 +1674,8 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
||||
del rec[sshv][alg_type]['add']
|
||||
if del_count == 0:
|
||||
del rec[sshv][alg_type]['del']
|
||||
if chg_count == 0:
|
||||
del rec[sshv][alg_type]['chg']
|
||||
if len(rec[sshv][alg_type]) == 0:
|
||||
del rec[sshv][alg_type]
|
||||
if len(rec[sshv]) == 0:
|
||||
@ -1770,6 +1820,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
||||
raise ValueError('invalid port: {0}'.format(port))
|
||||
self.__host = host
|
||||
self.__port = nport
|
||||
self.__ipvo = ()
|
||||
|
||||
def _resolve(self, ipvo):
|
||||
# type: (Sequence[int]) -> Iterable[Tuple[int, Tuple[Any, ...]]]
|
||||
@ -1794,10 +1845,12 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
||||
out.fail('[exception] {0}'.format(e))
|
||||
sys.exit(1)
|
||||
|
||||
def connect(self, ipvo=(), cto=3.0, rto=5.0):
|
||||
def connect(self, ipvo=None, cto=3.0, rto=5.0):
|
||||
# type: (Sequence[int], float, float) -> None
|
||||
err = None
|
||||
for af, addr in self._resolve(ipvo):
|
||||
if ipvo is not None:
|
||||
self.__ipvo = ipvo
|
||||
for af, addr in self._resolve(self.__ipvo):
|
||||
s = None
|
||||
try:
|
||||
s = socket.socket(af, socket.SOCK_STREAM)
|
||||
@ -1956,8 +2009,19 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
||||
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
|
||||
return self.send(data)
|
||||
|
||||
# Returns True if this Socket is connected, otherwise False.
|
||||
def is_connected(self):
|
||||
return (self.__sock is not None)
|
||||
|
||||
def close(self):
|
||||
self.__cleanup()
|
||||
self.reset()
|
||||
self.__state = 0
|
||||
self.__header = []
|
||||
self.__banner = None
|
||||
|
||||
def reset(self):
|
||||
super(SSH.Socket, self).reset()
|
||||
|
||||
def _close_socket(self, s):
|
||||
# type: (Optional[socket.socket]) -> None
|
||||
@ -1975,6 +2039,7 @@ class SSH(object): # pylint: disable=too-few-public-methods
|
||||
def __cleanup(self):
|
||||
# type: () -> None
|
||||
self._close_socket(self.__sock)
|
||||
self.__sock = None
|
||||
|
||||
|
||||
class KexDH(object): # pragma: nocover
|
||||
@ -1988,7 +2053,8 @@ class KexDH(object): # pragma: nocover
|
||||
self.__hostkey_type = None
|
||||
self.__hostkey_e = 0
|
||||
self.__hostkey_n = 0
|
||||
self.__hostkey_n_len = 0 # This is the length of the host key modulus.
|
||||
self.__hostkey_n_len = 0 # Length of the host key modulus.
|
||||
self.__ca_n_len = 0 # Length of the CA key modulus (if hostkey is a cert).
|
||||
self.__f = 0
|
||||
self.__h_sig = 0
|
||||
|
||||
@ -2017,47 +2083,97 @@ class KexDH(object): # pragma: nocover
|
||||
# TODO: change Exception to something more specific.
|
||||
raise Exception('Expected MSG_KEXDH_REPLY (%d) or MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (SSH.Protocol.MSG_KEXDH_REPLY, SSH.Protocol.MSG_KEXDH_GEX_REPLY, packet_type))
|
||||
|
||||
host_key_len = struct.unpack('>I', payload[0:4])[0]
|
||||
ptr = 4
|
||||
|
||||
hostkey = payload[ptr:ptr + host_key_len]
|
||||
ptr += host_key_len
|
||||
|
||||
f_len = struct.unpack('>I', payload[ptr:ptr+4])[0]
|
||||
ptr += 4
|
||||
|
||||
self.__f = payload[ptr:ptr + f_len]
|
||||
ptr += f_len
|
||||
|
||||
h_sig_len = struct.unpack('>I', payload[ptr:ptr+4])[0]
|
||||
ptr += 4
|
||||
|
||||
self.__h_sig = payload[ptr:ptr + h_sig_len]
|
||||
ptr += h_sig_len
|
||||
# Get the host key blob, F, and signature.
|
||||
ptr = 0
|
||||
hostkey, hostkey_len, ptr = KexDH.__get_bytes(payload, ptr)
|
||||
self.__f, f_len, ptr = KexDH.__get_bytes(payload, ptr)
|
||||
self.__h_sig, h_sig_len, ptr = KexDH.__get_bytes(payload, ptr)
|
||||
|
||||
# Now pick apart the host key blob.
|
||||
hostkey_type_len = struct.unpack('>I', hostkey[0:4])[0]
|
||||
ptr = 4
|
||||
|
||||
# Get the host key type (i.e.: 'ssh-rsa', 'ssh-ed25519', etc).
|
||||
self.__hostkey_type = hostkey[ptr:ptr + hostkey_type_len]
|
||||
ptr += hostkey_type_len
|
||||
ptr = 0
|
||||
self.__hostkey_type, hostkey_type_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
hostkey_e_len = struct.unpack('>I', hostkey[ptr:ptr + 4])[0]
|
||||
ptr += 4
|
||||
# If this is an RSA certificate, skip over the nonce.
|
||||
if self.__hostkey_type.startswith('ssh-rsa-cert-v0'):
|
||||
nonce, nonce_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
self.__hostkey_e = int(binascii.hexlify(hostkey[ptr:ptr + hostkey_e_len]), 16)
|
||||
ptr += hostkey_e_len
|
||||
# The public key exponent.
|
||||
hostkey_e, hostkey_e_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
self.__hostkey_e = int(binascii.hexlify(hostkey_e), 16)
|
||||
|
||||
# Here is the modulus size & actual modulus of the host key public key.
|
||||
self.__hostkey_n_len = struct.unpack('>I', hostkey[ptr:ptr + 4])[0]
|
||||
hostkey_n, self.__hostkey_n_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
self.__hostkey_n = int(binascii.hexlify(hostkey_n), 16)
|
||||
|
||||
# If this is an RSA certificate, continue parsing to extract the CA
|
||||
# key.
|
||||
if self.__hostkey_type.startswith('ssh-rsa-cert-v0'):
|
||||
# Skip over the serial number.
|
||||
ptr += 8
|
||||
|
||||
# Get the certificate type.
|
||||
cert_type = int(binascii.hexlify(hostkey[ptr:ptr + 4]), 16)
|
||||
ptr += 4
|
||||
|
||||
# Only SSH2_CERT_TYPE_HOST (2) makes sense in this context.
|
||||
if cert_type == 2:
|
||||
|
||||
# Skip the key ID (this is the serial number of the
|
||||
# certificate).
|
||||
key_id, key_id_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
# The principles, which are... I don't know what.
|
||||
principles, printicples_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
# The timestamp that this certificate is valid after.
|
||||
valid_after = hostkey[ptr:ptr + 8]
|
||||
ptr += 8
|
||||
|
||||
# The timestamp that this certificate is valid before.
|
||||
valid_before = hostkey[ptr:ptr + 8]
|
||||
ptr += 8
|
||||
|
||||
# TODO: validate the principles, and time range.
|
||||
|
||||
# The critical options.
|
||||
critical_options, critical_options_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
# Certificate extensions.
|
||||
extensions, extensions_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
# Another nonce.
|
||||
nonce, nonce_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
# Finally, we get to the CA key.
|
||||
ca_key, ca_key_len, ptr = KexDH.__get_bytes(hostkey, ptr)
|
||||
|
||||
# Last in the host key blob is the CA signature. It isn't
|
||||
# interesting to us, so we won't bother parsing any further.
|
||||
# The CA key has the modulus, however...
|
||||
ptr = 0
|
||||
|
||||
# 'ssh-rsa', 'rsa-sha2-256', etc.
|
||||
ca_key_type, ca_key_type_len, ptr = KexDH.__get_bytes(ca_key, ptr)
|
||||
|
||||
# CA's public key exponent.
|
||||
ca_key_e, ca_key_e_len, ptr = KexDH.__get_bytes(ca_key, ptr)
|
||||
|
||||
# CA's modulus. Bingo.
|
||||
ca_key_n, self.__ca_n_len, ptr = KexDH.__get_bytes(ca_key, ptr)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get_bytes(buf, ptr):
|
||||
num_bytes = struct.unpack('>I', buf[ptr:ptr + 4])[0]
|
||||
ptr += 4
|
||||
self.__hostkey_n = int(binascii.hexlify(hostkey[ptr:ptr + self.__hostkey_n_len]), 16)
|
||||
|
||||
# Returns the size of the hostkey, in bits.
|
||||
def get_hostkey_size(self):
|
||||
size = self.__hostkey_n_len * 8
|
||||
return buf[ptr:ptr + num_bytes], num_bytes, ptr + num_bytes
|
||||
|
||||
# Converts a modulus length in bytes to its size in bits, after some
|
||||
# possible adjustments.
|
||||
@staticmethod
|
||||
def __adjust_key_size(size):
|
||||
size = size * 8
|
||||
# Actual keys are observed to be about 8 bits bigger than expected
|
||||
# (i.e.: 1024-bit keys have a 1032-bit modulus). Check if this is
|
||||
# the case, and subtract 8 if so. This simply improves readability
|
||||
@ -2066,8 +2182,16 @@ class KexDH(object): # pragma: nocover
|
||||
size = size - 8
|
||||
return size
|
||||
|
||||
# Returns the size of the hostkey, in bits.
|
||||
def get_hostkey_size(self):
|
||||
return KexDH.__adjust_key_size(self.__hostkey_n_len)
|
||||
|
||||
# Returns the size of the CA key, in bits.
|
||||
def get_ca_size(self):
|
||||
return KexDH.__adjust_key_size(self.__ca_n_len)
|
||||
|
||||
# Returns the size of the DH modulus, in bits.
|
||||
def get_modulus_size(self):
|
||||
def get_dh_modulus_size(self):
|
||||
# -2 to account for the '0b' prefix in the string.
|
||||
return len(bin(self.__p)) - 2
|
||||
|
||||
@ -2301,8 +2425,13 @@ def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0, alg_sizes=None):
|
||||
# the padding.
|
||||
alg_name_with_size = None
|
||||
if (alg_sizes is not None) and (alg_name in alg_sizes):
|
||||
alg_name_with_size = '%s (%d-bit)' % (alg_name, alg_sizes[alg_name])
|
||||
padding = padding[0:-11]
|
||||
hostkey_size, ca_size = alg_sizes[alg_name]
|
||||
if ca_size > 0:
|
||||
alg_name_with_size = '%s (%d-bit cert/%d-bit CA)' % (alg_name, hostkey_size, ca_size)
|
||||
padding = padding[0:-15]
|
||||
else:
|
||||
alg_name_with_size = '%s (%d-bit)' % (alg_name, hostkey_size)
|
||||
padding = padding[0:-11]
|
||||
|
||||
texts = []
|
||||
if len(alg_name.strip()) == 0:
|
||||
@ -2439,20 +2568,24 @@ def output_recommendations(algs, software, padlen=0):
|
||||
for alg_type in ['kex', 'key', 'enc', 'mac']:
|
||||
if alg_type not in alg_rec[sshv]:
|
||||
continue
|
||||
for action in ['del', 'add']:
|
||||
for action in ['del', 'add', 'chg']:
|
||||
if action not in alg_rec[sshv][alg_type]:
|
||||
continue
|
||||
for name in alg_rec[sshv][alg_type][action]:
|
||||
p = '' if out.batch else ' ' * (padlen - len(name))
|
||||
chg_additional_info = ''
|
||||
if action == 'del':
|
||||
an, sg, fn = 'remove', '-', out.warn
|
||||
if alg_rec[sshv][alg_type][action][name] >= 10:
|
||||
fn = out.fail
|
||||
else:
|
||||
elif action == 'add':
|
||||
an, sg, fn = 'append', '+', out.good
|
||||
elif action == 'chg':
|
||||
an, sg, fn = 'change', '!', out.fail
|
||||
chg_additional_info = ' (increase modulus size to 2048 bits or larger)'
|
||||
b = '(SSH{0})'.format(sshv) if sshv == 1 else ''
|
||||
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4} {5}'
|
||||
fn(fm.format(sg, name, p, alg_type, an, b))
|
||||
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} {6}'
|
||||
fn(fm.format(sg, name, p, alg_type, an, chg_additional_info, b))
|
||||
if len(obuf) > 0:
|
||||
if software is not None:
|
||||
title = '(for {0})'.format(software.display(False))
|
||||
@ -2511,7 +2644,7 @@ def output(banner, header, kex=None, pkm=None):
|
||||
title, atype = 'key exchange algorithms', 'kex'
|
||||
output_algorithms(title, adb, atype, kex.kex_algorithms, maxlen, kex.dh_modulus_sizes())
|
||||
title, atype = 'host-key algorithms', 'key'
|
||||
output_algorithms(title, adb, atype, kex.key_algorithms, maxlen, kex.rsa_hostkey_sizes())
|
||||
output_algorithms(title, adb, atype, kex.key_algorithms, maxlen, kex.rsa_key_sizes())
|
||||
title, atype = 'encryption algorithms (ciphers)', 'enc'
|
||||
output_algorithms(title, adb, atype, kex.server.encryption, maxlen)
|
||||
title, atype = 'message authentication code algorithms', 'mac'
|
||||
@ -2687,7 +2820,7 @@ def audit(aconf, sshv=None):
|
||||
elif sshv == 2:
|
||||
kex = SSH2.Kex.parse(payload)
|
||||
SSH2.RSAKeyTest.run(s, kex)
|
||||
SSH2.GEXTest.run(aconf.ipvo, aconf.host, aconf.port, s, kex)
|
||||
SSH2.GEXTest.run(s, kex)
|
||||
output(banner, header, kex=kex)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user