Merge develop branch.

This commit is contained in:
Andris Raugulis 2016-10-14 08:59:31 +03:00
commit 98717198c2
13 changed files with 1493 additions and 254 deletions

View File

@ -8,7 +8,11 @@ python:
- pypy
- pypy3
install:
- pip install pytest
- pip install --upgrade pytest
- pip install --upgrade pytest-cov
- pip install --upgrade coveralls
script:
- py.test -v test
- py.test --cov-report= --cov=ssh-audit -v test
after_success:
- coveralls

View File

@ -7,10 +7,11 @@
- grab banner, recognize device or software and operating system, detect compression;
- gather key-exchange, host-key, encryption and message authentication code algorithms;
- output algorithm information (available since, removed/disabled, unsafe/weak/legacy, etc);
- output algorithm recommendations (append or remove based on recognized software version);
- output security information (related issues, assigned CVE list, etc);
- analyze SSH version compatibility based on algorithm information;
- historical information from OpenSSH and Dropbear SSH;
- no dependencies, compatible with Python2 and Python3;
- historical information from OpenSSH, Dropbear SSH and libssh;
- no dependencies, compatible with Python 2.6+, Python 3.x and PyPy;
## Usage
```
@ -28,9 +29,17 @@ usage: ssh-audit.py [-bnv] [-l <level>] <host[:port]>
* verbose flag `-v` will prefix each line with section type and algorithm name.
### example
![screenshot](https://cloud.githubusercontent.com/assets/7356025/17623665/da5281c8-60a9-11e6-9582-13f9971c22e0.png)
![screenshot](https://cloud.githubusercontent.com/assets/7356025/19233757/3e09b168-8ef0-11e6-91b4-e880bacd0b8a.png)
## ChangeLog
### v1.6.0 (2016-10-14)
- implement algorithm recommendations section (based on recognized software)
- implement full libssh support (version history, algorithms, security, etc)
- fix SSH-1.99 banner recognition and version comparison functionality
- do not output empty algorithms (happens for misconfigured servers)
- make consistent output for Python 3.x versions
- add a lot more tests (conf, banner, software, SSH1/SSH2, output, etc)
- use Travis CI to test for multiple Python versions (2.6-3.5, pypy, pypy3)
### v1.5.0 (2016-09-20)
- create security section for related security information

View File

@ -26,13 +26,12 @@
from __future__ import print_function
import os, io, sys, socket, struct, random, errno, getopt, re, hashlib, base64
VERSION = 'v1.5.0'
VERSION = 'v1.5.1.dev'
def usage(err=None):
out = Output()
p = os.path.basename(sys.argv[0])
out.batch = False
out.minlevel = 'info'
out.head('# {0} {1}, moo@arthepsy.eu'.format(p, VERSION))
if err is not None:
out.fail('\n' + err)
@ -49,43 +48,78 @@ def usage(err=None):
class AuditConf(object):
def __init__(self):
self.__host = None
self.__port = 22
self.__ssh1 = False
self.__ssh2 = False
def __init__(self, host=None, port=22):
self.host = host
self.port = port
self.ssh1 = True
self.ssh2 = True
self.batch = False
self.colors = True
self.verbose = False
self.minlevel = 'info'
@property
def host(self):
return self.__host
def __setattr__(self, name, value):
valid = False
if name in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']:
valid, value = True, True if value else False
elif name == 'port':
valid, port = True, utils.parse_int(value)
if port < 1 or port > 65535:
raise ValueError('invalid port: {0}'.format(value))
value = port
elif name in ['minlevel']:
if value not in ('info', 'warn', 'fail'):
raise ValueError('invalid level: {0}'.format(value))
valid = True
elif name == 'host':
valid = True
if valid:
object.__setattr__(self, name, value)
@host.setter
def host(self, v):
self.__host = v
@property
def port(self):
return self.__port
@port.setter
def port(self, v):
self.__port = v
@property
def ssh1(self):
return self.__ssh1
@ssh1.setter
def ssh1(self, v):
self.__ssh1 = v
@property
def ssh2(self):
return self.__ssh2
@ssh2.setter
def ssh2(self, v):
self.__ssh2 = v
@classmethod
def from_cmdline(cls, args, usage_cb):
conf = cls()
try:
sopts = 'h12bnvl:'
lopts = ['help', 'ssh1', 'ssh2', 'batch',
'no-colors', 'verbose', 'level=']
opts, args = getopt.getopt(args, sopts, lopts)
except getopt.GetoptError as err:
usage_cb(str(err))
conf.ssh1, conf.ssh2 = False, False
for o, a in opts:
if o in ('-h', '--help'):
usage_cb()
elif o in ('-1', '--ssh1'):
conf.ssh1 = True
elif o in ('-2', '--ssh2'):
conf.ssh2 = True
elif o in ('-b', '--batch'):
conf.batch = True
conf.verbose = True
elif o in ('-n', '--no-colors'):
conf.colors = False
elif o in ('-v', '--verbose'):
conf.verbose = True
elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'):
usage_cb('level {0} is not valid'.format(a))
conf.minlevel = a
if len(args) == 0:
usage_cb()
s = args[0].split(':')
host, port = s[0].strip(), 22
if len(s) > 1:
port = utils.parse_int(s[1])
if not host:
usage_cb('host is empty')
if port <= 0 or port > 65535:
usage_cb('port {0} is not valid'.format(s[1]))
conf.host = host
conf.port = port
if not (conf.ssh1 or conf.ssh2):
conf.ssh1, conf.ssh2 = True, True
return conf
class Output(object):
@ -100,7 +134,9 @@ class Output(object):
@property
def minlevel(self):
return self.__minlevel
if self.__minlevel < len(self.LEVELS):
return self.LEVELS[self.__minlevel]
return 'unknown'
@minlevel.setter
def minlevel(self, name):
@ -122,7 +158,7 @@ class Output(object):
def __getattr__(self, name):
if name == 'head' and self.batch:
return lambda x: None
if not self.getlevel(name) >= self.minlevel:
if not self.getlevel(name) >= self.__minlevel:
return lambda x: None
if self.colors and os.name == 'posix' and name in self.COLORS:
color = u'\033[0;{0}m'.format(self.COLORS[name])
@ -133,7 +169,7 @@ class Output(object):
class OutputBuffer(list):
def __enter__(self):
self.__buf = io.StringIO()
self.__buf = utils.StringIO()
self.__stdout = sys.stdout
sys.stdout = self.__buf
return self
@ -147,39 +183,110 @@ class OutputBuffer(list):
sys.stdout = self.__stdout
class KexParty(object):
encryption = []
mac = []
compression = []
languages = []
class SSH2(object):
class KexParty(object):
def __init__(self, enc, mac, compression, languages):
self.__enc = enc
self.__mac = mac
self.__compression = compression
self.__languages = languages
@property
def encryption(self):
return self.__enc
class Kex(object):
cookie = None
kex_algorithms = []
key_algorithms = []
server = KexParty()
client = KexParty()
follows = False
unused = 0
@property
def mac(self):
return self.__mac
@property
def compression(self):
return self.__compression
@property
def languages(self):
return self.__languages
class Kex(object):
def __init__(self, cookie, kex_algs, key_algs, cli, srv, follows, unused=0):
self.__cookie = cookie
self.__kex_algs = kex_algs
self.__key_algs = key_algs
self.__client = cli
self.__server = srv
self.__follows = follows
self.__unused = unused
@property
def cookie(self):
return self.__cookie
@property
def kex_algorithms(self):
return self.__kex_algs
@property
def key_algorithms(self):
return self.__key_algs
# client_to_server
@property
def client(self):
return self.__client
# server_to_client
@property
def server(self):
return self.__server
@property
def follows(self):
return self.__follows
@property
def unused(self):
return self.__unused
def write(self, wbuf):
wbuf.write(self.cookie)
wbuf.write_list(self.kex_algorithms)
wbuf.write_list(self.key_algorithms)
wbuf.write_list(self.client.encryption)
wbuf.write_list(self.server.encryption)
wbuf.write_list(self.client.mac)
wbuf.write_list(self.server.mac)
wbuf.write_list(self.client.compression)
wbuf.write_list(self.server.compression)
wbuf.write_list(self.client.languages)
wbuf.write_list(self.server.languages)
wbuf.write_bool(self.follows)
wbuf.write_int(self.__unused)
@property
def payload(self):
wbuf = WriteBuf()
self.write(wbuf)
return wbuf.write_flush()
@classmethod
def parse(cls, payload):
kex = cls()
buf = ReadBuf(payload)
kex.cookie = buf.read(16)
kex.kex_algorithms = buf.read_list()
kex.key_algorithms = buf.read_list()
kex.client.encryption = buf.read_list()
kex.server.encryption = buf.read_list()
kex.client.mac = buf.read_list()
kex.server.mac = buf.read_list()
kex.client.compression = buf.read_list()
kex.server.compression = buf.read_list()
kex.client.languages = buf.read_list()
kex.server.languages = buf.read_list()
kex.follows = buf.read_bool()
kex.unused = buf.read_int()
cookie = buf.read(16)
kex_algs = buf.read_list()
key_algs = buf.read_list()
cli_enc = buf.read_list()
srv_enc = buf.read_list()
cli_mac = buf.read_list()
srv_mac = buf.read_list()
cli_compression = buf.read_list()
srv_compression = buf.read_list()
cli_languages = buf.read_list()
srv_languages = buf.read_list()
follows = buf.read_bool()
unused = buf.read_int()
cli = SSH2.KexParty(cli_enc, cli_mac, cli_compression, cli_languages)
srv = SSH2.KexParty(srv_enc, srv_mac, srv_compression, srv_languages)
kex = cls(cookie, kex_algs, key_algs, cli, srv, follows, unused)
return kex
@ -204,12 +311,14 @@ class SSH1(object):
crc = (crc >> 8) ^ self._table[n]
return crc
_crc32 = CRC32()
_crc32 = None
CIPHERS = ['none', 'idea', 'des', '3des', 'tss', 'rc4', 'blowfish']
AUTHS = [None, 'rhosts', 'rsa', 'password', 'rhosts_rsa', 'tis', 'kerberos']
@classmethod
def crc32(cls, v):
if cls._crc32 is None:
cls._crc32 = cls.CRC32()
return cls._crc32.calc(v)
class KexDB(object):
@ -315,6 +424,24 @@ class SSH1(object):
auths.append(SSH1.AUTHS[i])
return auths
def write(self, wbuf):
wbuf.write(self.cookie)
wbuf.write_int(self.server_key_bits)
wbuf.write_mpint1(self.server_key_public_exponent)
wbuf.write_mpint1(self.server_key_public_modulus)
wbuf.write_int(self.host_key_bits)
wbuf.write_mpint1(self.host_key_public_exponent)
wbuf.write_mpint1(self.host_key_public_modulus)
wbuf.write_int(self.protocol_flags)
wbuf.write_int(self.supported_ciphers_mask)
wbuf.write_int(self.supported_authentications_mask)
@property
def payload(self):
wbuf = WriteBuf()
self.write(wbuf)
return wbuf.write_flush()
@classmethod
def parse(cls, payload):
buf = ReadBuf(payload)
@ -337,7 +464,7 @@ class SSH1(object):
class ReadBuf(object):
def __init__(self, data=None):
super(ReadBuf, self).__init__()
self._buf = io.BytesIO(data) if data else io.BytesIO()
self._buf = utils.BytesIO(data) if data else utils.BytesIO()
self._len = len(data) if data else 0
@property
@ -416,7 +543,7 @@ class WriteBuf(object):
return self.write(v)
def write_list(self, v):
self.write_string(u','.join(v))
return self.write_string(u','.join(v))
@classmethod
def _bitlength(cls, n):
@ -454,6 +581,12 @@ class WriteBuf(object):
data = self._create_mpint(n)
return self.write_string(data)
def write_line(self, v):
if not isinstance(v, bytes):
v = bytes(bytearray(v, 'utf-8'))
v += b'\r\n'
return self.write(v)
def write_flush(self):
payload = self._wbuf.getvalue()
self._wbuf.truncate(0)
@ -472,6 +605,7 @@ class SSH(object):
class Product(object):
OpenSSH = 'OpenSSH'
DropbearSSH = 'Dropbear SSH'
LibSSH = 'libssh'
class Software(object):
def __init__(self, vendor, product, version, patch, os):
@ -505,7 +639,7 @@ class SSH(object):
if other is None:
return 1
if isinstance(other, self.__class__):
other = '{0}{1}'.format(other.version, other.patch)
other = '{0}{1}'.format(other.version, other.patch or '')
else:
other = str(other)
mx = re.match(r'^([\d\.]+\d+)(.*)$', other)
@ -517,15 +651,15 @@ class SSH(object):
return -1
elif self.version > oversion:
return 1
spatch = self.patch
spatch = self.patch or ''
if self.product == SSH.Product.DropbearSSH:
if not re.match(r'^test\d.*$', opatch):
opatch = 'z{0}'.format(opatch)
if not re.match(r'^test\d.*$', self.patch):
spatch = 'z{0}'.format(self.patch)
if not re.match(r'^test\d.*$', spatch):
spatch = 'z{0}'.format(spatch)
elif self.product == SSH.Product.OpenSSH:
mx1 = re.match(r'^p\d(.*)', opatch)
mx2 = re.match(r'^p\d(.*)', self.patch)
mx2 = re.match(r'^p\d(.*)', spatch)
if not (mx1 and mx2):
if mx1:
opatch = mx1.group(1)
@ -544,25 +678,29 @@ class SSH(object):
return False
return True
def __str__(self):
def display(self, full=True):
out = '{0} '.format(self.vendor) if self.vendor else ''
out += self.product
if self.version:
out += ' {0}'.format(self.version)
patch = self.patch
if full:
patch = self.patch or ''
if self.product == SSH.Product.OpenSSH:
mx = re.match('^(p\d)(.*)$', self.patch)
mx = re.match('^(p\d)(.*)$', patch)
if mx is not None:
out += mx.group(1)
patch = mx.group(2).strip()
if patch:
out += ' ({0})'.format(self.patch)
out += ' ({0})'.format(patch)
if self.os:
out += ' running on {0}'.format(self.os)
return out
def __str__(self):
return self.display()
def __repr__(self):
out = 'vendor={0} '.format(self.vendor) if self.vendor else ''
out = 'vendor={0}'.format(self.vendor) if self.vendor else ''
if self.product:
if self.vendor:
out += ', '
@ -577,7 +715,7 @@ class SSH(object):
@staticmethod
def _fix_patch(patch):
return re.sub(r'^[-_\.]+', '', patch)
return re.sub(r'^[-_\.]+', '', patch) or None
@staticmethod
def _fix_date(d):
@ -628,6 +766,12 @@ class SSH(object):
v = None
os = cls._extract_os(banner.comments)
return cls(v, p, mx.group(1), patch, os)
mx = re.match(r'^libssh-([\d\.]+\d+)(.*)', software)
if mx:
patch = cls._fix_patch(mx.group(2))
v, p = None, SSH.Product.LibSSH
os = cls._extract_os(banner.comments)
return cls(v, p, mx.group(1), patch, os)
mx = re.match(r'^RomSShell_([\d\.]+\d+)(.*)', software)
if mx:
patch = cls._fix_patch(mx.group(2))
@ -644,8 +788,8 @@ class SSH(object):
return None
class Banner(object):
_RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-([^\s]*)(?:\s+(.*))?)?'
RX_PROTOCOL = re.compile(_RXP.replace('\d', '(\d)'))
_RXP, _RXR = r'SSH-\d\.\s*?\d+', r'(-\s*([^\s]*)(?:\s+(.*))?)?'
RX_PROTOCOL = re.compile(re.sub(r'\\d(\+?)', '(\\d\g<1>)', _RXP))
RX_BANNER = re.compile(r'^({0}(?:(?:-{0})*)){1}$'.format(_RXP, _RXR))
def __init__(self, protocol, software, comments):
@ -693,6 +837,8 @@ class SSH(object):
if software is None and (mx.group(2) or '').startswith('-'):
software = ''
comments = (mx.group(4) or '').strip() or None
if comments is not None:
comments = re.sub('\s+', ' ', comments)
return cls(protocol, software, comments)
class Fingerprint(object):
@ -714,21 +860,34 @@ class SSH(object):
class Security(object):
CVE = {
'Dropbear SSH': [
['0.44', '2015.71', 1, 'CVE-2016-3116', 5.5, 'bypass command restrictions via xauth command injection.'],
['0.28', '2013.58', 1, 'CVE-2013-4434', 5.0, 'discover valid usernames through different time delays.'],
['0.28', '2013.58', 1, 'CVE-2013-4421', 5.0, 'cause DoS (memory consumption) via a compressed packet.'],
['0.52', '2011.54', 1, 'CVE-2012-0920', 7.1, 'execute arbitrary code or bypass command restrictions.'],
['0.40', '0.48.1', 1, 'CVE-2007-1099', 7.5, 'conduct a MitM attack (no warning for hostkey mismatch).'],
['0.28', '0.47', 1, 'CVE-2006-1206', 7.5, 'cause DoS (slot exhaustion) via large number of connections.'],
['0.39', '0.47', 1, 'CVE-2006-0225', 4.6, 'execute arbitrary commands via scp with crafted filenames.'],
['0.28', '0.46', 1, 'CVE-2005-4178', 6.5, 'execute arbitrary code via buffer overflow vulnerability.'],
['0.28', '0.42', 1, 'CVE-2004-2486', 7.5, 'execute arbitrary code via DSS verification code.'],
]
['0.44', '2015.71', 1, 'CVE-2016-3116', 5.5, 'bypass command restrictions via xauth command injection'],
['0.28', '2013.58', 1, 'CVE-2013-4434', 5.0, 'discover valid usernames through different time delays'],
['0.28', '2013.58', 1, 'CVE-2013-4421', 5.0, 'cause DoS (memory consumption) via a compressed packet'],
['0.52', '2011.54', 1, 'CVE-2012-0920', 7.1, 'execute arbitrary code or bypass command restrictions'],
['0.40', '0.48.1', 1, 'CVE-2007-1099', 7.5, 'conduct a MitM attack (no warning for hostkey mismatch)'],
['0.28', '0.47', 1, 'CVE-2006-1206', 7.5, 'cause DoS (slot exhaustion) via large number of connections'],
['0.39', '0.47', 1, 'CVE-2006-0225', 4.6, 'execute arbitrary commands via scp with crafted filenames'],
['0.28', '0.46', 1, 'CVE-2005-4178', 6.5, 'execute arbitrary code via buffer overflow vulnerability'],
['0.28', '0.42', 1, 'CVE-2004-2486', 7.5, 'execute arbitrary code via DSS verification code']],
'libssh': [
['0.1', '0.7.2', 1, 'CVE-2016-0739', 4.3, 'conduct a MitM attack (weakness in DH key generation)'],
['0.5.1', '0.6.4', 1, 'CVE-2015-3146', 5.0, 'cause DoS via kex packets (null pointer dereference)'],
['0.5.1', '0.6.3', 1, 'CVE-2014-8132', 5.0, 'cause DoS via kex init packet (dangling pointer)'],
['0.4.7', '0.6.2', 1, 'CVE-2014-0017', 1.9, 'leak data via PRNG state reuse on forking servers'],
['0.4.7', '0.5.3', 1, 'CVE-2013-0176', 4.3, 'cause DoS via kex packet (null pointer dereference)'],
['0.4.7', '0.5.2', 1, 'CVE-2012-6063', 7.5, 'cause DoS or execute arbitrary code via sftp (double free)'],
['0.4.7', '0.5.2', 1, 'CVE-2012-4562', 7.5, 'cause DoS or execute arbitrary code (overflow check)'],
['0.4.7', '0.5.2', 1, 'CVE-2012-4561', 5.0, 'cause DoS via unspecified vectors (invalid pointer)'],
['0.4.7', '0.5.2', 1, 'CVE-2012-4560', 7.5, 'cause DoS or execute arbitrary code (buffer overflow)'],
['0.4.7', '0.5.2', 1, 'CVE-2012-4559', 6.8, 'cause DoS or execute arbitrary code (double free)']]
}
TXT = {
'Dropbear SSH': [
['0.28', '0.34', 1, 'remote root exploit', 'remote format string buffer overflow exploit (exploit-db#387).'],
]
['0.28', '0.34', 1, 'remote root exploit', 'remote format string buffer overflow exploit (exploit-db#387)']],
'libssh': [
['0.3.3', '0.3.3', 1, 'null pointer check', 'missing null pointer check in "crypt_set_algorithms_server"'],
['0.3.3', '0.3.3', 1, 'integer overflow', 'integer overflow in "buffer_get_data"'],
['0.3.3', '0.3.3', 3, 'heap overflow', 'heap overflow in "packet_decrypt"']]
}
class Socket(ReadBuf, WriteBuf):
@ -960,29 +1119,29 @@ class KexDB(object):
ALGORITHMS = {
'kex': {
'diffie-hellman-group1-sha1': [['2.3.0,d0.28', '6.6', '6.9'], [FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]],
'diffie-hellman-group14-sha1': [['3.9,d0.53'], [], [WARN_HASH_WEAK]],
'diffie-hellman-group1-sha1': [['2.3.0,d0.28,l10.2', '6.6', '6.9'], [FAIL_OPENSSH67_UNSAFE, FAIL_OPENSSH70_LOGJAM], [WARN_MODULUS_SIZE, WARN_HASH_WEAK]],
'diffie-hellman-group14-sha1': [['3.9,d0.53,l10.6.0'], [], [WARN_HASH_WEAK]],
'diffie-hellman-group14-sha256': [['7.3,d2016.73']],
'diffie-hellman-group16-sha512': [['7.3,d2016.73']],
'diffie-hellman-group18-sha512': [['7.3']],
'diffie-hellman-group-exchange-sha1': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_HASH_WEAK]],
'diffie-hellman-group-exchange-sha256': [['4.4'], [], [WARN_MODULUS_CUSTOM]],
'ecdh-sha2-nistp256': [['5.7,d2013.62'], [WARN_CURVES_WEAK]],
'ecdh-sha2-nistp256': [['5.7,d2013.62,l10.6.0'], [WARN_CURVES_WEAK]],
'ecdh-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK]],
'ecdh-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK]],
'curve25519-sha256@libssh.org': [['6.5,d2013.62']],
'curve25519-sha256@libssh.org': [['6.5,d2013.62,l10.6.0']],
'kexguess2@matt.ucc.asn.au': [['d2013.57']],
},
'key': {
'rsa-sha2-256': [['7.2']],
'rsa-sha2-512': [['7.2']],
'ssh-ed25519': [['6.5']],
'ssh-ed25519': [['6.5,l10.7.0']],
'ssh-ed25519-cert-v01@openssh.com': [['6.5']],
'ssh-rsa': [['2.5.0,d0.28']],
'ssh-dss': [['2.1.0,d0.28', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]],
'ecdsa-sha2-nistp256': [['5.7,d2013.62'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]],
'ecdsa-sha2-nistp384': [['5.7,d2013.62'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]],
'ecdsa-sha2-nistp521': [['5.7,d2013.62'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]],
'ssh-rsa': [['2.5.0,d0.28,l10.2']],
'ssh-dss': [['2.1.0,d0.28,l10.2', '6.9'], [FAIL_OPENSSH70_WEAK], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]],
'ecdsa-sha2-nistp256': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]],
'ecdsa-sha2-nistp384': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]],
'ecdsa-sha2-nistp521': [['5.7,d2013.62,l10.6.4'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]],
'ssh-rsa-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], []],
'ssh-dss-cert-v00@openssh.com': [['5.4', '6.9'], [FAIL_OPENSSH70_LEGACY], [WARN_MODULUS_SIZE, WARN_RNDSIG_KEY]],
'ssh-rsa-cert-v01@openssh.com': [['5.6']],
@ -992,10 +1151,10 @@ class KexDB(object):
'ecdsa-sha2-nistp521-cert-v01@openssh.com': [['5.7'], [WARN_CURVES_WEAK], [WARN_RNDSIG_KEY]],
},
'enc': {
'none': [['1.2.2,d2013.56'], [FAIL_PLAINTEXT]],
'3des-cbc': [['1.2.2,d0.28', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_WEAK, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]],
'none': [['1.2.2,d2013.56,l10.2'], [FAIL_PLAINTEXT]],
'3des-cbc': [['1.2.2,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_WEAK, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]],
'3des-ctr': [['d0.52']],
'blowfish-cbc': [['1.2.2,d0.28', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]],
'blowfish-cbc': [['1.2.2,d0.28,l10.2', '6.6,d0.52', '7.1,d0.52'], [FAIL_OPENSSH67_UNSAFE, FAIL_DBEAR53_DISABLED], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE, WARN_BLOCK_SIZE]],
'twofish-cbc': [['d0.28', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]],
'twofish128-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]],
'twofish256-cbc': [['d0.47', 'd2014.66'], [FAIL_DBEAR67_DISABLED], [WARN_CIPHER_MODE]],
@ -1005,27 +1164,27 @@ class KexDB(object):
'arcfour': [['2.1.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]],
'arcfour128': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]],
'arcfour256': [['4.2', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_WEAK]],
'aes128-cbc': [['2.3.0,d0.28', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]],
'aes192-cbc': [['2.3.0', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]],
'aes256-cbc': [['2.3.0,d0.47', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]],
'aes128-cbc': [['2.3.0,d0.28,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]],
'aes192-cbc': [['2.3.0,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]],
'aes256-cbc': [['2.3.0,d0.47,l10.2', '6.6', None], [FAIL_OPENSSH67_UNSAFE], [WARN_CIPHER_MODE]],
'rijndael128-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]],
'rijndael192-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]],
'rijndael256-cbc': [['2.3.0', '3.0.2'], [FAIL_OPENSSH31_REMOVE], [WARN_CIPHER_MODE]],
'rijndael-cbc@lysator.liu.se': [['2.3.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_CIPHER_MODE]],
'aes128-ctr': [['3.7,d0.52']],
'aes192-ctr': [['3.7']],
'aes256-ctr': [['3.7,d0.52']],
'aes128-ctr': [['3.7,d0.52,l10.4.1']],
'aes192-ctr': [['3.7,l10.4.1']],
'aes256-ctr': [['3.7,d0.52,l10.4.1']],
'aes128-gcm@openssh.com': [['6.2']],
'aes256-gcm@openssh.com': [['6.2']],
'chacha20-poly1305@openssh.com': [['6.5'], [], [], [INFO_OPENSSH69_CHACHA]],
},
'mac': {
'none': [['d2013.56'], [FAIL_PLAINTEXT]],
'hmac-sha1': [['2.1.0,d0.28'], [], [WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]],
'hmac-sha1': [['2.1.0,d0.28,l10.2'], [], [WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]],
'hmac-sha1-96': [['2.5.0,d0.47', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]],
'hmac-sha2-256': [['5.9,d2013.56'], [], [WARN_ENCRYPT_AND_MAC]],
'hmac-sha2-256': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]],
'hmac-sha2-256-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]],
'hmac-sha2-512': [['5.9,d2013.56'], [], [WARN_ENCRYPT_AND_MAC]],
'hmac-sha2-512': [['5.9,d2013.56,l10.7.0'], [], [WARN_ENCRYPT_AND_MAC]],
'hmac-sha2-512-96': [['5.9', '6.0'], [FAIL_OPENSSH61_REMOVE], [WARN_ENCRYPT_AND_MAC]],
'hmac-md5': [['2.1.0,d0.28', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]],
'hmac-md5-96': [['2.5.0', '6.6', '7.1'], [FAIL_OPENSSH67_UNSAFE], [WARN_OPENSSH72_LEGACY, WARN_ENCRYPT_AND_MAC, WARN_HASH_WEAK]],
@ -1049,6 +1208,8 @@ class KexDB(object):
def get_ssh_version(version_desc):
if version_desc.startswith('d'):
return (SSH.Product.DropbearSSH, version_desc[1:])
elif version_desc.startswith('l1'):
return (SSH.Product.LibSSH, version_desc[2:])
else:
return (SSH.Product.OpenSSH, version_desc)
@ -1091,8 +1252,10 @@ def get_alg_timeframe(alg_desc, for_server=True, result={}):
def get_ssh_timeframe(alg_pairs, for_server=True):
timeframe = {}
for alg_pair in alg_pairs:
alg_db, algs = alg_pair
for alg_type, alg_list in algs.items():
sshv, alg_db = alg_pair[0]
alg_sets = alg_pair[1:]
for alg_set in alg_sets:
alg_type, alg_list = alg_set
for alg_name in alg_list:
alg_desc = alg_db[alg_type].get(alg_name)
if alg_desc is None:
@ -1110,6 +1273,8 @@ def get_alg_since_text(alg_desc):
ssh_prefix, ssh_version = get_ssh_version(v)
if not ssh_version:
continue
if ssh_prefix in [SSH.Product.LibSSH]:
continue
if ssh_version.endswith('C'):
ssh_version = '{0} (client only)'.format(ssh_version[:-1])
tv.append('{0} {1}'.format(ssh_prefix, ssh_version))
@ -1118,6 +1283,117 @@ def get_alg_since_text(alg_desc):
return 'available since ' + ', '.join(tv).rstrip(', ')
def get_alg_pairs(kex, pkm):
alg_pairs = []
if pkm is not None:
alg_pairs.append(((1, SSH1.KexDB.ALGORITHMS),
('key', ['ssh-rsa1']),
('enc', pkm.supported_ciphers),
('aut', pkm.supported_authentications)))
if kex is not None:
alg_pairs.append(((2, KexDB.ALGORITHMS),
('kex', kex.kex_algorithms),
('key', kex.key_algorithms),
('enc', kex.server.encryption),
('mac', kex.server.mac)))
return alg_pairs
def get_alg_recommendations(software, kex, pkm, for_server=True):
alg_pairs = get_alg_pairs(kex, pkm)
vproducts = [SSH.Product.OpenSSH,
SSH.Product.DropbearSSH,
SSH.Product.LibSSH]
if software is not None:
if software.product not in vproducts:
software = None
if software is None:
ssh_timeframe = get_ssh_timeframe(alg_pairs, for_server)
for product in vproducts:
if product not in ssh_timeframe:
continue
version = ssh_timeframe[product][0]
if version is not None:
software = SSH.Software(None, product, version, None, None)
break
rec = {'.software': software}
if software is None:
return rec
for alg_pair in alg_pairs:
sshv, alg_db = alg_pair[0]
alg_sets = alg_pair[1:]
rec[sshv] = {}
for alg_set in alg_sets:
alg_type, alg_list = alg_set
if alg_type == 'aut':
continue
rec[sshv][alg_type] = {'add': [], 'del': {}}
for n, alg_desc in alg_db[alg_type].items():
if alg_type == 'key' and '-cert-' in n:
continue
versions = alg_desc[0]
if len(versions) == 0 or versions[0] is None:
continue
matches = False
for v in versions[0].split(','):
ssh_prefix, ssh_version = get_ssh_version(v)
if not ssh_version:
continue
if ssh_prefix != software.product:
continue
if ssh_version.endswith('C'):
if for_server:
continue
ssh_version = ssh_version[:-1]
if software.compare_version(ssh_version) < 0:
continue
matches = True
break
if not matches:
continue
adl, faults = len(alg_desc), 0
for i in range(1, 3):
if not adl > i:
continue
fc = len(alg_desc[i])
if fc > 0:
faults += pow(10, 2 - i) * fc
if n not in alg_list:
if faults > 0:
continue
rec[sshv][alg_type]['add'].append(n)
else:
if faults == 0:
continue
if n == 'diffie-hellman-group-exchange-sha256':
if software.compare_version('7.3') < 0:
continue
rec[sshv][alg_type]['del'][n] = faults
add_count = len(rec[sshv][alg_type]['add'])
del_count = len(rec[sshv][alg_type]['del'])
new_alg_count = len(alg_list) + add_count - del_count
if new_alg_count < 1 and del_count > 0:
mf, new_del = min(rec[sshv][alg_type]['del'].values()), {}
for k, v in rec[sshv][alg_type]['del'].items():
if v != mf:
new_del[k] = v
if del_count != len(new_del):
rec[sshv][alg_type]['del'] = new_del
new_alg_count += del_count - len(new_del)
if new_alg_count < 1:
del rec[sshv][alg_type]
else:
if add_count == 0:
del rec[sshv][alg_type]['add']
if del_count == 0:
del rec[sshv][alg_type]['del']
if len(rec[sshv][alg_type]) == 0:
del rec[sshv][alg_type]
if len(rec[sshv]) == 0:
del rec[sshv]
return rec
def output_algorithms(title, alg_db, alg_type, algorithms, maxlen=0):
with OutputBuffer() as obuf:
for algorithm in algorithms:
@ -1134,6 +1410,8 @@ def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0):
alg_max_len = len(alg_name)
padding = '' if out.batch else ' ' * (alg_max_len - len(alg_name))
texts = []
if len(alg_name.strip()) == 0:
return
if alg_name in alg_db[alg_type]:
alg_desc = alg_db[alg_type][alg_name]
ldesc = len(alg_desc)
@ -1167,18 +1445,7 @@ def output_algorithm(alg_db, alg_type, alg_name, alg_max_len=0):
def output_compatibility(kex, pkm, for_server=True):
alg_pairs = []
if pkm is not None:
alg_pairs.append((SSH1.KexDB.ALGORITHMS,
{'key': ['ssh-rsa1'],
'enc': pkm.supported_ciphers,
'aut': pkm.supported_authentications}))
if kex is not None:
alg_pairs.append((KexDB.ALGORITHMS,
{'kex': kex.kex_algorithms,
'key': kex.key_algorithms,
'enc': kex.server.encryption,
'mac': kex.server.mac}))
alg_pairs = get_alg_pairs(kex, pkm)
ssh_timeframe = get_ssh_timeframe(alg_pairs, for_server)
vp = 1 if for_server else 2
comp_text = []
@ -1210,7 +1477,8 @@ def output_security_sub(sub, software, padlen):
continue
target, name = line[2:4]
is_server, is_client = target & 1 == 1, target & 2 == 2
if is_client:
is_local = target & 4 == 4
if not is_server:
continue
p = '' if out.batch else ' ' * (padlen - len(name))
if sub == 'cve':
@ -1252,6 +1520,38 @@ def output_fingerprint(kex, pkm, sha256=True, padlen=0):
out.sep()
def output_recommendations(software, kex, pkm, padlen=0):
for_server = True
with OutputBuffer() as obuf:
alg_rec = get_alg_recommendations(software, kex, pkm, for_server)
software = alg_rec['.software']
for sshv in range(2, 0, -1):
if sshv not in alg_rec:
continue
for alg_type in ['kex', 'key', 'enc', 'mac']:
if alg_type not in alg_rec[sshv]:
continue
for action in ['del', 'add']:
if action not in alg_rec[sshv][alg_type]:
continue
for name in alg_rec[sshv][alg_type][action]:
p = '' if out.batch else ' ' * (padlen - len(name))
if action == 'del':
an, sg, fn = 'remove', '-', out.warn
if alg_rec[sshv][alg_type][action][name] >= 10:
fn = out.fail
else:
an, sg, fn = 'append', '+', out.good
b = '(SSH{0})'.format(sshv) if sshv == 1 else ''
fm = '(rec) {0}{1}{2}-- {3} algorithm to {4} {5}'
fn(fm.format(sg, name, p, alg_type, an, b))
if len(obuf) > 0:
title = '(for {0})'.format(software.display(False)) if software else ''
out.head('# algorithm recommendations {0}'.format(title))
obuf.flush()
out.sep()
def output(banner, header, kex=None, pkm=None):
sshv = 1 if pkm else 2
with OutputBuffer() as obuf:
@ -1264,6 +1564,8 @@ def output(banner, header, kex=None, pkm=None):
software = SSH.Software.parse(banner)
if software is not None:
out.good('(gen) software: {0}'.format(software))
else:
software = None
output_compatibility(kex, pkm)
if kex is not None:
compressions = [x for x in kex.server.compression if x != 'none']
@ -1287,6 +1589,7 @@ def output(banner, header, kex=None, pkm=None):
ml(kex.server.encryption),
ml(kex.server.mac),
maxlen)
maxlen += 1
output_security(banner, maxlen)
if pkm is not None:
adb = SSH1.KexDB.ALGORITHMS
@ -1308,59 +1611,36 @@ def output(banner, header, kex=None, pkm=None):
output_algorithms(title, adb, atype, kex.server.encryption, maxlen)
title, atype = 'message authentication code algorithms', 'mac'
output_algorithms(title, adb, atype, kex.server.mac, maxlen)
output_recommendations(software, kex, pkm, maxlen)
output_fingerprint(kex, pkm, True, maxlen)
def parse_int(v):
class Utils(object):
PY2 = sys.version_info[0] == 2
@classmethod
def wrap(cls):
o = cls()
if cls.PY2:
import StringIO
o.StringIO = o.BytesIO = StringIO.StringIO
else:
o.StringIO, o.BytesIO = io.StringIO, io.BytesIO
return o
@staticmethod
def parse_int(v):
try:
return int(v)
except:
return 0
def parse_args():
conf = AuditConf()
try:
sopts = 'h12bnvl:'
lopts = ['help', 'ssh1', 'ssh2', 'batch', 'no-colors', 'verbose', 'level=']
opts, args = getopt.getopt(sys.argv[1:], sopts, lopts)
except getopt.GetoptError as err:
usage(str(err))
for o, a in opts:
if o in ('-h', '--help'):
usage()
elif o in ('-1', '--ssh1'):
conf.ssh1 = True
elif o in ('-2', '--ssh2'):
conf.ssh2 = True
elif o in ('-b', '--batch'):
out.batch = True
out.verbose = True
elif o in ('-n', '--no-colors'):
out.colors = False
elif o in ('-v', '--verbose'):
out.verbose = True
elif o in ('-l', '--level'):
if a not in ('info', 'warn', 'fail'):
usage('level ' + a + ' is not valid')
out.minlevel = a
if len(args) == 0:
usage()
s = args[0].split(':')
host, port = s[0].strip(), 22
if len(s) > 1:
port = parse_int(s[1])
if not host or port <= 0:
usage('port {0} is not valid'.format(port))
conf.host = host
conf.port = port
if not (conf.ssh1 or conf.ssh2):
conf.ssh1 = True
conf.ssh2 = True
return conf
def audit(conf, sshv=None):
out.batch = conf.batch
out.colors = conf.colors
out.verbose = conf.verbose
out.minlevel = conf.minlevel
s = SSH.Socket(conf.host, conf.port)
if sshv is None:
sshv = 2 if conf.ssh2 else 1
@ -1371,7 +1651,8 @@ def audit(conf, sshv=None):
if err is None:
packet_type, payload = s.read_packet(sshv)
if packet_type < 0:
if payload == b'Protocol major versions differ.':
payload = str(payload).decode('utf-8')
if payload == u'Protocol major versions differ.':
if sshv == 2 and conf.ssh1:
audit(conf, 1)
return
@ -1393,11 +1674,12 @@ def audit(conf, sshv=None):
pkm = SSH1.PublicKeyMessage.parse(payload)
output(banner, header, pkm=pkm)
elif sshv == 2:
kex = Kex.parse(payload)
kex = SSH2.Kex.parse(payload)
output(banner, header, kex=kex)
utils = Utils.wrap()
out = Output()
if __name__ == '__main__':
out = Output()
conf = parse_args()
conf = AuditConf.from_cmdline(sys.argv[1:], usage)
audit(conf)

View File

@ -1,10 +1,35 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os, sys
import pytest
import pytest, os, sys, io
if sys.version_info[0] == 2:
import StringIO
StringIO = StringIO.StringIO
else:
StringIO = io.StringIO
@pytest.fixture(scope='module')
def ssh_audit():
__rdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
sys.path.append(os.path.abspath(__rdir))
return __import__('ssh-audit')
class _OutputSpy(list):
def begin(self):
self.__out = StringIO()
self.__old_stdout = sys.stdout
sys.stdout = self.__out
def flush(self):
lines = self.__out.getvalue().splitlines()
sys.stdout = self.__old_stdout
self.__out = None
return lines
@pytest.fixture(scope='module')
def output_spy():
return _OutputSpy()

115
test/test_auditconf.py Normal file
View File

@ -0,0 +1,115 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
class TestAuditConf(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf
self.usage = ssh_audit.usage
def _test_conf(self, conf, **kwargs):
options = {
'host': None,
'port': 22,
'ssh1': True,
'ssh2': True,
'batch': False,
'colors': True,
'verbose': False,
'minlevel': 'info'
}
for k, v in kwargs.items():
options[k] = v
assert conf.host == options['host']
assert conf.port == options['port']
assert conf.ssh1 is options['ssh1']
assert conf.ssh2 is options['ssh2']
assert conf.batch is options['batch']
assert conf.colors is options['colors']
assert conf.verbose is options['verbose']
assert conf.minlevel == options['minlevel']
def test_audit_conf_defaults(self):
conf = self.AuditConf()
self._test_conf(conf)
def test_audit_conf_booleans(self):
conf = self.AuditConf()
for p in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']:
for v in [True, 1]:
setattr(conf, p, v)
assert getattr(conf, p) is True
for v in [False, 0]:
setattr(conf, p, v)
assert getattr(conf, p) is False
def test_audit_conf_port(self):
conf = self.AuditConf()
for port in [22, 2222]:
conf.port = port
assert conf.port == port
for port in [-1, 0, 65536, 99999]:
with pytest.raises(ValueError) as excinfo:
conf.port = port
excinfo.match(r'.*invalid port.*')
def test_audit_conf_minlevel(self):
conf = self.AuditConf()
for level in ['info', 'warn', 'fail']:
conf.minlevel = level
assert conf.minlevel == level
for level in ['head', 'good', 'unknown', None]:
with pytest.raises(ValueError) as excinfo:
conf.minlevel = level
excinfo.match(r'.*invalid level.*')
def test_audit_conf_cmdline(self):
c = lambda x: self.AuditConf.from_cmdline(x.split(), self.usage)
with pytest.raises(SystemExit):
conf = c('')
with pytest.raises(SystemExit):
conf = c('-x')
with pytest.raises(SystemExit):
conf = c('-h')
with pytest.raises(SystemExit):
conf = c('--help')
with pytest.raises(SystemExit):
conf = c(':')
with pytest.raises(SystemExit):
conf = c(':22')
conf = c('localhost')
self._test_conf(conf, host='localhost')
conf = c('github.com')
self._test_conf(conf, host='github.com')
conf = c('localhost:2222')
self._test_conf(conf, host='localhost', port=2222)
with pytest.raises(SystemExit):
conf = c('localhost:')
with pytest.raises(SystemExit):
conf = c('localhost:abc')
with pytest.raises(SystemExit):
conf = c('localhost:-22')
with pytest.raises(SystemExit):
conf = c('localhost:99999')
conf = c('-1 localhost')
self._test_conf(conf, host='localhost', ssh1=True, ssh2=False)
conf = c('-2 localhost')
self._test_conf(conf, host='localhost', ssh1=False, ssh2=True)
conf = c('-12 localhost')
self._test_conf(conf, host='localhost', ssh1=True, ssh2=True)
conf = c('-b localhost')
self._test_conf(conf, host='localhost', batch=True, verbose=True)
conf = c('-n localhost')
self._test_conf(conf, host='localhost', colors=False)
conf = c('-v localhost')
self._test_conf(conf, host='localhost', verbose=True)
conf = c('-l info localhost')
self._test_conf(conf, host='localhost', minlevel='info')
conf = c('-l warn localhost')
self._test_conf(conf, host='localhost', minlevel='warn')
conf = c('-l fail localhost')
self._test_conf(conf, host='localhost', minlevel='fail')
with pytest.raises(SystemExit):
conf = c('-l something localhost')

68
test/test_banner.py Normal file
View File

@ -0,0 +1,68 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
class TestBanner(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.ssh = ssh_audit.SSH
def test_simple_banners(self):
banner = lambda x: self.ssh.Banner.parse(x)
b = banner('SSH-2.0-OpenSSH_7.3')
assert b.protocol == (2, 0)
assert b.software == 'OpenSSH_7.3'
assert b.comments is None
assert str(b) == 'SSH-2.0-OpenSSH_7.3'
b = banner('SSH-1.99-Sun_SSH_1.1.3')
assert b.protocol == (1, 99)
assert b.software == 'Sun_SSH_1.1.3'
assert b.comments is None
assert str(b) == 'SSH-1.99-Sun_SSH_1.1.3'
b = banner('SSH-1.5-Cisco-1.25')
assert b.protocol == (1, 5)
assert b.software == 'Cisco-1.25'
assert b.comments is None
assert str(b) == 'SSH-1.5-Cisco-1.25'
def test_invalid_banners(self):
b = lambda x: self.ssh.Banner.parse(x)
assert b('Something') is None
assert b('SSH-XXX-OpenSSH_7.3') is None
def test_banners_with_spaces(self):
b = lambda x: self.ssh.Banner.parse(x)
s = 'SSH-2.0-OpenSSH_4.3p2'
assert str(b('SSH-2.0-OpenSSH_4.3p2 ')) == s
assert str(b('SSH-2.0- OpenSSH_4.3p2')) == s
assert str(b('SSH-2.0- OpenSSH_4.3p2 ')) == s
s = 'SSH-2.0-OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu'
assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu')) == s
assert str(b('SSH-2.0-OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s
assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s
def test_banners_without_software(self):
b = lambda x: self.ssh.Banner.parse(x)
assert b('SSH-2.0').protocol == (2, 0)
assert b('SSH-2.0').software is None
assert b('SSH-2.0').comments is None
assert str(b('SSH-2.0')) == 'SSH-2.0'
assert b('SSH-2.0-').protocol == (2, 0)
assert b('SSH-2.0-').software == ''
assert b('SSH-2.0-').comments is None
assert str(b('SSH-2.0-')) == 'SSH-2.0-'
def test_banners_with_comments(self):
b = lambda x: self.ssh.Banner.parse(x)
assert repr(b('SSH-2.0-OpenSSH_7.2p2 Ubuntu-1')) == '<Banner(protocol=2.0, software=OpenSSH_7.2p2, comments=Ubuntu-1)>'
assert repr(b('SSH-1.99-OpenSSH_3.4p1 Debian 1:3.4p1-1.woody.3')) == '<Banner(protocol=1.99, software=OpenSSH_3.4p1, comments=Debian 1:3.4p1-1.woody.3)>'
assert repr(b('SSH-1.5-1.3.7 F-SECURE SSH')) == '<Banner(protocol=1.5, software=1.3.7, comments=F-SECURE SSH)>'
def test_banners_with_multiple_protocols(self):
b = lambda x: self.ssh.Banner.parse(x)
assert str(b('SSH-1.99-SSH-1.99-OpenSSH_3.6.1p2')) == 'SSH-1.99-OpenSSH_3.6.1p2'
assert str(b('SSH-2.0-SSH-2.0-OpenSSH_4.3p2 Debian-9')) == 'SSH-2.0-OpenSSH_4.3p2 Debian-9'
assert str(b('SSH-1.99-SSH-2.0-dropbear_0.5')) == 'SSH-1.99-dropbear_0.5'
assert str(b('SSH-2.0-SSH-1.99-OpenSSH_4.2p1 SSH Secure Shell (non-commercial)')) == 'SSH-1.99-OpenSSH_4.2p1 SSH Secure Shell (non-commercial)'
assert str(b('SSH-1.99-SSH-1.99-SSH-1.99-OpenSSH_3.9p1')) == 'SSH-1.99-OpenSSH_3.9p1'

117
test/test_buffer.py Normal file
View File

@ -0,0 +1,117 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import re
class TestBuffer(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf
def _b(self, v):
v = re.sub(r'\s', '', v)
data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)]
return bytes(bytearray(data))
def test_unread(self):
w = self.wbuf().write_byte(1).write_int(2).write_flush()
r = self.rbuf(w)
assert r.unread_len == 5
r.read_byte()
assert r.unread_len == 4
r.read_int()
assert r.unread_len == 0
def test_byte(self):
w = lambda x: self.wbuf().write_byte(x).write_flush()
r = lambda x: self.rbuf(x).read_byte()
tc = [(0x00, '00'),
(0x01, '01'),
(0x10, '10'),
(0xff, 'ff')]
for p in tc:
assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0]
def test_bool(self):
w = lambda x: self.wbuf().write_bool(x).write_flush()
r = lambda x: self.rbuf(x).read_bool()
tc = [(True, '01'),
(False, '00')]
for p in tc:
assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0]
def test_int(self):
w = lambda x: self.wbuf().write_int(x).write_flush()
r = lambda x: self.rbuf(x).read_int()
tc = [(0x00, '00 00 00 00'),
(0x01, '00 00 00 01'),
(0xabcd, '00 00 ab cd'),
(0xffffffff, 'ff ff ff ff')]
for p in tc:
assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0]
def test_string(self):
w = lambda x: self.wbuf().write_string(x).write_flush()
r = lambda x: self.rbuf(x).read_string()
tc = [(u'abc1', '00 00 00 04 61 62 63 31'),
(b'abc2', '00 00 00 04 61 62 63 32')]
for p in tc:
v = p[0]
assert w(v) == self._b(p[1])
if not isinstance(v, bytes):
v = bytes(bytearray(v, 'utf-8'))
assert r(self._b(p[1])) == v
def test_list(self):
w = lambda x: self.wbuf().write_list(x).write_flush()
r = lambda x: self.rbuf(x).read_list()
tc = [(['d', 'ef', 'ault'], '00 00 00 09 64 2c 65 66 2c 61 75 6c 74')]
for p in tc:
assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0]
def test_line(self):
w = lambda x: self.wbuf().write_line(x).write_flush()
r = lambda x: self.rbuf(x).read_line()
tc = [(u'example line', '65 78 61 6d 70 6c 65 20 6c 69 6e 65 0d 0a')]
for p in tc:
assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0]
def test_bitlen(self):
class Py26Int(int):
def bit_length(self):
raise AttributeError
assert self.wbuf._bitlength(42) == 6
assert self.wbuf._bitlength(Py26Int(42)) == 6
def test_mpint1(self):
mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush()
mpint1r = lambda x: self.rbuf(x).read_mpint1()
tc = [(0x0, '00 00'),
(0x1234, '00 0d 12 34'),
(0x12345, '00 11 01 23 45'),
(0xdeadbeef, '00 20 de ad be ef')]
for p in tc:
assert mpint1w(p[0]) == self._b(p[1])
assert mpint1r(self._b(p[1])) == p[0]
def test_mpint2(self):
mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush()
mpint2r = lambda x: self.rbuf(x).read_mpint2()
tc = [(0x0, '00 00 00 00'),
(0x80, '00 00 00 02 00 80'),
(0x9a378f9b2e332a7, '00 00 00 08 09 a3 78 f9 b2 e3 32 a7'),
(-0x1234, '00 00 00 02 ed cc'),
(-0xdeadbeef, '00 00 00 05 ff 21 52 41 11'),
(-0x8000, '00 00 00 02 80 00'),
(-0x80, '00 00 00 01 80')]
for p in tc:
assert mpint2w(p[0]) == self._b(p[1])
assert mpint2r(self._b(p[1])) == p[0]
assert mpint2r(self._b('00 00 00 02 ff 80')) == -0x80

172
test/test_output.py Normal file
View File

@ -0,0 +1,172 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import pytest, io, sys
class TestOutput(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.Output = ssh_audit.Output
self.OutputBuffer = ssh_audit.OutputBuffer
def test_output_buffer_no_lines(self, output_spy):
output_spy.begin()
with self.OutputBuffer() as obuf:
pass
assert output_spy.flush() == []
output_spy.begin()
with self.OutputBuffer() as obuf:
pass
obuf.flush()
assert output_spy.flush() == []
def test_output_buffer_no_flush(self, output_spy):
output_spy.begin()
with self.OutputBuffer() as obuf:
print(u'abc')
assert output_spy.flush() == []
def test_output_buffer_flush(self, output_spy):
output_spy.begin()
with self.OutputBuffer() as obuf:
print(u'abc')
print()
print(u'def')
obuf.flush()
assert output_spy.flush() == [u'abc', u'', u'def']
def test_output_defaults(self):
out = self.Output()
# default: on
assert out.batch is False
assert out.colors is True
assert out.minlevel == 'info'
def test_output_colors(self, output_spy):
out = self.Output()
# test without colors
out.colors = False
output_spy.begin()
out.info('info color')
assert output_spy.flush() == [u'info color']
output_spy.begin()
out.head('head color')
assert output_spy.flush() == [u'head color']
output_spy.begin()
out.good('good color')
assert output_spy.flush() == [u'good color']
output_spy.begin()
out.warn('warn color')
assert output_spy.flush() == [u'warn color']
output_spy.begin()
out.fail('fail color')
assert output_spy.flush() == [u'fail color']
# test with colors
out.colors = True
output_spy.begin()
out.info('info color')
assert output_spy.flush() == [u'info color']
output_spy.begin()
out.head('head color')
assert output_spy.flush() == [u'\x1b[0;36mhead color\x1b[0m']
output_spy.begin()
out.good('good color')
assert output_spy.flush() == [u'\x1b[0;32mgood color\x1b[0m']
output_spy.begin()
out.warn('warn color')
assert output_spy.flush() == [u'\x1b[0;33mwarn color\x1b[0m']
output_spy.begin()
out.fail('fail color')
assert output_spy.flush() == [u'\x1b[0;31mfail color\x1b[0m']
def test_output_sep(self, output_spy):
out = self.Output()
output_spy.begin()
out.sep()
out.sep()
out.sep()
assert output_spy.flush() == [u'', u'', u'']
def test_output_levels(self):
out = self.Output()
assert out.getlevel('info') == 0
assert out.getlevel('good') == 0
assert out.getlevel('warn') == 1
assert out.getlevel('fail') == 2
assert out.getlevel('unknown') > 2
def test_output_minlevel_property(self):
out = self.Output()
out.minlevel = 'info'
assert out.minlevel == 'info'
out.minlevel = 'good'
assert out.minlevel == 'info'
out.minlevel = 'warn'
assert out.minlevel == 'warn'
out.minlevel = 'fail'
assert out.minlevel == 'fail'
out.minlevel = 'invalid level'
assert out.minlevel == 'unknown'
def test_output_minlevel(self, output_spy):
out = self.Output()
# visible: all
out.minlevel = 'info'
output_spy.begin()
out.info('info color')
out.head('head color')
out.good('good color')
out.warn('warn color')
out.fail('fail color')
assert len(output_spy.flush()) == 5
# visible: head, warn, fail
out.minlevel = 'warn'
output_spy.begin()
out.info('info color')
out.head('head color')
out.good('good color')
out.warn('warn color')
out.fail('fail color')
assert len(output_spy.flush()) == 3
# visible: head, fail
out.minlevel = 'fail'
output_spy.begin()
out.info('info color')
out.head('head color')
out.good('good color')
out.warn('warn color')
out.fail('fail color')
assert len(output_spy.flush()) == 2
# visible: head
out.minlevel = 'invalid level'
output_spy.begin()
out.info('info color')
out.head('head color')
out.good('good color')
out.warn('warn color')
out.fail('fail color')
assert len(output_spy.flush()) == 1
def test_output_batch(self, output_spy):
out = self.Output()
# visible: all
output_spy.begin()
out.minlevel = 'info'
out.batch = False
out.info('info color')
out.head('head color')
out.good('good color')
out.warn('warn color')
out.fail('fail color')
assert len(output_spy.flush()) == 5
# visible: all except head
output_spy.begin()
out.minlevel = 'info'
out.batch = True
out.info('info color')
out.head('head color')
out.good('good color')
out.warn('warn color')
out.fail('fail color')
assert len(output_spy.flush()) == 4

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import re
class TestProtocol(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf
def _b(self, v):
v = re.sub(r'\s', '', v)
data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)]
return bytes(bytearray(data))
def test_mpint1(self):
mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush()
mpint1r = lambda x: self.rbuf(x).read_mpint1()
tc = [(0x0, '00 00'),
(0x1234, '00 0d 12 34'),
(0x12345, '00 11 01 23 45'),
(0xdeadbeef, '00 20 de ad be ef')]
for p in tc:
assert mpint1w(p[0]) == self._b(p[1])
assert mpint1r(self._b(p[1])) == p[0]
def test_mpint2(self):
mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush()
mpint2r = lambda x: self.rbuf(x).read_mpint2()
tc = [(0x0, '00 00 00 00'),
(0x80, '00 00 00 02 00 80'),
(0x9a378f9b2e332a7, '00 00 00 08 09 a3 78 f9 b2 e3 32 a7'),
(-0x1234, '00 00 00 02 ed cc'),
(-0xdeadbeef, '00 00 00 05 ff 21 52 41 11'),
(-0x8000, '00 00 00 02 80 00'),
(-0x80, '00 00 00 01 80')]
for p in tc:
assert mpint2w(p[0]) == self._b(p[1])
assert mpint2r(self._b(p[1])) == p[0]
assert mpint2r(self._b('00 00 00 02 ff 80')) == -0x80

285
test/test_software.py Normal file
View File

@ -0,0 +1,285 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
class TestSoftware(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.ssh = ssh_audit.SSH
def test_unknown_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
assert ps('SSH-1.5') is None
assert ps('SSH-1.99-AlfaMegaServer') is None
assert ps('SSH-2.0-BetaMegaServer 0.0.1') is None
def test_openssh_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# common
s = ps('SSH-2.0-OpenSSH_7.3')
assert s.vendor is None
assert s.product == 'OpenSSH'
assert s.version == '7.3'
assert s.patch is None
assert s.os is None
assert str(s) == 'OpenSSH 7.3'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(product=OpenSSH, version=7.3)>'
# common, portable
s = ps('SSH-2.0-OpenSSH_7.2p1')
assert s.vendor is None
assert s.product == 'OpenSSH'
assert s.version == '7.2'
assert s.patch == 'p1'
assert s.os is None
assert str(s) == 'OpenSSH 7.2p1'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'OpenSSH 7.2'
assert repr(s) == '<Software(product=OpenSSH, version=7.2, patch=p1)>'
# dot instead of underline
s = ps('SSH-2.0-OpenSSH.6.6')
assert s.vendor is None
assert s.product == 'OpenSSH'
assert s.version == '6.6'
assert s.patch is None
assert s.os is None
assert str(s) == 'OpenSSH 6.6'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(product=OpenSSH, version=6.6)>'
# dash instead of underline
s = ps('SSH-2.0-OpenSSH-3.9p1')
assert s.vendor is None
assert s.product == 'OpenSSH'
assert s.version == '3.9'
assert s.patch == 'p1'
assert s.os is None
assert str(s) == 'OpenSSH 3.9p1'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'OpenSSH 3.9'
assert repr(s) == '<Software(product=OpenSSH, version=3.9, patch=p1)>'
# patch prefix with dash
s = ps('SSH-2.0-OpenSSH_7.2-hpn14v5')
assert s.vendor is None
assert s.product == 'OpenSSH'
assert s.version == '7.2'
assert s.patch == 'hpn14v5'
assert s.os is None
assert str(s) == 'OpenSSH 7.2 (hpn14v5)'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'OpenSSH 7.2'
assert repr(s) == '<Software(product=OpenSSH, version=7.2, patch=hpn14v5)>'
# patch prefix with underline
s = ps('SSH-1.5-OpenSSH_6.6.1_hpn13v11')
assert s.vendor is None
assert s.product == 'OpenSSH'
assert s.version == '6.6.1'
assert s.patch == 'hpn13v11'
assert s.os is None
assert str(s) == 'OpenSSH 6.6.1 (hpn13v11)'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'OpenSSH 6.6.1'
assert repr(s) == '<Software(product=OpenSSH, version=6.6.1, patch=hpn13v11)>'
# patch prefix with dot
s = ps('SSH-2.0-OpenSSH_5.9.CASPUR')
assert s.vendor is None
assert s.product == 'OpenSSH'
assert s.version == '5.9'
assert s.patch == 'CASPUR'
assert s.os is None
assert str(s) == 'OpenSSH 5.9 (CASPUR)'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'OpenSSH 5.9'
assert repr(s) == '<Software(product=OpenSSH, version=5.9, patch=CASPUR)>'
def test_dropbear_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# common
s = ps('SSH-2.0-dropbear_2016.74')
assert s.vendor is None
assert s.product == 'Dropbear SSH'
assert s.version == '2016.74'
assert s.patch is None
assert s.os is None
assert str(s) == 'Dropbear SSH 2016.74'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(product=Dropbear SSH, version=2016.74)>'
# common, patch
s = ps('SSH-2.0-dropbear_0.44test4')
assert s.vendor is None
assert s.product == 'Dropbear SSH'
assert s.version == '0.44'
assert s.patch == 'test4'
assert s.os is None
assert str(s) == 'Dropbear SSH 0.44 (test4)'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'Dropbear SSH 0.44'
assert repr(s) == '<Software(product=Dropbear SSH, version=0.44, patch=test4)>'
# patch prefix with dash
s = ps('SSH-2.0-dropbear_0.44-Freesco-p49')
assert s.vendor is None
assert s.product == 'Dropbear SSH'
assert s.version == '0.44'
assert s.patch == 'Freesco-p49'
assert s.os is None
assert str(s) == 'Dropbear SSH 0.44 (Freesco-p49)'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'Dropbear SSH 0.44'
assert repr(s) == '<Software(product=Dropbear SSH, version=0.44, patch=Freesco-p49)>'
# patch prefix with underline
s = ps('SSH-2.0-dropbear_2014.66_agbn_1')
assert s.vendor is None
assert s.product == 'Dropbear SSH'
assert s.version == '2014.66'
assert s.patch == 'agbn_1'
assert s.os is None
assert str(s) == 'Dropbear SSH 2014.66 (agbn_1)'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == 'Dropbear SSH 2014.66'
assert repr(s) == '<Software(product=Dropbear SSH, version=2014.66, patch=agbn_1)>'
def test_libssh_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# common
s = ps('SSH-2.0-libssh-0.2')
assert s.vendor is None
assert s.product == 'libssh'
assert s.version == '0.2'
assert s.patch is None
assert s.os is None
assert str(s) == 'libssh 0.2'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(product=libssh, version=0.2)>'
s = ps('SSH-2.0-libssh-0.7.3')
assert s.vendor is None
assert s.product == 'libssh'
assert s.version == '0.7.3'
assert s.patch is None
assert s.os is None
assert str(s) == 'libssh 0.7.3'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(product=libssh, version=0.7.3)>'
def test_romsshell_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# common
s = ps('SSH-2.0-RomSShell_5.40')
assert s.vendor == 'Allegro Software'
assert s.product == 'RomSShell'
assert s.version == '5.40'
assert s.patch is None
assert s.os is None
assert str(s) == 'Allegro Software RomSShell 5.40'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(vendor=Allegro Software, product=RomSShell, version=5.40)>'
def test_hp_ilo_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# common
s = ps('SSH-2.0-mpSSH_0.2.1')
assert s.vendor == 'HP'
assert s.product == 'iLO (Integrated Lights-Out) sshd'
assert s.version == '0.2.1'
assert s.patch is None
assert s.os is None
assert str(s) == 'HP iLO (Integrated Lights-Out) sshd 0.2.1'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(vendor=HP, product=iLO (Integrated Lights-Out) sshd, version=0.2.1)>'
def test_cisco_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# common
s = ps('SSH-1.5-Cisco-1.25')
assert s.vendor == 'Cisco'
assert s.product == 'IOS/PIX sshd'
assert s.version == '1.25'
assert s.patch is None
assert s.os is None
assert str(s) == 'Cisco IOS/PIX sshd 1.25'
assert str(s) == s.display()
assert s.display(True) == str(s)
assert s.display(False) == str(s)
assert repr(s) == '<Software(vendor=Cisco, product=IOS/PIX sshd, version=1.25)>'
def test_sofware_os(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x))
# unknown
s = ps('SSH-2.0-OpenSSH_3.7.1 MegaOperatingSystem 123')
assert s.os is None
# NetBSD
s = ps('SSH-1.99-OpenSSH_2.5.1 NetBSD_Secure_Shell-20010614')
assert s.os == 'NetBSD (2001-06-14)'
assert str(s) == 'OpenSSH 2.5.1 running on NetBSD (2001-06-14)'
assert repr(s) == '<Software(product=OpenSSH, version=2.5.1, os=NetBSD (2001-06-14))>'
s = ps('SSH-1.99-OpenSSH_5.0 NetBSD_Secure_Shell-20080403+-hpn13v1')
assert s.os == 'NetBSD (2008-04-03)'
assert str(s) == 'OpenSSH 5.0 running on NetBSD (2008-04-03)'
assert repr(s) == '<Software(product=OpenSSH, version=5.0, os=NetBSD (2008-04-03))>'
s = ps('SSH-2.0-OpenSSH_6.6.1_hpn13v11 NetBSD-20100308')
assert s.os == 'NetBSD (2010-03-08)'
assert str(s) == 'OpenSSH 6.6.1 (hpn13v11) running on NetBSD (2010-03-08)'
assert repr(s) == '<Software(product=OpenSSH, version=6.6.1, patch=hpn13v11, os=NetBSD (2010-03-08))>'
s = ps('SSH-2.0-OpenSSH_4.4 NetBSD')
assert s.os == 'NetBSD'
assert str(s) == 'OpenSSH 4.4 running on NetBSD'
assert repr(s) == '<Software(product=OpenSSH, version=4.4, os=NetBSD)>'
s = ps('SSH-2.0-OpenSSH_3.0.2 NetBSD Secure Shell')
assert s.os == 'NetBSD'
assert str(s) == 'OpenSSH 3.0.2 running on NetBSD'
assert repr(s) == '<Software(product=OpenSSH, version=3.0.2, os=NetBSD)>'
# FreeBSD
s = ps('SSH-2.0-OpenSSH_7.2 FreeBSD-20160310')
assert s.os == 'FreeBSD (2016-03-10)'
assert str(s) == 'OpenSSH 7.2 running on FreeBSD (2016-03-10)'
assert repr(s) == '<Software(product=OpenSSH, version=7.2, os=FreeBSD (2016-03-10))>'
s = ps('SSH-1.99-OpenSSH_2.9 FreeBSD localisations 20020307')
assert s.os == 'FreeBSD (2002-03-07)'
assert str(s) == 'OpenSSH 2.9 running on FreeBSD (2002-03-07)'
assert repr(s) == '<Software(product=OpenSSH, version=2.9, os=FreeBSD (2002-03-07))>'
s = ps('SSH-2.0-OpenSSH_2.3.0 green@FreeBSD.org 20010321')
assert s.os == 'FreeBSD (2001-03-21)'
assert str(s) == 'OpenSSH 2.3.0 running on FreeBSD (2001-03-21)'
assert repr(s) == '<Software(product=OpenSSH, version=2.3.0, os=FreeBSD (2001-03-21))>'
s = ps('SSH-1.99-OpenSSH_4.4p1 FreeBSD-openssh-portable-overwrite-base-4.4.p1_1,1')
assert s.os == 'FreeBSD'
assert str(s) == 'OpenSSH 4.4p1 running on FreeBSD'
assert repr(s) == '<Software(product=OpenSSH, version=4.4, patch=p1, os=FreeBSD)>'
s = ps('SSH-2.0-OpenSSH_7.2-OVH-rescue FreeBSD')
assert s.os == 'FreeBSD'
assert str(s) == 'OpenSSH 7.2 (OVH-rescue) running on FreeBSD'
assert repr(s) == '<Software(product=OpenSSH, version=7.2, patch=OVH-rescue, os=FreeBSD)>'
# Windows
s = ps('SSH-2.0-OpenSSH_3.7.1 in RemotelyAnywhere 5.21.422')
assert s.os == 'Microsoft Windows (RemotelyAnywhere 5.21.422)'
assert str(s) == 'OpenSSH 3.7.1 running on Microsoft Windows (RemotelyAnywhere 5.21.422)'
assert repr(s) == '<Software(product=OpenSSH, version=3.7.1, os=Microsoft Windows (RemotelyAnywhere 5.21.422))>'
s = ps('SSH-2.0-OpenSSH_3.8 in DesktopAuthority 7.1.091')
assert s.os == 'Microsoft Windows (DesktopAuthority 7.1.091)'
assert str(s) == 'OpenSSH 3.8 running on Microsoft Windows (DesktopAuthority 7.1.091)'
assert repr(s) == '<Software(product=OpenSSH, version=3.8, os=Microsoft Windows (DesktopAuthority 7.1.091))>'
s = ps('SSH-2.0-OpenSSH_3.8 in RemoteSupportManager 1.0.023')
assert s.os == 'Microsoft Windows (RemoteSupportManager 1.0.023)'
assert str(s) == 'OpenSSH 3.8 running on Microsoft Windows (RemoteSupportManager 1.0.023)'
assert repr(s) == '<Software(product=OpenSSH, version=3.8, os=Microsoft Windows (RemoteSupportManager 1.0.023))>'

74
test/test_ssh1.py Normal file
View File

@ -0,0 +1,74 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
class TestSSH1(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.ssh = ssh_audit.SSH
self.ssh1 = ssh_audit.SSH1
self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf
def test_crc32(self):
assert self.ssh1.crc32(b'') == 0x00
assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808
def _server_key(self):
return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551)
def _host_key(self):
return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b)
def _pkm_payload(self):
w = self.wbuf()
w.write(b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
b, e, m = self._server_key()
w.write_int(b).write_mpint1(e).write_mpint1(m)
b, e, m = self._host_key()
w.write_int(b).write_mpint1(e).write_mpint1(m)
w.write_int(2)
w.write_int(72)
w.write_int(36)
return w.write_flush()
def test_fingerprint(self):
b, e, m = self._host_key()
fpd = self.wbuf._create_mpint(m, False)
fpd += self.wbuf._create_mpint(e, False)
fp = self.ssh.Fingerprint(fpd)
assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96'
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
def test_pkm_read(self):
pkm = self.ssh1.PublicKeyMessage.parse(self._pkm_payload())
assert pkm is not None
assert pkm.cookie == b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
b, e, m = self._server_key()
assert pkm.server_key_bits == b
assert pkm.server_key_public_exponent == e
assert pkm.server_key_public_modulus == m
b, e, m = self._host_key()
assert pkm.host_key_bits == b
assert pkm.host_key_public_exponent == e
assert pkm.host_key_public_modulus == m
fp = self.ssh.Fingerprint(pkm.host_key_fingerprint_data)
assert pkm.protocol_flags == 2
assert pkm.supported_ciphers_mask == 72
assert pkm.supported_ciphers == ['3des', 'blowfish']
assert pkm.supported_authentications_mask == 36
assert pkm.supported_authentications == ['rsa', 'tis']
assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96'
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
def test_pkm_payload(self):
cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
skey = self._server_key()
hkey = self._host_key()
pflags = 2
cmask = 72
amask = 36
pkm1 = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask)
pkm2 = self.ssh1.PublicKeyMessage.parse(self._pkm_payload())
assert pkm1.payload == pkm2.payload

48
test/test_ssh2.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
class TestSSH2(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.ssh = ssh_audit.SSH
self.ssh2 = ssh_audit.SSH2
self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf
def _kex_payload(self):
w = self.wbuf()
w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
w.write_list([u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1'])
w.write_list([u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519'])
w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'])
w.write_list([u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc'])
w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'])
w.write_list([u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1'])
w.write_list([u'none', u'zlib@openssh.com'])
w.write_list([u'none', u'zlib@openssh.com'])
w.write_list([u''])
w.write_list([u''])
w.write_byte(False)
w.write_int(0)
return w.write_flush()
def test_kex_read(self):
kex = self.ssh2.Kex.parse(self._kex_payload())
assert kex is not None
assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
assert kex.kex_algorithms == [u'curve25519-sha256@libssh.org', u'ecdh-sha2-nistp256', u'ecdh-sha2-nistp384', u'ecdh-sha2-nistp521', u'diffie-hellman-group-exchange-sha256', u'diffie-hellman-group14-sha1']
assert kex.key_algorithms == [u'ssh-rsa', u'rsa-sha2-512', u'rsa-sha2-256', u'ssh-ed25519']
assert kex.client is not None
assert kex.server is not None
assert kex.client.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']
assert kex.server.encryption == [u'chacha20-poly1305@openssh.com', u'aes128-ctr', u'aes192-ctr', u'aes256-ctr', u'aes128-gcm@openssh.com', u'aes256-gcm@openssh.com', u'aes128-cbc', u'aes192-cbc', u'aes256-cbc']
assert kex.client.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']
assert kex.server.mac == [u'umac-64-etm@openssh.com', u'umac-128-etm@openssh.com', u'hmac-sha2-256-etm@openssh.com', u'hmac-sha2-512-etm@openssh.com', u'hmac-sha1-etm@openssh.com', u'umac-64@openssh.com', u'umac-128@openssh.com', u'hmac-sha2-256', u'hmac-sha2-512', u'hmac-sha1']
assert kex.client.compression == [u'none', u'zlib@openssh.com']
assert kex.server.compression == [u'none', u'zlib@openssh.com']
assert kex.client.languages == [u'']
assert kex.server.languages == [u'']
assert kex.follows is False
assert kex.unused == 0

View File

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import pytest
class TestVersionCompare(object):
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
@ -15,34 +16,69 @@ class TestVersionCompare(object):
b = self.ssh.Banner.parse('SSH-2.0-OpenSSH_{0}'.format(v))
return self.ssh.Software.parse(b)
def get_libssh_software(self, v):
b = self.ssh.Banner.parse('SSH-2.0-libssh-{0}'.format(v))
return self.ssh.Software.parse(b)
def test_dropbear_compare_version_pre_years(self):
s = self.get_dropbear_software('0.44')
assert s.compare_version(None) == 1
assert s.compare_version('') == 1
assert s.compare_version('0.43') > 0
assert s.compare_version('0.44') == 0
assert s.compare_version(s) == 0
assert s.compare_version('0.45') < 0
assert s.between_versions('0.43', '0.45') == True
assert s.between_versions('0.43', '0.45')
assert s.between_versions('0.43', '0.43') is False
assert s.between_versions('0.45', '0.43') is False
def test_dropbear_compare_version_with_years(self):
s = self.get_dropbear_software('2015.71')
assert s.compare_version('2014.67') > 0
assert s.compare_version(None) == 1
assert s.compare_version('') == 1
assert s.compare_version('2014.66') > 0
assert s.compare_version('2015.71') == 0
assert s.compare_version(s) == 0
assert s.compare_version('2016.74') < 0
assert s.between_versions('2014.67', '2016.74') == True
assert s.between_versions('2014.66', '2016.74')
assert s.between_versions('2014.66', '2015.69') is False
assert s.between_versions('2016.74', '2014.66') is False
def test_dropbear_compare_version_mixed(self):
s = self.get_dropbear_software('0.53.1')
assert s.compare_version(None) == 1
assert s.compare_version('') == 1
assert s.compare_version('0.53') > 0
assert s.compare_version('0.53.1') == 0
assert s.compare_version(s) == 0
assert s.compare_version('2011.54') < 0
assert s.between_versions('0.53', '2011.54') == True
assert s.between_versions('0.53', '2011.54')
assert s.between_versions('0.53', '0.53') is False
assert s.between_versions('2011.54', '0.53') is False
def test_dropbear_compare_version_patchlevel(self):
s1 = self.get_dropbear_software('0.44')
s2 = self.get_dropbear_software('0.44test3')
assert s1.compare_version(None) == 1
assert s1.compare_version('') == 1
assert s1.compare_version('0.44') == 0
assert s1.compare_version(s1) == 0
assert s1.compare_version('0.43') > 0
assert s1.compare_version('0.44test4') > 0
assert s1.between_versions('0.44test4', '0.45')
assert s1.between_versions('0.43', '0.44test4') is False
assert s1.between_versions('0.45', '0.44test4') is False
assert s2.compare_version(None) == 1
assert s2.compare_version('') == 1
assert s2.compare_version('0.44test3') == 0
assert s2.compare_version(s2) == 0
assert s2.compare_version('0.44') < 0
assert s2.compare_version('0.44test4') < 0
assert s2.between_versions('0.43', '0.44')
assert s2.between_versions('0.43', '0.44test2') is False
assert s2.between_versions('0.44', '0.43') is False
assert s1.compare_version(s2) > 0
assert s2.compare_version(s1) < 0
def test_dropbear_compare_version_sequential(self):
versions = []
@ -82,20 +118,28 @@ class TestVersionCompare(object):
def test_openssh_compare_version_simple(self):
s = self.get_openssh_software('3.7.1')
assert s.compare_version(None) == 1
assert s.compare_version('') == 1
assert s.compare_version('3.7') > 0
assert s.compare_version('3.7.1') == 0
assert s.compare_version(s) == 0
assert s.compare_version('3.8') < 0
assert s.between_versions('3.7', '3.8') == True
assert s.between_versions('3.7', '3.8')
assert s.between_versions('3.6', '3.7') is False
assert s.between_versions('3.8', '3.7') is False
def test_openssh_compare_version_patchlevel(self):
s1 = self.get_openssh_software('2.1.1')
s2 = self.get_openssh_software('2.1.1p2')
assert s1.compare_version(s1) == 0
assert s2.compare_version(s2) == 0
assert s1.compare_version('2.1.1p1') == 0
assert s1.compare_version('2.1.1p2') == 0
assert s2.compare_version('2.1.1') == 0
assert s2.compare_version('2.1.1p1') > 0
assert s2.compare_version('2.1.1p3') < 0
assert s1.compare_version(s2) == 0
assert s2.compare_version(s1) == 0
def test_openbsd_compare_version_sequential(self):
versions = []
@ -130,3 +174,41 @@ class TestVersionCompare(object):
if i + 1 < l:
vnext = versions[i + 1]
assert s.compare_version(vnext) < 0
def test_libssh_compare_version_simple(self):
s = self.get_libssh_software('0.3')
assert s.compare_version(None) == 1
assert s.compare_version('') == 1
assert s.compare_version('0.2') > 0
assert s.compare_version('0.3') == 0
assert s.compare_version(s) == 0
assert s.compare_version('0.3.1') < 0
assert s.between_versions('0.2', '0.3.1')
assert s.between_versions('0.1', '0.2') is False
assert s.between_versions('0.3.1', '0.2') is False
def test_libssh_compare_version_sequential(self):
versions = []
for v in ['0.2', '0.3']:
versions.append(v)
for i in range(1, 5):
versions.append('0.3.{0}'.format(i))
for i in range(0, 9):
versions.append('0.4.{0}'.format(i))
for i in range(0, 6):
versions.append('0.5.{0}'.format(i))
for i in range(0, 6):
versions.append('0.6.{0}'.format(i))
for i in range(0, 4):
versions.append('0.7.{0}'.format(i))
l = len(versions)
for i in range(l):
v = versions[i]
s = self.get_libssh_software(v)
assert s.compare_version(v) == 0
if i - 1 >= 0:
vbefore = versions[i - 1]
assert s.compare_version(vbefore) > 0
if i + 1 < l:
vnext = versions[i + 1]
assert s.compare_version(vnext) < 0