mirror of
https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 13:33:30 +01:00
MAESTRO: add engine TDD fixtures and tests
This commit is contained in:
46
backend/tests/engine/test_analyzer.py
Normal file
46
backend/tests/engine/test_analyzer.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from app.engine.analyzer import HeaderAnalyzer
|
||||
from app.engine.models import AnalysisRequest, AnalysisResult, TestResult
|
||||
|
||||
|
||||
FIXTURES_DIR = Path(__file__).resolve().parents[1] / "fixtures"
|
||||
|
||||
|
||||
def test_analyzer_runs_selected_tests_and_reports_progress() -> None:
|
||||
raw_headers = (FIXTURES_DIR / "sample_headers.txt").read_text(encoding="utf-8")
|
||||
request = AnalysisRequest(
|
||||
headers=raw_headers,
|
||||
config={
|
||||
"test_ids": [12, 13],
|
||||
"resolve": False,
|
||||
"decode_all": False,
|
||||
},
|
||||
)
|
||||
|
||||
progress_events: list[tuple[int, int, str]] = []
|
||||
|
||||
def on_progress(current_index: int, total_tests: int, test_name: str) -> None:
|
||||
progress_events.append((current_index, total_tests, test_name))
|
||||
|
||||
analyzer = HeaderAnalyzer()
|
||||
result = analyzer.analyze(request, progress_callback=on_progress)
|
||||
|
||||
assert isinstance(result, AnalysisResult)
|
||||
assert len(result.results) == 2
|
||||
assert [item.test_id for item in result.results] == [12, 13]
|
||||
assert all(isinstance(item, TestResult) for item in result.results)
|
||||
|
||||
assert result.metadata.total_tests == 2
|
||||
assert (
|
||||
result.metadata.passed_tests
|
||||
+ result.metadata.failed_tests
|
||||
+ result.metadata.skipped_tests
|
||||
) == result.metadata.total_tests
|
||||
|
||||
assert progress_events
|
||||
assert all(total == 2 for _, total, _ in progress_events)
|
||||
assert progress_events[0][0] == 0
|
||||
assert progress_events[-1][0] == 1
|
||||
52
backend/tests/engine/test_parser.py
Normal file
52
backend/tests/engine/test_parser.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from app.engine.parser import HeaderParser
|
||||
|
||||
|
||||
FIXTURES_DIR = Path(__file__).resolve().parents[1] / "fixtures"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def sample_headers() -> str:
|
||||
return (FIXTURES_DIR / "sample_headers.txt").read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def test_parser_extracts_headers_and_preserves_order(sample_headers: str) -> None:
|
||||
parser = HeaderParser()
|
||||
headers = parser.parse(sample_headers)
|
||||
|
||||
assert headers, "Expected parsed headers to be non-empty."
|
||||
|
||||
indices = [header.index for header in headers]
|
||||
assert indices == list(range(len(headers)))
|
||||
|
||||
names = [header.name for header in headers]
|
||||
assert names[:2] == ["Received", "Received"]
|
||||
assert "X-Should-Not-Be-Parsed" not in names
|
||||
|
||||
|
||||
def test_parser_handles_folded_lines(sample_headers: str) -> None:
|
||||
parser = HeaderParser()
|
||||
headers = parser.parse(sample_headers)
|
||||
|
||||
subject = next(header for header in headers if header.name == "Subject")
|
||||
assert "folded line" in subject.value
|
||||
assert "\n" in subject.value
|
||||
|
||||
authentication = next(
|
||||
header for header in headers if header.name == "Authentication-Results"
|
||||
)
|
||||
assert "spf=pass" in authentication.value
|
||||
assert "\n" in authentication.value
|
||||
|
||||
|
||||
def test_parser_preserves_content_type_boundary(sample_headers: str) -> None:
|
||||
parser = HeaderParser()
|
||||
headers = parser.parse(sample_headers)
|
||||
|
||||
content_type = next(header for header in headers if header.name == "Content-Type")
|
||||
assert "boundary=\"boundary-123\"" in content_type.value
|
||||
32
backend/tests/engine/test_scanner_registry.py
Normal file
32
backend/tests/engine/test_scanner_registry.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from app.engine.scanner_registry import ScannerRegistry
|
||||
|
||||
|
||||
def test_registry_discovers_all_scanners() -> None:
|
||||
registry = ScannerRegistry()
|
||||
scanners = registry.get_all()
|
||||
|
||||
assert len(scanners) >= 106
|
||||
|
||||
ids = [scanner.id for scanner in scanners]
|
||||
assert len(ids) == len(set(ids))
|
||||
assert {1, 12, 66}.issubset(set(ids))
|
||||
|
||||
|
||||
def test_registry_filters_by_ids_and_lists_tests() -> None:
|
||||
registry = ScannerRegistry()
|
||||
selected = registry.get_by_ids([1, 12, 66])
|
||||
|
||||
assert [scanner.id for scanner in selected] == [1, 12, 66]
|
||||
|
||||
tests = registry.list_tests()
|
||||
lookup = {test.id: test for test in tests}
|
||||
|
||||
assert lookup[1].name == "Received - Mail Servers Flow"
|
||||
assert lookup[12].name == "X-Forefront-Antispam-Report"
|
||||
assert lookup[66].name == "X-Proofpoint-Spam-Details"
|
||||
|
||||
assert lookup[1].category
|
||||
assert lookup[12].category
|
||||
assert lookup[66].category
|
||||
Reference in New Issue
Block a user