This commit is contained in:
2026-03-31 01:15:38 +09:00
parent 048b28cfbb
commit fffa19679e
2 changed files with 58 additions and 50 deletions

View File

@@ -10,6 +10,7 @@ dependencies = [
"psutil", "psutil",
"httpx", "httpx",
"websockets", "websockets",
"markitdown",
"fastapi", "fastapi",
"jinja2", "jinja2",
"uvicorn[standard]" "uvicorn[standard]"

View File

@@ -1,7 +1,7 @@
import json import json
import random import random
from pathlib import Path from pathlib import Path
from zoneinfo import ZoneInfo from markitdown import MarkItDown
from datetime import datetime, timezone from datetime import datetime, timezone
from fastapi import FastAPI, Request, Response from fastapi import FastAPI, Request, Response
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
@@ -14,15 +14,12 @@ from .middleware import Middleware, server_version, onion_hostname
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None) app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
app.add_middleware(Middleware) app.add_middleware(Middleware)
templates = Jinja2Templates(directory=Path.cwd().joinpath("public")) templates = Jinja2Templates(directory=Path.cwd().joinpath("public"))
markitdown = MarkItDown()
accesscounter = AccessCounter() accesscounter = AccessCounter()
templates.env.globals["get_access_count"] = accesscounter.get templates.env.globals["get_access_count"] = accesscounter.get
templates.env.globals["server_version"] = server_version templates.env.globals["server_version"] = server_version
templates.env.globals["onion_site_url"] = f"http://{onion_hostname}/" templates.env.globals["onion_site_url"] = f"http://{onion_hostname}/"
def get_current_year() -> str:
return str(datetime.now(ZoneInfo("Asia/Tokyo")).year)
templates.env.globals["get_current_year"] = get_current_year
def get_daily_quote() -> str: def get_daily_quote() -> str:
seed = str(datetime.now(timezone.utc).date()) seed = str(datetime.now(timezone.utc).date())
with Path.cwd().joinpath("public", "quotes.txt").open("r") as f: with Path.cwd().joinpath("public", "quotes.txt").open("r") as f:
@@ -30,6 +27,27 @@ def get_daily_quote() -> str:
return random.Random(seed).choice(quotes) return random.Random(seed).choice(quotes)
templates.env.globals["get_daily_quote"] = get_daily_quote templates.env.globals["get_daily_quote"] = get_daily_quote
def resolve_static_file(full_path: str) -> Path | None:
base_dir = Path.cwd().joinpath("public")
target_path = (base_dir / full_path.lstrip('/')).resolve()
if not str(target_path).startswith(str(base_dir.resolve())):
raise PermissionError()
return target_path if target_path.is_file() else None
def resolve_shorturl(shorturls: dict, full_path: str) -> str | None:
current_id = full_path.strip().rstrip("/")
visited = set()
for _ in range(10):
if current_id in visited or current_id not in shorturls:
return None
visited.add(current_id)
entry = shorturls[current_id]
if entry["type"] in ["redirect", "alias"]:
if entry["type"] == "redirect":
return entry["content"]
current_id = entry["content"]
return None
@app.api_route("/ping", methods=["GET"]) @app.api_route("/ping", methods=["GET"])
async def ping(request: Request): async def ping(request: Request):
return PlainTextResponse("pong!", status_code=200) return PlainTextResponse("pong!", status_code=200)
@@ -75,52 +93,41 @@ async def fake_error_page(request: Request, code: str):
@app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"]) @app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"])
async def default_response(request: Request, full_path: str) -> Response: async def default_response(request: Request, full_path: str) -> Response:
if not full_path.endswith(".html"): if not full_path.endswith(".html"):
base_dir = Path.cwd().joinpath("public")
safe_full_path = full_path.lstrip('/')
target_path = (base_dir / safe_full_path).resolve()
if not str(target_path).startswith(str(base_dir.resolve())):
return error_page(templates=templates, request=request, status_code=403, message="ディレクトリトラバーサルね、知ってる。公開してないところ覗きたいのえっt")
if target_path.exists() and target_path.is_file():
return FileResponse(target_path)
templates_to_try = []
if full_path == "" or full_path == "/":
templates_to_try.append("index.html")
elif full_path.endswith(".html"):
templates_to_try.append(full_path.lstrip('/'))
else:
clean_path = full_path.strip('/')
templates_to_try.append(f"{clean_path}.html")
templates_to_try.append(f"{clean_path}/index.html")
for template_name in templates_to_try:
try: try:
response = templates.TemplateResponse(status_code=200, request=request, name=template_name) if static := resolve_static_file(full_path):
accesscounter.increase() return FileResponse(static)
return response except PermissionError:
return error_page(templates, request, 403, "ディレクトリトラバーサルね、知ってる。公開してないところ覗きたいのえっt")
if full_path in ["", "/"]:
template_candidates = ["index.html"]
elif full_path.endswith(".html"):
template_candidates = [full_path.lstrip('/')]
else:
template_candidates = [f"{full_path.strip('/')}.html", f"{full_path.strip('/')}/index.html"]
for name in template_candidates:
try:
if "curl" in request.headers.get("user-agent", "").lower():
content = templates.env.get_template(name).render(request=request)
accesscounter.increase()
return PlainTextResponse(markitdown.convert(content).text_content, status_code=200)
else:
response = templates.TemplateResponse(status_code=200, request=request, name=name)
accesscounter.increase()
return response
except TemplateNotFound: except TemplateNotFound:
continue continue
shorturls_json = Path.cwd().joinpath("public", "shorturls.json")
if not shorturls_json.exists():
return error_page(templates=templates, request=request, status_code=500, message="設定ファイルぐらい用意しておけよ!")
try: try:
with shorturls_json.open("r", encoding="utf-8") as f: path = Path.cwd().joinpath("public", "shorturls.json")
shorturls = json.load(f) if not path.exists():
return error_page(templates, request, 500, "設定ファイルぐらい用意しておけよ!")
shorturls = json.load(path.open("r", encoding="utf-8"))
except Exception: except Exception:
return error_page(templates=templates, request=request, status_code=500, message="なにこの設定ファイル読めないじゃない!") return error_page(templates, request, 500, "なにこの設定ファイル読めないじゃない!")
current_id = full_path.strip().rstrip("/")
visited = set() if result := resolve_shorturl(shorturls, full_path):
for _ in range(10): return RedirectResponse(url=result)
if current_id in visited:
return error_page(templates=templates, request=request, status_code=500, message="循環依存ってなんかちょっとえっt") return error_page(templates, request, 404, "そんなページ知らないっ!")
visited.add(current_id)
if current_id not in shorturls:
break
entry = shorturls[current_id]
entry_type = entry.get("type")
content = entry.get("content")
if entry_type == "redirect":
return RedirectResponse(url=content)
elif entry_type == "alias":
current_id = content
else:
break
return error_page(templates=templates, request=request, status_code=404, message="そんなページ知らないっ!")