heavily updated decode-spam-headers.py and phishing-HTML-linter.py

This commit is contained in:
mgeeky 2021-10-29 03:22:54 +02:00
parent ca6fd32747
commit 619f594ba3
4 changed files with 903 additions and 216 deletions

View File

@ -95,9 +95,14 @@
It looks for things such as: It looks for things such as:
- Embedded images - `Embedded Images`
- Images with lacking `ALT=""` attribute - `Images without ALT`
- Anchors trying to masquerade links - `Masqueraded Links`
- `Use of underline tag <u>`
- `HTML code in <a> link tags`
- `<a href="..."> URL contained GET parameter`
- `<a href="..."> URL contained GET parameter with URL`
- `<a href="..."> URL pointed to an executable file`
Such characteristics are known bad smells that will let your e-mail blocked. Such characteristics are known bad smells that will let your e-mail blocked.

View File

@ -114,7 +114,7 @@ optional arguments:
-h, --help show this help message and exit -h, --help show this help message and exit
Required arguments: Required arguments:
infile Input file to be analysed infile Input file to be analysed or --list tests to show available tests.
Options: Options:
-o OUTFILE, --outfile OUTFILE -o OUTFILE, --outfile OUTFILE
@ -124,12 +124,113 @@ Options:
-N, --nocolor Dont use colors in text output. -N, --nocolor Dont use colors in text output.
-v, --verbose Verbose mode. -v, --verbose Verbose mode.
-d, --debug Debug mode. -d, --debug Debug mode.
-l, --list List available tests and quit. Use it like so: --list tests
Tests: Tests:
-i tests, --include-tests tests
Comma-separated list of test IDs to run. Ex. --include-tests 1,3,7
-e tests, --exclude-tests tests
Comma-separated list of test IDs to skip. Ex. --exclude-tests 1,3,7
-r, --resolve Resolve IPv4 addresses / Domain names. -r, --resolve Resolve IPv4 addresses / Domain names.
-a, --decode-all Decode all =?us-ascii?Q? mail encoded messages and print their contents. -a, --decode-all Decode all =?us-ascii?Q? mail encoded messages and print their contents.
``` ```
If you want to run only a subset of tests, you'll first need to learn Test IDs of which to pick.
Run the script with `-l tests` to grab that list.
List available test and their corresponding IDs:
```
C:\> py decode-spam-headers.py -l tests
[.] Available tests:
TEST_ID - TEST_NAME
--------------------------------------
1 - Received - Mail Servers Flow
2 - Extracted IP addresses
3 - Extracted Domains
4 - Bad Keywords In Headers
5 - From Address Analysis
6 - Subject and Thread Topic Difference
7 - Authentication-Results
8 - ARC-Authentication-Results
9 - Received-SPF
10 - Mail Client Version
11 - User-Agent Version
12 - X-Forefront-Antispam-Report
13 - X-MS-Exchange-Organization-SCL
14 - X-Microsoft-Antispam-Mailbox-Delivery
15 - X-Microsoft-Antispam Bulk Mail
16 - X-Exchange-Antispam-Report-CFA-Test
17 - Domain Impersonation
18 - SpamAssassin Spam Status
19 - SpamAssassin Spam Level
20 - SpamAssassin Spam Flag
21 - SpamAssassin Spam Report
22 - OVH's X-VR-SPAMCAUSE
23 - OVH's X-Ovh-Spam-Reason
24 - OVH's X-Ovh-Spam-Score
25 - X-Virus-Scan
26 - X-Spam-Checker-Version
27 - X-IronPort-AV
28 - X-IronPort-Anti-Spam-Filtered
29 - X-IronPort-Anti-Spam-Result
30 - X-Mimecast-Spam-Score
31 - Spam Diagnostics Metadata
32 - MS Defender ATP Message Properties
33 - Message Feedback Loop
34 - End-to-End Latency - Message Delivery Time
35 - X-MS-Oob-TLC-OOBClassifiers
36 - X-IP-Spam-Verdict
37 - X-Amp-Result
38 - X-IronPort-RemoteIP
39 - X-IronPort-Reputation
40 - X-SBRS
41 - X-IronPort-SenderGroup
42 - X-Policy
43 - X-IronPort-MailFlowPolicy
44 - X-SEA-Spam
45 - X-FireEye
46 - X-AntiAbuse
47 - X-TMASE-Version
48 - X-TM-AS-Product-Ver
49 - X-TM-AS-Result
50 - X-IMSS-Scan-Details
51 - X-TM-AS-User-Approved-Sender
52 - X-TM-AS-User-Blocked-Sender
53 - X-TMASE-Result
54 - X-TMASE-SNAP-Result
55 - X-IMSS-DKIM-White-List
56 - X-TM-AS-Result-Xfilter
57 - X-TM-AS-SMTP
58 - X-TMASE-SNAP-Result
59 - X-TM-Authentication-Results
60 - X-Scanned-By
61 - X-Mimecast-Spam-Signature
62 - X-Mimecast-Bulk-Signature
63 - X-Forefront-Antispam-Report-Untrusted
64 - X-Microsoft-Antispam-Untrusted
65 - X-Mimecast-Impersonation-Protect
66 - X-Proofpoint-Spam-Details
67 - X-Proofpoint-Virus-Version
68 - SPFCheck
69 - X-Barracuda-Spam-Score
70 - X-Barracuda-Spam-Status
71 - X-Barracuda-Spam-Report
72 - X-Barracuda-Bayes
73 - X-Barracuda-Start-Time
74 - Similar to SpamAssassin Spam Level headers
75 - SMTP Header Contained IP address
76 - Other unrecognized Spam Related Headers
77 - Other interesting headers
78 - Security Appliances Spotted
79 - Email Providers Infrastructure Clues
80 - X-Microsoft-Antispam-Message-Info
81 - Decoded Mail-encoded header values
82 - Header Containing Client IP
```
### Sample run ### Sample run

View File

