mirror of
https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 13:33:30 +01:00
MAESTRO: verify bypass token rate limit exemption
This commit is contained in:
@@ -41,7 +41,7 @@ This phase protects the analysis service from abuse with per-IP rate limiting an
|
|||||||
- [x] All vitest tests pass: `npx vitest run src/__tests__/CaptchaChallenge.test.tsx`
|
- [x] All vitest tests pass: `npx vitest run src/__tests__/CaptchaChallenge.test.tsx`
|
||||||
- [x] Exceeding rate limit returns HTTP 429 with Retry-After header and CAPTCHA challenge
|
- [x] Exceeding rate limit returns HTTP 429 with Retry-After header and CAPTCHA challenge
|
||||||
- [x] Solving CAPTCHA returns HMAC-signed bypass token (5-minute expiry)
|
- [x] Solving CAPTCHA returns HMAC-signed bypass token (5-minute expiry)
|
||||||
- [ ] Bypass token exempts IP from rate limiting on subsequent requests
|
- [x] Bypass token exempts IP from rate limiting on subsequent requests
|
||||||
- [ ] `GET /api/health` returns `{status, version, uptime, scannerCount}`
|
- [ ] `GET /api/health` returns `{status, version, uptime, scannerCount}`
|
||||||
- [x] All routers and CORS middleware are registered in `main.py`
|
- [x] All routers and CORS middleware are registered in `main.py`
|
||||||
- [ ] Application starts statelessly — no database, no session management
|
- [ ] Application starts statelessly — no database, no session management
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import pytest
|
|||||||
from httpx import ASGITransport, AsyncClient
|
from httpx import ASGITransport, AsyncClient
|
||||||
|
|
||||||
from app.core import config as config_module
|
from app.core import config as config_module
|
||||||
|
from app.security.captcha import BYPASS_TOKEN_HEADER, issue_bypass_token
|
||||||
|
|
||||||
FIXTURES_DIR = Path(__file__).resolve().parents[1] / "fixtures"
|
FIXTURES_DIR = Path(__file__).resolve().parents[1] / "fixtures"
|
||||||
|
|
||||||
@@ -59,3 +60,46 @@ async def test_rate_limiter_returns_captcha_challenge(
|
|||||||
challenge = payload["captchaChallenge"]
|
challenge = payload["captchaChallenge"]
|
||||||
assert challenge["challengeToken"]
|
assert challenge["challengeToken"]
|
||||||
assert challenge["imageBase64"]
|
assert challenge["imageBase64"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_rate_limiter_allows_bypass_token(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
app = _load_app(monkeypatch, limit=1, window_seconds=60)
|
||||||
|
raw_headers = (FIXTURES_DIR / "sample_headers.txt").read_text(encoding="utf-8")
|
||||||
|
request_payload = {
|
||||||
|
"headers": raw_headers,
|
||||||
|
"config": {"testIds": [], "resolve": False, "decodeAll": False},
|
||||||
|
}
|
||||||
|
client_ip = "203.0.113.5"
|
||||||
|
bypass_token = issue_bypass_token(client_ip)
|
||||||
|
|
||||||
|
async with AsyncClient(
|
||||||
|
transport=ASGITransport(app=app),
|
||||||
|
base_url="http://test",
|
||||||
|
) as client:
|
||||||
|
response = await client.post(
|
||||||
|
"/api/analyse",
|
||||||
|
json=request_payload,
|
||||||
|
headers={"x-forwarded-for": client_ip},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
response = await client.post(
|
||||||
|
"/api/analyse",
|
||||||
|
json=request_payload,
|
||||||
|
headers={"x-forwarded-for": client_ip},
|
||||||
|
)
|
||||||
|
assert response.status_code == 429
|
||||||
|
|
||||||
|
response = await client.post(
|
||||||
|
"/api/analyse",
|
||||||
|
json=request_payload,
|
||||||
|
headers={
|
||||||
|
"x-forwarded-for": client_ip,
|
||||||
|
BYPASS_TOKEN_HEADER: bypass_token,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
|||||||
Reference in New Issue
Block a user