File size: 5,506 Bytes
a33458e 6c6cf17 a33458e 6c6cf17 a33458e 6c6cf17 a33458e 6c6cf17 a33458e 6c6cf17 a33458e 6c6cf17 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import os
import sys
import logging
import shutil
from datetime import datetime
from typing import List, Dict, Any
# Configure root logging at INFO level as an import-time side effect.
# basicConfig() is a no-op when the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; the helper functions in this
# file report progress and failures through it.
logger = logging.getLogger(__name__)
# Characters replaced by sanitize_filename: the Windows-reserved punctuation
# and both path separators, plus ASCII control characters (NUL is rejected by
# every filesystem, the rest are rejected by Windows).
_INVALID_FILENAME_CHARS = '<>:"/\\|?*' + ''.join(map(chr, range(32)))
_FILENAME_TRANSLATION = str.maketrans({char: '_' for char in _INVALID_FILENAME_CHARS})
# Hard upper bound on the sanitized name, and the historical cap on the base
# part (kept so names with ordinary extensions truncate exactly as before).
_MAX_FILENAME_LENGTH = 200
_MAX_BASE_LENGTH = 195


def sanitize_filename(filename: str) -> str:
    """Sanitize a filename by replacing invalid characters and capping length.

    Each character in ``<>:"/\\|?*`` and each ASCII control character is
    replaced with an underscore. Names longer than 200 characters are
    truncated while keeping the extension, and the result never exceeds
    200 characters.

    Args:
        filename: The raw file name (not a path; separators are replaced).

    Returns:
        The sanitized file name.
    """
    # One C-level pass instead of one str.replace() call per character.
    filename = filename.translate(_FILENAME_TRANSLATION)

    if len(filename) <= _MAX_FILENAME_LENGTH:
        return filename

    base, ext = os.path.splitext(filename)
    if len(ext) >= _MAX_FILENAME_LENGTH:
        # Pathological extension: there is no room to keep it intact, so
        # fall back to a plain prefix of the whole name.
        return filename[:_MAX_FILENAME_LENGTH]

    # Budget the extension against the limit so that long extensions can no
    # longer push the result past the cap.
    keep = min(_MAX_BASE_LENGTH, _MAX_FILENAME_LENGTH - len(ext))
    return base[:keep] + ext
def get_document_path(filename: str) -> str:
    """Get the path at which an uploaded document should be stored.

    The preferred directory is ``data/documents`` under the directory reached
    by three ``dirname()`` steps from this module's absolute path. If that
    directory cannot be created or written to, a ``documents`` directory in
    the system temp location is used instead. The returned file name is the
    sanitized ``filename`` with a ``_YYYYmmddHHMMSS`` timestamp inserted
    before the extension (second resolution, so two calls with the same name
    within one second yield the same path).

    Args:
        filename: Original name of the uploaded file.

    Returns:
        The full path for the document. If anything above fails, a
        last-resort ``doc_<timestamp><ext>`` path in the temp directory.
    """
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    try:
        module_path = os.path.abspath(__file__)
        root_dir = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))
        docs_dir = os.path.join(root_dir, 'data', 'documents')

        try:
            # Directory creation is part of the guarded section so that a
            # failure here also triggers the alternative location rather than
            # jumping straight to the last-resort path.
            os.makedirs(docs_dir, exist_ok=True)
            # Probe write access by creating and removing a small file.
            test_file = os.path.join(docs_dir, '.test_write_access')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
        except Exception as e:
            logger.warning(f"Document directory may not be writable: {e}")
            # Try alternative location
            docs_dir = '/tmp/documents' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'documents')
            os.makedirs(docs_dir, exist_ok=True)

        safe_name = sanitize_filename(filename)
        base, ext = os.path.splitext(safe_name)
        filepath = os.path.join(docs_dir, f"{base}_{timestamp}{ext}")
        logger.info(f"Document will be stored at: {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error getting document path: {e}")
        # Fallback to a simple path in /tmp or temp directory
        fallback_dir = '/tmp' if os.name != 'nt' else os.environ.get('TEMP', 'C:\\Temp')
        os.makedirs(fallback_dir, exist_ok=True)
        # Keep the original extension when there is one, so code that picks a
        # loader by file suffix still works on the fallback path.
        ext = ''
        if isinstance(filename, str):
            ext = os.path.splitext(sanitize_filename(filename))[1]
        return os.path.join(fallback_dir, f"doc_{timestamp}{ext}")
