mirror of
https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 05:23:31 +01:00
MAESTRO: verify captcha bypass token expiry
This commit is contained in:
@@ -40,7 +40,7 @@ This phase protects the analysis service from abuse with per-IP rate limiting an
|
|||||||
- [x] `pytest backend/tests/api/test_rate_limiter.py backend/tests/api/test_captcha.py backend/tests/api/test_health.py` all pass
|
- [x] `pytest backend/tests/api/test_rate_limiter.py backend/tests/api/test_captcha.py backend/tests/api/test_health.py` all pass
|
||||||
- [x] All vitest tests pass: `npx vitest run src/__tests__/CaptchaChallenge.test.tsx`
|
- [x] All vitest tests pass: `npx vitest run src/__tests__/CaptchaChallenge.test.tsx`
|
||||||
- [x] Exceeding rate limit returns HTTP 429 with Retry-After header and CAPTCHA challenge
|
- [x] Exceeding rate limit returns HTTP 429 with Retry-After header and CAPTCHA challenge
|
||||||
- [ ] Solving CAPTCHA returns HMAC-signed bypass token (5-minute expiry)
|
- [x] Solving CAPTCHA returns HMAC-signed bypass token (5-minute expiry)
|
||||||
- [ ] Bypass token exempts IP from rate limiting on subsequent requests
|
- [ ] Bypass token exempts IP from rate limiting on subsequent requests
|
||||||
- [ ] `GET /api/health` returns `{status, version, uptime, scannerCount}`
|
- [ ] `GET /api/health` returns `{status, version, uptime, scannerCount}`
|
||||||
- [x] All routers and CORS middleware are registered in `main.py`
|
- [x] All routers and CORS middleware are registered in `main.py`
|
||||||
|
|||||||
@@ -1,12 +1,15 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from httpx import ASGITransport, AsyncClient
|
from httpx import ASGITransport, AsyncClient
|
||||||
|
|
||||||
|
from app.core.config import get_settings
|
||||||
from app.main import app
|
from app.main import app
|
||||||
from app.security.captcha import create_captcha_challenge
|
from app.security.captcha import create_captcha_challenge, verify_bypass_token
|
||||||
|
|
||||||
|
|
||||||
def _assert_png_base64(image_base64: str) -> None:
|
def _assert_png_base64(image_base64: str) -> None:
|
||||||
@@ -14,6 +17,13 @@ def _assert_png_base64(image_base64: str) -> None:
|
|||||||
assert decoded.startswith(b"\x89PNG\r\n\x1a\n")
|
assert decoded.startswith(b"\x89PNG\r\n\x1a\n")
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_token_payload(token: str) -> dict:
|
||||||
|
payload_b64 = token.split(".")[0]
|
||||||
|
padding = "=" * (-len(payload_b64) % 4)
|
||||||
|
payload_raw = base64.urlsafe_b64decode(payload_b64 + padding)
|
||||||
|
return json.loads(payload_raw.decode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_captcha_challenge_generation_produces_image() -> None:
|
async def test_captcha_challenge_generation_produces_image() -> None:
|
||||||
challenge = create_captcha_challenge()
|
challenge = create_captcha_challenge()
|
||||||
@@ -45,6 +55,7 @@ async def test_captcha_verify_rejects_invalid_answer() -> None:
|
|||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_captcha_verify_returns_bypass_token() -> None:
|
async def test_captcha_verify_returns_bypass_token() -> None:
|
||||||
challenge = create_captcha_challenge()
|
challenge = create_captcha_challenge()
|
||||||
|
client_ip = "203.0.113.9"
|
||||||
|
|
||||||
async with AsyncClient(
|
async with AsyncClient(
|
||||||
transport=ASGITransport(app=app),
|
transport=ASGITransport(app=app),
|
||||||
@@ -53,9 +64,20 @@ async def test_captcha_verify_returns_bypass_token() -> None:
|
|||||||
response = await client.post(
|
response = await client.post(
|
||||||
"/api/captcha/verify",
|
"/api/captcha/verify",
|
||||||
json={"challengeToken": challenge.challenge_token, "answer": challenge.answer},
|
json={"challengeToken": challenge.challenge_token, "answer": challenge.answer},
|
||||||
|
headers={"x-forwarded-for": client_ip},
|
||||||
)
|
)
|
||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
payload = response.json()
|
payload = response.json()
|
||||||
assert payload["success"] is True
|
assert payload["success"] is True
|
||||||
assert payload["bypassToken"]
|
token = payload["bypassToken"]
|
||||||
|
assert token
|
||||||
|
assert verify_bypass_token(token, client_ip)
|
||||||
|
|
||||||
|
settings = get_settings()
|
||||||
|
decoded_payload = _decode_token_payload(token)
|
||||||
|
assert decoded_payload["ip"] == client_ip
|
||||||
|
assert isinstance(decoded_payload.get("exp"), int)
|
||||||
|
now = int(time.time())
|
||||||
|
ttl_seconds = settings.captcha_bypass_ttl_seconds
|
||||||
|
assert ttl_seconds - 10 <= decoded_payload["exp"] - now <= ttl_seconds + 10
|
||||||
|
|||||||
Reference in New Issue
Block a user