# Set environment variables IMMEDIATELY to prevent root filesystem access # This must happen before any other imports or operations import os import tempfile import json from datetime import datetime # Get a writable temp directory first try: TEMP_DIR = os.path.join(tempfile.gettempdir(), "docling_temp") os.makedirs(TEMP_DIR, exist_ok=True) except Exception: try: TEMP_DIR = "/tmp/docling_temp" os.makedirs(TEMP_DIR, exist_ok=True) except Exception: TEMP_DIR = os.getcwd() # Set all environment variables that libraries might use os.environ.update({ # Streamlit configuration 'STREAMLIT_SERVER_FILE_WATCHER_TYPE': 'none', 'STREAMLIT_SERVER_HEADLESS': 'true', 'STREAMLIT_BROWSER_GATHER_USAGE_STATS': 'false', 'STREAMLIT_SERVER_ENABLE_CORS': 'false', 'STREAMLIT_SERVER_ENABLE_XSRF_PROTECTION': 'false', # EasyOCR configuration 'EASYOCR_MODULE_PATH': os.path.join(TEMP_DIR, 'easyocr_models'), 'HOME': TEMP_DIR, 'USERPROFILE': TEMP_DIR, 'XDG_CACHE_HOME': os.path.join(TEMP_DIR, 'cache'), 'XDG_CONFIG_HOME': os.path.join(TEMP_DIR, 'config'), 'XDG_DATA_HOME': os.path.join(TEMP_DIR, 'data'), # Hugging Face Hub configuration - CRITICAL for preventing /.cache access 'HF_HOME': os.path.join(TEMP_DIR, 'huggingface'), 'HF_CACHE_HOME': os.path.join(TEMP_DIR, 'huggingface_cache'), 'HF_HUB_CACHE': os.path.join(TEMP_DIR, 'huggingface_cache'), 'TRANSFORMERS_CACHE': os.path.join(TEMP_DIR, 'transformers_cache'), 'HF_DATASETS_CACHE': os.path.join(TEMP_DIR, 'datasets_cache'), 'DIFFUSERS_CACHE': os.path.join(TEMP_DIR, 'diffusers_cache'), 'ACCELERATE_CACHE': os.path.join(TEMP_DIR, 'accelerate_cache'), # Additional Hugging Face specific variables 'HF_HUB_DISABLE_TELEMETRY': '1', 'HF_HUB_DISABLE_IMPLICIT_TOKEN': '1', 'HF_HUB_OFFLINE': '0', # Other ML libraries 'TORCH_HOME': os.path.join(TEMP_DIR, 'torch'), 'TENSORFLOW_HOME': os.path.join(TEMP_DIR, 'tensorflow'), 'KERAS_HOME': os.path.join(TEMP_DIR, 'keras'), 'MLFLOW_TRACKING_URI': f'file:{os.path.join(TEMP_DIR, "mlruns")}', # Additional cache directories 'CACHE_DIR': os.path.join(TEMP_DIR, 'cache'), 'MODEL_CACHE_DIR': os.path.join(TEMP_DIR, 'models'), # Additional environment variables to prevent root access 'PYTHONPATH': TEMP_DIR, 'TMPDIR': TEMP_DIR, 'TEMP': TEMP_DIR, 'TMP': TEMP_DIR, 'CACHE': os.path.join(TEMP_DIR, 'cache'), 'MODELS': os.path.join(TEMP_DIR, 'models'), 'DATA': os.path.join(TEMP_DIR, 'data'), 'CONFIG': os.path.join(TEMP_DIR, 'config'), }) # Create all necessary directories directories_to_create = [ os.environ['EASYOCR_MODULE_PATH'], os.environ['XDG_CACHE_HOME'], os.environ['XDG_CONFIG_HOME'], os.environ['XDG_DATA_HOME'], os.environ['HF_HOME'], os.environ['HF_CACHE_HOME'], os.environ['TRANSFORMERS_CACHE'], os.environ['HF_DATASETS_CACHE'], os.environ['TORCH_HOME'], os.environ['TENSORFLOW_HOME'], os.environ['KERAS_HOME'], os.environ['CACHE_DIR'], os.environ['MODEL_CACHE_DIR'], os.environ['CACHE'], os.environ['MODELS'], os.environ['DATA'], os.environ['CONFIG'], os.environ['HF_HUB_CACHE'], os.environ['DIFFUSERS_CACHE'], os.environ['ACCELERATE_CACHE'], ] for directory in directories_to_create: try: # Create directory and all parent directories os.makedirs(directory, mode=0o777, exist_ok=True) # Ensure the directory has write permissions os.chmod(directory, 0o777) except Exception as e: print(f"Warning: Could not create directory {directory}: {e}") # Now import the rest of the modules import streamlit as st import logging import shutil from processing.document_processor import DocumentProcessor from processing.sections import ReasoningSectionExtractor from utils.logging_utils import get_log_handler from utils.cost_tracker import cost_tracker from dotenv import load_dotenv import sys import difflib import time # Configure logging early to avoid issues logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", stream=sys.stdout, force=True ) # Load environment variables from .env load_dotenv() AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") AZURE_OPENAI_KEY = os.getenv("AZURE_OPENAI_KEY") AZURE_OPENAI_VERSION = os.getenv("AZURE_OPENAI_VERSION") AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT") # Log startup information logging.info("=" * 50) logging.info("Docling Streamlit App Starting") logging.info(f"Temp directory: {TEMP_DIR}") logging.info(f"EasyOCR model directory: {os.environ.get('EASYOCR_MODULE_PATH', 'NOT_SET')}") logging.info(f"Hugging Face cache: {os.environ.get('HF_CACHE_HOME', 'NOT_SET')}") logging.info(f"Current working directory: {os.getcwd()}") logging.info(f"Python version: {sys.version}") logging.info("=" * 50) def cleanup_temp_files(): """Clean up temporary files in the temp directory.""" try: if os.path.exists(TEMP_DIR): for filename in os.listdir(TEMP_DIR): file_path = os.path.join(TEMP_DIR, filename) if os.path.isfile(file_path): try: os.remove(file_path) logging.info(f"Removed temp file: {filename}") except PermissionError as e: logging.warning(f"Permission error removing {filename}: {e}") except Exception as e: logging.warning(f"Error removing {filename}: {e}") logging.info(f"Cleaned up temporary files in {TEMP_DIR}") else: logging.info(f"Temp directory {TEMP_DIR} does not exist") except PermissionError as e: logging.warning(f"Permission error accessing temp directory {TEMP_DIR}: {e}") except Exception as e: logging.warning(f"Error cleaning up temp files: {e}") def clear_all_data(): """Clear all temporary files and session state data.""" try: # Clean up temp files cleanup_temp_files() # Clear session state if "processed_results" in st.session_state: del st.session_state.processed_results if "logs" in st.session_state: del st.session_state.logs if "original_structures" in st.session_state: del st.session_state.original_structures if "show_original" in st.session_state: del st.session_state.show_original if "show_processed" in st.session_state: del st.session_state.show_processed if "temp_cleaned" in st.session_state: del st.session_state.temp_cleaned if "last_cleanup_time" in st.session_state: del st.session_state.last_cleanup_time logging.info("Cleared all session state and temporary files") return True except Exception as e: logging.error(f"Error clearing all data: {e}") return False def get_temp_files_info(): """Get information about temporary files (count and total size).""" try: if not os.path.exists(TEMP_DIR): return 0, 0 files = os.listdir(TEMP_DIR) total_size = 0 file_details = [] for filename in files: try: file_path = os.path.join(TEMP_DIR, filename) if os.path.isfile(file_path): file_size = os.path.getsize(file_path) total_size += file_size file_details.append({ 'name': filename, 'size': file_size, 'type': 'file' }) elif os.path.isdir(file_path): file_details.append({ 'name': filename, 'size': 0, 'type': 'directory' }) except (PermissionError, OSError) as e: logging.warning(f"Error accessing file {filename}: {e}") file_details.append({ 'name': filename, 'size': 0, 'type': 'error' }) continue # Log detailed information for debugging if file_details: logging.info(f"Temp directory contents ({TEMP_DIR}):") for detail in file_details: logging.info(f" - {detail['name']} ({detail['type']}): {detail['size']} bytes") return len(files), total_size except PermissionError as e: logging.warning(f"Permission error accessing temp directory {TEMP_DIR}: {e}") return 0, 0 except Exception as e: logging.warning(f"Error getting temp files info: {e}") return 0, 0 def format_file_size(size_bytes): """Format file size in human readable format.""" if size_bytes == 0: return "0 B" size_names = ["B", "KB", "MB", "GB"] i = 0 while size_bytes >= 1024 and i < len(size_names) - 1: size_bytes /= 1024.0 i += 1 return f"{size_bytes:.1f} {size_names[i]}" def save_uploaded_file(uploaded_file, filename): """Save uploaded file to temp directory and return the path.""" temp_path = os.path.join(TEMP_DIR, f"temp_{filename}") try: uploaded_file.seek(0) # Reset file pointer to beginning file_bytes = uploaded_file.read() with open(temp_path, "wb") as f: f.write(file_bytes) logging.info(f"Saved uploaded file to {temp_path}") return temp_path except PermissionError as e: logging.error(f"Permission error saving uploaded file to {temp_path}: {e}") raise PermissionError(f"Cannot save file due to permission restrictions. Please try clearing data or contact support.") except Exception as e: logging.error(f"Error saving uploaded file: {e}") raise # Configure page layout to use wide mode st.set_page_config( page_title="Medical Document Parser & Redactor", page_icon="๐Ÿ“„", layout="wide", initial_sidebar_state="collapsed" ) # Add custom CSS for better styling st.markdown(""" """, unsafe_allow_html=True) # Configure root logger only once (avoid duplicate handlers on reruns) if len(logging.getLogger().handlers) == 0: logging.getLogger().setLevel(logging.INFO) # (We will attach custom handlers during processing as needed) # Title and description st.title("Medical Document Parser & Redactor") st.write(""" Upload PDF medical documents to parse their content using **Docling** (structure-aware parser) and automatically **redact specific sections** (e.g., initial and final medication lists). Use the buttons below to view the original structure or process with redaction. **๐Ÿ’ก Tip:** This is a Hugging Face Space with limited storage. Use the "Clear All Data" button to remove temporary files when you're done processing documents. """) # Add clear all data button at the top if st.button("๐Ÿงน Clear All Data", type="secondary", help="Remove all temporary files and reset the application"): if clear_all_data(): st.success("โœ… All data cleared successfully! The application has been reset.") cost_tracker.reset_session() # Reset cost tracking when clearing data st.rerun() else: st.error("โŒ Error clearing data. Please try again.") # File uploader (accept multiple PDF files) uploaded_files = st.file_uploader("Upload PDF medical documents", type=["pdf"], accept_multiple_files=True) # Clean up temp files on app start (but keep the directory) if "temp_cleaned" not in st.session_state: cleanup_temp_files() st.session_state.temp_cleaned = True # Initialize session state storage for results and logs if "processed_results" not in st.session_state: st.session_state.processed_results = {} # {filename: {"structured_json": ..., "redacted_md": ..., "redacted_json": ...}} if "logs" not in st.session_state: st.session_state.logs = {} # {filename: log_text} if "original_structures" not in st.session_state: st.session_state.original_structures = {} # {filename: structured_json} # Show temp directory status and cleanup button temp_file_count, total_size = get_temp_files_info() # Automatic cleanup: if temp files are too old or too large, clean them up if "last_cleanup_time" not in st.session_state: st.session_state.last_cleanup_time = time.time() # Check if we should do automatic cleanup (every 30 minutes or if files are too large) current_time = time.time() time_since_cleanup = current_time - st.session_state.last_cleanup_time if (time_since_cleanup > 1800 or # 30 minutes total_size > 100 * 1024 * 1024): # 100MB if temp_file_count > 0: cleanup_temp_files() st.session_state.last_cleanup_time = current_time st.info("๐Ÿงน Automatic cleanup: Removed old temporary files") # Recalculate after cleanup temp_file_count, total_size = get_temp_files_info() # Create a row with temp file status and delete button col1, col2 = st.columns([3, 1]) with col1: if temp_file_count > 0: st.caption(f"๐Ÿ“ {temp_file_count} temporary file(s) - Total size: {format_file_size(total_size)}") # Show warning if total size is large if total_size > 50 * 1024 * 1024: # 50MB st.warning("โš ๏ธ Large temporary files detected. Consider clearing data to free up space.") # Debug: Show temp files (expandable) with st.expander("๐Ÿ” Debug: View temporary files"): try: if os.path.exists(TEMP_DIR): files = os.listdir(TEMP_DIR) if files: st.write("**Temporary files in directory:**") for filename in files: file_path = os.path.join(TEMP_DIR, filename) try: if os.path.isfile(file_path): size = os.path.getsize(file_path) st.write(f"๐Ÿ“„ {filename} ({format_file_size(size)})") elif os.path.isdir(file_path): st.write(f"๐Ÿ“ {filename} (directory)") else: st.write(f"โ“ {filename} (unknown)") except Exception as e: st.write(f"โŒ {filename} (error: {e})") else: st.write("No files found in temp directory") else: st.write("Temp directory does not exist") except Exception as e: st.write(f"Error accessing temp directory: {e}") else: st.caption("๐Ÿ“ No temporary files") with col2: if temp_file_count > 0: if st.button("๐Ÿ—‘๏ธ Delete Temp Files", type="secondary", help="Remove all temporary files from the server"): try: cleanup_temp_files() st.success(f"โœ… Successfully deleted {temp_file_count} temporary file(s)") st.rerun() # Refresh the page to update the file count except Exception as e: st.error(f"โŒ Error deleting temporary files: {e}") else: st.caption("No files to delete") if uploaded_files: # UI to select which file to work with (if multiple files uploaded) file_names = [f.name for f in uploaded_files] selected_file = st.selectbox("Select a file to work with", options=file_names) if selected_file: # Find the selected uploaded file uploaded_file = next(f for f in uploaded_files if f.name == selected_file) # Create buttons for different actions col1, col2, col3, col4, col5 = st.columns(5) with col1: if st.button("๐Ÿ“„ Show Original", type="primary"): # Process the document to get original structure (without redaction) if selected_file not in st.session_state.original_structures: # Save uploaded file to a temporary location temp_path = save_uploaded_file(uploaded_file, selected_file) # Create a DocumentProcessor without section extraction (for original structure) processor = DocumentProcessor(section_extractor=None) # Process the document to get original structure result = processor.process(temp_path) st.session_state.original_structures[selected_file] = result.structured_json # Also store the original markdown for comparison st.session_state.original_structures[f"{selected_file}_markdown"] = result.structured_markdown # Display the original structure st.session_state.show_original = True st.session_state.show_processed = False with col2: if st.button("๐Ÿ”’ Process with Redaction"): # Process the document with redaction if selected_file not in st.session_state.processed_results: # Save uploaded file to a temporary location temp_path = save_uploaded_file(uploaded_file, selected_file) # Ensure the deployment name is in the cost tracker if AZURE_OPENAI_DEPLOYMENT and AZURE_OPENAI_DEPLOYMENT not in cost_tracker.get_available_models(): model_type = cost_tracker.guess_model_type(AZURE_OPENAI_DEPLOYMENT) cost_tracker.add_deployment_pricing(AZURE_OPENAI_DEPLOYMENT, model_type) # Use the new processing function from processing.document_processor import process_document_with_redaction # Attach an in-memory log handler to capture logs for this file log_handler, log_buffer = get_log_handler() root_logger = logging.getLogger() root_logger.addHandler(log_handler) try: # Process the document using the new function processing_result = process_document_with_redaction( file_path=temp_path, endpoint=AZURE_OPENAI_ENDPOINT, api_key=AZURE_OPENAI_KEY, api_version=AZURE_OPENAI_VERSION, deployment=AZURE_OPENAI_DEPLOYMENT, ) # Save results in session state (maintaining compatibility with existing UI) st.session_state.processed_results[selected_file] = { "structured_json": processing_result.original_document_json, "redacted_md": processing_result.redacted_document_md, "redacted_json": processing_result.redacted_document_json, # Now this is actually redacted! "original_markdown": processing_result.original_document_md, "processing_result": processing_result # Store the new result } finally: # Remove handler and stop capturing logs root_logger.removeHandler(log_handler) # Combine log records into a single text log_text = "\n".join(log_buffer) st.session_state.logs[selected_file] = log_text st.session_state.show_original = False st.session_state.show_processed = True with col3: if st.button("๐Ÿ”„ Switch View"): # Toggle between views if st.session_state.get("show_original", False): st.session_state.show_original = False st.session_state.show_processed = True else: st.session_state.show_original = True st.session_state.show_processed = False with col4: if st.button("๐Ÿ“„ Show Original JSON", type="secondary"): # Process the document to get original structure (without redaction) if selected_file not in st.session_state.original_structures: # Save uploaded file to a temporary location temp_path = save_uploaded_file(uploaded_file, selected_file) # Create a DocumentProcessor without section extraction (for original structure) processor = DocumentProcessor(section_extractor=None) # Process the document to get original structure result = processor.process(temp_path) st.session_state.original_structures[selected_file] = result.structured_json # Store the original markdown for comparison st.session_state.original_structures[f"{selected_file}_markdown"] = result.structured_markdown # Store the original YAML for comparison st.session_state.original_structures[f"{selected_file}_yaml"] = result.structured_yaml # Display the original JSON structure st.session_state.show_original = True st.session_state.show_processed = False st.session_state.show_json = True st.session_state.show_yaml = False with col5: if st.button("๐Ÿ“„ Show Original YAML", type="secondary"): # Process the document to get original structure (without redaction) if selected_file not in st.session_state.original_structures: # Save uploaded file to a temporary location temp_path = save_uploaded_file(uploaded_file, selected_file) # Create a DocumentProcessor without section extraction (for original structure) processor = DocumentProcessor(section_extractor=None) # Process the document to get original structure result = processor.process(temp_path) st.session_state.original_structures[selected_file] = result.structured_json # Store the original markdown for comparison st.session_state.original_structures[f"{selected_file}_markdown"] = result.structured_markdown # Store the original YAML for comparison st.session_state.original_structures[f"{selected_file}_yaml"] = result.structured_yaml # Display the original YAML structure st.session_state.show_original = True st.session_state.show_processed = False st.session_state.show_json = False st.session_state.show_yaml = True # Show current view status if st.session_state.get("show_original", False): st.info("๐Ÿ“„ Currently viewing: **Original Document Structure**") elif st.session_state.get("show_processed", False): st.success("๐Ÿ”’ Currently viewing: **Processed Document with Redaction**") else: st.info("โ„น๏ธ Select an action above to view document content") # Display results based on button clicked if st.session_state.get("show_original", False): st.markdown("---") # Determine what to show based on button clicked show_json = st.session_state.get("show_json", False) show_yaml = st.session_state.get("show_yaml", False) if show_json: st.subheader(f"Original Document Structure (JSON) - {selected_file}") elif show_yaml: st.subheader(f"Original Document Structure (YAML) - {selected_file}") else: st.subheader(f"Original Document Structure (Markdown) - {selected_file}") # Get the original structure original_json = st.session_state.original_structures[selected_file] original_markdown = st.session_state.original_structures.get(f"{selected_file}_markdown", "") original_yaml = st.session_state.original_structures.get(f"{selected_file}_yaml", "") # Display PDF viewer and content side by side col1, col2 = st.columns([1, 1]) with col1: st.subheader("๐Ÿ“„ Original PDF") # Reset file pointer to beginning uploaded_file.seek(0) # Display PDF using base64 encoding for inline display import base64 pdf_bytes = uploaded_file.getvalue() b64_pdf = base64.b64encode(pdf_bytes).decode() pdf_display = f'' st.markdown(pdf_display, unsafe_allow_html=True) with col2: if show_json: st.subheader("๐Ÿ“‹ Original Document (JSON)") st.caption("Docling-generated JSON structure from the PDF") # Use a text area for better readability and scrolling st.text_area( label="Original JSON content", value=json.dumps(original_json, indent=2, ensure_ascii=False), height=600, key="original_json_display", label_visibility="collapsed" ) elif show_yaml: st.subheader("๐Ÿ“‹ Original Document (YAML)") st.caption("Docling-generated YAML structure from the PDF") # Use a text area for better readability and scrolling st.text_area( label="Original YAML content", value=original_yaml, height=600, key="original_yaml_display", label_visibility="collapsed" ) else: st.subheader("๐Ÿ“‹ Original Document (Markdown)") st.caption("Docling-generated markdown from the PDF") # Use a text area for better readability and scrolling st.text_area( label="Original markdown content", value=original_markdown, height=600, key="original_markdown_display", label_visibility="collapsed" ) # Add download buttons for the original content st.markdown("---") col1, col2, col3 = st.columns(3) with col1: if show_json: st.download_button( label="๐Ÿ“ฅ Download Original JSON", data=json.dumps(original_json, indent=2, ensure_ascii=False), file_name=f"{selected_file}_original.json", mime="application/json" ) elif show_yaml: st.download_button( label="๐Ÿ“ฅ Download Original YAML", data=original_yaml, file_name=f"{selected_file}_original.yaml", mime="text/yaml" ) else: st.download_button( label="๐Ÿ“ฅ Download Original Markdown", data=original_markdown, file_name=f"{selected_file}_original.md", mime="text/markdown" ) with col2: if show_json or show_yaml: st.subheader("๐Ÿ“Š Document Structure") st.json(original_json) else: st.subheader("๐Ÿ“Š JSON Structure") st.json(original_json) with col3: if show_json or show_yaml: # Show format information st.subheader("๐Ÿ“‹ Format Info") if show_json: st.info("**JSON Format**: Structured data representation with key-value pairs") st.write("**Use case**: API integration, data processing, programmatic access") elif show_yaml: st.info("**YAML Format**: Human-readable data serialization") st.write("**Use case**: Configuration files, documentation, easy reading") else: st.subheader("๐Ÿ“‹ Markdown Info") st.info("**Markdown Format**: Formatted text with headers, lists, and styling") st.write("**Use case**: Documentation, readable output, web display") elif st.session_state.get("show_processed", False): st.markdown("---") st.subheader(f"Processed Document - {selected_file}") # Retrieve stored results data = st.session_state.processed_results[selected_file] structured_json = data["structured_json"] redacted_md = data["redacted_md"] redacted_json = data["redacted_json"] original_md = data["original_markdown"] # Show processing summary original_texts = structured_json.get("texts", []) redacted_texts = redacted_json.get("texts", []) removed_count = len(original_texts) - len(redacted_texts) if removed_count > 0: st.success(f"โœ… Successfully removed {removed_count} text elements containing medication information") else: st.info("โ„น๏ธ No medication sections were identified for removal") # Create tabs for different views tab1, tab2, tab3 = st.tabs(["๐Ÿ“„ Side-by-Side Comparison", "๐Ÿ” JSON Structure", "๐Ÿ“Š Processing Details"]) with tab1: st.subheader("Original vs Redacted Content") st.caption("Compare the original document content with the redacted version") # Get the actual removed indices from the processing result actual_removed_indices = [] if "processing_result" in st.session_state.processed_results[selected_file]: processing_result = st.session_state.processed_results[selected_file]["processing_result"] actual_removed_indices = processing_result.removed_indices # Create a more intelligent side-by-side comparison based on JSON structure col1, col2 = st.columns(2) with col1: st.markdown("**๐Ÿ“‹ Original Document**") # Display original content with removed sections highlighted for i, text_elem in enumerate(original_texts): text_content = text_elem.get("text", "") label = text_elem.get("label", "") # Check if this element was removed is_removed = i in actual_removed_indices if is_removed: # Highlight removed content in red st.markdown(f"""
Text {i} ({label}) - REMOVED:
{text_content}
""", unsafe_allow_html=True) else: # Show normal content content_preview = text_content[:150] + "..." if len(text_content) > 150 else text_content st.markdown(f"""
Text {i} ({label}) - {len(text_content)} chars:
{content_preview}
""", unsafe_allow_html=True) with col2: st.markdown("**๐Ÿ”’ Redacted Document**") # Display redacted content (only non-removed elements) redacted_index = 0 for i, text_elem in enumerate(original_texts): text_content = text_elem.get("text", "") label = text_elem.get("label", "") # Check if this element was removed is_removed = i in actual_removed_indices if is_removed: # Show placeholder for removed content st.markdown(f"""
Text {i} ({label}) - REMOVED
[Content removed by redaction]
""", unsafe_allow_html=True) else: # Show the actual content from redacted texts if redacted_index < len(redacted_texts): redacted_content = redacted_texts[redacted_index].get("text", "") content_preview = redacted_content[:150] + "..." if len(redacted_content) > 150 else redacted_content st.markdown(f"""
Text {i} ({label}) - {len(redacted_content)} chars:
{content_preview}
""", unsafe_allow_html=True) redacted_index += 1 else: st.markdown(f"""
Text {i} ({label}):
[Content preserved]
""", unsafe_allow_html=True) # Add legend st.markdown("---") col1, col2 = st.columns(2) with col1: st.markdown("**๐ŸŽจ Comparison Legend:**") st.markdown("๐Ÿ”ด **Red background** = Removed content") st.markdown("โšช **White background** = Preserved content") st.markdown("๐Ÿ“ **Italic text** = Placeholder for removed content") with col2: st.markdown("**๐Ÿ’ก How to read:**") st.markdown("Left panel shows original with removed sections highlighted") st.markdown("Right panel shows redacted version with placeholders") st.markdown("Compare corresponding text indices to see changes") # Add debug information to help identify missing content with st.expander("๐Ÿ” Debug: Content Analysis"): st.write("**Searching for table content...**") # Search for table-related content in original texts table_elements = [] for i, text_elem in enumerate(original_texts): text_content = text_elem.get("text", "") label = text_elem.get("label", "") if "Bespreking" in text_content or "|" in text_content or "table" in label.lower(): table_elements.append({ "index": i, "label": label, "content": text_content[:200] + "..." if len(text_content) > 200 else text_content, "is_removed": i in actual_removed_indices }) if table_elements: st.write(f"**Found {len(table_elements)} table-related elements:**") for elem in table_elements: status = "๐Ÿ”ด REMOVED" if elem["is_removed"] else "โœ… PRESERVED" st.write(f"**Text {elem['index']} ({elem['label']}) - {status}:**") st.write(f"`{elem['content']}`") st.write("---") else: st.write("**No table-related content found in original texts**") # Also check redacted texts st.write("**Table content in redacted texts:**") table_elements_redacted = [] for i, text_elem in enumerate(redacted_texts): text_content = text_elem.get("text", "") label = text_elem.get("label", "") if "Bespreking" in text_content or "|" in text_content or "table" in label.lower(): table_elements_redacted.append({ "index": i, "label": label, "content": text_content[:200] + "..." if len(text_content) > 200 else text_content }) if table_elements_redacted: st.write(f"**Found {len(table_elements_redacted)} table-related elements in redacted content:**") for elem in table_elements_redacted: st.write(f"**Text {elem['index']} ({elem['label']}):**") st.write(f"`{elem['content']}`") st.write("---") else: st.write("**No table-related content found in redacted texts**") # Add download buttons for redacted content st.markdown("---") st.subheader("๐Ÿ“ฅ Download Redacted Content") col1, col2, col3 = st.columns(3) with col1: # Download redacted markdown st.download_button( label="๐Ÿ“„ Download Redacted Markdown", data=redacted_md, file_name=f"{selected_file}_redacted.md", mime="text/markdown", help="Download the redacted document as Markdown format" ) with col2: # Generate and download redacted PDF pdf_generated = False pdf_bytes = None if st.button("๐Ÿ“‹ Generate Redacted PDF", help="Generate a PDF version of the redacted document"): with st.spinner("Generating redacted PDF..."): try: # Create a DocumentProcessor to access PDF generation temp_path = save_uploaded_file(uploaded_file, selected_file) processor = DocumentProcessor(section_extractor=None) # Generate PDF path base_name = os.path.splitext(selected_file)[0] pdf_path = os.path.join(TEMP_DIR, f"{base_name}_redacted.pdf") # Generate the PDF success = processor.generate_redacted_pdf(redacted_json, pdf_path) if success: # Read the generated PDF and store for download with open(pdf_path, "rb") as pdf_file: pdf_bytes = pdf_file.read() pdf_generated = True st.success("โœ… PDF generated successfully!") else: st.error("โŒ Failed to generate PDF. Check logs for details.") except Exception as e: st.error(f"โŒ Error generating PDF: {e}") st.info("๐Ÿ’ก Make sure reportlab is installed: `pip install reportlab`") # Show download button if PDF was generated if pdf_generated and pdf_bytes: st.download_button( label="๐Ÿ“ฅ Download Redacted PDF", data=pdf_bytes, file_name=f"{os.path.splitext(selected_file)[0]}_redacted.pdf", mime="application/pdf", help="Download the redacted document as PDF" ) # Show debug information about what's in the PDF with st.expander("๐Ÿ” Debug: PDF Content Analysis"): st.write("**Content that will be included in the PDF:**") texts_in_pdf = redacted_json.get("texts", []) st.write(f"Total text elements: {len(texts_in_pdf)}") for i, text_elem in enumerate(texts_in_pdf): text_content = text_elem.get("text", "")[:100] + "..." if len(text_elem.get("text", "")) > 100 else text_elem.get("text", "") label = text_elem.get("label", "") st.write(f"**Text {i} ({label}):** {text_content}") elif not pdf_generated: st.info("๐Ÿ’ก Click 'Generate Redacted PDF' to create a PDF version") with col3: # Download redacted JSON structure st.download_button( label="๐Ÿ”ง Download Redacted JSON", data=json.dumps(redacted_json, indent=2, ensure_ascii=False), file_name=f"{selected_file}_redacted.json", mime="application/json", help="Download the redacted document structure as JSON" ) with tab2: st.subheader("Document Structure Analysis") # Show JSON structure comparison col1, col2 = st.columns(2) with col1: st.markdown("**๐Ÿ“Š Original Structure (JSON)**") st.json(structured_json) with col2: st.markdown("**๐Ÿ”’ Redacted Structure (JSON)**") st.json(redacted_json) with tab3: st.subheader("Processing Details") # Show cost analysis for this processing session st.subheader("๐Ÿ’ฐ Cost Analysis") # Get cost data from the processing result if "processing_result" in st.session_state.processed_results[selected_file]: processing_result = st.session_state.processed_results[selected_file]["processing_result"] col1, col2, col3 = st.columns(3) with col1: st.metric("Total Cost", f"${processing_result.cost:.4f}") with col2: st.metric("Input Tokens", f"{processing_result.input_tokens:,}") with col3: st.metric("Output Tokens", f"{processing_result.output_tokens:,}") # Add download button for cost report cost_report = { "timestamp": datetime.now().isoformat(), "total_cost": processing_result.cost, "input_tokens": processing_result.input_tokens, "output_tokens": processing_result.output_tokens, "total_tokens": processing_result.input_tokens + processing_result.output_tokens, "document_processed": selected_file, "model_used": AZURE_OPENAI_DEPLOYMENT } st.download_button( label="๐Ÿ“ฅ Download Cost Report (JSON)", data=json.dumps(cost_report, indent=2), file_name=f"cost_report_{selected_file}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", mime="application/json" ) # Show model information model_info = cost_tracker.get_model_info(AZURE_OPENAI_DEPLOYMENT) if model_info: st.subheader("Model Information") st.write(f"**Model:** {model_info.description}") st.write(f"**Input cost:** ${model_info.input_cost_per_1k_tokens:.4f}/1K tokens") st.write(f"**Output cost:** ${model_info.output_cost_per_1k_tokens:.4f}/1K tokens") # Calculate cost breakdown input_cost = (processing_result.input_tokens / 1000) * model_info.input_cost_per_1k_tokens output_cost = (processing_result.output_tokens / 1000) * model_info.output_cost_per_1k_tokens st.write(f"**Cost breakdown:** Input: ${input_cost:.4f}, Output: ${output_cost:.4f}") else: # Fallback to old cost summary method cost_summary = cost_tracker.get_session_summary() if cost_summary["usage_count"] > 0: col1, col2, col3 = st.columns(3) with col1: st.metric("Total Cost", f"${cost_summary['total_cost']:.4f}") with col2: st.metric("Total Tokens", f"{cost_summary['total_tokens']:,}") with col3: st.metric("API Calls", cost_summary["usage_count"]) # Add download button for cost report cost_report = { "timestamp": datetime.now().isoformat(), "total_cost": cost_summary["total_cost"], "total_tokens": cost_summary["total_tokens"], "api_calls": cost_summary["usage_count"], "model_breakdown": cost_summary["model_breakdown"], "document_processed": selected_file } st.download_button( label="๐Ÿ“ฅ Download Cost Report (JSON)", data=json.dumps(cost_report, indent=2), file_name=f"cost_report_{selected_file}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", mime="application/json" ) # Show detailed model breakdown if cost_summary["model_breakdown"]: st.subheader("Model Usage Breakdown") for model, stats in cost_summary["model_breakdown"].items(): model_info = cost_tracker.get_model_info(model) model_display_name = model_info.description if model_info else model with st.expander(f"{model_display_name} - ${stats['cost']:.4f}"): col1, col2 = st.columns(2) with col1: st.write(f"**Input tokens:** {stats['input_tokens']:,}") st.write(f"**Output tokens:** {stats['output_tokens']:,}") with col2: st.write(f"**Total tokens:** {stats['total_tokens']:,}") st.write(f"**API calls:** {stats['usage_count']}") # Show cost breakdown if model_info: input_cost = (stats['input_tokens'] / 1000) * model_info.input_cost_per_1k_tokens output_cost = (stats['output_tokens'] / 1000) * model_info.output_cost_per_1k_tokens st.write(f"**Cost breakdown:** Input: ${input_cost:.4f}, Output: ${output_cost:.4f}") else: st.info("No API calls recorded for this session") # Show what was removed if removed_count > 0: st.info(f"**Removed {removed_count} text elements from the document structure.**") # Show the removed text elements - use the actual indices from the processing result st.subheader("Removed Text Elements:") # Get the actual indices that were removed from the processing result if "processing_result" in st.session_state.processed_results[selected_file]: # Get the actual removed indices from the LLM response processing_result = st.session_state.processed_results[selected_file]["processing_result"] actual_removed_indices = processing_result.removed_indices if actual_removed_indices: st.info(f"**Elements removed by LLM analysis ({len(actual_removed_indices)} elements):**") for idx in actual_removed_indices: if idx < len(original_texts): text_content = original_texts[idx].get("text", "") st.text(f"Text {idx}: {text_content[:100]}{'...' if len(text_content) > 100 else ''}") else: st.text(f"Text {idx}: [Index out of bounds]") else: st.info("**No elements were identified for removal by the LLM.**") else: # Fallback to the old method if processing result not available st.warning("**Note: Using fallback calculation method**") removed_texts = [] for i, text_elem in enumerate(original_texts): if i >= len(redacted_texts) or text_elem.get("text", "") != redacted_texts[i].get("text", ""): removed_texts.append((i, text_elem.get("text", "")[:100] + "..." if len(text_elem.get("text", "")) > 100 else text_elem.get("text", ""))) for idx, text in removed_texts: st.text(f"Text {idx}: {text}") else: st.info("No text elements were removed during processing.") # Show processing logs st.subheader("Processing Logs") st.text_area( label="Processing logs", value=st.session_state.logs.get(selected_file, ""), height=300, label_visibility="collapsed" )