|
import os |
|
import sys |
|
import logging |
|
import shutil |
|
from datetime import datetime |
|
from typing import List, Dict, Any |
|
|
|
|
|
# Configure the root logger at import time so INFO-level messages are emitted,
# then create the module-level logger used by every helper in this file.
# NOTE: basicConfig() is a no-op when the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
def sanitize_filename(filename: str) -> str:
    """Sanitize a filename by replacing invalid characters and capping its length.

    Every character listed in ``invalid_chars`` below is replaced with an
    underscore. Names longer than 200 characters are shortened by trimming
    the base name while keeping the extension, so the result is never longer
    than 200 characters.

    Args:
        filename: The raw file name to clean up.

    Returns:
        The sanitized name, at most 200 characters long.
    """
    max_length = 200
    invalid_chars = '<>:"/\\|?*'

    # One pass over the string instead of one ``replace`` call per character.
    replacements = str.maketrans(invalid_chars, '_' * len(invalid_chars))
    filename = filename.translate(replacements)

    if len(filename) > max_length:
        base, ext = os.path.splitext(filename)
        if len(ext) >= max_length:
            # The "extension" alone fills the whole budget (e.g. a short stem
            # followed by a dot and a very long tail); there is no sensible
            # base/extension split left, so hard-truncate the full name.
            filename = filename[:max_length]
        else:
            # Keep the historical 195-character base budget for short
            # extensions and shrink it further when the extension is longer
            # than 5 characters, so the total never exceeds max_length.
            base_budget = min(max_length - 5, max_length - len(ext))
            filename = base[:base_budget] + ext
    return filename
|
|
|
def get_document_path(filename: str) -> str:
    """Build a timestamped path under a writable directory for a document.

    The preferred directory is ``data/documents`` under the directory obtained
    by applying ``os.path.dirname`` three times to this module's absolute
    path. If that directory cannot be created or a probe file cannot be
    written there, a ``documents`` folder under ``/tmp`` (or under ``%TEMP%``,
    defaulting to ``C:\\Temp``, on Windows) is used instead.

    The file name is passed through ``sanitize_filename`` and a
    ``YYYYmmddHHMMSS`` timestamp is inserted before the extension.

    Args:
        filename: Original name of the document.

    Returns:
        The full path where the document should be stored. If anything
        unexpected fails, a last-resort ``doc_<timestamp>`` path directly
        inside the temp directory is returned (without the original name or
        extension).
    """
    try:
        module_path = os.path.abspath(__file__)
        root_dir = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))
        docs_dir = os.path.join(root_dir, 'data', 'documents')

        try:
            # Directory creation is part of the probe: if the preferred
            # location cannot even be created, fall back to the temp
            # directory below instead of giving up on the file name entirely.
            os.makedirs(docs_dir, exist_ok=True)

            # Verify write access by creating and removing a small probe file.
            test_file = os.path.join(docs_dir, '.test_write_access')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
        except OSError as e:
            logger.warning(f"Document directory may not be writable: {e}")

            docs_dir = '/tmp/documents' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'documents')
            os.makedirs(docs_dir, exist_ok=True)

        filename = sanitize_filename(filename)

        # Second-resolution timestamp keeps repeated uploads of the same name apart.
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        base, ext = os.path.splitext(filename)
        unique_filename = f"{base}_{timestamp}{ext}"

        filepath = os.path.join(docs_dir, unique_filename)
        logger.info(f"Document will be stored at: {filepath}")
        return filepath
    except Exception as e:
        # Last-resort boundary: callers always get some path back.
        logger.error(f"Error getting document path: {e}")

        fallback_dir = '/tmp' if os.name != 'nt' else os.environ.get('TEMP', 'C:\\Temp')
        os.makedirs(fallback_dir, exist_ok=True)
        return os.path.join(fallback_dir, f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}")
