"""Detect and mask personally identifiable information (PII) in free text.

The public entry point is :func:`mask_pii`. It strips HTML tags, runs a
Presidio analyzer extended with custom pattern recognizers, adds
context-based CVV detection, resolves overlapping findings and finally
replaces every finding with a ``[label]`` placeholder.
"""

import re
from functools import lru_cache
from typing import Any, Dict, List

import spacy
from presidio_analyzer import (
    AnalyzerEngine,
    Pattern,
    PatternRecognizer,
    RecognizerResult,
)
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

# Load the spaCy model at import time; report the problem and re-raise so the
# module cannot be imported without it.
try:
    nlp = spacy.load("en_core_web_lg")
except Exception as e:
    print(f"Error loading spacy model: {e}")
    raise


# --- Text clean-up -----------------------------------------------------------

_HTML_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")

# --- Custom recognizers ------------------------------------------------------

# (supported_entity, pattern name, regex, score), in registration order.
_CUSTOM_RECOGNIZERS = (
    # Credit/debit card: 16 digits in groups of 4, optional "-"/space between.
    (
        "CREDIT_DEBIT_NO",
        "CREDIT_CARD_PATTERN",
        r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
        0.85,
    ),
    # Aadhaar: 12 digits written as 4-4-4. The lookarounds reject a 4-4-4 run
    # that is really part of a 4-4-4-4 card number; without them this
    # higher-scored pattern wins the overlap and the card's last four digits
    # stay unmasked.
    (
        "AADHAR_NUM",
        "AADHAR_PATTERN",
        r"(?<!\b\d{4}[-\s])\b\d{4}\s\d{4}\s\d{4}\b(?![-\s]\d{4}\b)",
        0.9,
    ),
    # Expiry date: MM/YY or MM/YYYY.
    (
        "EXPIRY_NO",
        "EXPIRY_PATTERN",
        r"\b(0[1-9]|1[0-2])[/\-](0?[0-9]|[0-9]{2}|[0-9]{4})\b",
        0.8,
    ),
    # Date of birth: DD-MM-YYYY or DD/MM/YYYY.
    (
        "DOB",
        "DOB_PATTERN",
        r"\b(0[1-9]|[12][0-9]|3[01])[-/](0[1-9]|1[0-2])[-/](19|20)\d{2}\b",
        0.9,
    ),
    # Phone number with optional country code and common separators.
    (
        "PHONE_NUMBER",
        "PHONE_PATTERN",
        r"(?:\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}",
        0.8,
    ),
)

# --- Date reclassification ---------------------------------------------------

_DATE_ENTITY_TYPES = ("DOB", "EXPIRY_NO")
_DATE_CONTEXT_CHARS = 30

# Both patterns are applied to lower-cased text. Keywords are matched as
# words (or word stems) rather than raw substrings, so "experience" or
# "stubborn" no longer count as expiry / birth hints.
_DOB_CONTEXT_RE = re.compile(
    r"(?<![a-z])(?:born(?![a-z])|birth|dob(?![a-z]))"
)
_EXPIRY_CONTEXT_RE = re.compile(
    r"(?<![a-z])(?:expir[a-z]*|exp(?![a-z])"
    r"|valid\s+(?:until|till|thru|through)(?![a-z]))"
)

# --- CVV detection -----------------------------------------------------------

# Keywords that typically precede a CVV (regex fragments).
_CVV_KEYWORDS = (
    r"cvv",
    r"cvc",
    r"security\s+code",
    r"card\s+verification",
    r"verification\s+code",
    r"card\s+security\s+code",
    r"three\s+digit\s+code",
    r"four\s+digit\s+code",
)
# Keyword, optional separators, then exactly 3-4 digits. "(?!\d)" stops the
# capture from being the prefix of a longer number ("cvv 123456").
_CVV_KEYWORD_RES = tuple(
    re.compile(rf"(?i){keyword}[\s:,\-]*(\d{{3,4}})(?!\d)")
    for keyword in _CVV_KEYWORDS
)
# Words near a keyword hit that suggest the digits are something else.
# Matched as whole words (optional plural) on lower-cased text, so "message"
# or "update" do not trigger "age" / "date".
_CVV_FALSE_POSITIVE_RE = re.compile(
    r"(?<![a-z])"
    r"(?:year|date|phone|zip|postal|age|quantity|amount|price)s?"
    r"(?![a-z])"
)
_CVV_KEYWORD_CONTEXT_CHARS = 20

