This commit is contained in:
2026-04-19 18:54:47 +09:00
parent 481a6288bf
commit 1acab0917b
21 changed files with 1976 additions and 586 deletions
+102 -7
View File
@@ -145,16 +145,35 @@ async def thumbnail(request: Request, path: str) -> Response:
return Response(content=png, media_type="image/png")
def _validate_tls_target(raw: str) -> str | None:
import ipaddress
s = (raw or "").strip()
if not s or not re.compile(r"^[A-Za-z0-9._:\[\]\-]{1,255}$").match(s):
if not s or len(s) > 255:
return None
# Reject obviously garbage input (all dashes/dots, control chars, whitespace).
if not re.compile(r"^[A-Za-z0-9._:\[\]\-]{1,255}$").match(s):
return None
try:
host, port, _ = parse_target(s)
except Exception:
return None
if not host:
if not host or port <= 0 or port > 65535:
return None
if port <= 0 or port > 65535:
# Host must be either an IP literal or a valid-looking hostname.
try:
ipaddress.ip_address(host)
return s
except ValueError:
pass
# Hostname: each label must be 1-63 chars, not start/end with '-', no empty labels.
labels = host.split(".")
if not labels or any(not lbl for lbl in labels):
return None
for lbl in labels:
if len(lbl) > 63 or lbl.startswith("-") or lbl.endswith("-"):
return None
if not re.match(r"^[A-Za-z0-9\-]+$", lbl):
return None
if len(labels) < 2 and host != "localhost":
return None
return s
@@ -213,9 +232,74 @@ async def tls_test_results_page(request: Request, test_id: str) -> Response:
if job.get("status") != "done":
return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303)
result = job.get("result") or {}
categories: dict[str, list[dict]] = {}
for f in result.get("findings", []):
categories.setdefault(f.get("category", "other"), []).append(f)
findings = result.get("findings", [])
# Primary grouping — the 4 tabs requested by the user.
groups: dict[str, list[dict]] = {
"reliability": [],
"safety": [],
"vulnerabilities": [],
"compatibility": [],
"other": [],
}
for f in findings:
g = f.get("group") or "other"
groups.setdefault(g, []).append(f)
# Summary tab — score-impacting findings, sorted by impact desc.
impactful = sorted(
[f for f in findings if (f.get("impact") or 0) > 0],
key=lambda f: f.get("impact", 0),
reverse=True,
)
# Always include the top "good" findings too so the Summary tab is not empty
# on pristine sites. Take up to 20 impactful + 6 positives.
positives = [f for f in findings if f.get("severity") == "good"][:6]
summary = impactful[:20] + positives
# Human-friendly "finished_at" for the results header: ISO-like local string.
import datetime as _dt
finished_ts = job.get("finished_at") or job.get("started_at") or 0
finished_at_display = ""
try:
if finished_ts:
finished_at_display = _dt.datetime.fromtimestamp(int(finished_ts)).strftime("%Y-%m-%dT%H:%M:%S")
except Exception:
finished_at_display = ""
# Sharable canonical URL for the "Copy link" button. Prefer the public
# tools.* subdomain when known, otherwise the current request's origin.
share_url = str(request.url).split("#", 1)[0]
# ---- Reconstruct the "live log" that was shown on the processing page ----
# Progress entries are phase-transition messages stored in test_progress
# (ordered by seq). Findings arrive between progress rows and are grouped
# by the phase they ran under (Finding.step). We interleave the two so the
# ログ tab reads exactly like the real-time WS stream did.
progress_entries = tls_test_db.get_progress(test_id)
findings_by_step: dict[str, list[dict]] = {}
for f in findings:
step = (f.get("step") or "").strip()
findings_by_step.setdefault(step, []).append(f)
log_entries: list[dict] = []
seen_steps: set[str] = set()
for p in progress_entries:
log_entries.append({
"kind": "phase",
"phase": p.get("phase") or "",
"detail": p.get("detail") or "",
"severity": p.get("severity") or "info",
})
step = (p.get("phase") or "").strip()
if step and step not in seen_steps:
for f in findings_by_step.get(step, []):
log_entries.append({"kind": "finding", "finding": f})
seen_steps.add(step)
# Any findings whose step never showed up as a progress row (rare — engine
# errors etc.) — append them at the end so nothing is dropped.
for step, fs in findings_by_step.items():
if step not in seen_steps:
for f in fs:
log_entries.append({"kind": "finding", "finding": f})
return templates.TemplateResponse(
request=request,
name="tools/tls-test/results.html",
@@ -223,7 +307,18 @@ async def tls_test_results_page(request: Request, test_id: str) -> Response:
"test_id": test_id,
"job": job,
"result": result,
"categories": categories,
"groups": groups,
"summary": summary,
"group_labels": {
"reliability": "信頼性",
"safety": "安全性",
"vulnerabilities": "脆弱性",
"compatibility": "互換性",
"other": "その他",
},
"finished_at_display": finished_at_display,
"share_url": share_url,
"log_entries": log_entries,
},
)
+54 -5
View File
@@ -5,7 +5,22 @@ import time
from pathlib import Path
from typing import Any
RETENTION_SECONDS = 7 * 24 * 60 * 60
# Retention policy
# ----------------
# The DB column `expires_at` is kept for schema compatibility but is no longer
# a hard cutoff. The policy is:
# • Every row is preserved for at least MIN_RETENTION_SECONDS (7 days) after
# it was created.
# • Beyond that, rows are kept for up to MAX_RETENTION_SECONDS (1 year).
# • After 7 days, rows are only evicted when the total row count exceeds
# MAX_KEPT_ROWS (100). In that case, rows older than 7 days are deleted
# oldest-first until the total row count falls back to MAX_KEPT_ROWS.
MIN_RETENTION_SECONDS = 7 * 24 * 60 * 60
MAX_RETENTION_SECONDS = 365 * 24 * 60 * 60
MAX_KEPT_ROWS = 100
# Kept for backwards compatibility with any importer. `expires_at` is filled in
# with MAX_RETENTION_SECONDS at creation time (hard upper bound).
RETENTION_SECONDS = MAX_RETENTION_SECONDS
class TlsTestDB:
@@ -196,16 +211,50 @@ class TlsTestDB:
conn.close()
def delete_expired(self) -> int:
"""Apply the retention policy.
Step 1: unconditionally drop rows past the 1-year hard ceiling.
Step 2: if total row count exceeds MAX_KEPT_ROWS, drop rows older than
MIN_RETENTION_SECONDS (7 days) in oldest-first order until the
count falls back to MAX_KEPT_ROWS. Rows within 7 days are
never evicted by this step.
Returns the number of rows deleted.
"""
now = int(time.time())
deleted = 0
conn = self._conn()
try:
cur = conn.cursor()
cur.execute("SELECT id FROM tests WHERE expires_at < ?", (now,))
ids = [r[0] for r in cur.fetchall()]
for tid in ids:
# Step 1 — hard ceiling: 1-year-old rows.
hard_cutoff = now - MAX_RETENTION_SECONDS
cur.execute("SELECT id FROM tests WHERE created_at < ?", (hard_cutoff,))
old_ids = [r[0] for r in cur.fetchall()]
for tid in old_ids:
conn.execute("DELETE FROM test_progress WHERE test_id = ?", (tid,))
conn.execute("DELETE FROM tests WHERE id = ?", (tid,))
deleted += len(old_ids)
# Step 2 — count-triggered eviction of >7-day-old rows.
cur.execute("SELECT COUNT(*) FROM tests")
total = cur.fetchone()[0]
if total > MAX_KEPT_ROWS:
excess = total - MAX_KEPT_ROWS
soft_cutoff = now - MIN_RETENTION_SECONDS
cur.execute(
"SELECT id FROM tests"
" WHERE created_at < ?"
" ORDER BY created_at ASC"
" LIMIT ?",
(soft_cutoff, excess),
)
evict_ids = [r[0] for r in cur.fetchall()]
for tid in evict_ids:
conn.execute("DELETE FROM test_progress WHERE test_id = ?", (tid,))
conn.execute("DELETE FROM tests WHERE id = ?", (tid,))
deleted += len(evict_ids)
conn.commit()
return len(ids)
return deleted
finally:
conn.close()
+238 -191
View File
@@ -34,8 +34,15 @@ ReportProgress = Callable[[str, str, float, str], Awaitable[None]]
ReportFinding = Callable[[Finding], Awaitable[None]]
# Groups: reliability (信頼性), safety (安全性), vulnerabilities (脆弱性), compatibility (互換性)
G_REL = "reliability"
G_SAF = "safety"
G_VLN = "vulnerabilities"
G_CMP = "compatibility"
def parse_target(target: str) -> tuple[str, int, str | None]:
"""Return (host, port, sni). sni=None means pass hostname for SNI; None when IP literal."""
"""Return (host, port, sni). sni=None means the target is an IP literal."""
s = target.strip()
port = 443
host = s
@@ -57,7 +64,6 @@ def parse_target(target: str) -> tuple[str, int, str | None]:
except ValueError:
port = 443
else:
# bare IPv6 without brackets or hostname
host = s
host = host.strip()
sni: str | None = host
@@ -74,6 +80,10 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
port = result.port
sni = result.data.get("sni")
async def emit(f: Finding) -> None:
result.add(f)
await finds(f)
# ---- Phase: protocol versions ----
await report("protocols", "TLS/SSL バージョンを検査中", 0.05, "info")
versions_to_probe = [C.TLS_1_0, C.TLS_1_1, C.TLS_1_2, C.TLS_1_3]
@@ -83,10 +93,8 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
pr = await probe_tls_version(host, port, sni, v)
version_support[v] = pr.supported
version_cipher[v] = pr.negotiated_cipher
# SSLv2 check via raw
sslv2 = await send_ssl2_client_hello(host, port)
sslv2_supported = sslv2.connected and sslv2.alert is None and bool(sslv2.raw)
# SSLv3 via raw (OpenSSL typically disables SSLv3 even with SECLEVEL=0)
from .protocol.client import send_client_hello
from .protocol import wire
try:
@@ -109,30 +117,37 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
version_support[C.SSL_2_0] = sslv2_supported
version_support[C.SSL_3_0] = sslv3_supported
versions_supported = {v for v, ok in version_support.items() if ok}
result.data["versions"] = {C.PROTOCOL_NAMES.get(v, f"0x{v:04x}"): ok for v, ok in version_support.items()}
# Emit versions in chronological order — SSL 2.0, SSL 3.0, then TLS 1.0+.
# Reads left-to-right in the results table the way humans describe it.
ordered_versions = [C.SSL_2_0, C.SSL_3_0, C.TLS_1_0, C.TLS_1_1, C.TLS_1_2, C.TLS_1_3]
result.data["versions"] = {
C.PROTOCOL_NAMES.get(v, f"0x{v:04x}"): version_support.get(v, False)
for v in ordered_versions
}
# Findings per version
# SSL 2/3 → vulnerability findings; also record them as safety findings.
if sslv2_supported:
result.add(Finding("protocol", "SSL 2.0 supported", "SSLv2 は完全に破綻しています", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("SSL/TLS Version", "SSL 2.0 supported", "SSLv2 は完全に破綻しています (DROWN)", "serious", 10, group=G_SAF))
if sslv3_supported:
result.add(Finding("protocol", "SSL 3.0 supported", "POODLE 攻撃が可能", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("SSL/TLS Version", "SSL 3.0 supported", "POODLE 攻撃が可能", "serious", 10, group=G_SAF))
if version_support.get(C.TLS_1_0):
result.add(Finding("protocol", "TLS 1.0 supported", "廃止済みプロトコル (RFC 8996)", "notgood", 4))
await finds(result.findings[-1])
await emit(Finding("SSL/TLS Version", "TLS 1.0 supported", "RFC 8996 で廃止済み", "notgood", 4, group=G_SAF))
else:
await emit(Finding("SSL/TLS Version", "TLS 1.0 disabled", "", "good", 0, group=G_SAF))
if version_support.get(C.TLS_1_1):
result.add(Finding("protocol", "TLS 1.1 supported", "廃止済みプロトコル (RFC 8996)", "notgood", 2))
await finds(result.findings[-1])
await emit(Finding("SSL/TLS Version", "TLS 1.1 supported", "RFC 8996 で廃止済み", "notgood", 2, group=G_SAF))
else:
await emit(Finding("SSL/TLS Version", "TLS 1.1 disabled", "", "good", 0, group=G_SAF))
if version_support.get(C.TLS_1_2):
result.add(Finding("protocol", "TLS 1.2 supported", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("SSL/TLS Version", "TLS 1.2 supported", "", "good", 0, group=G_SAF))
else:
await emit(Finding("SSL/TLS Version", "TLS 1.2 not supported", "推奨プロトコルが有効化されていません", "notgood", 3, group=G_SAF))
if version_support.get(C.TLS_1_3):
result.add(Finding("protocol", "TLS 1.3 supported", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("SSL/TLS Version", "TLS 1.3 supported", "", "good", 0, group=G_SAF))
else:
await emit(Finding("SSL/TLS Version", "TLS 1.3 not supported", "最新プロトコルが有効化されていません", "notgood", 3, group=G_SAF))
if not version_support.get(C.TLS_1_2) and not version_support.get(C.TLS_1_3):
result.add(Finding("protocol", "No modern TLS (1.2/1.3) supported", "", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("SSL/TLS Version", "No modern TLS (1.2/1.3) supported", "", "serious", 10, group=G_SAF))
# ---- Phase: cipher enumeration ----
await report("ciphers", "暗号スイートを列挙中", 0.20, "info")
@@ -171,35 +186,36 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
for v, cids in accepted_per_version.items()
}
# Cipher family findings
any_accepted = {cid for cids in accepted_per_version.values() for cid in cids}
if any_accepted:
has_fs = False
has_aead = False
has_weak = False
has_cbc_modern = False
has_rsa_kex = False
for cid in any_accepted:
name = C.CIPHER_SUITES.get(cid, "")
if C.cipher_has_fs(name):
has_fs = True
if C.cipher_is_aead(name):
has_aead = True
if C.cipher_is_weak(name):
has_weak = True
if C.cipher_is_cbc(name):
has_cbc_modern = True
if name.startswith("TLS_RSA_") and not C.cipher_is_weak(name):
has_rsa_kex = True
if has_fs:
result.add(Finding("cipher", "Forward secrecy supported", "ECDHE/DHE ciphers 利用可", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Cipher Strength", "Forward secrecy supported", "ECDHE/DHE ciphers 利用可", "good", 0, group=G_SAF))
else:
result.add(Finding("cipher", "No forward secrecy", "ECDHE/DHE が有効になっていない", "bad", 5))
await finds(result.findings[-1])
await emit(Finding("Cipher Strength", "No forward secrecy", "ECDHE/DHE が有効になっていない", "bad", 5, group=G_SAF))
if has_aead:
result.add(Finding("cipher", "AEAD ciphers supported", "GCM/ChaCha20-Poly1305 利用可", "good", 0))
await finds(result.findings[-1])
# Specific weak-cipher findings are emitted later under Vulnerabilities (SWEET32/RC4/FREAK/etc.),
# so we only add an informational summary here to avoid double-penalty.
if has_weak:
result.add(Finding("cipher", "Weak cipher family in accepted list",
"詳細は Vulnerabilities セクションを参照", "info", 0))
await finds(result.findings[-1])
await emit(Finding("Cipher Strength", "AEAD ciphers supported", "GCM / ChaCha20-Poly1305 / CCM 利用可", "good", 0, group=G_SAF))
else:
await emit(Finding("Cipher Strength", "No AEAD ciphers", "", "notgood", 2, group=G_SAF))
if has_rsa_kex:
await emit(Finding("Cipher Strength", "RSA key exchange accepted",
"鍵交換に RSA_WITH_* が利用可能 — 前方秘匿性なし", "notgood", 2, group=G_SAF))
if has_cbc_modern:
await emit(Finding("Cipher Strength", "CBC ciphers in accepted list",
"LUCKY13 など CBC ベース攻撃のリスク", "info", 0, group=G_SAF))
# ---- Phase: key exchange / groups ----
await report("kex", "鍵交換グループを検査中", 0.35, "info")
@@ -213,19 +229,21 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
if groups_accepted:
strong = [g for g in groups_accepted if g in (0x001d, 0x0017, 0x0018, 0x0019, 0x001e)]
pqc = [g for g in groups_accepted if g in C.PQC_GROUPS]
if strong:
result.add(Finding("kex", "Modern named groups supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in strong), "good", 0))
await finds(result.findings[-1])
if pqc:
result.add(Finding("kex", "Post-quantum key exchange supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in pqc), "good", 0))
await finds(result.findings[-1])
weak = [g for g in groups_accepted if g in (0x0100,)]
if strong:
await emit(Finding("Key Exchange", "Modern named groups supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in strong), "good", 0, group=G_SAF))
if pqc:
await emit(Finding("Key Exchange", "Post-quantum key exchange supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in pqc), "good", 0, group=G_SAF))
else:
await emit(Finding("Key Exchange", "No post-quantum key exchange",
"X25519MLKEM768 等の PQC 鍵交換に対応していません", "info", 0, group=G_SAF))
if weak:
result.add(Finding("kex", "Weak FFDHE group supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in weak), "notgood", 3))
await finds(result.findings[-1])
await emit(Finding("Key Exchange", "Weak FFDHE group supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in weak), "notgood", 3, group=G_SAF))
elif version_support.get(C.TLS_1_3):
await emit(Finding("Key Exchange", "Named groups probe inconclusive", "", "info", 0, group=G_SAF))
# ---- Phase: certificates ----
await report("cert", "証明書を取得・解析中", 0.45, "info")
@@ -270,77 +288,70 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
]
if not leaf_summary:
result.add(Finding("cert", "No certificate retrieved", "TLS ハンドシェイクが失敗", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Certificate", "No certificate retrieved", "TLS ハンドシェイクが失敗", "serious", 10, group=G_REL))
else:
# Expiry
if leaf_summary.is_expired:
result.add(Finding("cert", "Certificate expired",
f"not_after={leaf_summary.not_after}", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Certificate Validity", "Certificate expired",
f"not_after={leaf_summary.not_after}", "serious", 10, group=G_REL))
elif leaf_summary.days_until_expiry < 15:
result.add(Finding("cert", "Certificate expires soon",
f"残り {leaf_summary.days_until_expiry}", "bad", 4))
await finds(result.findings[-1])
await emit(Finding("Certificate Validity", "Certificate expires soon",
f"残り {leaf_summary.days_until_expiry}", "bad", 4, group=G_REL))
elif leaf_summary.days_until_expiry < 30:
result.add(Finding("cert", "Certificate expiring in <30 days",
f"残り {leaf_summary.days_until_expiry}", "notgood", 2))
await finds(result.findings[-1])
await emit(Finding("Certificate Validity", "Certificate expiring in <30 days",
f"残り {leaf_summary.days_until_expiry}", "notgood", 2, group=G_REL))
else:
result.add(Finding("cert", "Certificate validity OK",
f"残り {leaf_summary.days_until_expiry}", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Certificate Validity", "Certificate validity OK",
f"残り {leaf_summary.days_until_expiry}", "good", 0, group=G_REL))
# Chain length
if len(parsed_chain) == 1 and not leaf_summary.is_self_signed:
await emit(Finding("Certificate Chain", "Intermediate certificate not served",
"サーバーがチェーンに中間証明書を含めていません", "notgood", 2, group=G_REL))
elif len(parsed_chain) >= 2:
await emit(Finding("Certificate Chain", f"Chain length: {len(parsed_chain)} certs",
f"leaf → {parsed_chain[-1].subject[:80]}", "good", 0, group=G_REL))
# Self-signed
if leaf_summary.is_self_signed:
result.add(Finding("cert", "Self-signed certificate",
"CA 署名ではなく自己署名です", "serious", 9))
await finds(result.findings[-1])
await emit(Finding("Certificate Chain", "Self-signed certificate",
"CA 署名ではなく自己署名です", "serious", 9, group=G_REL))
# Signature hash
# Signature hash (safety — cryptographic strength)
sh = (leaf_summary.signature_hash_algorithm or "").lower()
if sh in ("md5", "md2"):
result.add(Finding("cert", f"Weak signature hash: {sh.upper()}", "", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Signature Algorithm", f"Weak signature hash: {sh.upper()}", "", "serious", 10, group=G_SAF))
elif sh == "sha1":
result.add(Finding("cert", "Weak signature hash: SHA1", "", "bad", 5))
await finds(result.findings[-1])
await emit(Finding("Signature Algorithm", "Weak signature hash: SHA1", "", "bad", 5, group=G_SAF))
elif sh:
result.add(Finding("cert", f"Signature hash: {sh.upper()}", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Signature Algorithm", f"Signature hash: {sh.upper()}", "", "good", 0, group=G_SAF))
# Public key strength
# Public key strength (safety)
alg = leaf_summary.public_key_algorithm
bits = leaf_summary.public_key_size_bits
if alg == "RSA":
if bits < 1024:
result.add(Finding("cert", f"Very weak RSA key ({bits}-bit)", "", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Key Strength", f"Very weak RSA key ({bits}-bit)", "", "serious", 10, group=G_SAF))
elif bits < 2048:
result.add(Finding("cert", f"Weak RSA key ({bits}-bit)", "", "bad", 5))
await finds(result.findings[-1])
await emit(Finding("Key Strength", f"Weak RSA key ({bits}-bit)", "", "bad", 5, group=G_SAF))
elif bits < 3072:
result.add(Finding("cert", f"RSA {bits}-bit", "推奨は 3072-bit 以上", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Key Strength", f"RSA {bits}-bit", "推奨は 3072-bit 以上", "good", 0, group=G_SAF))
else:
result.add(Finding("cert", f"RSA {bits}-bit", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Key Strength", f"RSA {bits}-bit", "", "good", 0, group=G_SAF))
elif alg in ("EC", "Ed25519", "Ed448"):
result.add(Finding("cert", f"{alg} {bits}-bit {leaf_summary.public_key_curve or ''}".strip(),
"", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Key Strength",
f"{alg} {bits}-bit {leaf_summary.public_key_curve or ''}".strip(),
"", "good", 0, group=G_SAF))
# CT / SCT
# CT / SCT (reliability — verifiable audit trail)
if leaf_summary.has_scts:
result.add(Finding("cert", f"Certificate Transparency SCTs present ({leaf_summary.sct_count})",
"", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Transparency", f"Certificate Transparency SCTs present ({leaf_summary.sct_count})",
"", "good", 0, group=G_REL))
else:
result.add(Finding("cert", "No embedded SCTs",
"Certificate Transparency ログが埋め込まれていません", "notgood", 1))
await finds(result.findings[-1])
await emit(Finding("Transparency", "No embedded SCTs",
"Certificate Transparency ログが埋め込まれていません", "notgood", 1, group=G_REL))
# Hostname match (for non-IP targets)
# Hostname match (reliability — identity verification)
if sni:
names = [leaf_summary.common_name] if leaf_summary.common_name else []
names += leaf_summary.sans
@@ -357,16 +368,13 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
matched = True
break
if matched:
result.add(Finding("cert", "Hostname matches certificate", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Hostname", "Hostname matches certificate", "", "good", 0, group=G_REL))
else:
result.add(Finding("cert", "Hostname does not match certificate",
f"SNI={sni}, CN={leaf_summary.common_name}", "serious", 8))
await finds(result.findings[-1])
await emit(Finding("Hostname", "Hostname does not match certificate",
f"SNI={sni}, CN={leaf_summary.common_name}", "serious", 8, group=G_REL))
else:
result.add(Finding("cert", "IP literal target: hostname validation skipped",
"IP 指定のため SNI/証明書名一致の判定を行っていません", "notgood", 2))
await finds(result.findings[-1])
await emit(Finding("Hostname", "IP literal target: hostname validation skipped",
"IP 指定のため SNI/証明書名一致の判定を行っていません", "notgood", 2, group=G_REL))
# ---- Phase: trust stores ----
await report("trust", "5 プラットフォームのトラストストアを照合中", 0.55, "info")
@@ -374,22 +382,21 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
trust_results = await verify_across_platforms(host, port, sni)
except Exception as e:
trust_results = []
result.add(Finding("trust", "Trust evaluation failed", str(e), "info", 0))
await finds(result.findings[-1])
await emit(Finding("Trust Store", "Trust evaluation failed", str(e), "info", 0, group=G_REL))
result.data["trust"] = [
{"platform": t.platform, "trusted": t.trusted, "error": t.error} for t in trust_results
]
untrusted = [t for t in trust_results if not t.trusted]
trusted = [t for t in trust_results if t.trusted]
if trust_results and not untrusted:
result.add(Finding("trust", "Trusted on all platforms",
", ".join(t.platform for t in trusted), "good", 0))
await finds(result.findings[-1])
await emit(Finding("Trust Store", "Trusted on all platforms",
", ".join(t.platform for t in trusted), "good", 0, group=G_REL))
else:
for t in untrusted:
result.add(Finding("trust", f"Not trusted by {t.platform}",
t.error or "", "serious", 6))
await finds(result.findings[-1])
await emit(Finding("Trust Store", f"Not trusted by {t.platform}",
t.error or "", "serious", 6, group=G_REL))
for t in trusted:
await emit(Finding("Trust Store", f"Trusted by {t.platform}", "", "good", 0, group=G_REL))
# ---- Phase: revocation ----
await report("revocation", "証明書失効を確認中", 0.62, "info")
@@ -405,12 +412,14 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
"error": ocsp_res.error,
}
if ocsp_res.checked and ocsp_res.revoked:
result.add(Finding("cert", "Certificate revoked (OCSP)",
ocsp_res.reason or "", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Revocation", "Certificate revoked (OCSP)",
ocsp_res.reason or "", "serious", 10, group=G_REL))
elif ocsp_res.checked:
result.add(Finding("cert", "OCSP: certificate is not revoked", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Revocation", "OCSP: certificate is not revoked",
ocsp_res.source or "", "good", 0, group=G_REL))
elif ocsp_res.error:
await emit(Finding("Revocation", "OCSP check inconclusive",
ocsp_res.error, "info", 0, group=G_REL))
except Exception as e:
result.data["ocsp"] = {"error": str(e)}
if leaf_cert:
@@ -423,8 +432,8 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
"error": crl_res.error,
}
if crl_res.checked and crl_res.revoked:
result.add(Finding("cert", "Certificate revoked (CRL)", crl_res.source, "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Revocation", "Certificate revoked (CRL)",
crl_res.source, "serious", 10, group=G_REL))
except Exception as e:
result.data["crl"] = {"error": str(e)}
@@ -435,12 +444,11 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
pwned, err = await check_spki(leaf_summary.spki_sha256)
result.data["pwnedkeys"] = {"pwned": pwned, "error": err}
if pwned:
result.add(Finding("cert", "Private key is publicly known (pwnedkeys)",
"この公開鍵に対応する秘密鍵は既に漏洩しています", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Key Exposure", "Private key is publicly known (pwnedkeys)",
"この公開鍵に対応する秘密鍵は既に漏洩しています", "serious", 10, group=G_REL))
elif err is None:
result.add(Finding("cert", "Private key not listed in pwnedkeys", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Key Exposure", "Private key not listed in pwnedkeys", "",
"good", 0, group=G_REL))
except Exception as e:
result.data["pwnedkeys"] = {"error": str(e)}
@@ -462,19 +470,17 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
}
if hsts.present:
if hsts.max_age >= 15552000:
result.add(Finding("hsts", "HSTS enabled with sufficient max-age",
f"max-age={hsts.max_age}", "good", 0))
await finds(result.findings[-1])
await emit(Finding("HSTS", "HSTS enabled with sufficient max-age",
f"max-age={hsts.max_age}", "good", 0, group=G_REL))
elif hsts.max_age > 0:
result.add(Finding("hsts", "HSTS max-age too short",
f"max-age={hsts.max_age} (推奨 >= 15552000)", "notgood", 2))
await finds(result.findings[-1])
await emit(Finding("HSTS", "HSTS max-age too short",
f"max-age={hsts.max_age} (推奨 >= 15552000)", "notgood", 2, group=G_REL))
if hsts.include_subdomains:
result.add(Finding("hsts", "HSTS includeSubDomains set", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("HSTS", "HSTS includeSubDomains set", "", "good", 0, group=G_REL))
else:
await emit(Finding("HSTS", "HSTS includeSubDomains not set", "", "info", 0, group=G_REL))
else:
result.add(Finding("hsts", "HSTS header missing", "", "notgood", 3))
await finds(result.findings[-1])
await emit(Finding("HSTS", "HSTS header missing", "", "notgood", 3, group=G_REL))
# ---- Phase: HSTS preload ----
if sni:
@@ -490,10 +496,13 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
for p in preload_results
]
listed_any = [p for p in preload_results if p.listed]
not_listed = [p for p in preload_results if not p.listed and not p.source_error]
if listed_any:
result.add(Finding("hsts", "Listed in HSTS preload list",
", ".join(p.browser for p in listed_any), "good", 0))
await finds(result.findings[-1])
await emit(Finding("HSTS Preload", "Listed in HSTS preload list",
", ".join(p.browser for p in listed_any), "good", 0, group=G_REL))
if not_listed and not listed_any:
await emit(Finding("HSTS Preload", "Not on HSTS preload list",
", ".join(p.browser for p in not_listed), "info", 0, group=G_REL))
# ---- Phase: CAA ----
if sni:
@@ -510,12 +519,11 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
"error": caa.error,
}
if caa.records:
result.add(Finding("caa", f"CAA records present ({caa.effective_host})",
"; ".join(caa.records), "good", 0))
await finds(result.findings[-1])
await emit(Finding("CAA", f"CAA records present ({caa.effective_host})",
"; ".join(caa.records), "good", 0, group=G_REL))
else:
result.add(Finding("caa", "No CAA records", "CAA が設定されていません", "notgood", 1))
await finds(result.findings[-1])
await emit(Finding("CAA", "No CAA records",
"CAA が設定されていません", "notgood", 1, group=G_REL))
# ---- Phase: HTTP ----
await report("http", "HTTP/1/2 を検査中", 0.80, "info")
@@ -538,17 +546,23 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
"error": http_info.error,
}
if not http_info.valid_http_response:
result.add(Finding("http", "No valid HTTP response", http_info.error or "", "bad", 6))
await finds(result.findings[-1])
await emit(Finding("HTTP", "No valid HTTP response",
http_info.error or "", "bad", 6, group=G_CMP))
else:
if http_info.http1:
await emit(Finding("HTTP", "HTTP/1.1 supported", "", "good", 0, group=G_CMP))
if http_info.http2:
result.add(Finding("http", "HTTP/2 supported", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("HTTP", "HTTP/2 supported", "", "good", 0, group=G_CMP))
else:
await emit(Finding("HTTP", "HTTP/2 not supported", "", "notgood", 1, group=G_CMP))
if http_info.compression_enabled:
result.add(Finding("http", "HTTP compression enabled",
f"content-encoding={http_info.content_encoding} - BREACH の懸念",
"notgood", 2))
await finds(result.findings[-1])
await emit(Finding("HTTP Compression",
"HTTP compression enabled (BREACH risk)",
f"content-encoding={http_info.content_encoding}",
"notgood", 2, group=G_VLN))
else:
await emit(Finding("HTTP Compression", "HTTP compression disabled",
"BREACH の可能性なし", "good", 0, group=G_VLN))
# ---- Phase: HTTP/3 ----
await report("http3", "HTTP/3 (QUIC) を検査中", 0.84, "info")
@@ -560,8 +574,9 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
if h3 is not None:
result.data["http3"] = {"supported": h3.supported, "error": h3.error}
if h3.supported:
result.add(Finding("http", "HTTP/3 supported", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("HTTP/3", "HTTP/3 supported", "", "good", 0, group=G_CMP))
else:
await emit(Finding("HTTP/3", "HTTP/3 not supported", h3.error or "", "info", 0, group=G_CMP))
# ---- Phase: ALPN ----
try:
@@ -570,31 +585,43 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
alpn = None
if alpn:
result.data["alpn"] = alpn
await emit(Finding("ALPN", f"ALPN negotiated: {alpn}", "", "good", 0, group=G_CMP))
# ---- Phase: vulnerabilities ----
# ---- Phase: vulnerabilities (passive + active) ----
await report("vulns", "脆弱性を検査中", 0.88, "info")
# Passive analysis based on versions/ciphers
vp = analyze_vulns(versions_supported, accepted_per_version)
if vp.drown:
result.add(Finding("vuln", "DROWN (SSLv2)", "SSLv2 が有効", "serious", 10)); await finds(result.findings[-1])
await emit(Finding("DROWN", "DROWN (SSLv2 exposure)", "SSLv2 が有効", "serious", 10, group=G_VLN))
if vp.poodle_ssl:
result.add(Finding("vuln", "POODLE (SSLv3 CBC)", "", "serious", 10)); await finds(result.findings[-1])
await emit(Finding("POODLE", "POODLE (SSLv3 CBC)", "", "serious", 10, group=G_VLN))
if vp.beast:
result.add(Finding("vuln", "BEAST (TLS 1.0 CBC)", "クライアント側の緩和策が有効なら実害は限定的", "notgood", 3)); await finds(result.findings[-1])
await emit(Finding("BEAST", "BEAST (TLS 1.0 CBC)", "", "notgood", 3, group=G_VLN))
if vp.sweet32:
result.add(Finding("vuln", "SWEET32 (3DES)", "3DES が accepted cipher list にあり", "bad", 4)); await finds(result.findings[-1])
await emit(Finding("SWEET32", "SWEET32 (3DES)",
"3DES が accepted cipher list にあり", "bad", 4, group=G_VLN))
if vp.rc4:
result.add(Finding("vuln", "RC4 enabled", "", "serious", 7)); await finds(result.findings[-1])
await emit(Finding("RC4", "RC4 enabled", "", "serious", 7, group=G_VLN))
if vp.freak:
result.add(Finding("vuln", "FREAK (EXPORT RSA)", "", "serious", 10)); await finds(result.findings[-1])
await emit(Finding("FREAK", "FREAK (EXPORT RSA)", "", "serious", 10, group=G_VLN))
if vp.logjam_export:
result.add(Finding("vuln", "LOGJAM (EXPORT DHE)", "", "serious", 10)); await finds(result.findings[-1])
await emit(Finding("LOGJAM", "LOGJAM (EXPORT DHE)", "", "serious", 10, group=G_VLN))
if vp.null_cipher:
result.add(Finding("vuln", "NULL cipher enabled", "", "serious", 10)); await finds(result.findings[-1])
await emit(Finding("NULL cipher", "NULL cipher enabled", "", "serious", 10, group=G_VLN))
if vp.anon_cipher:
result.add(Finding("vuln", "Anonymous cipher enabled", "", "serious", 10)); await finds(result.findings[-1])
await emit(Finding("Anon cipher", "Anonymous cipher enabled", "", "serious", 10, group=G_VLN))
if vp.lucky13:
result.add(Finding("vuln", "LUCKY13 (CBC)", "", "notgood", 2)); await finds(result.findings[-1])
await emit(Finding("LUCKY13", "LUCKY13 (CBC)", "", "notgood", 2, group=G_VLN))
# If no passive vulns detected across the major ones, surface a positive finding.
none_of_the_above = not any([
vp.drown, vp.poodle_ssl, vp.beast, vp.sweet32, vp.rc4,
vp.freak, vp.logjam_export, vp.null_cipher, vp.anon_cipher,
])
if none_of_the_above and any_accepted:
await emit(Finding("Cipher Vulnerabilities",
"No known cipher-family vulnerabilities",
"DROWN / POODLE / FREAK / LOGJAM / RC4 / 3DES / NULL / anon いずれも該当なし",
"good", 0, group=G_VLN))
# Active probes
try:
@@ -605,15 +632,12 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
"error": hb.error,
}
if hb.vulnerable:
result.add(Finding("vuln", "Heartbleed (CVE-2014-0160)", "", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("Heartbleed", "Heartbleed (CVE-2014-0160)", "", "serious", 10, group=G_VLN))
elif hb.heartbeat_extension_advertised:
result.add(Finding("vuln", "Heartbeat extension advertised but not exploitable",
"", "notgood", 1))
await finds(result.findings[-1])
await emit(Finding("Heartbleed", "Heartbeat extension advertised but not exploitable",
"", "notgood", 1, group=G_VLN))
else:
result.add(Finding("vuln", "Not vulnerable to Heartbleed", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("Heartbleed", "Not vulnerable to Heartbleed", "", "good", 0, group=G_VLN))
except Exception as e:
result.data["heartbleed"] = {"error": str(e)}
@@ -621,37 +645,33 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
ccs_vuln, ccs_msg = await probe_ccs(host, port, sni)
result.data["ccs_injection"] = {"vulnerable": ccs_vuln, "detail": ccs_msg}
if ccs_vuln:
result.add(Finding("vuln", "CCS Injection (CVE-2014-0224)", "", "serious", 10))
await finds(result.findings[-1])
await emit(Finding("CCS Injection", "CCS Injection (CVE-2014-0224)", "", "serious", 10, group=G_VLN))
else:
result.add(Finding("vuln", "Not vulnerable to CCS Injection", "", "good", 0))
await finds(result.findings[-1])
await emit(Finding("CCS Injection", "Not vulnerable to CCS Injection", "", "good", 0, group=G_VLN))
except Exception as e:
result.data["ccs_injection"] = {"error": str(e)}
try:
renego_ok, renego_err = await probe_secure_renegotiation(host, port, sni)
result.data["secure_renegotiation"] = {"supported": renego_ok, "error": renego_err}
renego_ok, renego_info = await probe_secure_renegotiation(host, port, sni)
result.data["secure_renegotiation"] = {"supported": renego_ok, "detail": renego_info}
if renego_ok:
result.add(Finding("vuln", "Secure renegotiation supported", "", "good", 0))
await finds(result.findings[-1])
detail = renego_info or "RFC 5746 renegotiation_info extension acknowledged"
await emit(Finding("Renegotiation", "Secure renegotiation supported", detail, "good", 0, group=G_VLN))
else:
result.add(Finding("vuln", "Secure renegotiation not supported", renego_err or "",
"bad", 4))
await finds(result.findings[-1])
await emit(Finding("Renegotiation", "Secure renegotiation not supported",
renego_info or "", "bad", 4, group=G_VLN))
except Exception as e:
result.data["secure_renegotiation"] = {"error": str(e)}
try:
scsv_ok, scsv_err = await probe_fallback_scsv(host, port, sni)
result.data["fallback_scsv"] = {"supported": scsv_ok, "error": scsv_err}
scsv_ok, scsv_info = await probe_fallback_scsv(host, port, sni)
result.data["fallback_scsv"] = {"supported": scsv_ok, "detail": scsv_info}
if scsv_ok:
result.add(Finding("vuln", "TLS_FALLBACK_SCSV supported", "", "good", 0))
await finds(result.findings[-1])
detail = scsv_info or "TLS 1.1 ClientHello with SCSV rejected with inappropriate_fallback"
await emit(Finding("Fallback Protection", "TLS_FALLBACK_SCSV enforced", detail, "good", 0, group=G_VLN))
else:
result.add(Finding("vuln", "TLS_FALLBACK_SCSV not enforced", scsv_err or "",
"notgood", 2))
await finds(result.findings[-1])
await emit(Finding("Fallback Protection", "TLS_FALLBACK_SCSV not enforced",
scsv_info or "", "notgood", 2, group=G_VLN))
except Exception as e:
result.data["fallback_scsv"] = {"error": str(e)}
@@ -674,9 +694,22 @@ async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResu
]
if sim:
ok_count = sum(1 for s in sim if s.connected)
result.add(Finding("compat", f"Handshake simulation: {ok_count}/{len(sim)} clients connected",
"", "info", 0))
await finds(result.findings[-1])
total = len(sim)
# Emit per-client findings so the Compatibility tab has granular entries.
for s in sim:
if s.connected:
await emit(Finding("Client Handshake",
f"{s.client}: OK",
f"{s.negotiated_version} / {s.negotiated_cipher}",
"good", 0, group=G_CMP))
else:
await emit(Finding("Client Handshake",
f"{s.client}: failed",
s.error or "",
"info", 0, group=G_CMP))
await emit(Finding("Client Handshake",
f"Handshake summary: {ok_count}/{total} clients connected",
"", "info", 0, group=G_CMP))
async def run_full_scan(target: str, report: ReportProgress, finds: ReportFinding) -> ScanResult:
@@ -685,9 +718,23 @@ async def run_full_scan(target: str, report: ReportProgress, finds: ReportFindin
result = ScanResult(target=target, host=host, port=port, started_at=started)
result.data["sni"] = sni
# Track the current phase slug so each emitted Finding can be stamped with
# the step that produced it (used in the UI log as "[step]").
_step = {"name": ""}
_orig_report = report
_orig_finds = finds
async def report(phase: str, detail: str, progress: float, severity: str = "info") -> None: # type: ignore[no-redef]
_step["name"] = phase
await _orig_report(phase, detail, progress, severity)
async def finds(f: Finding) -> None: # type: ignore[no-redef]
if not f.step:
f.step = _step["name"]
await _orig_finds(f)
await report("init", f"対象: {target} (host={host}, port={port})", 0.01, "info")
# TCP reachability first
resolved = await resolve_host(host)
if resolved is None:
await report("dns", "名前解決に失敗しました", 1.0, "serious")
@@ -696,7 +743,7 @@ async def run_full_scan(target: str, report: ReportProgress, finds: ReportFindin
apply_rank(result)
result.score = 0.0
result.rank = "R"
result.add(Finding("connectivity", "DNS resolution failed", host, "serious", 10))
result.add(Finding("Connectivity", "DNS resolution failed", host, "serious", 10, group=G_REL))
await finds(result.findings[-1])
return result
result.data["resolved_ip"] = resolved
@@ -705,7 +752,7 @@ async def run_full_scan(target: str, report: ReportProgress, finds: ReportFindin
if not reachable:
await report("tcp", f"TCP {host}:{port} に接続できません", 1.0, "serious")
result.error = "no_tls"
result.add(Finding("connectivity", "TCP port unreachable", f"{host}:{port}", "serious", 10))
result.add(Finding("Connectivity", "TCP port unreachable", f"{host}:{port}", "serious", 10, group=G_REL))
await finds(result.findings[-1])
result.finished_at = time.time()
apply_rank(result)
@@ -717,7 +764,7 @@ async def run_full_scan(target: str, report: ReportProgress, finds: ReportFindin
raise
except Exception as e:
result.error = f"{e.__class__.__name__}: {e}"
result.add(Finding("engine", "Scan error", str(e), "serious", 5))
result.add(Finding("Engine", "Scan error", str(e), "serious", 5, group=G_REL))
await finds(result.findings[-1])
result.finished_at = time.time()
@@ -95,27 +95,26 @@ class SimResult:
error: str | None = None
async def simulate(host: str, port: int, sni: str | None) -> list[SimResult]:
results: list[SimResult] = []
for p in PROFILES:
exts_parts = []
if sni and not _is_ip(sni):
try:
exts_parts.append(wire.ext_server_name(sni))
except Exception:
pass
exts_parts.append(wire.ext_ec_point_formats())
exts_parts.append(wire.ext_supported_groups(p.groups))
exts_parts.append(wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501, 0x0603, 0x0806, 0x0601,
]))
exts_parts.append(wire.ext_renegotiation_info_empty())
if p.versions and C.TLS_1_3 in p.versions:
exts_parts.append(wire.ext_supported_versions_client(p.versions))
exts_parts.append(wire.ext_psk_key_exchange_modes())
exts_parts.append(wire.ext_key_share_empty())
exts = b"".join(exts_parts)
async def _simulate_one(host: str, port: int, sni: str | None, p: ClientProfile) -> SimResult:
exts_parts = []
if sni and not _is_ip(sni):
try:
exts_parts.append(wire.ext_server_name(sni))
except Exception:
pass
exts_parts.append(wire.ext_ec_point_formats())
exts_parts.append(wire.ext_supported_groups(p.groups))
exts_parts.append(wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501, 0x0603, 0x0806, 0x0601,
]))
exts_parts.append(wire.ext_renegotiation_info_empty())
if p.versions and C.TLS_1_3 in p.versions:
exts_parts.append(wire.ext_supported_versions_client(p.versions))
exts_parts.append(wire.ext_psk_key_exchange_modes())
exts_parts.append(wire.ext_key_share_empty())
exts = b"".join(exts_parts)
try:
res = await send_client_hello(
host, port,
record_version=C.TLS_1_0,
@@ -124,20 +123,32 @@ async def simulate(host: str, port: int, sni: str | None) -> list[SimResult]:
extensions=exts,
sni=sni,
)
sh = res.server_hello
if not res.connected or sh is None or sh.alert is not None:
results.append(SimResult(client=p.name, connected=False, error=res.error or "handshake failed"))
continue
v = sh.negotiated_version or sh.server_version
version_name = C.PROTOCOL_NAMES.get(v, f"0x{v:04x}")
cipher_name = C.CIPHER_SUITES.get(sh.cipher_suite, f"0x{sh.cipher_suite:04x}") if sh.cipher_suite else ""
results.append(SimResult(
client=p.name,
connected=True,
negotiated_version=version_name,
negotiated_cipher=cipher_name,
))
return results
except Exception as e:
return SimResult(client=p.name, connected=False, error=f"{e.__class__.__name__}: {e}")
sh = res.server_hello
if not res.connected or sh is None or sh.alert is not None:
return SimResult(client=p.name, connected=False, error=res.error or "handshake failed")
v = sh.negotiated_version or sh.server_version
version_name = C.PROTOCOL_NAMES.get(v, f"0x{v:04x}")
cipher_name = C.CIPHER_SUITES.get(sh.cipher_suite, f"0x{sh.cipher_suite:04x}") if sh.cipher_suite else ""
return SimResult(
client=p.name,
connected=True,
negotiated_version=version_name,
negotiated_cipher=cipher_name,
)
async def simulate(host: str, port: int, sni: str | None) -> list[SimResult]:
# Run profiles concurrently with a small cap to avoid burst-connecting the server.
sem = asyncio.Semaphore(4)
async def guarded(p: ClientProfile) -> SimResult:
async with sem:
return await _simulate_one(host, port, sni, p)
results = await asyncio.gather(*(guarded(p) for p in PROFILES), return_exceptions=False)
return list(results)
def _is_ip(host: str) -> bool:
+4 -2
View File
@@ -32,9 +32,11 @@ async def fetch_hsts(host: str, port: int, timeout: float = 6.0) -> HstsInfo:
info = HstsInfo(present=True, raw=header)
for token in header.split(";"):
t = token.strip().lower()
if t.startswith("max-age="):
if t.startswith("max-age"):
_, _, v = t.partition("=")
v = v.strip().strip('"').strip("'")
try:
info.max_age = int(t.split("=", 1)[1])
info.max_age = int(v)
except ValueError:
info.max_age = 0
elif t == "includesubdomains":
@@ -64,6 +64,15 @@ async def probe_http(host: str, port: int, timeout: float = 6.0) -> HttpInfo:
info.alt_svc = resp.headers.get("alt-svc")
if not info.server:
info.server = resp.headers.get("server")
except ImportError:
# httpx was installed without the [http2] extra; fall back to ALPN.
try:
from .protocol.probes import alpn_negotiate # lazy import to avoid cycles
proto = await alpn_negotiate(host, port, host, ["h2", "http/1.1"], timeout=timeout)
if proto == "h2":
info.http2 = True
except Exception:
pass
except Exception:
pass
+5 -2
View File
@@ -23,20 +23,23 @@ async def probe_http3(host: str, port: int, sni: str | None, timeout: float = 6.
info = Http3Info()
try:
host_for_quic = sni or host
configuration = QuicConfiguration(
is_client=True,
alpn_protocols=["h3", "h3-29"],
verify_mode=None,
server_name=host_for_quic,
)
try:
import ssl as _ssl
configuration.verify_mode = _ssl.CERT_NONE
except Exception:
pass
host_for_quic = sni or host
async def _run():
async with connect(host, port, configuration=configuration, server_name=host_for_quic) as client:
# aioquic ≥ 0.9 dropped the `server_name` kwarg on connect(); SNI
# is taken from QuicConfiguration.server_name instead.
async with connect(host, port, configuration=configuration) as client:
await client.wait_connected()
return True
ok = await asyncio.wait_for(_run(), timeout=timeout)
@@ -49,7 +49,8 @@ async def _fetch_chrome_list(timeout: float = 30.0) -> dict[str, bool]:
except Exception:
pass
try:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"}) as client:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"},
follow_redirects=True) as client:
resp = await client.get(CHROME_URL)
if resp.status_code != 200:
return {}
@@ -86,7 +87,8 @@ async def _fetch_firefox_list(timeout: float = 30.0) -> dict[str, bool]:
except Exception:
pass
try:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"}) as client:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"},
follow_redirects=True) as client:
resp = await client.get(FIREFOX_URL)
if resp.status_code != 200:
return {}
@@ -94,14 +96,31 @@ async def _fetch_firefox_list(timeout: float = 30.0) -> dict[str, bool]:
except Exception:
return {}
result: dict[str, bool] = {}
# Format: lines like %%\nhost, include_subdomains_bool\n%%
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("/") or line.startswith("%") or line.startswith("#"):
continue
if "," in line:
host, flag = [p.strip() for p in line.split(",", 1)]
result[host.lower()] = "1" in flag or "true" in flag.lower()
# Historically Firefox shipped the preload list as a C-array of
# { "host.example", true },
# but the current nsSTSPreloadList.inc (2024+) is a plain CSV with
# host.example, 1
# per line (1 = include_subdomains, 0 = not). Try both, brace-format
# first; fall back to the CSV form.
for m in re.finditer(r'\{\s*"([^"]+)"\s*,\s*(true|false)\s*\}', text):
result[m.group(1).lower()] = m.group(2) == "true"
if not result:
# Plain CSV format. Accept lines that look like "host, 0|1" —
# including single-label TLD entries (e.g. "dev, 1", "app, 1",
# "google, 1") which Mozilla ships alongside FQDNs. The explicit
# ", [01]" suffix is enough to reject the multiline "/* ... */"
# license header whose continuation lines start with " *".
line_re = re.compile(r'^\s*([a-z0-9][a-z0-9.\-]*[a-z0-9])\s*,\s*([01])\s*$', re.IGNORECASE)
for line in text.splitlines():
m = line_re.match(line)
if not m:
continue
host = m.group(1).lower()
# A bare "host" with no dot is only valid as a TLD-style entry.
# Everything else must contain at least a dot or it's junk.
if "." not in host and not host.isalnum():
continue
result[host] = m.group(2) == "1"
if result:
try:
cache.write_text(json.dumps(result))
@@ -131,8 +150,10 @@ async def check_preload(host: str) -> list[PreloadResult]:
f_listed, f_sub = _lookup(host, firefox)
results.append(PreloadResult("chrome", c_listed, c_sub, None if chrome else "source unavailable"))
results.append(PreloadResult("firefox", f_listed, f_sub, None if firefox else "source unavailable"))
# Edge effectively uses Chromium's preload list since the Edge (Chromium) release.
# Edge (Chromium) and Internet Explorer on Windows 10+ both rely on the
# Chromium HSTS preload list via WinINet; no separate list is published
# for either. Report both with the Chromium lookup so the UI doesn't
# carry phantom "not supported" / "source unavailable" entries.
results.append(PreloadResult("edge", c_listed, c_sub, None if chrome else "source unavailable"))
# Internet Explorer never maintained its own HSTS preload list; treat same as listed-in-none.
results.append(PreloadResult("ie", False, False, "IE does not support HSTS preload"))
results.append(PreloadResult("ie", c_listed, c_sub, None if chrome else "source unavailable"))
return results
@@ -103,6 +103,13 @@ CIPHER_SUITES: dict[int, str] = {
0xc00a: "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
0xc027: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
0xc028: "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
0xc023: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
0xc024: "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384",
# DHE CBC (legacy)
0x0033: "TLS_DHE_RSA_WITH_AES_128_CBC_SHA",
0x0039: "TLS_DHE_RSA_WITH_AES_256_CBC_SHA",
0x0067: "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256",
0x006b: "TLS_DHE_RSA_WITH_AES_256_CBC_SHA256",
# DHE AEAD
0x009e: "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
0x009f: "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384",
@@ -1,32 +1,24 @@
from __future__ import annotations
import asyncio
from . import constants as C
from . import wire
from .client import send_client_hello
async def detect_named_groups(host: str, port: int, sni: str | None, tls_version: int = C.TLS_1_3) -> list[int]:
"""Probe which named groups the server accepts for TLS 1.3 key share.
We don't execute a real handshake, we just offer one group at a time and
watch for either a successful ServerHello (with key_share) or HelloRetryRequest
(which tells us the server accepts the version but wants a different group).
"""
accepted: list[int] = []
candidate_groups = list(C.NAMED_GROUPS.keys())
# Use TLS_AES_128_GCM_SHA256 as a minimal 1.3 cipher suite.
async def _probe_one_group(host: str, port: int, sni: str | None, tls_version: int, g: int) -> int | None:
cipher_suites = [0x1301, 0x1302, 0x1303]
for g in candidate_groups:
exts = (
(wire.ext_server_name(sni) if sni and not _is_ip(sni) else b"")
+ wire.ext_supported_versions_client([tls_version])
+ wire.ext_supported_groups([g])
+ wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
])
+ wire.ext_psk_key_exchange_modes()
+ wire.ext_key_share_empty()
)
exts = (
(wire.ext_server_name(sni) if sni and not _is_ip(sni) else b"")
+ wire.ext_supported_versions_client([tls_version])
+ wire.ext_supported_groups([g])
+ wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
])
+ wire.ext_psk_key_exchange_modes()
+ wire.ext_key_share_empty()
)
try:
res = await send_client_hello(
host, port,
record_version=C.TLS_1_0,
@@ -35,14 +27,33 @@ async def detect_named_groups(host: str, port: int, sni: str | None, tls_version
extensions=exts,
sni=sni,
)
sh = res.server_hello
if not res.connected or sh is None:
continue
if sh.alert is not None:
continue
if sh.negotiated_version == C.TLS_1_3:
accepted.append(g)
return accepted
except Exception:
return None
sh = res.server_hello
if not res.connected or sh is None or sh.alert is not None:
return None
if sh.negotiated_version == tls_version:
return g
return None
async def detect_named_groups(host: str, port: int, sni: str | None, tls_version: int = C.TLS_1_3) -> list[int]:
"""Probe which named groups the server accepts for TLS 1.3 key share.
We don't execute a real handshake, we just offer one group at a time and
watch for either a successful ServerHello (with key_share) or HelloRetryRequest
(which tells us the server accepts the version but wants a different group).
Runs probes concurrently with a small cap to avoid connection bursts.
"""
candidate_groups = list(C.NAMED_GROUPS.keys())
sem = asyncio.Semaphore(4)
async def guarded(g: int) -> int | None:
async with sem:
return await _probe_one_group(host, port, sni, tls_version, g)
results = await asyncio.gather(*(guarded(g) for g in candidate_groups))
return [g for g in results if g is not None]
def _is_ip(host: str) -> bool:
@@ -160,10 +160,145 @@ async def probe_cipher(host: str, port: int, sni: str | None, version: int, ciph
return CipherProbe(version, cipher_id, cipher_name, ok)
async def get_peer_certificate_chain(host: str, port: int, sni: str | None, timeout: float = 8.0) -> list[bytes]:
"""Return DER-encoded peer certificate chain using the ssl module.
async def _fetch_chain_via_wire(host: str, port: int, sni: str | None, timeout: float = 8.0) -> list[bytes]:
"""Pull the full TLS 1.2 Certificate handshake message ourselves and
return the DER-encoded chain. Used as fallback on Python < 3.13 where
``ssl.SSLSocket.get_unverified_chain`` isn't available.
"""
parts = []
if sni:
try:
parts.append(wire.ext_server_name(sni))
except Exception:
pass
parts.append(wire.ext_ec_point_formats())
parts.append(wire.ext_supported_groups([0x001d, 0x0017, 0x0018, 0x0019]))
parts.append(wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
]))
parts.append(wire.ext_renegotiation_info_empty())
exts = b"".join(parts)
Returns the leaf certificate first, then intermediates. Empty on failure.
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
except Exception:
return []
try:
ch = wire.build_client_hello(
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_2,
hostname=sni,
cipher_suites=[
0xc02f, 0xc030, 0xc02b, 0xc02c,
0xcca8, 0xcca9,
0xc013, 0xc014, 0x009c, 0x009d,
0x002f, 0x0035, 0x000a,
],
extensions=exts,
)
writer.write(ch)
try:
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
# Read enough records to cover ServerHello + Certificate; TLS 1.2
# Certificate messages are often fragmented across several records.
buf = bytearray()
deadline = timeout
for _ in range(20):
try:
header = await asyncio.wait_for(reader.readexactly(5), timeout=deadline)
except Exception:
break
rec_len = (header[3] << 8) | header[4]
if rec_len == 0 or rec_len > 1 << 14:
break
try:
body = await asyncio.wait_for(reader.readexactly(rec_len), timeout=deadline)
except Exception:
break
buf.extend(header); buf.extend(body)
# Stop once we've seen a Certificate handshake in the accumulated buffer.
if _has_certificate_message(bytes(buf)):
break
return _extract_certificates_from_records(bytes(buf))
finally:
try:
writer.close()
await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
except Exception:
pass
def _has_certificate_message(records: bytes) -> bool:
i = 0
hs_buf = bytearray()
while i + 5 <= len(records):
ct = records[i]
rlen = (records[i + 3] << 8) | records[i + 4]
if i + 5 + rlen > len(records):
break
if ct == C.CT_HANDSHAKE:
hs_buf.extend(records[i + 5:i + 5 + rlen])
i += 5 + rlen
j = 0
while j + 4 <= len(hs_buf):
hs_type = hs_buf[j]
hs_len = (hs_buf[j + 1] << 16) | (hs_buf[j + 2] << 8) | hs_buf[j + 3]
if j + 4 + hs_len > len(hs_buf):
return False
if hs_type == C.HS_CERTIFICATE:
return True
j += 4 + hs_len
return False
def _extract_certificates_from_records(records: bytes) -> list[bytes]:
"""Parse TLS 1.2 Certificate handshake message and return the DER chain."""
# 1) Reassemble handshake layer from all handshake-typed records.
i = 0
hs_buf = bytearray()
while i + 5 <= len(records):
ct = records[i]
rlen = (records[i + 3] << 8) | records[i + 4]
if i + 5 + rlen > len(records):
break
if ct == C.CT_HANDSHAKE:
hs_buf.extend(records[i + 5:i + 5 + rlen])
i += 5 + rlen
# 2) Walk handshake messages, find Certificate.
j = 0
while j + 4 <= len(hs_buf):
hs_type = hs_buf[j]
hs_len = (hs_buf[j + 1] << 16) | (hs_buf[j + 2] << 8) | hs_buf[j + 3]
if j + 4 + hs_len > len(hs_buf):
break
if hs_type == C.HS_CERTIFICATE and hs_len >= 3:
body = bytes(hs_buf[j + 4:j + 4 + hs_len])
# TLS 1.2 Certificate: uint24 total_len || [ uint24 cert_len || cert ] *
total = (body[0] << 16) | (body[1] << 8) | body[2]
k = 3
end = 3 + total
chain: list[bytes] = []
while k + 3 <= end:
clen = (body[k] << 16) | (body[k + 1] << 8) | body[k + 2]
k += 3
if k + clen > end:
break
chain.append(body[k:k + clen])
k += clen
return chain
j += 4 + hs_len
return []
async def get_peer_certificate_chain(host: str, port: int, sni: str | None, timeout: float = 8.0) -> list[bytes]:
"""Return DER-encoded peer certificate chain.
Tries the ssl module's Python-3.13+ chain APIs first, then falls back to
a raw TLS 1.2 wire probe that parses the Certificate handshake message
directly. Returns leaf first, then intermediates. Empty on failure.
"""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
@@ -180,14 +315,16 @@ async def get_peer_certificate_chain(host: str, port: int, sni: str | None, time
with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
leaf = ssock.getpeercert(binary_form=True)
chain: list[bytes] = [leaf] if leaf else []
# Intermediate chain via internal API (CPython 3.13+) if available
# Intermediate chain via internal API (CPython 3.13+) if available.
# On older Pythons these attributes are absent — callers will
# fall through to the wire-level fetcher below.
try:
verified = ssock.get_verified_chain()
verified = ssock.get_verified_chain() # type: ignore[attr-defined]
if verified:
chain = list(verified)
except (AttributeError, ssl.SSLError):
try:
unverified = ssock.get_unverified_chain()
unverified = ssock.get_unverified_chain() # type: ignore[attr-defined]
if unverified:
chain = list(unverified)
except (AttributeError, ssl.SSLError):
@@ -195,7 +332,18 @@ async def get_peer_certificate_chain(host: str, port: int, sni: str | None, time
return chain
except Exception:
return []
return await loop.run_in_executor(None, _do)
chain = await loop.run_in_executor(None, _do)
# If the ssl module only gave us a single cert (common on Python < 3.13),
# fetch the full chain via our own TLS wire probe.
if len(chain) <= 1:
try:
wire_chain = await _fetch_chain_via_wire(host, port, sni, timeout=timeout)
except Exception:
wire_chain = []
if len(wire_chain) > len(chain):
chain = wire_chain
return chain
async def alpn_negotiate(host: str, port: int, sni: str | None, alpn_list: list[str], timeout: float = 6.0) -> str | None:
@@ -222,13 +370,7 @@ async def alpn_negotiate(host: str, port: int, sni: str | None, alpn_list: list[
return await loop.run_in_executor(None, _do)
async def get_ocsp_stapling(host: str, port: int, sni: str | None, timeout: float = 6.0) -> bool:
"""Detect whether server returns OCSP stapled response.
Python's ssl module does not expose stapled OCSP directly. We use a minimal raw probe to look
for `status_request` extension with a CertificateStatus record, but that is expensive to
reimplement here. As a conservative default, we return False unless we can detect the
status_request_v2 extension via OpenSSL; in that case we report based on whether openssl
socket has stapling info. Since stdlib has no accessor, we always return False.
"""
return False
# Note: OCSP stapling detection requires probing the `status_request` TLS
# extension and reading the CertificateStatus handshake message. Python's
# stdlib ssl module does not expose stapled OCSP to Python; the engine checks
# live OCSP/CRL instead (see certs/revocation.py).
+21 -3
View File
@@ -13,26 +13,44 @@ SEVERITY_LABELS = {
}
SEVERITY_COLORS = {
"good": "bright-green",
"normal": "bright-yellow",
"notgood": "bright-orange",
"normal": "bright-blue",
"notgood": "bright-yellow",
"bad": "bright-red",
"serious": "magenta",
"serious": "bright-purple",
"info": "tx-alt",
}
# 4 main groups for the results UI tabs.
GROUPS = ("reliability", "safety", "vulnerabilities", "compatibility")
GROUP_LABELS = {
"reliability": "信頼性",
"safety": "安全性",
"vulnerabilities": "脆弱性",
"compatibility": "互換性",
}
@dataclass
class Finding:
# 'group' is the coarse tab (reliability/safety/vulnerabilities/compatibility).
# 'category' is the fine-grained sub-category label shown as a chip on each finding.
# 'step' is the machine-friendly phase slug that produced this finding
# (e.g. "protocols", "ciphers", "handshake_sim"). Shown in the log as "[step]".
category: str
title: str
detail: str = ""
severity: str = "info"
weight: int = 0
group: str = ""
step: str = ""
def to_dict(self) -> dict[str, Any]:
d = asdict(self)
d["severity_label"] = SEVERITY_LABELS.get(self.severity, self.severity)
d["color"] = SEVERITY_COLORS.get(self.severity, "tx")
# impact is what the UI sorts by for the Summary tab.
mul = {"good": 0.0, "normal": 0.0, "notgood": 1.0, "bad": 3.0, "serious": 9.0, "info": 0.0}.get(self.severity, 0.0)
d["impact"] = mul * float(self.weight)
return d
@@ -5,10 +5,69 @@ from ..protocol import constants as C
from ..protocol import wire
async def probe_secure_renegotiation(host: str, port: int, sni: str | None, timeout: float = 6.0) -> tuple[bool, str | None]:
"""Check whether the server returns the renegotiation_info extension (RFC 5746).
async def _read_records(reader: asyncio.StreamReader, max_records: int = 8, timeout: float = 6.0) -> bytes:
"""Read up to `max_records` complete TLS records from the stream.
Returns (secure_renego_supported, error).
A simple record-layer reader that guarantees we pull entire records
instead of relying on whatever arrives in the first TCP chunk. Records
that straddle packet boundaries (common for ServerHello+Certificate
responses in TLS 1.2) are reassembled.
"""
buf = bytearray()
for _ in range(max_records):
try:
header = await asyncio.wait_for(reader.readexactly(5), timeout=timeout)
except (asyncio.IncompleteReadError, asyncio.TimeoutError, ConnectionError):
break
except Exception:
break
rec_len = (header[3] << 8) | header[4]
body = b""
if rec_len:
try:
body = await asyncio.wait_for(reader.readexactly(rec_len), timeout=timeout)
except (asyncio.IncompleteReadError, asyncio.TimeoutError, ConnectionError):
buf.extend(header)
break
except Exception:
buf.extend(header)
break
buf.extend(header)
buf.extend(body)
ct = header[0]
# A fatal alert or end of useful data — stop.
if ct == C.CT_ALERT and len(body) >= 2 and body[0] == 2:
break
return bytes(buf)
def _iter_records(data: bytes):
i = 0
while i + 5 <= len(data):
ct = data[i]
rec_len = (data[i + 3] << 8) | data[i + 4]
if i + 5 + rec_len > len(data):
break
yield ct, data[i + 5:i + 5 + rec_len]
i += 5 + rec_len
if rec_len == 0:
break
async def probe_secure_renegotiation(host: str, port: int, sni: str | None, timeout: float = 8.0) -> tuple[bool, str | None]:
"""Check whether the server acknowledges RFC 5746 (Secure Renegotiation).
Behavior:
- Send a ClientHello that offers only TLS 1.2 (no supported_versions extension,
so TLS 1.3 servers will downgrade to 1.2 for this probe) and includes
`renegotiation_info` (empty) per RFC 5746.
- A server that supports RFC 5746 MUST echo the `renegotiation_info`
extension in its ServerHello.
- If the server *only* supports TLS 1.3 (e.g., modern fronts that refuse
TLS 1.2), RFC 5746 does not apply — TLS 1.3 removed renegotiation
entirely and is treated as "supported by way of elimination".
Returns (supported, info_string).
"""
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
@@ -18,15 +77,24 @@ async def probe_secure_renegotiation(host: str, port: int, sni: str | None, time
exts = (
(wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
+ wire.ext_ec_point_formats()
+ wire.ext_supported_groups([0x001d, 0x0017, 0x0018])
+ wire.ext_signature_algorithms([0x0403, 0x0401])
+ wire.ext_supported_groups([0x001d, 0x0017, 0x0018, 0x0019])
+ wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
])
+ wire.ext_renegotiation_info_empty()
+ wire.ext_extended_master_secret()
)
ch = wire.build_client_hello(
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_2,
hostname=sni,
cipher_suites=[0xc02f, 0xc030, 0xc013, 0xc014, 0x002f, 0x0035, 0x000a],
cipher_suites=[
0xc02f, 0xc030, 0xc02b, 0xc02c,
0xcca8, 0xcca9,
0xc013, 0xc014, 0x009c, 0x009d,
0x002f, 0x0035, 0x000a,
],
extensions=exts,
)
writer.write(ch)
@@ -34,19 +102,24 @@ async def probe_secure_renegotiation(host: str, port: int, sni: str | None, time
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
try:
data = await asyncio.wait_for(reader.read(8192), timeout=timeout)
except Exception:
data = b""
data = await _read_records(reader, max_records=3, timeout=timeout)
if not data:
return False, "no response"
parsed = wire.parse_server_response(data)
if parsed is None or parsed.extensions is None:
if parsed is None or parsed.handshake_type != C.HS_SERVER_HELLO:
# If the server returned only an alert on a TLS 1.2 ClientHello we can't
# observe renegotiation_info; report unknown-leaning-negative.
if parsed and parsed.alert is not None:
return False, f"server alert (level={parsed.alert[0]}, desc={parsed.alert[1]})"
return False, "no server hello"
# Either renegotiation_info extension or SCSV cipher (0x00ff) acknowledgment.
if C.EXT_RENEGOTIATION_INFO in parsed.extensions:
# TLS 1.3 removed renegotiation entirely (RFC 8446 §4.1.2). A server that
# negotiates TLS 1.3 here is safe-by-default.
neg_ver = parsed.negotiated_version or parsed.server_version
if neg_ver == C.TLS_1_3:
return True, "TLS 1.3 (renegotiation obsolete)"
if parsed.extensions and C.EXT_RENEGOTIATION_INFO in parsed.extensions:
return True, None
return False, "no renegotiation_info extension"
return False, "no renegotiation_info extension echoed"
finally:
try:
writer.close()
@@ -55,11 +128,19 @@ async def probe_secure_renegotiation(host: str, port: int, sni: str | None, time
pass
async def probe_fallback_scsv(host: str, port: int, sni: str | None, timeout: float = 6.0) -> tuple[bool, str | None]:
"""Check whether the server rejects TLS_FALLBACK_SCSV.
async def probe_fallback_scsv(host: str, port: int, sni: str | None, timeout: float = 8.0) -> tuple[bool, str | None]:
"""Check whether the server enforces TLS_FALLBACK_SCSV (RFC 7507).
We offer TLS 1.1 + SCSV (0x5600); if the server supports a higher version,
it MUST respond with inappropriate_fallback(86) alert.
Strategy:
- Send a ClientHello with legacy_version=TLS 1.1, cipher list containing
TLS_FALLBACK_SCSV (0x5600), and NO supported_versions extension.
- If the server's highest supported version is > TLS 1.1, it MUST respond
with fatal alert `inappropriate_fallback` (86) per RFC 7507.
- If the server genuinely has TLS 1.1 as its max (rare today), it will
complete the handshake normally — in that case SCSV enforcement is
moot. We treat that as `n/a` rather than `not enforced`.
Returns (enforced_or_not_applicable, info_string).
"""
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
@@ -70,14 +151,20 @@ async def probe_fallback_scsv(host: str, port: int, sni: str | None, timeout: fl
(wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
+ wire.ext_ec_point_formats()
+ wire.ext_supported_groups([0x001d, 0x0017, 0x0018])
+ wire.ext_signature_algorithms([0x0403, 0x0401])
+ wire.ext_signature_algorithms([
0x0403, 0x0401, 0x0501, 0x0601,
])
+ wire.ext_renegotiation_info_empty()
)
ch = wire.build_client_hello(
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_1,
hostname=sni,
cipher_suites=[0x5600, 0xc013, 0xc014, 0x002f, 0x0035],
cipher_suites=[
0x5600, # TLS_FALLBACK_SCSV
0xc013, 0xc014, 0xc009, 0xc00a,
0x002f, 0x0035, 0x000a,
],
extensions=exts,
)
writer.write(ch)
@@ -85,23 +172,33 @@ async def probe_fallback_scsv(host: str, port: int, sni: str | None, timeout: fl
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
try:
data = await asyncio.wait_for(reader.read(4096), timeout=timeout)
except Exception:
data = b""
data = await _read_records(reader, max_records=3, timeout=timeout)
if not data:
return False, "no response"
# Look for alert 86 (inappropriate_fallback) = supported (good).
i = 0
while i + 5 <= len(data):
ct = data[i]
rec_len = (data[i + 3] << 8) | data[i + 4]
body = data[i + 5:i + 5 + rec_len]
if ct == C.CT_ALERT and len(body) >= 2 and body[1] == 86:
return True, None
i += 5 + rec_len
if rec_len == 0:
break
# A fatal alert may cause the server to abort the TCP connection
# before the alert record is flushed on some stacks. Treat as
# unknown-leaning-negative, but also probe once more without SCSV
# to distinguish from "server totally broken".
return False, "no response (possibly RST)"
saw_server_hello = False
negotiated_version: int | None = None
for ct, body in _iter_records(data):
if ct == C.CT_ALERT and len(body) >= 2:
level, desc = body[0], body[1]
if desc == 86:
return True, None
# Some servers reply with handshake_failure(40) / protocol_version(70)
# when downgrade is refused; treat as enforcement.
if desc in (70,) and level == 2:
return True, f"alert protocol_version({desc})"
if ct == C.CT_HANDSHAKE and len(body) >= 4 and body[0] == C.HS_SERVER_HELLO:
saw_server_hello = True
if len(body) >= 6:
negotiated_version = (body[4] << 8) | body[5]
if saw_server_hello:
if negotiated_version == C.TLS_1_1:
# Server genuinely maxes out at TLS 1.1 → SCSV is not applicable.
return True, "server max = TLS 1.1 (SCSV not applicable)"
return False, "server completed handshake without inappropriate_fallback"
return False, "no inappropriate_fallback alert"
finally:
try: