"""Streamlit app: Scandinavian named-entity recognition over uploaded JSON files.

Flow of one script run:

1. Restore the persistent request counter and upload history into session state.
2. Let the user upload a JSON file, flatten it to text, and keep only a
   Fernet-encrypted copy of that text in session state.
3. On "Results", decrypt the text, run the NER pipeline, and render the
   entities as tables and charts plus a downloadable zip. Runs are optionally
   logged to Comet ML when the COMET_* environment variables are set.
"""

import io
import json
import os
import re
import time
import zipfile

# Third-party imports are kept in their original relative order on purpose:
# some of these libraries have import-time side effects.
import streamlit as st
import pandas as pd
from transformers import pipeline
from streamlit_extras.stylable_container import stylable_container
import plotly.express as px
from comet_ml import Experiment
import numpy as np
from cryptography.fernet import Fernet

st.set_page_config(layout="wide", page_title="Named Entity Recognition App")

# --- Configuration ---
COMET_API_KEY = os.environ.get("COMET_API_KEY")
COMET_WORKSPACE = os.environ.get("COMET_WORKSPACE")
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME")

# Comet logging is enabled only when all three settings are present.
comet_initialized = False
if COMET_API_KEY and COMET_WORKSPACE and COMET_PROJECT_NAME:
    comet_initialized = True

# --- Persistent Counter and History Configuration ---
COUNTER_FILE = "counter_json_finder.json"
HISTORY_FILE = "file_history_json_finder.json"
max_attempts = 300