# Card-related words that make a nearby standalone 3-4 digit number suspicious.
_CARD_CONTEXT_RE = re.compile(
    r"\b(?:card|credit|debit|payment|expire|expiry|valid)\b"
)
_STANDALONE_DIGITS_RE = re.compile(r"\b(\d{3,4})\b")
_YMD_DATE_RE = re.compile(r"\d{4}[-/]\d{2}[-/]\d{2,4}")
_DOLLAR_AMOUNT_RE = re.compile(r"\$\d+")
_CARD_NUMBER_RE = re.compile(r"\d{4}\s*\d{4}\s*\d{4}\s*\d{4}")
_CVV_DIGIT_CONTEXT_CHARS = 50

# --- Masking -----------------------------------------------------------------

# Entity types requested from the Presidio analyzer. CVV_NO is absent because
# it is produced by detect_cvv_from_context, not by a recognizer.
_ANALYZER_ENTITIES = (
    "PERSON",
    "EMAIL_ADDRESS",
    "PHONE_NUMBER",
    "DOB",
    "AADHAR_NUM",
    "CREDIT_DEBIT_NO",
    "EXPIRY_NO",
)

# Internal entity type -> label used in the output and in "[label]" masks.
_ENTITY_LABELS = {
    "PERSON": "full_name",
    "EMAIL_ADDRESS": "email",
    "PHONE_NUMBER": "phone_number",
    "DOB": "dob",
    "AADHAR_NUM": "aadhar_num",
    "CREDIT_DEBIT_NO": "credit_debit_no",
    "CVV_NO": "cvv_no",
    "EXPIRY_NO": "expiry_no",
}


def clean_text(text: str) -> str:
    """Remove HTML tags and normalize whitespace in the input text.

    Each tag is replaced by a space (not by nothing) so that tokens separated
    only by markup, e.g. ``a@b.com<br>9876543210``, do not get glued together.
    Runs of whitespace are then collapsed to a single space and the result is
    stripped.

    Args:
        text: Raw input, possibly containing HTML tags.

    Returns:
        The tag-free, whitespace-normalized text.
    """
    without_tags = _HTML_TAG_RE.sub(" ", text)
    return _WHITESPACE_RE.sub(" ", without_tags).strip()


def setup_presidio_analyzer():
    """Create an ``AnalyzerEngine`` with the custom pattern recognizers added.

    Recognizers for credit/debit card numbers, Aadhaar numbers, card expiry
    dates, dates of birth and phone numbers are registered, in that order, on
    top of whatever the default engine provides.

    Returns:
        A new ``AnalyzerEngine`` instance.
    """
    analyzer = AnalyzerEngine()
    for entity_type, pattern_name, regex, score in _CUSTOM_RECOGNIZERS:
        analyzer.registry.add_recognizer(
            PatternRecognizer(
                supported_entity=entity_type,
                patterns=[Pattern(name=pattern_name, regex=regex, score=score)],
            )
        )
    return analyzer


