This commit is contained in:
2026-04-25 23:11:20 +09:00
parent 209b14e9f8
commit a4e31d1641
43 changed files with 14 additions and 5421 deletions
+13 -97
View File
@@ -10,28 +10,16 @@ from pathlib import Path
from bs4 import BeautifulSoup
from markitdown import MarkItDown
from datetime import datetime, timezone
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Response, WebSocket, HTTPException
from zoneinfo import ZoneInfo
from fastapi import FastAPI, Request, Response
from fastapi.templating import Jinja2Templates
from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse, RedirectResponse
from jinja2.exceptions import TemplateNotFound
from .error import error_page
from .database import AccessCounter
from .middleware import Middleware, server_version, onion_hostname
from .tools.tls_test import (
tls_test_db, tls_test_queue,
tls_submit, tls_api_submit, tls_results_context, tls_websocket_handler,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifespan hook: keep the TLS-test job queue running while the app is up."""
    await tls_test_queue.start()
    try:
        yield
    finally:
        # Always drain the worker, even if shutdown is triggered by an error.
        await tls_test_queue.stop()
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None, lifespan=lifespan)
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
app.add_middleware(Middleware)
templates = Jinja2Templates(directory=Path.cwd().joinpath("public"))
markitdown = MarkItDown()
@@ -46,6 +34,16 @@ class CustomHTMLRenderer(mistune.HTMLRenderer):
return f'<pre>{mistune.escape(code)}</pre>\n'
htmlitdown = mistune.create_markdown(renderer=CustomHTMLRenderer(escape=False))
# NOTE(review): the original decorated this module-level function with
# @property. Outside a class, @property just wraps the function in a
# (non-callable) property object, so the Jinja global would hold the
# descriptor instead of a usable callable. Registering the plain function
# fixes that without changing template-side usage.
def this_year() -> int:
    """Current calendar year in Japan Standard Time (Asia/Tokyo)."""
    return datetime.now(ZoneInfo("Asia/Tokyo")).year
templates.env.globals["this_year"] = this_year
# NOTE(review): same module-level @property bug as this_year — removed.
def this_year_in_heisei() -> int:
    """Current JST year expressed on the Heisei-era scale.

    The Heisei era formally ended in 2019; the original comment ("heysay is
    not ended.") keeps the conversion going as a joke.
    NOTE(review): Heisei N is conventionally year - 1988; the "- 1989" here
    may be an intentional joke or an off-by-one — confirm before changing.
    """
    return datetime.now(ZoneInfo("Asia/Tokyo")).year - 1989
templates.env.globals["this_year_in_heisei"] = this_year_in_heisei
def get_daily_quote() -> str:
seed = str(datetime.now(timezone.utc).date())
with Path.cwd().joinpath("public", "quotes.txt").open("r") as f:
@@ -142,88 +140,6 @@ async def thumbnail(request: Request, path: str) -> Response:
png = resvg_py.svg_to_bytes(svg, font_files=font_files, width=1200, height=630)
return Response(content=png, media_type="image/png")
@app.api_route("/tools/tls-test/", methods=["GET"])
async def tls_test_index(request: Request) -> Response:
    """Render the TLS test tool's landing page."""
    return templates.TemplateResponse(
        request=request,
        name="tools/tls-test/index.html",
    )
@app.api_route("/tools/tls-test/", methods=["POST"])
async def tls_test_submit(request: Request) -> Response:
    """Accept the HTML-form scan request and redirect to the new test's status page."""
    form = await request.form()
    target = str(form.get("target", "")).strip()
    test_id, err = tls_submit(request, target, tls_test_db, tls_test_queue, templates)
    if err:
        # tls_submit already produced a full error Response (validation/rate limit).
        return err
    return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303)
@app.api_route("/tools/tls-test/status/{test_id}/", methods=["GET"])
async def tls_test_status_page(request: Request, test_id: str) -> Response:
    """Show live progress for a queued/running test; finished tests go to results."""
    job = tls_test_db.get_job(test_id)
    if job is None:
        return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?")
    if job.get("status") == "done":
        return RedirectResponse(url=f"/tools/tls-test/results/{test_id}/", status_code=303)
    context = {
        "test_id": test_id,
        "target": job.get("target", ""),
        "status": job.get("status", ""),
        "progress_entries": tls_test_db.get_progress(test_id),
    }
    return templates.TemplateResponse(request=request, name="tools/tls-test/status.html", context=context)
@app.api_route("/tools/tls-test/results/{test_id}/", methods=["GET"])
async def tls_test_results_page(request: Request, test_id: str) -> Response:
    """Render final results; tests that are not done yet bounce back to status."""
    job = tls_test_db.get_job(test_id)
    if job is None:
        return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?")
    if job.get("status") != "done":
        return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303)
    context = tls_results_context(job, test_id, request, tls_test_db)
    return templates.TemplateResponse(request=request, name="tools/tls-test/results.html", context=context)
@app.websocket("/tools/tls-test/ws/{test_id}")
async def tls_test_ws(websocket: WebSocket, test_id: str):
    """WebSocket endpoint: stream live progress for one test to the browser."""
    await tls_websocket_handler(websocket, test_id, tls_test_db, tls_test_queue)
@app.api_route("/api/tools/tls-test/scan", methods=["POST"])
async def tls_test_api_scan(request: Request) -> Response:
    """JSON API: enqueue a scan for `target`; returns URLs for tracking the job."""
    try:
        payload = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    # A syntactically valid JSON body can still be a list/string/number;
    # reject non-objects here instead of raising AttributeError on .get().
    if not isinstance(payload, dict):
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    test_id, err = tls_api_submit(request, str(payload.get("target", "")), tls_test_db, tls_test_queue)
    if err:
        # err is (body, status_code) from the submit helper.
        return JSONResponse(err[0], status_code=err[1])
    return JSONResponse({
        "id": test_id,
        "status_url": f"/tools/tls-test/status/{test_id}/",
        "results_url": f"/tools/tls-test/results/{test_id}/",
        "ws_url": f"/tools/tls-test/ws/{test_id}",
    })
@app.api_route("/api/tools/tls-test/status/{test_id}", methods=["GET"])
async def tls_test_api_status(request: Request, test_id: str) -> Response:
    """JSON API: job metadata plus its full progress log."""
    job = tls_test_db.get_job(test_id)
    if job is None:
        return JSONResponse({"error": "not found"}, status_code=404)
    body = {
        "id": test_id,
        "target": job.get("target"),
        "status": job.get("status"),
        "rank": job.get("rank"),
        "score": job.get("score"),
        "created_at": job.get("created_at"),
        "started_at": job.get("started_at"),
        "finished_at": job.get("finished_at"),
        "progress": tls_test_db.get_progress(test_id),
        "error": job.get("error_message"),
    }
    return JSONResponse(body)
@app.api_route("/api/tools/tls-test/results/{test_id}", methods=["GET"])
async def tls_test_api_results(request: Request, test_id: str) -> Response:
    """JSON API: final scan result; 409 until the job reaches 'done'."""
    job = tls_test_db.get_job(test_id)
    if job is None:
        return JSONResponse({"error": "not found"}, status_code=404)
    if job.get("status") != "done":
        return JSONResponse({"error": "not ready", "status": job.get("status")}, status_code=409)
    return JSONResponse(job.get("result") or {})
@app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"])
async def default_response(request: Request, full_path: str) -> Response:
if not full_path.endswith(".html") and not full_path.endswith(".md"):
@@ -1,13 +0,0 @@
from .runner import TlsJobQueue
from .db import TlsTestDB
from .engine import run_full_scan
from .views import tls_submit, tls_api_submit, tls_results_context, tls_websocket_handler
# Module-level singletons shared by the web layer (DB first, queue wraps it).
tls_test_db = TlsTestDB()
tls_test_queue = TlsJobQueue(tls_test_db, run_full_scan)

__all__ = [
    "TlsJobQueue",
    "TlsTestDB",
    "tls_submit",
    "tls_api_submit",
    "tls_results_context",
    "tls_websocket_handler",
    "tls_test_db",
    "tls_test_queue",
]
-39
View File
@@ -1,39 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
@dataclass
class CaaResult:
    """Outcome of a CAA lookup for one host.

    `effective_host` is the (possibly parent) domain whose CAA records
    actually applied; empty when nothing was found.
    """

    host: str
    records: list[str] = field(default_factory=list)
    effective_host: str = ""
    error: str | None = None
async def lookup_caa(host: str, timeout: float = 5.0) -> CaaResult:
    """Walk up the DNS tree from `host` and return the first CAA record set found."""
    try:
        import dns.asyncresolver
    except Exception as e:  # dnspython is an optional dependency
        return CaaResult(host=host, error=f"dnspython missing: {e}")
    resolver = dns.asyncresolver.Resolver()
    resolver.lifetime = timeout
    labels = host.strip(".").split(".")
    for start in range(len(labels)):
        name = ".".join(labels[start:])
        # Precedence here is: not name, OR ("." not in name AND start > 0) —
        # i.e. bare TLD labels are skipped once we've climbed above the host.
        if not name or "." not in name and start > 0:
            continue
        try:
            answer = await resolver.resolve(name, "CAA")
        except Exception:
            continue
        found: list[str] = []
        for record in answer:
            try:
                found.append(record.to_text())
            except Exception:
                continue
        if found:
            return CaaResult(host=host, records=found, effective_host=name)
    return CaaResult(host=host, records=[], effective_host="", error="no CAA records found up to TLD")
@@ -1,162 +0,0 @@
from __future__ import annotations
import datetime
import hashlib
from dataclasses import dataclass, field
from typing import Any
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, ec, dsa, ed25519, ed448
@dataclass
class CertSummary:
    """Flattened view of a parsed X.509 certificate for reporting."""

    # Identity
    subject: str
    issuer: str
    common_name: str | None
    sans: list[str] = field(default_factory=list)
    # Validity window (ISO-8601 strings plus derived day counts)
    not_before: str = ""
    not_after: str = ""
    not_before_days_ago: int = 0
    days_until_expiry: int = 0
    is_expired: bool = False
    is_self_signed: bool = False
    # Signature / public key
    signature_hash_algorithm: str = ""
    signature_algorithm_oid: str = ""
    public_key_algorithm: str = ""
    public_key_size_bits: int = 0
    public_key_curve: str | None = None
    serial: str = ""
    # Certificate Transparency SCTs embedded in the cert
    has_scts: bool = False
    sct_count: int = 0
    # Fingerprints
    sha256_fingerprint: str = ""
    spki_sha256: str = ""
    # Authority / revocation endpoints
    aia_urls: list[str] = field(default_factory=list)
    ocsp_urls: list[str] = field(default_factory=list)
    crl_urls: list[str] = field(default_factory=list)
def _rdn(name: x509.Name) -> str:
    """Render an X.509 name as comma-joined "attr=value" pairs.

    NOTE(review): relies on cryptography's private `ObjectIdentifier._name`
    attribute — confirm it exists in the pinned cryptography version.
    """
    pairs = [f"{attr.oid._name}={attr.value}" for attr in name]
    return ", ".join(pairs)
def _get_cn(name: x509.Name) -> str | None:
    """Return the first commonName attribute value, or None if absent."""
    matches = (attr for attr in name if attr.oid == x509.NameOID.COMMON_NAME)
    first = next(matches, None)
    return None if first is None else str(first.value)
def _alg_info(pk) -> tuple[str, int, str | None]:
    """Classify a public key as (algorithm name, size in bits, curve name or None)."""
    if isinstance(pk, rsa.RSAPublicKey):
        return "RSA", pk.key_size, None
    if isinstance(pk, ec.EllipticCurvePublicKey):
        # For EC keys the "size" is the curve's field size.
        return "EC", pk.curve.key_size, pk.curve.name
    if isinstance(pk, dsa.DSAPublicKey):
        return "DSA", pk.key_size, None
    if isinstance(pk, ed25519.Ed25519PublicKey):
        return "Ed25519", 256, "Ed25519"
    if isinstance(pk, ed448.Ed448PublicKey):
        return "Ed448", 456, "Ed448"
    # Unknown key type: report the class name with no size/curve.
    return type(pk).__name__, 0, None
def parse_certificate(der: bytes) -> tuple[x509.Certificate, CertSummary]:
    """Parse a DER certificate and distill the fields the scanner reports.

    Returns the parsed certificate object alongside a flat CertSummary. Each
    optional extension is read defensively so one malformed field cannot sink
    the whole parse.
    """
    cert = x509.load_der_x509_certificate(der)
    s = CertSummary(
        subject=_rdn(cert.subject),
        issuer=_rdn(cert.issuer),
        common_name=_get_cn(cert.subject),
    )
    # Validity — prefer the UTC-aware accessors (cryptography >= 42); fall
    # back to the deprecated naive properties and attach UTC explicitly.
    # (Explicit None-check instead of `getattr(...) or ...` truthiness.)
    nb = getattr(cert, "not_valid_before_utc", None)
    if nb is None:
        nb = cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
    na = getattr(cert, "not_valid_after_utc", None)
    if na is None:
        na = cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    s.not_before = nb.isoformat()
    s.not_after = na.isoformat()
    s.not_before_days_ago = (now - nb).days
    s.days_until_expiry = (na - now).days
    s.is_expired = na < now or nb > now
    s.is_self_signed = (cert.subject == cert.issuer)
    # Signature algorithm
    try:
        s.signature_hash_algorithm = cert.signature_hash_algorithm.name if cert.signature_hash_algorithm else ""
    except Exception:
        s.signature_hash_algorithm = ""
    try:
        s.signature_algorithm_oid = cert.signature_algorithm_oid.dotted_string
    except Exception:
        s.signature_algorithm_oid = ""
    # Public key
    pk = cert.public_key()
    alg, size, curve = _alg_info(pk)
    s.public_key_algorithm = alg
    s.public_key_size_bits = size
    s.public_key_curve = curve
    # Serial (lowercase hex, no leading 0x)
    s.serial = format(cert.serial_number, "x")
    # SANs
    try:
        san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
        names: list[str] = []
        for g in san:
            try:
                names.append(str(g.value) if hasattr(g, "value") else str(g))
            except Exception:
                continue
        s.sans = names
    except x509.ExtensionNotFound:
        pass
    # SCT (Certificate Transparency) — embedded precertificate SCTs.
    # The original caught `(x509.ExtensionNotFound, Exception)`, which is just
    # `Exception`; simplified. SCTs stapled via the TLS extension are handled
    # elsewhere.
    try:
        scts = cert.extensions.get_extension_for_class(x509.PrecertificateSignedCertificateTimestamps).value
        s.has_scts = True
        s.sct_count = len(list(scts))
    except Exception:
        pass
    # AIA: OCSP responder (1.3.6.1.5.5.7.48.1) vs. CA issuers (…48.2)
    try:
        aia = cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess).value
        for ad in aia:
            try:
                if ad.access_method.dotted_string == "1.3.6.1.5.5.7.48.1":
                    s.ocsp_urls.append(str(ad.access_location.value))
                elif ad.access_method.dotted_string == "1.3.6.1.5.5.7.48.2":
                    s.aia_urls.append(str(ad.access_location.value))
            except Exception:
                continue
    except x509.ExtensionNotFound:
        pass
    # CRL distribution points
    try:
        cdp = cert.extensions.get_extension_for_class(x509.CRLDistributionPoints).value
        for dp in cdp:
            if dp.full_name:
                for n in dp.full_name:
                    try:
                        s.crl_urls.append(str(n.value))
                    except Exception:
                        continue
    except x509.ExtensionNotFound:
        pass
    # Fingerprints
    s.sha256_fingerprint = hashlib.sha256(der).hexdigest()
    try:
        spki = pk.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        s.spki_sha256 = hashlib.sha256(spki).hexdigest()
    except Exception:
        s.spki_sha256 = ""
    return cert, s
@@ -1,21 +0,0 @@
from __future__ import annotations
import httpx
async def check_spki(spki_sha256_hex: str, timeout: float = 6.0) -> tuple[bool, str | None]:
    """Query pwnedkeys.com for a given SPKI SHA-256 fingerprint.

    Returns (is_pwned, error).
    pwnedkeys.com responds 200 if the key is pwned, 404 if not known.
    """
    url = f"https://v1.pwnedkeys.com/{spki_sha256_hex.lower()}"
    headers = {"User-Agent": "nercone-tls-test/1.0"}
    try:
        async with httpx.AsyncClient(timeout=timeout, headers=headers) as client:
            resp = await client.head(url)
    except Exception as e:
        return False, f"{e.__class__.__name__}: {e}"
    if resp.status_code == 200:
        return True, None
    if resp.status_code == 404:
        return False, None
    return False, f"unexpected status {resp.status_code}"
@@ -1,106 +0,0 @@
from __future__ import annotations
import asyncio
import httpx
from dataclasses import dataclass
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.x509 import ocsp
from cryptography.x509.oid import ExtensionOID
@dataclass
class RevocationResult:
    """Outcome of one revocation probe (OCSP or CRL)."""

    checked: bool  # a definitive answer was obtained from the source
    revoked: bool  # certificate is revoked (only meaningful when checked)
    reason: str = ""
    source: str = ""  # URL that produced the answer
    error: str | None = None
async def check_ocsp(cert: x509.Certificate, issuer: x509.Certificate, timeout: float = 6.0) -> RevocationResult:
    """Ask the certificate's OCSP responder whether it has been revoked."""
    try:
        aia = cert.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS).value
    except x509.ExtensionNotFound:
        return RevocationResult(False, False, error="no AIA extension")
    # Find the first OCSP access description (OID ...48.1).
    ocsp_url = None
    for desc in aia:
        if desc.access_method.dotted_string != "1.3.6.1.5.5.7.48.1":
            continue
        try:
            ocsp_url = str(desc.access_location.value)
            break
        except Exception:
            continue
    if not ocsp_url:
        return RevocationResult(False, False, error="no OCSP URL")
    try:
        # SHA-1 here only identifies the issuer inside the OCSP request; it is
        # the de-facto interoperable choice, not a signature algorithm.
        builder = ocsp.OCSPRequestBuilder().add_certificate(cert, issuer, hashes.SHA1())
        req_bytes = builder.build().public_bytes(serialization.Encoding.DER)
    except Exception as e:
        return RevocationResult(False, False, error=f"build error: {e}")
    headers = {
        "Content-Type": "application/ocsp-request",
        "Accept": "application/ocsp-response",
        "User-Agent": "nercone-tls-test/1.0",
    }
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(ocsp_url, content=req_bytes, headers=headers)
    except Exception as e:
        return RevocationResult(False, False, error=f"{e.__class__.__name__}: {e}")
    if resp.status_code != 200:
        return RevocationResult(False, False, error=f"status {resp.status_code}")
    try:
        response = ocsp.load_der_ocsp_response(resp.content)
    except Exception as e:
        return RevocationResult(False, False, error=f"parse error: {e}")
    if response.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
        return RevocationResult(False, False, error=f"ocsp status {response.response_status.name}")
    status = response.certificate_status
    if status == ocsp.OCSPCertStatus.REVOKED:
        return RevocationResult(True, True, reason=str(response.revocation_reason or ""), source=ocsp_url)
    if status == ocsp.OCSPCertStatus.GOOD:
        return RevocationResult(True, False, source=ocsp_url)
    # OCSPCertStatus.UNKNOWN and anything else: answered, but inconclusive.
    return RevocationResult(True, False, reason="unknown", source=ocsp_url)
async def check_crl(cert: x509.Certificate, timeout: float = 6.0) -> RevocationResult:
    """Fetch each HTTP CRL distribution point and look for the cert's serial."""
    try:
        cdp = cert.extensions.get_extension_for_oid(ExtensionOID.CRL_DISTRIBUTION_POINTS).value
    except x509.ExtensionNotFound:
        return RevocationResult(False, False, error="no CRL DP")
    # Collect every http(s) distribution-point URL.
    crl_urls: list[str] = []
    for dp in cdp:
        if not dp.full_name:
            continue
        for gn in dp.full_name:
            try:
                candidate = str(gn.value)
            except Exception:
                continue
            if candidate.startswith(("http://", "https://")):
                crl_urls.append(candidate)
    if not crl_urls:
        return RevocationResult(False, False, error="no http CRL URL")
    headers = {"User-Agent": "nercone-tls-test/1.0"}
    for url in crl_urls:
        try:
            async with httpx.AsyncClient(timeout=timeout, headers=headers) as client:
                resp = await client.get(url)
                if resp.status_code != 200:
                    continue
                # CRLs come as DER or PEM; try both before giving up on this URL.
                try:
                    crl = x509.load_der_x509_crl(resp.content)
                except Exception:
                    try:
                        crl = x509.load_pem_x509_crl(resp.content)
                    except Exception:
                        continue
                for entry in crl:
                    if entry.serial_number == cert.serial_number:
                        return RevocationResult(True, True, reason="listed in CRL", source=url)
                # Serial not listed: first reachable CRL is authoritative.
                return RevocationResult(True, False, source=url)
        except Exception:
            continue
    return RevocationResult(False, False, error="all CRLs unreachable")
@@ -1,70 +0,0 @@
from __future__ import annotations
import ssl
import socket
import asyncio
from dataclasses import dataclass
from typing import Any
import certifi
@dataclass
class TrustResult:
    """Verdict of chain validation against one platform's root store."""

    platform: str  # e.g. "Mozilla", "Apple"; "" for store-agnostic checks
    trusted: bool
    error: str | None = None
def _build_ctx(ca_file: str | None) -> ssl.SSLContext:
ctx = ssl.create_default_context(cafile=ca_file)
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
try:
ctx.set_ciphers("ALL:@SECLEVEL=0")
except ssl.SSLError:
pass
return ctx
async def verify_with_store(host: str, port: int, sni: str | None, ca_file: str | None, timeout: float = 8.0) -> TrustResult:
    """Run one blocking TLS verification against `ca_file` in the default executor.

    The returned TrustResult has an empty `platform` — callers label it.
    """
    loop = asyncio.get_running_loop()
    ctx = _build_ctx(ca_file)

    def _handshake() -> tuple[bool, str | None]:
        # Plain blocking socket work; executed in a worker thread.
        try:
            with socket.create_connection((host, port), timeout=timeout) as raw:
                with ctx.wrap_socket(raw, server_hostname=sni) as tls:
                    _ = tls.getpeercert()
            return True, None
        except ssl.SSLCertVerificationError as e:
            return False, str(e)
        except Exception as e:
            return False, f"{e.__class__.__name__}: {e}"

    ok, err = await loop.run_in_executor(None, _handshake)
    return TrustResult(platform="", trusted=ok, error=err)
