MAESTRO: add security ops red tests

This commit is contained in:
Mariusz Banach
2026-02-18 04:03:23 +01:00
parent 902211ac69
commit 0f5f300c69
5 changed files with 395 additions and 0 deletions

View File

@@ -0,0 +1,61 @@
from __future__ import annotations
import base64
import pytest
from httpx import ASGITransport, AsyncClient
from app.main import app
from app.security.captcha import create_captcha_challenge
def _assert_png_base64(image_base64: str) -> None:
decoded = base64.b64decode(image_base64)
assert decoded.startswith(b"\x89PNG\r\n\x1a\n")
@pytest.mark.anyio
async def test_captcha_challenge_generation_produces_image() -> None:
challenge = create_captcha_challenge()
assert challenge.challenge_token
assert challenge.image_base64
assert challenge.answer
_assert_png_base64(challenge.image_base64)
@pytest.mark.anyio
async def test_captcha_verify_rejects_invalid_answer() -> None:
challenge = create_captcha_challenge()
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/captcha/verify",
json={"challengeToken": challenge.challenge_token, "answer": "incorrect"},
)
assert response.status_code == 400
payload = response.json()
assert payload.get("error") or payload.get("detail")
@pytest.mark.anyio
async def test_captcha_verify_returns_bypass_token() -> None:
challenge = create_captcha_challenge()
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/captcha/verify",
json={"challengeToken": challenge.challenge_token, "answer": challenge.answer},
)
assert response.status_code == 200
payload = response.json()
assert payload["success"] is True
assert payload["bypassToken"]

View File

@@ -0,0 +1,26 @@
from __future__ import annotations
import pytest
from httpx import ASGITransport, AsyncClient
from app.engine.scanner_registry import ScannerRegistry
from app.main import app
@pytest.mark.anyio
async def test_health_endpoint_returns_status() -> None:
registry = ScannerRegistry()
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.get("/api/health")
assert response.status_code == 200
payload = response.json()
assert payload["status"] in {"up", "degraded", "down"}
assert payload["version"]
assert payload["uptime"] >= 0
assert payload["scannerCount"] == len(registry.list_tests())

View File

@@ -0,0 +1,61 @@
from __future__ import annotations
import importlib
from pathlib import Path
import pytest
from httpx import ASGITransport, AsyncClient
from app.core import config as config_module
FIXTURES_DIR = Path(__file__).resolve().parents[1] / "fixtures"
def _load_app(
monkeypatch: pytest.MonkeyPatch,
*,
limit: int = 2,
window_seconds: int = 60,
):
monkeypatch.setenv("WHA_RATE_LIMIT_REQUESTS", str(limit))
monkeypatch.setenv("WHA_RATE_LIMIT_WINDOW_SECONDS", str(window_seconds))
config_module.get_settings.cache_clear()
import app.main as main
importlib.reload(main)
return main.app
@pytest.mark.anyio
async def test_rate_limiter_returns_captcha_challenge(
monkeypatch: pytest.MonkeyPatch,
) -> None:
app = _load_app(monkeypatch)
raw_headers = (FIXTURES_DIR / "sample_headers.txt").read_text(encoding="utf-8")
request_payload = {
"headers": raw_headers,
"config": {"testIds": [], "resolve": False, "decodeAll": False},
}
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
for _ in range(2):
response = await client.post("/api/analyse", json=request_payload)
assert response.status_code == 200
response = await client.post("/api/analyse", json=request_payload)
assert response.status_code == 429
assert "Retry-After" in response.headers
retry_after_header = int(response.headers["Retry-After"])
payload = response.json()
assert payload["retryAfter"] == retry_after_header
assert "captchaChallenge" in payload
challenge = payload["captchaChallenge"]
assert challenge["challengeToken"]
assert challenge["imageBase64"]