This commit is contained in:
2026-04-22 14:58:18 +09:00
parent 4d618f3b22
commit ba04eaf573
6 changed files with 131 additions and 176 deletions
+6 -40
View File
@@ -11,18 +11,17 @@ from bs4 import BeautifulSoup
from markitdown import MarkItDown
from datetime import datetime, timezone
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Response, WebSocket, WebSocketDisconnect, HTTPException
from fastapi import FastAPI, Request, Response, WebSocket, HTTPException
from fastapi.templating import Jinja2Templates
from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse, RedirectResponse
from jinja2.exceptions import TemplateNotFound
from .error import error_page
from .database import AccessCounter
from .middleware import Middleware, server_version, onion_hostname
from .tools.tls_test import TlsJobQueue, TlsTestDB, tls_submit, tls_api_submit, tls_results_context
from .tools.tls_test.engine import run_full_scan
tls_test_db = TlsTestDB()
tls_test_queue = TlsJobQueue(tls_test_db, run_full_scan)
from .tools.tls_test import (
tls_test_db, tls_test_queue,
tls_submit, tls_api_submit, tls_results_context, tls_websocket_handler,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
@@ -179,40 +178,7 @@ async def tls_test_results_page(request: Request, test_id: str) -> Response:
@app.websocket("/tools/tls-test/ws/{test_id}")
async def tls_test_ws(websocket: WebSocket, test_id: str):
job = tls_test_db.get_job(test_id)
if not job:
await websocket.close(code=4404)
return
await websocket.accept()
tls_test_queue.add_subscriber(test_id, websocket)
try:
history = tls_test_db.get_progress(test_id)
await websocket.send_text(json.dumps({
"type": "history",
"status": job.get("status"),
"target": job.get("target"),
"entries": history,
}))
if job.get("status") == "done":
await websocket.send_text(json.dumps({
"type": "done",
"redirect": f"/tools/tls-test/results/{test_id}/",
"rank": job.get("rank"),
"score": job.get("score"),
}))
await websocket.close()
return
while True:
try:
await websocket.receive_text()
except WebSocketDisconnect:
break
except WebSocketDisconnect:
pass
except Exception:
pass
finally:
tls_test_queue.remove_subscriber(test_id, websocket)
await tls_websocket_handler(websocket, test_id, tls_test_db, tls_test_queue)
@app.api_route("/api/tools/tls-test/scan", methods=["POST"])
async def tls_test_api_scan(request: Request) -> Response: