mirror of
https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 13:33:30 +01:00
MAESTRO: add header parser module
This commit is contained in:
@@ -48,7 +48,7 @@ backend/app/engine/
|
||||
- [x] T007 Write failing tests (TDD Red) in `backend/tests/engine/test_parser.py` (header parsing with sample EML), `backend/tests/engine/test_scanner_registry.py` (discovery returns 106+ scanners, filtering by ID), and `backend/tests/engine/test_analyzer.py` (full pipeline with reference fixture). Create `backend/tests/fixtures/sample_headers.txt` with a representative header set extracted from the existing test infrastructure.
|
||||
- [x] T008 Create `backend/app/engine/__init__.py` and `backend/app/engine/models.py` — Pydantic models for `AnalysisRequest`, `AnalysisResult`, `TestResult`, `HopChainNode`, `SecurityAppliance`. Refer to `.specify/specs/1-web-header-analyzer/data-model.md` for field definitions and severity enum values (spam→#ff5555, suspicious→#ffb86c, clean→#50fa7b, info→#bd93f9)
|
||||
- [x] T009 Create `backend/app/engine/logger.py` — extract Logger class from `decode-spam-headers.py` (lines 209–419), adapt to use Python `logging` module instead of direct stdout
|
||||
- [ ] T010 Create `backend/app/engine/parser.py` — extract header parsing from `SMTPHeadersAnalysis.collect()` and `getHeader()` (lines ~2137–2270). Expose `HeaderParser.parse(raw_text: str) -> list[ParsedHeader]` including MIME boundary and line-break handling. Verify `test_parser.py` passes (TDD Green)
|
||||
- [x] T010 Create `backend/app/engine/parser.py` — extract header parsing from `SMTPHeadersAnalysis.collect()` and `getHeader()` (lines ~2137–2270). Expose `HeaderParser.parse(raw_text: str) -> list[ParsedHeader]` including MIME boundary and line-break handling. Verify `test_parser.py` passes (TDD Green)
|
||||
- [ ] T011 Create `backend/app/engine/scanner_base.py` — abstract `BaseScanner` (Protocol or ABC) with interface: `id: int`, `name: str`, `run(headers: list[ParsedHeader]) -> TestResult | None`
|
||||
- [ ] T012 Create `backend/app/engine/scanner_registry.py` — `ScannerRegistry` with auto-discovery: `get_all()`, `get_by_ids(ids)`, `list_tests()`. Verify `test_scanner_registry.py` passes (TDD Green)
|
||||
- [ ] T013 [P] Create scanner modules by extracting test methods from `SMTPHeadersAnalysis` into `backend/app/engine/scanners/`. Each file implements `BaseScanner`:
|
||||
|
||||
116
backend/app/engine/parser.py
Normal file
116
backend/app/engine/parser.py
Normal file
@@ -0,0 +1,116 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import re
|
||||
|
||||
|
||||
@dataclass
class ParsedHeader:
    """One header extracted from raw message text by ``HeaderParser``.

    The instance is intentionally mutable: the parser appends folded
    continuation lines to ``value`` after the header has been created.
    """

    index: int  # zero-based position of the header in order of appearance
    name: str  # field name as written in the input, without the colon
    value: str  # field body; folded lines are joined with newline characters


class HeaderParser:
    """Line-oriented parser that turns raw header text into ``ParsedHeader`` items.

    The parser is deliberately tolerant: lines that do not look like a
    header are skipped, and for a fixed set of headers whose value is known
    to be pushed onto the following line the next line is taken as the
    value even when it is not indented.
    """

    # Headers whose value is frequently found on the line *after* the
    # "Name:" line, without the leading whitespace that marks a folded line.
    _headers_known_for_breaking_line: set[str] = {
        "Received",
        "Authentication-Results",
        "Received-SPF",
        "DKIM-Signature",
        "X-Google-DKIM-Signature",
        "X-GM-Message-State",
        "Subject",
        "X-MS-Exchange-Organization-ExpirationStartTime",
        "X-MS-Exchange-Organization-Network-Message-Id",
        "X-Forefront-Antispam-Report",
        "X-MS-Exchange-CrossTenant-OriginalArrivalTime",
        "X-Microsoft-Antispam-Mailbox-Delivery",
        "X-Microsoft-Antispam-Message-Info",
    }

    # "Name: value" on a single line.
    _HEADER_WITH_VALUE = re.compile(r"^([^:]+)\s*:\s+(.+)\s*$", re.S)
    # "Name:" with nothing after the colon (value expected on later lines).
    _HEADER_WITHOUT_VALUE = re.compile(r"^([^:]+)\s*:\s*$", re.S)
    # MIME boundary parameter, either quoted or as a bare token.
    _BOUNDARY = re.compile(r'boundary=(?:"([^"]+)"|([^\s;"]+))', re.I)
    # A line starting with one of these continues the previous header.
    _FOLD_PREFIXES = (" ", "\t")

    def parse(self, raw_text: str) -> list[ParsedHeader]:
        """Parse ``raw_text`` and return its headers in order of appearance.

        Behaviour:

        * A line starting with a space or tab is appended (with a newline
          separator) to the value of the previous header; it is ignored
          when no header has been seen yet.
        * Blank lines and lines that do not match a header pattern are
          skipped.
        * Once a ``Content-Type`` header declares a ``boundary`` (quoted or
          unquoted, on the first or on a folded line), everything between
          the ``--boundary`` and ``--boundary--`` lines is skipped.

        Args:
            raw_text: Raw header text; any line ending recognised by
                ``str.splitlines`` is accepted.

        Returns:
            The parsed headers, with ``index`` counting from zero. An empty
            list is returned for empty input.
        """
        # splitlines() already splits on "\r", so no per-line "\r" stripping
        # is required afterwards.
        lines = raw_text.splitlines()
        headers: list[ParsedHeader] = []
        boundary = ""
        in_boundary = False

        i = 0
        while i < len(lines):
            line = lines[i]
            stripped = line.strip()
            consumed = 1

            if boundary and stripped == f"--{boundary}":
                in_boundary = True
            elif in_boundary and stripped == f"--{boundary}--":
                in_boundary = False
            elif in_boundary:
                pass  # inside a MIME part: not part of the top-level headers
            elif line.startswith(self._FOLD_PREFIXES):
                if headers:
                    headers[-1].value += "\n" + line
                    # The boundary parameter usually sits on a folded line,
                    # so it has to be looked for here as well.
                    boundary = self._detect_boundary(headers[-1], boundary)
            elif stripped:
                consumed = self._consume_header(lines, i, stripped, headers)
                if headers:
                    boundary = self._detect_boundary(headers[-1], boundary)

            i += consumed

        return headers

    def _consume_header(
        self,
        lines: list[str],
        start: int,
        stripped: str,
        headers: list[ParsedHeader],
    ) -> int:
        """Parse the header starting at ``lines[start]``; return lines consumed."""
        match = self._HEADER_WITH_VALUE.match(stripped)
        if match:
            # rstrip(): the greedy name group swallows blanks before the colon.
            name = match.group(1).rstrip()
            headers.append(ParsedHeader(len(headers), name, match.group(2)))
            return 1

        match = self._HEADER_WITHOUT_VALUE.match(stripped)
        if not match:
            return 1  # not a header line: skip it

        name = match.group(1).rstrip()
        # For known headers the very next line is taken unconditionally.
        take_next = name in self._headers_known_for_breaking_line
        value_lines: list[str] = []
        cursor = start + 1
        while cursor < len(lines):
            candidate = lines[cursor]
            if not (take_next or candidate.startswith(self._FOLD_PREFIXES)):
                break
            value_lines.append(candidate)
            take_next = False
            cursor += 1

        value = "\n".join(value_lines).strip()
        headers.append(ParsedHeader(len(headers), name, value))
        return cursor - start

    def _detect_boundary(self, header: ParsedHeader, current: str) -> str:
        """Return the boundary declared by ``header``, or ``current`` if none."""
        if header.name.lower() != "content-type":
            return current
        found = self._BOUNDARY.search(header.value)
        if not found:
            return current
        return found.group(1) or found.group(2)
|
||||
Reference in New Issue
Block a user