def post_process_dates(text: str, entities: List[Dict]) -> List[Dict]:
    """Reclassify date entities based on nearby context keywords.

    For every ``DOB`` / ``EXPIRY_NO`` entity, the 30 characters on either side
    are inspected. Birth-related words force ``DOB``; otherwise expiry-related
    words force ``EXPIRY_NO``; otherwise the type is left as detected.

    Args:
        text: The text the entity offsets refer to.
        entities: Entity dicts with ``entity_type``, ``start`` and ``end``.

    Returns:
        The same list object; matching entities are updated in place.
    """
    for entity in entities:
        if entity["entity_type"] not in _DATE_ENTITY_TYPES:
            continue
        window_start = max(0, entity["start"] - _DATE_CONTEXT_CHARS)
        window_end = min(len(text), entity["end"] + _DATE_CONTEXT_CHARS)
        snippet = text[window_start:window_end].lower()

        # Birth keywords take precedence when both kinds are present.
        if _DOB_CONTEXT_RE.search(snippet):
            entity["entity_type"] = "DOB"
        elif _EXPIRY_CONTEXT_RE.search(snippet):
            entity["entity_type"] = "EXPIRY_NO"
    return entities


def resolve_overlapping_entities(entities: List[Dict]) -> List[Dict]:
    """Remove overlapping entities, keeping the one with higher confidence.

    Entities are processed in order of ``start``. When an entity overlaps the
    most recently kept one, the higher ``score`` wins; on equal scores the
    longer span wins; on a full tie the earlier entity is kept.

    Args:
        entities: Entity dicts with ``start``, ``end`` and optional ``score``.

    Returns:
        A new list sorted by ``start`` with no overlapping spans. The input
        list is not reordered. A falsy input is returned unchanged.
    """
    if not entities:
        return entities

    resolved: List[Dict] = []
    # sorted() rather than list.sort(): do not reorder the caller's list.
    for current in sorted(entities, key=lambda item: item["start"]):
        if not resolved or current["start"] >= resolved[-1]["end"]:
            resolved.append(current)
            continue

        last = resolved[-1]
        current_rank = (
            current.get("score", 0),
            current["end"] - current["start"],
        )
        last_rank = (last.get("score", 0), last["end"] - last["start"])
        # Tuple comparison: score first, span length as the tie-breaker.
        if current_rank > last_rank:
            resolved[-1] = current
    return resolved


def detect_cvv_from_context(text: str) -> List[Dict]:
    """Detect CVV numbers based on context keywords and patterns.

    Two passes are made over ``text``:

    1. 3-4 digits directly following a CVV keyword ("cvv", "security code",
       ...) are reported with score 0.9, unless a false-positive word
       ("year", "price", ...) appears within 20 characters of the match.
    2. Any other standalone 3-4 digit number within 50 characters of a
       card-related word is reported with score 0.7, unless that window also
       contains a YYYY-MM-DD style date, a dollar amount or a 16-digit number.

    Each digit span is reported at most once.

    Args:
        text: The text to scan.

    Returns:
        A list of dicts with keys ``entity_type`` (always ``"CVV_NO"``),
        ``start``, ``end``, ``entity``, ``score`` and ``context``.
    """
    cvv_entities: List[Dict] = []
    seen_spans = set()
    text_length = len(text)

    # Pass 1: digits explicitly labelled by a CVV keyword.
    for keyword_re in _CVV_KEYWORD_RES:
        for match in keyword_re.finditer(text):
            span = match.span(1)
            # Overlapping keywords ("security code" / "card security code")
            # can hit the same digits; report them only once.
            if span in seen_spans:
                continue
            window_start = max(0, match.start() - _CVV_KEYWORD_CONTEXT_CHARS)
            window_end = min(
                text_length, match.end() + _CVV_KEYWORD_CONTEXT_CHARS
            )
            context = text[window_start:window_end].lower()
            if _CVV_FALSE_POSITIVE_RE.search(context):
                continue
            seen_spans.add(span)
            cvv_entities.append({
                "entity_type": "CVV_NO",
                "start": span[0],
                "end": span[1],
                "entity": match.group(1),
                "score": 0.9,
                "context": context.strip(),
            })

    # Pass 2: unlabelled 3-4 digit numbers close to card-related words.
    for digit_match in _STANDALONE_DIGITS_RE.finditer(text):
        span = digit_match.span(1)
        if span in seen_spans:
            continue
        window_start = max(0, span[0] - _CVV_DIGIT_CONTEXT_CHARS)
        window_end = min(text_length, span[1] + _CVV_DIGIT_CONTEXT_CHARS)
        context = text[window_start:window_end].lower()

        if not _CARD_CONTEXT_RE.search(context):
            continue
        # Skip windows that look like they hold a date, a price or a full
        # card number rather than a CVV.
        if (
            _YMD_DATE_RE.search(context)
            or _DOLLAR_AMOUNT_RE.search(context)
            or _CARD_NUMBER_RE.search(context)
        ):
            continue
        seen_spans.add(span)
        cvv_entities.append({
            "entity_type": "CVV_NO",
            "start": span[0],
            "end": span[1],
            "entity": digit_match.group(1),
            "score": 0.7,
            "context": context.strip(),
        })

    return cvv_entities


