From b9c34882c59daef6f209dcd936f882e923be503b Mon Sep 17 00:00:00 2001 From: mgeeky Date: Thu, 14 Nov 2019 10:54:28 +0100 Subject: [PATCH] smtpAudit.py --- networks/README.md | 37 + networks/smtpAudit.py | 3879 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 3916 insertions(+) create mode 100644 networks/smtpAudit.py diff --git a/networks/README.md b/networks/README.md index e8718ce..294047b 100644 --- a/networks/README.md +++ b/networks/README.md @@ -88,6 +88,43 @@ Capturing on 'eth0' 12 0.050831934 192.168.1.14 → 224.0.0.9 RIPv2 61 Unknown command (64)[Malformed Packet] ``` +- **`smtpAudit.py`** - SMTP Server configuration black-box testing/audit tool, capable of auditing SPF/Accepted Domains, DKIM, DMARC, SSL/TLS, SMTP services, banner, Authentication (AUTH, X-EXPS), conducting user enumerations (VRFY, EXPN, RCPT TO) and others. ([gist](https://gist.github.com/mgeeky/ef49e5fb6c3479dd6a24eb90b53f9baa)) + + Currently supported tests: + 01) 'spf' - SPF DNS record test + - 'spf-version' - Checks whether SPF record version is valid + - 'all-mechanism-usage' - Checks whether 'all' mechanism is used correctly + - 'allowed-hosts-list' - Checks whether there are not too many allowed hosts + 02) 'dkim' - DKIM DNS record test + - 'public-key-length' - Tests whether DKIM Public Key is at least 1024 bits long + 03) 'dmarc' - DMARC DNS record test + - 'dmarc-version' - Checks whether DMARC record version is valid + - 'policy-rejects-by-default' - Checks whether DMARC uses reject policy + - 'number-of-messages-filtered' - Checks whether there are at least 20% messages filtered. + 04) 'banner-contents' - SMTP Banner sensitive informations leak test + - 'not-contains-version' - Contains version information + - 'not-contains-prohibited-words'- Contains software/OS/or other prohibited name + - 'is-not-long-or-complex' - Seems to be long and/or complex + - 'contains-hostname' - Checks whether SMTP banner contains valid hostname + 05) 'open-relay' - Open-Relay misconfiguration test + - 'internal-internal' + - 'internal-external' + - 'external-internal' + - 'external-external' + - And about 19 other variants + - (the above is very effective against Postfix) + 06) 'vrfy' - VRFY user enumeration vulnerability test + 07) 'expn' - EXPN user enumeration vulnerability test + 08) 'rcpt-to' - RCPT TO user enumeration vulnerability test + 09) 'secure-ciphers' - SSL/TLS ciphers security weak configuration + 10) 'starttls-offering' - STARTTLS offering (opportunistic) weak configuration + 11) 'auth-over-ssl' - STARTTLS before AUTH/X-EXPS enforcement weak configuration + 12) 'auth-methods-offered' - Test against unsecure AUTH/X-EXPS PLAIN/LOGIN methods. + 13) 'tls-key-len' - Checks private key length of negotiated or offered SSL/TLS cipher suites. + 14) 'spf-validation' - Checks whether SMTP Server has been configured to validate sender's SPF + or if it's Microsoft Exchange - that is uses Accepted Domains + + - **`sshbrute.py`** - ripped out from Violent Python - by TJ O'Connor. ([gist](https://gist.github.com/mgeeky/70606be7249a61ac26b34b1ef3b07553)) - **`smb-credential-leak.html`** - SMB Credentials leakage by MSEdge as presented in Browser Security White Paper, X41 D-Sec GmbH. ([gist](https://gist.github.com/mgeeky/44ce8a8887c169aa6a0093d915ea103d)) diff --git a/networks/smtpAudit.py b/networks/smtpAudit.py new file mode 100644 index 0000000..35265bf --- /dev/null +++ b/networks/smtpAudit.py @@ -0,0 +1,3879 @@ +#!/usr/bin/python3 +# +# SMTP Server configuration black-box testing/audit tool, capable of auditing +# SPF/Accepted Domains, DKIM, DMARC, SSL/TLS, SMTP services, banner, Authentication (AUTH, X-EXPS) +# user enumerations (VRFY, EXPN, RCPT TO), and others. +# +# Currently supported tests: +# 01) 'spf' - SPF DNS record test +# - 'spf-version' - Checks whether SPF record version is valid +# - 'all-mechanism-usage' - Checks whether 'all' mechanism is used correctly +# - 'allowed-hosts-list' - Checks whether there are not too many allowed hosts +# 02) 'dkim' - DKIM DNS record test +# - 'public-key-length' - Tests whether DKIM Public Key is at least 1024 bits long +# 03) 'dmarc' - DMARC DNS record test +# - 'dmarc-version' - Checks whether DMARC record version is valid +# - 'policy-rejects-by-default' - Checks whether DMARC uses reject policy +# - 'number-of-messages-filtered' - Checks whether there are at least 20% messages filtered. +# 04) 'banner-contents' - SMTP Banner sensitive informations leak test +# - 'not-contains-version' - Contains version information +# - 'not-contains-prohibited-words'- Contains software/OS/or other prohibited name +# - 'is-not-long-or-complex' - Seems to be long and/or complex +# - 'contains-hostname' - Checks whether SMTP banner contains valid hostname +# 05) 'open-relay' - Open-Relay misconfiguration test +# - 'internal-internal' +# - 'internal-external' +# - 'external-internal' +# - 'external-external' +# - And about 19 other variants +# - (the above is very effective against Postfix) +# 06) 'vrfy' - VRFY user enumeration vulnerability test +# 07) 'expn' - EXPN user enumeration vulnerability test +# 08) 'rcpt-to' - RCPT TO user enumeration vulnerability test +# 09) 'secure-ciphers' - SSL/TLS ciphers security weak configuration +# 10) 'starttls-offering' - STARTTLS offering (opportunistic) weak configuration +# 11) 'auth-over-ssl' - STARTTLS before AUTH/X-EXPS enforcement weak configuration +# 12) 'auth-methods-offered' - Test against unsecure AUTH/X-EXPS PLAIN/LOGIN methods. +# 13) 'tls-key-len' - Checks private key length of negotiated or offered SSL/TLS cipher suites. +# 14) 'spf-validation' - Checks whether SMTP Server has been configured to validate sender's SPF +# or if it's Microsoft Exchange - that is uses Accepted Domains +# +# Tests obtain results in tri-state boolean, acordingly: +# - 'secure' - The test has succeeded and proved GOOD and SECURE configuration. +# - 'unsecure'- The test has succeeded and proved BAD and UNSECURE configuration. +# - 'unknown' - The test has failed and did not prove anything. +# +# ATTACKS offered (--attack option): +# Currently the tool offers functionality to lift up user emails enumeration, by the use of +# RCPT TO, MAIL FROM and VRFY methods. +# +# Requirements: +# - Python 3.5+ +# - dnspython +# +# TODO: +# - refactor all the code cause it's a mess at the moment +# - modularize the code +# - add support for Outlook's OWA, AutoDiscover, MAPI-over-HTTP, Exchange ActiveSync (EAC) +# - add support for NTLM/Kerberos (GSSAPI) authentication when used from Domain-joined Windows box +# - BUG: if smtpAudit.py connects with SMTP over non-encrypted channel (ssl: False) it should be alerted as 'unsecure', it is not atm +# - test it more thoroughly against various SMTP setups and configurations +# - fix the issue with hanged jobs doing DKIM lookup when they reach 99% +# - introduce general program timeout +# - improve output informations/messages, explanations +# - implement options parsing, files passing, verbosity levels, etc +# - add more options specifying various parameters, thresholds +# - research other potential tests to implement +# - add test for 'reject_multi_recipient_bounce' a.k.a. multi RCPT TO commands +# - add more options and improve code for penetration-testing oriented usage (active attacks) +# +# Tested against: +# - postfix 3.x +# - Microsoft Exchange Server 2013 +# +# Author: +# Mariusz B. / mgeeky, '17-19, +# +# + +import re +import sys +import ssl +import time +import json +import math +import base64 +import string +import socket +import pprint +import random +import inspect +import smtplib +import argparse +import datetime +import threading +import multiprocessing +from collections import Counter + +try: + from dns import name, resolver, exception +except ImportError: + print('[!] Module "dnspython" not installed. Try: python3 -m pip install dnspython') + sys.exit(-1) + +if float(sys.version[:3]) < 3.5: + print('[!] This program must be run with Python 3.5+') + sys.exit(-1) + + +# +# =================================================== +# GLOBAL PROGRAM CONFIGURATION +# + +VERSION = '0.7.7' + +config = { + # Enable script's output other than tests results. + 'verbose' : False, + + # Turn on severe debugging facilities + 'debug' : False, + 'smtp_debug': False, + + # Connection timeout threshold + 'timeout' : 5.0, + + # Delay between consequent requests and connections. + 'delay' : 2.0, + + # During the work of the program - the SMTP server will receive many of our incoming + # connections. In such situation, the server may block our new connections due to + # exceeding conns limit/rate (like it does Postfix/anvil=count). Therefore it is crucial + # to set up long enough interconnection-delay that will take of as soon as server + # responds with: "421 Too many connections". For most situations - 60 seconds will do fine. + 'too_many_connections_delay' : 60, + + # Perform full-blown, long-time taking DNS records enumeration (for SPF, DKIM, DMARC) + # Accepted values: + # - 'always' + # - 'on-ip' - do full enumeration only when given with server's IP address + # - 'never' + 'dns_full' : 'on-ip', + + # Specifies whether to do full, long-time taking DKIM selectors review. + 'dkim_full_enumeration' : True, + + # External domain used in Open-Relay and other tests + 'smtp_external_domain': 'gmail.com', + + # Pretend to be the following client host: + 'pretend_client_hostname': 'smtp.gmail.com', + + # Specifies whether to show results JSON unfolded (nested) or only when needed + 'always_unfolded_results': False, + + # Num of enumeration tries until test is considered completed (whether it succeeds or not). + # Value -1 denotes to go with full spectrum of the test. + 'max_enumerations' : -1, + + # Use threading - may cause some issues with responsiveness, or cause program to hang. + 'threads' : True, + + # Uncommon words to have in DKIM selectors permutations list + 'uncommon_words' : (), + + # DO NOT CHANGE THIS ONE. + 'tests_to_carry' : 'all', + 'tests_to_skip' : '', + + # Maximum number of parallel process in DKIM enumeration test + 'parallel_processes' : 10, + + # When DNS resolver becomes busy handling thousands of DKIM queries, + # we can delay asking for more selectors iteratively. + 'delay_dkim_queries' : True, + + # Output format. Possible values: json, text + 'format' : 'text', + + # Colorize output + 'colors': True, + + # Attack mode + 'attack': False, + + # Minimal key length to consider it secure + 'key_len' : 2048, + + # Maximum hosts in SPF considered secure: + 'spf_maximum_hosts' : 32, +} + + +# +# =================================================== +# PROGRAM IMPLEMENTATION +# + +class colors: + '''Colors class: + reset all colors with colors.reset + two subclasses fg for foreground and bg for background. + use as colors.subclass.colorname. + i.e. colors.fg.red or colors.bg.green + also, the generic bold, disable, underline, reverse, strikethrough, + and invisible work with the main class + i.e. colors.bold + ''' + reset = '\033[0m' + bold = '\033[01m' + disable = '\033[02m' + underline = '\033[04m' + reverse = '\033[07m' + strikethrough = '\033[09m' + invisible = '\033[08m' + + class fg: + black = '\033[30m' + red = '\033[31m' + green = '\033[32m' + orange = '\033[33m' + blue = '\033[34m' + purple = '\033[35m' + cyan = '\033[36m' + lightgrey = '\033[37m' + darkgrey = '\033[90m' + lightred = '\033[91m' + lightgreen = '\033[92m' + yellow = '\033[93m' + lightblue = '\033[94m' + pink = '\033[95m' + lightcyan = '\033[96m' + + class bg: + black = '\033[40m' + red = '\033[41m' + green = '\033[42m' + orange = '\033[43m' + blue = '\033[44m' + purple = '\033[45m' + cyan = '\033[46m' + lightgrey = '\033[47m' + +# +# Output routines. +# +def _out(x, toOutLine = False, col = colors.reset): + if config['colors']: + text = '{}{}{}\n'.format( + col, x, colors.reset + ) + else: + text = x + '\n' + + if config['debug'] or config['verbose']: + if config['debug']: + caller = (inspect.getouterframes(inspect.currentframe(), 2))[2][3] + if x.startswith('['): + x = x[:4] + ' ' + caller + '(): ' + x[4:] + sys.stderr.write(text) + + elif config['format'] == 'text' and \ + (toOutLine or 'SECURE: ' in x or 'UNKNOWN: ' in x): + if config['attack']: + sys.stderr.write(text) + else: + sys.stdout.write(text) + +def dbg(x): + if config['debug']: + caller2 = (inspect.getouterframes(inspect.currentframe(), 2))[1][3] + caller1 = (inspect.getouterframes(inspect.currentframe(), 2))[2][3] + caller = '{}() -> {}'.format(caller1, caller2) + text = x + if config['colors']: text = '{}{}{}'.format(colors.fg.lightblue, x, colors.reset) + sys.stderr.write('[dbg] ' + caller + '(): ' + text + '\n') + +def out(x, toOutLine = False): _out('[.] ' + x, toOutLine) +def info(x, toOutLine = False):_out('[?] ' + x, toOutLine, colors.fg.yellow) +def err(x, toOutLine = False): _out('[!] ' + x, toOutLine, colors.bg.red + colors.fg.black) +def fail(x, toOutLine = False):_out('[-] ' + x, toOutLine, colors.fg.red + colors.bold) +def ok(x, toOutLine = False): _out('[+] ' + x, toOutLine, colors.fg.green + colors.bold) + + +class BannerParser: + softwareWeight = 3 + osWeight = 2 + + # MTAs + prohibitedSoftwareWords = ( + 'Exim', + 'Postfix', + 'Maildrop', + 'Cyrus', + 'Sendmail', + 'Exchange', + 'Lotus Domino', + ) + + prohibitedOSWords = ( + 'Windows', + 'Linux', + 'Debian', + 'Fedora', + 'Unix', + '/GNU)', + 'SuSE', + 'Mandriva', + 'Centos', + 'Gentoo', + 'Red Hat', + 'Microsoft(R) Windows(R)', + ) + + # Certain words will have greater weight since they are more important to hide in banner. + # Every word must be in it's own list. + prohibitedWords = prohibitedSoftwareWords + prohibitedOSWords + ( + 'Microsoft ESMTP', + 'MAIL service ready at ', + 'Version:', + 'qmail', + 'Ver.', + '(v.', + 'build:', + ) + + wellKnownDefaultBanners = { + 'Microsoft Exchange' : 'Microsoft ESMTP MAIL service ready at ', + 'IBM Lotus Domino' : 'ESMTP Service (Lotus Domino ', + } + + # Statistical banner's length characteristics + lengthCharacteristics = { + 'mean': 66.08, + 'median': 58.5, + 'std.dev': 27.27 + } + + # Reduced entropy statistical characteristics after removing potential timestamp + # (as being added by e.g. Exim and Exchange) + reducedEntropyCharacteristics = { + 'mean': 3.171583046, + 'median': 3.203097614, + 'std.dev': 0.191227689 + } + + weights = { + 'prohibitedWord': 1, + 'versionFound': 2, + 'versionNearProhibitedWord': 3, + } + + # Max penalty score to consider banner unsecure. + maxPenaltyScore = 4.0 + + localHostnameRegex = r'(?:[0-9]{3}\s)?([\w\-\.]+).*' + + def __init__(self): + self.results = { + 'not-contains-version' : True, + 'not-contains-prohibited-words' : True, + 'is-not-long-or-complex' : True, + 'contains-hostname' : False, + } + + @staticmethod + def entropy(data, unit='natural'): + ''' + Source: https://stackoverflow.com/a/37890790 + ''' + base = { + 'shannon' : 2., + 'natural' : math.exp(1), + 'hartley' : 10. + } + + if len(data) <= 1: + return 0 + + counts = Counter() + for d in data: + counts[d] += 1 + + probs = [float(c) / len(data) for c in counts.values()] + probs = [p for p in probs if p > 0.] + + ent = 0 + for p in probs: + if p > 0.: + ent -= p * math.log(p, base[unit]) + + return ent + + @staticmethod + def removeTimestamp(banner): + rex = r'\w{3}, \d{1,2} \w{3} \d{4} \d{2}:\d{2}:\d{2}(?: .\d{4})?' + return re.sub(rex, '', banner) + + def parseBanner(self, banner): + if not banner: + if config['always_unfolded_results']: + return dict.fromkeys(self.results, None) + else: + return None + + penalty = 0 + versionFound = '' + + for service, wellKnownBanner in BannerParser.wellKnownDefaultBanners.items(): + if wellKnownBanner.lower() in banner.lower(): + fail('UNSECURE: Default banner found for {}: "{}"'.format( + service, banner + )) + return False + + penalty += self.analyseBannerEntropy(banner) + penalty += self.checkForProhibitedWordsAndVersion(banner) + penalty += self.checkHostnameInBanner(banner) + + ret = (penalty < BannerParser.maxPenaltyScore) + if not ret: + fail('UNSECURE: Banner considered revealing sensitive informations (penalty: {}/{})!'.format( + penalty, BannerParser.maxPenaltyScore + )) + _out('\tBanner: ("{}")'.format(banner), toOutLine = True) + + return self.results + else: + ok('SECURE: Banner was not found leaking anything. (penalty: {}/{})'.format( + penalty, BannerParser.maxPenaltyScore + )) + _out('\tBanner: ("{}")'.format(banner), toOutLine = True) + + if all(self.results.values()) and not config['always_unfolded_results']: + return True + else: + return self.results + + def analyseBannerEntropy(self, banner): + penalty = 0 + reducedBanner = BannerParser.removeTimestamp(banner) + bannerEntropy = BannerParser.entropy(reducedBanner) + + dbg('Analysing banner: "{}"'.format(banner)) + dbg('Length: {}, reduced banner Entropy: {:.6f}'.format(len(banner), bannerEntropy)) + + if len(reducedBanner) > (BannerParser.lengthCharacteristics['mean'] \ + + 1 * BannerParser.lengthCharacteristics['std.dev']): + info('Warning: Banner seems to be very long. Consider shortening it.', toOutLine = True) + self.results['is-not-long-or-complex'] = False + penalty += 1 + + if bannerEntropy > (BannerParser.reducedEntropyCharacteristics['mean'] \ + + 1 * BannerParser.reducedEntropyCharacteristics['std.dev']): + info('Warning: Banner seems to be complex in terms of entropy.' + ' Consider generalising it.', toOutLine = True) + self.results['is-not-long-or-complex'] = False + penalty += 1 + + return penalty + + def checkForProhibitedWordsAndVersion(self, banner): + penalty = 0 + versionFound = '' + regexVersionNumber = r'(?:(\d+)\.)?(?:(\d+)\.)?(?:(\d+)\.\d+)' + + match = re.search(regexVersionNumber, banner) + if match: + versionFound = match.group(0) + fail('Sensitive software version number found in banner: "{}"'.format( + versionFound + ), toOutLine = True) + self.results['not-contains-version'] = False + penalty += BannerParser.weights['versionFound'] + + alreadyFound = set() + for word in BannerParser.prohibitedWords: + if word.lower() in banner.lower(): + if not word.lower() in alreadyFound: + info('Prohibited word found in banner: "{}"'.format( + word + ), toOutLine = True) + self.results['not-contains-prohibited-words'] = True + alreadyFound.add(word.lower()) + + mult = 1 + if word.lower() in BannerParser.prohibitedSoftwareWords: + mult = BannerParser.softwareWeight + elif word.lower() in BannerParser.prohibitedOSWords: + mult = BannerParser.prohibitedOSWords + + penalty += (float(mult) * BannerParser.weights['prohibitedWord']) + + # Does the word immediately follow or precede version number? + if versionFound: + surrounds = ( + '{}{}'.format(word, versionFound), + '{}{}'.format(versionFound, word), + '{} {}'.format(word, versionFound), + '{} {}'.format(versionFound, word), + '{}/{}'.format(word, versionFound), + '{}/{}'.format(versionFound, word), + ) + for surr in surrounds: + if surr in banner: + info('Word was found lying around version: "{}". '\ + 'Consider removing it.'.format( + surr + ), toOutLine = True) + penalty += BannerParser.weights['versionNearProhibitedWord'] + break + + return penalty + + def checkHostnameInBanner(self, banner): + penalty = 0 + matched = re.search(BannerParser.localHostnameRegex, banner) + + if matched: + localHostname = matched.group(1) + self.results['contains-hostname'] = True + info('Extracted hostname from banner: "{}"'.format(localHostname)) + else: + fail('SMTP Banner does not contain server\'s hostname. This may cause SPAM reports.', toOutLine = True) + penalty = 1 + + return penalty + + + +class DmarcParser: + def __init__(self): + self.results = { + 'dmarc-version' : False, + 'policy-rejects-by-default': False, + 'number-of-messages-filtered': True, + } + + def processDmarc(self, record): + if not record: + if config['always_unfolded_results']: + return dict.fromkeys(self.results, None) + else: + return None + + for keyValue in record.split(' '): + if not keyValue: break + k, v = keyValue.split('=') + k = k.strip() + v = v.strip() + + if v.endswith(';'): + v = v[:-1] + + if k == 'v': + self.results['dmarc-version'] = v.lower() == 'dmarc1' + if not self.results['dmarc-version']: + fail('UNSECURE: Unknown version of DMARC stated: {}'.format(v)) + + elif k == 'p': + if v.lower() not in ('none', 'reject', 'quarantine'): + fail('UNSECURE: Unknown policy stated: {}'.format(v)) + self.results['policy-rejects-by-default'] = False + else: + self.results['policy-rejects-by-default'] = v.lower() == 'reject' + + if not self.results['policy-rejects-by-default']: + fail('UNSECURE: DMARC policy does not reject unverified messages ({}).'.format( + v + )) + elif k == 'pct': + try: + perc = int(v) + self.results['number-of-messages-filtered'] = perc >= 20 + + if self.results['number-of-messages-filtered']: + info('Percentage of filtered messages is satisfiable ({})'.format( + perc + )) + else: + fail('UNSECURE: Unsatisfiable percentage of messages filtered: {}!'.format( + perc + )) + + except ValueError: + fail('Defined "pct" is not a valid percentage!') + self.results['number-of-messages-filtered'] = False + + if not config['always_unfolded_results'] and all(self.results.values()): + return True + else: + return self.results + +class DkimParser: + minimumDkimKeyLength = 1024 + + def __init__(self): + self.results = { + 'public-key-length': True, + } + + def process(self, record): + self.testKeyLength(record) + + if not config['always_unfolded_results'] and all(self.results.values()): + return True + else: + return self.results + + def testKeyLength(self, txt): + tags = txt.split(';') + dkim = {} + + for t in tags: + k, v = t.strip().split('=') + dkim[k] = v + + if 'p' not in dkim.keys(): return False + + pubkey = base64.b64decode(dkim['p']) + keyLen = (len(pubkey) - 38) * 8 # 38 bytes is for key's metadata + + if keyLen < 0: + fail('Incorrect Public Key in DKIM!') + keyLen = 0 + + dbg('DKIM: version = {}, algorithm = {}, key length = {}'.format( + dkim['v'], dkim['k'], keyLen + )) + + if keyLen < DkimParser.minimumDkimKeyLength: + fail('UNSECURE: DKIM Public Key length is insufficient: {}. ' \ + 'Recommended at least {}'.format( + keyLen, DkimParser.minimumDkimKeyLength + )) + + self.results['public-key-length'] = False + else: + ok('SECURE: DKIM Public key is of sufficient length: {}'.format(keyLen)) + self.results['public-key-length'] = True + + return self.results['public-key-length'] + +class SpfParser: + #maxAllowedNetworkMask = 28 + maxNumberOfDomainsAllowed = 3 + + allowedHostsNumber = 0 + allowSpecifiers = 0 + + mechanisms = ('all', 'ip4', 'ip6', 'a', 'mx', 'ptr', 'exists', 'include') + qualifiers = ('+', '-', '~', '?') + + def __init__(self): + self.results = { + 'spf-version': True, + 'all-mechanism-usage': True, + 'allowed-hosts-list': True, + } + + self.addressBasedMechanism = 0 + + def process(self, record): + if not record: + if config['always_unfolded_results']: + return dict.fromkeys(self.results, None) + else: + return None + + record = record.lower() + tokens = record.split(' ') + + dbg('Processing SPF record: "{}"'.format(record)) + + for token in tokens: + qualifier = '' + if not token: continue + + dbg('SPF token: {}'.format(token)) + + if token.startswith('v=spf'): + self.results['spf-version'] = self.processVersion(token) + continue + + if token[0] not in string.ascii_letters and token[0] not in SpfParser.qualifiers: + fail('SPF record contains unknown qualifier: "{}". Ignoring it...'.format( + token[0] + )) + + qualifier = token[0] + token = token[1:] + else: + qualifier = '+' + + if 'all' in token: + self.results['all-mechanism-correctly-used'] = \ + self.processAllMechanism(token, record, qualifier) + continue + + if len(list(filter(lambda x: token.startswith(x), SpfParser.mechanisms))) >= 1: + self.processMechanism(record, token, qualifier) + + if not self.results['allowed-hosts-list']: + #maxAllowed = 2 ** (32 - SpfParser.maxAllowedNetworkMask) + maxAllowed = config['spf_maximum_hosts'] + + fail('UNSECURE: SPF record allows more than {} max allowed hosts: {} in total.'.format( + maxAllowed, self.allowedHostsNumber + )) + _out('\tRecord: ("{}")'.format(record)) + + if not self.results['allowed-hosts-list']: + fail('There are too many allowed domains/CIDR ranges specified in SPF record: {}.'.format( + self.allowSpecifiers + )) + + if not config['always_unfolded_results'] and all(self.results.values()): + dbg('All tests passed.') + return True + else: + if not all(self.results.values()): + dbg('Not all tests passed.: {}'.format(self.results)) + else: + dbg('All tests passed.') + + return self.results + + def areThereAnyOtherMechanismsThan(self, mechanism, record): + tokens = record.split(' ') + otherMechanisms = 0 + + for token in tokens: + if not token: continue + if token.startswith('v='): continue + if token[0] in SpfParser.qualifiers: + token = token[1:] + + if token == mechanism: continue + if ':' in token: + for s in token.split(':'): + if s in SpfParser.mechanisms: + otherMechanisms += 1 + break + + if '/' in token: + for s in token.split('/'): + if s in SpfParser.mechanisms: + otherMechanisms += 1 + break + + if token in SpfParser.mechanisms: + otherMechanisms += 1 + + dbg('Found {} other mechanisms than "{}"'.format(otherMechanisms, mechanism)) + return (otherMechanisms > 0) + + def processVersion(self, token): + v, ver = token.split('=') + validVersions = ('1') + + for version in validVersions: + if 'spf{}'.format(version) == ver: + dbg('SPF version was found valid.') + return True + + fail('SPF version is invalid.') + return False + + def processAllMechanism(self, token, record, qualifier): + if not record.endswith(token): + fail('SPF Record wrongly stated - "{}" mechanism must be placed at the end!'.format( + token + )) + return False + + if token == 'all' and qualifier == '+': + fail('UNSECURE: SPF too permissive: "The domain owner thinks that SPF is useless and/or doesn\'t care.": "{}"'.format(record)) + return False + + if not self.areThereAnyOtherMechanismsThan('all', record): + fail('SPF "all" mechanism is too restrictive: "The domain sends no mail at all.": "{}"'.format(record), toOutLine = True) + return False + + return True + + def getNetworkSize(self, net): + dbg('Getting network size out of: {}'.format(net)) + m = re.match(r'[\w\.-:]+\/(\d{1,2})', net) + if m: + mask = int(m.group(1)) + return 2 ** (32 - mask) + + # Assuming any other value is a one host. + return 1 + + def processMechanism(self, record, token, qualifier): + key, value = None, None + addressBasedMechanisms = ('ip4', 'ip6', 'a', 'mx') + numOfAddrBasedMechanisms = len(list(filter(lambda x: token.startswith(x), + addressBasedMechanisms))) + + # Processing address-based mechanisms. + if numOfAddrBasedMechanisms >= 1: + if self.addressBasedMechanism >= SpfParser.maxNumberOfDomainsAllowed: + self.results['allowed-hosts-list'] = False + self.allowSpecifiers += 1 + else: + if qualifier == '+': + self.addressBasedMechanism += 1 + self.checkTooManyAllowedHosts(token, record, qualifier) + else: + dbg('Mechanism: "{}" not being passed.'.format(token)) + + + def checkTooManyAllowedHosts(self, token, record, qualifier): + if self.results['allowed-hosts-list'] != True: + return + + tok, val = None, None + + if ':' in token: + tok, val = token.split(':') + elif '/' in token and not ':' in token: + tok, val = token.split('/') + val = '0/{}'.format(val) + elif token in SpfParser.mechanisms: + tok = token + val = '0/32' + else: + err('Invalid address-based mechanism: {}!'.format(token)) + return + + dbg('Processing SPF mechanism: "{}" with value: "{}"'.format( + tok, val + )) + + size = self.getNetworkSize(val) + #maxAllowed = 2 ** (32 - SpfParser.maxAllowedNetworkMask) + maxAllowed = config['spf_maximum_hosts'] + + self.allowedHostsNumber += size + if size > maxAllowed: + self.results['minimum-allowed-hosts-list'] = False + fail('UNSECURE: Too many hosts allowed in directive: {} - total: {}'.format( + token, size + )) + + +class SmtpTester: + testsConducted = { + 'spf' : 'SPF DNS record test', + 'dkim' : 'DKIM DNS record test', + 'dmarc' : 'DMARC DNS record test', + 'banner-contents': 'SMTP Banner sensitive informations leak test', + 'starttls-offering': 'STARTTLS offering (opportunistic) weak configuration', + 'secure-ciphers': 'SSL/TLS ciphers security weak configuration', + 'tls-key-len': 'Checks private key length of negotiated or offered SSL/TLS cipher suites.', + 'auth-methods-offered': 'Test against unsecure AUTH/X-EXPS PLAIN/LOGIN methods.', + 'auth-over-ssl': 'STARTTLS before AUTH/X-EXPS enforcement weak configuration', + 'vrfy': 'VRFY user enumaration vulnerability test', + 'expn': 'EXPN user enumaration vulnerability test', + 'rcpt-to': 'RCPT TO user enumaration vulnerability test', + 'open-relay': 'Open-Relay misconfiguration test', + 'spf-validation': 'Checks whether SMTP Server has been configured to validate sender\'s SPF or Accepted Domains in case of MS Exchange', + } + + connectionLessTests = ( + 'spf', 'dkim', 'dmarc' + ) + + # 25 - plain text SMTP + # 465 - SMTP over SSL + # 587 - SMTP-AUTH / Submission + commonSmtpPorts = (25, 465, 587, ) + + # Common AUTH X methods with sample Base64 authentication data. + commonSmtpAuthMethods = { + 'PLAIN' : base64.b64encode('\0user\0password'.encode()), + 'LOGIN' : ( + (base64.b64encode('user'.encode()), base64.b64encode('password'.encode())), + ('user@DOMAIN.COM', base64.b64encode('password'.encode())) + ), + 'NTLM' : ( + 'TlRMTVNTUAABAAAABzIAAAYABgArAAAACwALACAAAABXT1JLU1RBVElPTkRPTUFJTg==', + 'TlRMTVNTUAABAAAAB4IIogAAAAAAAAAAAAAAAAAAAAAGAbEdAAAADw==', + ), + 'MD5' : '', + 'DIGEST-MD5' : '', + 'CRAM-MD5' : '', + } + + smtpAuthServices = ('AUTH', 'X-EXPS') + authMethodsNotNeedingStarttls = ('NTLM', 'GSSAPI') + + # Pretend you are the following host: + pretendLocalHostname = config['pretend_client_hostname'] + + maxStarttlsRetries = 5 + + # Source: SSLabs research: + # https://github.com/ssllabs/research/wiki/SSL-and-TLS-Deployment-Best-Practices + secureCipherSuitesList = ( + 'ECDHE-ECDSA-AES128-GCM-SHA256', + 'ECDHE-ECDSA-AES256-GCM-SHA384', + 'ECDHE-ECDSA-AES128-SHA', + 'ECDHE-ECDSA-AES256-SHA', + 'ECDHE-ECDSA-AES128-SHA256', + 'ECDHE-ECDSA-AES256-SHA384', + 'ECDHE-RSA-AES128-GCM-SHA256', + 'ECDHE-RSA-AES256-GCM-SHA384', + 'ECDHE-RSA-AES128-SHA', + 'ECDHE-RSA-AES256-SHA', + 'ECDHE-RSA-AES128-SHA256', + 'ECDHE-RSA-AES256-SHA384', + 'DHE-RSA-AES128-GCM-SHA256', + 'DHE-RSA-AES256-GCM-SHA384', + 'DHE-RSA-AES128-SHA', + 'DHE-RSA-AES256-SHA', + 'DHE-RSA-AES128-SHA256', + 'DHE-RSA-AES256-SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384', + 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256', + 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384', + 'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA', + 'TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA', + 'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256', + 'TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384', + 'TLS_DHE_RSA_WITH_AES_128_GCM_SHA256', + 'TLS_DHE_RSA_WITH_AES_256_GCM_SHA384', + 'TLS_DHE_RSA_WITH_AES_128_CBC_SHA', + 'TLS_DHE_RSA_WITH_AES_256_CBC_SHA', + 'TLS_DHE_RSA_WITH_AES_128_CBC_SHA256', + 'TLS_DHE_RSA_WITH_AES_256_CBC_SHA256', + ) + + def __init__(self, + hostname, + port = None, + forceSSL = False, + dkimSelectorsList = None, + userNamesList = None, + openRelayParams = ('', ''), + connect = True, + mailDomain = '' + ): + self.originalHostname = hostname + self.hostname = hostname + self.remoteHostname = self.localHostname = self.domain = self.resolvedIPAddress = '' + self.port = port + self.mailDomain = mailDomain + self.ssl = None if not forceSSL else True + self.forceSSL = forceSSL + self.server = None + self.starttlsFailures = 0 + self.starttlsSucceeded = False + self.dkimSelectorsList = dkimSelectorsList + self.userNamesList = userNamesList + self.availableServices = set() + self.banner = '' + self.connected = False + self.dumpTlsOnce = False + self.connectionErrors = 0 + self.connectionErrorCodes = {} + self.results = {} + self.threads = {} + self.stopEverything = False + self.server_tls_params = {} + self.openRelayParams = openRelayParams + self.spfValidated = False + + if not hostname: + fail('No hostname specified!') + return + + assert config['dns_full'] in ('always', 'on-ip', 'never'), \ + "config['dns_full'] wrongly stated." + + if re.match(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}', hostname) and not mailDomain: + spf = SmtpTester.checkIfTestToRun('spf') + dkim = SmtpTester.checkIfTestToRun('dkim') + dmarc = SmtpTester.checkIfTestToRun('dmarc') + + if spf or dkim or dmarc: + out('Server\'s IP specified and no mail domain: SPF/DKIM/DMARC results may be inaccurate.', toOutLine = True) + out('You may want to specify \'--domain\' and repeat those tests for greater confidence.', toOutLine = True) + + self.resolvedIPAddress = hostname + + needsConnection = False + for test in SmtpTester.testsConducted.keys(): + if self.checkIfTestToRun(test) and test not in SmtpTester.connectionLessTests: + needsConnection = True + break + + try: + if needsConnection and connect and not self.connect(): + sys.exit(-1) + except KeyboardInterrupt: + fail('Premature program interruption. Did not even obtained connection.') + sys.exit(-1) + + self.connected = True + if not self.resolveDomainName(): + sys.exit(-1) + + @staticmethod + def getTests(): + return SmtpTester.testsConducted + + def stop(self): + err('Stopping everything.') + config['max_enumerations'] = 0 + self.stopEverything = True + self.disconnect() + + def resolveDomainName(self): + if self.hostname: + resolutionFailed = False + + if re.match('^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$', self.hostname): + resolved = None + try: + resolved = socket.gethostbyaddr(self.hostname) + self.remoteHostname = repr(resolved[0]).replace("'", '') + info('Resolved DNS (A) name: "{}"'.format( + self.remoteHostname + )) + + except socket.herror as e: + dbg('IP address could not be resolved into hostname.') + resolutionFailed = True + else: + try: + resolved = socket.gethostbyname(self.hostname) + info('Resolved IP address / PTR: "{}"'.format( + resolved + )) + self.resolvedIPAddress = resolved + except socket.herror as e: + dbg('DNS name could not be resolved into IP address.') + + matched = None + if self.banner: + matched = re.search(BannerParser.localHostnameRegex, self.banner) + if matched: + self.localHostname = matched.group(1) + info('SMTP banner revealed server name: "{}".'.format( + self.localHostname + )) + + if resolutionFailed and not matched: + fail("Could not obtain server's hostname from neither IP nor banner!") + return False + elif not resolutionFailed and not matched: + info("Resolved IP but could not obtain server's hostname from the banner.") + return True + elif resolutionFailed and matched: + info("It was possible to obtain server's hostname from the banner but not to resolve IP address.") + return True + + return True + + def printDNS(getDNSValidHostname): + def wrapper(self, noRemote = True): + out = getDNSValidHostname(self, noRemote) + if config['smtp_debug']: + dbg('Using hostname: "{}" for DNS query.'.format(out)) + return out + + return wrapper + + @printDNS + def getDNSValidHostname(self, noRemote = False): + if self.localHostname: + return self.localHostname + elif not noRemote and self.remoteHostname: + return self.remoteHostname + else: + return self.hostname + + def getMailDomain(self): + if self.mailDomain: + return self.mailDomain + + hostname = self.getDNSValidHostname(noRemote = True) + return '.'.join(hostname.split('.')[1:]) + + def getAllPossibleDomainNames(self): + allOfThem = [ + self.originalHostname, # 0 + self.hostname, # 1 + self.localHostname, # 2 + self.getMailDomain(), # 3 + self.remoteHostname, # 4 + + # 5. FQDN without first LLD + '.'.join(self.originalHostname.split('.')[1:]) + ] + uniq = set() + ret = [] + + # Workaround for having OrderedSet() alike collection w/o importing such modules + for host in allOfThem: + if host not in uniq: + ret.append(host) + uniq.add(host) + + return ret + + def getDomainsToReviewDNS(self): + if self.mailDomain: + return [self.mailDomain,] + + domainsToReview = [self.originalHostname] + doFullReview = False + ipRex = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}' + + if config['dns_full'] == 'always' or \ + (config['dns_full'] == 'on-ip' and re.match(ipRex, self.originalHostname)): + doFullReview = True + + if doFullReview: + domainsToReview = list(filter( + lambda x: not re.match(ipRex, x), + self.getAllPossibleDomainNames() + )) + + # Get only domains, not subdomains. + domainsToReview = set(map( + lambda x: '.'.join(x.split('.')[-2:]), + domainsToReview + )) + + out = list(filter(None, domainsToReview)) + out = [x.replace('"', '').replace("'", "") for x in out] + return out + + def disconnect(self): + if self.server: + try: + self.server.quit() + del self.server + self.server = None + time.sleep(0.5) + except: + pass + + def connect(self, quiet = False, sayHello = False): + ret = False + noBannerPreviously = self.banner == '' + + if self.stopEverything: + return False + + self.disconnect() + + if self.port == None: + ret = self.tryToConnectOnDifferentPorts(quiet) + else: + ret = self.reconnect(quiet) + + if noBannerPreviously and self.banner: + _out('SMTP banner: "{}"'.format(self.banner), True, colors.fg.pink) + + if ret and sayHello: + dbg('Saying HELO/EHLO to the server...') + out = self.sendcmd('EHLO ' + SmtpTester.pretendLocalHostname) + dbg('Server responded to HELO/EHLO with: {}'.format(out)) + + if out[0]: + self.parseHelpOutputAndUpdateServicesList(out[1].decode()) + else: + err('Could not obtain response to EHLO/HELO. Fatal error.', toOutLine = True) + sys.exit(-1) + + return ret + + def connectSocket(self, port, ssl, sayHello = True): + if ssl: + self.server = smtplib.SMTP_SSL( + local_hostname = SmtpTester.pretendLocalHostname, + timeout = config['timeout'] + ) + else: + self.server = smtplib.SMTP( + local_hostname = SmtpTester.pretendLocalHostname, + timeout = config['timeout'] + ) + + if config['smtp_debug']: + self.server.set_debuglevel(9) + + if config['delay'] > 0.0: + time.sleep(config['delay']) + + out = self.server.connect(self.hostname, port) + if out[0] in (220, 250, ): + dbg('Connected over {} to {}:{}'.format( + 'SSL' if ssl else 'Non-SSL', self.hostname, port + )) + + self.banner = out[1].decode() + self.port = port + self.ssl = ssl + + if ssl: + self.performedStarttls = True + self.server_tls_params = { + 'cipher' : self.server.sock.cipher(), + 'version': self.server.sock.version(), + 'shared_ciphers': self.server.sock.shared_ciphers(), + 'compression': self.server.sock.compression(), + 'DER_peercert': self.server.sock.getpeercert(True), + 'selected_alpn_protocol': self.server.sock.selected_alpn_protocol(), + 'selected_npn_protocol': self.server.sock.selected_npn_protocol(), + } + + if sayHello: + dbg('Saying HELO/EHLO to the server...') + out = self.sendcmd('EHLO ' + SmtpTester.pretendLocalHostname) + dbg('Server responded to HELO/EHLO with: {}'.format(out)) + + self.parseHelpOutputAndUpdateServicesList(self.banner) + else: + if out[0] not in self.connectionErrorCodes.keys(): + self.connectionErrorCodes[out[0]] = 0 + else: + self.connectionErrorCodes[out[0]] += 1 + + if out[0] == 421: + # 421 - Too many connections error + pass + + elif out[0] == 450: + # 450 - 4.3.2 try again later + if self.connectionErrorCodes[out[0]] > 5: + err("We have sent too many connection requests and were temporarily blocked.\nSorry. Try again later.", toOutLine = True) + sys.exit(-1) + else: + fail('Waiting 30s for server to cool down after our flooding...') + time.sleep(30) + + elif out[0] == 554: + # 554 - 5.7.1 no reverse DNS + out = False if self.connectionErrors > 0 else True + err('Our host\'s IP does not have reverse DNS records - what makes SMTP server reject us.', toOutLine = out) + if self.connectionErrors > 5: + err('Could not make the SMTP server, ccept us without reverse DNS record.', toOutLine = True) + sys.exit(-1) + else: + err('Unexpected response after connection, from {}:{}:\n\tCode: {}, Message: {}.'.format( + self.hostname, port, out[0], out[1] + )) + dbg('-> Got response: {}'.format(out)) + + self.connectionErrors += 1 + if self.connectionErrors > 20: + err('Could not connect to the SMTP server!') + sys.exit(-1) + + return out + + def tryToConnectOnSSLandNot(self, port): + try: + # Try connecting over Non-SSL socket + if self.forceSSL: + raise Exception('forced ssl') + + dbg('Trying non-SSL over port: {}'.format(port)) + self.connectSocket(port, False) + return True + + except Exception as e: + # Try connecting over SSL socket + dbg('Exception occured: "{}"'.format(str(e))) + try: + dbg('Trying SSL over port: {}'.format(port)) + self.connectSocket(port, True) + + self.starttlsSucceeded = True + return True + + except Exception as e: + dbg('Both non-SSL and SSL connections failed: "{}"'.format(str(e))) + + return False + + def tryToConnectOnDifferentPorts(self, quiet): + # + # No previous connection. + # Enumerate common SMTP ports and find opened one. + # + succeeded = False + + for port in SmtpTester.commonSmtpPorts: + if self.stopEverything: break + if self.tryToConnectOnSSLandNot(port): + succeeded = True + break + + if not quiet: + if not succeeded: + err('Could not connect to the SMTP server!') + else: + ok('Connected to the server over port: {}, SSL: {}'.format( + self.port, self.ssl + ), toOutLine = True) + + return succeeded + + def reconnect(self, quiet, sayHello = True): + # + # The script has previously connected or knows what port to choose. + # + multiplier = 0 + + for i in range(4): + try: + out = self.connectSocket(self.port, self.ssl, sayHello = sayHello) + if out[0] == 421: + multiplier += 1 + delay = multiplier * config['too_many_connections_delay'] + + info('Awaiting {} secs for server to close some of our connections...'.format( + delay + )) + time.sleep(delay) + continue + else: + dbg('Reconnection succeeded ({})'.format(out)) + return True + + except (socket.gaierror, + socket.timeout, + smtplib.SMTPServerDisconnected, + ConnectionResetError) as e: + dbg('Reconnection failed ({}/3): "{}"'.format(i, str(e))) + + dbg('Server could not reconnect after it unexpectedly closed socket.') + + return False + + def setSocketTimeout(self, timeout = config['timeout']): + try: + self.server.sock.settimeout(timeout) + + except (AttributeError, OSError): + dbg('Socket lost somehow. Reconnecting...') + + if self.connect(True): + try: + self.server.sock.settimeout(timeout) + except (AttributeError, OSError): pass + else: + dbg('FAILED: Could not reconnect to set socket timeout.') + + + def processOutput(sendcmd): + def wrapper(self, command, nowrap = False): + out = sendcmd(self, command, nowrap) + + if nowrap: + return out + + if out and (out[0] == 530 and b'STARTTLS' in out[1]): + if self.starttlsFailures >= SmtpTester.maxStarttlsRetries: + dbg('Already tried STARTTLS and it have failed too many times.') + return (False, False) + + dbg('STARTTLS reconnection after wrapping command ({})...'.format(command)) + + if not self.performStarttls(): + dbg('STARTTLS wrapping failed.') + return (False, 'Failure') + + dbg('Wrapping succeeded. Retrying command "{}" after STARTTLS.'.format( + command + )) + + return sendcmd(self, command) + + elif out and (out[0] == 421): + # 'Exceeded bad SMTP command limit, disconnecting.' + dbg('Reconnecting due to exceeded number of SMTP connections...') + if self.connect(quiet = True): + return sendcmd(self, command) + else: + dbg('Could not reconnect after exceeded number of connections!') + return (False, False) + + self.checkIfSpfEnforced(out) + + return out + return wrapper + + def performStarttls(self, sendEhlo = True): + ret = True + + if self.ssl == True: + dbg('The connection is already carried through SSL Socket.') + return True + + if self.starttlsFailures > SmtpTester.maxStarttlsRetries: + fail('Giving up on STARTTLS. There were too many failures...') + return False + + out = self.sendcmd('STARTTLS') + if out[0] == 220: + dbg('STARTTLS engaged. Wrapping socket around SSL layer.') + + context = ssl.create_default_context() + + # Allow unsecure ciphers like SSLv2 and SSLv3 + context.options &= ~(ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + if self.server and self.server.sock: + self.setSocketTimeout(5 * config['timeout']) + + try: + newsock = context.wrap_socket( + self.server.sock, + server_hostname = SmtpTester.pretendLocalHostname + ) + + # Re-initializing manually the smtplib instance + self.server.sock = newsock + self.server.file = None + self.server.helo_resp = None + self.server.ehlo_resp = None + self.server.esmtp_features = {} + self.server.does_esmtp = 0 + + self.starttlsSucceeded = True + + self.server_tls_params = { + 'cipher' : newsock.cipher(), + 'version': newsock.version(), + 'shared_ciphers': newsock.shared_ciphers(), + 'compression': newsock.compression(), + 'DER_peercert': newsock.getpeercert(True), + 'selected_alpn_protocol': newsock.selected_alpn_protocol(), + 'selected_npn_protocol': newsock.selected_npn_protocol(), + } + + dbg('Connected to the SMTP Server via SSL/TLS.') + if not self.dumpTlsOnce: + dbg('SSL Socket parameters:\n{}'.format(pprint.pformat(self.server_tls_params))) + self.dumpTlsOnce = True + + if sendEhlo: + dbg('Sending EHLO after STARTTLS...') + out = self.sendcmd('EHLO ' + SmtpTester.pretendLocalHostname) + if out[0]: + dbg('EHLO after STARTTLS returned: {}'.format(out)) + else: + err('EHLO after STARTTLS failed: {}'.format(out)) + + except (socket.timeout, ConnectionResetError) as e: + err('SSL Handshake timed-out (Firewall filtering?). Fall back to plain channel.') + dbg('STARTTLS exception: "{}"'.format(str(e))) + + self.starttlsFailures += 1 + if not self.connect(quiet = True, sayHello = False): + ret = False + + self.setSocketTimeout() + elif out[0] == 500: + info('The server is not offering STARTTLS.') + else: + fail('The server has not reacted for STARTTLS: ({}). Try increasing timeout.'.format(str(out))) + + return ret + + @processOutput + def sendcmd(self, command, nowrap = False): + out = (False, False) + + dbg('Sending command: "{}"'.format(command)) + self.setSocketTimeout(3 * config['timeout']) + + for j in range(3): + try: + if config['delay'] > 0.0: + time.sleep(config['delay']) + + out = self.server.docmd(command) + dbg('Command resulted with: {}.'.format(out)) + + if out[0] in (503,) and b'hello first' in out[1].lower(): + # 503: 5.5.2 Send hello first + + dbg('Ok, ok - sending Hello first...') + + if self.connect(quiet = True, sayHello = True): + dbg('Ok, reconnected and said hello. Trying again...') + else: + dbg('Failed reconnecting and saying hello.') + return (False, False) + continue + break + + except (smtplib.SMTPServerDisconnected, socket.timeout) as e: + if str(e) == 'Connection unexpectedly closed': + # smtplib.getreply() returns this error in case of reading empty line. + #dbg('Server returned empty line / did not return anything.') + #return (False, '') + + dbg('Connection unexpectedly closed: {}'.format(str(e))) + + if self.connect(quiet = True, sayHello = False): + continue + else: + dbg('Server has disconnected ({}).'.format(str(e))) + if 'connect' in str(e).lower(): + dbg('Attempting to reconnect and resend command...') + if self.connect(quiet = True, sayHello = False): + continue + else: + break + + if not out[0]: + dbg('Could not reconnect after failure.') + + self.setSocketTimeout() + return out[0], out[1] + + def parseHelpOutput(self, output): + if len(output.split('\n')) >= 2: + + output = output.replace('\t', '\n') + dbg('Parsing potential HELP output: "{}"'.format( + output.replace('\n', '\\n') + )) + + helpMultilineCommandsRegexes = ( + r'(?:\\n)([a-zA-Z- 0-9]{3,})', + r'(?:\n)([a-zA-Z- 0-9]{3,})' + ) + + for rex in helpMultilineCommandsRegexes: + out = re.findall(rex, output) + if len([x for x in out if x != None]) > 0: + return out + else: + return '' + + def parseHelpOutputAndUpdateServicesList(self, out): + outlines = self.parseHelpOutput(out) + if outlines: + self.availableServices.update(set(map(lambda x: x.strip(), outlines))) + outlines = set() + + dbg('SMTP available services: {}'.format(pprint.pformat(self.availableServices))) + return True + + return False + + def getAvailableServices(self): + dbg('Acquiring list of available services...') + + out = False + outlines = set() + + if self.banner: + if self.parseHelpOutputAndUpdateServicesList(self.banner): + return True + + out = self.sendcmd('EHLO ' + SmtpTester.pretendLocalHostname) + if out[0]: + dbg('EHLO returned: {}'.format(out)) + if self.parseHelpOutputAndUpdateServicesList(out[1].decode()): + return True + + + # We are about to provoke SMTP server sending us the HELP listing in result + # of sending one of below collected list of commands. + for cmd in ('HELP', '\r\nHELP', 'TEST'): + try: + out = self.sendcmd(cmd) + if out[0] in (214, 220, 250): + ret = out[1].decode() + + if self.parseHelpOutputAndUpdateServicesList(ret): + return True + + outlines = self.parseHelpOutput(ret) + if len(outlines) < 2: + for line in ret.split('\\n'): + m = re.findall(r'([A-Z-]{3,})', line) + pos = ret.find(line) + if m and (pos > 0 and ret[pos-1] == '\n'): + dbg('Following line was found by 2nd method HELP parsing: "{}"'.format( + line + )) + + outlines = m + break + if outlines: + break + + except Exception as e: + continue + + if outlines: + self.availableServices.update(set(map(lambda x: x.strip(), outlines))) + + dbg('SMTP available services: {}'.format(pprint.pformat(self.availableServices))) + return True + + info('Could not collect available services list (HELP)') + return False + + def getAuthMethods(self, service): + if not self.availableServices: + self.getAvailableServices() + + if not self.availableServices: + fail('UNKNOWN: Could not collect available SMTP services') + return None + + authMethods = set() + authMethodsList = list(filter( + lambda x: x.lower().startswith(service.lower()) and x.lower() != service.lower(), + self.availableServices + )) + + # Conform following HELP format: "250-AUTH=DIGEST-MD5 CRAM-MD5 PLAIN LOGIN" + if authMethodsList: + dbg('List of candidates for {} methods: {}'.format(service, authMethodsList)) + + for auth in authMethodsList: + auth = auth.strip().replace('=', ' ') + auth = auth.replace(service + ' ', '') + + if auth.count(' ') > 0: + s = set(['{}'.format(a) for a in auth.split(' ') \ + if a.lower() != service.lower()]) + authMethods.update(s) + else: + authMethods.add(auth) + else: + dbg('The server does not offer any {} methods.'.format(service)) + + if authMethods: + dbg('List of {} methods to test: {}'.format(service, authMethods)) + + return authMethods + + @staticmethod + def ifMessageLike(out, codes = None, keywords = None, keywordsAtLeast = 0): + codeCheck = False + keywordCheck = False + + if not codes and not keywords: + return False + + keywords2 = [k.lower() for k in keywords] + msg = out[1].decode() + found = 0 + for word in msg.split(' '): + if word.lower() in keywords2: + found += 1 + + if codes != None and len(codes) > 0: + codeCheck = out[0] in codes + else: + codeCheck = True + + if keywords != None and len(keywords) > 0: + if keywordsAtLeast == 0: + keywordCheck = found == len(keywords) + else: + keywordCheck = found >= keywordsAtLeast + else: + keywordCheck = True + + return codeCheck and keywordCheck + + @staticmethod + def checkIfTestToRun(test): + if (test in config['tests_to_skip']): + return False + + if ('all' in config['tests_to_carry'] or test in config['tests_to_carry']): + return True + else: + if config['smtp_debug']: + dbg('Test: "{}" being skipped as it was marked as disabled.'.format(test)) + return False + + def runTests(self): + dkimTestThread = None + if SmtpTester.checkIfTestToRun('dkim'): + dkimTestThread = self.dkimTestThread() + + results = [ + ('spf', None), + ('dkim', None), + ('dmarc', None), + ('banner-contents', self.bannerSnitch), + ('starttls-offering', self.starttlsOffer), + ('secure-ciphers', self.testSecureCiphers), + ('tls-key-len', self.testSSLKeyLen), + ('auth-methods-offered', self.testSecureAuthMethods), + ('auth-over-ssl', self.testSSLAuthEnforcement), + ('vrfy', self.vrfyTest), + ('expn', self.expnTest), + ('rcpt-to', self.rcptToTests), + ('open-relay', self.openRelayTest), + ('spf-validation', self.spfValidationTest), + ] + + if SmtpTester.checkIfTestToRun('spf'): + self.results['spf'] = self.spfTest() + + once = True + for res in results: + test, func = res + + assert test in SmtpTester.testsConducted.keys(), \ + "The test: '{}' has not been added to SmtpTester.testsConducted!".format(test) + + if self.stopEverything: break + if not SmtpTester.checkIfTestToRun(test): + continue + + if not func: continue + + if config['delay'] > 0.0: + time.sleep(config['delay']) + + if once: + if not self.connected and not self.connect(): + sys.exit(-1) + else: + self.connected = True + once = False + + dbg('Starting test: "{}"'.format(test)) + self.results[test] = func() + + if SmtpTester.checkIfTestToRun('auth-over-ssl') and \ + test == 'auth-over-ssl': + dbg('Reconnecting after SSL AUth enforcement tests.') + if self.stopEverything: break + self.reconnect(quiet = True) + + testDmarc = False + if SmtpTester.checkIfTestToRun('dkim') and \ + SmtpTester.checkIfTestToRun('spf') and \ + SmtpTester.checkIfTestToRun('dmarc'): + testDmarc = True + self.results['dmarc'] = None + + if SmtpTester.checkIfTestToRun('dmarc') and not testDmarc: + err('To test DMARC following tests must be run also: SPF, DKIM.') + + if self.threads or dkimTestThread: + if not self.stopEverything: + info("Awaiting for threads ({}) to finish. Pressing CTRL-C will interrupt lookup process.".format( + ', '.join(self.threads.keys()) + ), toOutLine = True) + + try: + while (self.threads and all(self.threads.values())): + if self.stopEverything: + break + time.sleep(2) + if config['smtp_debug']: + dbg('Threads wait loop has finished iterating.') + + if testDmarc: + self.results['dmarc'] = self.evaluateDmarc( + self.dmarcTest(), + self.results['spf'], + self.results['dkim'] + ) + + except KeyboardInterrupt: + err('User has interrupted threads wait loop. Returning results w/o DKIM and DMARC.') + else: + if testDmarc: + self.results['dmarc'] = self.evaluateDmarc( + self.dmarcTest(), + self.results['spf'], + self.results['dkim'] + ) + + # Translate those True and False to 'Secure' and 'Unsecure' + self.results.update(SmtpTester.translateResultsDict(self.results)) + + indent = 2 + return json.dumps(self.results, indent = indent) + + def runAttacks(self): + attacksToBeLaunched = { + 'vrfy': self.vrfyTest, + 'expn': self.expnTest, + 'rcpt-to': self.rcptToTests, + } + + results = [] + + info('Attacks will be launched against domain: @{}'.format(self.getMailDomain()), toOutLine = True) + info('If that\'s not correct, specify another one with \'--domain\'') + + for attack, func in attacksToBeLaunched.items(): + if not SmtpTester.checkIfTestToRun(attack): + continue + + info('Launching attack: {} enumeration.'.format(attack), toOutLine = True) + out = func(attackMode = True) + + if out and isinstance(out, list): + info('Attack result: {} users found.'.format(len(out)), toOutLine = True) + results.extend(out) + elif out: + info('Attack most likely failed {}, result: {}'.format(attack, str(out)), toOutLine = True) + else: + fail('Attack {} failed.'.format(attack), toOutLine = True) + + return list(set(results)) + + @staticmethod + def translateResultsDict(results): + for k, v in results.items(): + if isinstance(v, dict): + results[k] = SmtpTester.translateResultsDict(v) + else: + if v == True: results[k] = 'secure' + elif v == False:results[k] = 'unsecure' + else: results[k] = 'unknown' + + return results + + # + # =========================== + # BANNER REVEALING SENSITIVIE INFORMATIONS TEST + # + def bannerSnitch(self): + if not self.banner: + info('Cannot process server\'s banner - as it was not possible to obtain one.') + + parser = BannerParser() + return parser.parseBanner(self.banner) + + + # + # =========================== + # SPF TESTS + # + def enumerateSpfRecords(self, domain): + records = set() + numberOfSpfRecords = 0 + once = True + + resv = resolver.Resolver() + resv.timeout = config['timeout'] / 2.0 + + info('Queried domain for SPF: "{}"'.format(domain)) + + try: + for txt in resv.query(domain, 'TXT'): + txt = txt.to_text().replace('"', '') + if txt.lower().startswith('v=spf') and txt not in records: + numberOfSpfRecords += 1 + records.add(txt) + + if numberOfSpfRecords > 1 and once: + err('Found more than one SPF record. One should stick to only one SPF record.') + once = False + + except (resolver.NoAnswer, + resolver.NXDOMAIN, + name.EmptyLabel, + resolver.NoNameservers) as e: + pass + + return records + + def spfTest(self): + records = {} + txts = [] + for domain in self.getDomainsToReviewDNS(): + for txt in self.enumerateSpfRecords(domain): + if txt not in records.keys(): + txts.append(txt) + records[txt] = self.processSpf(txt) + + success = True + if len(records): + results = {} + for txt, rec in records.items(): + origTxt, results = rec + if isinstance(results, dict) and all(results.values()): + pass + elif isinstance(results, bool) and results: + pass + else: + fail('UNSECURE: SPF record exists, but not passed tests.') + _out('\tRecord: ("{}")'.format(origTxt)) + return results + + ok('SECURE: SPF test passed.') + _out('\tRecords: ("{}")'.format('", "'.join(txts))) + if config['always_unfolded_results']: + return results + else: + fail('UNSECURE: SPF record is missing.') + success = False + + return success + + def processSpf(self, txt, recurse = 0): + ''' + Code processing, parsing and evaluating SPF record's contents. + ''' + maxRecursion = 3 + info('Found SPF record: "{}"'.format(txt)) + + + if recurse > maxRecursion: + err('Too many SPF redirects, breaking recursion.') + return None + + pos = txt.lower().find('redirect=') + if pos > 0: + for tok in txt.lower().split(' '): + k, v = tok.split('=') + if v.endswith(';'): v = v[:-1] + if k == 'redirect': + info('SPF record redirects to: "{}". Following...'.format(v)) + for txt in self.enumerateSpfRecords(v): + return (txt, self.processSpf(txt, recurse + 1)) + + spf = SpfParser() + return (txt, spf.process(txt)) + + + # + # =========================== + # DKIM TESTS + # + @staticmethod + def _job(jid, domains, data, syncDkimThreadsStop, results, totalTested, dkimQueryDelay): + try: + if (results and sum([x != None for x in results]) > 0) or \ + SmtpTester.stopCondition(totalTested, syncDkimThreadsStop): + return + results.append(SmtpTester.dkimTestWorker(domains, data, syncDkimThreadsStop, dkimQueryDelay, False, totalTested)) + except (ConnectionResetError, FileNotFoundError, BrokenPipeError, EOFError, KeyboardInterrupt): + pass + + def dkimTestThread(self): + self.results['dkim'] = None + + if not config['threads']: + return self.dkimTest() + + poolNum = config['parallel_processes'] + t = threading.Thread(target = self._dkimTestThread, args = (poolNum, )) + t.daemon = True + t.start() + return t + + def stopCondition(totalTested, syncDkimThreadsStop): + if syncDkimThreadsStop.value: + return True + + if config['max_enumerations'] > 0 and \ + totalTested.value >= config['max_enumerations']: + return True + + return False + + def _dkimTestThread(self, poolNum): + def _chunks(l, n): + return [l[i:i+n] for i in range(0, len(l), n)] + + self.threads['dkim'] = True + dbg('Launched DKIM test in a new thread running with {} workers.'.format(poolNum)) + + selectors = self.generateListOfCommonDKIMSelectors() + info('Selectors to review: {}'.format(len(selectors))) + + jobs = [] + mgr = multiprocessing.Manager() + totalTested = multiprocessing.Value('i', 0) + syncDkimThreadsStop = multiprocessing.Value('i', 0) + dkimQueryDelay = multiprocessing.Value('d', 0.0) + + results = mgr.list() + slice = _chunks(selectors, len(selectors) // poolNum) + domains = self.getDomainsToReviewDNS() + + try: + for i, s in enumerate(slice): + if SmtpTester.stopCondition(totalTested, syncDkimThreadsStop) or self.stopEverything: break + proc = multiprocessing.Process( + target = SmtpTester._job, + args = (i, domains, s, syncDkimThreadsStop, results, totalTested, dkimQueryDelay) + ) + proc.start() + jobs.append(proc) + + num = len(domains) * len(selectors) + totals = [] + lastTotal = 0 + + maxDelay = 4.0 + delayStep = 0.5 + smallStepToDelay = 50 + + while totalTested.value < len(selectors) - 50: + if SmtpTester.stopCondition(totalTested, syncDkimThreadsStop) or self.stopEverything: break + + totals.append(totalTested.value) + js = '(jobs running: {})'.format(len(jobs)) + SmtpTester.dkimProgress(totalTested.value, selectors, num, syncDkimThreadsStop, True, js, dkimQueryDelay.value) + + if config['delay_dkim_queries']: + if totalTested.value - lastTotal < smallStepToDelay and dkimQueryDelay.value < maxDelay: + dkimQueryDelay.value += delayStep + elif totalTested.value - lastTotal >= smallStepToDelay and dkimQueryDelay.value > 0: + dkimQueryDelay.value -= delayStep + + lastTotal = totalTested.value + + # Wait 5*2 seconds for another DKIM progress message + for i in range(15): + if SmtpTester.stopCondition(totalTested, syncDkimThreadsStop) or self.stopEverything: break + time.sleep(2) + + if totals.count(totalTested.value) > 1: + syncDkimThreadsStop.value = 1 + err('Stopping DKIM thread cause it seems to have stuck.', toOutLine = True) + break + + info('DKIM selectors enumerated. Stopping jobs...') + for j in jobs: + if SmtpTester.stopCondition(totalTested, syncDkimThreadsStop) or self.stopEverything: break + for i in range(30): + if SmtpTester.stopCondition(totalTested, syncDkimThreadsStop) or self.stopEverything: break + j.join(2 * 60 / 30) + except (KeyboardInterrupt, BrokenPipeError): + pass + + try: + if results and sum([x != None for x in results]) > 0: + dbg('DKIM thread found valid selector.') + self.results['dkim'] = [x for x in results if x != None][0] + else: + fail('UNSECURE: DKIM record is most likely missing, as proved after {} tries.'.format( + totalTested.value + )) + except FileNotFoundError: + pass + + self.threads['dkim'] = False + return self.results['dkim'] + + def dkimTest(self, selectors = None): + if not selectors: + selectors = self.generateListOfCommonDKIMSelectors() + + ret = self.dkimTestWorker(self.getDomainsToReviewDNS(), selectors) + self.results['dkim'] = ret + return ret + + @staticmethod + def dkimProgress(total, selectors, num, syncDkimThreadsStop, unconditional = False, extra = None, dkimQueryDelay = 0): + if total < 100 or SmtpTester.stopCondition(total, syncDkimThreadsStop): + return + + progressStr = 'DKIM: Checked {:02.0f}% ({:05}/{:05}) selectors. Query delay: {:0.2f} sec.'.format( + 100.0 * (float(total) / float(len(selectors))), + total, + len(selectors), + dkimQueryDelay + ) + + if extra: progressStr += ' ' + extra + progressStr += '...' + + N = 10 + if (not config['debug'] and (unconditional or ((total % int(num // N)) == 0))): + info(progressStr, toOutLine = True) + + elif (config['debug'] and (unconditional or (total % 250 == 0))): + if config['threads']: + dbg(progressStr) + else: + sys.stderr.write(progressStr + '\r') + sys.stderr.flush() + + @staticmethod + def dkimTestWorker(domainsToReview, selectors, syncDkimThreadsStop, dkimQueryDelay = None, reportProgress = True, totalTested = None): + ret = False + stopIt = False + total = 0 + + maxTimeoutsToAccept = int(0.3 * len(selectors)) + timeoutsSoFar = 0 + + if SmtpTester.stopCondition(totalTested, syncDkimThreadsStop): return None + + num = len(domainsToReview) * len(selectors) + if reportProgress: + info('Checking around {} selectors. Please wait - this will take a while.'.format( + num + )) + + resv = resolver.Resolver() + resv.timeout = 1.2 + + for domain in domainsToReview: + if stopIt or SmtpTester.stopCondition(totalTested, syncDkimThreadsStop): break + if reportProgress: + info('Enumerating selectors for domain: {}...'.format(domain)) + + for sel in selectors: + if stopIt or SmtpTester.stopCondition(totalTested, syncDkimThreadsStop): break + dkimRecord = '{}._domainkey.{}'.format(sel, domain) + total += 1 + if totalTested: totalTested.value += 1 + + if reportProgress: + SmtpTester.dkimProgress(total, selectors, num) + try: + if not dkimRecord: continue + if dkimQueryDelay and dkimQueryDelay.value > 0: + time.sleep(dkimQueryDelay.value) + + for txt in resv.query(dkimRecord, 'TXT'): + if stopIt or SmtpTester.stopCondition(totalTested, syncDkimThreadsStop): break + + txt = txt.to_text().replace('"', '') + if config['max_enumerations'] > -1 and \ + total >= config['max_enumerations']: + stopIt = True + break + + if txt.lower().startswith('v=dkim'): + info('DKIM found at selector: "{}"'.format(sel)) + ret = SmtpTester.processDkim(txt) + + if ret: + ok('SECURE: DKIM test passed.') + else: + fail('UNSECURE: DKIM test not passed') + + syncDkimThreadsStop.value = 1 + return ret + except (exception.Timeout) as e: + if timeoutsSoFar >= maxTimeoutsToAccept: + err('DNS enumeration failed: Maximum number of timeouts from DNS server reached.') + break + + timeoutsSoFar += 1 + + except (AttributeError, + resolver.NoAnswer, + resolver.NXDOMAIN, + resolver.NoNameservers, + name.EmptyLabel, + name.NameTooLong) as e: + continue + + except KeyboardInterrupt: + dbg('User has interrupted DKIM selectors enumeration test.') + return None + + if reportProgress: + if total >= num: + fail('UNSECURE: DKIM record is most likely missing. Exhausted list of selectors.') + else: + fail('UNSECURE: DKIM record is most likely missing. Process interrupted ({}/{}).'.format( + total, num + )) + + return None + + @staticmethod + def processDkim(txt): + ''' + Code processing, parsing and evaluating DKIM record's contents. + ''' + + dkim = DkimParser() + return dkim.process(txt) + + + def generateListOfCommonDKIMSelectors(self): + ''' + Routine responsible for generating list of DKIM selectors based on + various permutations of the input words (like common DKIM selectors or other likely + selector names). + ''' + + months = ('styczen', 'luty', 'marzec', 'kwiecien', 'maj', 'czerwiec', 'lipiec', + 'sierpien', 'wrzesien', 'pazdziernik', 'listopad', 'grudzien', 'january', + 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'october', + 'november', 'september', 'december', 'enero', 'febrero', 'marzo', 'abril', + 'mayo', 'junio', 'agosto', 'septiembre', 'octubre', 'noviembre', 'diciembre', + 'januar', 'februar', 'marz', 'mai', 'juni', 'juli', 'oktober', 'dezember') + + domains = self.domain.split('.') + words = ('default', 'dkim', 'dk', 'domain', 'domainkey', 'test', 'selector', + 'mail', 'smtp', 'dns', 'key', 'sign', 'signing', 'auth', 'sel', 'google', + 'shopify.com' + ) + tuple(domains) + config['uncommon_words'] + + selectors = [] + + # Set 0: All collected domains + selectors.extend(self.getAllPossibleDomainNames()) + + # Set 1: User-defined + try: + if self.dkimSelectorsList: + with open(self.dkimSelectorsList, 'r') as f: + for l in f.readlines(): + selectors.append(l.strip()) + except IOError: + err('Could not open DKIM selectors list file.') + sys.exit(-1) + + # Set 2: Common words permutations + for w in words: + selectors.append('{}'.format(w)) + selectors.append('_{}'.format(w)) + selectors.append('{}_'.format(w)) + + for i in range(0, 11): + if not config['dkim_full_enumeration']: + break + + selectors.append('{}{}'.format(w, i)) + selectors.append('{}{:02d}'.format(w, i)) + + + if config['dkim_full_enumeration']: + nowTime = datetime.datetime.now() + currYear = nowTime.year + yearsRange = range(currYear - 2, currYear + 1) + + # Set 3: Year-Month text permutations + for m in months: + for yr in yearsRange: + ms = ( + m[:3], + m, + '%d' % yr, + '%s%d' % (m, yr), + '%s%d' % (m[:3], yr), + '%s%d' % (m, (yr - 2000)), + '%s%d' % (m[:3], (yr - 2000)), + '%d%s' % (yr, m), + '%d%s' % (yr, m[:3]), + '%d%s' % ((yr - 2000), m), + '%d%s' % ((yr - 2000), m[:3]), + ) + selectors.extend(ms) + + currTimeFormats = ( + '%Y%m%d', + '%Y%d%m', + '%d%m%Y', + '%m%d%Y', + '%Y', + '%m', + '%Y%m', + '%m%Y' + ) + + # Set 4: Year-Month-Day date permutations + for f in currTimeFormats: + selectors.append(nowTime.strftime(f)) + + for yr in yearsRange: + for j in range(1,13): + for k in range(1, 32): + try: + t = datetime.datetime(yr, j, k) + selectors.append(t.strftime(f)) + selectors.append('%d' % (time.mktime(t.timetuple()))) + except: + pass + + dbg('Generated: {} selectors to review.'.format(len(selectors))) + return selectors + + + # + # =========================== + # DMARC TESTS + # + + def evaluateDmarc(self, dmarc, spf, dkim): + lack = [] + if not spf: lack.append('SPF') + if not dkim: lack.append('DKIM') + if dmarc and lack: + fail('UNSECURE: DMARC cannot work without {} being set.'.format(', '.join(lack))) + # Return anyway... + #return False + + return dmarc + + def dmarcTest(self): + ret = False + found = False + + records = [] + + for domain in self.getDomainsToReviewDNS(): + domain = '_dmarc.' + domain + try: + for txt in resolver.query(domain, 'TXT'): + txt = txt.to_text().replace('"', '') + if txt.lower().startswith('v=dmarc'): + info('Found DMARC record: "{}"'.format(txt)) + ret = self.processDmarc(txt) + records.append(txt) + found = True + break + + except (resolver.NXDOMAIN, + resolver.NoAnswer, + resolver.NoNameservers): + pass + + if ret: break + + if ret: + ok('SECURE: DMARC test passed.') + _out('\tRecords: "{}"'.format('", "'.join(records))) + + elif found and not ret: + fail('UNSECURE: DMARC tets not passed.') + else: + fail('UNSECURE: DMARC record is missing.') + + return ret + + def processDmarc(self, record): + parser = DmarcParser() + return parser.processDmarc(record) + + def generateUserNamesList(self, permute = True): + users = [] + + common_ones = ('all', 'admin', 'mail', 'test', 'guest', 'root', 'spam', 'catchall', + 'abuse', 'contact', 'administrator', 'email', 'help', 'post', 'postmaster', + 'rekrutacja', 'recruitment', 'pomoc', 'ayuda', 'exchange', 'relay', + 'hilfe', 'nobody', 'anonymous', 'security', 'press', 'media', 'user', + 'foo', 'robot', 'av', 'antivirus', 'gate', 'gateway', 'job', 'praca', + 'it', 'auto', 'account', 'hr', 'db', 'web') + + if not permute: + return common_ones + + words = common_ones + config['uncommon_words'] + + # Set 1: User-defined + try: + if self.userNamesList: + with open(self.userNamesList, 'r') as f: + for l in f.readlines(): + users.append(l.strip()) + + info('Read {} lines from users list.'.format(len(users)), toOutLine = True) + return users + + except IOError: + err('Could not open user names list file.', toOutLine = True) + sys.exit(-1) + + # Set 2: Common words permutations + for w in words: + users.append('{}'.format(w)) + + for i in range(0, 11): + users.append('{}{}'.format(w, i)) + users.append('{}{:02d}'.format(w, i)) + + dbg('Generated list of {} user names to test.'.format(len(users))) + + return users + + # + # =========================== + # EXPN TESTS + # + def expnTest(self, attackMode = False): + i = 0 + maxFailures = 64 + failures = 0 + + secureConfigurationCodes = (252, 500, 502) + unsecureConfigurationCodes = (250, 251, 550, 551, 553) + + userNamesList = set(self.generateUserNamesList(permute = attackMode)) + foundUserNames = set() + + info('Attempting EXPN test, be patient - it may take a longer while...') + + try: + for user in userNamesList: + if config['max_enumerations'] > -1 and i >= config['max_enumerations']: + dbg('Max enumerations exceeded accepted limit.') + if not attackMode: return False + else: return list(foundUserNames ) + + if not attackMode and failures >= maxFailures: + err('FAILED: EXPN test failed too many times.') + return None + + out = self.sendcmd('EXPN {}'.format(user)) + + if out[0] in secureConfigurationCodes \ + or (out[0] == 550 and 'access denied' in out[1].lower()): + ok('SECURE: EXPN could not be used for user enumeration.') + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + if not attackMode: return True + else: return list(foundUserNames) + + elif out[0] in unsecureConfigurationCodes: + if not attackMode: + fail('UNSECURE: "EXPN {}": allows user enumeration!'.format( + user + )) + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + return False + else: + ok('Found new user: {}@{}'.format(rcptTo, self.getMailDomain()), toOutLine = True) + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + foundUserNames.add(rcptTo) + + elif (out[0] == False and out[1] == False) or not out[1]: + info('UNKNOWN: During EXPN test the server disconnected. This might be secure.') + if not attackMode: return None + else: return list(foundUserNames) + + else: + dbg('Other return code: {}'.format(out[0])) + failures += 1 + + i += 1 + + except KeyboardInterrupt: + info('EXPN Attack interrupted.', toOutLine = True) + + if not attackMode: + ok('SECURE: EXPN test succeeded, yielding secure configuration.') + return True + else: + ok('EXPN Attack finished. Found: {} / {}'.format( + len(foundUserNames), + len(userNamesList) + ), toOutLine = True) + return list(foundUserNames) + + + # + # =========================== + # RCPT TO TESTS + # + def rcptToTests(self, attackMode = False): + i = 0 + maxFailures = 256 + failures = 0 + + unsecureConfigurationCodes = (250, ) + secureConfigurationCodes = (530, 553, 550) + + userNamesList = set(self.generateUserNamesList(permute = attackMode)) + foundUserNames = set() + + info('Attempting RCPT TO test, be patient - it takes a longer while...') + + for mailFrom in userNamesList: + if not attackMode and failures >= maxFailures: + err('FAILED: RCPT TO test failed too many times.') + return None + + if config['max_enumerations'] > -1 and i >= config['max_enumerations']: + dbg('Max enumerations exceeded accepted limit.') + if not attackMode: return False + else: return list(foundUserNames ) + + out = self.sendcmd('MAIL FROM: <{}@{}>'.format( + mailFrom, self.getMailDomain() + )) + dbg('MAIL FROM returned: ({})'.format(out)) + + if out and out[0] in (250,): + dbg('Sender ok. Proceeding...') + + elif out[0] in (530, ): + # 530: 5.7.1 Client was not authenticated + ok('SECURE: SMTP server requires prior authentication when using RCPT TO.') + _out('\tReturned: ("{}")'.format(out[1].decode())) + if not attackMode: return True + else: return list(foundUserNames) + + elif (out[0] == 503 and '5.5.1' in out[1] and 'sender' in out[1].lower() and 'specified' in out[1].lower()): + # 503, 5.5.1 Sender already specified + failures += 1 + continue + + elif out[0] in (503, ): + # 503: 5.5.2 Send Hello first + self.connect(quiet = True, sayHello = True) + failures += 1 + continue + + elif (out[0] == False and out[1] == False) or not out[1]: + info('UNKNOWN: During RCPT TO the server has disconnected. This might be secure.') + if not attackMode: return None + else: return list(foundUserNames) + + else: + dbg('Server returned unexpected response in RCPT TO: {}'.format(out)) + failures += 1 + continue + + i = 0 + failures = 0 + + try: + for rcptTo in userNamesList: + if mailFrom == rcptTo: continue + + if attackMode: + perc = float(i) / float(len(userNamesList)) * 100.0 + if i % (len(userNamesList) / 10) == 0 and i > 0: + info('RCPT TO test progress: {:02.2f}% - {:04} / {:04}'.format( + perc, i, len(userNamesList)), toOutLine = True) + + if config['max_enumerations'] > -1 and i >= config['max_enumerations']: + dbg('Max enumerations exceeded accepted limit.') + if not attackMode: return None + else: return list(foundUserNames) + + if not attackMode and failures >= maxFailures: + err('FAILED: RCPT TO test failed too many times.') + return None + + out = self.sendcmd('RCPT TO: <{}@{}>'.format( + rcptTo, self.getMailDomain() + )) + dbg('RCTP TO returned: ({})'.format(out)) + + if out and out[0] in unsecureConfigurationCodes: + if not attackMode: + fail('UNSECURE: "RCPT TO" potentially allows user enumeration: ({}, {})'.format( + out[0], out[1].decode() + )) + return False + elif rcptTo not in foundUserNames: + ok('Found new user: {}@{}'.format(rcptTo, self.getMailDomain()), toOutLine = True) + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + foundUserNames.add(rcptTo) + + elif out and out[0] in secureConfigurationCodes: + if SmtpTester.ifMessageLike(out, (550, ), ('user', 'unknown', 'recipient', 'rejected'), 2): + if not attackMode: + info('Warning: RCPT TO may be possible: {} ({})'.format(out[0], out[1].decode())) + + # + # Can't decided, whether error code shall be treated as RCPT TO disabled message or + # as an implication that wrong recipient's address was tried. Therefore, we disable the below + # logic making it try every user name in generated list, until something pops up. + # + #else: + # ok('SECURE: Server disallows user enumeration via RCPT TO method.') + # _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + # if not attackMode: return False + # else: return list(foundUserNames) + + elif (out[0] == False and out[1] == False) or not out[1]: + info('UNKNOWN: During RCPT TO test the server has disconnected. This might be secure.') + if not attackMode: return None + else: return list(foundUserNames) + + else: + dbg('Other return code: {}'.format(out[0])) + failures += 1 + + i += 1 + + if attackMode: + break + + except KeyboardInterrupt: + info('RCPT TO Attack interrupted.', toOutLine = True) + break + + + if not attackMode: + ok('SECURE: RCPT TO test succeeded, yielding secure configuration.') + return True + else: + ok('RCPT TO Attack finished. Found: {} / {}'.format( + len(foundUserNames), + len(userNamesList) + ), toOutLine = True) + return list(foundUserNames) + + + # + # =========================== + # VRFY TESTS + # + def vrfyTest(self, attackMode = False): + i = 0 + maxFailures = 64 + failures = 0 + + unsecureConfigurationCodes = (250, 251, 550, 551, 553) + secureConfigurationCodes = (252, 500, 502, 535) + + userNamesList = set(self.generateUserNamesList(permute = attackMode)) + foundUserNames = set() + + info('Attempting VRFY test, be patient - it may take a longer while...') + + try: + for user in userNamesList: + if config['max_enumerations'] > -1 and i >= config['max_enumerations']: + dbg('Max enumerations exceeded accepted limit.') + if not attackMode: return False + else: return list(foundUserNames) + + if not attackMode and failures >= maxFailures: + dbg('Failures exceeded maximum failures limit.') + return None + + out = self.sendcmd('VRFY {}'.format(user)) + + if out[0] in secureConfigurationCodes \ + or (out[0] == 550 and 'access denied' in out[1].lower()): + comm = '' + if out[0] == 535: + comm = 'unauthenticated ' + ok('SECURE: VRFY disallows {}user enumeration.'.format(comm)) + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + + if not attackMode: return True + else: return list(foundUserNames) + + elif out[0] in unsecureConfigurationCodes: + if not attackMode: + fail('UNSECURE: "VRFY {}": allows user enumeration!'.format( + user + )) + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + return False + else: + ok('Found new user: {}@{}'.format(rcptTo, self.getMailDomain()), toOutLine = True) + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + foundUserNames.add(rcptTo) + + elif (out[0] == False and out[1] == False) or not out[1]: + info('UNKNOWN: During VRFY test the server has disconnected. This might be secure.') + if not attackMode: return None + else: return list(foundUserNames) + + else: + dbg('Other return code: {}'.format(out[0])) + failures += 1 + + i += 1 + + except KeyboardInterrupt: + info('Attack interrupted.', toOutLine = True) + + if not attackMode: + ok('SECURE: VRFY test succeeded, yielding secure configuration.') + return True + else: + ok('VRFY Attack finished. Found: {} / {}'.format( + len(foundUserNames), + len(userNamesList) + ), toOutLine = True) + return list(foundUserNames) + + + # + # =========================== + # OPEN-RELAY TESTS + # + def openRelayTest(self): + if self.connect(quiet = True, sayHello = True): + results = {} + + internalDomain = self.getMailDomain() + externalDomain = config['smtp_external_domain'] + + ip = '[{}]'.format(self.resolvedIPAddress) + if not self.resolvedIPAddress: + ip = '[{}]'.format(self.originalHostname) + + srvname = self.localHostname + domain = self.originalHostname + + if domain == srvname: + domain = self.getMailDomain() + + dbg('Attempting open relay tests. Using following parameters:\n\tinternalDomain = {}\n\texternalDomain = {}\n\tdomain = {}\n\tsrvname = {}\n\tip = {}'.format( + internalDomain, externalDomain, domain, srvname, ip + )) + + domains = { + 'internal -> internal' : [internalDomain, internalDomain], + 'srvname -> internal' : [srvname, internalDomain], + 'internal -> external' : [internalDomain, externalDomain], + 'external -> internal' : [externalDomain, internalDomain], + 'external -> external' : [externalDomain, externalDomain], + + 'user@localhost -> external' : ['localhost', externalDomain], + + #'empty -> empty' : ['', ''], + 'empty -> internal' : ['', internalDomain], + 'empty -> external' : ['', externalDomain], + 'ip -> internal' : [ip, internalDomain], + 'ip -> to%domain@[ip]' : [ip, '%{}@{}'.format(domain, ip)], + 'ip -> to%domain@srvname': [ip, '%{}@{}'.format(domain, srvname)], + 'ip -> to%domain@[srvname]': [ip, '%{}@[{}]'.format(domain, srvname)], + 'ip -> "to@domain"' : [ip, '"@{}"'.format(domain)], + 'ip -> "to%domain"' : [ip, '"%{}"'.format(domain)], + 'ip -> to@domain@[ip]' : [ip, '@{}@{}'.format(domain, ip)], + 'ip -> to@domain@' : [ip, '@{}@'.format(domain)], + 'ip -> "to@domain"@[ip]': [ip, '"@{}"@{}'.format(domain, ip)], + 'ip -> to@domain@srvname': [ip, '@{}@{}'.format(domain,srvname)], + 'ip -> @[ip]:to@domain' : [ip, '@{}:@{}'.format(ip, domain)], + 'ip -> @srvname:to@domain': [ip, '@{}:@{}'.format(srvname, domain)], + 'ip -> domain!to' : [ip, '{}!'.format(domain)], + 'ip -> domain!to@[ip]' : [ip, '{}!@{}'.format(domain, ip)], + 'ip -> domain!to@srvname': [ip, '{}!@{}'.format(domain,srvname)], + } + + dbg('Performing Open-Relay tests...') + + interrupted = False + + try: + if (self.openRelayParams[0] != '' and self.openRelayParams[1] != '') and \ + ('@' in self.openRelayParams[0] and '@' in self.openRelayParams[1]): + info('Running custom test: (from: <{}>) => (to: <{}>)'.format( + self.openRelayParams[0], self.openRelayParams[1] + ), toOutLine = True) + results['custom'] = self._openRelayTest('custom', self.openRelayParams) + else: + avoidMailFrom = False + rollBackSenderOnce = False + + num = 0 + for k, v in domains.items(): + if self.stopEverything: break + num += 1 + results[k] = False + + retry = 0 + for i in range(2): + if self.stopEverything: break + dbg('Attempting Open-Relay test #{}: "{}"'.format(num, k)) + results[k] = self._openRelayTest(k, v, avoidMailFrom, num) + + if results[k] == 554 and not rollBackSenderOnce: + dbg('Rolling back to traditional sender\'s address: @{}'.format(internalDomain)) + rollBackSenderOnce = True + + for d, v in domains.items(): + if d.startswith('ip -> '): + domains[d] = [internalDomain, v[1]] + + #elif (results[k] == 503 or results[k] == 501) and not avoidMailFrom: + # dbg('Will not send MAIL FROM anymore.') + # avoidMailFrom = True + + elif (results[k] == 501 or results[k] == 503): + results[k] = False + dbg('Reconnecting as SMTP server stuck in repeated/invalid MAIL FROM envelope.') + if self.stopEverything: break + self.reconnect(quiet = True) + results[k] = self._openRelayTest(k, v, avoidMailFrom, num) + continue + break + except KeyboardInterrupt: + interrupted = True + info('Open-Relay tests interrupted by user!') + + if not config['always_unfolded_results'] and all(results.values()): + ok('SECURE: Open-Relay seems not to be possible as proved after {} tests.'.format(len(results))) + return True + else: + sumOfValues = 0 + for k, v in results.items(): + dbg('Open-Relay test ({}) resulted with: {}'.format( + k, v + )) + if v == False: + sumOfValues += 1 + + appendix = '' + if sumOfValues != len(results): + appendix = '\tThe rest of tests failed at some point, without any status.' + + if interrupted: + sumOfValues = 1 if sumOfValues < 1 else sumOfValues + appendix = '\tTests were interrupted thus dunno whether the server is open-relaying or not.' + _out('[?] UNKNOWN: Open-Relay were interrupted after {}/{} carried tests.'.format( + sumOfValues - 1, len(results) + ), True, colors.fg.pink) + else: + fail('UNSECURE: Open-Relay MAY BE possible as turned out after {}/{} successful tests.'.format( + sumOfValues, len(results) + )) + + if appendix: + _out(appendix, True, colors.fg.pink) + + return results + else: + fail('FAILED: Could not reconnect for Open-Relay testing purposes.') + + return None + + @staticmethod + def _extractMailAddress(param, baseName = ''): + ''' + @param param - specifies target SMTP domain + @param baseName - specifies target mail username + ''' + + surnames = ['John Doe', 'Mike Smith', 'William Dafoe', 'Henry Mitchell'] + + if not param: + return '', '' + + base = 'test{}'.format(random.randint(0, 9)) + if baseName: + base = baseName + + # Format: test@test.com + m = re.match(r"(^[a-zA-Z0-9_.+-]+)@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$", baseName) + if m: + base = m.group(1) + + if '' in param: + param = param.replace('', base) + + addr = '{}@{}'.format(base, param) + if '@' in param and param.count('@') == 1: + addr = param + param = param.split('@')[1] + + elif '@' in param and param.count('@') > 1: + return param, param + + mail = '"{}" <{}>'.format(random.choice(surnames), addr) + + # Format: test@test.com + m = re.match(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", param) + if m: + addr = m.group(1) + mail = '"{}" <{}>'.format(random.choice(surnames), addr) + return addr, mail + + # Format: "John Doe" + m = re.match(r'(^\"([^\"]+)\"[\s,]+<([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)>$)', param) + if m: + addr = m.group(3) + mail = '"{}" <{}>'.format(m.group(2), addr) + return addr, mail + + return addr, mail + + @staticmethod + def extractMailAddress(param, baseName = ''): + dbg('Extracting mail address from parameter: "{}", according to base: "{}"'.format( + param, baseName + )) + + addr, mail = SmtpTester._extractMailAddress(param, baseName) + + dbg('After extraction: addr="{}", mail="{}"'.format( + addr, mail + )) + + return addr, mail + + + def _openRelayTest(self, testName, twoDomains, avoidMailFrom = False, num = 0, doNotSendAndTest = False): + secureConfigurationCodes = (221, 454, 500, 501, 503, 504, 530, 550, 554, ) + now = datetime.datetime.now() + + # If True - secure configuration, could not send via open-relay + result = None + + fromAddr, fromMail = SmtpTester.extractMailAddress(twoDomains[0], self.openRelayParams[0]) + toAddr, toMail = SmtpTester.extractMailAddress(twoDomains[1], self.openRelayParams[1]) + + if testName == 'custom': + info('Performing custom Open-Relay test from: {}, to: {}'.format( + fromMail, toMail + )) + + dateNow = now.strftime("%a, %d %b %Y %H:%m:%S") + subject = 'Open-Relay test #{}: {}'.format(num, testName) + + mailFromReturn = '' + rcptToReturn = '' + dataReturn = '' + + mailCommands = ( + 'MAIL From: ' + fromAddr, + 'RCPT To: ' + toAddr, + 'DATA', + '' + ) + + message = '''From: {fromMail} +To: {toMail} +Subject: {subject} +Date: {dateNow} + +Warning! + +This is a test mail coming from 'smtpAudit.py' tool. + +If you see this message it means that your SMTP server is *vulnerable* to Open-Relay spam technique (https://en.wikipedia.org/wiki/Open_mail_relay). Unauthorized users will be able to make your server send messages in a name of other mail users. + +You may want to contact with your mail administrator and pass him with the following informations: + +--------------------8<-------------------- +Open-Relay test name: "{testName}" + +MAIL From: {fromAddr} + Server response: {mailFromReturn} +RCPT To: {toAddr} + Server response: {rcptToReturn} +DATA + Server response: {dataReturn} + +Subject: "{subject}" +Date: {dateNow} +--------------------8<-------------------- + +smtpAudit.py ({VERSION}) - SMTP Server penetration testing / audit tool, +(https://gist.github.com/mgeeky/ef49e5fb6c3479dd6a24eb90b53f9baa) +by Mariusz B. / mgeeky () +. +''' + + n = 0 + out = None + for line in mailCommands: + if self.stopEverything: break + + if avoidMailFrom and line.startswith('MAIL From:'): + dbg('Skipping MAIL From: line.') + continue + + n += 1 + + if line.startswith('DATA') and doNotSendAndTest: + break + + if line == '': + line = message.format( + fromMail = fromMail, + toMail = toMail, + subject = subject, + dateNow = dateNow, + fromAddr = fromAddr, + toAddr = toAddr, + testName = testName, + VERSION = VERSION, + mailFromReturn = mailFromReturn, + rcptToReturn = rcptToReturn, + dataReturn = dataReturn + ) + + out = self.sendcmd(line) + msg = out[1].decode().lower() + + if line.startswith('MAIL From'): + mailFromReturn = '{} ({})'.format(out[0], out[1].decode()) + if line.startswith('RCPT To'): + rcptToReturn = '{} ({})'.format(out[0], out[1].decode()) + if line.startswith('DATA'): + dataReturn = '{} ({})'.format(out[0], out[1].decode()) + + if 'rcpt to' in line.lower(): + _out('[>] Open-Relay test (from: <{}>) => (to: <{}>); returned: {} ({})'.format( + fromAddr, toAddr, out[0], out[1].decode() + ), False, colors.fg.pink) + + elif out[0] == 221 and 'can' in msg and 'break' in msg and 'rules' in msg: + # 221 (2.7.0 Error: I can break rules, too. Goodbye.) + result = True + + if out[0] == 501 and 'mail from' in msg and 'already' in msg: + # 501 (5.5.1 MAIL FROM already established) + return 501 + + elif out[0] == 503 and 'nested' in msg and 'mail' in msg: + # 503 (5.5.1 Error: nested MAIL command) + return 503 + + elif out[0] == 503 and 'already' in msg and 'specified' in msg: + # 503 (5.5.1 Sender already specified) + #return 503 + continue + + elif out[0] == 554 and 'bad' in msg and 'sender' in msg and 'addr' in msg: + # 554 (5.7.1 Bad senders system address) + dbg('Bad sender\'s address. Rolling back.') + return 554 + + elif (out[0] == 550 or out[0] == 530) and self.processResponseForAcceptedDomainsFailure(out): + # 530 (5.7.1 Client was not authenticated). + # 550 (5.7.1 Client does not have permissions to send as this sender). + info('Microsoft Exchange Accepted Domains mechanism properly rejects us from relaying. Splendid.') + result = True + + elif out[0] == 550 and self.processResponseForSpfFailure(out): + # 550 (5.7.1 Recipient address rejected: Message rejected due to: SPF fail - not authorized). + info('SPF properly rejects us from relaying. Splendid.') + result = True + + elif not out or not out[0] or not out[1] or out[0] in secureConfigurationCodes: + if line.startswith('From: '): + info('Open-Relay {} MAY be possible: the server hanged up on us after invalid "From:" (step: {})'.format( + testName, n + ), toOutLine = True) + info('\tThis means, that upon receiving existing From/To addresses - server could allow for Open-Relay.', toOutLine = True) + info('\tTo further analyse this issue - increase verbosity and choose another "--from" or "--to" parameters.', toOutLine = True) + result = None + else: + dbg('Open-Relay {} test failed at step {}: {}.'.format( + testName, n, line.strip() + )) + result = True + break + + dbg('Open-Relay {} test DID NOT failed at step {}: {}. Response: {}'.format( + testName, n, line.strip(), str(out) + )) + + verdict = 'most likely' + if out[0] == 250: + verdict = 'TOTALLY' + + if doNotSendAndTest: + return True + + if result != True and out[0] < 500: + fail('UNSECURE: Open-Relay {} is {} possible.'.format( + testName, verdict + )) + _out('\tReturned: {} ("{}")'.format(out[0], out[1].decode())) + + result = False + + elif (result == False and not out[0]) or result == None: + fail('UNKNOWN: Server has disconnected after the Open-Relay ({}) test. Most likely secure.'.format(testName)) + result = None + + else: + if 'relaying denied' in out[1].decode().lower(): + # (550, b'5.7.1 Relaying denied') + ok('SECURE: Open-Relay attempt "{}" was denied.'.format(testName)) + else: + info('Open-Relay "{}" seems not to be possible.'.format( + testName + )) + try: + _out('\tReturned: {} ({})'.format(out[0], out[1].decode())) + except: + _out('\tReturned: ({})'.format(str(out))) + + result = True + + return result + + + # + # =========================== + # SSL AUTH ENFORCEMENT TESTS + # + def starttlsOffer(self): + if not self.availableServices: + self.getAvailableServices() + if not self.availableServices: + fail('UNKNOWN: Could not collect available SMTP services') + return None + + ret = ('starttls' in map(lambda x: x.lower(), self.availableServices)) + + if ret or self.ssl: + ok('SECURE: STARTTLS is offered by SMTP server.') + else: + dbg('Trying to send STARTTLS by hand') + out = self.sendcmd('STARTTLS', nowrap = True) + + if out[0] == 220: + ok('SECURE: STARTTLS is supported, but not offered at first sight.') + ret = True + + self.connect(quiet = True) + + else: + fail('UNSECURE: STARTTLS is NOT offered by SMTP server.') + + return ret + + # + # =========================== + # SSL AUTH ENFORCEMENT TESTS + # + + def testSSLAuthEnforcement(self): + for service in SmtpTester.smtpAuthServices: + ret = self.testSSLAuthEnforcementForService(service) + if ret == False: + return ret + + return True + + def testSSLAuthEnforcementForService(self, service): + authMethods = self.getAuthMethods(service) + ret = True + emptyMethods = False + + notSupportedCodes = (500, 502, 503, 504, 535) + unsecureConfigurationCodes = () + + for authMethod in authMethods: + if authMethod.upper() == 'NTLM': + _out('[?] This may be a Microsoft Exchange receive connector offering Integrated Windows Authentication service.', True, colors.fg.pink) + + if authMethod.upper() == 'GSSAPI': + _out('[?] This may be a Microsoft Exchange receive connector offering Exchange Server authentication service over Generic Security Services application programming interface (GSSAPI) and Mutual GSSAPI authentication.', True, colors.fg.pink) + + if not authMethods: + emptyMethods = True + authMethods = SmtpTester.commonSmtpAuthMethods.keys() + + for authMethod in authMethods: + dbg("Checking authentication method: {}".format(authMethod)) + + if authMethod.upper() in SmtpTester.authMethodsNotNeedingStarttls: + dbg('Method {} does not need to be issued after STARTTLS.'.format( + authMethod.upper() + )) + #continue + + auths = [] + _auth = '{} {}'.format(service, authMethod) + + if authMethod in SmtpTester.commonSmtpAuthMethods.keys(): + param = SmtpTester.commonSmtpAuthMethods[authMethod] + + if isinstance(param, bytes): param = param.decode() + + if isinstance(param, str): + _auth += ' ' + param + auths.append(_auth) + elif isinstance(param, list) or isinstance(param, tuple): + for n in param: + if isinstance(param, bytes): n = n.decode() + + if isinstance(n, str): + auths.append(_auth) + n = base64.b64encode(n.replace('DOMAIN.COM', self.originalHostname).encode()) + auths.append(n) + elif isinstance(n, list) or isinstance(n, tuple): + auths.append(_auth) + for m in n: + if isinstance(m, bytes): m = m.decode() + + if 'DOMAIN.COM' in m: + m = base64.b64encode(m.replace('DOMAIN.COM', self.originalHostname).encode()) + auths.append(m) + + index = 0 + for index in range(len(auths)): + auth = auths[index] + out = self.sendcmd(auth, nowrap = True) + + dbg('The server responded for {} command with: ({})'.format(auth, str(out))) + + if not out or out[0] == False: + dbg('Something gone wrong along the way.') + + elif out and out[0] in notSupportedCodes: + dbg('The {} {} method is either not supported or not available.'.format( + service, authMethod + )) + index += 1 + + elif not out[0] and not out[1]: + info('The server disconnected during {} {}, this might be secure.'.format( + service, authMethod + )) + + elif out[0] == 454: + # 4.7.0 TLS not available due to local problem + fail('UNSECURE: STARTTLS seems to be not available on the server side.') + _out('\tReturned: {} ("{}")'.format(out[0], out[1].decode())) + return False + + elif out[0] == 334: + # 334 base64 encoded User then Password prompt + if out[1].decode() == 'VXNlcm5hbWU6': + dbg('During LOGIN process the server enticed to carry on') + + elif out[1].decode() == 'UGFzc3dvcmQ6': + if not self.ssl: + fail('UNSECURE: Server allowed authentication over non-SSL channel via "{} {}"!'.format( + service, authMethod + )) + _out('\tReturned: {} ("{}")'.format(out[0], out[1].decode())) + return False + + else: + dbg('The {} {} method is not understood.: ({})'.format( + service, authMethod, str(out) + )) + + elif out and not (out[0] in (530, ) and b'starttls' in out[1].lower()): + fail('UNSECURE: For method "{} {}" the server did not required STARTTLS!'.format( + service, authMethod + )) + _out('\tReturned: {} ("{}")'.format(out[0], out[1].decode())) + return False + + elif out and (out[0] == 530 and b'STARTTLS' in out[1]): + ok('SECURE: Server enforces SSL/TLS channel negotation before {}.'.format( + service + )) + _out('\tReturned: {} ("{}")'.format(out[0], out[1].decode())) + return True + + if set(authMethods) <= set(SmtpTester.authMethodsNotNeedingStarttls): + ok('SECURE: There were no {} methods requiring STARTTLS.'.format(service)) + return True + + if emptyMethods: + info('The server does not offer any {} methods to enforce.'.format( + service + )) + else: + info('UNKNOWN: None of tested {} methods yielded any result (among: {}).'.format( + service, ', '.join(authMethods) + )) + + return None + + + # + # =========================== + # SSL/TLS UNSECURE CIPHERS TESTS + # + def testSecureCiphers(self): + performedStarttls = False + if not self.starttlsSucceeded: + dbg('STARTTLS session has not been set yet. Setting up...') + performedStarttls = self.performStarttls() + + if not self.ssl and not performedStarttls and not self.starttlsSucceeded: + err('Could not initiate successful STARTTLS session. Failure') + return None + + try: + cipherUsed = self.server_tls_params['cipher'] + version = self.server_tls_params['version'] + except (KeyError, AttributeError): + err('Could not initiate successful STARTTLS session. Failure') + return None + + dbg('Offered cipher: {} and version: {}'.format(cipherUsed, version)) + + if cipherUsed[0].upper() in SmtpTester.secureCipherSuitesList: + ok('SECURE: Offered cipher is considered secure.') + _out('\tCipher: {}'.format(cipherUsed[0])) + return True + + + for secureCipher in SmtpTester.secureCipherSuitesList: + ciphers = set(secureCipher.split('-')) + cipherUsedSet = set(cipherUsed[0].upper().split('-')) + + intersection = ciphers.intersection(cipherUsedSet) + minWords = min(len(ciphers), len(cipherUsedSet)) + if minWords >= 3 and len(intersection) >= (minWords - 1): + ok('SECURE: Offered cipher is having secure structure.') + _out('\tCipher: {}'.format(cipherUsed)) + + return True + + unsecureCiphers = ('RC4', '3DES', 'DES', ) + usedUnsecureCipher = '' + + for cipher in unsecureCiphers: + if cipher in cipherUsed[0].upper(): + fail('SMTP Server offered unsecure cipher.') + _out('\tCipher: {}'.format(cipher)) + return False + + usedSSL = 'ssl' in version.lower() + + unsecureSSLs = ('sslv2', 'sslv3') + if 'shared_ciphers' in self.server_tls_params.keys(): + unsecureProtocolsOffered = set() + for s in self.server_tls_params['shared_ciphers']: + dbg('Offered cipher (22222): {}'.format(s[1])) + if s[1].lower() in unsecureSSLs: + unsecureProtocolsOffered.add(s[1]) + + if len(unsecureProtocolsOffered) > 0: + out = ', '.join(unsecureProtocolsOffered) + + fail('SMTP Server offered unsecure SSL/TLS protocols: {}'.format(out)) + return False + else: + fail('No server TLS parameters obtained yet.') + + if not usedSSL and not usedUnsecureCipher: + ok('SECURE: SMTP Server did not offered unsecure encryption suite.') + return True + + else: + fail('UNSECURE: SMTP Server offered unsecure encryption suite.') + _out('\tCipher: {}'.format(usedUnsecureCipher)) + return False + + + # + # =========================== + # UNSECURE AUTH METHODS TESTS + # + def testSecureAuthMethods(self): + success = None + for service in SmtpTester.smtpAuthServices: + ret = self.testSecureAuthMethodsForService(service) + if ret == False: + return ret + elif ret == True: + # ret may be also 'None' + success = True + + return success + + def testSecureAuthMethodsForService(self, service): + authMethods = self.getAuthMethods(service) + + unsecureAuthMethods = ('PLAIN', 'LOGIN') + ret = True + methods = set() + + if not authMethods: + authMethods = SmtpTester.commonSmtpAuthMethods + foundMethods = [] + + dbg('The server is not offering any {} method. Going to try to discover ones.'.format( + service + )) + + for authMethod in authMethods: + if authMethod in SmtpTester.authMethodsNotNeedingStarttls: + dbg('Method: {} {} is considered not needing STARTTLS.'.format( + service, authMethod + )) + continue + + auth = '{} {}'.format(service, authMethod) + out = self.sendcmd(auth) + + if out[0] == (500, 503) or \ + (out[1] and (b'not available' in out[1].lower() or \ + b'not recognized' in out[1].lower())): + info('UNKNOWN: {} method not available at all.'.format(service)) + return None + + elif out and out[0] in (334, ): + dbg('Authentication via {} is supported'.format(auth)) + foundMethods.append(authMethod) + + if authMethod.upper() in unsecureAuthMethods: + if not self.ssl: + fail('UNSECURE: SMTP offers plain-text authentication method: {}!'.format( + auth + )) + else: + ok('SECURE: SMTP offered plain-text authentication method over SSL: {}!'.format( + auth + )) + + _out('\tOffered reply: {} ("{}")'.format(out[0], out[1].decode())) + + ret = False + break + + if out[0] == False and out[1] == False: + info('UNKNOWN: The server has disconnected while checking'\ + ' {}. This might be secure'.format( + auth + )) + return None + + methods = foundMethods + else: + for authMethod in authMethods: + if authMethod.upper() in unsecureAuthMethods: + if not self.ssl: + fail('UNSECURE: SMTP server offers plain-text authentication method: {}.'.format( + authMethod + )) + else: + ok('SECURE: SMTP server offered plain-text authentication method over SSL: {}.'.format( + authMethod + )) + + ret = False + break + + methods = authMethods + + if ret and methods: + ok('SECURE: Among found {} methods ({}) none was plain-text.'.format( + service, ', '.join(methods) + )) + elif not ret: + pass + elif not methods: + info('UNKNOWN: The server does not offer any {} methods.'.format( + service + )) + + return None + + dbg('ret = {}, methods = {}'.format(ret, methods)) + + return ret + + + # + # =========================== + # SSL/TLS PRIVATE KEY LENGTH + # + def testSSLKeyLen(self): + performedStarttls = False + if not self.server_tls_params or not self.starttlsSucceeded: + dbg('STARTTLS session has not been set yet. Setting up...') + performedStarttls = self.performStarttls() + + if not performedStarttls and not self.starttlsSucceeded: + err('Could not initiate successful STARTTLS session. Failure') + return None + + try: + cipherUsed = self.server_tls_params['cipher'] + version = self.server_tls_params['version'] + sharedCiphers = self.server_tls_params['shared_ciphers'] + except (KeyError, AttributeError): + err('Could not initiate successful STARTTLS session. Failure') + return None + + dbg('Offered cipher: {} and version: {}'.format(cipherUsed, version)) + + keyLen = cipherUsed[2] * 8 + if keyLen < config['key_len']: + fail('UNSECURE: SSL/TLS negotiated cipher\'s ({}) key length is insufficient: {} bits'.format( + cipherUsed[0], keyLen + )) + elif sharedCiphers != None and len(sharedCiphers) > 0: + for ciph in sharedCiphers: + name, ver, length = ciph + if length * 8 < 1024: + fail('UNSECURE: SMTP server offers SSL/TLS cipher suite ({}) which key length is insufficient: {} bits'.format( + name, keyLen + )) + return False + + ok('SECURE: SSL/TLS negotiated key length is sufficient ({} bits).'.format( + keyLen + )) + else: + fail('UNKNOWN: Something went wrong during SSL/TLS shared ciphers negotiation.') + return None + + return keyLen >= config['key_len'] + + + # + # =========================== + # SPF VALIDATION CHECK + # + def spfValidationTest(self): + + if not self.spfValidated: + dbg('Sending half-mail to domain: "{}" to trigger SPF/Accepted Domains'.format(self.mailDomain)) + self._openRelayTest('spf-validation', ['test@' + self.getMailDomain(), 'admin@' + self.getMailDomain()], False, 0, True) + + if self.spfValidated: + ok('SECURE: SMTP Server validates sender\'s SPF record') + info('\tor is using MS Exchange\'s Accepted Domains mechanism.') + _out('\tReturned: {}'.format(self.spfValidated)) + return True + else: + fail("UNKNOWN: SMTP Server has not been seen validating sender's SPF record.") + info("\tIf it is Microsoft Exchange - it could have reject us via Accepted Domains mechanism using code 550 5.7.1") + return None + + def processResponseForAcceptedDomainsFailure(self, out): + try: + msg = out[1].lower() + + #if out[0] == 530 and '5.7.1' in msg and 'was not authenticated' in msg: + # info('Looks like we might be dealing with Microsoft Exchange') + # return True + + if out[0] == 550 and '5.7.1' in msg and 'does not have permissions to send as this sender' in msg: + info('Looks like we might be dealing with Microsoft Exchange') + return True + except: + pass + + return False + + def processResponseForSpfFailure(self, out): + spfErrorCodes = (250, 451, 550, 554, ) + spfErrorKnownSentences = ( + 'Client host rejected: Access denied', + ) + spfErrorKeywords = ('validat', 'host rejected', 'fail', 'reject', 'check', 'soft', 'not auth', 'openspf.net/Why') + + if out[0] in spfErrorCodes: + msg = out[1].decode().strip() + + # Maybe this error is already known? + for knownSentence in spfErrorKnownSentences: + if knownSentence in msg: + dbg('SPF validation found when received well-known SPF failure error: {} ({})'.format( + out[0], msg + )) + return True + + found = 0 + for word in msg.split(' '): + for k in spfErrorKeywords: + if k.lower() in word: + found += 1 + break + + if 'spf' in msg.lower() and found >= 2: + return True + + if found > 0: + dbg('SPF validation possibly found but unsure ({} keywords related): {} ({})'.format( + found, out[0], msg + )) + + return False + + def checkIfSpfEnforced(self, out): + if self.spfValidated: + return True + + if self.processResponseForSpfFailure(out): + info('SPF validation found: {} ({})'.format(out[0], out[1].decode())) + self.spfValidated = '{} ({})'.format(out[0], out[1].decode()) + return True + + if self.processResponseForAcceptedDomainsFailure(out): + info('SPF validation not found but found enabled Microsoft Exchange Accepted Domains mechanism: {} ({})'.format(out[0], out[1].decode())) + self.spfValidated = '{} ({})'.format(out[0], out[1].decode()) + return False + + return False + + +class ParseOptions: + def __init__(self, argv): + self.argv = argv + self.domain = '' + self.port = None + self.userslist = '' + self.selectors = '' + self.forceSSL = False + self.fromAddr = '' + self.toAddr = '' + + self.parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options] ') + + self.parser.add_argument('hostname', metavar='', type=str, + help='Domain address (server name, or IPv4) specifying SMTP server to scan (host:port).') + + self.parser.add_argument('-d', '--domain', metavar='DOMAIN', dest='maildomain', default='', help = 'This option can be used to specify proper and valid mail (MX) domain (what comes after @, like: example.com). It helps avoid script confusion when it automatically tries to find that mail domain and it fails (like in case IP was passed in first argument).') + + self.parser.add_argument('-v', '--verbose', dest='verbose', + action = 'count', default = 0, help='Increase verbosity level (use -vv or more for greater effect)') + self.parser.add_argument('-T', '--list-tests', dest='testsHelp', action='store_true', help='List available tests.') + self.parser.add_argument('-u', '--unfolded', dest='unfolded', default=False, action='store_true', + help = 'Always display unfolded JSON results even if they were "secure".') + self.parser.add_argument('-C', '--no-colors', dest = 'colors', default = True, action = 'store_false', help = 'Print without colors.') + self.parser.add_argument('-f', '--format', metavar='FORMAT', dest='format', + default = 'text', choices = ['text', 'json'], + help = 'Specifies output format. Possible values: text, json. Default: text.') + + self.parser.add_argument('-m', '--tests', metavar='TEST', dest='testToCarry', + type=str, + default = 'all', help = 'Select specific tests to conduct. For a list of tests'\ + ', launch the program with option: "{} -T tests". Add more tests after colon. (Default: run all tests).'.format( + argv[0] + )) + + self.parser.add_argument('-M', '--skip-test', metavar='TEST', dest='testToSkip', + type=str, + default = '', help = 'Select specific tests to skip. For a list of tests'\ + ', launch the program with option: "{} -T tests". Add more tests after colon. (Default: run all tests).'.format( + argv[0] + )) + + self.parser.add_argument('-t', '--timeout', metavar="TIMEOUT", type=float, dest='timeout', + default = config['timeout'], help='Socket timeout. (Default: {})'.format( + config['timeout'] + )) + + self.parser.add_argument('--delay', metavar="DELAY", dest='delay', type=float, + default = config['delay'], + help='Delay introduced between subsequent requests and connections. '\ + '(Default: {} secs)'.format( + config['delay'] + )) + + # Attack options + attack = self.parser.add_argument_group('Attacks') + attack.add_argument('--attack', dest='attack', action='store_true', help = 'Switch to attack mode in which only enumeration techniques will be pulled off (vrfy, expn, rcpt to). You can use --tests option to specify which of them to launch.') + + attack.add_argument('-U', '--users', metavar="USERS", type=str, dest='userslist', + default = '', help='Users list file used during enumeration tests.') + + # DKIM options + dkim = self.parser.add_argument_group('DKIM Tests') + dkim.add_argument('-w', '--wordlist', dest='words', default='', type=str, + help = 'Uncommon words to be used in DKIM selectors dictionary generation. Comma separated.') + dkim.add_argument('-D', '--selectors', metavar="SELECTORS", type=str, dest='selectors', + default = '', help='DKIM selectors list file with custom selectors list to review.') + dkim.add_argument('-y', '--tries', metavar="TRIES", type=int, dest='tries', + default = -1, help='Maximum number of DNS tries/enumerations in DKIM test. (Default: all of them)') + + dkim.add_argument('--dkim-enumeration', metavar="TYPE", type=str, + choices = ['never', 'on-ip', 'full'], dest = 'dnsenum', + default = config['dns_full'], + help='When to do full-blown DNS records enumeration. Possible values: '\ + 'always, on-ip, never. When on-ip means when DOMAIN was IP address. '\ + '(Default: "{}")'.format( + config['dns_full'] + )) + + + # Open-Relay options + openRelay = self.parser.add_argument_group('Open-Relay Tests') + openRelay.add_argument('-x', '--external-domain', dest='external_domain', metavar='DOMAIN', + default = config['smtp_external_domain'], type=str, + help = 'External domain to use in Open-Relay tests. (Default: "{}")'.format( + config['smtp_external_domain'] + )) + openRelay.add_argument('--from', dest='fromAddr', default='', type=str, + help = 'Specifies "From:" address to be used in Open-Relay test. Possible formats: (\'test\', \'test@test.com\', \'"John Doe" \'). If you specify here and in \'--to\' full email address, you are going to launch your own custom test. Otherwise, those values will be passed into username part @domain.') + openRelay.add_argument('--to', dest='toAddr', default='', type=str, + help = 'Specifies "To:" address to be used in Open-Relay test. Possible formats: (\'test\', \'test@test.com\', \'"John Doe" \'). If you specify here and in \'--from\' full email address, you are going to launch your own custom test. Otherwise, those values will be passed into username part @domain.') + + if len(sys.argv) < 2: + self.usage() + sys.exit(-1) + + if config['verbose']: + ParseOptions.banner() + + if not self.parse(): + sys.exit(-1) + + @staticmethod + def banner(): + sys.stderr.write(''' + :: SMTP Black-Box Audit tool. + v{}, Mariusz B. / mgeeky, '17 + +'''.format(VERSION)) + + def usage(self): + ParseOptions.banner() + self.parser.print_help() + + def parse(self): + global config + + testsHelp = '' + for k, v in SmtpTester.testsConducted.items(): + testsHelp += '\n\t{:20s} - {}'.format(k, v) + + if len(sys.argv) >= 2: + if (sys.argv[1].lower() == '--list-tests') or \ + (sys.argv[1] == '-T' and len(sys.argv) >= 3 and sys.argv[2] == 'tests') or \ + (sys.argv[1] == '-T') or \ + (sys.argv[1] == '--list-tests' and len(sys.argv) >= 3 and sys.argv[2] == 'tests'): + print('Available tests:{}'.format(testsHelp)) + sys.exit(0) + + args = self.parser.parse_args() + + if args.testsHelp: + print('Available tests:{}'.format(testsHelp)) + sys.exit(0) + + self.domain = args.hostname + self.userslist = args.userslist + self.selectors = args.selectors + self.maildomain = args.maildomain + self.attack = args.attack + + if args.fromAddr: self.fromAddr = args.fromAddr + if args.toAddr: self.toAddr = args.toAddr + + if ':' in args.hostname: + self.domain, self.port = args.hostname.split(':') + self.port = int(self.port) + + if args.verbose >= 1: config['verbose'] = True + if args.verbose >= 2: config['debug'] = True + if args.verbose >= 3: config['smtp_debug'] = True + + config['timeout'] = args.timeout + config['delay'] = args.delay + config['max_enumerations'] = args.tries + config['dns_full'] = args.dnsenum + config['always_unfolded_results'] = args.unfolded + config['format'] = args.format + config['colors'] = args.colors + config['attack'] = args.attack + + if args.words: + config['uncommon_words'] = args.words.split(',') + + if args.testToCarry: + config['tests_to_carry'] = args.testToCarry.split(',') + for c in config['tests_to_carry']: + if c == 'all': continue + if c not in SmtpTester.testsConducted.keys(): + err('There is no such test as the one specified: "{}"'.format( + c + )) + print('\nAvailable tests:{}'.format(testsHelp)) + sys.exit(-1) + + l = list(filter(lambda x: x != 'all', config['tests_to_carry'])) + if l: + info('Running following tests: ' + ', '.join(l)) + + if args.testToSkip: + config['tests_to_skip'] = args.testToSkip.split(',') + for c in config['tests_to_skip']: + if c == '': break + if c not in SmtpTester.testsConducted.keys(): + err('There is no such test as the one specified: "{}"'.format( + c + )) + print('\nAvailable tests:{}'.format(testsHelp)) + sys.exit(-1) + + l = list(filter(lambda x: x != '', config['tests_to_skip'])) + if l: + info('Skipping following tests: ' + ', '.join(l)) + + return True + + +def printResults(results, auditMode): + if auditMode: + if config['format'] == 'json': + out = json.dumps(results, indent = 4) + out = out[1:-1] + out = out.replace('\\n', '\n') + out = out.replace('\\', '') + print(out) + + elif config['format'] == 'text': + pass + else: + info('Results:') + if config['format'] == 'json': + out = json.dumps(results, indent = 4) + out = out[1:-1] + out = out.replace('\\n', '\n') + out = out.replace('\\', '') + print(out) + + else: + for found in results: + print(found) + + if not config['verbose'] and not config['debug']: + sys.stderr.write('\n---\nFor more detailed output, consider enabling verbose mode.\n') + + +def main(argv): + opts = ParseOptions(argv) + domain = opts.domain + port = opts.port + userslist = opts.userslist + selectors = opts.selectors + + if config['format'] == 'text': + sys.stderr.write(''' + :: SMTP configuration Audit / Penetration-testing tool + Intended to be used as a black-box tool revealing security state of SMTP. + Mariusz B. / mgeeky, '17-19 + v{} + +'''.format(VERSION)) + + prev = datetime.datetime.now() + info('SMTP Audit started at: [{}], on host: "{}"'.format( + prev.strftime('%Y.%m.%d, %H:%M:%S'), + socket.gethostname() + )) + info('Running against target: {}{}{}'.format( + opts.domain, ':'+str(opts.port) if opts.port != None else '', + ' (...@' + opts.maildomain + ')' if opts.maildomain != '' else '', + toOutLine = True)) + + results = {} + tester = SmtpTester( + domain, + port, + dkimSelectorsList = selectors, + userNamesList = userslist, + openRelayParams = (opts.fromAddr, opts.toAddr), + mailDomain = opts.maildomain + ) + + try: + if opts.attack: + results = tester.runAttacks() + else: + results = tester.runTests() + + except KeyboardInterrupt: + err('USER HAS INTERRUPTED THE PROGRAM.') + if tester: + tester.stop() + + after = datetime.datetime.now() + info('Audit finished at: [{}], took: [{}]'.format( + after.strftime('%Y.%m.%d, %H:%M:%S'), + str(after - prev) + ), toOutLine = True) + + if config['verbose'] and config['format'] != 'text': + sys.stderr.write('\n' + '-' * 50 + '\n\n') + + printResults(results, not opts.attack) + +if __name__ == '__main__': + main(sys.argv)