# --- Functions to manage persistent data ---
def load_attempts() -> int:
    """Load the request counter from COUNTER_FILE.

    Returns:
        The stored count, or 0 when the file is missing, unreadable, not valid
        JSON, not a JSON object, or holds a non-integer value.
    """
    try:
        with open(COUNTER_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        attempts = data.get('file_upload_attempts', 0)
    except (OSError, ValueError, AttributeError):
        # OSError: missing/unreadable file; ValueError: bad JSON or encoding;
        # AttributeError: top-level JSON value is not an object.
        return 0
    return attempts if isinstance(attempts, int) else 0


def save_attempts(attempts: int) -> None:
    """Write the current request counter to COUNTER_FILE."""
    with open(COUNTER_FILE, "w", encoding="utf-8") as f:
        json.dump({'file_upload_attempts': attempts}, f)


def load_history() -> list:
    """Load the upload history from HISTORY_FILE.

    Returns:
        The stored list of history entries, or an empty list when the file is
        missing, unreadable, not valid JSON, not a JSON object, or the stored
        value is not a list.
    """
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        history = data.get('uploaded_files', [])
    except (OSError, ValueError, AttributeError):
        return []
    return history if isinstance(history, list) else []


def save_history(history: list) -> None:
    """Write the upload history to HISTORY_FILE."""
    with open(HISTORY_FILE, "w", encoding="utf-8") as f:
        json.dump({'uploaded_files': history}, f)


def clear_history_data() -> None:
    """Delete HISTORY_FILE, empty the in-session history, and rerun the script."""
    try:
        os.remove(HISTORY_FILE)
    except FileNotFoundError:
        # Nothing to delete; the history is already gone on disk.
        pass
    st.session_state['uploaded_files_history'] = []
    st.rerun()


# --- Initialize session state with persistent data ---
if 'file_upload_attempts' not in st.session_state:
    st.session_state['file_upload_attempts'] = load_attempts()
    # Save to ensure the file exists on first run
    save_attempts(st.session_state['file_upload_attempts'])

if 'uploaded_files_history' not in st.session_state:
    st.session_state['uploaded_files_history'] = load_history()
    # Save to ensure the file exists on first run
    save_history(st.session_state['uploaded_files_history'])

if 'encrypted_extracted_text' not in st.session_state:
    st.session_state['encrypted_extracted_text'] = None

if 'json_dataframe' not in st.session_state:
    st.session_state['json_dataframe'] = None

# (name, size) of the upload most recently written to the history. The script
# reruns on every widget interaction, so without this marker the same file
# would be appended to the history on each rerun.
if 'last_logged_upload' not in st.session_state:
    st.session_state['last_logged_upload'] = None

# Define the categories and their associated entity labels
ENTITY_LABELS_CATEGORIZED = {
    "Persons": ["PER"],
    "Locations": ["LOC"],
    "Organizations": ["ORG"],
    "Miscellaneous": ["MISC"],
}

# Create a mapping from each specific entity label to its category
LABEL_TO_CATEGORY_MAP = {
    label: category
    for category, labels in ENTITY_LABELS_CATEGORIZED.items()
    for label in labels
}


@st.cache_resource
def load_ner_model():
    """Load and cache the "saattrupdan/nbailab-base-ner-scandi" NER pipeline.

    On any loading failure an error is shown and the script run is stopped.
    """
    try:
        return pipeline(
            "token-classification",
            model="saattrupdan/nbailab-base-ner-scandi",
            aggregation_strategy="max",
            ignore_labels=["O"],
            stride=128,
        )
    except Exception as e:
        st.error(f"Failed to load NER model. Please check your internet connection or model availability: {e}")
        st.stop()


@st.cache_resource
def load_encryption_key() -> Fernet:
    """Build the Fernet cipher from the FERNET_KEY environment variable.

    The result is cached as a resource. If the variable is unset or the key is
    rejected, an error is shown and the script run is stopped, because the app
    cannot store extracted text without encryption.
    """
    try:
        # Get the key string from environment variables
        key_str = os.environ.get("FERNET_KEY")
        if not key_str:
            raise ValueError("FERNET_KEY environment variable not set. Cannot perform encryption/decryption.")

        # Fernet key must be bytes, so encode the string
        key_bytes = key_str.encode('utf-8')
        return Fernet(key_bytes)
    except ValueError as ve:
        st.error(
            f"Configuration Error: {ve}. Please ensure the 'FERNET_KEY' environment variable is set securely "
            "in your deployment environment (e.g., Hugging Face Spaces secrets, Render environment variables) "
            "or in a local .env file for development."
        )
        st.stop()
    except Exception as e:
        st.error(f"An unexpected error occurred while loading encryption key: {e}. Please check your key format and environment settings.")
        st.stop()


# Initialize the Fernet cipher instance globally (cached)
fernet = load_encryption_key()


def encrypt_text(text_content: str) -> bytes:
    """Encrypt *text_content* (encoded as UTF-8) with the module-level cipher."""
    return fernet.encrypt(text_content.encode('utf-8'))


def decrypt_text(encrypted_bytes: bytes) -> str | None:
    """Decrypt *encrypted_bytes* with the module-level cipher.

    Returns:
        The decrypted UTF-8 string, or None (after showing an error) when
        decryption or decoding fails.
    """
    try:
        return fernet.decrypt(encrypted_bytes).decode('utf-8')
    except Exception as e:
        st.error(f"Decryption failed. This might indicate data tampering or an incorrect encryption key. Error: {e}")
        return None


def _strings_from_mapping(mapping: dict) -> list[str]:
    """Return the string values of *mapping* plus strings inside its list values."""
    collected = []
    for value in mapping.values():
        if isinstance(value, str):
            collected.append(value)
        elif isinstance(value, list):
            collected.extend(sub_val for sub_val in value if isinstance(sub_val, str))
    return collected


def _extract_strings(payload) -> list[str]:
    """Collect strings from parsed JSON that could not be turned into a DataFrame.

    A dict contributes its string values and the strings in its list values.
    A list contributes its string items and, for dict items, the same values
    as above. Anything nested deeper, and any other top-level type, is ignored.
    """
    if isinstance(payload, dict):
        return _strings_from_mapping(payload)
    if isinstance(payload, list):
        texts = []
        for item in payload:
            if isinstance(item, str):
                texts.append(item)
            elif isinstance(item, dict):
                texts.extend(_strings_from_mapping(item))
        return texts
    return []


# --- UI Elements ---
st.subheader("Scandinavian JSON Entity Finder", divider="orange")
st.link_button("by nlpblogs", "https://nlpblogs.com", type="tertiary")

expander = st.expander("**Important notes on the Scandinavian JSON Entity Finder**")
expander.write(
    " **Named Entities:** This Scandinavian JSON Entity Finder predicts four (4) labels "
    "(“PER: person”, “LOC: location”, “ORG: organization”, “MISC: miscellaneous”). "
    "Results are presented in an easy-to-read table, visualized in an interactive tree map, "
    "pie chart, and bar chart, and are available for download along with a Glossary of tags. "
    "**How to Use:** Upload your JSON file. Then, click the 'Results' button to extract and "
    "tag entities in your text data. "
    "**Usage Limits:** You can request results up to 300 times within a 30-day period. "
    "**Language settings:** Please check and adjust the language settings in your computer, "
    "so the Danish, Swedish, Norwegian, Icelandic and Faroese characters are handled properly "
    "in your downloaded file. "
    "**Customization:** To change the app's background color to white or black, click the "
    "three-dot menu on the right-hand side of your app, go to Settings and then Choose app "
    "theme, colors and fonts. "
    "**Technical issues:** If your connection times out, please refresh the page or reopen "
    "the app's URL. For any errors or inquiries, please contact us at info@nlpblogs.com "
)

with st.sidebar:
    # --- Added Persistent History Display ---
    st.subheader("Your File Upload History", divider="orange")
    if st.session_state['uploaded_files_history']:
        history_to_display = st.session_state['uploaded_files_history']
        history_df = pd.DataFrame(history_to_display)
        st.dataframe(history_df, use_container_width=True, hide_index=True)
        # Add a clear history button
        if st.button("Clear File History", help="This will permanently delete the file history from the application."):
            clear_history_data()
    else:
        st.info("You have not uploaded any files yet.")

    st.subheader("Build your own NER Web App in a minute without writing a single line of code.", divider="orange")
    st.link_button("NER File Builder", "https://nlpblogs.com/shop/named-entity-recognition-ner/ner-file-builder/", type="primary")

uploaded_file = st.file_uploader("Choose a JSON file", type=["json"])

# Text extracted during this run; stays None when nothing usable was uploaded.
current_run_text = None

if uploaded_file is None:
    # Forget the marker so re-uploading the same file later is logged again.
    st.session_state['last_logged_upload'] = None
else:
    try:
        # Make sure parsing starts at the beginning of the uploaded buffer.
        uploaded_file.seek(0)
        dados = json.load(uploaded_file)

        # Attempt to convert JSON to DataFrame and extract text
        try:
            st.session_state['json_dataframe'] = pd.DataFrame(dados)

            # Concatenate all content into a single string for NER
            df_string_representation = st.session_state['json_dataframe'].to_string(index=False, header=False)
            # Simple regex to remove non-alphanumeric characters but keep spaces and periods
            text_content = re.sub(r'[^\w\s.]', '', df_string_representation)
            # Remove the specific string "Empty DataFrame Columns" if it appears due to conversion
            text_content = text_content.replace("Empty DataFrame Columns", "").strip()
            current_run_text = text_content

            if not current_run_text.strip():
                st.warning("No meaningful text could be extracted from the JSON DataFrame for analysis.")
                current_run_text = None
        except ValueError:
            # The JSON shape is not tabular: drop any DataFrame left over from
            # a previous upload and fall back to collecting raw strings.
            st.session_state['json_dataframe'] = None
            st.info("JSON data could not be directly converted to a simple DataFrame for display. Attempting to extract text directly.")
            extracted_texts_list = _extract_strings(dados)

            if extracted_texts_list:
                current_run_text = " ".join(extracted_texts_list).strip()
            else:
                st.warning("No string text could be extracted from the JSON for analysis.")
                current_run_text = None

        if current_run_text:
            # --- ADDING TO UPLOAD HISTORY ---
            # Only log an upload the first time it is seen; this branch runs
            # again on every rerun while the file stays in the uploader.
            upload_signature = (uploaded_file.name, uploaded_file.size)
            if st.session_state['last_logged_upload'] != upload_signature:
                new_upload_entry = {
                    "filename": uploaded_file.name,
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                }
                st.session_state['uploaded_files_history'].append(new_upload_entry)
                save_history(st.session_state['uploaded_files_history'])
                st.session_state['last_logged_upload'] = upload_signature
            # --- END OF HISTORY ADDITION ---

            # --- ENCRYPT THE EXTRACTED TEXT BEFORE STORING IN SESSION STATE ---
            encrypted_text_bytes = encrypt_text(current_run_text)
            st.session_state['encrypted_extracted_text'] = encrypted_text_bytes

            st.success("JSON file uploaded successfully. File content encrypted and secured. Due to security protocols, the file content is hidden.")
            st.divider()
        else:
            st.session_state['encrypted_extracted_text'] = None
            st.error("Could not extract meaningful text from the uploaded JSON file.")

    except json.JSONDecodeError as e:
        st.error(f"JSON Decode Error: {e}")
        st.error("Please ensure the uploaded file contains valid JSON data.")
        st.session_state['encrypted_extracted_text'] = None
        st.session_state['json_dataframe'] = None
    except Exception as e:
        # Top-level boundary for upload handling: report and reset state.
        st.error(f"An unexpected error occurred during file processing: {e}")
        st.session_state['encrypted_extracted_text'] = None
        st.session_state['json_dataframe'] = None

# --- Results Button and Processing Logic ---
if st.button("Results"):
    start_time_overall = time.time()

    if not comet_initialized:
        st.warning("Comet ML not initialized. Check environment variables if you wish to log data.")

    # Check attempts limit BEFORE running the model.
    # NOTE(review): this message says "daily" while the notes above say
    # "30-day period", and nothing visible here ever resets the counter --
    # confirm the intended policy.
    if st.session_state['file_upload_attempts'] >= max_attempts:
        st.error(f"You have requested results {max_attempts} times. You have reached your daily request limit.")
        st.stop()

    # --- DECRYPT THE TEXT BEFORE PASSING TO NER MODEL ---
    text_for_ner = None
    if st.session_state['encrypted_extracted_text'] is not None:
        text_for_ner = decrypt_text(st.session_state['encrypted_extracted_text'])

    if text_for_ner is None or not text_for_ner.strip():
        st.warning("No extractable text content available for analysis. Please upload a valid JSON file.")
        st.stop()

    # Increment the attempts counter and save it to the persistent file
    st.session_state['file_upload_attempts'] += 1
    save_attempts(st.session_state['file_upload_attempts'])

    with st.spinner("Analyzing text...", show_time=True):
        model = load_ner_model()

        # Measure NER model processing time
        start_time_ner = time.time()
        text_entities = model(text_for_ner)
        end_time_ner = time.time()
        ner_processing_time = end_time_ner - start_time_ner

        # An empty result would otherwise build a column-less DataFrame and be
        # reported below as a missing 'word' column.
        if not text_entities:
            st.warning("No entities were extracted from the uploaded text.")
            st.stop()

        df = pd.DataFrame(text_entities)

        if 'word' in df.columns:
            # Ensure 'word' column is string type before applying regex
            if df['word'].dtype == 'object':
                # Strip everything that is not a word character or whitespace
                # (punctuation, periods included).
                pattern = r'[^\w\s]'
                df['word'] = df['word'].astype(str).replace(pattern, '', regex=True)
            else:
                st.warning("The 'word' column is not of string type; skipping character cleaning.")
        else:
            st.error("The 'word' column does not exist in the DataFrame. Cannot perform cleaning.")
            st.stop()

        # Replace empty strings with 'Unknown' and drop rows with NaN after cleaning
        df = df.replace('', 'Unknown').dropna()

        if df.empty:
            st.warning("No entities were extracted from the uploaded text.")
            st.stop()

        # --- Add 'category' column to the DataFrame based on the grouped labels ---
        df['category'] = df['entity_group'].map(LABEL_TO_CATEGORY_MAP)
        # Handle cases where an entity_group might not have a category
        df['category'] = df['category'].fillna('Uncategorized')

    experiment = None
    if comet_initialized:
        experiment = Experiment(
            api_key=COMET_API_KEY,
            workspace=COMET_WORKSPACE,
            project_name=COMET_PROJECT_NAME,
        )

    try:
        if experiment is not None:
            experiment.log_parameter("input_text_length", len(text_for_ner))
            experiment.log_table("predicted_entities", df)
            experiment.log_metric("ner_processing_time_seconds", ner_processing_time)

        # --- Display Results ---
        st.subheader("Extracted Entities", divider="rainbow")
        properties = {"border": "2px solid gray", "color": "blue", "font-size": "16px"}
        df_styled = df.style.set_properties(**properties)
        st.dataframe(df_styled, use_container_width=True)

        with st.expander("See Glossary of tags"):
            st.write(
                " '**word**': ['entity extracted from your text data'] "
                "'**score**': ['accuracy score; how accurately a tag has been assigned to a given entity'] "
                "'**entity_group**': ['label (tag) assigned to a given extracted entity'] "
                "'**start**': ['index of the start of the corresponding entity'] "
                "'**end**': ['index of the end of the corresponding entity'] "
                "'**category**': ['the broader category the entity belongs to'] "
            )

        st.subheader("Grouped entities", divider="orange")

        # Get unique categories and sort them for consistent tab order
        unique_categories = sorted(df['category'].unique())
        tabs_per_row = 4

        # Loop through categories in chunks to create rows of tabs. Every
        # category comes from df itself, so each tab has at least one row.
        for i in range(0, len(unique_categories), tabs_per_row):
            current_row_categories = unique_categories[i : i + tabs_per_row]
            tabs = st.tabs(current_row_categories)
            for tab, category in zip(tabs, current_row_categories):
                with tab:
                    st.dataframe(df[df["category"] == category], use_container_width=True)

        st.divider()

        # --- Visualizations ---
        st.subheader("Tree map", divider="orange")
        fig_treemap = px.treemap(
            df,
            path=[px.Constant("all"), 'category', 'entity_group', 'word'],
            values='score',
            color='category',
            color_discrete_map={
                'Persons': 'blue',
                'Locations': 'green',
                'Organizations': 'red',
                'Miscellaneous': 'purple',
                'Uncategorized': 'gray',
            },
        )
        fig_treemap.update_layout(margin=dict(t=50, l=25, r=25, b=25))
        st.plotly_chart(fig_treemap)
        if experiment is not None:
            experiment.log_figure(figure=fig_treemap, figure_name="entity_treemap")

        # Count entities per category for the pie and bar charts
        grouped_counts = df.groupby('category').size().reset_index(name='count')

        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Pie Chart", divider="orange")
            fig_pie = px.pie(
                grouped_counts,
                values='count',
                names='category',
                hover_data=['count'],
                labels={'count': 'count'},
                title='Percentage of predicted categories',
            )
            fig_pie.update_traces(textposition='inside', textinfo='percent+label')
            st.plotly_chart(fig_pie)
            if experiment is not None:
                experiment.log_figure(figure=fig_pie, figure_name="category_pie_chart")

        with col2:
            st.subheader("Bar Chart", divider="orange")
            fig_bar = px.bar(
                grouped_counts,
                x="count",
                y="category",
                color="category",
                text_auto=True,
                title='Occurrences of predicted categories',
            )
            st.plotly_chart(fig_bar)
            if experiment is not None:
                experiment.log_figure(figure=fig_bar, figure_name="category_bar_chart")

        # --- Downloadable Content ---
        dfa = pd.DataFrame(
            data={
                'Column Name': ['word', 'entity_group', 'score', 'start', 'end', 'category'],
                'Description': [
                    'entity extracted from your text data',
                    'label (tag) assigned to a given extracted entity',
                    'accuracy score; how accurately a tag has been assigned to a given entity',
                    'index of the start of the corresponding entity',
                    'index of the end of the corresponding entity',
                    'the broader category the entity belongs to',
                ],
            }
        )

        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as myzip:
            myzip.writestr("Summary of the results.csv", df.to_csv(index=False))
            myzip.writestr("Glossary of tags.csv", dfa.to_csv(index=False))

        with stylable_container(
            key="download_button",
            css_styles="""button { background-color: yellow; border: 1px solid black; padding: 5px; color: black; }""",
        ):
            st.download_button(
                label="Download zip file",
                data=buf.getvalue(),
                file_name="nlpblogs_ner_results.zip",
                mime="application/zip",
            )

        if experiment is not None:
            experiment.log_asset(buf.getvalue(), file_name="downloadable_results.zip")

        st.divider()
    finally:
        # Close the Comet experiment even if rendering or logging raised.
        if experiment is not None:
            experiment.end()

    end_time_overall = time.time()
    elapsed_time_overall = end_time_overall - start_time_overall
    st.info(f"Results processed in **{elapsed_time_overall:.2f} seconds**.")
    st.write(f"Number of times you requested results: **{st.session_state['file_upload_attempts']}/{max_attempts}**")