--
This commit is contained in:
@@ -37,15 +37,18 @@ def _cache_fresh(p: Path) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
async def _fetch_chrome_list(timeout: float = 30.0) -> dict[str, bool]:
|
||||
"""Return mapping: host -> include_subdomains.
|
||||
async def _fetch_chrome_list(timeout: float = 60.0) -> dict[str, bool] | None:
|
||||
"""Return mapping: host -> include_subdomains, or None on fetch/parse error.
|
||||
|
||||
Chromium's list is served base64-encoded when ?format=TEXT.
|
||||
Some Gitiles deployments prepend a )]}' XSSI-prevention prefix.
|
||||
"""
|
||||
cache = _cache_path("chrome")
|
||||
if _cache_fresh(cache):
|
||||
try:
|
||||
return json.loads(cache.read_text())
|
||||
data = json.loads(cache.read_text())
|
||||
if data:
|
||||
return data
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
@@ -53,11 +56,14 @@ async def _fetch_chrome_list(timeout: float = 30.0) -> dict[str, bool]:
|
||||
follow_redirects=True) as client:
|
||||
resp = await client.get(CHROME_URL)
|
||||
if resp.status_code != 200:
|
||||
return {}
|
||||
return None
|
||||
import base64
|
||||
raw = base64.b64decode(resp.content).decode("utf-8", errors="replace")
|
||||
content = resp.content
|
||||
if content.startswith(b")]}'\n"):
|
||||
content = content[5:]
|
||||
raw = base64.b64decode(content).decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
return {}
|
||||
return None
|
||||
# Strip // comments (json with comments) and parse a loose subset
|
||||
cleaned = re.sub(r"//[^\n]*", "", raw)
|
||||
cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL)
|
||||
@@ -76,14 +82,16 @@ async def _fetch_chrome_list(timeout: float = 30.0) -> dict[str, bool]:
|
||||
cache.write_text(json.dumps(result))
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
return result if result else None
|
||||
|
||||
|
||||
async def _fetch_firefox_list(timeout: float = 30.0) -> dict[str, bool]:
|
||||
async def _fetch_firefox_list(timeout: float = 60.0) -> dict[str, bool] | None:
|
||||
cache = _cache_path("firefox")
|
||||
if _cache_fresh(cache):
|
||||
try:
|
||||
return json.loads(cache.read_text())
|
||||
data = json.loads(cache.read_text())
|
||||
if data:
|
||||
return data
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
@@ -91,10 +99,10 @@ async def _fetch_firefox_list(timeout: float = 30.0) -> dict[str, bool]:
|
||||
follow_redirects=True) as client:
|
||||
resp = await client.get(FIREFOX_URL)
|
||||
if resp.status_code != 200:
|
||||
return {}
|
||||
return None
|
||||
text = resp.text
|
||||
except Exception:
|
||||
return {}
|
||||
return None
|
||||
result: dict[str, bool] = {}
|
||||
# Historically Firefox shipped the preload list as a C-array of
|
||||
# { "host.example", true },
|
||||
@@ -126,7 +134,7 @@ async def _fetch_firefox_list(timeout: float = 30.0) -> dict[str, bool]:
|
||||
cache.write_text(json.dumps(result))
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
return result if result else None
|
||||
|
||||
|
||||
def _lookup(host: str, table: dict[str, bool]) -> tuple[bool, bool]:
|
||||
@@ -146,14 +154,16 @@ async def check_preload(host: str) -> list[PreloadResult]:
|
||||
chrome = await _fetch_chrome_list()
|
||||
firefox = await _fetch_firefox_list()
|
||||
results: list[PreloadResult] = []
|
||||
c_listed, c_sub = _lookup(host, chrome)
|
||||
f_listed, f_sub = _lookup(host, firefox)
|
||||
results.append(PreloadResult("chrome", c_listed, c_sub, None if chrome else "source unavailable"))
|
||||
results.append(PreloadResult("firefox", f_listed, f_sub, None if firefox else "source unavailable"))
|
||||
c_listed, c_sub = _lookup(host, chrome) if chrome is not None else (False, False)
|
||||
f_listed, f_sub = _lookup(host, firefox) if firefox is not None else (False, False)
|
||||
chrome_err = "source unavailable" if chrome is None else None
|
||||
firefox_err = "source unavailable" if firefox is None else None
|
||||
results.append(PreloadResult("chrome", c_listed, c_sub, chrome_err))
|
||||
results.append(PreloadResult("firefox", f_listed, f_sub, firefox_err))
|
||||
# Edge (Chromium) and Internet Explorer on Windows 10+ both rely on the
|
||||
# Chromium HSTS preload list via WinINet; no separate list is published
|
||||
# for either. Report both with the Chromium lookup so the UI doesn't
|
||||
# carry phantom "not supported" / "source unavailable" entries.
|
||||
results.append(PreloadResult("edge", c_listed, c_sub, None if chrome else "source unavailable"))
|
||||
results.append(PreloadResult("ie", c_listed, c_sub, None if chrome else "source unavailable"))
|
||||
results.append(PreloadResult("edge", c_listed, c_sub, chrome_err))
|
||||
results.append(PreloadResult("ie", c_listed, c_sub, chrome_err))
|
||||
return results
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
from __future__ import annotations
|
||||
import ipaddress
|
||||
from dataclasses import dataclass
|
||||
from .db import TlsTestDB
|
||||
|
||||
HOURLY_LIMIT = 10
|
||||
HOURLY_LIMIT = 30
|
||||
HOURLY_WINDOW = 60 * 60
|
||||
|
||||
|
||||
@@ -13,9 +14,19 @@ class RateDecision:
|
||||
retry_after: int = 0
|
||||
|
||||
|
||||
def _is_private(ip: str) -> bool:
|
||||
try:
|
||||
a = ipaddress.ip_address(ip)
|
||||
return a.is_private or a.is_loopback or a.is_link_local
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def check(db: TlsTestDB, client_ip: str | None) -> RateDecision:
|
||||
if not client_ip:
|
||||
return RateDecision(allowed=True)
|
||||
if _is_private(client_ip):
|
||||
return RateDecision(allowed=True)
|
||||
if db.count_ip_active(client_ip) > 0:
|
||||
return RateDecision(
|
||||
allowed=False,
|
||||
@@ -26,7 +37,7 @@ def check(db: TlsTestDB, client_ip: str | None) -> RateDecision:
|
||||
if recent >= HOURLY_LIMIT:
|
||||
return RateDecision(
|
||||
allowed=False,
|
||||
reason=f"1時間あたり{HOURLY_LIMIT}件の上限に達しました。時間をおいて再度お試しください。",
|
||||
reason=f"1時間あたり{HOURLY_LIMIT}件の上限に達しました。しばらく時間をおいて再度お試しください。",
|
||||
retry_after=HOURLY_WINDOW,
|
||||
)
|
||||
return RateDecision(allowed=True)
|
||||
|
||||
Reference in New Issue
Block a user