mirror of
https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 13:33:30 +01:00
106 lines
3.0 KiB
Python
106 lines
3.0 KiB
Python
from __future__ import annotations
|
|
|
|
import importlib
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.core import config as config_module
|
|
from app.security.captcha import BYPASS_TOKEN_HEADER, issue_bypass_token
|
|
|
|
FIXTURES_DIR = Path(__file__).resolve().parents[1] / "fixtures"
|
|
|
|
|
|
def _load_app(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
*,
|
|
limit: int = 2,
|
|
window_seconds: int = 60,
|
|
):
|
|
monkeypatch.setenv("WHA_RATE_LIMIT_REQUESTS", str(limit))
|
|
monkeypatch.setenv("WHA_RATE_LIMIT_WINDOW_SECONDS", str(window_seconds))
|
|
config_module.get_settings.cache_clear()
|
|
|
|
import app.main as main
|
|
|
|
importlib.reload(main)
|
|
return main.app
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_rate_limiter_returns_captcha_challenge(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
app = _load_app(monkeypatch)
|
|
raw_headers = (FIXTURES_DIR / "sample_headers.txt").read_text(encoding="utf-8")
|
|
request_payload = {
|
|
"headers": raw_headers,
|
|
"config": {"testIds": [], "resolve": False, "decodeAll": False},
|
|
}
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app),
|
|
base_url="http://test",
|
|
) as client:
|
|
for _ in range(2):
|
|
response = await client.post("/api/analyse", json=request_payload)
|
|
assert response.status_code == 200
|
|
|
|
response = await client.post("/api/analyse", json=request_payload)
|
|
|
|
assert response.status_code == 429
|
|
assert "Retry-After" in response.headers
|
|
|
|
retry_after_header = int(response.headers["Retry-After"])
|
|
payload = response.json()
|
|
assert payload["retryAfter"] == retry_after_header
|
|
assert "captchaChallenge" in payload
|
|
|
|
challenge = payload["captchaChallenge"]
|
|
assert challenge["challengeToken"]
|
|
assert challenge["imageBase64"]
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_rate_limiter_allows_bypass_token(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
app = _load_app(monkeypatch, limit=1, window_seconds=60)
|
|
raw_headers = (FIXTURES_DIR / "sample_headers.txt").read_text(encoding="utf-8")
|
|
request_payload = {
|
|
"headers": raw_headers,
|
|
"config": {"testIds": [], "resolve": False, "decodeAll": False},
|
|
}
|
|
client_ip = "203.0.113.5"
|
|
bypass_token = issue_bypass_token(client_ip)
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app),
|
|
base_url="http://test",
|
|
) as client:
|
|
response = await client.post(
|
|
"/api/analyse",
|
|
json=request_payload,
|
|
headers={"x-forwarded-for": client_ip},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
response = await client.post(
|
|
"/api/analyse",
|
|
json=request_payload,
|
|
headers={"x-forwarded-for": client_ip},
|
|
)
|
|
assert response.status_code == 429
|
|
|
|
response = await client.post(
|
|
"/api/analyse",
|
|
json=request_payload,
|
|
headers={
|
|
"x-forwarded-for": client_ip,
|
|
BYPASS_TOKEN_HEADER: bypass_token,
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 200
|