diff --git a/src/ssh_audit/__main__.py b/src/ssh_audit/__main__.py index 0532ace..9ae64fa 100644 --- a/src/ssh_audit/__main__.py +++ b/src/ssh_audit/__main__.py @@ -1,2 +1,16 @@ +import sys +import traceback + from ssh_audit.ssh_audit import main -main() +from ssh_audit import exitcodes + + +exit_code = exitcodes.GOOD + +try: + exit_code = main() +except Exception: + exit_code = exitcodes.UNKNOWN_ERROR + print(traceback.format_exc()) + +sys.exit(exit_code) diff --git a/src/ssh_audit/exitcodes.py b/src/ssh_audit/exitcodes.py new file mode 100644 index 0000000..5392f1d --- /dev/null +++ b/src/ssh_audit/exitcodes.py @@ -0,0 +1,6 @@ +# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively. +FAILURE = 3 +WARNING = 2 +CONNECTION_ERROR = 1 +GOOD = 0 +UNKNOWN_ERROR = -1 diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 4509332..03b9558 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -44,16 +44,13 @@ import traceback from typing import Dict, List, Set, Sequence, Tuple, Iterable from typing import Callable, Optional, Union, Any +from ssh_audit import exitcodes + + VERSION = 'v2.3.1-dev' SSH_HEADER = 'SSH-{0}-OpenSSH_8.2' # SSH software to impersonate GITHUB_ISSUES_URL = 'https://github.com/jtesta/ssh-audit/issues' # The URL to the Github issues tracker. -# The program return values corresponding to failure(s) encountered, warning(s) encountered, connection errors, and no problems found, respectively. -PROGRAM_RETVAL_FAILURE = 3 -PROGRAM_RETVAL_WARNING = 2 -PROGRAM_RETVAL_CONNECTION_ERROR = 1 -PROGRAM_RETVAL_GOOD = 0 -PROGRAM_RETVAL_UNKNOWN_ERROR = -1 try: # pragma: nocover from colorama import init as colorama_init @@ -63,13 +60,13 @@ except ImportError: # pragma: nocover def usage(err: Optional[str] = None) -> None: - retval = PROGRAM_RETVAL_GOOD + retval = exitcodes.GOOD uout = Output() p = os.path.basename(sys.argv[0]) uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION)) if err is not None and len(err) > 0: uout.fail('\n' + err) - retval = PROGRAM_RETVAL_UNKNOWN_ERROR + retval = exitcodes.UNKNOWN_ERROR uout.info('usage: {0} [options] \n'.format(p)) uout.info(' -h, --help print this help') uout.info(' -1, --ssh1 force ssh version 1 only') @@ -727,7 +724,7 @@ class AuditConf: if aconf.list_policies: list_policies() - sys.exit(PROGRAM_RETVAL_GOOD) + sys.exit(exitcodes.GOOD) if aconf.client_audit is False and aconf.target_file is None: if oport is not None: @@ -767,17 +764,17 @@ class AuditConf: aconf.policy = Policy(policy_file=aconf.policy_file) except Exception as e: print("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc())) - sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR) + sys.exit(exitcodes.UNKNOWN_ERROR) # If the user wants to do a client audit, but provided a server policy, terminate. if aconf.client_audit and aconf.policy.is_server_policy(): print("Error: client audit selected, but server policy provided.") - sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR) + sys.exit(exitcodes.UNKNOWN_ERROR) # If the user wants to do a server audit, but provided a client policy, terminate. if aconf.client_audit is False and aconf.policy.is_server_policy() is False: print("Error: server audit selected, but client policy provided.") - sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR) + sys.exit(exitcodes.UNKNOWN_ERROR) return aconf @@ -2548,7 +2545,7 @@ class SSH: # pylint: disable=too-few-public-methods yield af, addr except socket.error as e: out.fail('[exception] {}'.format(e)) - sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + sys.exit(exitcodes.CONNECTION_ERROR) # Listens on a server socket and accepts one connection (used for # auditing client connections). @@ -2578,7 +2575,7 @@ class SSH: # pylint: disable=too-few-public-methods # If we failed to listen on any interfaces, terminate. if len(self.__sock_map.keys()) == 0: print("Error: failed to listen on any IPv4 and IPv6 interfaces!") - sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + sys.exit(exitcodes.CONNECTION_ERROR) # Wait for an incoming connection. If a timeout was explicitly # set by the user, terminate when it elapses. @@ -2596,7 +2593,7 @@ class SSH: # pylint: disable=too-few-public-methods if self.__timeout_set and time_elapsed >= self.__timeout: print("Timeout elapsed. Terminating...") - sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + sys.exit(exitcodes.CONNECTION_ERROR) # Accept the connection. c, addr = self.__sock_map[fds[0][0]].accept() @@ -2732,7 +2729,7 @@ class SSH: # pylint: disable=too-few-public-methods check_size = 4 + 1 + payload_length + padding_length if check_size % self.__block_size != 0: out.fail('[exception] invalid ssh packet (block size)') - sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + sys.exit(exitcodes.CONNECTION_ERROR) self.ensure_read(payload_length) if sshv == 1: payload = self.read(payload_length - 4) @@ -2747,7 +2744,7 @@ class SSH: # pylint: disable=too-few-public-methods rcrc = SSH1.crc32(padding + payload) if crc != rcrc: out.fail('[exception] packet checksum CRC32 mismatch.') - sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + sys.exit(exitcodes.CONNECTION_ERROR) else: self.ensure_read(padding_length) padding = self.read(padding_length) @@ -3189,9 +3186,9 @@ def output_algorithm(alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], al first = True for level, text in texts: if level == 'fail': - program_retval = PROGRAM_RETVAL_FAILURE - elif level == 'warn' and program_retval != PROGRAM_RETVAL_FAILURE: # If a failure was found previously, don't downgrade to warning. - program_retval = PROGRAM_RETVAL_WARNING + program_retval = exitcodes.FAILURE + elif level == 'warn' and program_retval != exitcodes.FAILURE: # If a failure was found previously, don't downgrade to warning. + program_retval = exitcodes.WARNING f = getattr(out, level) comment = (padding + ' -- [' + level + '] ' + text) if text != '' else '' @@ -3414,10 +3411,10 @@ def output_info(software: Optional['SSH.Software'], client_audit: bool, any_prob out.sep() -# Returns a PROGRAM_RETVAL_* flag to denote if any failures or warnings were encountered. +# Returns a exitcodes.* flag to denote if any failures or warnings were encountered. def output(aconf: AuditConf, banner: Optional[SSH.Banner], header: List[str], client_host: Optional[str] = None, kex: Optional[SSH2.Kex] = None, pkm: Optional[SSH1.PublicKeyMessage] = None, print_target: bool = False) -> int: - program_retval = PROGRAM_RETVAL_GOOD + program_retval = exitcodes.GOOD client_audit = client_host is not None # If set, this is a client audit. sshv = 1 if pkm is not None else 2 algs = SSH.Algorithms(pkm, kex) @@ -3554,7 +3551,7 @@ def list_policies() -> None: print("\nsys.argv[0]: %s" % sys.argv[0]) print("__file__: %s" % __file__) print("policies_dir: %s" % policies_dir) - sys.exit(PROGRAM_RETVAL_UNKNOWN_ERROR) + sys.exit(exitcodes.UNKNOWN_ERROR) # Get a list of all the files in the policies sub-directory. files = [] @@ -3717,9 +3714,9 @@ def build_struct(banner: Optional['SSH.Banner'], kex: Optional['SSH2.Kex'] = Non return res -# Returns one of the PROGRAM_RETVAL_* flags. +# Returns one of the exitcodes.* flags. def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int: - program_retval = PROGRAM_RETVAL_GOOD + program_retval = exitcodes.GOOD out.batch = aconf.batch out.verbose = aconf.verbose out.level = aconf.level @@ -3731,7 +3728,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal err = s.connect() if err is not None: out.fail(err) - sys.exit(PROGRAM_RETVAL_CONNECTION_ERROR) + sys.exit(exitcodes.CONNECTION_ERROR) if sshv is None: sshv = 2 if aconf.ssh2 else 1 @@ -3771,7 +3768,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal if err is not None: output(aconf, banner, header) out.fail(err) - return PROGRAM_RETVAL_CONNECTION_ERROR + return exitcodes.CONNECTION_ERROR if sshv == 1: program_retval = output(aconf, banner, header, pkm=SSH1.PublicKeyMessage.parse(payload)) elif sshv == 2: @@ -3786,7 +3783,7 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal # This is a policy test. elif (aconf.policy is not None) and (aconf.make_policy is False): - program_retval = PROGRAM_RETVAL_GOOD if evaluate_policy(aconf, banner, s.client_host, kex=kex) else PROGRAM_RETVAL_FAILURE + program_retval = exitcodes.GOOD if evaluate_policy(aconf, banner, s.client_host, kex=kex) else exitcodes.FAILURE # A new policy should be made from this scan. elif (aconf.policy is None) and (aconf.make_policy is True): @@ -3799,8 +3796,8 @@ def audit(aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = Fal def algorithm_lookup(alg_names: str) -> int: - '''Looks up a comma-separated list of algorithms and outputs their security properties. Returns a PROGRAM_RETVAL_* flag.''' - retval = PROGRAM_RETVAL_GOOD + '''Looks up a comma-separated list of algorithms and outputs their security properties. Returns an exitcodes.* flag.''' + retval = exitcodes.GOOD alg_types = { 'kex': 'key exchange algorithms', 'key': 'host-key algorithms', @@ -3845,7 +3842,7 @@ def algorithm_lookup(alg_names: str) -> int: ] if len(algorithms_not_found) > 0: - retval = PROGRAM_RETVAL_FAILURE + retval = exitcodes.FAILURE out.head('# unknown algorithms') for algorithm_not_found in algorithms_not_found: out.fail(algorithm_not_found) @@ -3866,7 +3863,7 @@ def main() -> int: # If multiple targets were specified... if len(aconf.target_list) > 0: - ret = PROGRAM_RETVAL_GOOD + ret = exitcodes.GOOD # If JSON output is desired, each target's results will be reported in its own list entry. if aconf.json: @@ -3882,7 +3879,7 @@ def main() -> int: new_ret = audit(aconf, print_target=True) # Set the return value only if an unknown error occurred, a failure occurred, or if a warning occurred and the previous value was good. - if (new_ret == PROGRAM_RETVAL_UNKNOWN_ERROR) or (new_ret == PROGRAM_RETVAL_FAILURE) or ((new_ret == PROGRAM_RETVAL_WARNING) and (ret == PROGRAM_RETVAL_GOOD)): + if (new_ret == exitcodes.UNKNOWN_ERROR) or (new_ret == exitcodes.FAILURE) or ((new_ret == exitcodes.WARNING) and (ret == exitcodes.GOOD)): ret = new_ret # Don't print a delimiter after the last target was handled. @@ -3901,12 +3898,12 @@ def main() -> int: if __name__ == '__main__': # pragma: nocover - exit_code = PROGRAM_RETVAL_GOOD + exit_code = exitcodes.GOOD try: exit_code = main() except Exception: - exit_code = PROGRAM_RETVAL_UNKNOWN_ERROR + exit_code = exitcodes.UNKNOWN_ERROR print(traceback.format_exc()) sys.exit(exit_code) diff --git a/ssh-audit.py b/ssh-audit.py index 4f55bf0..f0cf144 100755 --- a/ssh-audit.py +++ b/ssh-audit.py @@ -2,10 +2,20 @@ """src/ssh_audit/ssh_audit.py wrapper for backwards compatibility""" import sys +import traceback from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent / "src")) from ssh_audit.ssh_audit import main # noqa: E402 +from ssh_audit import exitcodes # noqa: E402 -main() +exit_code = exitcodes.GOOD + +try: + exit_code = main() +except Exception: + exit_code = exitcodes.UNKNOWN_ERROR + print(traceback.format_exc()) + +sys.exit(exit_code) diff --git a/tox.ini b/tox.ini index d4148fa..4426883 100644 --- a/tox.ini +++ b/tox.ini @@ -93,6 +93,7 @@ indent-string = " " disable = bad-continuation, broad-except, + duplicate-code, fixme, invalid-name, line-too-long, @@ -125,9 +126,9 @@ max-module-lines = 2500 [flake8] ignore = - E241, # multiple spaces after operator; should be kept for tabular data + E241, # multiple spaces after operator; should be kept for tabular data E303, # too many blank lines - E501, # line too long + E501, # line too long [pytest] junit_family = xunit1