@lru_cache(maxsize=1)
def _get_analyzer() -> AnalyzerEngine:
    """Return one shared analyzer so mask_pii does not rebuild it per call."""
    return setup_presidio_analyzer()


@lru_cache(maxsize=1)
def _get_anonymizer() -> AnonymizerEngine:
    """Return one shared anonymizer so mask_pii does not rebuild it per call."""
    return AnonymizerEngine()


def mask_pii(text: str) -> Dict[str, Any]:
    """Mask personally identifiable information in the given text.

    The text is first passed through :func:`clean_text`; all reported
    positions refer to that cleaned text, not to the raw input.

    Args:
        text: Raw input text, possibly containing HTML.

    Returns:
        A dict with two keys:

        * ``"masked_email"``: the cleaned text with every detected entity
          replaced by ``[label]`` (e.g. ``[full_name]``, ``[cvv_no]``).
        * ``"list_of_masked_entities"``: one dict per entity, sorted by start
          offset, with ``position`` (``[start, end]``), ``classification``
          (the label) and ``entity`` (the original substring).
    """
    cleaned_text = clean_text(text)
    if not cleaned_text:
        # Nothing to analyze; skip engine construction entirely.
        return {"masked_email": cleaned_text, "list_of_masked_entities": []}

    analyzer_results = _get_analyzer().analyze(
        text=cleaned_text,
        entities=list(_ANALYZER_ENTITIES),
        language="en",
    )

    # Convert analyzer results to plain dicts so they can be mixed with the
    # context-based CVV findings.
    entities = [
        {
            "entity_type": result.entity_type,
            "start": result.start,
            "end": result.end,
            "entity": cleaned_text[result.start:result.end],
            "score": result.score,
        }
        for result in analyzer_results
    ]
    entities.extend(detect_cvv_from_context(cleaned_text))
    entities = post_process_dates(cleaned_text, entities)
    entities = resolve_overlapping_entities(entities)

    masked_entities = [
        {
            "position": [entity["start"], entity["end"]],
            "classification": _ENTITY_LABELS.get(
                entity["entity_type"], entity["entity_type"].lower()
            ),
            "entity": entity["entity"],
        }
        for entity in entities
    ]
    masked_entities.sort(key=lambda item: item["position"][0])

    # Rebuild RecognizerResult objects from the resolved entities so the
    # anonymizer masks exactly the spans reported above.
    final_analyzer_results = [
        RecognizerResult(
            entity_type=entity["entity_type"],
            start=entity["start"],
            end=entity["end"],
            score=entity.get("score", 0.9),
        )
        for entity in entities
    ]
    # One "replace" operator per known entity type, derived from the label
    # table so the mask text always matches the reported classification.
    operators = {
        entity_type: OperatorConfig("replace", {"new_value": f"[{label}]"})
        for entity_type, label in _ENTITY_LABELS.items()
    }
    anonymized = _get_anonymizer().anonymize(
        text=cleaned_text,
        analyzer_results=final_analyzer_results,
        operators=operators,
    )

    return {
        "masked_email": anonymized.text,
        "list_of_masked_entities": masked_entities,
    }