This commit is contained in:
2026-03-29 00:10:48 +09:00
parent ccb4853950
commit 046a51776b
16 changed files with 33 additions and 33 deletions

View File

View File

@@ -0,0 +1,35 @@
import uvicorn
from .server import app
def main():
log_config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s %(levelname)s %(name)s: %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S"
}
},
"handlers": {
"file": {
"class": "logging.FileHandler",
"filename": "logs/uvicorn.log",
"formatter": "default"
},
"console": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "default"
}
},
"loggers": {
"uvicorn": {"handlers": ["file", "console"], "level": "INFO", "propagate": False},
"uvicorn.error": {"handlers": ["file", "console"], "level": "INFO", "propagate": False},
"uvicorn.access": {"handlers": ["file", "console"], "level": "INFO", "propagate": False}
}
}
uvicorn.run("nercone_website.server:app", host="0.0.0.0", port=8080, workers=1, server_header=False, log_config=log_config)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,63 @@
import sqlite3
from pathlib import Path
class AccessCounter:
def __init__(self, filepath: str = str(Path.cwd().joinpath("databases", "access_counter.db"))):
self.filepath = filepath
def get(self) -> int:
if Path(self.filepath).is_file():
conn = sqlite3.connect(self.filepath)
try:
cur = conn.cursor()
cur.execute("SELECT value FROM access_counter WHERE rowid = 1")
row = cur.fetchone()
if row is None:
conn.execute("""
CREATE TABLE IF NOT EXISTS access_counter (
value INTEGER NOT NULL
)
""")
conn.execute("INSERT OR IGNORE INTO access_counter (rowid, value) VALUES (1, 0)")
conn.commit()
return 0
return row[0]
finally:
conn.close()
else:
conn = sqlite3.connect(self.filepath)
conn.execute("""
CREATE TABLE IF NOT EXISTS access_counter (
value INTEGER NOT NULL
)
""")
conn.execute("INSERT OR IGNORE INTO access_counter (rowid, value) VALUES (1, 0)")
conn.commit()
conn.close()
return 0
def increase(self):
if Path(self.filepath).is_file():
conn = sqlite3.connect(self.filepath)
try:
cur = conn.cursor()
conn.execute("BEGIN IMMEDIATE")
cur.execute(
"UPDATE access_counter SET value = value + 1 WHERE rowid = 1"
)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
else:
conn = sqlite3.connect(self.filepath)
conn.execute("""
CREATE TABLE IF NOT EXISTS access_counter (
value INTEGER NOT NULL
)
""")
conn.execute("INSERT OR IGNORE INTO access_counter (rowid, value) VALUES (1, 1)")
conn.commit()
conn.close()

View File

@@ -0,0 +1,29 @@
from http import HTTPStatus
from fastapi import Request, Response
from fastapi.templating import Jinja2Templates
default_messages = {
400: "日本語でおk",
401: "見たいのならログインすることね",
402: "夢が欲しけりゃ金払え!",
403: "あんたなんかに見せるもんですか!",
404: "そんなページ知らないっ!",
405: "そのMethodはNot Allowedだよ",
406: "すまんがその条件ではお渡しできない。",
407: "うちのプロキシ使うんだったらまずログインしな。",
408: "もう用がないならさっさと帰りなさい。",
409: "ちょっと待ったそんな話聞いてないぞ",
410: "もう無いで。",
411: "サイズを教えろ。話はそれからだ。",
412: "なにその条件美味しいの",
413: "そ、そそ、そんなの入りきらないよっ!",
414: "もちつけ",
415: "そんな形式知らない!",
416: "そんな大きく...ない...んだ...",
417: "期待させて悪かったわね!",
418: "ティーポット「私はコーヒーを注ぐためのものではありません!やだっ!」"
}
def error_page(templates: Jinja2Templates, request: Request, status_code: int, message: str | None = None) -> Response:
status_code_name = HTTPStatus(status_code).phrase
return templates.TemplateResponse(status_code=status_code, request=request, name="error.html", context={"status_code": status_code, "status_code_name": status_code_name, "message": message or default_messages.get(status_code, "あんのーん")})

