diff --git a/src/ssh_audit/algorithms.py b/src/ssh_audit/algorithms.py index c36275c..7c9a850 100644 --- a/src/ssh_audit/algorithms.py +++ b/src/ssh_audit/algorithms.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -55,7 +55,7 @@ class Algorithms: if self.ssh1kex is None: return None item = Algorithms.Item(1, SSH1_KexDB.ALGORITHMS) - item.add('key', [u'ssh-rsa1']) + item.add('key', ['ssh-rsa1']) item.add('enc', self.ssh1kex.supported_ciphers) item.add('aut', self.ssh1kex.supported_authentications) return item diff --git a/src/ssh_audit/fingerprint.py b/src/ssh_audit/fingerprint.py index cae1768..a9d2caf 100644 --- a/src/ssh_audit/fingerprint.py +++ b/src/ssh_audit/fingerprint.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -33,11 +33,11 @@ class Fingerprint: @property def md5(self) -> str: h = hashlib.md5(self.__fpd).hexdigest() - r = u':'.join(h[i:i + 2] for i in range(0, len(h), 2)) - return u'MD5:{}'.format(r) + r = ':'.join(h[i:i + 2] for i in range(0, len(h), 2)) + return 'MD5:{}'.format(r) @property def sha256(self) -> str: h = base64.b64encode(hashlib.sha256(self.__fpd).digest()) r = h.decode('ascii').rstrip('=') - return u'SHA256:{}'.format(r) + return 'SHA256:{}'.format(r) diff --git a/src/ssh_audit/gextest.py b/src/ssh_audit/gextest.py index 47efbce..f82a856 100644 --- a/src/ssh_audit/gextest.py +++ b/src/ssh_audit/gextest.py @@ -86,14 +86,14 @@ class GEXTest: # Check if the server supports any of the group-exchange # algorithms. If so, test each one. - for gex_alg in GEX_ALGS: + for gex_alg, kex_group_class in GEX_ALGS.items(): if gex_alg in kex.kex_algorithms: out.d('Preparing to perform DH group exchange using ' + gex_alg + '...', write_now=True) if GEXTest.reconnect(out, s, kex, gex_alg) is False: break - kex_group = GEX_ALGS[gex_alg]() + kex_group = kex_group_class() smallest_modulus = -1 # First try a range of weak sizes. diff --git a/src/ssh_audit/kexdh.py b/src/ssh_audit/kexdh.py index 9c4e08a..536f0a8 100644 --- a/src/ssh_audit/kexdh.py +++ b/src/ssh_audit/kexdh.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2017-2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2017-2021 Joe Testa (jtesta@positronsecurity.com) Copyright (C) 2017 Andris Raugulis (moo@arthepsy.eu) Permission is hereby granted, free of charge, to any person obtaining a copy @@ -37,8 +37,8 @@ from ssh_audit.ssh_socket import SSH_Socket class KexDH: # pragma: nocover def __init__(self, kex_name: str, hash_alg: str, g: int, p: int) -> None: - self.__kex_name = kex_name - self.__hash_alg = hash_alg + self.__kex_name = kex_name # pylint: disable=unused-private-member + self.__hash_alg = hash_alg # pylint: disable=unused-private-member self.__g = 0 self.__p = 0 self.__q = 0 @@ -46,10 +46,10 @@ class KexDH: # pragma: nocover self.__e = 0 self.set_params(g, p) - self.__ed25519_pubkey: Optional[bytes] = None + self.__ed25519_pubkey: Optional[bytes] = None # pylint: disable=unused-private-member self.__hostkey_type: Optional[bytes] = None - self.__hostkey_e = 0 - self.__hostkey_n = 0 + self.__hostkey_e = 0 # pylint: disable=unused-private-member + self.__hostkey_n = 0 # pylint: disable=unused-private-member self.__hostkey_n_len = 0 # Length of the host key modulus. self.__ca_n_len = 0 # Length of the CA key modulus (if hostkey is a cert). @@ -121,11 +121,11 @@ class KexDH: # pragma: nocover # The public key exponent. hostkey_e, hostkey_e_len, ptr = KexDH.__get_bytes(hostkey, ptr) - self.__hostkey_e = int(binascii.hexlify(hostkey_e), 16) + self.__hostkey_e = int(binascii.hexlify(hostkey_e), 16) # pylint: disable=unused-private-member # Here is the modulus size & actual modulus of the host key public key. hostkey_n, self.__hostkey_n_len, ptr = KexDH.__get_bytes(hostkey, ptr) - self.__hostkey_n = int(binascii.hexlify(hostkey_n), 16) + self.__hostkey_n = int(binascii.hexlify(hostkey_n), 16) # pylint: disable=unused-private-member # If this is an RSA certificate, continue parsing to extract the CA # key. diff --git a/src/ssh_audit/policy.py b/src/ssh_audit/policy.py index 77be4c2..fd9d2c7 100644 --- a/src/ssh_audit/policy.py +++ b/src/ssh_audit/policy.py @@ -1,7 +1,7 @@ """ The MIT License (MIT) - Copyright (C) 2020 Joe Testa (jtesta@positronsecurity.com) + Copyright (C) 2020-2021 Joe Testa (jtesta@positronsecurity.com) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -117,7 +117,7 @@ class Policy: if policy_file is not None: try: - with open(policy_file, "r") as f: + with open(policy_file, "r", encoding='utf-8') as f: policy_data = f.read() except FileNotFoundError: print("Error: policy file not found: %s" % policy_file) @@ -425,8 +425,8 @@ macs = %s server_policy_names = [] client_policy_names = [] - for policy_name in Policy.BUILTIN_POLICIES: - if Policy.BUILTIN_POLICIES[policy_name]['server_policy']: + for policy_name, policy in Policy.BUILTIN_POLICIES.items(): + if policy['server_policy']: server_policy_names.append(policy_name) else: client_policy_names.append(policy_name) diff --git a/src/ssh_audit/ssh1_publickeymessage.py b/src/ssh_audit/ssh1_publickeymessage.py index 037d4a3..67c4dec 100644 --- a/src/ssh_audit/ssh1_publickeymessage.py +++ b/src/ssh_audit/ssh1_publickeymessage.py @@ -90,7 +90,7 @@ class SSH1_PublicKeyMessage: @property def supported_ciphers(self) -> List[str]: ciphers = [] - for i in range(len(SSH1.CIPHERS)): + for i in range(len(SSH1.CIPHERS)): # pylint: disable=consider-using-enumerate if self.__supported_ciphers_mask & (1 << i) != 0: ciphers.append(Utils.to_text(SSH1.CIPHERS[i])) return ciphers diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index d9da4f4..38b5095 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -563,7 +563,7 @@ def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH # Open with mode 'x' (creates the file, or fails if it already exist). succeeded = True try: - with open(aconf.policy_file, 'x') as f: + with open(aconf.policy_file, 'x', encoding='utf-8') as f: f.write(policy_data) except FileExistsError: succeeded = False @@ -681,7 +681,7 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. # If a file containing a list of targets was given, read it. if aconf.target_file is not None: - with open(aconf.target_file, 'r') as f: + with open(aconf.target_file, 'r', encoding='utf-8') as f: aconf.target_list = f.readlines() # Strip out whitespace from each line in target file, and skip empty lines. @@ -869,10 +869,10 @@ def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print if len(payload) > 0: payload_txt = payload.decode('utf-8') else: - payload_txt = u'empty' + payload_txt = 'empty' except UnicodeDecodeError: - payload_txt = u'"{}"'.format(repr(payload).lstrip('b')[1:-1]) - if payload_txt == u'Protocol major versions differ.': + payload_txt = '"{}"'.format(repr(payload).lstrip('b')[1:-1]) + if payload_txt == 'Protocol major versions differ.': if sshv == 2 and aconf.ssh1: ret = audit(out, aconf, 1) out.write() @@ -972,8 +972,8 @@ def algorithm_lookup(out: OutputBuffer, alg_names: str) -> int: similar_algorithms = [ alg_unknown + " --> (" + alg_type + ") " + alg_name for alg_unknown in algorithms_not_found - for alg_type in adb - for alg_name in adb[alg_type] + for alg_type, alg_names in adb.items() + for alg_name in alg_names # Perform a case-insensitive comparison using 'casefold' # and match substrings using the 'in' operator. if alg_unknown.casefold() in alg_name.casefold() diff --git a/src/ssh_audit/ssh_socket.py b/src/ssh_audit/ssh_socket.py index 3891588..6853c1c 100644 --- a/src/ssh_audit/ssh_socket.py +++ b/src/ssh_audit/ssh_socket.py @@ -338,6 +338,6 @@ class SSH_Socket(ReadBuf, WriteBuf): def __cleanup(self) -> None: self._close_socket(self.__sock) - for fd in self.__sock_map: - self._close_socket(self.__sock_map[fd]) + for sock in self.__sock_map.values(): + self._close_socket(sock) self.__sock = None diff --git a/src/ssh_audit/writebuf.py b/src/ssh_audit/writebuf.py index 05760b5..5f25d8c 100644 --- a/src/ssh_audit/writebuf.py +++ b/src/ssh_audit/writebuf.py @@ -54,7 +54,7 @@ class WriteBuf: return self.write(v) def write_list(self, v: List[str]) -> 'WriteBuf': - return self.write_string(u','.join(v)) + return self.write_string(','.join(v)) @classmethod def _bitlength(cls, n: int) -> int: