Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
IRS Publication 1075 Compliance Assistant (Gradio)

Features
- Upload a PDF or DOCX policy/security document and run heuristic checks aligned to IRS Pub. 1075 themes.
- Generate a structured compliance report with findings, gaps, and actionable recommendations.
- Ask detailed Pub. 1075 questions and get answers grounded in the OFFICIAL PDF only:
  https://www.irs.gov/pub/irs-pdf/p1075.pdf
  The app downloads the PDF at runtime (if internet is available), builds a page-level index,
  and cites specific page numbers and the most relevant passages.

Notes
- Do NOT upload real FTI; use redacted/sample docs. This is guidance only, not legal advice.
- Files are handled in memory; the generated report file is written to /tmp for download.
- If the Space has no internet access, Q&A will fall back to a minimal local summary and warn the user.
""" | |
import io | |
import os | |
import re | |
import json | |
import time | |
import math | |
import tempfile | |
from datetime import datetime | |
from typing import List, Dict, Any, Tuple, Optional | |
import gradio as gr | |
import requests | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.metrics.pairwise import cosine_similarity | |
from PyPDF2 import PdfReader | |
from docx import Document as DocxDocument | |
# User-facing copy shown in the page header and tab descriptions.
APP_TITLE = "IRS Pub. 1075 Compliance Assistant"
APP_TAGLINE = "Upload a policy/security document for heuristic checks and ask detailed Pub. 1075 questions with page citations."

# The only document the app treats as authoritative.
IRS_PUB_1075_URL = "https://www.irs.gov/pub/irs-pdf/p1075.pdf"

# Safety notice repeated at the top of the page and in assessment output.
WARNING_BANNER = (
    "⚠️ Do NOT upload real FTI. This tool references only IRS Publication 1075 "
    + f"({IRS_PUB_1075_URL}). "
    + "It provides guidance only, not legal/compliance advice."
)
# -------------------------------------------------------------------------------------- | |
# Knowledge themes for heuristic checks (no external sources beyond Pub. 1075) | |
# -------------------------------------------------------------------------------------- | |
# Heuristic check catalogue.
#
# Each entry describes one Pub. 1075-aligned theme:
#   id             - stable identifier for the check
#   title          - label shown in the findings table and report
#   patterns_any   - regexes; the check counts as "met" if ANY of them matches
#                    the uploaded document (matching is case-insensitive)
#   recommendation - remediation text printed in the report
#   section        - theme name printed in the report
#
# Patterns with alternation must wrap the alternatives in a group so the
# \b word-boundary anchors apply to every alternative, not just the first/last.
CHECKS = [
    {
        "id": "chk_encryption",
        "title": "Cryptographic Protections (At Rest & In Transit)",
        "patterns_any": [
            r"\bencrypt(ed|ion|ing)\b", r"\bTLS\s*1\.2\b", r"\bTLS\s*1\.3\b",
            r"\bcryptograph(y|ic)\b", r"\bkey management\b", r"\bkey rotation\b", r"\bFIPS\b"
        ],
        "recommendation": "Document approved cryptography (preferring FIPS-validated modules if applicable), TLS 1.2+ for transmission, and key management (generation, storage, rotation).",
        "section": "Encryption & Cryptographic Protections"
    },
    {
        "id": "chk_access",
        "title": "Access Control, Least Privilege & MFA",
        "patterns_any": [
            r"\bMFA\b", r"\bmulti-?factor\b", r"\bleast privilege\b", r"\brole-?based\b",
            r"\baccess control\b", r"\bprivileged\b", r"\badmin(istrative)? access\b",
            r"\baccount (lifecycle|provisioning|deprovisioning|review)\b"
        ],
        "recommendation": "Enforce least privilege and role-based access. Require MFA for remote and admin access. Define account provisioning, periodic reviews, and timely deprovisioning.",
        "section": "Access Control & Multi-Factor Authentication"
    },
    {
        "id": "chk_incident",
        "title": "Incident Response & Reporting",
        "patterns_any": [
            r"\bincident response\b", r"\bbreach\b", r"\bcontainment\b", r"\beradication\b",
            r"\brecovery\b", r"\bpost-incident\b", r"\bnotification\b", r"\breport(ing)?\b"
        ],
        "recommendation": "Define and test procedures across detection, escalation, containment, eradication, recovery, and post-incident review. Include notification/reporting timelines.",
        "section": "Incident Response"
    },
    {
        "id": "chk_audit",
        "title": "Audit Logging & Monitoring",
        "patterns_any": [
            r"\baudit(ing)?\b", r"\blog(s|ging)?\b", r"\bretention\b",
            r"\badmin(istrative)? actions?\b", r"\bintegrity\b", r"\btamper\b", r"\bSIEM\b"
        ],
        "recommendation": "Record access and administrative actions; protect logs from tampering; define retention and review procedures.",
        "section": "Audit & Accountability"
    },
    {
        "id": "chk_media",
        "title": "Media Protection & Sanitization",
        "patterns_any": [
            r"\bmedia (sanitization|protection|handling|labeling)\b",
            # Grouped so both \b anchors cover every alternative; the original
            # ungrouped form matched inside unrelated words.
            r"\b(?:dispos(?:e[sd]?|als?|ing)|destruct(?:ion)?)\b",
            r"\bchain of custody\b", r"\btransport\b"
        ],
        "recommendation": "Define labeling, handling, transport, and sanitization/disposal procedures for media that may contain FTI.",
        "section": "Media Protection"
    },
    {
        "id": "chk_config",
        "title": "Configuration & Vulnerability Management",
        "patterns_any": [
            r"\bconfiguration management\b", r"\bbaseline\b", r"\bchange control\b",
            r"\bpatch(ing)?\b", r"\bvulnerabilit(y|ies)\b", r"\bscan(ning)?\b", r"\bremediation\b"
        ],
        "recommendation": "Maintain baselines, change control, and patch/vulnerability processes. Track remediation timelines.",
        "section": "Configuration & Vulnerability Management"
    },
    {
        "id": "chk_contingency",
        "title": "Contingency Planning & Backup",
        "patterns_any": [
            r"\bcontingency\b", r"\bdisaster recovery\b", r"\bDRP\b", r"\bBCP\b",
            r"\bbackup(s)?\b", r"\brestore\b", r"\btabletop\b", r"\bexercise\b"
        ],
        "recommendation": "Develop, maintain, and test contingency/DR plans and secure, tested backups with documented RTO/RPO.",
        "section": "Contingency Planning"
    },
    {
        "id": "chk_physical",
        "title": "Physical & Environmental Security",
        "patterns_any": [
            r"\bphysical security\b", r"\bdata center\b", r"\bvisitor\b", r"\bbadge\b",
            r"\bperimeter\b", r"\bcamera\b", r"\benvironmental\b"
        ],
        "recommendation": "Restrict physical access; implement visitor controls and appropriate environmental safeguards.",
        "section": "Physical & Environmental Security"
    },
    {
        "id": "chk_training",
        "title": "Security Awareness & Training",
        "patterns_any": [
            r"\bsecurity awareness\b", r"\btraining\b", r"\bannual\b", r"\brole-based\b",
            r"\bprivacy training\b", r"\bFTI training\b"
        ],
        "recommendation": "Provide initial and periodic training; include role-specific content for admins and developers.",
        "section": "Awareness & Training"
    },
]
# -------------------------------------------------------------------------------------- | |
# PDF fetching and page-level indexing for detailed, section-specific Q&A | |
# -------------------------------------------------------------------------------------- | |
# On-disk artifacts: the downloaded publication and a small manifest describing
# the last successful index build. Both live in the system temp directory.
PDF_CACHE_PATH, INDEX_JSON_PATH = (
    os.path.join(tempfile.gettempdir(), artifact_name)
    for artifact_name in ("irs_pub1075.pdf", "irs_pub1075_index.json")
)

# In-memory page index, populated by _build_page_index().
PAGE_TEXTS: List[str] = []        # extracted text, one entry per PDF page
PAGE_HEADINGS: List[str] = []     # first non-empty line of each page
PAGE_VECTORIZER: Optional[TfidfVectorizer] = None  # fitted on PAGE_TEXTS
PAGE_TFIDF = None                 # TF-IDF matrix produced by PAGE_VECTORIZER

# Availability flags, updated by ensure_pdf_index_ready() / _build_page_index().
PDF_AVAILABLE: bool = False
PDF_PAGES: int = 0
def _first_nonempty_line(text: str) -> str: | |
for line in (text or "").splitlines(): | |
ln = line.strip() | |
if ln: | |
return ln[:120] | |
return "Untitled section" | |
def _split_sentences(text: str) -> List[str]: | |
# Simple sentence splitter | |
parts = re.split(r'(?<=[\.\?!])\s+(?=[A-Z0-9])', text.strip()) | |
# Filter and trim | |
return [p.strip() for p in parts if p.strip()] | |
def _download_pdf_if_needed() -> bool:
    """Make sure the official publication is cached at ``PDF_CACHE_PATH``.

    Returns:
        True if a non-empty cache file already exists or a fresh copy was
        downloaded and stored; False if the download failed, the response did
        not look like a PDF, or the file could not be written.
    """
    try:
        if os.path.getsize(PDF_CACHE_PATH) > 0:
            return True
    except OSError:
        # Not cached yet (or unreadable): fall through to a fresh download.
        pass

    tmp_path = None
    try:
        resp = requests.get(IRS_PUB_1075_URL, timeout=30)
        resp.raise_for_status()
        payload = resp.content
        # A 200 response is not necessarily the PDF (proxy/error pages). Refuse
        # to cache anything without a PDF header near the start, otherwise the
        # bad file would be treated as a valid cache on every later call.
        if b"%PDF-" not in payload[:1024]:
            return False
        # Write to a unique sibling file and rename into place so a partially
        # written file is never observed at PDF_CACHE_PATH.
        fd, tmp_path = tempfile.mkstemp(
            dir=os.path.dirname(PDF_CACHE_PATH), suffix=".part"
        )
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
        os.replace(tmp_path, PDF_CACHE_PATH)
        return True
    except (requests.RequestException, OSError):
        # Best effort: callers fall back to an "unavailable" message.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return False
def _build_page_index() -> bool:
    """Extract per-page text from the cached PDF and fit a page-level TF-IDF index.

    On success the module globals ``PAGE_TEXTS``, ``PAGE_HEADINGS``,
    ``PAGE_VECTORIZER``, ``PAGE_TFIDF`` and ``PDF_PAGES`` are replaced together.
    On failure they are all reset to their empty state.

    Returns:
        True if the index was built, False otherwise.
    """
    global PAGE_TEXTS, PAGE_HEADINGS, PAGE_VECTORIZER, PAGE_TFIDF, PDF_PAGES
    try:
        texts: List[str] = []
        with open(PDF_CACHE_PATH, "rb") as f:
            reader = PdfReader(f)
            page_count = len(reader.pages)
            for i in range(page_count):
                try:
                    txt = reader.pages[i].extract_text() or ""
                except Exception:
                    # One unreadable page should not sink the whole index;
                    # keep an empty placeholder so page numbers stay aligned.
                    txt = ""
                texts.append(txt)
        # Build TF-IDF over pages (page-level retrieval)
        vectorizer = TfidfVectorizer(stop_words="english")
        matrix = vectorizer.fit_transform(texts)
    except Exception:
        PAGE_TEXTS, PAGE_HEADINGS, PAGE_VECTORIZER, PAGE_TFIDF = [], [], None, None
        PDF_PAGES = 0
        return False

    # Publish only after everything succeeded, so readers never see a
    # half-built index.
    PAGE_TEXTS = texts
    PAGE_HEADINGS = [_first_nonempty_line(t) for t in texts]
    PAGE_VECTORIZER = vectorizer
    PAGE_TFIDF = matrix
    PDF_PAGES = page_count

    # The manifest is informational only; failing to write it must not
    # invalidate an index that is already usable in memory.
    try:
        with open(INDEX_JSON_PATH, "w", encoding="utf-8") as jf:
            json.dump({"pages": PDF_PAGES, "cached_at": time.time()}, jf)
    except OSError:
        pass
    return True
def ensure_pdf_index_ready() -> bool:
    """Lazily prepare the page index and record the outcome in ``PDF_AVAILABLE``.

    If an index is already loaded it is reused. Otherwise the PDF is fetched
    (or taken from the cache) and indexed. Returns True when the index can be
    queried.
    """
    global PDF_AVAILABLE
    index_loaded = (
        PAGE_TFIDF is not None
        and PAGE_VECTORIZER is not None
        and bool(PAGE_TEXTS)
    )
    if index_loaded:
        PDF_AVAILABLE = True
    else:
        # Short-circuit: indexing is only attempted when the PDF is on disk.
        PDF_AVAILABLE = _download_pdf_if_needed() and _build_page_index()
    return PDF_AVAILABLE
def _rank_snippets(query: str, q_vec, sentences: List[str], limit: int = 3) -> List[str]:
    """Pick up to *limit* sentences most similar to the query, tidied for display."""
    if not sentences:
        # Nothing to rank (e.g. a page with no extractable text).
        return []
    try:
        sent_vecs = PAGE_VECTORIZER.transform(sentences)
        s_sims = cosine_similarity(q_vec, sent_vecs).flatten()
        best = [sentences[i] for i in s_sims.argsort()[::-1][:limit]]
    except Exception:
        # Deliberate best-effort fallback: rank by how many query terms a
        # sentence contains, preferring longer sentences on ties.
        q_terms = [t for t in re.findall(r"\w+", query.lower()) if len(t) > 2]
        scored = [
            (sum(1 for t in q_terms if t in s.lower()), s)
            for s in sentences
        ]
        scored.sort(key=lambda x: (-x[0], -len(x[1])))
        best = [s for _, s in scored[:limit]]
    # Extracted PDF text carries hard line breaks; collapse whitespace so each
    # snippet renders as a single line, then keep it short.
    return [" ".join(s.split())[:400] for s in best]


def search_pub1075_pages(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """Retrieve the pages of the indexed publication most similar to *query*.

    Args:
        query: Free-text question.
        top_k: Maximum number of pages to return (at least one is returned
            when the index is available).

    Returns:
        A list of dicts ``{page, heading, score, snippets}`` ordered by
        descending cosine similarity. ``page`` is a 1-based ``int``, ``score``
        is a ``float`` (it may be 0.0 when the query shares no terms with the
        page), and ``snippets`` holds up to three short sentence excerpts.
        Returns ``[]`` when the index cannot be prepared.
    """
    if not ensure_pdf_index_ready():
        return []
    q_vec = PAGE_VECTORIZER.transform([query])
    sims = cosine_similarity(q_vec, PAGE_TFIDF).flatten()
    order = sims.argsort()[::-1][:max(1, top_k)]
    results = []
    for raw_idx in order:
        idx = int(raw_idx)  # plain int so results stay JSON-serializable
        sentences = _split_sentences(PAGE_TEXTS[idx])
        results.append({
            "page": idx + 1,  # 1-based for human readability
            "heading": PAGE_HEADINGS[idx],
            "score": float(sims[idx]),
            "snippets": _rank_snippets(query, q_vec, sentences),
        })
    return results
def detailed_answer_from_pages(query: str, top_k: int = 5) -> str:
    """Format a Markdown answer for *query* with page-level citations.

    Args:
        query: The user's question.
        top_k: Maximum number of pages to cite.

    Returns:
        Markdown text. If the index is unavailable, an explanatory message is
        returned; if no page shares any term with the query, a "no match"
        message is returned instead of citing unrelated pages.
    """
    hits = search_pub1075_pages(query, top_k=top_k)
    if not hits:
        return (
            "The app could not access the official PDF at runtime, so detailed citations are unavailable. "
            "Please enable internet access for this Space or try again later. "
            f"Source of truth: {IRS_PUB_1075_URL}"
        )
    # A similarity of 0 means the page shares no indexed term with the query;
    # presenting such pages as "most relevant" would be a misleading citation.
    relevant = [h for h in hits if h["score"] > 0]
    if not relevant:
        return (
            "No passages in the official PDF matched the terms in your question. "
            "Try rephrasing with wording used in the publication. "
            f"Source of truth: {IRS_PUB_1075_URL}"
        )
    out = [
        "### Detailed Guidance (grounded in IRS Publication 1075)",
        # Provide an actionable, structured answer first
        "**Actionable steps:**",
        "- Identify whether the control applies to systems or processes handling Federal Tax Information (FTI).",
        "- Document policy requirements, technical configurations, and operational procedures.",
        "- Implement control mechanisms and verify via monitoring, audits, or tests.",
        "- Maintain evidence (policies, tickets, logs, reports) to demonstrate compliance during reviews.",
        "",
        # Then include the most relevant sections with snippets and exact page numbers
        "**Most relevant sections in Pub. 1075 (by page):**",
    ]
    for i, h in enumerate(relevant, 1):
        out.append(f"**{i}. Page {h['page']} — {h['heading']}**")
        out.extend(f"> {sn}" for sn in h["snippets"])
        out.append(f"_Citation: IRS Publication 1075 (official PDF), page {h['page']}. {IRS_PUB_1075_URL}_")
        out.append("")
    # Add a compact reading plan
    pages_list = ", ".join(str(h["page"]) for h in relevant[:5])
    out.append(f"**Suggested reading order:** pages {pages_list} in the official PDF above.")
    return "\n".join(out)
# -------------------------------------------------------------------------------------- | |
# Document parsing utils (for uploaded documents) | |
# -------------------------------------------------------------------------------------- | |
def read_pdf_bytes(file_bytes: bytes) -> str:
    """Extract text from an in-memory PDF, one chunk per page joined by newlines.

    Pages whose text extraction raises are skipped; pages that yield no text
    contribute an empty string.
    """
    pdf = PdfReader(io.BytesIO(file_bytes))
    chunks = []
    for pg in pdf.pages:
        try:
            extracted = pg.extract_text()
        except Exception:
            # Best effort: skip a page that cannot be parsed.
            continue
        chunks.append(extracted or "")
    return "\n".join(chunks)
def read_docx_bytes(file_bytes: bytes) -> str:
    """Return the paragraph text of an in-memory DOCX file, one paragraph per line."""
    document = DocxDocument(io.BytesIO(file_bytes))
    paragraph_texts = []
    for paragraph in document.paragraphs:
        paragraph_texts.append(paragraph.text)
    return "\n".join(paragraph_texts)
def extract_text_from_upload(upload_bytes: Optional[bytes]) -> Tuple[str, str]:
    """Extract text from an uploaded file and describe what was parsed.

    The format is chosen from the file's leading bytes: a ``%PDF-`` header
    selects the PDF parser, a ZIP signature selects the DOCX parser, and
    anything else is decoded as UTF-8 text (undecodable bytes are dropped).

    A recognised PDF/DOCX that yields no text returns an empty string rather
    than falling back to decoding its raw bytes, so binary content is never
    scored as if it were the document's text.

    Args:
        upload_bytes: Raw file content, or ``None`` when nothing was uploaded.

    Returns:
        ``(text, meta)`` where *meta* is a short human-readable description.
    """
    if upload_bytes is None:
        return "", "No file."
    raw = upload_bytes
    size = len(raw)
    head = raw[:1024]

    if b"%PDF-" in head:
        label = "PDF"
    elif head.startswith(b"PK\x03\x04"):  # ZIP container, as used by DOCX
        label = "DOCX"
    else:
        label = None

    if label is not None:
        try:
            txt = read_pdf_bytes(raw) if label == "PDF" else read_docx_bytes(raw)
        except Exception:
            # Corrupt or unsupported file: report it as having no text.
            txt = ""
        if txt.strip():
            return txt, f"{label} file | {size} bytes | parsed length: {len(txt)} chars"
        return "", f"{label} file | {size} bytes | no extractable text"

    # Fallback: text
    txt = raw.decode("utf-8", errors="ignore")
    return txt, f"Plain text | {size} bytes | parsed length: {len(txt)} chars"
def run_checks(doc_text: str) -> List[Dict[str, Any]]:
    """Evaluate every entry in ``CHECKS`` against *doc_text*.

    A check is marked as met when any of its patterns matches the lowercased
    document (matching is also case-insensitive). Returns one finding dict per
    check with ``title``, ``section``, ``status`` and ``recommendation``.
    """
    haystack = doc_text.lower()
    findings = []
    for check in CHECKS:
        status = "Gap (no explicit evidence)"
        for pattern in check["patterns_any"]:
            if re.search(pattern, haystack, flags=re.IGNORECASE):
                status = "Meets (evidence found)"
                break  # one hit is enough
        findings.append({
            "title": check["title"],
            "section": check["section"],
            "status": status,
            "recommendation": check["recommendation"],
        })
    return findings
def summarize_score(findings: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Tally findings into ``total``, ``met``, ``gaps`` and a rounded ``score`` percentage.

    A finding counts as met when its ``status`` starts with ``"Meets"``. The
    score is 0 when there are no findings.
    """
    statuses = [item["status"] for item in findings]
    total = len(statuses)
    met = len([s for s in statuses if s.startswith("Meets")])
    if total:
        score_pct = int(round((met / total) * 100))
    else:
        score_pct = 0
    return {"total": total, "met": met, "gaps": total - met, "score": score_pct}