View File

@@ -0,0 +1,34 @@
import uuid
import json
from pathlib import Path
from starlette.types import Scope
from datetime import datetime, timezone
access_log_path = Path.cwd().joinpath("logs", "access.log")
access_log_path.parent.mkdir(parents=True, exist_ok=True)
def log_access(scope: Scope, write: bool = True):
client = scope.get("client") or ("", 0)
server = scope.get("server") or ("", 0)
headers = dict(scope.get("headers", []))
hostname = headers.get(b"host", b"").decode().split(":")[0].strip()
log = {
"id": str(uuid.uuid4()),
"timestamp": datetime.now(timezone.utc).isoformat(),
"from": {
"address": client[0],
"port": client[1]
},
"to": {
"scheme": scope.get("scheme", "https"),
"host": hostname,
"port": server[1]
},
"method": scope.get("method", "GET"),
"path": scope.get("path", "/"),
"headers": {k.decode(): v.decode() for k, v in headers.items()}
}
if write:
with access_log_path.open("a", encoding="utf-8") as f:
f.write(json.dumps(log, ensure_ascii=False) + "\n")
return log

View File

@@ -0,0 +1,107 @@
import subprocess
from datetime import datetime, timezone
from fastapi import Response
from fastapi.responses import PlainTextResponse
from starlette.types import Scope, ASGIApp, Receive, Send
from .logger import log_access
server_version = subprocess.run(["/usr/bin/git", "rev-parse", "--short", "HEAD"], text=True, capture_output=True).stdout.strip()
onion_hostname = "4sbb7xhdn4meuesnqvcreewk6sjnvchrsx4lpnxmnjhz2soat74finid.onion"
hostnames = ["localhost", "nercone.dev", "d-g-c.net", "diamondgotcat.net", onion_hostname]
class Middleware:
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send):
if scope["type"] not in ("http", "websocket"):
await self.app(scope, receive, send)
return
if scope["type"] == "websocket":
await self.app(scope, receive, send)
return
headers = dict(scope.get("headers", []))
hostname = headers.get(b"host", b"").decode().split(":")[0].strip()
scope["log"] = log_access(scope)
if not any([hostname.endswith(candidate) for candidate in hostnames]):
response = PlainTextResponse("許可されていないホスト名でのアクセスです。", status_code=400)
await self._send_with_headers(response, scope, receive, send)
return
hostname_parts = hostname.split(".")
if hostname_parts[1:] == ["localhost"]:
subdomain = ".".join(hostname_parts[:-1])
else:
subdomain = ".".join(hostname_parts[:-2])
body = await self._read_body(receive)
async def cached_receive():
return {"type": "http.request", "body": body, "more_body": False}
if subdomain not in ["", "www"]:
original_path = scope["path"] if scope["path"].strip() else "/"
subdomain_path = f"/{'/'.join(subdomain.split('.')[::-1])}{original_path}"
response = await self._get_response(scope, cached_receive, subdomain_path)
if response.status_code < 400:
await self._send_with_headers(response, scope, cached_receive, send)
return
response = await self._get_response(scope, cached_receive, original_path)
await self._send_with_headers(response, scope, cached_receive, send)
else:
response = await self._get_response(scope, cached_receive, scope["path"])
await self._send_with_headers(response, scope, cached_receive, send)
async def _get_response(self, scope: Scope, receive: Receive, path: str) -> Response:
new_scope = dict(scope, path=path)
status_code = 200
resp_headers = []
body_parts = []
async def capture_send(message):
nonlocal status_code, resp_headers
if message["type"] == "http.response.start":
status_code = message["status"]
resp_headers = message.get("headers", [])
elif message["type"] == "http.response.body":
body_parts.append(message.get("body", b""))
body = await self._read_body(receive)
async def cached_receive():
return {"type": "http.request", "body": body, "more_body": False}
await self.app(new_scope, cached_receive, capture_send)
response = Response(
content=b"".join(body_parts),
status_code=status_code,
)
if response.status_code == 404 and path != "/" and path.endswith("/"):
return await self._get_response(scope, cached_receive, path.rstrip("/"))
for k, v in resp_headers:
response.headers.raw.append((k, v))
return response
async def _read_body(self, receive: Receive) -> bytes:
body = b""
while True:
message = await receive()
body += message.get("body", b"")
if not message.get("more_body", False):
break
return body
async def _send_with_headers(self, response: Response, scope, receive, send):
response.headers["Server"] = f"nercone.dev ({server_version})"
response.headers["Onion-Location"] = f"http://{onion_hostname}/"
if "access-control-allow-origin" not in response.headers:
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "*"
response.headers["Access-Control-Allow-Headers"] = "*"
await response(scope, receive, send)

