mirror of
https://github.com/jtesta/ssh-audit.git
synced 2025-12-21 22:22:05 +01:00
Results from concurrent scans against multiple hosts are no longer improperly combined (#190).
This commit is contained in:
@@ -23,6 +23,9 @@
|
||||
THE SOFTWARE.
|
||||
"""
|
||||
# pylint: disable=unused-import
|
||||
import copy
|
||||
import threading
|
||||
|
||||
from typing import Dict, List, Set, Sequence, Tuple, Iterable # noqa: F401
|
||||
from typing import Callable, Optional, Union, Any # noqa: F401
|
||||
|
||||
@@ -69,8 +72,10 @@ class SSH2_KexDB: # pylint: disable=too-few-public-methods
|
||||
INFO_REMOVED_IN_OPENSSH70 = 'removed in OpenSSH 7.0: https://www.openssh.com/txt/release-7.0'
|
||||
INFO_WITHDRAWN_PQ_ALG = 'the sntrup4591761 algorithm was withdrawn, as it may not provide strong post-quantum security'
|
||||
|
||||
# Maintains a dictionary per calling thread that yields its own copy of MASTER_DB. This prevents results from one thread polluting the results of another thread.
|
||||
DB_PER_THREAD: Dict[int, Dict[str, Dict[str, List[List[Optional[str]]]]]] = {}
|
||||
|
||||
ALGORITHMS: Dict[str, Dict[str, List[List[Optional[str]]]]] = {
|
||||
MASTER_DB: Dict[str, Dict[str, List[List[Optional[str]]]]] = {
|
||||
# Format: 'algorithm_name': [['version_first_appeared_in'], [reason_for_failure1, reason_for_failure2, ...], [warning1, warning2, ...], [info1, info2, ...]]
|
||||
'kex': {
|
||||
'Curve25519SHA256': [[]],
|
||||
@@ -390,3 +395,24 @@ class SSH2_KexDB: # pylint: disable=too-few-public-methods
|
||||
'umac-96@openssh.com': [[], [], [WARN_ENCRYPT_AND_MAC], [INFO_NEVER_IMPLEMENTED_IN_OPENSSH]],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_db() -> Dict[str, Dict[str, List[List[Optional[str]]]]]:
|
||||
'''Returns a copy of the MASTER_DB that is private to the calling thread. This prevents multiple threads from polluting the results of other threads.'''
|
||||
calling_thread_id = threading.get_ident()
|
||||
|
||||
if calling_thread_id not in SSH2_KexDB.DB_PER_THREAD:
|
||||
SSH2_KexDB.DB_PER_THREAD[calling_thread_id] = copy.deepcopy(SSH2_KexDB.MASTER_DB)
|
||||
|
||||
return SSH2_KexDB.DB_PER_THREAD[calling_thread_id]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def thread_exit() -> None:
|
||||
'''Deletes the calling thread's copy of the MASTER_DB. This is needed because, in rare circumstances, a terminated thread's ID can be re-used by new threads.'''
|
||||
|
||||
calling_thread_id = threading.get_ident()
|
||||
|
||||
if calling_thread_id in SSH2_KexDB.DB_PER_THREAD:
|
||||
del SSH2_KexDB.DB_PER_THREAD[calling_thread_id]
|
||||
|
||||
Reference in New Issue
Block a user