Added 'Unusual SMTP Headers' test

This commit is contained in:
Mariusz B. / mgeeky 2022-09-07 16:03:22 +02:00
parent 08583758b0
commit c244ceb3dd

View File

@ -122,6 +122,7 @@ import argparse
import json import json
import textwrap import textwrap
import socket import socket
import textwrap
import time import time
import atexit import atexit
import base64 import base64
@ -469,7 +470,7 @@ class SMTPHeadersAnalysis:
'yandex', 'yandexbot', 'zillya', 'zonealarm', 'zscaler', '-sea-', 'perlmx', 'trustwave', 'yandex', 'yandexbot', 'zillya', 'zonealarm', 'zscaler', '-sea-', 'perlmx', 'trustwave',
'mailmarshal', 'tmase', 'startscan', 'fe-etp', 'jemd', 'suspicious', 'grey', 'infected', 'unscannable', 'mailmarshal', 'tmase', 'startscan', 'fe-etp', 'jemd', 'suspicious', 'grey', 'infected', 'unscannable',
'dlp-', 'sanitize', 'mailscan', 'barracuda', 'clearswift', 'messagelabs', 'msw-jemd', 'fe-etp', 'symc-ess', 'dlp-', 'sanitize', 'mailscan', 'barracuda', 'clearswift', 'messagelabs', 'msw-jemd', 'fe-etp', 'symc-ess',
'starscan', 'mailcontrol', 'spamexpert' 'starscan', 'mailcontrol', 'spamexpert', 'X-Fuglu',
) )
Interesting_Headers = ( Interesting_Headers = (
@ -481,7 +482,8 @@ class SMTPHeadersAnalysis:
'X-ICPINFO', 'x-locaweb-id', 'X-MC-User', 'mailersend', 'MailiGen', 'Mandrill', 'MarketoID', 'X-Messagebus-Info', 'X-ICPINFO', 'x-locaweb-id', 'X-MC-User', 'mailersend', 'MailiGen', 'Mandrill', 'MarketoID', 'X-Messagebus-Info',
'Mixmax', 'X-PM-Message-Id', 'postmark', 'X-rext', 'responsys', 'X-SFDC-User', 'salesforce', 'x-sg-', 'x-sendgrid-', 'Mixmax', 'X-PM-Message-Id', 'postmark', 'X-rext', 'responsys', 'X-SFDC-User', 'salesforce', 'x-sg-', 'x-sendgrid-',
'silverpop', '.mkt', 'X-SMTPCOM-Tracking-Number', 'X-vrfbldomain', 'verticalresponse', 'silverpop', '.mkt', 'X-SMTPCOM-Tracking-Number', 'X-vrfbldomain', 'verticalresponse',
'yesmail', 'logon', 'safelink', 'safeattach', 'appinfo', 'X-XS4ALL-', 'client-ip' 'yesmail', 'logon', 'safelink', 'safeattach', 'appinfo', 'X-XS4ALL-', 'client-ip', 'porn',
'X-Newsletter'
) )
Security_Appliances_And_Their_Headers = \ Security_Appliances_And_Their_Headers = \
@ -504,6 +506,7 @@ class SMTPHeadersAnalysis:
('FireEye Email Security Solution' , 'X-FEAS-'), ('FireEye Email Security Solution' , 'X-FEAS-'),
('FireEye Email Security Solution' , 'X-FireEye'), ('FireEye Email Security Solution' , 'X-FireEye'),
('Mimecast' , 'X-Mimecast-'), ('Mimecast' , 'X-Mimecast-'),
('Fuglu - mail scanner for postfix' , 'X-Fuglu'),
('MS Defender Advanced Threat Protection - Safe Links' , '-ATPSafeLinks'), ('MS Defender Advanced Threat Protection - Safe Links' , '-ATPSafeLinks'),
('MS Defender Advanced Threat Protection' , 'X-MS.+-Atp'), ('MS Defender Advanced Threat Protection' , 'X-MS.+-Atp'),
('MS Defender for Office365' , '-Safelinks'), ('MS Defender for Office365' , '-Safelinks'),
@ -518,6 +521,7 @@ class SMTPHeadersAnalysis:
('SpamAssassin' , 'X-Spam-'), ('SpamAssassin' , 'X-Spam-'),
('Symantec Email Security' , 'X-SpamInfo'), ('Symantec Email Security' , 'X-SpamInfo'),
('Symantec Email Security' , 'X-SpamReason'), ('Symantec Email Security' , 'X-SpamReason'),
('Symantec Email Security' , 'X-Brightmail-Tracker'),
('Symantec Email Security' , 'X-StarScan'), ('Symantec Email Security' , 'X-StarScan'),
('Symantec Email Security' , 'X-SYMC-'), ('Symantec Email Security' , 'X-SYMC-'),
('Trend Micro Anti-Spam' , 'X-TM-AS-'), ('Trend Micro Anti-Spam' , 'X-TM-AS-'),
@ -527,6 +531,7 @@ class SMTPHeadersAnalysis:
('Cloudmark Security Platform' , 'X-CMAE-'), ('Cloudmark Security Platform' , 'X-CMAE-'),
('VIPRE Email Security' , 'X-Vipre-'), ('VIPRE Email Security' , 'X-Vipre-'),
('Sunbelt Software Ninja Email Security' , 'X-Ninja-'), ('Sunbelt Software Ninja Email Security' , 'X-Ninja-'),
('WP.pl / o2.pl Email Scanner' , 'X-WP-AV-'),
) )
Security_Appliances_And_Their_Values = \ Security_Appliances_And_Their_Values = \
@ -1401,6 +1406,226 @@ class SMTPHeadersAnalysis:
) )
} }
#
# Assorted list of most frequently occuring SMTP headers.
#
# Collected & manually filtered from corpora of 1700 emails with following one-liner:
#
# for file in * ; do cat $file | sed -r '/^\s*$/d'| sed -e '/^-\{2,\}[0-9a-ZA-Z_\-]\+/,$d' | grep ':' | grep '^[a-zA-Z]' | pcregrep -o1 '^([a-zA-Z0-9_-]+): ' ; done | sort | uniq -c | sort -n -r
#
Usual_SMTP_Headers = (
'Accept-Language',
'ARC-Authentication-Results',
'ARC-Message-Signature',
'ARC-Seal',
'Authentication-Results',
'authentication-results',
'Auto-Submitted',
'Content-Disposition',
'Content-ID',
'Content-Id',
'Content-Language',
'Content-Transfer-Encoding',
'Content-Type',
'Content-type',
'Date',
'date',
'Delivered-To',
'Disposition-Notification-To',
'DKIM-Filter',
'DKIM-Signature',
'dlp-product',
'dlp-reaction',
'dlp-version',
'DomainKey-Signature',
'Feedback-ID',
'Feedback-Id',
'From',
'Gmail-Client-Draft-ID',
'Gmail-Client-Draft-Thread-ID',
'Importance',
'In-Reply-To',
'IronPort-SDR',
'last-modified',
'List-ID',
'List-Id',
'List-Unsubscribe',
'List-unsubscribe',
'List-Unsubscribe-Post',
'mail-from',
'Message-ID',
'Message-Id',
'MIME-Version',
'Mime-Version',
'msip_labels',
'Origin-messageId',
'Precedence',
'Received',
'Received-SPF',
'received-spf',
'Recipient-Id',
'References',
'Reply-To',
'Reply-to',
'Require-Recipient-Valid-Since',
'Return-Path',
'Return-Receipt-To',
'Sender',
'Sent',
'spamdiagnosticmetadata',
'spamdiagnosticoutput',
'Subject',
'Thread-Index',
'Thread-Topic',
'To',
'User-Agent',
'X-Abuse',
'X-Accounttype',
'X-AntiAbuse',
'X-Attachment',
'X-AuditID',
'X-Auto-Response-Suppress',
'X-Binding',
'X-Brightmail-Tracker',
'X-Campaign',
'X-campaignid',
'X-CampaignID',
'X-Charset',
'X-cid',
'X-CLIENT-HOSTNAME',
'X-CLIENT-IP',
'X-CMAE-Analysis',
'X-CMAE-Score',
'X-Complaints-To',
'X-Cron-Env',
'X-CSA-Complaints',
'X-DCC--Metrics',
'X-Delivery-Context',
'X-elqPod',
'X-elqSiteID',
'X-EMAIL-ID',
'X-Email-Rejection-Mode',
'X-Entity-ID',
'x-eopattributedmessage',
'x-exchange-antispam-report-cfa-test',
'x-exchange-antispam-report-test',
'X-FE-Draft-Info',
'X-FE-Policy-ID',
'X-FEAS-Auth-User',
'X-FEAS-Bypass-Scan-On-Auth',
'X-Feedback-ID',
'x-forefront-antispam-report',
'x-forefront-prvs',
'X-Forwarded-Message-Id',
'X-Gm-Message-State',
'X-Google-DKIM-Signature',
'X-Google-Sender-Auth',
'X-Google-Sender-Delegation',
'X-Google-Smtp-Source',
'X-HS-Cid',
'x-incomingheadercount',
'X-IronPort-AV',
'X-JID',
'x-ld-processed',
'X-LinkedIn-Class',
'X-LinkedIn-fbl',
'X-LinkedIn-Id',
'X-LinkedIn-Template',
'X-Mailer',
'X-Mailgun-Batch-Id',
'X-Mailgun-Sending-Ip',
'X-Mailgun-Sid',
'X-Mailgun-Tag',
'X-Mailgun-Track',
'X-Mailgun-Track-Clicks',
'X-Mailgun-Track-Opens',
'X-Mailgun-Variables',
'X-MAILTAGS',
'X-Mandrill-User',
'X-MC-Unique',
'X-MC-User',
'x-mcpf-jobid',
'X-messageUUID',
'x-microsoft-antispam',
'x-microsoft-antispam-message-info',
'x-microsoft-antispam-prvs',
'x-microsoft-exchange-diagnostics',
'X-Mimecast-Spam-Score',
'X-MIMETrack',
'x-ms-exchange-antispam-messagedata',
'x-ms-exchange-antispam-messagedata-0',
'x-ms-exchange-antispam-messagedata-1',
'x-ms-exchange-antispam-messagedata-chunkcount',
'x-ms-exchange-antispam-relay',
'x-ms-exchange-antispam-srfa-diagnostics',
'x-ms-exchange-calendar-series-instance-id',
'X-MS-Exchange-CrossTenant-AuthAs',
'X-MS-Exchange-CrossTenant-AuthSource',
'X-MS-Exchange-CrossTenant-fromentityheader',
'X-MS-Exchange-CrossTenant-id',
'X-MS-Exchange-CrossTenant-mailboxtype',
'X-MS-Exchange-CrossTenant-Network-Message-Id',
'X-MS-Exchange-CrossTenant-originalarrivaltime',
'X-MS-Exchange-CrossTenant-RMS-PersistedConsumerOrg',
'X-MS-Exchange-CrossTenant-rms-persistedconsumerorg',
'X-MS-Exchange-CrossTenant-userprincipalname',
'x-ms-exchange-generated-message-source',
'X-MS-Exchange-Inbox-Rules-Loop',
'x-ms-exchange-meetingforward-message',
'x-ms-exchange-messagesentrepresentingtype',
'x-ms-exchange-parent-message-id',
'x-ms-exchange-purlcount',
'x-ms-exchange-senderadcheck',
'X-MS-Exchange-Transport-CrossTenantHeadersStamped',
'x-ms-exchange-transport-forked',
'x-ms-exchange-transport-fromentityheader',
'X-MS-Has-Attach',
'x-ms-office365-filtering-correlation-id',
'x-ms-office365-filtering-ht',
'x-ms-oob-tlc-oobclassifiers',
'x-ms-publictraffictype',
'X-MS-TNEF-Correlator',
'x-ms-traffictypediagnostic',
'X-MSFBL',
'X-OriginalArrivalTime',
'X-Originating-Client',
'x-originating-ip',
'X-OriginatorOrg',
'X-Ovh-Tracer-Id',
'X-Priority',
'X-Provags-ID',
'X-Received',
'X-Report-Abuse',
'X-Report-Abuse-To',
'X-REPORT-ABUSE-TO',
'X-Return-Path',
'X-Sender',
'X-SES-Outgoing',
'X-SG-EID',
'X-SG-ID',
'X-sib-id',
'X-Sid',
'X-Spam-Checker-Version',
'X-Spam-Level',
'X-Spam-Status',
'X-Thread-Info',
'X-TM-Deliver-Signature',
'X-TM-MAIL-RECEIVED-TIME',
'X-TM-MAIL-UUID',
'x-tm-snts-smtp',
'X-TM-SNTS-SMTP',
'X-UI-Message-Type',
'X-UI-Out-Filterresults',
'X-VADE-SPAMCAUSE',
'X-VADE-SPAMSTATE',
'X-Virus-Scanned',
'X-VR-SPAMCAUSE',
'X-VR-SPAMSCORE',
'X-VR-SPAMSTATE',
'X-Zoho-RID',
'X-Zoho-Virus-Status',
)
Time_Zone_Acronyms = ( Time_Zone_Acronyms = (
'A', 'ACDT', 'ACST', 'ACT', 'ACT', 'ACWST', 'ADST', 'ADST', 'ADT', 'ADT', 'AEDT', 'AEST', 'AET', 'AET', 'AFT', 'AKDT', 'AKST', 'A', 'ACDT', 'ACST', 'ACT', 'ACT', 'ACWST', 'ADST', 'ADST', 'ADT', 'ADT', 'AEDT', 'AEST', 'AET', 'AET', 'AFT', 'AKDT', 'AKST',
'ALMT', 'AMDT', 'AMST', 'AMST', 'AMT', 'AMT', 'ANAST', 'ANAT', 'AoE', 'AQTT', 'ART', 'AST', 'AST', 'AST', 'AST', 'AST', 'AST', 'ALMT', 'AMDT', 'AMST', 'AMST', 'AMT', 'AMT', 'ANAST', 'ANAT', 'AoE', 'AQTT', 'ART', 'AST', 'AST', 'AST', 'AST', 'AST', 'AST',
@ -1639,7 +1864,7 @@ class SMTPHeadersAnalysis:
Manually_Added_Appliances = set() Manually_Added_Appliances = set()
def __init__(self, logger, resolve = False, decode_all = False, testsToRun = []): def __init__(self, logger, resolve = False, decode_all = False, testsToRun = [], includeUnusual = False):
self.text = '' self.text = ''
self.results = {} self.results = {}
self.resolve = resolve self.resolve = resolve
@ -1649,6 +1874,8 @@ class SMTPHeadersAnalysis:
self.testsToRun = testsToRun self.testsToRun = testsToRun
self.securityAppliances = set() self.securityAppliances = set()
self.mtaHostnamesExposed = {} self.mtaHostnamesExposed = {}
self.ipgeoCache = {}
self.includeUnusual = includeUnusual
# (number, header, value) # (number, header, value)
self.headers = [] self.headers = []
@ -1664,7 +1891,7 @@ class SMTPHeadersAnalysis:
( '2', 'Extracted IP addresses', self.testExtractIP), ( '2', 'Extracted IP addresses', self.testExtractIP),
( '3', 'Extracted Domains', self.testResolveIntoIP), ( '3', 'Extracted Domains', self.testResolveIntoIP),
( '4', 'Bad Keywords In Headers', self.testBadKeywords), ( '4', 'Bad Keywords In Headers', self.testBadKeywords),
( '5', 'From Address Analysis', self.testFrom), ( '5', 'Sender Address Analysis', self.testFrom),
( '6', 'Subject and Thread Topic Difference', self.testSubjecThreadTopic), ( '6', 'Subject and Thread Topic Difference', self.testSubjecThreadTopic),
( '7', 'Authentication-Results', self.testAuthenticationResults), ( '7', 'Authentication-Results', self.testAuthenticationResults),
( '8', 'ARC-Authentication-Results', self.testARCAuthenticationResults), ( '8', 'ARC-Authentication-Results', self.testARCAuthenticationResults),
@ -1766,6 +1993,10 @@ class SMTPHeadersAnalysis:
('78', 'Security Appliances Spotted', self.testSecurityAppliances), ('78', 'Security Appliances Spotted', self.testSecurityAppliances),
('79', 'Email Providers Infrastructure Clues', self.testEmailIntelligence), ('79', 'Email Providers Infrastructure Clues', self.testEmailIntelligence),
('98', 'MTA Hostname Exposed', self.testMTAHostnamesExposed), ('98', 'MTA Hostname Exposed', self.testMTAHostnamesExposed),
('105', 'Identified Sender Addresses', self.testSenderAddress),
# Make this last one, always
('106', 'Unsual SMTP headers', self.testUnusualHeaders),
) )
testsDecodeAll = ( testsDecodeAll = (
@ -2905,10 +3136,16 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
m = re.search(r'\b(' + re.escape(word) + r')\b', value, re.I) m = re.search(r'\b(' + re.escape(word) + r')\b', value, re.I)
if m: if m:
w = m.group(1) w = m.group(1)
pos = value.lower().find(w.lower())
pos2 = value.lower().find(w.lower() + '=')
if pos2 != -1 and ' ' not in w and w.lower() == w:
continue
found.add(w) found.add(w)
foundWords.add(w) foundWords.add(w)
pos = value.lower().find(w.lower())
if pos != -1: if pos != -1:
value = value[:pos] + self.logger.colored(w, "red") + value[pos + len(w):] value = value[:pos] + self.logger.colored(w, "red") + value[pos + len(w):]
@ -4171,6 +4408,62 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
SMTPHeadersAnalysis.Header_Keywords_That_May_Contain_Spam_Info SMTPHeadersAnalysis.Header_Keywords_That_May_Contain_Spam_Info
) )
def testUnusualHeaders(self):
result = ''
num0 = 0
tmp = ''
if not self.includeUnusual:
return []
shown = set()
handled = [x.lower() for x in SMTPHeadersAnalysis.Handled_Spam_Headers]
for num, header, value in self.headers:
value = SMTPHeadersAnalysis.flattenLine(value)
if header.lower() in shown or header.lower() in handled:
continue
skip = False
for rex in SMTPHeadersAnalysis.Usual_SMTP_Headers:
if re.match(rex, header, re.I):
skip = True
break
if skip:
continue
shown.add(header.lower())
num0 += 1
h = self.logger.colored(header, 'yellow')
v = self.colorizeKeywords(value[:60])
if len(value) > 60:
v += ' (...)'
c = ' ' * (30 - len(header))
if len(header) > 30: c = ''
tmp += f'\t- {h}{c}: {v}\n\n'
if len(tmp) > 0:
result += f'\n- This script is aware of {len(SMTPHeadersAnalysis.Usual_SMTP_Headers)} typical SMTP Headers.\n'
result += f'\n- Below {num0} headers are considered unusual:\n\n'
result += tmp
if len(result) == 0:
return []
return {
'header' : '',
'value': '',
'analysis' : result,
'description' : 'This script knows only limited number of SMTP headers making output of this test overly verbose.',
}
def testSpamAssassinSpamAlikeLevels(self): def testSpamAssassinSpamAlikeLevels(self):
result = '' result = ''
tmp = '' tmp = ''
@ -4500,7 +4793,8 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
break break
if not spf: if not spf:
result += self.logger.colored('\n- WARNING! Potential Domain Impersonation!\n', 'red') result += '\n- (this test is very false-positive prone, below results can be inaccurate)'
result += self.logger.colored('\n\n- WARNING! Potential Domain Impersonation!\n', 'red')
result += f'\t- Mail\'s domain should resolve to: \t{self.logger.colored(senderDomain, "green")}\n' result += f'\t- Mail\'s domain should resolve to: \t{self.logger.colored(senderDomain, "green")}\n'
result += f'\t- But instead first hop resolved to:\t{self.logger.colored(firstHopDomain1, "red")}\n' result += f'\t- But instead first hop resolved to:\t{self.logger.colored(firstHopDomain1, "red")}\n'
@ -5195,9 +5489,9 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
if self.resolve: if self.resolve:
resolved = SMTPHeadersAnalysis.resolveAddress(parsed['CIP']) resolved = SMTPHeadersAnalysis.resolveAddress(parsed['CIP'])
result += f'- {self.logger.colored("CIP", "magenta")}: Connecting IP address: {parsed["CIP"]} (resolved: {resolved})\n\n' result += f'- {self.logger.colored("CIP", "magenta")}: Connecting IP address:\n\t- {self.logger.colored(parsed["CIP"], "yellow")} (resolved: {self.logger.colored(resolved, "magenta")})\n\n'
else: else:
result += f'- {self.logger.colored("CIP", "magenta")}: Connecting IP address: {parsed["CIP"]}\n\n' result += f'- {self.logger.colored("CIP", "magenta")}: Connecting IP address:\n\t- {self.logger.colored(parsed["CIP"], "yellow")}\n\n'
for k, v in parsed.items(): for k, v in parsed.items():
elem = None elem = None
@ -5317,6 +5611,66 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
'description' : '', 'description' : '',
} }
def testSenderAddress(self):
headersFound = set()
senderHeaders = (
'MAIL FROM',
'mail-from',
'Return-Path',
'X-Env-Sender',
'From',
'Sender',
'X-Apparently-From',
)
addresses = []
for (num, header, value) in self.headers:
if header.lower() not in [x.lower() for x in senderHeaders]:
continue
headersFound.add(header)
result = ''
headers = ''
values = ''
num1 = 0
for header in headersFound:
(num, hdr, value) = self.getHeader(header)
if num != -1:
num1 += 1
value = value.replace('<', '').replace('>', '').replace('\t', '').replace(' ', '').strip()
headers += f' - {hdr}\n'
values += f' - {value}\n'
t = self.logger.colored(f"{hdr:20}", "yellow")
v = self.logger.colored(value, "green")
addresses.append(value)
result +=f'\n\t- {t}: {v}'
if num1 == 0:
return []
result = f'\n- Identified sender addresses ({num1}):\n' + result
if len(addresses) > 0:
if not addresses.count(addresses[0]) == len(addresses):
result += self.logger.colored(f'\n\n- WARNING! Not all sender addresses match each other - potential Mail Spoofing!\n', 'red')
result += '- See here for more info: https://blog.shiraj.com/2020/05/email-spoofing/\n'
return {
'header' : '\n'+headers,
'value': values,
'analysis' : result,
'description' : f'Sender\'s address was found in {num1} different SMTP headers.',
}
def testFrom(self): def testFrom(self):
(num, header, value) = self.getHeader('From') (num, header, value) = self.getHeader('From')
if num == -1: return [] if num == -1: return []
@ -5876,7 +6230,7 @@ This can lead to an internal information disclosure. This test shows potential h
words = [x.strip() for x in value.lower().split(' ') if len(x.strip()) > 0] words = [x.strip() for x in value.lower().split(' ') if len(x.strip()) > 0]
if words[0] != 'pass': if words[0] != 'pass':
result += self.logger.colored(f'- Received-SPF test failed', 'red') + ': Should be "pass", but was: "' + str(words[0]) + '"\n' result += self.logger.colored(f'- Received-SPF test failed', 'red') + f'\n\t- Should be "{self.logger.colored("pass", "green")}", but was: "' + str(words[0]) + '"\n'
result += '- Decomposition:\n' result += '- Decomposition:\n'
@ -5897,6 +6251,9 @@ This can lead to an internal information disclosure. This test shows potential h
if len(resolved) > 0: if len(resolved) > 0:
result += f'\t(resolved: {self.logger.colored(resolved, "magenta")})' result += f'\t(resolved: {self.logger.colored(resolved, "magenta")})'
geo = self.collectIpGeo(v)
result += '\n\t' + str(textwrap.indent(geo, '\t\t'))
result += '\n' result += '\n'
else: else:
result += f'\t- {k.strip():26}: {self.logger.colored(v.strip(), "yellow")}\n' result += f'\t- {k.strip():26}: {self.logger.colored(v.strip(), "yellow")}\n'
@ -5962,7 +6319,7 @@ This can lead to an internal information disclosure. This test shows potential h
p = self.logger.colored('pass', 'green') p = self.logger.colored('pass', 'green')
p2 = self.logger.colored(tests[k], 'red') p2 = self.logger.colored(tests[k], 'red')
result += self.logger.colored(f'- {k.upper()} test failed:', 'red') + f' Should be "{p}", but was: "' + p2 + '"\n' result += self.logger.colored(f'- {k.upper()} test failed:', 'red') + f'\n\t- Should be "{p}", but was: "' + p2 + '"\n'
if k.lower() == 'dkim' and tests[k] in SMTPHeadersAnalysis.auth_result.keys(): if k.lower() == 'dkim' and tests[k] in SMTPHeadersAnalysis.auth_result.keys():
result += '\t- Meaning: ' + SMTPHeadersAnalysis.auth_result[tests[k]] + '\n\n' result += '\t- Meaning: ' + SMTPHeadersAnalysis.auth_result[tests[k]] + '\n\n'
@ -5976,6 +6333,45 @@ This can lead to an internal information disclosure. This test shows potential h
'analysis' : result, 'analysis' : result,
'description' : '', 'description' : '',
} }
def collectIpGeo(self, addr):
if addr in self.ipgeoCache.keys():
return self.ipgeoCache[addr]
tmp = ''
try:
self.logger.dbg(f'testExtractIP: Collecting IP Geo metadata...')
r = requests.get(
f'http://ip-api.com/json/{addr}',
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Accept-Language': 'en-US',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/537.36',
}
)
out = r.json()
if out != None and len(out) > 0 and type(out) is dict:
tmp += f'\t\t- IP Geo metadata:\n'
for k, v in out.items():
k1 = k
k = f'{k:12}'
if k1.lower() in ('country', 'regionName', 'city', 'isp', 'org', 'as'):
k = self.logger.colored(k, "cyan")
v = self.logger.colored(v, "green")
else:
v = self.logger.colored(v, "yellow")
tmp += f'\t\t\t- {k}: {v}\n'
except Exception as e:
pass
self.ipgeoCache[addr] = tmp
return tmp
def testExtractIP(self): def testExtractIP(self):
addresses = re.findall(r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})', self.text) addresses = re.findall(r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})', self.text)
@ -6005,36 +6401,7 @@ This can lead to an internal information disclosure. This test shows potential h
if out != None and len(out) > 0 and out != addr: if out != None and len(out) > 0 and out != addr:
tmp += f'\t\t- that resolves to: {out}\n' tmp += f'\t\t- that resolves to: {out}\n'
try: tmp += str(textwrap.indent(self.collectIpGeo(rawAddr), '\t'))
self.logger.dbg(f'testExtractIP: Collecting IP Geo metadata...')
r = requests.get(
f'http://ip-api.com/json/{rawAddr}',
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Accept-Language': 'en-US',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/537.36',
}
)
out = r.json()
if out != None and len(out) > 0 and type(out) is dict:
tmp += f'\t\t- IP Geo metadata:\n'
for k, v in out.items():
k1 = k
k = f'{k:12}'
if k1.lower() in ('country', 'regionName', 'city', 'isp', 'org', 'as'):
k = self.logger.colored(k, "cyan")
v = self.logger.colored(v, "green")
else:
v = self.logger.colored(v, "yellow")
tmp += f'\t\t\t- {k}: {v}\n'
except Exception as e:
pass
else: else:
addr = self.logger.colored(addr, 'magenta') addr = self.logger.colored(addr, 'magenta')
tmp += f'\t- Found IP address: {addr}\n' tmp += f'\t- Found IP address: {addr}\n'
@ -6048,7 +6415,7 @@ This can lead to an internal information disclosure. This test shows potential h
else: else:
result = '\n\t- Extracted IP addresses from headers:\n\n' result = '\n\t- Extracted IP addresses from headers:\n\n'
result += tmp.strip() result += tmp.rstrip()
if len(resolved) == 0: if len(resolved) == 0:
return [] return []
@ -6091,6 +6458,7 @@ This can lead to an internal information disclosure. This test shows potential h
if len(out) > 0: if len(out) > 0:
tmp += f'\t\t- that resolves to: {self.logger.colored(out, "cyan")}\n' tmp += f'\t\t- that resolves to: {self.logger.colored(out, "cyan")}\n'
tmp += self.collectIpGeo(out)
else: else:
tmp += f'\t- Found Domain: {self.logger.colored(d2, "yellow")}\n' tmp += f'\t- Found Domain: {self.logger.colored(d2, "yellow")}\n'
@ -6165,6 +6533,7 @@ def opts(argv):
tst.add_argument('-r', '--resolve', default=False, action='store_true', help='Resolve IPv4 addresses / Domain names & collect IP Geo metadata.') tst.add_argument('-r', '--resolve', default=False, action='store_true', help='Resolve IPv4 addresses / Domain names & collect IP Geo metadata.')
tst.add_argument('-R', '--dont-resolve', default=False, action='store_true', help='Do not resolve anything.') tst.add_argument('-R', '--dont-resolve', default=False, action='store_true', help='Do not resolve anything.')
tst.add_argument('-a', '--decode-all', default=False, action='store_true', help='Decode all =?us-ascii?Q? mail encoded messages and print their contents.') tst.add_argument('-a', '--decode-all', default=False, action='store_true', help='Decode all =?us-ascii?Q? mail encoded messages and print their contents.')
tst.add_argument('-U', '--no-unusual', default=False, action='store_true', help='Do not print SMTP headers this script considers as unusual.')
args = o.parse_args() args = o.parse_args()
@ -6516,7 +6885,7 @@ def main(argv):
testsToRun = sorted(_testsToRun) testsToRun = sorted(_testsToRun)
an = SMTPHeadersAnalysis(logger, args.resolve, args.decode_all, testsToRun) an = SMTPHeadersAnalysis(logger, args.resolve, args.decode_all, testsToRun, not args.no_unusual)
out = an.parse(text) out = an.parse(text)
printed = printOutput(out) printed = printOutput(out)