View File

@@ -0,0 +1,48 @@
import re
import httpx
import asyncio
from websockets.client import connect
from fastapi import Request, Response, WebSocket
hop_by_hop_headers = ["transfer-encoding", "connection", "keep-alive", "upgrade", "proxy-authenticate", "proxy-authorization", "te", "trailers", "content-encoding", "content-length"]
def to_raw_headers(str_headers: dict[str, str]) -> list[tuple[bytes, bytes]]:
return [(k.encode("latin-1"), v.encode("latin-1")) for k, v in str_headers.items()]
def make_http_proxy(base_url_http: str, headers: dict = {}, remove_prefix_path: bool = False):
async def http_proxy(request: Request, path: str = "") -> Response:
url = f"{base_url_http}/{path}" if remove_prefix_path else f"{base_url_http}{request.url.path}"
merged_headers = dict(request.headers)
merged_headers.pop("accept-encoding", None)
merged_headers |= {k.lower(): v for k, v in headers.items()}
async with httpx.AsyncClient() as client:
resp = await client.request(
method=request.method,
url=url,
headers=to_raw_headers(merged_headers),
content=await request.body(),
params=request.query_params
)
response = Response(content=resp.content, status_code=resp.status_code)
for k, v in resp.headers.multi_items():
if k.lower() not in hop_by_hop_headers:
response.headers.append(k, v)
return response
return http_proxy
def make_websocket_proxy(base_url_websocket: str, remove_prefix_path: bool = False):
async def websocket_proxy(client_ws: WebSocket, path: str = ""):
url = f"{base_url_websocket}/{path}" if remove_prefix_path else f"{base_url_websocket}{client_ws.url.path}"
await client_ws.accept()
async with connect(url) as server_ws:
async def client_to_server():
async for message in client_ws.iter_bytes():
await server_ws.send(message)
async def server_to_client():
async for message in server_ws:
await client_ws.send_bytes(message)
await asyncio.gather(
asyncio.create_task(client_to_server()),
asyncio.create_task(server_to_client())
)
return websocket_proxy

View File

