352 lines
16 KiB
Python
352 lines
16 KiB
Python
import io
|
|
import re
|
|
import json
|
|
import yaml
|
|
import random
|
|
import mistune
|
|
import resvg_py
|
|
from html import escape
|
|
from pathlib import Path
|
|
from bs4 import BeautifulSoup
|
|
from markitdown import MarkItDown
|
|
from datetime import datetime, timezone
|
|
from contextlib import asynccontextmanager
|
|
from fastapi import FastAPI, Request, Response, WebSocket, HTTPException
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse, RedirectResponse
|
|
from jinja2.exceptions import TemplateNotFound
|
|
from .error import error_page
|
|
from .database import AccessCounter
|
|
from .middleware import Middleware, server_version, onion_hostname
|
|
from .tools.tls_test import (
|
|
tls_test_db, tls_test_queue,
|
|
tls_submit, tls_api_submit, tls_results_context, tls_websocket_handler,
|
|
)
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Keep the TLS test queue running for the lifetime of the application.

    The queue is started before control is handed to the app and stopped
    again when the app shuts down, including when the serving phase exits
    with an exception.
    """
    await tls_test_queue.start()
    try:
        yield
    finally:
        # Runs on normal shutdown as well as on errors raised while serving.
        await tls_test_queue.stop()
|
|
|
|
# Application object. The documentation/OpenAPI endpoint URLs are set to None
# and the queue lifecycle is attached through ``lifespan``.
app = FastAPI(
    docs_url=None,
    redoc_url=None,
    openapi_url=None,
    lifespan=lifespan,
)
app.add_middleware(Middleware)

# Templates are looked up in ./public relative to the working directory.
templates = Jinja2Templates(directory=Path.cwd() / "public")
markitdown = MarkItDown()
accesscounter = AccessCounter()

# Values and helpers made available to every template.
templates.env.globals.update(
    {
        "get_access_count": accesscounter.get,
        "server_version": server_version,
        "onion_site_url": f"http://{onion_hostname}/",
    }
)
# ``value | re_sub(pattern, repl)`` filter: regex substitution inside templates.
templates.env.filters["re_sub"] = lambda s, pattern, repl: re.sub(pattern, repl, s)
|
|
|
|
class CustomHTMLRenderer(mistune.HTMLRenderer):
    """HTML renderer that emits code blocks as a plain ``<pre>`` element."""

    def block_code(self, code, **attrs):
        """Return *code* escaped and wrapped in ``<pre>``.

        Any extra keyword arguments passed by the parser are accepted and
        ignored.
        """
        escaped = mistune.escape(code)
        return "<pre>" + escaped + "</pre>\n"
|
|
# Markdown-to-HTML converter built on CustomHTMLRenderer.
# NOTE(review): escape=False presumably lets raw HTML in the Markdown source
# through unescaped — confirm against the mistune documentation.
htmlitdown = mistune.create_markdown(renderer=CustomHTMLRenderer(escape=False))
|
|
|
|
def get_daily_quote() -> str:
    """Return the quote of the day from ``public/quotes.txt``.

    The file is read as UTF-8 with one quote per line. The current UTC date
    seeds the random choice, so every call on the same UTC day returns the
    same line as long as the file is unchanged.

    Returns:
        The selected line. An empty file yields an empty string.

    Raises:
        OSError: If the quotes file cannot be opened.
    """
    seed = str(datetime.now(timezone.utc).date())
    quotes_path = Path.cwd().joinpath("public", "quotes.txt")
    # Pin the encoding: without it the platform locale decides how the file
    # is decoded, which breaks non-ASCII quotes on non-UTF-8 hosts.
    with quotes_path.open("r", encoding="utf-8") as f:
        quotes = f.read().strip().split("\n")
    return random.Random(seed).choice(quotes)
|
|
# Expose the daily-quote helper to every template as ``get_daily_quote()``.
templates.env.globals["get_daily_quote"] = get_daily_quote
|
|
|
|
def resolve_static_file(full_path: str) -> Path | None:
|
|
base_dir = Path.cwd().joinpath("public")
|
|
target_path = (base_dir / full_path.lstrip('/')).resolve()
|
|
if not str(target_path).startswith(str(base_dir.resolve())):
|
|
raise PermissionError()
|
|
return target_path if target_path.is_file() else None
|
|
|
|
def resolve_shorturl(shorturls: dict, full_path: str) -> str | None:
|
|
current_id = full_path.strip().rstrip("/")
|
|
visited = set()
|
|
for _ in range(10):
|
|
if current_id in visited or current_id not in shorturls:
|
|
return None
|
|
visited.add(current_id)
|
|
entry = shorturls[current_id]
|
|
if entry["type"] in ["redirect", "alias"]:
|
|
if entry["type"] == "redirect":
|
|
return entry["content"]
|
|
current_id = entry["content"]
|
|
return None
|
|
|
|
@app.api_route("/ping", methods=["GET"])
async def ping(request: Request):
    """Liveness probe: always answers ``pong!`` with HTTP 200."""
    reply = PlainTextResponse("pong!", status_code=200)
    return reply
|
|
|
|
@app.api_route("/echo", methods=["GET"])
async def echo(request: Request):
    """Return the ``"log"`` entry of the ASGI scope as JSON.

    NOTE(review): the entry is presumably filled in by ``Middleware`` —
    confirm; a missing key raises KeyError here.
    """
    log_entry = request.scope["log"]
    return JSONResponse(log_entry, status_code=200)
|
|
|
|
@app.api_route("/status", methods=["GET"])
async def status(request: Request):
    """Report server version, quote of the day and access count as JSON."""
    payload = {
        "status": "ok",
        "version": server_version,
        "daily_quote": get_daily_quote(),
        "access_count": accesscounter.get(),
    }
    return JSONResponse(payload, status_code=200)
|
|
|
|
@app.api_route("/welcome", methods=["GET"])
async def welcome(request: Request):
    """Serve the plain-text welcome banner with the server version.

    This handler used to be called ``ping`` as well, which redefined the
    ``/ping`` handler's module-level name and gave two routes the same name.
    """
    # NOTE(review): the banner spacing below is reproduced from the reviewed
    # text; verify the alignment against the original file.
    banner = f"""