def copy_uploaded_file(source_path: str, destination_path: str) -> bool:
    """Copy an uploaded file, falling back to a plain byte copy on failure.

    ``shutil.copy2`` is tried first. If it fails for any reason other than
    source and destination being the same file, the contents are copied with
    a plain streamed read/write (no metadata).

    Args:
        source_path: Path of the file to copy.
        destination_path: Path to copy the file to.

    Returns:
        True if the file is present at the destination afterwards, False if
        both copy methods failed. Errors are logged, never raised.
    """
    try:
        shutil.copy2(source_path, destination_path)
    except shutil.SameFileError:
        # The file is already where it should be. Falling through to the
        # byte-copy fallback would open it with 'wb' and truncate it.
        logger.info(f"Source and destination are the same file: {destination_path}")
        return True
    except Exception as e:
        logger.error(f"Error copying file: {e}")
    else:
        logger.info(f"File copied from {source_path} to {destination_path}")
        return True

    # Try alternate approach
    try:
        with open(source_path, 'rb') as src, open(destination_path, 'wb') as dst:
            # Stream in chunks rather than loading the whole upload in memory.
            shutil.copyfileobj(src, dst)
    except Exception as e2:
        logger.error(f"All methods of copying file failed: {e2}")
        return False

    logger.info("File copied using alternate method")
    return True
def format_sources(sources: List[Dict[str, Any]]) -> str:
    """Format source documents as a numbered, newline-separated list.

    Each line has the form ``"<n>. <file_name> "`` followed by
    ``"(Page <page>) "`` when the source has a page value. Each line keeps
    the trailing space it has always had.

    Args:
        sources: Source dicts; the keys read are ``'file_name'`` and
            ``'page'``. A missing or empty ``'file_name'`` is shown as
            ``Unknown Source``.

    Returns:
        The formatted list, ``"No sources found."`` when ``sources`` is empty
        or None, or ``"Error displaying sources."`` if formatting fails.
    """
    try:
        if not sources:
            return "No sources found."

        lines = []
        for index, source in enumerate(sources, 1):
            name = source.get('file_name') or 'Unknown Source'
            line = f"{index}. {name} "
            page = source.get('page')
            # Compare against None/'' explicitly: page 0 is a valid page
            # number and must not be dropped by a truthiness test.
            if page is not None and page != '':
                line += f"(Page {page}) "
            lines.append(line)
        return "\n".join(lines)
    except Exception as e:
        logger.error(f"Error formatting sources: {e}")
        return "Error displaying sources."
def save_conversation(question: str, answer: str, sources: List[Dict[str, Any]]) -> str:
    """Save a question/answer exchange and its sources to a text file.

    The file is written to ``data/conversations`` under the directory reached
    by three ``dirname()`` steps from this module's absolute path, or to a
    ``conversations`` directory in the system temp location if that directory
    cannot be created. The file name is
    ``<YYYYmmddHHMMSS>_<first five words of the question>.txt``.

    Args:
        question: The user's question; empty or whitespace-only questions
            are saved under the slug ``empty_question``.
        answer: The answer text.
        sources: Source dicts, rendered with ``format_sources``.

    Returns:
        The path of the saved file, or ``""`` if saving failed (the error is
        logged, not raised).
    """
    try:
        module_path = os.path.abspath(__file__)
        root_dir = os.path.dirname(os.path.dirname(os.path.dirname(module_path)))
        conv_dir = os.path.join(root_dir, 'data', 'conversations')
        try:
            os.makedirs(conv_dir, exist_ok=True)
        except Exception as e:
            logger.warning(f"Could not create conversation directory: {e}")
            # Use alternative directory
            conv_dir = '/tmp/conversations' if os.name != 'nt' else os.path.join(os.environ.get('TEMP', 'C:\\Temp'), 'conversations')
            os.makedirs(conv_dir, exist_ok=True)

        # File name: timestamp plus the first few words of the question.
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        leading_words = (question or "").split()[:5]
        # The `or` also covers whitespace-only questions, which split to [].
        question_slug = "_".join(leading_words).lower() or "empty_question"
        question_slug = sanitize_filename(question_slug)
        filename = f"{timestamp}_{question_slug}.txt"

        formatted_sources = format_sources(sources)
        content = f"Question: {question}\n\nAnswer: {answer}\n\nSources:\n{formatted_sources}\n"

        filepath = os.path.join(conv_dir, filename)
        # Explicit UTF-8: the platform default encoding (e.g. cp1252) cannot
        # represent all text and would make the write fail.
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        logger.info(f"Conversation saved to {filepath}")
        return filepath
    except Exception as e:
        logger.error(f"Error saving conversation: {e}")
        return ""