| import gradio as gr |
| import pdfplumber |
| from presidio_analyzer import AnalyzerEngine |
| from presidio_anonymizer import AnonymizerEngine |
| from presidio_image_redactor import ImageRedactorEngine |
| import numpy as np |
| import re |
| from docx import Document |
| from PIL import Image |
| import pytesseract |
| import fitz |
| import io |
|
|
# Presidio engines, instantiated once at module import and shared by the
# detection and redaction helpers defined in this file.
analyzer, anonymizer, image_redactor = (
    AnalyzerEngine(),
    AnonymizerEngine(),
    ImageRedactorEngine(),
)
|
|
# Entity labels that are treated as relevant under each compliance regime.
COMPLIANCE_ENTITIES = dict(
    HIPAA=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER", "SSN"],
    GDPR=["PERSON", "EMAIL_ADDRESS", "LOCATION"],
    CCPA=["PERSON", "EMAIL_ADDRESS", "IP_ADDRESS", "SSN", "CREDIT_CARD"],
)


# Document extensions first, then image extensions.
SUPPORTED_FILE_TYPES = [".pdf", ".docx", ".txt"] + [".png", ".jpg", ".jpeg"]
|
|
def extract_text(doc):
    """Extract plain text from an uploaded document.

    Args:
        doc: Either a filesystem path given as a ``str`` or an object that
            exposes the path through a ``name`` attribute.

    Returns:
        The extracted text on success. On any failure (nothing uploaded,
        unsupported extension, empty document, or an exception raised while
        reading) a message starting with ``"ERROR"`` is returned instead of
        raising; callers detect failure via that prefix.
    """
    # Accept both a bare path string and a file-like wrapper carrying ``.name``.
    path = doc if isinstance(doc, str) else getattr(doc, "name", None)
    if not path:
        return "ERROR: No file uploaded."
    try:
        # Extension matching is case-insensitive; the real path is used for I/O.
        fname = path.lower()
        if fname.endswith(".pdf"):
            with pdfplumber.open(path) as pdf:
                # extract_text() may yield None for a page; treat that as empty.
                text = "\n".join(page.extract_text() or "" for page in pdf.pages)
        elif fname.endswith(".docx"):
            text = "\n".join(p.text for p in Document(path).paragraphs)
        elif fname.endswith(".txt"):
            with open(path, "r", encoding="utf-8") as f:
                text = f.read()
        elif fname.endswith((".png", ".jpg", ".jpeg")):
            # Context manager releases the image file handle once OCR is done.
            with Image.open(path) as img:
                text = pytesseract.image_to_string(img)
        else:
            return "ERROR: Unsupported file type."
        if not text.strip():
            return "ERROR: Document contains no extractable text."
        return text
    except Exception as e:
        # Top-level boundary for the UI: report the failure as text.
        return f"ERROR: {e}"
|
|
def detect_pii(text):
    """Run the Presidio analyzer plus the regex helpers over ``text``.

    Returns:
        A ``(findings, presidio_results)`` tuple. ``findings`` is a list of
        dicts with ``entity``, ``score``, ``start``, ``end`` and ``text``
        keys; ``presidio_results`` is whatever ``analyzer.analyze`` returned.
        If anything raises, the result is a single ``{"entity": "ERROR",
        "text": <message>}`` dict (note: without offsets or score) paired
        with an empty list.
    """
    wanted = [
        "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER",
        "SSN", "CREDIT_CARD", "LOCATION", "IP_ADDRESS"
    ]
    try:
        presidio_results = analyzer.analyze(text=text, entities=wanted, language="en")
        findings = []
        for hit in presidio_results:
            findings.append({
                "entity": hit.entity_type,
                "score": hit.score,
                "start": hit.start,
                "end": hit.end,
                "text": text[hit.start:hit.end].strip()
            })
        # Regex-based detectors are appended after the analyzer's hits.
        findings.extend(find_ssns(text))
        findings.extend(find_ip_addresses(text))
    except Exception as e:
        return [{"entity": "ERROR", "text": str(e)}], []
    return findings, presidio_results