# Platform trust store mapping. Mozilla is authoritative via certifi.
# For Apple / Android / Java / Windows we use a heuristic: the majority of
# publicly trusted roots are cross-signed and shared across platforms, so if
# Mozilla verification passes we mark the others as "likely trusted" while also
# including a known-root allowlist per platform where discrepancies may exist.
PLATFORM_LABELS = ["Mozilla", "Apple", "Android", "Java", "Windows"]


async def verify_across_platforms(host: str, port: int, sni: str | None) -> list[TrustResult]:
    """Return trust outcomes for all 5 platforms.

    Mozilla uses certifi. Apple / Android / Java / Windows use the same
    verification path (standard chain validation) for now, then heuristically
    adjust based on the issuer CN. When vendor-specific root bundles are added
    under certs/stores/, this function should load them individually.
    """
    mozilla = await verify_with_store(host, port, sni, certifi.where())
    outcomes = [TrustResult(platform="Mozilla", trusted=mozilla.trusted, error=mozilla.error)]
    # Cross-platform fallbacks: mirror Mozilla's outcome until vendor-specific
    # root bundles are vendored under certs/stores/.
    outcomes.extend(
        TrustResult(platform=label, trusted=mozilla.trusted, error=mozilla.error)
        for label in PLATFORM_LABELS[1:]
    )
    return outcomes
-285
View File
@@ -1,285 +0,0 @@
from __future__ import annotations
import json
import sqlite3
import time
from pathlib import Path
from typing import Any
# Retention policy
# ----------------
# The DB column `expires_at` is kept for schema compatibility but is no longer
# a hard cutoff. The policy is:
#   - Every row is preserved for at least MIN_RETENTION_SECONDS (7 days) after
#     it was created.
#   - Beyond that, rows are kept for up to MAX_RETENTION_SECONDS (1 year).
#   - After 7 days, rows are only evicted when the total row count exceeds
#     MAX_KEPT_ROWS (100); rows older than 7 days are then deleted
#     oldest-first until the total falls back to MAX_KEPT_ROWS.
MIN_RETENTION_SECONDS = 7 * 24 * 60 * 60
MAX_RETENTION_SECONDS = 365 * 24 * 60 * 60
MAX_KEPT_ROWS = 100

# Kept for backwards compatibility with any importer. `expires_at` is filled in
# with MAX_RETENTION_SECONDS at creation time (hard upper bound).
RETENTION_SECONDS = MAX_RETENTION_SECONDS
class TlsTestDB:
    """SQLite-backed store for TLS test jobs and their progress logs.

    Every method opens its own short-lived connection (WAL journal, generous
    busy timeout), so a single instance can be shared across async handlers
    without holding a connection open.
    """

    def __init__(self, filepath: str | Path | None = None):
        """Open (or create) the database and ensure the schema exists.

        filepath: sqlite file path; defaults to ./databases/tls_test.db.
        """
        if filepath is None:
            filepath = Path.cwd() / "databases" / "tls_test.db"
        self.filepath = str(filepath)
        Path(self.filepath).parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()

    def _conn(self) -> sqlite3.Connection:
        """Return a fresh connection tuned for concurrent web use."""
        conn = sqlite3.connect(self.filepath, timeout=30)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=10000")
        return conn

    def _init_schema(self) -> None:
        """Create the `tests` and `test_progress` tables and indexes if missing."""
        conn = self._conn()
        try:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS tests (
                    id TEXT PRIMARY KEY,
                    target TEXT NOT NULL,
                    client_ip TEXT,
                    status TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    started_at INTEGER,
                    finished_at INTEGER,
                    expires_at INTEGER NOT NULL,
                    rank TEXT,
                    score REAL,
                    result_json TEXT,
                    error_message TEXT
                );
                CREATE INDEX IF NOT EXISTS idx_tests_expires ON tests(expires_at);
                CREATE INDEX IF NOT EXISTS idx_tests_ip_created ON tests(client_ip, created_at);
                CREATE INDEX IF NOT EXISTS idx_tests_target ON tests(target, created_at);
                CREATE TABLE IF NOT EXISTS test_progress (
                    test_id TEXT NOT NULL,
                    seq INTEGER NOT NULL,
                    ts INTEGER NOT NULL,
                    phase TEXT NOT NULL,
                    detail TEXT,
                    progress REAL,
                    severity TEXT,
                    PRIMARY KEY(test_id, seq)
                );
                """
            )
            conn.commit()
        finally:
            conn.close()

    def create_job(self, test_id: str, target: str, client_ip: str | None) -> None:
        """Insert a new job in `queued` state with the hard-ceiling expiry."""
        now = int(time.time())
        conn = self._conn()
        try:
            conn.execute(
                "INSERT INTO tests(id,target,client_ip,status,created_at,expires_at)"
                " VALUES (?,?,?,?,?,?)",
                (test_id, target, client_ip, "queued", now, now + RETENTION_SECONDS),
            )
            conn.commit()
        finally:
            conn.close()

    def update_status(
        self,
        test_id: str,
        status: str,
        *,
        started_at: int | None = None,
        finished_at: int | None = None,
        rank: str | None = None,
        score: float | None = None,
        result_json: str | None = None,
        error_message: str | None = None,
    ) -> None:
        """Update `status` plus any of the optional columns that were supplied.

        Only non-None keyword arguments are written, so callers can patch a
        single field without clobbering the rest of the row.
        """
        sets = ["status = ?"]
        args: list[Any] = [status]
        optional = (
            ("started_at", started_at),
            ("finished_at", finished_at),
            ("rank", rank),
            ("score", score),
            ("result_json", result_json),
            ("error_message", error_message),
        )
        for column, value in optional:
            if value is not None:
                sets.append(f"{column} = ?")
                args.append(value)
        args.append(test_id)
        conn = self._conn()
        try:
            conn.execute(f"UPDATE tests SET {', '.join(sets)} WHERE id = ?", args)
            conn.commit()
        finally:
            conn.close()

    def append_progress(
        self,
        test_id: str,
        seq: int,
        phase: str,
        detail: str,
        progress: float,
        severity: str,
    ) -> None:
        """Record (or overwrite) one progress entry for a job; `ts` is set to now."""
        conn = self._conn()
        try:
            conn.execute(
                "INSERT OR REPLACE INTO test_progress(test_id,seq,ts,phase,detail,progress,severity)"
                " VALUES (?,?,?,?,?,?,?)",
                (test_id, seq, int(time.time()), phase, detail, progress, severity),
            )
            conn.commit()
        finally:
            conn.close()

    def get_job(self, test_id: str) -> dict[str, Any] | None:
        """Return the job row as a dict (with parsed `result`), or None if absent."""
        conn = self._conn()
        try:
            cur = conn.cursor()
            cur.execute("SELECT * FROM tests WHERE id = ?", (test_id,))
            row = cur.fetchone()
            if row is None:
                return None
            cols = [c[0] for c in cur.description]
            d = dict(zip(cols, row))
            # Decode result_json into `result`; malformed JSON becomes None
            # rather than raising, so one bad row can't break the UI.
            if d.get("result_json"):
                try:
                    d["result"] = json.loads(d["result_json"])
                except json.JSONDecodeError:
                    d["result"] = None
            return d
        finally:
            conn.close()

    def get_progress(self, test_id: str) -> list[dict[str, Any]]:
        """Return all progress entries for a job, oldest first."""
        conn = self._conn()
        try:
            cur = conn.cursor()
            cur.execute(
                "SELECT seq,ts,phase,detail,progress,severity FROM test_progress"
                " WHERE test_id = ? ORDER BY seq ASC",
                (test_id,),
            )
            keys = ("seq", "ts", "phase", "detail", "progress", "severity")
            return [dict(zip(keys, row)) for row in cur.fetchall()]
        finally:
            conn.close()

    def get_history_by_target(self, target: str, exclude_id: str | None = None, limit: int = 10) -> list[dict[str, Any]]:
        """Return up to `limit` finished scans of `target`, newest first.

        exclude_id: omit this test (typically the one currently displayed).
        """
        conn = self._conn()
        try:
            cur = conn.cursor()
            columns = "id, target, status, created_at, finished_at, rank, score, error_message"
            if exclude_id:
                cur.execute(
                    f"SELECT {columns}"
                    " FROM tests WHERE target = ? AND id != ? AND status = 'done'"
                    " ORDER BY created_at DESC LIMIT ?",
                    (target, exclude_id, limit),
                )
            else:
                cur.execute(
                    f"SELECT {columns}"
                    " FROM tests WHERE target = ? AND status = 'done'"
                    " ORDER BY created_at DESC LIMIT ?",
                    (target, limit),
                )
            rows = cur.fetchall()
            cols = [c[0] for c in cur.description]
            return [dict(zip(cols, r)) for r in rows]
        finally:
            conn.close()

    def count_ip_in_window(self, client_ip: str, window_seconds: int) -> int:
        """Count submissions from `client_ip` within the last `window_seconds`."""
        cutoff = int(time.time()) - window_seconds
        conn = self._conn()
        try:
            cur = conn.cursor()
            cur.execute(
                "SELECT COUNT(*) FROM tests WHERE client_ip = ? AND created_at >= ?",
                (client_ip, cutoff),
            )
            return cur.fetchone()[0]
        finally:
            conn.close()

    def count_ip_active(self, client_ip: str) -> int:
        """Count this IP's jobs that are still queued or running."""
        conn = self._conn()
        try:
            cur = conn.cursor()
            cur.execute(
                "SELECT COUNT(*) FROM tests WHERE client_ip = ? AND status IN ('queued','running')",
                (client_ip,),
            )
            return cur.fetchone()[0]
        finally:
            conn.close()

    def _purge(self, conn: sqlite3.Connection, ids: list[str]) -> None:
        """Delete the given test rows and their progress entries in one batch."""
        params = [(tid,) for tid in ids]
        # executemany batches the per-id round trips the original made one by one.
        conn.executemany("DELETE FROM test_progress WHERE test_id = ?", params)
        conn.executemany("DELETE FROM tests WHERE id = ?", params)

    def delete_expired(self) -> int:
        """Apply the retention policy.

        Step 1: unconditionally drop rows past the 1-year hard ceiling.
        Step 2: if total row count exceeds MAX_KEPT_ROWS, drop rows older than
                MIN_RETENTION_SECONDS (7 days) in oldest-first order until the
                count falls back to MAX_KEPT_ROWS. Rows within 7 days are
                never evicted by this step.
        Returns the number of rows deleted.
        """
        now = int(time.time())
        deleted = 0
        conn = self._conn()
        try:
            cur = conn.cursor()
            # Step 1 — hard ceiling: 1-year-old rows.
            hard_cutoff = now - MAX_RETENTION_SECONDS
            cur.execute("SELECT id FROM tests WHERE created_at < ?", (hard_cutoff,))
            old_ids = [r[0] for r in cur.fetchall()]
            self._purge(conn, old_ids)
            deleted += len(old_ids)
            # Step 2 — count-triggered eviction of >7-day-old rows.
            cur.execute("SELECT COUNT(*) FROM tests")
            total = cur.fetchone()[0]
            if total > MAX_KEPT_ROWS:
                excess = total - MAX_KEPT_ROWS
                soft_cutoff = now - MIN_RETENTION_SECONDS
                cur.execute(
                    "SELECT id FROM tests"
                    " WHERE created_at < ?"
                    " ORDER BY created_at ASC"
                    " LIMIT ?",
                    (soft_cutoff, excess),
                )
                evict_ids = [r[0] for r in cur.fetchall()]
                self._purge(conn, evict_ids)
                deleted += len(evict_ids)
            conn.commit()
            return deleted
        finally:
            conn.close()
@@ -1,808 +0,0 @@
from __future__ import annotations
import asyncio
import ipaddress
import time
from typing import Awaitable, Callable
from .schemas import Finding, ScanResult
from .scoring import apply_rank
from .protocol import constants as C
from .protocol.client import tcp_reachable, resolve_host, send_ssl2_client_hello
from .protocol.probes import (
probe_tls_version,
probe_cipher,
get_peer_certificate_chain,
alpn_negotiate,
)
from .protocol.kex import detect_named_groups
from .certs.parse import parse_certificate
from .certs.trust import verify_across_platforms
from .certs.pwnedkeys import check_spki
from .certs.revocation import check_ocsp, check_crl
from .hsts import fetch_hsts
from .preload.check import check_preload
from .caa import lookup_caa
from .http import probe_http
from .http3 import probe_http3
from .vulns.heartbleed import probe as probe_heartbleed
from .vulns.ccs import probe as probe_ccs
from .vulns.renego import probe_secure_renegotiation, probe_fallback_scsv
from .vulns.analysis import analyze as analyze_vulns
from .handshake_sim import simulate as simulate_handshakes
# Callback signatures used by the scan engine; arguments mirror
# (phase, detail, progress, severity) as passed at the report() call sites.
ReportProgress = Callable[[str, str, float, str], Awaitable[None]]
ReportFinding = Callable[[Finding], Awaitable[None]]

# Finding groups: reliability (信頼性), safety (安全性),
# vulnerabilities (脆弱性), compatibility (互換性)
G_REL = "reliability"
G_SAF = "safety"
G_VLN = "vulnerabilities"
G_CMP = "compatibility"
def parse_target(target: str) -> tuple[str, int, str | None]:
    """Return (host, port, sni). sni=None means the target is an IP literal."""
    text = target.strip()
    host, port = text, 443
    if text.startswith("["):
        # Bracketed IPv6 literal, optionally followed by ":port".
        close = text.rfind("]")
        if close != -1:
            host = text[1:close]
            suffix = text[close + 1:]
            if suffix.startswith(":"):
                try:
                    port = int(suffix[1:])
                except ValueError:
                    port = 443
    elif text.count(":") == 1:
        # Plain "host:port" form; a bad port falls back to 443.
        host, _, port_text = text.partition(":")
        try:
            port = int(port_text)
        except ValueError:
            port = 443
    host = host.strip()
    # IP literals get no SNI; hostnames are their own SNI.
    try:
        ipaddress.ip_address(host)
        return host, port, None
    except ValueError:
        return host, port, host
def validate_tls_target(raw: str) -> str | None:
    """Return the cleaned target string if it looks scannable, else None.

    Accepts hostnames (optionally ":port"), IP literals, and bracketed IPv6
    forms; rejects empty/overlong input, bad ports, and malformed DNS labels.
    """
    import re

    candidate = (raw or "").strip()
    if not candidate or len(candidate) > 255:
        return None
    # Whole-string character allowlist before any parsing.
    if not re.compile(r"^[A-Za-z0-9._:\[\]\-]{1,255}$").match(candidate):
        return None
    try:
        host, port, _ = parse_target(candidate)
    except Exception:
        return None
    if not host or not (0 < port <= 65535):
        return None
    # IP literals need no label-level validation.
    try:
        ipaddress.ip_address(host)
        return candidate
    except ValueError:
        pass
    labels = host.split(".")
    if not labels or any(not lbl for lbl in labels):
        return None
    for lbl in labels:
        if len(lbl) > 63 or lbl.startswith("-") or lbl.endswith("-"):
            return None
        if not re.match(r"^[A-Za-z0-9\-]+$", lbl):
            return None
    # Single-label names are only allowed for localhost.
    if len(labels) < 2 and host != "localhost":
        return None
    return candidate
