import os
import torch
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from gliner import GLiNER
import spacy
import re

# Load spaCy English model
nlp = spacy.load("en_core_web_sm")
# Function to remove stop words
def preprocess_text(text):
    doc = nlp(text)
    filtered_words = [
        token.lemma_              # Convert to base form
        for token in doc
        if not token.is_stop      # Remove stop words
        and not token.is_punct    # Remove punctuation
        and not token.is_digit    # Remove numbers
        and len(token.text) > 2   # Drop very short tokens
    ]
    return " ".join(filtered_words)
# Define input/output directories
input_dir = "summaryoutput"               # Folder containing summarized documents
entity_output_dir = "extracted_entities"  # Folder to save extracted entities
os.makedirs(entity_output_dir, exist_ok=True)

# Load Named Entity Recognition (NER) model
device = "cuda" if torch.cuda.is_available() else "cpu"  # Use GPU if available
model = GLiNER.from_pretrained("urchade/gliner_medium-v2.1").to(device)

# Define labels for entity extraction
labels = [
    "person", "organization", "location", "date", "document", "event",
    "role", "cryptonym", "operation", "nationality", "contact",
    "SUBJECT", "REFERENCE", "FROM", "TO", "DATE", "REF", "INFO",
]

# Get already processed files
existing_entity_files = set(os.listdir(entity_output_dir))
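
# Illustrative only: a minimal direct call to the GLiNER model, assuming the
# gliner package's predict_entities API with its optional confidence threshold;
# the sample text and threshold value are assumptions for illustration.
#
#   spans = model.predict_entities("The source met officers in Mexico City.",
#                                  labels, threshold=0.5)
#   for span in spans:
#       print(span["text"], "->", span["label"])  # e.g. "Mexico City -> location"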
def extract_entities(text):
    entities = model.predict_entities(text, labels)
    extracted = {}

    # Header fields follow a fixed "LABEL: value" layout, so they are also
    # caught with regexes (e.g. "TO: some text")
    regex_patterns = {
        "TO": r"(?i)\bTO[:\s]+([^\n]+)",
        "FROM": r"(?i)\bFROM[:\s]+([^\n]+)",
        "DATE": r"(?i)\bDATE[:\s]+([^\n]+)",
        "REF": r"(?i)\bREF[:\s]+([^\n]+)",
        "SUBJECT": r"(?i)\bSUBJECT[:\s]+([^\n]+)",
    }
    for label, pattern in regex_patterns.items():
        matches = re.findall(pattern, text)
        if matches:
            # Clean up matches
            cleaned_matches = [m.strip().rstrip(')').lstrip(',') for m in matches]
            extracted[label] = list(set(cleaned_matches))  # Remove duplicates

    for entity in entities:
        entity_type = entity["label"]
        entity_text = entity["text"].strip().rstrip(')').lstrip(',')
        if entity_type not in extracted:
            extracted[entity_type] = []
        if entity_text not in extracted[entity_type]:  # Avoid duplicates
            extracted[entity_type].append(entity_text)
    return extracted
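
# Illustrative only: a rough sketch of what extract_entities might return for a
# short memo. The dict keys come from the regex patterns and GLiNER's "label"
# field (used above); the specific entities GLiNER finds vary by model version,
# so the values shown are assumptions.
#
#   extract_entities("TO: Director\nFROM: J. Smith\nSUBJECT: Site survey\n"
#                    "J. Smith met the source in Mexico City on 12 October 1963.")
#   # -> {
#   #      "TO": ["Director"],
#   #      "FROM": ["J. Smith"],
#   #      "SUBJECT": ["Site survey"],
#   #      "person": ["J. Smith"],
#   #      "location": ["Mexico City"],
#   #      "date": ["12 October 1963"],
#   #    }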
# Function to generate word cloud
def generate_word_cloud(text, output_filename):
    os.makedirs(os.path.dirname(output_filename), exist_ok=True)
    filtered_text = preprocess_text(text)
    wordcloud = WordCloud(width=800, height=400, background_color="white").generate(filtered_text)

    # Render and save the word cloud image (a single write; writing via both
    # wordcloud.to_file and plt.savefig would save the same file twice)
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.axis("off")
    plt.savefig(output_filename, bbox_inches="tight")
    plt.close()
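
# Illustrative only: calling generate_word_cloud directly on one summary file.
# The paths below are hypothetical examples, not files this script guarantees
# to exist.
#
#   with open("summaryoutput/example.md", "r", encoding="utf-8") as f:
#       generate_word_cloud(f.read(), "extracted_entities/wordcloud_example.png")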
# Process each document
def extract_entities_from_summaries():
    all_text = ""  # Store all text for a combined word cloud

    for filename in os.listdir(input_dir):
        if filename.endswith(".md"):  # Process Markdown files
            entity_file = f"entities_{filename}"
            word_cloud_file = f"wordcloud_{filename}.png"
            entity_file_path = os.path.join(entity_output_dir, entity_file)
            word_cloud_path = os.path.join(entity_output_dir, word_cloud_file)

            # Skip if entity file & word cloud already exist
            if entity_file in existing_entity_files and word_cloud_file in existing_entity_files:
                print(f"Skipping {filename}, already processed.")
                continue

            file_path = os.path.join(input_dir, filename)
            with open(file_path, "r", encoding="utf-8") as file:
                text = file.read()
            all_text += text + "\n\n"  # Collect text for a combined word cloud

            # Extract entities
            if entity_file not in existing_entity_files:
                entities = extract_entities(text)

                # Save extracted entities to a file
                with open(entity_file_path, "w", encoding="utf-8") as f:
                    for entity_type, entity_words in entities.items():
                        f.write(f"{entity_type}: ")
                        f.write(", ".join(entity_words) + "\n\n")
                print(f"Extracted entities saved for {filename} -> {entity_file_path}")

            # Generate a word cloud for the document
            if word_cloud_file not in existing_entity_files:
                generate_word_cloud(text, word_cloud_path)
                print(f"Word cloud saved for {filename} -> {word_cloud_path}")

    # Generate a word cloud for the entire dataset
    combined_word_cloud_path = os.path.join(entity_output_dir, "wordcloud_combined.png")
    if all_text.strip() and "wordcloud_combined.png" not in existing_entity_files:
        generate_word_cloud(all_text, combined_word_cloud_path)
        print(f"Combined word cloud saved -> {combined_word_cloud_path}")

    print("Entity extraction and word cloud generation completed!")
if __name__ == "__main__":
    extract_entities_from_summaries()
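
# Usage sketch (assuming this file is saved as extract_entities.py, a
# hypothetical name, and the summaries already exist as .md files under
# ./summaryoutput):
#
#   python extract_entities.py
#
# Re-runs skip any document whose entity file and word cloud are already
# present in ./extracted_entities.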