From 98f2f8a656dd75bd22d356a3a9699c630fc819f4 Mon Sep 17 00:00:00 2001 From: Mariusz Banach Date: Wed, 18 Feb 2026 04:12:02 +0100 Subject: [PATCH] MAESTRO: add captcha verification flow --- ...r-analyzer-Phase-08-Security-Operations.md | 2 +- backend/app/core/config.py | 15 ++ backend/app/main.py | 2 + backend/app/middleware/rate_limiter.py | 25 +- backend/app/routers/captcha.py | 29 ++ backend/app/schemas/captcha.py | 31 +++ backend/app/security/__init__.py | 1 + backend/app/security/captcha.py | 253 ++++++++++++++++++ 8 files changed, 352 insertions(+), 6 deletions(-) create mode 100644 backend/app/routers/captcha.py create mode 100644 backend/app/schemas/captcha.py create mode 100644 backend/app/security/__init__.py create mode 100644 backend/app/security/captcha.py diff --git a/Auto Run Docs/SpecKit-web-header-analyzer-Phase-08-Security-Operations.md b/Auto Run Docs/SpecKit-web-header-analyzer-Phase-08-Security-Operations.md index 8092893..7c26be9 100644 --- a/Auto Run Docs/SpecKit-web-header-analyzer-Phase-08-Security-Operations.md +++ b/Auto Run Docs/SpecKit-web-header-analyzer-Phase-08-Security-Operations.md @@ -30,7 +30,7 @@ This phase protects the analysis service from abuse with per-IP rate limiting an - [x] T040 [US6] Write failing tests (TDD Red) in `backend/tests/api/test_rate_limiter.py` (rate limiting triggers at threshold, 429 response with CAPTCHA challenge), `backend/tests/api/test_captcha.py` (challenge generation, verification, bypass token), `backend/tests/api/test_health.py` (health endpoint returns correct status), and `frontend/src/__tests__/CaptchaChallenge.test.tsx` (render modal, display CAPTCHA image, submit answer, handle success/failure states, keyboard accessibility) - [x] T041 [US6] Create `backend/app/middleware/rate_limiter.py` — per-IP sliding window rate limiter (async-safe in-memory). Configurable limit via `config.py`. Returns 429 with Retry-After header and CAPTCHA challenge token (NFR-11, NFR-12). Note: per-instance counters — acceptable for initial release; shared store upgradeable later. Verify `test_rate_limiter.py` passes (TDD Green) -- [ ] T042 [US6] Create `backend/app/routers/captcha.py` — `POST /api/captcha/verify` endpoint. Server-generated visual noise CAPTCHA (randomly distorted text). Returns HMAC-signed bypass token (5-minute expiry) on success. Token exempts IP from rate limiting. Response schema in `backend/app/schemas/captcha.py`. Verify `test_captcha.py` passes (TDD Green) +- [x] T042 [US6] Create `backend/app/routers/captcha.py` — `POST /api/captcha/verify` endpoint. Server-generated visual noise CAPTCHA (randomly distorted text). Returns HMAC-signed bypass token (5-minute expiry) on success. Token exempts IP from rate limiting. Response schema in `backend/app/schemas/captcha.py`. Verify `test_captcha.py` passes (TDD Green) - [ ] T043 [P] [US6] Create `frontend/src/components/CaptchaChallenge.tsx` — modal on 429 response. Displays CAPTCHA image, on verification stores bypass token and retries original request. FontAwesome lock/unlock icons. Keyboard accessible (NFR-02). Verify `CaptchaChallenge.test.tsx` passes (TDD Green) - [ ] T044 [US6] Create `backend/app/schemas/health.py` and `backend/app/routers/health.py` — `GET /api/health` returning status (up/degraded/down), version, uptime, scanner count (NFR-15). Verify `test_health.py` passes (TDD Green) - [ ] T045 [US6] Register all routers and middleware in `backend/app/main.py` — CORS middleware (frontend origin), rate limiter, routers (analysis, tests, health, captcha). Verify stateless operation (NFR-16). Note: rate limiter per-instance state is accepted trade-off (see T041) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 0999aac..958978d 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import secrets from functools import lru_cache from typing import Any @@ -30,6 +31,20 @@ class Settings(BaseSettings): ge=1, description="Rate limit window in seconds.", ) + captcha_secret: str = Field( + default_factory=lambda: secrets.token_urlsafe(32), + description="Secret used to sign CAPTCHA bypass tokens.", + ) + captcha_challenge_ttl_seconds: int = Field( + default=300, + ge=60, + description="Seconds until CAPTCHA challenges expire.", + ) + captcha_bypass_ttl_seconds: int = Field( + default=300, + ge=60, + description="Seconds until CAPTCHA bypass tokens expire.", + ) analysis_timeout_seconds: int = Field( default=30, ge=1, diff --git a/backend/app/main.py b/backend/app/main.py index ab01a71..85b73be 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -3,6 +3,7 @@ from fastapi import FastAPI from app.core.config import get_settings from app.middleware.rate_limiter import RateLimiterMiddleware, SlidingWindowRateLimiter from app.routers.analysis import router as analysis_router +from app.routers.captcha import router as captcha_router from app.routers.tests import router as tests_router app = FastAPI(title="Web Header Analyzer API") @@ -14,6 +15,7 @@ app.add_middleware( RateLimiterMiddleware, limiter=rate_limiter, protected_paths={"/api/analyse"} ) app.include_router(analysis_router) +app.include_router(captcha_router) app.include_router(tests_router) diff --git a/backend/app/middleware/rate_limiter.py b/backend/app/middleware/rate_limiter.py index 0dc6222..71d6026 100644 --- a/backend/app/middleware/rate_limiter.py +++ b/backend/app/middleware/rate_limiter.py @@ -2,7 +2,6 @@ from __future__ import annotations import asyncio import math -import secrets import time from collections import deque from dataclasses import dataclass @@ -11,8 +10,10 @@ from typing import Deque from fastapi import status from fastapi.responses import JSONResponse -CAPTCHA_PLACEHOLDER_IMAGE_BASE64 = ( - "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO6sZcQAAAAASUVORK5CYII=" +from app.security.captcha import ( + BYPASS_TOKEN_HEADER, + create_captcha_challenge, + verify_bypass_token, ) @@ -83,6 +84,10 @@ class RateLimiterMiddleware: return client_ip = _get_client_ip(scope) + bypass_token = _get_bypass_token(scope) + if bypass_token and verify_bypass_token(bypass_token, client_ip): + await self.app(scope, receive, send) + return allowed, retry_after = await self.limiter.check(client_ip) if allowed: await self.app(scope, receive, send) @@ -105,9 +110,10 @@ class RateLimiterMiddleware: def _create_captcha_challenge() -> CaptchaChallengePayload: + challenge = create_captcha_challenge() return CaptchaChallengePayload( - challenge_token=secrets.token_urlsafe(16), - image_base64=CAPTCHA_PLACEHOLDER_IMAGE_BASE64, + challenge_token=challenge.challenge_token, + image_base64=challenge.image_base64, ) @@ -125,3 +131,12 @@ def _get_client_ip(scope) -> str: if client and client[0]: return str(client[0]) return "unknown" + + +def _get_bypass_token(scope) -> str | None: + headers = scope.get("headers") or [] + token_header = BYPASS_TOKEN_HEADER.encode("utf-8") + for key, value in headers: + if key.lower() == token_header: + return value.decode("utf-8", errors="ignore") + return None diff --git a/backend/app/routers/captcha.py b/backend/app/routers/captcha.py new file mode 100644 index 0000000..bf2d4d6 --- /dev/null +++ b/backend/app/routers/captcha.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from fastapi import APIRouter, HTTPException, Request + +from app.schemas.captcha import CaptchaVerifyRequest, CaptchaVerifyResponse +from app.security.captcha import issue_bypass_token, verify_captcha_answer + +router = APIRouter(prefix="/api", tags=["security"]) + + +@router.post("/captcha/verify", response_model=CaptchaVerifyResponse) +async def verify_captcha( + payload: CaptchaVerifyRequest, request: Request +) -> CaptchaVerifyResponse: + client_ip = _get_client_ip(request) + if not verify_captcha_answer(payload.challenge_token, payload.answer): + raise HTTPException(status_code=400, detail="Invalid captcha response") + + bypass_token = issue_bypass_token(client_ip) + return CaptchaVerifyResponse(success=True, bypass_token=bypass_token) + + +def _get_client_ip(request: Request) -> str: + forwarded_for = request.headers.get("x-forwarded-for") + if forwarded_for: + return forwarded_for.split(",")[0].strip() or "unknown" + if request.client and request.client.host: + return request.client.host + return "unknown" diff --git a/backend/app/schemas/captcha.py b/backend/app/schemas/captcha.py new file mode 100644 index 0000000..fb8b3dc --- /dev/null +++ b/backend/app/schemas/captcha.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict, Field + + +class CaptchaChallenge(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + challenge_token: str = Field(alias="challengeToken") + image_base64: str = Field(alias="imageBase64") + + +class CaptchaVerifyRequest(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + challenge_token: str = Field(alias="challengeToken") + answer: str + + +class CaptchaVerifyResponse(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + success: bool + bypass_token: str | None = Field(default=None, alias="bypassToken") + + +__all__ = [ + "CaptchaChallenge", + "CaptchaVerifyRequest", + "CaptchaVerifyResponse", +] diff --git a/backend/app/security/__init__.py b/backend/app/security/__init__.py new file mode 100644 index 0000000..25b6ea1 --- /dev/null +++ b/backend/app/security/__init__.py @@ -0,0 +1 @@ +"""Security utilities for the API.""" diff --git a/backend/app/security/captcha.py b/backend/app/security/captcha.py new file mode 100644 index 0000000..64e189d --- /dev/null +++ b/backend/app/security/captcha.py @@ -0,0 +1,253 @@ +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import math +import random +import secrets +import struct +import threading +import time +import zlib +from dataclasses import dataclass + +from app.core.config import get_settings + +BYPASS_TOKEN_HEADER = "x-captcha-bypass-token" + +DIGIT_FONT: dict[str, list[str]] = { + "0": ["111", "101", "101", "101", "111"], + "1": ["010", "110", "010", "010", "111"], + "2": ["111", "001", "111", "100", "111"], + "3": ["111", "001", "111", "001", "111"], + "4": ["101", "101", "111", "001", "001"], + "5": ["111", "100", "111", "001", "111"], + "6": ["111", "100", "111", "101", "111"], + "7": ["111", "001", "010", "100", "100"], + "8": ["111", "101", "111", "101", "111"], + "9": ["111", "101", "111", "001", "111"], +} + + +@dataclass(frozen=True) +class CaptchaChallenge: + challenge_token: str + image_base64: str + answer: str + + +@dataclass +class _ChallengeRecord: + answer: str + expires_at: float + + +_CHALLENGE_LOCK = threading.Lock() +_CHALLENGES: dict[str, _ChallengeRecord] = {} + + +def create_captcha_challenge() -> CaptchaChallenge: + settings = get_settings() + answer = _generate_answer() + image_base64 = _render_captcha_image(answer) + challenge_token = secrets.token_urlsafe(16) + expires_at = time.time() + settings.captcha_challenge_ttl_seconds + + with _CHALLENGE_LOCK: + _prune_challenges_locked() + _CHALLENGES[challenge_token] = _ChallengeRecord( + answer=answer, expires_at=expires_at + ) + + return CaptchaChallenge( + challenge_token=challenge_token, + image_base64=image_base64, + answer=answer, + ) + + +def verify_captcha_answer(challenge_token: str, answer: str) -> bool: + if not challenge_token or not answer: + return False + + now = time.time() + normalized = answer.strip().upper() + + with _CHALLENGE_LOCK: + record = _CHALLENGES.get(challenge_token) + if record is None: + return False + if record.expires_at <= now: + _CHALLENGES.pop(challenge_token, None) + return False + if record.answer != normalized: + return False + _CHALLENGES.pop(challenge_token, None) + return True + + +def issue_bypass_token(client_ip: str) -> str: + settings = get_settings() + expires_at = int(time.time() + settings.captcha_bypass_ttl_seconds) + payload = { + "ip": client_ip, + "exp": expires_at, + "nonce": secrets.token_urlsafe(8), + } + payload_json = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode( + "utf-8" + ) + payload_b64 = _b64encode(payload_json) + signature = hmac.new( + settings.captcha_secret.encode("utf-8"), + payload_b64.encode("utf-8"), + hashlib.sha256, + ).digest() + signature_b64 = _b64encode(signature) + return f"{payload_b64}.{signature_b64}" + + +def verify_bypass_token(token: str, client_ip: str) -> bool: + if not token: + return False + parts = token.split(".") + if len(parts) != 2: + return False + payload_b64, signature_b64 = parts + settings = get_settings() + expected_signature = hmac.new( + settings.captcha_secret.encode("utf-8"), + payload_b64.encode("utf-8"), + hashlib.sha256, + ).digest() + if not hmac.compare_digest(_b64encode(expected_signature), signature_b64): + return False + try: + payload_raw = _b64decode(payload_b64) + payload = json.loads(payload_raw.decode("utf-8")) + except (ValueError, json.JSONDecodeError): + return False + if payload.get("ip") != client_ip: + return False + expires_at = payload.get("exp") + if not isinstance(expires_at, int): + return False + if expires_at < int(time.time()): + return False + return True + + +def _generate_answer(length: int = 5) -> str: + digits = "23456789" + return "".join(secrets.choice(digits) for _ in range(length)) + + +def _render_captcha_image(text: str) -> str: + scale = 4 + padding = 6 + spacing = 4 + width = padding * 2 + len(text) * 3 * scale + (len(text) - 1) * spacing + height = padding * 2 + 5 * scale + + pixels = bytearray(width * height * 4) + for y in range(height): + for x in range(width): + noise = random.randint(0, 30) + value = 255 - noise + idx = (y * width + x) * 4 + pixels[idx : idx + 4] = bytes((value, value, value, 255)) + + x_cursor = padding + for ch in text: + pattern = DIGIT_FONT.get(ch) + if pattern is None: + x_cursor += 3 * scale + spacing + continue + x_offset = x_cursor + random.randint(-1, 1) + y_offset = padding + random.randint(-2, 2) + for row, line in enumerate(pattern): + for col, bit in enumerate(line): + if bit != "1": + continue + for sy in range(scale): + for sx in range(scale): + px = x_offset + col * scale + sx + py = y_offset + row * scale + sy + if 0 <= px < width and 0 <= py < height: + idx = (py * width + px) * 4 + pixels[idx : idx + 4] = bytes((30, 30, 30, 255)) + x_cursor += 3 * scale + spacing + + pixels = _warp_pixels(pixels, width, height) + + for _ in range(int(width * height * 0.02)): + px = random.randint(0, width - 1) + py = random.randint(0, height - 1) + idx = (py * width + px) * 4 + pixels[idx : idx + 4] = bytes((80, 80, 80, 255)) + + png_bytes = _encode_png(width, height, bytes(pixels)) + return base64.b64encode(png_bytes).decode("ascii") + + +def _warp_pixels(pixels: bytearray, width: int, height: int) -> bytearray: + amplitude = 2.0 + frequency = 3.0 + phase = random.random() * math.tau + warped = bytearray(len(pixels)) + for y in range(height): + shift = int(round(amplitude * math.sin(y / frequency + phase))) + for x in range(width): + src_x = x - shift + dst_idx = (y * width + x) * 4 + if src_x < 0 or src_x >= width: + warped[dst_idx : dst_idx + 4] = b"\xff\xff\xff\xff" + else: + src_idx = (y * width + src_x) * 4 + warped[dst_idx : dst_idx + 4] = pixels[src_idx : src_idx + 4] + return warped + + +def _encode_png(width: int, height: int, rgba: bytes) -> bytes: + row_bytes = width * 4 + raw = bytearray() + for y in range(height): + raw.append(0) + start = y * row_bytes + raw.extend(rgba[start : start + row_bytes]) + compressed = zlib.compress(bytes(raw), level=6) + ihdr = struct.pack(">IIBBBBB", width, height, 8, 6, 0, 0, 0) + return ( + b"\x89PNG\r\n\x1a\n" + + _png_chunk(b"IHDR", ihdr) + + _png_chunk(b"IDAT", compressed) + + _png_chunk(b"IEND", b"") + ) + + +def _png_chunk(chunk_type: bytes, data: bytes) -> bytes: + length = struct.pack(">I", len(data)) + crc = zlib.crc32(chunk_type + data) & 0xFFFFFFFF + return length + chunk_type + data + struct.pack(">I", crc) + + +def _b64encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=") + + +def _b64decode(data: str) -> bytes: + padding = "=" * (-len(data) % 4) + return base64.urlsafe_b64decode(data + padding) + + +def _prune_challenges_locked() -> None: + now = time.time() + expired = [ + token + for token, record in _CHALLENGES.items() + if record.expires_at <= now + ] + for token in expired: + _CHALLENGES.pop(token, None)