@ -625,7 +625,7 @@ class SMTPHeadersAnalysis:
Anti_Spam_Rules_ReverseEngineered = \ Anti_Spam_Rules_ReverseEngineered = \
{ {
'35100500006' : logger.colored('(SPAM) Message contained embedded image. Score +4', 'red'), '35100500006' : logger.colored('(SPAM) Message contained embedded image.', 'red'),
# https://docs.microsoft.com/en-us/answers/questions/416100/what-is-meanings-of-39x-microsoft-antispam-mailbox.html # https://docs.microsoft.com/en-us/answers/questions/416100/what-is-meanings-of-39x-microsoft-antispam-mailbox.html
'520007050' : logger.colored('(SPAM) Moved message to Spam and created Email Rule to move messages from this particular sender to Junk.', 'red'), '520007050' : logger.colored('(SPAM) Moved message to Spam and created Email Rule to move messages from this particular sender to Junk.', 'red'),
@ -654,6 +654,27 @@ class SMTPHeadersAnalysis:
# This is a strong signal. Mails without <a> doesnt have this rule. # This is a strong signal. Mails without <a> doesnt have this rule.
'166002' : 'HTML mail body contained URL <a> link.', '166002' : 'HTML mail body contained URL <a> link.',
# Message contained <a href="https://something.com/file.html?parameter=value" - GET parameter with value.
'21615005' : 'Mail body contained <a> tag with URL containing GET parameter: href="https://foo.bar/file?aaa=bbb"',
# Message contained <a href="https://something.com/file.html?parameter=https://another.com/website"
# - GET parameter with value, being a URL to another website
'45080400002' : 'Mail body contained <a> tag with URL containing GET parameter with value of another URL: href="https://foo.bar/file?aaa=https://baz.xyz/"',
# Message contained <a> with href pointing to a file with dangerous extension, such as file.exe
'460985005' : 'Mail body contained HTML <a> tag with href URL pointing to a file with dangerous extension (such as .exe)',
#
# Message1: GoPhish -> VPS 587/tcp redirector -> smtp.gmail.com:587 -> target
# Message2: GoPhish -> VPS 587/tcp redirector -> smtp-relay.gmail.com:587 -> target
#
# These were the only differences I spotted:
# Message1 - FirstHop Gmail SMTP Received with ESMTPS.
# Message2 - FirstHop Gmail SMTP-Relay Received with ESMTPSA.
#
'121216002' : 'First Hop MTA SMTP Server used as a SMTP Relay. It used to originate e-mails, but here it acted as a Relay. Or it\'s due to use of "with ESMTPSA" instead of ESMTPS',
} }
ForeFront_Spam_Confidence_Levels = { ForeFront_Spam_Confidence_Levels = {
@ -747,6 +768,22 @@ class SMTPHeadersAnalysis:
} }
), ),
'ucf' : (
'User Custom Flow (?) - custom mail flow rule applied on message?',
{
'0' : 'No user custom mail rule applied.',
'1' : 'User custom mail rule applied.',
}
),
'jmr' : (
'Junk Mail Rule (?) - mail considered as Spam by previous, existing mail rules?',
{
'0' : logger.colored('Mail is not a Junk', 'green'),
'1' : logger.colored('Mail is a Junk', 'red'),
}
),
'OFR' : ( 'OFR' : (
'Folder Rules applied to this Message', 'Folder Rules applied to this Message',
{ {
@ -973,19 +1010,119 @@ class SMTPHeadersAnalysis:
Verstring('Exchange Server 2019 CU3', 'March 2, 2021', '15.2.464.15'), Verstring('Exchange Server 2019 CU3', 'March 2, 2021', '15.2.464.15'),
) )
def __init__(self, logger, resolve = False, decode_all = False, testsToRun = []):
def __init__(self, logger, resolve, decode_all):
self.text = '' self.text = ''
self.results = {} self.results = {}
self.resolve = resolve self.resolve = resolve
self.decode_all = decode_all self.decode_all = decode_all
self.logger = logger self.logger = logger
self.received_path = None self.received_path = []
self.testsToRun = testsToRun
self.securityAppliances = set() self.securityAppliances = set()
# (number, header, value) # (number, header, value)
self.headers = [] self.headers = []
def getAllTests(self):
tests = (
( '1', 'Received - Mail Servers Flow', self.testReceived),
( '2', 'Extracted IP addresses', self.testExtractIP),
( '3', 'Extracted Domains', self.testResolveIntoIP),
( '4', 'Bad Keywords In Headers', self.testBadKeywords),
( '5', 'From Address Analysis', self.testFrom),
( '6', 'Subject and Thread Topic Difference', self.testSubjecThreadTopic),
( '7', 'Authentication-Results', self.testAuthenticationResults),
( '8', 'ARC-Authentication-Results', self.testARCAuthenticationResults),
( '9', 'Received-SPF', self.testReceivedSPF),
('10', 'Mail Client Version', self.testXMailer),
('11', 'User-Agent Version', self.testUserAgent),
('12', 'X-Forefront-Antispam-Report', self.testForefrontAntiSpamReport),
('13', 'X-MS-Exchange-Organization-SCL', self.testForefrontAntiSCL),
('14', 'X-Microsoft-Antispam-Mailbox-Delivery', self.testAntispamMailboxDelivery),
('15', 'X-Microsoft-Antispam Bulk Mail', self.testMicrosoftAntiSpam),
('16', 'X-Exchange-Antispam-Report-CFA-Test', self.testAntispamReportCFA),
('17', 'Domain Impersonation', self.testDomainImpersonation),
('18', 'SpamAssassin Spam Status', self.testSpamAssassinSpamStatus),
('19', 'SpamAssassin Spam Level', self.testSpamAssassinSpamLevel),
('20', 'SpamAssassin Spam Flag', self.testSpamAssassinSpamFlag),
('21', 'SpamAssassin Spam Report', self.testSpamAssassinSpamReport),
('22', 'OVH\'s X-VR-SPAMCAUSE', self.testSpamCause),
('23', 'OVH\'s X-Ovh-Spam-Reason', self.testOvhSpamReason),
('24', 'OVH\'s X-Ovh-Spam-Score', self.testOvhSpamScore),
('25', 'X-Virus-Scan', self.testXVirusScan),
('26', 'X-Spam-Checker-Version', self.testXSpamCheckerVersion),
('27', 'X-IronPort-AV', self.testXIronPortAV),
('28', 'X-IronPort-Anti-Spam-Filtered', self.testXIronPortSpamFiltered),
('29', 'X-IronPort-Anti-Spam-Result', self.testXIronPortSpamResult),
('30', 'X-Mimecast-Spam-Score', self.testXMimecastSpamScore),
('31', 'Spam Diagnostics Metadata', self.testSpamDiagnosticMetadata),
('32', 'MS Defender ATP Message Properties', self.testATPMessageProperties),
('33', 'Message Feedback Loop', self.testMSFBL),
('34', 'End-to-End Latency - Message Delivery Time', self.testTransportEndToEndLatency),
('35', 'X-MS-Oob-TLC-OOBClassifiers', self.testTLCOObClasifiers),
('36', 'X-IP-Spam-Verdict', self.testXIPSpamVerdict),
('37', 'X-Amp-Result', self.testXAmpResult),
('38', 'X-IronPort-RemoteIP', self.testXIronPortRemoteIP),
('39', 'X-IronPort-Reputation', self.testXIronPortReputation),
('40', 'X-SBRS', self.testXSBRS),
('41', 'X-IronPort-SenderGroup', self.testXIronPortSenderGroup),
('42', 'X-Policy', self.testXPolicy),
('43', 'X-IronPort-MailFlowPolicy', self.testXIronPortMailFlowPolicy),
('44', 'X-SEA-Spam', self.testXSeaSpam),
('45', 'X-FireEye', self.testXFireEye),
('46', 'X-AntiAbuse', self.testXAntiAbuse),
('47', 'X-TMASE-Version', self.testXTMVersion),
('48', 'X-TM-AS-Product-Ver', self.testXTMProductVer),
('49', 'X-TM-AS-Result', self.testXTMResult),
('50', 'X-IMSS-Scan-Details', self.testXTMScanDetails),
('51', 'X-TM-AS-User-Approved-Sender', self.testXTMApprSender),
('52', 'X-TM-AS-User-Blocked-Sender', self.testXTMBlockSender),
('53', 'X-TMASE-Result', self.testXTMASEResult),
('54', 'X-TMASE-SNAP-Result', self.testXTMSnapResult),
('55', 'X-IMSS-DKIM-White-List', self.testXTMDKIM),
('56', 'X-TM-AS-Result-Xfilter', self.testXTMXFilter),
('57', 'X-TM-AS-SMTP', self.testXTMASSMTP),
('58', 'X-TMASE-SNAP-Result', self.testXTMASESNAP),
('59', 'X-TM-Authentication-Results', self.testXTMAuthenticationResults),
('60', 'X-Scanned-By', self.testXScannedBy),
('61', 'X-Mimecast-Spam-Signature', self.testXMimecastSpamSignature),
('62', 'X-Mimecast-Bulk-Signature', self.testXMimecastBulkSignature),
('63', 'X-Forefront-Antispam-Report-Untrusted', self.testForefrontAntiSpamReportUntrusted),
('64', 'X-Microsoft-Antispam-Untrusted', self.testForefrontAntiSpamUntrusted),
('65', 'X-Mimecast-Impersonation-Protect', self.testMimecastImpersonationProtect),
('66', 'X-Proofpoint-Spam-Details', self.testXProofpointSpamDetails),
('67', 'X-Proofpoint-Virus-Version', self.testXProofpointVirusVersion),
('68', 'SPFCheck', self.testSPFCheck),
('69', 'X-Barracuda-Spam-Score', self.testXBarracudaSpamScore),
('70', 'X-Barracuda-Spam-Status', self.testXBarracudaSpamStatus),
('71', 'X-Barracuda-Spam-Report', self.testXBarracudaSpamReport),
('72', 'X-Barracuda-Bayes', self.testXBarracudaBayes),
('73', 'X-Barracuda-Start-Time', self.testXBarracudaStartTime),
#
# These tests shall be the last ones.
#
('74', 'Similar to SpamAssassin Spam Level headers', self.testSpamAssassinSpamAlikeLevels),
('75', 'SMTP Header Contained IP address', self.testMessageHeaderContainedIP),
('76', 'Other unrecognized Spam Related Headers', self.testSpamRelatedHeaders),
('77', 'Other interesting headers', self.testInterestingHeaders),
('78', 'Security Appliances Spotted', self.testSecurityAppliances),
('79', 'Email Providers Infrastructure Clues', self.testEmailIntelligence),
)
testsDecodeAll = (
('80', 'X-Microsoft-Antispam-Message-Info', self.testMicrosoftAntiSpamMessageInfo),
('81', 'Decoded Mail-encoded header values', self.testDecodeEncodedHeaders),
)
testsReturningArray = (
('82', 'Header Containing Client IP', self.testAnyOtherIP),
)
return (tests, testsDecodeAll, testsReturningArray)
@staticmethod @staticmethod
def safeBase64Decode(value): def safeBase64Decode(value):
enc = False enc = False
@ -1173,114 +1310,22 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
self.headers = self.collect(text) self.headers = self.collect(text)
tests = ( (tests, testsDecodeAll, testsReturningArray) = self.getAllTests()
('Received - Mail Servers Flow', self.testReceived),
('Extracted IP addresses', self.testExtractIP),
('Extracted Domains', self.testResolveIntoIP),
('Bad Keywords In Headers', self.testBadKeywords),
('From Address Analysis', self.testFrom),
('Subject and Thread Topic Difference', self.testSubjecThreadTopic),
('Authentication-Results', self.testAuthenticationResults),
('ARC-Authentication-Results', self.testARCAuthenticationResults),
('Received-SPF', self.testReceivedSPF),
('Mail Client Version', self.testXMailer),
('User-Agent Version', self.testUserAgent),
('X-Forefront-Antispam-Report', self.testForefrontAntiSpamReport),
('X-Microsoft-Antispam-Mailbox-Delivery', self.testAntispamMailboxDelivery),
('X-Microsoft-Antispam Bulk Mail', self.testMicrosoftAntiSpam),
('X-Exchange-Antispam-Report-CFA-Test', self.testAntispamReportCFA),
('Domain Impersonation', self.testDomainImpersonation),
('SpamAssassin Spam Status', self.testSpamAssassinSpamStatus),
('SpamAssassin Spam Level', self.testSpamAssassinSpamLevel),
('SpamAssassin Spam Flag', self.testSpamAssassinSpamFlag),
('SpamAssassin Spam Report', self.testSpamAssassinSpamReport),
('OVH\'s X-VR-SPAMCAUSE', self.testSpamCause),
('OVH\'s X-Ovh-Spam-Reason', self.testOvhSpamReason),
('OVH\'s X-Ovh-Spam-Score', self.testOvhSpamScore),
('X-Virus-Scan', self.testXVirusScan),
('X-Spam-Checker-Version', self.testXSpamCheckerVersion),
('X-IronPort-AV', self.testXIronPortAV),
('X-IronPort-Anti-Spam-Filtered', self.testXIronPortSpamFiltered),
('X-IronPort-Anti-Spam-Result', self.testXIronPortSpamResult),
('X-Mimecast-Spam-Score', self.testXMimecastSpamScore),
('Spam Diagnostics Metadata', self.testSpamDiagnosticMetadata),
('MS Defender ATP Message Properties', self.testATPMessageProperties),
('Message Feedback Loop', self.testMSFBL),
('End-to-End Latency - Message Delivery Time', self.testTransportEndToEndLatency),
('X-MS-Oob-TLC-OOBClassifiers', self.testTLCOObClasifiers),
('X-IP-Spam-Verdict', self.testXIPSpamVerdict),
('X-Amp-Result', self.testXAmpResult),
('X-IronPort-RemoteIP', self.testXIronPortRemoteIP),
('X-IronPort-Reputation', self.testXIronPortReputation),
('X-SBRS', self.testXSBRS),
('X-IronPort-SenderGroup', self.testXIronPortSenderGroup),
('X-Policy', self.testXPolicy),
('X-IronPort-MailFlowPolicy', self.testXIronPortMailFlowPolicy),
('X-SEA-Spam', self.testXSeaSpam),
('X-FireEye', self.testXFireEye),
('X-AntiAbuse', self.testXAntiAbuse),
('X-TMASE-Version', self.testXTMVersion),
('X-TM-AS-Product-Ver', self.testXTMProductVer),
('X-TM-AS-Result', self.testXTMResult),
('X-IMSS-Scan-Details', self.testXTMScanDetails),
('X-TM-AS-User-Approved-Sender', self.testXTMApprSender),
('X-TM-AS-User-Blocked-Sender', self.testXTMBlockSender),
('X-TMASE-Result', self.testXTMASEResult),
('X-TMASE-SNAP-Result', self.testXTMSnapResult),
('X-IMSS-DKIM-White-List', self.testXTMDKIM),
('X-TM-AS-Result-Xfilter', self.testXTMXFilter),
('X-TM-AS-SMTP', self.testXTMASSMTP),
('X-TMASE-SNAP-Result', self.testXTMASESNAP),
('X-TM-Authentication-Results', self.testXTMAuthenticationResults),
('X-Scanned-By', self.testXScannedBy),
('X-Mimecast-Spam-Signature', self.testXMimecastSpamSignature),
('X-Mimecast-Bulk-Signature', self.testXMimecastBulkSignature),
('X-Forefront-Antispam-Report-Untrusted', self.testForefrontAntiSpamReportUntrusted),
('X-Microsoft-Antispam-Untrusted', self.testForefrontAntiSpamUntrusted),
('X-Mimecast-Impersonation-Protect', self.testMimecastImpersonationProtect),
('X-Proofpoint-Spam-Details', self.testXProofpointSpamDetails),
('X-Proofpoint-Virus-Version', self.testXProofpointVirusVersion),
('SPFCheck', self.testSPFCheck),
('X-Barracuda-Spam-Score', self.testXBarracudaSpamScore),
('X-Barracuda-Spam-Status', self.testXBarracudaSpamStatus),
('X-Barracuda-Spam-Report', self.testXBarracudaSpamReport),
('X-Barracuda-Bayes', self.testXBarracudaBayes),
('X-Barracuda-Start-Time', self.testXBarracudaStartTime),
#
# These tests shall be the last ones.
#
('Similar to SpamAssassin Spam Level headers', self.testSpamAssassinSpamAlikeLevels),
('SMTP Header Contained IP address', self.testMessageHeaderContainedIP),
('Other unrecognized Spam Related Headers', self.testSpamRelatedHeaders),
('Other interesting headers', self.testInterestingHeaders),
('Security Appliances Spotted', self.testSecurityAppliances),
('Email Providers Infrastructure Clues', self.testEmailIntelligence),
)
for i in range(len(SMTPHeadersAnalysis.Handled_Spam_Headers)):
SMTPHeadersAnalysis.Handled_Spam_Headers[i] = SMTPHeadersAnalysis.Handled_Spam_Headers[i].lower()
testsDecodeAll = (
('X-Microsoft-Antispam-Message-Info', self.testMicrosoftAntiSpamMessageInfo),
('Decoded Mail-encoded header values', self.testDecodeEncodedHeaders),
)
testsReturningArray = (
('Header Containing Client IP', self.testAnyOtherIP),
)
testsConducted = 0 testsConducted = 0
for testName, testFunc in tests: for testId, testName, testFunc in tests:
try: try:
if len(self.testsToRun) > 0 and int(testId) not in self.testsToRun:
self.logger.dbg(f'Skipping test {testId} {testName}')
continue
testsConducted += 1 testsConducted += 1
self.logger.dbg(f'Running "{testName}"...') self.logger.dbg(f'Running test {testId}: "{testName}"...')
self.results[testName] = testFunc() self.results[testName] = testFunc()
except Exception as e: except Exception as e:
self.logger.err(f'Test: "{testName}" failed: {e} . Use --debug to show entire stack trace.') self.logger.err(f'Test {testId}: "{testName}" failed: {e} . Use --debug to show entire stack trace.')
self.results[testName] = { self.results[testName] = {
'header' : '', 'header' : '',
@ -1292,14 +1337,18 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
raise raise
if self.decode_all: if self.decode_all:
for testName, testFunc in testsDecodeAll: for testId, testName, testFunc in testsDecodeAll:
try: try:
if len(self.testsToRun) > 0 and int(testId) not in self.testsToRun:
self.logger.dbg(f'Skipping test {testId} {testName}')
continue
testsConducted += 1 testsConducted += 1
self.logger.dbg(f'Running "{testName}"...') self.logger.dbg(f'Running test {testId}: "{testName}"...')
self.results[testName] = testFunc() self.results[testName] = testFunc()
except Exception as e: except Exception as e:
self.logger.err(f'Test: "{testName}" failed: {e} . Use --debug to show entire stack trace.') self.logger.err(f'Test {testId}: "{testName}" failed: {e} . Use --debug to show entire stack trace.')
self.results[testName] = { self.results[testName] = {
'header' : '', 'header' : '',
@ -1310,10 +1359,14 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
if options['debug']: if options['debug']:
raise raise
for testName, testFunc in testsReturningArray: for testId, testName, testFunc in testsReturningArray:
try: try:
if len(self.testsToRun) > 0 and int(testId) not in self.testsToRun:
self.logger.dbg(f'Skipping test {testId} {testName}')
continue
testsConducted += 1 testsConducted += 1
self.logger.dbg(f'Running "{testName}"...') self.logger.dbg(f'Running test {testId}: "{testName}"...')
outs = testFunc() outs = testFunc()
num = 0 num = 0
@ -1322,7 +1375,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
self.results[testName + ' ' + str(num)] = o self.results[testName + ' ' + str(num)] = o
except Exception as e: except Exception as e:
self.logger.err(f'Test: "{testName}" failed: {e} . Use --debug to show entire stack trace.') self.logger.err(f'Test {testId}: "{testName}" failed: {e} . Use --debug to show entire stack trace.')
self.results[testName] = { self.results[testName] = {
'header' : '', 'header' : '',
@ -2933,7 +2986,7 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
self.securityAppliances.add('SpamAssassin') self.securityAppliances.add('SpamAssassin')
result = '- SpamAssassin spam report\n\n' result = '- SpamAssassin spam report\n\n'
return self._parseSpamAssassinStatus(result, '', num, header, value) return self._parseSpamAssassinStatus(result, '', num, header, value, SMTPHeadersAnalysis.Barracuda_Score_Thresholds)
def _parseSpamAssassinStatus(self, topic, description, num, header, value, thresholds): def _parseSpamAssassinStatus(self, topic, description, num, header, value, thresholds):
parsed = {} parsed = {}
@ -3048,82 +3101,83 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
m = re.search(r'<([^<@\s]+)@([^\s]+)>', value) m = re.search(r'<([^<@\s]+)@([^\s]+)>', value)
domain = '' domain = ''
if m and len(self.received_path) > 2: if m and len(self.received_path) < 3:
username = m.group(1) return []
domain = m.group(2)
email = f'{username}@{domain}'
firstHop = self.received_path[1] username = m.group(1)
domain = m.group(2)
email = f'{username}@{domain}'
mailDomainAddr = '' firstHop = self.received_path[1]
revMailDomain = ''
revFirstSenderDomain = ''
firstSenderAddr = '' mailDomainAddr = ''
revFirstSenderDomain revMailDomain = ''
revFirstSenderDomain = firstHop['host2']
firstSenderAddr = ''
try: try:
mailDomainAddr = socket.gethostbyname(domain) mailDomainAddr = socket.gethostbyname(domain)
revMailDomain = socket.gethostbyaddr(mailDomainAddr)[0] revMailDomain = socket.gethostbyaddr(mailDomainAddr)[0]
if(len(firstHop['ip'])) > 0: if(len(firstHop['ip'])) > 0 and len(revFirstSenderDomain) == 0:
revFirstSenderDomain = socket.gethostbyaddr(firstHop['ip'])[0] revFirstSenderDomain = socket.gethostbyaddr(firstHop['ip'])[0]
if(len(firstHop['host'])) > 0: if(len(firstHop['host'])) > 0:
firstSenderAddr = socket.gethostbyname(firstHop['host']) firstSenderAddr = socket.gethostbyname(firstHop['host'])
if len(revFirstSenderDomain) == 0:
revFirstSenderDomain = socket.gethostbyaddr(firstSenderAddr)[0] revFirstSenderDomain = socket.gethostbyaddr(firstSenderAddr)[0]
except: except:
pass pass
senderDomain = SMTPHeadersAnalysis.extractDomain(revMailDomain) senderDomain = SMTPHeadersAnalysis.extractDomain(revMailDomain)
firstHopDomain1 = SMTPHeadersAnalysis.extractDomain(revFirstSenderDomain) firstHopDomain1 = SMTPHeadersAnalysis.extractDomain(revFirstSenderDomain)
if len(senderDomain) == 0: senderDomain = domain if len(senderDomain) == 0: senderDomain = domain
if len(firstHopDomain1) == 0: firstHopDomain1 = firstHop["host"] if len(firstHopDomain1) == 0: firstHopDomain1 = firstHop["host"]
result += f'\t- Mail From: <{email}>\n\n' result += f'\t- Mail From: <{email}>\n\n'
result += f'\t- Mail Domain: {domain}\n' result += f'\t- Mail Domain: {domain}\n'
result += f'\t --> resolves to: {mailDomainAddr}\n' result += f'\t --> resolves to: {mailDomainAddr}\n'
result += f'\t --> reverse-DNS resolves to: {revMailDomain}\n' result += f'\t --> reverse-DNS resolves to: {revMailDomain}\n'
result += f'\t (sender\'s domain: {self.logger.colored(senderDomain, "cyan")})\n\n' result += f'\t (sender\'s domain: {self.logger.colored(senderDomain, "cyan")})\n\n'
result += f'\t- First Hop: {firstHop["host"]} ({firstHop["ip"]})\n' result += f'\t- First Hop: {firstHop["host"]} ({firstHop["ip"]})\n'
result += f'\t --> resolves to: {firstSenderAddr}\n' result += f'\t --> resolves to: {firstSenderAddr}\n'
result += f'\t --> reverse-DNS resolves to: {revFirstSenderDomain}\n' result += f'\t --> reverse-DNS resolves to: {revFirstSenderDomain}\n'
result += f'\t (first hop\'s domain: {self.logger.colored(firstHopDomain1, "cyan")})\n\n' result += f'\t (first hop\'s domain: {self.logger.colored(firstHopDomain1, "cyan")})\n\n'
if firstHopDomain1.lower() != senderDomain.lower(): if firstHopDomain1.lower() != senderDomain.lower():
response = None response = None
try: try:
if domain.endswith('.'): domain = domain[:-1] if domain.endswith('.'): domain = domain[:-1]
response = dns.resolver.resolve(domain, 'TXT') response = dns.resolver.resolve(domain, 'TXT')
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e: except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
response = [] response = []
spf = False spf = False
for answer in response: for answer in response:
txt = str(answer) txt = str(answer)
if 'v=spf' in txt: if 'v=spf' in txt:
result += f'- Domain SPF: {txt[:64]}\n' result += f'- Domain SPF: {txt[:64]}\n'
for _domain in re.findall(r'([a-z0-9_\.-]+\.[a-z]{2,})', txt): for _domain in re.findall(r'([a-z0-9_\.-]+\.[a-z]{2,})', txt):
_domain1 = SMTPHeadersAnalysis.extractDomain(_domain) _domain1 = SMTPHeadersAnalysis.extractDomain(_domain)
if _domain1.lower() == firstHopDomain1: if _domain1.lower() == firstHopDomain1:
result += self.logger.colored(f'\n\t- [+] First Hop ({firstHopDomain1}) is authorized to send e-mails on behalf of ({domain}) due to SPF records.\n', 'yellow') result += self.logger.colored(f'\n\t- [+] First Hop ({firstHopDomain1}) is authorized to send e-mails on behalf of ({domain}) due to SPF records.\n', 'yellow')
result += '\t- So I\'m not sure if there was Domain Impersonation or not, but my best guess is negative.\n' result += '\t- So I\'m not sure if there was Domain Impersonation or not, but my best guess is negative.\n'
spf = True spf = True
break break
if spf: if spf:
break break
if not spf: if not spf:
result += self.logger.colored('\n- WARNING! Potential Domain Impersonation!\n', 'red') result += self.logger.colored('\n- WARNING! Potential Domain Impersonation!\n', 'red')
result += f'\t- Mail\'s domain should resolve to: \t{self.logger.colored(senderDomain, "green")}\n' result += f'\t- Mail\'s domain should resolve to: \t{self.logger.colored(senderDomain, "green")}\n'
result += f'\t- But instead first hop resolved to:\t{self.logger.colored(firstHopDomain1, "red")}\n' result += f'\t- But instead first hop resolved to:\t{self.logger.colored(firstHopDomain1, "red")}\n'
return { return {
'header' : header, 'header' : header,
@ -3386,8 +3440,19 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
pass pass
if len(obj['host2']) > 0: if len(obj['host2']) > 0:
if obj['ip'] == None or len(obj['ip']) == 0: match = re.match(r'\[?(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]?', obj['host2'])
obj['ip'] = obj['host2'] if obj['ip'] == None or len(obj['ip']) == 0 and match:
obj['ip'] = match.group(1)
obj['host2'] = ''
if len(obj['host2']) == 0:
if obj['ip'] != None and len(obj['ip']) > 0:
try:
res = socket.gethostbyaddr(obj['ip'])
if len(res) > 0:
obj['host2'] = res[0]
except:
obj['host2'] = self.logger.colored('NXDomain', 'red')
if extrapos == 0: if extrapos == 0:
a = received.find(obj['host']) + len(obj['host']) a = received.find(obj['host']) + len(obj['host'])
@ -3543,14 +3608,10 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
else: else:
result += iindent + indent * (num+1) + f'{s} ({num}) {self.logger.colored(elem["host"], "yellow")}' result += iindent + indent * (num+1) + f'{s} ({num}) {self.logger.colored(elem["host"], "yellow")}'
if len(elem['host2']) > 0:
if elem['host2'].endswith('.'):
elem['host2'] = self.logger.colored(elem['host2'][:-1], 'yellow')
if elem['host2'] != elem['host'] and elem['host2'] != elem['ip']:
result += f' (rev: {self.logger.colored(elem["host2"], "yellow")})'
if elem['ip'] != None and len(elem['ip']) > 0: if elem['ip'] != None and len(elem['ip']) > 0:
if elem['ip'][0] == '[' and elem['ip'][-1] == ']':
elem['ip'] = elem['ip'][1:-1]
if num == 2: if num == 2:
result += f' ({self.logger.colored(elem["ip"], "green")})\n' result += f' ({self.logger.colored(elem["ip"], "green")})\n'
else: else:
@ -3558,6 +3619,14 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
else: else:
result += '\n' result += '\n'
if len(elem['host2']) > 0:
if elem['host2'].endswith('.'):
elem['host2'] = self.logger.colored(elem['host2'][:-1], 'yellow')
if elem['host2'] != elem['host'] and elem['host2'] != elem['ip']:
#result += f' (rev: {self.logger.colored(elem["host2"], "yellow")})'
result += iindent + indent * (num+3) + 'rev-DNS: ' + self.logger.colored(elem["host2"], "yellow") + '\n'
if elem['timestamp'] != None: if elem['timestamp'] != None:
result += iindent + indent * (num+3) + 'time: ' + elem['timestamp'] + '\n' result += iindent + indent * (num+3) + 'time: ' + elem['timestamp'] + '\n'
@ -3588,6 +3657,9 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
self.received_path = path self.received_path = path
if '1' not in self.testsToRun:
return []
return { return {
'header' : 'Received', 'header' : 'Received',
'value': '...', 'value': '...',
@ -3626,6 +3698,72 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
self.securityAppliances.add('MS ForeFront Anti-Spam') self.securityAppliances.add('MS ForeFront Anti-Spam')
return self._parseBulk(num, header, value) return self._parseBulk(num, header, value)
def testForefrontAntiSCL(self):
(num, header, value) = self.getHeader('X-MS-Exchange-Organization-SCL')
if num == -1: return []
tmp = self._parseSCLBased(value.strip(), 'SCL', 'Spam Confidence Level', 'spam', SMTPHeadersAnalysis.ForeFront_Spam_Confidence_Levels)
if len(tmp) == 0:
return []
result = tmp + '\n'
return {
'header' : header,
'value': value,
'analysis' : result,
'description' : '',
}
def _parseSCLBased(self, score, key, topic, listname, listelems):
addscl = False
tmpfoo = ''
result = ''
v = (topic, listname, listelems)
k = key
addscl = True
scl = int(score)
k0 = self.logger.colored(k, 'magenta')
tmpfoo += f'- {k0}: {v[0]}: ' + str(scl) + '\n'
levels = list(v[2].keys())
levels.sort()
if scl in levels:
s = v[2][scl]
f = self.logger.colored(f'Not {v[1]}', 'green')
if s[0]:
f = self.logger.colored(v[1].upper(), 'red')
tmpfoo += f'\t- {f}: {s[1]}\n'
else:
for i in range(len(levels)):
if scl <= levels[i] and i > 0:
s = v[2][levels[i-1]]
f = self.logger.colored(f'Not {v[1]}', 'green')
if s[0]:
f = self.logger.colored(v[1].upper(), 'red')
tmpfoo += f'\t- {f}: {s[1]}\n'
break
elif scl <= levels[0]:
s = v[2][levels[0]]
f = self.logger.colored(f'Not {v[1]}', 'green')
if s[0]:
f = self.logger.colored(v[1].upper(), 'red')
tmpfoo += f'\t- {f}: {s[1]}\n'
break
if addscl:
result += tmpfoo
return result
def _parseBulk(self, num, header, value): def _parseBulk(self, num, header, value):
parsed = {} parsed = {}
result = '' result = ''
@ -3740,6 +3878,7 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
if found: if found:
result += tmp + '\n' result += tmp + '\n'
usedRE = False
for k in ['SFS', 'RULEID', 'ENG']: for k in ['SFS', 'RULEID', 'ENG']:
if k in parsed.keys(): if k in parsed.keys():
res = '' res = ''
@ -3749,20 +3888,27 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
rules = [] rules = []
k0 = self.logger.colored(k, 'magenta') k0 = self.logger.colored(k, 'magenta')
tmp = f'- Message matched {len(rules)} Anti-Spam rules ({k0}):\n' tmp = f'- Message matched {self.logger.colored(str(len(rules)), "yellow")} Anti-Spam rules ({k0}):\n'
rules.sort() rules.sort()
for r in rules: for r in rules:
if len(r) == 0: continue if len(r) == 0: continue
r2 = f'({r})'
if r in SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered.keys(): if r in SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered.keys():
e = SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered[r] e = SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered[r]
tmp += f'\t- ({r}) - {e}\n' tmp += f'\t- {r2: <15} - {e}\n'
usedRE = True
else: else:
tmp += f'\t- ({r})\n' tmp += f'\t- {r2}\n'
result += tmp + '\n' result += tmp + '\n'
if usedRE:
result += '\tNOTICE:\n'
result += '\t(Anti-Spam rule explanation can only be considered as a clue, hint rather than a definitive explanation.)\n'
result += '\t(Rules meaning was established merely in a trial-and-error process by observing SMTP header differences.)\n\n'
sclpcl = { sclpcl = {
'SCL' : ('Spam Confidence Level', 'spam', SMTPHeadersAnalysis.ForeFront_Spam_Confidence_Levels), 'SCL' : ('Spam Confidence Level', 'spam', SMTPHeadersAnalysis.ForeFront_Spam_Confidence_Levels),
'PCL' : ('Phishing Confidence Level', 'phishing', SMTPHeadersAnalysis.ForeFront_Phishing_Confidence_Levels), 'PCL' : ('Phishing Confidence Level', 'phishing', SMTPHeadersAnalysis.ForeFront_Phishing_Confidence_Levels),
@ -3974,20 +4120,29 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
rules = [] rules = []
k0 = self.logger.colored(k, 'magenta') k0 = self.logger.colored(k, 'magenta')
tmp = f'- Message matched {len(rules)} Anti-Spam Delivery rules ({k0}):\n' tmp = f'- Message matched {self.logger.colored(len(rules), "yellow")} Anti-Spam Delivery rules ({k0}):\n'
rules.sort() rules.sort()
usedRE = False
for r in rules: for r in rules:
if len(r) == 0: continue if len(r) == 0: continue
r2 = f'({r})'
if r in SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered.keys(): if r in SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered.keys():
e = SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered[r] e = SMTPHeadersAnalysis.Anti_Spam_Rules_ReverseEngineered[r]
tmp += f'\t- ({r}) - {e}\n' tmp += f'\t- {r2: <15} - {e}\n'
usedRE = True
else: else:
tmp += f'\t- ({r})\n' tmp += f'\t- {r2}\n'
result += tmp + '\n' result += tmp + '\n'
if usedRE:
result += '\tNOTICE:\n'
result += '\t(Anti-Spam rule explanation can only be considered as a clue, hint rather than a definitive explanation.)\n'
result += '\t(Rules meaning was established merely in a trial-and-error process by observing SMTP header differences.)\n\n'
return { return {
'header' : header, 'header' : header,
'value': value, 'value': value,
@ -4349,11 +4504,11 @@ def opts(argv):
global logger global logger
o = argparse.ArgumentParser( o = argparse.ArgumentParser(
usage = 'decode-spam-headers.py [options] <file>' usage = 'decode-spam-headers.py [options] <file | --list tests>'
) )
req = o.add_argument_group('Required arguments') req = o.add_argument_group('Required arguments')
req.add_argument('infile', help = 'Input file to be analysed') req.add_argument('infile', help = 'Input file to be analysed or --list tests to show available tests.')
opt = o.add_argument_group('Options') opt = o.add_argument_group('Options')
opt.add_argument('-o', '--outfile', default='', type=str, help = 'Output file with report') opt.add_argument('-o', '--outfile', default='', type=str, help = 'Output file with report')
@ -4363,6 +4518,9 @@ def opts(argv):
opt.add_argument('-d', '--debug', default=False, action='store_true', help='Debug mode.') opt.add_argument('-d', '--debug', default=False, action='store_true', help='Debug mode.')
tst = o.add_argument_group('Tests') tst = o.add_argument_group('Tests')
opt.add_argument('-l', '--list', default=False, action='store_true', help='List available tests and quit. Use it like so: --list tests')
tst.add_argument('-i', '--include-tests', default='', metavar='tests', help='Comma-separated list of test IDs to run. Ex. --include-tests 1,3,7')
tst.add_argument('-e', '--exclude-tests', default='', metavar='tests', help='Comma-separated list of test IDs to skip. Ex. --exclude-tests 1,3,7')
tst.add_argument('-r', '--resolve', default=False, action='store_true', help='Resolve IPv4 addresses / Domain names.') tst.add_argument('-r', '--resolve', default=False, action='store_true', help='Resolve IPv4 addresses / Domain names.')
tst.add_argument('-a', '--decode-all', default=False, action='store_true', help='Decode all =?us-ascii?Q? mail encoded messages and print their contents.') tst.add_argument('-a', '--decode-all', default=False, action='store_true', help='Decode all =?us-ascii?Q? mail encoded messages and print their contents.')
@ -4446,7 +4604,26 @@ def printOutput(out):
def main(argv): def main(argv):
args = opts(argv) args = opts(argv)
if not args: if not args:
return False return Falsex
if args.list:
print('[.] Available tests:\n')
print('\tTEST_ID - TEST_NAME')
print('\t--------------------------------------')
an = SMTPHeadersAnalysis(logger)
(a, b, c) = an.getAllTests()
tests = a+b+c
for test in tests:
(testId, testName, testFunc) = test
print(f'\t{testId: >7} - {testName}')
print('\n')
return True
logger.info('Analysing: ' + args.infile) logger.info('Analysing: ' + args.infile)
@ -4454,7 +4631,32 @@ def main(argv):
with open(args.infile) as f: with open(args.infile) as f:
text = f.read() text = f.read()
an = SMTPHeadersAnalysis(logger, args.resolve, args.decode_all) try:
include_tests = []
exclude_tests = []
if len(args.include_tests) > 0: include_tests = [int(x) for x in args.include_tests.split(',')]
if len(args.exclude_tests) > 0: exclude_tests = [int(x) for x in args.exclude_tests.split(',')]
if len(include_tests) > 0 and len(exclude_tests) > 0:
logger.fatal('--include-tests and --exclude-tests options are mutually exclusive!')
except:
raise
logger.fatal('Tests to be included/excluded need to be numbers! Ex. --include-tests 1,5,7')
testsToRun = set()
for i in range(1000):
if len(include_tests) > 0:
if i not in include_tests:
continue
elif len(exclude_tests) > 0:
if i in exclude_tests:
continue
testsToRun.add(i)
an = SMTPHeadersAnalysis(logger, args.resolve, args.decode_all, testsToRun)
out = an.parse(text) out = an.parse(text)
output = printOutput(out) output = printOutput(out)

View File

@ -6,13 +6,169 @@ import argparse
import yaml import yaml
import textwrap import textwrap
import json import json
from urllib import parse
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
options = { options = {
'format' : 'text', 'format' : 'text',
} }
executable_extensions = [
'.exe',
'.dll',
'.lnk',
'.scr',
'.sys',
'.ps1',
'.bat',
'.js',
'.jse',
'.vbs',
'.vba',
'.vbe',
'.wsl',
'.cpl',
]
options = {
'debug': False,
'verbose': False,
'nocolor' : False,
'log' : sys.stderr,
'format' : 'text',
}
class Logger:
colors_map = {
'red': 31,
'green': 32,
'yellow': 33,
'blue': 34,
'magenta': 35,
'cyan': 36,
'white': 37,
'grey': 38,
}
colors_dict = {
'error': colors_map['red'],
'trace': colors_map['magenta'],
'info ': colors_map['green'],
'debug': colors_map['grey'],
'other': colors_map['grey'],
}
options = {}
def __init__(self, opts = None):
self.options.update(Logger.options)
if opts != None and len(opts) > 0:
self.options.update(opts)
@staticmethod
def with_color(c, s):
return "\x1b[%dm%s\x1b[0m" % (c, s)
def colored(self, txt, col):
if self.options['nocolor']:
return txt
return Logger.with_color(Logger.colors_map[col], txt)
# Invocation:
# def out(txt, mode='info ', fd=None, color=None, noprefix=False, newline=True):
@staticmethod
def out(txt, fd, mode='info ', **kwargs):
if txt == None or fd == 'none':
return
elif fd == None:
raise Exception('[ERROR] Logging descriptor has not been specified!')
args = {
'color': None,
'noprefix': False,
'newline': True,
'nocolor' : False
}
args.update(kwargs)
if type(txt) != str:
txt = str(txt)
txt = txt.replace('\t', ' ' * 4)
if args['nocolor']:
col = ''
elif args['color']:
col = args['color']
if type(col) == str and col in Logger.colors_map.keys():
col = Logger.colors_map[col]
else:
col = Logger.colors_dict.setdefault(mode, Logger.colors_map['grey'])
prefix = ''
if mode:
mode = '[%s] ' % mode
if not args['noprefix']:
if args['nocolor']:
prefix = mode.upper()
else:
prefix = Logger.with_color(Logger.colors_dict['other'], '%s'
% (mode.upper()))
nl = ''
if 'newline' in args:
if args['newline']:
nl = '\n'
if 'force_stdout' in args:
fd = sys.stdout
if type(fd) == str:
with open(fd, 'a') as f:
prefix2 = ''
if mode:
prefix2 = '%s' % (mode.upper())
f.write(prefix2 + txt + nl)
f.flush()
else:
if args['nocolor']:
fd.write(prefix + txt + nl)
else:
fd.write(prefix + Logger.with_color(col, txt) + nl)
# Info shall be used as an ordinary logging facility, for every desired output.
def info(self, txt, forced = False, **kwargs):
kwargs['nocolor'] = self.options['nocolor']
if forced or (self.options['verbose'] or \
self.options['debug'] ) \
or (type(self.options['log']) == str and self.options['log'] != 'none'):
Logger.out(txt, self.options['log'], 'info', **kwargs)
def text(self, txt, **kwargs):
kwargs['noPrefix'] = True
kwargs['nocolor'] = self.options['nocolor']
Logger.out(txt, self.options['log'], '', **kwargs)
def dbg(self, txt, **kwargs):
if self.options['debug']:
kwargs['nocolor'] = self.options['nocolor']
Logger.out(txt, self.options['log'], 'debug', **kwargs)
def err(self, txt, **kwargs):
kwargs['nocolor'] = self.options['nocolor']
Logger.out(txt, self.options['log'], 'error', **kwargs)
def fatal(self, txt, **kwargs):
kwargs['nocolor'] = self.options['nocolor']
Logger.out(txt, self.options['log'], 'error', **kwargs)
os._exit(1)
logger = Logger(options)
class PhishingMailParser: class PhishingMailParser:
def __init__(self, options): def __init__(self, options):
self.options = options self.options = options
@ -22,10 +178,14 @@ class PhishingMailParser:
self.html = html self.html = html
self.soup = BeautifulSoup(html, features="lxml") self.soup = BeautifulSoup(html, features="lxml")
self.results['Embedded Images'] = self.testEmbeddedImages() self.results['Embedded Images'] = self.testEmbeddedImages()
self.results['Images without ALT'] = self.testImagesNoAlt() self.results['Images without ALT'] = self.testImagesNoAlt()
self.results['Masqueraded Links'] = self.testMaskedLinks() self.results['Masqueraded Links'] = self.testMaskedLinks()
self.results['Use of underline tag <u>'] = self.testUnderlineTag() self.results['Use of underline tag <u>'] = self.testUnderlineTag()
self.results['HTML code in <a> link tags'] = self.testLinksWithHtmlCode()
self.results['<a href="..."> URL contained GET parameter'] = self.testLinksWithGETParams()
self.results['<a href="..."> URL contained GET parameter with URL'] = self.testLinksWithGETParamsBeingURLs()
self.results['<a href="..."> URL pointed to an executable file'] = self.testLinksWithDangerousExtensions()
return {k: v for k, v in self.results.items() if v} return {k: v for k, v in self.results.items() if v}
@ -52,8 +212,8 @@ class PhishingMailParser:
context = '' context = ''
for i in range(len(links)): for i in range(len(links)):
context += '\t- ' + str(links[i]) + '\n' context += str(links[i]) + '\n\n'
if i > 10: break if i > 5: break
return { return {
'description' : desc, 'description' : desc,
@ -61,6 +221,205 @@ class PhishingMailParser:
'analysis' : result 'analysis' : result
} }
def testLinksWithHtmlCode(self):
links = self.soup('a')
desc = 'Links that contain HTML code within <a> ... </a> may increase Spam score heavily'
context = ''
result = ''
num = 0
embed = ''
for link in links:
text = str(link)
pos = text.find('>')
code = text[pos+1:]
m = re.search(r'(.+)<\s*/\s*a\s*>', code, re.I)
if m:
code = m.group(1)
suspicious = '<' in text and '>' in text
if suspicious:
num += 1
if num < 5:
N = 70
tmp = text[:N]
if len(text) > N:
tmp += ' ... ' + text[-N:]
context += tmp + '\n'
code2 = PhishingMailParser.context(code)
context += f"\n\t- {logger.colored('Code inside of <a> tag:','red')}\n\t\t" + logger.colored(code2, 'yellow') + '\n'
if num > 0:
result += f'- Found {num} <a> tags that contained HTML code inside!\n'
result += '\t Links conveying HTML code within <a> ... </a> may greatly increase message Spam score!\n'
if len(result) == 0:
return []
return {
'description' : desc,
'context' : context,
'analysis' : result
}
def testLinksWithGETParams(self):
links = self.soup('a')
desc = 'Links with URLs containing GET parameters will be noticed by anti-spam filters resulting in another rule triggering on message (Office365: 21615005).'
context = ''
result = ''
num = 0
embed = ''
for link in links:
try:
href = link['href']
except:
continue
text = link.getText()
params = dict(parse.parse_qsl(parse.urlsplit(href).query))
if len(params) > 0:
num += 1
if num < 5:
context += PhishingMailParser.context(link) + '\n'
hr = href[:90]
pos = hr.find('?')
hr = hr[:pos] + logger.colored(hr[pos:], 'yellow')
context += f'\thref = "{hr}"\n'
context += f'\ttext = "{text[:90]}"\n\n'
if num > 0:
result += f'- Found {num} <a> tags with href="..." URLs containing GET params.\n'
result += '\t Links with URLs that contain GET params might trigger anti-spam rule (Office365: 21615005)\n'
if len(result) == 0:
return []
return {
'description' : desc,
'context' : context,
'analysis' : result
}
def testLinksWithDangerousExtensions(self):
links = self.soup('a')
desc = 'Message contained <a> tags with href="..." links pointing to a file with dangerous extension (such as .exe)'
context = ''
result = ''
num = 0
embed = ''
for link in links:
try:
href = link['href']
except:
continue
text = link.getText()
parsed = parse.urlsplit(href)
if '.' not in parsed.path:
continue
pos = parsed.path.rfind('.')
if pos == -1:
continue
extension = parsed.path.lower()[pos:]
if extension in executable_extensions:
num += 1
if num < 5:
context += PhishingMailParser.context(link) + '\n'
hr = href[:90]
pos1 = hr.lower().find(extension.lower())
hr = logger.colored(hr[:pos1], 'yellow') + logger.colored(hr[pos1:pos1+len(extension)], 'red') + logger.colored(hr[pos1+len(extension):], 'yellow')
context += f'\thref = "{hr}"\n'
context += f'\ttext = "{text[:90]}"\n\n'
context += f'\tExtension matched: {logger.colored(extension, "red")}\n'
if num > 0:
result += f'- Found {num} <a> tags with href="..." URLs pointing to files with dangerous extensions (such as .exe).\n'
result += '\t Links with URLs that point to potentially executable files might trigger anti-spam rule (Office365: 460985005)\n'
if len(result) == 0:
return []
return {
'description' : desc,
'context' : context,
'analysis' : result
}
def testLinksWithGETParamsBeingURLs(self):
links = self.soup('a')
desc = 'Links with URLs that contain GET parameters pointing to another URL, will trigger two Office365 anti-spam rules (Office365: 45080400002).'
context = ''
result = ''
num = 0
embed = ''
for link in links:
try:
href = link['href']
except:
continue
text = link.getText()
params = dict(parse.parse_qsl(parse.urlsplit(href).query))
url = re.compile(r'((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*')
if len(params) > 0:
for k, v in params.items():
m = url.match(v)
if m:
urlmatched = m.group(1)
num += 1
if num < 5:
context += PhishingMailParser.context(link) + '\n'
hr = href[:90]
hr = logger.colored(hr, 'yellow')
context += f'\thref = "{hr}"\n'
context += f'\ttext = "{text[:90]}"\n\n'
context += f'\thref URL GET parameter contained another URL:\n\t\t' + logger.colored(v, "red") + '\n'
if num > 0:
result += f'- Found {num} <a> tags with href="..." URLs containing GET params containing another URL.\n'
result += '\t Links with URLs that contain GET params with another URL might trigger anti-spam rule (Office365: 45080400002)\n'
if len(result) == 0:
return []
return {
'description' : desc,
'context' : context,
'analysis' : result
}
def testMaskedLinks(self): def testMaskedLinks(self):
links = self.soup('a') links = self.soup('a')
@ -85,9 +444,11 @@ class PhishingMailParser:
if m1 and m2: if m1 and m2:
num += 1 num += 1
context += '- ' + PhishingMailParser.context(link) + '\n'
context += f'\thref = "{href[:64]}"\n' if num < 5:
context += f'\ttext = "{text[:64]}"\n\n' context += PhishingMailParser.context(link) + '\n'
context += f'\thref = "{logger.colored(href[:90],"green")}"\n'
context += f'\ttext = "{logger.colored(text[:90],"red")}"\n\n'
if num > 0: if num > 0:
result += f'- Found {num} <a> tags that masquerade their href="" links with text!\n' result += f'- Found {num} <a> tags that masquerade their href="" links with text!\n'
@ -122,7 +483,9 @@ class PhishingMailParser:
if alt == '': if alt == '':
num += 1 num += 1
context += '- ' + PhishingMailParser.context(img) + '\n'
if num < 5:
context += PhishingMailParser.context(img) + '\n\n'
if num > 0: if num > 0:
result += f'- Found {num} <img> tags without ALT="value" attribute.\n' result += f'- Found {num} <img> tags without ALT="value" attribute.\n'
@ -160,10 +523,18 @@ class PhishingMailParser:
embed = src[:30] embed = src[:30]
num += 1 num += 1
if len(alt) > 0:
context += f'- ALT="{alt}": ' + PhishingMailParser.context(img) + '\n' if num < 5:
else: if len(alt) > 0:
context += '- ' + PhishingMailParser.context(img) + '\n' context += f'- ALT="{alt}": ' + PhishingMailParser.context(img) + '\n'
else:
ctx = PhishingMailParser.context(img)
pos = ctx.find('data:')
pos2 = ctx.find('"', pos+1)
ctx = logger.colored(ctx[:pos], 'yellow') + logger.colored(ctx[pos:pos2], 'red') + logger.colored(ctx[pos2:], 'yellow')
context += ctx + '\n'
if num > 0: if num > 0:
result += f'- Found {num} <img> tags with embedded image ({embed}).\n' result += f'- Found {num} <img> tags with embedded image ({embed}).\n'
@ -186,28 +557,31 @@ def printOutput(out):
for k, v in out.items(): for k, v in out.items():
num += 1 num += 1
analysis = v['analysis'] analysis = v['analysis'].strip()
context = v['context'] context = v['context'].strip()
desc = '\n'.join(textwrap.wrap( desc = '\n'.join(textwrap.wrap(
v['description'], v['description'],
width = 80, width = 80,
initial_indent = '', initial_indent = '',
subsequent_indent = ' ' subsequent_indent = ' '
)) )).strip()
analysis = analysis.replace('- ', '\t- ') analysis = analysis.replace('- ', '\t- ')
print(f''' print(f'''
------------------------------------------ ------------------------------------------
({num}) Test: {k} ({num}) Test: {logger.colored(k, "cyan")}
{logger.colored("DESCRIPTION", "blue")}:
DESCRIPTION:
{desc} {desc}
CONTEXT: {logger.colored("CONTEXT", "blue")}:
{context} {context}
ANALYSIS: {logger.colored("ANALYSIS", "blue")}:
{analysis} {analysis}
''') ''')
@ -226,6 +600,7 @@ def opts(argv):
req.add_argument('file', help = 'Input HTML file') req.add_argument('file', help = 'Input HTML file')
args = o.parse_args() args = o.parse_args()
options.update(vars(args))
return args return args
def main(argv): def main(argv):
@ -246,7 +621,11 @@ def main(argv):
p = PhishingMailParser({}) p = PhishingMailParser({})
ret = p.parse(html.decode()) ret = p.parse(html.decode())
printOutput(ret) if len(ret) > 0:
printOutput(ret)
else:
print('\n[+] Congrats! Your message does not have any known bad smells that could trigger anti-spam rules.\n')
if __name__ == '__main__': if __name__ == '__main__':