■ ■ ■■■■■ ■■■■ ■■■■ ■■■ ■ ■ ■■■■■
■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■
■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■
■ ■ ■ ■■■■ ■■■■ ■ ■ ■ ■ ■ ■ ■■■■
■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■
■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■
■ ■ ■■■■■ ■ ■ ■■■■ ■■■ ■ ■ ■■■■■

nercone.dev ({server_version})
welcome to nercone.dev!
""".strip() + "\n"
    return PlainTextResponse(banner, status_code=200)
|
|
|
|
@app.api_route("/error/{code}", methods=["GET", "POST", "HEAD"])
async def fake_error_page(request: Request, code: str):
    """Render the error page for an arbitrary status code, for previewing.

    Args:
        request: Incoming request.
        code: Status code taken from the URL.

    Returns:
        The error page for *code*. A value that is not an integer in the
        range 100-599 gets the 404 page instead of an unhandled exception.
    """
    try:
        status_code = int(code)
    except ValueError:
        # e.g. /error/abc — there is no such page.
        status_code = 404
    else:
        # Values outside 100-599 are not valid HTTP status codes.
        if not 100 <= status_code <= 599:
            status_code = 404
    return error_page(templates=templates, request=request, status_code=status_code)
|
|
|
|
def _fill_thumbnail_placeholders(svg: str, values: dict[str, str]) -> str:
    """Substitute ``__PATH__``, ``__TITLE__`` and ``__DESCRIPTION__`` in one pass."""
    # A single pass means text inserted for one placeholder is never scanned
    # again, so a title of "__DESCRIPTION__" stays literal.
    return re.sub(
        r"__(PATH|TITLE|DESCRIPTION)__",
        lambda match: values[match.group(1)],
        svg,
    )


@app.api_route("/assets/images/thumbnails/{path:path}", methods=["GET"])
async def thumbnail(request: Request, path: str) -> Response:
    """Render a 1200x630 PNG thumbnail for a page.

    Query parameters:
        title: Page title (default ``"Untitled Page"``).
        description: Page description (default ``"No description."``).
        template: ``"error"`` selects ``error.svg``; anything else selects
            ``normal.svg``.

    Args:
        request: Incoming request carrying the query parameters.
        path: Page path; its segments are shown as a breadcrumb.

    Returns:
        A PNG response produced from the SVG template.
    """
    query = request.query_params
    title = query.get("title", "Untitled Page")
    description = query.get("description", "No description.")
    template_type = query.get("template", "normal")

    parts = [p for p in path.strip("/").split("/") if p]
    if parts:
        path_display = "nercone.dev / " + " / ".join(parts)
    else:
        path_display = "nercone.dev"

    assets_dir = Path.cwd().joinpath("public", "assets")
    fonts_dir = assets_dir / "fonts"
    svg_filename = "error.svg" if template_type == "error" else "normal.svg"
    svg_path = assets_dir / "images" / "thumbnails" / svg_filename

    # All three values come from the request, so they are HTML-escaped before
    # being placed into the SVG markup.
    svg = _fill_thumbnail_placeholders(
        svg_path.read_text(encoding="utf-8"),
        {
            "PATH": escape(path_display),
            "TITLE": escape(title),
            "DESCRIPTION": escape(description),
        },
    )

    font_files = [
        str(fonts_dir / "MesloBIZUD-Regular.ttf"),
        str(fonts_dir / "InterBIZUD-Regular.ttf"),
        str(fonts_dir / "InterBIZUD-Bold.ttf"),
    ]
    png = resvg_py.svg_to_bytes(svg, font_files=font_files, width=1200, height=630)
    return Response(content=png, media_type="image/png")
|
|
|
|
@app.api_route("/tools/tls-test/", methods=["GET"])
async def tls_test_index(request: Request) -> Response:
    """Render the TLS test tool's landing page."""
    page = templates.TemplateResponse(
        request=request,
        name="tools/tls-test/index.html",
    )
    return page
