mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-01-03 14:59:49 +01:00
Added 'client policy' field in policy files to distinguish server from client policies.
This commit is contained in:
parent
b27d768c79
commit
8fb07edafd
44
ssh-audit.py
44
ssh-audit.py
@ -105,6 +105,7 @@ class Policy:
|
||||
self._hostkey_sizes = None # type: Optional[Dict[str, int]]
|
||||
self._cakey_sizes = None # type: Optional[Dict[str, int]]
|
||||
self._dh_modulus_sizes = None # type: Optional[Dict[str, int]]
|
||||
self._server_policy = True
|
||||
|
||||
if (policy_file is None) and (policy_data is None):
|
||||
raise RuntimeError('policy_file and policy_data must not both be None.')
|
||||
@ -134,7 +135,7 @@ class Policy:
|
||||
key = key.strip()
|
||||
val = val.strip()
|
||||
|
||||
if key not in ['name', 'version', 'banner', 'compressions', 'host keys', 'key exchanges', 'ciphers', 'macs'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'):
|
||||
if key not in ['name', 'version', 'banner', 'compressions', 'host keys', 'key exchanges', 'ciphers', 'macs', 'client policy'] and not key.startswith('hostkey_size_') and not key.startswith('cakey_size_') and not key.startswith('dh_modulus_size_'):
|
||||
raise ValueError("invalid field found in policy: %s" % line)
|
||||
|
||||
if key in ['name', 'banner']:
|
||||
@ -190,6 +191,8 @@ class Policy:
|
||||
if self._dh_modulus_sizes is None:
|
||||
self._dh_modulus_sizes = {}
|
||||
self._dh_modulus_sizes[dh_modulus_type] = int(val)
|
||||
elif key.startswith('client policy') and val.lower() == 'true':
|
||||
self._server_policy = False
|
||||
|
||||
|
||||
if self._name is None:
|
||||
@ -199,7 +202,7 @@ class Policy:
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create(host: str, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> str:
|
||||
def create(source: Optional[str], banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'], client_audit: bool) -> str:
|
||||
'''Creates a policy based on a server configuration. Returns a string.'''
|
||||
|
||||
today = date.today().strftime('%Y/%m/%d')
|
||||
@ -211,6 +214,10 @@ class Policy:
|
||||
rsa_hostkey_sizes_str = ''
|
||||
rsa_cakey_sizes_str = ''
|
||||
dh_modulus_sizes_str = ''
|
||||
client_policy_str = ''
|
||||
|
||||
if client_audit:
|
||||
client_policy_str = "\n# Set to true to signify this is a policy for clients, not servers.\nclient policy = true\n"
|
||||
|
||||
if kex is not None:
|
||||
if kex.server.compression is not None:
|
||||
@ -248,7 +255,7 @@ class Policy:
|
||||
policy_data = '''#
|
||||
# Custom policy based on %s (created on %s)
|
||||
#
|
||||
|
||||
%s
|
||||
# The name of this policy (displayed in the output during scans). Must be in quotes.
|
||||
name = "Custom Policy (based on %s on %s)"
|
||||
|
||||
@ -272,7 +279,7 @@ ciphers = %s
|
||||
|
||||
# The MACs that must match exactly (order matters).
|
||||
macs = %s
|
||||
''' % (host, today, host, today, banner, compressions, rsa_hostkey_sizes_str, rsa_cakey_sizes_str, dh_modulus_sizes_str, host_keys, kex_algs, ciphers, macs)
|
||||
''' % (source, today, client_policy_str, source, today, banner, compressions, rsa_hostkey_sizes_str, rsa_cakey_sizes_str, dh_modulus_sizes_str, host_keys, kex_algs, ciphers, macs)
|
||||
|
||||
return policy_data
|
||||
|
||||
@ -353,6 +360,11 @@ macs = %s
|
||||
return '%s v%s' % (self._name, self._version)
|
||||
|
||||
|
||||
def is_server_policy(self) -> bool:
|
||||
'''Returns True if this is a server policy, or False if this is a client policy.'''
|
||||
return self._server_policy
|
||||
|
||||
|
||||
def __str__(self) -> str:
|
||||
undefined = '{undefined}'
|
||||
|
||||
@ -552,6 +564,16 @@ class AuditConf:
|
||||
print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()))
|
||||
sys.exit(-1)
|
||||
|
||||
# If the user wants to do a client audit, but provided a server policy, terminate.
|
||||
if aconf.client_audit and aconf.policy.is_server_policy():
|
||||
print("Error: client audit selected, but server policy provided.")
|
||||
sys.exit(-1)
|
||||
|
||||
# If the user wants to do a server audit, but provided a client policy, terminate.
|
||||
if aconf.client_audit is False and aconf.policy.is_server_policy() is False:
|
||||
print("Error: server audit selected, but client policy provided.")
|
||||
sys.exit(-1)
|
||||
|
||||
return aconf
|
||||
|
||||
|
||||
@ -2270,7 +2292,7 @@ class SSH: # pylint: disable=too-few-public-methods
|
||||
self.__ipvo = ()
|
||||
self.__timeout = timeout
|
||||
self.__timeout_set = timeout_set
|
||||
self.client_host = None
|
||||
self.client_host = None # type: Optional[str]
|
||||
self.client_port = None
|
||||
|
||||
def _resolve(self, ipvo: Sequence[int]) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
|
||||
@ -3260,8 +3282,14 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], client_hos
|
||||
return passed
|
||||
|
||||
|
||||
def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex']) -> None:
|
||||
policy_data = Policy.create(aconf.host, banner, kex)
|
||||
def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'], client_host: Optional[str]) -> None:
|
||||
|
||||
# Set the source of this policy to the server host if this is a server audit, otherwise set it to the client address.
|
||||
source = aconf.host # type: Optional[str]
|
||||
if aconf.client_audit:
|
||||
source = client_host
|
||||
|
||||
policy_data = Policy.create(source, banner, kex, aconf.client_audit)
|
||||
|
||||
if aconf.policy_file is None:
|
||||
raise RuntimeError('Internal error: cannot write policy file since filename is None!')
|
||||
@ -3563,7 +3591,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal
|
||||
|
||||
# A new policy should be made from this scan.
|
||||
elif (aconf.policy is None) and (aconf.make_policy is True):
|
||||
make_policy(aconf, banner, kex=kex)
|
||||
make_policy(aconf, banner, kex, s.client_host)
|
||||
|
||||
else:
|
||||
raise RuntimeError('Internal error while handling output: %r %r' % (aconf.policy is None, aconf.make_policy))
|
||||
|
@ -186,7 +186,7 @@ macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
'''Creates a policy from a kex and ensures it is generated exactly as expected.'''
|
||||
|
||||
kex = self._get_kex()
|
||||
pol_data = self.Policy.create('www.l0l.com', 'bannerX', kex)
|
||||
pol_data = self.Policy.create('www.l0l.com', 'bannerX', kex, False)
|
||||
|
||||
# Today's date is embedded in the policy, so filter it out to get repeatable results.
|
||||
pol_data = pol_data.replace(date.today().strftime('%Y/%m/%d'), '[todays date]')
|
||||
@ -199,7 +199,7 @@ macs = mac_alg1, mac_alg2, mac_alg3'''
|
||||
'''Creates a policy and evaluates it against the same server'''
|
||||
|
||||
kex = self._get_kex()
|
||||
policy_data = self.Policy.create('www.l0l.com', None, kex)
|
||||
policy_data = self.Policy.create('www.l0l.com', None, kex, False)
|
||||
policy = self.Policy(policy_data=policy_data)
|
||||
|
||||
ret, errors = policy.evaluate('SSH Server 1.0', kex)
|
||||
|
Loading…
Reference in New Issue
Block a user