mirror of
https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 21:43:30 +01:00
MAESTRO: fix ruff lint
This commit is contained in:
@@ -1,16 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from concurrent.futures import TimeoutError as FutureTimeoutError
|
||||
from time import perf_counter
|
||||
from typing import Callable
|
||||
|
||||
from .models import AnalysisRequest, AnalysisResult, ReportMetadata, Severity, TestResult, TestStatus
|
||||
from .models import (
|
||||
AnalysisRequest,
|
||||
AnalysisResult,
|
||||
ReportMetadata,
|
||||
Severity,
|
||||
TestResult,
|
||||
TestStatus,
|
||||
)
|
||||
from .parser import HeaderParser, ParsedHeader
|
||||
from .scanner_base import BaseScanner
|
||||
from .scanner_registry import ScannerRegistry
|
||||
from .scanners._legacy_adapter import configure_legacy
|
||||
|
||||
|
||||
ProgressCallback = Callable[[int, int, str], None]
|
||||
|
||||
DEFAULT_PER_TEST_TIMEOUT_SECONDS = 3.0
|
||||
@@ -111,9 +118,11 @@ class HeaderAnalyzer:
|
||||
return future.result(timeout=self._per_test_timeout_seconds)
|
||||
except FutureTimeoutError as exc:
|
||||
future.cancel()
|
||||
raise TimeoutError(
|
||||
f"Test {scanner.id} timed out after {self._per_test_timeout_seconds:.2f}s"
|
||||
) from exc
|
||||
message = (
|
||||
f"Test {scanner.id} timed out after "
|
||||
f"{self._per_test_timeout_seconds:.2f}s"
|
||||
)
|
||||
raise TimeoutError(message) from exc
|
||||
finally:
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
|
||||
|
||||
@@ -93,7 +93,8 @@ class Logger:
|
||||
|
||||
if pos1 == -1:
|
||||
raise ValueError(
|
||||
"Output colors mismatch - could not find pos of end of color number!"
|
||||
"Output colors mismatch - could not find pos of end of "
|
||||
"color number!"
|
||||
)
|
||||
|
||||
c = int(s[pos : pos + pos1])
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from importlib import import_module
|
||||
import inspect
|
||||
import pkgutil
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from importlib import import_module
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Iterable
|
||||
|
||||
@@ -44,7 +44,9 @@ class ScannerRegistry:
|
||||
discovered = _discover_scanners()
|
||||
if discovered:
|
||||
self._scanners = _dedupe_scanners(discovered)
|
||||
self._category_map = _build_category_map(scanner.id for scanner in self._scanners)
|
||||
self._category_map = _build_category_map(
|
||||
scanner.id for scanner in self._scanners
|
||||
)
|
||||
else:
|
||||
tests = _load_tests_from_monolith()
|
||||
self._category_map = _build_category_map(test_id for test_id, _ in tests)
|
||||
@@ -99,11 +101,11 @@ def _discover_scanners() -> list[BaseScanner]:
|
||||
def _extract_scanners(module: object) -> list[BaseScanner]:
|
||||
scanners: list[BaseScanner] = []
|
||||
if hasattr(module, "SCANNERS"):
|
||||
declared = getattr(module, "SCANNERS")
|
||||
declared = module.SCANNERS
|
||||
if isinstance(declared, list | tuple):
|
||||
scanners.extend(declared)
|
||||
if hasattr(module, "get_scanners") and callable(getattr(module, "get_scanners")):
|
||||
declared = getattr(module, "get_scanners")()
|
||||
if hasattr(module, "get_scanners") and callable(module.get_scanners):
|
||||
declared = module.get_scanners()
|
||||
if isinstance(declared, list | tuple):
|
||||
scanners.extend(declared)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import importlib.util
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
@@ -9,7 +9,6 @@ from app.engine.logger import Logger as EngineLogger
|
||||
from app.engine.models import Severity, TestResult, TestStatus
|
||||
from app.engine.parser import ParsedHeader
|
||||
|
||||
|
||||
_LEGACY_MODULE = None
|
||||
_TEST_CATALOG: dict[int, tuple[str, str]] | None = None
|
||||
_ARRAY_TESTS: set[int] | None = None
|
||||
@@ -189,7 +188,9 @@ def _normalize_payload(payload: object) -> tuple[str, str, str, str] | None:
|
||||
return None
|
||||
|
||||
|
||||
def _combine_payloads(payloads: list[tuple[str, str, str, str]]) -> tuple[str, str, str, str]:
|
||||
def _combine_payloads(
|
||||
payloads: list[tuple[str, str, str, str]]
|
||||
) -> tuple[str, str, str, str]:
|
||||
headers: list[str] = []
|
||||
values: list[str] = []
|
||||
analyses: list[str] = []
|
||||
@@ -284,7 +285,9 @@ class LegacyScanner:
|
||||
)
|
||||
|
||||
|
||||
def build_scanners(test_ids: Iterable[int], category: str | None = None) -> list[LegacyScanner]:
|
||||
def build_scanners(
|
||||
test_ids: Iterable[int], category: str | None = None
|
||||
) -> list[LegacyScanner]:
|
||||
catalog, _array_ids = _load_test_catalog()
|
||||
scanners: list[LegacyScanner] = []
|
||||
for test_id in test_ids:
|
||||
@@ -292,6 +295,8 @@ def build_scanners(test_ids: Iterable[int], category: str | None = None) -> list
|
||||
raise ValueError(f"Unknown test id: {test_id}")
|
||||
name, method_name = catalog[test_id]
|
||||
scanners.append(
|
||||
LegacyScanner(id=test_id, name=name, method_name=method_name, category=category)
|
||||
LegacyScanner(
|
||||
id=test_id, name=name, method_name=method_name, category=category
|
||||
)
|
||||
)
|
||||
return scanners
|
||||
|
||||
@@ -2,5 +2,4 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners([69, 70, 71, 72, 73], category="Barracuda")
|
||||
|
||||
@@ -2,7 +2,6 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners(
|
||||
[12, 13, 14, 15, 16, 63, 64],
|
||||
category="Forefront Antispam",
|
||||
|
||||
@@ -2,7 +2,6 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners(
|
||||
[
|
||||
4,
|
||||
|
||||
@@ -2,7 +2,6 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners(
|
||||
[27, 28, 29, 38, 39, 40, 41, 42, 43, 88, 89],
|
||||
category="IronPort",
|
||||
|
||||
@@ -2,7 +2,6 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners(
|
||||
[31, 32, 33, 34, 35, 80, 83, 84, 85, 99, 100, 101, 102],
|
||||
category="Microsoft",
|
||||
|
||||
@@ -2,5 +2,4 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners([30, 61, 62, 65], category="Mimecast")
|
||||
|
||||
@@ -2,5 +2,4 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners([66, 67], category="Proofpoint")
|
||||
|
||||
@@ -2,5 +2,4 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners([1, 2, 3], category="Received Headers")
|
||||
|
||||
@@ -2,5 +2,4 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners([18, 19, 20, 21, 74], category="SpamAssassin")
|
||||
|
||||
@@ -2,5 +2,4 @@ from __future__ import annotations
|
||||
|
||||
from ._legacy_adapter import build_scanners
|
||||
|
||||
|
||||
SCANNERS = build_scanners(list(range(47, 60)) + [97], category="Trend Micro")
|
||||
|
||||
Reference in New Issue
Block a user