mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-09-03 18:38:32 +02:00
Debug Logging and visibility of SSH Connection errors (#99)
* Debug Logging and visibility of SSH Connection errors * Updated date in man page
This commit is contained in:
@ -30,6 +30,7 @@ from ssh_audit.kexdh import KexDH, KexGroup1, KexGroup14_SHA1, KexGroup14_SHA256
|
||||
from ssh_audit.ssh2_kex import SSH2_Kex
|
||||
from ssh_audit.ssh2_kexdb import SSH2_KexDB
|
||||
from ssh_audit.ssh_socket import SSH_Socket
|
||||
from ssh_audit.outputbuffer import OutputBuffer
|
||||
|
||||
|
||||
# Obtains host keys, checks their size, and derives their fingerprints.
|
||||
@ -52,7 +53,7 @@ class HostKeyTest:
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def run(s: 'SSH_Socket', server_kex: 'SSH2_Kex') -> None:
|
||||
def run(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex') -> None:
|
||||
KEX_TO_DHGROUP = {
|
||||
'diffie-hellman-group1-sha1': KexGroup1,
|
||||
'diffie-hellman-group14-sha1': KexGroup14_SHA1,
|
||||
@ -80,10 +81,10 @@ class HostKeyTest:
|
||||
break
|
||||
|
||||
if kex_str is not None and kex_group is not None:
|
||||
HostKeyTest.perform_test(s, server_kex, kex_str, kex_group, HostKeyTest.HOST_KEY_TYPES)
|
||||
HostKeyTest.perform_test(out, s, server_kex, kex_str, kex_group, HostKeyTest.HOST_KEY_TYPES)
|
||||
|
||||
@staticmethod
|
||||
def perform_test(s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None:
|
||||
def perform_test(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None:
|
||||
hostkey_modulus_size = 0
|
||||
ca_modulus_size = 0
|
||||
|
||||
@ -101,22 +102,26 @@ class HostKeyTest:
|
||||
|
||||
# If this host key type is supported by the server, we test it.
|
||||
if host_key_type in server_kex.key_algorithms:
|
||||
out.d('Preparing to obtain ' + host_key_type + ' host key...', write_now=True)
|
||||
|
||||
cert = host_key_types[host_key_type]['cert']
|
||||
variable_key_len = host_key_types[host_key_type]['variable_key_len']
|
||||
|
||||
# If the connection is closed, re-open it and get the kex again.
|
||||
if not s.is_connected():
|
||||
err = s.connect()
|
||||
err = s.connect(out)
|
||||
if err is not None:
|
||||
out.v(err, write_now=True)
|
||||
return
|
||||
|
||||
_, _, err = s.get_banner()
|
||||
_, _, err = s.get_banner(out)
|
||||
if err is not None:
|
||||
out.v(err, write_now=True)
|
||||
s.close()
|
||||
return
|
||||
|
||||
# Send our KEX using the specified group-exchange and most of the server's own values.
|
||||
s.send_kexinit(key_exchanges=[kex_str], hostkeys=[host_key_type], ciphers=server_kex.server.encryption, macs=server_kex.server.mac, compressions=server_kex.server.compression, languages=server_kex.server.languages)
|
||||
s.send_kexinit(out, key_exchanges=[kex_str], hostkeys=[host_key_type], ciphers=server_kex.server.encryption, macs=server_kex.server.mac, compressions=server_kex.server.compression, languages=server_kex.server.languages)
|
||||
|
||||
# Parse the server's KEX.
|
||||
_, payload = s.read_packet()
|
||||
@ -125,8 +130,8 @@ class HostKeyTest:
|
||||
|
||||
# Do the initial DH exchange. The server responds back
|
||||
# with the host key and its length. Bingo. We also get back the host key fingerprint.
|
||||
kex_group.send_init(s)
|
||||
try:
|
||||
kex_group.send_init(s)
|
||||
host_key = kex_group.recv_reply(s, variable_key_len)
|
||||
if host_key is not None:
|
||||
server_kex.set_host_key(host_key_type, host_key)
|
||||
|
Reference in New Issue
Block a user