|
|
# Four dot-separated groups of 1-3 ASCII digits; octet range is checked separately.
_IPV4_CANDIDATE = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')


def find_ip_addresses(text):
    """Find dotted-quad IPv4 addresses in ``text``.

    Candidates are located with a regex and then kept only when every octet
    is in the range 0-255, so strings such as ``999.1.1.1`` are not reported.

    Returns:
        A list of finding dicts (``entity``, ``score``, ``start``, ``end``,
        ``text``) in order of appearance; ``score`` is always ``1.0``.
    """
    findings = []
    for m in _IPV4_CANDIDATE.finditer(text):
        candidate = m.group()
        # The regex alone accepts 0-999 per group; reject impossible octets.
        if any(int(octet) > 255 for octet in candidate.split(".")):
            continue
        findings.append({
            "entity": "IP_ADDRESS",
            "score": 1.0,
            "start": m.start(),
            "end": m.end(),
            "text": candidate
        })
    return findings
|
|
def find_ssns(text):
    """Find US social security numbers (``NNN-NN-NNNN``) in ``text``.

    Two passes are made: numbers directly preceded by an "SSN" / "social
    security number" label get score ``1.0``; any remaining stand-alone
    number in that format gets ``0.95``. A span reported by the labelled pass
    is not reported again by the generic pass.

    Returns:
        A list of finding dicts (``entity``, ``score``, ``start``, ``end``,
        ``text``): labelled matches first, then the remaining bare matches.
    """
    labelled_pattern = r'(?i)(ssn|social security number)[\s:]*([0-9]{3}-[0-9]{2}-[0-9]{4})'
    bare_pattern = r'\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b'

    findings = []
    reported = set()
    for m in re.finditer(labelled_pattern, text):
        reported.add(m.span(2))
        findings.append({
            "entity": "SSN",
            "score": 1.0,
            "start": m.start(2),
            "end": m.end(2),
            "text": m.group(2)
        })
    for m in re.finditer(bare_pattern, text):
        # Skip numbers the labelled pass already emitted with higher confidence.
        if m.span() in reported:
            continue
        findings.append({
            "entity": "SSN",
            "score": 0.95,
            "start": m.start(),
            "end": m.end(),
            "text": m.group()
        })
    return findings
|
|
def clean_person_entities(findings):
    """Normalise PERSON findings and drop known false positives.

    For each PERSON finding the text is cut down to its first two
    whitespace-separated words; if that shortened name (case-insensitively)
    is "date" or "department" the finding is discarded. Kept PERSON findings
    are copies, so the input dicts are not modified. Findings of any other
    entity type are passed through as-is.
    """
    false_positives = ["date", "department"]
    result = []
    for item in findings:
        if item["entity"] != "PERSON":
            result.append(item)
            continue
        short_name = " ".join(item["text"].split()[:2])
        if short_name.lower() in false_positives:
            continue
        result.append({**item, "text": short_name})
    return result
|
|
def dedupe_findings(findings):
    """Drop repeated findings, keeping the first occurrence of each.

    Two findings are considered the same when their ``entity``, ``text``,
    ``start`` and ``end`` values all match (``score`` is ignored). The
    original order of first occurrences is preserved.
    """
    first_seen = {}
    for item in findings:
        identity = (item["entity"], item["text"], item["start"], item["end"])
        # setdefault keeps the earliest finding for each identity.
        first_seen.setdefault(identity, item)
    return list(first_seen.values())
|
|
def risk_score(findings):
    """Return the summed sensitivity weight of all findings.

    Each finding contributes the weight of its ``entity``; entity types
    without an explicit weight count as 1.
    """
    weights = dict(
        PERSON=1,
        EMAIL_ADDRESS=2,
        CREDIT_CARD=4,
        SSN=5,
        IP_ADDRESS=2,
        PHONE_NUMBER=2,
        MEDICAL_RECORD_NUMBER=3,
    )
    total = 0
    for item in findings:
        total += weights.get(item["entity"], 1)
    return total
