mirror of
				https://github.com/jtesta/ssh-audit.git
				synced 2025-10-30 21:15:27 +01:00 
			
		
		
		
	Use brighter colors on Windows for better readability. Disable unicode characters on Windows since the default terminal does not display them properly.
This commit is contained in:
		
							
								
								
									
										283
									
								
								ssh-audit.py
									
									
									
									
									
								
							
							
						
						
									
										283
									
								
								ssh-audit.py
									
									
									
									
									
								
							| @@ -456,6 +456,140 @@ macs = %s | |||||||
|         return "Name: %s\nVersion: %s\nBanner: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str) |         return "Name: %s\nVersion: %s\nBanner: %s\nCompressions: %s\nHost Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s" % (name, version, banner, compressions_str, host_keys_str, kex_str, ciphers_str, macs_str) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Utils: | ||||||
|  |     @classmethod | ||||||
|  |     def _type_err(cls, v: Any, target: str) -> TypeError: | ||||||
|  |         return TypeError('cannot convert {} to {}'.format(type(v), target)) | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def to_bytes(cls, v: Union[bytes, str], enc: str = 'utf-8') -> bytes: | ||||||
|  |         if isinstance(v, bytes): | ||||||
|  |             return v | ||||||
|  |         elif isinstance(v, str): | ||||||
|  |             return v.encode(enc) | ||||||
|  |         raise cls._type_err(v, 'bytes') | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def to_text(cls, v: Union[str, bytes], enc: str = 'utf-8') -> str: | ||||||
|  |         if isinstance(v, str): | ||||||
|  |             return v | ||||||
|  |         elif isinstance(v, bytes): | ||||||
|  |             return v.decode(enc) | ||||||
|  |         raise cls._type_err(v, 'unicode text') | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def _is_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127) -> bool: | ||||||
|  |         r = False | ||||||
|  |         if isinstance(v, str): | ||||||
|  |             for c in v: | ||||||
|  |                 i = cls.ctoi(c) | ||||||
|  |                 if not char_filter(i): | ||||||
|  |                     return r | ||||||
|  |             r = True | ||||||
|  |         return r | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def _to_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127, errors: str = 'replace') -> str: | ||||||
|  |         if isinstance(v, str): | ||||||
|  |             r = bytearray() | ||||||
|  |             for c in v: | ||||||
|  |                 i = cls.ctoi(c) | ||||||
|  |                 if char_filter(i): | ||||||
|  |                     r.append(i) | ||||||
|  |                 else: | ||||||
|  |                     if errors == 'ignore': | ||||||
|  |                         continue | ||||||
|  |                     r.append(63) | ||||||
|  |             return cls.to_text(r.decode('ascii')) | ||||||
|  |         raise cls._type_err(v, 'ascii') | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def is_ascii(cls, v: str) -> bool: | ||||||
|  |         return cls._is_ascii(v) | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def to_ascii(cls, v: str, errors: str = 'replace') -> str: | ||||||
|  |         return cls._to_ascii(v, errors=errors) | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def is_print_ascii(cls, v: str) -> bool: | ||||||
|  |         return cls._is_ascii(v, lambda x: 126 >= x >= 32) | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def to_print_ascii(cls, v: str, errors: str = 'replace') -> str: | ||||||
|  |         return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors) | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def unique_seq(cls, seq: Sequence[Any]) -> Sequence[Any]: | ||||||
|  |         seen = set()  # type: Set[Any] | ||||||
|  |  | ||||||
|  |         def _seen_add(x: Any) -> bool: | ||||||
|  |             seen.add(x) | ||||||
|  |             return False | ||||||
|  |  | ||||||
|  |         if isinstance(seq, tuple): | ||||||
|  |             return tuple(x for x in seq if x not in seen and not _seen_add(x)) | ||||||
|  |         else: | ||||||
|  |             return [x for x in seq if x not in seen and not _seen_add(x)] | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def ctoi(cls, c: Union[str, int]) -> int: | ||||||
|  |         if isinstance(c, str): | ||||||
|  |             return ord(c[0]) | ||||||
|  |         else: | ||||||
|  |             return c | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def parse_int(v: Any) -> int: | ||||||
|  |         try: | ||||||
|  |             return int(v) | ||||||
|  |         except Exception:  # pylint: disable=bare-except | ||||||
|  |             return 0 | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def parse_float(v: Any) -> float: | ||||||
|  |         try: | ||||||
|  |             return float(v) | ||||||
|  |         except Exception:  # pylint: disable=bare-except | ||||||
|  |             return -1.0 | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def parse_host_and_port(host_and_port: str) -> Tuple[str, int]: | ||||||
|  |         '''Parses a string into a tuple of its host and port.  The port is 0 if not specified.''' | ||||||
|  |         host = host_and_port | ||||||
|  |         port = 0 | ||||||
|  |  | ||||||
|  |         mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port) | ||||||
|  |         if mx is not None: | ||||||
|  |             host = mx.group(1) | ||||||
|  |             port_str = mx.group(2) | ||||||
|  |             if port_str is not None: | ||||||
|  |                 port = int(port_str) | ||||||
|  |         else: | ||||||
|  |             s = host_and_port.split(':') | ||||||
|  |             if len(s) == 2: | ||||||
|  |                 host = s[0] | ||||||
|  |                 if len(s[1]) > 0: | ||||||
|  |                     port = int(s[1]) | ||||||
|  |  | ||||||
|  |         return host, port | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def is_ipv6_address(address: str) -> bool: | ||||||
|  |         '''Returns True if address is an IPv6 address, otherwise False.''' | ||||||
|  |         is_ipv6 = True | ||||||
|  |         try: | ||||||
|  |             ipaddress.IPv6Address(address) | ||||||
|  |         except ipaddress.AddressValueError: | ||||||
|  |             is_ipv6 = False | ||||||
|  |  | ||||||
|  |         return is_ipv6 | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def is_windows() -> bool: | ||||||
|  |         return sys.platform in ['win32', 'cygwin'] | ||||||
|  |  | ||||||
|  |  | ||||||
| class AuditConf: | class AuditConf: | ||||||
|     # pylint: disable=too-many-instance-attributes |     # pylint: disable=too-many-instance-attributes | ||||||
|     def __init__(self, host: str = '', port: int = 22) -> None: |     def __init__(self, host: str = '', port: int = 22) -> None: | ||||||
| @@ -652,6 +786,10 @@ class Output: | |||||||
|     LEVELS = ('info', 'warn', 'fail')  # type: Sequence[str] |     LEVELS = ('info', 'warn', 'fail')  # type: Sequence[str] | ||||||
|     COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} |     COLORS = {'head': 36, 'good': 32, 'warn': 33, 'fail': 31} | ||||||
|  |  | ||||||
|  |     # Use brighter colors on Windows for better readability. | ||||||
|  |     if Utils.is_windows(): | ||||||
|  |         COLORS = {'head': 96, 'good': 92, 'warn': 93, 'fail': 91} | ||||||
|  |  | ||||||
|     def __init__(self) -> None: |     def __init__(self) -> None: | ||||||
|         self.batch = False |         self.batch = False | ||||||
|         self.verbose = False |         self.verbose = False | ||||||
| @@ -3388,10 +3526,18 @@ def evaluate_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], client_hos | |||||||
|             print("Host:   %s" % host) |             print("Host:   %s" % host) | ||||||
|         print("Policy: %s%s" % (spacing, aconf.policy.get_name_and_version())) |         print("Policy: %s%s" % (spacing, aconf.policy.get_name_and_version())) | ||||||
|         print("Result: %s" % spacing, end='') |         print("Result: %s" % spacing, end='') | ||||||
|  |  | ||||||
|  |         # Use these nice unicode characters in the result message, unless we're on Windows (the cmd.exe terminal doesn't display them properly). | ||||||
|  |         icon_good = "✔ " | ||||||
|  |         icon_fail = "❌ " | ||||||
|  |         if Utils.is_windows(): | ||||||
|  |             icon_good = "" | ||||||
|  |             icon_fail = "" | ||||||
|  |  | ||||||
|         if passed: |         if passed: | ||||||
|             out.good("✔ Passed") |             out.good("%sPassed" % icon_good) | ||||||
|         else: |         else: | ||||||
|             out.fail("❌ Failed!") |             out.fail("%sFailed!" % icon_fail) | ||||||
|             out.warn("\nErrors:\n%s" % error_str) |             out.warn("\nErrors:\n%s" % error_str) | ||||||
|  |  | ||||||
|     return passed |     return passed | ||||||
| @@ -3406,7 +3552,8 @@ def list_policies() -> None: | |||||||
|     # If the path is not a directory, print a useful error and exit. |     # If the path is not a directory, print a useful error and exit. | ||||||
|     if not os.path.isdir(policies_dir): |     if not os.path.isdir(policies_dir): | ||||||
|         print("Error: could not find policies directory.  Please report this full output to <%s>:" % GITHUB_ISSUES_URL) |         print("Error: could not find policies directory.  Please report this full output to <%s>:" % GITHUB_ISSUES_URL) | ||||||
|         print("\n__file__: %s" % __file__) |         print("\nsys.argv[0]: %s" % sys.argv[0]) | ||||||
|  |         print("__file__: %s" % __file__) | ||||||
|         print("policies_dir: %s" % policies_dir) |         print("policies_dir: %s" % policies_dir) | ||||||
|         sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR) |         sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR) | ||||||
|  |  | ||||||
| @@ -3474,136 +3621,6 @@ def make_policy(aconf: AuditConf, banner: Optional['SSH.Banner'], kex: Optional[ | |||||||
|         print("Error: file already exists: %s" % aconf.policy_file) |         print("Error: file already exists: %s" % aconf.policy_file) | ||||||
|  |  | ||||||
|  |  | ||||||
| class Utils: |  | ||||||
|     @classmethod |  | ||||||
|     def _type_err(cls, v: Any, target: str) -> TypeError: |  | ||||||
|         return TypeError('cannot convert {} to {}'.format(type(v), target)) |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def to_bytes(cls, v: Union[bytes, str], enc: str = 'utf-8') -> bytes: |  | ||||||
|         if isinstance(v, bytes): |  | ||||||
|             return v |  | ||||||
|         elif isinstance(v, str): |  | ||||||
|             return v.encode(enc) |  | ||||||
|         raise cls._type_err(v, 'bytes') |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def to_text(cls, v: Union[str, bytes], enc: str = 'utf-8') -> str: |  | ||||||
|         if isinstance(v, str): |  | ||||||
|             return v |  | ||||||
|         elif isinstance(v, bytes): |  | ||||||
|             return v.decode(enc) |  | ||||||
|         raise cls._type_err(v, 'unicode text') |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def _is_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127) -> bool: |  | ||||||
|         r = False |  | ||||||
|         if isinstance(v, str): |  | ||||||
|             for c in v: |  | ||||||
|                 i = cls.ctoi(c) |  | ||||||
|                 if not char_filter(i): |  | ||||||
|                     return r |  | ||||||
|             r = True |  | ||||||
|         return r |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def _to_ascii(cls, v: str, char_filter: Callable[[int], bool] = lambda x: x <= 127, errors: str = 'replace') -> str: |  | ||||||
|         if isinstance(v, str): |  | ||||||
|             r = bytearray() |  | ||||||
|             for c in v: |  | ||||||
|                 i = cls.ctoi(c) |  | ||||||
|                 if char_filter(i): |  | ||||||
|                     r.append(i) |  | ||||||
|                 else: |  | ||||||
|                     if errors == 'ignore': |  | ||||||
|                         continue |  | ||||||
|                     r.append(63) |  | ||||||
|             return cls.to_text(r.decode('ascii')) |  | ||||||
|         raise cls._type_err(v, 'ascii') |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def is_ascii(cls, v: str) -> bool: |  | ||||||
|         return cls._is_ascii(v) |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def to_ascii(cls, v: str, errors: str = 'replace') -> str: |  | ||||||
|         return cls._to_ascii(v, errors=errors) |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def is_print_ascii(cls, v: str) -> bool: |  | ||||||
|         return cls._is_ascii(v, lambda x: 126 >= x >= 32) |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def to_print_ascii(cls, v: str, errors: str = 'replace') -> str: |  | ||||||
|         return cls._to_ascii(v, lambda x: 126 >= x >= 32, errors) |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def unique_seq(cls, seq: Sequence[Any]) -> Sequence[Any]: |  | ||||||
|         seen = set()  # type: Set[Any] |  | ||||||
|  |  | ||||||
|         def _seen_add(x: Any) -> bool: |  | ||||||
|             seen.add(x) |  | ||||||
|             return False |  | ||||||
|  |  | ||||||
|         if isinstance(seq, tuple): |  | ||||||
|             return tuple(x for x in seq if x not in seen and not _seen_add(x)) |  | ||||||
|         else: |  | ||||||
|             return [x for x in seq if x not in seen and not _seen_add(x)] |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def ctoi(cls, c: Union[str, int]) -> int: |  | ||||||
|         if isinstance(c, str): |  | ||||||
|             return ord(c[0]) |  | ||||||
|         else: |  | ||||||
|             return c |  | ||||||
|  |  | ||||||
|     @staticmethod |  | ||||||
|     def parse_int(v: Any) -> int: |  | ||||||
|         try: |  | ||||||
|             return int(v) |  | ||||||
|         except Exception:  # pylint: disable=bare-except |  | ||||||
|             return 0 |  | ||||||
|  |  | ||||||
|     @staticmethod |  | ||||||
|     def parse_float(v: Any) -> float: |  | ||||||
|         try: |  | ||||||
|             return float(v) |  | ||||||
|         except Exception:  # pylint: disable=bare-except |  | ||||||
|             return -1.0 |  | ||||||
|  |  | ||||||
|     @staticmethod |  | ||||||
|     def parse_host_and_port(host_and_port: str) -> Tuple[str, int]: |  | ||||||
|         '''Parses a string into a tuple of its host and port.  The port is 0 if not specified.''' |  | ||||||
|         host = host_and_port |  | ||||||
|         port = 0 |  | ||||||
|  |  | ||||||
|         mx = re.match(r'^\[([^\]]+)\](?::(\d+))?$', host_and_port) |  | ||||||
|         if mx is not None: |  | ||||||
|             host = mx.group(1) |  | ||||||
|             port_str = mx.group(2) |  | ||||||
|             if port_str is not None: |  | ||||||
|                 port = int(port_str) |  | ||||||
|         else: |  | ||||||
|             s = host_and_port.split(':') |  | ||||||
|             if len(s) == 2: |  | ||||||
|                 host = s[0] |  | ||||||
|                 if len(s[1]) > 0: |  | ||||||
|                     port = int(s[1]) |  | ||||||
|  |  | ||||||
|         return host, port |  | ||||||
|  |  | ||||||
|     @staticmethod |  | ||||||
|     def is_ipv6_address(address: str) -> bool: |  | ||||||
|         '''Returns True if address is an IPv6 address, otherwise False.''' |  | ||||||
|         is_ipv6 = True |  | ||||||
|         try: |  | ||||||
|             ipaddress.IPv6Address(address) |  | ||||||
|         except ipaddress.AddressValueError: |  | ||||||
|             is_ipv6 = False |  | ||||||
|  |  | ||||||
|         return is_ipv6 |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None, pkm: Optional['SSH1.PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: | def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = None, pkm: Optional['SSH1.PublicKeyMessage'] = None, client_host: Optional[str] = None) -> Any: | ||||||
|  |  | ||||||
|     banner_str = '' |     banner_str = '' | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Joe Testa
					Joe Testa