From 683a25d8c74f33ad2dc11807979a6419a8de10d6 Mon Sep 17 00:00:00 2001 From: "Mariusz B. / mgeeky" Date: Tue, 26 Oct 2021 23:03:24 +0200 Subject: [PATCH] updated decode-spam headers - now parses 67+ different SMTPheaders --- phishing/README.md | 96 +- phishing/decode-spam-headers.py | 1721 +++++++++++++++++++++++++++---- 2 files changed, 1574 insertions(+), 243 deletions(-) diff --git a/phishing/README.md b/phishing/README.md index d51e1ac..549efe9 100644 --- a/phishing/README.md +++ b/phishing/README.md @@ -7,39 +7,75 @@ Resulting output will contain useful information on why this e-mail might have been blocked. - Processed headers (more than **32+** headers are parsed): + Processed headers (more than **67+** headers are parsed): - - `Authentication-Results` - - `From` - - `Received-SPF` + - `X-forefront-antispam-report` + - `X-exchange-antispam` + - `X-exchange-antispam-mailbox-delivery` + - `X-exchange-antispam-message-info` + - `X-microsoft-antispam-report-cfa-test` - `Received` + - `From` - `To` - - `X-Forefront-Antispam-Report` - - `X-Mailer` - - `X-Microsoft-Antispam-Mailbox-Delivery` - - `X-Microsoft-Antispam-Message-Info` - - `X-Microsoft-Antispam` - - `X-MS-Exchange-Transport-EndToEndLatency` - - `X-MS-Oob-TLC-OOBClassifiers` - - `X-MS-Exchange-AtpMessageProperties` - - `X-Exchange-Antispam-Report-CFA-Test` - - `X-Microsoft-Antispam-Report-CFA-Test` - - `X-MS-Exchange-AtpMessageProperties` - - `X-Spam-Status` - - `X-Spam-Level` - - `X-Spam-Flag` - - `X-Spam-Report` - - `ARC-Authentication-Results` - - `X-MSFBL` - - `X-Ovh-Spam-Reason` - - `X-VR-SPAMCAUSE` - - `X-VR-SPAMSCORE` - - `X-Virus-Scanned` - - `X-Spam-Checker-Version` - - `X-IronPort-AV` - - `X-Mimecast-Spam-Score` - - `User-Agent` - - `X-Originating-IP` + - `Subject` + - `Thread-topic` + - `Received-spf` + - `X-mailer` + - `X-originating-ip` + - `User-agent` + - `X-forefront-antispam-report` + - `X-microsoft-antispam-mailbox-delivery` + - `X-microsoft-antispam` + - `X-exchange-antispam-report-cfa-test` + - `X-spam-status` + - `X-spam-level` + - `X-spam-flag` + - `X-spam-report` + - `X-vr-spamcause` + - `X-ovh-spam-reason` + - `X-vr-spamscore` + - `X-virus-scanned` + - `X-spam-checker-version` + - `X-ironport-av` + - `X-ironport-anti-spam-filtered` + - `X-ironport-anti-spam-result` + - `X-mimecast-spam-score` + - `Spamdiagnosticmetadata` + - `X-ms-exchange-atpmessageproperties` + - `X-msfbl` + - `X-ms-exchange-transport-endtoendlatency` + - `X-ms-oob-tlc-oobclassifiers` + - `X-ip-spam-verdict` + - `X-amp-result` + - `X-ironport-remoteip` + - `X-ironport-reputation` + - `X-sbrs` + - `X-ironport-sendergroup` + - `X-policy` + - `X-ironport-mailflowpolicy` + - `X-remote-ip` + - `X-sea-spam` + - `X-fireeye` + - `X-antiabuse` + - `X-tmase-version` + - `X-tm-as-product-ver` + - `X-tm-as-result` + - `X-imss-scan-details` + - `X-tm-as-user-approved-sender` + - `X-tm-as-user-blocked-sender` + - `X-tmase-result` + - `X-tmase-snap-result` + - `X-imss-dkim-white-list` + - `X-tm-as-result-xfilter` + - `X-tm-as-smtp` + - `X-scanned-by` + - `X-mimecast-spam-signature` + - `X-mimecast-bulk-signature` + - `X-sender-ip` + - `X-forefront-antispam-report-untrusted` + - `X-microsoft-antispam-untrusted` + - `X-sophos-senderhistory` + - `X-sophos-rescan` - and more... Most of these headers are not fully documented, therefore the script is unable to pinpoint all the details, but at least it collects all I could find on them. diff --git a/phishing/decode-spam-headers.py b/phishing/decode-spam-headers.py index 43c2f53..8c034bd 100644 --- a/phishing/decode-spam-headers.py +++ b/phishing/decode-spam-headers.py @@ -9,34 +9,74 @@ # but also by the Offensive security consultants performing Phishing Awareness Trainings, before sending # a campaign to analyse negative constructs in their e-mails. # -# The script can decode many different SPAM related headers, among the others: -# - Authentication-Results -# - X-Forefront-Antispam-Report -# - X-Mailer -# - X-Microsoft-Antispam-Mailbox-Delivery -# - X-Microsoft-Antispam-Message-Info -# - X-Microsoft-Antispam -# - X-MS-Exchange-Transport-EndToEndLatency -# - X-MS-Oob-TLC-OOBClassifiers -# - X-MS-Exchange-AtpMessageProperties -# - X-Exchange-Antispam-Report-CFA-Test -# - X-Microsoft-Antispam-Report-CFA-Test -# - X-MS-Exchange-AtpMessageProperties -# - X-Spam-Status -# - X-Spam-Level -# - X-Spam-Flag -# - X-Spam-Report -# - ARC-Authentication-Results -# - X-MSFBL -# - X-Ovh-Spam-Reason -# - X-VR-SPAMCAUSE -# - X-VR-SPAMSCORE -# - X-Virus-Scanned -# - X-Spam-Checker-Version -# - X-IronPort-AV -# - X-Mimecast-Spam-Score -# - User-Agent -# - X-Originating-IP +# The script can decode 67+ different SPAM related headers (and others bringing valuable information): +# - X-forefront-antispam-report +# - X-exchange-antispam +# - X-exchange-antispam-mailbox-delivery +# - X-exchange-antispam-message-info +# - X-microsoft-antispam-report-cfa-test +# - Received +# - From +# - To +# - Subject +# - Thread-topic +# - Received-spf +# - X-mailer +# - X-originating-ip +# - User-agent +# - X-forefront-antispam-report +# - X-microsoft-antispam-mailbox-delivery +# - X-microsoft-antispam +# - X-exchange-antispam-report-cfa-test +# - X-spam-status +# - X-spam-level +# - X-spam-flag +# - X-spam-report +# - X-vr-spamcause +# - X-ovh-spam-reason +# - X-vr-spamscore +# - X-virus-scanned +# - X-spam-checker-version +# - X-ironport-av +# - X-ironport-anti-spam-filtered +# - X-ironport-anti-spam-result +# - X-mimecast-spam-score +# - Spamdiagnosticmetadata +# - X-ms-exchange-atpmessageproperties +# - X-msfbl +# - X-ms-exchange-transport-endtoendlatency +# - X-ms-oob-tlc-oobclassifiers +# - X-ip-spam-verdict +# - X-amp-result +# - X-ironport-remoteip +# - X-ironport-reputation +# - X-sbrs +# - X-ironport-sendergroup +# - X-policy +# - X-ironport-mailflowpolicy +# - X-remote-ip +# - X-sea-spam +# - X-fireeye +# - X-antiabuse +# - X-tmase-version +# - X-tm-as-product-ver +# - X-tm-as-result +# - X-imss-scan-details +# - X-tm-as-user-approved-sender +# - X-tm-as-user-blocked-sender +# - X-tmase-result +# - X-tmase-snap-result +# - X-imss-dkim-white-list +# - X-tm-as-result-xfilter +# - X-tm-as-smtp +# - X-scanned-by +# - X-mimecast-spam-signature +# - X-mimecast-bulk-signature +# - X-sender-ip +# - X-forefront-antispam-report-untrusted +# - X-microsoft-antispam-untrusted +# - X-sophos-senderhistory +# - X-sophos-rescan # # Usage: # ./decode-spam-headers [options] @@ -276,30 +316,16 @@ class SMTPHeadersAnalysis: 'slackbot-linkexpanding', 'slurp', 'sogou', 'sonicwall', 'sophos', 'superantispyware', 'symantec', 'tachyon', 'tencent', 'totaldefense', 'trapmine', 'trend micro', 'trendmicro', 'trusteer', 'trustlook', 'virusblokada', 'virustotal', 'virustotalcloud', 'webroot', - 'wget', 'yandex', 'yandexbot', 'zillya', 'zonealarm', 'zscaler', + 'yandex', 'yandexbot', 'zillya', 'zonealarm', 'zscaler', '-sea-', 'perlmx', 'trustwave', + 'mailmarshal', 'tmase', 'startscan', 'fe-etp', 'jemd', 'suspicious', 'grey', 'infected', 'unscannable', 'dlp-', 'sanitize' ) Interesting_Headers = ( - 'mailgun', - 'sendgrid', - 'mailchimp', - 'x-ses', - 'x-avas', - 'X-Gmail-Labels', - 'X-Originating-IP', - 'X-vrfbldomain', - 'mandrill', - 'bulk', - 'sendinblue', - 'amazonses', - 'mailjet', - 'postmark', - 'postfix', - 'dovecot', - 'roundcube', - '-IP', - 'check', + 'mailgun', 'sendgrid', 'mailchimp', 'x-ses', 'x-avas', 'X-Gmail-Labels', 'X-vrfbldomain', + 'mandrill', 'bulk', 'sendinblue', 'amazonses', 'mailjet', 'postmark', 'postfix', 'dovecot', 'roundcube', + 'seg', '-IP', 'crosspremises', 'brightmail', 'check', 'exim', 'postfix', 'exchange', 'microsoft', 'office365', + 'dovecot', 'sendmail' ) Headers_Known_For_Breaking_Line = ( @@ -318,32 +344,19 @@ class SMTPHeadersAnalysis: 'X-Microsoft-Antispam-Message-Info' ) - Handled_Spam_Headers = ( + # + # This array will be (partially) dynamically adjusted by the script by SMTPHeadersAnalysis.getHeader method. + # + # Add here header names that are processed by the script, but not passed to `getHeader` method + # in order to skip them from "Other Interesting Headers" sweep scan. + # + Handled_Spam_Headers = [ 'X-Forefront-Antispam-Report', - 'X-Microsoft-Antispam', - 'X-Microsoft-Antispam-Mailbox-Delivery', - 'X-Microsoft-Antispam-Message-Info', 'X-Exchange-Antispam', 'X-Exchange-Antispam-Mailbox-Delivery', 'X-Exchange-Antispam-Message-Info', - 'X-Exchange-Antispam-Report-CFA-Test', 'X-Microsoft-Antispam-Report-CFA-Test', - 'X-MS-Exchange-AtpMessageProperties', - 'X-Spam-Status', - 'X-Spam-Level', - 'X-Spam-Flag', - 'X-Spam-Report', - 'ARC-Authentication-Results', - 'X-MSFBL', - 'X-Ovh-Spam-Reason', - 'X-VR-SPAMSCORE', - 'X-VR-SPAMCAUSE', - 'X-Virus-Scanned', - 'X-Spam-Checker-Version', - 'X-IronPort-AV', - 'X-Mimecast-Spam-Score', - 'X-Originating-IP', - ) + ] auth_result = { 'none': 'The message was not signed.', @@ -435,6 +448,22 @@ class SMTPHeadersAnalysis: ) } + Aterisk_Risk_Score = { + '*' : logger.colored('lowest risk associated', 'green'), + '**' : logger.colored('low risk associated', 'green'), + '***' : logger.colored('moderately low risk associated', 'yellow'), + '****' : logger.colored('moderately high risk associated', 'yellow'), + '*****' : logger.colored('high risk associated', 'red'), + '******' : logger.colored('highest risk associated', 'red'), + } + + AMP_Results = { + 'CLEAN' : logger.colored('No malware detected.', "green"), + 'MALICIOUS' : logger.colored('Malware detected.', "red"), + 'UNKNOWN' : logger.colored('Could not categorize the message.', "magenta"), + 'UNSCANNABLE' : logger.colored('Could not scan the message.', "yellow"), + } + Forefront_Antispam_Report = { 'ARC' : ( 'ARC Protocol', @@ -545,6 +574,11 @@ class SMTPHeadersAnalysis: ), } + Trend_Type_AntiSpam = { + 1 : logger.colored('Spam', 'red'), + 2 : logger.colored('Phishing', 'red'), + } + Spam_Diagnostics_Metadata = { 'NSPM' : logger.colored('Not Spam', 'green'), 'SPAM' : logger.colored('SPAM', 'red'), @@ -552,6 +586,9 @@ class SMTPHeadersAnalysis: Anti_Spam_Rules_ReverseEngineered = { '35100500006' : logger.colored('(SPAM) Message contained embedded image. Score +4', 'red'), + + # https://docs.microsoft.com/en-us/answers/questions/416100/what-is-meanings-of-39x-microsoft-antispam-mailbox.html + '520007050' : logger.colored('(SPAM) Moved message to Spam and created Email Rule to move messages from this particular sender to Junk.', 'red'), } ForeFront_Spam_Confidence_Levels = { @@ -601,6 +638,12 @@ class SMTPHeadersAnalysis: ) } + SEA_Spam_Fields = { + 'gauge' : 'Spam Message Gauge result', + 'probability' : 'Spam Probability (100% - certain spam)', + 'report' : 'Anti-Spam rules that matched the message along with their probability', + } + SpamAssassin_Spam_Status = ( 'SpamAssassin spam evaluation status report', { @@ -615,6 +658,13 @@ class SMTPHeadersAnalysis: } ) + Cisco_Predefined_MailFlow_Policies = { + '$TRUSTED' : logger.colored('Mails from this sender are TRUSTED and will not be scanned by Anti-Spam or Anti-Virus.', 'green'), + '$BLOCKED' : logger.colored('Mails from this sender are BLOCKED', 'red'), + '$THROTTLED' : logger.colored('Mails from this sender are slowed down because they were considered suspicious. Messages will be scanned.', 'magenta'), + '$ACCEPTED' : logger.colored('Mail flow policies were undecided about the sender so they accepted the message. Message will be scanned and reputation evaluated.', 'green'), + } + Forefront_Antispam_Delivery = { 'dest' : ( 'Destination where message should be placed', @@ -631,6 +681,21 @@ class SMTPHeadersAnalysis: '1' : 'Authenticated', } ), + + 'OFR' : ( + 'Folder Rules applied to this Message', + { + 'ExclusiveSettings' : '', + 'CustomRules' : 'An existing folder move rule was applied on this message.', + } + ), + + 'RF' : ( + 'Email Rules', + { + 'JunkEmail' : logger.colored('Mail marked as Junk and moved to Junk folder', 'red'), + } + ), } @@ -852,10 +917,37 @@ class SMTPHeadersAnalysis: self.decode_all = decode_all self.logger = logger self.received_path = None + self.securityAppliances = set() # (number, header, value) self.headers = [] + @staticmethod + def safeBase64Decode(value): + enc = False + if type(value) == str: + enc = True + value = value.encode() + + try: + out = base64.b64decode(value) + except: + out = base64.b64decode(value + b'=' * (-len(value) % 4)) + + if enc: + out = out.decode() + + return out + + @staticmethod + def resolveAddress(addr): + try: + res = socket.gethostbyaddr(addr) + return ', '.join([x for x in res if len(x) > 0]) + + except: + return '' + @staticmethod def parseExchangeVersion(lookup): @@ -869,6 +961,10 @@ class SMTPHeadersAnalysis: # Go with version-wise comparison to fuzzily find proper version name sortedversions = sorted(SMTPHeadersAnalysis.Exchange_Versions) + match = re.search(r'\d{1,}\.\d{1,}\.\d{1,}', lookup, re.I) + if not match: + return None + for i in range(len(sortedversions)): if sortedversions[i].version.startswith(lookup): sortedversions[i].name = 'fuzzy match: ' + sortedversions[i].name @@ -891,16 +987,25 @@ class SMTPHeadersAnalysis: def getHeader(self, _header): + if _header not in SMTPHeadersAnalysis.Handled_Spam_Headers: + SMTPHeadersAnalysis.Handled_Spam_Headers.append(_header) + for (num, header, value) in self.headers: if header.lower() == _header.lower(): return (num, header, value) - if '-Microsoft-' in _header: - _header = _header.replace('-Microsoft-', '-Exchange-') + similar_headers = ( + ('-Microsoft-', '-Exchange-'), + ('-Microsoft-', '-Office365-'), + ) - for (num, header, value) in self.headers: - if header.lower() == _header.lower(): - return (num, header, value) + for sim in similar_headers: + if sim[0].lower() in _header.lower(): + _header = re.sub(sim[0], sim[1], _header, re.I) + + for (num, header, value) in self.headers: + if header.lower() == _header.lower(): + return (num, header, value) return (-1, '', '') @@ -1032,17 +1137,59 @@ Results will be unsound. Make sure you have pasted your headers with correct spa ('X-Virus-Scan', self.testXVirusScan), ('X-Spam-Checker-Version', self.testXSpamCheckerVersion), ('X-IronPort-AV', self.testXIronPortAV), + ('X-IronPort-Anti-Spam-Filtered', self.testXIronPortSpamFiltered), + ('X-IronPort-Anti-Spam-Result', self.testXIronPortSpamResult), ('X-Mimecast-Spam-Score', self.testXMimecastSpamScore), ('Spam Diagnostics Metadata', self.testSpamDiagnosticMetadata), ('MS Defender ATP Message Properties', self.testATPMessageProperties), ('Message Feedback Loop', self.testMSFBL), ('End-to-End Latency - Message Delivery Time', self.testTransportEndToEndLatency), ('X-MS-Oob-TLC-OOBClassifiers', self.testTLCOObClasifiers), + ('X-IP-Spam-Verdict', self.testXIPSpamVerdict), + ('X-Amp-Result', self.testXAmpResult), + ('X-IronPort-RemoteIP', self.testXIronPortRemoteIP), + ('X-IronPort-Reputation', self.testXIronPortReputation), + ('X-SBRS', self.testXSBRS), + ('X-IronPort-SenderGroup', self.testXIronPortSenderGroup), + ('X-Policy', self.testXPolicy), + ('X-IronPort-MailFlowPolicy', self.testXIronPortMailFlowPolicy), + ('X-Remote-IP', self.testXRemoteIP), + ('X-SEA-Spam', self.testXSeaSpam), + ('X-FireEye', self.testXFireEye), + ('X-AntiAbuse', self.testXAntiAbuse), + ('X-TMASE-Version', self.testXTMVersion), + ('X-TM-AS-Product-Ver', self.testXTMProductVer), + ('X-TM-AS-Result', self.testXTMResult), + ('X-IMSS-Scan-Details', self.testXTMScanDetails), + ('X-TM-AS-User-Approved-Sender', self.testXTMApprSender), + ('X-TM-AS-User-Blocked-Sender', self.testXTMBlockSender), + ('X-TMASE-Result', self.testXTMASEResult), + ('X-TMASE-SNAP-Result', self.testXTMSnapResult), + ('X-IMSS-DKIM-White-List', self.testXTMDKIM), + ('X-TM-AS-Result-Xfilter', self.testXTMXFilter), + ('X-TM-AS-SMTP', self.testXTMASSMTP), + ('X-TMASE-SNAP-Result', self.testXTMASESNAP), + ('X-TM-Authentication-Results', self.testXTMAuthenticationResults), + ('X-Scanned-By', self.testXScannedBy), + ('X-Mimecast-Spam-Signature', self.testXMimecastSpamSignature), + ('X-Mimecast-Bulk-Signature', self.testXMimecastBulkSignature), + ('X-Sender-IP', self.testXSenderIP), + ('X-Forefront-Antispam-Report-Untrusted', self.testForefrontAntiSpamReportUntrusted), + ('X-Microsoft-Antispam-Untrusted', self.testForefrontAntiSpamUntrusted), + # + # These tests shall be the last ones. + # + ('Similar to SpamAssassin Spam Level headers', self.testSpamAssassinSpamAlikeLevels), + ('SMTP Header Contained IP address', self.testMessageHeaderContainedIP), ('Other unrecognized Spam Related Headers', self.testSpamRelatedHeaders), ('Other interesting headers', self.testInterestingHeaders), + ('Security Appliances Spotted', self.testSecurityAppliances), ) + for i in range(len(SMTPHeadersAnalysis.Handled_Spam_Headers)): + SMTPHeadersAnalysis.Handled_Spam_Headers[i] = SMTPHeadersAnalysis.Handled_Spam_Headers[i].lower() + testsDecodeAll = ( ('X-Microsoft-Antispam-Message-Info', self.testMicrosoftAntiSpamMessageInfo), ('Decoded Mail-encoded header values', self.testDecodeEncodedHeaders), @@ -1083,6 +1230,21 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if options['debug']: raise + for k in self.results.keys(): + if len(self.results[k]) == 0: + continue + + for kk in ['description', 'header', 'value']: + if kk not in list(self.results[k].keys()): + self.results[k][kk] = '' + + self.logger.dbg(f'\n------------------------------------------\nAttempted to process following SMTP headers ({len(SMTPHeadersAnalysis.Handled_Spam_Headers)}):') + + for header in SMTPHeadersAnalysis.Handled_Spam_Headers: + self.logger.dbg(f'\t- {header.capitalize()}') + + self.logger.dbg('\n------------------------------------------\n\n') + return {k: v for k, v in self.results.items() if v} @staticmethod @@ -1147,45 +1309,507 @@ Results will be unsound. Make sure you have pasted your headers with correct spa lines.append(line) return '\n'.join(lines) - def testXVirusScan(self): - (num, header, value) = self.getHeader('X-Virus-Scanned') - if num == -1: return [] + def testSecurityAppliances(self): + result = '' + vals = [x.lower() for x in SMTPHeadersAnalysis.Header_Keywords_That_May_Contain_Spam_Info] - result = f'- Message was scanned with an Anti-Virus.' + self.logger.dbg('Spotted clues about security appliances:') + + for a in self.securityAppliances: + parts = a.split(' ') + skip = True + + self.logger.dbg(f'\t- {a}') + + for p in parts: + if p.lower() in vals: + skip = False + break + + if skip: continue + result += f'\t- {a}\n' if len(result) == 0: return [] + return { + 'header': '', + 'value' : '', + 'analysis' : '- During headers analysis, spotted following clues about Security Appliances:\n\n' + result, + 'description' : '', + } + + def testXTMVersion(self): + (num, header, value) = self.getHeader('X-TMASE-Version') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam Engine (TMASE) Version\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + parts = value.split('-') + + if len(parts) > 0: result += f'\t\t- Vendor Product Name: {parts[0]}\n' + if len(parts) > 1: result += f'\t\t- Product Version: {parts[1]}\n' + if len(parts) > 2: result += f'\t\t- Anti-Spam Enginge Version: {parts[2]}\n' + if len(parts) > 3: result += f'\t\t- Spam Pattern Version: {parts[3]}\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMProductVer(self): + (num, header, value) = self.getHeader('X-TM-AS-Product-Ver') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam Engine (TMASE) Version\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + parts = value.split('-') + + if len(parts) > 0: result += f'\t\t- Vendor Product Name: {parts[0]}\n' + if len(parts) > 1: result += f'\t\t- Product Version: {parts[1]}\n' + if len(parts) > 2: result += f'\t\t- Anti-Spam Enginge Version: {parts[2]}\n' + if len(parts) > 3: result += f'\t\t- Spam Pattern Version: {parts[3]}\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMApprSender(self): + (num, header, value) = self.getHeader('X-TM-AS-User-Approved-Sender') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + + if value.strip().lower() == 'yes': + result += self.logger.colored('\t- system Approved this Sender\n', 'green') + + if value.strip().lower() == 'no': + result += self.logger.colored('\t- system did not Approve this Sender\n', 'red') + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMBlockSender(self): + (num, header, value) = self.getHeader('X-TM-AS-User-Blocked-Sender') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + + if value.strip().lower() == 'yes': + result += self.logger.colored('\t- system Blocked this Sender\n', 'red') + + if value.strip().lower() == 'no': + result += '\t- system did not Block this Sender\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMASESNAP(self): + (num, header, value) = self.getHeader('X-TMASE-SNAP-Result') + if num == -1: return [] + + result = '- Trend Micro Anti Spam Engine (TMASE) Social Engineering Attack Protection (SNAP) scan result\n\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + + parts = value.strip().split('-') + + if len(parts) > 0: result += f'\t- System Version: {parts[0]}\n' + if len(parts) > 1: result += f'\t- Scan Result: {parts[1]}\n' + if len(parts) > 2: result += f'\t- Scan Aggressive Level: {parts[2]}\n' + if len(parts) > 3: + result += f'\t- Traverse List:' + + if ',' in parts[3]: + num = 0 + for s in parts[3].split(','): + num += 1 + rule, matched = s.split(':') + + if matched == '0': + m = 'not matched' + else: + m = m = self.logger('matched', 'red') + matched = self.logger.red(matched, 'red') + + if num == 1: + result += f' - rule: {rule} - matched: {matched} ({m})\n' + else: + result += f'\t\t\t\t - rule: {rule} - matched: {matched} ({m})\n' + else: + result += f'\t\t- {parts[3]}\n' + + if len(parts) > 4: result += f'\t- Unknown: {parts[4]}\n' + + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMDKIM(self): + (num, header, value) = self.getHeader('X-IMSS-DKIM-White-List') + if num == -1: return [] + + self.securityAppliances.add('Trend Micro InterScan Messaging Security') + + if value.strip().lower() == 'yes': + result = '- Trend Micro InterScan Messaging Security DKIM White Listed this sender\n' + + if value.strip().lower() == 'no': + result = '- Trend Micro InterScan Messaging Security did not DKIM White List this sender\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMResult(self): + (num, header, value) = self.getHeader('X-TM-AS-Result') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam Result\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + return self._parseTMASResult(result, '', num, header, value) + + def testXTMASEResult(self): + (num, header, value) = self.getHeader('X-TMASE-Result') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam Engine (TMASE) Result\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + return self._parseTMASResult(result, '', num, header, value) + + def testXTMScanDetails(self): + (num, header, value) = self.getHeader('X-IMSS-Scan-Details') + if num == -1: return [] + + result = '- Trend Micro InterScan Messaging Security Scan Details\n' + self.securityAppliances.add('Trend Micro InterScan Messaging Security') + return self._parseTMASResult(result, '', num, header, value) + + def _parseTMASResult(self, topicLine, description, num, header, value): + value = value.replace('--', '-=') + parts = value.split('-') + + result = topicLine + + thresh = 0 + if len(parts) > 2: + try: + thresh = float(parts[2]) + except: + pass + + score = 0 + if len(parts) > 1: + score = parts[1] + if parts[1].startswith('='): + score = '-' + score[1:] + + try: + score = float(score) + except: + pass + + col = 'yellow' + if score != 0 and thresh != 0: + if score < thresh: + col = 'green' + + elif score >= thresh: + col = 'red' + + if len(parts) > 0: + val2 = '' + val = parts[0].strip() + + if val.strip().lower() == 'yes': + val = self.logger.colored(val.upper(), col) + + if val.strip().lower() == 'no': + val = self.logger.colored(val.upper(), col) + val2 = ' (SPS filter did not trigger)' + + result += f'\t\t- Is it SPAM?: {val}{val2}\n' + + if len(parts) > 1: + if parts[1].startswith('='): parts[1] = '-' + parts[1][1:] + result += f'\t\t- Trend/Spam Score: {self.logger.colored(parts[1], col)}\n' + + if len(parts) > 2: + result += f'\t\t- Detection Threshold: {parts[2]}\n' + + if len(parts) > 3: + result += f'\t\t- Category : {parts[3]}\n' + + if len(parts) > 4: + result += f'\t\t- Trend Type : {parts[4]}' + + try: + t = int(parts[4]) + if t in SMTPHeadersAnalysis.Trend_Type_AntiSpam.keys(): + result += ' (' + SMTPHeadersAnalysis.Trend_Type_AntiSpam[k] + ')' + except: + pass + + result += '\n' + + return { + 'header': header, + 'value' : value.replace('-=', '--'), + 'analysis' : result, + 'description' : description, + } + + def testXScannedBy(self): + hdrs = ( + 'X-Scanned-By', + 'X-ScannedBy', + 'XScannedBy', + 'XScanned-By', + 'X-Scanned', + 'X-Scan', + 'X-Scan-By', + ) + + for hdr in hdrs: + (num, header, value) = self.getHeader(hdr) + if num == -1: continue + + val = self.logger.colored(value, "yellow") + result = f'- Scanned by: {val}\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + return [] + + def testXTMSnapResult(self): + (num, header, value) = self.getHeader('X-TMASE-SNAP-Result') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam Engine (TMASE) SNAP Result\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + result += f'\t- {value}\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMXFilter(self): + (num, header, value) = self.getHeader('X-TM-AS-Result-Xfilter') + if num == -1: return [] + + result = '- Trend Micro Anti-Spam XFilter\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + result += f'\t- {value}\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXTMASSMTP(self): + (num, header, value) = self.getHeader('X-TM-AS-SMTP') + if num == -1: return [] + + value = SMTPHeadersAnalysis.flattenLine(value) + result = '- Trend Micro Anti-Spam SMTP servers\n' + self.securityAppliances.add('Trend Micro Anti-Spam') + parts = value.split(' ') + + if len(parts) > 2: + try: + p2 = float(parts[0]) + result += f'\t- Priority: {p2}\n' + + except: + result += f'\t- Priority: {parts[0]}\n' + + result += f'\t- Server: {SMTPHeadersAnalysis.safeBase64Decode(parts[1])}\n' + result += f'\t- Recipient: {SMTPHeadersAnalysis.safeBase64Decode(parts[2])}\n' + + return { + 'header': header, + 'value' : value, + 'analysis' : result, + 'description' : '', + } + + def testXVirusScan(self): + (num, header, value) = self.getHeader('X-Virus-Scanned') + if num == -1: return [] + + result = f'- Message was scanned with an Anti-Virus.' + self.securityAppliances.add('Unknown Anti-Virus') + return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } + def testXFireEye(self): + (num, header, value) = self.getHeader('X-FireEye') + if num == -1: return [] + + result = f'- Message was scanned with FireEye Email Security Solution. Result is following:\n' + self.securityAppliances.add('FireEye Email Security Solution') + result += f'\t- {self.logger.colored(value, "green")}\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } + + def testXAntiAbuse(self): + result = '' + tmp = '\n' + + SMTPHeadersAnalysis.Handled_Spam_Headers.append('x-antiabuse') + + for num, header, value in self.headers: + if header.lower() != 'x-antiabuse': + continue + + tmp += ' ' + SMTPHeadersAnalysis.flattenLine(value) + '\n' + + if len(tmp) > 5: + result = f''' + - Anti-Abuse message was included in mail headers: + {tmp} +''' + else: + return [] + + return { + 'header' : '-', + 'value': '-', + 'analysis' : result, + 'description' : '', + } + + def testMessageHeaderContainedIP(self): + result = '' + shown = set() + num0 = 0 + tmp = '' + + for num, header, value in self.headers: + if header in shown or header in SMTPHeadersAnalysis.Handled_Spam_Headers: + continue + + match = re.match(r'.{,5}\b([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\b.{,5}', value) + if match: + ipaddr = match.group(1) + + SMTPHeadersAnalysis.Handled_Spam_Headers.append(header) + + num0 += 1 + tmp += f'\t({num0:02}) Header: {self.logger.colored(header, "yellow")} contained an IP address:\n' + + shown.add(header) + + resolved = SMTPHeadersAnalysis.resolveAddress(ipaddr) + + if len(resolved) > 0: + tmp += f'\t Value: {self.logger.colored(ipaddr, "green")} (resolved: {resolved})\n\n' + else: + tmp += f'\t Value: {self.logger.colored(ipaddr, "green")}\n\n' + + if len(tmp) > 0: + result += tmp + '\n' + + if len(result) == 0: + return [] + + return { + 'header' : '-', + 'value': '-', + 'analysis' : result, + 'description' : '', + } + + def testXSenderIP(self): + (num, header, value) = self.getHeader('X-Sender-IP') + if num == -1: return [] + + result = f'- Connecting Client probably leaved its IP address: ' + return self._originatingIPTest(result, '', num, header, value) + + def testXRemoteIP(self): + (num, header, value) = self.getHeader('X-Remote-IP') + if num == -1: return [] + + result = f'- Connecting Client probably leaved its IP address: ' + return self._originatingIPTest(result, '', num, header, value) + def testXOriginatingIP(self): (num, header, value) = self.getHeader('x-originating-ip') if num == -1: return [] result = f'- Connecting Client leaved its IP address: ' + return self._originatingIPTest(result, '', num, header, value) + + def testXIronPortRemoteIP(self): + (num, header, value) = self.getHeader('X-IronPort-RemoteIP') + if num == -1: return [] + + result = f'- Cisco IronPort observed following IP of the connecting Client: ' + self.securityAppliances.add('Cisco IronPort') + return self._originatingIPTest(result, '', num, header, value) + + def _originatingIPTest(self, topicLine, description, num, header, value): + result = '' if '[' == value[0] and value[-1] == ']': value = value[1:-1] - resolved = '' - try: - resolved = socket.gethostbyaddr(value) - except Exception as e: - pass + resolved = SMTPHeadersAnalysis.resolveAddress(value) + + result += topicLine if len(resolved) > 0: - result += f'{self.logger.colored(value, "red")} (resolved: {resolved})\n' + result += f'\n\t- {self.logger.colored(value, "red")} (resolved: {resolved})\n' else: - result += f'{self.logger.colored(value, "red")}\n' + result += f'\n\t- {self.logger.colored(value, "red")} (not resolveable)\n' return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : description, } def testSubjecThreadTopic(self): @@ -1220,14 +1844,118 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : f'{header1}, {header2}', 'value': f'\n{header1}:\n\t{value1}\n\n {header2}:\n\t{value2}', - 'analysis' : result + 'analysis' : result, + 'description' : '', } + def testXSeaSpam(self): + (num, header, value) = self.getHeader('X-SEA-Spam') + if num == -1: return [] + + result = f'- Sophos Email Appliance Spam report:\n' + self.securityAppliances.add('Sophos Email Appliance') + report = {} + value = SMTPHeadersAnalysis.flattenLine(value) + + for match in re.finditer(r"(\w+)=(?!')([^,]+)\b,?", value, re.I): + key = match.group(1) + val = match.group(2) + + if not key: key = '' + if not val: val = '' + + if len(key.strip()) == 0: continue + report[key] = val.strip() + + for match in re.finditer(r"(\w+)='([^']+)'", value, re.I): + key = match.group(1) + val = match.group(2) + + if not key: key = '' + if not val: val = '' + + if len(key.strip()) == 0: continue + report[key] = [x.strip() for x in val.strip().split(',') if len(x.strip()) > 0] + + + for key, val in report.items(): + k = self.logger.colored(key, 'cyan') + + if key.lower() in SMTPHeadersAnalysis.SEA_Spam_Fields.keys(): + result += f'\n\t- {k}: {SMTPHeadersAnalysis.SEA_Spam_Fields[key.lower()]}\n' + else: + result += f'\n\t- {k}: \n' + + if key.lower() == 'report': + result = result[:-1] + result += f'. Matched {self.logger.colored(len(val), "yellow")} rules.\n\n' + + for rule in val: + if len(rule.strip() ) == 0: continue + num = 1 + num2 = 0 + + if ' ' in rule: + rulen, num = rule.split(' ') + try: + num2 = float(num) + except: + pass + else: + rulen = rule + + col = 'white' + + if num2 > 0: + col = 'yellow' + + if num2 > 0.5: + col = 'red' + + num = self.logger.colored(num, col) + rulen = self.logger.colored(rulen, col) + + result += f'\t\t- Probability: {num}\tRule: {rulen}\n' + + elif key.lower() == 'gauge': + leng = len(val) + numX = val.lower().count('x') + numI = val.lower().count('i') + others = leng - numX - numI + + probX = (float(numX) / leng) * 100.0 + probI = (float(numI) / leng) * 100.0 + probOthers = (float(others) / leng) * 100.0 + + result += f'\n\t - Value: {self.logger.colored(val, "yellow")}\n' + result += f'\t - Total length: {leng}\n' + result += f'\t - Number of X: {numX} ({probX:.02}%)\n' + result += f'\t - Number of I: {numI} ({probI:.02}%)\n' + result += f'\t - Number of others: {others} ({probOthers:.02}%)\n' + + elif type(val) == list or type(val) == tuple: + result += f'\t Contains {self.logger.colored(len(val), "yellow")} elements.\n' + for rule in val: + if len(rule.strip()) == 0: continue + result += f'\t - {rule}\n' + + else: + result += f'\t {val}\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } + + def testSpamDiagnosticMetadata(self): (num, header, value) = self.getHeader('SpamDiagnosticMetadata') if num == -1: return [] result = f'- SpamDiagnosticMetadata: Antispam stamps in Exchange Server 2016.\n' + self.securityAppliances.add('Exchange Server 2016 Anti-Spam') if value.strip() in SMTPHeadersAnalysis.Spam_Diagnostics_Metadata.keys(): result += f' {value}: ' + SMTPHeadersAnalysis.Spam_Diagnostics_Metadata[value.strip()] + '\n' @@ -1237,13 +1965,131 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', + } + + def testXIronPortSenderGroup(self): + (num, header, value) = self.getHeader('X-IronPort-SenderGroup') + if num == -1: return [] + + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') + return self._parseCiscoPolicy('\n- Cisco\'s Email Security Appliance (ESA) applied following Mail Flow policy to this e-mails SenderGroup:\n', '', num, header, value) + + def testXIronPortMailFlowPolicy(self): + (num, header, value) = self.getHeader('X-IronPort-MailFlowPolicy') + if num == -1: return [] + + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') + return self._parseCiscoPolicy( + '\n- Cisco\'s Email Security Appliance (ESA) applied following Mail Flow policy to this e-mail:\n', + ''' +A mail flow policy allows you to control or limit the flow of email messages from a sender to the listener during the SMTP conversation. +You control SMTP conversations by defining the following types of parameters in the mail flow policy. + +Src: https://www.cisco.com/c/en/us/support/docs/security/email-security-appliance/118179-configure-esa-00.html +''', + num, header, value) + + def testXPolicy(self): + (num, header, value) = self.getHeader('X-Policy') + if num == -1: return [] + + if value.strip().upper() in SMTPHeadersAnalysis.Cisco_Predefined_MailFlow_Policies.keys(): + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') + return self._parseCiscoPolicy('\n- Cisco\'s Email Security Appliance (ESA) applied following Mail Flow policy to this e-mail:\n', '', num, header, value) + + else: + result = '\n- Mail systems applied following policy to this message:\n' + result += f'\n\t- {value}\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } + + def _parseCiscoPolicy(self, topicLine, description, num, header, value): + result = '' + + result += '\n' + topicLine + + k = value.strip().upper() + k2 = self.logger.colored(k, "yellow") + + if k in SMTPHeadersAnalysis.Cisco_Predefined_MailFlow_Policies.keys(): + result += f'\t {k2}: ' + SMTPHeadersAnalysis.Cisco_Predefined_MailFlow_Policies[k] + '\n' + else: + result += f'\t {k2}\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : description + } + + def testXIronPortReputation(self): + (num, header, value) = self.getHeader('X-IronPort-Reputation') + if num == -1: return [] + + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') + topicLine = f'\n\n- Cisco SenderBase Reputation Service result:\n' + return self._parseCiscoSBRS(topicLine, '', num, header, value) + + def testXSBRS(self): + (num, header, value) = self.getHeader('X-SBRS') + if num == -1: return [] + + topicLine = f'\n\n- Cisco SenderBase Reputation Service result (custom header set):\n' + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') + return self._parseCiscoSBRS(topicLine, '', num, header, value) + + + def _parseCiscoSBRS(self, topicLine, description, num, header, value): + result = '' + + if not description or len(description) == 0: + description = ''' +The SenderBase Reputation Score (SBRS) is a numeric value assigned to an IP address based on information +from the SenderBase Reputation Service. The SenderBase Reputation Service aggregates data from over 25 +public blocked lists and open proxy lists, and combines this data with global data from SenderBase to assign +a score from -10.0 to +10.0 . + +Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA_Admin_Guide_11_1/b_ESA_Admin_Guide_chapter_0101.pdf +''' + + result += topicLine + + num = 0 + try: + num = float(value.strip()) + + if num < 0: + result += f'\t- Likely {self.logger.colored(f"source of SPAM ({num})", "red")}\n' + + elif num >= 0 and num < 5: + result += f'\t- Likely {self.logger.colored(f"neutral ({num})", "yellow")}\n' + + elif num > 5: + result += f'\t- Likely {self.logger.colored(f"trustworthy sender ({num})", "green")}\n' + + except: + result = f'\t- {value} (could not rate that score!)\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : description, } def testXIronPortAV(self): (num, header, value) = self.getHeader('X-IronPort-AV') if num == -1: return [] + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') result = f'- Cisco IronPort Anti-Virus interface.\n' value = SMTPHeadersAnalysis.flattenLine(value) @@ -1333,19 +2179,62 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', + } + + def testXIronPortSpamFiltered(self): + (num, header, value) = self.getHeader('X-IronPort-Anti-Spam-Filtered') + if num == -1: return [] + + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') + if value.strip().strip() == 'true': + result = f'- Cisco IronPort Anti-Spam rules {self.logger.colored("marked this message SPAM", "red")}.' + else: + result = f'- Cisco IronPort Anti-Spam rules considered this message CLEAN.' + + result += f' Value: {value}' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } + + def testXIronPortSpamResult(self): + (num, header, value) = self.getHeader('X-IronPort-Anti-Spam-Result') + if num == -1: return [] + + self.securityAppliances.add('Cisco IronPort / Email Security Appliance (ESA)') + if self.decode_all: + dumped = SMTPHeadersAnalysis.hexdump(SMTPHeadersAnalysis.safeBase64Decode(value)) + + result = f'- Cisco IronPort Anti-Spam result encrypted blob:\n\n' + result += dumped + '\n' + + else: + result = f'- Cisco IronPort Anti-Spam result encrypted blob. Use --decode-all to print its hexdump.' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', } def testXSpamCheckerVersion(self): (num, header, value) = self.getHeader('X-Spam-Checker-Version') if num == -1: return [] + self.securityAppliances.add('SpamAssassin') result = f'- SpamAssassin version.' return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testOvhSpamScore(self): @@ -1353,19 +2242,22 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if num == -1: return [] result = f'- OVH considered this message as SPAM and attached following Spam ' + self.securityAppliances.add('OVH Anti-Spam') value = SMTPHeadersAnalysis.flattenLine(value).replace(' ', '').replace('\t', '') result += f'Score: {self.logger.colored(value.strip(), "red")}\n' return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testOvhSpamReason(self): (num, header, value) = self.getHeader('X-Ovh-Spam-Reason') if num == -1: return [] + self.securityAppliances.add('OVH Anti-Spam') result = self.logger.colored(f'- OVH considered this message as SPAM', 'red') + ' and attached following information:\n' value = SMTPHeadersAnalysis.flattenLine(value).replace(' ', '').replace('\t', '') tmp = '' @@ -1379,7 +2271,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testSpamCause(self): @@ -1387,6 +2280,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if num == -1: return [] result = '' + self.securityAppliances.add('OVH Anti-Spam') value = SMTPHeadersAnalysis.flattenLine(value).replace(' ', '').replace('\t', '') decoded = SMTPHeadersAnalysis.decodeSpamcause(value) @@ -1407,7 +2301,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testMSFBL(self): @@ -1430,7 +2325,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testInterestingHeaders(self): @@ -1445,6 +2341,52 @@ Results will be unsound. Make sure you have pasted your headers with correct spa SMTPHeadersAnalysis.Header_Keywords_That_May_Contain_Spam_Info ) + def testSpamAssassinSpamAlikeLevels(self): + result = '' + tmp = '' + num0 = 0 + headers = [] + values = [] + + shown = set() + handled = [x.lower() for x in SMTPHeadersAnalysis.Handled_Spam_Headers] + + for num, header, value in self.headers: + value = SMTPHeadersAnalysis.flattenLine(value) + + if header in shown or header in handled: + continue + + if re.match(r'\s*\*{1,6}\s*', value): + num0 += 1 + + out = self._parseAsteriskRiskScore('', '', num, header, value) + headers.append(header) + values.append(value) + SMTPHeadersAnalysis.Handled_Spam_Headers.append(header) + + tmp += f'\t({num0:02}) {self.logger.colored("Header", "magenta")}: {header}\n' + tmp += out['analysis'] + shown.add(header) + + if len(tmp) > 0: + self.securityAppliances.add('SpamAssassin alike') + result = '\n- Found SpamAssassin like headers that might indicate Spam Risk score:\n' + result += tmp + '\n' + + if len(result) == 0: + return [] + + if len(headers) > 1: headers[0] = '\t' + headers[0] + if len(values) > 1: values[0] = '\t' + values[0] + + return { + 'header' : '\n\t'.join(headers), + 'value': '\n\t'.join(values), + 'analysis' : result, + 'description' : '', + } + def _testListRelatedHeaders(self, msg, listOfValues): result = '' tmp = '' @@ -1456,14 +2398,14 @@ Results will be unsound. Make sure you have pasted your headers with correct spa for num, header, value in self.headers: value = SMTPHeadersAnalysis.flattenLine(value) - if header in shown: + if header in shown or header.lower() in handled: continue for dodgy in listOfValues: if header in shown: break - if dodgy in header.lower() and header.lower() not in handled: + if dodgy in header.lower() and header.lower(): num0 += 1 hhh = re.sub(r'(' + re.escape(dodgy) + r')', self.logger.colored(r'\1', 'red'), header, flags=re.I) @@ -1473,7 +2415,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa shown.add(header) break - elif dodgy in value.lower() and header.lower() not in handled: + elif dodgy in value.lower() and header.lower(): num0 += 1 hhh = header tmp += f'\t({num0:02}) Header: {hhh}\n' @@ -1490,8 +2432,10 @@ Results will be unsound. Make sure you have pasted your headers with correct spa ctx = value[a:b] + ctx = ctx.strip() + tmp += f'\t Keyword: {dodgy}\n' - tmp += f'\t {self.logger.colored("Value", "magenta")}:\n\n{ctx}\n\n' + tmp += f'\t {self.logger.colored("Value", "magenta")}:\n\n{ctx}\n\n' shown.add(header) break @@ -1505,13 +2449,15 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : '-', 'value': '-', - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testSpamAssassinSpamStatus(self): (num, header, value) = self.getHeader('X-Spam-Status') if num == -1: return [] + self.securityAppliances.add('SpamAssassin') result = '- SpamAssassin spam report\n\n' parsed = {} @@ -1586,7 +2532,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testDomainImpersonation(self): @@ -1627,6 +2574,9 @@ Results will be unsound. Make sure you have pasted your headers with correct spa senderDomain = SMTPHeadersAnalysis.extractDomain(revMailDomain) firstHopDomain1 = SMTPHeadersAnalysis.extractDomain(revFirstSenderDomain) + if len(senderDomain) == 0: senderDomain = domain + if len(firstHopDomain1) == 0: firstHopDomain1 = firstHop["host"] + result += f'\t- Mail From: <{email}>\n\n' result += f'\t- Mail Domain: {domain}\n' result += f'\t --> resolves to: {mailDomainAddr}\n' @@ -1671,47 +2621,66 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } - def testSpamAssassinSpamFlag(self): (num, header, value) = self.getHeader('X-Spam-Flag') if num == -1: return [] + self.securityAppliances.add('SpamAssassin') + if value.strip().lower() == 'yes': result = self.logger.colored(f'- SpamAssassin marked this message as SPAM:\n', 'red') result += f'\t- ' + self.logger.colored(value, 'red') + '\n' + else: + result = self.logger.colored(f'- SpamAssassin did not mark this message as spam:\n', 'green') + result += f'\t- ' + self.logger.colored(value, 'green') + '\n' - return { - 'header' : header, - 'value': value, - 'analysis' : result - } + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } - return [] + def _parseAsteriskRiskScore(self, topicLine, description, num, header, value): + desc = '' + + result = topicLine + value = value.strip() + val = self.logger.colored(value, 'yellow') + + if len(value) <= 6: + if value in SMTPHeadersAnalysis.Aterisk_Risk_Score.keys(): + desc = f' ({SMTPHeadersAnalysis.Aterisk_Risk_Score[value]})' + + a = len(value) + b = 6 + result += f'\t- ({a}/{b}) {val} {desc}\n\n' + + else: + result += f'\t- {val}\n\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : description, + } def testSpamAssassinSpamLevel(self): (num, header, value) = self.getHeader('X-Spam-Level') if num == -1: return [] - - if len(value.strip()) > 0: - result = f'- SpamAssassin assigned following spam level to this message:\n' - _num = self.logger.colored(str(len(value.strip())), 'red') - result += f'\t- ' + self.logger.colored(value.strip(), 'red') + f' (number: {_num})\n' - - return { - 'header' : header, - 'value': value, - 'analysis' : result - } - - return [] + self.securityAppliances.add('SpamAssassin') + return self._parseAsteriskRiskScore('- SpamAssassin assigned following spam level to this message:\n', '', num, header, value) def testSpamAssassinSpamReport(self): (num, header, value) = self.getHeader('X-Spam-Report') if num == -1: return [] + self.securityAppliances.add('SpamAssassin') if len(value.strip()) > 0: result = f'- SpamAssassin assigned following spam report to this message:\n' tmp = '' @@ -1726,7 +2695,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } return [] @@ -1736,6 +2706,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if num == -1: return [] props = value.split('|') + self.securityAppliances.add('MS Defender Advanced Threat Protection') result = '- MS Defender Advanced Threat Protection enabled following protections on this message:\n' for prop in props: @@ -1745,11 +2716,237 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } + def testXAmpResult(self): + (num, header, value) = self.getHeader('X-Amp-Result') + if num == -1: return [] + + self.securityAppliances.add('Cisco Advanced Malware Protection (AMP)') + result = '- Cisco Meraki Advanced Malware Protection (AMP) sandbox marked this message as:\n' + val = value.strip() + k = value.strip().upper() + + if k in SMTPHeadersAnalysis.AMP_Results.keys(): + val = f'\t- {k}: {SMTPHeadersAnalysis.AMP_Results[k]}\n' + else: + result += f'\t- {val}\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } + + def testXIPSpamVerdict(self): + (num, header, value) = self.getHeader('X-IP-Spam-Verdict') + if num == -1: return [] + + self.securityAppliances.add('SpamAssassin') + result = '- An old SpamAssassin SPAM verdict header:\n' + + col = 'cyan' + if 'spam' in value.lower(): + col = "red" + + result += '\t- ' + self.logger.colored(value.strip(), col) + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } + + def parseReceived(self, received): + obj = { + 'host' : '', + 'host2' : '', + 'ip' : '', + 'timestamp' : '', + 'ver' : '', + 'with' : '', + 'extra' : [], + } + + keys = ( + 'from', + 'by', + 'via', + 'with', + 'id', + 'for', + ) + + found = set() + + parsed = {} + + pos = 0 + lastkey = '' + posOfKey = 0 + extrapos = 0 + + if not received.lower().strip().startswith('from'): + received = 'from ' + received + + paren = 0 + while pos < len(received): + keynow = '' + + if received[pos] == '(': + paren += 1 + pos += 1 + continue + + elif received[pos] == ')': + paren -= 1 + pos += 1 + continue + + if paren > 0 or received[pos] in string.whitespace: + pos += 1 + continue + + for key in keys: + if key in found: continue + tmp = False + if pos == 0: tmp = True + else: tmp = (received[pos-1] in string.whitespace) + + if received[pos:].lower().startswith(key + ' ') and tmp: + if lastkey != '': + parsed[lastkey] = received[posOfKey+len(lastkey)+1:pos].strip() + + lastkey = keynow = key + posOfKey = pos + found.add(key) + pos += len(key) + break + + pos += 1 + + if lastkey not in parsed.keys(): + parsed[lastkey] = received[posOfKey+len(lastkey)+1:].strip() + + if ';' in parsed[lastkey]: + pos = parsed[lastkey].find(';') + parsed[lastkey] = parsed[lastkey][:pos] + + obj['parsed'] = parsed + + if 'from' not in parsed.keys(): + return {} + + obj['host'] = '' + obj['ip'] = '' + obj['host2'] = '' + + match = re.search( + r'(?P[^\s]+)\s+(?:\((?P[^\s]+)(?:\s*\[(?P\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\])?\))?', + parsed['from'], + re.I + ) + + if match: + obj['host'] = match.group('host') + obj['ip'] = match.group('ip') + obj['host2'] = match.group('host2') + + if not obj['ip']: obj['ip'] = '' + if not obj['host']: obj['host'] = '' + if not obj['host2']: obj['host2'] = '' + + if obj['host'][0] == '[' and obj['host'][-1] == ']': + obj['ip'] = obj['host'][1:-1] + obj['host'] = '' + + obj['host2'] = obj['host2'].lower().replace('ehlo=', 'helo=') + + if 'helo=' in obj['host2'].lower(): + obj['host'] = obj['host2'][obj['host2'].lower().find('helo=')+5:] + obj['host2'] = '' + + if obj['host'] == '' and obj['ip'] != '': + try: + res = socket.gethostbyaddr(obj['ip']) + if len(res) > 0: + obj['host'] = res[0] + except: + pass + + if len(obj['host2']) > 0: + if obj['ip'] == None or len(obj['ip']) == 0: + obj['ip'] = obj['host2'] + + if extrapos == 0: + a = received.find(obj['host']) + len(obj['host']) + b = received.find(obj['host2']) + len(obj['host2']) + c = received.find(obj['ip']) + len(obj['ip']) + + extrapos = max(a, b, c) + else: + return {} + + if 'id' in parsed.keys(): + ver = parsed['id'].strip() + obj['ver'] = ver + + pos = received.find(';') + if pos != -1: + ts = received[pos+1:].strip() + obj['timestamp'] = str(parser.parse(ts).astimezone(tzutc())) + + for m in re.finditer(r'(?[A-Za-z]{3}\,\s+\d{1,2}\s+[A-Za-z]{3}\s+\d{4}\s+\d{2}:\d{2}:\d{2}(?:\s+[\+-]\d{4}(?:\s+\([A-Z]{3}\))?)?)', r) - ver = re.search(r'id\s+([0-9\.]+)', r) - match = re.match(r'(?:from\s+(?P[^\s]+)\s*(?:\((?P[^\)]+)\))?)', r, re.I) - match2 = re.match(r'(?:by\s+(?P[^\s]+)\s*(?:\((?P[^\)]+)\))?)', r, re.I) + obj = self.parseReceived(r) - ts = None - if timestamp: - ts = parser.parse(timestamp.group('timestamp')).astimezone(tzutc()) + if 'ver' in obj.keys() and len(obj['ver']) > 0: + vers = SMTPHeadersAnalysis.parseExchangeVersion(obj['ver']) + if vers != None: + obj['ver'] = self.logger.colored(str(vers), 'magenta') - vers = '' - if ver: - vers = str(SMTPHeadersAnalysis.parseExchangeVersion(ver.group(1))) - - if not vers or len(vers) == 0: - vers = ver.group(1) - else: - vers = re.sub(r'fuzzy match:\s+fuzzy match:', 'fuzzy match:', vers) - vers = vers.replace('fuzzy match: fuzzy match:', 'fuzzy match: ') - - obj = None - if match: - obj = { - 'type' : 'from', - 'host' : match.group('from'), - 'ip' : match.group('from_ip'), - 'timestamp' : ts, - 'ver' : vers, - } - - elif match2: - obj = { - 'type' : 'by', - 'host' : match2.group('by'), - 'ip' : match2.group('by_ip'), - 'timestamp' : ts, - 'ver' : vers, - } - - if obj: - if (obj['ip'] == None or len(obj['ip']) == 0) and obj['host'] != None and len(obj['host']) > 0: + if obj and (obj['ip'] == None or len(obj['ip']) == 0): + if obj['host'] != None and len(obj['host']) > 0: try: obj['ip'] = socket.gethostbyname(obj['host']) except: @@ -1829,14 +3001,17 @@ Results will be unsound. Make sure you have pasted your headers with correct spa elif match2: obj['ip'] = match2.group(1) - path.append(obj) + path.append(obj) - path.append({ - 'type': 'from', - 'host' : v2, + if n2 != -1: + path.append({ + 'host' : self.logger.colored(v2, 'green'), + 'host2' : '', 'ip' : '', 'timestamp' : None, 'ver' : '', + 'parsed' : {}, + 'extra' : [], }) result = '- List of server hops used to deliver message:\n\n' @@ -1847,31 +3022,56 @@ Results will be unsound. Make sure you have pasted your headers with correct spa for i in range(len(path)): elem = path[i] - if elem['type'] != 'from': continue - num += 1 s = '-->' if i > 0: s = '|_>' - if num == 2: + if num == 2 or n1 == -1: result += iindent + indent * (num+1) + f'{s} ({num}) {self.logger.colored(elem["host"], "green")}' else: - result += iindent + indent * (num+1) + f'{s} ({num}) {elem["host"]}' + result += iindent + indent * (num+1) + f'{s} ({num}) {self.logger.colored(elem["host"], "yellow")}' + + if len(elem['host2']) > 0: + if elem['host2'].endswith('.'): + elem['host2'] = self.logger.colored(elem['host2'][:-1], 'yellow') + + if elem['host2'] != elem['host'] and elem['host2'] != elem['ip']: + result += f' (rev: {self.logger.colored(elem["host2"], "yellow")})' if elem['ip'] != None and len(elem['ip']) > 0: if num == 2: result += f' ({self.logger.colored(elem["ip"], "green")})\n' else: - result += f' ({elem["ip"]})\n' + result += f' ({self.logger.colored(elem["ip"], "yellow")})\n' else: result += '\n' if elem['timestamp'] != None: - result += iindent + indent * (num+3) + 'time: ' + elem['timestamp'].strftime('%d %b %Y %H:%M:%S') + '\n' + result += iindent + indent * (num+3) + 'time: ' + elem['timestamp'] + '\n' if len(elem['ver']) > 0: - result += iindent + indent * (num+3) + 'version: ' + elem['ver'] + '\n' + result += iindent + indent * (num+3) + 'id: ' + elem['ver'] + '\n' + + for kk, vv in elem['parsed'].items(): + vv = str(vv) + if len(vv.strip()) == 0: continue + + if kk.lower() not in ['ip', 'host', 'host2', 'id', 'timestamp', 'parsed', 'extra', '_raw', 'from', 'time']: + n = 8 - len(kk) + if n < 0: n = 0 + vv2 = vv.replace('<', '').replace('>', '') + if kk == 'for' and (vv2 == v1 or vv2 == v2): + continue + + vv = self.colorizeKeywords(vv) + result += iindent + indent * (num+3) + kk + ': ' + ' ' * (n) + vv + '\n' + + if 'extra' in elem.keys() and len(elem['extra']) > 0: + result += iindent + indent * (num+3) + 'extra: \n' + for vv in elem['extra']: + vv0 = self.colorizeKeywords(vv) + result += iindent + indent * (num+4) + '- ' + vv0 + '\n' result += '\n' @@ -1880,7 +3080,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : 'Received', 'value': '...', - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testAntispamReportCFA(self): @@ -1890,7 +3091,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa obj = { 'header' : header, 'value' : value, - 'analysis' : '' + 'analysis' : '', + 'description' : '', } result = '' @@ -1901,6 +3103,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa obj2 = self._parseAntiSpamReport(num, header, value) result += obj2['analysis'] + self.securityAppliances.add('MS ForeFront Anti-Spam') + obj['analysis'] = result return obj @@ -1908,6 +3112,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa (num, header, value) = self.getHeader('X-Microsoft-Antispam') if num == -1: return [] + self.securityAppliances.add('MS ForeFront Anti-Spam') return self._parseBulk(num, header, value) def _parseBulk(self, num, header, value): @@ -1948,13 +3153,29 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testForefrontAntiSpamReport(self): (num, header, value) = self.getHeader('X-Forefront-Antispam-Report') if num == -1: return [] + self.securityAppliances.add('MS ForeFront Anti-Spam') + return self._parseAntiSpamReport(num, header, value) + + def testForefrontAntiSpamReportUntrusted(self): + (num, header, value) = self.getHeader('X-Forefront-Antispam-Report-Untrusted') + if num == -1: return [] + + self.securityAppliances.add('MS ForeFront Anti-Spam') + return self._parseAntiSpamReport(num, header, value) + + def testForefrontAntiSpamUntrusted(self): + (num, header, value) = self.getHeader('X-Microsoft-Antispam-Untrusted') + if num == -1: return [] + + self.securityAppliances.add('MS ForeFront Anti-Spam') return self._parseAntiSpamReport(num, header, value) def _parseAntiSpamReport(self, num, header, value): @@ -1970,12 +3191,9 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if 'CIP' in parsed.keys(): res = '' if self.resolve: - try: - res = socket.gethostbyaddr(parsed['CIP']) - except: - pass + resolved = SMTPHeadersAnalysis.resolveAddress(parsed['CIP']) - result += f'- {self.logger.colored("CIP", "magenta")}: Connecting IP address: {parsed["CIP"]} (resolved: {res[0]})\n\n' + result += f'- {self.logger.colored("CIP", "magenta")}: Connecting IP address: {parsed["CIP"]} (resolved: {resolved})\n\n' else: result += f'- {self.logger.colored("CIP", "magenta")}: Connecting IP address: {parsed["CIP"]}\n\n' @@ -2001,7 +3219,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa tmp += f'\t- {va}: {elem[1][va]}\n' if not found and len(v.strip()) > 0: - tmp += f'\t- Unknown value: "{v}" in parameter {k0}\n' + tmp += f'\t- {v}\n' found = True elif len(v) > 0: @@ -2085,7 +3303,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testFrom(self): @@ -2109,7 +3328,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testDecodeEncodedHeaders(self): @@ -2125,6 +3345,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if m: num0 += 1 + SMTPHeadersAnalysis.Handled_Spam_Headers.append(header) + value_decoded = emailheader.decode_header(value)[0][0] if type(value_decoded) == bytes: value_decoded = value_decoded.decode() @@ -2154,7 +3376,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': '...', - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testMicrosoftAntiSpamMessageInfo(self): @@ -2165,18 +3388,24 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if type(value) == bytes: value = value.decode() + self.securityAppliances.add('MS ForeFront Anti-Spam') result = '- Base64 encoded & encrypted Antispam Message Info:\n\n' result += value tmp = '' - tmp += f'\n\n\t- Base64 decoded Hexdump:\n\n' - tmp += SMTPHeadersAnalysis.hexdump(base64.b64decode(value)) - tmp += '\n\n\n' + + if self.decode_all: + tmp += f'\n\n\t- Base64 decoded Hexdump:\n\n' + tmp += SMTPHeadersAnalysis.hexdump(base64.b64decode(value)) + tmp += '\n\n\n' + else: + tmp += '\n\n\t- Use --decode-all to print its hexdump.' return { 'header' : header, 'value': '...', - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testAntispamMailboxDelivery(self): @@ -2184,6 +3413,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if num == -1: return [] parsed = {} + self.securityAppliances.add('MS ForeFront Anti-Spam') result = '- This header denotes what to do with received message, where to put it.\n\n' for entry in value.split(';'): @@ -2250,7 +3480,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testXMailer(self): @@ -2258,13 +3489,15 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if num == -1: return [] vvv = self.logger.colored(value, 'magenta') + self.securityAppliances.add(value) result = f'- X-Mailer header was present and contained value: {vvv}\n' result + ' This header typically indicates sending client\'s name (similar to User-Agent).' return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testUserAgent(self): @@ -2278,7 +3511,37 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', + } + + def testXMimecastBulkSignature(self): + (num, header, value) = self.getHeader('X-Mimecast-Bulk-Signature') + if num == -1: return [] + + if value.strip().lower() == 'yes': + result = f'- Mimecast considers the message as Bulk: {self.logger.colored(value.upper(), "red")}\n' + else: + result = f'- Mimecast does not consider the message as Bulk: {self.logger.colored(value.upper(), "green")}\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', + } + + def testXMimecastSpamSignature(self): + (num, header, value) = self.getHeader('X-Mimecast-Spam-Signature') + if num == -1: return [] + + result = f'- Mimecast considers the message as spam due to:\n\t- {self.logger.colored(value, "yellow")}\n' + + return { + 'header' : header, + 'value': value, + 'analysis' : result, + 'description' : '', } def testXMimecastSpamScore(self): @@ -2286,12 +3549,14 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if num == -1: return [] vvv = self.logger.colored(value, 'magenta') + self.securityAppliances.add('Mimecast') result = f'- Mimecast attached following Spam score: {vvv}\n' return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testTLCOObClasifiers(self): @@ -2322,7 +3587,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testTransportEndToEndLatency(self): @@ -2334,7 +3600,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testReceivedSPF(self): @@ -2357,7 +3624,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testAuthenticationResults(self): @@ -2366,6 +3634,9 @@ Results will be unsound. Make sure you have pasted your headers with correct spa def testARCAuthenticationResults(self): return self._testAuthenticationResults('ARC-Authentication-Results') + def testXTMAuthenticationResults(self): + return self._testAuthenticationResults('X-TM-Authentication-Results') + def _testAuthenticationResults(self, targetHeader): headersCounted = 0 headersCountedAll = 0 @@ -2377,6 +3648,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa for (num, header, value) in self.headers: if header.lower() == targetHeader.lower(): headersCounted += 1 + SMTPHeadersAnalysis.Handled_Spam_Headers.append(header.lower()) out = self._testAuthenticationResultsWorker(num, header, value) if out != []: analysis = out['analysis'] @@ -2416,7 +3688,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : header, 'value': value, - 'analysis' : result + 'analysis' : result, + 'description' : '', } def testExtractIP(self): @@ -2436,16 +3709,16 @@ Results will be unsound. Make sure you have pasted your headers with correct spa resolved.add(addr) if self.resolve: self.logger.dbg(f'testExtractIP: Resolving {addr}...') - out = socket.gethostbyaddr(addr) + out = SMTPHeadersAnalysis.resolveAddress(ipaddr) addr = self.logger.colored(addr, 'magenta') - result += f'- Found IP address: ({addr}) that resolves to: {out[0]}\n' + result += f'\t- Found IP address: ({addr}) that resolves to: {out[0]}\n' else: addr = self.logger.colored(addr, 'magenta') - result += f'- Found IP address: ({addr})\n' + result += f'\t- Found IP address: ({addr})\n' except Exception as e: - result += f'- Found IP address: ({addr}) that wasn\'t resolved\n' + result += f'\t- Found IP address: ({addr}) that wasn\'t resolved\n' if len(resolved) == 0: return [] @@ -2453,7 +3726,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : '-', 'value': '-', - 'analysis' : result + 'analysis' : '\t' + result, + 'description' : '', } def testResolveIntoIP(self): @@ -2483,13 +3757,13 @@ Results will be unsound. Make sure you have pasted your headers with correct spa if type(out) == list: out = out[0] - result += f'- Found Domain: {d}\n\t\t- that resolves to: {out}\n' + result += f'\t- Found Domain: {self.logger.colored(d, "magenta")}\n\t\t- that resolves to: {out}\n' else: - result += f'- Found Domain: {d}\n' + result += f'\t- Found Domain: {self.logger.colored(d, "magenta")}\n' except Exception as e: - result += f'- Found Domain: ({d}) that wasn\'t resolved\n' + result += f'\t- Found Domain: ({self.logger.colored(d, "magenta")}) that wasn\'t resolved\n' if len(resolved) == 0: return [] @@ -2497,7 +3771,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : '-', 'value': '-', - 'analysis' : result + 'analysis' : '\t' + result, + 'description' : '', } def testBadKeywords(self): @@ -2505,10 +3780,12 @@ Results will be unsound. Make sure you have pasted your headers with correct spa for num, header, value in self.headers: for w in SMTPHeadersAnalysis.bad_keywords: if w.lower() in value.lower(): + SMTPHeadersAnalysis.Handled_Spam_Headers.append(header) result += self.logger.colored(f'- Header\'s ({header}) value contained bad keyword: "{w}"\n', 'red') result += f' Value: {value}\n\n' elif w.lower() in header.lower(): + SMTPHeadersAnalysis.Handled_Spam_Headers.append(header) result += self.logger.colored(f'- Header\'s ({header}) name contained bad keyword: "{w}"\n\n', 'red') if len(result) == 0: @@ -2517,7 +3794,8 @@ Results will be unsound. Make sure you have pasted your headers with correct spa return { 'header' : '-', 'value': '-', - 'analysis' : result + 'analysis' : result, + 'description' : '', } def opts(argv): @@ -2564,16 +3842,33 @@ def printOutput(out): analysis = v['analysis'] value = v['value'] - analysis = analysis.replace('- ', '\t- ').strip() + analysis = analysis.strip() value = str(textwrap.fill( v['value'], width=width - 1, subsequent_indent=' ' * 4, initial_indent='', - replace_whitespace=False + replace_whitespace=False, )).strip() + description = '' + + if len(v['description']) > 1: + desc = v['description'] + desc = '\n'.join(textwrap.wrap( + desc, + width=width - 1, + subsequent_indent=' ' * 4, + initial_indent='', + replace_whitespace=True, + )).strip() + + description = f''' +{logger.colored("DESCRIPTION", "blue")}: + {desc} +''' + if len(v['header']) > 1 or len(value) > 1: output += f''' ------------------------------------------ @@ -2581,7 +3876,7 @@ def printOutput(out): {logger.colored("HEADER", "blue")}: {v['header']} - +{description} {logger.colored("VALUE", "blue")}: {value}