|
|
def suggest_fixes(findings):
    """Return one remediation hint per entity type present in ``findings``.

    The hints are de-duplicated and returned in the order their entity type
    first appears, so the output is stable from run to run. Entity types
    without a known hint are ignored.

    Returns:
        A list of human-readable recommendation strings (possibly empty).
    """
    advice = {
        "PERSON": "Remove or mask full names.",
        "EMAIL_ADDRESS": "Anonymize email addresses.",
        "CREDIT_CARD": "Remove or mask credit card numbers.",
        "SSN": "Remove or mask social security numbers.",
        "PHONE_NUMBER": "Mask phone numbers.",
        "LOCATION": "Remove or generalize location data.",
        "IP_ADDRESS": "Remove or anonymize IP addresses.",
        "MEDICAL_RECORD_NUMBER": "Anonymize medical record numbers.",
    }
    # dict.fromkeys de-duplicates while preserving first-seen order, unlike
    # set(), whose iteration order for strings varies between interpreter runs.
    ordered = dict.fromkeys(
        advice[f["entity"]] for f in findings if f["entity"] in advice
    )
    return list(ordered)
|
|
def summarize_narrative(findings, regime):
    """Build a Markdown bullet summary of entity counts for ``regime``.

    Returns a fixed "nothing found" sentence when ``findings`` is empty;
    otherwise a header line, one bullet per entity type (sorted by entity
    label, with a title-cased display name and its count), and a closing
    recommendation line, joined by newlines.
    """
    if findings:
        tally = {}
        for item in findings:
            tally[item["entity"]] = tally.get(item["entity"], 0) + 1
        bullets = [
            f"- **{label.replace('_', ' ').title()}**: {tally[label]} instance(s)"
            for label in sorted(tally)
        ]
        return "\n".join([
            f"Under **{regime}**, the document contains:",
            *bullets,
            "These must be anonymized or removed to ensure compliance.",
        ])
    return "No sensitive or regulated information was found in this document."
|
|
def score_legend():
    """Return the static Markdown legend explaining the risk-score bands."""
    legend_lines = [
        "**Risk Score Legend:**",
        "- 0–3: Low risk (little or no PII detected)",
        "- 4–7: Moderate risk (some PII detected, take caution)",
        "- 8+: High risk (multiple/high-value PII found—document needs urgent attention)",
        "",
        "Score is calculated based on entity sensitivity. For example, SSN and credit cards are higher risk than names.",
    ]
    return "\n".join(legend_lines)
|
|
def redact_text(text, all_findings):
    """Replace every detected span in ``text`` with ``[REDACTED]``.

    Findings whose ``text`` is empty or shorter than three characters are
    ignored, as are findings that carry no ``start``/``end`` offsets.
    Overlapping or identical spans are merged first so that each covered
    region is replaced exactly once; replacing them one by one using the
    original offsets would shift the text and corrupt neighbouring
    characters.

    Args:
        text: The original document text.
        all_findings: Iterable of finding dicts with ``text``, ``start`` and
            ``end`` keys (offsets refer to ``text``).

    Returns:
        The redacted text.
    """
    spans = []
    for f in all_findings:
        snippet = f.get("text")
        if not snippet or len(snippet) < 3:
            continue
        start, end = f.get("start"), f.get("end")
        if start is None or end is None:
            continue
        spans.append((start, end))
    spans.sort()

    # Merge spans that genuinely overlap; merely adjacent spans stay separate
    # so each still yields its own marker.
    merged = []
    for start, end in spans:
        if merged and start < merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])

    # Rebuild the text left to right from untouched pieces and markers.
    pieces = []
    cursor = 0
    for start, end in merged:
        pieces.append(text[cursor:start])
        pieces.append("[REDACTED]")
        cursor = max(cursor, end)
    pieces.append(text[cursor:])
    return "".join(pieces)
