mirror of https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 05:23:31 +01:00
MAESTRO: add engine models
@@ -46,7 +46,7 @@ backend/app/engine/
 ## Tasks

 - [x] T007 Write failing tests (TDD Red) in `backend/tests/engine/test_parser.py` (header parsing with sample EML), `backend/tests/engine/test_scanner_registry.py` (discovery returns 106+ scanners, filtering by ID), and `backend/tests/engine/test_analyzer.py` (full pipeline with reference fixture). Create `backend/tests/fixtures/sample_headers.txt` with representative header set extracted from the existing test infrastructure
-- [ ] T008 Create `backend/app/engine/__init__.py` and `backend/app/engine/models.py` — Pydantic models for `AnalysisRequest`, `AnalysisResult`, `TestResult`, `HopChainNode`, `SecurityAppliance`. Refer to `.specify/specs/1-web-header-analyzer/data-model.md` for field definitions and severity enum values (spam→#ff5555, suspicious→#ffb86c, clean→#50fa7b, info→#bd93f9)
+- [x] T008 Create `backend/app/engine/__init__.py` and `backend/app/engine/models.py` — Pydantic models for `AnalysisRequest`, `AnalysisResult`, `TestResult`, `HopChainNode`, `SecurityAppliance`. Refer to `.specify/specs/1-web-header-analyzer/data-model.md` for field definitions and severity enum values (spam→#ff5555, suspicious→#ffb86c, clean→#50fa7b, info→#bd93f9)
 - [ ] T009 Create `backend/app/engine/logger.py` — extract Logger class from `decode-spam-headers.py` (lines 209–419), adapt to use Python `logging` module instead of direct stdout
 - [ ] T010 Create `backend/app/engine/parser.py` — extract header parsing from `SMTPHeadersAnalysis.collect()` and `getHeader()` (lines ~2137–2270). Expose `HeaderParser.parse(raw_text: str) -> list[ParsedHeader]` including MIME boundary and line-break handling. Verify `test_parser.py` passes (TDD Green)
 - [ ] T011 Create `backend/app/engine/scanner_base.py` — abstract `BaseScanner` (Protocol or ABC) with interface: `id: int`, `name: str`, `run(headers: list[ParsedHeader]) -> TestResult | None`
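Tasks T010 and T011 only name the planned parser and scanner interfaces; this commit ships the models alone. The following is a rough sketch of what that surface could look like, not code from the repository: the `ParsedHeader` fields and the `app.engine.models` import path are assumptions.

# Illustrative sketch of the interfaces described in T010/T011.
# ParsedHeader's fields are assumed; only its name appears in the task list.
from dataclasses import dataclass
from typing import Protocol

from app.engine.models import TestResult


@dataclass
class ParsedHeader:
    name: str      # e.g. "Received" or "Authentication-Results"
    value: str     # unfolded header value
    position: int  # order of appearance in the raw message


class HeaderParser:
    """Planned surface of backend/app/engine/parser.py (T010)."""

    def parse(self, raw_text: str) -> list[ParsedHeader]:
        raise NotImplementedError


class BaseScanner(Protocol):
    """Planned scanner contract from backend/app/engine/scanner_base.py (T011)."""

    id: int
    name: str

    def run(self, headers: list[ParsedHeader]) -> TestResult | None:
        """Return a TestResult, or None when the relevant headers are absent."""
        ...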
25 backend/app/engine/__init__.py Normal file
@@ -0,0 +1,25 @@
from .models import (
    AnalysisConfig,
    AnalysisRequest,
    AnalysisResult,
    HopChainNode,
    ReportMetadata,
    SecurityAppliance,
    Severity,
    Test,
    TestResult,
    TestStatus,
)

__all__ = [
    "AnalysisConfig",
    "AnalysisRequest",
    "AnalysisResult",
    "HopChainNode",
    "ReportMetadata",
    "SecurityAppliance",
    "Severity",
    "Test",
    "TestResult",
    "TestStatus",
]
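The re-export list gives the rest of the backend a single import surface for the engine types. Assuming `backend/` is the package root, a caller would import from `app.engine` directly rather than reaching into `app.engine.models`:

# Hypothetical caller, assuming backend/ is on the import path.
from app.engine import AnalysisRequest, Severity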
119 backend/app/engine/models.py Normal file
@@ -0,0 +1,119 @@
from __future__ import annotations

from datetime import datetime
from enum import StrEnum

from pydantic import BaseModel, ConfigDict, Field


class Severity(StrEnum):
    spam = "spam"
    suspicious = "suspicious"
    clean = "clean"
    info = "info"


class TestStatus(StrEnum):
    success = "success"
    error = "error"
    skipped = "skipped"


SEVERITY_COLORS: dict[Severity, str] = {
    Severity.spam: "#ff5555",
    Severity.suspicious: "#ffb86c",
    Severity.clean: "#50fa7b",
    Severity.info: "#bd93f9",
}


class AnalysisConfig(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    test_ids: list[int] = Field(
        default_factory=list,
        alias="testIds",
        description="Subset of test IDs to run. Empty means run all tests.",
    )
    resolve: bool = Field(
        default=False,
        description="Enable DNS resolution for supported checks.",
    )
    decode_all: bool = Field(
        default=False,
        alias="decodeAll",
        description="Decode opaque encoded values where possible.",
    )


class AnalysisRequest(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    headers: str = Field(
        min_length=1,
        max_length=1_048_576,
        description="Raw SMTP/IMAP header text supplied by the user.",
    )
    config: AnalysisConfig = Field(default_factory=AnalysisConfig)


class Test(BaseModel):
    id: int = Field(ge=1, description="Unique test identifier.")
    name: str = Field(min_length=1, description="Human-readable test name.")
    category: str = Field(min_length=1, description="Vendor/group category.")


class TestResult(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    test_id: int = Field(alias="testId")
    test_name: str = Field(alias="testName")
    header_name: str = Field(alias="headerName")
    header_value: str = Field(alias="headerValue")
    analysis: str
    description: str
    severity: Severity
    status: TestStatus
    error: str | None = None


class HopChainNode(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    index: int
    hostname: str
    ip: str | None = None
    timestamp: datetime | None = None
    server_info: str | None = Field(default=None, alias="serverInfo")
    delay: float | None = None


class SecurityAppliance(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    name: str
    vendor: str
    headers: list[str]


class ReportMetadata(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    total_tests: int = Field(default=0, alias="totalTests")
    passed_tests: int = Field(default=0, alias="passedTests")
    failed_tests: int = Field(default=0, alias="failedTests")
    skipped_tests: int = Field(default=0, alias="skippedTests")
    elapsed_ms: float = Field(default=0.0, alias="elapsedMs")
    timed_out: bool = Field(default=False, alias="timedOut")
    incomplete_tests: list[str] = Field(default_factory=list, alias="incompleteTests")


class AnalysisResult(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    results: list[TestResult] = Field(default_factory=list)
    hop_chain: list[HopChainNode] = Field(default_factory=list, alias="hopChain")
    security_appliances: list[SecurityAppliance] = Field(
        default_factory=list, alias="securityAppliances"
    )
    metadata: ReportMetadata = Field(default_factory=ReportMetadata)
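Every model above pairs snake_case field names with camelCase aliases and sets `populate_by_name=True`, so the HTTP layer can exchange camelCase JSON while engine code stays PEP 8. A minimal round-trip illustration (sample data only; it assumes `backend/` is the package root so the module imports as `app.engine.models`):

# Sample usage of the models above; the header text is made up.
from app.engine.models import AnalysisRequest

# camelCase keys, as a web client would send them
req = AnalysisRequest.model_validate(
    {
        "headers": "Received: from mx.example.com ...",
        "config": {"testIds": [1, 2], "decodeAll": True},
    }
)
assert req.config.test_ids == [1, 2]        # snake_case access in Python code

# snake_case keys also validate because populate_by_name=True
req2 = AnalysisRequest(headers="Received: ...", config={"test_ids": [3]})
assert req2.config.test_ids == [3]

# serializing with by_alias=True restores the camelCase keys
assert req.model_dump(by_alias=True)["config"]["decodeAll"] is True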