From 38c802950abaa8100dc88c50a4d7fc0f08049b82 Mon Sep 17 00:00:00 2001 From: nercone-dev Date: Mon, 20 Apr 2026 11:05:55 +0900 Subject: [PATCH] -- --- src/nercone_website/server.py | 170 ++---------------- .../tools/tls_test/__init__.py | 3 +- src/nercone_website/tools/tls_test/engine.py | 31 ++++ src/nercone_website/tools/tls_test/views.py | 93 ++++++++++ 4 files changed, 136 insertions(+), 161 deletions(-) create mode 100644 src/nercone_website/tools/tls_test/views.py diff --git a/src/nercone_website/server.py b/src/nercone_website/server.py index f1794be..a94d306 100644 --- a/src/nercone_website/server.py +++ b/src/nercone_website/server.py @@ -18,9 +18,8 @@ from jinja2.exceptions import TemplateNotFound from .error import error_page from .database import AccessCounter from .middleware import Middleware, server_version, onion_hostname -from .tools.tls_test import TlsJobQueue, TlsTestDB -from .tools.tls_test.engine import run_full_scan, parse_target -from .tools.tls_test.ratelimit import check as ratelimit_check, client_ip_from_scope +from .tools.tls_test import TlsJobQueue, TlsTestDB, tls_submit, tls_api_submit, tls_results_context +from .tools.tls_test.engine import run_full_scan tls_test_db = TlsTestDB() tls_test_queue = TlsJobQueue(tls_test_db, run_full_scan) @@ -144,39 +143,6 @@ async def thumbnail(request: Request, path: str) -> Response: png = resvg_py.svg_to_bytes(svg, font_files=font_files, width=1200, height=630) return Response(content=png, media_type="image/png") -def _validate_tls_target(raw: str) -> str | None: - import ipaddress - s = (raw or "").strip() - if not s or len(s) > 255: - return None - # Reject obviously garbage input (all dashes/dots, control chars, whitespace). - if not re.compile(r"^[A-Za-z0-9._:\[\]\-]{1,255}$").match(s): - return None - try: - host, port, _ = parse_target(s) - except Exception: - return None - if not host or port <= 0 or port > 65535: - return None - # Host must be either an IP literal or a valid-looking hostname. - try: - ipaddress.ip_address(host) - return s - except ValueError: - pass - # Hostname: each label must be 1-63 chars, not start/end with '-', no empty labels. - labels = host.split(".") - if not labels or any(not lbl for lbl in labels): - return None - for lbl in labels: - if len(lbl) > 63 or lbl.startswith("-") or lbl.endswith("-"): - return None - if not re.match(r"^[A-Za-z0-9\-]+$", lbl): - return None - if len(labels) < 2 and host != "localhost": - return None - return s - @app.api_route("/tools/tls-test/", methods=["GET"]) async def tls_test_index(request: Request) -> Response: return templates.TemplateResponse(request=request, name="tools/tls-test/index.html") @@ -184,25 +150,9 @@ async def tls_test_index(request: Request) -> Response: @app.api_route("/tools/tls-test/", methods=["POST"]) async def tls_test_submit(request: Request) -> Response: form = await request.form() - raw = str(form.get("target", "")).strip() - target = _validate_tls_target(raw) - if not target: - return templates.TemplateResponse( - request=request, - name="tools/tls-test/index.html", - context={"error": "無効なターゲットです。ホスト名/IP(:ポート)を入力してください。", "last_target": raw}, - status_code=400, - ) - client_ip = client_ip_from_scope(request.scope) - decision = ratelimit_check(tls_test_db, client_ip) - if not decision.allowed: - return templates.TemplateResponse( - request=request, - name="tools/tls-test/index.html", - context={"error": decision.reason, "last_target": raw}, - status_code=429, - ) - test_id = tls_test_queue.submit(target, client_ip) + test_id, err = tls_submit(request, str(form.get("target", "")).strip(), tls_test_db, tls_test_queue, templates) + if err: + return err return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303) @app.api_route("/tools/tls-test/status/{test_id}/", methods=["GET"]) @@ -212,16 +162,10 @@ async def tls_test_status_page(request: Request, test_id: str) -> Response: return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?") if job.get("status") == "done": return RedirectResponse(url=f"/tools/tls-test/results/{test_id}/", status_code=303) - progress = tls_test_db.get_progress(test_id) return templates.TemplateResponse( request=request, name="tools/tls-test/status.html", - context={ - "test_id": test_id, - "target": job.get("target", ""), - "status": job.get("status", ""), - "progress_entries": progress, - }, + context={"test_id": test_id, "target": job.get("target", ""), "status": job.get("status", ""), "progress_entries": tls_test_db.get_progress(test_id)}, ) @app.api_route("/tools/tls-test/results/{test_id}/", methods=["GET"]) @@ -231,96 +175,7 @@ async def tls_test_results_page(request: Request, test_id: str) -> Response: return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?") if job.get("status") != "done": return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303) - result = job.get("result") or {} - findings = result.get("findings", []) - # Primary grouping — the 4 tabs requested by the user. - groups: dict[str, list[dict]] = { - "reliability": [], - "safety": [], - "vulnerabilities": [], - "compatibility": [], - "other": [], - } - for f in findings: - g = f.get("group") or "other" - groups.setdefault(g, []).append(f) - # Summary tab — score-impacting findings, sorted by impact desc. - impactful = sorted( - [f for f in findings if (f.get("impact") or 0) > 0], - key=lambda f: f.get("impact", 0), - reverse=True, - ) - # Always include the top "good" findings too so the Summary tab is not empty - # on pristine sites. Take up to 20 impactful + 6 positives. - positives = [f for f in findings if f.get("severity") == "good"][:6] - summary = impactful[:20] + positives - - # Human-friendly "finished_at" for the results header: ISO-like local string. - import datetime as _dt - finished_ts = job.get("finished_at") or job.get("started_at") or 0 - finished_at_display = "" - try: - if finished_ts: - finished_at_display = _dt.datetime.fromtimestamp(int(finished_ts)).strftime("%Y-%m-%dT%H:%M:%S") - except Exception: - finished_at_display = "" - - # Sharable canonical URL for the "Copy link" button. Prefer the public - # tools.* subdomain when known, otherwise the current request's origin. - share_url = str(request.url).split("#", 1)[0] - - # ---- Reconstruct the "live log" that was shown on the processing page ---- - # Progress entries are phase-transition messages stored in test_progress - # (ordered by seq). Findings arrive between progress rows and are grouped - # by the phase they ran under (Finding.step). We interleave the two so the - # ログ tab reads exactly like the real-time WS stream did. - progress_entries = tls_test_db.get_progress(test_id) - findings_by_step: dict[str, list[dict]] = {} - for f in findings: - step = (f.get("step") or "").strip() - findings_by_step.setdefault(step, []).append(f) - log_entries: list[dict] = [] - seen_steps: set[str] = set() - for p in progress_entries: - log_entries.append({ - "kind": "phase", - "phase": p.get("phase") or "", - "detail": p.get("detail") or "", - "severity": p.get("severity") or "info", - }) - step = (p.get("phase") or "").strip() - if step and step not in seen_steps: - for f in findings_by_step.get(step, []): - log_entries.append({"kind": "finding", "finding": f}) - seen_steps.add(step) - # Any findings whose step never showed up as a progress row (rare — engine - # errors etc.) — append them at the end so nothing is dropped. - for step, fs in findings_by_step.items(): - if step not in seen_steps: - for f in fs: - log_entries.append({"kind": "finding", "finding": f}) - - return templates.TemplateResponse( - request=request, - name="tools/tls-test/results.html", - context={ - "test_id": test_id, - "job": job, - "result": result, - "groups": groups, - "summary": summary, - "group_labels": { - "reliability": "信頼性", - "safety": "安全性", - "vulnerabilities": "脆弱性", - "compatibility": "互換性", - "other": "その他", - }, - "finished_at_display": finished_at_display, - "share_url": share_url, - "log_entries": log_entries, - }, - ) + return templates.TemplateResponse(request=request, name="tools/tls-test/results.html", context=tls_results_context(job, test_id, request, tls_test_db)) @app.websocket("/tools/tls-test/ws/{test_id}") async def tls_test_ws(websocket: WebSocket, test_id: str): @@ -365,14 +220,9 @@ async def tls_test_api_scan(request: Request) -> Response: payload = await request.json() except Exception: return JSONResponse({"error": "invalid JSON"}, status_code=400) - target = _validate_tls_target(str(payload.get("target", ""))) - if not target: - return JSONResponse({"error": "invalid target"}, status_code=400) - client_ip = client_ip_from_scope(request.scope) - decision = ratelimit_check(tls_test_db, client_ip) - if not decision.allowed: - return JSONResponse({"error": decision.reason}, status_code=429) - test_id = tls_test_queue.submit(target, client_ip) + test_id, err = tls_api_submit(request, str(payload.get("target", "")), tls_test_db, tls_test_queue) + if err: + return JSONResponse(err[0], status_code=err[1]) return JSONResponse({ "id": test_id, "status_url": f"/tools/tls-test/status/{test_id}/", diff --git a/src/nercone_website/tools/tls_test/__init__.py b/src/nercone_website/tools/tls_test/__init__.py index 8ecacec..1fde8e8 100644 --- a/src/nercone_website/tools/tls_test/__init__.py +++ b/src/nercone_website/tools/tls_test/__init__.py @@ -1,4 +1,5 @@ from .runner import TlsJobQueue from .db import TlsTestDB +from .views import tls_submit, tls_api_submit, tls_results_context -__all__ = ["TlsJobQueue", "TlsTestDB"] +__all__ = ["TlsJobQueue", "TlsTestDB", "tls_submit", "tls_api_submit", "tls_results_context"] diff --git a/src/nercone_website/tools/tls_test/engine.py b/src/nercone_website/tools/tls_test/engine.py index 2cec084..37c50e0 100644 --- a/src/nercone_website/tools/tls_test/engine.py +++ b/src/nercone_website/tools/tls_test/engine.py @@ -75,6 +75,37 @@ def parse_target(target: str) -> tuple[str, int, str | None]: return host, port, sni +def validate_tls_target(raw: str) -> str | None: + import re + s = (raw or "").strip() + if not s or len(s) > 255: + return None + if not re.compile(r"^[A-Za-z0-9._:\[\]\-]{1,255}$").match(s): + return None + try: + host, port, _ = parse_target(s) + except Exception: + return None + if not host or port <= 0 or port > 65535: + return None + try: + ipaddress.ip_address(host) + return s + except ValueError: + pass + labels = host.split(".") + if not labels or any(not lbl for lbl in labels): + return None + for lbl in labels: + if len(lbl) > 63 or lbl.startswith("-") or lbl.endswith("-"): + return None + if not re.match(r"^[A-Za-z0-9\-]+$", lbl): + return None + if len(labels) < 2 and host != "localhost": + return None + return s + + async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResult) -> None: host = result.host port = result.port diff --git a/src/nercone_website/tools/tls_test/views.py b/src/nercone_website/tools/tls_test/views.py new file mode 100644 index 0000000..a7d7ef8 --- /dev/null +++ b/src/nercone_website/tools/tls_test/views.py @@ -0,0 +1,93 @@ +from __future__ import annotations +import datetime +from fastapi import Request +from fastapi.templating import Jinja2Templates +from fastapi.responses import Response +from .engine import validate_tls_target +from .ratelimit import check as ratelimit_check, client_ip_from_scope + + +def tls_submit( + request: Request, + raw: str, + tls_test_db, + tls_test_queue, + templates: Jinja2Templates, +) -> tuple[str | None, Response | None]: + target = validate_tls_target(raw) + if not target: + return None, templates.TemplateResponse( + request=request, + name="tools/tls-test/index.html", + context={"error": "無効なターゲットです。ホスト名/IP(:ポート)を入力してください。", "last_target": raw}, + status_code=400, + ) + client_ip = client_ip_from_scope(request.scope) + decision = ratelimit_check(tls_test_db, client_ip) + if not decision.allowed: + return None, templates.TemplateResponse( + request=request, + name="tools/tls-test/index.html", + context={"error": decision.reason, "last_target": raw}, + status_code=429, + ) + return tls_test_queue.submit(target, client_ip), None + + +def tls_api_submit(request: Request, raw: str, tls_test_db, tls_test_queue) -> tuple[str | None, tuple[dict, int] | None]: + target = validate_tls_target(raw) + if not target: + return None, ({"error": "invalid target"}, 400) + client_ip = client_ip_from_scope(request.scope) + decision = ratelimit_check(tls_test_db, client_ip) + if not decision.allowed: + return None, ({"error": decision.reason}, 429) + return tls_test_queue.submit(target, client_ip), None + + +def tls_results_context(job: dict, test_id: str, request: Request, tls_test_db) -> dict: + result = job.get("result") or {} + findings = result.get("findings", []) + + groups: dict[str, list[dict]] = {"reliability": [], "safety": [], "vulnerabilities": [], "compatibility": [], "other": []} + for f in findings: + groups.setdefault(f.get("group") or "other", []).append(f) + + impactful = sorted([f for f in findings if (f.get("impact") or 0) > 0], key=lambda f: f.get("impact", 0), reverse=True) + summary = impactful[:20] + [f for f in findings if f.get("severity") == "good"][:6] + + finished_at_display = "" + try: + ts = job.get("finished_at") or job.get("started_at") or 0 + if ts: + finished_at_display = datetime.datetime.fromtimestamp(int(ts)).strftime("%Y-%m-%dT%H:%M:%S") + except Exception: + pass + + progress_entries = tls_test_db.get_progress(test_id) + findings_by_step: dict[str, list[dict]] = {} + for f in findings: + findings_by_step.setdefault((f.get("step") or "").strip(), []).append(f) + log_entries: list[dict] = [] + seen_steps: set[str] = set() + for p in progress_entries: + log_entries.append({"kind": "phase", "phase": p.get("phase") or "", "detail": p.get("detail") or "", "severity": p.get("severity") or "info"}) + step = (p.get("phase") or "").strip() + if step and step not in seen_steps: + log_entries += [{"kind": "finding", "finding": f} for f in findings_by_step.get(step, [])] + seen_steps.add(step) + for step, fs in findings_by_step.items(): + if step not in seen_steps: + log_entries += [{"kind": "finding", "finding": f} for f in fs] + + return { + "test_id": test_id, + "job": job, + "result": result, + "groups": groups, + "summary": summary, + "group_labels": {"reliability": "信頼性", "safety": "安全性", "vulnerabilities": "脆弱性", "compatibility": "互換性", "other": "その他"}, + "finished_at_display": finished_at_display, + "share_url": str(request.url).split("#", 1)[0], + "log_entries": log_entries, + }