import os
import sys
import logging
import shutil
from datetime import datetime
from typing import List, Dict, Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Translation table mapping every disallowed filename character to '_'.
_INVALID_FILENAME_CHARS = str.maketrans({char: '_' for char in '<>:"/\\|?*'})
# Hard upper bound on the length of a sanitized filename.
_MAX_FILENAME_LENGTH = 200
# Number of base-name characters kept when a name has to be shortened.
_MAX_BASE_LENGTH = 195


def sanitize_filename(filename: str) -> str:
    """Sanitize a filename by replacing invalid characters and capping its length.

    Args:
        filename: The candidate filename (not a full path).

    Returns:
        The filename with each of ``<>:"/\\|?*`` replaced by an underscore and
        shortened to at most 200 characters, keeping the extension when it fits.
    """
    # One pass over the string instead of one replace() call per character.
    filename = filename.translate(_INVALID_FILENAME_CHARS)

    if len(filename) > _MAX_FILENAME_LENGTH:
        base, ext = os.path.splitext(filename)
        if len(ext) >= _MAX_FILENAME_LENGTH:
            # The "extension" alone exceeds the limit; a plain cut is all we can do.
            filename = filename[:_MAX_FILENAME_LENGTH]
        else:
            # Keep at most 195 base characters, and fewer when a long extension
            # would otherwise push the result past the limit.
            keep = min(_MAX_BASE_LENGTH, _MAX_FILENAME_LENGTH - len(ext))
            filename = base[:keep] + ext

    return filename


def _project_data_dir(subdir: str) -> str:
    """Return ``<three directories above this file>/data/<subdir>``."""
    module_path = os.path.abspath(__file__)
    root = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))
    return os.path.join(root, 'data', subdir)


def _temp_fallback_dir(subdir: str) -> str:
    """Return the temp-area directory used when the project data dir is unusable."""
    if os.name != 'nt':
        return f'/tmp/{subdir}'
    return os.path.join(os.environ.get('TEMP', 'C:\\Temp'), subdir)


def _probe_write_access(directory: str) -> None:
    """Create and delete a scratch file in ``directory``; raise if that fails."""
    probe_path = os.path.join(directory, '.test_write_access')
    with open(probe_path, 'w') as f:
        f.write('test')
    os.remove(probe_path)


def get_document_path(filename: str) -> str:
    """Get the path to store a document.

    The preferred location is the project's ``data/documents`` directory. If a
    write probe there fails, a ``documents`` directory in the temp area is used
    instead. The sanitized filename gets a ``_YYYYmmddHHMMSS`` suffix before
    its extension.

    Args:
        filename: Original name of the document.

    Returns:
        The full path where the document should be written. If anything goes
        wrong, a ``doc_<timestamp>`` path directly in the temp area is returned.
    """
    try:
        docs_dir = _project_data_dir('documents')
        os.makedirs(docs_dir, exist_ok=True)

        try:
            _probe_write_access(docs_dir)
        except Exception as e:
            logger.warning(f"Document directory may not be writable: {e}")
            # Try alternative location
            docs_dir = _temp_fallback_dir('documents')
            os.makedirs(docs_dir, exist_ok=True)

        filename = sanitize_filename(filename)

        # NOTE: the timestamp has one-second resolution, so two documents with
        # the same name requested within the same second get the same path.
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        base, ext = os.path.splitext(filename)
        unique_filename = f"{base}_{timestamp}{ext}"

        filepath = os.path.join(docs_dir, unique_filename)
        logger.info(f"Document will be stored at: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error getting document path: {e}")
        # Fallback to a simple path in /tmp or temp directory
        fallback_dir = '/tmp' if os.name != 'nt' else os.environ.get('TEMP', 'C:\\Temp')
        os.makedirs(fallback_dir, exist_ok=True)
        return os.path.join(fallback_dir, f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}")


def copy_uploaded_file(source_path: str, destination_path: str) -> bool:
    """Copy an uploaded file with proper error handling.

    ``shutil.copy2`` is tried first; if it fails, the contents are streamed
    with plain file objects (no metadata is copied in that case).

    Args:
        source_path: Path of the file to copy.
        destination_path: Path to copy the file to.

    Returns:
        True if the destination holds the source's contents afterwards,
        False if every copy method failed.
    """
    try:
        shutil.copy2(source_path, destination_path)
        logger.info(f"File copied from {source_path} to {destination_path}")
        return True
    except shutil.SameFileError:
        # Source and destination are one file. Falling through to the manual
        # copy would open it with 'wb' and truncate it, destroying the upload.
        logger.warning(f"Source and destination are the same file: {source_path}")
        return True
    except Exception as e:
        logger.error(f"Error copying file: {e}")

    # Try alternate approach
    try:
        with open(source_path, 'rb') as src, open(destination_path, 'wb') as dst:
            # Stream in chunks rather than loading the whole file into memory.
            shutil.copyfileobj(src, dst)
        logger.info("File copied using alternate method")
        return True
    except Exception as e2:
        logger.error(f"All methods of copying file failed: {e2}")
        return False


def format_sources(sources: List[Dict[str, Any]]) -> str:
    """Format source documents for display.

    Args:
        sources: Source records; each may carry a ``file_name`` and a ``page``.

    Returns:
        One numbered line per source, joined with newlines;
        ``"No sources found."`` when ``sources`` is empty; or
        ``"Error displaying sources."`` if a record cannot be formatted.
    """
    try:
        if not sources:
            return "No sources found."

        formatted = []
        for i, source in enumerate(sources, 1):
            source_str = f"{i}. {source.get('file_name', 'Unknown Source')} "
            page = source.get('page')
            # Only a missing or empty page is skipped; page 0 is a real page
            # number for zero-indexed sources and must still be shown.
            if page is not None and page != '':
                source_str += f"(Page {page}) "
            formatted.append(source_str)

        return "\n".join(formatted)
    except Exception as e:
        logger.error(f"Error formatting sources: {e}")
        return "Error displaying sources."
def save_conversation(question: str, answer: str, sources: List[Dict[str, Any]]) -> str:
    """Save a conversation to a text file.

    The file is written to the project's ``data/conversations`` directory, or
    to a ``conversations`` directory in the temp area if that cannot be
    created. Its name is ``<timestamp>_<first five words of the question>.txt``.

    Args:
        question: The question that was asked.
        answer: The answer that was given.
        sources: Source records, rendered with ``format_sources``.

    Returns:
        The path of the written file, or an empty string if saving failed.
    """
    try:
        # Create a directory for conversations
        conv_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            'data',
            'conversations',
        )
        try:
            os.makedirs(conv_dir, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create conversation directory: {e}")
            # Use alternative directory
            conv_dir = '/tmp/conversations' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'conversations')
            os.makedirs(conv_dir, exist_ok=True)

        # Create a filename based on the timestamp and first few words of the question
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        leading_words = (question or "").split()[:5]
        # An empty or whitespace-only question has no words; use a placeholder
        # so the filename does not end in a bare underscore.
        question_slug = "_".join(leading_words).lower() or "empty_question"
        question_slug = sanitize_filename(question_slug)
        filename = f"{timestamp}_{question_slug}.txt"

        # Format the conversation
        formatted_sources = format_sources(sources)
        content = f"Question: {question}\n\nAnswer: {answer}\n\nSources:\n{formatted_sources}\n"

        # Save the conversation. UTF-8 is set explicitly so non-ASCII text is
        # written the same way regardless of the platform's default encoding.
        filepath = os.path.join(conv_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        logger.info(f"Conversation saved to {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error saving conversation: {e}")
        return ""