|
|
|
|
@app.api_route("/tools/tls-test/", methods=["POST"])
async def tls_test_submit(request: Request) -> Response:
    """Handle the HTML form that submits a new TLS test.

    The ``target`` form field is stripped and passed to ``tls_submit``. If
    that call reports an error, the error value is returned as the response;
    otherwise the client is redirected (303) to the test's status page.
    """
    form = await request.form()
    target = str(form.get("target", "")).strip()
    test_id, err = tls_submit(request, target, tls_test_db, tls_test_queue, templates)
    if not err:
        return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303)
    return err
|
|
|
|
@app.api_route("/tools/tls-test/status/{test_id}/", methods=["GET"])
async def tls_test_status_page(request: Request, test_id: str) -> Response:
    """Show the progress page for a TLS test.

    Unknown ids get the 404 error page. A job whose status is ``"done"`` is
    redirected (303) to its results page.
    """
    job = tls_test_db.get_job(test_id)
    if not job:
        return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?")

    job_status = job.get("status")
    if job_status == "done":
        return RedirectResponse(url=f"/tools/tls-test/results/{test_id}/", status_code=303)

    context = {
        "test_id": test_id,
        "target": job.get("target", ""),
        "status": job.get("status", ""),
        "progress_entries": tls_test_db.get_progress(test_id),
    }
    return templates.TemplateResponse(
        request=request,
        name="tools/tls-test/status.html",
        context=context,
    )
|
|
|
|
@app.api_route("/tools/tls-test/results/{test_id}/", methods=["GET"])
async def tls_test_results_page(request: Request, test_id: str) -> Response:
    """Show the results page for a finished TLS test.

    Unknown ids get the 404 error page. A job whose status is not ``"done"``
    is redirected (303) back to its status page.
    """
    job = tls_test_db.get_job(test_id)
    if not job:
        return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?")

    finished = job.get("status") == "done"
    if not finished:
        return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303)

    context = tls_results_context(job, test_id, request, tls_test_db)
    return templates.TemplateResponse(
        request=request,
        name="tools/tls-test/results.html",
        context=context,
    )
|
|
|
|
@app.websocket("/tools/tls-test/ws/{test_id}")
async def tls_test_ws(websocket: WebSocket, test_id: str):
    """Hand the WebSocket for *test_id* over to ``tls_websocket_handler``."""
    await tls_websocket_handler(
        websocket,
        test_id,
        tls_test_db,
        tls_test_queue,
    )
|
|
|
|
@app.api_route("/api/tools/tls-test/scan", methods=["POST"])
async def tls_test_api_scan(request: Request) -> Response:
    """JSON API: submit a new TLS test.

    The body must be a JSON object; its ``target`` field is passed to
    ``tls_api_submit``.

    Returns:
        400 with an ``error`` field when the body is not valid JSON or not a
        JSON object. When ``tls_api_submit`` reports an error, its payload
        and status code. Otherwise the new test id and the URLs of its
        status page, results page and WebSocket.
    """
    try:
        payload = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON"}, status_code=400)

    # Valid JSON can still be a list, string or number, none of which has
    # ``.get``; reject those instead of failing with an AttributeError.
    if not isinstance(payload, dict):
        return JSONResponse({"error": "JSON body must be an object"}, status_code=400)

    test_id, err = tls_api_submit(request, str(payload.get("target", "")), tls_test_db, tls_test_queue)
    if err:
        return JSONResponse(err[0], status_code=err[1])

    return JSONResponse({
        "id": test_id,
        "status_url": f"/tools/tls-test/status/{test_id}/",
        "results_url": f"/tools/tls-test/results/{test_id}/",
        "ws_url": f"/tools/tls-test/ws/{test_id}",
    })
