import json import random from pathlib import Path from zoneinfo import ZoneInfo from datetime import datetime, timezone from fastapi import FastAPI, Request, Response from fastapi.templating import Jinja2Templates from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse, RedirectResponse from jinja2.exceptions import TemplateNotFound from .error import error_page from .database import AccessCounter from .middleware import Middleware, server_version, onion_hostname app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None) app.add_middleware(Middleware) templates = Jinja2Templates(directory=Path.cwd().joinpath("public")) accesscounter = AccessCounter() templates.env.globals["get_access_count"] = accesscounter.get templates.env.globals["server_version"] = server_version templates.env.globals["onion_site_url"] = f"http://{onion_hostname}/" def get_current_year() -> str: return str(datetime.now(ZoneInfo("Asia/Tokyo")).year) templates.env.globals["get_current_year"] = get_current_year def get_daily_quote() -> str: seed = str(datetime.now(timezone.utc).date()) with Path.cwd().joinpath("public", "quotes.txt").open("r") as f: quotes = f.read().strip().split("\n") return random.Random(seed).choice(quotes) templates.env.globals["get_daily_quote"] = get_daily_quote @app.api_route("/ping", methods=["GET"]) async def ping(request: Request): return PlainTextResponse("pong!", status_code=200) @app.api_route("/echo", methods=["GET"]) async def echo(request: Request): return JSONResponse(request.scope["log"], status_code=200) @app.api_route("/status", methods=["GET"]) async def status(request: Request): return JSONResponse( { "status": "ok", "version": server_version, "daily_quote": get_daily_quote(), "access_count": accesscounter.get() }, status_code=200 ) @app.api_route("/welcome", methods=["GET"]) async def ping(request: Request): return PlainTextResponse( f""" ■ ■ ■■■■■ ■■■■ ■■■■ ■■■ ■ ■ ■■■■■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■ ■ ■ ■ ■■■■ ■■■■ ■ ■ ■ ■ ■ ■ ■■■■ ■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■ ■ ■■■■■ ■ ■ ■■■■ ■■■ ■ ■ ■■■■■ nercone.dev ({server_version}) welcome to nercone.dev! """.strip() + "\n", status_code=200 ) @app.api_route("/error/{code}", methods=["GET", "POST", "HEAD"]) async def fake_error_page(request: Request, code: str): return error_page(templates=templates, request=request, status_code=int(code)) @app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"]) async def default_response(request: Request, full_path: str) -> Response: if not full_path.endswith(".html"): base_dir = Path.cwd().joinpath("public") safe_full_path = full_path.lstrip('/') target_path = (base_dir / safe_full_path).resolve() if not str(target_path).startswith(str(base_dir.resolve())): return error_page(templates=templates, request=request, status_code=403, message="ディレクトリトラバーサルね、知ってる。公開してないところ覗きたいの?えっt") if target_path.exists() and target_path.is_file(): return FileResponse(target_path) templates_to_try = [] if full_path == "" or full_path == "/": templates_to_try.append("index.html") elif full_path.endswith(".html"): templates_to_try.append(full_path.lstrip('/')) else: clean_path = full_path.strip('/') templates_to_try.append(f"{clean_path}.html") templates_to_try.append(f"{clean_path}/index.html") for template_name in templates_to_try: try: response = templates.TemplateResponse(status_code=200, request=request, name=template_name) accesscounter.increase() return response except TemplateNotFound: continue shorturls_json = Path.cwd().joinpath("public", "shorturls.json") if not shorturls_json.exists(): return error_page(templates=templates, request=request, status_code=500, message="設定ファイルぐらい用意しておけよ!") try: with shorturls_json.open("r", encoding="utf-8") as f: shorturls = json.load(f) except Exception: return error_page(templates=templates, request=request, status_code=500, message="なにこの設定ファイル読めないじゃない!") current_id = full_path.strip().rstrip("/") visited = set() for _ in range(10): if current_id in visited: return error_page(templates=templates, request=request, status_code=500, message="循環依存ってなんかちょっとえっt") visited.add(current_id) if current_id not in shorturls: break entry = shorturls[current_id] entry_type = entry.get("type") content = entry.get("content") if entry_type == "redirect": return RedirectResponse(url=content) elif entry_type == "alias": current_id = content else: break return error_page(templates=templates, request=request, status_code=404, message="そんなページ知らないっ!")