# Set environment variables IMMEDIATELY to prevent root filesystem access.
# This must happen before any other imports or operations.
import os
import tempfile
import json
from datetime import datetime

# Get a writable temp directory first
try:
    TEMP_DIR = os.path.join(tempfile.gettempdir(), "docling_temp")
    os.makedirs(TEMP_DIR, exist_ok=True)
except Exception:
    try:
        TEMP_DIR = "/tmp/docling_temp"
        os.makedirs(TEMP_DIR, exist_ok=True)
    except Exception:
        TEMP_DIR = os.getcwd()

# Set all environment variables that libraries might use
os.environ.update({
    # Streamlit configuration
    'STREAMLIT_SERVER_FILE_WATCHER_TYPE': 'none',
    'STREAMLIT_SERVER_HEADLESS': 'true',
    'STREAMLIT_BROWSER_GATHER_USAGE_STATS': 'false',
    'STREAMLIT_SERVER_ENABLE_CORS': 'false',
    'STREAMLIT_SERVER_ENABLE_XSRF_PROTECTION': 'false',
    # EasyOCR configuration
    'EASYOCR_MODULE_PATH': os.path.join(TEMP_DIR, 'easyocr_models'),
    'HOME': TEMP_DIR,
    'USERPROFILE': TEMP_DIR,
    'XDG_CACHE_HOME': os.path.join(TEMP_DIR, 'cache'),
    'XDG_CONFIG_HOME': os.path.join(TEMP_DIR, 'config'),
    'XDG_DATA_HOME': os.path.join(TEMP_DIR, 'data'),
    # Hugging Face Hub configuration - CRITICAL for preventing /.cache access
    'HF_HOME': os.path.join(TEMP_DIR, 'huggingface'),
    'HF_CACHE_HOME': os.path.join(TEMP_DIR, 'huggingface_cache'),
    'HF_HUB_CACHE': os.path.join(TEMP_DIR, 'huggingface_cache'),
    'TRANSFORMERS_CACHE': os.path.join(TEMP_DIR, 'transformers_cache'),
    'HF_DATASETS_CACHE': os.path.join(TEMP_DIR, 'datasets_cache'),
    'DIFFUSERS_CACHE': os.path.join(TEMP_DIR, 'diffusers_cache'),
    'ACCELERATE_CACHE': os.path.join(TEMP_DIR, 'accelerate_cache'),
    # Additional Hugging Face specific variables
    'HF_HUB_DISABLE_TELEMETRY': '1',
    'HF_HUB_DISABLE_IMPLICIT_TOKEN': '1',
    'HF_HUB_OFFLINE': '0',
    # Other ML libraries
    'TORCH_HOME': os.path.join(TEMP_DIR, 'torch'),
    'TENSORFLOW_HOME': os.path.join(TEMP_DIR, 'tensorflow'),
    'KERAS_HOME': os.path.join(TEMP_DIR, 'keras'),
    'MLFLOW_TRACKING_URI': f'file:{os.path.join(TEMP_DIR, "mlruns")}',
    # Additional cache directories
    'CACHE_DIR': os.path.join(TEMP_DIR, 'cache'),
    'MODEL_CACHE_DIR': os.path.join(TEMP_DIR, 'models'),
    # Additional environment variables to prevent root access
    'PYTHONPATH': TEMP_DIR,
    'TMPDIR': TEMP_DIR,
    'TEMP': TEMP_DIR,
    'TMP': TEMP_DIR,
    'CACHE': os.path.join(TEMP_DIR, 'cache'),
    'MODELS': os.path.join(TEMP_DIR, 'models'),
    'DATA': os.path.join(TEMP_DIR, 'data'),
    'CONFIG': os.path.join(TEMP_DIR, 'config'),
})

# Create all necessary directories
directories_to_create = [
    os.environ['EASYOCR_MODULE_PATH'],
    os.environ['XDG_CACHE_HOME'],
    os.environ['XDG_CONFIG_HOME'],
    os.environ['XDG_DATA_HOME'],
    os.environ['HF_HOME'],
    os.environ['HF_CACHE_HOME'],
    os.environ['TRANSFORMERS_CACHE'],
    os.environ['HF_DATASETS_CACHE'],
    os.environ['TORCH_HOME'],
    os.environ['TENSORFLOW_HOME'],
    os.environ['KERAS_HOME'],
    os.environ['CACHE_DIR'],
    os.environ['MODEL_CACHE_DIR'],
    os.environ['CACHE'],
    os.environ['MODELS'],
    os.environ['DATA'],
    os.environ['CONFIG'],
    os.environ['HF_HUB_CACHE'],
    os.environ['DIFFUSERS_CACHE'],
    os.environ['ACCELERATE_CACHE'],
]

for directory in directories_to_create:
    try:
        # Create the directory and all parent directories
        os.makedirs(directory, mode=0o777, exist_ok=True)
        # Ensure the directory has write permissions
        os.chmod(directory, 0o777)
    except Exception as e:
        print(f"Warning: Could not create directory {directory}: {e}")
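# NOTE: the ordering above matters. Libraries such as huggingface_hub and
# transformers resolve their cache locations (HF_HOME, TRANSFORMERS_CACHE,
# HF_HUB_CACHE, ...) when they are first imported, so every cache variable
# must already point inside TEMP_DIR before the imports below run. With the
# default tempdir, HF_HOME resolves to e.g. /tmp/docling_temp/huggingface.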
# Now import the rest of the modules
import streamlit as st
import logging
import shutil
from processing.document_processor import DocumentProcessor
from processing.sections import ReasoningSectionExtractor
from utils.logging_utils import get_log_handler
from utils.cost_tracker import cost_tracker
from dotenv import load_dotenv
import sys
import difflib
import time

# Configure logging early to avoid issues
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    stream=sys.stdout,
    force=True,
)

# Load environment variables from .env
load_dotenv()
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_KEY = os.getenv("AZURE_OPENAI_KEY")
AZURE_OPENAI_VERSION = os.getenv("AZURE_OPENAI_VERSION")
AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT")

# Log startup information
logging.info("=" * 50)
logging.info("Docling Streamlit App Starting")
logging.info(f"Temp directory: {TEMP_DIR}")
logging.info(f"EasyOCR model directory: {os.environ.get('EASYOCR_MODULE_PATH', 'NOT_SET')}")
logging.info(f"Hugging Face cache: {os.environ.get('HF_CACHE_HOME', 'NOT_SET')}")
logging.info(f"Current working directory: {os.getcwd()}")
logging.info(f"Python version: {sys.version}")
logging.info("=" * 50)


def cleanup_temp_files():
    """Clean up temporary files in the temp directory."""
    try:
        if os.path.exists(TEMP_DIR):
            for filename in os.listdir(TEMP_DIR):
                file_path = os.path.join(TEMP_DIR, filename)
                if os.path.isfile(file_path):
                    try:
                        os.remove(file_path)
                        logging.info(f"Removed temp file: {filename}")
                    except PermissionError as e:
                        logging.warning(f"Permission error removing {filename}: {e}")
                    except Exception as e:
                        logging.warning(f"Error removing {filename}: {e}")
            logging.info(f"Cleaned up temporary files in {TEMP_DIR}")
        else:
            logging.info(f"Temp directory {TEMP_DIR} does not exist")
    except PermissionError as e:
        logging.warning(f"Permission error accessing temp directory {TEMP_DIR}: {e}")
    except Exception as e:
        logging.warning(f"Error cleaning up temp files: {e}")


def clear_all_data():
    """Clear all temporary files and session state data."""
    try:
        # Clean up temp files
        cleanup_temp_files()
        # Clear session state (including the view-format flags set by the
        # JSON/YAML buttons, so no stale view survives a reset)
        for key in (
            "processed_results",
            "logs",
            "original_structures",
            "show_original",
            "show_processed",
            "show_json",
            "show_yaml",
            "temp_cleaned",
            "last_cleanup_time",
        ):
            if key in st.session_state:
                del st.session_state[key]
        logging.info("Cleared all session state and temporary files")
        return True
    except Exception as e:
        logging.error(f"Error clearing all data: {e}")
        return False
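# Streamlit re-executes this whole script on every user interaction, so
# st.session_state is the only per-session storage that survives reruns.
# The two helpers above therefore reset both layers of state: files on disk
# in TEMP_DIR (shared across all sessions) and the per-session keys held in
# st.session_state.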
def get_temp_files_info():
    """Get information about temporary files (count and total size)."""
    try:
        if not os.path.exists(TEMP_DIR):
            return 0, 0
        files = os.listdir(TEMP_DIR)
        total_size = 0
        file_details = []
        for filename in files:
            try:
                file_path = os.path.join(TEMP_DIR, filename)
                if os.path.isfile(file_path):
                    file_size = os.path.getsize(file_path)
                    total_size += file_size
                    file_details.append({'name': filename, 'size': file_size, 'type': 'file'})
                elif os.path.isdir(file_path):
                    file_details.append({'name': filename, 'size': 0, 'type': 'directory'})
            except (PermissionError, OSError) as e:
                logging.warning(f"Error accessing file {filename}: {e}")
                file_details.append({'name': filename, 'size': 0, 'type': 'error'})
                continue
        # Log detailed information for debugging
        if file_details:
            logging.info(f"Temp directory contents ({TEMP_DIR}):")
            for detail in file_details:
                logging.info(f" - {detail['name']} ({detail['type']}): {detail['size']} bytes")
        return len(files), total_size
    except PermissionError as e:
        logging.warning(f"Permission error accessing temp directory {TEMP_DIR}: {e}")
        return 0, 0
    except Exception as e:
        logging.warning(f"Error getting temp files info: {e}")
        return 0, 0


def format_file_size(size_bytes):
    """Format a file size in a human-readable unit."""
    if size_bytes == 0:
        return "0 B"
    size_names = ["B", "KB", "MB", "GB"]
    i = 0
    while size_bytes >= 1024 and i < len(size_names) - 1:
        size_bytes /= 1024.0
        i += 1
    return f"{size_bytes:.1f} {size_names[i]}"


def save_uploaded_file(uploaded_file, filename):
    """Save an uploaded file to the temp directory and return its path."""
    temp_path = os.path.join(TEMP_DIR, f"temp_{filename}")
    try:
        uploaded_file.seek(0)  # Reset the file pointer to the beginning
        file_bytes = uploaded_file.read()
        with open(temp_path, "wb") as f:
            f.write(file_bytes)
        logging.info(f"Saved uploaded file to {temp_path}")
        return temp_path
    except PermissionError as e:
        logging.error(f"Permission error saving uploaded file to {temp_path}: {e}")
        raise PermissionError(
            "Cannot save file due to permission restrictions. "
            "Please try clearing data or contact support."
        )
    except Exception as e:
        logging.error(f"Error saving uploaded file: {e}")
        raise


# Configure the page layout to use wide mode
st.set_page_config(
    page_title="Medical Document Parser & Redactor",
    page_icon="📄",
    layout="wide",
    initial_sidebar_state="collapsed",
)

# Add custom CSS for better styling
st.markdown("""
""", unsafe_allow_html=True)

# Configure the root logger only once (avoid duplicate handlers on reruns)
if len(logging.getLogger().handlers) == 0:
    logging.getLogger().setLevel(logging.INFO)
# (We will attach custom handlers during processing as needed)

# Title and description
st.title("Medical Document Parser & Redactor")
st.write("""
Upload PDF medical documents to parse their content using **Docling** (a structure-aware parser)
and automatically **redact specific sections** (e.g., the initial and final medication lists).
Use the buttons below to view the original structure or process with redaction.

**💡 Tip:** This is a Hugging Face Space with limited storage. Use the "Clear All Data" button
to remove temporary files when you're done processing documents.
""")

# Add a clear-all-data button at the top
if st.button("🧹 Clear All Data", type="secondary", help="Remove all temporary files and reset the application"):
    if clear_all_data():
        st.success("✅ All data cleared successfully! The application has been reset.")
        cost_tracker.reset_session()  # Reset cost tracking when clearing data
        st.rerun()
    else:
        st.error("❌ Error clearing data. Please try again.")
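# Streamlit's file uploader keeps uploads as in-memory, file-like objects
# (BytesIO-style buffers), while the Docling-based pipeline below expects a
# real filesystem path; save_uploaded_file() therefore writes each upload
# into TEMP_DIR before any processing starts.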
Please try again.") # File uploader (accept multiple PDF files) uploaded_files = st.file_uploader("Upload PDF medical documents", type=["pdf"], accept_multiple_files=True) # Clean up temp files on app start (but keep the directory) if "temp_cleaned" not in st.session_state: cleanup_temp_files() st.session_state.temp_cleaned = True # Initialize session state storage for results and logs if "processed_results" not in st.session_state: st.session_state.processed_results = {} # {filename: {"structured_json": ..., "redacted_md": ..., "redacted_json": ...}} if "logs" not in st.session_state: st.session_state.logs = {} # {filename: log_text} if "original_structures" not in st.session_state: st.session_state.original_structures = {} # {filename: structured_json} # Show temp directory status and cleanup button temp_file_count, total_size = get_temp_files_info() # Automatic cleanup: if temp files are too old or too large, clean them up if "last_cleanup_time" not in st.session_state: st.session_state.last_cleanup_time = time.time() # Check if we should do automatic cleanup (every 30 minutes or if files are too large) current_time = time.time() time_since_cleanup = current_time - st.session_state.last_cleanup_time if (time_since_cleanup > 1800 or # 30 minutes total_size > 100 * 1024 * 1024): # 100MB if temp_file_count > 0: cleanup_temp_files() st.session_state.last_cleanup_time = current_time st.info("๐งน Automatic cleanup: Removed old temporary files") # Recalculate after cleanup temp_file_count, total_size = get_temp_files_info() # Create a row with temp file status and delete button col1, col2 = st.columns([3, 1]) with col1: if temp_file_count > 0: st.caption(f"๐ {temp_file_count} temporary file(s) - Total size: {format_file_size(total_size)}") # Show warning if total size is large if total_size > 50 * 1024 * 1024: # 50MB st.warning("โ ๏ธ Large temporary files detected. 
# Create a row with the temp file status and a delete button
col1, col2 = st.columns([3, 1])

with col1:
    if temp_file_count > 0:
        st.caption(f"📁 {temp_file_count} temporary file(s) - Total size: {format_file_size(total_size)}")
        # Show a warning if the total size is large
        if total_size > 50 * 1024 * 1024:  # 50 MB
            st.warning("⚠️ Large temporary files detected. Consider clearing data to free up space.")

        # Debug: show temp files (expandable)
        with st.expander("🔍 Debug: View temporary files"):
            try:
                if os.path.exists(TEMP_DIR):
                    files = os.listdir(TEMP_DIR)
                    if files:
                        st.write("**Temporary files in directory:**")
                        for filename in files:
                            file_path = os.path.join(TEMP_DIR, filename)
                            try:
                                if os.path.isfile(file_path):
                                    size = os.path.getsize(file_path)
                                    st.write(f"📄 {filename} ({format_file_size(size)})")
                                elif os.path.isdir(file_path):
                                    st.write(f"📁 {filename} (directory)")
                                else:
                                    st.write(f"❓ {filename} (unknown)")
                            except Exception as e:
                                st.write(f"❌ {filename} (error: {e})")
                    else:
                        st.write("No files found in temp directory")
                else:
                    st.write("Temp directory does not exist")
            except Exception as e:
                st.write(f"Error accessing temp directory: {e}")
    else:
        st.caption("📁 No temporary files")

with col2:
    if temp_file_count > 0:
        if st.button("🗑️ Delete Temp Files", type="secondary", help="Remove all temporary files from the server"):
            try:
                cleanup_temp_files()
                st.success(f"✅ Successfully deleted {temp_file_count} temporary file(s)")
                st.rerun()  # Refresh the page to update the file count
            except Exception as e:
                st.error(f"❌ Error deleting temporary files: {e}")
    else:
        st.caption("No files to delete")

if uploaded_files:
    # UI to select which file to work with (if multiple files were uploaded)
    file_names = [f.name for f in uploaded_files]
    selected_file = st.selectbox("Select a file to work with", options=file_names)

    if selected_file:
        # Find the selected uploaded file
        uploaded_file = next(f for f in uploaded_files if f.name == selected_file)

        # Create buttons for the different actions
        col1, col2, col3, col4, col5 = st.columns(5)

        with col1:
            if st.button("📄 Show Original", type="primary"):
                # Process the document to get the original structure (without redaction)
                if selected_file not in st.session_state.original_structures:
                    # Save the uploaded file to a temporary location
                    temp_path = save_uploaded_file(uploaded_file, selected_file)
                    # Create a DocumentProcessor without section extraction (for the original structure)
                    processor = DocumentProcessor(section_extractor=None)
                    # Process the document to get the original structure
                    result = processor.process(temp_path)
                    st.session_state.original_structures[selected_file] = result.structured_json
                    # Also store the original markdown for comparison
                    st.session_state.original_structures[f"{selected_file}_markdown"] = result.structured_markdown
                # Display the original structure
                st.session_state.show_original = True
                st.session_state.show_processed = False

        with col2:
            if st.button("🔒 Process with Redaction"):
                # Process the document with redaction
                if selected_file not in st.session_state.processed_results:
                    # Save the uploaded file to a temporary location
                    temp_path = save_uploaded_file(uploaded_file, selected_file)

                    # Ensure the deployment name is in the cost tracker
                    if AZURE_OPENAI_DEPLOYMENT and AZURE_OPENAI_DEPLOYMENT not in cost_tracker.get_available_models():
                        model_type = cost_tracker.guess_model_type(AZURE_OPENAI_DEPLOYMENT)
                        cost_tracker.add_deployment_pricing(AZURE_OPENAI_DEPLOYMENT, model_type)

                    # Use the new processing function
                    from processing.document_processor import process_document_with_redaction

                    # Attach an in-memory log handler to capture logs for this file
                    log_handler, log_buffer = get_log_handler()
                    root_logger = logging.getLogger()
                    root_logger.addHandler(log_handler)
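                    # get_log_handler() is assumed to return an in-memory
                    # handler plus the list it appends formatted records to:
                    # everything the pipeline logs while the handler is
                    # attached lands in log_buffer, and the finally block
                    # below guarantees the handler is detached even on failure.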
                    try:
                        # Process the document using the new function
                        processing_result = process_document_with_redaction(
                            file_path=temp_path,
                            endpoint=AZURE_OPENAI_ENDPOINT,
                            api_key=AZURE_OPENAI_KEY,
                            api_version=AZURE_OPENAI_VERSION,
                            deployment=AZURE_OPENAI_DEPLOYMENT,
                        )
                        # Save results in session state (maintaining compatibility with the existing UI)
                        st.session_state.processed_results[selected_file] = {
                            "structured_json": processing_result.original_document_json,
                            "redacted_md": processing_result.redacted_document_md,
                            "redacted_json": processing_result.redacted_document_json,  # Now this is actually redacted!
                            "original_markdown": processing_result.original_document_md,
                            "processing_result": processing_result,  # Store the new result object
                        }
                    finally:
                        # Remove the handler and stop capturing logs
                        root_logger.removeHandler(log_handler)
                        # Combine the log records into a single text
                        log_text = "\n".join(log_buffer)
                        st.session_state.logs[selected_file] = log_text
                st.session_state.show_original = False
                st.session_state.show_processed = True

        with col3:
            if st.button("🔄 Switch View"):
                # Toggle between views
                if st.session_state.get("show_original", False):
                    st.session_state.show_original = False
                    st.session_state.show_processed = True
                else:
                    st.session_state.show_original = True
                    st.session_state.show_processed = False

        with col4:
            if st.button("📋 Show Original JSON", type="secondary"):
                # Process the document to get the original structure (without redaction)
                if selected_file not in st.session_state.original_structures:
                    # Save the uploaded file to a temporary location
                    temp_path = save_uploaded_file(uploaded_file, selected_file)
                    # Create a DocumentProcessor without section extraction (for the original structure)
                    processor = DocumentProcessor(section_extractor=None)
                    # Process the document to get the original structure
                    result = processor.process(temp_path)
                    st.session_state.original_structures[selected_file] = result.structured_json
                    # Store the original markdown for comparison
                    st.session_state.original_structures[f"{selected_file}_markdown"] = result.structured_markdown
                    # Store the original YAML for comparison
                    st.session_state.original_structures[f"{selected_file}_yaml"] = result.structured_yaml
                # Display the original JSON structure
                st.session_state.show_original = True
                st.session_state.show_processed = False
                st.session_state.show_json = True
                st.session_state.show_yaml = False

        with col5:
            if st.button("📋 Show Original YAML", type="secondary"):
                # Process the document to get the original structure (without redaction)
                if selected_file not in st.session_state.original_structures:
                    # Save the uploaded file to a temporary location
                    temp_path = save_uploaded_file(uploaded_file, selected_file)
                    # Create a DocumentProcessor without section extraction (for the original structure)
                    processor = DocumentProcessor(section_extractor=None)
                    # Process the document to get the original structure
                    result = processor.process(temp_path)
                    st.session_state.original_structures[selected_file] = result.structured_json
                    # Store the original markdown for comparison
                    st.session_state.original_structures[f"{selected_file}_markdown"] = result.structured_markdown
                    # Store the original YAML for comparison
                    st.session_state.original_structures[f"{selected_file}_yaml"] = result.structured_yaml
                # Display the original YAML structure
                st.session_state.show_original = True
                st.session_state.show_processed = False
                st.session_state.show_json = False
                st.session_state.show_yaml = True

        # Show the current view status
        if st.session_state.get("show_original", False):
            st.info("📄 Currently viewing: **Original Document Structure**")
        elif st.session_state.get("show_processed", False):
            st.success("🔒 Currently viewing: **Processed Document with Redaction**")
        else:
            st.info("ℹ️ Select an action above to view document content")
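        # The flags show_original / show_processed / show_json / show_yaml form
        # a small view state machine: the buttons above set them, and the
        # rendering branches below read them on the rerun that follows each click.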
        # Display results based on the button clicked
        if st.session_state.get("show_original", False):
            st.markdown("---")

            # Determine what to show based on the button clicked
            show_json = st.session_state.get("show_json", False)
            show_yaml = st.session_state.get("show_yaml", False)

            if show_json:
                st.subheader(f"Original Document Structure (JSON) - {selected_file}")
            elif show_yaml:
                st.subheader(f"Original Document Structure (YAML) - {selected_file}")
            else:
                st.subheader(f"Original Document Structure (Markdown) - {selected_file}")

            # Get the original structure
            original_json = st.session_state.original_structures[selected_file]
            original_markdown = st.session_state.original_structures.get(f"{selected_file}_markdown", "")
            original_yaml = st.session_state.original_structures.get(f"{selected_file}_yaml", "")

            # Display the PDF viewer and parsed content side by side
            col1, col2 = st.columns([1, 1])

            with col1:
                st.subheader("📑 Original PDF")
                # Reset the file pointer to the beginning
                uploaded_file.seek(0)
                # Display the PDF inline using base64 encoding
                import base64
                pdf_bytes = uploaded_file.getvalue()
                b64_pdf = base64.b64encode(pdf_bytes).decode()
                # NOTE: the embed markup below is one reasonable way to inline a
                # PDF; browsers may refuse to render very large data: URIs.
                pdf_display = (
                    f'<iframe src="data:application/pdf;base64,{b64_pdf}" '
                    f'width="100%" height="600" type="application/pdf"></iframe>'
                )
                st.markdown(pdf_display, unsafe_allow_html=True)

            with col2:
                if show_json:
                    st.subheader("📋 Original Document (JSON)")
                    st.caption("Docling-generated JSON structure from the PDF")
                    # Use a text area for better readability and scrolling
                    st.text_area(
                        label="Original JSON content",
                        value=json.dumps(original_json, indent=2, ensure_ascii=False),
                        height=600,
                        key="original_json_display",
                        label_visibility="collapsed",
                    )
                elif show_yaml:
                    st.subheader("📋 Original Document (YAML)")
                    st.caption("Docling-generated YAML structure from the PDF")
                    # Use a text area for better readability and scrolling
                    st.text_area(
                        label="Original YAML content",
                        value=original_yaml,
                        height=600,
                        key="original_yaml_display",
                        label_visibility="collapsed",
                    )
                else:
                    st.subheader("📝 Original Document (Markdown)")
                    st.caption("Docling-generated markdown from the PDF")
                    # Use a text area for better readability and scrolling
                    st.text_area(
                        label="Original markdown content",
                        value=original_markdown,
                        height=600,
                        key="original_markdown_display",
                        label_visibility="collapsed",
                    )

            # Add download buttons for the original content
            st.markdown("---")
            col1, col2, col3 = st.columns(3)
            with col1:
                if show_json:
                    st.download_button(
                        label="📥 Download Original JSON",
                        data=json.dumps(original_json, indent=2, ensure_ascii=False),
                        file_name=f"{selected_file}_original.json",
                        mime="application/json",
                    )
                elif show_yaml:
                    st.download_button(
                        label="📥 Download Original YAML",
                        data=original_yaml,
                        file_name=f"{selected_file}_original.yaml",
                        mime="text/yaml",
                    )
                else:
                    st.download_button(
                        label="📥 Download Original Markdown",
                        data=original_markdown,
                        file_name=f"{selected_file}_original.md",
                        mime="text/markdown",
                    )
            with col2:
                if show_json or show_yaml:
                    st.subheader("📋 Document Structure")
                    st.json(original_json)
                else:
                    st.subheader("📋 JSON Structure")
                    st.json(original_json)
            with col3:
                if show_json or show_yaml:
                    # Show format information
                    st.subheader("📊 Format Info")
                    if show_json:
                        st.info("**JSON Format**: Structured data representation with key-value pairs")
                        st.write("**Use case**: API integration, data processing, programmatic access")
                    elif show_yaml:
                        st.info("**YAML Format**: Human-readable data serialization")
                        st.write("**Use case**: Configuration files, documentation, easy reading")
                else:
                    st.subheader("📊 Markdown Info")
                    st.info("**Markdown Format**: Formatted text with headers, lists, and styling")
                    st.write("**Use case**: Documentation, readable output, web display")
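        # Rendering note: st.text_area with a fixed height keeps very large
        # JSON/YAML payloads scrollable, and the base64 <iframe> embed keeps
        # the PDF preview self-contained (no extra file for the browser to
        # fetch), at the cost of inlining the whole PDF into the page.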
display") elif st.session_state.get("show_processed", False): st.markdown("---") st.subheader(f"Processed Document - {selected_file}") # Retrieve stored results data = st.session_state.processed_results[selected_file] structured_json = data["structured_json"] redacted_md = data["redacted_md"] redacted_json = data["redacted_json"] original_md = data["original_markdown"] # Show processing summary original_texts = structured_json.get("texts", []) redacted_texts = redacted_json.get("texts", []) removed_count = len(original_texts) - len(redacted_texts) if removed_count > 0: st.success(f"โ Successfully removed {removed_count} text elements containing medication information") else: st.info("โน๏ธ No medication sections were identified for removal") # Create tabs for different views tab1, tab2, tab3 = st.tabs(["๐ Side-by-Side Comparison", "๐ JSON Structure", "๐ Processing Details"]) with tab1: st.subheader("Original vs Redacted Content") st.caption("Compare the original document content with the redacted version") # Get the actual removed indices from the processing result actual_removed_indices = [] if "processing_result" in st.session_state.processed_results[selected_file]: processing_result = st.session_state.processed_results[selected_file]["processing_result"] actual_removed_indices = processing_result.removed_indices # Create a more intelligent side-by-side comparison based on JSON structure col1, col2 = st.columns(2) with col1: st.markdown("**๐ Original Document**") # Display original content with removed sections highlighted for i, text_elem in enumerate(original_texts): text_content = text_elem.get("text", "") label = text_elem.get("label", "") # Check if this element was removed is_removed = i in actual_removed_indices if is_removed: # Highlight removed content in red st.markdown(f"""
{content_preview}
{content_preview}