async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResult) -> None:
    """Run every scan phase against result.host:result.port.

    Streams progress via *report* and individual findings via *finds*, while
    accumulating structured per-phase data into ``result.data`` and the
    findings themselves into *result*.  Phases run sequentially; each phase
    isolates its own failures so later phases still execute.
    """
    host = result.host
    port = result.port
    sni = result.data.get("sni")
    async def emit(f: Finding) -> None:
        # Record the finding on the result AND forward it to the live callback.
        result.add(f)
        await finds(f)
    # ---- Phase: protocol versions ----
    await report("protocols", "TLS/SSL バージョンを検査中", 0.05, "info")
    versions_to_probe = [C.TLS_1_0, C.TLS_1_1, C.TLS_1_2, C.TLS_1_3]
    version_support: dict[int, bool] = {}
    version_cipher: dict[int, str | None] = {}
    for v in versions_to_probe:
        pr = await probe_tls_version(host, port, sni, v)
        version_support[v] = pr.supported
        version_cipher[v] = pr.negotiated_cipher
    # SSLv2: a connection that returns payload bytes without an alert counts as support.
    sslv2 = await send_ssl2_client_hello(host, port)
    sslv2_supported = sslv2.connected and sslv2.alert is None and bool(sslv2.raw)
    from .protocol.client import send_client_hello
    from .protocol import wire
    try:
        ssl3 = await send_client_hello(
            host, port,
            record_version=C.SSL_3_0,
            client_hello_version=C.SSL_3_0,
            cipher_suites=[0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
            extensions=(wire.ext_server_name(sni) if sni else b""),
            sni=sni,
        )
        # SSLv3 counts only when the server actually echoes SSL 3.0 back.
        sslv3_supported = (
            ssl3.connected
            and ssl3.server_hello is not None
            and ssl3.server_hello.alert is None
            and ssl3.server_hello.server_version == C.SSL_3_0
        )
    except Exception:
        sslv3_supported = False
    version_support[C.SSL_2_0] = sslv2_supported
    version_support[C.SSL_3_0] = sslv3_supported
    versions_supported = {v for v, ok in version_support.items() if ok}
    # Emit versions in chronological order — SSL 2.0, SSL 3.0, then TLS 1.0+.
    # Reads left-to-right in the results table the way humans describe it.
    ordered_versions = [C.SSL_2_0, C.SSL_3_0, C.TLS_1_0, C.TLS_1_1, C.TLS_1_2, C.TLS_1_3]
    result.data["versions"] = {
        C.PROTOCOL_NAMES.get(v, f"0x{v:04x}"): version_support.get(v, False)
        for v in ordered_versions
    }
    # SSL 2/3 → vulnerability findings; also record them as safety findings.
    if sslv2_supported:
        await emit(Finding("SSL/TLS Version", "SSL 2.0 supported", "SSLv2 は完全に破綻しています (DROWN)", "serious", 10, group=G_SAF))
    else:
        await emit(Finding("SSL/TLS Version", "SSL 2.0 disabled", "", "good", 0, group=G_SAF))
    if sslv3_supported:
        await emit(Finding("SSL/TLS Version", "SSL 3.0 supported", "POODLE 攻撃が可能", "serious", 10, group=G_SAF))
    else:
        await emit(Finding("SSL/TLS Version", "SSL 3.0 disabled", "", "good", 0, group=G_SAF))
    if version_support.get(C.TLS_1_0):
        await emit(Finding("SSL/TLS Version", "TLS 1.0 supported", "RFC 8996 で廃止済み", "notgood", 4, group=G_SAF))
    else:
        await emit(Finding("SSL/TLS Version", "TLS 1.0 disabled", "", "good", 0, group=G_SAF))
    if version_support.get(C.TLS_1_1):
        await emit(Finding("SSL/TLS Version", "TLS 1.1 supported", "RFC 8996 で廃止済み", "notgood", 2, group=G_SAF))
    else:
        await emit(Finding("SSL/TLS Version", "TLS 1.1 disabled", "", "good", 0, group=G_SAF))
    if version_support.get(C.TLS_1_2):
        await emit(Finding("SSL/TLS Version", "TLS 1.2 supported", "", "good", 0, group=G_SAF))
    else:
        await emit(Finding("SSL/TLS Version", "TLS 1.2 not supported", "推奨プロトコルが有効化されていません", "notgood", 3, group=G_SAF))
    if version_support.get(C.TLS_1_3):
        await emit(Finding("SSL/TLS Version", "TLS 1.3 supported", "", "good", 0, group=G_SAF))
    else:
        await emit(Finding("SSL/TLS Version", "TLS 1.3 not supported", "最新プロトコルが有効化されていません", "notgood", 3, group=G_SAF))
    if not version_support.get(C.TLS_1_2) and not version_support.get(C.TLS_1_3):
        await emit(Finding("SSL/TLS Version", "No modern TLS (1.2/1.3) supported", "", "serious", 10, group=G_SAF))
    # ---- Phase: cipher enumeration ----
    await report("ciphers", "暗号スイートを列挙中", 0.20, "info")
    accepted_per_version: dict[int, list[int]] = {}
    # Candidate suites per protocol version, probed one at a time below.
    probe_ciphers_per_version = {
        C.TLS_1_3: [0x1301, 0x1302, 0x1303, 0x1304, 0x1305],
        C.TLS_1_2: [
            0xc02b, 0xc02c, 0xc02f, 0xc030, 0xcca8, 0xcca9,
            0xc013, 0xc014, 0xc009, 0xc00a, 0xc027, 0xc028,
            0x009c, 0x009d, 0x009e, 0x009f, 0xccaa,
            0x002f, 0x0035, 0x003c, 0x003d,
            0x000a, 0xc012, 0x0016,
            0x0004, 0x0005, 0xc011, 0xc007,
            0x0001, 0x0002, 0x003b,
            0x0008, 0x0014,
            0x0018, 0x0034,
        ],
        C.TLS_1_1: [0xc013, 0xc014, 0xc009, 0xc00a, 0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
        C.TLS_1_0: [0xc013, 0xc014, 0xc009, 0xc00a, 0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
    }
    for v in (C.TLS_1_3, C.TLS_1_2, C.TLS_1_1, C.TLS_1_0):
        if not version_support.get(v):
            continue
        accepted: list[int] = []
        for cid in probe_ciphers_per_version[v]:
            name = C.CIPHER_SUITES.get(cid, f"0x{cid:04x}")
            try:
                probe = await probe_cipher(host, port, sni, v, cid, name)
            except Exception:
                # A single failed probe must not abort the enumeration.
                continue
            if probe.supported:
                accepted.append(cid)
        accepted_per_version[v] = accepted
    result.data["ciphers"] = {
        C.PROTOCOL_NAMES.get(v, f"0x{v:04x}"): [C.CIPHER_SUITES.get(c, f"0x{c:04x}") for c in cids]
        for v, cids in accepted_per_version.items()
    }
    any_accepted = {cid for cids in accepted_per_version.values() for cid in cids}
    if any_accepted:
        has_fs = False
        has_aead = False
        has_cbc_modern = False
        has_rsa_kex = False
        for cid in any_accepted:
            name = C.CIPHER_SUITES.get(cid, "")
            if C.cipher_has_fs(name):
                has_fs = True
            if C.cipher_is_aead(name):
                has_aead = True
            if C.cipher_is_cbc(name):
                has_cbc_modern = True
            if name.startswith("TLS_RSA_") and not C.cipher_is_weak(name):
                has_rsa_kex = True
        if has_fs:
            await emit(Finding("Cipher Strength", "Forward secrecy supported", "ECDHE/DHE ciphers 利用可", "good", 0, group=G_SAF))
        else:
            await emit(Finding("Cipher Strength", "No forward secrecy", "ECDHE/DHE が有効になっていない", "bad", 5, group=G_SAF))
        if has_aead:
            await emit(Finding("Cipher Strength", "AEAD ciphers supported", "GCM / ChaCha20-Poly1305 / CCM 利用可", "good", 0, group=G_SAF))
        else:
            await emit(Finding("Cipher Strength", "No AEAD ciphers", "", "notgood", 2, group=G_SAF))
        if has_rsa_kex:
            await emit(Finding("Cipher Strength", "RSA key exchange accepted",
                               "鍵交換に RSA_WITH_* が利用可能 — 前方秘匿性なし", "notgood", 2, group=G_SAF))
        if has_cbc_modern:
            await emit(Finding("Cipher Strength", "CBC ciphers in accepted list",
                               "LUCKY13 など CBC ベース攻撃のリスク", "info", 0, group=G_SAF))
    # ---- Phase: key exchange / groups ----
    await report("kex", "鍵交換グループを検査中", 0.35, "info")
    groups_accepted: list[int] = []
    if version_support.get(C.TLS_1_3):
        try:
            groups_accepted = await detect_named_groups(host, port, sni, C.TLS_1_3)
        except Exception:
            groups_accepted = []
    result.data["named_groups"] = [C.NAMED_GROUPS.get(g, f"0x{g:04x}") for g in groups_accepted]
    if groups_accepted:
        strong = [g for g in groups_accepted if g in (0x001d, 0x0017, 0x0018, 0x0019, 0x001e)]
        pqc = [g for g in groups_accepted if g in C.PQC_GROUPS]
        weak = [g for g in groups_accepted if g in (0x0100,)]
        if strong:
            await emit(Finding("Key Exchange", "Modern named groups supported",
                               ", ".join(C.NAMED_GROUPS.get(g, "") for g in strong), "good", 0, group=G_SAF))
        if pqc:
            await emit(Finding("Key Exchange", "Post-quantum key exchange supported",
                               ", ".join(C.NAMED_GROUPS.get(g, "") for g in pqc), "good", 0, group=G_SAF))
        else:
            await emit(Finding("Key Exchange", "No post-quantum key exchange",
                               "X25519MLKEM768 等の PQC 鍵交換に対応していません", "info", 0, group=G_SAF))
        if weak:
            await emit(Finding("Key Exchange", "Weak FFDHE group supported",
                               ", ".join(C.NAMED_GROUPS.get(g, "") for g in weak), "notgood", 3, group=G_SAF))
    elif version_support.get(C.TLS_1_3):
        await emit(Finding("Key Exchange", "Named groups probe inconclusive", "", "info", 0, group=G_SAF))
    # ---- Phase: certificates ----
    await report("cert", "証明書を取得・解析中", 0.45, "info")
    chain_der = await get_peer_certificate_chain(host, port, sni)
    parsed_chain = []
    leaf_cert = None
    leaf_summary = None
    if chain_der:
        for idx, der in enumerate(chain_der):
            try:
                c, s = parse_certificate(der)
                parsed_chain.append(s)
                if idx == 0:
                    # First element of the served chain is treated as the leaf.
                    leaf_cert = c
                    leaf_summary = s
            except Exception:
                continue
    result.data["certificate_chain"] = [
        {
            "subject": s.subject,
            "issuer": s.issuer,
            "common_name": s.common_name,
            "sans": s.sans,
            "not_before": s.not_before,
            "not_after": s.not_after,
            "days_until_expiry": s.days_until_expiry,
            "is_expired": s.is_expired,
            "is_self_signed": s.is_self_signed,
            "signature_hash_algorithm": s.signature_hash_algorithm,
            "public_key_algorithm": s.public_key_algorithm,
            "public_key_size_bits": s.public_key_size_bits,
            "public_key_curve": s.public_key_curve,
            "serial": s.serial,
            "has_scts": s.has_scts,
            "sct_count": s.sct_count,
            "sha256_fingerprint": s.sha256_fingerprint,
            "spki_sha256": s.spki_sha256,
            "ocsp_urls": s.ocsp_urls,
            "crl_urls": s.crl_urls,
        }
        for s in parsed_chain
    ]
    if not leaf_summary:
        await emit(Finding("Certificate", "No certificate retrieved", "TLS ハンドシェイクが失敗", "serious", 10, group=G_REL))
    else:
        # Expiry
        if leaf_summary.is_expired:
            await emit(Finding("Certificate Validity", "Certificate expired",
                               f"not_after={leaf_summary.not_after}", "serious", 10, group=G_REL))
        elif leaf_summary.days_until_expiry < 15:
            await emit(Finding("Certificate Validity", "Certificate expires soon",
                               f"残り {leaf_summary.days_until_expiry}", "bad", 4, group=G_REL))
        elif leaf_summary.days_until_expiry < 30:
            await emit(Finding("Certificate Validity", "Certificate expiring in <30 days",
                               f"残り {leaf_summary.days_until_expiry}", "notgood", 2, group=G_REL))
        else:
            await emit(Finding("Certificate Validity", "Certificate validity OK",
                               f"残り {leaf_summary.days_until_expiry}", "good", 0, group=G_REL))
        # Chain length
        if len(parsed_chain) == 1 and not leaf_summary.is_self_signed:
            await emit(Finding("Certificate Chain", "Intermediate certificate not served",
                               "サーバーがチェーンに中間証明書を含めていません", "notgood", 2, group=G_REL))
        elif len(parsed_chain) >= 2:
            await emit(Finding("Certificate Chain", f"Chain length: {len(parsed_chain)} certs",
                               f"leaf → {parsed_chain[-1].subject[:80]}", "good", 0, group=G_REL))
        # Self-signed
        if leaf_summary.is_self_signed:
            await emit(Finding("Certificate Chain", "Self-signed certificate",
                               "CA 署名ではなく自己署名です", "serious", 9, group=G_REL))
        # Signature hash (safety — cryptographic strength)
        sh = (leaf_summary.signature_hash_algorithm or "").lower()
        if sh in ("md5", "md2"):
            await emit(Finding("Signature Algorithm", f"Weak signature hash: {sh.upper()}", "", "serious", 10, group=G_SAF))
        elif sh == "sha1":
            await emit(Finding("Signature Algorithm", "Weak signature hash: SHA1", "", "bad", 5, group=G_SAF))
        elif sh:
            await emit(Finding("Signature Algorithm", f"Signature hash: {sh.upper()}", "", "good", 0, group=G_SAF))
        # Public key strength (safety)
        alg = leaf_summary.public_key_algorithm
        bits = leaf_summary.public_key_size_bits
        if alg == "RSA":
            if bits < 1024:
                await emit(Finding("Key Strength", f"Very weak RSA key ({bits}-bit)", "", "serious", 10, group=G_SAF))
            elif bits < 2048:
                await emit(Finding("Key Strength", f"Weak RSA key ({bits}-bit)", "", "bad", 5, group=G_SAF))
            elif bits < 3072:
                await emit(Finding("Key Strength", f"RSA {bits}-bit", "推奨は 3072-bit 以上", "good", 0, group=G_SAF))
            else:
                await emit(Finding("Key Strength", f"RSA {bits}-bit", "", "good", 0, group=G_SAF))
        elif alg in ("EC", "Ed25519", "Ed448"):
            await emit(Finding("Key Strength",
                               f"{alg} {bits}-bit {leaf_summary.public_key_curve or ''}".strip(),
                               "", "good", 0, group=G_SAF))
        # CT / SCT (reliability — verifiable audit trail)
        if leaf_summary.has_scts:
            await emit(Finding("Transparency", f"Certificate Transparency SCTs present ({leaf_summary.sct_count})",
                               "", "good", 0, group=G_REL))
        else:
            await emit(Finding("Transparency", "No embedded SCTs",
                               "Certificate Transparency ログが埋め込まれていません", "notgood", 1, group=G_REL))
        # Hostname match (reliability — identity verification)
        if sni:
            names = [leaf_summary.common_name] if leaf_summary.common_name else []
            names += leaf_summary.sans
            matched = False
            for n in names:
                if not n:
                    continue
                n = n.lower().strip()
                h = sni.lower().strip()
                if n == h:
                    matched = True
                    break
                # Single-level wildcard: "*.example.com" matches "a.example.com".
                if n.startswith("*.") and "." in h and h.split(".", 1)[1] == n[2:]:
                    matched = True
                    break
            if matched:
                await emit(Finding("Hostname", "Hostname matches certificate", "", "good", 0, group=G_REL))
            else:
                await emit(Finding("Hostname", "Hostname does not match certificate",
                                   f"SNI={sni}, CN={leaf_summary.common_name}", "serious", 8, group=G_REL))
        else:
            await emit(Finding("Hostname", "IP literal target: hostname validation skipped",
                               "IP 指定のため SNI/証明書名一致の判定を行っていません", "notgood", 2, group=G_REL))
    # ---- Phase: trust stores ----
    await report("trust", "5 プラットフォームのトラストストアを照合中", 0.55, "info")
    try:
        trust_results = await verify_across_platforms(host, port, sni)
    except Exception as e:
        trust_results = []
        await emit(Finding("Trust Store", "Trust evaluation failed", str(e), "info", 0, group=G_REL))
    result.data["trust"] = [
        {"platform": t.platform, "trusted": t.trusted, "error": t.error} for t in trust_results
    ]
    untrusted = [t for t in trust_results if not t.trusted]
    trusted = [t for t in trust_results if t.trusted]
    if trust_results and not untrusted:
        await emit(Finding("Trust Store", "Trusted on all platforms",
                           ", ".join(t.platform for t in trusted), "good", 0, group=G_REL))
    else:
        for t in untrusted:
            await emit(Finding("Trust Store", f"Not trusted by {t.platform}",
                               t.error or "", "serious", 6, group=G_REL))
        for t in trusted:
            await emit(Finding("Trust Store", f"Trusted by {t.platform}", "", "good", 0, group=G_REL))
    # ---- Phase: revocation ----
    await report("revocation", "証明書失効を確認中", 0.62, "info")
    # OCSP needs the issuer certificate, so it requires a served chain of >= 2.
    if leaf_cert and len(chain_der) >= 2:
        try:
            issuer_cert, _ = parse_certificate(chain_der[1])
            ocsp_res = await check_ocsp(leaf_cert, issuer_cert)
            result.data["ocsp"] = {
                "checked": ocsp_res.checked,
                "revoked": ocsp_res.revoked,
                "reason": ocsp_res.reason,
                "source": ocsp_res.source,
                "error": ocsp_res.error,
            }
            if ocsp_res.checked and ocsp_res.revoked:
                await emit(Finding("Revocation", "Certificate revoked (OCSP)",
                                   ocsp_res.reason or "", "serious", 10, group=G_REL))
            elif ocsp_res.checked:
                await emit(Finding("Revocation", "OCSP: certificate is not revoked",
                                   ocsp_res.source or "", "good", 0, group=G_REL))
            elif ocsp_res.error:
                await emit(Finding("Revocation", "OCSP check inconclusive",
                                   ocsp_res.error, "info", 0, group=G_REL))
        except Exception as e:
            result.data["ocsp"] = {"error": str(e)}
    if leaf_cert:
        try:
            crl_res = await check_crl(leaf_cert)
            result.data["crl"] = {
                "checked": crl_res.checked,
                "revoked": crl_res.revoked,
                "source": crl_res.source,
                "error": crl_res.error,
            }
            if crl_res.checked and crl_res.revoked:
                await emit(Finding("Revocation", "Certificate revoked (CRL)",
                                   crl_res.source, "serious", 10, group=G_REL))
        except Exception as e:
            result.data["crl"] = {"error": str(e)}
    # ---- Phase: pwnedkeys ----
    if leaf_summary and leaf_summary.spki_sha256:
        await report("pwnedkeys", "pwnedkeys.com で鍵漏洩を確認中", 0.66, "info")
        try:
            pwned, err = await check_spki(leaf_summary.spki_sha256)
            result.data["pwnedkeys"] = {"pwned": pwned, "error": err}
            if pwned:
                await emit(Finding("Key Exposure", "Private key is publicly known (pwnedkeys)",
                                   "この公開鍵に対応する秘密鍵は既に漏洩しています", "serious", 10, group=G_REL))
            elif err is None:
                await emit(Finding("Key Exposure", "Private key not listed in pwnedkeys", "",
                                   "good", 0, group=G_REL))
        except Exception as e:
            result.data["pwnedkeys"] = {"error": str(e)}
    # ---- Phase: HSTS ----
    await report("hsts", "HSTS 設定を確認中", 0.70, "info")
    try:
        hsts = await fetch_hsts(host, port)
    except Exception as e:
        hsts = None
        result.data["hsts"] = {"error": str(e)}
    if hsts is not None:
        result.data["hsts"] = {
            "present": hsts.present,
            "max_age": hsts.max_age,
            "include_subdomains": hsts.include_subdomains,
            "preload": hsts.preload,
            "raw": hsts.raw,
            "error": hsts.error,
        }
        if hsts.present:
            if hsts.max_age >= 15552000:
                await emit(Finding("HSTS", "HSTS enabled with sufficient max-age",
                                   f"max-age={hsts.max_age}", "good", 0, group=G_REL))
            elif hsts.max_age > 0:
                await emit(Finding("HSTS", "HSTS max-age too short",
                                   f"max-age={hsts.max_age} (推奨 >= 15552000)", "notgood", 2, group=G_REL))
            if hsts.include_subdomains:
                await emit(Finding("HSTS", "HSTS includeSubDomains set", "", "good", 0, group=G_REL))
            else:
                await emit(Finding("HSTS", "HSTS includeSubDomains not set", "", "info", 0, group=G_REL))
        else:
            await emit(Finding("HSTS", "HSTS header missing", "", "notgood", 3, group=G_REL))
    # ---- Phase: HSTS preload ----
    if sni:
        await report("preload", "HSTS preload リストを確認中 (Chrome/Firefox/Edge/IE)", 0.74, "info")
        try:
            preload_results = await check_preload(sni)
        except Exception as e:
            preload_results = []
            result.data["preload_error"] = str(e)
        result.data["preload"] = [
            {"browser": p.browser, "listed": p.listed, "include_subdomains": p.include_subdomains,
             "error": p.source_error}
            for p in preload_results
        ]
        listed_any = [p for p in preload_results if p.listed]
        not_listed = [p for p in preload_results if not p.listed and not p.source_error]
        if listed_any:
            await emit(Finding("HSTS Preload", "Listed in HSTS preload list",
                               ", ".join(p.browser for p in listed_any), "good", 0, group=G_REL))
        if not_listed and not listed_any:
            await emit(Finding("HSTS Preload", "Not on HSTS preload list",
                               ", ".join(p.browser for p in not_listed), "info", 0, group=G_REL))
    # ---- Phase: CAA ----
    if sni:
        await report("caa", "CAA レコードを検査中", 0.77, "info")
        try:
            caa = await lookup_caa(sni)
        except Exception as e:
            caa = None
            result.data["caa"] = {"error": str(e)}
        if caa is not None:
            result.data["caa"] = {
                "effective_host": caa.effective_host,
                "records": caa.records,
                "error": caa.error,
            }
            if caa.records:
                await emit(Finding("CAA", f"CAA records present ({caa.effective_host})",
                                   "; ".join(caa.records), "good", 0, group=G_REL))
            else:
                await emit(Finding("CAA", "No CAA records",
                                   "CAA が設定されていません", "notgood", 1, group=G_REL))
    # ---- Phase: HTTP ----
    await report("http", "HTTP/1/2 を検査中", 0.80, "info")
    try:
        http_info = await probe_http(host, port)
    except Exception as e:
        http_info = None
        result.data["http_error"] = str(e)
    if http_info is not None:
        result.data["http"] = {
            "http1": http_info.http1,
            "http1_status": http_info.http1_status,
            "http2": http_info.http2,
            "http3_via_alt_svc": http_info.http3_via_alt_svc,
            "alt_svc": http_info.alt_svc,
            "server": http_info.server,
            "content_encoding": http_info.content_encoding,
            "compression_enabled": http_info.compression_enabled,
            "valid_http_response": http_info.valid_http_response,
            "error": http_info.error,
        }
        if not http_info.valid_http_response:
            await emit(Finding("HTTP", "No valid HTTP response",
                               http_info.error or "", "bad", 6, group=G_CMP))
        else:
            if http_info.http1:
                await emit(Finding("HTTP", "HTTP/1.1 supported", "", "good", 0, group=G_CMP))
            if http_info.http2:
                await emit(Finding("HTTP", "HTTP/2 supported", "", "good", 0, group=G_CMP))
            else:
                await emit(Finding("HTTP", "HTTP/2 not supported", "", "notgood", 1, group=G_CMP))
            if http_info.compression_enabled:
                await emit(Finding("HTTP Compression",
                                   "HTTP compression enabled (BREACH risk)",
                                   f"content-encoding={http_info.content_encoding}",
                                   "notgood", 2, group=G_VLN))
            else:
                await emit(Finding("HTTP Compression", "HTTP compression disabled",
                                   "BREACH の可能性なし", "good", 0, group=G_VLN))
    # ---- Phase: HTTP/3 ----
    await report("http3", "HTTP/3 (QUIC) を検査中", 0.84, "info")
    try:
        h3 = await probe_http3(host, port, sni)
    except Exception as e:
        h3 = None
        result.data["http3_error"] = str(e)
    if h3 is not None:
        result.data["http3"] = {"supported": h3.supported, "error": h3.error}
        if h3.supported:
            await emit(Finding("HTTP/3", "HTTP/3 supported", "", "good", 0, group=G_CMP))
        else:
            await emit(Finding("HTTP/3", "HTTP/3 not supported", h3.error or "", "info", 0, group=G_CMP))
    # ---- Phase: ALPN ----
    try:
        alpn = await alpn_negotiate(host, port, sni, ["h2", "http/1.1"])
    except Exception:
        alpn = None
    if alpn:
        result.data["alpn"] = alpn
        await emit(Finding("ALPN", f"ALPN negotiated: {alpn}", "", "good", 0, group=G_CMP))
    # ---- Phase: vulnerabilities (passive + active) ----
    await report("vulns", "脆弱性を検査中", 0.88, "info")
    # Passive analysis: derive vuln flags from the versions/ciphers gathered above.
    vp = analyze_vulns(versions_supported, accepted_per_version)
    if vp.drown:
        await emit(Finding("DROWN", "DROWN (SSLv2 exposure)", "SSLv2 が有効", "serious", 10, group=G_VLN))
    if vp.poodle_ssl:
        await emit(Finding("POODLE", "POODLE (SSLv3 CBC)", "", "serious", 10, group=G_VLN))
    if vp.beast:
        await emit(Finding("BEAST", "BEAST (TLS 1.0 CBC)", "", "notgood", 3, group=G_VLN))
    if vp.sweet32:
        await emit(Finding("SWEET32", "SWEET32 (3DES)",
                           "3DES が accepted cipher list にあり", "bad", 4, group=G_VLN))
    if vp.rc4:
        await emit(Finding("RC4", "RC4 enabled", "", "serious", 7, group=G_VLN))
    if vp.freak:
        await emit(Finding("FREAK", "FREAK (EXPORT RSA)", "", "serious", 10, group=G_VLN))
    if vp.logjam_export:
        await emit(Finding("LOGJAM", "LOGJAM (EXPORT DHE)", "", "serious", 10, group=G_VLN))
    if vp.null_cipher:
        await emit(Finding("NULL cipher", "NULL cipher enabled", "", "serious", 10, group=G_VLN))
    if vp.anon_cipher:
        await emit(Finding("Anon cipher", "Anonymous cipher enabled", "", "serious", 10, group=G_VLN))
    if vp.lucky13:
        await emit(Finding("LUCKY13", "LUCKY13 (CBC)", "", "notgood", 2, group=G_VLN))
    # If no passive vulns detected across the major ones, surface a positive finding.
    none_of_the_above = not any([
        vp.drown, vp.poodle_ssl, vp.beast, vp.sweet32, vp.rc4,
        vp.freak, vp.logjam_export, vp.null_cipher, vp.anon_cipher,
    ])
    if none_of_the_above and any_accepted:
        await emit(Finding("Cipher Vulnerabilities",
                           "No known cipher-family vulnerabilities",
                           "DROWN / POODLE / FREAK / LOGJAM / RC4 / 3DES / NULL / anon いずれも該当なし",
                           "good", 0, group=G_VLN))
    # Active probes
    try:
        hb = await probe_heartbleed(host, port, sni)
        result.data["heartbleed"] = {
            "vulnerable": hb.vulnerable,
            "heartbeat_extension": hb.heartbeat_extension_advertised,
            "error": hb.error,
        }
        if hb.vulnerable:
            await emit(Finding("Heartbleed", "Heartbleed (CVE-2014-0160)", "", "serious", 10, group=G_VLN))
        elif hb.heartbeat_extension_advertised:
            await emit(Finding("Heartbleed", "Heartbeat extension advertised but not exploitable",
                               "", "notgood", 1, group=G_VLN))
        else:
            await emit(Finding("Heartbleed", "Not vulnerable to Heartbleed", "", "good", 0, group=G_VLN))
    except Exception as e:
        result.data["heartbleed"] = {"error": str(e)}
    try:
        ccs_vuln, ccs_msg = await probe_ccs(host, port, sni)
        result.data["ccs_injection"] = {"vulnerable": ccs_vuln, "detail": ccs_msg}
        if ccs_vuln:
            await emit(Finding("CCS Injection", "CCS Injection (CVE-2014-0224)", "", "serious", 10, group=G_VLN))
        else:
            await emit(Finding("CCS Injection", "Not vulnerable to CCS Injection", "", "good", 0, group=G_VLN))
    except Exception as e:
        result.data["ccs_injection"] = {"error": str(e)}
    try:
        renego_ok, renego_info = await probe_secure_renegotiation(host, port, sni)
        result.data["secure_renegotiation"] = {"supported": renego_ok, "detail": renego_info}
        if renego_ok:
            detail = renego_info or "RFC 5746 renegotiation_info extension acknowledged"
            await emit(Finding("Renegotiation", "Secure renegotiation supported", detail, "good", 0, group=G_VLN))
        else:
            await emit(Finding("Renegotiation", "Secure renegotiation not supported",
                               renego_info or "", "bad", 4, group=G_VLN))
    except Exception as e:
        result.data["secure_renegotiation"] = {"error": str(e)}
    try:
        scsv_ok, scsv_info = await probe_fallback_scsv(host, port, sni)
        result.data["fallback_scsv"] = {"supported": scsv_ok, "detail": scsv_info}
        if scsv_ok:
            detail = scsv_info or "TLS 1.1 ClientHello with SCSV rejected with inappropriate_fallback"
            await emit(Finding("Fallback Protection", "TLS_FALLBACK_SCSV enforced", detail, "good", 0, group=G_VLN))
        else:
            await emit(Finding("Fallback Protection", "TLS_FALLBACK_SCSV not enforced",
                               scsv_info or "", "notgood", 2, group=G_VLN))
    except Exception as e:
        result.data["fallback_scsv"] = {"error": str(e)}
    # ---- Phase: handshake simulation ----
    await report("handshake_sim", "代表的なクライアントとのハンドシェイクを模擬中", 0.95, "info")
    try:
        sim = await simulate_handshakes(host, port, sni)
    except Exception as e:
        sim = []
        result.data["handshake_sim_error"] = str(e)
    result.data["handshake_simulation"] = [
        {
            "client": s.client,
            "connected": s.connected,
            "negotiated_version": s.negotiated_version,
            "negotiated_cipher": s.negotiated_cipher,
            "error": s.error,
        }
        for s in sim
    ]
    if sim:
        ok_count = sum(1 for s in sim if s.connected)
        total = len(sim)
        # Emit per-client findings so the Compatibility tab has granular entries.
        for s in sim:
            if s.connected:
                await emit(Finding("Client Handshake",
                                   f"{s.client}: OK",
                                   f"{s.negotiated_version} / {s.negotiated_cipher}",
                                   "good", 0, group=G_CMP))
            else:
                await emit(Finding("Client Handshake",
                                   f"{s.client}: failed",
                                   s.error or "",
                                   "info", 0, group=G_CMP))
        await emit(Finding("Client Handshake",
                           f"Handshake summary: {ok_count}/{total} clients connected",
                           "", "info", 0, group=G_CMP))
async def run_full_scan(target: str, report: ReportProgress, finds: ReportFinding) -> ScanResult:
    """Top-level scan driver.

    Parses *target*, verifies DNS resolution and TCP reachability, then runs
    the full phase pipeline via _gather.  Always returns a ranked ScanResult —
    connectivity failures produce a result with a single serious finding
    instead of raising.
    """
    host, port, sni = parse_target(target)
    started = time.time()
    result = ScanResult(target=target, host=host, port=port, started_at=started)
    result.data["sni"] = sni
    # Track the current phase slug so each emitted Finding can be stamped with
    # the step that produced it (used in the UI log as "[step]").
    _step = {"name": ""}
    _orig_report = report
    _orig_finds = finds
    # Shadow the callbacks with wrappers: `report` records the phase name,
    # `finds` stamps it onto findings that do not already carry one.
    async def report(phase: str, detail: str, progress: float, severity: str = "info") -> None:  # type: ignore[no-redef]
        _step["name"] = phase
        await _orig_report(phase, detail, progress, severity)
    async def finds(f: Finding) -> None:  # type: ignore[no-redef]
        if not f.step:
            f.step = _step["name"]
        await _orig_finds(f)
    await report("init", f"対象: {target} (host={host}, port={port})", 0.01, "info")
    resolved = await resolve_host(host)
    if resolved is None:
        await report("dns", "名前解決に失敗しました", 1.0, "serious")
        result.error = "dns_fail"
        result.finished_at = time.time()
        apply_rank(result)
        # Force the worst possible grade regardless of what apply_rank computed.
        result.score = 0.0
        result.rank = "R"
        result.add(Finding("Connectivity", "DNS resolution failed", host, "serious", 10, group=G_REL))
        await finds(result.findings[-1])
        return result
    result.data["resolved_ip"] = resolved
    reachable = await tcp_reachable(host, port)
    if not reachable:
        await report("tcp", f"TCP {host}:{port} に接続できません", 1.0, "serious")
        result.error = "no_tls"
        result.add(Finding("Connectivity", "TCP port unreachable", f"{host}:{port}", "serious", 10, group=G_REL))
        await finds(result.findings[-1])
        result.finished_at = time.time()
        apply_rank(result)
        return result
    try:
        await _gather(report, finds, result)
    except asyncio.CancelledError:
        raise
    except Exception as e:
        # Surface engine errors as a finding; partial results are still ranked.
        result.error = f"{e.__class__.__name__}: {e}"
        result.add(Finding("Engine", "Scan error", str(e), "serious", 5, group=G_REL))
        await finds(result.findings[-1])
    result.finished_at = time.time()
    apply_rank(result)
    await report("done", f"スコア={result.score} ランク={result.rank}", 1.0, "info")
    return result
@@ -1,160 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from .protocol import constants as C
from .protocol import wire
from .protocol.client import send_client_hello
@dataclass
class ClientProfile:
    """Static ClientHello parameters describing one simulated TLS client."""
    # Human-readable client name shown in results (e.g. "Chrome 120").
    name: str
    # ClientHello version value offered in the handshake.
    client_hello_version: int
    # Cipher suite IDs in the client's preference order.
    cipher_suites: list[int]
    versions: list[int] | None = None  # for supported_versions ext (TLS 1.3 capable)
    # Named groups offered via the supported_groups extension.
    groups: list[int] = field(default_factory=lambda: [0x001d, 0x0017, 0x0018])
# Representative client profiles, from legacy (SSLv3-era IE/Android) to
# current TLS 1.3-capable browsers and Java runtimes.  Cipher lists mirror
# each client's preference order.
PROFILES: list[ClientProfile] = [
    ClientProfile(
        name="Android 4.4.2",
        client_hello_version=C.TLS_1_0,
        cipher_suites=[0xc013, 0xc014, 0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
    ),
    ClientProfile(
        name="Android 7.0",
        client_hello_version=C.TLS_1_2,
        cipher_suites=[0xc02b, 0xc02f, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
    ),
    ClientProfile(
        name="Android 12",
        client_hello_version=C.TLS_1_2,
        versions=[C.TLS_1_3, C.TLS_1_2],
        cipher_suites=[0x1301, 0x1303, 0x1302, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d],
    ),
    ClientProfile(
        name="Chrome 49",
        client_hello_version=C.TLS_1_2,
        cipher_suites=[0xc02b, 0xc02f, 0x009c, 0x009e, 0xc00a, 0xc009, 0xc013, 0xc014, 0x0033, 0x0039, 0x002f, 0x0035, 0x000a],
    ),
    ClientProfile(
        name="Chrome 120",
        client_hello_version=C.TLS_1_2,
        versions=[C.TLS_1_3, C.TLS_1_2],
        cipher_suites=[0x1301, 0x1302, 0x1303, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc02c, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
    ),
    ClientProfile(
        name="Firefox 115",
        client_hello_version=C.TLS_1_2,
        versions=[C.TLS_1_3, C.TLS_1_2],
        cipher_suites=[0x1301, 0x1303, 0x1302, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
    ),
    ClientProfile(
        name="Safari 14",
        client_hello_version=C.TLS_1_2,
        versions=[C.TLS_1_3, C.TLS_1_2],
        cipher_suites=[0x1301, 0x1302, 0x1303, 0xc02c, 0xc02b, 0xcca9, 0xc030, 0xc02f, 0xcca8, 0xc024, 0xc023, 0xc00a, 0xc009, 0xc028, 0xc027, 0xc014, 0xc013, 0x009d, 0x009c, 0x003d, 0x003c, 0x0035, 0x002f],
    ),
    ClientProfile(
        name="IE 8 (Win XP)",
        client_hello_version=C.SSL_3_0,
        cipher_suites=[0x0004, 0x0005, 0x000a, 0x002f, 0x0035],
    ),
    ClientProfile(
        name="IE 11 (Win 8.1)",
        client_hello_version=C.TLS_1_2,
        cipher_suites=[0xc030, 0xc02f, 0xc014, 0xc013, 0x009f, 0x009e, 0x0035, 0x002f, 0x000a],
    ),
    ClientProfile(
        name="Edge 120",
        client_hello_version=C.TLS_1_2,
        versions=[C.TLS_1_3, C.TLS_1_2],
        cipher_suites=[0x1301, 0x1302, 0x1303, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc030, 0x009c, 0x009d, 0x002f, 0x0035],
    ),
    ClientProfile(
        name="Java 8u291",
        client_hello_version=C.TLS_1_2,
        cipher_suites=[0xc02b, 0xc02f, 0xc02c, 0xc030, 0xcca9, 0xcca8, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
    ),
    ClientProfile(
        name="Java 17",
        client_hello_version=C.TLS_1_2,
        versions=[C.TLS_1_3, C.TLS_1_2],
        cipher_suites=[0x1302, 0x1303, 0x1301, 0xc02c, 0xc02b, 0xc030, 0xc02f, 0xcca9, 0xcca8, 0x009d, 0x009c, 0x0035, 0x002f],
    ),
]
@dataclass
class SimResult:
    """Outcome of one simulated client handshake."""
    # Profile name this result belongs to (ClientProfile.name).
    client: str
    # True when a ServerHello was received without an alert.
    connected: bool
    # Negotiated protocol name (empty string on failure).
    negotiated_version: str = ""
    # Negotiated cipher suite name (empty string on failure).
    negotiated_cipher: str = ""
    # Failure detail when connected is False.
    error: str | None = None
async def _simulate_one(host: str, port: int, sni: str | None, p: ClientProfile) -> SimResult:
    """Send one ClientHello shaped like profile *p* and report the outcome."""
    # Build the extension list; order matters on the wire, so keep it fixed:
    # SNI (hostnames only), EC point formats, groups, sigalgs, renegotiation,
    # then the TLS 1.3 trio when the profile advertises 1.3.
    extensions: list[bytes] = []
    if sni and not _is_ip(sni):
        try:
            extensions.append(wire.ext_server_name(sni))
        except Exception:
            pass
    extensions.append(wire.ext_ec_point_formats())
    extensions.append(wire.ext_supported_groups(p.groups))
    extensions.append(wire.ext_signature_algorithms([
        0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501, 0x0603, 0x0806, 0x0601,
    ]))
    extensions.append(wire.ext_renegotiation_info_empty())
    if p.versions and C.TLS_1_3 in p.versions:
        extensions.append(wire.ext_supported_versions_client(p.versions))
        extensions.append(wire.ext_psk_key_exchange_modes())
        extensions.append(wire.ext_key_share_empty())
    try:
        res = await send_client_hello(
            host, port,
            record_version=C.TLS_1_0,
            client_hello_version=p.client_hello_version,
            cipher_suites=p.cipher_suites,
            extensions=b"".join(extensions),
            sni=sni,
        )
    except Exception as e:
        return SimResult(client=p.name, connected=False, error=f"{e.__class__.__name__}: {e}")
    hello = res.server_hello
    if not res.connected or hello is None or hello.alert is not None:
        return SimResult(client=p.name, connected=False, error=res.error or "handshake failed")
    negotiated = hello.negotiated_version or hello.server_version
    version_name = C.PROTOCOL_NAMES.get(negotiated, f"0x{negotiated:04x}")
    if hello.cipher_suite:
        cipher_name = C.CIPHER_SUITES.get(hello.cipher_suite, f"0x{hello.cipher_suite:04x}")
    else:
        cipher_name = ""
    return SimResult(
        client=p.name,
        connected=True,
        negotiated_version=version_name,
        negotiated_cipher=cipher_name,
    )
async def simulate(host: str, port: int, sni: str | None) -> list[SimResult]:
    """Simulate every client profile against host:port and collect the results.

    At most four connections are in flight at once so the target server is
    not burst-connected.
    """
    limiter = asyncio.Semaphore(4)
    async def run_one(profile: ClientProfile) -> SimResult:
        async with limiter:
            return await _simulate_one(host, port, sni, profile)
    tasks = [run_one(profile) for profile in PROFILES]
    return list(await asyncio.gather(*tasks, return_exceptions=False))
def _is_ip(host: str) -> bool:
import ipaddress
try:
ipaddress.ip_address(host.strip("[]"))
return True
except ValueError:
return False
@@ -1,46 +0,0 @@
from __future__ import annotations
import re
from dataclasses import dataclass
import httpx
@dataclass
class HstsInfo:
    """Parsed Strict-Transport-Security response header state."""
    # True when the response carried a Strict-Transport-Security header.
    present: bool = False
    # Parsed max-age directive in seconds (0 when absent or unparseable).
    max_age: int = 0
    # True when the includeSubDomains directive was present.
    include_subdomains: bool = False
    # True when the preload directive was present.
    preload: bool = False
    # Raw header value as received.
    raw: str = ""
    # Transport-level failure detail; None when the request succeeded.
    error: str | None = None
async def fetch_hsts(host: str, port: int, timeout: float = 6.0) -> HstsInfo:
    """GET / over HTTPS (certificate deliberately NOT verified) and parse the
    Strict-Transport-Security header of the response, if present."""
    if port != 443:
        url = f"https://{host}:{port}/"
    else:
        url = f"https://{host}/"
    try:
        async with httpx.AsyncClient(
            timeout=timeout,
            follow_redirects=False,
            verify=False,
            headers={"User-Agent": "nercone-tls-test/1.0"},
        ) as client:
            resp = await client.get(url)
    except Exception as e:
        # Network/TLS failure: report it without raising.
        return HstsInfo(error=f"{e.__class__.__name__}: {e}")
    header = resp.headers.get("strict-transport-security", "")
    if not header:
        return HstsInfo(present=False, raw="")
    info = HstsInfo(present=True, raw=header)
    # Directives are ';'-separated; match case-insensitively.
    for directive in (part.strip().lower() for part in header.split(";")):
        if directive == "includesubdomains":
            info.include_subdomains = True
        elif directive == "preload":
            info.preload = True
        elif directive.startswith("max-age"):
            _, _, value = directive.partition("=")
            value = value.strip().strip('"').strip("'")
            try:
                info.max_age = int(value)
            except ValueError:
                info.max_age = 0
    return info
@@ -1,82 +0,0 @@
from __future__ import annotations
import asyncio
import re
from dataclasses import dataclass, field
import httpx
@dataclass
class HttpInfo:
    """Aggregated results of the HTTP/1.1 and HTTP/2 probes."""
    # True when an HTTP/1.1 request completed.
    http1: bool = False
    # Status code of the HTTP/1.1 response, if any.
    http1_status: int | None = None
    # True when an HTTP/2 response was negotiated (or detected via ALPN fallback).
    http2: bool = False
    # True when the Alt-Svc header advertises h3.
    http3_via_alt_svc: bool = False
    # Raw Alt-Svc header value, if any.
    alt_svc: str | None = None
    # Server response header value, if any.
    server: str | None = None
    # Content-Encoding header of the HTTP/1.1 response, if any.
    content_encoding: str | None = None
    # True when any HTTP response was received at all.
    valid_http_response: bool = False
    # HTTP/1.1 request failure detail, if any.
    error: str | None = None
    # True when the response carried a Content-Encoding (compression active).
    compression_enabled: bool = False
async def probe_http(host: str, port: int, timeout: float = 6.0) -> HttpInfo:
    """Probe HTTP/1.1 and HTTP/2 over TLS and sniff HTTP/3 via Alt-Svc.

    Runs two independent requests (one pinned to HTTP/1, one to HTTP/2) with
    certificate verification disabled; each failure is contained so one
    protocol failing does not mask the other.
    """
    info = HttpInfo()
    url = f"https://{host}:{port}/" if port != 443 else f"https://{host}/"
    headers = {
        "User-Agent": "nercone-tls-test/1.0",
        "Accept-Encoding": "gzip, deflate, br",
    }
    # Pass 1: HTTP/1.x only.
    try:
        async with httpx.AsyncClient(
            timeout=timeout,
            verify=False,
            follow_redirects=False,
            headers=headers,
            http1=True,
            http2=False,
        ) as client:
            resp = await client.get(url)
            info.http1 = True
            info.http1_status = resp.status_code
            info.valid_http_response = True
            info.server = resp.headers.get("server")
            info.alt_svc = resp.headers.get("alt-svc")
            info.content_encoding = resp.headers.get("content-encoding")
            if info.content_encoding:
                info.compression_enabled = True
    except Exception as e:
        info.error = f"http1: {e.__class__.__name__}: {e}"
    # Pass 2: HTTP/2 only; fill in headers the HTTP/1 pass didn't get.
    try:
        async with httpx.AsyncClient(
            timeout=timeout,
            verify=False,
            follow_redirects=False,
            headers=headers,
            http1=False,
            http2=True,
        ) as client:
            resp = await client.get(url)
            info.http2 = (resp.http_version.upper().startswith("HTTP/2"))
            if info.http2:
                if not info.alt_svc:
                    info.alt_svc = resp.headers.get("alt-svc")
                if not info.server:
                    info.server = resp.headers.get("server")
    except ImportError:
        # httpx was installed without the [http2] extra; fall back to ALPN.
        try:
            from .protocol.probes import alpn_negotiate  # lazy import to avoid cycles
            proto = await alpn_negotiate(host, port, host, ["h2", "http/1.1"], timeout=timeout)
            if proto == "h2":
                info.http2 = True
        except Exception:
            pass
    except Exception:
        pass
    # HTTP/3 is only inferred from the advertisement, never handshaken here.
    if info.alt_svc and ("h3=" in info.alt_svc or "h3-" in info.alt_svc):
        info.http3_via_alt_svc = True
    return info
@@ -1,50 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
@dataclass
class Http3Info:
    """Outcome of the QUIC/HTTP-3 reachability probe."""
    supported: bool = False
    error: str | None = None  # reason the probe failed or was skipped
async def probe_http3(host: str, port: int, sni: str | None, timeout: float = 6.0) -> Http3Info:
    """Attempt a QUIC handshake to determine HTTP/3 availability.

    Uses aioquic if available. On any failure returns supported=False with
    the reason in ``error``; never raises.
    """
    try:
        from aioquic.asyncio.client import connect
        from aioquic.quic.configuration import QuicConfiguration
        # NOTE(review): HandshakeCompleted is imported but never used below —
        # presumably an aioquic feature/availability check; confirm before removing.
        from aioquic.quic.events import HandshakeCompleted
    except Exception as e:
        return Http3Info(supported=False, error=f"aioquic unavailable: {e}")
    info = Http3Info()
    try:
        host_for_quic = sni or host
        configuration = QuicConfiguration(
            is_client=True,
            alpn_protocols=["h3", "h3-29"],
            verify_mode=None,
            server_name=host_for_quic,
        )
        # Disable certificate verification; this is a reachability probe.
        try:
            import ssl as _ssl
            configuration.verify_mode = _ssl.CERT_NONE
        except Exception:
            pass
        async def _run():
            # aioquic ≥ 0.9 dropped the `server_name` kwarg on connect(); SNI
            # is taken from QuicConfiguration.server_name instead.
            async with connect(host, port, configuration=configuration) as client:
                await client.wait_connected()
                return True
        ok = await asyncio.wait_for(_run(), timeout=timeout)
        info.supported = bool(ok)
    except Exception as e:
        info.supported = False
        info.error = f"{e.__class__.__name__}: {e}"
    return info
@@ -1,3 +0,0 @@
from .check import check_preload, PRELOAD_BROWSERS
__all__ = ["check_preload", "PRELOAD_BROWSERS"]
@@ -1,161 +0,0 @@
from __future__ import annotations
import asyncio
import json
import re
import time
from dataclasses import dataclass
from pathlib import Path
import httpx
# On-disk cache for the fetched preload lists.
CACHE_DIR = Path.cwd() / "databases" / "hsts_preload_cache"
CACHE_TTL = 24 * 60 * 60  # seconds: re-fetch upstream lists once a day
CHROME_URL = "https://raw.githubusercontent.com/chromium/chromium/main/net/http/transport_security_state_static.json"
FIREFOX_URL = "https://hg.mozilla.org/mozilla-central/raw-file/tip/security/manager/ssl/nsSTSPreloadList.inc"
PRELOAD_BROWSERS = ["chrome", "firefox", "edge", "ie"]  # result order used by check_preload
@dataclass
class PreloadResult:
    """One browser's verdict for a host against its HSTS preload list."""
    browser: str
    listed: bool
    include_subdomains: bool = False
    source_error: str | None = None  # set when the source list could not be fetched
def _cache_path(name: str) -> Path:
    """Ensure the cache directory exists and return the JSON cache file for *name*."""
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    filename = f"{name}.json"
    return CACHE_DIR / filename
def _cache_fresh(p: Path) -> bool:
try:
return p.is_file() and (time.time() - p.stat().st_mtime) < CACHE_TTL
except Exception:
return False
async def _fetch_chrome_list(timeout: float = 60.0) -> dict[str, bool] | None:
    """Return mapping: host -> include_subdomains, or None on fetch/parse error."""
    cache = _cache_path("chrome")
    # Serve from the on-disk cache when it is younger than CACHE_TTL.
    if _cache_fresh(cache):
        try:
            data = json.loads(cache.read_text())
            if data:
                return data
        except Exception:
            pass
    try:
        async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"},
                                     follow_redirects=True) as client:
            resp = await client.get(CHROME_URL)
            if resp.status_code != 200:
                return None
            raw = resp.text
    except Exception:
        return None
    # Strip // comments (json with comments) and parse a loose subset
    cleaned = re.sub(r"//[^\n]*", "", raw)
    cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL)
    result: dict[str, bool] = {}
    try:
        data = json.loads(cleaned)
        for entry in data.get("entries", []):
            name = entry.get("name")
            if name:
                result[name.lower()] = bool(entry.get("include_subdomains", False))
    except Exception:
        # JSON parse failed (format drift): scrape entries with a regex instead.
        for m in re.finditer(r'"name"\s*:\s*"([^"]+)"\s*,\s*"policy"[^}]*?"include_subdomains"\s*:\s*(true|false)', cleaned):
            result[m.group(1).lower()] = m.group(2) == "true"
    # Persist the parsed table; failure to cache is non-fatal.
    if result:
        try:
            cache.write_text(json.dumps(result))
        except Exception:
            pass
    return result if result else None
async def _fetch_firefox_list(timeout: float = 60.0) -> dict[str, bool] | None:
    """Return mapping: host -> include_subdomains from Mozilla's preload list, or None."""
    cache = _cache_path("firefox")
    # Serve from the on-disk cache when it is younger than CACHE_TTL.
    if _cache_fresh(cache):
        try:
            data = json.loads(cache.read_text())
            if data:
                return data
        except Exception:
            pass
    try:
        async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"},
                                     follow_redirects=True) as client:
            resp = await client.get(FIREFOX_URL)
            if resp.status_code != 200:
                return None
            text = resp.text
    except Exception:
        return None
    result: dict[str, bool] = {}
    # Historically Firefox shipped the preload list as a C-array of
    #     { "host.example", true },
    # but the current nsSTSPreloadList.inc (2024+) is a plain CSV with
    #     host.example, 1
    # per line (1 = include_subdomains, 0 = not). Try both, brace-format
    # first; fall back to the CSV form.
    for m in re.finditer(r'\{\s*"([^"]+)"\s*,\s*(true|false)\s*\}', text):
        result[m.group(1).lower()] = m.group(2) == "true"
    if not result:
        # Plain CSV format. Accept lines that look like "host, 0|1" —
        # including single-label TLD entries (e.g. "dev, 1", "app, 1",
        # "google, 1") which Mozilla ships alongside FQDNs. The explicit
        # ", [01]" suffix is enough to reject the multiline "/* ... */"
        # license header whose continuation lines start with " *".
        line_re = re.compile(r'^\s*([a-z0-9][a-z0-9.\-]*[a-z0-9])\s*,\s*([01])\s*$', re.IGNORECASE)
        for line in text.splitlines():
            m = line_re.match(line)
            if not m:
                continue
            host = m.group(1).lower()
            # A bare "host" with no dot is only valid as a TLD-style entry.
            # Everything else must contain at least a dot or it's junk.
            if "." not in host and not host.isalnum():
                continue
            result[host] = m.group(2) == "1"
    # Persist the parsed table; failure to cache is non-fatal.
    if result:
        try:
            cache.write_text(json.dumps(result))
        except Exception:
            pass
    return result if result else None
def _lookup(host: str, table: dict[str, bool]) -> tuple[bool, bool]:
h = host.lower().strip(".")
if h in table:
return True, table[h]
# walk parent domains to catch include_subdomains matches
parts = h.split(".")
for i in range(1, len(parts)):
parent = ".".join(parts[i:])
if parent in table and table[parent]:
return True, True
return False, False
async def check_preload(host: str) -> list[PreloadResult]:
    """Look *host* up in the Chromium and Mozilla HSTS preload lists.

    Returns one PreloadResult per browser in PRELOAD_BROWSERS order;
    ``source_error`` is set when the corresponding list could not be fetched.
    """
    chrome = await _fetch_chrome_list()
    firefox = await _fetch_firefox_list()
    results: list[PreloadResult] = []
    c_listed, c_sub = _lookup(host, chrome) if chrome is not None else (False, False)
    f_listed, f_sub = _lookup(host, firefox) if firefox is not None else (False, False)
    chrome_err = "source unavailable" if chrome is None else None
    firefox_err = "source unavailable" if firefox is None else None
    results.append(PreloadResult("chrome", c_listed, c_sub, chrome_err))
    results.append(PreloadResult("firefox", f_listed, f_sub, firefox_err))
    # Edge (Chromium) and Internet Explorer on Windows 10+ both rely on the
    # Chromium HSTS preload list via WinINet; no separate list is published
    # for either. Report both with the Chromium lookup so the UI doesn't
    # carry phantom "not supported" / "source unavailable" entries.
    results.append(PreloadResult("edge", c_listed, c_sub, chrome_err))
    results.append(PreloadResult("ie", c_listed, c_sub, chrome_err))
    return results
@@ -1,145 +0,0 @@
from __future__ import annotations
import asyncio
import ipaddress
import socket
from dataclasses import dataclass
from . import constants as C
from . import wire
CONNECT_TIMEOUT = 6.0  # seconds allowed for the TCP connect
IO_TIMEOUT = 8.0  # seconds allowed for each read/write during a probe
@dataclass
class ProbeResult:
    """Raw outcome of a single hand-built ClientHello probe."""
    connected: bool  # TCP connect succeeded
    server_hello: wire.ParsedServerHello | None
    alert: tuple[int, int] | None  # (level, description) when the server alerted
    raw: bytes  # raw bytes read off the socket
    error: str | None = None
def is_ip_literal(host: str) -> bool:
    """True when *host* parses as an IP address once surrounding brackets are removed."""
    stripped = host.strip("[]")
    try:
        ipaddress.ip_address(stripped)
    except ValueError:
        return False
    return True
async def _open(host: str, port: int, timeout: float = CONNECT_TIMEOUT) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
    """Open a TCP connection to (host, port), bounded by *timeout* seconds."""
    return await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
async def send_client_hello(
    host: str,
    port: int,
    *,
    record_version: int = C.TLS_1_0,
    client_hello_version: int = C.TLS_1_2,
    cipher_suites: list[int],
    extensions: bytes = b"",
    sni: str | None = None,
    timeout: float = IO_TIMEOUT,
) -> ProbeResult:
    """Send one crafted ClientHello and capture the server's first response.

    Returns a ProbeResult: connected=False on TCP failure; otherwise the
    (possibly partial) parsed server response, any alert, and the raw bytes.
    Drain/read timeouts are swallowed — an empty read simply yields no
    parsed hello. Never raises.
    """
    try:
        reader, writer = await _open(host, port)
    except Exception as e:
        return ProbeResult(False, None, None, b"", str(e))
    try:
        ch = wire.build_client_hello(
            record_version=record_version,
            client_hello_version=client_hello_version,
            hostname=sni,
            cipher_suites=cipher_suites,
            extensions=extensions,
        )
        writer.write(ch)
        # Best-effort flush; a slow peer shouldn't abort the probe.
        try:
            await asyncio.wait_for(writer.drain(), timeout=timeout)
        except Exception:
            pass
        data = b""
        # A single 4 KiB read is enough for ServerHello/alert detection.
        try:
            data = await asyncio.wait_for(reader.read(4096), timeout=timeout)
        except Exception:
            pass
        parsed = wire.parse_server_response(data) if data else None
        alert = parsed.alert if parsed else None
        return ProbeResult(True, parsed, alert, data)
    except Exception as e:
        # Connected but something else blew up (e.g. hello construction).
        return ProbeResult(True, None, None, b"", str(e))
    finally:
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.5)
        except Exception:
            pass
async def send_ssl2_client_hello(host: str, port: int) -> ProbeResult:
    """Send a raw SSLv2 CLIENT-HELLO and report whether an SSLv2 SERVER-HELLO came back.

    Returns:
        ProbeResult with connected=False on TCP failure. When connected,
        ``alert is None`` means the server answered with an SSLv2 record
        (SSLv2 supported); ``alert == (0, 0)`` is the sentinel for "no SSLv2".
    """
    try:
        reader, writer = await _open(host, port)
    except Exception as e:
        return ProbeResult(False, None, None, b"", str(e))
    try:
        writer.write(wire.build_ssl2_client_hello())
        try:
            await asyncio.wait_for(writer.drain(), timeout=IO_TIMEOUT)
        except Exception:
            pass
        try:
            data = await asyncio.wait_for(reader.read(4096), timeout=IO_TIMEOUT)
        except Exception:
            data = b""
        # In SSLv2 a SERVER-HELLO response has msg_type 4 in the body.
        is_sslv2 = False
        if len(data) >= 3:
            # 2-byte length header with MSB set is SSLv2, then body[0] = msg type.
            if data[0] & 0x80:
                length = ((data[0] & 0x7f) << 8) | data[1]
                if length >= 1 and data[2] == 4:
                    is_sslv2 = True
        alert = None if is_sslv2 else (0, 0)
        return ProbeResult(True, None, alert, data)
    except Exception as e:
        # Consistent with send_client_hello: report unexpected post-connect
        # errors in the result instead of letting them propagate.
        return ProbeResult(True, None, None, b"", str(e))
    finally:
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.5)
        except Exception:
            pass
async def tcp_reachable(host: str, port: int, timeout: float = CONNECT_TIMEOUT) -> bool:
    """Best-effort check that a TCP connection to (host, port) can be opened."""
    try:
        _, w = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
        # Close politely, but a close failure still means "reachable".
        try:
            w.close()
            try:
                await asyncio.wait_for(w.wait_closed(), timeout=1.0)
            except Exception:
                pass
        except Exception:
            pass
        return True
    except Exception:
        return False
async def resolve_host(host: str) -> str | None:
    """Resolve *host* to a single IP string, preferring IPv4 over IPv6.

    IP literals (with or without brackets) are returned as-is; resolution
    failures yield None.
    """
    stripped = host.strip("[]")
    try:
        ipaddress.ip_address(stripped)
        return stripped
    except ValueError:
        pass  # not a literal — fall through to DNS
    loop = asyncio.get_running_loop()
    try:
        infos = await loop.getaddrinfo(host, None, type=socket.SOCK_STREAM)
    except Exception:
        return None
    first_v6 = None
    for family, _, _, _, sockaddr in infos:
        if family == socket.AF_INET:
            return sockaddr[0]
        if family == socket.AF_INET6 and first_v6 is None:
            first_v6 = sockaddr[0]
    return first_v6
@@ -1,165 +0,0 @@
from __future__ import annotations
# Protocol version codes as they appear on the wire (SSL 2.0 uses its own framing).
SSL_2_0 = 0x0002
SSL_3_0 = 0x0300
TLS_1_0 = 0x0301
TLS_1_1 = 0x0302
TLS_1_2 = 0x0303
TLS_1_3 = 0x0304
# Human-readable names for the codes above.
PROTOCOL_NAMES = {
    SSL_2_0: "SSL 2.0",
    SSL_3_0: "SSL 3.0",
    TLS_1_0: "TLS 1.0",
    TLS_1_1: "TLS 1.1",
    TLS_1_2: "TLS 1.2",
    TLS_1_3: "TLS 1.3",
}
# Record-layer content types.
CT_HANDSHAKE = 22
CT_ALERT = 21
CT_CHANGE_CIPHER_SPEC = 20
CT_APPLICATION_DATA = 23
CT_HEARTBEAT = 24
# Handshake message types.
HS_CLIENT_HELLO = 1
HS_SERVER_HELLO = 2
HS_CERTIFICATE = 11
HS_SERVER_KEY_EXCHANGE = 12
HS_SERVER_HELLO_DONE = 14
# NOTE(review): 6 is hello_retry_request from TLS 1.3 drafts; final RFC 8446
# encodes HRR as a ServerHello with a sentinel random — confirm consumer usage.
HS_HELLO_RETRY_REQUEST = 6
# Extension type codes (IANA TLS ExtensionType registry).
EXT_SERVER_NAME = 0x0000
EXT_STATUS_REQUEST = 0x0005
EXT_SUPPORTED_GROUPS = 0x000a
EXT_EC_POINT_FORMATS = 0x000b
EXT_SIGNATURE_ALGORITHMS = 0x000d
EXT_HEARTBEAT = 0x000f
EXT_ALPN = 0x0010
EXT_SIGNED_CERT_TIMESTAMP = 0x0012
EXT_EXTENDED_MASTER_SECRET = 0x0017
EXT_SESSION_TICKET = 0x0023
EXT_SUPPORTED_VERSIONS = 0x002b
EXT_PSK_KEY_EXCHANGE_MODES = 0x002d
EXT_KEY_SHARE = 0x0033
EXT_RENEGOTIATION_INFO = 0xff01
# Named groups for key exchange, including hybrid post-quantum code points.
NAMED_GROUPS = {
    0x0017: "secp256r1",
    0x0018: "secp384r1",
    0x0019: "secp521r1",
    0x001d: "x25519",
    0x001e: "x448",
    0x0100: "ffdhe2048",
    0x0101: "ffdhe3072",
    0x0102: "ffdhe4096",
    0x0103: "ffdhe6144",
    0x0104: "ffdhe8192",
    0x11ec: "X25519MLKEM768",
    0x11eb: "SecP256r1MLKEM768",
    0x6399: "X25519Kyber768Draft00",
    0x639a: "SecP256r1Kyber768Draft00",
}
# Subset of NAMED_GROUPS that are post-quantum hybrids.
PQC_GROUPS = {0x11ec, 0x11eb, 0x6399, 0x639a}
# SignatureScheme code points (RFC 8446 §4.2.3).
SIGNATURE_ALGORITHMS = {
    0x0401: "rsa_pkcs1_sha256",
    0x0501: "rsa_pkcs1_sha384",
    0x0601: "rsa_pkcs1_sha512",
    0x0403: "ecdsa_secp256r1_sha256",
    0x0503: "ecdsa_secp384r1_sha384",
    0x0603: "ecdsa_secp521r1_sha512",
    0x0804: "rsa_pss_rsae_sha256",
    0x0805: "rsa_pss_rsae_sha384",
    0x0806: "rsa_pss_rsae_sha512",
    0x0807: "ed25519",
    0x0808: "ed448",
    0x0809: "rsa_pss_pss_sha256",
    0x080a: "rsa_pss_pss_sha384",
    0x080b: "rsa_pss_pss_sha512",
}
# Common cipher suite ID → name (subset but covers everything we check).
CIPHER_SUITES: dict[int, str] = {
    # TLS 1.3
    0x1301: "TLS_AES_128_GCM_SHA256",
    0x1302: "TLS_AES_256_GCM_SHA384",
    0x1303: "TLS_CHACHA20_POLY1305_SHA256",
    0x1304: "TLS_AES_128_CCM_SHA256",
    0x1305: "TLS_AES_128_CCM_8_SHA256",
    # ECDHE-RSA AEAD
    0xc02f: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
    0xc030: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
    0xcca8: "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256",
    # ECDHE-ECDSA AEAD
    0xc02b: "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
    0xc02c: "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
    0xcca9: "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256",
    # ECDHE CBC (weak)
    0xc013: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
    0xc014: "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
    0xc009: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
    0xc00a: "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
    0xc027: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
    0xc028: "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
    0xc023: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
    0xc024: "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384",
    # DHE CBC (legacy)
    0x0033: "TLS_DHE_RSA_WITH_AES_128_CBC_SHA",
    0x0039: "TLS_DHE_RSA_WITH_AES_256_CBC_SHA",
    0x0067: "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256",
    0x006b: "TLS_DHE_RSA_WITH_AES_256_CBC_SHA256",
    # DHE AEAD
    0x009e: "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
    0x009f: "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384",
    0xccaa: "TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256",
    # RSA key exchange (no FS)
    0x009c: "TLS_RSA_WITH_AES_128_GCM_SHA256",
    0x009d: "TLS_RSA_WITH_AES_256_GCM_SHA384",
    0x002f: "TLS_RSA_WITH_AES_128_CBC_SHA",
    0x0035: "TLS_RSA_WITH_AES_256_CBC_SHA",
    0x003c: "TLS_RSA_WITH_AES_128_CBC_SHA256",
    0x003d: "TLS_RSA_WITH_AES_256_CBC_SHA384",
    # 3DES (SWEET32)
    0x000a: "TLS_RSA_WITH_3DES_EDE_CBC_SHA",
    0xc012: "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
    0x0016: "TLS_DHE_RSA_WITH_3DES_EDE_CBC_SHA",
    # RC4
    0x0004: "TLS_RSA_WITH_RC4_128_MD5",
    0x0005: "TLS_RSA_WITH_RC4_128_SHA",
    0xc011: "TLS_ECDHE_RSA_WITH_RC4_128_SHA",
    0xc007: "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
    # NULL
    0x0001: "TLS_RSA_WITH_NULL_MD5",
    0x0002: "TLS_RSA_WITH_NULL_SHA",
    0x003b: "TLS_RSA_WITH_NULL_SHA256",
    # EXPORT (FREAK)
    0x0008: "TLS_RSA_EXPORT_WITH_DES40_CBC_SHA",
    0x0014: "TLS_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
    # Anonymous
    0x0018: "TLS_DH_anon_WITH_RC4_128_MD5",
    0x0034: "TLS_DH_anon_WITH_AES_128_CBC_SHA",
}
# Cipher-suite flag helpers: derived from names.
def cipher_has_fs(name: str) -> bool:
    """True when the suite's key exchange offers forward secrecy (ephemeral DH).

    "ECDHE" contains "DHE" as a substring, so a single membership test covers
    both finite-field and elliptic-curve ephemeral exchanges; static "DH_anon"
    and plain-RSA exchanges do not match.
    """
    return "DHE" in name
def cipher_is_aead(name: str) -> bool:
    """True when the suite uses an AEAD bulk cipher (GCM, ChaCha20-Poly1305, CCM)."""
    return any(marker in name for marker in ("GCM", "CHACHA20_POLY1305", "CCM"))
def cipher_is_weak(name: str) -> bool:
    """True when the suite name carries any known-broken primitive marker."""
    markers = ("NULL", "EXPORT", "_DES_", "3DES", "RC4", "MD5", "anon")
    return any(marker in name for marker in markers)
def cipher_is_cbc(name: str) -> bool:
    """True when the suite uses a CBC-mode bulk cipher (padding-oracle class risk)."""
    return "_CBC_" in name
def cipher_is_rc4(name: str) -> bool:
    """True when the suite uses the RC4 stream cipher (RFC 7465 prohibits it)."""
    return "RC4" in name
def cipher_is_3des(name: str) -> bool:
    """True for 3DES / DES-EDE suites (SWEET32-class 64-bit block risk)."""
    return any(marker in name for marker in ("3DES", "_DES_EDE"))
@@ -1,65 +0,0 @@
from __future__ import annotations
import asyncio
from . import constants as C
from . import wire
from .client import send_client_hello
async def _probe_one_group(host: str, port: int, sni: str | None, tls_version: int, g: int) -> int | None:
    """Offer a single named group *g* in a TLS 1.3-style ClientHello.

    Returns *g* when the server answers with a hello negotiating
    *tls_version* (the group/version offer was acceptable), else None.
    """
    cipher_suites = [0x1301, 0x1302, 0x1303]  # TLS 1.3 AEAD suites only
    exts = (
        (wire.ext_server_name(sni) if sni and not _is_ip(sni) else b"")
        + wire.ext_supported_versions_client([tls_version])
        + wire.ext_supported_groups([g])
        + wire.ext_signature_algorithms([
            0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
            0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
        ])
        + wire.ext_psk_key_exchange_modes()
        + wire.ext_key_share_empty()
    )
    try:
        res = await send_client_hello(
            host, port,
            record_version=C.TLS_1_0,
            client_hello_version=C.TLS_1_2,
            cipher_suites=cipher_suites,
            extensions=exts,
            sni=sni,
        )
    except Exception:
        return None
    sh = res.server_hello
    # Any failure to connect / parse / non-alert-free response means "no".
    if not res.connected or sh is None or sh.alert is not None:
        return None
    if sh.negotiated_version == tls_version:
        return g
    return None
async def detect_named_groups(host: str, port: int, sni: str | None, tls_version: int = C.TLS_1_3) -> list[int]:
    """Probe which named groups the server accepts for TLS 1.3 key share.

    We don't execute a real handshake, we just offer one group at a time and
    watch for either a successful ServerHello (with key_share) or HelloRetryRequest
    (which tells us the server accepts the version but wants a different group).
    Runs probes concurrently with a small cap to avoid connection bursts.
    """
    candidate_groups = list(C.NAMED_GROUPS.keys())
    sem = asyncio.Semaphore(4)  # cap concurrent connections to the target
    async def guarded(g: int) -> int | None:
        async with sem:
            return await _probe_one_group(host, port, sni, tls_version, g)
    results = await asyncio.gather(*(guarded(g) for g in candidate_groups))
    return [g for g in results if g is not None]
def _is_ip(host: str) -> bool:
import ipaddress
try:
ipaddress.ip_address(host.strip("[]"))
return True
except ValueError:
return False
@@ -1,376 +0,0 @@
from __future__ import annotations
import asyncio
import socket
import ssl
from dataclasses import dataclass
from typing import Any
from . import constants as C
from . import wire
from .client import send_client_hello
# Map wire version codes to the ssl module's TLSVersion enum.
# SSL 2.0/3.0 are absent: the ssl module cannot be pinned to them.
SSL_VERSION_TO_CONST = {
    C.TLS_1_0: ssl.TLSVersion.TLSv1,
    C.TLS_1_1: ssl.TLSVersion.TLSv1_1,
    C.TLS_1_2: ssl.TLSVersion.TLSv1_2,
    C.TLS_1_3: ssl.TLSVersion.TLSv1_3,
}
@dataclass
class VersionProbe:
    """Outcome of a single pinned-version handshake attempt."""
    version: int  # wire version code (see constants)
    supported: bool
    negotiated_cipher: str | None = None  # OpenSSL cipher name on success
    error: str | None = None
def _make_permissive_context(min_v: ssl.TLSVersion, max_v: ssl.TLSVersion, ciphers: str | None = None) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
ctx.minimum_version = min_v
ctx.maximum_version = max_v
except (ValueError, OSError):
pass
if ciphers is not None:
try:
ctx.set_ciphers(ciphers)
except ssl.SSLError:
pass
# Allow legacy server connections (e.g. missing renegotiation_info for TLS 1.0 servers).
try:
ctx.options &= ~ssl.OP_NO_SSLv3
except Exception:
pass
try:
# SECLEVEL=0 allows legacy/weak ciphers for enumeration.
cur_ciphers = ciphers if ciphers else "ALL:@SECLEVEL=0"
ctx.set_ciphers(cur_ciphers)
except ssl.SSLError:
pass
return ctx
async def probe_tls_version(host: str, port: int, sni: str | None, version: int, timeout: float = 8.0) -> VersionProbe:
    """Handshake with the server pinned to exactly *version* and report the result.

    The blocking socket/TLS work runs in the default executor so the event
    loop stays responsive. Never raises; failures land in VersionProbe.error.
    """
    if version not in SSL_VERSION_TO_CONST:
        return VersionProbe(version=version, supported=False, error="unsupported version constant")
    tls_ver = SSL_VERSION_TO_CONST[version]
    ctx = _make_permissive_context(tls_ver, tls_ver)
    loop = asyncio.get_running_loop()
    def _do() -> tuple[bool, str | None, str | None]:
        # Worker thread: blocking connect + TLS handshake.
        try:
            with socket.create_connection((host, port), timeout=timeout) as sock:
                with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
                    try:
                        cipher = ssock.cipher()
                        return True, cipher[0] if cipher else None, None
                    except Exception as e:
                        # Handshake succeeded but cipher query failed.
                        return True, None, str(e)
        except ssl.SSLError as e:
            return False, None, f"{e.__class__.__name__}: {e}"
        except (OSError, socket.timeout, asyncio.TimeoutError) as e:
            return False, None, f"{e.__class__.__name__}: {e}"
        except Exception as e:
            return False, None, f"{e.__class__.__name__}: {e}"
    ok, cipher, err = await loop.run_in_executor(None, _do)
    return VersionProbe(version=version, supported=ok, negotiated_cipher=cipher, error=err)
@dataclass
class CipherProbe:
    """Whether one cipher suite was accepted under one TLS version."""
    version: int  # wire version code
    cipher_id: int  # IANA cipher suite code point
    cipher_name: str
    supported: bool
async def probe_cipher(host: str, port: int, sni: str | None, version: int, cipher_id: int, cipher_name: str) -> CipherProbe:
    """Enumerate whether a given cipher suite is accepted under a given TLS version.

    For TLS 1.2 and below we send a hand-built ClientHello offering only the
    one suite and check the ServerHello echoes it back. For TLS 1.3 we use
    the ssl module with set_ciphersuites() via the OpenSSL names.
    Falls back to False on any handshake error.
    """
    if version == C.TLS_1_3:
        # TLS 1.3 only recognizes cipher suites by their IANA name in OpenSSL.
        name_map = {
            0x1301: "TLS_AES_128_GCM_SHA256",
            0x1302: "TLS_AES_256_GCM_SHA384",
            0x1303: "TLS_CHACHA20_POLY1305_SHA256",
            0x1304: "TLS_AES_128_CCM_SHA256",
            0x1305: "TLS_AES_128_CCM_8_SHA256",
        }
        openssl_name = name_map.get(cipher_id)
        if not openssl_name:
            return CipherProbe(version, cipher_id, cipher_name, False)
        tls_ver = SSL_VERSION_TO_CONST[version]
        ctx = _make_permissive_context(tls_ver, tls_ver)
        try:
            ctx.set_ciphersuites(openssl_name)
        except Exception:
            # OpenSSL build without this suite: cannot probe it.
            return CipherProbe(version, cipher_id, cipher_name, False)
        loop = asyncio.get_running_loop()
        def _do() -> bool:
            # Worker thread: handshake and confirm the negotiated suite matches.
            try:
                with socket.create_connection((host, port), timeout=6.0) as sock:
                    with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
                        actual = ssock.cipher()
                        return bool(actual and actual[0] == openssl_name)
            except Exception:
                return False
        ok = await loop.run_in_executor(None, _do)
        return CipherProbe(version, cipher_id, cipher_name, ok)
    # For < TLS 1.3: send a ClientHello offering only this suite.
    # (Removed the dead `exts = b""` pre-assignment — `exts` is always
    # rebuilt from `parts` below.)
    parts = []
    if sni:
        try:
            parts.append(wire.ext_server_name(sni))
        except Exception:
            pass  # e.g. IDNA-unencodable hostname: probe without SNI
    parts.append(wire.ext_ec_point_formats())
    parts.append(wire.ext_supported_groups([0x001d, 0x0017, 0x0018]))
    parts.append(wire.ext_signature_algorithms([
        0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
        0x0603, 0x0806, 0x0601,
    ]))
    parts.append(wire.ext_renegotiation_info_empty())
    exts = b"".join(parts)
    res = await send_client_hello(
        host, port,
        record_version=C.TLS_1_0,
        client_hello_version=version,
        cipher_suites=[cipher_id],
        extensions=exts,
        sni=sni,
    )
    if not res.connected:
        return CipherProbe(version, cipher_id, cipher_name, False)
    sh = res.server_hello
    if sh is None or sh.alert is not None:
        return CipherProbe(version, cipher_id, cipher_name, False)
    # Accept only when the server echoed back exactly our suite and version.
    ok = sh.cipher_suite == cipher_id and sh.server_version == version
    return CipherProbe(version, cipher_id, cipher_name, ok)
async def _fetch_chain_via_wire(host: str, port: int, sni: str | None, timeout: float = 8.0) -> list[bytes]:
    """Pull the full TLS 1.2 Certificate handshake message ourselves and
    return the DER-encoded chain. Used as fallback on Python < 3.13 where
    ``ssl.SSLSocket.get_unverified_chain`` isn't available.
    """
    # Build a broadly-compatible TLS 1.2 extension set.
    parts = []
    if sni:
        try:
            parts.append(wire.ext_server_name(sni))
        except Exception:
            pass  # e.g. IDNA-unencodable hostname: continue without SNI
    parts.append(wire.ext_ec_point_formats())
    parts.append(wire.ext_supported_groups([0x001d, 0x0017, 0x0018, 0x0019]))
    parts.append(wire.ext_signature_algorithms([
        0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
        0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
    ]))
    parts.append(wire.ext_renegotiation_info_empty())
    exts = b"".join(parts)
    try:
        reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
    except Exception:
        return []
    try:
        # Offer a wide spread of common suites so almost any server responds.
        ch = wire.build_client_hello(
            record_version=C.TLS_1_0,
            client_hello_version=C.TLS_1_2,
            hostname=sni,
            cipher_suites=[
                0xc02f, 0xc030, 0xc02b, 0xc02c,
                0xcca8, 0xcca9,
                0xc013, 0xc014, 0x009c, 0x009d,
                0x002f, 0x0035, 0x000a,
            ],
            extensions=exts,
        )
        writer.write(ch)
        try:
            await asyncio.wait_for(writer.drain(), timeout=timeout)
        except Exception:
            pass
        # Read enough records to cover ServerHello + Certificate; TLS 1.2
        # Certificate messages are often fragmented across several records.
        buf = bytearray()
        deadline = timeout
        for _ in range(20):  # hard cap: never loop forever on a chatty peer
            try:
                header = await asyncio.wait_for(reader.readexactly(5), timeout=deadline)
            except Exception:
                break
            rec_len = (header[3] << 8) | header[4]
            if rec_len == 0 or rec_len > 1 << 14:
                break  # invalid record length: bail out
            try:
                body = await asyncio.wait_for(reader.readexactly(rec_len), timeout=deadline)
            except Exception:
                break
            buf.extend(header); buf.extend(body)
            # Stop once we've seen a Certificate handshake in the accumulated buffer.
            if _has_certificate_message(bytes(buf)):
                break
        return _extract_certificates_from_records(bytes(buf))
    finally:
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
        except Exception:
            pass
def _has_certificate_message(records: bytes) -> bool:
    """Return True once a Certificate handshake message is visible in *records*.

    Reassembles the handshake stream from every handshake-typed TLS record,
    then walks the handshake messages looking for HS_CERTIFICATE. Truncated
    records/messages make the answer False.
    """
    handshake = bytearray()
    pos = 0
    total = len(records)
    while pos + 5 <= total:
        content_type = records[pos]
        record_len = (records[pos + 3] << 8) | records[pos + 4]
        if pos + 5 + record_len > total:
            break  # truncated record
        if content_type == C.CT_HANDSHAKE:
            handshake.extend(records[pos + 5:pos + 5 + record_len])
        pos += 5 + record_len
    offset = 0
    while offset + 4 <= len(handshake):
        msg_type = handshake[offset]
        msg_len = (handshake[offset + 1] << 16) | (handshake[offset + 2] << 8) | handshake[offset + 3]
        if offset + 4 + msg_len > len(handshake):
            return False  # truncated handshake message
        if msg_type == C.HS_CERTIFICATE:
            return True
        offset += 4 + msg_len
    return False
def _extract_certificates_from_records(records: bytes) -> list[bytes]:
    """Parse TLS 1.2 Certificate handshake message and return the DER chain."""
    # 1) Reassemble handshake layer from all handshake-typed records.
    i = 0
    hs_buf = bytearray()
    while i + 5 <= len(records):
        ct = records[i]
        rlen = (records[i + 3] << 8) | records[i + 4]
        if i + 5 + rlen > len(records):
            break  # truncated record: stop reassembly
        if ct == C.CT_HANDSHAKE:
            hs_buf.extend(records[i + 5:i + 5 + rlen])
        i += 5 + rlen
    # 2) Walk handshake messages, find Certificate.
    j = 0
    while j + 4 <= len(hs_buf):
        hs_type = hs_buf[j]
        hs_len = (hs_buf[j + 1] << 16) | (hs_buf[j + 2] << 8) | hs_buf[j + 3]
        if j + 4 + hs_len > len(hs_buf):
            break  # truncated handshake message
        if hs_type == C.HS_CERTIFICATE and hs_len >= 3:
            body = bytes(hs_buf[j + 4:j + 4 + hs_len])
            # TLS 1.2 Certificate: uint24 total_len || [ uint24 cert_len || cert ] *
            total = (body[0] << 16) | (body[1] << 8) | body[2]
            k = 3
            end = 3 + total
            chain: list[bytes] = []
            while k + 3 <= end:
                clen = (body[k] << 16) | (body[k + 1] << 8) | body[k + 2]
                k += 3
                if k + clen > end:
                    break  # malformed entry length
                chain.append(body[k:k + clen])
                k += clen
            return chain  # leaf first, then intermediates, wire order
        j += 4 + hs_len
    return []
async def get_peer_certificate_chain(host: str, port: int, sni: str | None, timeout: float = 8.0) -> list[bytes]:
    """Return DER-encoded peer certificate chain.

    Tries the ssl module's Python-3.13+ chain APIs first, then falls back to
    a raw TLS 1.2 wire probe that parses the Certificate handshake message
    directly. Returns leaf first, then intermediates. Empty on failure.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        # SECLEVEL=0 so chains from legacy servers can still be pulled.
        ctx.set_ciphers("ALL:@SECLEVEL=0")
    except ssl.SSLError:
        pass
    loop = asyncio.get_running_loop()
    def _do() -> list[bytes]:
        # Worker thread: blocking handshake + chain retrieval.
        try:
            with socket.create_connection((host, port), timeout=timeout) as sock:
                with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
                    leaf = ssock.getpeercert(binary_form=True)
                    chain: list[bytes] = [leaf] if leaf else []
                    # Intermediate chain via internal API (CPython 3.13+) if available.
                    # On older Pythons these attributes are absent — callers will
                    # fall through to the wire-level fetcher below.
                    try:
                        verified = ssock.get_verified_chain()  # type: ignore[attr-defined]
                        if verified:
                            chain = list(verified)
                    except (AttributeError, ssl.SSLError):
                        try:
                            unverified = ssock.get_unverified_chain()  # type: ignore[attr-defined]
                            if unverified:
                                chain = list(unverified)
                        except (AttributeError, ssl.SSLError):
                            pass
                    return chain
        except Exception:
            return []
    chain = await loop.run_in_executor(None, _do)
    # If the ssl module only gave us a single cert (common on Python < 3.13),
    # fetch the full chain via our own TLS wire probe.
    if len(chain) <= 1:
        try:
            wire_chain = await _fetch_chain_via_wire(host, port, sni, timeout=timeout)
        except Exception:
            wire_chain = []
        if len(wire_chain) > len(chain):
            chain = wire_chain
    return chain
async def alpn_negotiate(host: str, port: int, sni: str | None, alpn_list: list[str], timeout: float = 6.0) -> str | None:
    """Handshake offering *alpn_list*; return the protocol the server selected.

    Returns None on any failure, or when the ssl build lacks ALPN support.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        ctx.set_alpn_protocols(alpn_list)
    except NotImplementedError:
        # OpenSSL without ALPN: nothing to negotiate.
        return None
    try:
        ctx.set_ciphers("ALL:@SECLEVEL=0")
    except ssl.SSLError:
        pass
    loop = asyncio.get_running_loop()
    def _do() -> str | None:
        # Worker thread: blocking handshake just to read the ALPN result.
        try:
            with socket.create_connection((host, port), timeout=timeout) as sock:
                with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
                    return ssock.selected_alpn_protocol()
        except Exception:
            return None
    return await loop.run_in_executor(None, _do)
# Note: OCSP stapling detection requires probing the `status_request` TLS
# extension and reading the CertificateStatus handshake message. Python's
# stdlib ssl module does not expose stapled OCSP to Python; the engine checks
# live OCSP/CRL instead (see certs/revocation.py).
@@ -1,260 +0,0 @@
from __future__ import annotations
import os
import struct
from dataclasses import dataclass
from typing import Any
from . import constants as C
def _u8(x: int) -> bytes:
    # Big-endian unsigned 8-bit integer.
    return struct.pack("!B", x)
def _u16(x: int) -> bytes:
    # Big-endian unsigned 16-bit integer.
    return struct.pack("!H", x)
def _u24(x: int) -> bytes:
    # Big-endian unsigned 24-bit integer (pack as 32-bit, drop the top byte).
    return struct.pack("!I", x)[1:]
def build_extension(ext_type: int, data: bytes) -> bytes:
    """Frame *data* as a TLS extension: uint16 type, uint16 length, then body."""
    return struct.pack("!HH", ext_type, len(data)) + data
def ext_server_name(hostname: str) -> bytes:
    """Build the SNI (server_name) extension for *hostname*.

    NOTE(review): encode("idna") raises UnicodeError for labels the 2003 IDNA
    codec rejects (underscores, empty labels, etc.) — callers appear to guard
    with try/except; confirm before relying on it here.
    """
    name = hostname.encode("idna")
    entry = _u8(0) + _u16(len(name)) + name  # name_type 0 = host_name
    lst = _u16(len(entry)) + entry
    return build_extension(C.EXT_SERVER_NAME, lst)
def ext_supported_versions_client(versions: list[int]) -> bytes:
    # supported_versions: uint8 list byte-length, then uint16 per version.
    body = _u8(2 * len(versions)) + b"".join(_u16(v) for v in versions)
    return build_extension(C.EXT_SUPPORTED_VERSIONS, body)
def ext_supported_groups(groups: list[int]) -> bytes:
    # supported_groups: uint16 list byte-length, then uint16 per group.
    body = _u16(2 * len(groups)) + b"".join(_u16(g) for g in groups)
    return build_extension(C.EXT_SUPPORTED_GROUPS, body)
def ext_signature_algorithms(algos: list[int]) -> bytes:
    # signature_algorithms: uint16 list byte-length, then uint16 per scheme.
    body = _u16(2 * len(algos)) + b"".join(_u16(a) for a in algos)
    return build_extension(C.EXT_SIGNATURE_ALGORITHMS, body)
def ext_ec_point_formats() -> bytes:
    # ec_point_formats: one entry, uncompressed (0).
    return build_extension(C.EXT_EC_POINT_FORMATS, _u8(1) + _u8(0))
def ext_key_share_empty() -> bytes:
    # key_share with an empty client share list — invites HelloRetryRequest.
    return build_extension(C.EXT_KEY_SHARE, _u16(0))
def ext_psk_key_exchange_modes() -> bytes:
    # psk_key_exchange_modes: one mode, psk_dhe_ke (1).
    return build_extension(C.EXT_PSK_KEY_EXCHANGE_MODES, _u8(1) + _u8(1))
def ext_alpn(protos: list[bytes]) -> bytes:
    # ALPN: uint16 list byte-length, then uint8-length-prefixed protocol names.
    inner = b"".join(_u8(len(p)) + p for p in protos)
    return build_extension(C.EXT_ALPN, _u16(len(inner)) + inner)
def ext_heartbeat_enabled() -> bytes:
    # heartbeat: mode 1 = peer_allowed_to_send (used by the Heartbleed check).
    return build_extension(C.EXT_HEARTBEAT, _u8(1))
def ext_renegotiation_info_empty() -> bytes:
    # renegotiation_info with empty renegotiated_connection (initial handshake).
    return build_extension(C.EXT_RENEGOTIATION_INFO, _u8(0))
def ext_status_request() -> bytes:
    # status_type=OCSP(1) + empty responder_id_list + empty extensions
    body = _u8(1) + _u16(0) + _u16(0)
    return build_extension(C.EXT_STATUS_REQUEST, body)
def ext_signed_cert_timestamp() -> bytes:
    # signed_certificate_timestamp: empty body requests SCTs from the server.
    return build_extension(C.EXT_SIGNED_CERT_TIMESTAMP, b"")
def ext_extended_master_secret() -> bytes:
    # extended_master_secret: empty body signals support (RFC 7627).
    return build_extension(C.EXT_EXTENDED_MASTER_SECRET, b"")
def build_client_hello(
    record_version: int,
    client_hello_version: int,
    hostname: str | None,
    cipher_suites: list[int],
    extensions: bytes = b"",
    compression: bytes = b"\x01\x00",  # 1 length, null method
) -> bytes:
    """Assemble a complete ClientHello TLS record.

    *hostname* is part of the public signature but unused here; SNI is
    carried via the pre-built *extensions* blob instead.
    """
    suites_blob = b"".join(struct.pack("!H", suite) for suite in cipher_suites)
    parts = [
        struct.pack("!H", client_hello_version),
        os.urandom(32),           # 32-byte client random
        b"\x00",                  # empty legacy session_id (length 0)
        struct.pack("!H", len(suites_blob)),
        suites_blob,
        compression,
    ]
    if extensions:
        parts.append(struct.pack("!H", len(extensions)) + extensions)
    body = b"".join(parts)
    # Handshake header: msg_type(1) + 24-bit length, then record framing.
    handshake = struct.pack("!B", C.HS_CLIENT_HELLO) + len(body).to_bytes(3, "big") + body
    return (
        struct.pack("!B", C.CT_HANDSHAKE)
        + struct.pack("!H", record_version)
        + struct.pack("!H", len(handshake))
        + handshake
    )
def default_ch_extensions(hostname: str, versions: list[int], groups: list[int] | None = None) -> bytes:
    """Build the default ClientHello extension blob for a scan connection.

    Returns the raw concatenated extensions WITHOUT the outer 2-byte length
    prefix — build_client_hello adds that prefix itself.

    BUGFIX: the original return line was
    ``return _u16(len(exts)) + exts if False else exts`` — a dead conditional
    that always evaluated to the bare ``exts``; the misleading no-op is removed.
    """
    groups = groups or [0x001d, 0x0017, 0x0018]  # x25519, secp256r1, secp384r1
    parts = []
    # SNI is only valid for DNS names, never for IP literals (RFC 6066).
    if hostname and not _is_ip_literal(hostname):
        parts.append(ext_server_name(hostname))
    parts.append(ext_ec_point_formats())
    parts.append(ext_supported_groups(groups))
    parts.append(ext_signature_algorithms([
        0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
        0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
    ]))
    parts.append(ext_renegotiation_info_empty())
    parts.append(ext_signed_cert_timestamp())
    parts.append(ext_status_request())
    parts.append(ext_extended_master_secret())
    parts.append(ext_alpn([b"h2", b"http/1.1"]))
    # TLS 1.3 requires supported_versions; PSK modes and a (here empty)
    # key_share accompany it.
    if C.TLS_1_3 in versions:
        parts.append(ext_supported_versions_client(versions))
        parts.append(ext_psk_key_exchange_modes())
        parts.append(ext_key_share_empty())
    return b"".join(parts)
def _is_ip_literal(host: str) -> bool:
import ipaddress
try:
ipaddress.ip_address(host.strip("[]"))
return True
except ValueError:
return False
@dataclass
class ParsedServerHello:
    """Decoded view of the first TLS record a server sent back."""
    # Version from the record-layer header.
    record_version: int
    # legacy_version field of the ServerHello body (0 when no ServerHello seen).
    server_version: int
    # Selected cipher suite, or None when the response was not a ServerHello.
    cipher_suite: int | None
    # (level, description) when the record was an alert, else None.
    alert: tuple[int, int] | None
    # The raw bytes that were parsed, kept for diagnostics.
    raw_record: bytes
    # Handshake msg_type byte, or None for non-handshake records.
    handshake_type: int | None
    server_random: bytes | None = None
    session_id: bytes | None = None
    # Extension type -> raw extension data from the ServerHello.
    extensions: dict[int, bytes] | None = None
    # Effective version: supported_versions value when present, else server_version.
    negotiated_version: int | None = None
    # Group ID from the key_share extension (TLS 1.3), when present.
    key_share_group: int | None = None
def parse_server_response(data: bytes) -> ParsedServerHello | None:
    """Parse the first TLS record. Returns None if insufficient data or garbage."""
    # Record header is 5 bytes: type(1) + version(2) + length(2).
    if len(data) < 5:
        return None
    ct = data[0]
    rec_ver = (data[1] << 8) | data[2]
    rec_len = (data[3] << 8) | data[4]
    body = data[5:5 + rec_len]
    # Alert record: expose (level, description) and stop parsing.
    if ct == C.CT_ALERT and len(body) >= 2:
        return ParsedServerHello(
            record_version=rec_ver,
            server_version=0,
            cipher_suite=None,
            alert=(body[0], body[1]),
            raw_record=data,
            handshake_type=None,
        )
    # Anything that is not a handshake record (or too short for a handshake
    # header) is returned as an opaque shell with only the record version.
    if ct != C.CT_HANDSHAKE or len(body) < 4:
        return ParsedServerHello(
            record_version=rec_ver,
            server_version=0,
            cipher_suite=None,
            alert=None,
            raw_record=data,
            handshake_type=None,
        )
    # Handshake header: msg_type(1) + 24-bit length.
    hs_type = body[0]
    hs_len = (body[1] << 16) | (body[2] << 8) | body[3]
    hs = body[4:4 + hs_len]
    # 38 = version(2) + random(32) + session_id length(1) + cipher(2) + comp(1) minimum.
    if hs_type != C.HS_SERVER_HELLO or len(hs) < 38:
        return ParsedServerHello(
            record_version=rec_ver,
            server_version=0,
            cipher_suite=None,
            alert=None,
            raw_record=data,
            handshake_type=hs_type,
        )
    server_version = (hs[0] << 8) | hs[1]
    server_random = hs[2:34]
    sess_len = hs[34]
    off = 35 + sess_len
    # Need at least cipher_suite(2) + compression(1) past the session id.
    if len(hs) < off + 3:
        return None
    session_id = hs[35:35 + sess_len]
    cs = (hs[off] << 8) | hs[off + 1]
    off += 2
    # comp method (1 byte)
    off += 1
    ext_map: dict[int, bytes] = {}
    negotiated_version = server_version
    key_share_group = None
    # Extensions block is optional; parse it when the 2-byte length fits.
    if off + 2 <= len(hs):
        ext_total = (hs[off] << 8) | hs[off + 1]
        off += 2
        ext_end = off + ext_total
        # Each extension: type(2) + length(2) + data.
        while off + 4 <= ext_end:
            et = (hs[off] << 8) | hs[off + 1]
            el = (hs[off + 2] << 8) | hs[off + 3]
            off += 4
            ext_data = hs[off:off + el]
            off += el
            ext_map[et] = ext_data
            # TLS 1.3 carries the real version in supported_versions.
            if et == C.EXT_SUPPORTED_VERSIONS and len(ext_data) >= 2:
                negotiated_version = (ext_data[0] << 8) | ext_data[1]
            # In a ServerHello the key_share starts with the selected group id.
            elif et == C.EXT_KEY_SHARE and len(ext_data) >= 2:
                key_share_group = (ext_data[0] << 8) | ext_data[1]
    return ParsedServerHello(
        record_version=rec_ver,
        server_version=server_version,
        cipher_suite=cs,
        alert=None,
        raw_record=data,
        handshake_type=hs_type,
        server_random=server_random,
        session_id=session_id,
        extensions=ext_map,
        negotiated_version=negotiated_version,
        key_share_group=key_share_group,
    )
def build_ssl2_client_hello(ciphers: list[int] | None = None) -> bytes:
    """Craft an SSLv2 ClientHello used to detect DROWN / SSLv2 support."""
    if ciphers is None:
        # SSL_CK_RC4_128_WITH_MD5, SSL_CK_RC4_128_EXPORT40_WITH_MD5, SSL_CK_DES_192_EDE3_CBC_WITH_MD5
        ciphers = [0x010080, 0x020080, 0x0700c0]
    challenge = os.urandom(16)
    # SSLv2 cipher specs are 3 bytes each.
    cipher_bytes = b"".join(spec.to_bytes(3, "big") for spec in ciphers)
    body = b"".join((
        b"\x01",                               # MSG-CLIENT-HELLO
        (0x0002).to_bytes(2, "big"),           # SSLv2 version
        len(cipher_bytes).to_bytes(2, "big"),  # cipher specs length
        (0).to_bytes(2, "big"),                # session-id length
        len(challenge).to_bytes(2, "big"),     # challenge length
        cipher_bytes,
        challenge,
    ))
    # 2-byte SSLv2 record header with the high bit set (no padding form).
    return struct.pack("!H", 0x8000 | len(body)) + body
@@ -1,54 +0,0 @@
from __future__ import annotations
import ipaddress
from dataclasses import dataclass
from .db import TlsTestDB
# Per-IP quota: at most HOURLY_LIMIT submissions per HOURLY_WINDOW seconds.
HOURLY_LIMIT = 30
HOURLY_WINDOW = 60 * 60  # seconds (1 hour)
@dataclass
class RateDecision:
    """Outcome of a rate-limit check for one submission."""
    # Whether the request may proceed.
    allowed: bool
    # Human-readable (Japanese) denial reason; empty when allowed.
    reason: str = ""
    # Suggested client back-off in seconds; 0 when allowed.
    retry_after: int = 0
def _is_private(ip: str) -> bool:
try:
a = ipaddress.ip_address(ip)
return a.is_private or a.is_loopback or a.is_link_local
except ValueError:
return False
def check(db: TlsTestDB, client_ip: str | None) -> RateDecision:
    """Decide whether *client_ip* may submit a new test right now.

    Unknown and internal (private/loopback) addresses are never throttled.
    """
    if not client_ip or _is_private(client_ip):
        return RateDecision(allowed=True)
    # Only one queued/running test per IP at a time.
    if db.count_ip_active(client_ip) > 0:
        return RateDecision(
            allowed=False,
            reason="同一IPから実行中/待機中のテストが既にあります。完了してから再度お試しください。",
            retry_after=60,
        )
    # Sliding hourly quota.
    recent_count = db.count_ip_in_window(client_ip, HOURLY_WINDOW)
    if recent_count >= HOURLY_LIMIT:
        return RateDecision(
            allowed=False,
            reason=f"1時間あたり{HOURLY_LIMIT}件の上限に達しました。しばらく時間をおいて再度お試しください。",
            retry_after=HOURLY_WINDOW,
        )
    return RateDecision(allowed=True)
def client_ip_from_scope(scope) -> str | None:
headers = dict(scope.get("headers", []))
xff = headers.get(b"x-forwarded-for", b"").decode(errors="ignore")
if xff:
return xff.split(",")[0].strip()
client = scope.get("client")
if client and len(client) >= 1:
return str(client[0])
return None
@@ -1,167 +0,0 @@
from __future__ import annotations
import asyncio
import json
import time
import uuid
import logging
from typing import Any, Callable, Awaitable
from fastapi import WebSocket
from .db import TlsTestDB
from .schemas import ScanResult, Finding, ProgressMessage
logger = logging.getLogger("tls_test.runner")
# Cap on simultaneously-running scans; queued jobs wait on the semaphore.
MAX_CONCURRENT = 5
# Progress callback: (phase, detail, progress, severity) -> awaitable.
ReportProgress = Callable[[str, str, float, str], Awaitable[None]]
# Scan engine: (target, report_progress, report_finding) -> ScanResult.
EngineFn = Callable[[str, ReportProgress, Callable[[Finding], Awaitable[None]]], Awaitable[ScanResult]]
class TlsJobQueue:
    """Async job queue: accepts scan submissions, runs them with bounded
    concurrency, persists state to the DB, and broadcasts progress to
    WebSocket subscribers."""
    def __init__(self, db: TlsTestDB, engine: EngineFn):
        self.db = db
        self.engine = engine
        # Pending (test_id, target) pairs awaiting dispatch.
        self._queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue()
        # Bounds the number of engine runs in flight.
        self._semaphore = asyncio.Semaphore(MAX_CONCURRENT)
        # test_id -> live WebSocket subscribers.
        self._subscribers: dict[str, set[WebSocket]] = {}
        self._seq: dict[str, int] = {}
        # Strong references to per-job tasks so they are not GC'd mid-run.
        self._tasks: set[asyncio.Task] = set()
        self._dispatcher_task: asyncio.Task | None = None
        self._cleanup_task: asyncio.Task | None = None
        self._closed = False
    async def start(self) -> None:
        """Spawn the dispatcher and hourly cleanup loops (call once at app startup)."""
        self._dispatcher_task = asyncio.create_task(self._dispatcher())
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
    async def stop(self) -> None:
        """Cancel background loops and all in-flight jobs, then await them."""
        self._closed = True
        if self._dispatcher_task:
            self._dispatcher_task.cancel()
        if self._cleanup_task:
            self._cleanup_task.cancel()
        for t in list(self._tasks):
            t.cancel()
        await asyncio.gather(*(t for t in self._tasks), return_exceptions=True)
    def submit(self, target: str, client_ip: str | None) -> str:
        """Persist a new job and enqueue it; returns the generated test id."""
        test_id = str(uuid.uuid4())
        self.db.create_job(test_id, target, client_ip)
        self._queue.put_nowait((test_id, target))
        return test_id
    async def _dispatcher(self) -> None:
        # Pull submissions and spawn one task per job; concurrency is
        # enforced inside _run_one via the semaphore, not here.
        while not self._closed:
            try:
                test_id, target = await self._queue.get()
            except asyncio.CancelledError:
                return
            task = asyncio.create_task(self._run_one(test_id, target))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
    async def _run_one(self, test_id: str, target: str) -> None:
        """Run one scan end-to-end: engine call, DB updates, WS broadcasts."""
        async with self._semaphore:
            await self._broadcast(test_id, {"type": "started", "target": target})
            self.db.update_status(test_id, "running", started_at=int(time.time()))
            # Mutable cell so the nested closure can advance the sequence number.
            seq_ref = {"n": 0}
            async def report_progress(phase: str, detail: str, progress: float, severity: str = "info") -> None:
                # Persist each progress row (for replay on reconnect) and push live.
                seq_ref["n"] += 1
                self.db.append_progress(test_id, seq_ref["n"], phase, detail, progress, severity)
                await self._broadcast(
                    test_id,
                    {
                        "type": "progress",
                        "phase": phase,
                        "detail": detail,
                        "progress": progress,
                        "severity": severity,
                    },
                )
            async def report_finding(f: Finding) -> None:
                # Findings are only streamed; the full set lands in result_json.
                await self._broadcast(
                    test_id,
                    {"type": "finding", "finding": f.to_dict()},
                )
            try:
                result = await self.engine(target, report_progress, report_finding)
                payload = result.to_dict()
                self.db.update_status(
                    test_id,
                    "done",
                    finished_at=int(time.time()),
                    rank=result.rank,
                    score=result.score,
                    result_json=json.dumps(payload, ensure_ascii=False),
                )
                await self._broadcast(
                    test_id,
                    {
                        "type": "done",
                        "redirect": f"/tools/tls-test/results/{test_id}/",
                        "rank": result.rank,
                        "score": result.score,
                    },
                )
            except asyncio.CancelledError:
                # Shutdown path: propagate so gather() in stop() can collect us.
                raise
            except Exception as e:
                logger.exception("tls-test engine failed for %s", target)
                self.db.update_status(
                    test_id,
                    "error",
                    finished_at=int(time.time()),
                    error_message=str(e),
                )
                await self._broadcast(
                    test_id,
                    {"type": "error", "message": str(e)},
                )
            finally:
                # Either way the job is over; drop all listeners for this id.
                await self._close_subscribers(test_id)
    async def _cleanup_loop(self) -> None:
        # Hourly sweep of expired rows; errors are logged, never fatal.
        while not self._closed:
            try:
                self.db.delete_expired()
            except Exception:
                logger.exception("tls-test expired cleanup failed")
            try:
                await asyncio.sleep(3600)
            except asyncio.CancelledError:
                return
    def add_subscriber(self, test_id: str, ws: WebSocket) -> None:
        """Register *ws* to receive broadcasts for *test_id*."""
        self._subscribers.setdefault(test_id, set()).add(ws)
    def remove_subscriber(self, test_id: str, ws: WebSocket) -> None:
        """Unregister *ws*; drops the subscriber set when it becomes empty."""
        subs = self._subscribers.get(test_id)
        if subs and ws in subs:
            subs.discard(ws)
            if not subs:
                self._subscribers.pop(test_id, None)
    async def _broadcast(self, test_id: str, payload: dict[str, Any]) -> None:
        # Snapshot the set so removal during iteration is safe.
        subs = list(self._subscribers.get(test_id, set()))
        if not subs:
            return
        text = json.dumps(payload, ensure_ascii=False)
        for ws in subs:
            try:
                await ws.send_text(text)
            except Exception:
                # Dead socket: evict it rather than fail the broadcast.
                self.remove_subscriber(test_id, ws)
    async def _close_subscribers(self, test_id: str) -> None:
        # Best-effort close of every remaining socket for a finished job.
        subs = list(self._subscribers.get(test_id, set()))
        for ws in subs:
            try:
                await ws.close()
            except Exception:
                pass
        self._subscribers.pop(test_id, None)
@@ -1,104 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from typing import Any
# Ordered severity slugs used throughout findings and the UI.
SEVERITIES = ("good", "normal", "notgood", "bad", "serious", "info")
# Display labels for each severity slug.
SEVERITY_LABELS = {
    "good": "Good",
    "normal": "Normal",
    "notgood": "Not Good",
    "bad": "Bad",
    "serious": "Serious",
    "info": "Info",
}
# CSS color class per severity slug.
SEVERITY_COLORS = {
    "good": "bright-green",
    "normal": "bright-blue",
    "notgood": "bright-yellow",
    "bad": "bright-red",
    "serious": "bright-purple",
    "info": "light-grey-alt",
}
# 4 main groups for the results UI tabs.
GROUPS = ("reliability", "safety", "vulnerabilities", "compatibility")
# Japanese tab labels for each group.
GROUP_LABELS = {
    "reliability": "信頼性",
    "safety": "安全性",
    "vulnerabilities": "脆弱性",
    "compatibility": "互換性",
}
@dataclass
class Finding:
    """One observation from a scan.

    'group' is the coarse tab (reliability/safety/vulnerabilities/compatibility).
    'category' is the fine-grained sub-category label shown as a chip on each
    finding. 'step' is the machine-friendly phase slug that produced this
    finding (e.g. "protocols", "ciphers", "handshake_sim"), shown in the log
    as "[step]".
    """
    category: str
    title: str
    detail: str = ""
    severity: str = "info"
    weight: int = 0
    group: str = ""
    step: str = ""
    def to_dict(self) -> dict[str, Any]:
        """Serialize, enriching with display label, color, and UI sort impact."""
        payload = asdict(self)
        payload["severity_label"] = SEVERITY_LABELS.get(self.severity, self.severity)
        payload["color"] = SEVERITY_COLORS.get(self.severity, "light-grey")
        # impact is what the UI sorts by for the Summary tab.
        multipliers = {"good": 0.0, "normal": 0.0, "notgood": 1.0, "bad": 3.0, "serious": 9.0, "info": 0.0}
        payload["impact"] = multipliers.get(self.severity, 0.0) * float(self.weight)
        return payload
@dataclass
class ScanResult:
    """Aggregate outcome of one scan run against a target."""
    target: str
    host: str
    port: int
    started_at: float
    finished_at: float = 0.0
    findings: list[Finding] = field(default_factory=list)
    data: dict[str, Any] = field(default_factory=dict)
    rank: str = ""
    score: float = 0.0
    error: str | None = None
    def add(self, f: Finding) -> None:
        """Append one finding to the result."""
        self.findings.append(f)
    def to_dict(self) -> dict[str, Any]:
        """JSON-ready representation; duration is clamped non-negative."""
        payload = {
            "target": self.target,
            "host": self.host,
            "port": self.port,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            "duration": max(0.0, self.finished_at - self.started_at),
            "findings": [finding.to_dict() for finding in self.findings],
            "data": self.data,
            "rank": self.rank,
            "score": self.score,
            "error": self.error,
        }
        return payload
@dataclass
class ProgressMessage:
    """One progress update pushed to WebSocket clients."""
    phase: str
    detail: str
    progress: float
    severity: str = "info"
    finding: dict[str, Any] | None = None
    def to_dict(self) -> dict[str, Any]:
        """Plain-dict form for JSON serialization (keys mirror the fields)."""
        payload = {"phase": self.phase, "detail": self.detail}
        payload["progress"] = self.progress
        payload["severity"] = self.severity
        payload["finding"] = self.finding
        return payload
@@ -1,87 +0,0 @@
from __future__ import annotations
from .schemas import Finding, ScanResult
# Penalty multiplier applied to each finding's weight when scoring.
SEVERITY_MULTIPLIER = {
    "good": 0.0,
    "normal": 0.0,
    "notgood": 1.0,
    "bad": 3.0,
    "serious": 9.0,
    "info": 0.0,
}
# score >= threshold -> rank
# Checked in order, so entries must stay sorted descending by threshold.
RANK_THRESHOLDS: list[tuple[float, str]] = [
    (99.0, "SSS"),
    (95.0, "SS"),
    (90.0, "S"),
    (85.0, "A"),
    (80.0, "B"),
    (75.0, "C"),
    (70.0, "D"),
    (65.0, "E"),
    (60.0, "F"),
    (55.0, "G"),
    (50.0, "H"),
    (45.0, "I"),
    (40.0, "J"),
    (35.0, "K"),
    (30.0, "L"),
    (25.0, "M"),
    (20.0, "N"),
    (15.0, "O"),
    (10.0, "P"),
    (5.0, "Q"),
]
# CSS color class per rank ("R" is the below-all-thresholds fallback rank).
RANK_COLOR = {
    "SSS": "bright-green",
    "SS": "bright-green",
    "S": "bright-green",
    "A": "green",
    "B": "green",
    "C": "green",
    "D": "bright-yellow",
    "E": "bright-yellow",
    "F": "bright-yellow",
    "G": "yellow",
    "H": "yellow",
    "I": "yellow",
    "J": "bright-orange",
    "K": "bright-orange",
    "L": "bright-orange",
    "M": "orange",
    "N": "orange",
    "O": "bright-red",
    "P": "bright-red",
    "Q": "red",
    "R": "purple",
}
def compute_score(findings: list[Finding]) -> float:
    """100 minus severity-weighted penalties, clamped to [0, 100], 2 decimals."""
    penalty = sum(
        SEVERITY_MULTIPLIER.get(finding.severity, 0.0) * float(finding.weight)
        for finding in findings
    )
    clamped = min(100.0, max(0.0, 100.0 - penalty))
    return round(clamped, 2)
def rank_from_score(score: float) -> str:
    """Map a 0-100 score to a letter rank; below every threshold -> "R"."""
    qualifying = (rank for threshold, rank in RANK_THRESHOLDS if score >= threshold)
    return next(qualifying, "R")
def apply_rank(result: ScanResult) -> None:
    """Fill result.score and result.rank in place.

    A "no_tls" error short-circuits to zero and the bottom rank.
    """
    if result.error == "no_tls":
        result.score, result.rank = 0.0, "R"
        return
    result.score = compute_score(result.findings)
    result.rank = rank_from_score(result.score)
-140
View File
@@ -1,140 +0,0 @@
from __future__ import annotations
import json
import datetime
from fastapi import Request, WebSocket, WebSocketDisconnect
from fastapi.templating import Jinja2Templates
from fastapi.responses import Response
from .engine import validate_tls_target
from .ratelimit import check as ratelimit_check, client_ip_from_scope
def tls_submit(
    request: Request,
    raw: str,
    tls_test_db,
    tls_test_queue,
    templates: Jinja2Templates,
) -> tuple[str | None, Response | None]:
    """Validate and rate-limit an HTML form submission.

    Returns (test_id, None) on success, or (None, error_response) to render.
    """
    def _error_page(message: str, status: int) -> Response:
        # Re-render the form with the error and the user's original input.
        return templates.TemplateResponse(
            request=request,
            name="tools/tls-test/index.html",
            context={"error": message, "last_target": raw},
            status_code=status,
        )
    target = validate_tls_target(raw)
    if not target:
        return None, _error_page("無効なターゲットです。ホスト名/IP(:ポート)を入力してください。", 400)
    client_ip = client_ip_from_scope(request.scope)
    decision = ratelimit_check(tls_test_db, client_ip)
    if not decision.allowed:
        return None, _error_page(decision.reason, 429)
    return tls_test_queue.submit(target, client_ip), None
def tls_api_submit(request: Request, raw: str, tls_test_db, tls_test_queue) -> tuple[str | None, tuple[dict, int] | None]:
    """JSON-API variant of tls_submit.

    Returns (test_id, None) on success, or (None, (error_payload, http_status)).
    """
    target = validate_tls_target(raw)
    if not target:
        return None, ({"error": "invalid target"}, 400)
    client_ip = client_ip_from_scope(request.scope)
    decision = ratelimit_check(tls_test_db, client_ip)
    if decision.allowed:
        return tls_test_queue.submit(target, client_ip), None
    return None, ({"error": decision.reason}, 429)
def tls_results_context(job: dict, test_id: str, request: Request, tls_test_db) -> dict:
    """Build the Jinja context for the results page of one finished job."""
    result = job.get("result") or {}
    findings = result.get("findings", [])
    # Pre-seed the 4 main tabs plus a catch-all; setdefault below also keeps
    # any finding whose group is not one of these keys (it lands under its
    # own new key, which the predefined group_labels dict does not cover).
    groups: dict[str, list[dict]] = {"reliability": [], "safety": [], "vulnerabilities": [], "compatibility": [], "other": []}
    for f in findings:
        groups.setdefault(f.get("group") or "other", []).append(f)
    # Summary tab: worst offenders first (top 20), then a few positives.
    impactful = sorted([f for f in findings if (f.get("impact") or 0) > 0], key=lambda f: f.get("impact", 0), reverse=True)
    summary = impactful[:20] + [f for f in findings if f.get("severity") == "good"][:6]
    finished_at_display = ""
    try:
        # Prefer the finish time; fall back to start time for unfinished jobs.
        ts = job.get("finished_at") or job.get("started_at") or 0
        if ts:
            finished_at_display = datetime.datetime.fromtimestamp(int(ts)).strftime("%Y-%m-%dT%H:%M:%S")
    except Exception:
        pass
    progress_entries = tls_test_db.get_progress(test_id)
    # Interleave findings into the progress log under the step that produced them.
    findings_by_step: dict[str, list[dict]] = {}
    for f in findings:
        findings_by_step.setdefault((f.get("step") or "").strip(), []).append(f)
    log_entries: list[dict] = []
    seen_steps: set[str] = set()
    for p in progress_entries:
        log_entries.append({"kind": "phase", "phase": p.get("phase") or "", "detail": p.get("detail") or "", "severity": p.get("severity") or "info"})
        step = (p.get("phase") or "").strip()
        # Attach each step's findings only after its first progress line.
        if step and step not in seen_steps:
            log_entries += [{"kind": "finding", "finding": f} for f in findings_by_step.get(step, [])]
            seen_steps.add(step)
    # Findings whose step never appeared in the progress log go at the end.
    for step, fs in findings_by_step.items():
        if step not in seen_steps:
            log_entries += [{"kind": "finding", "finding": f} for f in fs]
    target = (result.get("target") or job.get("target") or "").strip()
    history = tls_test_db.get_history_by_target(target, exclude_id=test_id, limit=10) if target else []
    for h in history:
        try:
            ts = h.get("finished_at") or h.get("created_at") or 0
            h["finished_at_display"] = datetime.datetime.fromtimestamp(int(ts)).strftime("%Y-%m-%dT%H:%M:%S") if ts else ""
        except Exception:
            h["finished_at_display"] = ""
    return {
        "test_id": test_id,
        "job": job,
        "result": result,
        "groups": groups,
        "summary": summary,
        "group_labels": {"reliability": "信頼性", "safety": "安全性", "vulnerabilities": "脆弱性", "compatibility": "互換性", "other": "その他"},
        "finished_at_display": finished_at_display,
        # Strip any URL fragment so the share link is canonical.
        "share_url": str(request.url).split("#", 1)[0],
        "log_entries": log_entries,
        "history": history,
    }
async def tls_websocket_handler(websocket: WebSocket, test_id: str, tls_test_db, tls_test_queue) -> None:
    """Live progress socket for one test: replays history, then streams updates."""
    job = tls_test_db.get_job(test_id)
    if not job:
        # Unknown test id: refuse with an application close code.
        await websocket.close(code=4404)
        return
    await websocket.accept()
    tls_test_queue.add_subscriber(test_id, websocket)
    try:
        # Replay persisted progress so late joiners see the full log.
        await websocket.send_text(json.dumps({
            "type": "history",
            "status": job.get("status"),
            "target": job.get("target"),
            "entries": tls_test_db.get_progress(test_id),
        }))
        # Already finished: send the final redirect payload and hang up.
        if job.get("status") == "done":
            await websocket.send_text(json.dumps({
                "type": "done",
                "redirect": f"/tools/tls-test/results/{test_id}/",
                "rank": job.get("rank"),
                "score": job.get("score"),
            }))
            await websocket.close()
            return
        # Park here so the queue can push broadcasts; we ignore client text.
        while True:
            try:
                await websocket.receive_text()
            except WebSocketDisconnect:
                break
    except WebSocketDisconnect:
        pass
    except Exception:
        pass
    finally:
        tls_test_queue.remove_subscriber(test_id, websocket)
@@ -1,83 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable
from ..protocol import constants as C
from ..protocol.constants import (
cipher_is_3des,
cipher_is_rc4,
cipher_is_weak,
cipher_is_cbc,
cipher_has_fs,
cipher_is_aead,
)
@dataclass
class VulnPassiveResult:
    """Vulnerability flags derived passively from version/cipher enumeration."""
    # SSL 3.0 + CBC cipher accepted.
    poodle_ssl: bool = False
    # Export-grade cipher accepted (see analyze() for the exact criterion).
    freak: bool = False
    # SSLv2 supported at all.
    drown: bool = False
    # Export-grade DHE exposure.
    logjam_export: bool = False
    # 3DES (64-bit block) cipher accepted.
    sweet32: bool = False
    # RC4 cipher accepted.
    rc4: bool = False
    # TLS 1.0 + CBC cipher accepted.
    beast: bool = False
    # CBC cipher on TLS 1.0-1.2.
    lucky13: bool = False
    null_cipher: bool = False
    anon_cipher: bool = False
    export_cipher: bool = False
    # None = not probed; filled in elsewhere by the active probes.
    secure_renego_supported: bool | None = None
    fallback_scsv_supported: bool | None = None
    issues: list[str] = field(default_factory=list)
def analyze(
    versions_supported: set[int],
    accepted_ciphers_per_version: dict[int, list[int]],
) -> VulnPassiveResult:
    """Derive vulnerability flags from the enumerated versions and ciphers.

    *versions_supported* holds protocol version constants; *accepted_ciphers_per_version*
    maps version -> accepted cipher suite IDs. No network traffic happens here.
    """
    r = VulnPassiveResult()
    # DROWN: any SSLv2 support is a direct DROWN exposure.
    if C.SSL_2_0 in versions_supported:
        r.drown = True
    # POODLE (SSL 3.0): SSL 3.0 supported with any CBC cipher.
    if C.SSL_3_0 in versions_supported:
        ciphers = accepted_ciphers_per_version.get(C.SSL_3_0, [])
        for cid in ciphers:
            name = C.CIPHER_SUITES.get(cid, "")
            if cipher_is_cbc(name):
                r.poodle_ssl = True
                break
    # BEAST (TLS 1.0): TLS 1.0 supported with CBC ciphers.
    if C.TLS_1_0 in versions_supported:
        ciphers = accepted_ciphers_per_version.get(C.TLS_1_0, [])
        for cid in ciphers:
            name = C.CIPHER_SUITES.get(cid, "")
            if cipher_is_cbc(name):
                r.beast = True
                break
    # Sweep all ciphers across all versions for cipher-family flags.
    for ver, cids in accepted_ciphers_per_version.items():
        for cid in cids:
            name = C.CIPHER_SUITES.get(cid, "")
            if not name:
                continue
            if cipher_is_3des(name):
                r.sweet32 = True
            if cipher_is_rc4(name):
                r.rc4 = True
            if "NULL" in name:
                r.null_cipher = True
            if "anon" in name:
                r.anon_cipher = True
            if "EXPORT" in name:
                r.export_cipher = True
                # NOTE(review): FREAK strictly concerns export-grade RSA;
                # flagging any EXPORT suite keeps the original behavior.
                r.freak = True
                # BUGFIX: Logjam's export exposure requires an export-grade
                # *DHE* suite. Previously this check ran for every cipher,
                # so any DHE suite (including strong ones) set logjam_export.
                if "DHE" in name:
                    r.logjam_export = True
            if cipher_is_cbc(name) and ver in (C.TLS_1_0, C.TLS_1_1, C.TLS_1_2):
                r.lucky13 = True
    return r
@@ -1,80 +0,0 @@
from __future__ import annotations
import asyncio
import struct
from ..protocol import constants as C
from ..protocol import wire
async def probe(host: str, port: int, sni: str | None, timeout: float = 6.0) -> tuple[bool, str | None]:
    """CVE-2014-0224 CCS Injection probe.
    Send ChangeCipherSpec immediately after ClientHello. A vulnerable server
    will accept and not send an Unexpected Message alert.

    Returns (vulnerable, info). Note: this implementation never returns
    vulnerable=True — see the conservative-default comment below.
    """
    try:
        reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
    except Exception as e:
        return False, f"connect: {e}"
    try:
        # Minimal TLS 1.2 extension set; SNI only for DNS names.
        exts = (
            (wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
            + wire.ext_ec_point_formats()
            + wire.ext_supported_groups([0x001d, 0x0017, 0x0018])
            + wire.ext_signature_algorithms([0x0403, 0x0401])
            + wire.ext_renegotiation_info_empty()
        )
        ch = wire.build_client_hello(
            record_version=C.TLS_1_0,
            client_hello_version=C.TLS_1_2,
            hostname=sni,
            cipher_suites=[0xc013, 0xc014, 0x002f, 0x0035, 0x000a],
            extensions=exts,
        )
        writer.write(ch)
        # Send early ChangeCipherSpec
        ccs_record = bytes([C.CT_CHANGE_CIPHER_SPEC]) + struct.pack("!H", C.TLS_1_2) + struct.pack("!H", 1) + b"\x01"
        writer.write(ccs_record)
        try:
            await asyncio.wait_for(writer.drain(), timeout=timeout)
        except Exception:
            pass
        try:
            data = await asyncio.wait_for(reader.read(4096), timeout=timeout)
        except Exception:
            data = b""
        if not data:
            return False, "no response (likely dropped connection)"
        alerts = _find_alerts(data)
        # Any fatal alert (level=2) indicates the server rejected the early CCS: NOT vulnerable.
        if any(lv == 2 for lv, _desc in alerts):
            return False, "server sent fatal alert (rejected early CCS)"
        # Explicit unexpected_message alert (10): NOT vulnerable.
        if any(desc == 10 for _lv, desc in alerts):
            return False, "server rejected with unexpected_message alert"
        # Otherwise: no definitive signal (server may have ignored CCS or handshake still pending).
        # Conservative default: not vulnerable. True detection requires a complete handshake exchange.
        return False, "no definitive signal of CCS acceptance"
    except Exception as e:
        return False, f"{e.__class__.__name__}: {e}"
    finally:
        # Best-effort teardown; never let cleanup mask the probe result.
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
        except Exception:
            pass
def _find_alerts(data: bytes) -> list[tuple[int, int]]:
    """Walk the TLS records in *data* and collect (level, description) of alerts."""
    alerts: list[tuple[int, int]] = []
    offset = 0
    while offset + 5 <= len(data):
        content_type = data[offset]
        record_len = int.from_bytes(data[offset + 3:offset + 5], "big")
        payload = data[offset + 5:offset + 5 + record_len]
        if content_type == C.CT_ALERT and len(payload) >= 2:
            alerts.append((payload[0], payload[1]))
        offset += 5 + record_len
        if record_len == 0:
            # Zero-length record: bail out to guarantee forward progress.
            break
    return alerts
@@ -1,86 +0,0 @@
from __future__ import annotations
import asyncio
import os
import struct
from dataclasses import dataclass
from ..protocol import constants as C
from ..protocol import wire
@dataclass
class HeartbleedResult:
    """Outcome of the Heartbleed (CVE-2014-0160) probe."""
    # Server appeared to answer the malformed heartbeat (likely vulnerable).
    vulnerable: bool
    # Server's ServerHello advertised the heartbeat extension.
    heartbeat_extension_advertised: bool
    # Connection/probe failure description, if any.
    error: str | None = None
def _build_heartbeat_record(version: int = C.TLS_1_2, claimed_length: int = 0x4000) -> bytes:
    """Malformed heartbeat request: claims *claimed_length* payload bytes but
    carries only 1 data byte plus 16 bytes of padding."""
    payload = b"\x01" + struct.pack("!H", claimed_length) + b"X" + b"\x00" * 16
    header = struct.pack("!BHH", C.CT_HEARTBEAT, version, len(payload))
    return header + payload
async def probe(host: str, port: int, sni: str | None, timeout: float = 8.0) -> HeartbleedResult:
    """Send TLS 1.2 ClientHello with heartbeat extension, then a malformed heartbeat.
    If the server replies with a heartbeat response longer than what we sent
    (or any response to the malformed message) it's likely vulnerable.
    """
    try:
        reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
    except Exception as e:
        return HeartbleedResult(False, False, error=f"connect: {e}")
    try:
        # ClientHello advertising the heartbeat extension; SNI only for DNS names.
        ext_parts = [
            (wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b""),
            wire.ext_ec_point_formats(),
            wire.ext_supported_groups([0x001d, 0x0017, 0x0018]),
            wire.ext_signature_algorithms([0x0403, 0x0804, 0x0401]),
            wire.ext_heartbeat_enabled(),
            wire.ext_renegotiation_info_empty(),
        ]
        exts = b"".join(ext_parts)
        ch = wire.build_client_hello(
            record_version=C.TLS_1_0,
            client_hello_version=C.TLS_1_2,
            hostname=sni,
            cipher_suites=[0xc02f, 0xc030, 0x009c, 0x009d, 0x002f, 0x0035, 0x000a],
            extensions=exts,
        )
        writer.write(ch)
        try:
            await asyncio.wait_for(writer.drain(), timeout=timeout)
        except Exception:
            pass
        data = b""
        try:
            data = await asyncio.wait_for(reader.read(65536), timeout=timeout)
        except Exception:
            pass
        # Record whether the ServerHello echoed the heartbeat extension.
        heartbeat_ext = False
        if data:
            parsed = wire.parse_server_response(data)
            if parsed and parsed.extensions and C.EXT_HEARTBEAT in parsed.extensions:
                heartbeat_ext = True
        # Fire the malformed heartbeat before the handshake completes.
        writer.write(_build_heartbeat_record())
        try:
            await asyncio.wait_for(writer.drain(), timeout=timeout)
        except Exception:
            pass
        try:
            reply = await asyncio.wait_for(reader.read(65536), timeout=3.0)
        except Exception:
            reply = b""
        # Any heartbeat-typed record of non-trivial length counts as a leak signal.
        vulnerable = False
        if reply and reply[0] == C.CT_HEARTBEAT and len(reply) > 8:
            vulnerable = True
        return HeartbleedResult(vulnerable=vulnerable, heartbeat_extension_advertised=heartbeat_ext)
    except Exception as e:
        return HeartbleedResult(False, False, error=f"{e.__class__.__name__}: {e}")
    finally:
        # Best-effort teardown; never let cleanup mask the probe result.
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.5)
        except Exception:
            pass
@@ -1,208 +0,0 @@
from __future__ import annotations
import asyncio
from ..protocol import constants as C
from ..protocol import wire
async def _read_records(reader: asyncio.StreamReader, max_records: int = 8, timeout: float = 6.0) -> bytes:
    """Read up to `max_records` complete TLS records from the stream.
    A simple record-layer reader that guarantees we pull entire records
    instead of relying on whatever arrives in the first TCP chunk. Records
    that straddle packet boundaries (common for ServerHello+Certificate
    responses in TLS 1.2) are reassembled.

    Returns the raw concatenated record bytes read so far (possibly ending
    with a bare 5-byte header when a body read failed mid-record).
    """
    buf = bytearray()
    for _ in range(max_records):
        # 5-byte record header: type(1) + version(2) + length(2).
        try:
            header = await asyncio.wait_for(reader.readexactly(5), timeout=timeout)
        except (asyncio.IncompleteReadError, asyncio.TimeoutError, ConnectionError):
            break
        except Exception:
            break
        rec_len = (header[3] << 8) | header[4]
        body = b""
        if rec_len:
            try:
                body = await asyncio.wait_for(reader.readexactly(rec_len), timeout=timeout)
            except (asyncio.IncompleteReadError, asyncio.TimeoutError, ConnectionError):
                # Keep the orphan header so callers can see the partial record.
                buf.extend(header)
                break
            except Exception:
                buf.extend(header)
                break
        buf.extend(header)
        buf.extend(body)
        ct = header[0]
        # A fatal alert or end of useful data — stop.
        if ct == C.CT_ALERT and len(body) >= 2 and body[0] == 2:
            break
    return bytes(buf)
def _iter_records(data: bytes):
i = 0
while i + 5 <= len(data):
ct = data[i]
rec_len = (data[i + 3] << 8) | data[i + 4]
if i + 5 + rec_len > len(data):
break
yield ct, data[i + 5:i + 5 + rec_len]
i += 5 + rec_len
if rec_len == 0:
break
async def probe_secure_renegotiation(host: str, port: int, sni: str | None, timeout: float = 8.0) -> tuple[bool, str | None]:
    """Check whether the server acknowledges RFC 5746 (Secure Renegotiation).
    Behavior:
      - Send a ClientHello that offers only TLS 1.2 (no supported_versions extension,
        so TLS 1.3 servers will downgrade to 1.2 for this probe) and includes
        `renegotiation_info` (empty) per RFC 5746.
      - A server that supports RFC 5746 MUST echo the `renegotiation_info`
        extension in its ServerHello.
      - If the server *only* supports TLS 1.3 (e.g., modern fronts that refuse
        TLS 1.2), RFC 5746 does not apply — TLS 1.3 removed renegotiation
        entirely and is treated as "supported by way of elimination".
    Returns (supported, info_string).
    """
    try:
        reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
    except Exception as e:
        return False, f"connect: {e}"
    try:
        # Broad-compatibility extension set; SNI only for DNS names.
        exts = (
            (wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
            + wire.ext_ec_point_formats()
            + wire.ext_supported_groups([0x001d, 0x0017, 0x0018, 0x0019])
            + wire.ext_signature_algorithms([
                0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
                0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
            ])
            + wire.ext_renegotiation_info_empty()
            + wire.ext_extended_master_secret()
        )
        ch = wire.build_client_hello(
            record_version=C.TLS_1_0,
            client_hello_version=C.TLS_1_2,
            hostname=sni,
            cipher_suites=[
                0xc02f, 0xc030, 0xc02b, 0xc02c,
                0xcca8, 0xcca9,
                0xc013, 0xc014, 0x009c, 0x009d,
                0x002f, 0x0035, 0x000a,
            ],
            extensions=exts,
        )
        writer.write(ch)
        try:
            await asyncio.wait_for(writer.drain(), timeout=timeout)
        except Exception:
            pass
        data = await _read_records(reader, max_records=3, timeout=timeout)
        if not data:
            return False, "no response"
        parsed = wire.parse_server_response(data)
        if parsed is None or parsed.handshake_type != C.HS_SERVER_HELLO:
            # If the server returned only an alert on a TLS 1.2 ClientHello we can't
            # observe renegotiation_info; report unknown-leaning-negative.
            if parsed and parsed.alert is not None:
                return False, f"server alert (level={parsed.alert[0]}, desc={parsed.alert[1]})"
            return False, "no server hello"
        # TLS 1.3 removed renegotiation entirely (RFC 8446 §4.1.2). A server that
        # negotiates TLS 1.3 here is safe-by-default.
        neg_ver = parsed.negotiated_version or parsed.server_version
        if neg_ver == C.TLS_1_3:
            return True, "TLS 1.3 (renegotiation obsolete)"
        if parsed.extensions and C.EXT_RENEGOTIATION_INFO in parsed.extensions:
            return True, None
        return False, "no renegotiation_info extension echoed"
    finally:
        # Best-effort teardown; never let cleanup mask the probe result.
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
        except Exception:
            pass
async def probe_fallback_scsv(host: str, port: int, sni: str | None, timeout: float = 8.0) -> tuple[bool, str | None]:
    """Check whether the server enforces TLS_FALLBACK_SCSV (RFC 7507).

    Strategy:
      - Send a ClientHello with legacy_version=TLS 1.1, a cipher list that
        includes TLS_FALLBACK_SCSV (0x5600), and NO supported_versions
        extension.
      - If the server's highest supported version is > TLS 1.1, it MUST
        respond with a fatal `inappropriate_fallback` (86) alert per RFC 7507.
      - If the server's genuine maximum is TLS 1.1 *or lower* (rare today),
        it completes the handshake at that version; RFC 7507 requires no
        alert in that case, so SCSV enforcement is moot. We treat that as
        `n/a` rather than `not enforced`.

    Args:
        host: Target hostname or IP literal to connect to.
        port: Target TCP port.
        sni: Server name for the SNI extension; omitted for IP literals.
        timeout: Per-operation timeout in seconds.

    Returns:
        (enforced_or_not_applicable, info_string); info_string is None on a
        clean positive result.
    """
    try:
        reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
    except Exception as e:
        return False, f"connect: {e}"
    try:
        exts = (
            (wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
            + wire.ext_ec_point_formats()
            + wire.ext_supported_groups([0x001d, 0x0017, 0x0018])
            + wire.ext_signature_algorithms([
                0x0403, 0x0401, 0x0501, 0x0601,
            ])
            + wire.ext_renegotiation_info_empty()
        )
        ch = wire.build_client_hello(
            record_version=C.TLS_1_0,
            # Deliberately below modern server maxima so SCSV should trigger.
            client_hello_version=C.TLS_1_1,
            hostname=sni,
            cipher_suites=[
                0x5600,  # TLS_FALLBACK_SCSV
                0xc013, 0xc014, 0xc009, 0xc00a,
                0x002f, 0x0035, 0x000a,
            ],
            extensions=exts,
        )
        writer.write(ch)
        try:
            await asyncio.wait_for(writer.drain(), timeout=timeout)
        except Exception:
            pass
        data = await _read_records(reader, max_records=3, timeout=timeout)
        if not data:
            # A fatal alert may cause the server to abort the TCP connection
            # before the alert record is flushed on some stacks; we cannot
            # distinguish that from a dead server here, so report
            # unknown-leaning-negative.
            return False, "no response (possibly RST)"
        saw_server_hello = False
        negotiated_version: int | None = None
        for ct, body in _iter_records(data):
            if ct == C.CT_ALERT and len(body) >= 2:
                level, desc = body[0], body[1]
                if desc == 86:  # inappropriate_fallback — the RFC 7507 mandated response
                    return True, None
                # A fatal protocol_version(70) alert also signals a refused
                # downgrade; count it as enforcement. handshake_failure(40)
                # is deliberately NOT counted: it is ambiguous (it can also
                # mean e.g. no shared cipher suite).
                if desc == 70 and level == 2:
                    return True, f"alert protocol_version({desc})"
            if ct == C.CT_HANDSHAKE and len(body) >= 4 and body[0] == C.HS_SERVER_HELLO:
                saw_server_hello = True
                if len(body) >= 6:
                    # ServerHello body layout: type(1) + length(3) + version(2) ...
                    negotiated_version = (body[4] << 8) | body[5]
        if saw_server_hello:
            if negotiated_version is not None and negotiated_version <= C.TLS_1_1:
                # Server accepted the offered version or lower, so its maximum
                # is at most TLS 1.1 (a server picks min(offered, max)). RFC
                # 7507 requires no alert in that case → SCSV not applicable.
                # (The original only matched == TLS 1.1 and wrongly flagged a
                # TLS 1.0-max server as "not enforced".)
                return True, "server max <= TLS 1.1 (SCSV not applicable)"
            return False, "server completed handshake without inappropriate_fallback"
        return False, "no inappropriate_fallback alert"
    finally:
        # Best-effort teardown: close regardless of probe outcome and never
        # let a slow/broken close mask the probe result.
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
        except Exception:
            pass