mirror of
https://github.com/mgeeky/decode-spam-headers.git
synced 2026-02-22 13:33:30 +01:00
MAESTRO: fix linting and formatting
This commit is contained in:
@@ -68,9 +68,7 @@ class Settings(BaseSettings):
|
||||
parsed = json.loads(text)
|
||||
if isinstance(parsed, list):
|
||||
return [
|
||||
str(item).strip()
|
||||
for item in parsed
|
||||
if str(item).strip()
|
||||
str(item).strip() for item in parsed if str(item).strip()
|
||||
]
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
@@ -114,9 +114,7 @@ class Logger:
|
||||
colored = colorizingFunc(c, txt)
|
||||
|
||||
if not colored:
|
||||
raise ValueError(
|
||||
f"Could not strip colors from phrase: ({patt})!"
|
||||
)
|
||||
raise ValueError(f"Could not strip colors from phrase: ({patt})!")
|
||||
|
||||
s = s.replace(patt, colored)
|
||||
pos = 0
|
||||
|
||||
@@ -1,2 +1 @@
|
||||
"""Scanner implementations grouped by vendor or function."""
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ def _normalize_payload(payload: object) -> tuple[str, str, str, str] | None:
|
||||
|
||||
|
||||
def _combine_payloads(
|
||||
payloads: list[tuple[str, str, str, str]]
|
||||
payloads: list[tuple[str, str, str, str]],
|
||||
) -> tuple[str, str, str, str]:
|
||||
headers: list[str] = []
|
||||
values: list[str] = []
|
||||
|
||||
@@ -79,9 +79,7 @@ async def analyse(request: Request) -> StreamingResponse:
|
||||
)
|
||||
|
||||
if len(headers) > MAX_HEADERS_LENGTH:
|
||||
return JSONResponse(
|
||||
status_code=413, content={"error": "Headers exceed 1 MB"}
|
||||
)
|
||||
return JSONResponse(status_code=413, content={"error": "Headers exceed 1 MB"})
|
||||
|
||||
headers = _sanitize_headers(headers)
|
||||
if not headers.strip():
|
||||
@@ -120,9 +118,7 @@ async def analyse(request: Request) -> StreamingResponse:
|
||||
elapsed_ms = (perf_counter() - start) * 1000
|
||||
percentage = 0.0
|
||||
if total_tests > 0:
|
||||
percentage = min(
|
||||
100.0, (current_index + 1) / total_tests * 100.0
|
||||
)
|
||||
percentage = min(100.0, (current_index + 1) / total_tests * 100.0)
|
||||
payload = AnalysisProgress(
|
||||
current_index=current_index,
|
||||
total_tests=total_tests,
|
||||
@@ -168,9 +164,7 @@ async def analyse(request: Request) -> StreamingResponse:
|
||||
),
|
||||
)
|
||||
|
||||
await send_stream.send(
|
||||
("result", final_result.model_dump(by_alias=True))
|
||||
)
|
||||
await send_stream.send(("result", final_result.model_dump(by_alias=True)))
|
||||
await send_stream.send(("done", {}))
|
||||
|
||||
async def event_stream() -> Any:
|
||||
|
||||
@@ -245,9 +245,7 @@ def _b64decode(data: str) -> bytes:
|
||||
def _prune_challenges_locked() -> None:
|
||||
now = time.time()
|
||||
expired = [
|
||||
token
|
||||
for token, record in _CHALLENGES.items()
|
||||
if record.expires_at <= now
|
||||
token for token, record in _CHALLENGES.items() if record.expires_at <= now
|
||||
]
|
||||
for token in expired:
|
||||
_CHALLENGES.pop(token, None)
|
||||
|
||||
@@ -42,4 +42,4 @@ def test_analyzer_runs_selected_tests_and_reports_progress() -> None:
|
||||
assert progress_events
|
||||
assert all(total == 2 for _, total, _ in progress_events)
|
||||
assert progress_events[0][0] == 0
|
||||
assert progress_events[-1][0] == 1
|
||||
assert progress_events[-1][0] == 1
|
||||
|
||||
@@ -48,4 +48,4 @@ def test_parser_preserves_content_type_boundary(sample_headers: str) -> None:
|
||||
headers = parser.parse(sample_headers)
|
||||
|
||||
content_type = next(header for header in headers if header.name == "Content-Type")
|
||||
assert "boundary=\"boundary-123\"" in content_type.value
|
||||
assert 'boundary="boundary-123"' in content_type.value
|
||||
|
||||
@@ -29,4 +29,4 @@ def test_registry_filters_by_ids_and_lists_tests() -> None:
|
||||
|
||||
assert lookup[1].category
|
||||
assert lookup[12].category
|
||||
assert lookup[66].category
|
||||
assert lookup[66].category
|
||||
|
||||
Reference in New Issue
Block a user