@@ -0,0 +1,126 @@
import json
import random
from pathlib import Path
from zoneinfo import ZoneInfo
from datetime import datetime, timezone
from fastapi import FastAPI, Request, Response
from fastapi.templating import Jinja2Templates
from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse, RedirectResponse
from jinja2.exceptions import TemplateNotFound
from .error import error_page
from .database import AccessCounter
from .middleware import Middleware, server_version, onion_hostname
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
app.add_middleware(Middleware)
templates = Jinja2Templates(directory=Path.cwd().joinpath("public"))
accesscounter = AccessCounter()
templates.env.globals["get_access_count"] = accesscounter.get
templates.env.globals["server_version"] = server_version
templates.env.globals["onion_site_url"] = f"http://{onion_hostname}/"
def get_current_year() -> str:
return str(datetime.now(ZoneInfo("Asia/Tokyo")).year)
templates.env.globals["get_current_year"] = get_current_year
def get_daily_quote() -> str:
seed = str(datetime.now(timezone.utc).date())
with Path.cwd().joinpath("public", "quotes.txt").open("r") as f:
quotes = f.read().strip().split("\n")
return random.Random(seed).choice(quotes)
templates.env.globals["get_daily_quote"] = get_daily_quote
@app.api_route("/ping", methods=["GET"])
async def ping(request: Request):
return PlainTextResponse("pong!", status_code=200)
@app.api_route("/echo", methods=["GET"])
async def echo(request: Request):
return JSONResponse(request.scope["log"], status_code=200)
@app.api_route("/status", methods=["GET"])
async def status(request: Request):
return JSONResponse(
{
"status": "ok",
"version": server_version,
"daily_quote": get_daily_quote(),
"access_count": accesscounter.get()
},
status_code=200
)
@app.api_route("/welcome", methods=["GET"])
async def ping(request: Request):
return PlainTextResponse(
f"""
■ ■ ■■■■■ ■■■■ ■■■■ ■■■ ■ ■ ■■■■■
■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■
■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■ ■
■ ■ ■ ■■■■ ■■■■ ■ ■ ■ ■ ■ ■ ■■■■
■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■
■ ■■ ■ ■ ■ ■ ■ ■ ■ ■■ ■
■ ■ ■■■■■ ■ ■ ■■■■ ■■■ ■ ■ ■■■■■
nercone.dev ({server_version})
welcome to nercone.dev!
""".strip() + "\n",
status_code=200
)
@app.api_route("/error/{code}", methods=["GET", "POST", "HEAD"])
async def fake_error_page(request: Request, code: str):
return error_page(templates=templates, request=request, status_code=int(code))
@app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"])
async def default_response(request: Request, full_path: str) -> Response:
if not full_path.endswith(".html"):
base_dir = Path.cwd().joinpath("public")
safe_full_path = full_path.lstrip('/')
target_path = (base_dir / safe_full_path).resolve()
if not str(target_path).startswith(str(base_dir.resolve())):
return error_page(templates=templates, request=request, status_code=403, message="ディレクトリトラバーサルね、知ってる。公開してないところ覗きたいのえっt")
if target_path.exists() and target_path.is_file():
return FileResponse(target_path)
templates_to_try = []
if full_path == "" or full_path == "/":
templates_to_try.append("index.html")
elif full_path.endswith(".html"):
templates_to_try.append(full_path.lstrip('/'))
else:
clean_path = full_path.strip('/')
templates_to_try.append(f"{clean_path}.html")
templates_to_try.append(f"{clean_path}/index.html")
for template_name in templates_to_try:
try:
response = templates.TemplateResponse(status_code=200, request=request, name=template_name)
accesscounter.increase()
return response
except TemplateNotFound:
continue
shorturls_json = Path.cwd().joinpath("public", "shorturls.json")
if not shorturls_json.exists():
return error_page(templates=templates, request=request, status_code=500, message="設定ファイルぐらい用意しておけよ!")
try:
with shorturls_json.open("r", encoding="utf-8") as f:
shorturls = json.load(f)
except Exception:
return error_page(templates=templates, request=request, status_code=500, message="なにこの設定ファイル読めないじゃない!")
current_id = full_path.strip().rstrip("/")
visited = set()
for _ in range(10):
if current_id in visited:
return error_page(templates=templates, request=request, status_code=500, message="循環依存ってなんかちょっとえっt")
visited.add(current_id)
if current_id not in shorturls:
break
entry = shorturls[current_id]
entry_type = entry.get("type")
content = entry.get("content")
if entry_type == "redirect":
return RedirectResponse(url=content)
elif entry_type == "alias":
current_id = content
else:
break
return error_page(templates=templates, request=request, status_code=404, message="そんなページ知らないっ!")