|
|
|
def copy_uploaded_file(source_path: str, destination_path: str) -> bool:
    """Copy an uploaded file, falling back to a plain byte copy on failure.

    ``shutil.copy2`` is tried first. If it fails for any reason other than
    the two paths referring to the same file, the content is copied again
    through plain file objects.

    Args:
        source_path: Path of the file to copy.
        destination_path: Path the file should be copied to.

    Returns:
        True if the file is in place at the destination afterwards (including
        the case where source and destination are already the same file),
        False if every copy attempt failed.
    """
    try:
        shutil.copy2(source_path, destination_path)
        logger.info(f"File copied from {source_path} to {destination_path}")
        return True
    except shutil.SameFileError:
        # The fallback below opens the destination with 'wb'; on the same
        # file that would truncate the upload to zero bytes. The file is
        # already where it needs to be, so there is nothing to copy.
        logger.info(f"Source and destination are the same file, nothing to copy: {source_path}")
        return True
    except Exception as e:
        logger.error(f"Error copying file: {e}")

    try:
        with open(source_path, 'rb') as src, open(destination_path, 'wb') as dst:
            # Chunked copy: avoids loading a large upload fully into memory.
            shutil.copyfileobj(src, dst)
        logger.info("File copied using alternate method")
        return True
    except Exception as e2:
        logger.error(f"All methods of copying file failed: {e2}")
        return False
|
|
|
def format_sources(sources: List[Dict[str, Any]]) -> str:
    """Format source documents as a numbered, newline-separated list.

    Each entry is rendered as ``"<n>. <file_name> "`` followed by
    ``"(Page <page>) "`` when the source has a page value. A missing
    ``file_name`` is shown as ``Unknown Source``.

    Args:
        sources: Source dictionaries; the ``file_name`` and ``page`` keys are
            read, both optional.

    Returns:
        The formatted list, ``"No sources found."`` when ``sources`` is empty
        or None, or ``"Error displaying sources."`` if formatting fails.
    """
    try:
        if not sources:
            return "No sources found."

        formatted = []
        for i, source in enumerate(sources, 1):
            source_str = f"{i}. {source.get('file_name', 'Unknown Source')} "
            page = source.get('page')
            # Explicit check rather than truthiness so that page 0 (valid for
            # 0-indexed page metadata) is still displayed.
            if page is not None and page != '':
                source_str += f"(Page {page}) "
            formatted.append(source_str)

        return "\n".join(formatted)
    except Exception as e:
        logger.error(f"Error formatting sources: {e}")
        return "Error displaying sources."
|
|
|
def save_conversation(question: str, answer: str, sources: List[Dict[str, Any]]) -> str:
    """Save a question/answer exchange and its sources to a text file.

    The file is written to ``data/conversations`` under the directory
    obtained by applying ``os.path.dirname`` three times to this module's
    absolute path. If that directory cannot be created, a ``conversations``
    folder under ``/tmp`` (or under ``%TEMP%``, defaulting to ``C:\\Temp``,
    on Windows) is used instead. The file name is
    ``<YYYYmmddHHMMSS>_<slug>.txt``, where the slug is built from the first
    five words of the question.

    Args:
        question: The user's question; also used to derive the file name.
        answer: The answer text to store.
        sources: Source dictionaries, rendered with ``format_sources``.

    Returns:
        The path of the written file, or an empty string if saving failed.
    """
    try:
        module_path = os.path.abspath(__file__)
        root_dir = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))
        conv_dir = os.path.join(root_dir, 'data', 'conversations')
        try:
            os.makedirs(conv_dir, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create conversation directory: {e}")

            conv_dir = '/tmp/conversations' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'conversations')
            os.makedirs(conv_dir, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        # A None, empty, or whitespace-only question yields no words; use the
        # placeholder slug in all of those cases rather than an empty one.
        words = (question or "").split()[:5] or ["empty_question"]
        question_slug = sanitize_filename("_".join(words).lower())
        filename = f"{timestamp}_{question_slug}.txt"

        formatted_sources = format_sources(sources)
        content = f"Question: {question}\n\nAnswer: {answer}\n\nSources:\n{formatted_sources}\n"

        filepath = os.path.join(conv_dir, filename)
        # Explicit UTF-8: the locale default encoding cannot represent every
        # character on some platforms, which would make the write fail.
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        logger.info(f"Conversation saved to {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error saving conversation: {e}")
        return ""