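# Repository page header captured together with the file (not part of the module):
#   author: milanchndr | commit message: "Update utils.py" | revision: bfbe579 (verified)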
import re
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from typing import List, Dict, Any
import spacy
# Load the large English spaCy pipeline when the module is imported. If loading
# fails, the error is printed and then re-raised, so the import itself fails.
try:
    nlp = spacy.load("en_core_web_lg")
except Exception as load_error:
    print(f"Error loading spacy model: {load_error}")
    raise
# Tags that end a line or a block in rendered HTML. Text on either side of one
# of these is visually separate, so the tag must leave a space behind.
_BLOCK_TAG_RE = re.compile(
    r"</?(?:address|article|aside|blockquote|br|dd|div|dl|dt|footer|form"
    r"|h[1-6]|header|hr|li|main|nav|ol|p|pre|section|table|tbody|td|tfoot"
    r"|th|thead|tr|ul)\b[^>]*>",
    re.IGNORECASE,
)
_ANY_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RUN_RE = re.compile(r"\s+")


def clean_text(text: str) -> str:
    """
    Remove HTML tags and normalize whitespace in the input text.

    Block-level and line-break tags (``<br>``, ``<p>``, ``<div>``, ``<td>`` ...)
    are replaced by a space so that text separated only by markup, such as
    ``"John<br>Doe"``, does not fuse into a single token. Every other tag is
    removed without a replacement, so inline markup inside a word or number
    (``"<b>98765</b>43210"``) still yields contiguous text.

    Args:
        text: Raw text that may contain HTML markup.

    Returns:
        The text without tags, with each run of whitespace collapsed to one
        space and no leading or trailing whitespace.
    """
    # Order matters: block tags must be turned into spaces before the generic
    # pass deletes whatever tags are left.
    spaced = _BLOCK_TAG_RE.sub(" ", text)
    untagged = _ANY_TAG_RE.sub("", spaced)
    return _WHITESPACE_RUN_RE.sub(" ", untagged).strip()
# Aadhar number: three space-separated groups of four digits.
# The look-arounds reject a hit that is only the head or the tail of a longer
# run of 4-digit groups (i.e. a 16-digit card number). Without them the first
# 12 digits of "1234 5678 9012 3456" match here with a higher score than the
# card pattern, and the last four card digits are left out of the entity.
AADHAR_REGEX = r"(?<!\d{4}[-\s])\b\d{4}\s\d{4}\s\d{4}\b(?![-\s]\d{4}\b)"

# Credit/debit card: 16 digits in groups of 4, optionally separated by "-" or whitespace.
CREDIT_CARD_REGEX = r"\b(?:\d{4}[-\s]?){3}\d{4}\b"

# Expiry date: MM/YY or MM/YYYY ("-" is accepted as separator too).
# The year must be exactly two or four digits; a single digit such as "12/5"
# is not an expiry date.
EXPIRY_REGEX = r"\b(0[1-9]|1[0-2])[/\-](\d{4}|\d{2})\b"

# Date of birth: DD-MM-YYYY or DD/MM/YYYY with a year starting with 19 or 20.
DOB_REGEX = r"\b(0[1-9]|[12][0-9]|3[01])[-/](0[1-9]|1[0-2])[-/](19|20)\d{2}\b"

# Phone number: optional "+<1-3 digit>" prefix followed by 3-3-4 digits with
# optional "-", "." or whitespace separators and optional parentheses around
# the first group.
PHONE_REGEX = r"(?:\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}"


def setup_presidio_analyzer():
    """
    Build an ``AnalyzerEngine`` with the project's custom pattern recognizers.

    One ``PatternRecognizer`` holding a single ``Pattern`` is registered for
    each of these entity types: ``CREDIT_DEBIT_NO``, ``AADHAR_NUM``,
    ``EXPIRY_NO``, ``DOB`` and ``PHONE_NUMBER``.

    Returns:
        The ``AnalyzerEngine`` with the five recognizers added to its registry.
    """
    analyzer = AnalyzerEngine()

    # (supported entity, pattern name, regex, score), in registration order.
    recognizer_specs = [
        ("CREDIT_DEBIT_NO", "CREDIT_CARD_PATTERN", CREDIT_CARD_REGEX, 0.85),
        ("AADHAR_NUM", "AADHAR_PATTERN", AADHAR_REGEX, 0.9),
        ("EXPIRY_NO", "EXPIRY_PATTERN", EXPIRY_REGEX, 0.8),
        ("DOB", "DOB_PATTERN", DOB_REGEX, 0.9),
        ("PHONE_NUMBER", "PHONE_PATTERN", PHONE_REGEX, 0.8),
    ]
    for supported_entity, pattern_name, regex, score in recognizer_specs:
        pattern = Pattern(name=pattern_name, regex=regex, score=score)
        analyzer.registry.add_recognizer(
            PatternRecognizer(supported_entity=supported_entity, patterns=[pattern])
        )
    return analyzer
# Context keywords, matched as lowercase substrings of the text around a date.
_DOB_KEYWORDS = ("born", "birth", "dob", "date of birth")
_EXPIRY_KEYWORDS = ("expiry", "exp", "expires", "valid until", "valid till")
# Number of characters inspected on each side of a date entity.
_DATE_CONTEXT_CHARS = 30


def _closest_keyword_gap(before: str, after: str, keywords) -> Any:
    """Return the smallest character gap between the entity and any keyword, or None."""
    closest = None
    for keyword in keywords:
        # Nearest occurrence in front of the entity is the last one in `before`.
        position = before.rfind(keyword)
        if position != -1:
            gap = len(before) - (position + len(keyword))
            if closest is None or gap < closest:
                closest = gap
        # Nearest occurrence behind the entity is the first one in `after`.
        position = after.find(keyword)
        if position != -1 and (closest is None or position < closest):
            closest = position
    return closest


def post_process_dates(text: str, entities: List[Dict]) -> List[Dict]:
    """
    Reclassify date entities based on the nearest context keyword.

    For every entity typed ``DOB`` or ``EXPIRY_NO`` the 30 characters before
    and after it are searched (case-insensitively) for birth-related and
    expiry-related keywords. The entity takes the type of whichever keyword
    group occurs closest to it; a tie goes to ``DOB``. Deciding by distance
    keeps a birth keyword further away from overriding an expiry keyword that
    sits right next to the date. With no keyword in range the type is unchanged.

    Args:
        text: The text the entity offsets refer to.
        entities: Entity dicts with ``entity_type``, ``start`` and ``end``
            keys. The dicts are updated in place.

    Returns:
        The same ``entities`` list.
    """
    for entity in entities:
        if entity["entity_type"] not in ("DOB", "EXPIRY_NO"):
            continue

        start, end = entity["start"], entity["end"]
        before = text[max(0, start - _DATE_CONTEXT_CHARS):start].lower()
        after = text[end:min(len(text), end + _DATE_CONTEXT_CHARS)].lower()

        dob_gap = _closest_keyword_gap(before, after, _DOB_KEYWORDS)
        expiry_gap = _closest_keyword_gap(before, after, _EXPIRY_KEYWORDS)

        if dob_gap is not None and (expiry_gap is None or dob_gap <= expiry_gap):
            entity["entity_type"] = "DOB"
        elif expiry_gap is not None:
            entity["entity_type"] = "EXPIRY_NO"
    return entities
def resolve_overlapping_entities(entities: List[Dict]) -> List[Dict]:
    """
    Remove overlapping entities, keeping the one with higher confidence.

    Entities are visited in order of their ``start`` offset. An entity that
    starts before the previously kept entity ends is treated as overlapping
    it; of the two, the one with the higher ``score`` is kept (a missing score
    counts as 0), and on equal scores the longer span wins. If both score and
    length are equal, the earlier-kept entity stays.

    Args:
        entities: Entity dicts with ``start`` and ``end`` keys and an optional
            ``score``. The list is not modified.

    Returns:
        A new list of non-overlapping entities ordered by ``start``.
    """
    resolved: List[Dict] = []
    # sorted() rather than list.sort(): the caller's list must keep its order.
    for current in sorted(entities, key=lambda item: item["start"]):
        if not resolved or current["start"] >= resolved[-1]["end"]:
            # Nothing kept yet, or no overlap with the last kept entity.
            resolved.append(current)
            continue

        last = resolved[-1]
        # Tuples compare by score first and by span length second.
        current_rank = (current.get("score", 0), current["end"] - current["start"])
        last_rank = (last.get("score", 0), last["end"] - last["start"])
        if current_rank > last_rank:
            resolved[-1] = current
    return resolved
# Phrases that typically precede a CVV number (regex fragments).
_CVV_KEYWORDS = (
    r"cvv",
    r"cvc",
    r"security\s+code",
    r"card\s+verification",
    r"verification\s+code",
    r"card\s+security\s+code",
    r"three\s+digit\s+code",
    r"four\s+digit\s+code",
)
# Keyword, optional separators, then exactly 3-4 digits. The (?!\d) guard stops
# the pattern from cutting the first four digits out of a longer number.
_CVV_KEYWORD_PATTERNS = tuple(
    re.compile(rf"{keyword}[\s:,\-]*(\d{{3,4}})(?!\d)", re.IGNORECASE)
    for keyword in _CVV_KEYWORDS
)
# Words that indicate the digits are something other than a CVV. They are
# matched as whole words (optionally plural) so that "message", "page" or
# "update" do not count as "age" / "date".
_CVV_FALSE_POSITIVE_RE = re.compile(
    r"\b(?:year|date|(?:tele)?phone|zip(?:code)?|postal|age|quantity|amount|price)s?\b"
)
_CARD_KEYWORD_RE = re.compile(r"\b(?:card|credit|debit|payment|expire|expiry|valid)\b")
_STANDALONE_DIGITS_RE = re.compile(r"\b(\d{3,4})\b")
# Context that rules out the keyword-less heuristic: a date, a dollar amount,
# or a 16-digit card number.
_NON_CVV_CONTEXT_PATTERNS = (
    re.compile(r"\d{4}[-/]\d{2}[-/]\d{2,4}"),
    re.compile(r"\$\d+"),
    re.compile(r"\d{4}\s*\d{4}\s*\d{4}\s*\d{4}"),
)


def detect_cvv_from_context(text: str) -> List[Dict]:
    """
    Detect CVV numbers based on context keywords and patterns.

    Two passes are made over ``text``:

    1. A CVV keyword ("cvv", "security code", ...) followed by optional
       separators and a 3-4 digit number. The hit is discarded when the
       20 characters around the match contain a word such as "year", "phone"
       or "price". Score 0.9.
    2. Any standalone 3-4 digit number whose surrounding 50 characters mention
       a card-related word and contain no date, dollar amount or 16-digit
       number. Score 0.7.

    A digit span is reported at most once, even if several keyword patterns
    or both passes find it.

    Args:
        text: The text to scan.

    Returns:
        A list of dicts with the keys ``entity_type`` (always ``"CVV_NO"``),
        ``start``, ``end``, ``entity`` (the digits), ``score`` and ``context``
        (the lowercased, stripped text window that was inspected).
    """
    cvv_entities: List[Dict] = []
    seen_spans = set()

    # Pass 1: digits that directly follow a CVV keyword.
    for pattern in _CVV_KEYWORD_PATTERNS:
        for match in pattern.finditer(text):
            span = match.span(1)
            if span in seen_spans:
                # e.g. "card security code" also matches "security code".
                continue
            window_start = max(0, match.start() - 20)
            window_end = min(len(text), match.end() + 20)
            context = text[window_start:window_end].lower()
            if _CVV_FALSE_POSITIVE_RE.search(context):
                continue
            seen_spans.add(span)
            cvv_entities.append({
                "entity_type": "CVV_NO",
                "start": span[0],
                "end": span[1],
                "entity": match.group(1),
                "score": 0.9,
                "context": context.strip(),
            })

    # Pass 2: standalone 3-4 digit numbers near card-related words.
    for digit_match in _STANDALONE_DIGITS_RE.finditer(text):
        span = digit_match.span(1)
        if span in seen_spans:
            continue
        window_start = max(0, span[0] - 50)
        window_end = min(len(text), span[1] + 50)
        context = text[window_start:window_end].lower()
        if not _CARD_KEYWORD_RE.search(context):
            continue
        if any(blocker.search(context) for blocker in _NON_CVV_CONTEXT_PATTERNS):
            continue
        seen_spans.add(span)
        cvv_entities.append({
            "entity_type": "CVV_NO",
            "start": span[0],
            "end": span[1],
            "entity": digit_match.group(1),
            "score": 0.7,
            "context": context.strip(),
        })
    return cvv_entities
# Entity types requested from the Presidio analyzer.
_ANALYZED_ENTITY_TYPES = (
    "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
    "DOB", "AADHAR_NUM", "CREDIT_DEBIT_NO",
    "EXPIRY_NO",
)

# Entity type -> classification label. The label is reported in the entity
# list and, wrapped in brackets, used as the replacement text.
_ENTITY_LABELS = {
    "PERSON": "full_name",
    "EMAIL_ADDRESS": "email",
    "PHONE_NUMBER": "phone_number",
    "DOB": "dob",
    "AADHAR_NUM": "aadhar_num",
    "CREDIT_DEBIT_NO": "credit_debit_no",
    "CVV_NO": "cvv_no",
    "EXPIRY_NO": "expiry_no",
}

# Engines are created on first use and shared by all later calls instead of
# being rebuilt for every text that is masked.
_PII_ENGINES: Dict[str, Any] = {}


def _get_pii_engines():
    """Return the shared (analyzer, anonymizer) pair, building it on first use."""
    if not _PII_ENGINES:
        # Build both before storing either, so a failure leaves the cache empty.
        analyzer = setup_presidio_analyzer()
        anonymizer = AnonymizerEngine()
        _PII_ENGINES.update(analyzer=analyzer, anonymizer=anonymizer)
    return _PII_ENGINES["analyzer"], _PII_ENGINES["anonymizer"]


def mask_pii(text: str) -> Dict[str, Any]:
    """
    Mask personally identifiable information in the given text.

    Steps: clean the text, run the analyzer, add context-based CVV hits,
    reclassify dates from their context, drop overlapping entities, and
    replace every remaining entity with its bracketed label (for example
    ``[email]``).

    Args:
        text: Raw input text; HTML tags and extra whitespace are removed first.

    Returns:
        A dict with two keys:
        ``"masked_email"``: the cleaned text with every entity replaced.
        ``"list_of_masked_entities"``: dicts with ``position``
        (``[start, end]`` offsets into the cleaned text), ``classification``
        and ``entity`` (the original text), sorted by start offset.
    """
    from presidio_analyzer import RecognizerResult

    analyzer, anonymizer = _get_pii_engines()

    # All offsets below refer to the cleaned text, not to the raw input.
    cleaned_text = clean_text(text)

    analyzer_results = analyzer.analyze(
        text=cleaned_text,
        entities=list(_ANALYZED_ENTITY_TYPES),
        language="en",
    )

    # Convert analyzer results to plain dicts so the helpers can work on them.
    entities = [
        {
            "entity_type": result.entity_type,
            "start": result.start,
            "end": result.end,
            "entity": cleaned_text[result.start:result.end],
            "score": result.score,
        }
        for result in analyzer_results
    ]

    # CVV numbers are found by a context heuristic, not by the analyzer.
    entities.extend(detect_cvv_from_context(cleaned_text))
    entities = post_process_dates(cleaned_text, entities)
    entities = resolve_overlapping_entities(entities)

    masked_entities = [
        {
            "position": [entity["start"], entity["end"]],
            "classification": _ENTITY_LABELS.get(
                entity["entity_type"], entity["entity_type"].lower()
            ),
            "entity": entity["entity"],
        }
        for entity in entities
    ]
    masked_entities.sort(key=lambda item: item["position"][0])

    # Rebuild analyzer results so the anonymizer works on the resolved spans.
    final_analyzer_results = [
        RecognizerResult(
            entity_type=entity["entity_type"],
            start=entity["start"],
            end=entity["end"],
            score=entity.get("score", 0.9),
        )
        for entity in entities
    ]

    operators = {
        entity_type: OperatorConfig("replace", {"new_value": f"[{label}]"})
        for entity_type, label in _ENTITY_LABELS.items()
    }
    anonymized = anonymizer.anonymize(
        text=cleaned_text,
        analyzer_results=final_analyzer_results,
        operators=operators,
    )

    return {
        "masked_email": anonymized.text,
        "list_of_masked_entities": masked_entities,
    }