def format_report(meta: str, findings: List[Dict[str, Any]]) -> str:
    """Render the assessment as a Markdown report.

    The report has a header (source URL, document description, summary score),
    one subsection per finding, and a closing notes section.
    """
    stats = summarize_score(findings)
    score_line = (
        f"- Summary Score: {stats['score']}% "
        f"(Met {stats['met']} of {stats['total']}; Gaps {stats['gaps']})"
    )
    report = [
        "# Pub. 1075 Heuristic Compliance Assessment",
        f"- Source of truth: {IRS_PUB_1075_URL}",
        f"- Document: {meta}",
        score_line,
        "",
        "## Findings (by theme)",
    ]
    for finding in findings:
        report.extend((
            f"### {finding['title']}",
            f"- Theme: {finding['section']}",
            f"- Status: {finding['status']}",
            f"- Recommendation: {finding['recommendation']}",
            "",
        ))
    report.extend((
        "---",
        "### Notes",
        "- This assessment is heuristic. Controls may be present but phrased differently.",
        "- Validate against the official IRS Publication 1075 and your agency policy.",
    ))
    return "\n".join(report)
# -------------------------------------------------------------------------------------- | |
# Gradio handlers | |
# -------------------------------------------------------------------------------------- | |
def handle_assessment(upload_bytes: bytes):
    """Gradio handler: assess an uploaded document and produce a downloadable report.

    Args:
        upload_bytes: Raw content of the uploaded file (``None`` if nothing
            was uploaded).

    Returns:
        A 4-tuple for the outputs ``(header, table, report, download)``:
        header Markdown, findings table Markdown, full report Markdown, and an
        update for the download component. When no text could be extracted the
        download component stays hidden.
    """
    text, meta = extract_text_from_upload(upload_bytes)
    if not text.strip():
        return (
            WARNING_BANNER,
            "No text extracted. Please upload a PDF or DOCX with selectable text.",
            "",
            gr.update(visible=False),
        )
    findings = run_checks(text)
    report_md = format_report(meta, findings)

    # Timestamp for a recognisable name; the random part added by
    # NamedTemporaryFile keeps concurrent requests in the same second from
    # overwriting each other's report. The file is created in the system temp
    # directory and kept (delete=False) so it can be served for download.
    stamp = time.strftime("%Y%m%dT%H%M%SZ", time.gmtime())
    with tempfile.NamedTemporaryFile(
        "wb",
        prefix=f"pub1075_assessment_{stamp}_",
        suffix=".md",
        delete=False,
    ) as f:
        f.write(report_md.encode("utf-8"))
        report_path = f.name

    table_lines = ["| Control | Status | Theme |", "|---|---|---|"]
    table_lines.extend(
        f"| {fnd['title']} | {fnd['status']} | {fnd['section']} |"
        for fnd in findings
    )
    table_md = "\n".join(table_lines)
    header_md = (
        f"> {WARNING_BANNER}\n\n"
        f"**Parsed Document Info:** {meta}\n\n"
        f"**Summary Score:** {summarize_score(findings)['score']}%\n\n"
        f"**Authoritative Source:** {IRS_PUB_1075_URL}"
    )
    # The download component is created hidden; returning only a path would
    # set its value but leave it invisible, so reveal it explicitly.
    return header_md, table_md, report_md, gr.update(value=report_path, visible=True)
