This commit is contained in:
2026-04-20 11:05:55 +09:00
parent e4812ae79e
commit 38c802950a
4 changed files with 136 additions and 161 deletions
@@ -1,4 +1,5 @@
from .runner import TlsJobQueue
from .db import TlsTestDB
from .views import tls_submit, tls_api_submit, tls_results_context
__all__ = ["TlsJobQueue", "TlsTestDB"]
__all__ = ["TlsJobQueue", "TlsTestDB", "tls_submit", "tls_api_submit", "tls_results_context"]
@@ -75,6 +75,37 @@ def parse_target(target: str) -> tuple[str, int, str | None]:
return host, port, sni
def validate_tls_target(raw: str) -> str | None:
import re
s = (raw or "").strip()
if not s or len(s) > 255:
return None
if not re.compile(r"^[A-Za-z0-9._:\[\]\-]{1,255}$").match(s):
return None
try:
host, port, _ = parse_target(s)
except Exception:
return None
if not host or port <= 0 or port > 65535:
return None
try:
ipaddress.ip_address(host)
return s
except ValueError:
pass
labels = host.split(".")
if not labels or any(not lbl for lbl in labels):
return None
for lbl in labels:
if len(lbl) > 63 or lbl.startswith("-") or lbl.endswith("-"):
return None
if not re.match(r"^[A-Za-z0-9\-]+$", lbl):
return None
if len(labels) < 2 and host != "localhost":
return None
return s
async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResult) -> None:
host = result.host
port = result.port
@@ -0,0 +1,93 @@
from __future__ import annotations
import datetime
from fastapi import Request
from fastapi.templating import Jinja2Templates
from fastapi.responses import Response
from .engine import validate_tls_target
from .ratelimit import check as ratelimit_check, client_ip_from_scope
def tls_submit(
request: Request,
raw: str,
tls_test_db,
tls_test_queue,
templates: Jinja2Templates,
) -> tuple[str | None, Response | None]:
target = validate_tls_target(raw)
if not target:
return None, templates.TemplateResponse(
request=request,
name="tools/tls-test/index.html",
context={"error": "無効なターゲットです。ホスト名/IP(:ポート)を入力してください。", "last_target": raw},
status_code=400,
)
client_ip = client_ip_from_scope(request.scope)
decision = ratelimit_check(tls_test_db, client_ip)
if not decision.allowed:
return None, templates.TemplateResponse(
request=request,
name="tools/tls-test/index.html",
context={"error": decision.reason, "last_target": raw},
status_code=429,
)
return tls_test_queue.submit(target, client_ip), None
def tls_api_submit(request: Request, raw: str, tls_test_db, tls_test_queue) -> tuple[str | None, tuple[dict, int] | None]:
target = validate_tls_target(raw)
if not target:
return None, ({"error": "invalid target"}, 400)
client_ip = client_ip_from_scope(request.scope)
decision = ratelimit_check(tls_test_db, client_ip)
if not decision.allowed:
return None, ({"error": decision.reason}, 429)
return tls_test_queue.submit(target, client_ip), None
def tls_results_context(job: dict, test_id: str, request: Request, tls_test_db) -> dict:
result = job.get("result") or {}
findings = result.get("findings", [])
groups: dict[str, list[dict]] = {"reliability": [], "safety": [], "vulnerabilities": [], "compatibility": [], "other": []}
for f in findings:
groups.setdefault(f.get("group") or "other", []).append(f)
impactful = sorted([f for f in findings if (f.get("impact") or 0) > 0], key=lambda f: f.get("impact", 0), reverse=True)
summary = impactful[:20] + [f for f in findings if f.get("severity") == "good"][:6]
finished_at_display = ""
try:
ts = job.get("finished_at") or job.get("started_at") or 0
if ts:
finished_at_display = datetime.datetime.fromtimestamp(int(ts)).strftime("%Y-%m-%dT%H:%M:%S")
except Exception:
pass
progress_entries = tls_test_db.get_progress(test_id)
findings_by_step: dict[str, list[dict]] = {}
for f in findings:
findings_by_step.setdefault((f.get("step") or "").strip(), []).append(f)
log_entries: list[dict] = []
seen_steps: set[str] = set()
for p in progress_entries:
log_entries.append({"kind": "phase", "phase": p.get("phase") or "", "detail": p.get("detail") or "", "severity": p.get("severity") or "info"})
step = (p.get("phase") or "").strip()
if step and step not in seen_steps:
log_entries += [{"kind": "finding", "finding": f} for f in findings_by_step.get(step, [])]
seen_steps.add(step)
for step, fs in findings_by_step.items():
if step not in seen_steps:
log_entries += [{"kind": "finding", "finding": f} for f in fs]
return {
"test_id": test_id,
"job": job,
"result": result,
"groups": groups,
"summary": summary,
"group_labels": {"reliability": "信頼性", "safety": "安全性", "vulnerabilities": "脆弱性", "compatibility": "互換性", "other": "その他"},
"finished_at_display": finished_at_display,
"share_url": str(request.url).split("#", 1)[0],
"log_entries": log_entries,
}