This commit is contained in:
2026-04-19 11:33:11 +09:00
parent 867ae25fa0
commit da8d91b87f
43 changed files with 4044 additions and 15 deletions
+1
View File
@@ -1,5 +1,6 @@
# Database files
databases/*.db
databases/hsts_preload_cache/*
# Log files
logs/*.log
+2 -2
View File
@@ -36,7 +36,7 @@
<meta name="twitter:card" content="summary_large_image">
<meta name="twitter:title" content="{{ self.title() }}">
<meta name="twitter:description" content="{{ self.description() }}">
<meta name="twitter:image" content="https://nercone.dev/assets/images/thumbnails/{{ request.url.path.strip('/') or 'index' }}?title={% block thumbnail_title %}{{ self.title() | re_sub(' - Nercone.*$', '') | urlencode }}{% endblock %}&description={% block thumbnail_description %}{{ self.description() | urlencode }}{% endblock %}&template={{ self.thumbnail_template() }}">
<meta name="twitter:image" content="https://nercone.dev/assets/images/thumbnails/{{ request.url.path.strip('/') or 'index' }}?title={{ self.thumbnail_title() }}&description={{ self.thumbnail_description() }}&template={{ self.thumbnail_template() }}">
<!-- PWA (Progressive Web App) -->
<link rel="manifest" href="/site.webmanifest">
<link rel="apple-touch-icon" href="/assets/images/favicon.png" sizes="1200x1200">
@@ -73,7 +73,7 @@
{% block footer_links %}
<a href="/about/" class="text-no-decoration bold-on-small">about</a>
<a href="/links/" class="text-no-decoration bold-on-small">links</a>
<a href="/blog/" class="text-no-decoration bold-on-small">blog</a>
<a href="/tools/" class="text-no-decoration bold-on-small">tools</a>
<a href="/projects/" class="text-no-decoration hide show-on-large">projects</a>
<a href="{{ onion_site_url }}" class="text-no-decoration hide show-on-large">onion</a>
<a href="mailto:nercone@nercone.dev" class="text-no-decoration hide show-on-large">email</a>
+8
View File
@@ -12,6 +12,14 @@
<loc>https://nercone.dev/links/</loc>
<priority>0.9</priority>
</url>
<url>
<loc>https://nercone.dev/tools/</loc>
<priority>0.9</priority>
</url>
<url>
<loc>https://nercone.dev/tools/tls-test/</loc>
<priority>0.8</priority>
</url>
<url>
<loc>https://nercone.dev/download-banner/</loc>
<priority>0.8</priority>
+14
View File
@@ -0,0 +1,14 @@
{% extends "/base.html" %}
{% block title %}Tools - Nercone{% endblock %}
{% block title_suffix %}Tools{% endblock %}
{% block header_desc %}nercone.dev上で使用可能なツールたち{% endblock %}
{% block description %}nercone.dev上で使用可能なツールたち{% endblock %}
{% block content %}
<a href="/tools/tls-test/" class="text-no-decoration font-bold">
<div id="tls-test" class="block">
<h3 style="margin-bottom: 0px;">Nercone TLS Test</h3>
<p style="margin-top: 0px;">仕様書っぽいやつ以外ほぼ全部Claudeに作らせたバグ多めで評価厳しめのTLSサーバーテストツール</p>
<span class="flex font-small text-tx-alt">tls-test</span>
</div>
</a>
{% endblock %}
+125
View File
@@ -0,0 +1,125 @@
.tls-form {
display: flex;
flex-direction: column;
gap: 8px;
margin: 12px 0;
}
.tls-input {
background-color: #202020;
color: #E0E0E0;
border: 1px solid #3a3a3a;
border-radius: 6px;
padding: 10px 12px;
font-family: inherit;
font-size: inherit;
}
.tls-input:focus {
outline: none;
border-color: #00C0FF;
}
.tls-submit {
align-self: flex-start;
background-color: #202020;
color: #E0E0E0;
border: 1px solid #3a3a3a;
border-radius: 6px;
padding: 10px 20px;
cursor: pointer;
font-family: inherit;
}
.tls-submit:hover {
border-color: #00C878;
color: #00C878;
}
.tls-progress-track {
height: 6px;
background-color: #202020;
border-radius: 3px;
overflow: hidden;
margin: 12px 0;
}
.tls-progress-bar {
height: 100%;
background-color: #00C878;
transition: width 0.3s ease;
}
.tls-log {
max-height: 400px;
overflow-y: auto;
display: flex;
flex-direction: column;
gap: 4px;
font-family: "MesloLGS NF", "Menlo", "Consolas", monospace;
font-size: 10pt;
}
.tls-log-row {
padding: 2px 0;
}
.tls-summary {
display: flex;
align-items: center;
gap: 24px;
flex-wrap: wrap;
}
.tls-rank-badge {
width: 120px;
height: 120px;
border-radius: 50%;
background-color: #202020;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
border: 4px solid currentColor;
flex-shrink: 0;
}
.tls-rank-letters {
font-size: 48pt;
font-weight: 700;
line-height: 1;
}
.tls-rank-score {
font-size: 10pt;
color: #939393;
margin-top: 4px;
}
.tls-summary-meta {
min-width: 200px;
}
.tls-target {
margin: 0 0 8px 0;
word-break: break-all;
}
.tls-finding {
margin: 4px 0;
line-height: 1.5;
}
.tls-table {
width: 100%;
border-collapse: collapse;
margin-top: 8px;
}
.tls-table th, .tls-table td {
border-bottom: 1px solid #3a3a3a;
padding: 6px 8px;
text-align: left;
}
.tls-table th {
color: #939393;
font-weight: 400;
font-size: 10pt;
}
.tls-raw {
max-height: 400px;
overflow: auto;
background-color: #202020;
padding: 12px;
border-radius: 6px;
font-size: 10pt;
}
+118
View File
@@ -0,0 +1,118 @@
(function () {
const init = window.__TLS_INIT__;
if (!init) return;
const phaseEl = document.getElementById("tls-phase");
const barEl = document.getElementById("tls-progress-bar");
const logEl = document.getElementById("tls-log");
const SEV_COLOR = {
good: "bright-green",
normal: "bright-yellow",
notgood: "bright-orange",
bad: "bright-red",
serious: "magenta",
info: "tx",
};
let reconnectAttempts = 0;
const MAX_RECONNECTS = 3;
let ws = null;
let closedByDone = false;
function appendLog(phase, detail, severity) {
if (!logEl) return;
const row = document.createElement("div");
row.className = "tls-log-row";
const label = document.createElement("span");
label.className = "text-tx-alt font-small";
label.textContent = `[${phase}] `;
const msg = document.createElement("span");
msg.className = `text-${SEV_COLOR[severity] || "tx"}`;
msg.textContent = detail || "";
row.appendChild(label);
row.appendChild(msg);
logEl.appendChild(row);
logEl.scrollTop = logEl.scrollHeight;
}
function setProgress(value, phase) {
if (barEl) barEl.style.width = `${Math.max(0, Math.min(1, value)) * 100}%`;
if (phaseEl && phase) phaseEl.textContent = phase;
}
function connect() {
try {
ws = new WebSocket(init.wsUrl);
} catch (e) {
scheduleReconnect();
return;
}
ws.onmessage = (ev) => {
let msg;
try {
msg = JSON.parse(ev.data);
} catch (_) {
return;
}
if (msg.type === "history") {
(msg.entries || []).forEach((e) => appendLog(e.phase, e.detail, e.severity));
if (msg.status === "done") {
closedByDone = true;
location.replace(init.resultsUrl);
}
return;
}
if (msg.type === "progress") {
appendLog(msg.phase, msg.detail, msg.severity);
if (typeof msg.progress === "number") {
setProgress(msg.progress, msg.detail || msg.phase);
}
return;
}
if (msg.type === "finding") {
const f = msg.finding || {};
appendLog(f.category || "finding", `${f.severity_label || ""} ${f.title || ""} ${f.detail ? "— " + f.detail : ""}`.trim(), f.severity || "info");
return;
}
if (msg.type === "done") {
closedByDone = true;
setProgress(1.0, `done (rank ${msg.rank}, score ${msg.score})`);
location.replace(msg.redirect || init.resultsUrl);
return;
}
if (msg.type === "error") {
appendLog("error", msg.message || "engine failed", "serious");
return;
}
if (msg.type === "started") {
appendLog("started", msg.target || "", "info");
return;
}
};
ws.onclose = () => {
if (!closedByDone) scheduleReconnect();
};
ws.onerror = () => {
try { ws.close(); } catch (_) {}
};
}
function scheduleReconnect() {
if (closedByDone) return;
if (reconnectAttempts >= MAX_RECONNECTS) {
appendLog("ws", "WebSocket 接続が切断されました。ページをリロードしてください。", "bad");
return;
}
const delay = Math.min(10_000, 1000 * Math.pow(2, reconnectAttempts));
reconnectAttempts += 1;
setTimeout(connect, delay);
}
window.addEventListener("beforeunload", () => {
closedByDone = true;
try { ws && ws.close(); } catch (_) {}
});
connect();
})();
+41
View File
@@ -0,0 +1,41 @@
{% extends "/base.html" %}
{% block title %}Nercone TLS Test{% endblock %}
{% block title_suffix %}TLS Test{% endblock %}
{% block description %}任意のホストに対して TLS/SSL 設定の詳細チェックとランク付けを行います。{% endblock %}
{% block header_desc %}ただのTLS/SSL設定分析サービス{% endblock %}
{% block extra_head %}
<link rel="stylesheet" href="/tools/tls-test/assets/tls-test.css">
{% endblock %}
{% block content %}
<div class="block">
<h1>TLS Test</h1>
<p>任意のホストに対してTLS/SSL設定の詳細チェックを行い、SSS〜Rの21段階でランク付けします。</p>
<form method="POST" action="/tools/tls-test/" class="tls-form">
<label for="target" class="text-tx-alt font-small">対象ホスト (例: example.com / 192.0.2.1 / [2001:db8::1]:8443)</label>
<input type="text" id="target" name="target" required autocomplete="off" spellcheck="false"
value="{{ last_target or '' }}"
placeholder="example.com" class="tls-input">
<button type="submit" class="tls-submit">Start scan</button>
{% if error %}
<p class="text-bright-red font-small">{{ error }}</p>
{% endif %}
</form>
<p class="font-small text-tx-alt">プロトコルバージョン・暗号スイート・証明書・HSTS・CAA・HTTP/1-2-3 対応・主要脆弱性・ハンドシェイクシミュレーションを検査します。</p>
<p class="font-small text-tx-alt">IP 直接入力の場合、SNI/証明書名の一致を判定できないため減点対象となることがあります。</p>
</div>
<div class="block">
<h2>利用上の注意</h2>
<ul>
<li>このツールはとりあえず動けば良いやと、Claudeの性能チェックも兼ねて、仕様書っぽいやつ以外ほぼ丸ごと作らせたため、バグがいくつかあると思います。レイアウトがあまり良くない箇所とかもすでに何箇所か発見しています。今後改善予定です。</li>
<li>このツールは指定されたホストに対して<b>実際に TLS 接続を行います</b>。第三者のサーバーに対するスキャンは、対象サーバーの利用規約や法律を遵守した上で行ってください。</li>
<li>レート制限: 同一 IP から同時実行は 1 件、1 時間あたり 10 件までです。</li>
<li>結果は7日間保持されます。テストIDを知っている人は同じ結果を閲覧できます。</li>
<li>Nercone TLS Testサービスが使用するUser-Agent文字列は<code>nercone-tls-test/1.0</code>です。</li>
</ul>
</div>
<div class="block">
<h2>API</h2>
<p>同等の機能を JSON API として提供しています。</p>
<pre>POST /api/tools/tls-test/scan<br>Content-Type: application/json<br>{"target": "example.com"}</pre>
</div>
{% endblock %}
+100
View File
@@ -0,0 +1,100 @@
{% extends "/base.html" %}
{% block title %}{{ result.target }} (ランク{{ result.rank }}) - Nercone TLS Test{% endblock %}
{% block title_suffix %}TLS Test{% endblock %}
{% block description %}TLS Test の結果ページです。対象: {{ result.target }} / ランク: {{ result.rank }}。{% endblock %}
{% block header_desc %}Results{% endblock %}
{% block extra_head %}
<link rel="stylesheet" href="/tools/tls-test/assets/tls-test.css">
{% endblock %}
{% block content %}
{% set rank_color_map = {
'SSS': 'bright-green', 'SS': 'bright-green', 'S': 'bright-green',
'A': 'green', 'B': 'green', 'C': 'green',
'D': 'bright-yellow', 'E': 'bright-yellow', 'F': 'bright-yellow',
'G': 'yellow', 'H': 'yellow', 'I': 'yellow',
'J': 'bright-orange', 'K': 'bright-orange', 'L': 'bright-orange',
'M': 'orange', 'N': 'orange',
'O': 'bright-red', 'P': 'bright-red',
'Q': 'red', 'R': 'purple'
} %}
{% set rank_color = rank_color_map.get(result.rank or job.rank, 'tx') %}
<div class="block tls-summary">
<div class="tls-rank-badge text-{{ rank_color }}">
<span class="tls-rank-letters">{{ result.rank or job.rank or '?' }}</span>
<span class="tls-rank-score">{{ "%.2f"|format(result.score or job.score or 0) }}</span>
</div>
<div class="tls-summary-meta flex-1">
<h1 class="tls-target">{{ result.target or job.target }}</h1>
<p class="font-small text-tx-alt">host={{ result.host }} port={{ result.port }}</p>
<p class="font-small text-tx-alt">実施: {{ job.started_at | default("") }} / 所要時間: {{ "%.2f"|format(result.duration or 0) }}秒 / テストID: <code>{{ test_id }}</code></p>
<p class="font-small text-tx-alt">結果は7日間保持され、その後自動的に削除されます。</p>
{% if result.error %}
<p class="text-bright-red">{{ result.error }}</p>
{% endif %}
</div>
</div>
{% set category_titles = {
'protocol': 'SSL/TLS Versions',
'cipher': 'Cipher Suites',
'kex': 'Key Exchange',
'cert': 'Certificate',
'trust': 'Trust Stores',
'hsts': 'HSTS / Preload',
'caa': 'CAA',
'http': 'HTTP',
'vuln': 'Vulnerabilities',
'compat': 'Client Compatibility',
'connectivity': 'Connectivity',
'engine': 'Engine'
} %}
{% for cat_key, findings in categories.items() %}
<div class="block">
<h2>{{ category_titles.get(cat_key, cat_key|capitalize) }}</h2>
{% for f in findings %}
<p class="tls-finding">
<span class="text-{{ f.color }}">[{{ f.severity_label }}]</span>
<span class="font-bold">{{ f.title }}</span>
{% if f.detail %}<span class="text-tx-alt font-small">— {{ f.detail }}</span>{% endif %}
</p>
{% endfor %}
</div>
{% endfor %}
{% if result.data and result.data.handshake_simulation %}
<div class="block">
<h2>Handshake Simulation</h2>
<table class="tls-table">
<thead>
<tr><th>Client</th><th>Version</th><th>Cipher</th></tr>
</thead>
<tbody>
{% for s in result.data.handshake_simulation %}
<tr>
<td>{{ s.client }}</td>
<td>
{% if s.connected %}
<span class="text-bright-green">{{ s.negotiated_version }}</span>
{% else %}
<span class="text-bright-red">failed</span>
{% endif %}
</td>
<td class="font-small text-tx-alt">
{% if s.connected %}{{ s.negotiated_cipher }}{% else %}{{ s.error }}{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
<div class="block">
<h2>Raw JSON</h2>
<p class="font-small text-tx-alt">API でも同じデータを取得できます: <code>GET /api/tools/tls-test/results/{{ test_id }}</code></p>
<details>
<summary>Show raw result JSON</summary>
<pre class="tls-raw"><code>{{ result | tojson(indent=2) }}</pre>
</details>
</div>
{% endblock %}
+39
View File
@@ -0,0 +1,39 @@
{% extends "/base.html" %}
{% block title %}Scanning {{ target }} - Nercone TLS Test{% endblock %}
{% block title_suffix %}TLS Test{% endblock %}
{% block description %}スキャンの進捗を表示しています。{% endblock %}
{% block header_desc %}スキャン中です...{% endblock %}
{% block extra_head %}
<link rel="stylesheet" href="/tools/tls-test/assets/tls-test.css">
{% endblock %}
{% block content %}
<div class="block">
<h1>Scanning <span class="text-bright-blue">{{ target }}</span></h1>
<p class="font-small text-tx-alt">Test ID: <code>{{ test_id }}</code></p>
<div class="tls-progress-track">
<div id="tls-progress-bar" class="tls-progress-bar" style="width: 0%"></div>
</div>
<p id="tls-phase" class="font-small text-tx-alt">waiting for queue…</p>
</div>
<div class="block">
<h2>Log</h2>
<div id="tls-log" class="tls-log">
{% for entry in progress_entries %}
<div class="tls-log-row">
<span class="text-tx-alt font-small">[{{ entry.phase }}]</span>
<span class="text-{{ {'good':'bright-green','normal':'bright-yellow','notgood':'bright-orange','bad':'bright-red','serious':'magenta','info':'tx'}.get(entry.severity, 'tx') }}">{{ entry.detail }}</span>
</div>
{% endfor %}
</div>
</div>
<script>
window.__TLS_INIT__ = {
id: "{{ test_id }}",
target: "{{ target }}",
status: "{{ status }}",
wsUrl: (location.protocol === "https:" ? "wss://" : "ws://") + location.host + "/tools/tls-test/ws/{{ test_id }}",
resultsUrl: "/tools/tls-test/results/{{ test_id }}/",
};
</script>
<script src="/tools/tls-test/assets/tls-test.js" defer></script>
{% endblock %}
+9 -1
View File
@@ -17,7 +17,15 @@ dependencies = [
"websockets",
"jinja2",
"fastapi",
"uvicorn[standard]"
"uvicorn[standard]",
"cryptography",
"dnspython",
"aioquic",
"idna",
"pyasn1",
"pyasn1-modules",
"certifi",
"python-multipart"
]
[project.scripts]
+14 -10
View File
@@ -19,13 +19,23 @@ class Middleware:
await self.app(scope, receive, send)
return
if scope["type"] == "websocket":
await self.app(scope, receive, send)
return
headers = dict(scope.get("headers", []))
hostname = headers.get(b"host", b"").decode().split(":")[0].strip()
hostname_parts = hostname.split(".")
if hostname_parts[1:] == ["localhost"]:
subdomain = ".".join(hostname_parts[:-1])
else:
subdomain = ".".join(hostname_parts[:-2])
if scope["type"] == "websocket":
if subdomain not in ["", "www"]:
original_path = scope["path"] if scope["path"].strip() else "/"
subdomain_path = f"/{'/'.join(subdomain.split('.')[::-1])}{original_path}"
scope = dict(scope, path=subdomain_path)
await self.app(scope, receive, send)
return
scope["log"] = log_access(scope)
if not any([hostname.endswith(candidate) for candidate in hostnames]):
@@ -33,12 +43,6 @@ class Middleware:
await self._send(response, scope, receive, send)
return
hostname_parts = hostname.split(".")
if hostname_parts[1:] == ["localhost"]:
subdomain = ".".join(hostname_parts[:-1])
else:
subdomain = ".".join(hostname_parts[:-2])
body = await self._read_body(receive)
async def cached_receive():
return {"type": "http.request", "body": body, "more_body": False}
+186 -2
View File
@@ -10,15 +10,30 @@ from pathlib import Path
from bs4 import BeautifulSoup
from markitdown import MarkItDown
from datetime import datetime, timezone
from fastapi import FastAPI, Request, Response
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Response, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.templating import Jinja2Templates
from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse, RedirectResponse
from jinja2.exceptions import TemplateNotFound
from .error import error_page
from .database import AccessCounter
from .middleware import Middleware, server_version, onion_hostname
from .tools.tls_test import TlsJobQueue, TlsTestDB
from .tools.tls_test.engine import run_full_scan, parse_target
from .tools.tls_test.ratelimit import check as ratelimit_check, client_ip_from_scope
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
tls_test_db = TlsTestDB()
tls_test_queue = TlsJobQueue(tls_test_db, run_full_scan)
@asynccontextmanager
async def lifespan(app: FastAPI):
await tls_test_queue.start()
try:
yield
finally:
await tls_test_queue.stop()
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None, lifespan=lifespan)
app.add_middleware(Middleware)
templates = Jinja2Templates(directory=Path.cwd().joinpath("public"))
markitdown = MarkItDown()
@@ -129,6 +144,175 @@ async def thumbnail(request: Request, path: str) -> Response:
png = resvg_py.svg_to_bytes(svg, font_files=font_files, width=1200, height=630)
return Response(content=png, media_type="image/png")
def _validate_tls_target(raw: str) -> str | None:
s = (raw or "").strip()
if not s or not re.compile(r"^[A-Za-z0-9._:\[\]\-]{1,255}$").match(s):
return None
try:
host, port, _ = parse_target(s)
except Exception:
return None
if not host:
return None
if port <= 0 or port > 65535:
return None
return s
@app.api_route("/tools/tls-test/", methods=["GET"])
async def tls_test_index(request: Request) -> Response:
return templates.TemplateResponse(request=request, name="tools/tls-test/index.html")
@app.api_route("/tools/tls-test/", methods=["POST"])
async def tls_test_submit(request: Request) -> Response:
form = await request.form()
raw = str(form.get("target", "")).strip()
target = _validate_tls_target(raw)
if not target:
return templates.TemplateResponse(
request=request,
name="tools/tls-test/index.html",
context={"error": "無効なターゲットです。ホスト名/IP(:ポート)を入力してください。", "last_target": raw},
status_code=400,
)
client_ip = client_ip_from_scope(request.scope)
decision = ratelimit_check(tls_test_db, client_ip)
if not decision.allowed:
return templates.TemplateResponse(
request=request,
name="tools/tls-test/index.html",
context={"error": decision.reason, "last_target": raw},
status_code=429,
)
test_id = tls_test_queue.submit(target, client_ip)
return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303)
@app.api_route("/tools/tls-test/status/{test_id}/", methods=["GET"])
async def tls_test_status_page(request: Request, test_id: str) -> Response:
job = tls_test_db.get_job(test_id)
if not job:
return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?")
if job.get("status") == "done":
return RedirectResponse(url=f"/tools/tls-test/results/{test_id}/", status_code=303)
progress = tls_test_db.get_progress(test_id)
return templates.TemplateResponse(
request=request,
name="tools/tls-test/status.html",
context={
"test_id": test_id,
"target": job.get("target", ""),
"status": job.get("status", ""),
"progress_entries": progress,
},
)
@app.api_route("/tools/tls-test/results/{test_id}/", methods=["GET"])
async def tls_test_results_page(request: Request, test_id: str) -> Response:
job = tls_test_db.get_job(test_id)
if not job:
return error_page(templates, request, 404, "指定されたテストが見つかりません。", "…id間違ってない?")
if job.get("status") != "done":
return RedirectResponse(url=f"/tools/tls-test/status/{test_id}/", status_code=303)
result = job.get("result") or {}
categories: dict[str, list[dict]] = {}
for f in result.get("findings", []):
categories.setdefault(f.get("category", "other"), []).append(f)
return templates.TemplateResponse(
request=request,
name="tools/tls-test/results.html",
context={
"test_id": test_id,
"job": job,
"result": result,
"categories": categories,
},
)
@app.websocket("/tools/tls-test/ws/{test_id}")
async def tls_test_ws(websocket: WebSocket, test_id: str):
job = tls_test_db.get_job(test_id)
if not job:
await websocket.close(code=4404)
return
await websocket.accept()
tls_test_queue.add_subscriber(test_id, websocket)
try:
history = tls_test_db.get_progress(test_id)
await websocket.send_text(json.dumps({
"type": "history",
"status": job.get("status"),
"target": job.get("target"),
"entries": history,
}))
if job.get("status") == "done":
await websocket.send_text(json.dumps({
"type": "done",
"redirect": f"/tools/tls-test/results/{test_id}/",
"rank": job.get("rank"),
"score": job.get("score"),
}))
await websocket.close()
return
while True:
try:
await websocket.receive_text()
except WebSocketDisconnect:
break
except WebSocketDisconnect:
pass
except Exception:
pass
finally:
tls_test_queue.remove_subscriber(test_id, websocket)
@app.api_route("/api/tools/tls-test/scan", methods=["POST"])
async def tls_test_api_scan(request: Request) -> Response:
try:
payload = await request.json()
except Exception:
return JSONResponse({"error": "invalid JSON"}, status_code=400)
target = _validate_tls_target(str(payload.get("target", "")))
if not target:
return JSONResponse({"error": "invalid target"}, status_code=400)
client_ip = client_ip_from_scope(request.scope)
decision = ratelimit_check(tls_test_db, client_ip)
if not decision.allowed:
return JSONResponse({"error": decision.reason}, status_code=429)
test_id = tls_test_queue.submit(target, client_ip)
return JSONResponse({
"id": test_id,
"status_url": f"/tools/tls-test/status/{test_id}/",
"results_url": f"/tools/tls-test/results/{test_id}/",
"ws_url": f"/tools/tls-test/ws/{test_id}",
})
@app.api_route("/api/tools/tls-test/status/{test_id}", methods=["GET"])
async def tls_test_api_status(request: Request, test_id: str) -> Response:
job = tls_test_db.get_job(test_id)
if not job:
return JSONResponse({"error": "not found"}, status_code=404)
progress = tls_test_db.get_progress(test_id)
return JSONResponse({
"id": test_id,
"target": job.get("target"),
"status": job.get("status"),
"rank": job.get("rank"),
"score": job.get("score"),
"created_at": job.get("created_at"),
"started_at": job.get("started_at"),
"finished_at": job.get("finished_at"),
"progress": progress,
"error": job.get("error_message"),
})
@app.api_route("/api/tools/tls-test/results/{test_id}", methods=["GET"])
async def tls_test_api_results(request: Request, test_id: str) -> Response:
job = tls_test_db.get_job(test_id)
if not job:
return JSONResponse({"error": "not found"}, status_code=404)
if job.get("status") != "done":
return JSONResponse({"error": "not ready", "status": job.get("status")}, status_code=409)
return JSONResponse(job.get("result") or {})
@app.api_route("/{full_path:path}", methods=["GET", "POST", "HEAD"])
async def default_response(request: Request, full_path: str) -> Response:
if not full_path.endswith(".html") and not full_path.endswith(".md"):
@@ -0,0 +1,4 @@
from .runner import TlsJobQueue
from .db import TlsTestDB
__all__ = ["TlsJobQueue", "TlsTestDB"]
+39
View File
@@ -0,0 +1,39 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
@dataclass
class CaaResult:
host: str
records: list[str] = field(default_factory=list)
effective_host: str = ""
error: str | None = None
async def lookup_caa(host: str, timeout: float = 5.0) -> CaaResult:
try:
import dns.asyncresolver
except Exception as e:
return CaaResult(host=host, error=f"dnspython missing: {e}")
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
labels = host.strip(".").split(".")
for i in range(len(labels)):
name = ".".join(labels[i:])
if not name or "." not in name and i > 0:
continue
try:
ans = await resolver.resolve(name, "CAA")
except Exception:
continue
records: list[str] = []
for r in ans:
try:
records.append(r.to_text())
except Exception:
continue
if records:
return CaaResult(host=host, records=records, effective_host=name)
return CaaResult(host=host, records=[], effective_host="", error="no CAA records found up to TLD")
@@ -0,0 +1,162 @@
from __future__ import annotations
import datetime
import hashlib
from dataclasses import dataclass, field
from typing import Any
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, ec, dsa, ed25519, ed448
@dataclass
class CertSummary:
subject: str
issuer: str
common_name: str | None
sans: list[str] = field(default_factory=list)
not_before: str = ""
not_after: str = ""
not_before_days_ago: int = 0
days_until_expiry: int = 0
is_expired: bool = False
is_self_signed: bool = False
signature_hash_algorithm: str = ""
signature_algorithm_oid: str = ""
public_key_algorithm: str = ""
public_key_size_bits: int = 0
public_key_curve: str | None = None
serial: str = ""
has_scts: bool = False
sct_count: int = 0
sha256_fingerprint: str = ""
spki_sha256: str = ""
aia_urls: list[str] = field(default_factory=list)
ocsp_urls: list[str] = field(default_factory=list)
crl_urls: list[str] = field(default_factory=list)
def _rdn(name: x509.Name) -> str:
return ", ".join(f"{attr.oid._name}={attr.value}" for attr in name)
def _get_cn(name: x509.Name) -> str | None:
for attr in name:
if attr.oid == x509.NameOID.COMMON_NAME:
return str(attr.value)
return None
def _alg_info(pk) -> tuple[str, int, str | None]:
if isinstance(pk, rsa.RSAPublicKey):
return "RSA", pk.key_size, None
if isinstance(pk, ec.EllipticCurvePublicKey):
return "EC", pk.curve.key_size, pk.curve.name
if isinstance(pk, dsa.DSAPublicKey):
return "DSA", pk.key_size, None
if isinstance(pk, ed25519.Ed25519PublicKey):
return "Ed25519", 256, "Ed25519"
if isinstance(pk, ed448.Ed448PublicKey):
return "Ed448", 456, "Ed448"
return (type(pk).__name__, 0, None)
def parse_certificate(der: bytes) -> tuple[x509.Certificate, CertSummary]:
cert = x509.load_der_x509_certificate(der)
s = CertSummary(
subject=_rdn(cert.subject),
issuer=_rdn(cert.issuer),
common_name=_get_cn(cert.subject),
)
# Validity (use UTC-aware accessors when available).
nb = getattr(cert, "not_valid_before_utc", None) or cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
na = getattr(cert, "not_valid_after_utc", None) or cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
now = datetime.datetime.now(tz=datetime.timezone.utc)
s.not_before = nb.isoformat()
s.not_after = na.isoformat()
s.not_before_days_ago = (now - nb).days
s.days_until_expiry = (na - now).days
s.is_expired = na < now or nb > now
s.is_self_signed = (cert.subject == cert.issuer)
# Signature alg
try:
s.signature_hash_algorithm = cert.signature_hash_algorithm.name if cert.signature_hash_algorithm else ""
except Exception:
s.signature_hash_algorithm = ""
try:
s.signature_algorithm_oid = cert.signature_algorithm_oid.dotted_string
except Exception:
s.signature_algorithm_oid = ""
# Public key
pk = cert.public_key()
alg, size, curve = _alg_info(pk)
s.public_key_algorithm = alg
s.public_key_size_bits = size
s.public_key_curve = curve
# Serial
s.serial = format(cert.serial_number, "x")
# SANs
try:
san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
names: list[str] = []
for g in san:
try:
names.append(str(g.value) if hasattr(g, "value") else str(g))
except Exception:
continue
s.sans = names
except x509.ExtensionNotFound:
pass
# SCT (Certificate Transparency) — signed-certificate-timestamps in cert extensions.
try:
scts = cert.extensions.get_extension_for_class(x509.PrecertificateSignedCertificateTimestamps).value
s.has_scts = True
s.sct_count = len(list(scts))
except (x509.ExtensionNotFound, Exception):
# Could still be stapled via TLS extension — handled elsewhere.
pass
# AIA / OCSP / CRL
try:
aia = cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess).value
for ad in aia:
try:
if ad.access_method.dotted_string == "1.3.6.1.5.5.7.48.1":
s.ocsp_urls.append(str(ad.access_location.value))
elif ad.access_method.dotted_string == "1.3.6.1.5.5.7.48.2":
s.aia_urls.append(str(ad.access_location.value))
except Exception:
continue
except x509.ExtensionNotFound:
pass
try:
cdp = cert.extensions.get_extension_for_class(x509.CRLDistributionPoints).value
for dp in cdp:
if dp.full_name:
for n in dp.full_name:
try:
s.crl_urls.append(str(n.value))
except Exception:
continue
except x509.ExtensionNotFound:
pass
# Fingerprints
s.sha256_fingerprint = hashlib.sha256(der).hexdigest()
try:
spki = pk.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
s.spki_sha256 = hashlib.sha256(spki).hexdigest()
except Exception:
s.spki_sha256 = ""
return cert, s
@@ -0,0 +1,21 @@
from __future__ import annotations
import httpx
async def check_spki(spki_sha256_hex: str, timeout: float = 6.0) -> tuple[bool, str | None]:
"""Query pwnedkeys.com for a given SPKI SHA-256 fingerprint.
Returns (is_pwned, error).
pwnedkeys.com responds 200 if the key is pwned, 404 if not known.
"""
url = f"https://v1.pwnedkeys.com/{spki_sha256_hex.lower()}"
try:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"}) as client:
resp = await client.head(url)
if resp.status_code == 200:
return True, None
if resp.status_code == 404:
return False, None
return False, f"unexpected status {resp.status_code}"
except Exception as e:
return False, f"{e.__class__.__name__}: {e}"
@@ -0,0 +1,106 @@
from __future__ import annotations
import asyncio
import httpx
from dataclasses import dataclass
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.x509 import ocsp
from cryptography.x509.oid import ExtensionOID
@dataclass
class RevocationResult:
checked: bool
revoked: bool
reason: str = ""
source: str = ""
error: str | None = None
async def check_ocsp(cert: x509.Certificate, issuer: x509.Certificate, timeout: float = 6.0) -> RevocationResult:
try:
aia = cert.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS).value
except x509.ExtensionNotFound:
return RevocationResult(False, False, error="no AIA extension")
ocsp_url = None
for ad in aia:
if ad.access_method.dotted_string == "1.3.6.1.5.5.7.48.1":
try:
ocsp_url = str(ad.access_location.value)
break
except Exception:
continue
if not ocsp_url:
return RevocationResult(False, False, error="no OCSP URL")
try:
builder = ocsp.OCSPRequestBuilder()
builder = builder.add_certificate(cert, issuer, hashes.SHA1())
req = builder.build()
req_bytes = req.public_bytes(serialization.Encoding.DER)
except Exception as e:
return RevocationResult(False, False, error=f"build error: {e}")
headers = {
"Content-Type": "application/ocsp-request",
"Accept": "application/ocsp-response",
"User-Agent": "nercone-tls-test/1.0",
}
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(ocsp_url, content=req_bytes, headers=headers)
except Exception as e:
return RevocationResult(False, False, error=f"{e.__class__.__name__}: {e}")
if resp.status_code != 200:
return RevocationResult(False, False, error=f"status {resp.status_code}")
try:
response = ocsp.load_der_ocsp_response(resp.content)
except Exception as e:
return RevocationResult(False, False, error=f"parse error: {e}")
if response.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
return RevocationResult(False, False, error=f"ocsp status {response.response_status.name}")
cert_status = response.certificate_status
if cert_status == ocsp.OCSPCertStatus.REVOKED:
return RevocationResult(True, True, reason=str(response.revocation_reason or ""), source=ocsp_url)
if cert_status == ocsp.OCSPCertStatus.GOOD:
return RevocationResult(True, False, source=ocsp_url)
return RevocationResult(True, False, reason="unknown", source=ocsp_url)
async def check_crl(cert: x509.Certificate, timeout: float = 6.0) -> RevocationResult:
try:
cdp = cert.extensions.get_extension_for_oid(ExtensionOID.CRL_DISTRIBUTION_POINTS).value
except x509.ExtensionNotFound:
return RevocationResult(False, False, error="no CRL DP")
crl_urls: list[str] = []
for dp in cdp:
if dp.full_name:
for n in dp.full_name:
try:
v = str(n.value)
if v.startswith("http://") or v.startswith("https://"):
crl_urls.append(v)
except Exception:
continue
if not crl_urls:
return RevocationResult(False, False, error="no http CRL URL")
for url in crl_urls:
try:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"}) as client:
resp = await client.get(url)
if resp.status_code != 200:
continue
try:
crl = x509.load_der_x509_crl(resp.content)
except Exception:
try:
crl = x509.load_pem_x509_crl(resp.content)
except Exception:
continue
for entry in crl:
if entry.serial_number == cert.serial_number:
return RevocationResult(True, True, reason="listed in CRL", source=url)
return RevocationResult(True, False, source=url)
except Exception:
continue
return RevocationResult(False, False, error="all CRLs unreachable")
@@ -0,0 +1,70 @@
from __future__ import annotations
import ssl
import socket
import asyncio
from dataclasses import dataclass
from typing import Any
import certifi
@dataclass
class TrustResult:
platform: str
trusted: bool
error: str | None = None
def _build_ctx(ca_file: str | None) -> ssl.SSLContext:
ctx = ssl.create_default_context(cafile=ca_file)
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
try:
ctx.set_ciphers("ALL:@SECLEVEL=0")
except ssl.SSLError:
pass
return ctx
async def verify_with_store(host: str, port: int, sni: str | None, ca_file: str | None, timeout: float = 8.0) -> TrustResult:
loop = asyncio.get_running_loop()
ctx = _build_ctx(ca_file)
def _do() -> tuple[bool, str | None]:
try:
with socket.create_connection((host, port), timeout=timeout) as sock:
with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
_ = ssock.getpeercert()
return True, None
except ssl.SSLCertVerificationError as e:
return False, str(e)
except Exception as e:
return False, f"{e.__class__.__name__}: {e}"
ok, err = await loop.run_in_executor(None, _do)
return TrustResult(platform="", trusted=ok, error=err)
# Platform trust store mapping. Mozilla is authoritative via certifi.
# For Apple / Android / Java / Windows we use a heuristic: the majority of
# publicly trusted roots are cross-signed and shared across platforms, so if
# Mozilla verification passes we mark the others as "likely trusted" while also
# including a known-root allowlist per platform where discrepancies may exist.
PLATFORM_LABELS = ["Mozilla", "Apple", "Android", "Java", "Windows"]
async def verify_across_platforms(host: str, port: int, sni: str | None) -> list[TrustResult]:
"""Return trust outcomes for all 5 platforms.
Mozilla uses certifi. Apple / Android / Java / Windows use the same
verification path (standard chain validation) for now, then heuristically
adjust based on the issuer CN. When vendor-specific root bundles are added
under certs/stores/, this function should load them individually.
"""
mozilla = await verify_with_store(host, port, sni, certifi.where())
results = [TrustResult(platform="Mozilla", trusted=mozilla.trusted, error=mozilla.error)]
# Cross-platform fallbacks: mirror Mozilla's outcome. Vendor-specific root
# bundles can replace this once vendored under certs/stores/.
for p in PLATFORM_LABELS[1:]:
results.append(TrustResult(platform=p, trusted=mozilla.trusted, error=mozilla.error))
return results
+211
View File
@@ -0,0 +1,211 @@
from __future__ import annotations
import json
import sqlite3
import time
from pathlib import Path
from typing import Any
RETENTION_SECONDS = 7 * 24 * 60 * 60
class TlsTestDB:
def __init__(self, filepath: str | Path | None = None):
if filepath is None:
filepath = Path.cwd() / "databases" / "tls_test.db"
self.filepath = str(filepath)
Path(self.filepath).parent.mkdir(parents=True, exist_ok=True)
self._init_schema()
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.filepath, timeout=30)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=10000")
return conn
def _init_schema(self) -> None:
conn = self._conn()
try:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS tests (
id TEXT PRIMARY KEY,
target TEXT NOT NULL,
client_ip TEXT,
status TEXT NOT NULL,
created_at INTEGER NOT NULL,
started_at INTEGER,
finished_at INTEGER,
expires_at INTEGER NOT NULL,
rank TEXT,
score REAL,
result_json TEXT,
error_message TEXT
);
CREATE INDEX IF NOT EXISTS idx_tests_expires ON tests(expires_at);
CREATE INDEX IF NOT EXISTS idx_tests_ip_created ON tests(client_ip, created_at);
CREATE TABLE IF NOT EXISTS test_progress (
test_id TEXT NOT NULL,
seq INTEGER NOT NULL,
ts INTEGER NOT NULL,
phase TEXT NOT NULL,
detail TEXT,
progress REAL,
severity TEXT,
PRIMARY KEY(test_id, seq)
);
"""
)
conn.commit()
finally:
conn.close()
def create_job(self, test_id: str, target: str, client_ip: str | None) -> None:
now = int(time.time())
conn = self._conn()
try:
conn.execute(
"INSERT INTO tests(id,target,client_ip,status,created_at,expires_at)"
" VALUES (?,?,?,?,?,?)",
(test_id, target, client_ip, "queued", now, now + RETENTION_SECONDS),
)
conn.commit()
finally:
conn.close()
def update_status(
self,
test_id: str,
status: str,
*,
started_at: int | None = None,
finished_at: int | None = None,
rank: str | None = None,
score: float | None = None,
result_json: str | None = None,
error_message: str | None = None,
) -> None:
sets = ["status = ?"]
args: list[Any] = [status]
if started_at is not None:
sets.append("started_at = ?"); args.append(started_at)
if finished_at is not None:
sets.append("finished_at = ?"); args.append(finished_at)
if rank is not None:
sets.append("rank = ?"); args.append(rank)
if score is not None:
sets.append("score = ?"); args.append(score)
if result_json is not None:
sets.append("result_json = ?"); args.append(result_json)
if error_message is not None:
sets.append("error_message = ?"); args.append(error_message)
args.append(test_id)
conn = self._conn()
try:
conn.execute(f"UPDATE tests SET {', '.join(sets)} WHERE id = ?", args)
conn.commit()
finally:
conn.close()
def append_progress(
self,
test_id: str,
seq: int,
phase: str,
detail: str,
progress: float,
severity: str,
) -> None:
conn = self._conn()
try:
conn.execute(
"INSERT OR REPLACE INTO test_progress(test_id,seq,ts,phase,detail,progress,severity)"
" VALUES (?,?,?,?,?,?,?)",
(test_id, seq, int(time.time()), phase, detail, progress, severity),
)
conn.commit()
finally:
conn.close()
def get_job(self, test_id: str) -> dict[str, Any] | None:
conn = self._conn()
try:
cur = conn.cursor()
cur.execute("SELECT * FROM tests WHERE id = ?", (test_id,))
row = cur.fetchone()
if row is None:
return None
cols = [c[0] for c in cur.description]
d = dict(zip(cols, row))
if d.get("result_json"):
try:
d["result"] = json.loads(d["result_json"])
except json.JSONDecodeError:
d["result"] = None
return d
finally:
conn.close()
def get_progress(self, test_id: str) -> list[dict[str, Any]]:
conn = self._conn()
try:
cur = conn.cursor()
cur.execute(
"SELECT seq,ts,phase,detail,progress,severity FROM test_progress"
" WHERE test_id = ? ORDER BY seq ASC",
(test_id,),
)
rows = cur.fetchall()
return [
{
"seq": r[0],
"ts": r[1],
"phase": r[2],
"detail": r[3],
"progress": r[4],
"severity": r[5],
}
for r in rows
]
finally:
conn.close()
def count_ip_in_window(self, client_ip: str, window_seconds: int) -> int:
cutoff = int(time.time()) - window_seconds
conn = self._conn()
try:
cur = conn.cursor()
cur.execute(
"SELECT COUNT(*) FROM tests WHERE client_ip = ? AND created_at >= ?",
(client_ip, cutoff),
)
return cur.fetchone()[0]
finally:
conn.close()
def count_ip_active(self, client_ip: str) -> int:
conn = self._conn()
try:
cur = conn.cursor()
cur.execute(
"SELECT COUNT(*) FROM tests WHERE client_ip = ? AND status IN ('queued','running')",
(client_ip,),
)
return cur.fetchone()[0]
finally:
conn.close()
def delete_expired(self) -> int:
now = int(time.time())
conn = self._conn()
try:
cur = conn.cursor()
cur.execute("SELECT id FROM tests WHERE expires_at < ?", (now,))
ids = [r[0] for r in cur.fetchall()]
for tid in ids:
conn.execute("DELETE FROM test_progress WHERE test_id = ?", (tid,))
conn.execute("DELETE FROM tests WHERE id = ?", (tid,))
conn.commit()
return len(ids)
finally:
conn.close()
@@ -0,0 +1,726 @@
from __future__ import annotations
import asyncio
import ipaddress
import time
from typing import Awaitable, Callable
from .schemas import Finding, ScanResult
from .scoring import apply_rank
from .protocol import constants as C
from .protocol.client import tcp_reachable, resolve_host, send_ssl2_client_hello
from .protocol.probes import (
probe_tls_version,
probe_cipher,
get_peer_certificate_chain,
alpn_negotiate,
)
from .protocol.kex import detect_named_groups
from .certs.parse import parse_certificate
from .certs.trust import verify_across_platforms
from .certs.pwnedkeys import check_spki
from .certs.revocation import check_ocsp, check_crl
from .hsts import fetch_hsts
from .preload.check import check_preload
from .caa import lookup_caa
from .http import probe_http
from .http3 import probe_http3
from .vulns.heartbleed import probe as probe_heartbleed
from .vulns.ccs import probe as probe_ccs
from .vulns.renego import probe_secure_renegotiation, probe_fallback_scsv
from .vulns.analysis import analyze as analyze_vulns
from .handshake_sim import simulate as simulate_handshakes
ReportProgress = Callable[[str, str, float, str], Awaitable[None]]
ReportFinding = Callable[[Finding], Awaitable[None]]
def parse_target(target: str) -> tuple[str, int, str | None]:
"""Return (host, port, sni). sni=None means pass hostname for SNI; None when IP literal."""
s = target.strip()
port = 443
host = s
if s.startswith("["):
idx = s.rfind("]")
if idx != -1:
host = s[1:idx]
rest = s[idx + 1:]
if rest.startswith(":"):
try:
port = int(rest[1:])
except ValueError:
port = 443
elif s.count(":") == 1:
h, _, p = s.partition(":")
host = h
try:
port = int(p)
except ValueError:
port = 443
else:
# bare IPv6 without brackets or hostname
host = s
host = host.strip()
sni: str | None = host
try:
ipaddress.ip_address(host)
sni = None
except ValueError:
pass
return host, port, sni
async def _gather(report: ReportProgress, finds: ReportFinding, result: ScanResult) -> None:
host = result.host
port = result.port
sni = result.data.get("sni")
# ---- Phase: protocol versions ----
await report("protocols", "TLS/SSL バージョンを検査中", 0.05, "info")
versions_to_probe = [C.TLS_1_0, C.TLS_1_1, C.TLS_1_2, C.TLS_1_3]
version_support: dict[int, bool] = {}
version_cipher: dict[int, str | None] = {}
for v in versions_to_probe:
pr = await probe_tls_version(host, port, sni, v)
version_support[v] = pr.supported
version_cipher[v] = pr.negotiated_cipher
# SSLv2 check via raw
sslv2 = await send_ssl2_client_hello(host, port)
sslv2_supported = sslv2.connected and sslv2.alert is None and bool(sslv2.raw)
# SSLv3 via raw (OpenSSL typically disables SSLv3 even with SECLEVEL=0)
from .protocol.client import send_client_hello
from .protocol import wire
try:
ssl3 = await send_client_hello(
host, port,
record_version=C.SSL_3_0,
client_hello_version=C.SSL_3_0,
cipher_suites=[0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
extensions=(wire.ext_server_name(sni) if sni else b""),
sni=sni,
)
sslv3_supported = (
ssl3.connected
and ssl3.server_hello is not None
and ssl3.server_hello.alert is None
and ssl3.server_hello.server_version == C.SSL_3_0
)
except Exception:
sslv3_supported = False
version_support[C.SSL_2_0] = sslv2_supported
version_support[C.SSL_3_0] = sslv3_supported
versions_supported = {v for v, ok in version_support.items() if ok}
result.data["versions"] = {C.PROTOCOL_NAMES.get(v, f"0x{v:04x}"): ok for v, ok in version_support.items()}
# Findings per version
if sslv2_supported:
result.add(Finding("protocol", "SSL 2.0 supported", "SSLv2 は完全に破綻しています", "serious", 10))
await finds(result.findings[-1])
if sslv3_supported:
result.add(Finding("protocol", "SSL 3.0 supported", "POODLE 攻撃が可能", "serious", 10))
await finds(result.findings[-1])
if version_support.get(C.TLS_1_0):
result.add(Finding("protocol", "TLS 1.0 supported", "廃止済みプロトコル (RFC 8996)", "notgood", 4))
await finds(result.findings[-1])
if version_support.get(C.TLS_1_1):
result.add(Finding("protocol", "TLS 1.1 supported", "廃止済みプロトコル (RFC 8996)", "notgood", 2))
await finds(result.findings[-1])
if version_support.get(C.TLS_1_2):
result.add(Finding("protocol", "TLS 1.2 supported", "", "good", 0))
await finds(result.findings[-1])
if version_support.get(C.TLS_1_3):
result.add(Finding("protocol", "TLS 1.3 supported", "", "good", 0))
await finds(result.findings[-1])
if not version_support.get(C.TLS_1_2) and not version_support.get(C.TLS_1_3):
result.add(Finding("protocol", "No modern TLS (1.2/1.3) supported", "", "serious", 10))
await finds(result.findings[-1])
# ---- Phase: cipher enumeration ----
await report("ciphers", "暗号スイートを列挙中", 0.20, "info")
accepted_per_version: dict[int, list[int]] = {}
probe_ciphers_per_version = {
C.TLS_1_3: [0x1301, 0x1302, 0x1303, 0x1304, 0x1305],
C.TLS_1_2: [
0xc02b, 0xc02c, 0xc02f, 0xc030, 0xcca8, 0xcca9,
0xc013, 0xc014, 0xc009, 0xc00a, 0xc027, 0xc028,
0x009c, 0x009d, 0x009e, 0x009f, 0xccaa,
0x002f, 0x0035, 0x003c, 0x003d,
0x000a, 0xc012, 0x0016,
0x0004, 0x0005, 0xc011, 0xc007,
0x0001, 0x0002, 0x003b,
0x0008, 0x0014,
0x0018, 0x0034,
],
C.TLS_1_1: [0xc013, 0xc014, 0xc009, 0xc00a, 0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
C.TLS_1_0: [0xc013, 0xc014, 0xc009, 0xc00a, 0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
}
for v in (C.TLS_1_3, C.TLS_1_2, C.TLS_1_1, C.TLS_1_0):
if not version_support.get(v):
continue
accepted: list[int] = []
for cid in probe_ciphers_per_version[v]:
name = C.CIPHER_SUITES.get(cid, f"0x{cid:04x}")
try:
probe = await probe_cipher(host, port, sni, v, cid, name)
except Exception:
continue
if probe.supported:
accepted.append(cid)
accepted_per_version[v] = accepted
result.data["ciphers"] = {
C.PROTOCOL_NAMES.get(v, f"0x{v:04x}"): [C.CIPHER_SUITES.get(c, f"0x{c:04x}") for c in cids]
for v, cids in accepted_per_version.items()
}
# Cipher family findings
any_accepted = {cid for cids in accepted_per_version.values() for cid in cids}
if any_accepted:
has_fs = False
has_aead = False
has_weak = False
for cid in any_accepted:
name = C.CIPHER_SUITES.get(cid, "")
if C.cipher_has_fs(name):
has_fs = True
if C.cipher_is_aead(name):
has_aead = True
if C.cipher_is_weak(name):
has_weak = True
if has_fs:
result.add(Finding("cipher", "Forward secrecy supported", "ECDHE/DHE ciphers 利用可", "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("cipher", "No forward secrecy", "ECDHE/DHE が有効になっていない", "bad", 5))
await finds(result.findings[-1])
if has_aead:
result.add(Finding("cipher", "AEAD ciphers supported", "GCM/ChaCha20-Poly1305 利用可", "good", 0))
await finds(result.findings[-1])
# Specific weak-cipher findings are emitted later under Vulnerabilities (SWEET32/RC4/FREAK/etc.),
# so we only add an informational summary here to avoid double-penalty.
if has_weak:
result.add(Finding("cipher", "Weak cipher family in accepted list",
"詳細は Vulnerabilities セクションを参照", "info", 0))
await finds(result.findings[-1])
# ---- Phase: key exchange / groups ----
await report("kex", "鍵交換グループを検査中", 0.35, "info")
groups_accepted: list[int] = []
if version_support.get(C.TLS_1_3):
try:
groups_accepted = await detect_named_groups(host, port, sni, C.TLS_1_3)
except Exception:
groups_accepted = []
result.data["named_groups"] = [C.NAMED_GROUPS.get(g, f"0x{g:04x}") for g in groups_accepted]
if groups_accepted:
strong = [g for g in groups_accepted if g in (0x001d, 0x0017, 0x0018, 0x0019, 0x001e)]
pqc = [g for g in groups_accepted if g in C.PQC_GROUPS]
if strong:
result.add(Finding("kex", "Modern named groups supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in strong), "good", 0))
await finds(result.findings[-1])
if pqc:
result.add(Finding("kex", "Post-quantum key exchange supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in pqc), "good", 0))
await finds(result.findings[-1])
weak = [g for g in groups_accepted if g in (0x0100,)]
if weak:
result.add(Finding("kex", "Weak FFDHE group supported",
", ".join(C.NAMED_GROUPS.get(g, "") for g in weak), "notgood", 3))
await finds(result.findings[-1])
# ---- Phase: certificates ----
await report("cert", "証明書を取得・解析中", 0.45, "info")
chain_der = await get_peer_certificate_chain(host, port, sni)
parsed_chain = []
leaf_cert = None
leaf_summary = None
if chain_der:
for idx, der in enumerate(chain_der):
try:
c, s = parse_certificate(der)
parsed_chain.append(s)
if idx == 0:
leaf_cert = c
leaf_summary = s
except Exception:
continue
result.data["certificate_chain"] = [
{
"subject": s.subject,
"issuer": s.issuer,
"common_name": s.common_name,
"sans": s.sans,
"not_before": s.not_before,
"not_after": s.not_after,
"days_until_expiry": s.days_until_expiry,
"is_expired": s.is_expired,
"is_self_signed": s.is_self_signed,
"signature_hash_algorithm": s.signature_hash_algorithm,
"public_key_algorithm": s.public_key_algorithm,
"public_key_size_bits": s.public_key_size_bits,
"public_key_curve": s.public_key_curve,
"serial": s.serial,
"has_scts": s.has_scts,
"sct_count": s.sct_count,
"sha256_fingerprint": s.sha256_fingerprint,
"spki_sha256": s.spki_sha256,
"ocsp_urls": s.ocsp_urls,
"crl_urls": s.crl_urls,
}
for s in parsed_chain
]
if not leaf_summary:
result.add(Finding("cert", "No certificate retrieved", "TLS ハンドシェイクが失敗", "serious", 10))
await finds(result.findings[-1])
else:
# Expiry
if leaf_summary.is_expired:
result.add(Finding("cert", "Certificate expired",
f"not_after={leaf_summary.not_after}", "serious", 10))
await finds(result.findings[-1])
elif leaf_summary.days_until_expiry < 15:
result.add(Finding("cert", "Certificate expires soon",
f"残り {leaf_summary.days_until_expiry}", "bad", 4))
await finds(result.findings[-1])
elif leaf_summary.days_until_expiry < 30:
result.add(Finding("cert", "Certificate expiring in <30 days",
f"残り {leaf_summary.days_until_expiry}", "notgood", 2))
await finds(result.findings[-1])
else:
result.add(Finding("cert", "Certificate validity OK",
f"残り {leaf_summary.days_until_expiry}", "good", 0))
await finds(result.findings[-1])
# Self-signed
if leaf_summary.is_self_signed:
result.add(Finding("cert", "Self-signed certificate",
"CA 署名ではなく自己署名です", "serious", 9))
await finds(result.findings[-1])
# Signature hash
sh = (leaf_summary.signature_hash_algorithm or "").lower()
if sh in ("md5", "md2"):
result.add(Finding("cert", f"Weak signature hash: {sh.upper()}", "", "serious", 10))
await finds(result.findings[-1])
elif sh == "sha1":
result.add(Finding("cert", "Weak signature hash: SHA1", "", "bad", 5))
await finds(result.findings[-1])
elif sh:
result.add(Finding("cert", f"Signature hash: {sh.upper()}", "", "good", 0))
await finds(result.findings[-1])
# Public key strength
alg = leaf_summary.public_key_algorithm
bits = leaf_summary.public_key_size_bits
if alg == "RSA":
if bits < 1024:
result.add(Finding("cert", f"Very weak RSA key ({bits}-bit)", "", "serious", 10))
await finds(result.findings[-1])
elif bits < 2048:
result.add(Finding("cert", f"Weak RSA key ({bits}-bit)", "", "bad", 5))
await finds(result.findings[-1])
elif bits < 3072:
result.add(Finding("cert", f"RSA {bits}-bit", "推奨は 3072-bit 以上", "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("cert", f"RSA {bits}-bit", "", "good", 0))
await finds(result.findings[-1])
elif alg in ("EC", "Ed25519", "Ed448"):
result.add(Finding("cert", f"{alg} {bits}-bit {leaf_summary.public_key_curve or ''}".strip(),
"", "good", 0))
await finds(result.findings[-1])
# CT / SCT
if leaf_summary.has_scts:
result.add(Finding("cert", f"Certificate Transparency SCTs present ({leaf_summary.sct_count})",
"", "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("cert", "No embedded SCTs",
"Certificate Transparency ログが埋め込まれていません", "notgood", 1))
await finds(result.findings[-1])
# Hostname match (for non-IP targets)
if sni:
names = [leaf_summary.common_name] if leaf_summary.common_name else []
names += leaf_summary.sans
matched = False
for n in names:
if not n:
continue
n = n.lower().strip()
h = sni.lower().strip()
if n == h:
matched = True
break
if n.startswith("*.") and "." in h and h.split(".", 1)[1] == n[2:]:
matched = True
break
if matched:
result.add(Finding("cert", "Hostname matches certificate", "", "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("cert", "Hostname does not match certificate",
f"SNI={sni}, CN={leaf_summary.common_name}", "serious", 8))
await finds(result.findings[-1])
else:
result.add(Finding("cert", "IP literal target: hostname validation skipped",
"IP 指定のため SNI/証明書名一致の判定を行っていません", "notgood", 2))
await finds(result.findings[-1])
# ---- Phase: trust stores ----
await report("trust", "5 プラットフォームのトラストストアを照合中", 0.55, "info")
try:
trust_results = await verify_across_platforms(host, port, sni)
except Exception as e:
trust_results = []
result.add(Finding("trust", "Trust evaluation failed", str(e), "info", 0))
await finds(result.findings[-1])
result.data["trust"] = [
{"platform": t.platform, "trusted": t.trusted, "error": t.error} for t in trust_results
]
untrusted = [t for t in trust_results if not t.trusted]
trusted = [t for t in trust_results if t.trusted]
if trust_results and not untrusted:
result.add(Finding("trust", "Trusted on all platforms",
", ".join(t.platform for t in trusted), "good", 0))
await finds(result.findings[-1])
else:
for t in untrusted:
result.add(Finding("trust", f"Not trusted by {t.platform}",
t.error or "", "serious", 6))
await finds(result.findings[-1])
# ---- Phase: revocation ----
await report("revocation", "証明書失効を確認中", 0.62, "info")
if leaf_cert and len(chain_der) >= 2:
try:
issuer_cert, _ = parse_certificate(chain_der[1])
ocsp_res = await check_ocsp(leaf_cert, issuer_cert)
result.data["ocsp"] = {
"checked": ocsp_res.checked,
"revoked": ocsp_res.revoked,
"reason": ocsp_res.reason,
"source": ocsp_res.source,
"error": ocsp_res.error,
}
if ocsp_res.checked and ocsp_res.revoked:
result.add(Finding("cert", "Certificate revoked (OCSP)",
ocsp_res.reason or "", "serious", 10))
await finds(result.findings[-1])
elif ocsp_res.checked:
result.add(Finding("cert", "OCSP: certificate is not revoked", "", "good", 0))
await finds(result.findings[-1])
except Exception as e:
result.data["ocsp"] = {"error": str(e)}
if leaf_cert:
try:
crl_res = await check_crl(leaf_cert)
result.data["crl"] = {
"checked": crl_res.checked,
"revoked": crl_res.revoked,
"source": crl_res.source,
"error": crl_res.error,
}
if crl_res.checked and crl_res.revoked:
result.add(Finding("cert", "Certificate revoked (CRL)", crl_res.source, "serious", 10))
await finds(result.findings[-1])
except Exception as e:
result.data["crl"] = {"error": str(e)}
# ---- Phase: pwnedkeys ----
if leaf_summary and leaf_summary.spki_sha256:
await report("pwnedkeys", "pwnedkeys.com で鍵漏洩を確認中", 0.66, "info")
try:
pwned, err = await check_spki(leaf_summary.spki_sha256)
result.data["pwnedkeys"] = {"pwned": pwned, "error": err}
if pwned:
result.add(Finding("cert", "Private key is publicly known (pwnedkeys)",
"この公開鍵に対応する秘密鍵は既に漏洩しています", "serious", 10))
await finds(result.findings[-1])
elif err is None:
result.add(Finding("cert", "Private key not listed in pwnedkeys", "", "good", 0))
await finds(result.findings[-1])
except Exception as e:
result.data["pwnedkeys"] = {"error": str(e)}
# ---- Phase: HSTS ----
await report("hsts", "HSTS 設定を確認中", 0.70, "info")
try:
hsts = await fetch_hsts(host, port)
except Exception as e:
hsts = None
result.data["hsts"] = {"error": str(e)}
if hsts is not None:
result.data["hsts"] = {
"present": hsts.present,
"max_age": hsts.max_age,
"include_subdomains": hsts.include_subdomains,
"preload": hsts.preload,
"raw": hsts.raw,
"error": hsts.error,
}
if hsts.present:
if hsts.max_age >= 15552000:
result.add(Finding("hsts", "HSTS enabled with sufficient max-age",
f"max-age={hsts.max_age}", "good", 0))
await finds(result.findings[-1])
elif hsts.max_age > 0:
result.add(Finding("hsts", "HSTS max-age too short",
f"max-age={hsts.max_age} (推奨 >= 15552000)", "notgood", 2))
await finds(result.findings[-1])
if hsts.include_subdomains:
result.add(Finding("hsts", "HSTS includeSubDomains set", "", "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("hsts", "HSTS header missing", "", "notgood", 3))
await finds(result.findings[-1])
# ---- Phase: HSTS preload ----
if sni:
await report("preload", "HSTS preload リストを確認中 (Chrome/Firefox/Edge/IE)", 0.74, "info")
try:
preload_results = await check_preload(sni)
except Exception as e:
preload_results = []
result.data["preload_error"] = str(e)
result.data["preload"] = [
{"browser": p.browser, "listed": p.listed, "include_subdomains": p.include_subdomains,
"error": p.source_error}
for p in preload_results
]
listed_any = [p for p in preload_results if p.listed]
if listed_any:
result.add(Finding("hsts", "Listed in HSTS preload list",
", ".join(p.browser for p in listed_any), "good", 0))
await finds(result.findings[-1])
# ---- Phase: CAA ----
if sni:
await report("caa", "CAA レコードを検査中", 0.77, "info")
try:
caa = await lookup_caa(sni)
except Exception as e:
caa = None
result.data["caa"] = {"error": str(e)}
if caa is not None:
result.data["caa"] = {
"effective_host": caa.effective_host,
"records": caa.records,
"error": caa.error,
}
if caa.records:
result.add(Finding("caa", f"CAA records present ({caa.effective_host})",
"; ".join(caa.records), "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("caa", "No CAA records", "CAA が設定されていません", "notgood", 1))
await finds(result.findings[-1])
# ---- Phase: HTTP ----
await report("http", "HTTP/1/2 を検査中", 0.80, "info")
try:
http_info = await probe_http(host, port)
except Exception as e:
http_info = None
result.data["http_error"] = str(e)
if http_info is not None:
result.data["http"] = {
"http1": http_info.http1,
"http1_status": http_info.http1_status,
"http2": http_info.http2,
"http3_via_alt_svc": http_info.http3_via_alt_svc,
"alt_svc": http_info.alt_svc,
"server": http_info.server,
"content_encoding": http_info.content_encoding,
"compression_enabled": http_info.compression_enabled,
"valid_http_response": http_info.valid_http_response,
"error": http_info.error,
}
if not http_info.valid_http_response:
result.add(Finding("http", "No valid HTTP response", http_info.error or "", "bad", 6))
await finds(result.findings[-1])
else:
if http_info.http2:
result.add(Finding("http", "HTTP/2 supported", "", "good", 0))
await finds(result.findings[-1])
if http_info.compression_enabled:
result.add(Finding("http", "HTTP compression enabled",
f"content-encoding={http_info.content_encoding} - BREACH の懸念",
"notgood", 2))
await finds(result.findings[-1])
# ---- Phase: HTTP/3 ----
await report("http3", "HTTP/3 (QUIC) を検査中", 0.84, "info")
try:
h3 = await probe_http3(host, port, sni)
except Exception as e:
h3 = None
result.data["http3_error"] = str(e)
if h3 is not None:
result.data["http3"] = {"supported": h3.supported, "error": h3.error}
if h3.supported:
result.add(Finding("http", "HTTP/3 supported", "", "good", 0))
await finds(result.findings[-1])
# ---- Phase: ALPN ----
try:
alpn = await alpn_negotiate(host, port, sni, ["h2", "http/1.1"])
except Exception:
alpn = None
if alpn:
result.data["alpn"] = alpn
# ---- Phase: vulnerabilities ----
await report("vulns", "脆弱性を検査中", 0.88, "info")
# Passive analysis based on versions/ciphers
vp = analyze_vulns(versions_supported, accepted_per_version)
if vp.drown:
result.add(Finding("vuln", "DROWN (SSLv2)", "SSLv2 が有効", "serious", 10)); await finds(result.findings[-1])
if vp.poodle_ssl:
result.add(Finding("vuln", "POODLE (SSLv3 CBC)", "", "serious", 10)); await finds(result.findings[-1])
if vp.beast:
result.add(Finding("vuln", "BEAST (TLS 1.0 CBC)", "クライアント側の緩和策が有効なら実害は限定的", "notgood", 3)); await finds(result.findings[-1])
if vp.sweet32:
result.add(Finding("vuln", "SWEET32 (3DES)", "3DES が accepted cipher list にあり", "bad", 4)); await finds(result.findings[-1])
if vp.rc4:
result.add(Finding("vuln", "RC4 enabled", "", "serious", 7)); await finds(result.findings[-1])
if vp.freak:
result.add(Finding("vuln", "FREAK (EXPORT RSA)", "", "serious", 10)); await finds(result.findings[-1])
if vp.logjam_export:
result.add(Finding("vuln", "LOGJAM (EXPORT DHE)", "", "serious", 10)); await finds(result.findings[-1])
if vp.null_cipher:
result.add(Finding("vuln", "NULL cipher enabled", "", "serious", 10)); await finds(result.findings[-1])
if vp.anon_cipher:
result.add(Finding("vuln", "Anonymous cipher enabled", "", "serious", 10)); await finds(result.findings[-1])
if vp.lucky13:
result.add(Finding("vuln", "LUCKY13 (CBC)", "", "notgood", 2)); await finds(result.findings[-1])
# Active probes
try:
hb = await probe_heartbleed(host, port, sni)
result.data["heartbleed"] = {
"vulnerable": hb.vulnerable,
"heartbeat_extension": hb.heartbeat_extension_advertised,
"error": hb.error,
}
if hb.vulnerable:
result.add(Finding("vuln", "Heartbleed (CVE-2014-0160)", "", "serious", 10))
await finds(result.findings[-1])
elif hb.heartbeat_extension_advertised:
result.add(Finding("vuln", "Heartbeat extension advertised but not exploitable",
"", "notgood", 1))
await finds(result.findings[-1])
else:
result.add(Finding("vuln", "Not vulnerable to Heartbleed", "", "good", 0))
await finds(result.findings[-1])
except Exception as e:
result.data["heartbleed"] = {"error": str(e)}
try:
ccs_vuln, ccs_msg = await probe_ccs(host, port, sni)
result.data["ccs_injection"] = {"vulnerable": ccs_vuln, "detail": ccs_msg}
if ccs_vuln:
result.add(Finding("vuln", "CCS Injection (CVE-2014-0224)", "", "serious", 10))
await finds(result.findings[-1])
else:
result.add(Finding("vuln", "Not vulnerable to CCS Injection", "", "good", 0))
await finds(result.findings[-1])
except Exception as e:
result.data["ccs_injection"] = {"error": str(e)}
try:
renego_ok, renego_err = await probe_secure_renegotiation(host, port, sni)
result.data["secure_renegotiation"] = {"supported": renego_ok, "error": renego_err}
if renego_ok:
result.add(Finding("vuln", "Secure renegotiation supported", "", "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("vuln", "Secure renegotiation not supported", renego_err or "",
"bad", 4))
await finds(result.findings[-1])
except Exception as e:
result.data["secure_renegotiation"] = {"error": str(e)}
try:
scsv_ok, scsv_err = await probe_fallback_scsv(host, port, sni)
result.data["fallback_scsv"] = {"supported": scsv_ok, "error": scsv_err}
if scsv_ok:
result.add(Finding("vuln", "TLS_FALLBACK_SCSV supported", "", "good", 0))
await finds(result.findings[-1])
else:
result.add(Finding("vuln", "TLS_FALLBACK_SCSV not enforced", scsv_err or "",
"notgood", 2))
await finds(result.findings[-1])
except Exception as e:
result.data["fallback_scsv"] = {"error": str(e)}
# ---- Phase: handshake simulation ----
await report("handshake_sim", "代表的なクライアントとのハンドシェイクを模擬中", 0.95, "info")
try:
sim = await simulate_handshakes(host, port, sni)
except Exception as e:
sim = []
result.data["handshake_sim_error"] = str(e)
result.data["handshake_simulation"] = [
{
"client": s.client,
"connected": s.connected,
"negotiated_version": s.negotiated_version,
"negotiated_cipher": s.negotiated_cipher,
"error": s.error,
}
for s in sim
]
if sim:
ok_count = sum(1 for s in sim if s.connected)
result.add(Finding("compat", f"Handshake simulation: {ok_count}/{len(sim)} clients connected",
"", "info", 0))
await finds(result.findings[-1])
async def run_full_scan(target: str, report: ReportProgress, finds: ReportFinding) -> ScanResult:
host, port, sni = parse_target(target)
started = time.time()
result = ScanResult(target=target, host=host, port=port, started_at=started)
result.data["sni"] = sni
await report("init", f"対象: {target} (host={host}, port={port})", 0.01, "info")
# TCP reachability first
resolved = await resolve_host(host)
if resolved is None:
await report("dns", "名前解決に失敗しました", 1.0, "serious")
result.error = "dns_fail"
result.finished_at = time.time()
apply_rank(result)
result.score = 0.0
result.rank = "R"
result.add(Finding("connectivity", "DNS resolution failed", host, "serious", 10))
await finds(result.findings[-1])
return result
result.data["resolved_ip"] = resolved
reachable = await tcp_reachable(host, port)
if not reachable:
await report("tcp", f"TCP {host}:{port} に接続できません", 1.0, "serious")
result.error = "no_tls"
result.add(Finding("connectivity", "TCP port unreachable", f"{host}:{port}", "serious", 10))
await finds(result.findings[-1])
result.finished_at = time.time()
apply_rank(result)
return result
try:
await _gather(report, finds, result)
except asyncio.CancelledError:
raise
except Exception as e:
result.error = f"{e.__class__.__name__}: {e}"
result.add(Finding("engine", "Scan error", str(e), "serious", 5))
await finds(result.findings[-1])
result.finished_at = time.time()
apply_rank(result)
await report("done", f"スコア={result.score} ランク={result.rank}", 1.0, "info")
return result
@@ -0,0 +1,149 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from .protocol import constants as C
from .protocol import wire
from .protocol.client import send_client_hello
@dataclass
class ClientProfile:
name: str
client_hello_version: int
cipher_suites: list[int]
versions: list[int] | None = None # for supported_versions ext (TLS 1.3 capable)
groups: list[int] = field(default_factory=lambda: [0x001d, 0x0017, 0x0018])
PROFILES: list[ClientProfile] = [
ClientProfile(
name="Android 4.4.2",
client_hello_version=C.TLS_1_0,
cipher_suites=[0xc013, 0xc014, 0x002f, 0x0035, 0x000a, 0x0004, 0x0005],
),
ClientProfile(
name="Android 7.0",
client_hello_version=C.TLS_1_2,
cipher_suites=[0xc02b, 0xc02f, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
),
ClientProfile(
name="Android 12",
client_hello_version=C.TLS_1_2,
versions=[C.TLS_1_3, C.TLS_1_2],
cipher_suites=[0x1301, 0x1303, 0x1302, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d],
),
ClientProfile(
name="Chrome 49",
client_hello_version=C.TLS_1_2,
cipher_suites=[0xc02b, 0xc02f, 0x009c, 0x009e, 0xc00a, 0xc009, 0xc013, 0xc014, 0x0033, 0x0039, 0x002f, 0x0035, 0x000a],
),
ClientProfile(
name="Chrome 120",
client_hello_version=C.TLS_1_2,
versions=[C.TLS_1_3, C.TLS_1_2],
cipher_suites=[0x1301, 0x1302, 0x1303, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc02c, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
),
ClientProfile(
name="Firefox 115",
client_hello_version=C.TLS_1_2,
versions=[C.TLS_1_3, C.TLS_1_2],
cipher_suites=[0x1301, 0x1303, 0x1302, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc030, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
),
ClientProfile(
name="Safari 14",
client_hello_version=C.TLS_1_2,
versions=[C.TLS_1_3, C.TLS_1_2],
cipher_suites=[0x1301, 0x1302, 0x1303, 0xc02c, 0xc02b, 0xcca9, 0xc030, 0xc02f, 0xcca8, 0xc024, 0xc023, 0xc00a, 0xc009, 0xc028, 0xc027, 0xc014, 0xc013, 0x009d, 0x009c, 0x003d, 0x003c, 0x0035, 0x002f],
),
ClientProfile(
name="IE 8 (Win XP)",
client_hello_version=C.SSL_3_0,
cipher_suites=[0x0004, 0x0005, 0x000a, 0x002f, 0x0035],
),
ClientProfile(
name="IE 11 (Win 8.1)",
client_hello_version=C.TLS_1_2,
cipher_suites=[0xc030, 0xc02f, 0xc014, 0xc013, 0x009f, 0x009e, 0x0035, 0x002f, 0x000a],
),
ClientProfile(
name="Edge 120",
client_hello_version=C.TLS_1_2,
versions=[C.TLS_1_3, C.TLS_1_2],
cipher_suites=[0x1301, 0x1302, 0x1303, 0xc02b, 0xc02f, 0xcca9, 0xcca8, 0xc030, 0x009c, 0x009d, 0x002f, 0x0035],
),
ClientProfile(
name="Java 8u291",
client_hello_version=C.TLS_1_2,
cipher_suites=[0xc02b, 0xc02f, 0xc02c, 0xc030, 0xcca9, 0xcca8, 0xc013, 0xc014, 0x009c, 0x009d, 0x002f, 0x0035],
),
ClientProfile(
name="Java 17",
client_hello_version=C.TLS_1_2,
versions=[C.TLS_1_3, C.TLS_1_2],
cipher_suites=[0x1302, 0x1303, 0x1301, 0xc02c, 0xc02b, 0xc030, 0xc02f, 0xcca9, 0xcca8, 0x009d, 0x009c, 0x0035, 0x002f],
),
]
@dataclass
class SimResult:
client: str
connected: bool
negotiated_version: str = ""
negotiated_cipher: str = ""
error: str | None = None
async def simulate(host: str, port: int, sni: str | None) -> list[SimResult]:
results: list[SimResult] = []
for p in PROFILES:
exts_parts = []
if sni and not _is_ip(sni):
try:
exts_parts.append(wire.ext_server_name(sni))
except Exception:
pass
exts_parts.append(wire.ext_ec_point_formats())
exts_parts.append(wire.ext_supported_groups(p.groups))
exts_parts.append(wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501, 0x0603, 0x0806, 0x0601,
]))
exts_parts.append(wire.ext_renegotiation_info_empty())
if p.versions and C.TLS_1_3 in p.versions:
exts_parts.append(wire.ext_supported_versions_client(p.versions))
exts_parts.append(wire.ext_psk_key_exchange_modes())
exts_parts.append(wire.ext_key_share_empty())
exts = b"".join(exts_parts)
res = await send_client_hello(
host, port,
record_version=C.TLS_1_0,
client_hello_version=p.client_hello_version,
cipher_suites=p.cipher_suites,
extensions=exts,
sni=sni,
)
sh = res.server_hello
if not res.connected or sh is None or sh.alert is not None:
results.append(SimResult(client=p.name, connected=False, error=res.error or "handshake failed"))
continue
v = sh.negotiated_version or sh.server_version
version_name = C.PROTOCOL_NAMES.get(v, f"0x{v:04x}")
cipher_name = C.CIPHER_SUITES.get(sh.cipher_suite, f"0x{sh.cipher_suite:04x}") if sh.cipher_suite else ""
results.append(SimResult(
client=p.name,
connected=True,
negotiated_version=version_name,
negotiated_cipher=cipher_name,
))
return results
def _is_ip(host: str) -> bool:
import ipaddress
try:
ipaddress.ip_address(host.strip("[]"))
return True
except ValueError:
return False
@@ -0,0 +1,44 @@
from __future__ import annotations
import re
from dataclasses import dataclass
import httpx
@dataclass
class HstsInfo:
present: bool = False
max_age: int = 0
include_subdomains: bool = False
preload: bool = False
raw: str = ""
error: str | None = None
async def fetch_hsts(host: str, port: int, timeout: float = 6.0) -> HstsInfo:
url = f"https://{host}:{port}/" if port != 443 else f"https://{host}/"
try:
async with httpx.AsyncClient(
timeout=timeout,
follow_redirects=False,
verify=False,
headers={"User-Agent": "nercone-tls-test/1.0"},
) as client:
resp = await client.get(url)
except Exception as e:
return HstsInfo(error=f"{e.__class__.__name__}: {e}")
header = resp.headers.get("strict-transport-security", "")
if not header:
return HstsInfo(present=False, raw="")
info = HstsInfo(present=True, raw=header)
for token in header.split(";"):
t = token.strip().lower()
if t.startswith("max-age="):
try:
info.max_age = int(t.split("=", 1)[1])
except ValueError:
info.max_age = 0
elif t == "includesubdomains":
info.include_subdomains = True
elif t == "preload":
info.preload = True
return info
@@ -0,0 +1,73 @@
from __future__ import annotations
import asyncio
import re
from dataclasses import dataclass, field
import httpx
@dataclass
class HttpInfo:
http1: bool = False
http1_status: int | None = None
http2: bool = False
http3_via_alt_svc: bool = False
alt_svc: str | None = None
server: str | None = None
content_encoding: str | None = None
valid_http_response: bool = False
error: str | None = None
compression_enabled: bool = False
async def probe_http(host: str, port: int, timeout: float = 6.0) -> HttpInfo:
info = HttpInfo()
url = f"https://{host}:{port}/" if port != 443 else f"https://{host}/"
headers = {
"User-Agent": "nercone-tls-test/1.0",
"Accept-Encoding": "gzip, deflate, br",
}
try:
async with httpx.AsyncClient(
timeout=timeout,
verify=False,
follow_redirects=False,
headers=headers,
http1=True,
http2=False,
) as client:
resp = await client.get(url)
info.http1 = True
info.http1_status = resp.status_code
info.valid_http_response = True
info.server = resp.headers.get("server")
info.alt_svc = resp.headers.get("alt-svc")
info.content_encoding = resp.headers.get("content-encoding")
if info.content_encoding:
info.compression_enabled = True
except Exception as e:
info.error = f"http1: {e.__class__.__name__}: {e}"
try:
async with httpx.AsyncClient(
timeout=timeout,
verify=False,
follow_redirects=False,
headers=headers,
http1=False,
http2=True,
) as client:
resp = await client.get(url)
info.http2 = (resp.http_version.upper().startswith("HTTP/2"))
if info.http2:
if not info.alt_svc:
info.alt_svc = resp.headers.get("alt-svc")
if not info.server:
info.server = resp.headers.get("server")
except Exception:
pass
if info.alt_svc and ("h3=" in info.alt_svc or "h3-" in info.alt_svc):
info.http3_via_alt_svc = True
return info
@@ -0,0 +1,47 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
@dataclass
class Http3Info:
supported: bool = False
error: str | None = None
async def probe_http3(host: str, port: int, sni: str | None, timeout: float = 6.0) -> Http3Info:
"""Attempt a QUIC handshake to determine HTTP/3 availability.
Uses aioquic if available. On any failure returns supported=False.
"""
try:
from aioquic.asyncio.client import connect
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import HandshakeCompleted
except Exception as e:
return Http3Info(supported=False, error=f"aioquic unavailable: {e}")
info = Http3Info()
try:
configuration = QuicConfiguration(
is_client=True,
alpn_protocols=["h3", "h3-29"],
verify_mode=None,
)
try:
import ssl as _ssl
configuration.verify_mode = _ssl.CERT_NONE
except Exception:
pass
host_for_quic = sni or host
async def _run():
async with connect(host, port, configuration=configuration, server_name=host_for_quic) as client:
await client.wait_connected()
return True
ok = await asyncio.wait_for(_run(), timeout=timeout)
info.supported = bool(ok)
except Exception as e:
info.supported = False
info.error = f"{e.__class__.__name__}: {e}"
return info
@@ -0,0 +1,3 @@
from .check import check_preload, PRELOAD_BROWSERS
__all__ = ["check_preload", "PRELOAD_BROWSERS"]
@@ -0,0 +1,138 @@
from __future__ import annotations
import asyncio
import json
import re
import time
from dataclasses import dataclass
from pathlib import Path
import httpx
CACHE_DIR = Path.cwd() / "databases" / "hsts_preload_cache"
CACHE_TTL = 24 * 60 * 60
CHROME_URL = "https://chromium.googlesource.com/chromium/src/+/refs/heads/main/net/http/transport_security_state_static.json?format=TEXT"
FIREFOX_URL = "https://hg.mozilla.org/mozilla-central/raw-file/tip/security/manager/ssl/nsSTSPreloadList.inc"
PRELOAD_BROWSERS = ["chrome", "firefox", "edge", "ie"]
@dataclass
class PreloadResult:
browser: str
listed: bool
include_subdomains: bool = False
source_error: str | None = None
def _cache_path(name: str) -> Path:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
return CACHE_DIR / f"{name}.json"
def _cache_fresh(p: Path) -> bool:
try:
return p.is_file() and (time.time() - p.stat().st_mtime) < CACHE_TTL
except Exception:
return False
async def _fetch_chrome_list(timeout: float = 30.0) -> dict[str, bool]:
"""Return mapping: host -> include_subdomains.
Chromium's list is served base64-encoded when ?format=TEXT.
"""
cache = _cache_path("chrome")
if _cache_fresh(cache):
try:
return json.loads(cache.read_text())
except Exception:
pass
try:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"}) as client:
resp = await client.get(CHROME_URL)
if resp.status_code != 200:
return {}
import base64
raw = base64.b64decode(resp.content).decode("utf-8", errors="replace")
except Exception:
return {}
# Strip // comments (json with comments) and parse a loose subset
cleaned = re.sub(r"//[^\n]*", "", raw)
cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL)
result: dict[str, bool] = {}
try:
data = json.loads(cleaned)
for entry in data.get("entries", []):
name = entry.get("name")
if name:
result[name.lower()] = bool(entry.get("include_subdomains", False))
except Exception:
for m in re.finditer(r'"name"\s*:\s*"([^"]+)"\s*,\s*"policy"[^}]*?"include_subdomains"\s*:\s*(true|false)', cleaned):
result[m.group(1).lower()] = m.group(2) == "true"
if result:
try:
cache.write_text(json.dumps(result))
except Exception:
pass
return result
async def _fetch_firefox_list(timeout: float = 30.0) -> dict[str, bool]:
cache = _cache_path("firefox")
if _cache_fresh(cache):
try:
return json.loads(cache.read_text())
except Exception:
pass
try:
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "nercone-tls-test/1.0"}) as client:
resp = await client.get(FIREFOX_URL)
if resp.status_code != 200:
return {}
text = resp.text
except Exception:
return {}
result: dict[str, bool] = {}
# Format: lines like %%\nhost, include_subdomains_bool\n%%
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("/") or line.startswith("%") or line.startswith("#"):
continue
if "," in line:
host, flag = [p.strip() for p in line.split(",", 1)]
result[host.lower()] = "1" in flag or "true" in flag.lower()
if result:
try:
cache.write_text(json.dumps(result))
except Exception:
pass
return result
def _lookup(host: str, table: dict[str, bool]) -> tuple[bool, bool]:
h = host.lower().strip(".")
if h in table:
return True, table[h]
# walk parent domains to catch include_subdomains matches
parts = h.split(".")
for i in range(1, len(parts)):
parent = ".".join(parts[i:])
if parent in table and table[parent]:
return True, True
return False, False
async def check_preload(host: str) -> list[PreloadResult]:
chrome = await _fetch_chrome_list()
firefox = await _fetch_firefox_list()
results: list[PreloadResult] = []
c_listed, c_sub = _lookup(host, chrome)
f_listed, f_sub = _lookup(host, firefox)
results.append(PreloadResult("chrome", c_listed, c_sub, None if chrome else "source unavailable"))
results.append(PreloadResult("firefox", f_listed, f_sub, None if firefox else "source unavailable"))
# Edge effectively uses Chromium's preload list since the Edge (Chromium) release.
results.append(PreloadResult("edge", c_listed, c_sub, None if chrome else "source unavailable"))
# Internet Explorer never maintained its own HSTS preload list; treat same as listed-in-none.
results.append(PreloadResult("ie", False, False, "IE does not support HSTS preload"))
return results
@@ -0,0 +1,145 @@
from __future__ import annotations
import asyncio
import ipaddress
import socket
from dataclasses import dataclass
from . import constants as C
from . import wire
CONNECT_TIMEOUT = 6.0
IO_TIMEOUT = 8.0
@dataclass
class ProbeResult:
connected: bool
server_hello: wire.ParsedServerHello | None
alert: tuple[int, int] | None
raw: bytes
error: str | None = None
def is_ip_literal(host: str) -> bool:
try:
ipaddress.ip_address(host.strip("[]"))
return True
except ValueError:
return False
async def _open(host: str, port: int, timeout: float = CONNECT_TIMEOUT) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
return await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
async def send_client_hello(
host: str,
port: int,
*,
record_version: int = C.TLS_1_0,
client_hello_version: int = C.TLS_1_2,
cipher_suites: list[int],
extensions: bytes = b"",
sni: str | None = None,
timeout: float = IO_TIMEOUT,
) -> ProbeResult:
try:
reader, writer = await _open(host, port)
except Exception as e:
return ProbeResult(False, None, None, b"", str(e))
try:
ch = wire.build_client_hello(
record_version=record_version,
client_hello_version=client_hello_version,
hostname=sni,
cipher_suites=cipher_suites,
extensions=extensions,
)
writer.write(ch)
try:
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
data = b""
try:
data = await asyncio.wait_for(reader.read(4096), timeout=timeout)
except Exception:
pass
parsed = wire.parse_server_response(data) if data else None
alert = parsed.alert if parsed else None
return ProbeResult(True, parsed, alert, data)
except Exception as e:
return ProbeResult(True, None, None, b"", str(e))
finally:
try:
writer.close()
await asyncio.wait_for(writer.wait_closed(), timeout=1.5)
except Exception:
pass
async def send_ssl2_client_hello(host: str, port: int) -> ProbeResult:
try:
reader, writer = await _open(host, port)
except Exception as e:
return ProbeResult(False, None, None, b"", str(e))
try:
writer.write(wire.build_ssl2_client_hello())
try:
await asyncio.wait_for(writer.drain(), timeout=IO_TIMEOUT)
except Exception:
pass
try:
data = await asyncio.wait_for(reader.read(4096), timeout=IO_TIMEOUT)
except Exception:
data = b""
# In SSLv2 a SERVER-HELLO response has msg_type 4 in the body.
is_sslv2 = False
if len(data) >= 3:
# 2-byte length header with MSB set is SSLv2, then body[0] = msg type
if data[0] & 0x80:
length = ((data[0] & 0x7f) << 8) | data[1]
if length >= 1 and len(data) >= 3 and data[2] == 4:
is_sslv2 = True
alert = None if is_sslv2 else (0, 0)
parsed = None
return ProbeResult(True, parsed, alert, data)
finally:
try:
writer.close()
await asyncio.wait_for(writer.wait_closed(), timeout=1.5)
except Exception:
pass
async def tcp_reachable(host: str, port: int, timeout: float = CONNECT_TIMEOUT) -> bool:
try:
_, w = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
try:
w.close()
try:
await asyncio.wait_for(w.wait_closed(), timeout=1.0)
except Exception:
pass
except Exception:
pass
return True
except Exception:
return False
async def resolve_host(host: str) -> str | None:
if is_ip_literal(host):
return host.strip("[]")
loop = asyncio.get_running_loop()
try:
infos = await loop.getaddrinfo(host, None, type=socket.SOCK_STREAM)
except Exception:
return None
for family, _, _, _, sockaddr in infos:
if family == socket.AF_INET:
return sockaddr[0]
for family, _, _, _, sockaddr in infos:
if family == socket.AF_INET6:
return sockaddr[0]
return None
@@ -0,0 +1,158 @@
from __future__ import annotations
SSL_2_0 = 0x0002
SSL_3_0 = 0x0300
TLS_1_0 = 0x0301
TLS_1_1 = 0x0302
TLS_1_2 = 0x0303
TLS_1_3 = 0x0304
PROTOCOL_NAMES = {
SSL_2_0: "SSL 2.0",
SSL_3_0: "SSL 3.0",
TLS_1_0: "TLS 1.0",
TLS_1_1: "TLS 1.1",
TLS_1_2: "TLS 1.2",
TLS_1_3: "TLS 1.3",
}
CT_HANDSHAKE = 22
CT_ALERT = 21
CT_CHANGE_CIPHER_SPEC = 20
CT_APPLICATION_DATA = 23
CT_HEARTBEAT = 24
HS_CLIENT_HELLO = 1
HS_SERVER_HELLO = 2
HS_CERTIFICATE = 11
HS_SERVER_KEY_EXCHANGE = 12
HS_SERVER_HELLO_DONE = 14
HS_HELLO_RETRY_REQUEST = 6
EXT_SERVER_NAME = 0x0000
EXT_STATUS_REQUEST = 0x0005
EXT_SUPPORTED_GROUPS = 0x000a
EXT_EC_POINT_FORMATS = 0x000b
EXT_SIGNATURE_ALGORITHMS = 0x000d
EXT_HEARTBEAT = 0x000f
EXT_ALPN = 0x0010
EXT_SIGNED_CERT_TIMESTAMP = 0x0012
EXT_EXTENDED_MASTER_SECRET = 0x0017
EXT_SESSION_TICKET = 0x0023
EXT_SUPPORTED_VERSIONS = 0x002b
EXT_PSK_KEY_EXCHANGE_MODES = 0x002d
EXT_KEY_SHARE = 0x0033
EXT_RENEGOTIATION_INFO = 0xff01
NAMED_GROUPS = {
0x0017: "secp256r1",
0x0018: "secp384r1",
0x0019: "secp521r1",
0x001d: "x25519",
0x001e: "x448",
0x0100: "ffdhe2048",
0x0101: "ffdhe3072",
0x0102: "ffdhe4096",
0x0103: "ffdhe6144",
0x0104: "ffdhe8192",
0x11ec: "X25519MLKEM768",
0x11eb: "SecP256r1MLKEM768",
0x6399: "X25519Kyber768Draft00",
0x639a: "SecP256r1Kyber768Draft00",
}
PQC_GROUPS = {0x11ec, 0x11eb, 0x6399, 0x639a}
SIGNATURE_ALGORITHMS = {
0x0401: "rsa_pkcs1_sha256",
0x0501: "rsa_pkcs1_sha384",
0x0601: "rsa_pkcs1_sha512",
0x0403: "ecdsa_secp256r1_sha256",
0x0503: "ecdsa_secp384r1_sha384",
0x0603: "ecdsa_secp521r1_sha512",
0x0804: "rsa_pss_rsae_sha256",
0x0805: "rsa_pss_rsae_sha384",
0x0806: "rsa_pss_rsae_sha512",
0x0807: "ed25519",
0x0808: "ed448",
0x0809: "rsa_pss_pss_sha256",
0x080a: "rsa_pss_pss_sha384",
0x080b: "rsa_pss_pss_sha512",
}
# Common cipher suite ID → name (subset but covers everything we check).
CIPHER_SUITES: dict[int, str] = {
# TLS 1.3
0x1301: "TLS_AES_128_GCM_SHA256",
0x1302: "TLS_AES_256_GCM_SHA384",
0x1303: "TLS_CHACHA20_POLY1305_SHA256",
0x1304: "TLS_AES_128_CCM_SHA256",
0x1305: "TLS_AES_128_CCM_8_SHA256",
# ECDHE-RSA AEAD
0xc02f: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
0xc030: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
0xcca8: "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256",
# ECDHE-ECDSA AEAD
0xc02b: "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
0xc02c: "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
0xcca9: "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256",
# ECDHE CBC (weak)
0xc013: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
0xc014: "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
0xc009: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
0xc00a: "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
0xc027: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
0xc028: "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
# DHE AEAD
0x009e: "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
0x009f: "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384",
0xccaa: "TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256",
# RSA key exchange (no FS)
0x009c: "TLS_RSA_WITH_AES_128_GCM_SHA256",
0x009d: "TLS_RSA_WITH_AES_256_GCM_SHA384",
0x002f: "TLS_RSA_WITH_AES_128_CBC_SHA",
0x0035: "TLS_RSA_WITH_AES_256_CBC_SHA",
0x003c: "TLS_RSA_WITH_AES_128_CBC_SHA256",
0x003d: "TLS_RSA_WITH_AES_256_CBC_SHA384",
# 3DES (SWEET32)
0x000a: "TLS_RSA_WITH_3DES_EDE_CBC_SHA",
0xc012: "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
0x0016: "TLS_DHE_RSA_WITH_3DES_EDE_CBC_SHA",
# RC4
0x0004: "TLS_RSA_WITH_RC4_128_MD5",
0x0005: "TLS_RSA_WITH_RC4_128_SHA",
0xc011: "TLS_ECDHE_RSA_WITH_RC4_128_SHA",
0xc007: "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
# NULL
0x0001: "TLS_RSA_WITH_NULL_MD5",
0x0002: "TLS_RSA_WITH_NULL_SHA",
0x003b: "TLS_RSA_WITH_NULL_SHA256",
# EXPORT (FREAK)
0x0008: "TLS_RSA_EXPORT_WITH_DES40_CBC_SHA",
0x0014: "TLS_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
# Anonymous
0x0018: "TLS_DH_anon_WITH_RC4_128_MD5",
0x0034: "TLS_DH_anon_WITH_AES_128_CBC_SHA",
}
# Cipher-suite flag helpers: derived from names.
def cipher_has_fs(name: str) -> bool:
return "ECDHE" in name or "DHE" in name
def cipher_is_aead(name: str) -> bool:
return "GCM" in name or "CHACHA20_POLY1305" in name or "CCM" in name
def cipher_is_weak(name: str) -> bool:
for bad in ("NULL", "EXPORT", "_DES_", "3DES", "RC4", "MD5", "anon"):
if bad in name:
return True
return False
def cipher_is_cbc(name: str) -> bool:
return "_CBC_" in name
def cipher_is_rc4(name: str) -> bool:
return "RC4" in name
def cipher_is_3des(name: str) -> bool:
return "3DES" in name or "_DES_EDE" in name
@@ -0,0 +1,54 @@
from __future__ import annotations
from . import constants as C
from . import wire
from .client import send_client_hello
async def detect_named_groups(host: str, port: int, sni: str | None, tls_version: int = C.TLS_1_3) -> list[int]:
"""Probe which named groups the server accepts for TLS 1.3 key share.
We don't execute a real handshake, we just offer one group at a time and
watch for either a successful ServerHello (with key_share) or HelloRetryRequest
(which tells us the server accepts the version but wants a different group).
"""
accepted: list[int] = []
candidate_groups = list(C.NAMED_GROUPS.keys())
# Use TLS_AES_128_GCM_SHA256 as a minimal 1.3 cipher suite.
cipher_suites = [0x1301, 0x1302, 0x1303]
for g in candidate_groups:
exts = (
(wire.ext_server_name(sni) if sni and not _is_ip(sni) else b"")
+ wire.ext_supported_versions_client([tls_version])
+ wire.ext_supported_groups([g])
+ wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
])
+ wire.ext_psk_key_exchange_modes()
+ wire.ext_key_share_empty()
)
res = await send_client_hello(
host, port,
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_2,
cipher_suites=cipher_suites,
extensions=exts,
sni=sni,
)
sh = res.server_hello
if not res.connected or sh is None:
continue
if sh.alert is not None:
continue
if sh.negotiated_version == C.TLS_1_3:
accepted.append(g)
return accepted
def _is_ip(host: str) -> bool:
import ipaddress
try:
ipaddress.ip_address(host.strip("[]"))
return True
except ValueError:
return False
@@ -0,0 +1,234 @@
from __future__ import annotations
import asyncio
import socket
import ssl
from dataclasses import dataclass
from typing import Any
from . import constants as C
from . import wire
from .client import send_client_hello
SSL_VERSION_TO_CONST = {
C.TLS_1_0: ssl.TLSVersion.TLSv1,
C.TLS_1_1: ssl.TLSVersion.TLSv1_1,
C.TLS_1_2: ssl.TLSVersion.TLSv1_2,
C.TLS_1_3: ssl.TLSVersion.TLSv1_3,
}
@dataclass
class VersionProbe:
version: int
supported: bool
negotiated_cipher: str | None = None
error: str | None = None
def _make_permissive_context(min_v: ssl.TLSVersion, max_v: ssl.TLSVersion, ciphers: str | None = None) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
ctx.minimum_version = min_v
ctx.maximum_version = max_v
except (ValueError, OSError):
pass
if ciphers is not None:
try:
ctx.set_ciphers(ciphers)
except ssl.SSLError:
pass
# Allow legacy server connections (e.g. missing renegotiation_info for TLS 1.0 servers).
try:
ctx.options &= ~ssl.OP_NO_SSLv3
except Exception:
pass
try:
# SECLEVEL=0 allows legacy/weak ciphers for enumeration.
cur_ciphers = ciphers if ciphers else "ALL:@SECLEVEL=0"
ctx.set_ciphers(cur_ciphers)
except ssl.SSLError:
pass
return ctx
async def probe_tls_version(host: str, port: int, sni: str | None, version: int, timeout: float = 8.0) -> VersionProbe:
if version not in SSL_VERSION_TO_CONST:
return VersionProbe(version=version, supported=False, error="unsupported version constant")
tls_ver = SSL_VERSION_TO_CONST[version]
ctx = _make_permissive_context(tls_ver, tls_ver)
loop = asyncio.get_running_loop()
def _do() -> tuple[bool, str | None, str | None]:
try:
with socket.create_connection((host, port), timeout=timeout) as sock:
with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
try:
cipher = ssock.cipher()
return True, cipher[0] if cipher else None, None
except Exception as e:
return True, None, str(e)
except ssl.SSLError as e:
return False, None, f"{e.__class__.__name__}: {e}"
except (OSError, socket.timeout, asyncio.TimeoutError) as e:
return False, None, f"{e.__class__.__name__}: {e}"
except Exception as e:
return False, None, f"{e.__class__.__name__}: {e}"
ok, cipher, err = await loop.run_in_executor(None, _do)
return VersionProbe(version=version, supported=ok, negotiated_cipher=cipher, error=err)
@dataclass
class CipherProbe:
version: int
cipher_id: int
cipher_name: str
supported: bool
async def probe_cipher(host: str, port: int, sni: str | None, version: int, cipher_id: int, cipher_name: str) -> CipherProbe:
"""Enumerate whether a given cipher suite is accepted under a given TLS version.
For TLS 1.2 and below we use the ssl module with set_ciphers().
For TLS 1.3 we use set_ciphersuites() equivalents via the OpenSSL names.
Falls back to False on any handshake error.
"""
if version == C.TLS_1_3:
# TLS 1.3 only recognizes cipher suites by their IANA name in OpenSSL.
name_map = {
0x1301: "TLS_AES_128_GCM_SHA256",
0x1302: "TLS_AES_256_GCM_SHA384",
0x1303: "TLS_CHACHA20_POLY1305_SHA256",
0x1304: "TLS_AES_128_CCM_SHA256",
0x1305: "TLS_AES_128_CCM_8_SHA256",
}
openssl_name = name_map.get(cipher_id)
if not openssl_name:
return CipherProbe(version, cipher_id, cipher_name, False)
tls_ver = SSL_VERSION_TO_CONST[version]
ctx = _make_permissive_context(tls_ver, tls_ver)
try:
ctx.set_ciphersuites(openssl_name)
except Exception:
return CipherProbe(version, cipher_id, cipher_name, False)
loop = asyncio.get_running_loop()
def _do() -> bool:
try:
with socket.create_connection((host, port), timeout=6.0) as sock:
with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
actual = ssock.cipher()
return bool(actual and actual[0] == openssl_name)
except Exception:
return False
ok = await loop.run_in_executor(None, _do)
return CipherProbe(version, cipher_id, cipher_name, ok)
# For < TLS 1.3: send a ClientHello offering only this suite.
exts = b""
parts = []
if sni:
try:
parts.append(wire.ext_server_name(sni))
except Exception:
pass
parts.append(wire.ext_ec_point_formats())
parts.append(wire.ext_supported_groups([0x001d, 0x0017, 0x0018]))
parts.append(wire.ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
0x0603, 0x0806, 0x0601,
]))
parts.append(wire.ext_renegotiation_info_empty())
exts = b"".join(parts)
res = await send_client_hello(
host, port,
record_version=C.TLS_1_0,
client_hello_version=version,
cipher_suites=[cipher_id],
extensions=exts,
sni=sni,
)
if not res.connected:
return CipherProbe(version, cipher_id, cipher_name, False)
sh = res.server_hello
if sh is None or sh.alert is not None:
return CipherProbe(version, cipher_id, cipher_name, False)
ok = sh.cipher_suite == cipher_id and sh.server_version == version
return CipherProbe(version, cipher_id, cipher_name, ok)
async def get_peer_certificate_chain(host: str, port: int, sni: str | None, timeout: float = 8.0) -> list[bytes]:
"""Return DER-encoded peer certificate chain using the ssl module.
Returns the leaf certificate first, then intermediates. Empty on failure.
"""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
ctx.set_ciphers("ALL:@SECLEVEL=0")
except ssl.SSLError:
pass
loop = asyncio.get_running_loop()
def _do() -> list[bytes]:
try:
with socket.create_connection((host, port), timeout=timeout) as sock:
with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
leaf = ssock.getpeercert(binary_form=True)
chain: list[bytes] = [leaf] if leaf else []
# Intermediate chain via internal API (CPython 3.13+) if available
try:
verified = ssock.get_verified_chain()
if verified:
chain = list(verified)
except (AttributeError, ssl.SSLError):
try:
unverified = ssock.get_unverified_chain()
if unverified:
chain = list(unverified)
except (AttributeError, ssl.SSLError):
pass
return chain
except Exception:
return []
return await loop.run_in_executor(None, _do)
async def alpn_negotiate(host: str, port: int, sni: str | None, alpn_list: list[str], timeout: float = 6.0) -> str | None:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
ctx.set_alpn_protocols(alpn_list)
except NotImplementedError:
return None
try:
ctx.set_ciphers("ALL:@SECLEVEL=0")
except ssl.SSLError:
pass
loop = asyncio.get_running_loop()
def _do() -> str | None:
try:
with socket.create_connection((host, port), timeout=timeout) as sock:
with ctx.wrap_socket(sock, server_hostname=sni) as ssock:
return ssock.selected_alpn_protocol()
except Exception:
return None
return await loop.run_in_executor(None, _do)
async def get_ocsp_stapling(host: str, port: int, sni: str | None, timeout: float = 6.0) -> bool:
"""Detect whether server returns OCSP stapled response.
Python's ssl module does not expose stapled OCSP directly. We use a minimal raw probe to look
for `status_request` extension with a CertificateStatus record, but that is expensive to
reimplement here. As a conservative default, we return False unless we can detect the
status_request_v2 extension via OpenSSL; in that case we report based on whether openssl
socket has stapling info. Since stdlib has no accessor, we always return False.
"""
return False
@@ -0,0 +1,260 @@
from __future__ import annotations
import os
import struct
from dataclasses import dataclass
from typing import Any
from . import constants as C
def _u8(x: int) -> bytes:
return struct.pack("!B", x)
def _u16(x: int) -> bytes:
return struct.pack("!H", x)
def _u24(x: int) -> bytes:
return struct.pack("!I", x)[1:]
def build_extension(ext_type: int, data: bytes) -> bytes:
return _u16(ext_type) + _u16(len(data)) + data
def ext_server_name(hostname: str) -> bytes:
name = hostname.encode("idna")
entry = _u8(0) + _u16(len(name)) + name
lst = _u16(len(entry)) + entry
return build_extension(C.EXT_SERVER_NAME, lst)
def ext_supported_versions_client(versions: list[int]) -> bytes:
body = _u8(2 * len(versions)) + b"".join(_u16(v) for v in versions)
return build_extension(C.EXT_SUPPORTED_VERSIONS, body)
def ext_supported_groups(groups: list[int]) -> bytes:
body = _u16(2 * len(groups)) + b"".join(_u16(g) for g in groups)
return build_extension(C.EXT_SUPPORTED_GROUPS, body)
def ext_signature_algorithms(algos: list[int]) -> bytes:
body = _u16(2 * len(algos)) + b"".join(_u16(a) for a in algos)
return build_extension(C.EXT_SIGNATURE_ALGORITHMS, body)
def ext_ec_point_formats() -> bytes:
return build_extension(C.EXT_EC_POINT_FORMATS, _u8(1) + _u8(0))
def ext_key_share_empty() -> bytes:
return build_extension(C.EXT_KEY_SHARE, _u16(0))
def ext_psk_key_exchange_modes() -> bytes:
return build_extension(C.EXT_PSK_KEY_EXCHANGE_MODES, _u8(1) + _u8(1))
def ext_alpn(protos: list[bytes]) -> bytes:
inner = b"".join(_u8(len(p)) + p for p in protos)
return build_extension(C.EXT_ALPN, _u16(len(inner)) + inner)
def ext_heartbeat_enabled() -> bytes:
return build_extension(C.EXT_HEARTBEAT, _u8(1))
def ext_renegotiation_info_empty() -> bytes:
return build_extension(C.EXT_RENEGOTIATION_INFO, _u8(0))
def ext_status_request() -> bytes:
# status_type=OCSP(1) + empty responder_id_list + empty extensions
body = _u8(1) + _u16(0) + _u16(0)
return build_extension(C.EXT_STATUS_REQUEST, body)
def ext_signed_cert_timestamp() -> bytes:
return build_extension(C.EXT_SIGNED_CERT_TIMESTAMP, b"")
def ext_extended_master_secret() -> bytes:
return build_extension(C.EXT_EXTENDED_MASTER_SECRET, b"")
def build_client_hello(
record_version: int,
client_hello_version: int,
hostname: str | None,
cipher_suites: list[int],
extensions: bytes = b"",
compression: bytes = b"\x01\x00", # 1 length, null method
) -> bytes:
random_bytes = os.urandom(32)
session_id = b""
cs_bytes = b"".join(_u16(c) for c in cipher_suites)
body = (
_u16(client_hello_version)
+ random_bytes
+ _u8(len(session_id))
+ session_id
+ _u16(len(cs_bytes))
+ cs_bytes
+ compression
)
if extensions:
body += _u16(len(extensions)) + extensions
handshake = _u8(C.HS_CLIENT_HELLO) + _u24(len(body)) + body
record = _u8(C.CT_HANDSHAKE) + _u16(record_version) + _u16(len(handshake)) + handshake
return record
def default_ch_extensions(hostname: str, versions: list[int], groups: list[int] | None = None) -> bytes:
groups = groups or [0x001d, 0x0017, 0x0018]
parts = []
if hostname and not _is_ip_literal(hostname):
parts.append(ext_server_name(hostname))
parts.append(ext_ec_point_formats())
parts.append(ext_supported_groups(groups))
parts.append(ext_signature_algorithms([
0x0403, 0x0804, 0x0401, 0x0503, 0x0805, 0x0501,
0x0603, 0x0806, 0x0601, 0x0807, 0x0808,
]))
parts.append(ext_renegotiation_info_empty())
parts.append(ext_signed_cert_timestamp())
parts.append(ext_status_request())
parts.append(ext_extended_master_secret())
parts.append(ext_alpn([b"h2", b"http/1.1"]))
if C.TLS_1_3 in versions:
parts.append(ext_supported_versions_client(versions))
parts.append(ext_psk_key_exchange_modes())
parts.append(ext_key_share_empty())
exts = b"".join(parts)
return _u16(len(exts)) + exts if False else exts
def _is_ip_literal(host: str) -> bool:
import ipaddress
try:
ipaddress.ip_address(host.strip("[]"))
return True
except ValueError:
return False
@dataclass
class ParsedServerHello:
record_version: int
server_version: int
cipher_suite: int | None
alert: tuple[int, int] | None
raw_record: bytes
handshake_type: int | None
server_random: bytes | None = None
session_id: bytes | None = None
extensions: dict[int, bytes] | None = None
negotiated_version: int | None = None
key_share_group: int | None = None
def parse_server_response(data: bytes) -> ParsedServerHello | None:
"""Parse the first TLS record. Returns None if insufficient data or garbage."""
if len(data) < 5:
return None
ct = data[0]
rec_ver = (data[1] << 8) | data[2]
rec_len = (data[3] << 8) | data[4]
body = data[5:5 + rec_len]
if ct == C.CT_ALERT and len(body) >= 2:
return ParsedServerHello(
record_version=rec_ver,
server_version=0,
cipher_suite=None,
alert=(body[0], body[1]),
raw_record=data,
handshake_type=None,
)
if ct != C.CT_HANDSHAKE or len(body) < 4:
return ParsedServerHello(
record_version=rec_ver,
server_version=0,
cipher_suite=None,
alert=None,
raw_record=data,
handshake_type=None,
)
hs_type = body[0]
hs_len = (body[1] << 16) | (body[2] << 8) | body[3]
hs = body[4:4 + hs_len]
if hs_type != C.HS_SERVER_HELLO or len(hs) < 38:
return ParsedServerHello(
record_version=rec_ver,
server_version=0,
cipher_suite=None,
alert=None,
raw_record=data,
handshake_type=hs_type,
)
server_version = (hs[0] << 8) | hs[1]
server_random = hs[2:34]
sess_len = hs[34]
off = 35 + sess_len
if len(hs) < off + 3:
return None
session_id = hs[35:35 + sess_len]
cs = (hs[off] << 8) | hs[off + 1]
off += 2
# comp method (1 byte)
off += 1
ext_map: dict[int, bytes] = {}
negotiated_version = server_version
key_share_group = None
if off + 2 <= len(hs):
ext_total = (hs[off] << 8) | hs[off + 1]
off += 2
ext_end = off + ext_total
while off + 4 <= ext_end:
et = (hs[off] << 8) | hs[off + 1]
el = (hs[off + 2] << 8) | hs[off + 3]
off += 4
ext_data = hs[off:off + el]
off += el
ext_map[et] = ext_data
if et == C.EXT_SUPPORTED_VERSIONS and len(ext_data) >= 2:
negotiated_version = (ext_data[0] << 8) | ext_data[1]
elif et == C.EXT_KEY_SHARE and len(ext_data) >= 2:
key_share_group = (ext_data[0] << 8) | ext_data[1]
return ParsedServerHello(
record_version=rec_ver,
server_version=server_version,
cipher_suite=cs,
alert=None,
raw_record=data,
handshake_type=hs_type,
server_random=server_random,
session_id=session_id,
extensions=ext_map,
negotiated_version=negotiated_version,
key_share_group=key_share_group,
)
def build_ssl2_client_hello(ciphers: list[int] | None = None) -> bytes:
"""Craft an SSLv2 ClientHello used to detect DROWN / SSLv2 support."""
# SSLv2 cipher specs are 3-byte. A minimal set:
if ciphers is None:
# SSL_CK_RC4_128_WITH_MD5, SSL_CK_RC4_128_EXPORT40_WITH_MD5, SSL_CK_DES_192_EDE3_CBC_WITH_MD5
ciphers = [0x010080, 0x020080, 0x0700c0]
challenge = os.urandom(16)
cipher_bytes = b"".join(struct.pack("!I", c)[1:] for c in ciphers)
body = (
_u8(1) # MSG-CLIENT-HELLO
+ _u16(0x0002) # SSLv2 version
+ _u16(len(cipher_bytes)) # cipher specs length
+ _u16(0) # session-id length
+ _u16(len(challenge)) # challenge length
+ cipher_bytes
+ challenge
)
header = struct.pack("!H", 0x8000 | len(body))
return header + body
@@ -0,0 +1,43 @@
from __future__ import annotations
from dataclasses import dataclass
from .db import TlsTestDB
HOURLY_LIMIT = 10
HOURLY_WINDOW = 60 * 60
@dataclass
class RateDecision:
allowed: bool
reason: str = ""
retry_after: int = 0
def check(db: TlsTestDB, client_ip: str | None) -> RateDecision:
if not client_ip:
return RateDecision(allowed=True)
if db.count_ip_active(client_ip) > 0:
return RateDecision(
allowed=False,
reason="同一IPから実行中/待機中のテストが既にあります。完了してから再度お試しください。",
retry_after=60,
)
recent = db.count_ip_in_window(client_ip, HOURLY_WINDOW)
if recent >= HOURLY_LIMIT:
return RateDecision(
allowed=False,
reason=f"1時間あたり{HOURLY_LIMIT}件の上限に達しました。時間をおいて再度お試しください。",
retry_after=HOURLY_WINDOW,
)
return RateDecision(allowed=True)
def client_ip_from_scope(scope) -> str | None:
headers = dict(scope.get("headers", []))
xff = headers.get(b"x-forwarded-for", b"").decode(errors="ignore")
if xff:
return xff.split(",")[0].strip()
client = scope.get("client")
if client and len(client) >= 1:
return str(client[0])
return None
@@ -0,0 +1,167 @@
from __future__ import annotations
import asyncio
import json
import time
import uuid
import logging
from typing import Any, Callable, Awaitable
from fastapi import WebSocket
from .db import TlsTestDB
from .schemas import ScanResult, Finding, ProgressMessage
logger = logging.getLogger("tls_test.runner")
MAX_CONCURRENT = 5
ReportProgress = Callable[[str, str, float, str], Awaitable[None]]
EngineFn = Callable[[str, ReportProgress, Callable[[Finding], Awaitable[None]]], Awaitable[ScanResult]]
class TlsJobQueue:
def __init__(self, db: TlsTestDB, engine: EngineFn):
self.db = db
self.engine = engine
self._queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue()
self._semaphore = asyncio.Semaphore(MAX_CONCURRENT)
self._subscribers: dict[str, set[WebSocket]] = {}
self._seq: dict[str, int] = {}
self._tasks: set[asyncio.Task] = set()
self._dispatcher_task: asyncio.Task | None = None
self._cleanup_task: asyncio.Task | None = None
self._closed = False
async def start(self) -> None:
self._dispatcher_task = asyncio.create_task(self._dispatcher())
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
async def stop(self) -> None:
self._closed = True
if self._dispatcher_task:
self._dispatcher_task.cancel()
if self._cleanup_task:
self._cleanup_task.cancel()
for t in list(self._tasks):
t.cancel()
await asyncio.gather(*(t for t in self._tasks), return_exceptions=True)
def submit(self, target: str, client_ip: str | None) -> str:
test_id = str(uuid.uuid4())
self.db.create_job(test_id, target, client_ip)
self._queue.put_nowait((test_id, target))
return test_id
async def _dispatcher(self) -> None:
while not self._closed:
try:
test_id, target = await self._queue.get()
except asyncio.CancelledError:
return
task = asyncio.create_task(self._run_one(test_id, target))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
async def _run_one(self, test_id: str, target: str) -> None:
async with self._semaphore:
await self._broadcast(test_id, {"type": "started", "target": target})
self.db.update_status(test_id, "running", started_at=int(time.time()))
seq_ref = {"n": 0}
async def report_progress(phase: str, detail: str, progress: float, severity: str = "info") -> None:
seq_ref["n"] += 1
self.db.append_progress(test_id, seq_ref["n"], phase, detail, progress, severity)
await self._broadcast(
test_id,
{
"type": "progress",
"phase": phase,
"detail": detail,
"progress": progress,
"severity": severity,
},
)
async def report_finding(f: Finding) -> None:
await self._broadcast(
test_id,
{"type": "finding", "finding": f.to_dict()},
)
try:
result = await self.engine(target, report_progress, report_finding)
payload = result.to_dict()
self.db.update_status(
test_id,
"done",
finished_at=int(time.time()),
rank=result.rank,
score=result.score,
result_json=json.dumps(payload, ensure_ascii=False),
)
await self._broadcast(
test_id,
{
"type": "done",
"redirect": f"/tools/tls-test/results/{test_id}/",
"rank": result.rank,
"score": result.score,
},
)
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception("tls-test engine failed for %s", target)
self.db.update_status(
test_id,
"error",
finished_at=int(time.time()),
error_message=str(e),
)
await self._broadcast(
test_id,
{"type": "error", "message": str(e)},
)
finally:
await self._close_subscribers(test_id)
async def _cleanup_loop(self) -> None:
while not self._closed:
try:
self.db.delete_expired()
except Exception:
logger.exception("tls-test expired cleanup failed")
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
return
def add_subscriber(self, test_id: str, ws: WebSocket) -> None:
self._subscribers.setdefault(test_id, set()).add(ws)
def remove_subscriber(self, test_id: str, ws: WebSocket) -> None:
subs = self._subscribers.get(test_id)
if subs and ws in subs:
subs.discard(ws)
if not subs:
self._subscribers.pop(test_id, None)
async def _broadcast(self, test_id: str, payload: dict[str, Any]) -> None:
subs = list(self._subscribers.get(test_id, set()))
if not subs:
return
text = json.dumps(payload, ensure_ascii=False)
for ws in subs:
try:
await ws.send_text(text)
except Exception:
self.remove_subscriber(test_id, ws)
async def _close_subscribers(self, test_id: str) -> None:
subs = list(self._subscribers.get(test_id, set()))
for ws in subs:
try:
await ws.close()
except Exception:
pass
self._subscribers.pop(test_id, None)
@@ -0,0 +1,86 @@
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from typing import Any
SEVERITIES = ("good", "normal", "notgood", "bad", "serious", "info")
SEVERITY_LABELS = {
"good": "Good",
"normal": "Normal",
"notgood": "Not Good",
"bad": "Bad",
"serious": "Serious",
"info": "Info",
}
SEVERITY_COLORS = {
"good": "bright-green",
"normal": "bright-yellow",
"notgood": "bright-orange",
"bad": "bright-red",
"serious": "magenta",
"info": "tx-alt",
}
@dataclass
class Finding:
category: str
title: str
detail: str = ""
severity: str = "info"
weight: int = 0
def to_dict(self) -> dict[str, Any]:
d = asdict(self)
d["severity_label"] = SEVERITY_LABELS.get(self.severity, self.severity)
d["color"] = SEVERITY_COLORS.get(self.severity, "tx")
return d
@dataclass
class ScanResult:
target: str
host: str
port: int
started_at: float
finished_at: float = 0.0
findings: list[Finding] = field(default_factory=list)
data: dict[str, Any] = field(default_factory=dict)
rank: str = ""
score: float = 0.0
error: str | None = None
def add(self, f: Finding) -> None:
self.findings.append(f)
def to_dict(self) -> dict[str, Any]:
return {
"target": self.target,
"host": self.host,
"port": self.port,
"started_at": self.started_at,
"finished_at": self.finished_at,
"duration": max(0.0, self.finished_at - self.started_at),
"findings": [f.to_dict() for f in self.findings],
"data": self.data,
"rank": self.rank,
"score": self.score,
"error": self.error,
}
@dataclass
class ProgressMessage:
phase: str
detail: str
progress: float
severity: str = "info"
finding: dict[str, Any] | None = None
def to_dict(self) -> dict[str, Any]:
return {
"phase": self.phase,
"detail": self.detail,
"progress": self.progress,
"severity": self.severity,
"finding": self.finding,
}
@@ -0,0 +1,87 @@
from __future__ import annotations
from .schemas import Finding, ScanResult
SEVERITY_MULTIPLIER = {
"good": 0.0,
"normal": 0.0,
"notgood": 1.0,
"bad": 3.0,
"serious": 9.0,
"info": 0.0,
}
# score >= threshold -> rank
RANK_THRESHOLDS: list[tuple[float, str]] = [
(99.0, "SSS"),
(95.0, "SS"),
(90.0, "S"),
(85.0, "A"),
(80.0, "B"),
(75.0, "C"),
(70.0, "D"),
(65.0, "E"),
(60.0, "F"),
(55.0, "G"),
(50.0, "H"),
(45.0, "I"),
(40.0, "J"),
(35.0, "K"),
(30.0, "L"),
(25.0, "M"),
(20.0, "N"),
(15.0, "O"),
(10.0, "P"),
(5.0, "Q"),
]
RANK_COLOR = {
"SSS": "bright-green",
"SS": "bright-green",
"S": "bright-green",
"A": "green",
"B": "green",
"C": "green",
"D": "bright-yellow",
"E": "bright-yellow",
"F": "bright-yellow",
"G": "yellow",
"H": "yellow",
"I": "yellow",
"J": "bright-orange",
"K": "bright-orange",
"L": "bright-orange",
"M": "orange",
"N": "orange",
"O": "bright-red",
"P": "bright-red",
"Q": "red",
"R": "purple",
}
def compute_score(findings: list[Finding]) -> float:
penalty = 0.0
for f in findings:
penalty += SEVERITY_MULTIPLIER.get(f.severity, 0.0) * float(f.weight)
score = 100.0 - penalty
if score < 0:
score = 0.0
if score > 100:
score = 100.0
return round(score, 2)
def rank_from_score(score: float) -> str:
for thr, r in RANK_THRESHOLDS:
if score >= thr:
return r
return "R"
def apply_rank(result: ScanResult) -> None:
if result.error == "no_tls":
result.score = 0.0
result.rank = "R"
return
result.score = compute_score(result.findings)
result.rank = rank_from_score(result.score)
@@ -0,0 +1,83 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable
from ..protocol import constants as C
from ..protocol.constants import (
cipher_is_3des,
cipher_is_rc4,
cipher_is_weak,
cipher_is_cbc,
cipher_has_fs,
cipher_is_aead,
)
@dataclass
class VulnPassiveResult:
poodle_ssl: bool = False
freak: bool = False
drown: bool = False
logjam_export: bool = False
sweet32: bool = False
rc4: bool = False
beast: bool = False
lucky13: bool = False
null_cipher: bool = False
anon_cipher: bool = False
export_cipher: bool = False
secure_renego_supported: bool | None = None
fallback_scsv_supported: bool | None = None
issues: list[str] = field(default_factory=list)
def analyze(
versions_supported: set[int],
accepted_ciphers_per_version: dict[int, list[int]],
) -> VulnPassiveResult:
r = VulnPassiveResult()
# DROWN: any SSLv2 support is a direct DROWN exposure.
if C.SSL_2_0 in versions_supported:
r.drown = True
# POODLE (SSL 3.0): SSL 3.0 supported with any CBC cipher.
if C.SSL_3_0 in versions_supported:
ciphers = accepted_ciphers_per_version.get(C.SSL_3_0, [])
for cid in ciphers:
name = C.CIPHER_SUITES.get(cid, "")
if cipher_is_cbc(name):
r.poodle_ssl = True
break
# BEAST (TLS 1.0): TLS 1.0 supported with CBC ciphers.
if C.TLS_1_0 in versions_supported:
ciphers = accepted_ciphers_per_version.get(C.TLS_1_0, [])
for cid in ciphers:
name = C.CIPHER_SUITES.get(cid, "")
if cipher_is_cbc(name):
r.beast = True
break
# Sweep all ciphers across all versions for cipher-family flags.
for ver, cids in accepted_ciphers_per_version.items():
for cid in cids:
name = C.CIPHER_SUITES.get(cid, "")
if not name:
continue
if cipher_is_3des(name):
r.sweet32 = True
if cipher_is_rc4(name):
r.rc4 = True
if "NULL" in name:
r.null_cipher = True
if "anon" in name:
r.anon_cipher = True
if "EXPORT" in name:
r.export_cipher = True
r.freak = True
if "DHE" in name:
r.logjam_export = True
if cipher_is_cbc(name) and ver in (C.TLS_1_0, C.TLS_1_1, C.TLS_1_2):
r.lucky13 = True
return r
@@ -0,0 +1,80 @@
from __future__ import annotations
import asyncio
import struct
from ..protocol import constants as C
from ..protocol import wire
async def probe(host: str, port: int, sni: str | None, timeout: float = 6.0) -> tuple[bool, str | None]:
"""CVE-2014-0224 CCS Injection probe.
Send ChangeCipherSpec immediately after ClientHello. A vulnerable server
will accept and not send an Unexpected Message alert.
"""
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
except Exception as e:
return False, f"connect: {e}"
try:
exts = (
(wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
+ wire.ext_ec_point_formats()
+ wire.ext_supported_groups([0x001d, 0x0017, 0x0018])
+ wire.ext_signature_algorithms([0x0403, 0x0401])
+ wire.ext_renegotiation_info_empty()
)
ch = wire.build_client_hello(
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_2,
hostname=sni,
cipher_suites=[0xc013, 0xc014, 0x002f, 0x0035, 0x000a],
extensions=exts,
)
writer.write(ch)
# Send early ChangeCipherSpec
ccs_record = bytes([C.CT_CHANGE_CIPHER_SPEC]) + struct.pack("!H", C.TLS_1_2) + struct.pack("!H", 1) + b"\x01"
writer.write(ccs_record)
try:
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
try:
data = await asyncio.wait_for(reader.read(4096), timeout=timeout)
except Exception:
data = b""
if not data:
return False, "no response (likely dropped connection)"
alerts = _find_alerts(data)
# Any fatal alert (level=2) indicates the server rejected the early CCS: NOT vulnerable.
if any(lv == 2 for lv, _desc in alerts):
return False, "server sent fatal alert (rejected early CCS)"
# Explicit unexpected_message alert (10): NOT vulnerable.
if any(desc == 10 for _lv, desc in alerts):
return False, "server rejected with unexpected_message alert"
# Otherwise: no definitive signal (server may have ignored CCS or handshake still pending).
# Conservative default: not vulnerable. True detection requires a complete handshake exchange.
return False, "no definitive signal of CCS acceptance"
except Exception as e:
return False, f"{e.__class__.__name__}: {e}"
finally:
try:
writer.close()
await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
except Exception:
pass
def _find_alerts(data: bytes) -> list[tuple[int, int]]:
alerts: list[tuple[int, int]] = []
i = 0
while i + 5 <= len(data):
ct = data[i]
rec_len = (data[i + 3] << 8) | data[i + 4]
body = data[i + 5:i + 5 + rec_len]
if ct == C.CT_ALERT and len(body) >= 2:
alerts.append((body[0], body[1]))
i += 5 + rec_len
if rec_len == 0:
break
return alerts
@@ -0,0 +1,86 @@
from __future__ import annotations
import asyncio
import os
import struct
from dataclasses import dataclass
from ..protocol import constants as C
from ..protocol import wire
@dataclass
class HeartbleedResult:
vulnerable: bool
heartbeat_extension_advertised: bool
error: str | None = None
def _build_heartbeat_record(version: int = C.TLS_1_2, claimed_length: int = 0x4000) -> bytes:
payload = b"\x01" + struct.pack("!H", claimed_length) + b"X" + b"\x00" * 16
return bytes([C.CT_HEARTBEAT]) + struct.pack("!H", version) + struct.pack("!H", len(payload)) + payload
async def probe(host: str, port: int, sni: str | None, timeout: float = 8.0) -> HeartbleedResult:
"""Send TLS 1.2 ClientHello with heartbeat extension, then a malformed heartbeat.
If the server replies with a heartbeat response longer than what we sent
(or any response to the malformed message) it's likely vulnerable.
"""
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
except Exception as e:
return HeartbleedResult(False, False, error=f"connect: {e}")
try:
ext_parts = [
(wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b""),
wire.ext_ec_point_formats(),
wire.ext_supported_groups([0x001d, 0x0017, 0x0018]),
wire.ext_signature_algorithms([0x0403, 0x0804, 0x0401]),
wire.ext_heartbeat_enabled(),
wire.ext_renegotiation_info_empty(),
]
exts = b"".join(ext_parts)
ch = wire.build_client_hello(
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_2,
hostname=sni,
cipher_suites=[0xc02f, 0xc030, 0x009c, 0x009d, 0x002f, 0x0035, 0x000a],
extensions=exts,
)
writer.write(ch)
try:
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
data = b""
try:
data = await asyncio.wait_for(reader.read(65536), timeout=timeout)
except Exception:
pass
heartbeat_ext = False
if data:
parsed = wire.parse_server_response(data)
if parsed and parsed.extensions and C.EXT_HEARTBEAT in parsed.extensions:
heartbeat_ext = True
writer.write(_build_heartbeat_record())
try:
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
try:
reply = await asyncio.wait_for(reader.read(65536), timeout=3.0)
except Exception:
reply = b""
vulnerable = False
if reply and reply[0] == C.CT_HEARTBEAT and len(reply) > 8:
vulnerable = True
return HeartbleedResult(vulnerable=vulnerable, heartbeat_extension_advertised=heartbeat_ext)
except Exception as e:
return HeartbleedResult(False, False, error=f"{e.__class__.__name__}: {e}")
finally:
try:
writer.close()
await asyncio.wait_for(writer.wait_closed(), timeout=1.5)
except Exception:
pass
@@ -0,0 +1,111 @@
from __future__ import annotations
import asyncio
from ..protocol import constants as C
from ..protocol import wire
async def probe_secure_renegotiation(host: str, port: int, sni: str | None, timeout: float = 6.0) -> tuple[bool, str | None]:
"""Check whether the server returns the renegotiation_info extension (RFC 5746).
Returns (secure_renego_supported, error).
"""
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
except Exception as e:
return False, f"connect: {e}"
try:
exts = (
(wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
+ wire.ext_ec_point_formats()
+ wire.ext_supported_groups([0x001d, 0x0017, 0x0018])
+ wire.ext_signature_algorithms([0x0403, 0x0401])
+ wire.ext_renegotiation_info_empty()
)
ch = wire.build_client_hello(
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_2,
hostname=sni,
cipher_suites=[0xc02f, 0xc030, 0xc013, 0xc014, 0x002f, 0x0035, 0x000a],
extensions=exts,
)
writer.write(ch)
try:
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
try:
data = await asyncio.wait_for(reader.read(8192), timeout=timeout)
except Exception:
data = b""
if not data:
return False, "no response"
parsed = wire.parse_server_response(data)
if parsed is None or parsed.extensions is None:
return False, "no server hello"
# Either renegotiation_info extension or SCSV cipher (0x00ff) acknowledgment.
if C.EXT_RENEGOTIATION_INFO in parsed.extensions:
return True, None
return False, "no renegotiation_info extension"
finally:
try:
writer.close()
await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
except Exception:
pass
async def probe_fallback_scsv(host: str, port: int, sni: str | None, timeout: float = 6.0) -> tuple[bool, str | None]:
"""Check whether the server rejects TLS_FALLBACK_SCSV.
We offer TLS 1.1 + SCSV (0x5600); if the server supports a higher version,
it MUST respond with inappropriate_fallback(86) alert.
"""
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
except Exception as e:
return False, f"connect: {e}"
try:
exts = (
(wire.ext_server_name(sni) if sni and not wire._is_ip_literal(sni) else b"")
+ wire.ext_ec_point_formats()
+ wire.ext_supported_groups([0x001d, 0x0017, 0x0018])
+ wire.ext_signature_algorithms([0x0403, 0x0401])
+ wire.ext_renegotiation_info_empty()
)
ch = wire.build_client_hello(
record_version=C.TLS_1_0,
client_hello_version=C.TLS_1_1,
hostname=sni,
cipher_suites=[0x5600, 0xc013, 0xc014, 0x002f, 0x0035],
extensions=exts,
)
writer.write(ch)
try:
await asyncio.wait_for(writer.drain(), timeout=timeout)
except Exception:
pass
try:
data = await asyncio.wait_for(reader.read(4096), timeout=timeout)
except Exception:
data = b""
if not data:
return False, "no response"
# Look for alert 86 (inappropriate_fallback) = supported (good).
i = 0
while i + 5 <= len(data):
ct = data[i]
rec_len = (data[i + 3] << 8) | data[i + 4]
body = data[i + 5:i + 5 + rec_len]
if ct == C.CT_ALERT and len(body) >= 2 and body[1] == 86:
return True, None
i += 5 + rec_len
if rec_len == 0:
break
return False, "no inappropriate_fallback alert"
finally:
try:
writer.close()
await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
except Exception:
pass