import gradio as gr import re import json import torch from transformers import pipeline, AutoModelForTokenClassification, AutoTokenizer import faker from typing import List, Dict, Any, Optional import pandas as pd class EnhancedPiiProtectionPipeline: """ A comprehensive PII protection pipeline that: 1. Uses regex for all detectable patterns first 2. Uses multiple custom NER models for remaining detection 3. Provides three protection methods: labeling, masking, and synthesis 4. Handles general, Indian-specific, address, and medical contexts """ def __init__( self, main_model_name: str = "Kashish-jain/pii-protection-model", medical_model_name: str = "Kashish-jain/pii-protection-medical", use_medical_model: bool = False ): """ Initialize the comprehensive PII protection pipeline. Args: main_model_name: HuggingFace model name or path for the main PII model medical_model_name: HuggingFace model name for the medical NER model use_medical_model: Whether to load and use the medical model """ # Main model self.main_tokenizer = AutoTokenizer.from_pretrained(main_model_name) self.main_model = pipeline("ner", model=main_model_name, tokenizer=self.main_tokenizer, aggregation_strategy="simple") # Address-specific model - implementation simplified self.address_model = self.main_model # Fallback to main model for simplicity # Medical model self.use_medical_model = use_medical_model self.medical_model = None self.medical_tokenizer = None if use_medical_model and medical_model_name: try: device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.device = device self.medical_tokenizer = AutoTokenizer.from_pretrained(medical_model_name) self.medical_model = pipeline( "ner", model=medical_model_name, tokenizer=self.medical_tokenizer, aggregation_strategy="simple", device=0 if torch.cuda.is_available() else -1 ) print(f"Medical model '{medical_model_name}' loaded successfully") except Exception as e: print(f"Warning: Could not load medical model. Error: {str(e)}") self.use_medical_model = False self.faker = faker.Faker('en_IN') # Set up regex patterns for common PII entities - IMPROVED PATTERNS self.regex_patterns = { # Phone numbers - Fixed to prevent partial matches 'PHONENUMBER': r'(? List[Dict[str, Any]]: """Detect PII using regex patterns with improved capture groups.""" entities = [] for entity_type, pattern in self.all_regex_patterns.items(): for match in re.finditer(pattern, text, re.IGNORECASE): # For patterns with capture groups, use the first group if it exists if match.groups() and match.group(1): # For labeled patterns with capture groups (e.g., "Height: 5'6"") captured_text = match.group(1) # Calculate start/end positions for the captured group start = match.start(1) end = match.end(1) else: # For patterns without capture groups or standalone measurements captured_text = match.group(0) start = match.start(0) end = match.end(0) # Handle standalone height/weight by renaming them if entity_type == 'HEIGHT_STANDALONE': entity_type = 'HEIGHT' elif entity_type == 'WEIGHT_STANDALONE': entity_type = 'WEIGHT' elif entity_type == 'BLOOD_TYPE_STANDALONE': entity_type = 'BLOOD_TYPE' entities.append({ "text": captured_text, "label": entity_type, "start": start, "end": end, "score": 0.95, # High confidence for regex matches "_original_text": text # Store original text for context }) return entities def ner_detection(self, text: str, model_type: str = "main") -> List[Dict[str, Any]]: """ Detect PII using NER models Args: text: Text to analyze model_type: Type of model to use ("main", "medical") """ if model_type == "medical" and not self.use_medical_model: return [] model = self.medical_model if model_type == "medical" else self.main_model try: results = model(text) # Convert to standard format entities = [] for result in results: # Skip low confidence predictions if result.get('score', 0) < 0.5: continue # Clean entity type entity_type = result.get('entity_group', result.get('entity', '')).replace('B-', '').replace('I-', '') entities.append({ "text": result.get('word', text[result['start']:result['end']]), "label": entity_type, "start": result['start'], "end": result['end'], "score": result.get('score', 0.7), "_original_text": text # Store original text for context }) return entities except Exception as e: print(f"Error with NER detection: {str(e)}") return [] def merge_entities(self, entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Merge adjacent entities of the same or related types that likely form a single entity""" if not entities: return [] # Sort entities by start position entities.sort(key=lambda x: x['start']) merged = [] # Define related entity groups (entities that could be part of the same larger entity) related_types = { 'NAME': ['FIRSTNAME', 'MIDDLENAME', 'LASTNAME', 'PREFIX'], 'ADDRESS': ['STREET', 'CITY', 'STATE', 'ZIPCODE', 'BUILDINGNUMBER'], 'PHONENUMBER': ['PHONENUMBER'] # Explicitly add PHONENUMBER to prevent merging with other types } # Flatten the related types for quick lookup related_types_flat = {} for main_type, sub_types in related_types.items(): for sub_type in sub_types: related_types_flat[sub_type] = main_type # Helper function to check if two entity types are related def are_related(type1, type2): # Same type is related if type1 == type2: return True # Prevent merging PHONENUMBER with other types if type1 == 'PHONENUMBER' or type2 == 'PHONENUMBER': return type1 == type2 # Check if they're in the same group for group, types in related_types.items(): if type1 in types and type2 in types: return True if type1 == group and type2 in types: return True if type2 == group and type1 in types: return True # Check through the flattened related types if type1 in related_types_flat and related_types_flat[type1] == type2: return True if type2 in related_types_flat and related_types_flat[type2] == type1: return True return False for entity in entities: if not merged: merged.append(entity.copy()) continue last = merged[-1] # Maximum space between tokens that could be part of the same entity # For adjacent words, this would typically be 1 (the space) max_gap = 5 # Check if entities could be part of the same larger entity: # 1. Same or related entity type # 2. Within a reasonable distance # 3. No other complete word between them if (are_related(entity['label'], last['label']) and entity['start'] - last['end'] <= max_gap): # Get the text between the two entities between_text = entity.get('_original_text', '')[last['end']:entity['start']] \ if '_original_text' in entity and '_original_text' in last \ else ' ' # Only merge if the gap contains just spaces or very simple punctuation if between_text.strip() in ['', ' ', '.', ',', '-', '_']: # Create merged entity with all text between start and end if '_original_text' in entity and '_original_text' in last: full_text = last['_original_text'][last['start']:entity['end']] else: full_text = last['text'] + between_text + entity['text'] last['text'] = full_text last['end'] = entity['end'] # When merging different entity types, prefer the broader category if last['label'] in related_types_flat and entity['label'] == related_types_flat[last['label']]: last['label'] = entity['label'] elif entity['label'] in related_types_flat and last['label'] == related_types_flat[entity['label']]: # Keep last['label'] as is pass last['score'] = max(last.get('score', 0), entity.get('score', 0)) else: merged.append(entity.copy()) else: merged.append(entity.copy()) return merged def remove_overlapping_entities(self, entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Remove overlapping entities by keeping the highest scoring one""" if not entities: return [] # Sort by start position entities.sort(key=lambda x: x['start']) # Identify overlapping entities non_overlapping = [] i = 0 while i < len(entities): current = entities[i] # Find all entities that overlap with the current one overlapping = [current] j = i + 1 while j < len(entities) and entities[j]['start'] < current['end']: overlapping.append(entities[j]) j += 1 # Keep the highest scoring entity from overlapping group if len(overlapping) > 1: best_entity = max(overlapping, key=lambda x: x.get('score', 0)) non_overlapping.append(best_entity) else: non_overlapping.append(current) # Move index to start after all overlapping entities i = j return non_overlapping def generate_synthetic_value(self, entity_type: str, original_value: str = None) -> str: """Generate realistic synthetic data for PII.""" try: if entity_type in ['PERSON', 'NAME', 'FIRSTNAME', 'LASTNAME']: return self.faker.name() elif entity_type == 'EMAIL': return self.faker.email() elif entity_type == 'PHONENUMBER': return self.faker.phone_number() elif entity_type == 'PAN': return self.faker.bothify('?????####?').upper() elif entity_type == 'AADHAR': return ' '.join([self.faker.numerify('####') for _ in range(3)]) elif entity_type == 'CREDITCARDNUMBER' or entity_type == 'CREDIT_CARD': return self.faker.credit_card_number() elif entity_type == 'ACCOUNTNUMBER' or entity_type == 'IBAN_CODE' or entity_type == 'BANK_NUMBER': return self.faker.bban() elif entity_type == 'PASSPORT' or entity_type == 'US_PASSPORT': return f"{self.faker.random_letter().upper()}{self.faker.random_letter().upper()}{self.faker.numerify('######')}" elif entity_type == 'DOB' or entity_type == 'DATE_TIME': return self.faker.date_of_birth(minimum_age=18, maximum_age=90).strftime('%d/%m/%Y') elif entity_type == 'IPV4' or entity_type == 'IP_ADDRESS': return self.faker.ipv4() elif entity_type == 'URL': return self.faker.url() elif entity_type == 'PINCODE': return self.faker.postcode() elif entity_type == 'CITY' or entity_type == 'LOCATION': return self.faker.city() elif entity_type == 'STATE': return self.faker.state() elif entity_type == 'SSN' or entity_type == 'US_SSN': return self.faker.ssn() elif entity_type == 'DRIVER_LICENSE' or entity_type == 'US_DRIVER_LICENSE': return self.faker.bothify('?#######') elif entity_type == 'CRYPTO': return self.faker.cryptocurrency_code() + self.faker.bothify('??##??##??##??') # Medical entity generation elif entity_type == 'DOCTORNAME': return f"Dr. {self.faker.last_name()}" elif entity_type == 'PATIENTID' or entity_type == 'MEDICALID': return self.faker.bothify('PT#######') elif entity_type == 'HEIGHT': # Generate a realistic height in feet and inches feet = self.faker.random_int(min=4, max=6) inches = self.faker.random_int(min=0, max=11) return f"{feet}'{inches}\"" elif entity_type == 'WEIGHT': # Generate a realistic weight in kg weight = self.faker.random_int(min=45, max=100) return f"{weight}kg" elif entity_type == 'BLOOD_TYPE': # Generate a random blood type blood_groups = ['A+', 'A-', 'B+', 'B-', 'AB+', 'AB-', 'O+', 'O-'] return self.faker.random_element(blood_groups) else: # Fallback for unknown types return f"[SYNTHETIC_{entity_type}]" except Exception as e: print(f"Error generating synthetic value: {str(e)}") return f"[SYNTHETIC_{entity_type}]" def process_text(self, text: str, model_type: str = "main", protection_method: str = "replace") -> Dict[str, Any]: """ Process text to detect and protect PII Args: text: Input text to process model_type: Type of model to use ("main", "medical") protection_method: Protection method ("replace", "mask", "synthesize") Returns: Dict containing protected text and detected entities """ # Step 1: Get entities from regex regex_entities = self.regex_detection(text) # Step 2: Get entities from NER model ner_entities = self.ner_detection(text, model_type) # Step 3: Combine and process entities all_entities = regex_entities + ner_entities merged_entities = self.merge_entities(all_entities) final_entities = self.remove_overlapping_entities(merged_entities) # Step 4: Create protected text based on method protected_text = text # Sort entities by start position in reverse to avoid index issues when replacing final_entities_sorted = sorted(final_entities, key=lambda x: x['start'], reverse=True) if protection_method == "mask": # Mask with asterisks for entity in final_entities_sorted: mask = '*' * len(entity['text']) protected_text = protected_text[:entity['start']] + mask + protected_text[entity['end']:] elif protection_method == "synthesize": # Replace with synthetic values for entity in final_entities_sorted: synthetic = self.generate_synthetic_value(entity['label'], entity['text']) protected_text = protected_text[:entity['start']] + synthetic + protected_text[entity['end']:] else: # replace (default) # Replace with entity tags for entity in final_entities_sorted: tag = f"[{entity['label']}]" protected_text = protected_text[:entity['start']] + tag + protected_text[entity['end']:] # Create findings table findings = [] for i, entity in enumerate(final_entities): findings.append({ "index": i, "entity_type": entity['label'], "text": entity['text'], "start": entity['start'], "end": entity['end'], "confidence": round(entity.get('score', 1.0), 2) }) return { "protected_text": protected_text, "entities": final_entities, "findings": findings } # Example input text example_text = """ Hi, my name is John Doe and I'm originally from Delhi. On 11/10/2024 I visited https://www.google.com and sent an email to abc@gmail.com, from IP 192.168.0.1. My phone number: +91-1234321216. """ medical_example_text = """ Patient name: John Doe Date of Birth: 05/12/1982 Patient ID: PT789456 Contact: +91-9876543210 Dr. Robert Johnson has prescribed medication penicillin on 12/12/2024. Blood type: O+, Height: 5'6", Weight: 145kg """ # Create Gradio Interface def process_input(text, model_type, protection_method): # Initialize pipeline with Hugging Face model paths main_model_name = "Kashish-jain/pii-protection-model" medical_model_name = "Kashish-jain/pii-protection-medical" use_medical = model_type == "medical" pipeline = EnhancedPiiProtectionPipeline( main_model_name=main_model_name, medical_model_name=medical_model_name, use_medical_model=use_medical ) # Process the text result = pipeline.process_text(text, model_type, protection_method) # Create findings table if result["findings"]: df = pd.DataFrame(result["findings"]) df = df.rename(columns={ "index": "#", "entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "confidence": "Confidence" }) else: df = pd.DataFrame(columns=["#", "Entity type", "Text", "Start", "End", "Confidence"]) # Count detected entities by type if result["findings"]: entity_counts = df["Entity type"].value_counts().to_dict() entity_summary = ", ".join([f"{count} {entity}" for entity, count in entity_counts.items()]) else: entity_summary = "No entities detected" return result["protected_text"], df, entity_summary # Update input text based on model type def update_input_text(model_type): if model_type == "medical": return medical_example_text else: return example_text # Custom CSS for a minimalistic, clean design custom_css = """ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&family=Playfair+Display:wght@400;700&display=swap'); :root { --font-sans: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif; --font-serif: 'Playfair Display', Georgia, Cambria, 'Times New Roman', Times, serif; --color-primary: #2563eb; --color-primary-light: #3b82f6; --color-primary-dark: #1d4ed8; --color-secondary: #64748b; --color-secondary-light: #94a3b8; --color-background: #00000f; --color-surface: #f8fafc; --color-border: #e2e8f0; --color-text: #1e293b; --color-text-light: #64748b; --color-success: #10b981; --color-warning: #f59e0b; --color-error: #ef4444; --shadow-sm: 0 1px 2px 0 rgba(0, 0, 0, 0.05); --shadow: 0 1px 3px 0 rgba(0, 0, 0, 0.1), 0 1px 2px 0 rgba(0, 0, 0, 0.06); --shadow-md: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); --shadow-lg: 0 10px 15px -3px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05); --radius-sm: 0.25rem; --radius: 0.375rem; --radius-md: 0.5rem; --radius-lg: 0.75rem; --spacing-1: 0.25rem; --spacing-2: 0.5rem; --spacing-3: 0.75rem; --spacing-4: 1rem; --spacing-6: 1.5rem; --spacing-8: 2rem; --spacing-12: 3rem; } body, .gradio-container { font-family: var(--font-sans); color: var(--color-text); background-color: var(--color-background); line-height: 1.5; } /* Typography */ h1, h2, h3 { font-family: var(--font-serif); font-weight: 700; line-height: 1.2; margin-bottom: var(--spacing-4); } h1 { font-size: 2.25rem; color: var(--color-text-light); } h2 { font-size: 1.5rem; color: var(--color-text); } h3 { font-size: 1.25rem; color: var(--color-text); } p { margin-bottom: var(--spacing-4); } /* Layout Components */ .container { max-width: 1500px; margin: 0 auto; padding: var(--spacing-6); } .card { background-color: var(--color-surface); border-radius: var(--radius); box-shadow: var(--shadow); padding: var(--spacing-6); margin-bottom: var(--spacing-6); border: 1px solid var(--color-border); } /* Form Elements */ .gradio-button.primary { background-color: var(--color-secondary-light); color: white; font-weight: 500; border-radius: var(--radius); padding: var(--spacing-3) var(--spacing-6); transition: all 0.2s ease; border: none; box-shadow: var(--shadow); } .gradio-button.primary:hover { background-color: var(--color-secondary); box-shadow: var(--shadow-md); transform: translateY(-1px); } .gradio-button.primary:active { transform: translateY(0); } .gradio-dropdown, .gradio-textbox, .gradio-textarea { border-radius: var(--radius); border: 1px solid var(--color-border); padding: var(--spacing-3); background-color: var(--color-background); transition: border-color 0.2s ease; } .gradio-dropdown:focus, .gradio-textbox:focus, .gradio-textarea:focus { border-color: var(--color-primary-light); outline: none; box-shadow: 0 0 0 3px rgba(37, 99, 235, 0.1); } /* Tabs */ .gradio-tabs { margin-bottom: var(--spacing-6); } .gradio-tab-button { padding: var(--spacing-3) var(--spacing-6); font-weight: 500; color: var(--color-text-light); border-bottom: 2px solid transparent; transition: all 0.2s ease; } .gradio-tab-button.selected { color: var(--color-primary); border-bottom-color: var(--color-primary); } /* Accordion */ .gradio-accordion { border: 1px solid var(--color-border); border-radius: var(--radius); margin-bottom: var(--spacing-6); overflow: hidden; } .gradio-accordion-header { padding: var(--spacing-4); font-weight: 500; background-color: var(--color-surface); border-bottom: 1px solid var(--color-border); cursor: pointer; } .gradio-accordion-content { padding: var(--spacing-4); background-color: var(--color-background); } /* Table */ table { width: 100%; border-collapse: collapse; margin-bottom: var(--spacing-6); } th { background-color: var(--color-surface); padding: var(--spacing-3) var(--spacing-4); text-align: left; font-weight: 600; color: var(--color-text); border-bottom: 2px solid var(--color-border); } td { padding: var(--spacing-3) var(--spacing-4); border-bottom: 1px solid var(--color-border); } /* Dark mode support */ @media (prefers-color-scheme: dark) { :root { --color-background: #0f172a; --color-surface: #1e293b; --color-border: #334155; --color-text: #f8fafc; --color-text-light: #cbd5e1; } } /* Custom components */ .entity-badge { display: inline-block; padding: 0.25rem 0.5rem; border-radius: 9999px; font-size: 0.75rem; font-weight: 500; background-color: var(--color-primary-light); color: white; margin-right: 0.5rem; margin-bottom: 0.5rem; } .summary-container { background-color: var(--color-surface); border-radius: var(--radius); padding: var(--spacing-4); margin-bottom: var(--spacing-6); border: 1px solid var(--color-border); } .icon-text { display: flex; align-items: center; gap: var(--spacing-2); } .icon-text svg { width: 1.25rem; height: 1.25rem; color: var(--color-primary); } /* Responsive adjustments */ @media (max-width: 768px) { .container { padding: var(--spacing-4); } h1 { font-size: 1.75rem; } .card { padding: var(--spacing-4); } } """ # Create the Gradio interface with enhanced styling with gr.Blocks(css=custom_css, theme=gr.themes.Base()) as demo: # Header section with gr.Column(elem_classes="container"): gr.Markdown(""" # 🛡️ PII Protection Tool Detect, protect and de-identify personally identifiable information. """) # Main content area with gr.Column(elem_classes="card"): # Configuration section with gr.Row(): with gr.Column(scale=1): model_dropdown = gr.Dropdown( choices=[ ("General Purpose", "main"), ("Medical Context", "medical") ], value="main", label="Model Type", elem_classes="form-control" ) with gr.Column(scale=1): protection_dropdown = gr.Dropdown( choices=[ ("Replace with Tags", "replace"), ("Mask with Asterisks", "mask"), ("Generate Synthetic Data", "synthesize") ], value="replace", label="Protection Method", elem_classes="form-control" ) # Divider gr.Markdown("---") # Input/Output section with gr.Row(): # Input column with gr.Column(): gr.Markdown("### Input Text") input_text = gr.TextArea( label="", value=example_text, lines=10, elem_classes="text-input" ) # Output column with gr.Column(): gr.Markdown("### Protected Output") output_text = gr.TextArea( label="", lines=10, elem_classes="text-output" ) # Summary section with gr.Column(elem_classes="summary-container"): gr.Markdown("### Entity Summary") entity_summary = gr.Textbox( label="", interactive=False, elem_classes="entity-summary" ) # Action button submit_btn = gr.Button( "Process Text", variant="primary", elem_classes="submit-button" ) # Findings section with gr.Column(elem_classes="card"): gr.Markdown("### Detected Entities") findings_table = gr.DataFrame( headers=["#", "Entity type", "Text", "Start", "End", "Confidence"], elem_classes="findings-table" ) # Help section with gr.Accordion("Help & Information", open=False, elem_classes="help-accordion"): gr.Markdown(""" #### De-identification Methods - **Replace with Tags**: Replaces each detected entity with its entity type tag (e.g., [NAME]) - **Mask with Asterisks**: Replaces each detected entity with asterisks (*) - **Generate Synthetic Data**: Replaces each detected entity with realistic synthetic data #### Model Types - **General Purpose**: Optimized for common PII elements - **Medical Context**: Enhanced detection for healthcare-related PII #### Entity Types Detected - **Personal**: NAME, EMAIL, PHONENUMBER, DOB - **Financial**: CREDITCARDNUMBER, ACCOUNTNUMBER, PAN, IBAN_CODE, SSN - **Location**: ADDRESS, CITY, STATE, PINCODE, IPV4 - **Medical**: DOCTORNAME, PATIENTID, MEDICALID - **Other**: URL, PASSPORT, DRIVER_LICENSE """) # Set up event handlers submit_btn.click( fn=process_input, inputs=[input_text, model_dropdown, protection_dropdown], outputs=[output_text, findings_table, entity_summary] ) model_dropdown.change( fn=update_input_text, inputs=[model_dropdown], outputs=[input_text] ) # Launch the app if __name__ == "__main__": demo.launch()