Flake8 fixes (#35)

* Apply Flake8 also on `setup.py`

modified:   tox.ini

* Fix W605 - invalid escape syntax

modified:   packages/setup.py
modified:   tox.ini

* Update comment about Flake8: W504

W503 and W504 are mutual exclusive - so we have to keep one of them.

modified:   tox.ini

* Fix F841 - variable assigned but never used

modified:   ssh-audit.py
modified:   tox.ini

* Fix E741 - ambiguous variable name 'l'

modified:   ssh-audit.py
modified:   tox.ini

* Fix E712 - comparison to False should be 'if cond is False'

... and not 'if conf == False'.

modified:   ssh-audit.py
modified:   tox.ini

* Fix E711 - comparison to None should be 'if cond is not None'

... and not 'if cond != None'.

modified:   ssh-audit.py
modified:   tox.ini

* Fix E305 - expected 2 blank lines

... after class or function definition, found 1.

modified:   ssh-audit.py
modified:   tox.ini

* Fix E303 - too many blank lines

modified:   ssh-audit.py
modified:   tox.ini

* Fix E303 - too many blank lines

modified:   ssh-audit.py
modified:   tox.ini

* Fix E301 - expected 1 blank line, found 0

No code change necessary, probably fixed by another commit.

modified:   tox.ini

* Fix E265 - block comment should start with '# '

There is lots of commented out code, which usually should be just
deleted.

I will keep it for now, as I am not yet very familiar with the code
base.

modified:   ssh-audit.py
modified:   tox.ini

* Fix E261 - at least two spaces before inline comment

modified:   ssh-audit.py
modified:   tox.ini

* Fix E251 - unexpected spaces around keyword / parameter equals

modified:   packages/setup.py
modified:   tox.ini

* Fix E231 - missing whitespace after ','

No code change necessary, probably fixed by previous commit.

modified:   tox.ini

* Fix E226 - missing whitespace around arithmetic operator

modified:   ssh-audit.py
modified:   tox.ini

* Fix W293 - blank line contains whitespace

modified:   ssh-audit.py
modified:   tox.ini

* Fix E221 - multiple spaces before operator

modified:   ssh-audit.py
modified:   tox.ini

* Update comment about Flake 8 E241

Lots of data is formatted as tables, so this warning is disabled for a
good reason.

modified:   tox.ini

* Fix E401 - multiple imports on one line

modified:   ssh-audit.py
modified:   tox.ini

* Do not ignore Flake8 warning F401

... as there were no errors in source code anyway.

modified:   tox.ini

* Fix F821 - undefined name

modified:   ssh-audit.py
modified:   tox.ini

* Reformat ignore section for Flake8

modified:   tox.ini

* Flake8 test suite

modified:   test/conftest.py
modified:   test/test_auditconf.py
modified:   test/test_banner.py
modified:   test/test_buffer.py
modified:   test/test_errors.py
modified:   test/test_output.py
modified:   test/test_resolve.py
modified:   test/test_socket.py
modified:   test/test_software.py
modified:   test/test_ssh1.py
modified:   test/test_ssh2.py
modified:   test/test_ssh_algorithm.py
modified:   test/test_utils.py
modified:   test/test_version_compare.py
modified:   tox.ini
This commit is contained in:
Jürgen Gmach 2020-06-09 23:54:07 +02:00 committed by GitHub
parent 29d874b450
commit 246a41d46f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 459 additions and 485 deletions

View File

@ -5,7 +5,7 @@ import re
from setuptools import setup from setuptools import setup
version = re.search('^VERSION\s*=\s*\'v(\d\.\d\.\d)\'', open('sshaudit/sshaudit.py').read(), re.M).group(1) version = re.search(r'^VERSION\s*=\s*\'v(\d\.\d\.\d)\'', open('sshaudit/sshaudit.py').read(), re.M).group(1)
print("\n\nPackaging ssh-audit v%s...\n\n" % version) print("\n\nPackaging ssh-audit v%s...\n\n" % version)
with open("sshaudit/README.md", "rb") as f: with open("sshaudit/README.md", "rb") as f:
@ -13,20 +13,20 @@ with open("sshaudit/README.md", "rb") as f:
setup( setup(
name = "ssh-audit", name="ssh-audit",
packages = ["sshaudit"], packages=["sshaudit"],
license = 'MIT', license='MIT',
entry_points = { entry_points={
"console_scripts": ['ssh-audit = sshaudit.sshaudit:main'] "console_scripts": ['ssh-audit = sshaudit.sshaudit:main']
}, },
version = version, version=version,
description = "An SSH server & client configuration security auditing tool", description="An SSH server & client configuration security auditing tool",
long_description = long_descr, long_description=long_descr,
long_description_content_type = "text/markdown", long_description_content_type="text/markdown",
author = "Joe Testa", author="Joe Testa",
author_email = "jtesta@positronsecurity.com", author_email="jtesta@positronsecurity.com",
url = "https://github.com/jtesta/ssh-audit", url="https://github.com/jtesta/ssh-audit",
classifiers = [ classifiers=[
"Development Status :: 5 - Production/Stable", "Development Status :: 5 - Production/Stable",
"Intended Audience :: Information Technology", "Intended Audience :: Information Technology",
"Intended Audience :: System Administrators", "Intended Audience :: System Administrators",

File diff suppressed because it is too large Load Diff

View File

@ -27,7 +27,7 @@ class _OutputSpy(list):
self.__out = StringIO() self.__out = StringIO()
self.__old_stdout = sys.stdout self.__old_stdout = sys.stdout
sys.stdout = self.__out sys.stdout = self.__out
def flush(self): def flush(self):
lines = self.__out.getvalue().splitlines() lines = self.__out.getvalue().splitlines()
sys.stdout = self.__old_stdout sys.stdout = self.__old_stdout
@ -44,12 +44,12 @@ class _VirtualGlobalSocket(object):
def __init__(self, vsocket): def __init__(self, vsocket):
self.vsocket = vsocket self.vsocket = vsocket
self.addrinfodata = {} self.addrinfodata = {}
# pylint: disable=unused-argument # pylint: disable=unused-argument
def create_connection(self, address, timeout=0, source_address=None): def create_connection(self, address, timeout=0, source_address=None):
# pylint: disable=protected-access # pylint: disable=protected-access
return self.vsocket._connect(address, True) return self.vsocket._connect(address, True)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def socket(self, def socket(self,
family=socket.AF_INET, family=socket.AF_INET,
@ -57,7 +57,7 @@ class _VirtualGlobalSocket(object):
proto=0, proto=0,
fileno=None): fileno=None):
return self.vsocket return self.vsocket
def getaddrinfo(self, host, port, family=0, socktype=0, proto=0, flags=0): def getaddrinfo(self, host, port, family=0, socktype=0, proto=0, flags=0):
key = '{0}#{1}'.format(host, port) key = '{0}#{1}'.format(host, port)
if key in self.addrinfodata: if key in self.addrinfodata:
@ -85,41 +85,41 @@ class _VirtualSocket(object):
self.sdata = [] self.sdata = []
self.errors = {} self.errors = {}
self.gsock = _VirtualGlobalSocket(self) self.gsock = _VirtualGlobalSocket(self)
def _check_err(self, method): def _check_err(self, method):
method_error = self.errors.get(method) method_error = self.errors.get(method)
if method_error: if method_error:
raise method_error raise method_error
def connect(self, address): def connect(self, address):
return self._connect(address, False) return self._connect(address, False)
def _connect(self, address, ret=True): def _connect(self, address, ret=True):
self.peer_address = address self.peer_address = address
self._connected = True self._connected = True
self._check_err('connect') self._check_err('connect')
return self if ret else None return self if ret else None
def settimeout(self, timeout): def settimeout(self, timeout):
self.timeout = timeout self.timeout = timeout
def gettimeout(self): def gettimeout(self):
return self.timeout return self.timeout
def getpeername(self): def getpeername(self):
if self.peer_address is None or not self._connected: if self.peer_address is None or not self._connected:
raise socket.error(57, 'Socket is not connected') raise socket.error(57, 'Socket is not connected')
return self.peer_address return self.peer_address
def getsockname(self): def getsockname(self):
return self.sock_address return self.sock_address
def bind(self, address): def bind(self, address):
self.sock_address = address self.sock_address = address
def listen(self, backlog): def listen(self, backlog):
pass pass
def accept(self): def accept(self):
# pylint: disable=protected-access # pylint: disable=protected-access
conn = _VirtualSocket() conn = _VirtualSocket()
@ -127,7 +127,7 @@ class _VirtualSocket(object):
conn.peer_address = ('127.0.0.1', 0) conn.peer_address = ('127.0.0.1', 0)
conn._connected = True conn._connected = True
return conn, conn.peer_address return conn, conn.peer_address
def recv(self, bufsize, flags=0): def recv(self, bufsize, flags=0):
# pylint: disable=unused-argument # pylint: disable=unused-argument
if not self._connected: if not self._connected:
@ -138,7 +138,7 @@ class _VirtualSocket(object):
if isinstance(data, Exception): if isinstance(data, Exception):
raise data raise data
return data return data
def send(self, data): def send(self, data):
if self.peer_address is None or not self._connected: if self.peer_address is None or not self._connected:
raise socket.error(32, 'Broken pipe') raise socket.error(32, 'Broken pipe')

View File

@ -9,7 +9,7 @@ class TestAuditConf(object):
def init(self, ssh_audit): def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
self.usage = ssh_audit.usage self.usage = ssh_audit.usage
@staticmethod @staticmethod
def _test_conf(conf, **kwargs): def _test_conf(conf, **kwargs):
options = { options = {
@ -38,11 +38,11 @@ class TestAuditConf(object):
assert conf.ipv4 == options['ipv4'] assert conf.ipv4 == options['ipv4']
assert conf.ipv6 == options['ipv6'] assert conf.ipv6 == options['ipv6']
assert conf.ipvo == options['ipvo'] assert conf.ipvo == options['ipvo']
def test_audit_conf_defaults(self): def test_audit_conf_defaults(self):
conf = self.AuditConf() conf = self.AuditConf()
self._test_conf(conf) self._test_conf(conf)
def test_audit_conf_booleans(self): def test_audit_conf_booleans(self):
conf = self.AuditConf() conf = self.AuditConf()
for p in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']: for p in ['ssh1', 'ssh2', 'batch', 'colors', 'verbose']:
@ -52,7 +52,7 @@ class TestAuditConf(object):
for v in [False, 0]: for v in [False, 0]:
setattr(conf, p, v) setattr(conf, p, v)
assert getattr(conf, p) is False assert getattr(conf, p) is False
def test_audit_conf_port(self): def test_audit_conf_port(self):
conf = self.AuditConf() conf = self.AuditConf()
for port in [22, 2222]: for port in [22, 2222]:
@ -62,7 +62,7 @@ class TestAuditConf(object):
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
conf.port = port conf.port = port
excinfo.match(r'.*invalid port.*') excinfo.match(r'.*invalid port.*')
def test_audit_conf_ipvo(self): def test_audit_conf_ipvo(self):
# ipv4-only # ipv4-only
conf = self.AuditConf() conf = self.AuditConf()
@ -114,7 +114,7 @@ class TestAuditConf(object):
assert conf.ipvo == (4, 6) assert conf.ipvo == (4, 6)
conf.ipvo = (4, 4, 4, 6, 6) conf.ipvo = (4, 4, 4, 6, 6)
assert conf.ipvo == (4, 6) assert conf.ipvo == (4, 6)
def test_audit_conf_level(self): def test_audit_conf_level(self):
conf = self.AuditConf() conf = self.AuditConf()
for level in ['info', 'warn', 'fail']: for level in ['info', 'warn', 'fail']:
@ -124,7 +124,7 @@ class TestAuditConf(object):
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
conf.level = level conf.level = level
excinfo.match(r'.*invalid level.*') excinfo.match(r'.*invalid level.*')
def test_audit_conf_cmdline(self): def test_audit_conf_cmdline(self):
# pylint: disable=too-many-statements # pylint: disable=too-many-statements
c = lambda x: self.AuditConf.from_cmdline(x.split(), self.usage) # noqa c = lambda x: self.AuditConf.from_cmdline(x.split(), self.usage) # noqa

View File

@ -8,7 +8,7 @@ class TestBanner(object):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.ssh = ssh_audit.SSH self.ssh = ssh_audit.SSH
def test_simple_banners(self): def test_simple_banners(self):
banner = lambda x: self.ssh.Banner.parse(x) # noqa banner = lambda x: self.ssh.Banner.parse(x) # noqa
b = banner('SSH-2.0-OpenSSH_7.3') b = banner('SSH-2.0-OpenSSH_7.3')
@ -26,12 +26,12 @@ class TestBanner(object):
assert b.software == 'Cisco-1.25' assert b.software == 'Cisco-1.25'
assert b.comments is None assert b.comments is None
assert str(b) == 'SSH-1.5-Cisco-1.25' assert str(b) == 'SSH-1.5-Cisco-1.25'
def test_invalid_banners(self): def test_invalid_banners(self):
b = lambda x: self.ssh.Banner.parse(x) # noqa b = lambda x: self.ssh.Banner.parse(x) # noqa
assert b('Something') is None assert b('Something') is None
assert b('SSH-XXX-OpenSSH_7.3') is None assert b('SSH-XXX-OpenSSH_7.3') is None
def test_banners_with_spaces(self): def test_banners_with_spaces(self):
b = lambda x: self.ssh.Banner.parse(x) # noqa b = lambda x: self.ssh.Banner.parse(x) # noqa
s = 'SSH-2.0-OpenSSH_4.3p2' s = 'SSH-2.0-OpenSSH_4.3p2'
@ -42,7 +42,7 @@ class TestBanner(object):
assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu')) == s assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu')) == s
assert str(b('SSH-2.0-OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s assert str(b('SSH-2.0-OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s
assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s assert str(b('SSH-2.0- OpenSSH_4.3p2 Debian-9etch3 on i686-pc-linux-gnu ')) == s
def test_banners_without_software(self): def test_banners_without_software(self):
b = lambda x: self.ssh.Banner.parse(x) # noqa b = lambda x: self.ssh.Banner.parse(x) # noqa
assert b('SSH-2.0').protocol == (2, 0) assert b('SSH-2.0').protocol == (2, 0)
@ -53,13 +53,13 @@ class TestBanner(object):
assert b('SSH-2.0-').software == '' assert b('SSH-2.0-').software == ''
assert b('SSH-2.0-').comments is None assert b('SSH-2.0-').comments is None
assert str(b('SSH-2.0-')) == 'SSH-2.0-' assert str(b('SSH-2.0-')) == 'SSH-2.0-'
def test_banners_with_comments(self): def test_banners_with_comments(self):
b = lambda x: self.ssh.Banner.parse(x) # noqa b = lambda x: self.ssh.Banner.parse(x) # noqa
assert repr(b('SSH-2.0-OpenSSH_7.2p2 Ubuntu-1')) == '<Banner(protocol=2.0, software=OpenSSH_7.2p2, comments=Ubuntu-1)>' assert repr(b('SSH-2.0-OpenSSH_7.2p2 Ubuntu-1')) == '<Banner(protocol=2.0, software=OpenSSH_7.2p2, comments=Ubuntu-1)>'
assert repr(b('SSH-1.99-OpenSSH_3.4p1 Debian 1:3.4p1-1.woody.3')) == '<Banner(protocol=1.99, software=OpenSSH_3.4p1, comments=Debian 1:3.4p1-1.woody.3)>' assert repr(b('SSH-1.99-OpenSSH_3.4p1 Debian 1:3.4p1-1.woody.3')) == '<Banner(protocol=1.99, software=OpenSSH_3.4p1, comments=Debian 1:3.4p1-1.woody.3)>'
assert repr(b('SSH-1.5-1.3.7 F-SECURE SSH')) == '<Banner(protocol=1.5, software=1.3.7, comments=F-SECURE SSH)>' assert repr(b('SSH-1.5-1.3.7 F-SECURE SSH')) == '<Banner(protocol=1.5, software=1.3.7, comments=F-SECURE SSH)>'
def test_banners_with_multiple_protocols(self): def test_banners_with_multiple_protocols(self):
b = lambda x: self.ssh.Banner.parse(x) # noqa b = lambda x: self.ssh.Banner.parse(x) # noqa
assert str(b('SSH-1.99-SSH-1.99-OpenSSH_3.6.1p2')) == 'SSH-1.99-OpenSSH_3.6.1p2' assert str(b('SSH-1.99-SSH-1.99-OpenSSH_3.6.1p2')) == 'SSH-1.99-OpenSSH_3.6.1p2'

View File

@ -11,13 +11,13 @@ class TestBuffer(object):
self.rbuf = ssh_audit.ReadBuf self.rbuf = ssh_audit.ReadBuf
self.wbuf = ssh_audit.WriteBuf self.wbuf = ssh_audit.WriteBuf
self.utf8rchar = b'\xef\xbf\xbd' self.utf8rchar = b'\xef\xbf\xbd'
@classmethod @classmethod
def _b(cls, v): def _b(cls, v):
v = re.sub(r'\s', '', v) v = re.sub(r'\s', '', v)
data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)] data = [int(v[i * 2:i * 2 + 2], 16) for i in range(len(v) // 2)]
return bytes(bytearray(data)) return bytes(bytearray(data))
def test_unread(self): def test_unread(self):
w = self.wbuf().write_byte(1).write_int(2).write_flush() w = self.wbuf().write_byte(1).write_int(2).write_flush()
r = self.rbuf(w) r = self.rbuf(w)
@ -26,7 +26,7 @@ class TestBuffer(object):
assert r.unread_len == 4 assert r.unread_len == 4
r.read_int() r.read_int()
assert r.unread_len == 0 assert r.unread_len == 0
def test_byte(self): def test_byte(self):
w = lambda x: self.wbuf().write_byte(x).write_flush() # noqa w = lambda x: self.wbuf().write_byte(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_byte() # noqa r = lambda x: self.rbuf(x).read_byte() # noqa
@ -37,7 +37,7 @@ class TestBuffer(object):
for p in tc: for p in tc:
assert w(p[0]) == self._b(p[1]) assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0] assert r(self._b(p[1])) == p[0]
def test_bool(self): def test_bool(self):
w = lambda x: self.wbuf().write_bool(x).write_flush() # noqa w = lambda x: self.wbuf().write_bool(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_bool() # noqa r = lambda x: self.rbuf(x).read_bool() # noqa
@ -46,7 +46,7 @@ class TestBuffer(object):
for p in tc: for p in tc:
assert w(p[0]) == self._b(p[1]) assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0] assert r(self._b(p[1])) == p[0]
def test_int(self): def test_int(self):
w = lambda x: self.wbuf().write_int(x).write_flush() # noqa w = lambda x: self.wbuf().write_int(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_int() # noqa r = lambda x: self.rbuf(x).read_int() # noqa
@ -57,7 +57,7 @@ class TestBuffer(object):
for p in tc: for p in tc:
assert w(p[0]) == self._b(p[1]) assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0] assert r(self._b(p[1])) == p[0]
def test_string(self): def test_string(self):
w = lambda x: self.wbuf().write_string(x).write_flush() # noqa w = lambda x: self.wbuf().write_string(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_string() # noqa r = lambda x: self.rbuf(x).read_string() # noqa
@ -69,7 +69,7 @@ class TestBuffer(object):
if not isinstance(v, bytes): if not isinstance(v, bytes):
v = bytes(bytearray(v, 'utf-8')) v = bytes(bytearray(v, 'utf-8'))
assert r(self._b(p[1])) == v assert r(self._b(p[1])) == v
def test_list(self): def test_list(self):
w = lambda x: self.wbuf().write_list(x).write_flush() # noqa w = lambda x: self.wbuf().write_list(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_list() # noqa r = lambda x: self.rbuf(x).read_list() # noqa
@ -77,13 +77,13 @@ class TestBuffer(object):
for p in tc: for p in tc:
assert w(p[0]) == self._b(p[1]) assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0] assert r(self._b(p[1])) == p[0]
def test_list_nonutf8(self): def test_list_nonutf8(self):
r = lambda x: self.rbuf(x).read_list() # noqa r = lambda x: self.rbuf(x).read_list() # noqa
src = self._b('00 00 00 04 de ad be ef') src = self._b('00 00 00 04 de ad be ef')
dst = [(b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')] dst = [(b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')]
assert r(src) == dst assert r(src) == dst
def test_line(self): def test_line(self):
w = lambda x: self.wbuf().write_line(x).write_flush() # noqa w = lambda x: self.wbuf().write_line(x).write_flush() # noqa
r = lambda x: self.rbuf(x).read_line() # noqa r = lambda x: self.rbuf(x).read_line() # noqa
@ -91,13 +91,13 @@ class TestBuffer(object):
for p in tc: for p in tc:
assert w(p[0]) == self._b(p[1]) assert w(p[0]) == self._b(p[1])
assert r(self._b(p[1])) == p[0] assert r(self._b(p[1])) == p[0]
def test_line_nonutf8(self): def test_line_nonutf8(self):
r = lambda x: self.rbuf(x).read_line() # noqa r = lambda x: self.rbuf(x).read_line() # noqa
src = self._b('de ad be af') src = self._b('de ad be af')
dst = (b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8') dst = (b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')
assert r(src) == dst assert r(src) == dst
def test_bitlen(self): def test_bitlen(self):
# pylint: disable=protected-access # pylint: disable=protected-access
class Py26Int(int): class Py26Int(int):
@ -105,7 +105,7 @@ class TestBuffer(object):
raise AttributeError raise AttributeError
assert self.wbuf._bitlength(42) == 6 assert self.wbuf._bitlength(42) == 6
assert self.wbuf._bitlength(Py26Int(42)) == 6 assert self.wbuf._bitlength(Py26Int(42)) == 6
def test_mpint1(self): def test_mpint1(self):
mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush() # noqa mpint1w = lambda x: self.wbuf().write_mpint1(x).write_flush() # noqa
mpint1r = lambda x: self.rbuf(x).read_mpint1() # noqa mpint1r = lambda x: self.rbuf(x).read_mpint1() # noqa
@ -116,7 +116,7 @@ class TestBuffer(object):
for p in tc: for p in tc:
assert mpint1w(p[0]) == self._b(p[1]) assert mpint1w(p[0]) == self._b(p[1])
assert mpint1r(self._b(p[1])) == p[0] assert mpint1r(self._b(p[1])) == p[0]
def test_mpint2(self): def test_mpint2(self):
mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush() # noqa mpint2w = lambda x: self.wbuf().write_mpint2(x).write_flush() # noqa
mpint2r = lambda x: self.rbuf(x).read_mpint2() # noqa mpint2r = lambda x: self.rbuf(x).read_mpint2() # noqa

View File

@ -11,13 +11,13 @@ class TestErrors(object):
def init(self, ssh_audit): def init(self, ssh_audit):
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
self.audit = ssh_audit.audit self.audit = ssh_audit.audit
def _conf(self): def _conf(self):
conf = self.AuditConf('localhost', 22) conf = self.AuditConf('localhost', 22)
conf.colors = False conf.colors = False
conf.batch = True conf.batch = True
return conf return conf
def _audit(self, spy, conf=None, sysexit=True): def _audit(self, spy, conf=None, sysexit=True):
if conf is None: if conf is None:
conf = self._conf() conf = self._conf()
@ -29,34 +29,33 @@ class TestErrors(object):
self.audit(conf) self.audit(conf)
lines = spy.flush() lines = spy.flush()
return lines return lines
def test_connection_unresolved(self, output_spy, virtual_socket): def test_connection_unresolved(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.gsock.addrinfodata['localhost#22'] = [] vsocket.gsock.addrinfodata['localhost#22'] = []
lines = self._audit(output_spy) lines = self._audit(output_spy)
assert len(lines) == 1 assert len(lines) == 1
assert 'has no DNS records' in lines[-1] assert 'has no DNS records' in lines[-1]
def test_connection_refused(self, output_spy, virtual_socket): def test_connection_refused(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.errors['connect'] = socket.error(errno.ECONNREFUSED, 'Connection refused') vsocket.errors['connect'] = socket.error(errno.ECONNREFUSED, 'Connection refused')
lines = self._audit(output_spy) lines = self._audit(output_spy)
assert len(lines) == 1 assert len(lines) == 1
assert 'Connection refused' in lines[-1] assert 'Connection refused' in lines[-1]
def test_connection_timeout(self, output_spy, virtual_socket): def test_connection_timeout(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.errors['connect'] = socket.timeout('timed out') vsocket.errors['connect'] = socket.timeout('timed out')
lines = self._audit(output_spy) lines = self._audit(output_spy)
assert len(lines) == 1 assert len(lines) == 1
assert 'timed out' in lines[-1] assert 'timed out' in lines[-1]
def test_recv_empty(self, output_spy, virtual_socket): def test_recv_empty(self, output_spy, virtual_socket):
vsocket = virtual_socket
lines = self._audit(output_spy) lines = self._audit(output_spy)
assert len(lines) == 1 assert len(lines) == 1
assert 'did not receive banner' in lines[-1] assert 'did not receive banner' in lines[-1]
def test_recv_timeout(self, output_spy, virtual_socket): def test_recv_timeout(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(socket.timeout('timed out')) vsocket.rdata.append(socket.timeout('timed out'))
@ -64,7 +63,7 @@ class TestErrors(object):
assert len(lines) == 1 assert len(lines) == 1
assert 'did not receive banner' in lines[-1] assert 'did not receive banner' in lines[-1]
assert 'timed out' in lines[-1] assert 'timed out' in lines[-1]
def test_recv_retry_till_timeout(self, output_spy, virtual_socket): def test_recv_retry_till_timeout(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
@ -75,7 +74,7 @@ class TestErrors(object):
assert len(lines) == 1 assert len(lines) == 1
assert 'did not receive banner' in lines[-1] assert 'did not receive banner' in lines[-1]
assert 'timed out' in lines[-1] assert 'timed out' in lines[-1]
def test_recv_retry_till_reset(self, output_spy, virtual_socket): def test_recv_retry_till_reset(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
@ -86,7 +85,7 @@ class TestErrors(object):
assert len(lines) == 1 assert len(lines) == 1
assert 'did not receive banner' in lines[-1] assert 'did not receive banner' in lines[-1]
assert 'reset by peer' in lines[-1] assert 'reset by peer' in lines[-1]
def test_connection_closed_before_banner(self, output_spy, virtual_socket): def test_connection_closed_before_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer')) vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
@ -94,7 +93,7 @@ class TestErrors(object):
assert len(lines) == 1 assert len(lines) == 1
assert 'did not receive banner' in lines[-1] assert 'did not receive banner' in lines[-1]
assert 'reset by peer' in lines[-1] assert 'reset by peer' in lines[-1]
def test_connection_closed_after_header(self, output_spy, virtual_socket): def test_connection_closed_after_header(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(b'header line 1\n') vsocket.rdata.append(b'header line 1\n')
@ -105,7 +104,7 @@ class TestErrors(object):
assert len(lines) == 3 assert len(lines) == 3
assert 'did not receive banner' in lines[-1] assert 'did not receive banner' in lines[-1]
assert 'reset by peer' in lines[-1] assert 'reset by peer' in lines[-1]
def test_connection_closed_after_banner(self, output_spy, virtual_socket): def test_connection_closed_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
@ -114,7 +113,7 @@ class TestErrors(object):
assert len(lines) == 2 assert len(lines) == 2
assert 'error reading packet' in lines[-1] assert 'error reading packet' in lines[-1]
assert 'reset by peer' in lines[-1] assert 'reset by peer' in lines[-1]
def test_empty_data_after_banner(self, output_spy, virtual_socket): def test_empty_data_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
@ -122,7 +121,7 @@ class TestErrors(object):
assert len(lines) == 2 assert len(lines) == 2
assert 'error reading packet' in lines[-1] assert 'error reading packet' in lines[-1]
assert 'empty' in lines[-1] assert 'empty' in lines[-1]
def test_wrong_data_after_banner(self, output_spy, virtual_socket): def test_wrong_data_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
@ -131,7 +130,7 @@ class TestErrors(object):
assert len(lines) == 2 assert len(lines) == 2
assert 'error reading packet' in lines[-1] assert 'error reading packet' in lines[-1]
assert 'xxx' in lines[-1] assert 'xxx' in lines[-1]
def test_non_ascii_banner(self, output_spy, virtual_socket): def test_non_ascii_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n') vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n')
@ -140,7 +139,7 @@ class TestErrors(object):
assert 'error reading packet' in lines[-1] assert 'error reading packet' in lines[-1]
assert 'ASCII' in lines[-2] assert 'ASCII' in lines[-2]
assert lines[-3].endswith('SSH-2.0-ssh-audit-test?') assert lines[-3].endswith('SSH-2.0-ssh-audit-test?')
def test_nonutf8_data_after_banner(self, output_spy, virtual_socket): def test_nonutf8_data_after_banner(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
@ -149,7 +148,7 @@ class TestErrors(object):
assert len(lines) == 2 assert len(lines) == 2
assert 'error reading packet' in lines[-1] assert 'error reading packet' in lines[-1]
assert '\\x81\\xff' in lines[-1] assert '\\x81\\xff' in lines[-1]
def test_protocol_mismatch_by_conf(self, output_spy, virtual_socket): def test_protocol_mismatch_by_conf(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n') vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n')

View File

@ -10,7 +10,7 @@ class TestOutput(object):
def init(self, ssh_audit): def init(self, ssh_audit):
self.Output = ssh_audit.Output self.Output = ssh_audit.Output
self.OutputBuffer = ssh_audit.OutputBuffer self.OutputBuffer = ssh_audit.OutputBuffer
def test_output_buffer_no_lines(self, output_spy): def test_output_buffer_no_lines(self, output_spy):
output_spy.begin() output_spy.begin()
with self.OutputBuffer() as obuf: with self.OutputBuffer() as obuf:
@ -21,13 +21,13 @@ class TestOutput(object):
pass pass
obuf.flush() obuf.flush()
assert output_spy.flush() == [] assert output_spy.flush() == []
def test_output_buffer_no_flush(self, output_spy): def test_output_buffer_no_flush(self, output_spy):
output_spy.begin() output_spy.begin()
with self.OutputBuffer(): with self.OutputBuffer():
print(u'abc') print(u'abc')
assert output_spy.flush() == [] assert output_spy.flush() == []
def test_output_buffer_flush(self, output_spy): def test_output_buffer_flush(self, output_spy):
output_spy.begin() output_spy.begin()
with self.OutputBuffer() as obuf: with self.OutputBuffer() as obuf:
@ -36,14 +36,14 @@ class TestOutput(object):
print(u'def') print(u'def')
obuf.flush() obuf.flush()
assert output_spy.flush() == [u'abc', u'', u'def'] assert output_spy.flush() == [u'abc', u'', u'def']
def test_output_defaults(self): def test_output_defaults(self):
out = self.Output() out = self.Output()
# default: on # default: on
assert out.batch is False assert out.batch is False
assert out.use_colors is True assert out.use_colors is True
assert out.level == 'info' assert out.level == 'info'
def test_output_colors(self, output_spy): def test_output_colors(self, output_spy):
out = self.Output() out = self.Output()
# test without colors # test without colors
@ -82,7 +82,7 @@ class TestOutput(object):
output_spy.begin() output_spy.begin()
out.fail('fail color') out.fail('fail color')
assert output_spy.flush() == [u'\x1b[0;31mfail color\x1b[0m'] assert output_spy.flush() == [u'\x1b[0;31mfail color\x1b[0m']
def test_output_sep(self, output_spy): def test_output_sep(self, output_spy):
out = self.Output() out = self.Output()
output_spy.begin() output_spy.begin()
@ -90,7 +90,7 @@ class TestOutput(object):
out.sep() out.sep()
out.sep() out.sep()
assert output_spy.flush() == [u'', u'', u''] assert output_spy.flush() == [u'', u'', u'']
def test_output_levels(self): def test_output_levels(self):
out = self.Output() out = self.Output()
assert out.get_level('info') == 0 assert out.get_level('info') == 0
@ -98,7 +98,7 @@ class TestOutput(object):
assert out.get_level('warn') == 1 assert out.get_level('warn') == 1
assert out.get_level('fail') == 2 assert out.get_level('fail') == 2
assert out.get_level('unknown') > 2 assert out.get_level('unknown') > 2
def test_output_level_property(self): def test_output_level_property(self):
out = self.Output() out = self.Output()
out.level = 'info' out.level = 'info'
@ -111,7 +111,7 @@ class TestOutput(object):
assert out.level == 'fail' assert out.level == 'fail'
out.level = 'invalid level' out.level = 'invalid level'
assert out.level == 'unknown' assert out.level == 'unknown'
def test_output_level(self, output_spy): def test_output_level(self, output_spy):
out = self.Output() out = self.Output()
# visible: all # visible: all
@ -150,7 +150,7 @@ class TestOutput(object):
out.warn('warn color') out.warn('warn color')
out.fail('fail color') out.fail('fail color')
assert len(output_spy.flush()) == 1 assert len(output_spy.flush()) == 1
def test_output_batch(self, output_spy): def test_output_batch(self, output_spy):
out = self.Output() out = self.Output()
# visible: all # visible: all

View File

@ -11,13 +11,13 @@ class TestResolve(object):
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
self.audit = ssh_audit.audit self.audit = ssh_audit.audit
self.ssh = ssh_audit.SSH self.ssh = ssh_audit.SSH
def _conf(self): def _conf(self):
conf = self.AuditConf('localhost', 22) conf = self.AuditConf('localhost', 22)
conf.colors = False conf.colors = False
conf.batch = True conf.batch = True
return conf return conf
def test_resolve_error(self, output_spy, virtual_socket): def test_resolve_error(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.gsock.addrinfodata['localhost#22'] = socket.gaierror(8, 'hostname nor servname provided, or not known') vsocket.gsock.addrinfodata['localhost#22'] = socket.gaierror(8, 'hostname nor servname provided, or not known')
@ -25,11 +25,11 @@ class TestResolve(object):
conf = self._conf() conf = self._conf()
output_spy.begin() output_spy.begin()
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
r = list(s._resolve(conf.ipvo)) list(s._resolve(conf.ipvo))
lines = output_spy.flush() lines = output_spy.flush()
assert len(lines) == 1 assert len(lines) == 1
assert 'hostname nor servname provided' in lines[-1] assert 'hostname nor servname provided' in lines[-1]
def test_resolve_hostname_without_records(self, output_spy, virtual_socket): def test_resolve_hostname_without_records(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
vsocket.gsock.addrinfodata['localhost#22'] = [] vsocket.gsock.addrinfodata['localhost#22'] = []
@ -38,36 +38,32 @@ class TestResolve(object):
output_spy.begin() output_spy.begin()
r = list(s._resolve(conf.ipvo)) r = list(s._resolve(conf.ipvo))
assert len(r) == 0 assert len(r) == 0
def test_resolve_ipv4(self, virtual_socket): def test_resolve_ipv4(self, virtual_socket):
vsocket = virtual_socket
conf = self._conf() conf = self._conf()
conf.ipv4 = True conf.ipv4 = True
s = self.ssh.Socket('localhost', 22) s = self.ssh.Socket('localhost', 22)
r = list(s._resolve(conf.ipvo)) r = list(s._resolve(conf.ipvo))
assert len(r) == 1 assert len(r) == 1
assert r[0] == (socket.AF_INET, ('127.0.0.1', 22)) assert r[0] == (socket.AF_INET, ('127.0.0.1', 22))
def test_resolve_ipv6(self, virtual_socket): def test_resolve_ipv6(self, virtual_socket):
vsocket = virtual_socket
s = self.ssh.Socket('localhost', 22) s = self.ssh.Socket('localhost', 22)
conf = self._conf() conf = self._conf()
conf.ipv6 = True conf.ipv6 = True
r = list(s._resolve(conf.ipvo)) r = list(s._resolve(conf.ipvo))
assert len(r) == 1 assert len(r) == 1
assert r[0] == (socket.AF_INET6, ('::1', 22)) assert r[0] == (socket.AF_INET6, ('::1', 22))
def test_resolve_ipv46_both(self, virtual_socket): def test_resolve_ipv46_both(self, virtual_socket):
vsocket = virtual_socket
s = self.ssh.Socket('localhost', 22) s = self.ssh.Socket('localhost', 22)
conf = self._conf() conf = self._conf()
r = list(s._resolve(conf.ipvo)) r = list(s._resolve(conf.ipvo))
assert len(r) == 2 assert len(r) == 2
assert r[0] == (socket.AF_INET, ('127.0.0.1', 22)) assert r[0] == (socket.AF_INET, ('127.0.0.1', 22))
assert r[1] == (socket.AF_INET6, ('::1', 22)) assert r[1] == (socket.AF_INET6, ('::1', 22))
def test_resolve_ipv46_order(self, virtual_socket): def test_resolve_ipv46_order(self, virtual_socket):
vsocket = virtual_socket
s = self.ssh.Socket('localhost', 22) s = self.ssh.Socket('localhost', 22)
conf = self._conf() conf = self._conf()
conf.ipv4 = True conf.ipv4 = True

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import socket
import pytest import pytest
@ -9,21 +8,21 @@ class TestSocket(object):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.ssh = ssh_audit.SSH self.ssh = ssh_audit.SSH
def test_invalid_host(self, virtual_socket): def test_invalid_host(self, virtual_socket):
with pytest.raises(ValueError): with pytest.raises(ValueError):
s = self.ssh.Socket(None, 22) self.ssh.Socket(None, 22)
def test_invalid_port(self, virtual_socket): def test_invalid_port(self, virtual_socket):
with pytest.raises(ValueError): with pytest.raises(ValueError):
s = self.ssh.Socket('localhost', 'abc') self.ssh.Socket('localhost', 'abc')
with pytest.raises(ValueError): with pytest.raises(ValueError):
s = self.ssh.Socket('localhost', -1) self.ssh.Socket('localhost', -1)
with pytest.raises(ValueError): with pytest.raises(ValueError):
s = self.ssh.Socket('localhost', 0) self.ssh.Socket('localhost', 0)
with pytest.raises(ValueError): with pytest.raises(ValueError):
s = self.ssh.Socket('localhost', 65536) self.ssh.Socket('localhost', 65536)
def test_not_connected_socket(self, virtual_socket): def test_not_connected_socket(self, virtual_socket):
sock = self.ssh.Socket('localhost', 22) sock = self.ssh.Socket('localhost', 22)
banner, header, err = sock.get_banner() banner, header, err = sock.get_banner()

View File

@ -8,13 +8,13 @@ class TestSoftware(object):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.ssh = ssh_audit.SSH self.ssh = ssh_audit.SSH
def test_unknown_software(self): def test_unknown_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
assert ps('SSH-1.5') is None assert ps('SSH-1.5') is None
assert ps('SSH-1.99-AlfaMegaServer') is None assert ps('SSH-1.99-AlfaMegaServer') is None
assert ps('SSH-2.0-BetaMegaServer 0.0.1') is None assert ps('SSH-2.0-BetaMegaServer 0.0.1') is None
def test_openssh_software(self): def test_openssh_software(self):
# pylint: disable=too-many-statements # pylint: disable=too-many-statements
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
@ -102,7 +102,7 @@ class TestSoftware(object):
assert s.display(True) == str(s) assert s.display(True) == str(s)
assert s.display(False) == 'OpenSSH 5.9' assert s.display(False) == 'OpenSSH 5.9'
assert repr(s) == '<Software(product=OpenSSH, version=5.9, patch=CASPUR)>' assert repr(s) == '<Software(product=OpenSSH, version=5.9, patch=CASPUR)>'
def test_dropbear_software(self): def test_dropbear_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common # common
@ -153,7 +153,7 @@ class TestSoftware(object):
assert s.display(True) == str(s) assert s.display(True) == str(s)
assert s.display(False) == 'Dropbear SSH 2014.66' assert s.display(False) == 'Dropbear SSH 2014.66'
assert repr(s) == '<Software(product=Dropbear SSH, version=2014.66, patch=agbn_1)>' assert repr(s) == '<Software(product=Dropbear SSH, version=2014.66, patch=agbn_1)>'
def test_libssh_software(self): def test_libssh_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common # common
@ -179,7 +179,7 @@ class TestSoftware(object):
assert s.display(True) == str(s) assert s.display(True) == str(s)
assert s.display(False) == str(s) assert s.display(False) == str(s)
assert repr(s) == '<Software(product=libssh, version=0.7.4)>' assert repr(s) == '<Software(product=libssh, version=0.7.4)>'
def test_romsshell_software(self): def test_romsshell_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common # common
@ -194,7 +194,7 @@ class TestSoftware(object):
assert s.display(True) == str(s) assert s.display(True) == str(s)
assert s.display(False) == str(s) assert s.display(False) == str(s)
assert repr(s) == '<Software(vendor=Allegro Software, product=RomSShell, version=5.40)>' assert repr(s) == '<Software(vendor=Allegro Software, product=RomSShell, version=5.40)>'
def test_hp_ilo_software(self): def test_hp_ilo_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common # common
@ -209,7 +209,7 @@ class TestSoftware(object):
assert s.display(True) == str(s) assert s.display(True) == str(s)
assert s.display(False) == str(s) assert s.display(False) == str(s)
assert repr(s) == '<Software(vendor=HP, product=iLO (Integrated Lights-Out) sshd, version=0.2.1)>' assert repr(s) == '<Software(vendor=HP, product=iLO (Integrated Lights-Out) sshd, version=0.2.1)>'
def test_cisco_software(self): def test_cisco_software(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# common # common
@ -224,7 +224,7 @@ class TestSoftware(object):
assert s.display(True) == str(s) assert s.display(True) == str(s)
assert s.display(False) == str(s) assert s.display(False) == str(s)
assert repr(s) == '<Software(vendor=Cisco, product=IOS/PIX sshd, version=1.25)>' assert repr(s) == '<Software(vendor=Cisco, product=IOS/PIX sshd, version=1.25)>'
def test_software_os(self): def test_software_os(self):
ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa ps = lambda x: self.ssh.Software.parse(self.ssh.Banner.parse(x)) # noqa
# unknown # unknown

View File

@ -14,7 +14,7 @@ class TestSSH1(object):
self.wbuf = ssh_audit.WriteBuf self.wbuf = ssh_audit.WriteBuf
self.audit = ssh_audit.audit self.audit = ssh_audit.audit
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
def _conf(self): def _conf(self):
conf = self.AuditConf('localhost', 22) conf = self.AuditConf('localhost', 22)
conf.colors = False conf.colors = False
@ -23,7 +23,7 @@ class TestSSH1(object):
conf.ssh1 = True conf.ssh1 = True
conf.ssh2 = False conf.ssh2 = False
return conf return conf
def _create_ssh1_packet(self, payload, valid_crc=True): def _create_ssh1_packet(self, payload, valid_crc=True):
padding = -(len(payload) + 4) % 8 padding = -(len(payload) + 4) % 8
plen = len(payload) + 4 plen = len(payload) + 4
@ -31,15 +31,15 @@ class TestSSH1(object):
cksum = self.ssh1.crc32(pad_bytes + payload) if valid_crc else 0 cksum = self.ssh1.crc32(pad_bytes + payload) if valid_crc else 0
data = struct.pack('>I', plen) + pad_bytes + payload + struct.pack('>I', cksum) data = struct.pack('>I', plen) + pad_bytes + payload + struct.pack('>I', cksum)
return data return data
@classmethod @classmethod
def _server_key(cls): def _server_key(cls):
return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551) return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551)
@classmethod @classmethod
def _host_key(cls): def _host_key(cls):
return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b) return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b)
def _pkm_payload(self): def _pkm_payload(self):
w = self.wbuf() w = self.wbuf()
w.write(b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff') w.write(b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
@ -51,11 +51,11 @@ class TestSSH1(object):
w.write_int(72) w.write_int(72)
w.write_int(36) w.write_int(36)
return w.write_flush() return w.write_flush()
def test_crc32(self): def test_crc32(self):
assert self.ssh1.crc32(b'') == 0x00 assert self.ssh1.crc32(b'') == 0x00
assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808 assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808
def test_fingerprint(self): def test_fingerprint(self):
# pylint: disable=protected-access # pylint: disable=protected-access
b, e, m = self._host_key() b, e, m = self._host_key()
@ -65,7 +65,7 @@ class TestSSH1(object):
assert b == 2048 assert b == 2048
assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96'
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
def _assert_pkm_keys(self, pkm, skey, hkey): def _assert_pkm_keys(self, pkm, skey, hkey):
b, e, m = skey b, e, m = skey
assert pkm.server_key_bits == b assert pkm.server_key_bits == b
@ -75,7 +75,7 @@ class TestSSH1(object):
assert pkm.host_key_bits == b assert pkm.host_key_bits == b
assert pkm.host_key_public_exponent == e assert pkm.host_key_public_exponent == e
assert pkm.host_key_public_modulus == m assert pkm.host_key_public_modulus == m
def _assert_pkm_fields(self, pkm, skey, hkey): def _assert_pkm_fields(self, pkm, skey, hkey):
assert pkm is not None assert pkm is not None
assert pkm.cookie == b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' assert pkm.cookie == b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
@ -88,25 +88,25 @@ class TestSSH1(object):
fp = self.ssh.Fingerprint(pkm.host_key_fingerprint_data) fp = self.ssh.Fingerprint(pkm.host_key_fingerprint_data)
assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96' assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96'
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs' assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
def test_pkm_init(self): def test_pkm_init(self):
cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
pflags, cmask, amask = 2, 72, 36 pflags, cmask, amask = 2, 72, 36
skey, hkey = self._server_key(), self._host_key() skey, hkey = self._server_key(), self._host_key()
pkm = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask) pkm = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask)
self._assert_pkm_fields(pkm, skey, hkey) self._assert_pkm_fields(pkm, skey, hkey)
for skey2 in ([], [0], [0,1], [0,1,2,3]): for skey2 in ([], [0], [0, 1], [0, 1, 2, 3]):
with pytest.raises(ValueError): with pytest.raises(ValueError):
pkm = self.ssh1.PublicKeyMessage(cookie, skey2, hkey, pflags, cmask, amask) pkm = self.ssh1.PublicKeyMessage(cookie, skey2, hkey, pflags, cmask, amask)
for hkey2 in ([], [0], [0,1], [0,1,2,3]): for hkey2 in ([], [0], [0, 1], [0, 1, 2, 3]):
with pytest.raises(ValueError): with pytest.raises(ValueError):
print(hkey2) print(hkey2)
pkm = self.ssh1.PublicKeyMessage(cookie, skey, hkey2, pflags, cmask, amask) pkm = self.ssh1.PublicKeyMessage(cookie, skey, hkey2, pflags, cmask, amask)
def test_pkm_read(self): def test_pkm_read(self):
pkm = self.ssh1.PublicKeyMessage.parse(self._pkm_payload()) pkm = self.ssh1.PublicKeyMessage.parse(self._pkm_payload())
self._assert_pkm_fields(pkm, self._server_key(), self._host_key()) self._assert_pkm_fields(pkm, self._server_key(), self._host_key())
def test_pkm_payload(self): def test_pkm_payload(self):
cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff' cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
skey, hkey = self._server_key(), self._host_key() skey, hkey = self._server_key(), self._host_key()
@ -114,7 +114,7 @@ class TestSSH1(object):
pkm1 = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask) pkm1 = self.ssh1.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask)
pkm2 = self.ssh1.PublicKeyMessage.parse(self._pkm_payload()) pkm2 = self.ssh1.PublicKeyMessage.parse(self._pkm_payload())
assert pkm1.payload == pkm2.payload assert pkm1.payload == pkm2.payload
def test_ssh1_server_simple(self, output_spy, virtual_socket): def test_ssh1_server_simple(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
w = self.wbuf() w = self.wbuf()
@ -126,7 +126,7 @@ class TestSSH1(object):
self.audit(self._conf()) self.audit(self._conf())
lines = output_spy.flush() lines = output_spy.flush()
assert len(lines) == 13 assert len(lines) == 13
def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket): def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket
w = self.wbuf() w = self.wbuf()

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import struct, os import os
import struct
import pytest import pytest
@ -14,7 +15,7 @@ class TestSSH2(object):
self.wbuf = ssh_audit.WriteBuf self.wbuf = ssh_audit.WriteBuf
self.audit = ssh_audit.audit self.audit = ssh_audit.audit
self.AuditConf = ssh_audit.AuditConf self.AuditConf = ssh_audit.AuditConf
def _conf(self): def _conf(self):
conf = self.AuditConf('localhost', 22) conf = self.AuditConf('localhost', 22)
conf.colors = False conf.colors = False
@ -23,7 +24,7 @@ class TestSSH2(object):
conf.ssh1 = False conf.ssh1 = False
conf.ssh2 = True conf.ssh2 = True
return conf return conf
@classmethod @classmethod
def _create_ssh2_packet(cls, payload): def _create_ssh2_packet(cls, payload):
padding = -(len(payload) + 5) % 8 padding = -(len(payload) + 5) % 8
@ -33,7 +34,7 @@ class TestSSH2(object):
pad_bytes = b'\x00' * padding pad_bytes = b'\x00' * padding
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
return data return data
def _kex_payload(self): def _kex_payload(self):
w = self.wbuf() w = self.wbuf()
w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff') w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
@ -50,7 +51,7 @@ class TestSSH2(object):
w.write_byte(False) w.write_byte(False)
w.write_int(0) w.write_int(0)
return w.write_flush() return w.write_flush()
def test_kex_read(self): def test_kex_read(self):
kex = self.ssh2.Kex.parse(self._kex_payload()) kex = self.ssh2.Kex.parse(self._kex_payload())
assert kex is not None assert kex is not None
@ -69,7 +70,7 @@ class TestSSH2(object):
assert kex.server.languages == [u''] assert kex.server.languages == [u'']
assert kex.follows is False assert kex.follows is False
assert kex.unused == 0 assert kex.unused == 0
def _get_empty_kex(self, cookie=None): def _get_empty_kex(self, cookie=None):
kex_algs, key_algs = [], [] kex_algs, key_algs = [], []
enc, mac, compression, languages = [], [], ['none'], [] enc, mac, compression, languages = [], [], ['none'], []
@ -80,7 +81,7 @@ class TestSSH2(object):
cookie = os.urandom(16) cookie = os.urandom(16)
kex = self.ssh2.Kex(cookie, kex_algs, key_algs, cli, srv, 0) kex = self.ssh2.Kex(cookie, kex_algs, key_algs, cli, srv, 0)
return kex return kex
def _get_kex_variat1(self): def _get_kex_variat1(self):
cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff' cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
kex = self._get_empty_kex(cookie) kex = self._get_empty_kex(cookie)
@ -123,12 +124,12 @@ class TestSSH2(object):
continue continue
kex.client.compression.append(a) kex.client.compression.append(a)
return kex return kex
def test_key_payload(self): def test_key_payload(self):
kex1 = self._get_kex_variat1() kex1 = self._get_kex_variat1()
kex2 = self.ssh2.Kex.parse(self._kex_payload()) kex2 = self.ssh2.Kex.parse(self._kex_payload())
assert kex1.payload == kex2.payload assert kex1.payload == kex2.payload
@pytest.mark.skip(reason="Temporarily skip this test to have a working test suite!") @pytest.mark.skip(reason="Temporarily skip this test to have a working test suite!")
def test_ssh2_server_simple(self, output_spy, virtual_socket): def test_ssh2_server_simple(self, output_spy, virtual_socket):
vsocket = virtual_socket vsocket = virtual_socket

View File

@ -8,24 +8,24 @@ class TestSSHAlgorithm(object):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.ssh = ssh_audit.SSH self.ssh = ssh_audit.SSH
def _tf(self, v, s=None): def _tf(self, v, s=None):
return self.ssh.Algorithm.Timeframe().update(v, s) return self.ssh.Algorithm.Timeframe().update(v, s)
def test_get_ssh_version(self): def test_get_ssh_version(self):
def ver(v): def ver(v):
return self.ssh.Algorithm.get_ssh_version(v) return self.ssh.Algorithm.get_ssh_version(v)
assert ver('7.5') == ('OpenSSH', '7.5', False) assert ver('7.5') == ('OpenSSH', '7.5', False)
assert ver('7.5C') == ('OpenSSH', '7.5', True) assert ver('7.5C') == ('OpenSSH', '7.5', True)
assert ver('d2016.74') == ('Dropbear SSH', '2016.74', False) assert ver('d2016.74') == ('Dropbear SSH', '2016.74', False)
assert ver('l10.7.4') == ('libssh', '0.7.4', False) assert ver('l10.7.4') == ('libssh', '0.7.4', False)
assert ver('')[1] == '' assert ver('')[1] == ''
def test_get_since_text(self): def test_get_since_text(self):
def gst(v): def gst(v):
return self.ssh.Algorithm.get_since_text(v) return self.ssh.Algorithm.get_since_text(v)
assert gst(['7.5']) == 'available since OpenSSH 7.5' assert gst(['7.5']) == 'available since OpenSSH 7.5'
assert gst(['7.5C']) == 'available since OpenSSH 7.5 (client only)' assert gst(['7.5C']) == 'available since OpenSSH 7.5 (client only)'
assert gst(['7.5,']) == 'available since OpenSSH 7.5' assert gst(['7.5,']) == 'available since OpenSSH 7.5'
@ -33,12 +33,12 @@ class TestSSHAlgorithm(object):
assert gst(['7.5,d2016.73']) == 'available since OpenSSH 7.5, Dropbear SSH 2016.73' assert gst(['7.5,d2016.73']) == 'available since OpenSSH 7.5, Dropbear SSH 2016.73'
assert gst(['l10.7.4']) is None assert gst(['l10.7.4']) is None
assert gst([]) is None assert gst([]) is None
def test_timeframe_creation(self): def test_timeframe_creation(self):
# pylint: disable=line-too-long,too-many-statements # pylint: disable=line-too-long,too-many-statements
def cmp_tf(v, s, r): def cmp_tf(v, s, r):
assert str(self._tf(v, s)) == str(r) assert str(self._tf(v, s)) == str(r)
cmp_tf(['6.2'], None, {'OpenSSH': ['6.2', None, '6.2', None]}) cmp_tf(['6.2'], None, {'OpenSSH': ['6.2', None, '6.2', None]})
cmp_tf(['6.2'], True, {'OpenSSH': ['6.2', None, None, None]}) cmp_tf(['6.2'], True, {'OpenSSH': ['6.2', None, None, None]})
cmp_tf(['6.2'], False, {'OpenSSH': [None, None, '6.2', None]}) cmp_tf(['6.2'], False, {'OpenSSH': [None, None, '6.2', None]})
@ -57,7 +57,7 @@ class TestSSHAlgorithm(object):
cmp_tf(['6.2C,6.3'], None, {'OpenSSH': ['6.3', None, '6.2', None]}) cmp_tf(['6.2C,6.3'], None, {'OpenSSH': ['6.3', None, '6.2', None]})
cmp_tf(['6.2C,6.3'], True, {'OpenSSH': ['6.3', None, None, None]}) cmp_tf(['6.2C,6.3'], True, {'OpenSSH': ['6.3', None, None, None]})
cmp_tf(['6.2C,6.3'], False, {'OpenSSH': [None, None, '6.2', None]}) cmp_tf(['6.2C,6.3'], False, {'OpenSSH': [None, None, '6.2', None]})
cmp_tf(['6.2', '6.6'], None, {'OpenSSH': ['6.2', '6.6', '6.2', '6.6']}) cmp_tf(['6.2', '6.6'], None, {'OpenSSH': ['6.2', '6.6', '6.2', '6.6']})
cmp_tf(['6.2', '6.6'], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.2', '6.6'], True, {'OpenSSH': ['6.2', '6.6', None, None]})
cmp_tf(['6.2', '6.6'], False, {'OpenSSH': [None, None, '6.2', '6.6']}) cmp_tf(['6.2', '6.6'], False, {'OpenSSH': [None, None, '6.2', '6.6']})
@ -76,7 +76,7 @@ class TestSSHAlgorithm(object):
cmp_tf(['6.2C,6.3', '6.6'], None, {'OpenSSH': ['6.3', '6.6', '6.2', '6.6']}) cmp_tf(['6.2C,6.3', '6.6'], None, {'OpenSSH': ['6.3', '6.6', '6.2', '6.6']})
cmp_tf(['6.2C,6.3', '6.6'], True, {'OpenSSH': ['6.3', '6.6', None, None]}) cmp_tf(['6.2C,6.3', '6.6'], True, {'OpenSSH': ['6.3', '6.6', None, None]})
cmp_tf(['6.2C,6.3', '6.6'], False, {'OpenSSH': [None, None, '6.2', '6.6']}) cmp_tf(['6.2C,6.3', '6.6'], False, {'OpenSSH': [None, None, '6.2', '6.6']})
cmp_tf(['6.2', '6.6', None], None, {'OpenSSH': ['6.2', '6.6', '6.2', None]}) cmp_tf(['6.2', '6.6', None], None, {'OpenSSH': ['6.2', '6.6', '6.2', None]})
cmp_tf(['6.2', '6.6', None], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.2', '6.6', None], True, {'OpenSSH': ['6.2', '6.6', None, None]})
cmp_tf(['6.2', '6.6', None], False, {'OpenSSH': [None, None, '6.2', None]}) cmp_tf(['6.2', '6.6', None], False, {'OpenSSH': [None, None, '6.2', None]})
@ -95,7 +95,7 @@ class TestSSHAlgorithm(object):
cmp_tf(['6.3C,6.2', '6.6', None], None, {'OpenSSH': ['6.2', '6.6', '6.3', None]}) cmp_tf(['6.3C,6.2', '6.6', None], None, {'OpenSSH': ['6.2', '6.6', '6.3', None]})
cmp_tf(['6.3C,6.2', '6.6', None], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.3C,6.2', '6.6', None], True, {'OpenSSH': ['6.2', '6.6', None, None]})
cmp_tf(['6.3C,6.2', '6.6', None], False, {'OpenSSH': [None, None, '6.3', None]}) cmp_tf(['6.3C,6.2', '6.6', None], False, {'OpenSSH': [None, None, '6.3', None]})
cmp_tf(['6.2', '6.6', '7.1'], None, {'OpenSSH': ['6.2', '6.6', '6.2', '7.1']}) cmp_tf(['6.2', '6.6', '7.1'], None, {'OpenSSH': ['6.2', '6.6', '6.2', '7.1']})
cmp_tf(['6.2', '6.6', '7.1'], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.2', '6.6', '7.1'], True, {'OpenSSH': ['6.2', '6.6', None, None]})
cmp_tf(['6.2', '6.6', '7.1'], False, {'OpenSSH': [None, None, '6.2', '7.1']}) cmp_tf(['6.2', '6.6', '7.1'], False, {'OpenSSH': [None, None, '6.2', '7.1']})
@ -111,7 +111,7 @@ class TestSSHAlgorithm(object):
cmp_tf(['6.3C,6.2', '6.6', '7.1'], None, {'OpenSSH': ['6.2', '6.6', '6.3', '7.1']}) cmp_tf(['6.3C,6.2', '6.6', '7.1'], None, {'OpenSSH': ['6.2', '6.6', '6.3', '7.1']})
cmp_tf(['6.3C,6.2', '6.6', '7.1'], True, {'OpenSSH': ['6.2', '6.6', None, None]}) cmp_tf(['6.3C,6.2', '6.6', '7.1'], True, {'OpenSSH': ['6.2', '6.6', None, None]})
cmp_tf(['6.3C,6.2', '6.6', '7.1'], False, {'OpenSSH': [None, None, '6.3', '7.1']}) cmp_tf(['6.3C,6.2', '6.6', '7.1'], False, {'OpenSSH': [None, None, '6.3', '7.1']})
tf1 = self._tf(['6.1,d2016.72,6.2C', '6.6,d2016.73', '7.1,d2016.74']) tf1 = self._tf(['6.1,d2016.72,6.2C', '6.6,d2016.73', '7.1,d2016.74'])
tf2 = self._tf(['d2016.72,6.2C,6.1', 'd2016.73,6.6', 'd2016.74,7.1']) tf2 = self._tf(['d2016.72,6.2C,6.1', 'd2016.73,6.6', 'd2016.74,7.1'])
tf3 = self._tf(['d2016.72,6.2C,6.1', '6.6,d2016.73', '7.1,d2016.74']) tf3 = self._tf(['d2016.72,6.2C,6.1', '6.6,d2016.73', '7.1,d2016.74'])
@ -123,7 +123,7 @@ class TestSSHAlgorithm(object):
assert dv in str(tf1) and dv in str(tf2) and dv in str(tf3) assert dv in str(tf1) and dv in str(tf2) and dv in str(tf3)
assert ov in repr(tf1) and ov in repr(tf2) and ov in repr(tf3) assert ov in repr(tf1) and ov in repr(tf2) and ov in repr(tf3)
assert dv in repr(tf1) and dv in repr(tf2) and dv in repr(tf3) assert dv in repr(tf1) and dv in repr(tf2) and dv in repr(tf3)
def test_timeframe_object(self): def test_timeframe_object(self):
tf = self._tf(['6.1,6.2C', '6.6', '7.1']) tf = self._tf(['6.1,6.2C', '6.6', '7.1'])
assert 'OpenSSH' in tf assert 'OpenSSH' in tf
@ -138,7 +138,7 @@ class TestSSHAlgorithm(object):
assert tf.get_till('OpenSSH', True) == '6.6' assert tf.get_till('OpenSSH', True) == '6.6'
assert tf.get_from('OpenSSH', False) == '6.2' assert tf.get_from('OpenSSH', False) == '6.2'
assert tf.get_till('OpenSSH', False) == '7.1' assert tf.get_till('OpenSSH', False) == '7.1'
tf = self._tf(['6.1,d2016.72,6.2C', '6.6,d2016.73', '7.1,d2016.74']) tf = self._tf(['6.1,d2016.72,6.2C', '6.6,d2016.73', '7.1,d2016.74'])
assert 'OpenSSH' in tf assert 'OpenSSH' in tf
assert 'Dropbear SSH' in tf assert 'Dropbear SSH' in tf

View File

@ -10,7 +10,7 @@ class TestUtils(object):
def init(self, ssh_audit): def init(self, ssh_audit):
self.utils = ssh_audit.Utils self.utils = ssh_audit.Utils
self.PY3 = sys.version_info >= (3,) self.PY3 = sys.version_info >= (3,)
def test_to_bytes_py2(self): def test_to_bytes_py2(self):
if self.PY3: if self.PY3:
return return
@ -22,7 +22,7 @@ class TestUtils(object):
# other # other
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_bytes(123) self.utils.to_bytes(123)
def test_to_bytes_py3(self): def test_to_bytes_py3(self):
if not self.PY3: if not self.PY3:
return return
@ -34,7 +34,7 @@ class TestUtils(object):
# other # other
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_bytes(123) self.utils.to_bytes(123)
def test_to_utext_py2(self): def test_to_utext_py2(self):
if self.PY3: if self.PY3:
return return
@ -46,7 +46,7 @@ class TestUtils(object):
# other # other
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_utext(123) self.utils.to_utext(123)
def test_to_utext_py3(self): def test_to_utext_py3(self):
if not self.PY3: if not self.PY3:
return return
@ -58,7 +58,7 @@ class TestUtils(object):
# other # other
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_utext(123) self.utils.to_utext(123)
def test_to_ntext_py2(self): def test_to_ntext_py2(self):
if self.PY3: if self.PY3:
return return
@ -70,7 +70,7 @@ class TestUtils(object):
# other # other
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_ntext(123) self.utils.to_ntext(123)
def test_to_ntext_py3(self): def test_to_ntext_py3(self):
if not self.PY3: if not self.PY3:
return return
@ -82,7 +82,7 @@ class TestUtils(object):
# other # other
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_ntext(123) self.utils.to_ntext(123)
def test_is_ascii_py2(self): def test_is_ascii_py2(self):
if self.PY3: if self.PY3:
return return
@ -94,7 +94,7 @@ class TestUtils(object):
assert self.utils.is_ascii('fran\xc3\xa7ais') is False assert self.utils.is_ascii('fran\xc3\xa7ais') is False
# other # other
assert self.utils.is_ascii(123) is False assert self.utils.is_ascii(123) is False
def test_is_ascii_py3(self): def test_is_ascii_py3(self):
if not self.PY3: if not self.PY3:
return return
@ -105,7 +105,7 @@ class TestUtils(object):
assert self.utils.is_ascii(u'fran\xe7ais') is False assert self.utils.is_ascii(u'fran\xe7ais') is False
# other # other
assert self.utils.is_ascii(123) is False assert self.utils.is_ascii(123) is False
def test_to_ascii_py2(self): def test_to_ascii_py2(self):
if self.PY3: if self.PY3:
return return
@ -119,7 +119,7 @@ class TestUtils(object):
assert self.utils.to_ascii('fran\xc3\xa7ais', 'ignore') == 'franais' assert self.utils.to_ascii('fran\xc3\xa7ais', 'ignore') == 'franais'
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_ascii(123) self.utils.to_ascii(123)
def test_to_ascii_py3(self): def test_to_ascii_py3(self):
if not self.PY3: if not self.PY3:
return return
@ -132,7 +132,7 @@ class TestUtils(object):
assert self.utils.to_ascii(u'fran\xe7ais', 'ignore') == 'franais' assert self.utils.to_ascii(u'fran\xe7ais', 'ignore') == 'franais'
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_ascii(123) self.utils.to_ascii(123)
def test_is_print_ascii_py2(self): def test_is_print_ascii_py2(self):
if self.PY3: if self.PY3:
return return
@ -147,7 +147,7 @@ class TestUtils(object):
assert self.utils.is_print_ascii('fran\xc3\xa7ais') is False assert self.utils.is_print_ascii('fran\xc3\xa7ais') is False
# other # other
assert self.utils.is_print_ascii(123) is False assert self.utils.is_print_ascii(123) is False
def test_is_print_ascii_py3(self): def test_is_print_ascii_py3(self):
if not self.PY3: if not self.PY3:
return return
@ -160,7 +160,7 @@ class TestUtils(object):
assert self.utils.is_print_ascii(u'fran\xe7ais') is False assert self.utils.is_print_ascii(u'fran\xe7ais') is False
# other # other
assert self.utils.is_print_ascii(123) is False assert self.utils.is_print_ascii(123) is False
def test_to_print_ascii_py2(self): def test_to_print_ascii_py2(self):
if self.PY3: if self.PY3:
return return
@ -180,7 +180,7 @@ class TestUtils(object):
assert self.utils.to_print_ascii('fran\xc3\xa7ais\n', 'ignore') == 'franais' assert self.utils.to_print_ascii('fran\xc3\xa7ais\n', 'ignore') == 'franais'
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_print_ascii(123) self.utils.to_print_ascii(123)
def test_to_print_ascii_py3(self): def test_to_print_ascii_py3(self):
if not self.PY3: if not self.PY3:
return return
@ -199,18 +199,18 @@ class TestUtils(object):
assert self.utils.to_print_ascii(u'fran\xe7ais\n', 'ignore') == 'franais' assert self.utils.to_print_ascii(u'fran\xe7ais\n', 'ignore') == 'franais'
with pytest.raises(TypeError): with pytest.raises(TypeError):
self.utils.to_print_ascii(123) self.utils.to_print_ascii(123)
def test_ctoi(self): def test_ctoi(self):
assert self.utils.ctoi(123) == 123 assert self.utils.ctoi(123) == 123
assert self.utils.ctoi('ABC') == 65 assert self.utils.ctoi('ABC') == 65
def test_parse_int(self): def test_parse_int(self):
assert self.utils.parse_int(123) == 123 assert self.utils.parse_int(123) == 123
assert self.utils.parse_int('123') == 123 assert self.utils.parse_int('123') == 123
assert self.utils.parse_int(-123) == -123 assert self.utils.parse_int(-123) == -123
assert self.utils.parse_int('-123') == -123 assert self.utils.parse_int('-123') == -123
assert self.utils.parse_int('abc') == 0 assert self.utils.parse_int('abc') == 0
def test_unique_seq(self): def test_unique_seq(self):
assert self.utils.unique_seq((1, 2, 2, 3, 3, 3)) == (1, 2, 3) assert self.utils.unique_seq((1, 2, 2, 3, 3, 3)) == (1, 2, 3)
assert self.utils.unique_seq((3, 3, 3, 2, 2, 1)) == (3, 2, 1) assert self.utils.unique_seq((3, 3, 3, 2, 2, 1)) == (3, 2, 1)

View File

@ -8,19 +8,19 @@ class TestVersionCompare(object):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def init(self, ssh_audit): def init(self, ssh_audit):
self.ssh = ssh_audit.SSH self.ssh = ssh_audit.SSH
def get_dropbear_software(self, v): def get_dropbear_software(self, v):
b = self.ssh.Banner.parse('SSH-2.0-dropbear_{0}'.format(v)) b = self.ssh.Banner.parse('SSH-2.0-dropbear_{0}'.format(v))
return self.ssh.Software.parse(b) return self.ssh.Software.parse(b)
def get_openssh_software(self, v): def get_openssh_software(self, v):
b = self.ssh.Banner.parse('SSH-2.0-OpenSSH_{0}'.format(v)) b = self.ssh.Banner.parse('SSH-2.0-OpenSSH_{0}'.format(v))
return self.ssh.Software.parse(b) return self.ssh.Software.parse(b)
def get_libssh_software(self, v): def get_libssh_software(self, v):
b = self.ssh.Banner.parse('SSH-2.0-libssh-{0}'.format(v)) b = self.ssh.Banner.parse('SSH-2.0-libssh-{0}'.format(v))
return self.ssh.Software.parse(b) return self.ssh.Software.parse(b)
def test_dropbear_compare_version_pre_years(self): def test_dropbear_compare_version_pre_years(self):
s = self.get_dropbear_software('0.44') s = self.get_dropbear_software('0.44')
assert s.compare_version(None) == 1 assert s.compare_version(None) == 1
@ -32,7 +32,7 @@ class TestVersionCompare(object):
assert s.between_versions('0.43', '0.45') assert s.between_versions('0.43', '0.45')
assert s.between_versions('0.43', '0.43') is False assert s.between_versions('0.43', '0.43') is False
assert s.between_versions('0.45', '0.43') is False assert s.between_versions('0.45', '0.43') is False
def test_dropbear_compare_version_with_years(self): def test_dropbear_compare_version_with_years(self):
s = self.get_dropbear_software('2015.71') s = self.get_dropbear_software('2015.71')
assert s.compare_version(None) == 1 assert s.compare_version(None) == 1
@ -44,7 +44,7 @@ class TestVersionCompare(object):
assert s.between_versions('2014.66', '2016.74') assert s.between_versions('2014.66', '2016.74')
assert s.between_versions('2014.66', '2015.69') is False assert s.between_versions('2014.66', '2015.69') is False
assert s.between_versions('2016.74', '2014.66') is False assert s.between_versions('2016.74', '2014.66') is False
def test_dropbear_compare_version_mixed(self): def test_dropbear_compare_version_mixed(self):
s = self.get_dropbear_software('0.53.1') s = self.get_dropbear_software('0.53.1')
assert s.compare_version(None) == 1 assert s.compare_version(None) == 1
@ -56,7 +56,7 @@ class TestVersionCompare(object):
assert s.between_versions('0.53', '2011.54') assert s.between_versions('0.53', '2011.54')
assert s.between_versions('0.53', '0.53') is False assert s.between_versions('0.53', '0.53') is False
assert s.between_versions('2011.54', '0.53') is False assert s.between_versions('2011.54', '0.53') is False
def test_dropbear_compare_version_patchlevel(self): def test_dropbear_compare_version_patchlevel(self):
s1 = self.get_dropbear_software('0.44') s1 = self.get_dropbear_software('0.44')
s2 = self.get_dropbear_software('0.44test3') s2 = self.get_dropbear_software('0.44test3')
@ -80,7 +80,7 @@ class TestVersionCompare(object):
assert s2.between_versions('0.44', '0.43') is False assert s2.between_versions('0.44', '0.43') is False
assert s1.compare_version(s2) > 0 assert s1.compare_version(s2) > 0
assert s2.compare_version(s1) < 0 assert s2.compare_version(s1) < 0
def test_dropbear_compare_version_sequential(self): def test_dropbear_compare_version_sequential(self):
versions = [] versions = []
for i in range(28, 44): for i in range(28, 44):
@ -105,18 +105,18 @@ class TestVersionCompare(object):
versions.append('2015.{0}'.format(i)) versions.append('2015.{0}'.format(i))
for i in range(72, 75): for i in range(72, 75):
versions.append('2016.{0}'.format(i)) versions.append('2016.{0}'.format(i))
l = len(versions) length = len(versions)
for i in range(l): for i in range(length):
v = versions[i] v = versions[i]
s = self.get_dropbear_software(v) s = self.get_dropbear_software(v)
assert s.compare_version(v) == 0 assert s.compare_version(v) == 0
if i - 1 >= 0: if i - 1 >= 0:
vbefore = versions[i - 1] vbefore = versions[i - 1]
assert s.compare_version(vbefore) > 0 assert s.compare_version(vbefore) > 0
if i + 1 < l: if i + 1 < length:
vnext = versions[i + 1] vnext = versions[i + 1]
assert s.compare_version(vnext) < 0 assert s.compare_version(vnext) < 0
def test_openssh_compare_version_simple(self): def test_openssh_compare_version_simple(self):
s = self.get_openssh_software('3.7.1') s = self.get_openssh_software('3.7.1')
assert s.compare_version(None) == 1 assert s.compare_version(None) == 1
@ -128,7 +128,7 @@ class TestVersionCompare(object):
assert s.between_versions('3.7', '3.8') assert s.between_versions('3.7', '3.8')
assert s.between_versions('3.6', '3.7') is False assert s.between_versions('3.6', '3.7') is False
assert s.between_versions('3.8', '3.7') is False assert s.between_versions('3.8', '3.7') is False
def test_openssh_compare_version_patchlevel(self): def test_openssh_compare_version_patchlevel(self):
s1 = self.get_openssh_software('2.1.1') s1 = self.get_openssh_software('2.1.1')
s2 = self.get_openssh_software('2.1.1p2') s2 = self.get_openssh_software('2.1.1p2')
@ -141,7 +141,7 @@ class TestVersionCompare(object):
assert s2.compare_version('2.1.1p3') < 0 assert s2.compare_version('2.1.1p3') < 0
assert s1.compare_version(s2) == 0 assert s1.compare_version(s2) == 0
assert s2.compare_version(s1) == 0 assert s2.compare_version(s1) == 0
def test_openbsd_compare_version_sequential(self): def test_openbsd_compare_version_sequential(self):
versions = [] versions = []
for v in ['1.2.3', '2.1.0', '2.1.1', '2.2.0', '2.3.0']: for v in ['1.2.3', '2.1.0', '2.1.1', '2.2.0', '2.3.0']:
@ -164,18 +164,18 @@ class TestVersionCompare(object):
versions.append('6.{0}'.format(i)) versions.append('6.{0}'.format(i))
for i in range(0, 4): for i in range(0, 4):
versions.append('7.{0}'.format(i)) versions.append('7.{0}'.format(i))
l = len(versions) length = len(versions)
for i in range(l): for i in range(length):
v = versions[i] v = versions[i]
s = self.get_openssh_software(v) s = self.get_openssh_software(v)
assert s.compare_version(v) == 0 assert s.compare_version(v) == 0
if i - 1 >= 0: if i - 1 >= 0:
vbefore = versions[i - 1] vbefore = versions[i - 1]
assert s.compare_version(vbefore) > 0 assert s.compare_version(vbefore) > 0
if i + 1 < l: if i + 1 < length:
vnext = versions[i + 1] vnext = versions[i + 1]
assert s.compare_version(vnext) < 0 assert s.compare_version(vnext) < 0
def test_libssh_compare_version_simple(self): def test_libssh_compare_version_simple(self):
s = self.get_libssh_software('0.3') s = self.get_libssh_software('0.3')
assert s.compare_version(None) == 1 assert s.compare_version(None) == 1
@ -187,7 +187,7 @@ class TestVersionCompare(object):
assert s.between_versions('0.2', '0.3.1') assert s.between_versions('0.2', '0.3.1')
assert s.between_versions('0.1', '0.2') is False assert s.between_versions('0.1', '0.2') is False
assert s.between_versions('0.3.1', '0.2') is False assert s.between_versions('0.3.1', '0.2') is False
def test_libssh_compare_version_sequential(self): def test_libssh_compare_version_sequential(self):
versions = [] versions = []
for v in ['0.2', '0.3']: for v in ['0.2', '0.3']:
@ -202,14 +202,14 @@ class TestVersionCompare(object):
versions.append('0.6.{0}'.format(i)) versions.append('0.6.{0}'.format(i))
for i in range(0, 5): for i in range(0, 5):
versions.append('0.7.{0}'.format(i)) versions.append('0.7.{0}'.format(i))
l = len(versions) length = len(versions)
for i in range(l): for i in range(length):
v = versions[i] v = versions[i]
s = self.get_libssh_software(v) s = self.get_libssh_software(v)
assert s.compare_version(v) == 0 assert s.compare_version(v) == 0
if i - 1 >= 0: if i - 1 >= 0:
vbefore = versions[i - 1] vbefore = versions[i - 1]
assert s.compare_version(vbefore) > 0 assert s.compare_version(vbefore) > 0
if i + 1 < l: if i + 1 < length:
vnext = versions[i + 1] vnext = versions[i + 1]
assert s.compare_version(vnext) < 0 assert s.compare_version(vnext) < 0

51
tox.ini
View File

@ -80,7 +80,7 @@ commands =
deps = deps =
flake8 flake8
commands = commands =
flake8 {posargs:{env:SSHAUDIT}} flake8 {posargs:{env:SSHAUDIT} {toxinidir}/packages/setup.py {toxinidir}/test} --statistics
[testenv:vulture] [testenv:vulture]
deps = deps =
@ -137,42 +137,13 @@ max-module-lines = 2500
[flake8] [flake8]
ignore = ignore =
# indentation contains tabs W191, # indentation contains tabs
W191, E101, # indentation contains mixed spaces and tabs
# blank line contains whitespace E241, # multiple spaces after operator; should be kept for tabular data
W293, E501, # line too long
# indentation contains mixed spaces and tabs E117, # over-indented
E101, E126, # continuation line over-indented for hanging indent
# multiple spaces before operator E128, # continuation line under-indented for visual indent
E221, E722, # do not use bare 'except'
# multiple spaces after operator F601, # dictionary key 'ecdsa-sha2-1.3.132.0.10' repeated with different values
E241, W504, # line break after binary operator; this (or W503) has to stay
# multiple imports on one line
E401,
# line too long
E501,
# module imported but unused
F401,
# undefined name
F821,
# these exceptions should be handled one by one
E117, # over-indented
E126, # continuation line over-indented for hanging indent
E128, # continuation line under-indented for visual indent
E226, # missing whitespace around arithmetic operator
E231, # missing whitespace after ','
E251, # unexpected spaces around keyword / parameter equals
E261, # at least two spaces before inline comment
E265, # block comment should start with '# '
E301, # expected 1 blank line, found 0
E302, # expected 2 blank lines, found 1
E303, # too many blank lines (2)
E305, # expected 2 blank lines after class or function definition, found 1
E711, # comparison to None should be 'if cond is not None:'
E712, # comparison to False should be 'if cond is False:' or 'if not cond:'
E722, # do not use bare 'except'
E741, # ambiguous variable name 'l'
F601, # dictionary key 'ecdsa-sha2-1.3.132.0.10' repeated with different values
F841, # local variable 'e' is assigned to but never used
W504, # line break after binary operator
W605, # invalid escape sequence '\s'