|
|
|
|
@app.api_route("/api/tools/tls-test/status/{test_id}", methods=["GET"])
async def tls_test_api_status(request: Request, test_id: str) -> Response:
    """JSON API: report the stored fields and progress entries of a TLS test.

    Returns 404 with ``{"error": "not found"}`` for an unknown id.
    """
    job = tls_test_db.get_job(test_id)
    if not job:
        return JSONResponse({"error": "not found"}, status_code=404)

    progress = tls_test_db.get_progress(test_id)

    # Fields copied from the job record under the same key, in output order.
    copied_fields = (
        "target",
        "status",
        "rank",
        "score",
        "created_at",
        "started_at",
        "finished_at",
    )
    body = {"id": test_id}
    body.update((field, job.get(field)) for field in copied_fields)
    body["progress"] = progress
    body["error"] = job.get("error_message")
    return JSONResponse(body)
|
|
|
|
@app.api_route("/api/tools/tls-test/results/{test_id}", methods=["GET"])
async def tls_test_api_results(request: Request, test_id: str) -> Response:
    """JSON API: return the stored result of a finished TLS test.

    Returns 404 for an unknown id and 409 (with the current status) while
    the job's status is not ``"done"``.
    """
    job = tls_test_db.get_job(test_id)
    if not job:
        return JSONResponse({"error": "not found"}, status_code=404)

    current_status = job.get("status")
    if current_status != "done":
        return JSONResponse({"error": "not ready", "status": job.get("status")}, status_code=409)

    result = job.get("result") or {}
    return JSONResponse(result)
|
|
|
|
def _wants_markdown(request: Request, full_path: str) -> bool:
    """Return True when the response should be Markdown rather than HTML."""
    if full_path.endswith(".md"):
        return True
    if "curl" in request.headers.get("user-agent", "").lower():
        return True
    return "text/markdown" in request.headers.get("accept", "").lower()


def _wants_lightweight(request: Request) -> bool:
    """Resolve lightweight mode from query string, header, then cookie."""
    signal = (
        request.query_params.get("lightweight", "").lower()
        or request.headers.get("lightweight", "").lower()
    )
    if signal in ("1", "y", "yes", "true"):
        return True
    if signal in ("0", "n", "no", "false"):
        return False
    return request.cookies.get("lightweight_mode", "") == "true"


def _sync_lightweight_cookie(request: Request, response: Response, lightweight_mode: bool) -> None:
    """Store the lightweight choice, or reset a stale ``true`` cookie."""
    if lightweight_mode:
        response.set_cookie("lightweight_mode", "true", samesite="lax")
    elif request.cookies.get("lightweight_mode", "") == "true":
        response.set_cookie("lightweight_mode", "false", samesite="lax")


def _split_front_matter(markdown: str) -> tuple[dict, str]:
    """Split a leading ``---`` delimited YAML block from the Markdown body."""
    if not markdown.startswith("---"):
        return {}, markdown
    end = markdown.find("\n---", 3)
    if end == -1:
        return {}, markdown
    front = yaml.safe_load(markdown[3:end]) or {}
    return front, markdown[end + 4:].lstrip("\n")


