From dd807c851a6ab7361eb99d8af4c36515e2620629 Mon Sep 17 00:00:00 2001
From: Mariusz Banach
Date: Tue, 17 Feb 2026 23:36:29 +0100
Subject: [PATCH] MAESTRO: add engine models

---
 ...er-analyzer-Phase-02-Engine-Refactoring.md |   2 +-
 backend/app/engine/__init__.py                |  25 ++++
 backend/app/engine/models.py                  | 119 ++++++++++++++++++
 3 files changed, 145 insertions(+), 1 deletion(-)
 create mode 100644 backend/app/engine/__init__.py
 create mode 100644 backend/app/engine/models.py

diff --git a/Auto Run Docs/SpecKit-web-header-analyzer-Phase-02-Engine-Refactoring.md b/Auto Run Docs/SpecKit-web-header-analyzer-Phase-02-Engine-Refactoring.md
index d5d8bec..624dcba 100644
--- a/Auto Run Docs/SpecKit-web-header-analyzer-Phase-02-Engine-Refactoring.md
+++ b/Auto Run Docs/SpecKit-web-header-analyzer-Phase-02-Engine-Refactoring.md
@@ -46,7 +46,7 @@ backend/app/engine/
 ## Tasks
 
 - [x] T007 Write failing tests (TDD Red) in `backend/tests/engine/test_parser.py` (header parsing with sample EML), `backend/tests/engine/test_scanner_registry.py` (discovery returns 106+ scanners, filtering by ID), and `backend/tests/engine/test_analyzer.py` (full pipeline with reference fixture). Create `backend/tests/fixtures/sample_headers.txt` with representative header set extracted from the existing test infrastructure
-- [ ] T008 Create `backend/app/engine/__init__.py` and `backend/app/engine/models.py` — Pydantic models for `AnalysisRequest`, `AnalysisResult`, `TestResult`, `HopChainNode`, `SecurityAppliance`. Refer to `.specify/specs/1-web-header-analyzer/data-model.md` for field definitions and severity enum values (spam→#ff5555, suspicious→#ffb86c, clean→#50fa7b, info→#bd93f9)
+- [x] T008 Create `backend/app/engine/__init__.py` and `backend/app/engine/models.py` — Pydantic models for `AnalysisRequest`, `AnalysisResult`, `TestResult`, `HopChainNode`, `SecurityAppliance`. Refer to `.specify/specs/1-web-header-analyzer/data-model.md` for field definitions and severity enum values (spam→#ff5555, suspicious→#ffb86c, clean→#50fa7b, info→#bd93f9)
 - [ ] T009 Create `backend/app/engine/logger.py` — extract Logger class from `decode-spam-headers.py` (lines 209–419), adapt to use Python `logging` module instead of direct stdout
 - [ ] T010 Create `backend/app/engine/parser.py` — extract header parsing from `SMTPHeadersAnalysis.collect()` and `getHeader()` (lines ~2137–2270). Expose `HeaderParser.parse(raw_text: str) -> list[ParsedHeader]` including MIME boundary and line-break handling. Verify `test_parser.py` passes (TDD Green)
 - [ ] T011 Create `backend/app/engine/scanner_base.py` — abstract `BaseScanner` (Protocol or ABC) with interface: `id: int`, `name: str`, `run(headers: list[ParsedHeader]) -> TestResult | None`
diff --git a/backend/app/engine/__init__.py b/backend/app/engine/__init__.py
new file mode 100644
index 0000000..f391df2
--- /dev/null
+++ b/backend/app/engine/__init__.py
@@ -0,0 +1,25 @@
+from .models import (
+    AnalysisConfig,
+    AnalysisRequest,
+    AnalysisResult,
+    HopChainNode,
+    ReportMetadata,
+    SecurityAppliance,
+    Severity,
+    Test,
+    TestResult,
+    TestStatus,
+)
+
+__all__ = [
+    "AnalysisConfig",
+    "AnalysisRequest",
+    "AnalysisResult",
+    "HopChainNode",
+    "ReportMetadata",
+    "SecurityAppliance",
+    "Severity",
+    "Test",
+    "TestResult",
+    "TestStatus",
+]
diff --git a/backend/app/engine/models.py b/backend/app/engine/models.py
new file mode 100644
index 0000000..97ba8d0
--- /dev/null
+++ b/backend/app/engine/models.py
@@ -0,0 +1,119 @@
+from __future__ import annotations
+
+from datetime import datetime
+from enum import StrEnum
+
+from pydantic import BaseModel, ConfigDict, Field
+
+
+class Severity(StrEnum):
+    spam = "spam"
+    suspicious = "suspicious"
+    clean = "clean"
+    info = "info"
+
+
+class TestStatus(StrEnum):
+    success = "success"
+    error = "error"
+    skipped = "skipped"
+
+
+SEVERITY_COLORS: dict[Severity, str] = {
+    Severity.spam: "#ff5555",
+    Severity.suspicious: "#ffb86c",
+    Severity.clean: "#50fa7b",
+    Severity.info: "#bd93f9",
+}
+
+
+class AnalysisConfig(BaseModel):
+    model_config = ConfigDict(populate_by_name=True)
+
+    test_ids: list[int] = Field(
+        default_factory=list,
+        alias="testIds",
+        description="Subset of test IDs to run. Empty means run all tests.",
+    )
+    resolve: bool = Field(
+        default=False,
+        description="Enable DNS resolution for supported checks.",
+    )
+    decode_all: bool = Field(
+        default=False,
+        alias="decodeAll",
+        description="Decode opaque encoded values where possible.",
+    )
+
+
+class AnalysisRequest(BaseModel):
+    model_config = ConfigDict(populate_by_name=True)
+
+    headers: str = Field(
+        min_length=1,
+        max_length=1_048_576,
+        description="Raw SMTP/IMAP header text supplied by the user.",
+    )
+    config: AnalysisConfig = Field(default_factory=AnalysisConfig)
+
+
+class Test(BaseModel):
+    id: int = Field(ge=1, description="Unique test identifier.")
+    name: str = Field(min_length=1, description="Human-readable test name.")
+    category: str = Field(min_length=1, description="Vendor/group category.")
+
+
+class TestResult(BaseModel):
+    model_config = ConfigDict(populate_by_name=True)
+
+    test_id: int = Field(alias="testId")
+    test_name: str = Field(alias="testName")
+    header_name: str = Field(alias="headerName")
+    header_value: str = Field(alias="headerValue")
+    analysis: str
+    description: str
+    severity: Severity
+    status: TestStatus
+    error: str | None = None
+
+
+class HopChainNode(BaseModel):
+    model_config = ConfigDict(populate_by_name=True)
+
+    index: int
+    hostname: str
+    ip: str | None = None
+    timestamp: datetime | None = None
+    server_info: str | None = Field(default=None, alias="serverInfo")
+    delay: float | None = None
+
+
+class SecurityAppliance(BaseModel):
+    model_config = ConfigDict(populate_by_name=True)
+
+    name: str
+    vendor: str
+    headers: list[str]
+
+
+class ReportMetadata(BaseModel):
+    model_config = ConfigDict(populate_by_name=True)
+
+    total_tests: int = Field(default=0, alias="totalTests")
+    passed_tests: int = Field(default=0, alias="passedTests")
+    failed_tests: int = Field(default=0, alias="failedTests")
+    skipped_tests: int = Field(default=0, alias="skippedTests")
+    elapsed_ms: float = Field(default=0.0, alias="elapsedMs")
+    timed_out: bool = Field(default=False, alias="timedOut")
+    incomplete_tests: list[str] = Field(default_factory=list, alias="incompleteTests")
+
+
+class AnalysisResult(BaseModel):
+    model_config = ConfigDict(populate_by_name=True)
+
+    results: list[TestResult] = Field(default_factory=list)
+    hop_chain: list[HopChainNode] = Field(default_factory=list, alias="hopChain")
+    security_appliances: list[SecurityAppliance] = Field(
+        default_factory=list, alias="securityAppliances"
+    )
+    metadata: ReportMetadata = Field(default_factory=ReportMetadata)
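Usage sketch (a minimal, non-authoritative example of how the models added above are expected to round-trip camelCase payloads): it assumes Pydantic v2's `model_validate` / `model_dump(by_alias=True)` API, Python 3.11+ for `enum.StrEnum`, and that `backend/` is the import root so the package resolves as `app.engine.models`; the header text, test names, and numbers below are illustrative only, not values from the spec.

    # Hypothetical round-trip: camelCase JSON in, camelCase JSON out.
    from app.engine.models import (
        SEVERITY_COLORS,
        AnalysisRequest,
        AnalysisResult,
        ReportMetadata,
        Severity,
        TestResult,
        TestStatus,
    )

    # API clients send camelCase keys; the Field aliases map them onto snake_case attributes.
    request = AnalysisRequest.model_validate(
        {
            "headers": "Received: from mx.example.com ...",
            "config": {"testIds": [1, 5], "resolve": False, "decodeAll": True},
        }
    )
    assert request.config.test_ids == [1, 5]

    # populate_by_name=True also allows plain snake_case construction inside the engine.
    result = AnalysisResult(
        results=[
            TestResult(
                test_id=1,
                test_name="SPF Check",
                header_name="Received-SPF",
                header_value="pass",
                analysis="SPF aligned",
                description="Checks the Received-SPF header.",
                severity=Severity.clean,
                status=TestStatus.success,
            )
        ],
        metadata=ReportMetadata(total_tests=1, passed_tests=1, elapsed_ms=12.5),
    )

    # Responses serialize back to camelCase for the frontend.
    payload = result.model_dump(by_alias=True)
    assert "hopChain" in payload and "securityAppliances" in payload

    # The UI colour for a finding comes from the shared severity palette.
    print(SEVERITY_COLORS[result.results[0].severity])  # "#50fa7b"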