def handle_qa(question: str):
    """Gradio handler: answer a Pub. 1075 question with page citations.

    Blank or missing input yields a short prompt instead of a search.
    """
    cleaned = question.strip() if question else ""
    if cleaned:
        # Detailed, section-specific answer drawn from the five best pages.
        return detailed_answer_from_pages(cleaned, top_k=5)
    return "Please enter a question about IRS Publication 1075."
# -------------------------------------------------------------------------------------- | |
# UI | |
# -------------------------------------------------------------------------------------- | |
# Three-tab layout: document assessment, Q&A over the official PDF, and an
# "about" page. Components are created in display order inside each tab.
with gr.Blocks(title=APP_TITLE, theme=gr.themes.Default()) as demo:
    # Page header with the safety banner.
    gr.Markdown(f"# {APP_TITLE}\n{APP_TAGLINE}\n\n{WARNING_BANNER}")
    with gr.Tab("Upload & Check (Heuristic)"):
        gr.Markdown(
            "Upload a **PDF** or **DOCX** policy/security document. The assistant will run Pub. 1075-aligned heuristic checks and provide a structured report (downloadable as Markdown)."
        )
        # type="binary" hands the handler the raw file bytes.
        file_in = gr.File(label="Upload PDF or DOCX", file_types=[".pdf", ".docx"], type="binary")
        run_btn = gr.Button("Run Compliance Assessment")
        header_out = gr.Markdown()
        table_out = gr.Markdown()
        report_out = gr.Markdown(label="Full Report (Markdown)")
        # Starts hidden; there is nothing to download before an assessment runs.
        download_out = gr.File(label="Download Report (.md)", visible=False)
        # Output order must match the tuple returned by handle_assessment.
        run_btn.click(
            fn=handle_assessment,
            inputs=[file_in],
            outputs=[header_out, table_out, report_out, download_out]
        )
    with gr.Tab("Interactive Q&A (Detailed with Page Citations)"):
        gr.Markdown(
            "Ask about Pub. 1075 requirements. The app downloads and searches the **official PDF**, returning detailed guidance "
            "and citing **specific pages** and short snippets.\n\n"
            f"Source of truth: {IRS_PUB_1075_URL}"
        )
        question_in = gr.Textbox(label="Your question", placeholder="e.g., What encryption protections are required for FTI during transmission?")
        ask_btn = gr.Button("Get Answer")
        answer_out = gr.Markdown()
        ask_btn.click(fn=handle_qa, inputs=[question_in], outputs=[answer_out])
    with gr.Tab("About & Scope"):
        # Static description of the data source, retrieval method and limits.
        gr.Markdown(
            f"""
### Source of Truth
- Only the official IRS Publication 1075 PDF is used: {IRS_PUB_1075_URL}
### How Q&A Works
- The app downloads the PDF (if internet is available), builds a page-level TF-IDF index, and retrieves the most relevant pages.
- It surfaces short, relevant passages and cites **exact page numbers** for deeper reading.
### Security Notes
- Files are processed in memory; the downloadable report is written to **/tmp** solely for user download.
- Do not upload real Federal Tax Information (FTI).
### Limitations
- Heuristic checks may miss controls that are phrased differently.
- This tool does not replace formal IRS compliance review or legal advice.
"""
        )
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    # Do not force share=True on Spaces
    demo.launch()