|
|
def save_redacted_file(redacted_text):
    """Write ``redacted_text`` to a new UTF-8 text file and return its path.

    Each call writes into a freshly created temporary directory, so
    simultaneous requests cannot overwrite or read one another's output.
    The file itself is always named ``redacted_output.txt``.

    Returns:
        The path of the written file, as a string.
    """
    import os
    import tempfile

    # mkdtemp creates a unique directory under the platform's temp location.
    # NOTE: the directory is not removed here; cleanup is left to the host.
    out_dir = tempfile.mkdtemp(prefix="redacted_")
    path = os.path.join(out_dir, "redacted_output.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write(redacted_text)
    return path
|
|
def redact_image_with_presidio(image_path):
    """Redact an image with the shared image redactor and save it as PNG.

    Args:
        image_path: Path of the image file to redact.

    Returns:
        Path of the redacted PNG, written as ``redacted_image.png`` inside a
        newly created temporary directory (unique per call, so concurrent
        requests do not overwrite each other's output).
    """
    import os
    import tempfile

    out_path = os.path.join(tempfile.mkdtemp(prefix="redacted_"), "redacted_image.png")
    # The context manager closes the source image's file handle; the result
    # is saved while the source is still open.
    with Image.open(image_path) as img:
        redacted_img = image_redactor.redact(img)
        redacted_img.save(out_path)
    return out_path
|
|
def redact_pdf_with_presidio(pdf_path):
    """Produce an image-based redacted copy of a PDF.

    Every page is rendered to a pixmap, passed through the shared image
    redactor, and inserted as a PNG into a new page of the same pixel
    dimensions. The output therefore contains page images rather than the
    original page content.

    Args:
        pdf_path: Path of the PDF to redact.

    Returns:
        Path of the redacted PDF, written as ``redacted_output.pdf`` inside a
        newly created temporary directory (unique per call).
    """
    import os
    import tempfile

    out_path = os.path.join(tempfile.mkdtemp(prefix="redacted_"), "redacted_output.pdf")
    doc = fitz.open(pdf_path)
    try:
        output_pdf = fitz.open()
        try:
            for page in doc:
                pix = page.get_pixmap()
                # assumes the default pixmap is 3-channel RGB without alpha — TODO confirm
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                redacted_img = image_redactor.redact(img)
                png_buffer = io.BytesIO()
                redacted_img.save(png_buffer, format='PNG')
                rect = fitz.Rect(0, 0, pix.width, pix.height)
                out_page = output_pdf.new_page(width=pix.width, height=pix.height)
                out_page.insert_image(rect, stream=png_buffer.getvalue())
            output_pdf.save(out_path)
        finally:
            # Close the output document even if rendering or saving failed.
            output_pdf.close()
    finally:
        # The source document was previously left open.
        doc.close()
    return out_path
|
|
def executive_summary_template(findings, score, regime):
    """Compose a one-paragraph executive summary of the compliance result.

    With no findings a single "low risk" sentence is returned. Otherwise the
    summary states the regime, the score with its risk band (8 or more is
    High, 4 or more is Moderate, anything lower is Low), the per-entity
    counts in first-seen order, and a fixed recommendation.
    """
    if not findings:
        return f"No sensitive information detected under {regime}. Document is considered low risk."

    if score >= 8:
        risk_level = "High Risk"
    elif score >= 4:
        risk_level = "Moderate Risk"
    else:
        risk_level = "Low Risk"

    tally = {}
    for item in findings:
        label = item["entity"]
        tally[label] = tally.get(label, 0) + 1
    breakdown = ", ".join(f"{label} ({count})" for label, count in tally.items())

    sentences = (
        f"This document falls under {regime} with a risk score of {score} ({risk_level}).",
        "Sensitive information detected: " + breakdown + ".",
        "Recommendation: Anonymize or redact all sensitive entities to ensure compliance.",
    )
    return " ".join(sentences)
|
|
def agentic_compliance(doc, regime):
    """Run the full compliance pipeline for one uploaded document.

    Steps: extract text, detect PII, clean and de-duplicate the findings,
    keep those relevant to ``regime``, score them, build the Markdown
    report, write a redacted text file, and (for images and PDFs) produce a
    redacted copy of the original file.

    Args:
        doc: The uploaded file, either a path string or an object exposing
            the path via ``name``.
        regime: Key into ``COMPLIANCE_ENTITIES``; an unknown key means no
            entity type is treated as relevant.

    Returns:
        A 4-tuple ``(markdown_report, redacted_text_path,
        redacted_file_path, redacted_image_path)``. When extraction or
        detection fails, the first element is an ``"ERROR..."`` message and
        the other three are ``None``.
    """
    text = extract_text(doc)
    # NOTE: extraction failures are signalled by an "ERROR" prefix, so a
    # document whose own text begins with "ERROR" is also treated as a failure.
    if text.startswith("ERROR"):
        return text, None, None, None

    findings, _ = detect_pii(text)
    # detect_pii reports failure as an {"entity": "ERROR", ...} finding that
    # has no start/end offsets; stop here instead of letting the offset-based
    # helpers below raise KeyError on it.
    failures = [f for f in findings if f.get("entity") == "ERROR"]
    if failures:
        return f"ERROR: PII detection failed: {failures[0].get('text', '')}", None, None, None

    findings = clean_person_entities(findings)
    findings = dedupe_findings(findings)

    entities_needed = COMPLIANCE_ENTITIES.get(regime, [])
    relevant = [f for f in findings if f["entity"] in entities_needed]
    score = risk_score(relevant)
    fixes = suggest_fixes(relevant)
    summary = summarize_narrative(relevant, regime)
    exec_summary = executive_summary_template(relevant, score, regime)

    findings_md = "\n".join([
        f"- **{f['entity']}** (`{f['text']}`), score: {f.get('score', 0):.2f}"
        for f in relevant
    ]) if relevant else "No relevant PII found for this regime."

    fixes_md = "\n".join([f"- {fix}" for fix in fixes]) if fixes else "No action needed."
    legend_md = score_legend()

    # The text redaction covers every finding, not only the regime-relevant ones.
    redacted = redact_text(text, findings)
    redacted_path = save_redacted_file(redacted)

    redacted_file_path = None
    redacted_image = None
    path = doc if isinstance(doc, str) else getattr(doc, "name", None)
    if path:
        fname = path.lower()
        if fname.endswith((".png", ".jpg", ".jpeg")):
            redacted_file_path = redact_image_with_presidio(path)
            # Only images get an inline preview.
            redacted_image = redacted_file_path
        elif fname.endswith(".pdf"):
            redacted_file_path = redact_pdf_with_presidio(path)

    md = f"""### Compliance Regime: **{regime}**
**Executive Summary:**
{exec_summary}
**Findings:**
{findings_md}
**Risk Score:** {score}
**Actionable Recommendations:**
{fixes_md}
**Summary:**
{summary}
---
{legend_md}
"""
    return md.strip(), redacted_path, redacted_file_path, redacted_image
|
|
| |
|
|
# Gradio front end: one tab with an upload widget, a regime selector, and
# four outputs wired to agentic_compliance.
with gr.Blocks(title="Agentic Compliance MCP Server") as demo:
    gr.Markdown("# Agentic Compliance MCP\nUpload a document to check it for PII then select a compliance regime.")
    with gr.Tab("Compliance Agent"):
        # Inputs
        doc = gr.File(label="Upload Document", file_types=SUPPORTED_FILE_TYPES)
        regime = gr.Dropdown(choices=list(COMPLIANCE_ENTITIES.keys()), label="Compliance Regime")

        # Outputs, in the same order as agentic_compliance's return tuple
        out = gr.Markdown(label="Compliance Output")
        redacted_out = gr.File(label="Download Redacted Text")
        file_redacted_out = gr.File(label="Download Redacted PDF/Image")
        redacted_img = gr.Image(label="Redacted Image Preview")

        run_button = gr.Button("Run Compliance Agent")
        run_button.click(
            agentic_compliance,
            inputs=[doc, regime],
            outputs=[out, redacted_out, file_redacted_out, redacted_img],
        )


# Start the app; mcp_server=True is passed through to Gradio's launch().
demo.launch(mcp_server=True)
|
|