139 lines
5.7 KiB
Python
139 lines
5.7 KiB
Python
import io
|
||
import json
|
||
import random
|
||
from pathlib import Path
|
||
from bs4 import BeautifulSoup
|
||
from markitdown import MarkItDown
|
||
from datetime import datetime, timezone
|
||
from fastapi import FastAPI, Request, Response
|
||
from fastapi.templating import Jinja2Templates
|
||
from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse, RedirectResponse
|
||
from jinja2.exceptions import TemplateNotFound
|
||
from .error import error_page
|
||
from .database import AccessCounter
|
||
from .middleware import Middleware, server_version, onion_hostname
|
||
|
||
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
|
||
app.add_middleware(Middleware)
|
||
templates = Jinja2Templates(directory=Path.cwd().joinpath("public"))
|
||
markitdown = MarkItDown()
|
||
accesscounter = AccessCounter()
|
||
templates.env.globals["get_access_count"] = accesscounter.get
|
||
templates.env.globals["server_version"] = server_version
|
||
templates.env.globals["onion_site_url"] = f"http://{onion_hostname}/"
|
||
|
||
def get_daily_quote() -> str:
|
||
seed = str(datetime.now(timezone.utc).date())
|
||
with Path.cwd().joinpath("public", "quotes.txt").open("r") as f:
|
||
quotes = f.read().strip().split("\n")
|
||
return random.Random(seed).choice(quotes)
|
||
templates.env.globals["get_daily_quote"] = get_daily_quote
|
||
|
||
def resolve_static_file(full_path: str) -> Path | None:
|
||
base_dir = Path.cwd().joinpath("public")
|
||
target_path = (base_dir / full_path.lstrip('/')).resolve()
|
||
if not str(target_path).startswith(str(base_dir.resolve())):
|
||
raise PermissionError()
|
||
return target_path if target_path.is_file() else None
|
||
|
||
def resolve_shorturl(shorturls: dict, full_path: str) -> str | None:
|
||
current_id = full_path.strip().rstrip("/")
|
||
visited = set()
|
||
for _ in range(10):
|
||
if current_id in visited or current_id not in shorturls:
|
||
return None
|
||
visited.add(current_id)
|
||
entry = shorturls[current_id]
|
||
if entry["type"] in ["redirect", "alias"]:
|
||
if entry["type"] == "redirect":
|
||
return entry["content"]
|
||
current_id = entry["content"]
|
||
return None
|
||
|
||
@app.api_route("/ping", methods=["GET"])
|
||
async def ping(request: Request):
|
||
return PlainTextResponse("pong!", status_code=200)
|
||
|
||
@app.api_route("/echo", methods=["GET"])
|
||
async def echo(request: Request):
|
||
return JSONResponse(request.scope["log"], status_code=200)
|
||
|
||
@app.api_route("/status", methods=["GET"])
|
||
async def status(request: Request):
|
||
return JSONResponse(
|
||
{
|
||
"status": "ok",
|
||
"version": server_version,
|
||
"daily_quote": get_daily_quote(),
|
||
"access_count": accesscounter.get()
|
||
},
|
||
status_code=200
|
||
)
|
||
|
||
@app.api_route("/welcome", methods=["GET"])
|
||
async def ping(request: Request):
|
||
return PlainTextResponse(
|
||
f"""
|
||
■ ■ ■■■■■ ■■■■ ■■■■ ■■■ ■ ■ ■■■■■
|
||
■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■
|
||
■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■
|
||
■ ■ ■ ■■■■ ■■■■ ■ ■ ■ ■ ■ ■ ■■■■
|
||
■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■
|
||
■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■
|
||
■ ■ ■■■■■ ■ ■ ■■■■ ■■■ ■ ■ ■■■■■
|
||
|
||
nercone.dev ({server_version})
|
||
welcome to nercone.dev!
|
||
""".strip() + "\n",
|
||
status_code=200
|
||
)
|
||
|
||
@app.api_route("/error/{code}", methods=["GET", "POST", "HEAD"])
|
||
async def fake_error_page(request: Request, code: str):
|
||
return error_page(templates=templates, request=request, status_code=int(code))
|
||
|
||
@app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"])
|
||
async def default_response(request: Request, full_path: str) -> Response:
|
||
if not full_path.endswith(".html"):
|
||
try:
|
||
if static := resolve_static_file(full_path):
|
||
return FileResponse(static)
|
||
except PermissionError:
|
||
return error_page(templates, request, 403, "ディレクトリトラバーサルね、知ってる。公開してないところ覗きたいの?えっt")
|
||
|
||
if full_path in ["", "/"]:
|
||
template_candidates = ["index.html"]
|
||
elif full_path.endswith(".html"):
|
||
template_candidates = [full_path.lstrip('/')]
|
||
else:
|
||
template_candidates = [f"{full_path.strip('/')}.html", f"{full_path.strip('/')}/index.html"]
|
||
|
||
for name in template_candidates:
|
||
try:
|
||
if "curl" in request.headers.get("user-agent", "").lower():
|
||
content = templates.env.get_template(name).render(request=request)
|
||
soup = BeautifulSoup(content, "html.parser")
|
||
main = str(soup.find("main")) if soup.find("main") else content
|
||
markdown = markitdown.convert_stream(io.BytesIO(main.encode("utf-8")), file_extension=".html")
|
||
accesscounter.increase()
|
||
return PlainTextResponse(markdown.text_content, status_code=200)
|
||
else:
|
||
response = templates.TemplateResponse(status_code=200, request=request, name=name)
|
||
accesscounter.increase()
|
||
return response
|
||
except TemplateNotFound:
|
||
continue
|
||
|
||
try:
|
||
path = Path.cwd().joinpath("public", "shorturls.json")
|
||
if not path.exists():
|
||
return error_page(templates, request, 500, "設定ファイルぐらい用意しておけよ!")
|
||
shorturls = json.load(path.open("r", encoding="utf-8"))
|
||
except Exception:
|
||
return error_page(templates, request, 500, "なにこの設定ファイル読めないじゃない!")
|
||
|
||
if result := resolve_shorturl(shorturls, full_path):
|
||
return RedirectResponse(url=result)
|
||
|
||
return error_page(templates, request, 404, "そんなページ知らないっ!")
|