updated decode-spam-headers.py

This commit is contained in:
Mariusz B. / mgeeky 2021-10-27 03:43:42 +02:00
parent 784eec6781
commit 93b1040fe7
1 changed files with 190 additions and 5 deletions

View File

@ -81,6 +81,13 @@
# Usage: # Usage:
# ./decode-spam-headers [options] <smtp-headers.txt> # ./decode-spam-headers [options] <smtp-headers.txt>
# #
# NOTICE:
# Parts of this code contain fragments copied from the following places:
#
# 1) testEmailIntelligence():
# source: https://github.com/nquinlan/Email-Intelligence
# authored by: Nick Quinlan (nick@nicholasquinlan.com)
#
# Requirements: # Requirements:
# - packaging # - packaging
# - dnspython # - dnspython
@ -318,14 +325,20 @@ class SMTPHeadersAnalysis:
'trusteer', 'trustlook', 'virusblokada', 'virustotal', 'virustotalcloud', 'webroot', 'trusteer', 'trustlook', 'virusblokada', 'virustotal', 'virustotalcloud', 'webroot',
'yandex', 'yandexbot', 'zillya', 'zonealarm', 'zscaler', '-sea-', 'perlmx', 'trustwave', 'yandex', 'yandexbot', 'zillya', 'zonealarm', 'zscaler', '-sea-', 'perlmx', 'trustwave',
'mailmarshal', 'tmase', 'startscan', 'fe-etp', 'jemd', 'suspicious', 'grey', 'infected', 'unscannable', 'mailmarshal', 'tmase', 'startscan', 'fe-etp', 'jemd', 'suspicious', 'grey', 'infected', 'unscannable',
'dlp-', 'sanitize', 'mailscan', 'barracuda', 'dlp-', 'sanitize', 'mailscan', 'barracuda', 'clearswift', 'messagelabs', 'msw-jemd', 'fe-etp', 'symc-ess',
'starscan', 'mailcontrol',
) )
Interesting_Headers = ( Interesting_Headers = (
'mailgun', 'sendgrid', 'mailchimp', 'x-ses', 'x-avas', 'X-Gmail-Labels', 'X-vrfbldomain', 'mailgun', 'sendgrid', 'mailchimp', 'x-ses', 'x-avas', 'X-Gmail-Labels', 'X-vrfbldomain',
'mandrill', 'bulk', 'sendinblue', 'amazonses', 'mailjet', 'postmark', 'postfix', 'dovecot', 'roundcube', 'mandrill', 'bulk', 'sendinblue', 'amazonses', 'mailjet', 'postmark', 'postfix', 'dovecot', 'roundcube',
'seg', '-IP', 'crosspremises', 'brightmail', 'check', 'exim', 'postfix', 'exchange', 'microsoft', 'office365', 'seg', '-IP', 'crosspremises', 'brightmail', 'check', 'exim', 'postfix', 'exchange', 'microsoft', 'office365',
'dovecot', 'sendmail', 'score', 'report', 'status', 'dovecot', 'sendmail', 'score', 'report', 'status', 'benchmarkemail', 'bronto', 'X-Complaints-To',
'X-Roving-ID', 'X-DynectEmail-Msg', 'X-elqPod', 'X-EMV-MemberId', 'e2ma', 'fishbowl', 'eloop', 'X-Google-Appengine-App-Id',
'X-ICPINFO', 'x-locaweb-id', 'X-MC-User', 'mailersend', 'MailiGen', 'Mandrill', 'MarketoID', 'X-Messagebus-Info',
'Mixmax', 'X-PM-Message-Id', 'postmark', 'X-rext', 'responsys', 'X-SFDC-User', 'salesforce', 'x-sg-', 'x-sendgrid-',
'silverpop', '.mkt', 'X-SMTPCOM-Tracking-Number', 'X-vrfbldomain', 'verticalresponse',
'yesmail',
) )
Headers_Known_For_Breaking_Line = ( Headers_Known_For_Breaking_Line = (
@ -1107,7 +1120,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
i += j - 1 i += j - 1
if headers[-1][1].lower() == 'content-type': if len(headers) > 0 and len(headers[-1]) > 1 and headers[-1][1].lower() == 'content-type':
m = re.search(r'boundary="([^"]+)"', headers[-1][2], re.I) m = re.search(r'boundary="([^"]+)"', headers[-1][2], re.I)
if m: if m:
boundary = m.group(1) boundary = m.group(1)
@ -1205,6 +1218,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
('Other unrecognized Spam Related Headers', self.testSpamRelatedHeaders), ('Other unrecognized Spam Related Headers', self.testSpamRelatedHeaders),
('Other interesting headers', self.testInterestingHeaders), ('Other interesting headers', self.testInterestingHeaders),
('Security Appliances Spotted', self.testSecurityAppliances), ('Security Appliances Spotted', self.testSecurityAppliances),
('Email Providers Infrastructure Clues', self.testEmailIntelligence),
) )
for i in range(len(SMTPHeadersAnalysis.Handled_Spam_Headers)): for i in range(len(SMTPHeadersAnalysis.Handled_Spam_Headers)):
@ -1355,6 +1369,171 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
lines.append(line) lines.append(line)
return '\n'.join(lines) return '\n'.join(lines)
def testEmailIntelligence(self):
service = []
value = self.text
#
# NOTICE:
# This code below was copied from the following repository:
# https://github.com/nquinlan/Email-Intelligence
#
# and is authored solely by Nick Quinlan (nick@nicholasquinlan.com).
#
# Amazon SES
if re.search(r'^X-SES-Outgoing:', value, re.I|re.S) or "Amazon SES" in value or "Amazon SES".lower() in value.lower() or "AmazonSES".lower() in value.lower():
service.append(("Amazon SES", "http://aws.amazon.com/ses/"))
# BenchmarkEmail.com
if re.search(r'www.benchmarkemail.com', value, re.I|re.S) or "BenchmarkEmail".lower() in value.lower():
service.append(("BenchmarkEmail", "http://benchmarkemail.com/"))
# Bronto
if re.search(r'd=bronto.com;', value, re.I|re.S) or "Bronto".lower() in value.lower():
service.append(("Bronto", "http://bronto.com/"))
# Campaign Monitor
if re.search(r'^X-Complaints-To: abuse@cmail\d{1,2}\.com', value, re.I|re.S) or "Campaign Monitor".lower() in value.lower() or "CampaignMonitor".lower() in value.lower():
service.append(("Campaign Monitor", "https://www.campaignmonitor.com"))
# Constant Contact
if re.search(r'^X-Roving-ID:', value, re.I|re.S) or "Constant Contact".lower() in value.lower() or "ConstantContact".lower() in value.lower():
service.append(("Constant Contact", "https://www.constantcontact.com"))
# Dyn
if re.search(r'^X-DynectEmail-Msg-(Key|Hash):', value, re.I|re.S) or "Dyn".lower() in value.lower():
service.append(("Dyn", "https://dyn.com/"))
# Eloqua
if re.search(r'^X-elqPod:', value, re.I|re.S) or "Eloqua".lower() in value.lower():
service.append(("Eloqua", "http://www.eloqua.com/"))
# Email Vision
if re.search(r'^X-EMV-MemberId:', value, re.I|re.S) or "Emailvision".lower() in value.lower():
service.append(("Emailvision", "https://www.emailvision.com/"))
# Emma
if re.search(r'd=e2ma\.net;', value, re.I|re.S) or "Emma".lower() in value.lower():
service.append(("Emma", "https://myemma.com/"))
# ExactTarget
if re.search(r'^x-job: \d{3,}_\d{3,}$', value, re.I|re.S) and re.search(r'mta[\d]*\.[\w-\.]+\.[a-z]{2,}', value, re.I|re.S) or "ExactTarget".lower() in value.lower():
service.append(("ExactTarget", "http://www.exacttarget.com/"))
# Fishbowl
if re.search(r'^X-Mailer: Fishbowl', value, re.I|re.S) or "Fishbowl".lower() in value.lower():
service.append(("Fishbowl", "https://www.fishbowl.com/"))
# Gold Lasso
if re.search(r'^X-Mailer: Eloop Mailer', value, re.I|re.S) or "Gold Lasso".lower() in value.lower() or "GoldLasso".lower() in value.lower():
service.append(("Gold Lasso", "https://www.goldlasso.com/"))
# Google App Engine
if re.search(r'^X-Google-Appengine-App-Id:', value, re.I|re.S) or "Google App Engine".lower() in value.lower() or "GoogleApp".lower() in value.lower() or "AppEngine".lower() in value.lower():
service.append(("Google App Engine", "https://developers.google.com/appengine/docs/python/mail/sendingmail"))
# iContact
if re.search(r'^X-ICPINFO:', value, re.I|re.S) or "iContact".lower() in value.lower():
service.append(("iContact", "https://www.icontact.com/"))
# Listrak
if re.search(r'^Received: from [\w-]+\.listrak\.com', value, re.I|re.S) or "Listrak".lower() in value.lower():
service.append(("Listrak", "https://www.listrak.com/"))
# Locaweb
if re.search(r'^x-locaweb-id:', value, re.I|re.S) or "Locaweb".lower() in value.lower():
service.append(("Locaweb", "https://www.locaweb.com.br/"))
# Mailchimp
if re.search(r'^X-MC-User:', value, re.I|re.S) or "MailChimp".lower() in value.lower():
service.append(("MailChimp", "https://mailchimp.com/"))
# MailerLite
if re.search(r'd=ml.mailersend.com;', value, re.I|re.S) or "MailerLite".lower() in value.lower():
service.append(("MailerLite", "https://www.mailerlite.com/"))
# Mailgun
if re.search(r'^X-Mailgun-Sid:', value, re.I|re.S) or re.search(r'X-Mailgun-Variables:', value, re.I|re.S) or "Mailgun".lower() in value.lower():
service.append(("Mailgun", "https://www.mailgun.com/"))
# Mailigen
if re.search(r'^X-Mailer: MailiGen', value, re.I|re.S) or "Mailigen".lower() in value.lower():
service.append(("Mailigen", "http://www.mailigen.com/"))
# Mailjet
if re.search(r's=mailjet;', value, re.I|re.S) or "Mailjet".lower() in value.lower():
service.append(("Mailjet", "https://www.mailjet.com/"))
# Mandrill
if re.search(r'^X-Mandrill-User:', value, re.I|re.S) or "Mandrill".lower() in value.lower():
service.append(("Mandrill", "https://mandrillapp.com/"))
# Marketo
if re.search(r'^X-MarketoID:', value, re.I|re.S) or "Marketo".lower() in value.lower():
service.append(("Marketo", "https://www.marketo.com/"))
# Message Bus
if re.search(r'^X-Messagebus-Info:', value, re.I|re.S) or "Message Bus".lower() in value.lower() or "MessageBus".lower() in value.lower():
service.append(("Message Bus", "https://messagebus.com/"))
# Mixmax
if re.search(r'^X-Mailer: Mixmax', value, re.I|re.S) or "Mixmax".lower() in value.lower():
service.append(("Mixmax", "https://mixmax.com/"))
# Postmark
if re.search(r'^X-PM-Message-Id:', value, re.I|re.S) or "Postmark".lower() in value.lower():
service.append(("Postmark", "https://postmarkapp.com/"))
# Responsys
if re.search(r'^X-rext:', value, re.I|re.S) or "Responsys".lower() in value.lower():
service.append(("Responsys", "https://www.responsys.com/"))
# Sailthru
if re.search(r'^X-Mailer: sailthru.com$', value, re.I|re.S) or "Sailthru".lower() in value.lower():
service.append(("Sailthru", "https://www.sailthru.com/"))
# Salesforce
if re.search(r'^X-SFDC-User:', value, re.I|re.S) or "Salesforce".lower() in value.lower():
service.append(("Salesforce", "https://www.salesforce.com/"))
# SendGrid
if re.search(r'^X-(SG|SENDGRID)-EID:', value, re.I|re.S) or "SendGrid".lower() in value.lower():
service.append(("SendGrid", "https://sendgrid.com/"))
# Silverpop
if re.search(r'^Received: from [\w\.]+\.mkt\d{3,}\.com', value, re.I|re.S): # Not proprietary, but likely only Silverpo or "Silverpop".lower() in value.lower()p
service.append(("Silverpop", "https://www.silverpop.com/"))
# SMTP.com
if re.search(r'^X-SMTPCOM-Tracking-Number:', value, re.I|re.S) or "SMTP.com".lower() in value.lower():
service.append(("SMTP.com", "https://smtp.com/"))
# VerticalResponse
if re.search(r'^X-vrfbldomain:', value, re.I|re.S) and re.search(r'^X-vrpod:', value, re.I|re.S) and re.search(r'^X-vrrpmm:', value, re.I|re.S) or "VerticalResponse".lower() in value.lower():
service.append(("VerticalResponse", "http://www.verticalresponse.com/"))
# Yesmail
if re.search(r's=yesmail.?;', value, re.I|re.S) or re.search(r'^Received: from [\w\.\-]+postdirect.com', value, re.I|re.S) or "Yesmail".lower() in value.lower():
service.append(("Yesmail", "https://www.yesmail.com/"))
if len(service) == 0:
return []
result = f'- Mail contents analysis shown that this e-mail passed through the following third-party Mail providers:\n\n'
for svc in service:
svcname = self.logger.colored(svc[0], 'green')
result += f'\t- {svcname} - url: {svc[1]}\n'
return {
'header': '',
'value' : '',
'analysis' : result,
'description' : '',
}
def testSecurityAppliances(self): def testSecurityAppliances(self):
result = '' result = ''
vals = [x.lower() for x in SMTPHeadersAnalysis.Header_Keywords_That_May_Contain_Spam_Info] vals = [x.lower() for x in SMTPHeadersAnalysis.Header_Keywords_That_May_Contain_Spam_Info]
@ -2102,7 +2281,7 @@ Results will be unsound. Make sure you have pasted your headers with correct spa
if num == -1: return [] if num == -1: return []
result = f'- Sophos Email Appliance Spam report:\n' result = f'- Sophos Email Appliance Spam report:\n'
self.securityAppliances.add('Sophos Email Appliance') self.securityAppliances.add('Sophos Email Appliance (PureMessage)')
report = {} report = {}
value = SMTPHeadersAnalysis.flattenLine(value) value = SMTPHeadersAnalysis.flattenLine(value)
@ -2365,6 +2544,7 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
elif k == 'E': elif k == 'E':
v0 = self.logger.colored(v, 'red') v0 = self.logger.colored(v, 'red')
result += f'\t\t- {v0}\n' result += f'\t\t- {v0}\n'
self.securityAppliances.add(f'{v} AV')
elif k == 'e': elif k == 'e':
vs = v.split("'") vs = v.split("'")
@ -2870,8 +3050,10 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
if firstHopDomain1.lower() != senderDomain.lower(): if firstHopDomain1.lower() != senderDomain.lower():
response = None response = None
try: try:
if domain.endswith('.'): domain = domain[:-1]
response = dns.resolver.resolve(domain, 'TXT') response = dns.resolver.resolve(domain, 'TXT')
except dns.resolver.NoAnswer as e:
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
response = [] response = []
spf = False spf = False
@ -3301,6 +3483,9 @@ Src: https://www.cisco.com/c/en/us/td/docs/security/esa/esa11-1/user_guide/b_ESA
for i in range(len(path)): for i in range(len(path)):
elem = path[i] elem = path[i]
if len(elem) < 2:
continue
num += 1 num += 1
s = '-->' s = '-->'
if i > 0: if i > 0: