Spaces:
Sleeping
Sleeping
Upload main.py
Browse files
main.py
CHANGED
|
@@ -1,6 +1,6 @@
|
|
| 1 |
-
import os, re, json, time
|
| 2 |
from dataclasses import dataclass
|
| 3 |
-
from typing import List, Dict, Tuple
|
| 4 |
|
| 5 |
import gradio as gr
|
| 6 |
|
|
@@ -36,12 +36,16 @@ class UrlResult:
|
|
| 36 |
url: str
|
| 37 |
risk: float
|
| 38 |
reasons: List[str]
|
|
|
|
| 39 |
|
| 40 |
@dataclass
|
| 41 |
class EmailResult:
|
| 42 |
-
p_email: float
|
| 43 |
kw_hits: List[str]
|
| 44 |
-
strong_hits: List[str]
|
|
|
|
|
|
|
|
|
|
| 45 |
|
| 46 |
# =========================
|
| 47 |
# URL extraction & heuristics (swap with your real URL model when ready)
|
|
@@ -69,24 +73,39 @@ def url_host(url: str) -> str:
|
|
| 69 |
return host
|
| 70 |
|
| 71 |
def score_url_heuristic(url: str) -> UrlResult:
|
|
|
|
|
|
|
|
|
|
|
|
|
| 72 |
host = url_host(url)
|
| 73 |
-
score = 0.
|
| 74 |
reasons = []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 75 |
|
| 76 |
if len(url) > 140:
|
| 77 |
-
|
| 78 |
if "@" in url or "%" in url:
|
| 79 |
-
|
| 80 |
if any(host.endswith(t) for t in SUSPICIOUS_TLDS):
|
| 81 |
-
|
| 82 |
if any(s in host for s in SHORTENERS):
|
| 83 |
-
|
| 84 |
if host.count(".") >= 3:
|
| 85 |
-
|
| 86 |
if len(re.findall(r"[A-Z]", url)) > 16:
|
| 87 |
-
|
| 88 |
|
| 89 |
-
|
|
|
|
| 90 |
|
| 91 |
def score_urls(urls: List[str]) -> List[UrlResult]:
|
| 92 |
return [score_url_heuristic(u) for u in urls]
|
|
@@ -97,6 +116,8 @@ def score_urls(urls: List[str]) -> List[UrlResult]:
|
|
| 97 |
_tokenizer = None
|
| 98 |
_model = None
|
| 99 |
_model_loaded_from = None # "classifier", "backbone", or None
|
|
|
|
|
|
|
| 100 |
|
| 101 |
# Strong vs normal cues (lowercase)
|
| 102 |
STRONG_CUES = [
|
|
@@ -119,12 +140,14 @@ LEXICAL_CUES = sorted(set(STRONG_CUES + NORMAL_CUES))
|
|
| 119 |
def load_email_model() -> Tuple[object, object, str]:
|
| 120 |
"""Try to load EMAIL_CLASSIFIER_ID; on failure, fall back to backbone with small head.
|
| 121 |
Apply dynamic int8 quantization for CPU if available."""
|
| 122 |
-
global _tokenizer, _model, _model_loaded_from
|
| 123 |
if _tokenizer is not None and _model is not None:
|
| 124 |
return _tokenizer, _model, _model_loaded_from
|
| 125 |
|
|
|
|
| 126 |
if AutoTokenizer is None or AutoModelForSequenceClassification is None or torch is None:
|
| 127 |
_model_loaded_from = None
|
|
|
|
| 128 |
return None, None, _model_loaded_from # environment without torch/transformers
|
| 129 |
|
| 130 |
# Preferred classifier
|
|
@@ -142,18 +165,22 @@ def load_email_model() -> Tuple[object, object, str]:
|
|
| 142 |
_model_loaded_from = "backbone"
|
| 143 |
except Exception:
|
| 144 |
_tokenizer, _model, _model_loaded_from = None, None, None
|
|
|
|
| 145 |
return None, None, _model_loaded_from
|
| 146 |
|
| 147 |
# Dynamic quantization (CPU)
|
|
|
|
| 148 |
try:
|
| 149 |
_model.eval()
|
| 150 |
_model.to("cpu")
|
| 151 |
if hasattr(torch, "quantization"):
|
| 152 |
from torch.quantization import quantize_dynamic
|
| 153 |
_model = quantize_dynamic(_model, {torch.nn.Linear}, dtype=torch.qint8) # type: ignore
|
|
|
|
| 154 |
except Exception:
|
| 155 |
pass
|
| 156 |
|
|
|
|
| 157 |
return _tokenizer, _model, _model_loaded_from
|
| 158 |
|
| 159 |
def _truncate_for_budget(tokens_subject: List[int], tokens_body: List[int], max_len: int, subj_budget: int):
|
|
@@ -163,9 +190,9 @@ def _truncate_for_budget(tokens_subject: List[int], tokens_body: List[int], max_
|
|
| 163 |
return subj + body
|
| 164 |
|
| 165 |
def score_email(subject: str, body: str) -> Tuple[EmailResult, Dict]:
|
| 166 |
-
"""Return EmailResult + debug dict with probability, hits, boosts, timings, and model info.
|
| 167 |
-
|
| 168 |
-
|
| 169 |
|
| 170 |
t0 = time.perf_counter()
|
| 171 |
text = (subject or "") + "\n" + (body or "")
|
|
@@ -177,6 +204,13 @@ def score_email(subject: str, body: str) -> Tuple[EmailResult, Dict]:
|
|
| 177 |
|
| 178 |
tok, mdl, path = load_email_model()
|
| 179 |
dbg["path"] = path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 180 |
|
| 181 |
if tok is None or mdl is None:
|
| 182 |
# Pure lexical fallback (no model available):
|
|
@@ -187,7 +221,11 @@ def score_email(subject: str, body: str) -> Tuple[EmailResult, Dict]:
|
|
| 187 |
dbg["boost_from_strong"] = 0.18 * len(strong_hits)
|
| 188 |
dbg["boost_from_normal"] = 0.07 * len(normal_hits)
|
| 189 |
dbg["timing_ms"]["email_infer"] = round((time.perf_counter() - t0) * 1000, 2)
|
| 190 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 191 |
|
| 192 |
# Model path (MiniLM or your classifier)
|
| 193 |
enc_t0 = time.perf_counter()
|
|
@@ -202,12 +240,10 @@ def score_email(subject: str, body: str) -> Tuple[EmailResult, Dict]:
|
|
| 202 |
|
| 203 |
with torch.no_grad():
|
| 204 |
out = mdl(input_ids=ids, attention_mask=mask)
|
| 205 |
-
import math
|
| 206 |
if hasattr(out, "logits"):
|
| 207 |
logits = out.logits[0].detach().cpu().numpy().tolist()
|
| 208 |
exps = [math.exp(x) for x in logits]
|
| 209 |
-
|
| 210 |
-
p_raw = float(p1)
|
| 211 |
else:
|
| 212 |
p_raw = 0.5
|
| 213 |
|
|
@@ -220,8 +256,16 @@ def score_email(subject: str, body: str) -> Tuple[EmailResult, Dict]:
|
|
| 220 |
dbg["boost_from_strong"] = round(boost_s, 3)
|
| 221 |
dbg["boost_from_normal"] = round(boost_n, 3)
|
| 222 |
dbg["timing_ms"]["email_infer"] = round((time.perf_counter() - enc_t0) * 1000, 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 223 |
|
| 224 |
-
return EmailResult(
|
|
|
|
|
|
|
|
|
|
| 225 |
|
| 226 |
# =========================
|
| 227 |
# Fusion
|
|
@@ -278,11 +322,13 @@ def fuse(email_res: EmailResult, url_results: List[UrlResult], allowlist_domains
|
|
| 278 |
|
| 279 |
fused = {
|
| 280 |
"P_email": round(email_res.p_email, 3),
|
|
|
|
| 281 |
"R_url_max": round(r_url_max, 3),
|
| 282 |
"R_total": round(r_after, 3),
|
| 283 |
"R_total_before_overrides": round(r_before, 3),
|
| 284 |
"kw_hits": email_res.kw_hits,
|
| 285 |
"strong_hits": email_res.strong_hits,
|
|
|
|
| 286 |
"no_urls": no_urls,
|
| 287 |
"allowlist_hit": allowlist_hit,
|
| 288 |
"verdict": verdict
|
|
@@ -300,7 +346,7 @@ def fuse(email_res: EmailResult, url_results: List[UrlResult], allowlist_domains
|
|
| 300 |
# Gradio UI
|
| 301 |
# =========================
|
| 302 |
with gr.Blocks(title="PhishingMail-Lab") as demo:
|
| 303 |
-
gr.Markdown("# 🧪 PhishingMail‑Lab\n**POC** — Free‑tier friendly hybrid (email + URL) with explainable cues.")
|
| 304 |
|
| 305 |
with gr.Row():
|
| 306 |
with gr.Column(scale=3):
|
|
@@ -333,7 +379,8 @@ with gr.Blocks(title="PhishingMail-Lab") as demo:
|
|
| 333 |
|
| 334 |
# URL pipeline
|
| 335 |
t0 = time.perf_counter()
|
| 336 |
-
|
|
|
|
| 337 |
t1 = time.perf_counter()
|
| 338 |
url_results = score_urls(urls)
|
| 339 |
t2 = time.perf_counter()
|
|
@@ -354,33 +401,63 @@ with gr.Blocks(title="PhishingMail-Lab") as demo:
|
|
| 354 |
banner_text = "<br>".join(banners) if banners else ""
|
| 355 |
banner_visible = bool(banners)
|
| 356 |
|
| 357 |
-
# Forensics JSON
|
| 358 |
-
per_url = [{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 359 |
fx = {
|
| 360 |
"config": {
|
| 361 |
"weights": {"email": FUSION_EMAIL_W, "url": FUSION_URL_W},
|
| 362 |
"threshold_tau": THRESHOLD_TAU,
|
| 363 |
-
"overrides": {
|
|
|
|
|
|
|
|
|
|
|
|
|
| 364 |
"model_ids": {"classifier": EMAIL_CLASSIFIER_ID, "backbone": EMAIL_BACKBONE_ID}
|
| 365 |
},
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 366 |
"email": {
|
| 367 |
-
"
|
|
|
|
| 368 |
"p_email_raw": email_dbg["p_raw"],
|
| 369 |
"boost_from_strong": email_dbg["boost_from_strong"],
|
| 370 |
"boost_from_normal": email_dbg["boost_from_normal"],
|
| 371 |
-
"
|
| 372 |
"kw_hits": email_res.kw_hits,
|
| 373 |
-
"strong_hits": email_res.strong_hits
|
|
|
|
| 374 |
},
|
| 375 |
"urls": per_url,
|
| 376 |
"fusion": {
|
| 377 |
-
"
|
| 378 |
-
"
|
| 379 |
-
|
| 380 |
-
|
| 381 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 382 |
},
|
| 383 |
"timings_ms": {
|
|
|
|
| 384 |
"url_extract": round((t1 - t0) * 1000, 2),
|
| 385 |
"url_score": round((t2 - t1) * 1000, 2),
|
| 386 |
"email_infer": email_dbg["timing_ms"].get("email_infer"),
|
|
@@ -388,32 +465,37 @@ with gr.Blocks(title="PhishingMail-Lab") as demo:
|
|
| 388 |
}
|
| 389 |
}
|
| 390 |
|
| 391 |
-
# Forensics Markdown (human‑readable)
|
| 392 |
lines = []
|
| 393 |
-
lines.append(f"**Verdict:** `{fused['verdict']}` | **R_total:** `{fused['R_total']}`
|
| 394 |
-
lines.append(f"**
|
| 395 |
if fuse_dbg["applied_overrides"]:
|
| 396 |
-
lines.append(f"**Overrides
|
| 397 |
else:
|
| 398 |
-
lines.append("**Overrides
|
| 399 |
if fused["no_urls"]:
|
| 400 |
lines.append("• No URLs found → email‑only decision path.")
|
| 401 |
if fused["allowlist_hit"]:
|
| 402 |
lines.append("• Allowlist matched → risk capped.")
|
| 403 |
lines.append("")
|
| 404 |
-
lines.append(f"**Email path:** `{email_dbg['path'] or 'lexical-fallback'}` | p_raw={email_dbg['p_raw']}
|
|
|
|
|
|
|
| 405 |
if email_res.strong_hits:
|
| 406 |
lines.append(f"• Strong cues: {', '.join(email_res.strong_hits)}")
|
| 407 |
if email_res.kw_hits:
|
| 408 |
lines.append(f"• All cues: {', '.join(email_res.kw_hits)}")
|
| 409 |
lines.append("")
|
| 410 |
if per_url:
|
| 411 |
-
lines.append("**URLs:**")
|
| 412 |
for u in per_url:
|
| 413 |
-
|
|
|
|
| 414 |
else:
|
| 415 |
lines.append("**URLs:** (none)")
|
| 416 |
lines.append("")
|
|
|
|
|
|
|
| 417 |
lines.append("**Timings (ms):** " + json.dumps(fx["timings_ms"]))
|
| 418 |
|
| 419 |
forensic_markdown = "\n".join(lines)
|
|
|
|
| 1 |
+
import os, re, json, time, math
|
| 2 |
from dataclasses import dataclass
|
| 3 |
+
from typing import List, Dict, Tuple, Optional
|
| 4 |
|
| 5 |
import gradio as gr
|
| 6 |
|
|
|
|
| 36 |
url: str
|
| 37 |
risk: float
|
| 38 |
reasons: List[str]
|
| 39 |
+
contrib: Dict[str, float] # per‑reason contribution for transparency
|
| 40 |
|
| 41 |
@dataclass
class EmailResult:
    """Outcome of scoring one email (subject + body).

    Fields, in constructor order:
        p_email:      final probability after boosts.
        kw_hits:      all cue keywords that matched.
        strong_hits:  subset of ``kw_hits`` considered strong.
        token_counts: ``{"subject_tokens": .., "body_tokens": .., "sequence_len": ..}``.
        p_raw:        raw model probability (before boosts); ``None`` in the
                      lexical fallback.
        path:         ``"classifier"``, ``"backbone"``, or ``None`` (lexical).
    """

    p_email: float
    kw_hits: List[str]
    strong_hits: List[str]
    token_counts: Dict[str, int]
    p_raw: Optional[float]
    path: Optional[str]
|
| 49 |
|
| 50 |
# =========================
|
| 51 |
# URL extraction & heuristics (swap with your real URL model when ready)
|
|
|
|
| 73 |
return host
|
| 74 |
|
| 75 |
def score_url_heuristic(url: str) -> UrlResult:
    """Score one URL with additive heuristics and a per-reason contribution map.

    A constant ``"base"`` amount of 0.05 is always applied first, so
    ``reasons`` is never empty. Each rule that fires then adds a fixed amount
    to the score, appends its tag to ``reasons`` and accumulates the amount
    under that tag in ``contrib``.

    Args:
        url: The URL string to score.

    Returns:
        A ``UrlResult`` whose ``risk`` is the summed score clamped to at most
        1.0, whose ``reasons`` lists the fired tags in evaluation order, and
        whose ``contrib`` maps each tag to its amount (rounded to 3 decimals).
        ``contrib`` is not clamped, so its values may sum to more than ``risk``.
    """
    host = url_host(url)
    score = 0.0
    reasons = []
    contrib = {}

    def add(amount: float, tag: str):
        nonlocal score
        score += amount
        reasons.append(tag)
        contrib[tag] = round(contrib.get(tag, 0.0) + amount, 3)

    # Match a shortener only when the host IS that domain or a subdomain of it.
    # Plain substring containment would flag e.g. "microsoft.com" for "t.co".
    # NOTE(review): assumes SHORTENERS holds bare domains like "bit.ly" - confirm.
    is_shortener = any(
        host == short or host.endswith("." + short) for short in SHORTENERS
    )

    # "mixed_case" fires on the number of ASCII uppercase letters in the URL.
    uppercase_count = sum(1 for ch in url if "A" <= ch <= "Z")

    # (fired?, amount, tag) in the order the tags must appear in ``reasons``.
    rules = [
        (True, 0.05, "base"),
        (len(url) > 140, 0.15, "very_long_url"),
        ("@" in url or "%" in url, 0.20, "special_chars"),
        (host.endswith(tuple(SUSPICIOUS_TLDS)), 0.35, "suspicious_tld"),
        (is_shortener, 0.50, "shortener"),
        (host.count(".") >= 3, 0.20, "deep_subdomain"),
        (uppercase_count > 16, 0.10, "mixed_case"),
    ]
    for fired, amount, tag in rules:
        if fired:
            add(amount, tag)

    score = min(score, 1.0)
    return UrlResult(url=url, risk=score, reasons=reasons, contrib=contrib)
|
| 109 |
|
| 110 |
def score_urls(urls: List[str]) -> List[UrlResult]:
    """Apply ``score_url_heuristic`` to each URL, keeping the input order."""
    return list(map(score_url_heuristic, urls))
|
|
|
|
| 116 |
_tokenizer = None
|
| 117 |
_model = None
|
| 118 |
_model_loaded_from = None # "classifier", "backbone", or None
|
| 119 |
+
_model_load_ms = None
|
| 120 |
+
_model_quantized = False
|
| 121 |
|
| 122 |
# Strong vs normal cues (lowercase)
|
| 123 |
STRONG_CUES = [
|
|
|
|
| 140 |
def load_email_model() -> Tuple[object, object, str]:
|
| 141 |
"""Try to load EMAIL_CLASSIFIER_ID; on failure, fall back to backbone with small head.
|
| 142 |
Apply dynamic int8 quantization for CPU if available."""
|
| 143 |
+
global _tokenizer, _model, _model_loaded_from, _model_load_ms, _model_quantized
|
| 144 |
if _tokenizer is not None and _model is not None:
|
| 145 |
return _tokenizer, _model, _model_loaded_from
|
| 146 |
|
| 147 |
+
start = time.perf_counter()
|
| 148 |
if AutoTokenizer is None or AutoModelForSequenceClassification is None or torch is None:
|
| 149 |
_model_loaded_from = None
|
| 150 |
+
_model_load_ms = round((time.perf_counter() - start) * 1000, 2)
|
| 151 |
return None, None, _model_loaded_from # environment without torch/transformers
|
| 152 |
|
| 153 |
# Preferred classifier
|
|
|
|
| 165 |
_model_loaded_from = "backbone"
|
| 166 |
except Exception:
|
| 167 |
_tokenizer, _model, _model_loaded_from = None, None, None
|
| 168 |
+
_model_load_ms = round((time.perf_counter() - start) * 1000, 2)
|
| 169 |
return None, None, _model_loaded_from
|
| 170 |
|
| 171 |
# Dynamic quantization (CPU)
|
| 172 |
+
_model_quantized = False
|
| 173 |
try:
|
| 174 |
_model.eval()
|
| 175 |
_model.to("cpu")
|
| 176 |
if hasattr(torch, "quantization"):
|
| 177 |
from torch.quantization import quantize_dynamic
|
| 178 |
_model = quantize_dynamic(_model, {torch.nn.Linear}, dtype=torch.qint8) # type: ignore
|
| 179 |
+
_model_quantized = True
|
| 180 |
except Exception:
|
| 181 |
pass
|
| 182 |
|
| 183 |
+
_model_load_ms = round((time.perf_counter() - start) * 1000, 2)
|
| 184 |
return _tokenizer, _model, _model_loaded_from
|
| 185 |
|
| 186 |
def _truncate_for_budget(tokens_subject: List[int], tokens_body: List[int], max_len: int, subj_budget: int):
|
|
|
|
| 190 |
return subj + body
|
| 191 |
|
| 192 |
def score_email(subject: str, body: str) -> Tuple[EmailResult, Dict]:
|
| 193 |
+
"""Return EmailResult + debug dict with probability, hits, boosts, timings, token counts, and model info."""
|
| 194 |
+
dbg = {"path": None, "p_raw": None, "boost_from_strong": 0.0, "boost_from_normal": 0.0,
|
| 195 |
+
"timing_ms": {}, "token_counts": {}, "model_info": {}}
|
| 196 |
|
| 197 |
t0 = time.perf_counter()
|
| 198 |
text = (subject or "") + "\n" + (body or "")
|
|
|
|
| 204 |
|
| 205 |
tok, mdl, path = load_email_model()
|
| 206 |
dbg["path"] = path
|
| 207 |
+
dbg["model_info"] = {
|
| 208 |
+
"loaded_from": path,
|
| 209 |
+
"classifier_id": EMAIL_CLASSIFIER_ID,
|
| 210 |
+
"backbone_id": EMAIL_BACKBONE_ID,
|
| 211 |
+
"quantized": _model_quantized,
|
| 212 |
+
"model_load_ms": _model_load_ms
|
| 213 |
+
}
|
| 214 |
|
| 215 |
if tok is None or mdl is None:
|
| 216 |
# Pure lexical fallback (no model available):
|
|
|
|
| 221 |
dbg["boost_from_strong"] = 0.18 * len(strong_hits)
|
| 222 |
dbg["boost_from_normal"] = 0.07 * len(normal_hits)
|
| 223 |
dbg["timing_ms"]["email_infer"] = round((time.perf_counter() - t0) * 1000, 2)
|
| 224 |
+
dbg["token_counts"] = {"subject_tokens": 0, "body_tokens": 0, "sequence_len": 0}
|
| 225 |
+
return EmailResult(
|
| 226 |
+
p_email=p_email, kw_hits=all_hits, strong_hits=strong_hits,
|
| 227 |
+
token_counts=dbg["token_counts"], p_raw=None, path=path
|
| 228 |
+
), dbg
|
| 229 |
|
| 230 |
# Model path (MiniLM or your classifier)
|
| 231 |
enc_t0 = time.perf_counter()
|
|
|
|
| 240 |
|
| 241 |
with torch.no_grad():
|
| 242 |
out = mdl(input_ids=ids, attention_mask=mask)
|
|
|
|
| 243 |
if hasattr(out, "logits"):
|
| 244 |
logits = out.logits[0].detach().cpu().numpy().tolist()
|
| 245 |
exps = [math.exp(x) for x in logits]
|
| 246 |
+
p_raw = float(exps[1] / (exps[0] + exps[1])) # assume label 1 = phishing
|
|
|
|
| 247 |
else:
|
| 248 |
p_raw = 0.5
|
| 249 |
|
|
|
|
| 256 |
dbg["boost_from_strong"] = round(boost_s, 3)
|
| 257 |
dbg["boost_from_normal"] = round(boost_n, 3)
|
| 258 |
dbg["timing_ms"]["email_infer"] = round((time.perf_counter() - enc_t0) * 1000, 2)
|
| 259 |
+
dbg["token_counts"] = {
|
| 260 |
+
"subject_tokens": len(encoded_subj),
|
| 261 |
+
"body_tokens": len(encoded_body),
|
| 262 |
+
"sequence_len": len(input_ids)
|
| 263 |
+
}
|
| 264 |
|
| 265 |
+
return EmailResult(
|
| 266 |
+
p_email=p_email, kw_hits=all_hits, strong_hits=strong_hits,
|
| 267 |
+
token_counts=dbg["token_counts"], p_raw=p_raw, path=path
|
| 268 |
+
), dbg
|
| 269 |
|
| 270 |
# =========================
|
| 271 |
# Fusion
|
|
|
|
| 322 |
|
| 323 |
fused = {
|
| 324 |
"P_email": round(email_res.p_email, 3),
|
| 325 |
+
"P_email_raw": round(email_res.p_raw, 3) if email_res.p_raw is not None else None,
|
| 326 |
"R_url_max": round(r_url_max, 3),
|
| 327 |
"R_total": round(r_after, 3),
|
| 328 |
"R_total_before_overrides": round(r_before, 3),
|
| 329 |
"kw_hits": email_res.kw_hits,
|
| 330 |
"strong_hits": email_res.strong_hits,
|
| 331 |
+
"token_counts": email_res.token_counts,
|
| 332 |
"no_urls": no_urls,
|
| 333 |
"allowlist_hit": allowlist_hit,
|
| 334 |
"verdict": verdict
|
|
|
|
| 346 |
# Gradio UI
|
| 347 |
# =========================
|
| 348 |
with gr.Blocks(title="PhishingMail-Lab") as demo:
|
| 349 |
+
gr.Markdown("# 🧪 PhishingMail‑Lab\n**POC** — Free‑tier friendly hybrid (email + URL) with explainable cues and rich forensics.")
|
| 350 |
|
| 351 |
with gr.Row():
|
| 352 |
with gr.Column(scale=3):
|
|
|
|
| 379 |
|
| 380 |
# URL pipeline
|
| 381 |
t0 = time.perf_counter()
|
| 382 |
+
raw_text = (subject_text or "") + "\n" + (body_text or "")
|
| 383 |
+
urls = list(dict.fromkeys(extract_urls(raw_text))) # uniq & ordered
|
| 384 |
t1 = time.perf_counter()
|
| 385 |
url_results = score_urls(urls)
|
| 386 |
t2 = time.perf_counter()
|
|
|
|
| 401 |
banner_text = "<br>".join(banners) if banners else ""
|
| 402 |
banner_visible = bool(banners)
|
| 403 |
|
| 404 |
+
# Forensics JSON (deeper detail)
|
| 405 |
+
per_url = [{
|
| 406 |
+
"url": u.url,
|
| 407 |
+
"risk": round(u.risk,3),
|
| 408 |
+
"reasons": u.reasons,
|
| 409 |
+
"contrib": u.contrib
|
| 410 |
+
} for u in url_results]
|
| 411 |
+
|
| 412 |
fx = {
|
| 413 |
"config": {
|
| 414 |
"weights": {"email": FUSION_EMAIL_W, "url": FUSION_URL_W},
|
| 415 |
"threshold_tau": THRESHOLD_TAU,
|
| 416 |
+
"overrides": {
|
| 417 |
+
"url_high": URL_OVERRIDE_HIGH,
|
| 418 |
+
"url_kw": URL_OVERRIDE_KW,
|
| 419 |
+
"allowlist_safe_cap": ALLOWLIST_SAFE_CAP
|
| 420 |
+
},
|
| 421 |
"model_ids": {"classifier": EMAIL_CLASSIFIER_ID, "backbone": EMAIL_BACKBONE_ID}
|
| 422 |
},
|
| 423 |
+
"input_summary": {
|
| 424 |
+
"chars_subject": len(subject_text or ""),
|
| 425 |
+
"chars_body": len(body_text or ""),
|
| 426 |
+
"num_urls": len(urls),
|
| 427 |
+
"allowlist_domains": allow_domains
|
| 428 |
+
},
|
| 429 |
"email": {
|
| 430 |
+
"path": email_dbg["path"] or "lexical-fallback",
|
| 431 |
+
"p_email_final": fused["P_email"],
|
| 432 |
"p_email_raw": email_dbg["p_raw"],
|
| 433 |
"boost_from_strong": email_dbg["boost_from_strong"],
|
| 434 |
"boost_from_normal": email_dbg["boost_from_normal"],
|
| 435 |
+
"token_counts": email_dbg["token_counts"],
|
| 436 |
"kw_hits": email_res.kw_hits,
|
| 437 |
+
"strong_hits": email_res.strong_hits,
|
| 438 |
+
"model_info": email_dbg["model_info"]
|
| 439 |
},
|
| 440 |
"urls": per_url,
|
| 441 |
"fusion": {
|
| 442 |
+
"equation": f"R_total = {FUSION_EMAIL_W} * P_email + {FUSION_URL_W} * R_url_max",
|
| 443 |
+
"values": {
|
| 444 |
+
"P_email": fused["P_email"],
|
| 445 |
+
"R_url_max": fused["R_url_max"],
|
| 446 |
+
"R_total_before_overrides": fused["R_total_before_overrides"],
|
| 447 |
+
"R_total_final": fused["R_total"],
|
| 448 |
+
"overrides_applied": fuse_dbg["applied_overrides"]
|
| 449 |
+
},
|
| 450 |
+
"decision": {
|
| 451 |
+
"threshold_tau": THRESHOLD_TAU,
|
| 452 |
+
"verdict": fused["verdict"]
|
| 453 |
+
},
|
| 454 |
+
"flags": {
|
| 455 |
+
"no_urls": fused["no_urls"],
|
| 456 |
+
"allowlist_hit": fused["allowlist_hit"]
|
| 457 |
+
}
|
| 458 |
},
|
| 459 |
"timings_ms": {
|
| 460 |
+
"model_load": email_dbg["model_info"]["model_load_ms"],
|
| 461 |
"url_extract": round((t1 - t0) * 1000, 2),
|
| 462 |
"url_score": round((t2 - t1) * 1000, 2),
|
| 463 |
"email_infer": email_dbg["timing_ms"].get("email_infer"),
|
|
|
|
| 465 |
}
|
| 466 |
}
|
| 467 |
|
| 468 |
+
# Forensics Markdown (human‑readable, denser detail)
|
| 469 |
lines = []
|
| 470 |
+
lines.append(f"**Verdict:** `{fused['verdict']}` | **R_total:** `{fused['R_total']}` (before: `{fused['R_total_before_overrides']}`) | **τ:** `{THRESHOLD_TAU}`")
|
| 471 |
+
lines.append(f"**Fusion:** R = {FUSION_EMAIL_W}×P_email + {FUSION_URL_W}×R_url_max → {FUSION_EMAIL_W}×{fused['P_email']} + {FUSION_URL_W}×{fused['R_url_max']}")
|
| 472 |
if fuse_dbg["applied_overrides"]:
|
| 473 |
+
lines.append(f"**Overrides:** {', '.join(fuse_dbg['applied_overrides'])}")
|
| 474 |
else:
|
| 475 |
+
lines.append("**Overrides:** (none)")
|
| 476 |
if fused["no_urls"]:
|
| 477 |
lines.append("• No URLs found → email‑only decision path.")
|
| 478 |
if fused["allowlist_hit"]:
|
| 479 |
lines.append("• Allowlist matched → risk capped.")
|
| 480 |
lines.append("")
|
| 481 |
+
lines.append(f"**Email path:** `{email_dbg['path'] or 'lexical-fallback'}` | p_raw={email_dbg['p_raw']} | +strong={email_dbg['boost_from_strong']} | +normal={email_dbg['boost_from_normal']}")
|
| 482 |
+
tc = email_dbg["token_counts"]
|
| 483 |
+
lines.append(f"• Tokens: subject={tc.get('subject_tokens',0)}, body={tc.get('body_tokens',0)}, sequence_len={tc.get('sequence_len',0)} (max={MAX_SEQ_LEN}) | subject_budget={SUBJECT_TOKEN_BUDGET}")
|
| 484 |
if email_res.strong_hits:
|
| 485 |
lines.append(f"• Strong cues: {', '.join(email_res.strong_hits)}")
|
| 486 |
if email_res.kw_hits:
|
| 487 |
lines.append(f"• All cues: {', '.join(email_res.kw_hits)}")
|
| 488 |
lines.append("")
|
| 489 |
if per_url:
|
| 490 |
+
lines.append("**URLs & contributions:**")
|
| 491 |
for u in per_url:
|
| 492 |
+
contrib_str = ", ".join([f"{k}:{v}" for k,v in u["contrib"].items()])
|
| 493 |
+
lines.append(f"• {u['url']} → risk={u['risk']} | reasons=({', '.join(u['reasons']) or 'none'}) | contrib=({contrib_str or 'n/a'})")
|
| 494 |
else:
|
| 495 |
lines.append("**URLs:** (none)")
|
| 496 |
lines.append("")
|
| 497 |
+
lines.append(f"**Model info:** loaded_from={email_dbg['model_info']['loaded_from']}, quantized={email_dbg['model_info']['quantized']}, load_ms={email_dbg['model_info']['model_load_ms']}")
|
| 498 |
+
lines.append("")
|
| 499 |
lines.append("**Timings (ms):** " + json.dumps(fx["timings_ms"]))
|
| 500 |
|
| 501 |
forensic_markdown = "\n".join(lines)
|