mirror of
https://github.com/jtesta/ssh-audit.git
synced 2024-11-25 12:01:40 +01:00
Enabled the following mypy options: check_untyped_defs, disallow_untyped_defs, disallow_untyped_calls, disallow_incomplete_defs, disallow_untyped_decorators, disallow_untyped_decorators, strict_equality, and strict.
This commit is contained in:
parent
cabbe717d3
commit
30f2b7690a
102
ssh-audit.py
102
ssh-audit.py
@ -84,7 +84,7 @@ def usage(err: Optional[str] = None) -> None:
|
||||
# Validates policy files and performs policy testing
|
||||
class Policy:
|
||||
|
||||
def __init__(self, policy_file: str = None, policy_data: str = None) -> None:
|
||||
def __init__(self, policy_file: Optional[str] = None, policy_data: Optional[str] = None) -> None:
|
||||
self._name = None # type: Optional[str]
|
||||
self._version = None # type: Optional[str]
|
||||
self._banner = None # type: Optional[str]
|
||||
@ -274,7 +274,7 @@ macs = %s
|
||||
return policy_data
|
||||
|
||||
|
||||
def evaluate(self, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]:
|
||||
def evaluate(self, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex']) -> Tuple[bool, List[str]]:
|
||||
'''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.'''
|
||||
|
||||
ret = True
|
||||
@ -285,9 +285,12 @@ macs = %s
|
||||
ret = False
|
||||
errors.append('Banner did not match. Expected: [%s]; Actual: [%s]' % (self._banner, banner_str))
|
||||
|
||||
if (self._header is not None) and (header != self._header):
|
||||
actual_header = None
|
||||
if header is not None:
|
||||
actual_header = "\n".join(header)
|
||||
if (self._header is not None) and (actual_header is not None) and (actual_header != self._header):
|
||||
ret = False
|
||||
errors.append('Header did not match. Expected: [%s]; Actual: [%s]' % (self._header, header))
|
||||
errors.append('Header did not match. Expected: [%s]; Actual: [%s]' % (self._header, actual_header))
|
||||
|
||||
# All subsequent tests require a valid kex, so end here if we don't have one.
|
||||
if kex is None:
|
||||
@ -597,7 +600,7 @@ class Output:
|
||||
return lambda x: print(u'{}'.format(x))
|
||||
|
||||
|
||||
class OutputBuffer(list):
|
||||
class OutputBuffer(List[str]):
|
||||
def __enter__(self) -> 'OutputBuffer':
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
self.__buf = io.StringIO()
|
||||
@ -608,12 +611,12 @@ class OutputBuffer(list):
|
||||
def flush(self, sort_lines: bool = False) -> None:
|
||||
# Lines must be sorted in some cases to ensure consistent testing.
|
||||
if sort_lines:
|
||||
self.sort()
|
||||
for line in self:
|
||||
self.sort() # pylint: disable=no-member
|
||||
for line in self: # pylint: disable=not-an-iterable
|
||||
print(line)
|
||||
|
||||
def __exit__(self, *args: Any) -> None:
|
||||
self.extend(self.__buf.getvalue().splitlines())
|
||||
self.extend(self.__buf.getvalue().splitlines()) # pylint: disable=no-member
|
||||
sys.stdout = self.__stdout
|
||||
|
||||
|
||||
@ -1664,7 +1667,7 @@ class SSH: # pylint: disable=too-few-public-methods
|
||||
return re.sub(r'^[-_\.]+', '', patch) or None
|
||||
|
||||
@staticmethod
|
||||
def _fix_date(d: str) -> Optional[str]:
|
||||
def _fix_date(d: Optional[str]) -> Optional[str]:
|
||||
if d is not None and len(d) == 8:
|
||||
return '{}-{}-{}'.format(d[:4], d[4:6], d[6:8])
|
||||
else:
|
||||
@ -2353,31 +2356,28 @@ class SSH: # pylint: disable=too-few-public-methods
|
||||
def get_banner(self, sshv: int = 2) -> Tuple[Optional['SSH.Banner'], List[str], Optional[str]]:
|
||||
if self.__sock is None:
|
||||
return self.__banner, self.__header, 'not connected'
|
||||
if self.__banner is not None:
|
||||
return self.__banner, self.__header, None
|
||||
|
||||
banner = SSH_HEADER.format('1.5' if sshv == 1 else '2.0')
|
||||
if self.__state < self.SM_BANNER_SENT:
|
||||
self.send_banner(banner)
|
||||
# rto = self.__sock.gettimeout()
|
||||
# self.__sock.settimeout(0.7)
|
||||
s, e = self.recv()
|
||||
# self.__sock.settimeout(rto)
|
||||
if s < 0:
|
||||
return self.__banner, self.__header, e
|
||||
|
||||
s = 0
|
||||
e = None
|
||||
while self.__banner is None:
|
||||
if not s > 0:
|
||||
s, e = self.recv()
|
||||
if s < 0:
|
||||
break
|
||||
while self.__banner is None and self.unread_len > 0:
|
||||
while s >= 0:
|
||||
s, e = self.recv()
|
||||
if s < 0:
|
||||
continue
|
||||
while self.unread_len > 0:
|
||||
line = self.read_line()
|
||||
if len(line.strip()) == 0:
|
||||
continue
|
||||
if self.__banner is None:
|
||||
self.__banner = SSH.Banner.parse(line)
|
||||
if self.__banner is not None:
|
||||
continue
|
||||
self.__banner = SSH.Banner.parse(line)
|
||||
if self.__banner is not None:
|
||||
return self.__banner, self.__header, None
|
||||
self.__header.append(line)
|
||||
s = 0
|
||||
|
||||
return self.__banner, self.__header, e
|
||||
|
||||
def recv(self, size: int = 2048) -> Tuple[int, Optional[str]]:
|
||||
@ -2408,7 +2408,6 @@ class SSH: # pylint: disable=too-few-public-methods
|
||||
return 0, None
|
||||
except socket.error as e:
|
||||
return -1, str(e.args[-1])
|
||||
self.__sock.send(data)
|
||||
|
||||
def send_banner(self, banner: str) -> None:
|
||||
self.send(banner.encode() + b'\r\n')
|
||||
@ -2547,7 +2546,7 @@ class KexDH: # pragma: nocover
|
||||
# Parse a KEXDH_REPLY or KEXDH_GEX_REPLY message from the server. This
|
||||
# contains the host key, among other things. Function returns the host
|
||||
# key blob (from which the fingerprint can be calculated).
|
||||
def recv_reply(self, s, parse_host_key_size=True):
|
||||
def recv_reply(self, s: 'SSH.Socket', parse_host_key_size: bool = True) -> Optional[bytes]:
|
||||
packet_type, payload = s.read_packet(2)
|
||||
|
||||
# Skip any & all MSG_DEBUG messages.
|
||||
@ -3188,7 +3187,7 @@ def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], cl
|
||||
out.warn("\n\n!!! WARNING: unknown algorithm(s) found!: %s. Please email the full output above to the maintainer (jtesta@positronsecurity.com), or create a Github issue at <https://github.com/jtesta/ssh-audit/issues>.\n" % ','.join(unknown_algorithms))
|
||||
|
||||
|
||||
def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: List[str], kex: Optional['SSH2.Kex'] = None) -> bool:
|
||||
def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], header: Optional[List[str]], kex: Optional['SSH2.Kex'] = None) -> bool:
|
||||
|
||||
if aconf.policy is None:
|
||||
raise RuntimeError('Internal error: cannot evaluate against null Policy!')
|
||||
@ -3320,15 +3319,26 @@ class Utils:
|
||||
return -1.0
|
||||
|
||||
|
||||
def build_struct(banner, kex=None, pkm=None, client_host=None):
|
||||
def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None, pkm: Optional['SSH1.PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any:
|
||||
|
||||
banner_str = ''
|
||||
banner_protocol = None
|
||||
banner_software = None
|
||||
banner_comments = None
|
||||
if banner is not None:
|
||||
banner_str = str(banner)
|
||||
banner_protocol = banner.protocol
|
||||
banner_software = banner.software
|
||||
banner_comments = banner.comments
|
||||
|
||||
res = {
|
||||
"banner": {
|
||||
"raw": str(banner),
|
||||
"protocol": banner.protocol,
|
||||
"software": banner.software,
|
||||
"comments": banner.comments,
|
||||
"raw": banner_str,
|
||||
"protocol": banner_protocol,
|
||||
"software": banner_software,
|
||||
"comments": banner_comments,
|
||||
},
|
||||
}
|
||||
} # type: Any
|
||||
if client_host is not None:
|
||||
res['client_ip'] = client_host
|
||||
if kex is not None:
|
||||
@ -3339,8 +3349,8 @@ def build_struct(banner, kex=None, pkm=None, client_host=None):
|
||||
for algorithm in kex.kex_algorithms:
|
||||
entry = {
|
||||
'algorithm': algorithm,
|
||||
}
|
||||
if (alg_sizes is not None) and (algorithm in alg_sizes):
|
||||
} # type: Any
|
||||
if algorithm in alg_sizes:
|
||||
hostkey_size, ca_size = alg_sizes[algorithm]
|
||||
entry['keysize'] = hostkey_size
|
||||
if ca_size > 0:
|
||||
@ -3353,7 +3363,7 @@ def build_struct(banner, kex=None, pkm=None, client_host=None):
|
||||
entry = {
|
||||
'algorithm': algorithm,
|
||||
}
|
||||
if (alg_sizes is not None) and (algorithm in alg_sizes):
|
||||
if algorithm in alg_sizes:
|
||||
hostkey_size, ca_size = alg_sizes[algorithm]
|
||||
entry['keysize'] = hostkey_size
|
||||
if ca_size > 0:
|
||||
@ -3387,12 +3397,20 @@ def build_struct(banner, kex=None, pkm=None, client_host=None):
|
||||
}
|
||||
res['fingerprints'].append(entry)
|
||||
else:
|
||||
pkm_supported_ciphers = None
|
||||
pkm_supported_authentications = None
|
||||
pkm_fp = None
|
||||
if pkm is not None:
|
||||
pkm_supported_ciphers = pkm.supported_ciphers
|
||||
pkm_supported_authentications = pkm.supported_authentications
|
||||
pkm_fp = SSH.Fingerprint(pkm.host_key_fingerprint_data).sha256
|
||||
|
||||
res['key'] = ['ssh-rsa1']
|
||||
res['enc'] = pkm.supported_ciphers
|
||||
res['aut'] = pkm.supported_authentications
|
||||
res['enc'] = pkm_supported_ciphers
|
||||
res['aut'] = pkm_supported_authentications
|
||||
res['fingerprints'] = [{
|
||||
'type': 'ssh-rsa1',
|
||||
'fp': SSH.Fingerprint(pkm.host_key_fingerprint_data).sha256,
|
||||
'fp': pkm_fp,
|
||||
}]
|
||||
|
||||
return res
|
||||
@ -3421,7 +3439,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None) -> int:
|
||||
packet_type, payload = s.read_packet(sshv)
|
||||
if packet_type < 0:
|
||||
try:
|
||||
if payload is not None and len(payload) > 0:
|
||||
if len(payload) > 0:
|
||||
payload_txt = payload.decode('utf-8')
|
||||
else:
|
||||
payload_txt = u'empty'
|
||||
|
11
tox.ini
11
tox.ini
@ -90,14 +90,19 @@ commands =
|
||||
[mypy]
|
||||
ignore_missing_imports = False
|
||||
follow_imports = normal
|
||||
; disallow_untyped_calls = True
|
||||
; disallow_untyped_defs = True
|
||||
; check_untyped_defs = True
|
||||
disallow_incomplete_defs = True
|
||||
disallow_untyped_calls = True
|
||||
disallow_untyped_decorators = True
|
||||
disallow_untyped_defs = True
|
||||
check_untyped_defs = True
|
||||
disallow_subclassing_any = True
|
||||
warn_redundant_casts = True
|
||||
warn_return_any = True
|
||||
warn_unreachable = True
|
||||
warn_unused_ignores = True
|
||||
strict_optional = True
|
||||
strict_equality = True
|
||||
strict = True
|
||||
|
||||
[pylint]
|
||||
reports = no
|
||||
|
Loading…
Reference in New Issue
Block a user