@app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"])
async def default_response(request: Request, full_path: str) -> Response:
    """Catch-all handler that serves everything under ``./public``.

    Lookup order:
        1. A static file (skipped for paths ending in ``.html`` or ``.md``).
        2. A Jinja template (``<path>.html`` or ``<path>/index.html``).
        3. A Markdown file (``<path>.md`` or ``<path>/index.md``).
        4. A short URL from ``public/shorturls.json``.
        5. The 404 error page.

    Templates and Markdown files are returned as Markdown when the path ends
    in ``.md``, the user agent contains ``curl`` or the ``Accept`` header
    contains ``text/markdown``; otherwise as HTML. Lightweight mode switches
    the base template to ``/base-light.html``.

    Args:
        request: Incoming request.
        full_path: Path captured by the route.

    Returns:
        The matching response, or an error page (403 for a static path
        outside ``./public``, 500 for a missing or unreadable short-URL
        file, 404 otherwise).
    """
    if not full_path.endswith((".html", ".md")):
        try:
            if static := resolve_static_file(full_path):
                return FileResponse(static)
        except PermissionError:
            return error_page(templates, request, 403, "何をしてるんです?脆弱性報告のためならいいのですが、データ盗んで悪用するためなら今すぐにやめてくださいね?", "ディレクトリトラバーサルね、知ってる。公開してないところ覗きたいの?えっt")

    original_path = full_path
    markdown_mode = _wants_markdown(request, full_path)
    if full_path.endswith(".md"):
        # "page.md" is also served from the template "page.html".
        full_path = full_path[:-3] + ".html"
    lightweight_mode = _wants_lightweight(request)

    # --- Jinja templates ---------------------------------------------------
    if full_path in ["", "/"]:
        template_candidates = ["index.html"]
    elif full_path.endswith(".html"):
        template_candidates = [full_path.lstrip("/")]
    else:
        stem = full_path.strip("/")
        template_candidates = [f"{stem}.html", f"{stem}/index.html"]

    for name in template_candidates:
        try:
            if markdown_mode:
                content = templates.env.get_template(name).render(request=request)
                soup = BeautifulSoup(content, "html.parser")
                main_tag = soup.find("main")
                # Convert only <main> when the page has one.
                main = str(main_tag) if main_tag is not None else content
                markdown = markitdown.convert_stream(io.BytesIO(main.encode("utf-8")), file_extension=".html")
                response = PlainTextResponse(markdown.text_content, status_code=200, media_type="text/markdown")
            elif lightweight_mode:
                # Re-render the template source against the light base template.
                source = templates.env.loader.get_source(templates.env, name)[0]
                source = source.replace('{% extends "/base.html" %}', '{% extends "/base-light.html" %}')
                content = templates.env.from_string(source).render(request=request)
                response = Response(content=content, status_code=200, media_type="text/html")
                _sync_lightweight_cookie(request, response, lightweight_mode)
            else:
                response = templates.TemplateResponse(status_code=200, request=request, name=name)
                _sync_lightweight_cookie(request, response, lightweight_mode)

            accesscounter.increase()
            return response
        except TemplateNotFound:
            continue

    # --- Markdown files ----------------------------------------------------
    if original_path in ["", "/"]:
        markdown_candidates = ["index.md"]
    elif original_path.endswith(".md"):
        markdown_candidates = [original_path.lstrip("/")]
    else:
        stem = original_path.strip("/")
        markdown_candidates = [f"{stem}.md", f"{stem}/index.md"]

    public_dir = Path.cwd().joinpath("public").resolve()
    for name in markdown_candidates:
        # Resolve before the containment check: is_relative_to() is purely
        # lexical, so an unresolved "public/../x.md" would pass it.
        markdown_path = (public_dir / name).resolve()
        if not markdown_path.is_relative_to(public_dir):
            continue
        # Also skips directories and paths that run through a regular file,
        # which would otherwise raise instead of falling through.
        if not markdown_path.is_file():
            continue
        try:
            markdown = markdown_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            # The file disappeared between the check and the read.
            continue

        if markdown_mode:
            response = PlainTextResponse(markdown, status_code=200, media_type="text/markdown")
        else:
            front, body = _split_front_matter(markdown)
            html = htmlitdown(body)
            base = "/base-light.html" if lightweight_mode else "/base.html"
            # NOTE: front matter values and the rendered body are spliced
            # into Jinja source unescaped, so any template syntax they
            # contain is evaluated. Only trusted files belong in ./public.
            pieces = [f'{{% extends "{base}" %}}\n']
            pieces.extend(
                f'{{% block {block} %}}{value}{{% endblock %}}\n'
                for block, value in front.items()
            )
            pieces.append(f'{{% block content %}}\n{html}\n{{% endblock %}}\n')
            content = templates.env.from_string("".join(pieces)).render(request=request)
            response = Response(content=content, status_code=200, media_type="text/html")

        # NOTE(review): the reviewed text had lost its indentation; this
        # cookie update is assumed to apply to both response modes — confirm.
        _sync_lightweight_cookie(request, response, lightweight_mode)

        accesscounter.increase()
        return response

    # --- Short URLs ----------------------------------------------------------
    try:
        path = Path.cwd().joinpath("public", "shorturls.json")
        if not path.exists():
            return error_page(templates, request, 500, "短縮URLの処理のためのJSONファイルがありません。", "設定ファイルぐらい用意しておけよ!")
        # Context manager so the handle is closed even when parsing fails.
        with path.open("r", encoding="utf-8") as f:
            shorturls = json.load(f)
    except Exception:
        return error_page(templates, request, 500, "短縮URLの処理のためのJSONファイルを正常に読み込めませんでした。", "なにこの設定ファイル読めないじゃない!")

    if result := resolve_shorturl(shorturls, full_path):
        return RedirectResponse(url=result)

    return error_page(templates, request, 404, "リクエストしたページは現在ご利用になれません。削除/移動されたか、URLが間違っている可能性があります。", "そんなページ知らないっ!")
|